package io.quarkiverse.reactive.messaging.nats.jetstream.processors.subscriber;

import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionEvent;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.Status;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniOnItem;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/processors/subscriber/MessageSubscriberProcessor.class */
public class MessageSubscriberProcessor implements MessageProcessor, ConnectionListener {
    private static final Logger logger = Logger.getLogger(MessageSubscriberProcessor.class);
    private final ConnectionConfiguration connectionConfiguration;
    private final MessageSubscriberConfiguration configuration;
    private final ConnectionFactory connectionFactory;
    private final AtomicReference<Status> status = new AtomicReference<>(new Status(true, "Subscriber processor inactive", ConnectionEvent.Closed));
    private final AtomicReference<Connection> connection = new AtomicReference<>();

    public MessageSubscriberProcessor(ConnectionConfiguration connectionConfiguration, ConnectionFactory connectionFactory, MessageSubscriberConfiguration messageSubscriberConfiguration) {
        this.connectionConfiguration = connectionConfiguration;
        this.connectionFactory = connectionFactory;
        this.configuration = messageSubscriberConfiguration;
    }

    public <T> Flow.Subscriber<Message<T>> subscriber() {
        return MultiUtils.via(this::subscribe);
    }

    private <T> Multi<Message<T>> subscribe(Multi<Message<T>> multi) {
        return multi.onItem().transformToUniAndConcatenate(this::publish);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor
    public String channel() {
        return this.configuration.channel();
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor
    public Status readiness() {
        return this.status.get();
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor
    public Status liveness() {
        return this.status.get();
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor
    public AtomicReference<? extends Connection> connection() {
        return this.connection;
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener
    public void onEvent(ConnectionEvent connectionEvent, String str) {
        logger.infof("Event: %s, message: %s, channel: %s", connectionEvent, str, this.configuration.channel());
        this.status.set(Status.builder().healthy(true).message(str).event(connectionEvent).build());
    }

    private <T> Uni<Message<T>> publish(Message<T> message) {
        return getOrEstablishConnection().onItem().transformToUni(connection -> {
            return connection.publish(message, this.configuration);
        }).onFailure().invoke(th -> {
            logger.errorf(th, "Failed to publish with message: %s", th.getMessage());
        }).onFailure().recoverWithUni(() -> {
            return recover(message);
        });
    }

    private <T> Uni<Message<T>> recover(Message<T> message) {
        return close().onItem().transformToUni(r5 -> {
            return publish(message);
        });
    }

    private Uni<? extends Connection> getOrEstablishConnection() {
        UniOnItem onItem = Uni.createFrom().item(() -> {
            return (Connection) Optional.ofNullable(this.connection.get()).filter((v0) -> {
                return v0.isConnected();
            }).orElse(null);
        }).onItem().ifNull().switchTo(() -> {
            return this.connectionFactory.create(this.connectionConfiguration, this);
        }).onItem();
        AtomicReference<Connection> atomicReference = this.connection;
        Objects.requireNonNull(atomicReference);
        return onItem.invoke((v1) -> {
            r1.set(v1);
        });
    }
}
