package io.es4j.infrastructure.pgbroker.core;

import io.es4j.infrastructure.pgbroker.models.ConsumerRouter;
import io.es4j.infrastructure.pgbroker.models.ConsumerTransactionKey;
import io.es4j.infrastructure.pgbroker.models.ConsumerTransactionQuery;
import io.es4j.infrastructure.pgbroker.models.ConsumerTransactionRecord;
import io.es4j.infrastructure.pgbroker.models.MessageRecord;
import io.es4j.infrastructure.pgbroker.models.MessageRecordKey;
import io.es4j.infrastructure.pgbroker.models.MessageRecordQuery;
import io.es4j.sql.Repository;
import io.es4j.sql.exceptions.NotFound;
import io.es4j.task.LockLevel;
import io.es4j.task.TimerTask;
import io.es4j.task.TimerTaskConfiguration;
import io.es4j.task.TimerTaskDeployer;
import io.smallrye.mutiny.Uni;
import java.time.Duration;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/infrastructure/pgbroker/core/SessionRefresher.class */
/**
 * Deploys the recurring maintenance tasks that run SQL against the
 * {@code message_broker} and {@code message_broker_tx} tables named in the
 * statement templates below.
 *
 * <p>Every task built here follows the same pattern: format one of the statement
 * templates with a value read from {@code consumerRouter.brokerConfiguration()} at
 * the moment the task runs, hand the SQL to a {@link Repository}, turn a
 * {@link NotFound} failure into a {@code null} item, log any remaining failure and
 * discard the query result.
 */
public class SessionRefresher {
    private static final Logger LOGGER = LoggerFactory.getLogger(SessionRefresher.class);

    /**
     * Deletes {@code message_broker_tx} rows whose {@code inserted} timestamp is at
     * least the given number of days in the past; {@code %s} receives the day count.
     */
    public static final String TX_PURGE_STATEMENT = "delete from message_broker_tx where inserted <= current_timestamp - interval '%s days'";

    /**
     * Deletes {@code message_broker} rows in state {@code CONSUMED} whose
     * {@code inserted} timestamp is at least the given number of days in the past;
     * {@code %s} receives the day count.
     */
    public static final String MESSAGE_PURGE_STATEMENT = "delete from message_broker where state = 'CONSUMED' and inserted <= current_timestamp - interval '%s days'";

    /**
     * Moves {@code message_broker} rows from state {@code CONSUMING} to {@code STUCK}
     * (incrementing {@code rec_version}) once {@code updated} plus the given number of
     * seconds is no later than the current timestamp; {@code %s} receives the seconds.
     */
    public static final String STUCK_MESSAGES_STATEMENT = "update message_broker set rec_version = rec_version + 1, state = 'STUCK'  where state = 'CONSUMING' and updated + interval '%s seconds' <= current_timestamp;";

    /**
     * Builds the three maintenance tasks and passes each one to
     * {@code timerTaskDeployer.deploy(...)}, in this order: consumed-message purge,
     * stuck-message recovery, consumer-transaction purge.
     *
     * @param consumerRouter    source of the broker configuration the tasks read when they run
     * @param timerTaskDeployer deployer that receives the tasks
     * @param repository        repository used to run the two {@code message_broker} statements
     * @param repository2       repository used to run the {@code message_broker_tx} statement
     */
    public static void refreshTimers(ConsumerRouter consumerRouter, TimerTaskDeployer timerTaskDeployer, Repository<MessageRecordKey, MessageRecord, MessageRecordQuery> repository, Repository<ConsumerTransactionKey, ConsumerTransactionRecord, ConsumerTransactionQuery> repository2) {
        final TimerTask consumedMessagePurge = queueDurability(consumerRouter, repository);
        timerTaskDeployer.deploy(consumedMessagePurge);

        final TimerTask stuckMessageRecovery = recoverStuckMessagesTask(consumerRouter, repository);
        timerTaskDeployer.deploy(stuckMessageRecovery);

        final TimerTask transactionPurge = messagesTxPurge(consumerRouter, repository2);
        timerTaskDeployer.deploy(transactionPurge);
    }

    /**
     * Creates the task that runs {@link #MESSAGE_PURGE_STATEMENT} with the configured
     * {@code messageDurability()} expressed in whole days.
     *
     * <p>NOTE(review): {@link Duration#toDays()} truncates, so a durability shorter
     * than one day is formatted as {@code 0 days}.
     *
     * @param router   provides {@code brokerConfiguration().messageDurability()}
     * @param messages repository the statement is executed through
     * @return a task configured with 15-minute durations
     */
    private static TimerTask queueDurability(final ConsumerRouter router, final Repository<MessageRecordKey, MessageRecord, MessageRecordQuery> messages) {
        return new TimerTask() {
            public Uni<Void> performTask() {
                // Configuration is read on every run, not once at task creation.
                final long retentionDays = router.brokerConfiguration().messageDurability().toDays();
                final String sql = MESSAGE_PURGE_STATEMENT.formatted(retentionDays);
                return ignoreNotFoundAndLog(messages.query(sql), "Error purging messages");
            }

            public TimerTaskConfiguration configuration() {
                return clusterWideConfiguration(Duration.ofMinutes(15L));
            }
        };
    }

    /**
     * Creates the task that runs {@link #TX_PURGE_STATEMENT} with the configured
     * {@code consumerTxDurability()} expressed in whole days.
     *
     * <p>NOTE(review): {@link Duration#toDays()} truncates, so a durability shorter
     * than one day is formatted as {@code 0 days}.
     *
     * @param router       provides {@code brokerConfiguration().consumerTxDurability()}
     * @param transactions repository the statement is executed through
     * @return a task configured with 4-hour durations
     */
    private static TimerTask messagesTxPurge(final ConsumerRouter router, final Repository<ConsumerTransactionKey, ConsumerTransactionRecord, ConsumerTransactionQuery> transactions) {
        return new TimerTask() {
            public Uni<Void> performTask() {
                // Configuration is read on every run, not once at task creation.
                final long retentionDays = router.brokerConfiguration().consumerTxDurability().toDays();
                final String sql = TX_PURGE_STATEMENT.formatted(retentionDays);
                return ignoreNotFoundAndLog(transactions.query(sql), "Error purging broker transactions");
            }

            public TimerTaskConfiguration configuration() {
                return clusterWideConfiguration(Duration.ofHours(4L));
            }
        };
    }

    /**
     * Creates the task that runs {@link #STUCK_MESSAGES_STATEMENT} with the configured
     * {@code messageMaxProcessingTime()} expressed in seconds.
     *
     * @param router   provides {@code brokerConfiguration().messageMaxProcessingTime()}
     * @param messages repository the statement is executed through
     * @return a task configured with 15-minute durations
     */
    private static TimerTask recoverStuckMessagesTask(final ConsumerRouter router, final Repository<MessageRecordKey, MessageRecord, MessageRecordQuery> messages) {
        return new TimerTask() {
            public Uni<Void> performTask() {
                // Configuration is read on every run, not once at task creation.
                final long maxProcessingSeconds = router.brokerConfiguration().messageMaxProcessingTime().getSeconds();
                final String sql = STUCK_MESSAGES_STATEMENT.formatted(maxProcessingSeconds);
                return ignoreNotFoundAndLog(messages.query(sql), "Error releasing stuck messages");
            }

            public TimerTaskConfiguration configuration() {
                return clusterWideConfiguration(Duration.ofMinutes(15L));
            }
        };
    }

    /**
     * Applies the failure handling shared by all maintenance statements: a
     * {@link NotFound} failure is replaced by a {@code null} item, any other failure
     * is logged at error level with {@code failureMessage}, and the item is dropped.
     *
     * <p>The logging step uses {@code onFailure().invoke(...)}, which observes the
     * failure without recovering from it, so a logged failure still reaches whoever
     * subscribes to the returned {@link Uni}.
     *
     * @param execution      the pending statement execution
     * @param failureMessage message logged together with the failure
     * @return a {@link Uni} that completes without an item once the statement has run
     */
    private static Uni<Void> ignoreNotFoundAndLog(Uni<?> execution, String failureMessage) {
        return execution
                .onFailure(NotFound.class).recoverWithNull()
                .onFailure().invoke(failure -> LOGGER.error(failureMessage, failure))
                .replaceWithVoid();
    }

    /**
     * Builds a {@link LockLevel#CLUSTER_WIDE} configuration that uses {@code uniform}
     * for all three duration arguments and {@link Optional#empty()} for the last one.
     *
     * <p>NOTE(review): what each of the three durations and the optional argument
     * control is defined by {@link TimerTaskConfiguration}, which is not visible
     * here — confirm against that type.
     *
     * @param uniform value passed for every duration argument
     * @return the task configuration
     */
    private static TimerTaskConfiguration clusterWideConfiguration(Duration uniform) {
        return new TimerTaskConfiguration(LockLevel.CLUSTER_WIDE, uniform, uniform, uniform, Optional.empty());
    }
}
