/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.reactive.messaging.nats.jetstream.processors.publisher;

import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionEvent;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.Status;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.publisher.MessagePublisherConfiguration;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Generated;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

public abstract class MessagePublisherProcessor<T>
implements MessageProcessor,
ConnectionListener {
    @Generated
    private static final Logger log = Logger.getLogger(MessagePublisherProcessor.class);
    private final AtomicReference<Status> readiness = new AtomicReference<Status>(Status.builder().event(ConnectionEvent.Closed).message("Publish processor inactive").healthy(false).build());
    private final AtomicReference<Status> liveness = new AtomicReference<Status>(Status.builder().event(ConnectionEvent.Closed).message("Publish processor inactive").healthy(false).build());
    private final AtomicReference<Connection<T>> connection = new AtomicReference();
    private final ConnectionFactory connectionFactory;
    private final ConnectionConfiguration connectionConfiguration;

    public MessagePublisherProcessor(ConnectionFactory connectionFactory, ConnectionConfiguration connectionConfiguration) {
        this.connectionFactory = connectionFactory;
        this.connectionConfiguration = connectionConfiguration;
    }

    @Override
    public String channel() {
        return this.configuration().channel();
    }

    @Override
    public Status readiness() {
        return this.readiness.get();
    }

    @Override
    public Status liveness() {
        return this.liveness.get();
    }

    @Override
    public void close() {
        this.close(this.connection.getAndSet(null));
    }

    public Multi<Message<T>> publisher() {
        return this.subscribe().onFailure().invoke(failure -> log.errorf(failure, "Failed to subscribe with message: %s", (Object)failure.getMessage())).onFailure().retry().withBackOff(this.retryBackoff()).indefinitely();
    }

    @Override
    public void onEvent(ConnectionEvent event, String message) {
        log.infof("Event: %s, message: %s, channel: %s", (Object)event, (Object)message, (Object)this.configuration().channel());
        switch (event) {
            case Connected: {
                this.readiness.set(Status.builder().event(event).message(message).healthy(true).build());
                this.liveness.set(Status.builder().event(event).message(message).healthy(true).build());
                break;
            }
            case Closed: 
            case CommunicationFailed: 
            case Disconnected: {
                this.readiness.set(Status.builder().event(event).message(message).healthy(false).build());
                break;
            }
            case Reconnected: {
                this.readiness.set(Status.builder().event(event).message(message).healthy(true).build());
            }
        }
    }

    protected abstract MessagePublisherConfiguration configuration();

    protected abstract Multi<Message<T>> subscription(Connection<T> var1);

    protected abstract Duration retryBackoff();

    private Multi<Message<T>> recover(Throwable failure) {
        log.errorf(failure, "Failed to subscribe with message: %s", (Object)failure.getMessage());
        return this.subscribe();
    }

    private Multi<Message<T>> subscribe() {
        return this.getOrEstablishConnection().onItem().transformToMulti(this::subscription).onSubscription().invoke(() -> log.infof("Subscribed to channel %s", (Object)this.configuration().channel()));
    }

    private Uni<Connection<T>> getOrEstablishConnection() {
        return Uni.createFrom().item(() -> Optional.ofNullable(this.connection.get()).filter(Connection::isConnected).orElse(null)).onItem().ifNull().switchTo(this::connect).onItem().invoke(this.connection::set);
    }

    private Uni<Connection<T>> connect() {
        return this.connectionFactory.create(this.connectionConfiguration, this);
    }

    private void close(Connection<T> connection) {
        try {
            if (connection != null) {
                connection.close();
            }
        }
        catch (Exception failure) {
            log.warnf((Throwable)failure, "Failed to close resource with message: %s", (Object)failure.getMessage());
        }
    }
}

