package io.quarkiverse.reactive.messaging.nats.jetstream.util;

import io.nats.client.ConsumerContext;
import io.nats.client.FetchConsumeOptions;
import io.nats.client.FetchConsumer;
import io.nats.client.JetStreamApiException;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.api.PurgeResponse;
import io.nats.client.api.StreamInfo;
import io.nats.client.api.StreamInfoOptions;
import io.quarkiverse.reactive.messaging.nats.NatsConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.JetStreamClient;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.JetStreamPublishConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.JetstreamConsumerConfigurtationFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.io.JetStreamPublisher;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.io.MessageFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.PayloadMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrumenter;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

@ApplicationScoped
/* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/util/JetStreamUtility.class */
public class JetStreamUtility {
    private static final Logger logger = Logger.getLogger(JetStreamUtility.class);
    private final NatsConfiguration natsConfiguration;
    private final ExecutionHolder executionHolder;
    private final PayloadMapper payloadMapper;
    private final JetStreamInstrumenter jetStreamInstrumenter;
    private final MessageFactory messageFactory;

    @Inject
    public JetStreamUtility(NatsConfiguration natsConfiguration, ExecutionHolder executionHolder, PayloadMapper payloadMapper, JetStreamInstrumenter jetStreamInstrumenter, MessageFactory messageFactory) {
        this.natsConfiguration = natsConfiguration;
        this.executionHolder = executionHolder;
        this.payloadMapper = payloadMapper;
        this.jetStreamInstrumenter = jetStreamInstrumenter;
        this.messageFactory = messageFactory;
    }

    public JetStreamClient getJetStreamClient() {
        return new JetStreamClient(ConnectionConfiguration.of(this.natsConfiguration), this.executionHolder.vertx());
    }

    public Connection getConnection(JetStreamClient jetStreamClient, Duration duration) {
        return (Connection) jetStreamClient.getOrEstablishConnection().await().atMost(duration);
    }

    public <T> Message<T> publish(Connection connection, Message<T> message, JetStreamPublishConfiguration jetStreamPublishConfiguration) {
        return getJetStreamPublisher().publish(connection, jetStreamPublishConfiguration, message);
    }

    public ConsumerInfo getConsumerInfo(Connection connection, String str, String str2) {
        try {
            return connection.jetStreamManagement().getConsumerInfo(str, str2);
        } catch (IOException | JetStreamApiException e) {
            throw new ConsumerException(e);
        }
    }

    private ConsumerContext getConsumerContext(Connection connection, String str, String str2) {
        try {
            return connection.getStreamContext(str).getConsumerContext(str2);
        } catch (IOException | JetStreamApiException e) {
            throw new ConsumerException(e);
        }
    }

    public <T> ConsumerContext addOrUpdateConsumer(Connection connection, ConsumerConfiguration<T> consumerConfiguration) {
        try {
            ConsumerContext createOrUpdateConsumer = connection.getStreamContext(consumerConfiguration.stream()).createOrUpdateConsumer(new JetstreamConsumerConfigurtationFactory().create(consumerConfiguration));
            connection.flush(Duration.ZERO);
            return createOrUpdateConsumer;
        } catch (IOException | JetStreamApiException e) {
            throw new ConsumerException(e);
        }
    }

    public <T> Optional<Message<T>> nextMessage(Connection connection, ConsumerConfiguration<T> consumerConfiguration) {
        return nextMessage(connection, getConsumerContext(connection, consumerConfiguration.stream(), consumerConfiguration.name().orElseThrow(() -> {
            return new IllegalArgumentException("Consumer name is not configured");
        })), consumerConfiguration);
    }

    public <T> Optional<Message<T>> nextMessage(Connection connection, ConsumerContext consumerContext, ConsumerConfiguration<T> consumerConfiguration) {
        return (Optional<Message<T>>) nextMessage(consumerContext, consumerConfiguration.fetchTimeout().orElse(null)).map(message -> {
            return this.messageFactory.create(message, consumerConfiguration.traceEnabled(), (Class) consumerConfiguration.getPayloadType().orElse(null), connection.context(), new ExponentialBackoff(false, Duration.ZERO), consumerConfiguration.ackTimeout().orElseGet(() -> {
                return Duration.ofSeconds(10L);
            }));
        });
    }

    public List<String> getStreams(Connection connection) {
        try {
            return connection.jetStreamManagement().getStreamNames();
        } catch (IOException | JetStreamApiException e) {
            throw new RuntimeException(e);
        }
    }

    public List<String> getSubjects(Connection connection, String str) {
        return (List) getStreamInfo(connection, str).map(streamInfo -> {
            return streamInfo.getConfiguration().getSubjects();
        }).orElseGet(List::of);
    }

    public List<String> getConsumerNames(Connection connection, String str) {
        try {
            return connection.jetStreamManagement().getConsumerNames(str);
        } catch (IOException | JetStreamApiException e) {
            throw new RuntimeException(e);
        }
    }

    public Optional<PurgeResult> purgeStream(Connection connection, String str) {
        try {
            PurgeResponse purgeStream = connection.jetStreamManagement().purgeStream(str);
            return Optional.of(new PurgeResult(str, purgeStream.isSuccess(), purgeStream.getPurged()));
        } catch (IOException | JetStreamApiException e) {
            logger.debugf(e, "Unable to purge stream %s with message: %s", str, e.getMessage());
            return Optional.empty();
        }
    }

    public void deleteMessage(Connection connection, String str, long j, boolean z) {
        try {
            if (connection.jetStreamManagement().deleteMessage(str, j, z)) {
            } else {
                throw new DeleteException(String.format("Unable to delete message in stream %s with sequence %d", str, Long.valueOf(j)));
            }
        } catch (IOException | JetStreamApiException e) {
            throw new DeleteException(String.format("Unable to delete message in stream %s with sequence %d: %s", str, Long.valueOf(j), e.getMessage()), e);
        }
    }

    public List<PurgeResult> purgeAllStreams(Connection connection) {
        return getStreams(connection).stream().flatMap(str -> {
            return purgeStream(connection, str).stream();
        }).toList();
    }

    public Optional<StreamState> getStreamState(Connection connection, String str) {
        return getStreamInfo(connection, str).map(streamInfo -> {
            return StreamState.of(streamInfo.getStreamState());
        });
    }

    private Optional<io.nats.client.Message> nextMessage(ConsumerContext consumerContext, Duration duration) {
        try {
            FetchConsumer fetchConsumer = fetchConsumer(consumerContext, duration);
            try {
                Optional<io.nats.client.Message> ofNullable = Optional.ofNullable(fetchConsumer.nextMessage());
                if (fetchConsumer != null) {
                    fetchConsumer.close();
                }
                return ofNullable;
            } finally {
            }
        } catch (Exception e) {
            logger.errorf(e, "Failed to fetch message: %s", e.getMessage());
            return Optional.empty();
        }
    }

    private FetchConsumer fetchConsumer(ConsumerContext consumerContext, Duration duration) throws IOException, JetStreamApiException {
        return duration == null ? consumerContext.fetch(FetchConsumeOptions.builder().maxMessages(1).noWait().build()) : consumerContext.fetch(((FetchConsumeOptions.Builder) FetchConsumeOptions.builder().maxMessages(1).expiresIn(duration.toMillis())).build());
    }

    private JetStreamPublisher getJetStreamPublisher() {
        return new JetStreamPublisher(this.payloadMapper, this.jetStreamInstrumenter);
    }

    private Optional<StreamInfo> getStreamInfo(Connection connection, String str) {
        try {
            return Optional.of(connection.jetStreamManagement().getStreamInfo(str, StreamInfoOptions.allSubjects()));
        } catch (IOException | JetStreamApiException e) {
            logger.debugf(e, "Unable to read stream %s with message: %s", str, e.getMessage());
            return Optional.empty();
        }
    }
}
