package io.quarkiverse.reactive.messaging.nats.jetstream.processors.publisher;

import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionEvent;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageSubscribeConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.Status;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniOnItem;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePullPublisherProcessor.class */
public class MessagePullPublisherProcessor implements MessagePublisherProcessor {
    private static final Logger logger = Logger.getLogger(MessagePullPublisherProcessor.class);
    private final MessagePullPublisherConfiguration<?> configuration;
    private final ConnectionFactory connectionFactory;
    private final AtomicReference<Status> status = new AtomicReference<>(new Status(false, "Not connected", ConnectionEvent.Closed));
    private final AtomicReference<MessageSubscribeConnection> connection = new AtomicReference<>();
    private final ConnectionConfiguration connectionConfiguration;

    public MessagePullPublisherProcessor(ConnectionFactory connectionFactory, ConnectionConfiguration connectionConfiguration, MessagePullPublisherConfiguration<?> messagePullPublisherConfiguration) {
        this.configuration = messagePullPublisherConfiguration;
        this.connectionFactory = connectionFactory;
        this.connectionConfiguration = connectionConfiguration;
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor
    public Status getStatus() {
        return this.status.get();
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor
    public void close() {
        try {
            MessageSubscribeConnection messageSubscribeConnection = this.connection.get();
            if (messageSubscribeConnection != null) {
                messageSubscribeConnection.close();
            }
        } catch (Throwable th) {
            logger.warnf(th, "Failed to close connection", th);
        }
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor
    public String getChannel() {
        return this.configuration.channel();
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.processors.publisher.MessagePublisherProcessor
    public Multi<Message<?>> publisher() {
        return getOrEstablishConnection().onItem().transformToMulti((v0) -> {
            return v0.subscribe();
        }).onFailure().invoke(th -> {
            if (isConsumerAlreadyInUse(th)) {
                return;
            }
            logger.errorf(th, "Failed to publish messages: %s", th.getMessage());
            this.status.set(new Status(false, th.getMessage(), ConnectionEvent.CommunicationFailed));
        }).onFailure().retry().withBackOff(this.configuration.retryBackoff()).indefinitely();
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener
    public void onEvent(ConnectionEvent connectionEvent, String str) {
        switch (connectionEvent) {
            case Connected:
                this.status.set(new Status(true, str, connectionEvent));
                return;
            case Closed:
                this.status.set(new Status(false, str, connectionEvent));
                return;
            case Disconnected:
                this.status.set(new Status(false, str, connectionEvent));
                return;
            case Reconnected:
                this.status.set(new Status(true, str, connectionEvent));
                return;
            case CommunicationFailed:
                this.status.set(new Status(false, str, connectionEvent));
                return;
            default:
                return;
        }
    }

    private Uni<MessageSubscribeConnection> getOrEstablishConnection() {
        UniOnItem onItem = Uni.createFrom().item(() -> {
            return (MessageSubscribeConnection) Optional.ofNullable(this.connection.get()).filter((v0) -> {
                return v0.isConnected();
            }).orElse(null);
        }).onItem().ifNull().switchTo(() -> {
            return this.connectionFactory.subscribe(this.connectionConfiguration, this, this.configuration);
        }).onItem();
        AtomicReference<MessageSubscribeConnection> atomicReference = this.connection;
        Objects.requireNonNull(atomicReference);
        return onItem.invoke((v1) -> {
            r1.set(v1);
        });
    }
}
