package io.quarkiverse.reactive.messaging.nats.jetstream.client;

import io.nats.client.JetStreamReader;
import io.nats.client.JetStreamStatusException;
import io.nats.client.JetStreamSubscription;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.ExponentialBackoff;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PublishMessage;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ReaderConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.Tracer;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.MessageMapper;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.MultiOnItem;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscription.class */
/**
 * {@link Subscription} that pulls messages one at a time from a {@link JetStreamReader}
 * and exposes them as a {@link Multi}.
 *
 * <p>Each call to {@link #subscribe(Tracer, io.vertx.mutiny.core.Context)} starts a polling loop
 * on its own single-thread executor. The loop repeats {@link #readNextMessage()} until
 * {@link #close()} has been called, maps every message that carries data through the
 * {@link MessageMapper}, and finally hands it to the supplied {@link Tracer}.
 *
 * @param <P> payload type of the emitted messages
 */
class ReaderSubscription<P> implements Subscription<P> {
    private static final Logger logger = Logger.getLogger(ReaderSubscription.class);
    private final Connection connection;
    private final ReaderConsumerConfiguration<P> consumerConfiguration;
    private final JetStreamReader reader;
    private final JetStreamSubscription subscription;
    private final MessageMapper messageMapper;
    /** Flipped to {@code true} by {@link #close()}; read by the polling loop to stop repeating. */
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * Creates a subscription over an already established reader.
     *
     * @param connection connection this subscription deregisters itself from on {@link #close()}
     * @param readerConsumerConfiguration reader settings (request expiry, stream, payload type, back-off, ack timeout)
     * @param jetStreamSubscription underlying subscription, drained and unsubscribed on {@link #close()}
     * @param jetStreamReader reader that messages are pulled from
     * @param messageMapper converts NATS messages into the messages emitted downstream
     */
    public ReaderSubscription(Connection connection, ReaderConsumerConfiguration<P> readerConsumerConfiguration, JetStreamSubscription jetStreamSubscription, JetStreamReader jetStreamReader, MessageMapper messageMapper) {
        this.connection = connection;
        this.consumerConfiguration = readerConsumerConfiguration;
        this.subscription = jetStreamSubscription;
        this.reader = jetStreamReader;
        this.messageMapper = messageMapper;
    }

    /**
     * Starts polling the reader and returns the resulting stream of traced messages.
     *
     * @param tracer applied to every mapped message via {@code withTrace}; must not be {@code null}
     * @param context Vert.x context passed through to the message mapper
     * @return stream of messages; polling stops once {@link #close()} has been called
     * @throws NullPointerException if {@code tracer} is {@code null}
     */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Subscription
    public Multi<Message<P>> subscribe(Tracer<P> tracer, io.vertx.mutiny.core.Context context) {
        // Validate before allocating the executor so a null tracer cannot leak a thread pool.
        Objects.requireNonNull(tracer);
        Class<P> payloadType = this.consumerConfiguration.consumerConfiguration().payloadType().orElse(null);
        ExecutorService pullExecutor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new);
        Multi<Message<P>> traced = Multi.createBy().repeating()
                .uni(this::readNextMessage)
                // Stop repeating once close() has flipped the flag; previously the loop kept
                // polling a stopped reader forever.
                .whilst(ignored -> !this.closed.get())
                .runSubscriptionOn(pullExecutor)
                .flatMap(next -> createMulti(next.orElse(null), payloadType, context))
                .onItem().transformToUniAndMerge(tracer::withTrace);
        // Release the worker thread on completion, failure or cancellation.
        return traced.onTermination().invoke(pullExecutor::shutdown);
    }

    /** Connection events are deliberately ignored by this subscription. */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener
    public void onEvent(ConnectionEvent connectionEvent, String str) {
    }

    /**
     * Marks the subscription as closed and tears it down on a best-effort basis: stops the reader,
     * drains and unsubscribes the subscription if it is still active, and removes this listener
     * from the connection. A failure in one step is logged and does not prevent the following steps.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.closed.set(true);
        try {
            this.reader.stop();
        } catch (Throwable failure) {
            logger.warnf("Failed to stop reader with message %s", failure.getMessage());
        }
        try {
            if (this.subscription.isActive()) {
                this.subscription.drain(Duration.ofMillis(1000L));
            }
        } catch (Throwable failure) {
            if (failure instanceof InterruptedException) {
                // Keep the interrupt visible to whoever owns the closing thread.
                Thread.currentThread().interrupt();
                logger.warnf("Interrupted while draining subscription");
            } else {
                logger.warnf("Failed to drain subscription with message %s", failure.getMessage());
            }
        }
        try {
            if (this.subscription.isActive()) {
                this.subscription.unsubscribe();
            }
        } catch (Throwable failure) {
            logger.warnf("Failed to unsubscribe subscription with message %s", failure.getMessage());
        }
        this.connection.removeListener(this);
    }

    /**
     * Pulls a single message from the reader, waiting at most the configured
     * {@code maxRequestExpires} ({@link Duration#ZERO} when not configured).
     *
     * @return a {@link Uni} that completes with the message, or with an empty {@link Optional} when
     *         the reader returned {@code null} or reported an {@link IllegalStateException}; it fails
     *         with a {@link ReaderException} on a JetStream status error, on interruption, or on any
     *         other exception
     */
    private Uni<Optional<io.nats.client.Message>> readNextMessage() {
        return Uni.createFrom().emitter(emitter -> {
            try {
                emitter.complete(Optional.ofNullable(this.reader.nextMessage(this.consumerConfiguration.maxRequestExpires().orElse(Duration.ZERO))));
            } catch (JetStreamStatusException e) {
                // Must precede IllegalStateException: it is a subtype and would otherwise never be reached.
                emitter.fail(new ReaderException(e));
            } catch (IllegalStateException e) {
                logger.warnf("The subscription became inactive for stream: %s", this.consumerConfiguration.consumerConfiguration().stream());
                emitter.complete(Optional.empty());
            } catch (InterruptedException e) {
                // Restore the flag so the worker thread still observes the interruption.
                Thread.currentThread().interrupt();
                emitter.fail(new ReaderException(String.format("The reader was interrupted for stream: %s", this.consumerConfiguration.consumerConfiguration().stream()), e));
            } catch (Exception e) {
                emitter.fail(new ReaderException(String.format("Error reading next message from stream: %s", this.consumerConfiguration.consumerConfiguration().stream()), e));
            }
        });
    }

    /**
     * Wraps a NATS message into a single-item stream, or an empty stream when there is nothing to emit.
     *
     * @param message message read from the reader; may be {@code null}
     * @param cls payload type handed to the mapper; may be {@code null}
     * @param context Vert.x context handed to the mapper
     * @return an empty stream if {@code message} or its data is {@code null}, otherwise a stream
     *         whose single item is produced by the mapper at subscription time
     */
    private Multi<PublishMessage<P>> createMulti(io.nats.client.Message message, Class<P> cls, io.vertx.mutiny.core.Context context) {
        if (message == null || message.getData() == null) {
            return Multi.createFrom().empty();
        }
        return Multi.createFrom().item(() -> this.messageMapper.of(
                message,
                cls,
                context,
                new ExponentialBackoff(
                        this.consumerConfiguration.consumerConfiguration().exponentialBackoff(),
                        this.consumerConfiguration.consumerConfiguration().exponentialBackoffMaxDuration()),
                this.consumerConfiguration.consumerConfiguration().ackTimeout()));
    }
}
