/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.reactive.messaging.nats.jetstream.client;

import io.nats.client.Connection;
import io.nats.client.ConsumerContext;
import io.nats.client.FetchConsumeOptions;
import io.nats.client.FetchConsumer;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamReader;
import io.nats.client.JetStreamSubscription;
import io.nats.client.KeyValue;
import io.nats.client.KeyValueManagement;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.PublishOptions;
import io.nats.client.StreamContext;
import io.nats.client.api.ConsumerPauseResponse;
import io.nats.client.api.MessageInfo;
import io.nats.client.api.PurgeResponse;
import io.nats.client.api.StreamInfo;
import io.nats.client.api.StreamInfoOptions;
import io.nats.client.impl.Headers;
import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff;
import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamOutgoingMessageMetadata;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionEvent;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.DeleteException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.FetchException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.InternalConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.KeyValueException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.KeyValueNotFoundException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageNotFoundException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.PublishException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.PushSubscription;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ReaderSubscribtion;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.SetupException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Subscription;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.SystemException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.Consumer;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.JetStreamMessage;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PurgeResult;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamResult;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamSetupConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamState;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamStatus;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionOptionsFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConsumerConfigurtationFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.FetchConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.KeyValueConfigurationFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.KeyValueSetupConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PullSubscribeOptionsFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ReaderConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.StreamConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.StreamConfigurationFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.ConsumerMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.MessageMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.PayloadMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.StreamStateMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrument;
import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamTrace;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.mutiny.tuples.Tuple5;
import io.smallrye.mutiny.unchecked.Unchecked;
import io.smallrye.reactive.messaging.tracing.TracingUtils;
import io.vertx.mutiny.core.Context;
import java.io.IOException;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeoutException;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

/**
 * Connection implementation that wraps an {@code io.nats.client.Connection}.
 *
 * <p>Calls to the NATS client are exposed as Mutiny {@code Uni}/{@code Multi} pipelines; most
 * of them emit their result through the supplied Vert.x {@code Context}.
 */
public class DefaultConnection
implements Connection {
    private static final Logger logger = Logger.getLogger(DefaultConnection.class);
    // Underlying NATS client connection, opened in the constructor via connect(...).
    private final io.nats.client.Connection connection;
    // Mutable listener registry, seeded with the listener handed to the constructor.
    private final List<ConnectionListener> listeners;
    // Vert.x context used as the emitOn executor for results.
    private final Context context;
    // Converts NATS stream state objects into the project's StreamState.
    private final StreamStateMapper streamStateMapper;
    // Converts NATS consumer info into the project's Consumer.
    private final ConsumerMapper consumerMapper;
    // Converts NATS messages into reactive-messaging messages.
    private final MessageMapper messageMapper;
    // Converts payload objects to bytes and back.
    private final PayloadMapper payloadMapper;
    // Supplies the tracing instrumenter used when publishing with tracing enabled.
    private final JetStreamInstrument instrument;

    /**
     * Opens the NATS connection, stores the collaborators and fires a {@code Connected} event.
     *
     * @param configuration      connection settings used to build the NATS options
     * @param connectionListener first listener registered on this connection
     * @param context            Vert.x context on which results are emitted
     * @param messageMapper      mapper for incoming NATS messages
     * @param payloadMapper      mapper between payload objects and bytes
     * @param consumerMapper     mapper for consumer information
     * @param streamStateMapper  mapper for stream state
     * @param instrumenter       tracing instrument
     * @throws ConnectionException when the connection cannot be established
     */
    DefaultConnection(ConnectionConfiguration configuration, ConnectionListener connectionListener, Context context, MessageMapper messageMapper, PayloadMapper payloadMapper, ConsumerMapper consumerMapper, StreamStateMapper streamStateMapper, JetStreamInstrument instrumenter) throws ConnectionException {
        // NOTE(review): connect(...) passes `this` to InternalConnectionListener before the
        // remaining fields are assigned - confirm the listener does not use them during connect.
        this.connection = connect(configuration);
        this.instrument = instrumenter;
        this.payloadMapper = payloadMapper;
        this.messageMapper = messageMapper;
        this.consumerMapper = consumerMapper;
        this.streamStateMapper = streamStateMapper;
        this.context = context;
        this.listeners = new ArrayList<>(List.of(connectionListener));
        fireEvent(ConnectionEvent.Connected, "Connection established");
    }

    /**
     * Reports whether the underlying NATS connection currently has status {@code CONNECTED}.
     *
     * @return {@code true} only when the NATS client reports {@code CONNECTED}
     */
    @Override
    public boolean isConnected() {
        // Enum constants are singletons, so an identity comparison matches equals().
        return this.connection.getStatus() == io.nats.client.Connection.Status.CONNECTED;
    }

    /**
     * Flushes the underlying NATS connection.
     *
     * @param duration timeout handed to the NATS client's {@code flush}
     * @return a {@code Uni} that emits {@code null} on the Vert.x context once the flush has
     *         finished, or fails with a {@code ConnectionException} when the flush times out
     *         or the calling thread is interrupted
     */
    @Override
    public Uni<Void> flush(Duration duration) {
        return Uni.createFrom().<Void>item(Unchecked.supplier(() -> {
            try {
                this.connection.flush(duration);
                return null;
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up the stack can still see it.
                Thread.currentThread().interrupt();
                throw new ConnectionException(e);
            } catch (TimeoutException e) {
                throw new ConnectionException(e);
            }
        })).emitOn(command -> this.context.runOnContext(command));
    }

    /**
     * Returns the registered connection listeners.
     *
     * @return the internal listener list itself (not a copy)
     */
    @Override
    public List<ConnectionListener> listeners() {
        return listeners;
    }

    /**
     * Registers a listener on this connection.
     *
     * @param listener listener appended to the internal list
     */
    @Override
    public void addListener(ConnectionListener listener) {
        listeners.add(listener);
    }

    /**
     * Unregisters a listener from this connection.
     *
     * @param listener listener removed from the internal list, if present
     */
    @Override
    public void removeListener(ConnectionListener listener) {
        listeners.remove(listener);
    }

    /**
     * Closes every registered listener and then the NATS connection.
     *
     * <p>Failures are logged as warnings and never propagated.
     */
    @Override
    public void close() {
        // Work on a snapshot, presumably so listeners may unregister themselves while closing.
        List<ConnectionListener> snapshot = new ArrayList<>(listeners);
        for (ConnectionListener listener : snapshot) {
            try {
                listener.close();
            } catch (Throwable failure) {
                logger.warnf(failure, "Error closing listener: %s", failure.getMessage());
            }
        }
        try {
            connection.close();
        } catch (Throwable failure) {
            logger.warnf(failure, "Error closing connection: %s", failure.getMessage());
        }
    }

    /**
     * Looks up a consumer and maps it to the project's {@code Consumer} type.
     *
     * @param stream       name of the stream owning the consumer
     * @param consumerName name of the consumer
     * @return the mapped consumer, emitted on the Vert.x context; fails with a
     *         {@code SystemException} when the lookup throws
     */
    @Override
    public Uni<Consumer> getConsumer(String stream, String consumerName) {
        return getJetStreamManagement()
                .onItem().transformToUni(management -> Uni.createFrom().emitter(sink -> {
                    try {
                        sink.complete(management.getConsumerInfo(stream, consumerName));
                    } catch (JetStreamApiException | IOException cause) {
                        sink.fail(new SystemException(cause));
                    }
                }))
                .onItem().transform(this.consumerMapper::of)
                .emitOn(command -> context.runOnContext(command));
    }

    /**
     * Deletes a consumer from a stream.
     *
     * @param streamName   name of the stream
     * @param consumerName name of the consumer to delete
     * @return a {@code Uni} emitting {@code null} on the Vert.x context; fails with a
     *         {@code SystemException} wrapping anything thrown by the delete call
     */
    @Override
    public Uni<Void> deleteConsumer(String streamName, String consumerName) {
        return getJetStreamManagement()
                .onItem().transformToUni(management -> Uni.createFrom().emitter(sink -> {
                    try {
                        management.deleteConsumer(streamName, consumerName);
                        sink.complete(null);
                    } catch (Throwable cause) {
                        sink.fail(new SystemException(cause));
                    }
                }))
                .emitOn(command -> context.runOnContext(command));
    }

    /**
     * Pauses a consumer until the given point in time.
     *
     * @param streamName   name of the stream
     * @param consumerName name of the consumer to pause
     * @param pauseUntil   time until which the consumer stays paused
     * @return a {@code Uni} emitting {@code null} on the Vert.x context when the server reports
     *         the consumer as paused; fails with a {@code SystemException} otherwise or when
     *         the pause call throws
     */
    @Override
    public Uni<Void> pauseConsumer(String streamName, String consumerName, ZonedDateTime pauseUntil) {
        return this.getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> Uni.createFrom().<Void>emitter(emitter -> {
            try {
                ConsumerPauseResponse response = jetStreamManagement.pauseConsumer(streamName, consumerName, pauseUntil);
                // Send exactly one terminal signal: success and failure are mutually exclusive.
                if (response.isPaused()) {
                    emitter.complete(null);
                } else {
                    emitter.fail(new SystemException(String.format("Unable to pause consumer %s in stream %s", consumerName, streamName)));
                }
            } catch (Throwable failure) {
                emitter.fail(new SystemException(failure));
            }
        })).emitOn(command -> this.context.runOnContext(command));
    }

    /**
     * Resumes a previously paused consumer.
     *
     * @param streamName   name of the stream
     * @param consumerName name of the consumer to resume
     * @return a {@code Uni} emitting {@code null} on the Vert.x context when the resume call
     *         returns {@code true}; fails with a {@code SystemException} otherwise or when the
     *         resume call throws
     */
    @Override
    public Uni<Void> resumeConsumer(String streamName, String consumerName) {
        return this.getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> Uni.createFrom().<Void>emitter(emitter -> {
            try {
                boolean resumed = jetStreamManagement.resumeConsumer(streamName, consumerName);
                // Send exactly one terminal signal: success and failure are mutually exclusive.
                if (resumed) {
                    emitter.complete(null);
                } else {
                    emitter.fail(new SystemException(String.format("Unable to resume consumer %s in stream %s", consumerName, streamName)));
                }
            } catch (Throwable failure) {
                emitter.fail(new SystemException(failure));
            }
        })).emitOn(command -> this.context.runOnContext(command));
    }

    /**
     * Reads the first sequence number of a stream.
     *
     * @param streamName name of the stream
     * @return the {@code firstSequence} value of the stream's state
     */
    @Override
    public Uni<Long> getFirstSequence(String streamName) {
        Uni<StreamState> state = getStreamState(streamName);
        return state.onItem().transform(StreamState::firstSequence);
    }

    /**
     * Lists the names of all streams known to JetStream.
     *
     * @return the stream names, emitted on the Vert.x context
     */
    @Override
    public Uni<List<String>> getStreams() {
        return getJetStreamManagement()
                .onItem().transformToUni(management ->
                        Uni.createFrom().item(Unchecked.supplier(() -> management.getStreamNames())))
                .emitOn(command -> context.runOnContext(command));
    }

    /**
     * Lists the subjects configured on a stream.
     *
     * @param streamName name of the stream
     * @return the subjects from the stream's configuration, emitted on the Vert.x context
     */
    @Override
    public Uni<List<String>> getSubjects(String streamName) {
        return getStreamInfo(streamName)
                .onItem().transform(info -> info.getConfiguration().getSubjects())
                .emitOn(command -> context.runOnContext(command));
    }

    /**
     * Lists the names of the consumers attached to a stream.
     *
     * @param streamName name of the stream
     * @return the consumer names, emitted on the Vert.x context; fails with a
     *         {@code SystemException} wrapping anything thrown by the lookup
     */
    @Override
    public Uni<List<String>> getConsumerNames(String streamName) {
        return getJetStreamManagement()
                .onItem().transformToUni(management -> Uni.createFrom().emitter(sink -> {
                    try {
                        sink.complete(management.getConsumerNames(streamName));
                    } catch (Throwable cause) {
                        sink.fail(new SystemException(cause));
                    }
                }))
                .emitOn(command -> context.runOnContext(command));
    }

    /**
     * Purges all messages from a stream.
     *
     * @param streamName name of the stream to purge
     * @return a {@code PurgeResult} carrying the stream name, the success flag and the purge
     *         count from the server response; fails with a {@code SystemException} wrapping
     *         anything thrown by the purge call
     */
    @Override
    public Uni<PurgeResult> purgeStream(String streamName) {
        return getJetStreamManagement()
                .onItem().transformToUni(management -> Uni.createFrom().emitter(sink -> {
                    try {
                        PurgeResponse purged = management.purgeStream(streamName);
                        sink.complete(new PurgeResult(streamName, purged.isSuccess(), purged.getPurged()));
                    } catch (Throwable cause) {
                        sink.fail(new SystemException(cause));
                    }
                }))
                .emitOn(command -> context.runOnContext(command));
    }

    /**
     * Deletes a single message from a stream.
     *
     * @param stream   name of the stream
     * @param sequence stream sequence number of the message
     * @param erase    erase flag forwarded to the NATS client's {@code deleteMessage}
     * @return a {@code Uni} emitting {@code null} on the Vert.x context when the delete call
     *         returns {@code true}; fails with a {@code DeleteException} otherwise or when the
     *         delete call throws
     */
    @Override
    public Uni<Void> deleteMessage(String stream, long sequence, boolean erase) {
        return this.getJetStreamManagement().onItem().transformToUni(jsm -> Uni.createFrom().<Void>emitter(emitter -> {
            try {
                // Send exactly one terminal signal: success and failure are mutually exclusive.
                if (jsm.deleteMessage(stream, sequence, erase)) {
                    emitter.complete(null);
                } else {
                    emitter.fail(new DeleteException(String.format("Unable to delete message in stream %s with sequence %d", stream, sequence)));
                }
            } catch (Throwable failure) {
                emitter.fail(new DeleteException(String.format("Unable to delete message in stream %s with sequence %d: %s", stream, sequence, failure.getMessage()), failure));
            }
        })).emitOn(command -> this.context.runOnContext(command));
    }

    /**
     * Reads the state of a stream and maps it to the project's {@code StreamState}.
     *
     * @param streamName name of the stream
     * @return the mapped stream state, emitted on the Vert.x context
     */
    @Override
    public Uni<StreamState> getStreamState(String streamName) {
        return getStreamInfo(streamName)
                .onItem().transform(info -> streamStateMapper.of(info.getStreamState()))
                .emitOn(command -> context.runOnContext(command));
    }

    /**
     * Reads the configuration of a stream.
     *
     * @param streamName name of the stream
     * @return the configuration converted with {@code StreamConfiguration.of}, emitted on the
     *         Vert.x context
     */
    @Override
    public Uni<StreamConfiguration> getStreamConfiguration(String streamName) {
        return getStreamInfo(streamName)
                .onItem().transform(info -> StreamConfiguration.of(info.getConfiguration()))
                .emitOn(command -> context.runOnContext(command));
    }

    /**
     * Purges every stream returned by {@link #getStreams()}.
     *
     * @return the purge results, emitted on the Vert.x context
     */
    @Override
    public Uni<List<PurgeResult>> purgeAllStreams() {
        return getStreams()
                .onItem().transformToUni(this::purgeAllStreams)
                .emitOn(command -> context.runOnContext(command));
    }

    /**
     * Publishes a message to JetStream and acknowledges it once the server has confirmed it.
     *
     * <p>The message id comes from the outgoing metadata, or a random UUID when absent. The
     * subject is the configured subject, extended with {@code "." + subtopic} when the metadata
     * carries a subtopic. A {@code message.type} header holding the payload's type name is added
     * unless a header of that name is already present.
     *
     * @param message       message to publish
     * @param configuration stream, subject and tracing settings
     * @return the message after it has been acked, or, when publishing fails, after it has been
     *         nacked with the failure; a failure of the nack itself surfaces as a
     *         {@code PublishException}. Results are emitted on the Vert.x context.
     */
    @Override
    public <T> Uni<Message<T>> publish(Message<T> message, PublishConfiguration configuration) {
        return Uni.createFrom().<Tuple5<JetStream, String, Headers, byte[], PublishOptions>>emitter(emitter -> {
            try {
                Optional<JetStreamOutgoingMessageMetadata> metadata = message.getMetadata(JetStreamOutgoingMessageMetadata.class);
                String messageId = metadata.map(JetStreamOutgoingMessageMetadata::messageId).orElseGet(() -> UUID.randomUUID().toString());
                byte[] payload = this.payloadMapper.of(message.getPayload());
                String subject = metadata.flatMap(JetStreamOutgoingMessageMetadata::subtopic).map(subtopic -> configuration.subject() + "." + subtopic).orElseGet(configuration::subject);
                HashMap<String, List<String>> headers = new HashMap<>();
                metadata.ifPresent(m -> headers.putAll(m.headers()));
                if (message.getPayload() != null) {
                    headers.putIfAbsent("message.type", List.of(message.getPayload().getClass().getTypeName()));
                }
                if (configuration.traceEnabled()) {
                    // Decode explicitly as UTF-8 rather than with the platform default charset, so
                    // the traced payload text does not depend on the JVM's locale settings.
                    // NOTE(review): assumes the payload mapper emits UTF-8 bytes - confirm.
                    String tracedPayload = new String(payload, java.nio.charset.StandardCharsets.UTF_8);
                    TracingUtils.traceOutgoing(this.instrument.publisher(), message, new JetStreamTrace(configuration.stream(), subject, messageId, headers, tracedPayload));
                }
                JetStream jetStream = this.connection.jetStream();
                PublishOptions options = this.createPublishOptions(messageId, configuration.stream());
                emitter.complete(Tuple5.of(jetStream, subject, this.toJetStreamHeaders(headers), payload, options));
            } catch (Throwable failure) {
                emitter.fail(new PublishException(String.format("Failed to publish message: %s", failure.getMessage()), failure));
            }
        })
                .onItem().transformToUni(tuple -> Uni.createFrom().completionStage(
                        tuple.getItem1().publishAsync(tuple.getItem2(), tuple.getItem3(), tuple.getItem4(), tuple.getItem5())))
                .onItem().invoke(ack -> logger.debugf("Message published to stream: %s with sequence number: %d", (Object) ack.getStream(), (Object) ack.getSeqno()))
                .onItem().transformToUni(ignore -> this.acknowledge(message))
                // Any failure so far nacks the message; only a failing nack reaches the next stage.
                .onFailure().recoverWithUni(failure -> this.notAcknowledge(message, failure))
                .onFailure().transform(failure -> new PublishException(failure.getMessage(), failure))
                .emitOn(command -> this.context.runOnContext(command));
    }

    /**
     * Creates or updates the given consumer and then publishes the message.
     *
     * @param message               message to publish
     * @param publishConfiguration  stream, subject and tracing settings
     * @param consumerConfiguration consumer that must exist before publishing
     * @return the result of {@link #publish(Message, PublishConfiguration)}
     */
    @Override
    public <T> Uni<Message<T>> publish(Message<T> message, PublishConfiguration publishConfiguration, FetchConsumerConfiguration<T> consumerConfiguration) {
        return addOrUpdateConsumer(consumerConfiguration)
                .onItem().transformToUni(ignored -> publish(message, publishConfiguration));
    }

    /**
     * Creates or updates the configured consumer and fetches its next message.
     *
     * @param configuration consumer and fetch settings
     * @return the next message mapped by the message mapper
     */
    @Override
    public <T> Uni<Message<T>> nextMessage(FetchConsumerConfiguration<T> configuration) {
        return addOrUpdateConsumer(configuration)
                .onItem().transformToUni(consumer -> nextMessage(consumer, configuration));
    }

    /**
     * Creates or updates the configured consumer and streams the messages of one fetch.
     *
     * @param configuration consumer and fetch settings
     * @return the fetched messages, emitted on the Vert.x context
     */
    @Override
    public <T> Multi<Message<T>> nextMessages(FetchConsumerConfiguration<T> configuration) {
        return addOrUpdateConsumer(configuration)
                .onItem().transformToMulti(consumer -> nextMessages(consumer, configuration))
                .emitOn(command -> context.runOnContext(command));
    }

    /**
     * Reads a value from a key-value bucket and converts it to {@code valueType}.
     *
     * @param bucketName name of the bucket
     * @param key        key to look up
     * @param valueType  type the stored bytes are converted to by the payload mapper
     * @return the converted value, emitted on the Vert.x context; fails with a
     *         {@code KeyValueNotFoundException} when the lookup yields {@code null} and with a
     *         {@code KeyValueException} when the lookup throws
     */
    @Override
    public <T> Uni<T> getKeyValue(String bucketName, String key, Class<T> valueType) {
        return Uni.createFrom().emitter(sink -> {
            try {
                KeyValue bucket = connection.keyValue(bucketName);
                sink.complete(bucket.get(key));
            } catch (Throwable cause) {
                sink.fail(new KeyValueException(cause));
            }
        })
                .onItem().ifNull().failWith(() -> new KeyValueNotFoundException(bucketName, key))
                .onItem().ifNotNull().transform(entry -> payloadMapper.of(entry.getValue(), valueType))
                .emitOn(command -> context.runOnContext(command));
    }

    /**
     * Stores a value in a key-value bucket.
     *
     * @param bucketName name of the bucket
     * @param key        key to write
     * @param value      value converted to bytes by the payload mapper
     * @return a {@code Uni} emitting {@code null} on the Vert.x context; fails with a
     *         {@code KeyValueException} wrapping anything thrown while writing
     */
    @Override
    public <T> Uni<Void> putKeyValue(String bucketName, String key, T value) {
        return Uni.createFrom().emitter(sink -> {
            try {
                KeyValue bucket = connection.keyValue(bucketName);
                bucket.put(key, payloadMapper.of(value));
                sink.complete(null);
            } catch (Throwable cause) {
                sink.fail(new KeyValueException(cause));
            }
        }).emitOn(command -> context.runOnContext(command));
    }

    /**
     * Deletes a key from a key-value bucket.
     *
     * @param bucketName name of the bucket
     * @param key        key to delete
     * @return a {@code Uni} emitting {@code null} on the Vert.x context; fails with a
     *         {@code KeyValueException} wrapping anything thrown while deleting
     */
    @Override
    public Uni<Void> deleteKeyValue(String bucketName, String key) {
        return Uni.createFrom().emitter(sink -> {
            try {
                KeyValue bucket = connection.keyValue(bucketName);
                bucket.delete(key);
                sink.complete(null);
            } catch (Throwable cause) {
                sink.fail(new KeyValueException(cause));
            }
        }).emitOn(command -> context.runOnContext(command));
    }

    /**
     * Loads a message from a stream by its sequence number.
     *
     * @param streamName name of the stream
     * @param sequence   stream sequence number of the message
     * @return a {@code JetStreamMessage} built from the message info and the payload produced by
     *         the payload mapper ({@code null} when the mapper yields nothing), emitted on the
     *         Vert.x context; a {@code JetStreamApiException} or {@code IOException} is passed
     *         on unwrapped
     */
    @Override
    public <T> Uni<Message<T>> resolve(String streamName, long sequence) {
        return Uni.createFrom().emitter(sink -> {
            try {
                StreamContext stream = connection.jetStream().getStreamContext(streamName);
                MessageInfo info = stream.getMessage(sequence);
                sink.complete(new JetStreamMessage<Object>(info, payloadMapper.of(info).orElse(null)));
            } catch (JetStreamApiException | IOException cause) {
                sink.fail(cause);
            }
        }).emitOn(command -> context.runOnContext(command));
    }

    /**
     * Creates a push subscription for the given configuration.
     *
     * <p>The {@code PushSubscription} is constructed lazily, when the returned {@code Uni} is
     * subscribed to.
     *
     * @param configuration push consumer settings
     * @return a {@code Uni} emitting the new subscription
     */
    @Override
    public <T> Uni<Subscription<T>> subscribtion(PushConsumerConfiguration<T> configuration) {
        return Uni.createFrom().item(
                () -> new PushSubscription(this, configuration, connection, messageMapper, context));
    }

    /**
     * Creates a reader-based pull subscription and registers it as a connection listener.
     *
     * @param configuration reader consumer settings
     * @return a {@code Uni} emitting the new subscription
     */
    @Override
    public <T> Uni<Subscription<T>> subscribtion(ReaderConsumerConfiguration<T> configuration) {
        return createSubscription(configuration)
                .onItem().transformToUni(natsSubscription -> createReader(configuration, (JetStreamSubscription) natsSubscription))
                .onItem().transformToUni(pair -> Uni.createFrom().item(Unchecked.supplier(
                        () -> new ReaderSubscribtion(this, configuration, (JetStreamSubscription) pair.getItem1(), (JetStreamReader) pair.getItem2(), messageMapper, context))))
                .onItem().invoke(this::addListener);
    }

    /**
     * Closes a subscription and removes it from the connection's listeners.
     *
     * <p>A failure while closing is logged as a warning; the listener is removed either way.
     *
     * @param subscription subscription to close
     */
    @Override
    public <T> void close(Subscription<T> subscription) {
        try {
            subscription.close();
        } catch (Throwable cause) {
            logger.warnf(cause, "Failed to close subscription: %s", cause.getMessage());
        }
        removeListener(subscription);
    }

    /**
     * Creates or updates every key-value store in the given list.
     *
     * @param keyValueConfigurations stores to set up
     * @return a {@code Uni} that resolves on the Vert.x context with the last item of the merged
     *         setup operations
     */
    @Override
    public Uni<Void> addOrUpdateKeyValueStores(List<KeyValueSetupConfiguration> keyValueConfigurations) {
        return Multi.createFrom().items(keyValueConfigurations.stream())
                .onItem().transformToUniAndMerge(this::addOrUpdateKeyValueStore)
                .collect().last()
                .emitOn(command -> context.runOnContext(command));
    }

    /**
     * Creates or updates every stream in the given list.
     *
     * @param streamConfigurations streams to set up
     * @return the results of all setup operations, emitted on the Vert.x context
     */
    @Override
    public Uni<List<StreamResult>> addStreams(List<StreamSetupConfiguration> streamConfigurations) {
        return getJetStreamManagement()
                // Pair each configuration with the management handle for the merge step below.
                .onItem().transformToMulti(management -> Multi.createFrom().items(
                        streamConfigurations.stream().map(setup -> Tuple2.of(management, setup))))
                .onItem().transformToUniAndMerge(pair ->
                        addOrUpdateStream((JetStreamManagement) pair.getItem1(), (StreamSetupConfiguration) pair.getItem2()))
                .collect().asList()
                .emitOn(command -> context.runOnContext(command));
    }

    /**
     * Opens a reader on a subscription using the configured batch size and re-pull threshold.
     *
     * @return the subscription paired with its reader
     */
    private <T> Uni<Tuple2<JetStreamSubscription, JetStreamReader>> createReader(ReaderConsumerConfiguration<T> configuration, JetStreamSubscription subscription) {
        return Uni.createFrom()
                .item(Unchecked.supplier(() -> subscription.reader(configuration.maxRequestBatch().intValue(), configuration.rePullAt().intValue())))
                .onItem().transform(openedReader -> Tuple2.of(subscription, openedReader));
    }

    /**
     * Obtains a JetStream handle from the connection and subscribes with it.
     *
     * @return the created JetStream subscription
     */
    private <T> Uni<JetStreamSubscription> createSubscription(ReaderConsumerConfiguration<T> configuration) {
        return Uni.createFrom()
                .item(Unchecked.supplier(() -> connection.jetStream()))
                .onItem().transformToUni(js -> createSubscription((JetStream) js, configuration));
    }

    /**
     * Subscribes, retrying once when the first attempt fails with an
     * {@code IllegalArgumentException}.
     *
     * <p>On that failure the configured consumer is deleted and the subscribe call is repeated.
     * Every other failure is passed on unchanged.
     */
    private <T> Uni<JetStreamSubscription> createSubscription(JetStream jetStream, ReaderConsumerConfiguration<T> configuration) {
        return subscribe(jetStream, configuration).onFailure().recoverWithUni(cause -> {
            if (!(cause instanceof IllegalArgumentException)) {
                return Uni.createFrom().failure(cause);
            }
            return deleteConsumer(configuration.consumerConfiguration().stream(), configuration.consumerConfiguration().name())
                    .onItem().transformToUni(ignored -> subscribe(jetStream, configuration));
        });
    }

    /**
     * Subscribes to the configured subject with pull options built from the configuration.
     *
     * <p>Anything thrown by the subscribe call is passed on unwrapped.
     */
    private <T> Uni<JetStreamSubscription> subscribe(JetStream jetStream, ReaderConsumerConfiguration<T> configuration) {
        return Uni.createFrom().emitter(sink -> {
            try {
                PullSubscribeOptionsFactory pullOptions = new PullSubscribeOptionsFactory();
                sink.complete(jetStream.subscribe(configuration.subject(), pullOptions.create(configuration)));
            } catch (Throwable cause) {
                sink.fail(cause);
            }
        });
    }

    /**
     * Reads stream information by name using a fresh management handle.
     */
    private Uni<StreamInfo> getStreamInfo(String streamName) {
        return getJetStreamManagement()
                .onItem().transformToUni(management -> getStreamInfo((JetStreamManagement) management, streamName));
    }

    /**
     * Reads stream information, requesting all subjects, through the given management handle.
     *
     * <p>Fails with a {@code SystemException} wrapping anything thrown by the lookup.
     */
    private Uni<StreamInfo> getStreamInfo(JetStreamManagement jsm, String streamName) {
        return Uni.createFrom().emitter(sink -> {
            try {
                sink.complete(jsm.getStreamInfo(streamName, StreamInfoOptions.allSubjects()));
            } catch (Throwable cause) {
                String text = String.format("Unable to read stream %s with message: %s", streamName, cause.getMessage());
                sink.fail(new SystemException(text, cause));
            }
        });
    }

    /**
     * Purges one stream synchronously.
     *
     * @return the purge result, or an empty {@code Optional} when the purge throws a
     *         {@code JetStreamApiException} or {@code IOException} (logged as a warning)
     */
    private Optional<PurgeResult> purgeStream(JetStreamManagement jetStreamManagement, String streamName) {
        try {
            PurgeResponse purged = jetStreamManagement.purgeStream(streamName);
            PurgeResult result = PurgeResult.builder()
                    .streamName(streamName)
                    .success(purged.isSuccess())
                    .purgeCount(purged.getPurged())
                    .build();
            return Optional.of(result);
        } catch (JetStreamApiException | IOException cause) {
            logger.warnf(cause, "Unable to purge stream %s with message: %s", (Object) streamName, (Object) cause.getMessage());
            return Optional.empty();
        }
    }

    /**
     * Purges the given streams one after another and collects the successful results.
     *
     * <p>Streams whose purge yields an empty result are left out of the returned list.
     */
    private Uni<List<PurgeResult>> purgeAllStreams(List<String> streams) {
        return getJetStreamManagement().onItem().transformToUni(management ->
                Uni.createFrom().item(Unchecked.supplier(() -> streams.stream()
                        .map(name -> purgeStream((JetStreamManagement) management, (String) name))
                        .flatMap(Optional::stream)
                        .toList())));
    }

    /**
     * Obtains a JetStream management handle from the connection.
     *
     * <p>Fails with a {@code SystemException} wrapping anything thrown while obtaining it.
     */
    private Uni<JetStreamManagement> getJetStreamManagement() {
        return Uni.createFrom().emitter(sink -> {
            try {
                sink.complete(connection.jetStreamManagement());
            } catch (Throwable cause) {
                String text = String.format("Unable to manage JetStream: %s", cause.getMessage());
                sink.fail(new SystemException(text, cause));
            }
        });
    }

    /**
     * Builds publish options carrying the message id and the target stream name.
     */
    private PublishOptions createPublishOptions(String messageId, String streamName) {
        return PublishOptions.builder()
                .messageId(messageId)
                .stream(streamName)
                .build();
    }

    /**
     * Starts a fetch for at most one message.
     *
     * @param timeout how long the fetch may wait; {@code null} selects a no-wait fetch
     */
    private FetchConsumer fetchConsumer(ConsumerContext consumerContext, Duration timeout) throws IOException, JetStreamApiException {
        FetchConsumeOptions options = timeout == null
                ? FetchConsumeOptions.builder().maxMessages(1).noWait().build()
                : ((FetchConsumeOptions.Builder) FetchConsumeOptions.builder().maxMessages(1).expiresIn(timeout.toMillis())).build();
        return consumerContext.fetch(options);
    }

    /**
     * Acks the message and emits that same message once the ack has completed.
     */
    private <T> Uni<Message<T>> acknowledge(Message<T> message) {
        return Uni.createFrom()
                .completionStage(message.ack())
                .onItem().transform(ignored -> message);
    }

    /**
     * Nacks the message with the given failure, logs a warning and emits the message.
     */
    private <T> Uni<Message<T>> notAcknowledge(Message<T> message, Throwable throwable) {
        return Uni.createFrom()
                .completionStage(message.nack(throwable))
                .onItem().invoke(() -> logger.warnf(throwable, "Message not acknowledged: %s", throwable.getMessage()))
                .onItem().transformToUni(ignored -> Uni.createFrom().item((Object) message));
    }

    /**
     * Requests the next raw NATS message from a consumer.
     *
     * @param timeout value handed to the consumer's {@code next} call
     * @return the message, emitted on the Vert.x context; fails with a
     *         {@code MessageNotFoundException} when {@code next} returns {@code null} and with
     *         a {@code FetchException} (after logging an error) when {@code next} throws
     */
    private Uni<io.nats.client.Message> nextMessage(ConsumerContext consumerContext, Duration timeout) {
        return Uni.createFrom().emitter(sink -> {
            try {
                io.nats.client.Message received = consumerContext.next(timeout);
                if (received == null) {
                    sink.fail(new MessageNotFoundException());
                } else {
                    sink.complete(received);
                }
            } catch (Throwable cause) {
                logger.errorf(cause, "Failed to fetch message: %s", cause.getMessage());
                sink.fail(new FetchException(cause));
            }
        }).emitOn(command -> context.runOnContext(command));
    }

    /**
     * Fetches the next message from a consumer and maps it with the message mapper.
     *
     * <p>The fetch timeout comes from the configuration and is {@code null} when none is set.
     */
    private <T> Uni<Message<T>> nextMessage(ConsumerContext consumerContext, FetchConsumerConfiguration<T> configuration) {
        Duration fetchTimeout = (Duration) configuration.fetchTimeout().orElse(null);
        return nextMessage(consumerContext, fetchTimeout)
                .map(raw -> messageMapper.of((io.nats.client.Message) raw, configuration.traceEnabled(), configuration.payloadType().orElse(null), context, new ExponentialBackoff(false, Duration.ZERO), configuration.ackTimeout()));
    }

    /**
     * Copies a header map into a NATS {@code Headers} instance.
     */
    private Headers toJetStreamHeaders(Map<String, List<String>> headers) {
        Headers natsHeaders = new Headers();
        headers.forEach((name, values) -> natsHeaders.add(name, values));
        return natsHeaders;
    }

    /**
     * Creates or updates a consumer on its stream and flushes the connection.
     *
     * @return the consumer context, emitted on the Vert.x context; anything thrown on the way
     *         is rethrown as a {@code FetchException}
     */
    private <T> Uni<ConsumerContext> addOrUpdateConsumer(ConsumerConfiguration<T> configuration) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                io.nats.client.api.ConsumerConfiguration natsConfiguration = new ConsumerConfigurtationFactory().create(configuration);
                StreamContext stream = connection.getStreamContext(configuration.stream());
                ConsumerContext consumer = stream.createOrUpdateConsumer(natsConfiguration);
                connection.flush(Duration.ZERO);
                return consumer;
            } catch (Throwable cause) {
                throw new FetchException(cause);
            }
        })).emitOn(command -> context.runOnContext(command));
    }

    /**
     * Runs one fetch on the consumer and emits every message it yields.
     *
     * <p>The stream completes when the fetch returns {@code null}; anything thrown is turned
     * into a {@code FetchException}. Items are emitted on the Vert.x context.
     */
    private <T> Multi<Message<T>> nextMessages(ConsumerContext consumerContext, FetchConsumerConfiguration<T> configuration) {
        return Multi.createFrom().emitter(sink -> {
            try (FetchConsumer fetcher = fetchConsumer(consumerContext, configuration.fetchTimeout().orElse(null))) {
                for (io.nats.client.Message next = fetcher.nextMessage(); next != null; next = fetcher.nextMessage()) {
                    sink.emit(messageMapper.of(next, configuration.traceEnabled(), configuration.payloadType().orElse(null), context, new ExponentialBackoff(false, Duration.ZERO), configuration.ackTimeout()));
                }
                sink.complete();
            } catch (Throwable cause) {
                sink.fail(new FetchException(cause));
            }
        }).emitOn(command -> context.runOnContext(command));
    }

    /**
     * Builds the NATS options from the configuration and opens the connection.
     *
     * <p>An {@code InternalConnectionListener} bound to this instance is installed in the options.
     *
     * @throws ConnectionException wrapping anything thrown while building options or connecting
     */
    private io.nats.client.Connection connect(ConnectionConfiguration configuration) throws ConnectionException {
        try {
            Options natsOptions = new ConnectionOptionsFactory().create(configuration, new InternalConnectionListener(this));
            return Nats.connect(natsOptions);
        } catch (Throwable cause) {
            throw new ConnectionException(cause);
        }
    }

    /**
     * Creates the key-value bucket, or updates it when a bucket of that name already exists.
     *
     * <p>Fails with a {@code SetupException} wrapping anything thrown on the way.
     */
    private Uni<Void> addOrUpdateKeyValueStore(KeyValueSetupConfiguration keyValueSetupConfiguration) {
        return Uni.createFrom().emitter(sink -> {
            try {
                KeyValueManagement management = connection.keyValueManagement();
                KeyValueConfigurationFactory configurations = new KeyValueConfigurationFactory();
                boolean bucketExists = management.getBucketNames().contains(keyValueSetupConfiguration.bucketName());
                if (!bucketExists) {
                    management.create(configurations.create(keyValueSetupConfiguration));
                } else {
                    management.update(configurations.create(keyValueSetupConfiguration));
                }
                sink.complete(null);
            } catch (Throwable cause) {
                String text = String.format("Unable to manage Key Value Store: %s", cause.getMessage());
                sink.fail(new SetupException(text, cause));
            }
        });
    }

    /**
     * Updates the stream when its info can be fetched, otherwise creates it.
     *
     * <p>The create fallback is attached to the lookup only. A failure raised by
     * {@code updateStream} (including its own delete-and-recreate path) is propagated as-is
     * rather than triggering a further create attempt, so the caller sees the original error.
     *
     * @param jsm JetStream management API used for the lookup, update and create calls
     * @param setupConfiguration desired stream configuration and overwrite policy
     * @return the outcome of the create or update operation
     */
    private Uni<StreamResult> addOrUpdateStream(JetStreamManagement jsm, StreamSetupConfiguration setupConfiguration) {
        return this.getStreamInfo(jsm, setupConfiguration.configuration().name())
                // A failed lookup is turned into a null item, which selects the create branch below.
                .onFailure().recoverWithNull()
                .onItem().transformToUni(streamInfo -> streamInfo == null
                        ? this.createStream(jsm, setupConfiguration.configuration())
                        : this.updateStream(jsm, streamInfo, setupConfiguration));
    }

    /**
     * Adds a new stream built from the given configuration.
     *
     * @param jsm JetStream management API used to add the stream
     * @param streamConfiguration the stream definition to convert and register
     * @return a Uni emitting a result with status {@code Created}, or failing with a
     *         {@link SetupException} that wraps the underlying error
     */
    private Uni<StreamResult> createStream(JetStreamManagement jsm, StreamConfiguration streamConfiguration) {
        return Uni.createFrom().emitter(emitter -> {
            try {
                io.nats.client.api.StreamConfiguration natsConfiguration = new StreamConfigurationFactory().create(streamConfiguration);
                jsm.addStream(natsConfiguration);
                StreamResult created = StreamResult.builder()
                        .configuration(streamConfiguration)
                        .status(StreamStatus.Created)
                        .build();
                emitter.complete(created);
            }
            catch (Throwable failure) {
                String reason = String.format("Unable to create stream: %s with message: %s", streamConfiguration.name(), failure.getMessage());
                emitter.fail(new SetupException(reason, failure));
            }
        });
    }

    /**
     * Applies the desired configuration to an existing stream.
     *
     * <p>The factory compares the current and desired configuration; when it yields no
     * configuration the result is {@code NotModified}, otherwise the stream is updated and
     * the result is {@code Updated}. If the attempt fails with a cause of type
     * {@link JetStreamApiException} and overwrite is enabled, the stream is deleted and
     * created again; any other failure is propagated unchanged.
     *
     * @param jsm JetStream management API used for the update, delete and create calls
     * @param streamInfo info of the existing stream, providing its current configuration
     * @param setupConfiguration desired stream configuration and overwrite policy
     * @return the outcome of the update, or of the recreate fallback
     */
    private Uni<StreamResult> updateStream(JetStreamManagement jsm, StreamInfo streamInfo, StreamSetupConfiguration setupConfiguration) {
        Uni<StreamResult> attempt = Uni.createFrom().emitter(emitter -> {
            try {
                io.nats.client.api.StreamConfiguration existing = streamInfo.getConfiguration();
                Optional<io.nats.client.api.StreamConfiguration> changes = new StreamConfigurationFactory().create(existing, setupConfiguration.configuration());
                if (!changes.isPresent()) {
                    // Nothing to apply: report the stream as untouched.
                    emitter.complete(StreamResult.builder().configuration(setupConfiguration.configuration()).status(StreamStatus.NotModified).build());
                    return;
                }
                logger.debugf("Updating stream %s", setupConfiguration.configuration().name());
                jsm.updateStream(changes.get());
                emitter.complete(StreamResult.builder().configuration(setupConfiguration.configuration()).status(StreamStatus.Updated).build());
            }
            catch (Throwable failure) {
                logger.errorf(failure, "message: %s", failure.getMessage());
                String reason = String.format("Unable to update stream: %s with message: %s", setupConfiguration.configuration().name(), failure.getMessage());
                emitter.fail(new SetupException(reason, failure));
            }
        });
        return attempt.onFailure().recoverWithUni(failure -> {
            boolean recreate = failure.getCause() instanceof JetStreamApiException && setupConfiguration.overwrite();
            if (!recreate) {
                return Uni.createFrom().failure(failure);
            }
            // The server rejected the update and overwrite is allowed: drop the stream and build it anew.
            return this.deleteStream(jsm, setupConfiguration.configuration().name())
                    .onItem().transformToUni(v -> this.createStream(jsm, setupConfiguration.configuration()));
        });
    }

    /**
     * Deletes the named stream.
     *
     * @param jsm JetStream management API used to delete the stream
     * @param streamName name of the stream to remove
     * @return a Uni completing with {@code null} on success, or failing with a
     *         {@link SetupException} that wraps the underlying error
     */
    private Uni<Void> deleteStream(JetStreamManagement jsm, String streamName) {
        return Uni.createFrom().emitter(emitter -> {
            try {
                jsm.deleteStream(streamName);
                emitter.complete(null);
            }
            catch (Throwable failure) {
                String reason = String.format("Unable to delete stream: %s with message: %s", streamName, failure.getMessage());
                emitter.fail(new SetupException(reason, failure));
            }
        });
    }
}

