package io.quarkiverse.reactive.messaging.nats.jetstream.client;

import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.JetStreamSubscription;
import io.nats.client.PushSubscribeOptions;
import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushSubscribeOptionsFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.MessageMapper;
import io.smallrye.mutiny.Multi;
import io.vertx.mutiny.core.Context;
import java.time.Duration;
import java.util.Objects;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/client/PushSubscription.class */
/**
 * {@link Subscription} implementation backed by a NATS JetStream push consumer.
 *
 * <p>{@link #subscribe()} returns a {@link Multi} that, once subscribed to, creates a NATS
 * dispatcher, registers a JetStream push subscription on the configured subject and forwards every
 * received NATS message downstream after mapping it with the supplied {@link MessageMapper}.
 * {@link #close()} drains and unsubscribes that subscription and removes this instance from the
 * owning {@link Connection}'s listeners.
 *
 * @param <P> payload type of the emitted messages
 */
public class PushSubscription<P> implements Subscription<P> {
    private static final Logger logger = Logger.getLogger(PushSubscription.class);
    private final Connection connection;
    private final PushConsumerConfiguration<P> consumerConfiguration;
    private final PushSubscribeOptionsFactory pushSubscribeOptionsFactory = new PushSubscribeOptionsFactory();
    private final io.nats.client.Connection natsConnection;
    private final MessageMapper messageMapper;
    private final Context context;
    // Both fields are assigned lazily from the emitter callback in subscribe() and read from
    // close(); they stay null until the returned Multi has actually been subscribed to.
    private volatile JetStreamSubscription subscription;
    private volatile Dispatcher dispatcher;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a push subscription; no NATS resources are acquired until {@link #subscribe()}'s
     * result is subscribed to.
     *
     * @param connection the connector-level connection this subscription is a listener of
     * @param pushConsumerConfiguration subject and consumer settings for the push consumer
     * @param connection2 the underlying NATS client connection used to create the dispatcher and
     *        the JetStream context
     * @param messageMapper converts NATS messages into reactive messaging messages
     * @param context Vert.x context on which messages are emitted downstream
     */
    public PushSubscription(Connection connection, PushConsumerConfiguration<P> pushConsumerConfiguration, io.nats.client.Connection connection2, MessageMapper messageMapper, Context context) {
        this.connection = connection;
        this.consumerConfiguration = pushConsumerConfiguration;
        this.natsConnection = connection2;
        this.messageMapper = messageMapper;
        this.context = context;
    }

    /**
     * Builds the stream of incoming messages for the configured subject.
     *
     * <p>The JetStream subscription is created when the returned {@link Multi} is subscribed to.
     * If creating it fails, the failure is logged and propagated downstream as a stream failure.
     * Messages are handed over to the Vert.x context before being mapped.
     *
     * @return a stream of mapped messages
     */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Subscription
    public Multi<Message<P>> subscribe() {
        final boolean traceEnabled = this.consumerConfiguration.consumerConfiguration().traceEnabled();
        final Class<P> payloadType = this.consumerConfiguration.consumerConfiguration().payloadType().orElse(null);
        final String subject = this.consumerConfiguration.subject();

        // Explicit type witness: "Message" alone resolves to the MicroProfile type in this file,
        // so the NATS message type is spelled out fully-qualified.
        final Multi<io.nats.client.Message> natsMessages = Multi.createFrom().<io.nats.client.Message>emitter(multiEmitter -> {
            try {
                final JetStream jetStream = this.natsConnection.jetStream();
                this.dispatcher = this.natsConnection.createDispatcher();
                final PushSubscribeOptions options = this.pushSubscribeOptionsFactory.create(this.consumerConfiguration);
                // autoAck is disabled (false); acknowledgement is not performed by the NATS client here.
                this.subscription = jetStream.subscribe(subject, this.dispatcher, multiEmitter::emit, false, options);
            } catch (Exception e) {
                logger.errorf(e, "Failed subscribing to stream: %s, subject: %s with message: %s", this.consumerConfiguration.consumerConfiguration().stream(), subject, e.getMessage());
                multiEmitter.fail(e);
            }
        });

        return natsMessages
                .emitOn(this.context::runOnContext)
                .map(message -> this.messageMapper.of(
                        message,
                        traceEnabled,
                        payloadType,
                        this.context,
                        new ExponentialBackoff(
                                this.consumerConfiguration.consumerConfiguration().exponentialBackoff(),
                                this.consumerConfiguration.consumerConfiguration().exponentialBackoffMaxDuration()),
                        this.consumerConfiguration.consumerConfiguration().ackTimeout()));
    }

    /**
     * Connection events are intentionally ignored by this subscription.
     */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener
    public void onEvent(ConnectionEvent connectionEvent, String str) {
        // no-op
    }

    /**
     * Releases the subscription on a best-effort basis: drains it if active, unsubscribes it from
     * the dispatcher if the dispatcher is active, and finally removes this instance from the
     * connection's listeners. Failures are logged and never propagated. Safe to call even when
     * {@link #subscribe()} was never subscribed to or failed before a subscription existed.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        // Snapshot the volatile fields once so both steps below work on the same instances.
        final JetStreamSubscription activeSubscription = this.subscription;
        final Dispatcher activeDispatcher = this.dispatcher;

        try {
            if (activeSubscription != null && activeSubscription.isActive()) {
                activeSubscription.drain(Duration.ofMillis(1000L));
            }
        } catch (Throwable th) {
            if (th instanceof InterruptedException) {
                // Do not lose the interruption request for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            logger.warnf(th, "Interrupted while draining subscription: %s", th.getMessage());
        }
        try {
            if (activeSubscription != null && activeDispatcher != null && activeDispatcher.isActive()) {
                activeDispatcher.unsubscribe(activeSubscription);
            }
        } catch (Throwable th2) {
            logger.warnf(th2, "Failed to shutdown pull executor: %s", th2.getMessage());
        }
        this.connection.removeListener(this);
    }
}
