package io.quarkiverse.reactive.messaging.nats.jetstream.client.io;

import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamStatusException;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.JetStreamReaderConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PullSubscribeOptionsFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/client/io/JetStreamReader.class */
public class JetStreamReader implements AutoCloseable {
    private static final Logger logger = Logger.getLogger(JetStreamReader.class);
    private final JetStreamReaderConsumerConfiguration configuration;
    private final io.nats.client.JetStreamReader reader;
    private final JetStreamSubscription subscription;

    private JetStreamReader(JetStreamReaderConsumerConfiguration jetStreamReaderConsumerConfiguration, JetStreamSubscription jetStreamSubscription, io.nats.client.JetStreamReader jetStreamReader) {
        this.configuration = jetStreamReaderConsumerConfiguration;
        this.subscription = jetStreamSubscription;
        this.reader = jetStreamReader;
    }

    public static JetStreamReader of(Connection connection, JetStreamReaderConsumerConfiguration jetStreamReaderConsumerConfiguration) {
        try {
            JetStreamSubscription subscribe = connection.jetStream().subscribe(jetStreamReaderConsumerConfiguration.subject(), new PullSubscribeOptionsFactory().create(jetStreamReaderConsumerConfiguration));
            return new JetStreamReader(jetStreamReaderConsumerConfiguration, subscribe, subscribe.reader(jetStreamReaderConsumerConfiguration.maxRequestBatch().intValue(), jetStreamReaderConsumerConfiguration.rePullAt().intValue()));
        } catch (IOException | JetStreamApiException e) {
            throw new JetStreamReaderException(e);
        }
    }

    public boolean isActive() {
        return this.subscription.isActive();
    }

    public Optional<Message> nextMessage() {
        try {
            return Optional.ofNullable(this.reader.nextMessage(this.configuration.maxRequestExpires().orElse(Duration.ZERO)));
        } catch (JetStreamStatusException e) {
            logger.debugf(e, e.getMessage(), new Object[0]);
            return Optional.empty();
        } catch (IllegalStateException e2) {
            logger.debugf(e2, "The subscription become inactive for stream: %s and subject: %s", this.configuration.consumerConfiguration().stream(), this.configuration.subject());
            return Optional.empty();
        } catch (InterruptedException e3) {
            logger.debugf(e3, "The reader was interrupted for stream: %s and subject: %s", this.configuration.consumerConfiguration().stream(), this.configuration.subject());
            return Optional.empty();
        } catch (Throwable th) {
            logger.warnf(th, "Error reading next message from stream: %s and subject: %s", this.configuration.consumerConfiguration().stream(), this.configuration.subject());
            return Optional.empty();
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        try {
            this.reader.stop();
        } catch (Exception e) {
            logger.warnf("Failed to stop reader with message %s", e.getMessage());
        }
        try {
            if (this.subscription.isActive()) {
                this.subscription.drain(Duration.ofMillis(1000L));
            }
        } catch (IllegalStateException | InterruptedException e2) {
            logger.warnf("Interrupted while draining subscription", new Object[0]);
        }
        try {
            if (this.subscription.isActive()) {
                this.subscription.unsubscribe();
            }
        } catch (IllegalStateException e3) {
            logger.warnf("Failed to unsubscribe subscription with message %s", e3.getMessage());
        }
    }
}
