package io.es4j.infrastructure.pgbroker.core;

import io.es4j.infrastructure.pgbroker.models.BrokerPartitionRecord;
import io.es4j.infrastructure.pgbroker.models.ConsumerFailureKey;
import io.es4j.infrastructure.pgbroker.models.ConsumerFailureQuery;
import io.es4j.infrastructure.pgbroker.models.ConsumerFailureRecord;
import io.es4j.infrastructure.pgbroker.models.ConsumerRouter;
import io.es4j.infrastructure.pgbroker.models.ConsumerTransactionKey;
import io.es4j.infrastructure.pgbroker.models.ConsumerTransactionQuery;
import io.es4j.infrastructure.pgbroker.models.ConsumerTransactionRecord;
import io.es4j.infrastructure.pgbroker.models.MessageRecord;
import io.es4j.infrastructure.pgbroker.models.MessageRecordKey;
import io.es4j.infrastructure.pgbroker.models.MessageRecordQuery;
import io.es4j.infrastructure.pgbroker.models.PartitionKey;
import io.es4j.infrastructure.pgbroker.models.PartitionQuery;
import io.es4j.sql.Repository;
import io.es4j.sql.misc.EnvVars;
import io.es4j.task.TimerTaskDeployer;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.pgclient.pubsub.PgSubscriber;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/infrastructure/pgbroker/core/PgChannel.class */
/**
 * Connects a {@link PgSubscriber} notification channel to a {@link SessionManager}.
 *
 * <p>{@link #start(ConsumerRouter)} populates the partition hash ring, creates and starts a
 * {@code SessionManager}, registers handlers on the channel named by {@link #parseChannel()} and
 * connects the subscriber. Every notification payload received on that channel is forwarded to
 * {@code SessionManager.signal(String)}. {@link #stop()} closes the session manager (when one was
 * created) and then the subscriber.
 */
public class PgChannel {
    private static final Logger LOGGER = LoggerFactory.getLogger(PgChannel.class);

    /**
     * Flag read by {@link #start(ConsumerRouter)} to decide whether
     * {@code SessionRefresher.refreshTimers} is invoked. This class only reads it.
     * NOTE(review): presumably flipped to {@code true} elsewhere (e.g. inside SessionRefresher) —
     * confirm, otherwise every call to {@code start} invokes {@code refreshTimers} again.
     */
    public static final AtomicBoolean REFRESHER_DEPLOYED = new AtomicBoolean(false);

    private final Repository<MessageRecordKey, MessageRecord, MessageRecordQuery> messageQueue;
    private final Repository<ConsumerFailureKey, ConsumerFailureRecord, ConsumerFailureQuery> consumerFailure;
    private final PgSubscriber pgSubscriber;
    private final Repository<PartitionKey, BrokerPartitionRecord, PartitionQuery> partitionRepository;
    private final Repository<ConsumerTransactionKey, ConsumerTransactionRecord, ConsumerTransactionQuery> messageTx;
    private final String verticleId;

    /** Created on every call to {@link #start(ConsumerRouter)}; {@code null} before that. */
    private TimerTaskDeployer timerTasks;

    /** Assigned once the hash ring has been populated in {@link #start(ConsumerRouter)}; {@code null} before that. */
    private SessionManager sessionManager;

    /**
     * Stores the collaborators; performs no I/O.
     *
     * @param repository   consumer transaction repository (kept as {@code messageTx})
     * @param repository2  message record repository (kept as {@code messageQueue})
     * @param repository3  consumer failure repository (kept as {@code consumerFailure})
     * @param repository4  broker partition repository (kept as {@code partitionRepository})
     * @param pgSubscriber subscriber used to listen on the notification channel
     * @param str          verticle identifier handed to the {@code SessionManager}
     */
    public PgChannel(Repository<ConsumerTransactionKey, ConsumerTransactionRecord, ConsumerTransactionQuery> repository, Repository<MessageRecordKey, MessageRecord, MessageRecordQuery> repository2, Repository<ConsumerFailureKey, ConsumerFailureRecord, ConsumerFailureQuery> repository3, Repository<PartitionKey, BrokerPartitionRecord, PartitionQuery> repository4, PgSubscriber pgSubscriber, String str) {
        this.messageTx = repository;
        this.messageQueue = repository2;
        this.consumerFailure = repository3;
        this.pgSubscriber = pgSubscriber;
        this.partitionRepository = repository4;
        this.verticleId = str;
    }

    /**
     * Closes the session manager, if one was created, and then the subscriber.
     *
     * <p>When {@link #start(ConsumerRouter)} was never called, or failed before the session manager
     * was assigned, only the subscriber is closed instead of failing with a
     * {@link NullPointerException}.
     *
     * @return a {@code Uni} produced by closing the subscriber, chained after the session manager
     *         close when applicable
     */
    public Uni<Void> stop() {
        final SessionManager manager = this.sessionManager;
        if (manager == null) {
            // Nothing was started on the session side; still release the subscriber.
            return this.pgSubscriber.close();
        }
        return manager.close().flatMap(ignored -> this.pgSubscriber.close());
    }

    /**
     * Starts listening for partition notifications.
     *
     * <p>Steps, in order: create a {@link TimerTaskDeployer} from the message queue's Vert.x
     * instance; call {@code SessionRefresher.refreshTimers} when {@link #REFRESHER_DEPLOYED} is
     * {@code false}; populate the partition hash ring; then create and start the
     * {@link SessionManager}, register the channel handlers and connect the subscriber.
     *
     * @param consumerRouter router passed to the session refresher and to the session manager
     * @return the {@code Uni} returned by {@code PgSubscriber.connect()}, chained after the hash
     *         ring population
     */
    public Uni<Void> start(ConsumerRouter consumerRouter) {
        this.timerTasks = new TimerTaskDeployer(this.messageQueue.repositoryHandler().vertx());
        if (!REFRESHER_DEPLOYED.get()) {
            SessionRefresher.refreshTimers(consumerRouter, this.timerTasks, this.messageQueue, this.messageTx);
        }
        return PartitionHashRing.populateHashRing(this.partitionRepository).flatMap(ignored -> {
            this.sessionManager = new SessionManager(this.verticleId, consumerRouter, this.messageQueue, this.consumerFailure, this.partitionRepository, this.timerTasks);
            this.sessionManager.start();
            this.pgSubscriber.channel(parseChannel())
                .handler(this::onNotification)
                .endHandler(() -> LOGGER.info("channel stopped"))
                .subscribeHandler(() -> LOGGER.info("channel started"))
                .exceptionHandler(error -> LOGGER.error("channel error", error));
            return this.pgSubscriber.connect();
        });
    }

    /**
     * Handles one notification payload by logging it and signalling the session manager.
     *
     * @param partition payload received on the channel
     * @throws IllegalArgumentException when the payload is {@code null} or empty
     */
    private void onNotification(String partition) {
        LOGGER.info("Incoming message for partition {}", partition);
        if (partition == null || partition.isEmpty()) {
            throw new IllegalArgumentException("partition not present in channel message");
        }
        this.sessionManager.signal(partition);
    }

    /**
     * Builds the channel name: the {@code "schema"} configuration string followed by
     * {@code "-message-broker-channel"}. {@code EnvVars.SCHEMA} is passed as the second argument of
     * {@code getString} (presumably the fallback when the key is absent — confirm).
     *
     * @return the notification channel name
     */
    private String parseChannel() {
        final String schema = this.messageQueue.repositoryHandler().configuration().getString("schema", EnvVars.SCHEMA);
        return schema + "-message-broker-channel";
    }
}
