package fr.maif.reactor.eventsourcing;

import fr.maif.eventsourcing.Event;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.EventPublisher;
import fr.maif.eventsourcing.EventStore;
import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.collection.List;
import io.vavr.control.Option;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
import reactor.util.retry.Retry;

/* loaded from: input_file:fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.class */
public class ReactorKafkaEventPublisher<E extends Event, Meta, Context> implements EventPublisher<E, Meta, Context>, Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactorKafkaEventPublisher.class);
    private AtomicBoolean stop;
    private final String topic;
    private final Integer queueBufferSize;
    private final Sinks.Many<EventEnvelope<E, Meta, Context>> queue;
    private final SenderOptions<String, EventEnvelope<E, Meta, Context>> senderOptions;
    private final Duration restartInterval;
    private final Duration maxRestartInterval;
    private Disposable killSwitch;
    private KafkaSender<String, EventEnvelope<E, Meta, Context>> kafkaSender;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher$CountAndMaxSeqNum.class */
    public static final class CountAndMaxSeqNum extends Record {
        private final Long count;
        private final Long lastSeqNum;

        CountAndMaxSeqNum(Long l, Long l2) {
            this.count = l;
            this.lastSeqNum = l2;
        }

        static CountAndMaxSeqNum empty() {
            return new CountAndMaxSeqNum(0L, 0L);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public CountAndMaxSeqNum handleSeqNum(Long l) {
            return new CountAndMaxSeqNum(Long.valueOf(this.count.longValue() + 1), Long.valueOf(Math.max(this.lastSeqNum.longValue(), l.longValue())));
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, CountAndMaxSeqNum.class), CountAndMaxSeqNum.class, "count;lastSeqNum", "FIELD:Lfr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher$CountAndMaxSeqNum;->count:Ljava/lang/Long;", "FIELD:Lfr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher$CountAndMaxSeqNum;->lastSeqNum:Ljava/lang/Long;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, CountAndMaxSeqNum.class), CountAndMaxSeqNum.class, "count;lastSeqNum", "FIELD:Lfr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher$CountAndMaxSeqNum;->count:Ljava/lang/Long;", "FIELD:Lfr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher$CountAndMaxSeqNum;->lastSeqNum:Ljava/lang/Long;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, CountAndMaxSeqNum.class, Object.class), CountAndMaxSeqNum.class, "count;lastSeqNum", "FIELD:Lfr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher$CountAndMaxSeqNum;->count:Ljava/lang/Long;", "FIELD:Lfr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher$CountAndMaxSeqNum;->lastSeqNum:Ljava/lang/Long;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public Long count() {
            return this.count;
        }

        public Long lastSeqNum() {
            return this.lastSeqNum;
        }
    }

    public ReactorKafkaEventPublisher(SenderOptions<String, EventEnvelope<E, Meta, Context>> senderOptions, String str) {
        this(senderOptions, str, null);
    }

    public ReactorKafkaEventPublisher(SenderOptions<String, EventEnvelope<E, Meta, Context>> senderOptions, String str, Integer num) {
        this(senderOptions, str, num, Duration.of(10L, ChronoUnit.SECONDS), Duration.of(30L, ChronoUnit.MINUTES));
    }

    public ReactorKafkaEventPublisher(SenderOptions<String, EventEnvelope<E, Meta, Context>> senderOptions, String str, Integer num, Duration duration, Duration duration2) {
        this.stop = new AtomicBoolean(false);
        this.topic = str;
        this.queueBufferSize = Integer.valueOf(num == null ? 10000 : num.intValue());
        this.restartInterval = duration == null ? Duration.of(1L, ChronoUnit.SECONDS) : duration;
        this.maxRestartInterval = duration2 == null ? Duration.of(1L, ChronoUnit.MINUTES) : duration2;
        this.queue = Sinks.many().multicast().onBackpressureBuffer(this.queueBufferSize.intValue(), false);
        this.senderOptions = senderOptions.stopOnError(true);
        this.kafkaSender = KafkaSender.create(senderOptions);
    }

    private <T> Function<Flux<T>, Flux<List<T>>> fixedSizeGroup(int i) {
        return flux -> {
            return flux.buffer(i).map((v0) -> {
                return List.ofAll(v0);
            });
        };
    }

    private <T> Function<Flux<T>, Flux<List<T>>> bufferTimeout(int i, Duration duration) {
        return flux -> {
            return flux.bufferTimeout(i, duration, true).map((v0) -> {
                return List.ofAll(v0);
            });
        };
    }

    public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, EventStore.ConcurrentReplayStrategy concurrentReplayStrategy) {
        LOGGER.info("Starting event publisher for topic {}", this.topic);
        Sinks.Many<EventEnvelope<E, Meta, Context>> onBackpressureBuffer = Sinks.many().unicast().onBackpressureBuffer();
        logProgress(onBackpressureBuffer.asFlux(), 100).subscribe();
        this.killSwitch = publishFromDb(eventStore, concurrentReplayStrategy, onBackpressureBuffer).concatMap(countAndMaxSeqNum -> {
            LOGGER.debug("Starting consuming in memory queue for {}. Event lower than {} are ignored", this.topic, countAndMaxSeqNum.lastSeqNum);
            return this.queue.asFlux().filter(eventEnvelope -> {
                return eventEnvelope.sequenceNum.longValue() > countAndMaxSeqNum.lastSeqNum.longValue();
            }).transform(publishToKafka(eventStore, Option.none(), bufferTimeout(200, Duration.ofMillis(20L)), bufferTimeout(200, Duration.ofSeconds(1L))));
        }).doOnError(th -> {
            LOGGER.error("Error publishing events to kafka", th);
        }).retryWhen(Retry.backoff(Long.MAX_VALUE, this.restartInterval).transientErrors(true).maxBackoff(this.maxRestartInterval).doBeforeRetry(retrySignal -> {
            LOGGER.error("Error handling events for topic %s retrying for the %s time".formatted(this.topic, Long.valueOf(retrySignal.totalRetries() + 1)), retrySignal.failure());
        })).subscribe();
    }

    public <TxCtx> CompletionStage<Tuple0> publishNonAcknowledgedFromDb(EventStore<TxCtx, E, Meta, Context> eventStore, EventStore.ConcurrentReplayStrategy concurrentReplayStrategy) {
        Sinks.Many<EventEnvelope<E, Meta, Context>> onBackpressureBuffer = Sinks.many().unicast().onBackpressureBuffer();
        logProgress(onBackpressureBuffer.asFlux(), 100).subscribe();
        return publishFromDb(eventStore, concurrentReplayStrategy, onBackpressureBuffer).collectList().map(list -> {
            return Tuple.empty();
        }).toFuture();
    }

    private <TxCtx> Flux<CountAndMaxSeqNum> publishFromDb(EventStore<TxCtx, E, Meta, Context> eventStore, EventStore.ConcurrentReplayStrategy concurrentReplayStrategy, Sinks.Many<EventEnvelope<E, Meta, Context>> many) {
        return Mono.defer(() -> {
            Objects.requireNonNull(eventStore);
            return fromCS(eventStore::openTransaction).flatMap(obj -> {
                LOGGER.info("Replaying events not published from DB in topic {}", this.topic);
                Flux transform = Flux.from(eventStore.loadEventsUnpublished(obj, Objects.isNull(concurrentReplayStrategy) ? EventStore.ConcurrentReplayStrategy.WAIT : concurrentReplayStrategy)).transform(publishToKafka(eventStore, Option.some(obj), fixedSizeGroup(1000), fixedSizeGroup(1000)));
                Objects.requireNonNull(many);
                return transform.doOnNext((v1) -> {
                    r1.tryEmitNext(v1);
                }).reduce(CountAndMaxSeqNum.empty(), (countAndMaxSeqNum, eventEnvelope) -> {
                    return countAndMaxSeqNum.handleSeqNum(eventEnvelope.sequenceNum);
                }).flatMap(countAndMaxSeqNum2 -> {
                    LOGGER.info("Replaying events not published in DB is finished for {}, {} elements published", this.topic, countAndMaxSeqNum2.count);
                    return fromCS(() -> {
                        return eventStore.commitOrRollback(Option.none(), obj);
                    }).thenReturn(countAndMaxSeqNum2);
                }).doOnError(th -> {
                    eventStore.commitOrRollback(Option.of(th), obj);
                    LOGGER.error("Error replaying non published events to kafka for " + this.topic, th);
                }).flatMap(countAndMaxSeqNum3 -> {
                    if (countAndMaxSeqNum3.count.longValue() != 0) {
                        return Mono.just(countAndMaxSeqNum3);
                    }
                    Objects.requireNonNull(eventStore);
                    return fromCS(eventStore::lastPublishedSequence).map(l -> {
                        return new CountAndMaxSeqNum(0L, l);
                    });
                });
            });
        }).flux();
    }

    private <TxCtx> Function<Flux<EventEnvelope<E, Meta, Context>>, Flux<EventEnvelope<E, Meta, Context>>> publishToKafka(EventStore<TxCtx, E, Meta, Context> eventStore, Option<TxCtx> option, Function<Flux<SenderRecord<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>>, Flux<List<SenderRecord<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>>>> function, Function<Flux<SenderResult<EventEnvelope<E, Meta, Context>>>, Flux<List<SenderResult<EventEnvelope<E, Meta, Context>>>>> function2) {
        return flux -> {
            return flux.map(this::toKafkaMessage).transform(function).filter((v0) -> {
                return v0.nonEmpty();
            }).concatMap(list -> {
                LOGGER.debug("Sending event {}", list);
                return this.kafkaSender.send(Flux.fromIterable(list)).doOnError(th -> {
                    LOGGER.error("Error publishing to kafka ", th);
                });
            }).transform(function2).filter((v0) -> {
                return v0.nonEmpty();
            }).concatMap(list2 -> {
                return (Publisher) option.fold(() -> {
                    return fromCS(() -> {
                        return eventStore.markAsPublished(list2.map((v0) -> {
                            return v0.correlationMetadata();
                        }));
                    });
                }, obj -> {
                    return fromCS(() -> {
                        return eventStore.markAsPublished(obj, list2.map((v0) -> {
                            return v0.correlationMetadata();
                        }));
                    });
                });
            }).flatMapIterable(list3 -> {
                return list3;
            });
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <T> Mono<T> fromCS(Supplier<CompletionStage<T>> supplier) {
        return Mono.fromFuture(() -> {
            return ((CompletionStage) supplier.get()).toCompletableFuture();
        });
    }

    public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> list) {
        return Flux.fromIterable(list).map(eventEnvelope -> {
            this.queue.tryEmitNext(eventEnvelope).orThrow();
            return Tuple.empty();
        }).retryWhen(Retry.fixedDelay(50L, Duration.ofMillis(1L)).transientErrors(true).doBeforeRetry(retrySignal -> {
            LOGGER.error("Error publishing events in memory queue for topic %s retrying for the %s time".formatted(this.topic, Long.valueOf(retrySignal.totalRetries() + 1)), retrySignal.failure());
        })).onErrorResume(th -> {
            return Mono.just(Tuple.empty());
        }).collectList().thenReturn(Tuple.empty()).toFuture();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.stop.set(true);
        if (Objects.nonNull(this.killSwitch) && !this.killSwitch.isDisposed()) {
            try {
                this.killSwitch.dispose();
            } catch (UnsupportedOperationException e) {
                LOGGER.error("Error closing Publisher", e);
            }
        }
        this.kafkaSender.close();
    }

    private SenderRecord<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>> toKafkaMessage(EventEnvelope<E, Meta, Context> eventEnvelope) {
        LOGGER.debug("Publishing to kafka topic {} : \n{}", this.topic, eventEnvelope);
        return SenderRecord.create(new ProducerRecord(this.topic, eventEnvelope.event.hash(), eventEnvelope), eventEnvelope);
    }

    private <Any> Flux<Integer> logProgress(Flux<Any> flux, int i) {
        return flux.scan(0, (num, obj) -> {
            return Integer.valueOf(num.intValue() + 1);
        }).doOnNext(num2 -> {
            if (num2.intValue() % i == 0) {
                LOGGER.info("Replayed {} events on {}", num2, this.topic);
            }
        });
    }
}
