/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.reactive.messaging.nats.jetstream.processors.subscriber;

import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionEvent;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.Status;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.subscriber.MessageSubscriberConfiguration;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

public class MessageSubscriberProcessor
implements MessageProcessor,
ConnectionListener {
    private static final Logger logger = Logger.getLogger(MessageSubscriberProcessor.class);
    private final ConnectionConfiguration connectionConfiguration;
    private final MessageSubscriberConfiguration configuration;
    private final ConnectionFactory connectionFactory;
    private final AtomicReference<Status> status;
    private final AtomicReference<Connection> connection;

    public MessageSubscriberProcessor(ConnectionConfiguration connectionConfiguration, ConnectionFactory connectionFactory, MessageSubscriberConfiguration configuration) {
        this.connectionConfiguration = connectionConfiguration;
        this.connectionFactory = connectionFactory;
        this.configuration = configuration;
        this.status = new AtomicReference<Status>(new Status(true, "Subscriber processor inactive", ConnectionEvent.Closed));
        this.connection = new AtomicReference();
    }

    public <T> Flow.Subscriber<Message<T>> subscriber() {
        return MultiUtils.via(this::subscribe);
    }

    private <T> Multi<Message<T>> subscribe(Multi<Message<T>> subscription) {
        return subscription.onItem().transformToUniAndConcatenate(this::publish);
    }

    @Override
    public String channel() {
        return this.configuration.channel();
    }

    @Override
    public Status readiness() {
        return this.status.get();
    }

    @Override
    public Status liveness() {
        return this.status.get();
    }

    @Override
    public void close() {
        try {
            Connection connection = this.connection.getAndSet(null);
            if (connection != null) {
                connection.close();
            }
        }
        catch (Throwable failure) {
            logger.warnf(failure, "Failed to close connection with message: %s", (Object)failure.getMessage());
        }
    }

    @Override
    public void onEvent(ConnectionEvent event, String message) {
        logger.infof("Event: %s, message: %s, channel: %s", (Object)event, (Object)message, (Object)this.configuration.channel());
        this.status.set(Status.builder().healthy(true).message(message).event(event).build());
    }

    private <T> Uni<Message<T>> publish(Message<T> message) {
        return this.getOrEstablishConnection().onItem().transformToUni(connection -> connection.publish(message, this.configuration)).onFailure().invoke(failure -> logger.errorf(failure, "Failed to publish with message: %s", (Object)failure.getMessage())).onFailure().recoverWithUni(() -> this.recover(message));
    }

    private <T> Uni<Message<T>> recover(Message<T> message) {
        return Uni.createFrom().item(() -> {
            this.close();
            return null;
        }).onItem().transformToUni(v -> this.publish(message));
    }

    private Uni<? extends Connection> getOrEstablishConnection() {
        return Uni.createFrom().item(() -> Optional.ofNullable(this.connection.get()).filter(Connection::isConnected).orElse(null)).onItem().ifNull().switchTo(() -> this.connectionFactory.create(this.connectionConfiguration, this)).onItem().invoke(this.connection::set);
    }
}

