package io.quarkiverse.reactive.messaging.nats.jetstream.client.vertx;

import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.JetStreamSubscription;
import io.nats.client.PushSubscribeOptions;
import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageSubscribeConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushSubscribeOptionsFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.message.MessageFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrumenter;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Context;
import java.time.Duration;
import java.util.Objects;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/client/vertx/PushSubscribeMessageConnection.class */
/**
 * Message connection that consumes a JetStream subject through a push subscription.
 *
 * <p>{@link #subscribe()} creates a NATS {@link Dispatcher} and a {@link JetStreamSubscription}
 * when the returned stream is subscribed to, and forwards every received NATS message
 * downstream after converting it with the configured {@link MessageFactory}.
 * {@link #close()} drains and unsubscribes that subscription (if one was created) before
 * delegating to the parent connection.
 *
 * @param <K> payload type declared by the push consumer configuration
 */
public class PushSubscribeMessageConnection<K> extends MessageConnection implements MessageSubscribeConnection {
    private static final Logger logger = Logger.getLogger(PushSubscribeMessageConnection.class);

    /** Upper bound handed to {@code drain} when the connection is closed. */
    private static final Duration DRAIN_TIMEOUT = Duration.ofMillis(1000L);

    private final PushConsumerConfiguration<K> consumerConfiguration;
    private final PushSubscribeOptionsFactory optionsFactory;

    // Both fields are assigned from the emitter callback in subscribe() and read from close();
    // they stay null until the stream returned by subscribe() has actually been subscribed to.
    private volatile JetStreamSubscription subscription;
    private volatile Dispatcher dispatcher;

    /**
     * Creates the connection; the first five collaborators are passed on to the parent constructor.
     *
     * @param connectionConfiguration connection settings, forwarded to the parent
     * @param connectionListener listener forwarded to the parent
     * @param context Vert.x context on which messages and flush results are emitted
     * @param jetStreamInstrumenter tracing instrumenter forwarded to the parent
     * @param pushConsumerConfiguration subject and consumer settings of the push consumer
     * @param messageFactory factory converting NATS messages to reactive-messaging messages
     * @param pushSubscribeOptionsFactory factory building the {@link PushSubscribeOptions}
     * @throws ConnectionException declared by the parent constructor
     */
    public PushSubscribeMessageConnection(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener, Context context, JetStreamInstrumenter jetStreamInstrumenter, PushConsumerConfiguration<K> pushConsumerConfiguration, MessageFactory messageFactory, PushSubscribeOptionsFactory pushSubscribeOptionsFactory) throws ConnectionException {
        super(connectionConfiguration, connectionListener, messageFactory, context, jetStreamInstrumenter);
        this.consumerConfiguration = pushConsumerConfiguration;
        this.optionsFactory = pushSubscribeOptionsFactory;
    }

    /**
     * Builds a stream of messages received on the configured subject.
     *
     * <p>The dispatcher and the JetStream subscription are created inside the emitter callback.
     * Any failure while creating them is logged and signalled as a failure of the returned stream.
     * Items are emitted on the Vert.x context and mapped through the message factory.
     *
     * @return stream of converted messages
     */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageSubscribeConnection
    public Multi<Message<?>> subscribe() {
        final boolean traceEnabled = this.consumerConfiguration.consumerConfiguration().traceEnabled();
        final Class<K> payloadType = this.consumerConfiguration.consumerConfiguration().payloadType().orElse(null);
        final String subject = this.consumerConfiguration.subject();
        // io.nats.client.Message is spelled out because the simple name Message is already
        // taken by the MicroProfile reactive-messaging import.
        return Multi.createFrom().<io.nats.client.Message>emitter(emitter -> {
            try {
                final JetStream jetStream = this.connection.jetStream();
                final Dispatcher newDispatcher = this.connection.createDispatcher();
                this.dispatcher = newDispatcher;
                final PushSubscribeOptions pushOptions = this.optionsFactory.create(this.consumerConfiguration);
                // autoAck is false: the subscription does not acknowledge messages on delivery.
                this.subscription = jetStream.subscribe(subject, newDispatcher, emitter::emit, false, pushOptions);
            } catch (Throwable th) {
                logger.errorf(th, "Failed subscribing to stream: %s, subject: %s with message: %s", this.consumerConfiguration.consumerConfiguration().stream(), subject, th.getMessage());
                emitter.fail(th);
            }
        }).emitOn(this.context::runOnContext).map(message -> {
            return this.messageFactory.create(message, traceEnabled, payloadType, this.context, new ExponentialBackoff(this.consumerConfiguration.consumerConfiguration().exponentialBackoff(), this.consumerConfiguration.consumerConfiguration().exponentialBackoffMaxDuration()), this.consumerConfiguration.consumerConfiguration().ackTimeout());
        });
    }

    /**
     * Flushes through the parent implementation and emits the result on the Vert.x context.
     *
     * @param duration value passed unchanged to the parent {@code flush}
     * @return the parent's flush result, emitted on the context
     */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.AbstractConnection, io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<Void> flush(Duration duration) {
        return super.flush(duration).emitOn(this.context::runOnContext);
    }

    /**
     * Drains and unsubscribes the push subscription, if one was created, then closes the parent.
     *
     * <p>The parent {@code close()} always runs, even when no subscription exists or when
     * draining/unsubscribing fails. If the thread is interrupted while draining, the interrupt
     * flag is restored once the parent has been closed.
     *
     * @throws Exception if the parent {@code close()} fails or draining fails unexpectedly
     */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.AbstractConnection, java.lang.AutoCloseable
    public void close() throws Exception {
        // Snapshot the volatile fields so the null check and the use see the same reference.
        final JetStreamSubscription currentSubscription = this.subscription;
        final Dispatcher currentDispatcher = this.dispatcher;
        boolean interrupted = false;
        try {
            if (currentSubscription != null) {
                interrupted = drainQuietly(currentSubscription);
                unsubscribeQuietly(currentDispatcher, currentSubscription);
            }
        } finally {
            try {
                super.close();
            } finally {
                // Restored only after the parent cleanup so a pending interrupt cannot abort it.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Drains the subscription when it is active, logging expected failures instead of throwing.
     *
     * @return {@code true} if the calling thread was interrupted while draining
     */
    private boolean drainQuietly(JetStreamSubscription currentSubscription) {
        try {
            if (currentSubscription.isActive()) {
                currentSubscription.drain(DRAIN_TIMEOUT);
            }
        } catch (InterruptedException e) {
            logger.warn("Interrupted while draining subscription", e);
            return true;
        } catch (IllegalStateException e) {
            logger.warn("Failed to drain subscription", e);
        }
        return false;
    }

    /** Unsubscribes the subscription from its dispatcher when the dispatcher is still active. */
    private void unsubscribeQuietly(Dispatcher currentDispatcher, JetStreamSubscription currentSubscription) {
        try {
            if (currentDispatcher != null && currentDispatcher.isActive()) {
                currentDispatcher.unsubscribe(currentSubscription);
            }
        } catch (Exception e) {
            logger.error("Failed to unsubscribe push subscription", e);
        }
    }
}
