package io.es4j.infrastructure.pgbroker;

import io.es4j.infrastructure.misc.Es4jServiceLoader;
import io.es4j.infrastructure.pgbroker.mappers.DeadLetterMapper;
import io.es4j.infrastructure.pgbroker.mappers.MessageQueueMapper;
import io.es4j.infrastructure.pgbroker.mappers.QueuePartitionMapper;
import io.es4j.infrastructure.pgbroker.messagebroker.PgChannel;
import io.es4j.infrastructure.pgbroker.models.ConsumerManager;
import io.es4j.infrastructure.pgbroker.models.ConsumerWrap;
import io.es4j.infrastructure.pgbroker.models.PgBrokerConfiguration;
import io.es4j.sql.Repository;
import io.es4j.sql.RepositoryHandler;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import io.vertx.mutiny.pgclient.pubsub.PgSubscriber;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verticle that runs the Postgres-backed message broker.
 *
 * <p>On start it leases a {@link RepositoryHandler} pool, builds a {@link PgChannel} on top of it,
 * discovers every {@link QueueConsumer} registered through {@link ServiceLoader}, starts them and
 * finally starts the channel with a {@link ConsumerManager} holding the wrapped consumers.
 * On stop it stops the channel and closes the repository handler.
 */
public class PgBrokerVerticle extends AbstractVerticle {
    private static final Logger LOGGER = LoggerFactory.getLogger(PgBrokerVerticle.class);

    private final PgBrokerConfiguration configuration;
    /** Identifier generated once per verticle instance; returned by {@link #deploymentID()}. */
    private final String deploymentID = UUID.randomUUID().toString();
    private PgChannel pgChannel;
    private RepositoryHandler repositoryHandler;

    /**
     * @param pgBrokerConfiguration broker configuration; supplies the consumer transaction provider
     *                              and is handed to the {@link ConsumerManager}
     */
    public PgBrokerVerticle(PgBrokerConfiguration pgBrokerConfiguration) {
        this.configuration = pgBrokerConfiguration;
    }

    /**
     * Leases the repository pool, creates the channel and deploys the consumers.
     *
     * @return a {@code Uni} completing once {@link #deploy()} has completed
     */
    public Uni<Void> asyncStart() {
        this.repositoryHandler = RepositoryHandler.leasePool(config(), this.vertx);
        this.pgChannel = createChannel();
        LOGGER.info("starting pg broker {}", deploymentID());
        return deploy();
    }

    /**
     * Stops the channel and then closes the repository handler.
     *
     * <p>The handler is closed when the channel stop terminates, whether it succeeded or failed, so
     * the leased pool is not leaked if the channel fails to stop. Either resource may be
     * {@code null} if {@link #asyncStart()} failed part-way; missing resources are skipped.
     *
     * @return a {@code Uni} completing once both resources have been released
     */
    public Uni<Void> asyncStop() {
        final PgChannel channel = this.pgChannel;
        final RepositoryHandler handler = this.repositoryHandler;
        final Uni<Void> channelStopped = channel == null
            ? Uni.createFrom().voidItem()
            : channel.stop().replaceWithVoid();
        if (handler == null) {
            return channelStopped;
        }
        return channelStopped.onTermination().call(() -> handler.close());
    }

    /**
     * @return the random identifier generated for this verticle instance
     */
    public String deploymentID() {
        return this.deploymentID;
    }

    /**
     * Loads all {@link QueueConsumer} services, starts the consumer transaction provider, starts
     * every consumer and then starts the channel with a {@link ConsumerManager}.
     *
     * <p>Exceptions thrown synchronously while assembling the pipeline are converted into a failed
     * {@code Uni} instead of propagating to the caller.
     *
     * @return a {@code Uni} completing when the channel has been started
     */
    public Uni<Void> deploy() {
        try {
            final List<QueueConsumer> consumers = ServiceLoader.load(QueueConsumer.class).stream()
                .map(ServiceLoader.Provider::get)
                .toList();
            final ConsumerTransactionProvider transactionProvider = this.configuration.consumerTransactionProvider();
            transactionProvider.start(this.repositoryHandler);
            return Multi.createFrom().iterable(consumers)
                .onItem().transformToUniAndMerge(consumer -> consumer.start(this.vertx, config()))
                .collect().asList()
                .replaceWithVoid()
                // Consumers are wrapped (and validated) only after all of them have started.
                .flatMap(ignored -> this.pgChannel.start(new ConsumerManager(
                    this.configuration,
                    wrap(this.deploymentID, consumers),
                    transactionProvider,
                    this.repositoryHandler.vertx())));
        } catch (Exception e) {
            return Uni.createFrom().failure(e);
        }
    }

    /**
     * Builds the {@link PgChannel} with its three repositories and a {@link PgSubscriber} that
     * shares the repository handler's Vert.x instance and connection options.
     */
    private PgChannel createChannel() {
        final PgSubscriber subscriber = PgSubscriber.subscriber(
            this.repositoryHandler.vertx(),
            RepositoryHandler.connectionOptions(this.repositoryHandler.configuration()));
        // The policy always answers 0 regardless of the retry count; per the PgSubscriber API a
        // value of 0 means "retry immediately".
        subscriber.reconnectPolicy(retries -> 0L);
        // NOTE(review): the channel is identified by the Vert.x context deployment id (random UUID
        // when absent), not by this verticle's own deploymentID field - confirm this is intended.
        final String channelDeploymentId = Objects.requireNonNullElse(
            this.repositoryHandler.vertx().getOrCreateContext().deploymentID(),
            UUID.randomUUID().toString());
        return new PgChannel(
            new Repository(MessageQueueMapper.INSTANCE, this.repositoryHandler),
            new Repository(DeadLetterMapper.INSTANCE, this.repositoryHandler),
            new Repository(QueuePartitionMapper.INSTANCE, this.repositoryHandler),
            subscriber,
            channelDeploymentId);
    }

    /**
     * Groups the consumers by the type returned from
     * {@code Es4jServiceLoader.getFirstGenericType} and builds one {@link ConsumerWrap} per group.
     *
     * <p>Within a group, the consumer whose {@code tenants()} is {@code null} becomes the default
     * consumer; the remaining ones are grouped by their {@code tenants()} value.
     *
     * @param str  deployment identifier passed to every {@link ConsumerWrap}
     * @param list consumers to wrap
     * @return one wrap per distinct payload type
     * @throws IllegalStateException  if a payload type has more than one default consumer, or more
     *                                than one consumer for the same tenants value
     * @throws NoSuchElementException if a payload type has no default consumer
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // ConsumerWrap is constructed as a raw type, as in the original code.
    public List<ConsumerWrap> wrap(String str, List<QueueConsumer> list) {
        final var consumersByPayloadType = list.stream()
            .collect(Collectors.groupingBy(consumer -> Es4jServiceLoader.getFirstGenericType(consumer)));
        return consumersByPayloadType.entrySet().stream()
            .map(entry -> {
                final var payloadType = entry.getKey();
                final var consumers = entry.getValue();
                validateProcessors(consumers, payloadType);
                final QueueConsumer defaultConsumer = consumers.stream()
                    .filter(consumer -> consumer.tenants() == null)
                    .findFirst()
                    .orElseThrow(() -> new NoSuchElementException(
                        "No default implementation for -> " + payloadType));
                final var tenantConsumers = consumers.stream()
                    .filter(consumer -> consumer.tenants() != null)
                    .collect(Collectors.groupingBy(QueueConsumer::tenants));
                return new ConsumerWrap(str, defaultConsumer, tenantConsumers, payloadType);
            })
            .toList();
    }

    /**
     * Rejects ambiguous consumer registrations for a single payload type.
     *
     * @param list consumers registered for {@code cls}
     * @param cls  payload type, used only in the error messages
     * @throws IllegalStateException if more than one consumer has {@code tenants() == null}, or if
     *                               more than one consumer shares the same non-null tenants value
     */
    private static void validateProcessors(List<QueueConsumer> list, Class<?> cls) {
        final long defaultCount = list.stream()
            .filter(consumer -> consumer.tenants() == null)
            .count();
        if (defaultCount > 1) {
            throw new IllegalStateException("More than one default implementation for -> " + cls);
        }
        list.stream()
            .filter(consumer -> consumer.tenants() != null)
            .collect(Collectors.groupingBy(QueueConsumer::tenants))
            .forEach((tenants, implementations) -> {
                if (implementations.size() > 1) {
                    throw new IllegalStateException(
                        "More than one custom implementation for tenant " + tenants + " queue -> " + cls);
                }
            });
    }
}
