package io.quarkiverse.reactive.messaging.nats.jetstream.processors.publisher;

import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.JetStreamSubscription;
import io.nats.client.PushSubscribeOptions;
import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionEvent;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.JetStreamClient;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushSubscribeOptionsFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.io.MessageFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.Status;
import io.smallrye.mutiny.Multi;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePushPublisherProcessor.class */
public class MessagePushPublisherProcessor implements MessagePublisherProcessor {
    private static final Logger logger = Logger.getLogger(MessagePushPublisherProcessor.class);
    private final MessagePushPublisherConfiguration<?> configuration;
    private final JetStreamClient jetStreamClient;
    private final AtomicReference<Status> status = new AtomicReference<>(new Status(false, "Not connected", ConnectionEvent.Closed));
    private final PushSubscribeOptionsFactory optionsFactory = new PushSubscribeOptionsFactory();
    private final MessageFactory messageFactory;
    private volatile JetStreamSubscription subscription;
    private volatile Dispatcher dispatcher;

    public MessagePushPublisherProcessor(JetStreamClient jetStreamClient, MessagePushPublisherConfiguration<?> messagePushPublisherConfiguration, MessageFactory messageFactory) {
        this.configuration = messagePushPublisherConfiguration;
        this.jetStreamClient = jetStreamClient;
        this.messageFactory = messageFactory;
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.processors.publisher.MessagePublisherProcessor
    public Multi<Message<?>> publisher() {
        return this.jetStreamClient.getOrEstablishConnection().onItem().transformToMulti(this::publisher).onFailure().invoke(th -> {
            if (isConsumerAlreadyInUse(th)) {
                return;
            }
            logger.errorf(th, "Failed to publish messages: %s", th.getMessage());
            this.jetStreamClient.fireEvent(ConnectionEvent.CommunicationFailed, th.getMessage());
        }).onFailure().retry().withBackOff(this.configuration.retryBackoff()).indefinitely();
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor
    public Status getStatus() {
        return this.status.get();
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor
    public void close() {
        shutDown();
        this.jetStreamClient.close();
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor
    public String getChannel() {
        return this.configuration.channel();
    }

    private Multi<Message<?>> publisher(Connection connection) {
        boolean traceEnabled = this.configuration.traceEnabled();
        Class<?> orElse = this.configuration.payloadType().orElse(null);
        String subject = this.configuration.subject();
        return Multi.createFrom().emitter(multiEmitter -> {
            try {
                JetStream jetStream = connection.jetStream();
                this.dispatcher = connection.createDispatcher();
                PushSubscribeOptions create = this.optionsFactory.create(this.configuration);
                Dispatcher dispatcher = this.dispatcher;
                Objects.requireNonNull(multiEmitter);
                this.subscription = jetStream.subscribe(subject, dispatcher, (v1) -> {
                    r4.emit(v1);
                }, false, create);
            } catch (Throwable th) {
                logger.errorf(th, "Failed subscribing to stream: %s, subject: %s with message: %s", this.configuration.consumerConfiguration().stream(), subject, th.getMessage());
                multiEmitter.fail(th);
            }
        }).emitOn(runnable -> {
            connection.context().runOnContext(runnable);
        }).map(message -> {
            return this.messageFactory.create(message, traceEnabled, orElse, connection.context(), new ExponentialBackoff(this.configuration.exponentialBackoff(), this.configuration.exponentialBackoffMaxDuration()), this.configuration.ackTimeout());
        });
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener
    public void onEvent(ConnectionEvent connectionEvent, String str) {
        switch (connectionEvent) {
            case Connected:
                this.status.set(new Status(true, str, connectionEvent));
                return;
            case Closed:
                this.status.set(new Status(false, str, connectionEvent));
                return;
            case Disconnected:
                this.status.set(new Status(false, str, connectionEvent));
                return;
            case Reconnected:
                this.status.set(new Status(true, str, connectionEvent));
                return;
            case CommunicationFailed:
                this.status.set(new Status(false, str, connectionEvent));
                return;
            default:
                return;
        }
    }

    private void shutDown() {
        try {
            if (this.subscription.isActive()) {
                this.subscription.drain(Duration.ofMillis(1000L));
            }
        } catch (IllegalStateException | InterruptedException e) {
            logger.warnf("Interrupted while draining subscription", new Object[0]);
        }
        try {
            if (this.subscription != null && this.dispatcher != null && this.dispatcher.isActive()) {
                this.dispatcher.unsubscribe(this.subscription);
            }
        } catch (Exception e2) {
            logger.errorf(e2, "Failed to shutdown pull executor", new Object[0]);
        }
    }
}
