/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.reactive.messaging.nats.jetstream.client;

import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.PushSubscribeOptions;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionEvent;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Subscription;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.ExponentialBackoff;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushSubscribeOptionsFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.Tracer;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.MessageMapper;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.vertx.mutiny.core.Context;
import java.time.Duration;
import org.jboss.logging.Logger;

/**
 * {@link Subscription} implementation backed by a NATS JetStream push consumer.
 *
 * <p>The underlying JetStream subscription and its {@link Dispatcher} are created lazily,
 * when the {@link Multi} returned by {@link #subscribe(Tracer, Context)} is subscribed to.
 * Until then (and if subscribing fails) both fields remain {@code null}, which
 * {@link #close()} tolerates.
 *
 * @param <P> payload type of the messages produced by this subscription
 */
public class PushSubscription<P>
implements Subscription<P> {
    private static final Logger logger = Logger.getLogger(PushSubscription.class);
    /** Timeout handed to {@link JetStreamSubscription#drain(Duration)} when closing. */
    private static final Duration DRAIN_TIMEOUT = Duration.ofMillis(1000L);
    private final Connection connection;
    private final PushConsumerConfiguration<P> consumerConfiguration;
    private final PushSubscribeOptionsFactory pushSubscribeOptionsFactory;
    private final io.nats.client.Connection natsConnection;
    private final MessageMapper messageMapper;
    // Both are assigned inside the emitter callback of subscribe(); null until then.
    private volatile JetStreamSubscription subscription;
    private volatile Dispatcher dispatcher;

    /**
     * Creates a push subscription that is not yet attached to the server.
     *
     * @param connection            owning connection; this instance is removed from its listeners on {@link #close()}
     * @param consumerConfiguration push consumer settings (subject, stream, backoff, ack timeout, ...)
     * @param natsConnection        NATS client connection used to obtain the JetStream context and dispatcher
     * @param messageMapper         converts NATS messages into MicroProfile reactive messages
     */
    PushSubscription(Connection connection, PushConsumerConfiguration<P> consumerConfiguration, io.nats.client.Connection natsConnection, MessageMapper messageMapper) {
        this.connection = connection;
        this.consumerConfiguration = consumerConfiguration;
        this.pushSubscribeOptionsFactory = new PushSubscribeOptionsFactory();
        this.natsConnection = natsConnection;
        this.messageMapper = messageMapper;
    }

    /**
     * Builds the stream of incoming messages for the configured subject.
     *
     * <p>On subscription a dispatcher is created and a JetStream push subscription is registered
     * (auto-ack disabled) whose handler forwards every NATS message to the emitter. Items are then
     * re-emitted through {@code context.runOnContext}, mapped by the {@link MessageMapper} and passed
     * through {@code tracer.withTrace}. Any exception raised while subscribing is logged and
     * propagated as a failure of the returned {@link Multi}.
     *
     * @param tracer  tracer applied to every mapped message
     * @param context Vert.x context on which items are emitted and which is handed to the mapper
     * @return a {@link Multi} of mapped, traced messages
     */
    @Override
    public Multi<org.eclipse.microprofile.reactive.messaging.Message<P>> subscribe(Tracer<P> tracer, Context context) {
        // NOTE(review): raw type kept because the generic signature of payloadType() is not
        // visible here; presumably it is Optional<Class<P>> - confirm and tighten to Class<P>.
        Class payloadType = this.consumerConfiguration.consumerConfiguration().payloadType().orElse(null);
        String subject = this.consumerConfiguration.subject();
        // The explicit <Message> witness fixes the item type so no raw casts are needed downstream.
        return Multi.createFrom().<Message>emitter(emitter -> {
            try {
                JetStream jetStream = this.natsConnection.jetStream();
                this.dispatcher = this.natsConnection.createDispatcher();
                PushSubscribeOptions pushOptions = this.pushSubscribeOptionsFactory.create(this.consumerConfiguration);
                this.subscription = jetStream.subscribe(subject, this.dispatcher, emitter::emit, false, pushOptions);
            }
            catch (Exception e) {
                logger.errorf(e, "Failed subscribing to stream: %s, subject: %s with message: %s", this.consumerConfiguration.consumerConfiguration().stream(), subject, e.getMessage());
                emitter.fail(e);
            }
        }).emitOn(context::runOnContext).map(message -> this.messageMapper.of(message, payloadType, context, new ExponentialBackoff(this.consumerConfiguration.consumerConfiguration().exponentialBackoff(), this.consumerConfiguration.consumerConfiguration().exponentialBackoffMaxDuration()), this.consumerConfiguration.consumerConfiguration().ackTimeout())).onItem().transformToUniAndMerge(tracer::withTrace);
    }

    /**
     * Connection events are intentionally ignored by push subscriptions.
     *
     * @param event   the connection event (unused)
     * @param message accompanying message (unused)
     */
    @Override
    public void onEvent(ConnectionEvent event, String message) {
    }

    /**
     * Best-effort shutdown: drains the subscription if it is active, unsubscribes it from the
     * dispatcher, and finally removes this instance from the owning connection's listeners.
     *
     * <p>Safe to call when {@link #subscribe(Tracer, Context)} was never subscribed to or failed,
     * in which case only the listener removal happens. Failures are logged as warnings and never
     * propagated; if the drain was interrupted, the thread's interrupt flag is restored before
     * returning.
     */
    @Override
    public void close() {
        // Snapshot the volatile fields once so every check and call below sees the same instances.
        final JetStreamSubscription currentSubscription = this.subscription;
        final Dispatcher currentDispatcher = this.dispatcher;
        boolean interrupted = false;
        try {
            if (currentSubscription != null) {
                try {
                    if (currentSubscription.isActive()) {
                        currentSubscription.drain(DRAIN_TIMEOUT);
                    }
                }
                catch (Throwable failure) {
                    if (failure instanceof InterruptedException) {
                        // Remember the interruption; it is restored once cleanup has finished.
                        interrupted = true;
                    }
                    logger.warnf(failure, "Interrupted while draining subscription: %s", failure.getMessage());
                }
                try {
                    if (currentDispatcher != null && currentDispatcher.isActive()) {
                        currentDispatcher.unsubscribe(currentSubscription);
                    }
                }
                catch (Throwable failure) {
                    logger.warnf(failure, "Failed to unsubscribe from dispatcher: %s", failure.getMessage());
                }
            }
            this.connection.removeListener(this);
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

