/*
 * Decompiled with CFR 0.152.
 */
package fr.maif.reactor.eventsourcing;

import fr.maif.eventsourcing.Event;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.EventPublisher;
import fr.maif.eventsourcing.EventStore;
import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.collection.List;
import io.vavr.control.Option;
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
import reactor.util.retry.Retry;

/**
 * {@link EventPublisher} that pushes event envelopes to a Kafka topic through a
 * reactor-kafka {@link KafkaSender}.
 *
 * <p>Events handed to {@link #publish(List)} are emitted into an in-memory multicast sink.
 * Once {@link #start(EventStore, EventStore.ConcurrentReplayStrategy)} has been called, a
 * pipeline first replays the events the {@link EventStore} reports as unpublished, then
 * consumes the sink, sends each envelope to Kafka and marks it as published in the store.
 *
 * @param <E>       event type
 * @param <Meta>    metadata type carried by the envelope
 * @param <Context> context type carried by the envelope
 */
public class ReactorKafkaEventPublisher<E extends Event, Meta, Context> implements EventPublisher<E, Meta, Context>, Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactorKafkaEventPublisher.class);

    /** Capacity of the in-memory sink when the caller passes {@code null}. */
    private static final int DEFAULT_QUEUE_BUFFER_SIZE = 10000;
    /** Number of replayed events grouped together before being marked as published. */
    private static final int REPLAY_GROUP_SIZE = 1000;
    /** A progress line is logged each time the replay counter is a multiple of this value. */
    private static final int REPLAY_LOG_EVERY = 100;
    /** Maximum number of retries of the replay phase before it is abandoned. */
    private static final long REPLAY_MAX_RETRIES = 10L;
    /** Maximum batch size used when marking live events as published. */
    private static final int LIVE_GROUP_SIZE = 50;
    /** Maximum time a live batch is held back before being flushed. */
    private static final Duration LIVE_GROUP_TIMEOUT = Duration.ofMillis(20L);

    private final String topic;
    private final Sinks.Many<EventEnvelope<E, Meta, Context>> queue;
    private final SenderOptions<String, EventEnvelope<E, Meta, Context>> senderOptions;
    private final Flux<EventEnvelope<E, Meta, Context>> eventsSource;
    private final Duration restartInterval;
    private final Duration maxRestartInterval;
    private final Function<Flux<SenderResult<EventEnvelope<E, Meta, Context>>>, Flux<List<SenderResult<EventEnvelope<E, Meta, Context>>>>> groupFlow =
            it -> it.buffer(REPLAY_GROUP_SIZE).map(List::ofAll);
    private Disposable killSwitch;
    private final KafkaSender<String, EventEnvelope<E, Meta, Context>> kafkaSender;

    /**
     * Creates a publisher with the default queue size and restart intervals.
     *
     * @param senderOptions options used to create the Kafka sender
     * @param topic         destination topic
     */
    public ReactorKafkaEventPublisher(SenderOptions<String, EventEnvelope<E, Meta, Context>> senderOptions, String topic) {
        this(senderOptions, topic, null);
    }

    /**
     * Creates a publisher with a 10 second initial restart interval and a 30 minute maximum.
     *
     * @param senderOptions   options used to create the Kafka sender
     * @param topic           destination topic
     * @param queueBufferSize capacity of the in-memory sink, or {@code null} for the default
     */
    public ReactorKafkaEventPublisher(SenderOptions<String, EventEnvelope<E, Meta, Context>> senderOptions, String topic, Integer queueBufferSize) {
        this(senderOptions, topic, queueBufferSize, Duration.of(10L, ChronoUnit.SECONDS), Duration.of(30L, ChronoUnit.MINUTES));
    }

    /**
     * Creates a publisher.
     *
     * @param senderOptions      options used to create the Kafka sender
     * @param topic              destination topic
     * @param queueBufferSize    capacity of the in-memory sink, or {@code null} for 10000
     * @param restartInterval    initial backoff between restarts, or {@code null} for 1 second
     * @param maxRestartInterval upper bound of the backoff, or {@code null} for 1 minute
     */
    public ReactorKafkaEventPublisher(SenderOptions<String, EventEnvelope<E, Meta, Context>> senderOptions, String topic, Integer queueBufferSize, Duration restartInterval, Duration maxRestartInterval) {
        this.topic = topic;
        int effectiveQueueBufferSize = queueBufferSize == null ? DEFAULT_QUEUE_BUFFER_SIZE : queueBufferSize;
        this.restartInterval = restartInterval == null ? Duration.of(1L, ChronoUnit.SECONDS) : restartInterval;
        this.maxRestartInterval = maxRestartInterval == null ? Duration.of(1L, ChronoUnit.MINUTES) : maxRestartInterval;
        this.queue = Sinks.many().multicast().onBackpressureBuffer(effectiveQueueBufferSize);
        this.eventsSource = this.queue.asFlux();
        this.senderOptions = senderOptions;
        this.kafkaSender = KafkaSender.create(senderOptions);
    }

    /**
     * Subscribes the publishing pipeline: replays the events the store reports as unpublished
     * inside a transaction, then forwards the events emitted through {@link #publish(List)}.
     * The resulting subscription is kept so that {@link #close()} can dispose it.
     *
     * @param eventStore               store used to load unpublished events and mark events as published
     * @param concurrentReplayStrategy strategy handed to the store when loading unpublished events;
     *                                 {@code null} falls back to {@code WAIT}
     * @param <TxCtx>                  transaction context type of the store
     */
    public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, EventStore.ConcurrentReplayStrategy concurrentReplayStrategy) {
        LOGGER.info("Starting/Restarting publishing event to kafka on topic {}", this.topic);
        Sinks.Many<EventEnvelope<E, Meta, Context>> logProgressSink = Sinks.many().unicast().onBackpressureBuffer();
        this.logProgress(logProgressSink.asFlux(), REPLAY_LOG_EVERY).subscribe();
        this.killSwitch = Mono.fromCompletionStage(() -> eventStore.openTransaction())
                .flatMapMany(tx -> this.replayUnpublished(eventStore, tx, concurrentReplayStrategy, logProgressSink))
                .retryWhen(this.restartBackoff(REPLAY_MAX_RETRIES, "Error republishing events for topic %s retrying for the %s time"))
                // A replay that still fails after its retries must not prevent live publishing from starting.
                .onErrorReturn(Tuple.empty())
                .switchIfEmpty(Mono.just(Tuple.empty()))
                .concatMap(__ -> this.eventsSource.transform(this.publishToKafka(
                        eventStore,
                        Option.none(),
                        it -> it.bufferTimeout(LIVE_GROUP_SIZE, LIVE_GROUP_TIMEOUT).map(List::ofAll))))
                .doOnError(e -> LOGGER.error("Error publishing events to kafka", e))
                .doOnComplete(() -> LOGGER.info("Closing publishing to {}", this.topic))
                .retryWhen(this.restartBackoff(Long.MAX_VALUE, "Error handling events for topic %s retrying for the %s time"))
                .subscribe();
    }

    /**
     * Replays, within the given transaction, the events the store reports as unpublished,
     * then commits; on error a rollback is requested and the error is propagated.
     */
    private <TxCtx> Mono<Tuple0> replayUnpublished(EventStore<TxCtx, E, Meta, Context> eventStore, TxCtx tx, EventStore.ConcurrentReplayStrategy concurrentReplayStrategy, Sinks.Many<EventEnvelope<E, Meta, Context>> logProgressSink) {
        LOGGER.info("Replaying not published in DB for {}", this.topic);
        EventStore.ConcurrentReplayStrategy strategy = Objects.isNull(concurrentReplayStrategy) ? EventStore.ConcurrentReplayStrategy.WAIT : concurrentReplayStrategy;
        return Flux.from(eventStore.loadEventsUnpublished(tx, strategy))
                .transform(this.publishToKafka(eventStore, Option.some(tx), this.groupFlow))
                .doOnNext(logProgressSink::tryEmitNext)
                .collectList()
                .flatMap(any -> Mono.fromCompletionStage(() -> {
                    LOGGER.info("Replaying events not published in DB is finished for {}", this.topic);
                    return eventStore.commitOrRollback(Option.none(), tx);
                }))
                .doOnError(e -> {
                    // NOTE(review): the returned CompletionStage is not awaited, so the rollback is fire-and-forget.
                    eventStore.commitOrRollback(Option.of(e), tx);
                    LOGGER.error("Error replaying non published events to kafka for " + this.topic, e);
                })
                .map(__ -> Tuple.empty());
    }

    /**
     * Builds the exponential backoff retry spec shared by the replay and live phases.
     *
     * @param maxAttempts     maximum number of retries
     * @param messageTemplate format string receiving the topic and the retry count
     */
    private Retry restartBackoff(long maxAttempts, String messageTemplate) {
        return Retry.backoff(maxAttempts, this.restartInterval)
                .transientErrors(true)
                .maxBackoff(this.maxRestartInterval)
                .doBeforeRetry(ctx -> LOGGER.error(messageTemplate.formatted(this.topic, ctx.totalRetries()), ctx.failure()));
    }

    /**
     * Builds the transformation that sends each envelope to Kafka, groups the sender results
     * with {@code groupFlow}, marks every group as published in the store (inside {@code tx}
     * when one is provided) and re-emits the envelopes returned by the store.
     */
    private <TxCtx> Function<Flux<EventEnvelope<E, Meta, Context>>, Flux<EventEnvelope<E, Meta, Context>>> publishToKafka(EventStore<TxCtx, E, Meta, Context> eventStore, Option<TxCtx> tx, Function<Flux<SenderResult<EventEnvelope<E, Meta, Context>>>, Flux<List<SenderResult<EventEnvelope<E, Meta, Context>>>>> groupFlow) {
        return it -> it
                .map(this::toKafkaMessage)
                // concatMap keeps the records in order: one record is sent at a time.
                .concatMap(events -> {
                    LOGGER.debug("Sending event {}", events);
                    return this.kafkaSender.send(Flux.just(events))
                            .doOnError(e -> LOGGER.error("Error publishing to kafka ", e));
                })
                .transform(groupFlow)
                .concatMap(m -> tx.fold(
                        () -> Mono.fromCompletionStage(() -> eventStore.markAsPublished(m.map(SenderResult::correlationMetadata))),
                        txCtx -> Mono.fromCompletionStage(() -> eventStore.markAsPublished(txCtx, m.map(SenderResult::correlationMetadata)))))
                .flatMapIterable(e -> e);
    }

    /**
     * Emits the given envelopes into the in-memory sink consumed by the publishing pipeline.
     * An emission failure is logged and does not fail the returned stage.
     *
     * @param events envelopes to publish
     * @return a stage completed once every envelope has been offered to the sink
     */
    public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> events) {
        LOGGER.debug("Publishing event in memory : \n{} ", events);
        return Flux.fromIterable(events)
                .map(t -> {
                    try {
                        // Retry the emission for up to one second when the sink reports a non-serialized access.
                        this.queue.emitNext(t, Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(1L)));
                    } catch (Exception e) {
                        // Deliberate best effort: the failure is logged and the remaining events are still offered.
                        LOGGER.error("Error publishing to topic %s".formatted(this.topic), e);
                    }
                    return Tuple.empty();
                })
                .collectList()
                .thenReturn(Tuple.empty())
                .toFuture();
    }

    /**
     * Disposes the publishing subscription, if any, and closes the Kafka sender.
     * The sender is closed even when disposing the subscription throws.
     */
    @Override
    public void close() throws IOException {
        try {
            if (Objects.nonNull(this.killSwitch) && !this.killSwitch.isDisposed()) {
                try {
                    this.killSwitch.dispose();
                } catch (UnsupportedOperationException e) {
                    LOGGER.error("Error closing Publisher", e);
                }
            }
        } finally {
            this.kafkaSender.close();
        }
    }

    /**
     * Wraps an envelope in a sender record keyed by the event hash; the envelope itself is
     * used as correlation metadata so it can be recovered from the sender result.
     */
    private SenderRecord<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>> toKafkaMessage(EventEnvelope<E, Meta, Context> eventEnvelope) {
        LOGGER.debug("Publishing to kafka topic {} : \n{}", this.topic, eventEnvelope);
        return SenderRecord.create(new ProducerRecord<>(this.topic, eventEnvelope.event.hash(), eventEnvelope), eventEnvelope);
    }

    /**
     * Counts the elements of {@code logProgress} and logs the running count each time it is
     * a multiple of {@code every} (the initial count of 0 included).
     */
    private <T> Flux<Integer> logProgress(Flux<T> logProgress, int every) {
        return logProgress
                .scan(0, (acc, elt) -> acc + 1)
                .doOnNext(count -> {
                    if (count % every == 0) {
                        LOGGER.info("Replayed {} events on {}", count, this.topic);
                    }
                });
    }

    /**
     * @return the value the sink reports for {@link Scannable.Attr#BUFFERED}
     */
    public Integer getBufferedElementCount() {
        return this.queue.scan(Scannable.Attr.BUFFERED);
    }
}

