/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.reactive.messaging.nats.jetstream.client;

import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ContextException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageSubscribeConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.administration.AdministrationConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushSubscribeOptionsFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ReaderConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.message.MessageFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.vertx.PushSubscribeMessageConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.vertx.ReaderMessageSubscribeConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrumenter;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.unchecked.Unchecked;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.Optional;
import org.jboss.logging.Logger;

@ApplicationScoped
public class ConnectionFactory {
    private static final Logger logger = Logger.getLogger(ConnectionFactory.class);
    private final ExecutionHolder executionHolder;
    private final MessageFactory messageFactory;
    private final JetStreamInstrumenter instrumenter;

    @Inject
    public ConnectionFactory(ExecutionHolder executionHolder, MessageFactory messageFactory, JetStreamInstrumenter instrumenter) {
        this.executionHolder = executionHolder;
        this.messageFactory = messageFactory;
        this.instrumenter = instrumenter;
    }

    public <T> Uni<? extends MessageSubscribeConnection> subscribe(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener, ReaderConsumerConfiguration<T> consumerConfiguration) {
        return this.getContext().onFailure().invoke(failure -> logger.warn((Object)failure.getMessage(), failure)).onItem().transformToUni(context -> this.subscribe(connectionConfiguration, connectionListener, consumerConfiguration, (Context)context));
    }

    public <T> Uni<? extends MessageSubscribeConnection> subscribe(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener, PushConsumerConfiguration<T> consumerConfiguration, PushSubscribeOptionsFactory optionsFactory) {
        return this.getContext().onFailure().invoke(failure -> logger.warn((Object)failure.getMessage(), failure)).onItem().transformToUni(context -> this.subscribe(connectionConfiguration, connectionListener, consumerConfiguration, optionsFactory, (Context)context));
    }

    public Uni<? extends io.quarkiverse.reactive.messaging.nats.jetstream.client.AdministrationConnection> administration(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener) {
        return Uni.createFrom().item(Unchecked.supplier(() -> new AdministrationConnection(connectionConfiguration, connectionListener)));
    }

    public Uni<? extends MessageConnection> message(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener) {
        return this.getContext().onFailure().invoke(failure -> logger.warn((Object)failure.getMessage(), failure)).onItem().transformToUni(context -> this.message(connectionConfiguration, connectionListener, (Context)context));
    }

    private Optional<Vertx> getVertx() {
        return Optional.ofNullable(this.executionHolder.vertx());
    }

    private <T> Uni<? extends MessageSubscribeConnection> subscribe(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener, ReaderConsumerConfiguration<T> consumerConfiguration, Context context) {
        return Uni.createFrom().item(Unchecked.supplier(() -> new ReaderMessageSubscribeConnection(connectionConfiguration, connectionListener, context, this.instrumenter, consumerConfiguration, this.messageFactory))).emitOn(arg_0 -> ((Context)context).runOnContext(arg_0));
    }

    private <T> Uni<? extends MessageSubscribeConnection> subscribe(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener, PushConsumerConfiguration<T> consumerConfiguration, PushSubscribeOptionsFactory optionsFactory, Context context) {
        return Uni.createFrom().item(Unchecked.supplier(() -> new PushSubscribeMessageConnection(connectionConfiguration, connectionListener, context, this.instrumenter, consumerConfiguration, this.messageFactory, optionsFactory))).emitOn(arg_0 -> ((Context)context).runOnContext(arg_0));
    }

    private Uni<Context> getContext() {
        return Uni.createFrom().item(Unchecked.supplier(() -> this.getVertx().map(Vertx::getOrCreateContext).orElseThrow(() -> new ContextException("No Vertx available"))));
    }

    private Uni<? extends MessageConnection> message(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener, Context context) {
        return Uni.createFrom().item(Unchecked.supplier(() -> new io.quarkiverse.reactive.messaging.nats.jetstream.client.vertx.MessageConnection(connectionConfiguration, connectionListener, this.messageFactory, context, this.instrumenter))).emitOn(arg_0 -> ((Context)context).runOnContext(arg_0));
    }
}

