package io.es4j.infrastructure.pgbroker.messagebroker;

import io.es4j.infrastructure.pgbroker.exceptions.PartitionTakenException;
import io.es4j.infrastructure.pgbroker.models.ConsumerManager;
import io.es4j.infrastructure.pgbroker.models.DeadLetterKey;
import io.es4j.infrastructure.pgbroker.models.DeadLetterRecord;
import io.es4j.infrastructure.pgbroker.models.MessagePartition;
import io.es4j.infrastructure.pgbroker.models.MessageRecord;
import io.es4j.infrastructure.pgbroker.models.MessageRecordID;
import io.es4j.infrastructure.pgbroker.models.MessageRecordQuery;
import io.es4j.infrastructure.pgbroker.models.PartitionKey;
import io.es4j.infrastructure.pgbroker.models.PartitionQuery;
import io.es4j.sql.Repository;
import io.es4j.task.LockLevel;
import io.es4j.task.TimerTask;
import io.es4j.task.TimerTaskConfiguration;
import io.es4j.task.TimerTaskDeployer;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/infrastructure/pgbroker/messagebroker/SessionManager.class */
public class SessionManager {
    private final ConcurrentPollingSession concurrentSession;
    private final Repository<MessageRecordID, MessageRecord, MessageRecordQuery> messageQueue;
    private final Repository<PartitionKey, MessagePartition, PartitionQuery> partitionRepository;
    private final String verticleId;
    private final Map<String, Instant> lastAttempt = new HashMap();
    private final ConsumerManager consumerManager;
    private final MessageProcessor messageProcessor;
    private TimerTaskDeployer timerTasks;
    private static final Map<String, PartitionPollingSession> partitionedSessions = new HashMap();
    private static final Logger LOGGER = LoggerFactory.getLogger(SessionManager.class);

    public Uni<Void> close() {
        this.timerTasks.close();
        return !partitionedSessions.isEmpty() ? Multi.createFrom().iterable(partitionedSessions.values()).onItem().transformToUniAndMerge((v0) -> {
            return v0.close();
        }).collect().asList().replaceWithVoid() : Uni.createFrom().voidItem();
    }

    public SessionManager(String str, ConsumerManager consumerManager, Repository<MessageRecordID, MessageRecord, MessageRecordQuery> repository, Repository<DeadLetterKey, DeadLetterRecord, MessageRecordQuery> repository2, Repository<PartitionKey, MessagePartition, PartitionQuery> repository3, TimerTaskDeployer timerTaskDeployer) {
        this.consumerManager = consumerManager;
        this.concurrentSession = new ConcurrentPollingSession(repository, consumerManager.pgBrokerConfiguration(), str);
        this.messageQueue = repository;
        this.timerTasks = timerTaskDeployer;
        this.partitionRepository = repository3;
        this.verticleId = str;
        this.messageProcessor = new MessageProcessor(consumerManager, repository, repository2);
    }

    public void start() {
        this.timerTasks.deploy(this.concurrentSession.provideTask());
        this.timerTasks.deploy(processorTask());
    }

    private TimerTask processorTask() {
        return new TimerTask() { // from class: io.es4j.infrastructure.pgbroker.messagebroker.SessionManager.1
            public Uni<Void> performTask() {
                ArrayList arrayList = new ArrayList();
                if (!SessionManager.partitionedSessions.isEmpty()) {
                    arrayList.add(Multi.createFrom().iterable(SessionManager.partitionedSessions.entrySet()).onItem().transformToUniAndMerge(entry -> {
                        List<MessageRecord> pollMessages = ((PartitionPollingSession) entry.getValue()).pollMessages();
                        if (pollMessages.isEmpty()) {
                            return Uni.createFrom().voidItem();
                        }
                        SessionManager.LOGGER.info("Processing {} messages in  partition {}", Integer.valueOf(pollMessages.size()), entry.getKey());
                        return SessionManager.this.messageProcessor.processPartitions(pollMessages);
                    }).collect().asList().replaceWithVoid());
                }
                List<MessageRecord> pollMessages = SessionManager.this.concurrentSession.pollMessages();
                if (!pollMessages.isEmpty()) {
                    SessionManager.LOGGER.info("Processing {} messages", Integer.valueOf(pollMessages.size()));
                    arrayList.add(SessionManager.this.messageProcessor.processConcurrent(pollMessages));
                }
                return !arrayList.isEmpty() ? Uni.join().all(arrayList).andCollectFailures().onFailure().invoke(th -> {
                    SessionManager.LOGGER.warn("Consumer dropped exception", th);
                }).onFailure().recoverWithNull().replaceWithVoid() : Uni.createFrom().voidItem();
            }

            public TimerTaskConfiguration configuration() {
                return new TimerTaskConfiguration(LockLevel.NONE, Duration.ofMillis(10L), Duration.ofMillis(10L), Duration.ofMillis(10L), Optional.empty());
            }
        };
    }

    public boolean contains(String str) {
        return partitionedSessions.containsKey(str);
    }

    public void signal(String str) {
        if (str.equals("none")) {
            this.concurrentSession.signalMessage();
        } else if (!contains(str)) {
            startPartitionSession(str);
        } else {
            this.lastAttempt.remove(str);
            ((PartitionPollingSession) Objects.requireNonNull(partitionedSessions.get(str))).signalMessage();
        }
    }

    private void startPartitionSession(String str) {
        if (!this.lastAttempt.containsKey(str)) {
            this.lastAttempt.put(str, Instant.now());
            startSession(str);
            return;
        }
        Instant instant = this.lastAttempt.get(str);
        LOGGER.debug("Partition was previously taken by another verticle, last claim attempt {}", instant);
        if (Instant.now().isAfter(instant.plus(5L, (TemporalUnit) ChronoUnit.MINUTES))) {
            this.lastAttempt.put(str, Instant.now());
            startSession(str);
        }
    }

    private void startSession(String str) {
        LOGGER.debug("Trying to claim partition {}", str);
        PartitionPollingSession partitionPollingSession = new PartitionPollingSession(this.partitionRepository, this.messageQueue, this.consumerManager.pgBrokerConfiguration(), this.verticleId, str);
        partitionPollingSession.start(this.timerTasks).subscribe().with(r6 -> {
            LOGGER.error("subscribed to partition {}", str);
            partitionedSessions.put(str, partitionPollingSession);
        }, th -> {
            if (th instanceof PartitionTakenException) {
                LOGGER.info("partition already taken {}", str);
            } else {
                LOGGER.error("partition claiming dropped exception", th);
            }
        });
    }
}
