package io.es4j.infrastructure.pgbroker;

import io.es4j.infrastructure.misc.Es4jServiceLoader;
import io.es4j.infrastructure.pgbroker.core.PgChannel;
import io.es4j.infrastructure.pgbroker.mappers.BrokerPartitionMapper;
import io.es4j.infrastructure.pgbroker.mappers.ConsumerFailureMapper;
import io.es4j.infrastructure.pgbroker.mappers.MessageMapper;
import io.es4j.infrastructure.pgbroker.mappers.MessageTransactionMapper;
import io.es4j.infrastructure.pgbroker.models.BrokerConfiguration;
import io.es4j.infrastructure.pgbroker.models.ConsumerRouter;
import io.es4j.infrastructure.pgbroker.models.QueueConsumerWrapper;
import io.es4j.infrastructure.pgbroker.models.TopicSubscriberWrapper;
import io.es4j.sql.Repository;
import io.es4j.sql.RepositoryHandler;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import io.vertx.mutiny.pgclient.pubsub.PgSubscriber;
import java.util.List;
import java.util.ServiceLoader;
import java.util.UUID;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/infrastructure/pgbroker/PgBrokerVerticle.class */
public class PgBrokerVerticle extends AbstractVerticle {
    private final BrokerConfiguration configuration;
    private PgChannel pgChannel;
    private RepositoryHandler repositoryHandler;
    private static final Logger LOGGER = LoggerFactory.getLogger(PgBrokerVerticle.class);
    private final String deploymentID = UUID.randomUUID().toString();

    public PgBrokerVerticle(BrokerConfiguration brokerConfiguration) {
        this.configuration = brokerConfiguration;
    }

    public Uni<Void> asyncStart() {
        this.repositoryHandler = RepositoryHandler.leasePool(config(), this.vertx);
        this.pgChannel = createChannel();
        LOGGER.info("starting pg broker {}", deploymentID());
        return deploy();
    }

    public Uni<Void> asyncStop() {
        LOGGER.info("stopping pg broker {}", deploymentID());
        return this.pgChannel.stop().flatMap(r3 -> {
            return this.repositoryHandler.close();
        }).replaceWithVoid();
    }

    public String deploymentID() {
        return this.deploymentID;
    }

    public Uni<Void> deploy() {
        try {
            List<TopicSubscription> list = ServiceLoader.load(TopicSubscription.class).stream().map((v0) -> {
                return v0.get();
            }).toList();
            List list2 = ServiceLoader.load(QueueConsumer.class).stream().map((v0) -> {
                return v0.get();
            }).toList();
            ConsumerTransactionProvider consumerTransactionProvider = this.configuration.consumerTransactionProvider();
            consumerTransactionProvider.start(this.repositoryHandler);
            return startTopicConsumers(list).flatMap(r5 -> {
                return startQueueConsumers(list2);
            }).flatMap(r13 -> {
                return this.pgChannel.start(new ConsumerRouter(this.configuration, wrapTopic(list), wrapQueue(list2), consumerTransactionProvider, this.repositoryHandler.vertx()));
            });
        } catch (Exception e) {
            return Uni.createFrom().failure(e);
        }
    }

    private PgChannel createChannel() {
        PgSubscriber subscriber = PgSubscriber.subscriber(this.repositoryHandler.vertx(), RepositoryHandler.connectionOptions(this.repositoryHandler.configuration()));
        subscriber.reconnectPolicy(num -> {
            return 0L;
        });
        return new PgChannel(new Repository(MessageTransactionMapper.INSTANCE, this.repositoryHandler), new Repository(MessageMapper.INSTANCE, this.repositoryHandler), new Repository(ConsumerFailureMapper.INSTANCE, this.repositoryHandler), new Repository(BrokerPartitionMapper.INSTANCE, this.repositoryHandler), subscriber, deploymentID());
    }

    private List<QueueConsumerWrapper> wrapQueue(List<QueueConsumer> list) {
        return list.stream().map(queueConsumer -> {
            return new QueueConsumerWrapper(queueConsumer, Es4jServiceLoader.getFirstGenericType(queueConsumer));
        }).toList();
    }

    public List<TopicSubscriberWrapper> wrapTopic(List<TopicSubscription> list) {
        return list.stream().map(topicSubscription -> {
            return new TopicSubscriberWrapper(topicSubscription, Es4jServiceLoader.getFirstGenericType(topicSubscription), Pattern.compile(topicSubscription.address()));
        }).toList();
    }

    private Uni<Void> startQueueConsumers(List<QueueConsumer> list) {
        return Multi.createFrom().iterable(list).onItem().transformToUniAndMerge(queueConsumer -> {
            return queueConsumer.start(this.vertx, config());
        }).collect().asList().replaceWithVoid();
    }

    private Uni<Void> startTopicConsumers(List<TopicSubscription> list) {
        return Multi.createFrom().iterable(list).onItem().transformToUniAndMerge(topicSubscription -> {
            return topicSubscription.start(this.vertx, config());
        }).collect().asList().replaceWithVoid();
    }
}
