/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.reactive.messaging.nats.jetstream.client.vertx;

import io.nats.client.JetStream;
import io.nats.client.JetStreamReader;
import io.nats.client.JetStreamStatusException;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.JetstreamWorkerThread;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageSubscribeConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PullSubscribeOptionsFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ReaderConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.message.MessageFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.vertx.MessageConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.vertx.ReaderException;
import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrumenter;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Context;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jboss.logging.Logger;

public class ReaderMessageSubscribeConnection<K>
extends MessageConnection
implements MessageSubscribeConnection {
    private static final Logger logger = Logger.getLogger(ReaderMessageSubscribeConnection.class);
    private final ReaderConsumerConfiguration<K> consumerConfiguration;
    private final JetStreamReader reader;
    private final JetStreamSubscription subscription;

    public ReaderMessageSubscribeConnection(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener, Context context, JetStreamInstrumenter instrumenter, ReaderConsumerConfiguration<K> consumerConfiguration, MessageFactory messageFactory) throws ConnectionException {
        super(connectionConfiguration, connectionListener, messageFactory, context, instrumenter);
        this.consumerConfiguration = consumerConfiguration;
        try {
            JetStream jetStream = this.connection.jetStream();
            PullSubscribeOptionsFactory optionsFactory = new PullSubscribeOptionsFactory();
            this.subscription = jetStream.subscribe(consumerConfiguration.subject(), optionsFactory.create(consumerConfiguration));
            this.reader = this.subscription.reader(consumerConfiguration.maxRequestBatch().intValue(), consumerConfiguration.rePullAt().intValue());
        }
        catch (Throwable failure) {
            throw new ReaderException(failure);
        }
    }

    @Override
    public Multi<org.eclipse.microprofile.reactive.messaging.Message<?>> subscribe() {
        boolean traceEnabled = this.consumerConfiguration.consumerConfiguration().traceEnabled();
        Class payloadType = this.consumerConfiguration.consumerConfiguration().payloadType().orElse(null);
        ExecutorService pullExecutor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new);
        return Multi.createBy().repeating().supplier(this::nextMessage).until(message -> !this.subscription.isActive()).runSubscriptionOn((Executor)pullExecutor).emitOn(arg_0 -> ((Context)this.context).runOnContext(arg_0)).flatMap(message -> this.createMulti(message.orElse(null), traceEnabled, payloadType, this.context));
    }

    @Override
    public Uni<Void> flush(Duration duration) {
        return super.flush(duration).emitOn(arg_0 -> ((Context)this.context).runOnContext(arg_0));
    }

    @Override
    public void close() throws Exception {
        try {
            this.reader.stop();
        }
        catch (Throwable e) {
            logger.warnf("Failed to stop reader with message %s", (Object)e.getMessage());
        }
        try {
            if (this.subscription.isActive()) {
                this.subscription.drain(Duration.ofMillis(1000L));
            }
        }
        catch (Throwable e) {
            logger.warnf("Interrupted while draining subscription", new Object[0]);
        }
        try {
            if (this.subscription.isActive()) {
                this.subscription.unsubscribe();
            }
        }
        catch (Throwable e) {
            logger.warnf("Failed to unsubscribe subscription with message %s", (Object)e.getMessage());
        }
        super.close();
    }

    private Optional<Message> nextMessage() {
        try {
            return Optional.ofNullable(this.reader.nextMessage(this.consumerConfiguration.maxRequestExpires().orElse(Duration.ZERO)));
        }
        catch (JetStreamStatusException e) {
            logger.debugf((Throwable)e, e.getMessage(), new Object[0]);
            return Optional.empty();
        }
        catch (IllegalStateException e) {
            logger.debugf((Throwable)e, "The subscription became inactive for stream: %s", (Object)this.consumerConfiguration.consumerConfiguration().stream());
            return Optional.empty();
        }
        catch (InterruptedException e) {
            logger.debugf((Throwable)e, "The reader was interrupted for stream: %s", (Object)this.consumerConfiguration.consumerConfiguration().stream());
            return Optional.empty();
        }
        catch (Throwable throwable) {
            logger.warnf(throwable, "Error reading next message from stream: %s", (Object)this.consumerConfiguration.consumerConfiguration().stream());
            return Optional.empty();
        }
    }

    private Multi<org.eclipse.microprofile.reactive.messaging.Message<K>> createMulti(Message message, boolean tracingEnabled, Class<?> payloadType, Context context) {
        if (message == null || message.getData() == null) {
            return Multi.createFrom().empty();
        }
        return Multi.createFrom().item(() -> this.messageFactory.create(message, tracingEnabled, payloadType, context, new ExponentialBackoff(this.consumerConfiguration.consumerConfiguration().exponentialBackoff(), this.consumerConfiguration.consumerConfiguration().exponentialBackoffMaxDuration()), this.consumerConfiguration.consumerConfiguration().ackTimeout()));
    }
}

