package one.tomorrow.transactionaloutbox.service;

import jakarta.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import one.tomorrow.transactionaloutbox.commons.Longs;
import one.tomorrow.transactionaloutbox.model.OutboxRecord;
import one.tomorrow.transactionaloutbox.repository.OutboxLockRepository;
import one.tomorrow.transactionaloutbox.repository.OutboxRepository;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;

/* loaded from: input_file:one/tomorrow/transactionaloutbox/service/OutboxProcessor.class */
/**
 * Publishes unprocessed outbox records to Kafka while this instance holds the outbox lock.
 *
 * <p>The processor alternates between two states:
 * <ul>
 *   <li><b>inactive</b> - the lock is not held; lock acquisition is retried after one lock
 *       timeout;</li>
 *   <li><b>active</b> - the lock is held; the outbox is processed once per processing interval
 *       and the lock is refreshed once more than half of the lock timeout has elapsed since
 *       the last acquisition attempt.</li>
 * </ul>
 *
 * <p>The first lock acquisition attempt (and, if it succeeds, the first processing run) happens
 * synchronously inside the constructor on the calling thread. All subsequent work is scheduled
 * on a private single-threaded scheduled executor. {@link #close()} is expected to be invoked
 * from a different thread, which is why the state it reads is declared {@code volatile}.
 */
public class OutboxProcessor {
    /** Maximum number of unprocessed records fetched from the repository per processing run. */
    private static final int BATCH_SIZE = 100;
    private static final Logger logger = LoggerFactory.getLogger(OutboxProcessor.class);

    private final OutboxLockService lockService;
    private final String lockOwnerId;
    private final OutboxRepository repository;
    private final KafkaProducerFactory producerFactory;
    private final Duration processingInterval;
    private final ScheduledExecutorService executor;
    /** UTF-8 bytes of the event source, attached to every record as the {@code x-source} header. */
    private final byte[] eventSource;

    // The following three fields are written on the executor thread and read by close() on
    // another thread; volatile guarantees close() observes the latest values.
    private volatile KafkaProducer<String, byte[]> producer;
    private volatile boolean active;
    private volatile ScheduledFuture<?> schedule;

    /** Time of the most recent call to the lock service's acquire-or-refresh operation. */
    private Instant lastLockAcquisitionAttempt;

    /** Creates the {@link KafkaProducer} instances used for publishing outbox records. */
    @FunctionalInterface
    public interface KafkaProducerFactory {
        /**
         * Creates a producer. Invoked once at construction time and again whenever the current
         * producer is replaced after a processing failure.
         *
         * @return a producer with {@code String} keys and {@code byte[]} values
         */
        KafkaProducer<String, byte[]> createKafkaProducer();
    }

    /**
     * Creates the processor, creates the initial producer and immediately attempts to acquire
     * the outbox lock; if the lock is obtained the first processing run is executed before the
     * constructor returns.
     *
     * @param outboxRepository repository used to read unprocessed records and mark them processed
     * @param kafkaProducerFactory factory for the Kafka producer (also used to recreate it after
     *     a failure)
     * @param duration delay between two processing runs while the lock is held
     * @param duration2 duration handed to the {@link OutboxLockService} this processor creates
     *     (presumably the lock timeout returned by its {@code getLockTimeout()})
     * @param str id under which this processor acquires, refreshes and releases the lock
     * @param str2 event source name; its UTF-8 bytes are sent as the {@code x-source} header
     * @param autowireCapableBeanFactory bean factory used to look up the
     *     {@link OutboxLockRepository} and to apply bean post processors to the created
     *     {@link OutboxLockService}
     */
    public OutboxProcessor(OutboxRepository outboxRepository, KafkaProducerFactory kafkaProducerFactory, Duration duration, Duration duration2, String str, String str2, AutowireCapableBeanFactory autowireCapableBeanFactory) {
        logger.info("Starting outbox processor with lockOwnerId {}, source {} and processing interval {} ms and producer factory {}",
                str, str2, duration.toMillis(), kafkaProducerFactory);
        this.repository = outboxRepository;
        this.processingInterval = duration;
        OutboxLockService rawLockService =
                new OutboxLockService(autowireCapableBeanFactory.getBean(OutboxLockRepository.class), duration2);
        // Post processors are applied manually because the lock service is instantiated here
        // rather than by the container; the returned object may be a wrapper of the raw instance.
        this.lockService = (OutboxLockService) autowireCapableBeanFactory
                .applyBeanPostProcessorsAfterInitialization(rawLockService, "OutboxLockService");
        this.lockOwnerId = str;
        // Explicit charset: the header bytes must not depend on the JVM's platform default.
        this.eventSource = str2.getBytes(StandardCharsets.UTF_8);
        this.producerFactory = kafkaProducerFactory;
        this.producer = kafkaProducerFactory.createKafkaProducer();
        this.executor = Executors.newSingleThreadScheduledExecutor();
        tryLockAcquisition();
    }

    /** Schedules the next processing run after one processing interval, unless shut down. */
    private void scheduleProcessing() {
        if (this.executor.isShutdown()) {
            logger.info("Not scheduling processing for lockOwnerId {} (executor is shutdown)", this.lockOwnerId);
            return;
        }
        this.schedule = this.executor.schedule(
                this::processOutboxWithLock, this.processingInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    /** Schedules the next lock acquisition attempt after one lock timeout, unless shut down. */
    private void scheduleTryLockAcquisition() {
        if (this.executor.isShutdown()) {
            logger.info("Not scheduling acquisition of outbox lock for lockOwnerId {} (executor is shutdown)", this.lockOwnerId);
            return;
        }
        this.schedule = this.executor.schedule(
                this::tryLockAcquisition, this.lockService.getLockTimeout().toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the processor: cancels the pending scheduled task (without interrupting a running
     * one), shuts the executor down, closes the producer and releases the outbox lock if this
     * instance currently holds it.
     *
     * <p>The lock is released even when closing the producer throws, so that other processors
     * do not have to wait for the lock to time out.
     */
    @PreDestroy
    public void close() {
        logger.info("Stopping OutboxProcessor with lockOwnerId {}.", this.lockOwnerId);
        ScheduledFuture<?> pending = this.schedule;
        if (pending != null) {
            pending.cancel(false);
        }
        this.executor.shutdown();
        try {
            this.producer.close();
        } finally {
            if (this.active) {
                this.lockService.releaseLock(this.lockOwnerId);
            }
        }
    }

    /**
     * Tries to acquire (or refresh) the outbox lock. On success the outbox is processed right
     * away; otherwise, or when anything in that chain throws, another attempt is scheduled.
     */
    private void tryLockAcquisition() {
        try {
            boolean wasActive = this.active;
            logger.debug("{} trying to acquire outbox lock", this.lockOwnerId);
            this.active = this.lockService.acquireOrRefreshLock(this.lockOwnerId);
            this.lastLockAcquisitionAttempt = Instant.now();
            if (!this.active) {
                scheduleTryLockAcquisition();
                return;
            }
            // A refresh of an already held lock is routine (debug); a fresh acquisition is
            // a state change worth reporting at info level.
            if (wasActive) {
                logger.debug("{} acquired outbox lock, starting to process outbox", this.lockOwnerId);
            } else {
                logger.info("{} acquired outbox lock, starting to process outbox", this.lockOwnerId);
            }
            processOutboxWithLock();
        } catch (Exception e) {
            logger.warn("Failed trying lock acquisition or processing the outbox, trying again in {}", this.lockService.getLockTimeout(), e);
            scheduleTryLockAcquisition();
        }
    }

    /**
     * Performs one step of the active state: refreshes the lock if more than half of the lock
     * timeout has elapsed since the last acquisition attempt, otherwise processes the outbox
     * and schedules the next run. If processing could not be run with the lock, the processor
     * becomes inactive and schedules a new acquisition attempt.
     *
     * @throws IllegalStateException if called while the processor is not active
     */
    private void processOutboxWithLock() {
        if (!this.active) {
            throw new IllegalStateException("processOutbox must only be run when in active state");
        }
        Instant refreshDue = this.lastLockAcquisitionAttempt.plus(this.lockService.getLockTimeout().dividedBy(2L));
        if (Instant.now().isAfter(refreshDue)) {
            // Refreshing re-enters this method through tryLockAcquisition() on success.
            tryLockAcquisition();
            return;
        }
        if (tryProcessOutbox()) {
            scheduleProcessing();
            return;
        }
        logger.info("Lock was lost, changing to inactive, now trying to acquire lock in {} ms", this.lockService.getLockTimeout().toMillis());
        this.active = false;
        scheduleTryLockAcquisition();
    }

    /**
     * Runs {@link #processOutbox()} through the lock service's {@code runWithLock}. A failure
     * inside the processing itself is logged and answered by replacing the producer with a
     * fresh one; it does not affect the return value.
     *
     * @return the result of {@code runWithLock}, or {@code false} if that call threw
     */
    private boolean tryProcessOutbox() {
        try {
            return this.lockService.runWithLock(this.lockOwnerId, () -> {
                try {
                    processOutbox();
                } catch (Throwable th) {
                    // Deliberately broad: an escaping Error would silently kill the scheduled
                    // task and stop all further processing.
                    logger.warn("Recreating producer, due to failure while processing outbox.", th);
                    this.producer.close();
                    this.producer = this.producerFactory.createKafkaProducer();
                }
            });
        } catch (Exception e) {
            logger.warn("Caught exception when trying to run with lock.", e);
            return false;
        }
    }

    /**
     * Fetches up to {@link #BATCH_SIZE} unprocessed records, sends all of them to Kafka and then
     * waits for every send to complete. A record is marked as processed in the send callback
     * once Kafka reported success for it.
     *
     * @throws RuntimeException if waiting for a send is interrupted or a send failed
     */
    void processOutbox() {
        this.repository.getUnprocessedRecords(BATCH_SIZE).stream()
                .map(this::publish)
                // Materialize first so that every send is issued before blocking on any result.
                .toList()
                .forEach(OutboxProcessor::await);
    }

    /**
     * Sends a single outbox record and registers the callback that marks it as processed on
     * success or logs the failure.
     *
     * @param outboxRecord the record to publish
     * @return the future returned by the producer for this send
     */
    private Future<?> publish(OutboxRecord outboxRecord) {
        return this.producer.send(toProducerRecord(outboxRecord), (recordMetadata, exc) -> {
            if (exc != null) {
                logger.warn("Failed to publish {}", outboxRecord, exc);
                return;
            }
            logger.info("Sent record to kafka: {}", outboxRecord);
            this.repository.updateProcessed(outboxRecord.getId(), Instant.now());
        });
    }

    /**
     * Blocks until the given future completes.
     *
     * @param future the future to wait for
     * @throws RuntimeException wrapping the {@link InterruptedException} (after restoring the
     *     interrupt flag) or the {@link ExecutionException} raised while waiting
     */
    private static void await(Future<?> future) {
        try {
            future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e2) {
            throw new RuntimeException(e2);
        }
    }

    /**
     * Converts an outbox record into a Kafka producer record. The record's own headers (if any)
     * are copied with UTF-8 encoded values, followed by {@code x-sequence} (the record id as
     * bytes) and {@code x-source} (the event source of this processor).
     *
     * @param outboxRecord the record to convert
     * @return the producer record carrying topic, key, value and headers
     */
    private ProducerRecord<String, byte[]> toProducerRecord(OutboxRecord outboxRecord) {
        ProducerRecord<String, byte[]> producerRecord =
                new ProducerRecord<>(outboxRecord.getTopic(), outboxRecord.getKey(), outboxRecord.getValue());
        if (outboxRecord.getHeaders() != null) {
            outboxRecord.getHeaders().forEach((name, value) ->
                    producerRecord.headers().add(name, value.getBytes(StandardCharsets.UTF_8)));
        }
        producerRecord.headers().add("x-sequence", Longs.toByteArray(outboxRecord.getId().longValue()));
        producerRecord.headers().add("x-source", this.eventSource);
        return producerRecord;
    }
}
