package io.quarkiverse.reactive.messaging.nats.jetstream.processors.publisher;

import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionEvent;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.JetStreamClient;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.io.JetStreamReader;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.io.MessageFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.Status;
import io.smallrye.mutiny.Multi;
import io.vertx.mutiny.core.Context;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePullPublisherProcessor.class */
/**
 * {@link MessagePublisherProcessor} that obtains messages through a {@link JetStreamReader}
 * and exposes them as a {@link Multi} of reactive-messaging {@link Message}s.
 *
 * <p>The processor tracks a {@link Status} that starts as "Not connected" and is updated from
 * connection events ({@link #onEvent(ConnectionEvent, String)}) and from stream failures.
 */
public class MessagePullPublisherProcessor implements MessagePublisherProcessor {
    private static final Logger logger = Logger.getLogger(MessagePullPublisherProcessor.class);

    private final MessagePullPublisherConfiguration<?> configuration;
    private final JetStreamClient jetStreamClient;
    /** Last known status; replaced atomically by event and failure handlers. */
    private final AtomicReference<Status> status = new AtomicReference<>(new Status(false, "Not connected", ConnectionEvent.Closed));
    private final MessageFactory messageFactory;
    /** Most recently created reader, kept so that {@link #close()} can close it. */
    private volatile JetStreamReader jetStreamReader;

    /**
     * Creates a processor; no connection is established until {@link #publisher()} is subscribed to.
     *
     * @param jetStreamClient client used to obtain (or establish) the connection
     * @param messagePullPublisherConfiguration channel, retry, back-off and payload settings
     * @param messageFactory factory converting NATS messages into reactive-messaging messages
     */
    public MessagePullPublisherProcessor(JetStreamClient jetStreamClient, MessagePullPublisherConfiguration<?> messagePullPublisherConfiguration, MessageFactory messageFactory) {
        this.configuration = messagePullPublisherConfiguration;
        this.jetStreamClient = jetStreamClient;
        this.messageFactory = messageFactory;
    }

    /**
     * @return the most recently recorded status of this processor
     */
    @Override
    public Status getStatus() {
        return this.status.get();
    }

    /**
     * Closes the current reader (if one was created) and then the client.
     *
     * <p>The client is closed even when closing the reader throws; the reader's exception still
     * propagates to the caller.
     */
    @Override
    public void close() {
        // Read the volatile field once so the null check and the call see the same instance.
        final JetStreamReader reader = this.jetStreamReader;
        try {
            if (reader != null) {
                reader.close();
            }
        } finally {
            this.jetStreamClient.close();
        }
    }

    /**
     * @return the channel name taken from the configuration
     */
    @Override
    public String getChannel() {
        return this.configuration.channel();
    }

    /**
     * Builds the message stream: obtains a connection, reads messages from it, and on failure
     * retries indefinitely using the configured retry back-off.
     *
     * <p>Failures for which {@code isConsumerAlreadyInUse} returns {@code true} are retried
     * without being logged or recorded in the status; all other failures are logged and stored
     * as a {@link ConnectionEvent#CommunicationFailed} status before the retry.
     *
     * @return the stream of messages for this channel
     */
    @Override
    public Multi<Message<?>> publisher() {
        return this.jetStreamClient.getOrEstablishConnection()
                .onItem().transformToMulti(this::publisher)
                .onFailure().invoke(failure -> {
                    // isConsumerAlreadyInUse is not declared in this class (presumably inherited
                    // from MessagePublisherProcessor); such failures are retried silently.
                    if (isConsumerAlreadyInUse(failure)) {
                        return;
                    }
                    logger.errorf(failure, "Failed to publish messages: %s", failure.getMessage());
                    this.status.set(new Status(false, failure.getMessage(), ConnectionEvent.CommunicationFailed));
                })
                .onFailure().retry().withBackOff(this.configuration.retryBackoff()).indefinitely();
    }

    /**
     * Records the given connection event as the current status. {@code Connected} and
     * {@code Reconnected} are stored as healthy; {@code Closed}, {@code Disconnected} and
     * {@code CommunicationFailed} as unhealthy. Any other event leaves the status unchanged.
     *
     * <p>Declared by {@code ConnectionListener}.
     *
     * @param connectionEvent the event that occurred
     * @param str the message accompanying the event
     */
    @Override
    public void onEvent(ConnectionEvent connectionEvent, String str) {
        switch (connectionEvent) {
            case Connected:
            case Reconnected:
                this.status.set(new Status(true, str, connectionEvent));
                break;
            case Closed:
            case Disconnected:
            case CommunicationFailed:
                this.status.set(new Status(false, str, connectionEvent));
                break;
            default:
                break;
        }
    }

    /**
     * Creates the message stream for one established connection.
     *
     * <p>A dedicated single-thread executor is created for the subscription, and items are
     * emitted through the connection's context. The executor is shut down when the stream
     * terminates (completion, failure or cancellation); otherwise every retry of
     * {@link #publisher()} would leave another worker thread behind.
     *
     * @param connection the connection to read from
     * @return stream of converted messages; reads yielding no message or no data are skipped
     */
    private Multi<Message<?>> publisher(Connection connection) {
        final boolean traceEnabled = this.configuration.traceEnabled();
        final Class<?> payloadType = this.configuration.payloadType().orElse(null);

        // Capture the reader locally so this pipeline always talks to its own reader, even if a
        // later resubscription replaces the field; the field is only needed by close().
        final JetStreamReader reader = JetStreamReader.of(connection, this.configuration);
        this.jetStreamReader = reader;

        // Created after the reader so a failing JetStreamReader.of does not leak an executor.
        final ExecutorService pullExecutor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new);

        return Multi.createBy().repeating()
                .supplier(() -> reader.nextMessage())
                .until(ignored -> !reader.isActive())
                .runSubscriptionOn(pullExecutor)
                .emitOn(runnable -> connection.context().runOnContext(runnable))
                // shutdown() (not shutdownNow()) lets an in-flight task finish without interruption.
                .onTermination().invoke(pullExecutor::shutdown)
                .flatMap(next -> createMulti((io.nats.client.Message) next.orElse(null), traceEnabled, payloadType, connection.context()));
    }

    /**
     * Converts a single NATS message into a stream of zero or one reactive-messaging message.
     *
     * @param message the NATS message, or {@code null} when nothing was read
     * @param z whether tracing is enabled (passed through to the message factory)
     * @param cls the expected payload type, or {@code null} when not configured
     * @param context the context passed through to the message factory
     * @return an empty stream when {@code message} or its data is {@code null}; otherwise a
     *         stream emitting the converted message, created lazily on subscription
     */
    private Multi<Message<?>> createMulti(io.nats.client.Message message, boolean z, Class<?> cls, Context context) {
        if (message == null || message.getData() == null) {
            return Multi.createFrom().empty();
        }
        return Multi.createFrom().item(() -> {
            // A fresh back-off instance is created for every message.
            final ExponentialBackoff backoff = new ExponentialBackoff(this.configuration.exponentialBackoff(), this.configuration.exponentialBackoffMaxDuration());
            return this.messageFactory.create(message, z, cls, context, backoff, this.configuration.ackTimeout());
        });
    }
}
