/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.reactive.messaging.nats.jetstream.client.vertx;

import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.Subscription;
import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageSubscribeConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushSubscribeOptionsFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.message.MessageFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.vertx.MessageConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrumenter;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.vertx.mutiny.core.Context;
import java.time.Duration;
import org.jboss.logging.Logger;

public class PushSubscribeMessageConnection<K>
extends MessageConnection
implements MessageSubscribeConnection {
    private static final Logger logger = Logger.getLogger(PushSubscribeMessageConnection.class);
    private final PushConsumerConfiguration<K> consumerConfiguration;
    private final PushSubscribeOptionsFactory optionsFactory;
    private volatile JetStreamSubscription subscription;
    private volatile Dispatcher dispatcher;

    public PushSubscribeMessageConnection(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener, Context context, JetStreamInstrumenter instrumenter, PushConsumerConfiguration<K> consumerConfiguration, MessageFactory messageFactory, PushSubscribeOptionsFactory optionsFactory) throws ConnectionException {
        super(connectionConfiguration, connectionListener, messageFactory, context, instrumenter);
        this.consumerConfiguration = consumerConfiguration;
        this.optionsFactory = optionsFactory;
    }

    @Override
    public Multi<org.eclipse.microprofile.reactive.messaging.Message<?>> subscribe() {
        boolean traceEnabled = this.consumerConfiguration.consumerConfiguration().traceEnabled();
        Class payloadType = this.consumerConfiguration.consumerConfiguration().payloadType().orElse(null);
        String subject = this.consumerConfiguration.subject();
        return Multi.createFrom().emitter(emitter -> {
            try {
                JetStream jetStream = this.connection.jetStream();
                this.dispatcher = this.connection.createDispatcher();
                PushSubscribeOptions pushOptions = this.optionsFactory.create(this.consumerConfiguration);
                this.subscription = jetStream.subscribe(subject, this.dispatcher, arg_0 -> ((MultiEmitter)emitter).emit(arg_0), false, pushOptions);
            }
            catch (Throwable e) {
                logger.errorf(e, "Failed subscribing to stream: %s, subject: %s with message: %s", (Object)this.consumerConfiguration.consumerConfiguration().stream(), (Object)subject, (Object)e.getMessage());
                emitter.fail(e);
            }
        }).emitOn(arg_0 -> ((Context)this.context).runOnContext(arg_0)).map(message -> this.messageFactory.create((Message)message, traceEnabled, payloadType, this.context, new ExponentialBackoff(this.consumerConfiguration.consumerConfiguration().exponentialBackoff(), this.consumerConfiguration.consumerConfiguration().exponentialBackoffMaxDuration()), this.consumerConfiguration.consumerConfiguration().ackTimeout()));
    }

    @Override
    public Uni<Void> flush(Duration duration) {
        return super.flush(duration).emitOn(arg_0 -> ((Context)this.context).runOnContext(arg_0));
    }

    @Override
    public void close() throws Exception {
        try {
            if (this.subscription.isActive()) {
                this.subscription.drain(Duration.ofMillis(1000L));
            }
        }
        catch (IllegalStateException | InterruptedException e) {
            logger.warnf("Interrupted while draining subscription", new Object[0]);
        }
        try {
            if (this.subscription != null && this.dispatcher != null && this.dispatcher.isActive()) {
                this.dispatcher.unsubscribe((Subscription)this.subscription);
            }
        }
        catch (Exception e) {
            logger.errorf((Throwable)e, "Failed to shutdown pull executor", new Object[0]);
        }
        super.close();
    }
}

