package io.quarkiverse.reactive.messaging.nats.jetstream.client;

import io.nats.client.Connection;
import io.nats.client.ConsumerContext;
import io.nats.client.FetchConsumeOptions;
import io.nats.client.FetchConsumer;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamReader;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Nats;
import io.nats.client.PublishOptions;
import io.nats.client.api.MessageInfo;
import io.nats.client.impl.Headers;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.Consumer;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PublishMessageMetadata;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.ResolvedMessage;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionOptionsFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConsumerConfigurationFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.FetchConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PullConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PullSubscribeOptionsFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.AttachContextTraceSupplier;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.TracerFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.TracerType;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.ConsumerMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.MessageMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.PayloadMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.StreamStateMapper;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniCreate;
import io.smallrye.mutiny.unchecked.Unchecked;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import lombok.Generated;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultConnection.class */
class DefaultConnection<T> implements Connection<T> {

    @Generated
    private static final Logger log = Logger.getLogger(DefaultConnection.class);
    private final io.nats.client.Connection connection;
    private final List<ConnectionListener> listeners;
    private final StreamStateMapper streamStateMapper;
    private final ConsumerMapper consumerMapper;
    private final MessageMapper messageMapper;
    private final PayloadMapper payloadMapper;
    private final TracerFactory tracerFactory;
    private final Vertx vertx;
    private final ConcurrentHashMap<String, Subscription<T>> subscriptions = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the connection wrapper, opens the underlying NATS connection and then notifies every
     * supplied listener with {@link ConnectionEvent#Connected}.
     *
     * @param connectionConfiguration settings passed to {@link ConnectionOptionsFactory} when connecting
     * @param list listeners to notify about connection events; must not be {@code null}
     * @param messageMapper mapper used to turn NATS messages into reactive-messaging messages
     * @param payloadMapper mapper used for message payloads
     * @param consumerMapper mapper used for consumer information
     * @param streamStateMapper mapper handed to the stream management facade
     * @param tracerFactory factory for publish/subscribe tracers
     * @param vertx Vert.x instance used to obtain execution contexts
     * @throws ConnectionException if the NATS connection cannot be established
     * @throws NullPointerException if {@code list} is {@code null}
     */
    public DefaultConnection(ConnectionConfiguration connectionConfiguration, List<ConnectionListener> list, MessageMapper messageMapper, PayloadMapper payloadMapper, ConsumerMapper consumerMapper, StreamStateMapper streamStateMapper, TracerFactory tracerFactory, Vertx vertx) throws ConnectionException {
        // Assign every collaborator BEFORE connecting: connect() passes `this` to an
        // InternalConnectionListener, so a callback fired while the connection is being
        // established must not observe unassigned (null) fields such as the listener list.
        this.listeners = Objects.requireNonNull(list, "list");
        this.streamStateMapper = streamStateMapper;
        this.consumerMapper = consumerMapper;
        this.messageMapper = messageMapper;
        this.payloadMapper = payloadMapper;
        this.tracerFactory = tracerFactory;
        this.vertx = vertx;
        this.connection = connect(connectionConfiguration, vertx);
        for (ConnectionListener listener : this.listeners) {
            listener.onEvent(ConnectionEvent.Connected, "Connection established");
        }
    }

    /**
     * Reports whether the underlying NATS connection currently has status {@code CONNECTED}.
     *
     * @return {@code true} only when the connection status is {@code CONNECTED}
     */
    @Override
    public boolean isConnected() {
        final io.nats.client.Connection.Status current = this.connection.getStatus();
        return current == io.nats.client.Connection.Status.CONNECTED;
    }

    /**
     * Returns the listener list this connection was constructed with (the same instance, not a copy).
     *
     * @return the connection listeners
     */
    @Override
    public List<ConnectionListener> listeners() {
        return listeners;
    }

    /**
     * Closes every tracked subscription and then the underlying NATS connection.
     *
     * <p>Closing is best effort: a failure while closing one subscription or the connection is
     * logged as a warning and never propagated to the caller.
     */
    @Override
    public void close() {
        for (Map.Entry<String, Subscription<T>> entry : this.subscriptions.entrySet()) {
            final String subject = entry.getKey();
            try {
                entry.getValue().close();
            } catch (Exception e) {
                // Keep going so one broken subscription cannot prevent the others from closing.
                log.warnf(e, "Error closing subscription to subject: %s with message: %s", subject, e.getMessage());
            }
        }

        try {
            this.connection.close();
        } catch (Throwable failure) {
            log.warnf(failure, "Error closing connection: %s", failure.getMessage());
        }
    }

    /**
     * Publishes a message to JetStream.
     *
     * <p>The pipeline attaches {@link PublishMessageMetadata} to the message, runs it through the
     * publish tracer, publishes it and acknowledges it. If any of those steps fails, the
     * <em>original</em> message is nacked with the failure and, once the nack completes, the
     * original message is emitted. A failure of the nack itself is surfaced as a
     * {@link PublishException}. The whole pipeline is handed to the Vert.x context's
     * {@code executeBlocking}.
     *
     * @param message the message to publish
     * @param publishConfiguration publish settings used to build the message metadata
     * @return a Uni emitting the published (or nacked) message
     */
    @Override
    public Uni<Message<T>> publish(Message<T> message, PublishConfiguration publishConfiguration) {
        final Context context = context();
        final Uni<Message<T>> pipeline = addPublishMetadata(message, publishConfiguration)
                // The tracer callback is an identity function; its parameter needs a name distinct
                // from the enclosing lambda's parameter, otherwise the code does not compile.
                .onItem().transformToUni(enriched -> this.tracerFactory.create(TracerType.Publish).withTrace(enriched, traced -> traced))
                .onItem().transformToUni(this::publishMessage)
                .onItem().transformToUni(this::acknowledge)
                .onFailure().recoverWithUni(failure -> notAcknowledge(message, failure))
                .onFailure().transform(failure -> new PublishException(failure.getMessage(), failure));
        return context.executeBlocking(pipeline);
    }

    /**
     * Creates or updates the given consumer, logs it, and then publishes the message.
     *
     * @param message the message to publish
     * @param publishConfiguration publish settings used to build the message metadata
     * @param consumerConfiguration consumer that is added before publishing
     * @return a Uni emitting the result of {@link #publish(Message, PublishConfiguration)}
     */
    @Override
    public Uni<Message<T>> publish(Message<T> message, PublishConfiguration publishConfiguration, ConsumerConfiguration<T> consumerConfiguration) {
        final Uni<Consumer> consumerReady = addConsumer(consumerConfiguration)
                .onItem().invoke(created -> log.infof("Consumer created: %s", created));
        // The consumer itself is not needed for publishing; it only has to exist first.
        return consumerReady.onItem().transformToUni(ignored -> publish(message, publishConfiguration));
    }

    /**
     * Creates or updates a consumer and maps the resulting consumer info to the API model.
     *
     * @param consumerConfiguration the consumer to create or update
     * @return a Uni emitting the mapped consumer; any failure is transformed with
     *         {@code SystemException::new}
     */
    @Override
    public Uni<Consumer> addConsumer(ConsumerConfiguration<T> consumerConfiguration) {
        final Context context = context();
        final Uni<Consumer> mapped = addOrUpdateConsumer(consumerConfiguration)
                .onItem().transform(Unchecked.function(ctx -> this.consumerMapper.of(ctx.getConsumerInfo())));
        return context.executeBlocking(mapped)
                .onFailure().transform(SystemException::new);
    }

    /**
     * Retrieves the next message for the given consumer, waiting at most {@code duration}.
     *
     * <p>The consumer is created or updated first. A {@code null} result from the NATS consumer is
     * reported as {@link MessageNotFoundException}; every other failure is wrapped in a
     * {@link FetchException}. The received message is mapped and passed through the subscribe
     * tracer.
     *
     * @param consumerConfiguration the consumer to read from
     * @param duration maximum time passed to the NATS consumer's {@code next} call
     * @return a Uni emitting the mapped message
     */
    @Override
    public Uni<Message<T>> next(ConsumerConfiguration<T> consumerConfiguration, Duration duration) {
        final Context context = context();
        // Typed explicitly: a raw Uni here would erase the element type of the whole chain.
        final Uni<io.nats.client.Message> polled = addOrUpdateConsumer(consumerConfiguration)
                .onItem().transformToUni(consumerContext -> Uni.createFrom().item(Unchecked.supplier(() -> consumerContext.next(duration))));
        final Uni<Message<T>> received = polled
                .emitOn(context::runOnContext)
                .onItem().ifNull().failWith(MessageNotFoundException::new)
                // Reuse the context captured above (the same one items are emitted on) rather
                // than looking one up again.
                .onItem().ifNotNull().transformToUni(natsMessage -> transformMessage(natsMessage, consumerConfiguration, context))
                .onItem().transformToUni(mapped -> this.tracerFactory.create(TracerType.Subscribe).withTrace(mapped, new AttachContextTraceSupplier()));
        return context.executeBlocking(received)
                .onFailure().transform(failure -> failure instanceof MessageNotFoundException ? failure : new FetchException(failure));
    }

    /**
     * Fetches a batch of messages for the configured consumer as a stream.
     *
     * <p>The consumer is created or updated first, then messages are fetched, mapped and passed
     * through the subscribe tracer. Failures are reported as {@link FetchException}; a failure that
     * already is a {@link FetchException} (as produced while fetching) is passed through unchanged
     * instead of being wrapped a second time.
     *
     * @param fetchConsumerConfiguration consumer and batch settings for the fetch
     * @return a Multi emitting the fetched messages
     */
    @Override
    public Multi<Message<T>> fetch(FetchConsumerConfiguration<T> fetchConsumerConfiguration) {
        final Context context = context();
        return addOrUpdateConsumer(fetchConsumerConfiguration.consumerConfiguration())
                .onItem().transformToMulti(consumerContext -> fetchMessages(consumerContext, fetchConsumerConfiguration, context))
                .onItem().transformToUniAndMerge(message -> this.tracerFactory.create(TracerType.Subscribe).withTrace(message, new AttachContextTraceSupplier()))
                .onFailure().transform(failure -> failure instanceof FetchException ? failure : new FetchException(failure));
    }

    /**
     * Looks up a single stored message by stream name and sequence number.
     *
     * @param str name of the stream to read from
     * @param j sequence number of the message within the stream
     * @return a Uni emitting the resolved message; any failure is transformed with
     *         {@code ResolveException::new}
     */
    @Override
    public Uni<Message<T>> resolve(String str, long j) {
        final Context context = context();
        final Uni<Message<T>> lookup = Uni.createFrom().item(Unchecked.supplier(() -> {
            final MessageInfo info = this.connection.jetStream().getStreamContext(str).getMessage(j);
            // The payload mapper may yield no payload; the resolved message then carries null.
            return new ResolvedMessage(info, this.payloadMapper.of(info).orElse(null));
        }));
        return context.executeBlocking(lookup)
                .onFailure().transform(ResolveException::new);
    }

    /**
     * Creates a push subscription and registers it under the configured subject so that
     * {@link #close()} can close it later.
     *
     * @param pushConsumerConfiguration settings for the push consumer
     * @return a Uni emitting the new subscription; any failure is transformed with
     *         {@code SubscripeException::new}
     */
    @Override
    public Uni<Subscription<T>> subscribe(PushConsumerConfiguration<T> pushConsumerConfiguration) {
        final Context context = context();
        final Uni<Subscription<T>> creation = Uni.createFrom().item(Unchecked.supplier(() -> {
            final PushSubscription created = new PushSubscription(this.connection, pushConsumerConfiguration, this.messageMapper, this.tracerFactory, context);
            this.subscriptions.put(pushConsumerConfiguration.subject(), created);
            return created;
        }));
        return context.executeBlocking(creation)
                .onFailure().transform(SubscripeException::new);
    }

    /**
     * Creates a pull subscription (NATS subscription plus reader) and registers it under the
     * consumer's subject so that {@link #close()} can close it later.
     *
     * @param pullConsumerConfiguration settings for the pull consumer
     * @return a Uni emitting the new subscription; any failure is transformed with
     *         {@code SubscripeException::new}
     */
    @Override
    public Uni<Subscription<T>> subscribe(PullConsumerConfiguration<T> pullConsumerConfiguration) {
        final Context context = context();
        final Uni<Subscription<T>> creation = createSubscription(pullConsumerConfiguration)
                .onItem().transformToUni(natsSubscription -> createReader(pullConsumerConfiguration, natsSubscription)
                        .onItem().transform(reader -> new PullSubscription(pullConsumerConfiguration, natsSubscription, reader, this.messageMapper, this.tracerFactory, context)))
                .onItem().transform(created -> {
                    this.subscriptions.put(pullConsumerConfiguration.consumerConfiguration().subject(), created);
                    return created;
                });
        return context.executeBlocking(creation)
                .onFailure().transform(SubscripeException::new);
    }

    /**
     * Creates a key-value store facade for the given bucket on top of this connection.
     *
     * @param str name passed to the {@code DefaultKeyValueStore} constructor
     * @return a Uni emitting the key-value store
     */
    @Override
    public Uni<KeyValueStore<T>> keyValueStore(String str) {
        final Context context = context();
        final Uni<KeyValueStore<T>> store = Uni.createFrom()
                .item(() -> new DefaultKeyValueStore(str, this.connection, this.payloadMapper, this.vertx));
        return context.executeBlocking(store);
    }

    /**
     * Creates a stream management facade on top of this connection.
     *
     * @return a Uni emitting the stream management instance
     */
    @Override
    public Uni<StreamManagement> streamManagement() {
        final Context context = context();
        final Uni<StreamManagement> management = Uni.createFrom()
                .item(() -> new DefaultStreamManagement(this.connection, this.streamStateMapper, this.consumerMapper, this.vertx));
        return context.executeBlocking(management);
    }

    /**
     * Creates a key-value store management facade on top of this connection.
     *
     * @return a Uni emitting the key-value store management instance
     */
    @Override
    public Uni<KeyValueStoreManagement> keyValueStoreManagement() {
        final Context context = context();
        final Uni<KeyValueStoreManagement> management = Uni.createFrom()
                .item(() -> new DefaultKeyValueStoreManagement(this.connection, this.vertx));
        return context.executeBlocking(management);
    }

    /**
     * Builds the NATS publish options for one message.
     *
     * @param str message id set on the options
     * @param str2 stream name set on the options
     * @return the built publish options
     */
    private PublishOptions createPublishOptions(String str, String str2) {
        final PublishOptions.Builder builder = PublishOptions.builder().messageId(str);
        return builder.stream(str2).build();
    }

    /**
     * Acknowledges the message and emits that same message once the acknowledgement completes.
     *
     * @param message the message to acknowledge
     * @return a Uni emitting {@code message} after {@code message.ack()} has completed
     */
    private Uni<Message<T>> acknowledge(Message<T> message) {
        final Uni<Void> acked = Uni.createFrom().completionStage(message.ack());
        return acked.onItem().transform(ignored -> message);
    }

    /**
     * Negatively acknowledges the message with the given failure, logs a warning once the nack has
     * completed, and then emits the message.
     *
     * @param message the message to nack
     * @param th the failure that caused the nack
     * @return a Uni emitting {@code message} after {@code message.nack(th)} has completed
     */
    private Uni<Message<T>> notAcknowledge(Message<T> message, Throwable th) {
        final Uni<Void> nacked = Uni.createFrom().completionStage(message.nack(th));
        return nacked
                .onItem().invoke(() -> log.warnf(th, "Message not acknowledged: %s", th.getMessage()))
                .onItem().transformToUni(ignored -> Uni.createFrom().item(message));
    }

    /**
     * Converts a header map into NATS {@link Headers}.
     *
     * @param map header names mapped to their values; {@code null} is treated as "no headers"
     * @return a new {@link Headers} instance holding every entry of {@code map}
     */
    private Headers toJetStreamHeaders(Map<String, List<String>> map) {
        final Headers headers = new Headers();
        if (map != null) {
            // Add each entry to the instance created above (the decompiled code referenced an
            // undefined placeholder variable here).
            map.forEach((name, values) -> headers.add(name, values));
        }
        return headers;
    }

    /**
     * Publishes the message to JetStream using the {@link PublishMessageMetadata} attached to it,
     * logs the publish acknowledgement and emits the message that was passed in.
     *
     * @param message the message to publish; must carry {@link PublishMessageMetadata}
     * @return a Uni emitting {@code message} after the publish call has returned
     */
    private Uni<Message<T>> publishMessage(Message<T> message) {
        return getJetStream().onItem().transformToUni(jetStream -> getMetadata(message)
                .onItem().transformToUni(metadata -> Uni.createFrom().item(Unchecked.supplier(() -> {
                    final String subject = metadata.subject();
                    final Headers headers = toJetStreamHeaders(metadata.headers());
                    final byte[] payload = metadata.payload();
                    final PublishOptions options = createPublishOptions(metadata.messageId(), metadata.stream());
                    return jetStream.publish(subject, headers, payload, options);
                })))
                .onItem().invoke(ack -> log.infof("Message published : %s", ack))
                .onItem().transform(ack -> message));
    }

    /**
     * Reads the {@link PublishMessageMetadata} attached to the message.
     *
     * @param message the message to inspect
     * @return a Uni emitting the metadata, or failing with a {@link RuntimeException}
     *         ("Metadata not found") when the message carries none
     */
    private Uni<PublishMessageMetadata> getMetadata(Message<T> message) {
        return Uni.createFrom()
                .item(() -> (PublishMessageMetadata) message.getMetadata(PublishMessageMetadata.class).orElse(null))
                .onItem().ifNull().failWith(() -> new RuntimeException("Metadata not found"));
    }

    /**
     * Lazily obtains the JetStream context from the underlying NATS connection.
     *
     * @return a Uni that calls {@code connection.jetStream()} on subscription; checked exceptions
     *         from that call are rethrown unchecked by {@link Unchecked#supplier}
     */
    private Uni<JetStream> getJetStream() {
        return Uni.createFrom().item(Unchecked.supplier(this.connection::jetStream));
    }

    /**
     * Produces a copy of the message whose metadata contains a freshly built
     * {@link PublishMessageMetadata}, replacing any such metadata already present.
     *
     * @param message the message to enrich
     * @param publishConfiguration publish settings used to build the metadata
     * @return a Uni emitting the enriched message
     */
    private Uni<Message<T>> addPublishMetadata(Message<T> message, PublishConfiguration publishConfiguration) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            final PublishMessageMetadata publishMetadata =
                    PublishMessageMetadata.of(message, publishConfiguration, this.payloadMapper.of(message.getPayload()));
            // Drop any stale publish metadata before attaching the new one.
            return message.withMetadata(message.getMetadata().without(PublishMessageMetadata.class).with(publishMetadata));
        }));
    }

    /**
     * Fetches messages from the given consumer and emits them, mapped, on the supplied context.
     *
     * <p>Messages are pulled from a {@link FetchConsumer} until it returns {@code null}, then the
     * stream completes. The fetch consumer is always closed, including when reading or emitting
     * fails. Any failure while fetching is reported as a {@link FetchException}.
     *
     * @param consumerContext the NATS consumer to fetch from
     * @param fetchConsumerConfiguration batch size, timeout and consumer settings
     * @param context the Vert.x context items are emitted on; must not be {@code null}
     * @return a Multi emitting the mapped messages
     */
    private Multi<Message<T>> fetchMessages(ConsumerContext consumerContext, FetchConsumerConfiguration<T> fetchConsumerConfiguration, Context context) {
        // NOTE(review): a new single-thread executor is created per call and never shut down here;
        // confirm whether JetstreamWorkerThread/executor lifecycle is handled elsewhere.
        final Multi<io.nats.client.Message> fetched = Multi.createFrom().<io.nats.client.Message>emitter(emitter -> {
            // try-with-resources guarantees the fetch consumer is closed on every path.
            try (FetchConsumer fetchConsumer = fetchConsumer(consumerContext, fetchConsumerConfiguration)) {
                io.nats.client.Message next;
                while ((next = fetchConsumer.nextMessage()) != null) {
                    emitter.emit(next);
                }
                emitter.complete();
            } catch (Throwable failure) {
                emitter.fail(new FetchException(failure));
            }
        }).runSubscriptionOn(Executors.newSingleThreadExecutor(JetstreamWorkerThread::new));
        return fetched
                .emitOn(context::runOnContext)
                .onItem().transformToUniAndMerge(natsMessage -> transformMessage(natsMessage, fetchConsumerConfiguration.consumerConfiguration(), context));
    }

    /**
     * Starts a fetch on the consumer for at most the configured batch size.
     *
     * <p>Without a configured timeout the fetch uses the {@code noWait} option; otherwise it
     * expires after the timeout (in milliseconds).
     *
     * @param consumerContext the NATS consumer to fetch from
     * @param fetchConsumerConfiguration batch size and optional timeout
     * @return the fetch consumer to read messages from
     * @throws IOException if the fetch call fails with an I/O error
     * @throws JetStreamApiException if the server rejects the request
     */
    private FetchConsumer fetchConsumer(ConsumerContext consumerContext, FetchConsumerConfiguration<T> fetchConsumerConfiguration) throws IOException, JetStreamApiException {
        if (fetchConsumerConfiguration.timeout() == null) {
            final FetchConsumeOptions noWaitOptions = FetchConsumeOptions.builder()
                    .maxMessages(fetchConsumerConfiguration.batchSize().intValue())
                    .noWait()
                    .build();
            return consumerContext.fetch(noWaitOptions);
        }
        final FetchConsumeOptions.Builder builder = FetchConsumeOptions.builder()
                .maxMessages(fetchConsumerConfiguration.batchSize().intValue());
        // expiresIn configures the builder in place; its return value is not needed.
        builder.expiresIn(fetchConsumerConfiguration.timeout().toMillis());
        return consumerContext.fetch(builder.build());
    }

    /**
     * Lazily maps a NATS message to a reactive-messaging message.
     *
     * @param message the NATS message to map
     * @param consumerConfiguration supplies the optional payload type ({@code null} when absent)
     * @param context the Vert.x context handed to the message mapper
     * @return a Uni emitting the mapped message
     */
    private Uni<Message<T>> transformMessage(io.nats.client.Message message, ConsumerConfiguration<T> consumerConfiguration, Context context) {
        return Uni.createFrom().item(
                Unchecked.supplier(() -> this.messageMapper.of(message, consumerConfiguration.payloadType().orElse(null), context)));
    }

    /**
     * Lazily creates or updates the consumer on its configured stream.
     *
     * @param consumerConfiguration the consumer to create or update
     * @return a Uni emitting the NATS consumer context; any failure is rethrown as a
     *         {@code SystemException} wrapping the cause
     */
    private Uni<ConsumerContext> addOrUpdateConsumer(ConsumerConfiguration<T> consumerConfiguration) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                final io.nats.client.StreamContext streamContext = this.connection.getStreamContext(consumerConfiguration.stream());
                final io.nats.client.api.ConsumerConfiguration natsConfiguration = new ConsumerConfigurationFactory().create(consumerConfiguration);
                return streamContext.createOrUpdateConsumer(natsConfiguration);
            } catch (Throwable failure) {
                throw new SystemException(failure);
            }
        }));
    }

    /**
     * Obtains the JetStream context and creates the pull subscription on it.
     *
     * @param pullConsumerConfiguration settings for the pull consumer
     * @return a Uni emitting the NATS subscription
     */
    private Uni<JetStreamSubscription> createSubscription(PullConsumerConfiguration<T> pullConsumerConfiguration) {
        return getJetStream()
                .onItem().transformToUni(jetStream -> createSubscription(jetStream, pullConsumerConfiguration));
    }

    /**
     * Subscribes on the given JetStream context, retrying once after deleting the consumer when the
     * first attempt fails with an {@link IllegalArgumentException}.
     *
     * <p>Any other failure is propagated unchanged.
     *
     * @param jetStream the JetStream context to subscribe on
     * @param pullConsumerConfiguration settings for the pull consumer
     * @return a Uni emitting the NATS subscription
     */
    private Uni<JetStreamSubscription> createSubscription(JetStream jetStream, PullConsumerConfiguration<T> pullConsumerConfiguration) {
        return subscribe(jetStream, pullConsumerConfiguration).onFailure().recoverWithUni(failure -> {
            if (!(failure instanceof IllegalArgumentException)) {
                return Uni.createFrom().failure(failure);
            }
            // Remove the existing consumer, then try the subscription again.
            return streamManagement()
                    .onItem().transformToUni(management -> management.deleteConsumer(
                            pullConsumerConfiguration.consumerConfiguration().stream(),
                            pullConsumerConfiguration.consumerConfiguration().name()))
                    .onItem().transformToUni(ignored -> subscribe(jetStream, pullConsumerConfiguration));
        });
    }

    /**
     * Lazily creates a pull subscription on the consumer's subject.
     *
     * @param jetStream the JetStream context to subscribe on
     * @param pullConsumerConfiguration supplies the subject and the pull subscribe options
     * @return a Uni emitting the NATS subscription, or failing with whatever the subscribe call threw
     */
    private Uni<JetStreamSubscription> subscribe(JetStream jetStream, PullConsumerConfiguration<T> pullConsumerConfiguration) {
        return Uni.createFrom().emitter(emitter -> {
            try {
                final String subject = pullConsumerConfiguration.consumerConfiguration().subject();
                final io.nats.client.PullSubscribeOptions options = new PullSubscribeOptionsFactory().create(pullConsumerConfiguration);
                emitter.complete(jetStream.subscribe(subject, options));
            } catch (Throwable failure) {
                emitter.fail(failure);
            }
        });
    }

    /**
     * Lazily creates a reader on the subscription using the configured batch size and re-pull
     * threshold.
     *
     * @param pullConsumerConfiguration supplies {@code batchSize} and {@code rePullAt}
     * @param jetStreamSubscription the subscription to read from
     * @return a Uni emitting the reader
     */
    private Uni<JetStreamReader> createReader(PullConsumerConfiguration<T> pullConsumerConfiguration, JetStreamSubscription jetStreamSubscription) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            final int batchSize = pullConsumerConfiguration.batchSize().intValue();
            final int rePullAt = pullConsumerConfiguration.rePullAt().intValue();
            return jetStreamSubscription.reader(batchSize, rePullAt);
        }));
    }

    /**
     * Returns the Vert.x context obtained from {@code vertx.getOrCreateContext()}.
     *
     * @return the context used to run this connection's operations
     */
    private Context context() {
        return vertx.getOrCreateContext();
    }

    /**
     * Opens the NATS connection using options built from the configuration, with an
     * {@code InternalConnectionListener} bound to this instance.
     *
     * @param connectionConfiguration connection settings
     * @param vertx Vert.x instance passed to the options factory
     * @return the established NATS connection
     * @throws ConnectionException wrapping any failure raised while building options or connecting
     */
    private io.nats.client.Connection connect(ConnectionConfiguration connectionConfiguration, Vertx vertx) throws ConnectionException {
        try {
            final io.nats.client.Options options =
                    new ConnectionOptionsFactory().create(connectionConfiguration, new InternalConnectionListener(this), vertx);
            return Nats.connect(options);
        } catch (Throwable failure) {
            throw new ConnectionException(failure);
        }
    }
}
