/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.reactive.messaging.nats.jetstream.client;

import io.nats.client.ConsumerContext;
import io.nats.client.JetStreamStatusException;
import io.nats.client.Message;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.JetstreamWorkerThread;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.PullException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Subscription;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PullConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.Tracer;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.TracerFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.TracerType;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.MessageMapper;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Context;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.Generated;
import org.jboss.logging.Logger;

public class PullSubscription<T>
implements Subscription<T> {
    @Generated
    private static final Logger log = Logger.getLogger(PullSubscription.class);
    private final PullConsumerConfiguration<T> consumerConfiguration;
    private final ConsumerContext consumerContext;
    private final MessageMapper messageMapper;
    private final TracerFactory tracerFactory;
    private final Context context;

    @Override
    public Multi<org.eclipse.microprofile.reactive.messaging.Message<T>> subscribe() {
        Class payloadType = this.consumerConfiguration.consumerConfiguration().payloadType().orElse(null);
        Tracer tracer = this.tracerFactory.create(TracerType.Subscribe);
        ExecutorService pullExecutor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new);
        return Multi.createBy().repeating().uni(this::readNextMessage).whilst(message -> true).runSubscriptionOn((Executor)pullExecutor).emitOn(arg_0 -> ((Context)this.context).runOnContext(arg_0)).flatMap(message -> this.createMulti(message.orElse(null), payloadType, this.context)).onItem().transformToUniAndMerge(message -> tracer.withTrace(message, msg -> msg));
    }

    @Override
    public void close() {
    }

    private Uni<Optional<Message>> readNextMessage() {
        return Uni.createFrom().emitter(emitter -> {
            try {
                Duration maxExpires = this.consumerConfiguration.maxExpires();
                if (maxExpires != null) {
                    emitter.complete(Optional.ofNullable(this.consumerContext.next(maxExpires)));
                } else {
                    emitter.complete(Optional.ofNullable(this.consumerContext.next()));
                }
            }
            catch (JetStreamStatusException e) {
                emitter.fail((Throwable)new PullException(e));
            }
            catch (IllegalStateException e) {
                emitter.complete(Optional.empty());
            }
            catch (InterruptedException e) {
                emitter.fail((Throwable)new PullException(String.format("The reader was interrupted for stream: %s", this.consumerConfiguration.consumerConfiguration().stream()), e));
            }
            catch (Exception exception) {
                emitter.fail((Throwable)new PullException(String.format("Error reading next message from stream: %s", this.consumerConfiguration.consumerConfiguration().stream()), exception));
            }
        });
    }

    private Multi<org.eclipse.microprofile.reactive.messaging.Message<T>> createMulti(Message message, Class<T> payloadType, Context context) {
        if (message == null || message.getData() == null) {
            return Multi.createFrom().empty();
        }
        return Multi.createFrom().item(() -> this.messageMapper.of(message, payloadType, context));
    }

    @Generated
    public PullSubscription(PullConsumerConfiguration<T> consumerConfiguration, ConsumerContext consumerContext, MessageMapper messageMapper, TracerFactory tracerFactory, Context context) {
        this.consumerConfiguration = consumerConfiguration;
        this.consumerContext = consumerContext;
        this.messageMapper = messageMapper;
        this.tracerFactory = tracerFactory;
        this.context = context;
    }
}

