/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.reactive.messaging.nats.jetstream.client;

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.PushSubscribeOptions;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Subscription;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushSubscribeOptionsFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.Tracer;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.TracerFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.TracerType;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.MessageMapper;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.vertx.mutiny.core.Context;
import java.time.Duration;
import lombok.Generated;
import org.jboss.logging.Logger;

public class PushSubscription<T>
implements Subscription<T> {
    @Generated
    private static final Logger log = Logger.getLogger(PushSubscription.class);
    private final PushConsumerConfiguration<T> consumerConfiguration;
    private final PushSubscribeOptionsFactory pushSubscribeOptionsFactory;
    private final Connection connection;
    private final MessageMapper messageMapper;
    private final TracerFactory tracerFactory;
    private final Context context;
    private volatile JetStreamSubscription subscription;
    private volatile Dispatcher dispatcher;

    PushSubscription(Connection connection, PushConsumerConfiguration<T> consumerConfiguration, MessageMapper messageMapper, TracerFactory tracerFactory, Context context) {
        this.connection = connection;
        this.consumerConfiguration = consumerConfiguration;
        this.pushSubscribeOptionsFactory = new PushSubscribeOptionsFactory();
        this.messageMapper = messageMapper;
        this.tracerFactory = tracerFactory;
        this.context = context;
    }

    @Override
    public Multi<org.eclipse.microprofile.reactive.messaging.Message<T>> subscribe() {
        Class payloadType = this.consumerConfiguration.consumerConfiguration().payloadType().orElse(null);
        String subject = this.consumerConfiguration.subject();
        Tracer tracer = this.tracerFactory.create(TracerType.Subscribe);
        return Multi.createFrom().emitter(emitter -> {
            try {
                JetStream jetStream = this.connection.jetStream();
                this.dispatcher = this.connection.createDispatcher();
                PushSubscribeOptions pushOptions = this.pushSubscribeOptionsFactory.create(this.consumerConfiguration);
                this.subscription = jetStream.subscribe(subject, this.dispatcher, arg_0 -> ((MultiEmitter)emitter).emit(arg_0), false, pushOptions);
            }
            catch (Exception e) {
                log.errorf((Throwable)e, "Failed subscribing to stream: %s, subject: %s with message: %s", (Object)this.consumerConfiguration.consumerConfiguration().stream(), (Object)subject, (Object)e.getMessage());
                emitter.fail((Throwable)e);
            }
        }).emitOn(arg_0 -> ((Context)this.context).runOnContext(arg_0)).map(message -> this.transformMessage((Message)message, payloadType, this.context)).onItem().transformToUniAndMerge(message -> tracer.withTrace(message, msg -> msg));
    }

    @Override
    public void close() {
        try {
            if (this.subscription.isActive()) {
                this.subscription.drain(Duration.ofMillis(1000L));
            }
        }
        catch (Throwable failure) {
            log.warnf(failure, "Interrupted while draining subscription: %s", (Object)failure.getMessage());
        }
        try {
            if (this.subscription != null && this.dispatcher != null && this.dispatcher.isActive()) {
                this.dispatcher.unsubscribe((io.nats.client.Subscription)this.subscription);
            }
        }
        catch (Throwable failure) {
            log.warnf(failure, "Failed to shutdown pull executor: %s", (Object)failure.getMessage());
        }
    }

    private org.eclipse.microprofile.reactive.messaging.Message<T> transformMessage(Message message, Class<T> payloadType, Context context) {
        return this.messageMapper.of(message, payloadType, context);
    }
}

