/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.reactive.messaging.nats.jetstream.client;

import io.nats.client.JetStreamReader;
import io.nats.client.JetStreamStatusException;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionEvent;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.JetstreamWorkerThread;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ReaderException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Subscription;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ReaderConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.MessageMapper;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Context;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jboss.logging.Logger;

/**
 * {@link Subscription} implementation that pulls messages one at a time from a
 * {@link JetStreamReader} and exposes them as a {@link Multi} of reactive-messaging messages.
 *
 * <p>Reading is performed on a dedicated single-thread executor (one per stream subscription);
 * items are then emitted on the supplied Vert.x {@link Context}. Calling {@link #close()} flips
 * an internal flag that ends the repeating read loop and releases the NATS resources.
 *
 * @param <P> payload type of the messages produced by this subscription
 */
class ReaderSubscribtion<P> implements Subscription<P> {
    private static final Logger logger = Logger.getLogger(ReaderSubscribtion.class);

    private final Connection connection;
    private final ReaderConsumerConfiguration<P> consumerConfiguration;
    private final JetStreamReader reader;
    private final JetStreamSubscription subscription;
    private final MessageMapper messageMapper;
    private final Context context;
    /** Set to {@code true} by {@link #close()}; checked after every read to stop the loop. */
    private final AtomicBoolean closed;

    /**
     * Creates a subscription around an already established JetStream pull subscription.
     *
     * @param connection connection used for liveness checks and listener removal on close
     * @param consumerConfiguration reader/consumer settings (timeouts, tracing, payload type, ...)
     * @param subscription underlying JetStream subscription
     * @param reader reader used to fetch the next message
     * @param messageMapper converts NATS messages into reactive-messaging messages
     * @param context Vert.x context on which items are emitted downstream
     */
    ReaderSubscribtion(Connection connection, ReaderConsumerConfiguration<P> consumerConfiguration, JetStreamSubscription subscription, JetStreamReader reader, MessageMapper messageMapper, Context context) {
        this.connection = connection;
        this.consumerConfiguration = consumerConfiguration;
        this.subscription = subscription;
        this.reader = reader;
        this.messageMapper = messageMapper;
        this.context = context;
        this.closed = new AtomicBoolean(false);
    }

    /**
     * Returns a stream that repeatedly reads the next message until this subscription is closed
     * or a read fails.
     *
     * <p>The pull executor is created lazily per stream subscription and is shut down as soon as
     * the stream terminates (completion, failure or cancellation), so no worker thread outlives
     * the stream.
     *
     * @return a {@link Multi} of mapped messages; reads yielding no message (or no data) are skipped
     */
    @Override
    public Multi<org.eclipse.microprofile.reactive.messaging.Message<P>> subscribe() {
        boolean traceEnabled = this.consumerConfiguration.consumerConfiguration().traceEnabled();
        Class<P> payloadType = this.consumerConfiguration.consumerConfiguration().payloadType().orElse(null);
        // Deferred so that every (re-)subscription gets a fresh executor instead of sharing one
        // that may already have been shut down by a previous termination.
        return Multi.createFrom().deferred(() -> {
            ExecutorService pullExecutor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new);
            Multi<org.eclipse.microprofile.reactive.messaging.Message<P>> messages = Multi.createBy().repeating()
                    .uni(this::readNextMessage)
                    .whilst(message -> !this.closed.get())
                    .runSubscriptionOn(pullExecutor)
                    .emitOn(command -> this.context.runOnContext(command))
                    .flatMap(message -> this.createMulti(message.orElse(null), traceEnabled, payloadType, this.context));
            // shutdown() (not shutdownNow()) lets an in-flight read finish without interrupting it.
            return messages.onTermination().invoke(pullExecutor::shutdown);
        });
    }

    /** Connection events are intentionally ignored by this subscription. */
    @Override
    public void onEvent(ConnectionEvent event, String message) {
    }

    /**
     * Stops the read loop and releases the NATS resources on a best-effort basis: each step
     * (stop reader, drain, unsubscribe) is attempted independently and failures are logged,
     * never propagated. Finally this subscription is removed from the connection's listeners.
     */
    @Override
    public void close() {
        this.closed.set(true);
        try {
            this.reader.stop();
        } catch (Throwable e) {
            logger.warnf(e, "Failed to stop reader with message %s", e.getMessage());
        }
        try {
            if (this.subscription.isActive()) {
                this.subscription.drain(Duration.ofMillis(1000L));
            }
        } catch (Throwable e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for whoever owns the calling thread.
                Thread.currentThread().interrupt();
                logger.warn("Interrupted while draining subscription", e);
            } else {
                logger.warnf(e, "Failed to drain subscription with message %s", e.getMessage());
            }
        }
        try {
            if (this.subscription.isActive()) {
                this.subscription.unsubscribe();
            }
        } catch (Throwable e) {
            logger.warnf(e, "Failed to unsubscribe subscription with message %s", e.getMessage());
        }
        this.connection.removeListener(this);
    }

    /**
     * Reads the next message from the reader.
     *
     * <p>Fails with {@link ConnectionException} when the connection is down and with
     * {@link ReaderException} when the subscription is inactive, the read is interrupted, or any
     * other error occurs. An {@link IllegalStateException} other than
     * {@link JetStreamStatusException} is treated as "no message" and logged.
     *
     * @return a {@link Uni} emitting the message, or {@link Optional#empty()} when none was read
     */
    private Uni<Optional<Message>> readNextMessage() {
        return Uni.createFrom().emitter(emitter -> {
            try {
                if (!this.connection.isConnected()) {
                    emitter.fail(new ConnectionException("The connection is not connected"));
                } else if (!this.subscription.isActive()) {
                    emitter.fail(new ReaderException("The subscription is not active"));
                } else {
                    Duration timeout = this.consumerConfiguration.maxRequestExpires().orElse(Duration.ZERO);
                    emitter.complete(Optional.ofNullable(this.reader.nextMessage(timeout)));
                }
            } catch (JetStreamStatusException e) {
                // Must stay ahead of the IllegalStateException handler below.
                emitter.fail(new ReaderException(e));
            } catch (IllegalStateException e) {
                logger.warnf("The subscription became inactive for stream: %s", (Object) this.streamName());
                emitter.complete(Optional.empty());
            } catch (InterruptedException e) {
                // Restore the flag so the executor/thread owner can still observe the interrupt.
                Thread.currentThread().interrupt();
                emitter.fail(new ReaderException(String.format("The reader was interrupted for stream: %s", this.streamName()), e));
            } catch (Throwable throwable) {
                emitter.fail(new ReaderException(String.format("Error reading next message from stream: %s", this.streamName()), throwable));
            }
        });
    }

    /** Returns the configured stream name; used for log and error messages only. */
    private String streamName() {
        return String.valueOf(this.consumerConfiguration.consumerConfiguration().stream());
    }

    /**
     * Maps a raw NATS message into a single-item stream, or an empty stream when there is
     * nothing to map.
     *
     * @param message raw message; may be {@code null}
     * @param tracingEnabled whether tracing is enabled for the mapped message
     * @param payloadType target payload class; may be {@code null}
     * @param context Vert.x context handed to the mapper
     * @return a stream with the mapped message, or an empty stream if {@code message} or its data is {@code null}
     */
    private Multi<org.eclipse.microprofile.reactive.messaging.Message<P>> createMulti(Message message, boolean tracingEnabled, Class<P> payloadType, Context context) {
        if (message == null || message.getData() == null) {
            return Multi.createFrom().empty();
        }
        return Multi.createFrom().item(() -> this.messageMapper.of(message, tracingEnabled, payloadType, context, new ExponentialBackoff(this.consumerConfiguration.consumerConfiguration().exponentialBackoff(), this.consumerConfiguration.consumerConfiguration().exponentialBackoffMaxDuration()), this.consumerConfiguration.consumerConfiguration().ackTimeout()));
    }
}

