package io.es4j.infrastructure.pgbroker.messagebroker;

import io.es4j.infrastructure.pgbroker.models.ConsumerManager;
import io.es4j.infrastructure.pgbroker.models.DeadLetterKey;
import io.es4j.infrastructure.pgbroker.models.DeadLetterRecord;
import io.es4j.infrastructure.pgbroker.models.MessagePartition;
import io.es4j.infrastructure.pgbroker.models.MessageRecord;
import io.es4j.infrastructure.pgbroker.models.MessageRecordID;
import io.es4j.infrastructure.pgbroker.models.MessageRecordQuery;
import io.es4j.infrastructure.pgbroker.models.PartitionKey;
import io.es4j.infrastructure.pgbroker.models.PartitionQuery;
import io.es4j.sql.Repository;
import io.es4j.sql.misc.EnvVars;
import io.es4j.task.TimerTaskDeployer;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.pgclient.pubsub.PgSubscriber;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/infrastructure/pgbroker/messagebroker/PgChannel.class */
/**
 * Connects a {@link PgSubscriber} notification channel to a {@link SessionManager}.
 *
 * <p>{@link #start(ConsumerManager)} populates the partition hash ring, creates and starts the
 * session manager, registers the channel handlers and connects the subscriber. Every
 * notification payload received on the channel is forwarded to
 * {@code SessionManager.signal(String)}. {@link #stop()} closes the session manager (if one was
 * created) and then the subscriber.
 */
public class PgChannel {
    private static final Logger LOGGER = LoggerFactory.getLogger(PgChannel.class);

    /**
     * Flag consulted by {@link #start(ConsumerManager)} to decide whether the session refresh
     * timers still need to be registered. This class only reads it.
     */
    public static final AtomicBoolean REFRESHER_DEPLOYED = new AtomicBoolean(false);

    private final Repository<MessageRecordID, MessageRecord, MessageRecordQuery> messageQueue;
    private final Repository<DeadLetterKey, DeadLetterRecord, MessageRecordQuery> deadLetterQueue;
    private final PgSubscriber pgSubscriber;
    private final Repository<PartitionKey, MessagePartition, PartitionQuery> partitionRepository;
    private final String verticleId;

    // Both fields are assigned by start(); they stay null until then.
    private TimerTaskDeployer timerTasks;
    private SessionManager sessionManager;

    /**
     * Creates a channel; nothing is started or connected until {@link #start(ConsumerManager)}.
     *
     * @param messageQueue        repository holding the queued messages; also the source of the
     *                            Vert.x instance and of the configuration used for the channel name
     * @param deadLetterQueue     repository holding dead-letter records
     * @param partitionRepository repository holding the message partitions
     * @param pgSubscriber        subscriber used to listen on the notification channel
     * @param verticleId          identifier handed to the {@link SessionManager}
     */
    public PgChannel(Repository<MessageRecordID, MessageRecord, MessageRecordQuery> messageQueue, Repository<DeadLetterKey, DeadLetterRecord, MessageRecordQuery> deadLetterQueue, Repository<PartitionKey, MessagePartition, PartitionQuery> partitionRepository, PgSubscriber pgSubscriber, String verticleId) {
        this.messageQueue = messageQueue;
        this.deadLetterQueue = deadLetterQueue;
        this.pgSubscriber = pgSubscriber;
        this.partitionRepository = partitionRepository;
        this.verticleId = verticleId;
    }

    /**
     * Closes the session manager, then the subscriber.
     *
     * <p>If no session manager exists (for example {@link #start(ConsumerManager)} was never
     * called, or it failed before creating one) only the subscriber is closed, instead of
     * failing with a {@link NullPointerException}.
     *
     * @return a {@link Uni} that completes once the subscriber has been closed
     */
    public Uni<Void> stop() {
        final SessionManager manager = this.sessionManager;
        if (manager == null) {
            // Nothing to shut down on the session side; still release the subscriber.
            return this.pgSubscriber.close();
        }
        return manager.close().flatMap(ignored -> this.pgSubscriber.close());
    }

    /**
     * Starts message consumption.
     *
     * <p>Steps, in order: create the {@link TimerTaskDeployer}; register the refresh timers when
     * {@link #REFRESHER_DEPLOYED} is {@code false}; populate the partition hash ring; create and
     * start the {@link SessionManager}; register the channel handlers; connect the subscriber.
     *
     * @param consumerManager consumer configuration passed to the refresher and the session manager
     * @return a {@link Uni} that completes when the subscriber has connected
     */
    public Uni<Void> start(ConsumerManager consumerManager) {
        this.timerTasks = new TimerTaskDeployer(this.messageQueue.repositoryHandler().vertx());
        // NOTE(review): the flag is only read here and the check is not atomic with the
        // registration below; presumably SessionRefresher flips it — confirm.
        if (!REFRESHER_DEPLOYED.get()) {
            SessionRefresher.refreshTimers(consumerManager, this.timerTasks, this.messageQueue);
        }
        return PartitionHashRing.populateHashRing(this.partitionRepository).flatMap(ignored -> {
            this.sessionManager = new SessionManager(this.verticleId, consumerManager, this.messageQueue, this.deadLetterQueue, this.partitionRepository, this.timerTasks);
            this.sessionManager.start();
            this.pgSubscriber.channel(parseChannel())
                .handler(partition -> {
                    LOGGER.info("Incoming message for partition {}", partition);
                    // The payload is used as the partition to signal; an empty one is rejected.
                    if (partition.isEmpty()) {
                        throw new IllegalArgumentException("partition not present in channel message");
                    }
                    this.sessionManager.signal(partition);
                })
                .endHandler(() -> LOGGER.info("channel stopped"))
                .subscribeHandler(() -> LOGGER.info("channel started"))
                .exceptionHandler(error -> LOGGER.error("channel error", error));
            return this.pgSubscriber.connect();
        });
    }

    /**
     * Builds the channel name: the configured {@code "schema"} value (falling back to
     * {@link EnvVars#SCHEMA}) followed by {@code "-queue-channel"}.
     */
    private String parseChannel() {
        final String schema = this.messageQueue.repositoryHandler().configuration().getString("schema", EnvVars.SCHEMA);
        return schema + "-queue-channel";
    }
}
