/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.reactive.messaging.nats.jetstream.client;

import io.nats.client.Connection;
import io.nats.client.ConsumerContext;
import io.nats.client.FetchConsumeOptions;
import io.nats.client.FetchConsumer;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.PublishOptions;
import io.nats.client.StreamContext;
import io.nats.client.api.MessageInfo;
import io.nats.client.impl.Headers;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionEvent;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.DefaultKeyValueStore;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.DefaultKeyValueStoreManagement;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.DefaultStreamManagement;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.FetchException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.InternalConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.JetstreamWorkerThread;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.KeyValueStore;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.KeyValueStoreManagement;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageNotFoundException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.PublishException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.PullSubscription;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.PushSubscription;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ResolveException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.StreamManagement;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.SubscribeException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Subscription;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.SystemException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.Consumer;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PublishMessageMetadata;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.ResolvedMessage;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionOptionsFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConsumerConfigurationFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.FetchConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PullConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.AttachContextTraceSupplier;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.TracerFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.TracerType;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.ConsumerMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.MessageMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.PayloadMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.StreamStateMapper;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.unchecked.Unchecked;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.jboss.logging.Logger;

class DefaultConnection<T>
implements Connection<T> {
    @Generated
    private static final Logger log = Logger.getLogger(DefaultConnection.class);
    private final io.nats.client.Connection connection;
    private final List<ConnectionListener> listeners;
    private final StreamStateMapper streamStateMapper;
    private final ConsumerMapper consumerMapper;
    private final MessageMapper messageMapper;
    private final PayloadMapper payloadMapper;
    private final TracerFactory tracerFactory;
    private final Vertx vertx;
    private final ConcurrentHashMap<String, Subscription<T>> subscriptions;

    DefaultConnection(ConnectionConfiguration configuration, List<ConnectionListener> listeners, MessageMapper messageMapper, PayloadMapper payloadMapper, ConsumerMapper consumerMapper, StreamStateMapper streamStateMapper, TracerFactory tracerFactory, Vertx vertx) throws ConnectionException {
        this.connection = this.connect(configuration, vertx);
        this.listeners = listeners;
        this.streamStateMapper = streamStateMapper;
        this.consumerMapper = consumerMapper;
        this.messageMapper = messageMapper;
        this.payloadMapper = payloadMapper;
        this.tracerFactory = tracerFactory;
        this.vertx = vertx;
        this.subscriptions = new ConcurrentHashMap();
        listeners.forEach(listener -> listener.onEvent(ConnectionEvent.Connected, "Connection established"));
    }

    @Override
    public boolean isConnected() {
        return Connection.Status.CONNECTED.equals((Object)this.connection.getStatus());
    }

    @Override
    public List<ConnectionListener> listeners() {
        return this.listeners;
    }

    @Override
    public void close() {
        this.subscriptions.forEach((subject, subscription) -> {
            try {
                subscription.close();
            }
            catch (Exception failure) {
                log.warnf((Throwable)failure, "Error closing subscription to subject: %s with message: %s", subject, (Object)failure.getMessage());
            }
        });
        try {
            this.connection.close();
        }
        catch (Throwable throwable) {
            log.warnf(throwable, "Error closing connection: %s", (Object)throwable.getMessage());
        }
    }

    @Override
    public Uni<org.eclipse.microprofile.reactive.messaging.Message<T>> publish(org.eclipse.microprofile.reactive.messaging.Message<T> message, PublishConfiguration configuration) {
        return this.context().executeBlocking(this.addPublishMetadata(message, configuration).onItem().transformToUni(msg -> this.tracerFactory.create(TracerType.Publish).withTrace(msg, m -> m)).onItem().transformToUni(this::publishMessage).onItem().transformToUni(this::acknowledge).onFailure().recoverWithUni(failure -> this.notAcknowledge(message, (Throwable)failure)).onFailure().transform(failure -> new PublishException(failure.getMessage(), (Throwable)failure)));
    }

    @Override
    public Uni<org.eclipse.microprofile.reactive.messaging.Message<T>> publish(org.eclipse.microprofile.reactive.messaging.Message<T> message, PublishConfiguration publishConfiguration, ConsumerConfiguration<T> consumerConfiguration) {
        return this.addConsumer(consumerConfiguration).onItem().invoke(consumer -> log.infof("Consumer created: %s", consumer)).onItem().transformToUni(ignore -> this.publish(message, publishConfiguration));
    }

    @Override
    public Uni<Consumer> addConsumer(ConsumerConfiguration<T> configuration) {
        return this.context().executeBlocking(this.addOrUpdateConsumer(configuration).onItem().transform(Unchecked.function(consumerContext -> this.consumerMapper.of(consumerContext.getConsumerInfo())))).onFailure().transform(SystemException::new);
    }

    @Override
    public Uni<org.eclipse.microprofile.reactive.messaging.Message<T>> next(ConsumerConfiguration<T> configuration, Duration timeout) {
        Context context = this.context();
        return context.executeBlocking(this.addOrUpdateConsumer(configuration).onItem().transformToUni(consumerContext -> Uni.createFrom().item(Unchecked.supplier(() -> consumerContext.next(timeout)))).emitOn(arg_0 -> ((Context)context).runOnContext(arg_0)).onItem().ifNull().failWith(MessageNotFoundException::new).onItem().ifNotNull().transformToUni(message -> this.transformMessage((Message)message, configuration, this.context())).onItem().transformToUni(message -> this.tracerFactory.create(TracerType.Subscribe).withTrace(message, new AttachContextTraceSupplier()))).onFailure().transform(failure -> {
            if (failure instanceof MessageNotFoundException) {
                return failure;
            }
            return new FetchException((Throwable)failure);
        });
    }

    @Override
    public Multi<org.eclipse.microprofile.reactive.messaging.Message<T>> fetch(FetchConsumerConfiguration<T> configuration) {
        Context context = this.context();
        return this.addOrUpdateConsumer(configuration.consumerConfiguration()).onItem().transformToMulti(consumerContext -> this.fetchMessages((ConsumerContext)consumerContext, configuration, context)).onItem().transformToUniAndMerge(message -> this.tracerFactory.create(TracerType.Subscribe).withTrace(message, new AttachContextTraceSupplier())).onFailure().transform(FetchException::new);
    }

    @Override
    public Uni<org.eclipse.microprofile.reactive.messaging.Message<T>> resolve(String streamName, long sequence) {
        return this.context().executeBlocking(Uni.createFrom().item(Unchecked.supplier(() -> {
            JetStream jetStream = this.connection.jetStream();
            StreamContext streamContext = jetStream.getStreamContext(streamName);
            MessageInfo messageInfo = streamContext.getMessage(sequence);
            return new ResolvedMessage<Object>(messageInfo, this.payloadMapper.of(messageInfo).orElse(null));
        }))).onFailure().transform(ResolveException::new);
    }

    @Override
    public Uni<Subscription<T>> subscribe(PushConsumerConfiguration<T> configuration) {
        Context context = this.context();
        return context.executeBlocking(Uni.createFrom().item(Unchecked.supplier(() -> {
            PushSubscription subscription = new PushSubscription(this.connection, configuration, this.messageMapper, this.tracerFactory, context);
            this.subscriptions.put(configuration.subject(), subscription);
            return subscription;
        }))).onFailure().transform(SubscribeException::new);
    }

    @Override
    public Uni<Subscription<T>> subscribe(PullConsumerConfiguration<T> configuration) {
        Context context = this.context();
        return context.executeBlocking(this.addOrUpdateConsumer(configuration.consumerConfiguration()).onItem().transform(consumerContext -> new PullSubscription(configuration, (ConsumerContext)consumerContext, this.messageMapper, this.tracerFactory, context))).onItem().transform(subscription -> {
            this.subscriptions.put(configuration.consumerConfiguration().subject(), (Subscription<T>)subscription);
            return subscription;
        }).onFailure().transform(SubscribeException::new);
    }

    @Override
    public Uni<KeyValueStore<T>> keyValueStore(String bucketName) {
        return this.context().executeBlocking(Uni.createFrom().item(() -> new DefaultKeyValueStore(bucketName, this.connection, this.payloadMapper, this.vertx)));
    }

    @Override
    public Uni<StreamManagement> streamManagement() {
        return this.context().executeBlocking(Uni.createFrom().item(() -> new DefaultStreamManagement(this.connection, this.streamStateMapper, this.consumerMapper, this.vertx)));
    }

    @Override
    public Uni<KeyValueStoreManagement> keyValueStoreManagement() {
        return this.context().executeBlocking(Uni.createFrom().item(() -> new DefaultKeyValueStoreManagement(this.connection, this.vertx)));
    }

    private PublishOptions createPublishOptions(String messageId, String streamName) {
        return PublishOptions.builder().messageId(messageId).stream(streamName).build();
    }

    private Uni<org.eclipse.microprofile.reactive.messaging.Message<T>> acknowledge(org.eclipse.microprofile.reactive.messaging.Message<T> message) {
        return Uni.createFrom().completionStage(message.ack()).onItem().transform(v -> message);
    }

    private Uni<org.eclipse.microprofile.reactive.messaging.Message<T>> notAcknowledge(org.eclipse.microprofile.reactive.messaging.Message<T> message, Throwable throwable) {
        return Uni.createFrom().completionStage(message.nack(throwable)).onItem().invoke(() -> log.warnf(throwable, "Message not acknowledged: %s", (Object)throwable.getMessage())).onItem().transformToUni(v -> Uni.createFrom().item((Object)message));
    }

    private Headers toJetStreamHeaders(Map<String, List<String>> headers) {
        Headers result = new Headers();
        headers.forEach((arg_0, arg_1) -> ((Headers)result).add(arg_0, arg_1));
        return result;
    }

    private Uni<org.eclipse.microprofile.reactive.messaging.Message<T>> publishMessage(org.eclipse.microprofile.reactive.messaging.Message<T> message) {
        return this.getJetStream().onItem().transformToUni(jetStream -> this.getMetadata(message).onItem().transformToUni(metadata -> Uni.createFrom().item(Unchecked.supplier(() -> jetStream.publish(metadata.subject(), this.toJetStreamHeaders(metadata.headers()), metadata.payload(), this.createPublishOptions(metadata.messageId(), metadata.stream()))))).onItem().invoke(ack -> log.infof("Message published : %s", ack)).onItem().transform(ignore -> message));
    }

    private Uni<PublishMessageMetadata> getMetadata(org.eclipse.microprofile.reactive.messaging.Message<T> message) {
        return Uni.createFrom().item(() -> message.getMetadata(PublishMessageMetadata.class).orElse(null)).onItem().ifNull().failWith(() -> new RuntimeException("Metadata not found"));
    }

    private Uni<JetStream> getJetStream() {
        return Uni.createFrom().item(Unchecked.supplier(() -> ((io.nats.client.Connection)this.connection).jetStream()));
    }

    private Uni<org.eclipse.microprofile.reactive.messaging.Message<T>> addPublishMetadata(org.eclipse.microprofile.reactive.messaging.Message<T> message, PublishConfiguration configuration) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            PublishMessageMetadata publishMetadata = PublishMessageMetadata.of(message, configuration, this.payloadMapper.of(message.getPayload()));
            Metadata metadata = message.getMetadata().without(PublishMessageMetadata.class);
            return message.withMetadata(metadata.with((Object)publishMetadata));
        }));
    }

    private Multi<org.eclipse.microprofile.reactive.messaging.Message<T>> fetchMessages(ConsumerContext consumerContext, FetchConsumerConfiguration<T> configuration, Context context) {
        ExecutorService executor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new);
        return Multi.createFrom().emitter(emitter -> {
            try (FetchConsumer fetchConsumer = this.fetchConsumer(consumerContext, configuration);){
                Message message = fetchConsumer.nextMessage();
                while (message != null) {
                    emitter.emit((Object)message);
                    message = fetchConsumer.nextMessage();
                }
                emitter.complete();
            }
            catch (Exception failure) {
                emitter.fail((Throwable)new FetchException(failure));
            }
        }).runSubscriptionOn((Executor)executor).emitOn(arg_0 -> ((Context)context).runOnContext(arg_0)).onItem().transformToUniAndMerge(message -> this.transformMessage((Message)message, configuration.consumerConfiguration(), context));
    }

    private FetchConsumer fetchConsumer(ConsumerContext consumerContext, FetchConsumerConfiguration<T> configuration) throws IOException, JetStreamApiException {
        if (configuration.timeout() == null) {
            return consumerContext.fetch(FetchConsumeOptions.builder().maxMessages(configuration.batchSize().intValue()).noWait().build());
        }
        return consumerContext.fetch(((FetchConsumeOptions.Builder)FetchConsumeOptions.builder().maxMessages(configuration.batchSize().intValue()).expiresIn(configuration.timeout().toMillis())).build());
    }

    private Uni<org.eclipse.microprofile.reactive.messaging.Message<T>> transformMessage(Message message, ConsumerConfiguration<T> configuration, Context context) {
        return Uni.createFrom().item(Unchecked.supplier(() -> this.messageMapper.of(message, (Class)configuration.payloadType().orElse(null), context)));
    }

    private Uni<ConsumerContext> addOrUpdateConsumer(ConsumerConfiguration<T> configuration) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                ConsumerConfigurationFactory factory = new ConsumerConfigurationFactory();
                io.nats.client.api.ConsumerConfiguration consumerConfiguration = factory.create(configuration);
                StreamContext streamContext = this.connection.getStreamContext(configuration.stream());
                return streamContext.createOrUpdateConsumer(consumerConfiguration);
            }
            catch (Exception failure) {
                throw new SystemException(failure);
            }
        }));
    }

    private Context context() {
        return this.vertx.getOrCreateContext();
    }

    private io.nats.client.Connection connect(ConnectionConfiguration configuration, Vertx vertx) throws ConnectionException {
        try {
            ConnectionOptionsFactory factory = new ConnectionOptionsFactory();
            Options options = factory.create(configuration, new InternalConnectionListener(this), vertx);
            return Nats.connect((Options)options);
        }
        catch (Exception failure) {
            throw new ConnectionException(failure);
        }
    }
}

