/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.reactive.messaging.nats.jetstream.processors.publisher;

import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionEvent;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageSubscribeConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.Status;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.publisher.MessagePublisherProcessor;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.publisher.MessagePullPublisherConfiguration;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

public class MessagePullPublisherProcessor
implements MessagePublisherProcessor {
    private static final Logger logger = Logger.getLogger(MessagePullPublisherProcessor.class);
    private final MessagePullPublisherConfiguration<?> configuration;
    private final ConnectionFactory connectionFactory;
    private final AtomicReference<Status> status;
    private final AtomicReference<MessageSubscribeConnection> connection;
    private final ConnectionConfiguration connectionConfiguration;

    public MessagePullPublisherProcessor(ConnectionFactory connectionFactory, ConnectionConfiguration connectionConfiguration, MessagePullPublisherConfiguration<?> configuration) {
        this.configuration = configuration;
        this.connectionFactory = connectionFactory;
        this.status = new AtomicReference<Status>(new Status(false, "Not connected", ConnectionEvent.Closed));
        this.connection = new AtomicReference();
        this.connectionConfiguration = connectionConfiguration;
    }

    @Override
    public Status getStatus() {
        return this.status.get();
    }

    @Override
    public void close() {
        try {
            MessageSubscribeConnection connection = this.connection.get();
            if (connection != null) {
                connection.close();
            }
        }
        catch (Throwable failure) {
            logger.warnf(failure, "Failed to close connection", (Object)failure);
        }
    }

    @Override
    public String getChannel() {
        return this.configuration.channel();
    }

    @Override
    public Multi<Message<?>> publisher() {
        return this.getOrEstablishConnection().onItem().transformToMulti(MessageSubscribeConnection::subscribe).onFailure().invoke(throwable -> {
            if (!this.isConsumerAlreadyInUse((Throwable)throwable)) {
                logger.errorf(throwable, "Failed to publish messages: %s", (Object)throwable.getMessage());
                this.status.set(new Status(false, throwable.getMessage(), ConnectionEvent.CommunicationFailed));
            }
        }).onFailure().retry().withBackOff(this.configuration.retryBackoff()).indefinitely();
    }

    @Override
    public void onEvent(ConnectionEvent event, String message) {
        switch (event) {
            case Connected: {
                this.status.set(new Status(true, message, event));
                break;
            }
            case Closed: {
                this.status.set(new Status(false, message, event));
                break;
            }
            case Disconnected: {
                this.status.set(new Status(false, message, event));
                break;
            }
            case Reconnected: {
                this.status.set(new Status(true, message, event));
                break;
            }
            case CommunicationFailed: {
                this.status.set(new Status(false, message, event));
            }
        }
    }

    private Uni<MessageSubscribeConnection> getOrEstablishConnection() {
        return Uni.createFrom().item(() -> Optional.ofNullable(this.connection.get()).filter(Connection::isConnected).orElse(null)).onItem().ifNull().switchTo(() -> this.connectionFactory.subscribe(this.connectionConfiguration, this, this.configuration)).onItem().invoke(this.connection::set);
    }
}

