package io.quarkiverse.reactive.messaging.nats.jetstream.processors.publisher;

import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionEvent;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.Status;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniOnItem;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Generated;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessor.class */
public abstract class MessagePublisherProcessor<T> implements MessageProcessor, ConnectionListener {

    @Generated
    private static final Logger log = Logger.getLogger(MessagePublisherProcessor.class);
    private final AtomicReference<Status> readiness = new AtomicReference<>(Status.builder().event(ConnectionEvent.Closed).message("Publish processor inactive").healthy(false).build());
    private final AtomicReference<Status> liveness = new AtomicReference<>(Status.builder().event(ConnectionEvent.Closed).message("Publish processor inactive").healthy(false).build());
    private final AtomicReference<Connection<T>> connection = new AtomicReference<>();
    private final ConnectionFactory connectionFactory;
    private final ConnectionConfiguration connectionConfiguration;

    public MessagePublisherProcessor(ConnectionFactory connectionFactory, ConnectionConfiguration connectionConfiguration) {
        this.connectionFactory = connectionFactory;
        this.connectionConfiguration = connectionConfiguration;
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor
    public String channel() {
        return configuration().channel();
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor
    public Status readiness() {
        return this.readiness.get();
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor
    public Status liveness() {
        return this.liveness.get();
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        close(this.connection.getAndSet(null));
    }

    public Multi<Message<T>> publisher() {
        return subscribe().onFailure().invoke(th -> {
            log.errorf(th, "Failed to subscribe with message: %s", th.getMessage());
        }).onFailure().retry().withBackOff(retryBackoff()).indefinitely();
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener
    public void onEvent(ConnectionEvent connectionEvent, String str) {
        log.infof("Event: %s, message: %s, channel: %s", connectionEvent, str, configuration().channel());
        switch (connectionEvent) {
            case Connected:
                this.readiness.set(Status.builder().event(connectionEvent).message(str).healthy(true).build());
                this.liveness.set(Status.builder().event(connectionEvent).message(str).healthy(true).build());
                return;
            case Closed:
            case CommunicationFailed:
            case Disconnected:
                this.readiness.set(Status.builder().event(connectionEvent).message(str).healthy(false).build());
                return;
            case Reconnected:
                this.readiness.set(Status.builder().event(connectionEvent).message(str).healthy(true).build());
                return;
            default:
                return;
        }
    }

    protected abstract MessagePublisherConfiguration configuration();

    protected abstract Multi<Message<T>> subscription(Connection<T> connection);

    protected abstract Duration retryBackoff();

    private Multi<Message<T>> recover(Throwable th) {
        log.errorf(th, "Failed to subscribe with message: %s", th.getMessage());
        return subscribe();
    }

    private Multi<Message<T>> subscribe() {
        return getOrEstablishConnection().onItem().transformToMulti(this::subscription).onSubscription().invoke(() -> {
            log.infof("Subscribed to channel %s", configuration().channel());
        });
    }

    private Uni<Connection<T>> getOrEstablishConnection() {
        UniOnItem onItem = Uni.createFrom().item(() -> {
            return (Connection) Optional.ofNullable(this.connection.get()).filter((v0) -> {
                return v0.isConnected();
            }).orElse(null);
        }).onItem().ifNull().switchTo(this::connect).onItem();
        AtomicReference<Connection<T>> atomicReference = this.connection;
        Objects.requireNonNull(atomicReference);
        return onItem.invoke((v1) -> {
            r1.set(v1);
        });
    }

    private Uni<Connection<T>> connect() {
        return this.connectionFactory.create(this.connectionConfiguration, this);
    }

    private void close(Connection<T> connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
                log.warnf(e, "Failed to close resource with message: %s", e.getMessage());
            }
        }
    }
}
