/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.reactive.messaging.nats.jetstream.processors.subscriber;

import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionEvent;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.Status;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.subscriber.MessageSubscriberConfiguration;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

/**
 * Message processor for an outgoing channel: messages received by the subscriber returned from
 * {@link #subscriber()} are published through a {@link MessageConnection} obtained from the
 * {@link ConnectionFactory}.
 *
 * <p>The processor registers itself as the {@link ConnectionListener} when it asks the factory for
 * a connection and records the most recent {@link ConnectionEvent} it receives as a {@link Status},
 * which is exposed through {@link #getStatus()}.
 *
 * <p>NOTE(review): the first argument of the {@code Status} constructor is presumably a
 * "healthy" flag; its exact meaning is defined outside this file.
 */
public class MessageSubscriberProcessor
implements MessageProcessor,
ConnectionListener {
    private static final Logger logger = Logger.getLogger(MessageSubscriberProcessor.class);
    private final ConnectionConfiguration connectionConfiguration;
    private final MessageSubscriberConfiguration configuration;
    private final ConnectionFactory connectionFactory;
    /** Last status derived from a connection event; starts out as "Connection closed". */
    private final AtomicReference<Status> status;
    /** Most recently established connection, or {@code null} until one has been created. */
    private final AtomicReference<MessageConnection> connection;

    /**
     * Creates a processor that has not yet established a connection.
     *
     * @param connectionConfiguration configuration handed to the factory when a connection is created
     * @param connectionFactory factory used to create the message connection on demand
     * @param configuration subscriber configuration; supplies the channel name and is passed to every publish call
     */
    public MessageSubscriberProcessor(ConnectionConfiguration connectionConfiguration, ConnectionFactory connectionFactory, MessageSubscriberConfiguration configuration) {
        this.connectionConfiguration = connectionConfiguration;
        this.connectionFactory = connectionFactory;
        this.configuration = configuration;
        this.status = new AtomicReference<Status>(new Status(true, "Connection closed", ConnectionEvent.Closed));
        this.connection = new AtomicReference<>();
    }

    /**
     * Builds the subscriber for this channel.
     *
     * <p>On subscription a connection is obtained (or reused); each incoming message is then
     * published one at a time, in order. Any failure in the stream is reported as a
     * {@link ConnectionEvent#CommunicationFailed} event.
     *
     * @return the subscriber that publishes every message it receives
     */
    public Flow.Subscriber<? extends Message<?>> subscriber() {
        return MultiUtils.via(m -> m.onSubscription().call(this::getOrEstablishConnection)
                .onItem().transformToUniAndConcatenate(this::publish)
                .onFailure().invoke(throwable -> this.reportCommunicationFailure(throwable)));
    }

    /**
     * @return the status recorded for the most recent connection event
     */
    @Override
    public Status getStatus() {
        return this.status.get();
    }

    /**
     * Closes the current connection, if there is one. Failures while closing are logged as
     * warnings and not propagated, so closing is best-effort.
     */
    @Override
    public void close() {
        try {
            MessageConnection current = this.connection.get();
            if (current != null) {
                current.close();
            }
        }
        catch (Throwable failure) {
            // The throwable is passed as the cause; the placeholder carries its message so the
            // format argument is no longer silently dropped.
            logger.warnf(failure, "Failed to close connection: %s", failure.getMessage());
        }
    }

    /**
     * @return the channel name taken from the subscriber configuration
     */
    @Override
    public String getChannel() {
        return this.configuration.getChannel();
    }

    /**
     * Records the given connection event as the current status.
     *
     * <p>{@code Connected}, {@code Closed} and {@code Reconnected} are stored with the flag set to
     * {@code true}; {@code Disconnected} and {@code CommunicationFailed} with {@code false}. Any
     * other event leaves the status untouched.
     *
     * @param event the connection event that occurred
     * @param message human-readable description attached to the event
     */
    @Override
    public void onEvent(ConnectionEvent event, String message) {
        switch (event) {
            case Connected:
            case Closed:
            case Reconnected: {
                this.status.set(new Status(true, message, event));
                break;
            }
            case Disconnected:
            case CommunicationFailed: {
                this.status.set(new Status(false, message, event));
                break;
            }
            default: {
                break;
            }
        }
    }

    /**
     * Reports a stream failure as a {@link ConnectionEvent#CommunicationFailed} event.
     *
     * <p>When a connection exists the event is fired through it. When no connection has been
     * stored yet (for example because establishing it is what failed), the event is recorded on
     * this listener directly instead of dereferencing {@code null}.
     */
    private void reportCommunicationFailure(Throwable throwable) {
        MessageConnection current = this.connection.get();
        if (current != null) {
            current.fireEvent(ConnectionEvent.CommunicationFailed, throwable.getMessage());
        } else {
            this.onEvent(ConnectionEvent.CommunicationFailed, throwable.getMessage());
        }
    }

    /**
     * Publishes a single message using the current (or a freshly established) connection.
     */
    private Uni<? extends Message<?>> publish(Message<?> message) {
        return this.getOrEstablishConnection().onItem().transformToUni(connection -> connection.publish(message, this.configuration));
    }

    /**
     * Returns the stored connection if it reports itself as connected; otherwise asks the factory
     * for a new one (registering this processor as listener) and stores the result.
     */
    private Uni<? extends MessageConnection> getOrEstablishConnection() {
        return Uni.createFrom().item(() -> Optional.ofNullable(this.connection.get()).filter(Connection::isConnected).orElse(null))
                .onItem().ifNull().switchTo(() -> this.connectionFactory.message(this.connectionConfiguration, this))
                .onItem().invoke(this.connection::set);
    }
}

