package io.quarkiverse.reactive.messaging.nats.jetstream.client;

import io.nats.client.Connection;
import io.nats.client.ConsumerContext;
import io.nats.client.FetchConsumeOptions;
import io.nats.client.FetchConsumer;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamReader;
import io.nats.client.JetStreamSubscription;
import io.nats.client.KeyValueManagement;
import io.nats.client.Nats;
import io.nats.client.PublishOptions;
import io.nats.client.api.MessageInfo;
import io.nats.client.api.PurgeResponse;
import io.nats.client.api.StreamInfo;
import io.nats.client.api.StreamInfoOptions;
import io.nats.client.impl.Headers;
import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff;
import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamOutgoingMessageMetadata;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.Consumer;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.JetStreamMessage;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PurgeResult;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamResult;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamSetupConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamState;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamStatus;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionOptionsFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConsumerConfigurtationFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.FetchConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.KeyValueConfigurationFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.KeyValueSetupConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PullSubscribeOptionsFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ReaderConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.StreamConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.StreamConfigurationFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.ConsumerMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.DefaultMessageMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.MessageMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.PayloadMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.StreamStateMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrument;
import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamTrace;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniCreate;
import io.smallrye.mutiny.groups.UniOnItem;
import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.mutiny.tuples.Tuple5;
import io.smallrye.mutiny.unchecked.Unchecked;
import io.smallrye.reactive.messaging.tracing.TracingUtils;
import io.vertx.mutiny.core.Context;
import java.io.IOException;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.class */
public class DefaultConnection implements Connection {
    private static final Logger logger = Logger.getLogger(DefaultConnection.class);
    private final io.nats.client.Connection connection;
    private final List<ConnectionListener> listeners;
    private final Context context;
    private final StreamStateMapper streamStateMapper;
    private final ConsumerMapper consumerMapper;
    private final MessageMapper messageMapper;
    private final PayloadMapper payloadMapper;
    private final JetStreamInstrument instrument;

    /* JADX INFO: Access modifiers changed from: package-private */
    public DefaultConnection(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener, Context context, MessageMapper messageMapper, PayloadMapper payloadMapper, ConsumerMapper consumerMapper, StreamStateMapper streamStateMapper, JetStreamInstrument jetStreamInstrument) throws ConnectionException {
        this.connection = connect(connectionConfiguration);
        this.listeners = new ArrayList(List.of(connectionListener));
        this.context = context;
        this.streamStateMapper = streamStateMapper;
        this.consumerMapper = consumerMapper;
        this.messageMapper = messageMapper;
        this.payloadMapper = payloadMapper;
        this.instrument = jetStreamInstrument;
        fireEvent(ConnectionEvent.Connected, "Connection established");
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public boolean isConnected() {
        return Connection.Status.CONNECTED.equals(this.connection.getStatus());
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<Void> flush(Duration duration) {
        Uni item = Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                this.connection.flush(duration);
                return null;
            } catch (InterruptedException | TimeoutException e) {
                throw new ConnectionException(e);
            }
        }));
        Context context = this.context;
        Objects.requireNonNull(context);
        return item.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public List<ConnectionListener> listeners() {
        return this.listeners;
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public void addListener(ConnectionListener connectionListener) {
        this.listeners.add(connectionListener);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public void removeListener(ConnectionListener connectionListener) {
        this.listeners.remove(connectionListener);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        new ArrayList(this.listeners).forEach(connectionListener -> {
            try {
                connectionListener.close();
            } catch (Throwable th) {
                logger.warnf(th, "Error closing listener: %s", th.getMessage());
            }
        });
        try {
            this.connection.close();
        } catch (Throwable th) {
            logger.warnf(th, "Error closing connection: %s", th.getMessage());
        }
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<Consumer> getConsumer(String str, String str2) {
        UniOnItem onItem = getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> {
            return Uni.createFrom().emitter(uniEmitter -> {
                try {
                    uniEmitter.complete(jetStreamManagement.getConsumerInfo(str, str2));
                } catch (IOException | JetStreamApiException e) {
                    uniEmitter.fail(new SystemException(e));
                }
            });
        }).onItem();
        ConsumerMapper consumerMapper = this.consumerMapper;
        Objects.requireNonNull(consumerMapper);
        Uni transform = onItem.transform(consumerMapper::of);
        Context context = this.context;
        Objects.requireNonNull(context);
        return transform.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<Void> deleteConsumer(String str, String str2) {
        Uni transformToUni = getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> {
            return Uni.createFrom().emitter(uniEmitter -> {
                try {
                    jetStreamManagement.deleteConsumer(str, str2);
                    uniEmitter.complete((Object) null);
                } catch (Throwable th) {
                    uniEmitter.fail(new SystemException(th));
                }
            });
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return transformToUni.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<Void> pauseConsumer(String str, String str2, ZonedDateTime zonedDateTime) {
        Uni transformToUni = getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> {
            return Uni.createFrom().emitter(uniEmitter -> {
                try {
                    if (!jetStreamManagement.pauseConsumer(str, str2, zonedDateTime).isPaused()) {
                        uniEmitter.fail(new SystemException(String.format("Unable to pause consumer %s in stream %s", str2, str)));
                    }
                    uniEmitter.complete((Object) null);
                } catch (Throwable th) {
                    uniEmitter.fail(new SystemException(th));
                }
            });
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return transformToUni.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<Void> resumeConsumer(String str, String str2) {
        Uni transformToUni = getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> {
            return Uni.createFrom().emitter(uniEmitter -> {
                try {
                    if (!jetStreamManagement.resumeConsumer(str, str2)) {
                        uniEmitter.fail(new SystemException(String.format("Unable to resume consumer %s in stream %s", str2, str)));
                    }
                    uniEmitter.complete((Object) null);
                } catch (Throwable th) {
                    uniEmitter.fail(new SystemException(th));
                }
            });
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return transformToUni.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<Long> getFirstSequence(String str) {
        Uni transform = getStreamInfo(str).onItem().transform(tuple2 -> {
            return Long.valueOf(((StreamInfo) tuple2.getItem2()).getStreamState().getFirstSequence());
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return transform.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<List<String>> getStreams() {
        Uni transformToUni = getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> {
            UniCreate createFrom = Uni.createFrom();
            Objects.requireNonNull(jetStreamManagement);
            return createFrom.item(Unchecked.supplier(jetStreamManagement::getStreamNames));
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return transformToUni.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<List<String>> getSubjects(String str) {
        Uni transform = getStreamInfo(str).onItem().transform(tuple2 -> {
            return ((StreamInfo) tuple2.getItem2()).getConfiguration().getSubjects();
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return transform.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<List<String>> getConsumerNames(String str) {
        Uni transformToUni = getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> {
            return Uni.createFrom().emitter(uniEmitter -> {
                try {
                    uniEmitter.complete(jetStreamManagement.getConsumerNames(str));
                } catch (Throwable th) {
                    uniEmitter.fail(new SystemException(th));
                }
            });
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return transformToUni.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<PurgeResult> purgeStream(String str) {
        Uni transformToUni = getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> {
            return Uni.createFrom().emitter(uniEmitter -> {
                try {
                    PurgeResponse purgeStream = jetStreamManagement.purgeStream(str);
                    uniEmitter.complete(new PurgeResult(str, purgeStream.isSuccess(), purgeStream.getPurged()));
                } catch (Throwable th) {
                    uniEmitter.fail(new SystemException(th));
                }
            });
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return transformToUni.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<Void> deleteMessage(String str, long j, boolean z) {
        Uni transformToUni = getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> {
            return Uni.createFrom().emitter(uniEmitter -> {
                try {
                    if (!jetStreamManagement.deleteMessage(str, j, z)) {
                        uniEmitter.fail(new DeleteException(String.format("Unable to delete message in stream %s with sequence %d", str, Long.valueOf(j))));
                    }
                    uniEmitter.complete((Object) null);
                } catch (Throwable th) {
                    uniEmitter.fail(new DeleteException(String.format("Unable to delete message in stream %s with sequence %d: %s", str, Long.valueOf(j), th.getMessage()), th));
                }
            });
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return transformToUni.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<StreamState> getStreamState(String str) {
        Uni transform = getStreamInfo(str).onItem().transform(tuple2 -> {
            return this.streamStateMapper.of(((StreamInfo) tuple2.getItem2()).getStreamState());
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return transform.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<StreamConfiguration> getStreamConfiguration(String str) {
        Uni transform = getStreamInfo(str).onItem().transform(tuple2 -> {
            return StreamConfiguration.of(((StreamInfo) tuple2.getItem2()).getConfiguration());
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return transform.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<List<PurgeResult>> purgeAllStreams() {
        Uni transformToUni = getStreams().onItem().transformToUni(this::purgeAllStreams);
        Context context = this.context;
        Objects.requireNonNull(context);
        return transformToUni.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public <T> Uni<Message<T>> publish(Message<T> message, PublishConfiguration publishConfiguration) {
        Uni transform = Uni.createFrom().emitter(uniEmitter -> {
            try {
                Optional metadata = message.getMetadata(JetStreamOutgoingMessageMetadata.class);
                String str = (String) metadata.map((v0) -> {
                    return v0.messageId();
                }).orElseGet(() -> {
                    return UUID.randomUUID().toString();
                });
                byte[] of = this.payloadMapper.of(message.getPayload());
                Optional map = metadata.flatMap((v0) -> {
                    return v0.subtopic();
                }).map(str2 -> {
                    return publishConfiguration.subject() + "." + str2;
                });
                Objects.requireNonNull(publishConfiguration);
                String str3 = (String) map.orElseGet(publishConfiguration::subject);
                HashMap hashMap = new HashMap();
                metadata.ifPresent(jetStreamOutgoingMessageMetadata -> {
                    hashMap.putAll(jetStreamOutgoingMessageMetadata.headers());
                });
                if (message.getPayload() != null) {
                    hashMap.putIfAbsent(DefaultMessageMapper.MESSAGE_TYPE_HEADER, List.of(message.getPayload().getClass().getTypeName()));
                }
                if (publishConfiguration.traceEnabled()) {
                    TracingUtils.traceOutgoing(this.instrument.publisher(), message, JetStreamTrace.builder().stream(publishConfiguration.stream()).subject(str3).messageId(str).headers(hashMap).payload(new String(of)).build());
                }
                uniEmitter.complete(Tuple5.of(this.connection.jetStream(), str3, toJetStreamHeaders(hashMap), of, createPublishOptions(str, publishConfiguration.stream())));
            } catch (Throwable th) {
                uniEmitter.fail(new PublishException(String.format("Failed to publish message: %s", th.getMessage()), th));
            }
        }).onItem().transformToUni(tuple5 -> {
            return Uni.createFrom().completionStage(((JetStream) tuple5.getItem1()).publishAsync((String) tuple5.getItem2(), (Headers) tuple5.getItem3(), (byte[]) tuple5.getItem4(), (PublishOptions) tuple5.getItem5()));
        }).onItem().invoke(publishAck -> {
            logger.debugf("Message published to stream: %s with sequence number: %d", publishAck.getStream(), Long.valueOf(publishAck.getSeqno()));
        }).onItem().transformToUni(publishAck2 -> {
            return acknowledge(message);
        }).onFailure().recoverWithUni(th -> {
            return notAcknowledge(message, th);
        }).onFailure().transform(th2 -> {
            return new PublishException(th2.getMessage(), th2);
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return transform.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public <T> Uni<Message<T>> publish(Message<T> message, PublishConfiguration publishConfiguration, FetchConsumerConfiguration<T> fetchConsumerConfiguration) {
        return addOrUpdateConsumer(fetchConsumerConfiguration).onItem().transformToUni(consumerContext -> {
            return publish(message, publishConfiguration);
        });
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public <T> Uni<Message<T>> nextMessage(FetchConsumerConfiguration<T> fetchConsumerConfiguration) {
        Uni runSubscriptionOn = addOrUpdateConsumer(fetchConsumerConfiguration).onItem().transformToUni(consumerContext -> {
            return nextMessage(consumerContext, fetchConsumerConfiguration);
        }).runSubscriptionOn(Executors.newSingleThreadExecutor(JetstreamWorkerThread::new));
        Context context = this.context;
        Objects.requireNonNull(context);
        return runSubscriptionOn.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public <T> Multi<Message<T>> nextMessages(FetchConsumerConfiguration<T> fetchConsumerConfiguration) {
        Multi runSubscriptionOn = addOrUpdateConsumer(fetchConsumerConfiguration).onItem().transformToMulti(consumerContext -> {
            return nextMessages(consumerContext, fetchConsumerConfiguration);
        }).runSubscriptionOn(Executors.newSingleThreadExecutor(JetstreamWorkerThread::new));
        Context context = this.context;
        Objects.requireNonNull(context);
        return runSubscriptionOn.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public <T> Uni<T> getKeyValue(String str, String str2, Class<T> cls) {
        Uni transform = Uni.createFrom().emitter(uniEmitter -> {
            try {
                uniEmitter.complete(this.connection.keyValue(str).get(str2));
            } catch (Throwable th) {
                uniEmitter.fail(new KeyValueException(th));
            }
        }).onItem().ifNull().failWith(() -> {
            return new KeyValueNotFoundException(str, str2);
        }).onItem().ifNotNull().transform(keyValueEntry -> {
            return this.payloadMapper.of(keyValueEntry.getValue(), cls);
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return transform.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public <T> Uni<Void> putKeyValue(String str, String str2, T t) {
        Uni emitter = Uni.createFrom().emitter(uniEmitter -> {
            try {
                this.connection.keyValue(str).put(str2, this.payloadMapper.of(t));
                uniEmitter.complete((Object) null);
            } catch (Throwable th) {
                uniEmitter.fail(new KeyValueException(th));
            }
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return emitter.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<Void> deleteKeyValue(String str, String str2) {
        Uni emitter = Uni.createFrom().emitter(uniEmitter -> {
            try {
                this.connection.keyValue(str).delete(str2);
                uniEmitter.complete((Object) null);
            } catch (Throwable th) {
                uniEmitter.fail(new KeyValueException(th));
            }
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return emitter.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public <T> Uni<Message<T>> resolve(String str, long j) {
        Uni emitter = Uni.createFrom().emitter(uniEmitter -> {
            try {
                MessageInfo message = this.connection.jetStream().getStreamContext(str).getMessage(j);
                uniEmitter.complete(new JetStreamMessage(message, this.payloadMapper.of(message).orElse(null)));
            } catch (IOException | JetStreamApiException e) {
                uniEmitter.fail(e);
            }
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return emitter.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public <T> Uni<Subscription<T>> subscription(PushConsumerConfiguration<T> pushConsumerConfiguration) {
        return Uni.createFrom().item(() -> {
            return new PushSubscription(this, pushConsumerConfiguration, this.connection, this.messageMapper, this.context);
        });
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public <T> Uni<Subscription<T>> subscription(ReaderConsumerConfiguration<T> readerConsumerConfiguration) {
        return createSubscription(readerConsumerConfiguration).onItem().transformToUni(jetStreamSubscription -> {
            return createReader(readerConsumerConfiguration, jetStreamSubscription);
        }).onItem().transformToUni(tuple2 -> {
            return Uni.createFrom().item(Unchecked.supplier(() -> {
                return new ReaderSubscribtion(this, readerConsumerConfiguration, (JetStreamSubscription) tuple2.getItem1(), (JetStreamReader) tuple2.getItem2(), this.messageMapper, this.context);
            }));
        }).onItem().invoke((v1) -> {
            addListener(v1);
        });
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public <T> void close(Subscription<T> subscription) {
        try {
            subscription.close();
        } catch (Throwable th) {
            logger.warnf(th, "Failed to close subscription: %s", th.getMessage());
        }
        removeListener(subscription);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.KeyValueStoreSetup
    public Uni<Void> addOrUpdateKeyValueStores(List<KeyValueSetupConfiguration> list) {
        Uni last = Multi.createFrom().items(list.stream()).onItem().transformToUniAndMerge(this::addOrUpdateKeyValueStore).collect().last();
        Context context = this.context;
        Objects.requireNonNull(context);
        return last.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.StreamSetup
    public Uni<List<StreamResult>> addStreams(List<StreamSetupConfiguration> list) {
        Uni asList = getJetStreamManagement().onItem().transformToMulti(jetStreamManagement -> {
            return Multi.createFrom().items(list.stream().map(streamSetupConfiguration -> {
                return Tuple2.of(jetStreamManagement, streamSetupConfiguration);
            }));
        }).onItem().transformToUniAndMerge(tuple2 -> {
            return addOrUpdateStream((JetStreamManagement) tuple2.getItem1(), (StreamSetupConfiguration) tuple2.getItem2());
        }).collect().asList();
        Context context = this.context;
        Objects.requireNonNull(context);
        return asList.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<Void> addSubject(String str, String str2) {
        Uni transformToUni = getStreamInfo(str).onItem().transformToUni(tuple2 -> {
            return Uni.createFrom().emitter(uniEmitter -> {
                try {
                    HashSet hashSet = new HashSet(((StreamInfo) tuple2.getItem2()).getConfiguration().getSubjects());
                    if (!hashSet.contains(str2)) {
                        hashSet.add(str2);
                        ((JetStreamManagement) tuple2.getItem1()).updateStream(io.nats.client.api.StreamConfiguration.builder(((StreamInfo) tuple2.getItem2()).getConfiguration()).subjects(hashSet).build());
                    }
                    uniEmitter.complete((Object) null);
                } catch (Throwable th) {
                    uniEmitter.fail(new SystemException(th));
                }
            });
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return transformToUni.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection
    public Uni<Void> removeSubject(String str, String str2) {
        Uni transformToUni = getStreamInfo(str).onItem().transformToUni(tuple2 -> {
            return Uni.createFrom().emitter(uniEmitter -> {
                try {
                    HashSet hashSet = new HashSet(((StreamInfo) tuple2.getItem2()).getConfiguration().getSubjects());
                    if (hashSet.contains(str2)) {
                        hashSet.remove(str2);
                        ((JetStreamManagement) tuple2.getItem1()).updateStream(io.nats.client.api.StreamConfiguration.builder(((StreamInfo) tuple2.getItem2()).getConfiguration()).subjects(hashSet).build());
                    }
                    uniEmitter.complete((Object) null);
                } catch (Throwable th) {
                    uniEmitter.fail(new SystemException(th));
                }
            });
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return transformToUni.emitOn(context::runOnContext);
    }

    private <T> Uni<Tuple2<JetStreamSubscription, JetStreamReader>> createReader(ReaderConsumerConfiguration<T> readerConsumerConfiguration, JetStreamSubscription jetStreamSubscription) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            return jetStreamSubscription.reader(readerConsumerConfiguration.maxRequestBatch().intValue(), readerConsumerConfiguration.rePullAt().intValue());
        })).onItem().transform(jetStreamReader -> {
            return Tuple2.of(jetStreamSubscription, jetStreamReader);
        });
    }

    private <T> Uni<JetStreamSubscription> createSubscription(ReaderConsumerConfiguration<T> readerConsumerConfiguration) {
        UniCreate createFrom = Uni.createFrom();
        io.nats.client.Connection connection = this.connection;
        Objects.requireNonNull(connection);
        return createFrom.item(Unchecked.supplier(connection::jetStream)).onItem().transformToUni(jetStream -> {
            return createSubscription(jetStream, readerConsumerConfiguration);
        });
    }

    private <T> Uni<JetStreamSubscription> createSubscription(JetStream jetStream, ReaderConsumerConfiguration<T> readerConsumerConfiguration) {
        return subscribe(jetStream, readerConsumerConfiguration).onFailure().recoverWithUni(th -> {
            return th instanceof IllegalArgumentException ? deleteConsumer(readerConsumerConfiguration.consumerConfiguration().stream(), readerConsumerConfiguration.consumerConfiguration().name()).onItem().transformToUni(r7 -> {
                return subscribe(jetStream, readerConsumerConfiguration);
            }) : Uni.createFrom().failure(th);
        });
    }

    private <T> Uni<JetStreamSubscription> subscribe(JetStream jetStream, ReaderConsumerConfiguration<T> readerConsumerConfiguration) {
        return Uni.createFrom().emitter(uniEmitter -> {
            try {
                uniEmitter.complete(jetStream.subscribe(readerConsumerConfiguration.subject(), new PullSubscribeOptionsFactory().create(readerConsumerConfiguration)));
            } catch (Throwable th) {
                uniEmitter.fail(th);
            }
        });
    }

    private Uni<Tuple2<JetStreamManagement, StreamInfo>> getStreamInfo(String str) {
        return getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> {
            return getStreamInfo(jetStreamManagement, str);
        });
    }

    private Uni<Tuple2<JetStreamManagement, StreamInfo>> getStreamInfo(JetStreamManagement jetStreamManagement, String str) {
        return Uni.createFrom().emitter(uniEmitter -> {
            try {
                uniEmitter.complete(Tuple2.of(jetStreamManagement, jetStreamManagement.getStreamInfo(str, StreamInfoOptions.allSubjects())));
            } catch (Throwable th) {
                uniEmitter.fail(new SystemException(String.format("Unable to read stream %s with message: %s", str, th.getMessage()), th));
            }
        });
    }

    private Optional<PurgeResult> purgeStream(JetStreamManagement jetStreamManagement, String str) {
        try {
            PurgeResponse purgeStream = jetStreamManagement.purgeStream(str);
            return Optional.of(PurgeResult.builder().streamName(str).success(purgeStream.isSuccess()).purgeCount(purgeStream.getPurged()).build());
        } catch (IOException | JetStreamApiException e) {
            logger.warnf(e, "Unable to purge stream %s with message: %s", str, e.getMessage());
            return Optional.empty();
        }
    }

    private Uni<List<PurgeResult>> purgeAllStreams(List<String> list) {
        return getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> {
            return Uni.createFrom().item(Unchecked.supplier(() -> {
                return list.stream().flatMap(str -> {
                    return purgeStream(jetStreamManagement, str).stream();
                }).toList();
            }));
        });
    }

    private Uni<JetStreamManagement> getJetStreamManagement() {
        return Uni.createFrom().emitter(uniEmitter -> {
            try {
                uniEmitter.complete(this.connection.jetStreamManagement());
            } catch (Throwable th) {
                uniEmitter.fail(new SystemException(String.format("Unable to manage JetStream: %s", th.getMessage()), th));
            }
        });
    }

    private PublishOptions createPublishOptions(String str, String str2) {
        return PublishOptions.builder().messageId(str).stream(str2).build();
    }

    private FetchConsumer fetchConsumer(ConsumerContext consumerContext, Duration duration) throws IOException, JetStreamApiException {
        return duration == null ? consumerContext.fetch(FetchConsumeOptions.builder().maxMessages(1).noWait().build()) : consumerContext.fetch(((FetchConsumeOptions.Builder) FetchConsumeOptions.builder().maxMessages(1).expiresIn(duration.toMillis())).build());
    }

    private <T> Uni<Message<T>> acknowledge(Message<T> message) {
        return Uni.createFrom().completionStage(message.ack()).onItem().transform(r3 -> {
            return message;
        });
    }

    private <T> Uni<Message<T>> notAcknowledge(Message<T> message, Throwable th) {
        return Uni.createFrom().completionStage(message.nack(th)).onItem().invoke(() -> {
            logger.warnf(th, "Message not acknowledged: %s", th.getMessage());
        }).onItem().transformToUni(r4 -> {
            return Uni.createFrom().item(message);
        });
    }

    private Uni<io.nats.client.Message> nextMessage(ConsumerContext consumerContext, Duration duration) {
        return Uni.createFrom().emitter(uniEmitter -> {
            try {
                FetchConsumer fetchConsumer = fetchConsumer(consumerContext, duration);
                try {
                    io.nats.client.Message nextMessage = fetchConsumer.nextMessage();
                    if (nextMessage != null) {
                        uniEmitter.complete(nextMessage);
                    } else {
                        uniEmitter.fail(new MessageNotFoundException());
                    }
                    if (fetchConsumer != null) {
                        fetchConsumer.close();
                    }
                } finally {
                }
            } catch (Throwable th) {
                logger.errorf(th, "Failed to fetch message: %s", th.getMessage());
                uniEmitter.fail(new FetchException(th));
            }
        });
    }

    private <T> Uni<Message<T>> nextMessage(ConsumerContext consumerContext, FetchConsumerConfiguration<T> fetchConsumerConfiguration) {
        return nextMessage(consumerContext, fetchConsumerConfiguration.fetchTimeout().orElse(null)).map(message -> {
            return this.messageMapper.of(message, fetchConsumerConfiguration.traceEnabled(), (Class) fetchConsumerConfiguration.payloadType().orElse(null), this.context, new ExponentialBackoff(false, Duration.ZERO), fetchConsumerConfiguration.ackTimeout());
        });
    }

    private Headers toJetStreamHeaders(Map<String, List<String>> map) {
        Headers headers = new Headers();
        Objects.requireNonNull(headers);
        map.forEach((v1, v2) -> {
            r1.add(v1, v2);
        });
        return headers;
    }

    private <T> Uni<ConsumerContext> addOrUpdateConsumer(ConsumerConfiguration<T> consumerConfiguration) {
        Uni item = Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                ConsumerContext createOrUpdateConsumer = this.connection.getStreamContext(consumerConfiguration.stream()).createOrUpdateConsumer(new ConsumerConfigurtationFactory().create(consumerConfiguration));
                this.connection.flush(Duration.ZERO);
                return createOrUpdateConsumer;
            } catch (Throwable th) {
                throw new FetchException(th);
            }
        }));
        Context context = this.context;
        Objects.requireNonNull(context);
        return item.emitOn(context::runOnContext);
    }

    private <T> Multi<Message<T>> nextMessages(ConsumerContext consumerContext, FetchConsumerConfiguration<T> fetchConsumerConfiguration) {
        Multi emitter = Multi.createFrom().emitter(multiEmitter -> {
            try {
                FetchConsumer fetchConsumer = fetchConsumer(consumerContext, fetchConsumerConfiguration.fetchTimeout().orElse(null));
                try {
                    for (io.nats.client.Message nextMessage = fetchConsumer.nextMessage(); nextMessage != null; nextMessage = fetchConsumer.nextMessage()) {
                        multiEmitter.emit(this.messageMapper.of(nextMessage, fetchConsumerConfiguration.traceEnabled(), (Class) fetchConsumerConfiguration.payloadType().orElse(null), this.context, new ExponentialBackoff(false, Duration.ZERO), fetchConsumerConfiguration.ackTimeout()));
                    }
                    multiEmitter.complete();
                    if (fetchConsumer != null) {
                        fetchConsumer.close();
                    }
                } finally {
                }
            } catch (Throwable th) {
                multiEmitter.fail(new FetchException(th));
            }
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return emitter.emitOn(context::runOnContext);
    }

    private io.nats.client.Connection connect(ConnectionConfiguration connectionConfiguration) throws ConnectionException {
        try {
            return Nats.connect(new ConnectionOptionsFactory().create(connectionConfiguration, new InternalConnectionListener(this)));
        } catch (Throwable th) {
            throw new ConnectionException(th);
        }
    }

    private Uni<Void> addOrUpdateKeyValueStore(KeyValueSetupConfiguration keyValueSetupConfiguration) {
        return Uni.createFrom().emitter(uniEmitter -> {
            try {
                KeyValueManagement keyValueManagement = this.connection.keyValueManagement();
                KeyValueConfigurationFactory keyValueConfigurationFactory = new KeyValueConfigurationFactory();
                if (keyValueManagement.getBucketNames().contains(keyValueSetupConfiguration.bucketName())) {
                    keyValueManagement.update(keyValueConfigurationFactory.create(keyValueSetupConfiguration));
                } else {
                    keyValueManagement.create(keyValueConfigurationFactory.create(keyValueSetupConfiguration));
                }
                uniEmitter.complete((Object) null);
            } catch (Throwable th) {
                uniEmitter.fail(new SetupException(String.format("Unable to manage Key Value Store: %s", th.getMessage()), th));
            }
        });
    }

    private Uni<StreamResult> addOrUpdateStream(JetStreamManagement jetStreamManagement, StreamSetupConfiguration streamSetupConfiguration) {
        return getStreamInfo(jetStreamManagement, streamSetupConfiguration.configuration().name()).onItem().transformToUni(tuple2 -> {
            return updateStream((JetStreamManagement) tuple2.getItem1(), (StreamInfo) tuple2.getItem2(), streamSetupConfiguration);
        }).onFailure().recoverWithUni(th -> {
            return createStream(jetStreamManagement, streamSetupConfiguration.configuration());
        });
    }

    private Uni<StreamResult> createStream(JetStreamManagement jetStreamManagement, StreamConfiguration streamConfiguration) {
        return Uni.createFrom().emitter(uniEmitter -> {
            try {
                jetStreamManagement.addStream(new StreamConfigurationFactory().create(streamConfiguration));
                uniEmitter.complete(StreamResult.builder().configuration(streamConfiguration).status(StreamStatus.Created).build());
            } catch (Throwable th) {
                uniEmitter.fail(new SetupException(String.format("Unable to create stream: %s with message: %s", streamConfiguration.name(), th.getMessage()), th));
            }
        });
    }

    private Uni<StreamResult> updateStream(JetStreamManagement jetStreamManagement, StreamInfo streamInfo, StreamSetupConfiguration streamSetupConfiguration) {
        return Uni.createFrom().emitter(uniEmitter -> {
            try {
                Optional<io.nats.client.api.StreamConfiguration> create = new StreamConfigurationFactory().create(streamInfo.getConfiguration(), streamSetupConfiguration.configuration());
                if (create.isPresent()) {
                    logger.debugf("Updating stream %s", streamSetupConfiguration.configuration().name());
                    jetStreamManagement.updateStream(create.get());
                    uniEmitter.complete(StreamResult.builder().configuration(streamSetupConfiguration.configuration()).status(StreamStatus.Updated).build());
                } else {
                    uniEmitter.complete(StreamResult.builder().configuration(streamSetupConfiguration.configuration()).status(StreamStatus.NotModified).build());
                }
            } catch (Throwable th) {
                logger.errorf(th, "message: %s", th.getMessage());
                uniEmitter.fail(new SetupException(String.format("Unable to update stream: %s with message: %s", streamSetupConfiguration.configuration().name(), th.getMessage()), th));
            }
        }).onFailure().recoverWithUni(th -> {
            return ((th.getCause() instanceof JetStreamApiException) && streamSetupConfiguration.overwrite()) ? deleteStream(jetStreamManagement, streamSetupConfiguration.configuration().name()).onItem().transformToUni(r7 -> {
                return createStream(jetStreamManagement, streamSetupConfiguration.configuration());
            }) : Uni.createFrom().failure(th);
        });
    }

    private Uni<Void> deleteStream(JetStreamManagement jetStreamManagement, String str) {
        return Uni.createFrom().emitter(uniEmitter -> {
            try {
                jetStreamManagement.deleteStream(str);
                uniEmitter.complete((Object) null);
            } catch (Throwable th) {
                uniEmitter.fail(new SetupException(String.format("Unable to delete stream: %s with message: %s", str, th.getMessage()), th));
            }
        });
    }
}
