package dk.cloudcreate.essentials.components.queue.postgresql;

import dk.cloudcreate.essentials.components.common.transaction.HandleAwareUnitOfWork;
import dk.cloudcreate.essentials.components.common.transaction.HandleAwareUnitOfWorkFactory;
import dk.cloudcreate.essentials.components.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.queue.QueueName;
import dk.cloudcreate.essentials.components.queue.QueueRedeliveryPolicy;
import dk.cloudcreate.essentials.components.queue.QueuedMessageHandler;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.shared.concurrent.ThreadFactoryBuilder;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dk/cloudcreate/essentials/components/queue/postgresql/PostgresqlDurableQueueConsumer.class */
/**
 * {@link DurableQueueConsumer} that periodically polls a single queue in {@link PostgresqlDurableQueues}
 * and hands every message that is ready for delivery to a {@link QueuedMessageHandler}.
 * <p>
 * Each poll runs inside its own unit of work. A message that is handled without an exception is deleted;
 * a message whose handling throws is either rescheduled using the {@link QueueRedeliveryPolicy} or, once
 * the maximum number of redeliveries has been used up, marked as a dead letter message.
 * <p>
 * Polling uses a private scheduler backed by daemon threads. {@link #stop()} shuts that scheduler down,
 * so an instance cannot be started again after it has been stopped (scheduling on a shut down
 * executor is rejected).
 */
public class PostgresqlDurableQueueConsumer implements DurableQueueConsumer {
    private static final Logger log = LoggerFactory.getLogger(PostgresqlDurableQueueConsumer.class);

    private final QueueRedeliveryPolicy redeliveryPolicy;
    private final QueueName queueName;
    private final QueuedMessageHandler queuedMessageHandler;
    private final ScheduledExecutorService scheduler;
    private final PostgresqlDurableQueues postgresqlDurableQueues;
    private final HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> unitOfWorkFactory;
    /** Number of polling tasks scheduled by {@link #start()}; also the size of the scheduler's thread pool. */
    private final int parallelMessageConsumers;
    private volatile boolean started;

    /**
     * Create a consumer for a single queue. The consumer does not poll until {@link #start()} is called.
     *
     * @param queueName                    the queue to consume from (required)
     * @param queuedMessageHandler         the handler every delivered message is passed to (required)
     * @param queueRedeliveryPolicy        decides the polling interval (its {@code initialRedeliveryDelay}),
     *                                     the redelivery delays and the maximum number of redeliveries (required)
     * @param parallelMessageConsumers     number of concurrent polling tasks/threads; must be &gt;= 1
     * @param handleAwareUnitOfWorkFactory factory for the unit of work each poll runs in (required)
     * @param postgresqlDurableQueues      the queue store that messages are fetched from and acknowledged against (required)
     */
    public PostgresqlDurableQueueConsumer(QueueName queueName, QueuedMessageHandler queuedMessageHandler, QueueRedeliveryPolicy queueRedeliveryPolicy, int parallelMessageConsumers, HandleAwareUnitOfWorkFactory<? extends HandleAwareUnitOfWork> handleAwareUnitOfWorkFactory, PostgresqlDurableQueues postgresqlDurableQueues) {
        this.queueName = FailFast.requireNonNull(queueName, "queueName is missing");
        this.queuedMessageHandler = FailFast.requireNonNull(queuedMessageHandler, "You must specify a queuedMessageHandler");
        this.redeliveryPolicy = FailFast.requireNonNull(queueRedeliveryPolicy, "You must specify a redelivery policy");
        this.unitOfWorkFactory = FailFast.requireNonNull(handleAwareUnitOfWorkFactory, "You must specify a unitOfWorkFactory");
        this.postgresqlDurableQueues = FailFast.requireNonNull(postgresqlDurableQueues, "postgresqlDurableQueues is missing");
        FailFast.requireTrue(parallelMessageConsumers >= 1, "You must specify a number of parallelMessageConsumers >= 1");
        this.parallelMessageConsumers = parallelMessageConsumers;
        this.scheduler = Executors.newScheduledThreadPool(parallelMessageConsumers,
                                                          new ThreadFactoryBuilder().nameFormat("Queue-" + queueName + "-Polling-%d")
                                                                                    .daemon(true)
                                                                                    .build());
    }

    /**
     * Start polling the queue. Does nothing if the consumer is already started.
     * <p>
     * One fixed-rate polling task is scheduled per parallel message consumer, using the redelivery
     * policy's {@code initialRedeliveryDelay} both as initial delay and as polling interval.
     */
    public void start() {
        if (started) {
            return;
        }
        log.info("[{}] Starting DurableQueueConsumer with polling interval {} (based on initialRedeliveryDelay)", queueName, redeliveryPolicy.initialRedeliveryDelay);
        long pollingIntervalMillis = redeliveryPolicy.initialRedeliveryDelay.toMillis();
        // A single fixed-rate task never runs concurrently with itself, so the pool's threads are only
        // put to use if one polling task is scheduled per requested parallel consumer.
        for (int consumer = 0; consumer < parallelMessageConsumers; consumer++) {
            scheduler.scheduleAtFixedRate(this::pollQueue, pollingIntervalMillis, pollingIntervalMillis, TimeUnit.MILLISECONDS);
        }
        started = true;
    }

    /**
     * Stop polling: shuts the scheduler down immediately and unregisters this consumer from the
     * {@link PostgresqlDurableQueues} instance. Does nothing if the consumer isn't started.
     */
    public void stop() {
        if (!started) {
            return;
        }
        log.info("[{}] Stopping DurableQueueConsumer", queueName);
        scheduler.shutdownNow();
        started = false;
        postgresqlDurableQueues.removeQueueConsumer(this);
        log.info("[{}] DurableQueueConsumer stopped", queueName);
    }

    /**
     * @return true if {@link #start()} has been called and {@link #stop()} hasn't been called since
     */
    public boolean isStarted() {
        return started;
    }

    /**
     * @return the name of the queue this consumer polls
     */
    @Override // dk.cloudcreate.essentials.components.queue.DurableQueueConsumer
    public QueueName queueName() {
        return queueName;
    }

    /**
     * Cancel the consumer; equivalent to calling {@link #stop()}.
     */
    @Override // dk.cloudcreate.essentials.components.queue.DurableQueueConsumer
    public void cancel() {
        stop();
    }

    /**
     * A single poll: within a unit of work, fetch the next message that is ready for delivery (if any),
     * pass it to the handler and record the outcome (delete, retry or dead letter) in the queue store.
     * <p>
     * This method never lets an exception escape, because a scheduled fixed-rate task that throws has
     * all of its subsequent executions suppressed, which would silently stop the polling.
     */
    private void pollQueue() {
        log.trace("[{}] Polling Queue for the next message ready for delivery", queueName);
        try {
            unitOfWorkFactory.usingUnitOfWork(unitOfWork -> {
                try {
                    postgresqlDurableQueues.getNextMessageReadyForDelivery(queueName).map(queuedMessage -> {
                        log.debug("[{}:{}] Delivering message. Total attempts: {}, Redelivery Attempts: {}", queueName, queuedMessage.id, queuedMessage.totalDeliveryAttempts, queuedMessage.redeliveryAttempts);
                        try {
                            queuedMessageHandler.handle(queuedMessage);
                            log.debug("[{}:{}] Message handled successfully. Deleting the message in the Queue Store message. Total attempts: {}, Redelivery Attempts: {}", queueName, queuedMessage.id, queuedMessage.totalDeliveryAttempts, queuedMessage.redeliveryAttempts);
                            return postgresqlDurableQueues.deleteMessage(queuedMessage.id);
                        } catch (Exception handlingFailure) {
                            log.debug(MessageFormatter.msg("[{}:{}] QueueMessageHandler for failed to handle: {}", queueName, queuedMessage.id, queuedMessage), handlingFailure);
                            // The first delivery isn't a redelivery, hence the + 1
                            if (queuedMessage.totalDeliveryAttempts >= redeliveryPolicy.maximumNumberOfRedeliveries + 1) {
                                log.debug("[{}:{}] Marking Message as Dead Letter: {}", queueName, queuedMessage.id, queuedMessage);
                                return postgresqlDurableQueues.markAsDeadLetterMessage(queuedMessage.id, handlingFailure);
                            }
                            Duration redeliveryDelay = redeliveryPolicy.calculateNextRedeliveryDelay(queuedMessage.redeliveryAttempts);
                            log.debug(MessageFormatter.msg("[{}:{}] Using redeliveryDelay '{}' for QueueEntryId '{}' due to: {}", queueName, queuedMessage.id, redeliveryDelay, queuedMessage.id, handlingFailure.getMessage()));
                            return postgresqlDurableQueues.retryMessage(queuedMessage.id, handlingFailure, redeliveryDelay);
                        }
                    });
                } catch (Exception pollingFailure) {
                    log.error(MessageFormatter.msg("[{}] Error Polling Queue", queueName), pollingFailure);
                }
            });
        } catch (Exception unitOfWorkFailure) {
            // Failures raised by the unit of work itself (outside the block above) end up here.
            // They are logged rather than rethrown so the scheduler keeps polling.
            log.error(MessageFormatter.msg("[{}] Error Polling Queue", queueName), unitOfWorkFailure);
        }
    }
}
