/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.reactive.messaging.nats.jetstream.client.vertx;

import io.nats.client.Connection;
import io.nats.client.ConsumerContext;
import io.nats.client.FetchConsumeOptions;
import io.nats.client.FetchConsumer;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.KeyValue;
import io.nats.client.Message;
import io.nats.client.PublishOptions;
import io.nats.client.StreamContext;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.MessageInfo;
import io.nats.client.api.PublishAck;
import io.nats.client.impl.Headers;
import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff;
import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamOutgoingMessageMetadata;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.AbstractConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.FetchException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.JetstreamWorkerThread;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.KeyValueException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageNotFoundException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.PublishException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.administration.JetStreamSetupException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConsumerConfigurtationFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.FetchConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.message.MessageFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.vertx.JetStreamMessage;
import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrumenter;
import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamTrace;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.unchecked.Unchecked;
import io.smallrye.reactive.messaging.tracing.TracingUtils;
import io.vertx.mutiny.core.Context;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jboss.logging.Logger;

public class MessageConnection
extends AbstractConnection
implements io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageConnection {
    private static final Logger logger = Logger.getLogger(MessageConnection.class);
    protected final MessageFactory messageFactory;
    protected final Context context;
    protected final JetStreamInstrumenter instrumenter;

    public MessageConnection(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener, MessageFactory messageFactory, Context context, JetStreamInstrumenter instrumenter) {
        super(connectionConfiguration, connectionListener);
        this.messageFactory = messageFactory;
        this.context = context;
        this.instrumenter = instrumenter;
    }

    @Override
    public <T> Uni<org.eclipse.microprofile.reactive.messaging.Message<T>> publish(org.eclipse.microprofile.reactive.messaging.Message<T> message, PublishConfiguration configuration) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                Optional metadata = message.getMetadata(JetStreamOutgoingMessageMetadata.class);
                String messageId = metadata.map(JetStreamOutgoingMessageMetadata::messageId).orElseGet(() -> UUID.randomUUID().toString());
                byte[] payload = this.messageFactory.toByteArray(message.getPayload());
                String subject = metadata.flatMap(JetStreamOutgoingMessageMetadata::subtopic).map(subtopic -> configuration.subject() + "." + subtopic).orElseGet(configuration::subject);
                HashMap<String, List<String>> headers = new HashMap<String, List<String>>();
                metadata.ifPresent(m -> headers.putAll(m.headers()));
                if (message.getPayload() != null) {
                    headers.putIfAbsent("message.type", List.of(message.getPayload().getClass().getTypeName()));
                }
                if (configuration.traceEnabled()) {
                    TracingUtils.traceOutgoing(this.instrumenter.publisher(), (org.eclipse.microprofile.reactive.messaging.Message)message, (Object)new JetStreamTrace(configuration.stream(), subject, messageId, headers, new String(payload)));
                }
                JetStream jetStream = this.connection.jetStream();
                PublishOptions options = this.createPublishOptions(messageId, configuration.stream());
                PublishAck ack = jetStream.publish(subject, this.toJetStreamHeaders(headers), payload, options);
                if (logger.isDebugEnabled()) {
                    logger.debugf("Published message: %s", (Object)ack);
                }
                this.connection.flush(Duration.ZERO);
                return message;
            }
            catch (JetStreamApiException | JetStreamSetupException | IOException e) {
                throw new PublishException(String.format("Failed to publish message: %s", e.getMessage()), e);
            }
        })).emitOn(arg_0 -> ((Context)this.context).runOnContext(arg_0)).onItem().transformToUni(this::acknowledge).onFailure().recoverWithUni(throwable -> this.notAcknowledge(message, (Throwable)throwable));
    }

    @Override
    public <T> Uni<org.eclipse.microprofile.reactive.messaging.Message<T>> publish(org.eclipse.microprofile.reactive.messaging.Message<T> message, PublishConfiguration publishConfiguration, FetchConsumerConfiguration<T> consumerConfiguration) {
        return this.addOrUpdateConsumer(consumerConfiguration).onItem().transformToUni(v -> this.publish(message, publishConfiguration));
    }

    @Override
    public <T> Uni<org.eclipse.microprofile.reactive.messaging.Message<T>> nextMessage(FetchConsumerConfiguration<T> configuration) {
        ExecutorService executor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new);
        return this.getConsumerContext(this.connection, this.context, configuration.stream(), configuration.name().orElseThrow(() -> new IllegalArgumentException("Consumer name is not configured"))).runSubscriptionOn((Executor)executor).onItem().transformToUni(consumerContext -> this.nextMessage((ConsumerContext)consumerContext, configuration));
    }

    @Override
    public <T> Uni<T> getKeyValue(String bucketName, String key, Class<T> valueType) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                KeyValue keyValue = this.connection.keyValue(bucketName);
                return Optional.ofNullable(keyValue.get(key)).map(value -> this.messageFactory.decode(value.getValue(), valueType)).orElse(null);
            }
            catch (JetStreamApiException | IOException e) {
                throw new KeyValueException(e);
            }
        })).emitOn(arg_0 -> ((Context)this.context).runOnContext(arg_0));
    }

    @Override
    public <T> Uni<Void> putKeyValue(String bucketName, String key, T value) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                KeyValue keyValue = this.connection.keyValue(bucketName);
                keyValue.put(key, this.messageFactory.toByteArray(value));
                return null;
            }
            catch (JetStreamApiException | IOException e) {
                throw new KeyValueException(e);
            }
        })).emitOn(arg_0 -> ((Context)this.context).runOnContext(arg_0));
    }

    @Override
    public Uni<Void> deleteKeyValue(String bucketName, String key) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                KeyValue keyValue = this.connection.keyValue(bucketName);
                keyValue.delete(key);
                return null;
            }
            catch (JetStreamApiException | IOException e) {
                throw new KeyValueException(e);
            }
        })).emitOn(arg_0 -> ((Context)this.context).runOnContext(arg_0));
    }

    @Override
    public <T> Uni<org.eclipse.microprofile.reactive.messaging.Message<T>> resolve(String streamName, long sequence) {
        return Uni.createFrom().emitter(emitter -> {
            try {
                JetStream jetStream = this.connection.jetStream();
                StreamContext streamContext = jetStream.getStreamContext(streamName);
                MessageInfo messageInfo = streamContext.getMessage(sequence);
                emitter.complete(new JetStreamMessage<Object>(messageInfo, this.messageFactory.toPayload(messageInfo).orElse(null)));
            }
            catch (JetStreamApiException | IOException e) {
                emitter.fail(e);
            }
        }).emitOn(arg_0 -> ((Context)this.context).runOnContext(arg_0));
    }

    private PublishOptions createPublishOptions(String messageId, String streamName) {
        return PublishOptions.builder().messageId(messageId).stream(streamName).build();
    }

    private FetchConsumer fetchConsumer(ConsumerContext consumerContext, Duration timeout) throws IOException, JetStreamApiException {
        if (timeout == null) {
            return consumerContext.fetch(FetchConsumeOptions.builder().maxMessages(1).noWait().build());
        }
        return consumerContext.fetch(((FetchConsumeOptions.Builder)FetchConsumeOptions.builder().maxMessages(1).expiresIn(timeout.toMillis())).build());
    }

    private <T> Uni<org.eclipse.microprofile.reactive.messaging.Message<T>> acknowledge(org.eclipse.microprofile.reactive.messaging.Message<T> message) {
        return Uni.createFrom().completionStage(message.ack()).onItem().transform(v -> message);
    }

    private <T> Uni<org.eclipse.microprofile.reactive.messaging.Message<T>> notAcknowledge(org.eclipse.microprofile.reactive.messaging.Message<T> message, Throwable throwable) {
        return Uni.createFrom().completionStage(message.nack(throwable)).onItem().invoke(() -> logger.warnf(throwable, "Message not published: %s", (Object)throwable.getMessage())).onItem().transformToUni(v -> Uni.createFrom().item((Object)message));
    }

    private Uni<ConsumerContext> getConsumerContext(Connection connection, Context context, String stream, String consumerName) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                StreamContext streamContext = connection.getStreamContext(stream);
                return streamContext.getConsumerContext(consumerName);
            }
            catch (JetStreamApiException | IOException e) {
                throw new FetchException(e);
            }
        })).emitOn(arg_0 -> ((Context)context).runOnContext(arg_0));
    }

    private Uni<Message> nextMessage(ConsumerContext consumerContext, Duration timeout) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try (FetchConsumer fetchConsumer = this.fetchConsumer(consumerContext, timeout);){
                Message message2 = fetchConsumer.nextMessage();
                if (message2 == null) throw new MessageNotFoundException();
                Message message = message2;
                return message;
            }
            catch (Throwable failure) {
                logger.errorf(failure, "Failed to fetch message: %s", (Object)failure.getMessage());
                throw new FetchException(failure);
            }
        })).emitOn(arg_0 -> ((Context)this.context).runOnContext(arg_0));
    }

    private <T> Uni<org.eclipse.microprofile.reactive.messaging.Message<T>> nextMessage(ConsumerContext consumerContext, FetchConsumerConfiguration<T> configuration) {
        return this.nextMessage(consumerContext, (Duration)configuration.fetchTimeout().orElse(null)).map(message -> this.messageFactory.create((Message)message, configuration.traceEnabled(), configuration.payloadType().orElse(null), this.context, new ExponentialBackoff(false, Duration.ZERO), configuration.ackTimeout()));
    }

    private Headers toJetStreamHeaders(Map<String, List<String>> headers) {
        Headers result = new Headers();
        headers.forEach((arg_0, arg_1) -> ((Headers)result).add(arg_0, arg_1));
        return result;
    }

    private <T> Uni<Void> addOrUpdateConsumer(FetchConsumerConfiguration<T> configuration) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                ConsumerConfigurtationFactory factory = new ConsumerConfigurtationFactory();
                ConsumerConfiguration consumerConfiguration = factory.create(configuration);
                StreamContext streamContext = this.connection.getStreamContext(configuration.stream());
                streamContext.createOrUpdateConsumer(consumerConfiguration);
                this.connection.flush(Duration.ZERO);
                return null;
            }
            catch (JetStreamApiException | IOException e) {
                throw new FetchException(e);
            }
        })).emitOn(arg_0 -> ((Context)this.context).runOnContext(arg_0));
    }
}

