package one.tomorrow.transactionaloutbox.reactive.service;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.PreDestroy;
import lombok.Generated;
import one.tomorrow.transactionaloutbox.commons.Longs;
import one.tomorrow.transactionaloutbox.reactive.model.OutboxRecord;
import one.tomorrow.transactionaloutbox.reactive.repository.OutboxRepository;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:one/tomorrow/transactionaloutbox/reactive/service/OutboxProcessor.class */
public class OutboxProcessor {
    /** Batch size used by the constructors that do not take an explicit batch size. */
    private static final int DEFAULT_BATCH_SIZE = 100;
    private static final Logger logger = LoggerFactory.getLogger(OutboxProcessor.class);
    /** Callbacks invoked with the current producer right before it is closed. */
    private final List<Consumer<KafkaProducer<String, byte[]>>> producerClosedListeners;
    /** Callbacks invoked with each newly created producer. */
    private final List<Consumer<KafkaProducer<String, byte[]>>> producerCreatedListeners;
    private final OutboxLockService lockService;
    /** Identifier under which this instance acquires, refreshes and releases the outbox lock. */
    private final String lockOwnerId;
    private final OutboxRepository repository;
    /** Used for the initial producer and for recreating it after a processing failure. */
    private final KafkaProducerFactory producerFactory;
    /** Delay between two consecutive processing runs while the lock is held. */
    private final Duration processingInterval;
    /**
     * Timeout passed to the lock service; also the delay before retrying lock acquisition, and half of it
     * is the age after which the lock is refreshed before processing.
     */
    private final Duration lockTimeout;
    /** Single-threaded scheduler for processing runs and lock acquisition retries. */
    private final ScheduledExecutorService executor;
    /** Scheduler for the cleanup task; {@code null} when no cleanup settings were given. */
    private final ScheduledExecutorService cleanupExecutor;
    /** Bytes of the event source, added to every published record as the {@code x-source} header. */
    private final byte[] eventSource;
    /** Maximum number of unprocessed records requested from the repository per run. */
    private final int batchSize;
    private KafkaProducer<String, byte[]> producer;
    /**
     * Whether this instance currently considers itself the lock holder. It is written by the lock
     * acquisition/processing callbacks and read by the cleanup executor thread and by {@code isActive()},
     * so it is volatile to make updates visible across those threads.
     */
    private volatile boolean active;
    /** Time of the last completed lock acquisition/refresh attempt. */
    private Instant lastLockAckquisitionAttempt;
    /** Most recently scheduled processing or lock acquisition task; cancelled on close. */
    private ScheduledFuture<?> schedule;
    /** Handle of the periodic cleanup task; stays {@code null} when cleanup is not configured. */
    private ScheduledFuture<?> cleanupSchedule;

    /**
     * Immutable settings for the periodic cleanup of processed outbox records: how often the cleanup
     * task runs ({@code interval}) and how long processed records are kept ({@code retention}).
     * Instances are created via {@link #builder()}; values that were not set stay {@code null}.
     */
    public static final class CleanupSettings {
        private final Duration interval;
        private final Duration retention;

        /** Fluent builder for {@link CleanupSettings}. */
        @Generated
        public static class CleanupSettingsBuilder {

            @Generated
            private Duration interval;

            @Generated
            private Duration retention;

            @Generated
            CleanupSettingsBuilder() {
            }

            /** Sets the period between two cleanup runs. */
            @Generated
            public CleanupSettingsBuilder interval(Duration duration) {
                interval = duration;
                return this;
            }

            /** Sets how long processed records are retained before they are eligible for deletion. */
            @Generated
            public CleanupSettingsBuilder retention(Duration duration) {
                retention = duration;
                return this;
            }

            /** Creates the settings from the values collected so far. */
            @Generated
            public CleanupSettings build() {
                return new CleanupSettings(interval, retention);
            }

            @Generated
            @Override
            public String toString() {
                return "OutboxProcessor.CleanupSettings.CleanupSettingsBuilder(interval=" + interval + ", retention=" + retention + ")";
            }
        }

        @Generated
        CleanupSettings(Duration duration, Duration duration2) {
            interval = duration;
            retention = duration2;
        }

        /** Returns a new, empty builder. */
        @Generated
        public static CleanupSettingsBuilder builder() {
            return new CleanupSettingsBuilder();
        }

        @Generated
        public Duration getInterval() {
            return interval;
        }

        @Generated
        public Duration getRetention() {
            return retention;
        }

        /** Two settings are equal when both interval and retention are equal (null-safe). */
        @Generated
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof CleanupSettings)) {
                return false;
            }
            CleanupSettings other = (CleanupSettings) obj;
            return sameDuration(interval, other.interval) && sameDuration(retention, other.retention);
        }

        @Generated
        @Override
        public int hashCode() {
            // Same formula as before: seed 1, multiplier 59, 43 standing in for null.
            int result = 59 + hashOrDefault(interval);
            return result * 59 + hashOrDefault(retention);
        }

        @Generated
        @Override
        public String toString() {
            return "OutboxProcessor.CleanupSettings(interval=" + interval + ", retention=" + retention + ")";
        }

        /** Null-safe equality of two durations. */
        private static boolean sameDuration(Duration left, Duration right) {
            if (left == null) {
                return right == null;
            }
            return left.equals(right);
        }

        /** Hash contribution of a possibly-null duration. */
        private static int hashOrDefault(Duration duration) {
            return duration == null ? 43 : duration.hashCode();
        }
    }

    /**
     * Factory the processor uses to obtain its Kafka producer. It is invoked once from the constructor
     * and again whenever the producer is recreated after a failure while processing the outbox.
     */
    @FunctionalInterface
    public interface KafkaProducerFactory {
        /** Creates the producer to publish outbox records with; called on every (re)creation. */
        KafkaProducer<String, byte[]> createKafkaProducer();
    }

    /**
     * Creates a processor with the default batch size and without cleanup of processed records
     * (delegates with {@code DEFAULT_BATCH_SIZE} and {@code null} cleanup settings).
     *
     * @param outboxRepository repository the outbox records are read from and updated in
     * @param outboxLockService service used to acquire, refresh and release the outbox lock
     * @param kafkaProducerFactory factory for the Kafka producer
     * @param duration processing interval, i.e. the delay between processing runs
     * @param duration2 lock timeout
     * @param str lock owner id of this instance
     * @param str2 event source, sent as the {@code x-source} header
     */
    public OutboxProcessor(OutboxRepository outboxRepository, OutboxLockService outboxLockService, KafkaProducerFactory kafkaProducerFactory, Duration duration, Duration duration2, String str, String str2) {
        this(outboxRepository, outboxLockService, kafkaProducerFactory, duration, duration2, str, str2, DEFAULT_BATCH_SIZE, null);
    }

    /**
     * Creates a processor with the default batch size ({@code DEFAULT_BATCH_SIZE}) and the given cleanup settings.
     *
     * @param outboxRepository repository the outbox records are read from and updated in
     * @param outboxLockService service used to acquire, refresh and release the outbox lock
     * @param kafkaProducerFactory factory for the Kafka producer
     * @param duration processing interval, i.e. the delay between processing runs
     * @param duration2 lock timeout
     * @param str lock owner id of this instance
     * @param str2 event source, sent as the {@code x-source} header
     * @param cleanupSettings interval and retention for deleting processed records; {@code null} disables cleanup
     */
    public OutboxProcessor(OutboxRepository outboxRepository, OutboxLockService outboxLockService, KafkaProducerFactory kafkaProducerFactory, Duration duration, Duration duration2, String str, String str2, CleanupSettings cleanupSettings) {
        this(outboxRepository, outboxLockService, kafkaProducerFactory, duration, duration2, str, str2, DEFAULT_BATCH_SIZE, cleanupSettings);
    }

    /**
     * Creates the processor, creates the Kafka producer, starts the first attempt to acquire the outbox
     * lock and, if cleanup settings are given, schedules the periodic cleanup of processed records.
     *
     * @param outboxRepository repository the outbox records are read from and updated in
     * @param outboxLockService service used to acquire, refresh and release the outbox lock
     * @param kafkaProducerFactory factory for the Kafka producer (also used to recreate it after failures)
     * @param duration processing interval, i.e. the delay between processing runs
     * @param duration2 lock timeout
     * @param str lock owner id of this instance
     * @param str2 event source; its UTF-8 bytes are sent as the {@code x-source} header
     * @param i maximum number of unprocessed records fetched per processing run
     * @param cleanupSettings interval and retention for deleting processed records; {@code null} disables cleanup
     */
    public OutboxProcessor(OutboxRepository outboxRepository, OutboxLockService outboxLockService, KafkaProducerFactory kafkaProducerFactory, Duration duration, Duration duration2, String str, String str2, int i, CleanupSettings cleanupSettings) {
        this.producerClosedListeners = new ArrayList<>();
        this.producerCreatedListeners = new ArrayList<>();
        logger.info("Starting outbox processor with lockOwnerId {}, source {} and processing interval {} ms and producer factory {}", str, str2, duration.toMillis(), kafkaProducerFactory);
        this.repository = outboxRepository;
        this.processingInterval = duration;
        this.lockTimeout = duration2;
        this.lockService = outboxLockService;
        this.lockOwnerId = str;
        // Fixed charset: the header bytes must not depend on the JVM's platform default encoding.
        this.eventSource = str2.getBytes(StandardCharsets.UTF_8);
        this.batchSize = i;
        this.producerFactory = kafkaProducerFactory;
        createProducer(kafkaProducerFactory);
        this.executor = Executors.newSingleThreadScheduledExecutor();
        // Kicks off the acquire -> process -> reschedule cycle.
        tryLockAcquisition();
        this.cleanupExecutor = cleanupSettings != null ? setupCleanupSchedule(outboxRepository, cleanupSettings) : null;
    }

    /**
     * Registers a callback that receives the producer right before it is closed.
     *
     * @param consumer callback to register
     * @return this processor, for chaining
     */
    public OutboxProcessor onBeforeProducerClosed(Consumer<KafkaProducer<String, byte[]>> consumer) {
        producerClosedListeners.add(consumer);
        return this;
    }

    /**
     * Registers a callback that receives every newly created producer. If a producer already exists,
     * the callback is invoked with it immediately.
     *
     * @param consumer callback to register
     * @return this processor, for chaining
     */
    public OutboxProcessor onProducerCreated(Consumer<KafkaProducer<String, byte[]>> consumer) {
        producerCreatedListeners.add(consumer);
        KafkaProducer<String, byte[]> existing = producer;
        if (existing != null) {
            consumer.accept(existing);
        }
        return this;
    }

    /** Creates a producer via the given factory, stores it and notifies the producer-created listeners. */
    private void createProducer(KafkaProducerFactory kafkaProducerFactory) {
        producer = kafkaProducerFactory.createKafkaProducer();
        producerCreatedListeners.forEach(listener -> listener.accept(producer));
    }

    /**
     * Notifies the producer-closed listeners and then closes the current producer without waiting
     * (zero timeout). The producer is closed even if a listener throws, so a failing listener cannot
     * leak it; the listener's exception still propagates to the caller.
     */
    private void closeProducer() {
        try {
            producerClosedListeners.forEach(listener -> listener.accept(producer));
        } finally {
            producer.close(Duration.ZERO);
        }
    }

    /**
     * Starts a dedicated single-threaded scheduler that, at the configured interval and starting
     * immediately, deletes records processed before {@code now - retention}. A run does nothing while
     * this processor is not active.
     *
     * @param outboxRepository repository to delete processed records from
     * @param cleanupSettings interval and retention of the cleanup
     * @return the scheduler running the cleanup task
     */
    private ScheduledExecutorService setupCleanupSchedule(OutboxRepository outboxRepository, CleanupSettings cleanupSettings) {
        ScheduledExecutorService cleanupScheduler = Executors.newSingleThreadScheduledExecutor();
        this.cleanupSchedule = cleanupScheduler.scheduleAtFixedRate(() -> {
            if (!this.active) {
                return;
            }
            try {
                Instant processedBefore = Instant.now().minus(cleanupSettings.getRetention());
                logger.info("Cleaning up outbox records processed before {}", processedBefore);
                outboxRepository.deleteOutboxRecordByProcessedNotNullAndProcessedIsBefore(processedBefore).block();
            } catch (RuntimeException e) {
                // An exception escaping a scheduleAtFixedRate task suppresses all subsequent executions,
                // so a single failed run must not be allowed to propagate.
                logger.warn("Failed to clean up processed outbox records, trying again in {}", cleanupSettings.getInterval(), e);
            }
        }, 0L, cleanupSettings.getInterval().toMillis(), TimeUnit.MILLISECONDS);
        return cleanupScheduler;
    }

    /** Schedules the next processing run after the processing interval, unless the executor is shut down. */
    private void scheduleProcessing() {
        if (!executor.isShutdown()) {
            schedule = executor.schedule(this::processOutboxWithLock, processingInterval.toMillis(), TimeUnit.MILLISECONDS);
            return;
        }
        logger.info("Not scheduling processing for lockOwnerId {} (executor is shutdown)", lockOwnerId);
    }

    /** Schedules the next lock acquisition attempt after the lock timeout, unless the executor is shut down. */
    private void scheduleTryLockAcquisition() {
        if (!executor.isShutdown()) {
            schedule = executor.schedule(this::tryLockAcquisition, lockTimeout.toMillis(), TimeUnit.MILLISECONDS);
            return;
        }
        logger.info("Not scheduling acquisition of outbox lock for lockOwnerId {} (executor is shutdown)", lockOwnerId);
    }

    /**
     * Stops the processor: cancels the pending processing/lock task and the cleanup task (without
     * interrupting a running one), shuts down both executors, closes the producer and releases the
     * outbox lock. The lock is released even if closing the producer fails, so that it does not stay
     * held just because of a producer/listener error; a failure to release is logged.
     */
    @PreDestroy
    public void close() {
        logger.info("Stopping OutboxProcessor.");
        if (schedule != null) {
            schedule.cancel(false);
        }
        executor.shutdown();
        if (cleanupSchedule != null) {
            cleanupSchedule.cancel(false);
        }
        if (cleanupExecutor != null) {
            cleanupExecutor.shutdown();
        }
        try {
            closeProducer();
        } finally {
            lockService.releaseLock(lockOwnerId).subscribe(
                    ignored -> { },
                    th -> logger.warn("Failed to release outbox lock for lockOwnerId {}", lockOwnerId, th));
        }
    }

    /**
     * Asks the lock service to acquire (or, when already active, refresh) the outbox lock. On success
     * processing starts right away; if the lock was not obtained or the attempt failed, another
     * attempt is scheduled.
     */
    private void tryLockAcquisition() {
        final boolean wasActive = active;
        logger.debug("{} trying to acquire outbox lock", lockOwnerId);
        lockService.acquireOrRefreshLock(lockOwnerId, lockTimeout, active)
                .doOnNext(acquired -> {
                    active = acquired;
                    lastLockAckquisitionAttempt = Instant.now();
                    if (acquired) {
                        // A refresh of an already held lock is only worth a debug line.
                        if (wasActive) {
                            logger.debug("{} acquired outbox lock, starting to process outbox", lockOwnerId);
                        } else {
                            logger.info("{} acquired outbox lock, starting to process outbox", lockOwnerId);
                        }
                        processOutboxWithLock();
                    } else {
                        scheduleTryLockAcquisition();
                    }
                })
                .doOnError(failure -> {
                    logger.warn("Failed trying to acquire outbox lock, trying again in {}", lockTimeout, failure);
                    scheduleTryLockAcquisition();
                })
                .subscribe();
    }

    /**
     * Runs one processing cycle. When not active, only a new lock acquisition is scheduled. When the
     * last lock acquisition is older than half the lock timeout, the lock is refreshed first (which in
     * turn continues processing). Otherwise the outbox is processed under the lock; afterwards either
     * the next run is scheduled or, if the lock was lost or running with the lock failed, the
     * processor becomes inactive and schedules a new lock acquisition.
     */
    private void processOutboxWithLock() {
        if (!active) {
            logger.warn("processOutbox must only be run when in active state");
            scheduleTryLockAcquisition();
            return;
        }

        Instant now = Instant.now();
        Instant refreshDue = lastLockAckquisitionAttempt.plus(lockTimeout.dividedBy(2L));
        if (now.isAfter(refreshDue)) {
            tryLockAcquisition();
            return;
        }

        lockService.runWithLock(lockOwnerId, Mono.defer(() -> processOutbox()
                        .onErrorResume(failure -> {
                            // A processing failure is handled by replacing the producer; the cycle continues.
                            logger.warn("Recreating producer, due to failure while processing outbox.", failure);
                            closeProducer();
                            createProducer(producerFactory);
                            return Mono.empty();
                        })
                        .then()))
                .onErrorResume(failure -> {
                    logger.warn("Failed to run with lock, trying to acquire lock in {} ms", lockTimeout.toMillis(), failure);
                    active = false;
                    scheduleTryLockAcquisition();
                    return Mono.empty();
                })
                .doOnNext(stillLocked -> {
                    if (!stillLocked) {
                        logger.info("Lock was lost, changing to inactive, now trying to acquire lock in {} ms", lockTimeout.toMillis());
                        active = false;
                        scheduleTryLockAcquisition();
                        return;
                    }
                    scheduleProcessing();
                })
                .subscribe();
    }

    /**
     * Fetches up to {@code batchSize} unprocessed records, publishes each of them and then, one after
     * another, saves every published record with its processed timestamp set to the current time.
     *
     * @return the saved records, collected into a list
     */
    private Mono<List<OutboxRecord>> processOutbox() {
        return repository.getUnprocessedRecords(batchSize)
                .flatMap(this::publish)
                .concatMap(published -> repository.saveInNewTransaction(published.toBuilder().processed(Instant.now()).build()))
                .collectList();
    }

    /**
     * Sends the record with the current producer when subscribed to.
     *
     * @return a Mono that emits the given record once the send callback reports success, or fails
     *         with the exception the callback reports
     */
    private Mono<OutboxRecord> publish(OutboxRecord outboxRecord) {
        return Mono.create(sink -> producer.send(toProducerRecord(outboxRecord), (metadata, failure) -> {
            if (failure == null) {
                logger.info("Sent record to kafka: {} (got metadata: {})", outboxRecord, metadata);
                sink.success(outboxRecord);
                return;
            }
            sink.error(failure);
        }));
    }

    /**
     * Builds the Kafka record for an outbox record: topic, key and value are taken over as they are,
     * the record's own headers (if any) are added with UTF-8 encoded values, followed by the
     * {@code x-sequence} header (the record id as bytes) and the {@code x-source} header.
     */
    private ProducerRecord<String, byte[]> toProducerRecord(OutboxRecord outboxRecord) {
        ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(outboxRecord.getTopic(), outboxRecord.getKey(), outboxRecord.getValue());
        Map<String, String> headersAsMap = outboxRecord.getHeadersAsMap();
        if (headersAsMap != null) {
            // Fixed charset: header bytes must not depend on the JVM's platform default encoding.
            headersAsMap.forEach((name, value) -> producerRecord.headers().add(name, value.getBytes(StandardCharsets.UTF_8)));
        }
        producerRecord.headers().add("x-sequence", Longs.toByteArray(outboxRecord.getId().longValue()));
        producerRecord.headers().add("x-source", this.eventSource);
        return producerRecord;
    }

    /**
     * Returns whether this processor currently considers itself active, i.e. its last lock
     * acquisition succeeded and the lock has not been reported lost since.
     */
    @Generated
    public boolean isActive() {
        return active;
    }
}
