/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.reactive.messaging.nats.jetstream.processors.publisher;

import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionEvent;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Context;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Subscription;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.Tracer;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.TracerFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.Status;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.publisher.MessagePublisherConfiguration;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

/**
 * Base class for processors that turn a {@link Subscription} into a {@link Multi} of messages.
 *
 * <p>The processor lazily obtains a {@link Connection} from the {@link ConnectionFactory},
 * registers itself as the {@link ConnectionListener} of that connection and mirrors the
 * received {@link ConnectionEvent}s into the {@link Status} values returned by
 * {@link #readiness()} and {@link #liveness()}.
 *
 * <p>Subclasses supply the channel configuration and the actual subscription.
 *
 * @param <T> payload type of the published messages
 */
public abstract class MessagePublisherProcessor<T> implements MessageProcessor, ConnectionListener {
    private static final Logger logger = Logger.getLogger(MessagePublisherProcessor.class);

    /** Message of the initial readiness and liveness statuses. */
    private static final String INACTIVE_MESSAGE = "Publish processor inactive";

    private final AtomicReference<Status> readiness =
            new AtomicReference<>(newStatus(ConnectionEvent.Closed, INACTIVE_MESSAGE, false));
    private final AtomicReference<Status> liveness =
            new AtomicReference<>(newStatus(ConnectionEvent.Closed, INACTIVE_MESSAGE, false));
    private final AtomicReference<Connection> connection = new AtomicReference<>();
    private final ConnectionFactory connectionFactory;
    private final ConnectionConfiguration connectionConfiguration;
    private final AtomicReference<Subscription<T>> subscription = new AtomicReference<>();
    private final Tracer<T> tracer;
    private final Context context;

    /**
     * Creates an inactive processor; no connection is opened until the stream returned by
     * {@link #publisher()} is subscribed to.
     *
     * @param connectionFactory       factory used to create connections on demand
     * @param connectionConfiguration configuration handed to the factory for every new connection
     * @param tracerFactory           factory from which the tracer of this processor is created
     * @param context                 context wrapper in which the subscription is consumed
     */
    public MessagePublisherProcessor(ConnectionFactory connectionFactory, ConnectionConfiguration connectionConfiguration, TracerFactory tracerFactory, Context context) {
        this.connectionFactory = connectionFactory;
        this.connectionConfiguration = connectionConfiguration;
        this.tracer = tracerFactory.create();
        this.context = context;
    }

    /**
     * @return the channel name taken from {@link #configuration()}
     */
    @Override
    public String channel() {
        return this.configuration().channel();
    }

    /**
     * @return the most recently recorded readiness status
     */
    @Override
    public Status readiness() {
        return this.readiness.get();
    }

    /**
     * @return the most recently recorded liveness status
     */
    @Override
    public Status liveness() {
        return this.liveness.get();
    }

    /**
     * Releases the current subscription and connection, if any.
     *
     * <p>The subscription is closed before the connection it was created on, so that it is
     * not asked to shut down on top of an already closed connection. Failures while closing
     * are logged and do not propagate.
     */
    @Override
    public void close() {
        this.close(this.subscription.getAndSet(null));
        this.close(this.connection.getAndSet(null));
    }

    /**
     * Builds the message stream of this processor.
     *
     * <p>A failure of the stream is logged and answered with one re-subscription attempt
     * (see {@link #recover(Throwable)}); a failure of that second stream is not recovered here.
     *
     * @return the stream of messages received through the subscription
     */
    public Multi<Message<T>> publisher() {
        return this.subscribe()
                .onFailure().invoke(failure -> logger.errorf(failure, "Failed to subscribe with message: %s", failure.getMessage()))
                .onFailure().recoverWithMulti(this::recover);
    }

    /**
     * Records a connection event in the health statuses.
     *
     * <p>{@code Connected} marks readiness and liveness healthy. {@code Closed},
     * {@code CommunicationFailed} and {@code Disconnected} mark readiness unhealthy, and
     * {@code Reconnected} marks it healthy again; these four leave liveness untouched.
     * Any other event is only logged.
     *
     * @param event   the event reported for the connection
     * @param message the message accompanying the event
     */
    @Override
    public void onEvent(ConnectionEvent event, String message) {
        logger.infof("Event: %s, message: %s, channel: %s", event, message, this.configuration().channel());
        switch (event) {
            case Connected:
                this.readiness.set(newStatus(event, message, true));
                this.liveness.set(newStatus(event, message, true));
                break;
            case Closed:
            case CommunicationFailed:
            case Disconnected:
                this.readiness.set(newStatus(event, message, false));
                break;
            case Reconnected:
                this.readiness.set(newStatus(event, message, true));
                break;
            default:
                // Remaining events do not affect the health statuses.
                break;
        }
    }

    /**
     * @return the configuration of this publisher; its channel is used by {@link #channel()}
     *         and in log messages
     */
    protected abstract MessagePublisherConfiguration configuration();

    /**
     * Creates the subscription that {@link #publisher()} consumes.
     *
     * @param var1 the connection to create the subscription on
     * @return a {@link Uni} emitting the subscription
     */
    protected abstract Uni<Subscription<T>> subscription(Connection var1);

    /**
     * Closes the current subscription and subscribes again.
     *
     * @param failure the failure that ended the previous stream (not inspected)
     * @return a fresh message stream
     */
    private Multi<Message<T>> recover(Throwable failure) {
        return Uni.createFrom().item(() -> {
            this.close(this.subscription.getAndSet(null));
            return null;
        }).onItem().transformToMulti(ignored -> this.subscribe());
    }

    /**
     * Obtains a connection, creates the subscription on it, remembers the subscription and
     * consumes it through the context wrapper.
     */
    private Multi<Message<T>> subscribe() {
        return this.getOrEstablishConnection()
                .onItem().transformToUni(this::subscription)
                .onItem().invoke(this.subscription::set)
                // NOTE(review): the raw cast is kept from the original because the signature of
                // Context.withContext is not visible here; replace it with a typed result once confirmed.
                .onItem().transformToMulti(active -> (Flow.Publisher) this.context.withContext(ctx -> active.subscribe(this.tracer, ctx)))
                .onSubscription().invoke(() -> logger.infof("Subscribed to channel %s", this.configuration().channel()));
    }

    /**
     * Emits the held connection when it is still connected, otherwise a newly created one,
     * and stores the emitted connection as the current one.
     */
    private Uni<Connection> getOrEstablishConnection() {
        return Uni.createFrom().item(() -> this.reusableConnection())
                .onItem().ifNull().switchTo(this::connect)
                .onItem().invoke(this.connection::set);
    }

    /**
     * Returns the held connection if it reports being connected.
     *
     * <p>A held connection that is no longer connected is detached and closed before
     * {@code null} is returned, so that it is not silently dropped when the replacement
     * connection is stored.
     *
     * @return the connected connection, or {@code null} when a new one has to be created
     */
    private Connection reusableConnection() {
        final Connection current = this.connection.get();
        if (current == null || current.isConnected()) {
            return current;
        }
        // Only the caller that wins the compare-and-set closes the stale connection.
        if (this.connection.compareAndSet(current, null)) {
            this.close(current);
        }
        return null;
    }

    /**
     * Creates a new connection with this processor registered as its listener.
     */
    private Uni<? extends Connection> connect() {
        return this.connectionFactory.create(this.connectionConfiguration, this);
    }

    /**
     * Closes the given resource on a best-effort basis; {@code null} is ignored and any
     * failure is logged as a warning instead of being propagated.
     */
    private void close(AutoCloseable closeable) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        }
        catch (Throwable failure) {
            logger.warnf(failure, "Failed to close resource with message: %s", failure.getMessage());
        }
    }

    /**
     * Builds a status from its three components.
     */
    private static Status newStatus(ConnectionEvent event, String message, boolean healthy) {
        return Status.builder().event(event).message(message).healthy(healthy).build();
    }
}

