package io.quarkiverse.reactive.messaging.nats.jetstream.client;

import io.nats.client.JetStreamReader;
import io.nats.client.JetStreamStatusException;
import io.nats.client.JetStreamSubscription;
import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.Consumer;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PurgeResult;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamState;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.FetchConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PullSubscribeOptionsFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ReaderConsumerConfiguration;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Context;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/client/ReaderSubscribeConnection.class */
/**
 * {@link SubscribeConnection} that pulls messages from a JetStream subscription through a
 * {@link JetStreamReader}.
 *
 * <p>Message consumption is implemented here; every other connection operation is forwarded
 * unchanged to the wrapped {@link DefaultConnection}.
 *
 * @param <P> payload type of the messages emitted by {@link #subscribe()}
 */
public class ReaderSubscribeConnection<P> implements SubscribeConnection<P> {
    private static final Logger logger = Logger.getLogger(ReaderSubscribeConnection.class);
    private final DefaultConnection delegate;
    private final ReaderConsumerConfiguration<P> consumerConfiguration;
    private final JetStreamReader reader;
    private final JetStreamSubscription subscription;

    /**
     * Creates the pull subscription for the configured subject and opens a reader on it.
     *
     * @param defaultConnection connection that owns the NATS client and receives all delegated calls
     * @param readerConsumerConfiguration subject, batch size and re-pull settings for the reader
     * @throws ConnectionException if the subscription or the reader cannot be created; the original
     *         failure is attached as the cause
     */
    public ReaderSubscribeConnection(DefaultConnection defaultConnection, ReaderConsumerConfiguration<P> readerConsumerConfiguration) throws ConnectionException {
        this.delegate = defaultConnection;
        this.consumerConfiguration = readerConsumerConfiguration;

        final JetStreamSubscription newSubscription;
        try {
            newSubscription = defaultConnection.connection().jetStream().subscribe(
                    readerConsumerConfiguration.subject(),
                    new PullSubscribeOptionsFactory().create(readerConsumerConfiguration));
        } catch (Throwable failure) {
            throw new ConnectionException(failure);
        }

        final JetStreamReader newReader;
        try {
            newReader = newSubscription.reader(
                    readerConsumerConfiguration.maxRequestBatch().intValue(),
                    readerConsumerConfiguration.rePullAt().intValue());
        } catch (Throwable failure) {
            // The subscription already exists at this point; release it so a failed
            // construction does not leave it registered on the connection.
            try {
                newSubscription.unsubscribe();
            } catch (Throwable cleanupFailure) {
                failure.addSuppressed(cleanupFailure);
            }
            throw new ConnectionException(failure);
        }

        this.subscription = newSubscription;
        this.reader = newReader;
    }

    /**
     * Returns a stream that repeatedly reads from the reader for as long as the connection is
     * connected and the subscription is active.
     *
     * <p>Reads run on a dedicated single-threaded executor and items are emitted on the delegate's
     * Vert.x context. Reads that yield no message, or a message without data, emit nothing.
     * {@link ConnectionEvent#SubscriptionInactive} is fired when the stream completes.
     *
     * <p>The executor is created per subscriber and shut down when that subscriber's stream
     * terminates (completion, failure or cancellation), so its worker thread is not leaked.
     *
     * @return the stream of mapped messages
     */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.SubscribeConnection
    public Multi<Message<P>> subscribe() {
        final boolean traceEnabled = this.consumerConfiguration.consumerConfiguration().traceEnabled();
        final Class<P> payloadType = this.consumerConfiguration.consumerConfiguration().payloadType().orElse(null);
        return Multi.createFrom().deferred(() -> readerStream(traceEnabled, payloadType));
    }

    /** Builds one reader pipeline together with the executor that drives its blocking reads. */
    private Multi<Message<P>> readerStream(boolean traceEnabled, Class<P> payloadType) {
        final ExecutorService readerExecutor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new);
        return Multi.createBy().repeating().uni(this::readNextMessage)
                .whilst(ignored -> isConnected() && this.subscription.isActive())
                .runSubscriptionOn(readerExecutor)
                .emitOn(runnable -> this.delegate.context().runOnContext(runnable))
                .flatMap(next -> createMulti(next.orElse(null), traceEnabled, payloadType, this.delegate.context()))
                .onCompletion().invoke(() -> fireEvent(ConnectionEvent.SubscriptionInactive, "Subscription became inactive"))
                // shutdown() lets an in-flight read finish; it only prevents new tasks and lets the thread exit.
                .onTermination().invoke(readerExecutor::shutdown);
    }

    /** Delegates to the wrapped connection. */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public boolean isConnected() {
        return this.delegate.isConnected();
    }

    /** Delegates to the wrapped connection. */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public List<ConnectionListener> listeners() {
        return this.delegate.listeners();
    }

    /** Delegates to the wrapped connection. */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public void addListener(ConnectionListener connectionListener) {
        this.delegate.addListener(connectionListener);
    }

    /** Delegates to the wrapped connection. */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<Consumer> getConsumer(String str, String str2) {
        return this.delegate.getConsumer(str, str2);
    }

    /** Delegates to the wrapped connection. */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<List<String>> getStreams() {
        return this.delegate.getStreams();
    }

    /** Delegates to the wrapped connection. */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<List<String>> getSubjects(String str) {
        return this.delegate.getSubjects(str);
    }

    /** Delegates to the wrapped connection. */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<List<String>> getConsumerNames(String str) {
        return this.delegate.getConsumerNames(str);
    }

    /** Delegates to the wrapped connection. */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<PurgeResult> purgeStream(String str) {
        return this.delegate.purgeStream(str);
    }

    /** Delegates to the wrapped connection. */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<Void> deleteMessage(String str, long j, boolean z) {
        return this.delegate.deleteMessage(str, j, z);
    }

    /** Delegates to the wrapped connection. */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<StreamState> getStreamState(String str) {
        return this.delegate.getStreamState(str);
    }

    /** Delegates to the wrapped connection. */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<List<PurgeResult>> purgeAllStreams() {
        return this.delegate.purgeAllStreams();
    }

    /** Delegates to the wrapped connection. */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public <T> Uni<Message<T>> publish(Message<T> message, PublishConfiguration publishConfiguration) {
        return this.delegate.publish(message, publishConfiguration);
    }

    /** Delegates to the wrapped connection. */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public <T> Uni<Message<T>> publish(Message<T> message, PublishConfiguration publishConfiguration, FetchConsumerConfiguration<T> fetchConsumerConfiguration) {
        return this.delegate.publish(message, publishConfiguration, fetchConsumerConfiguration);
    }

    /** Delegates to the wrapped connection. */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public <T> Uni<Message<T>> nextMessage(FetchConsumerConfiguration<T> fetchConsumerConfiguration) {
        return this.delegate.nextMessage(fetchConsumerConfiguration);
    }

    /** Delegates to the wrapped connection. */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public <T> Multi<Message<T>> nextMessages(FetchConsumerConfiguration<T> fetchConsumerConfiguration) {
        return this.delegate.nextMessages(fetchConsumerConfiguration);
    }

    /** Delegates to the wrapped connection. */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public <T> Uni<T> getKeyValue(String str, String str2, Class<T> cls) {
        return this.delegate.getKeyValue(str, str2, cls);
    }

    /** Delegates to the wrapped connection. */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public <T> Uni<Void> putKeyValue(String str, String str2, T t) {
        return this.delegate.putKeyValue(str, str2, t);
    }

    /** Delegates to the wrapped connection. */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<Void> deleteKeyValue(String str, String str2) {
        return this.delegate.deleteKeyValue(str, str2);
    }

    /** Delegates to the wrapped connection. */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public <T> Uni<Message<T>> resolve(String str, long j) {
        return this.delegate.resolve(str, j);
    }

    /** Delegates to the wrapped connection. */
    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<Void> flush(Duration duration) {
        return this.delegate.flush(duration);
    }

    /**
     * Stops the reader, drains and unsubscribes the subscription if it is still active, then
     * closes the wrapped connection.
     *
     * <p>Each step is best-effort: a failure is logged as a warning and the remaining steps
     * still run.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        try {
            this.reader.stop();
        } catch (Throwable failure) {
            logger.warnf("Failed to stop reader with message %s", failure.getMessage());
        }
        try {
            if (this.subscription.isActive()) {
                this.subscription.drain(Duration.ofMillis(1000L));
            }
        } catch (Throwable failure) {
            if (failure instanceof InterruptedException) {
                // Keep the interrupt visible to whoever called close().
                Thread.currentThread().interrupt();
            }
            logger.warnf("Failed to drain subscription with message %s", failure.getMessage());
        }
        try {
            if (this.subscription.isActive()) {
                this.subscription.unsubscribe();
            }
        } catch (Throwable failure) {
            logger.warnf("Failed to unsubscribe subscription with message %s", failure.getMessage());
        }
        this.delegate.close();
    }

    /**
     * Reads one message from the reader, waiting at most the configured {@code maxRequestExpires}
     * (or {@link Duration#ZERO} when none is configured).
     *
     * @return a {@link Uni} that completes with the message or an empty {@link Optional} when
     *         nothing was read or the reader reported an {@link IllegalStateException}, and that
     *         fails with a {@link ReaderException} for any other error
     */
    private Uni<Optional<io.nats.client.Message>> readNextMessage() {
        return Uni.createFrom().emitter(emitter -> {
            try {
                emitter.complete(Optional.ofNullable(this.reader.nextMessage(this.consumerConfiguration.maxRequestExpires().orElse(Duration.ZERO))));
            } catch (InterruptedException e) {
                // Restore the flag so code further up the worker thread can observe the interrupt.
                Thread.currentThread().interrupt();
                emitter.fail(new ReaderException(String.format("The reader was interrupted for stream: %s", this.consumerConfiguration.consumerConfiguration().stream()), e));
            } catch (JetStreamStatusException e) {
                emitter.fail(new ReaderException(e));
            } catch (IllegalStateException e) {
                // Treated as "no message"; the whilst-condition in subscribe() then decides whether to stop.
                logger.warnf("The subscription became inactive for stream: %s", this.consumerConfiguration.consumerConfiguration().stream());
                emitter.complete(Optional.empty());
            } catch (Throwable failure) {
                emitter.fail(new ReaderException(String.format("Error reading next message from stream: %s", this.consumerConfiguration.consumerConfiguration().stream()), failure));
            }
        });
    }

    /**
     * Maps a raw NATS message to a single-item stream, or to an empty stream when the message or
     * its data is {@code null}.
     *
     * @param message raw message, may be {@code null}
     * @param z whether tracing is enabled for the mapped message
     * @param cls payload type handed to the message mapper, may be {@code null}
     * @param context Vert.x context handed to the message mapper
     */
    private Multi<Message<P>> createMulti(io.nats.client.Message message, boolean z, Class<P> cls, Context context) {
        if (message == null || message.getData() == null) {
            return Multi.createFrom().empty();
        }
        return Multi.createFrom().item(() -> this.delegate.messageMapper().of(
                message,
                z,
                cls,
                context,
                new ExponentialBackoff(
                        this.consumerConfiguration.consumerConfiguration().exponentialBackoff(),
                        this.consumerConfiguration.consumerConfiguration().exponentialBackoffMaxDuration()),
                this.consumerConfiguration.consumerConfiguration().ackTimeout()));
    }
}
