package fr.maif.reactor.eventsourcing;

import fr.maif.eventsourcing.Event;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.EventPublisher;
import fr.maif.eventsourcing.EventStore;
import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.collection.List;
import io.vavr.control.Option;
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
import reactor.util.retry.Retry;

/* loaded from: input_file:fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.class */
public class ReactorKafkaEventPublisher<E extends Event, Meta, Context> implements EventPublisher<E, Meta, Context>, Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactorKafkaEventPublisher.class);
    private final String topic;
    private final Sinks.Many<EventEnvelope<E, Meta, Context>> queue;
    private final SenderOptions<String, EventEnvelope<E, Meta, Context>> senderOptions;
    private final Flux<EventEnvelope<E, Meta, Context>> eventsSource;
    private final Duration restartInterval;
    private final Duration maxRestartInterval;
    private final Function<Flux<SenderResult<EventEnvelope<E, Meta, Context>>>, Flux<List<SenderResult<EventEnvelope<E, Meta, Context>>>>> groupFlow;
    private Disposable killSwitch;
    private KafkaSender<String, EventEnvelope<E, Meta, Context>> kafkaSender;

    public ReactorKafkaEventPublisher(SenderOptions<String, EventEnvelope<E, Meta, Context>> senderOptions, String str) {
        this(senderOptions, str, null);
    }

    public ReactorKafkaEventPublisher(SenderOptions<String, EventEnvelope<E, Meta, Context>> senderOptions, String str, Integer num) {
        this(senderOptions, str, num, Duration.of(10L, ChronoUnit.SECONDS), Duration.of(30L, ChronoUnit.MINUTES));
    }

    public ReactorKafkaEventPublisher(SenderOptions<String, EventEnvelope<E, Meta, Context>> senderOptions, String str, Integer num, Duration duration, Duration duration2) {
        this.groupFlow = flux -> {
            return flux.buffer(1000).map((v0) -> {
                return List.ofAll(v0);
            });
        };
        this.topic = str;
        int intValue = num == null ? 10000 : num.intValue();
        this.restartInterval = duration == null ? Duration.of(10L, ChronoUnit.SECONDS) : duration;
        this.maxRestartInterval = duration2 == null ? Duration.of(30L, ChronoUnit.MINUTES) : duration2;
        EventEnvelope.builder().build();
        this.queue = Sinks.many().multicast().onBackpressureBuffer(intValue);
        this.eventsSource = this.queue.asFlux();
        this.senderOptions = senderOptions;
        this.kafkaSender = KafkaSender.create(senderOptions);
    }

    public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, EventStore.ConcurrentReplayStrategy concurrentReplayStrategy) {
        LOGGER.info("Starting/Restarting publishing event to kafka on topic {}", this.topic);
        Sinks.Many onBackpressureBuffer = Sinks.many().unicast().onBackpressureBuffer();
        logProgress(onBackpressureBuffer.asFlux(), 100).subscribe();
        Objects.requireNonNull(eventStore);
        this.killSwitch = Mono.fromCompletionStage(eventStore::openTransaction).flux().concatMap(obj -> {
            LOGGER.info("Replaying not published in DB for {}", this.topic);
            Flux transform = Flux.from(eventStore.loadEventsUnpublished(obj, Objects.isNull(concurrentReplayStrategy) ? EventStore.ConcurrentReplayStrategy.WAIT : concurrentReplayStrategy)).transform(publishToKafka(eventStore, Option.some(obj), this.groupFlow));
            Objects.requireNonNull(onBackpressureBuffer);
            return transform.doOnNext((v1) -> {
                r1.tryEmitNext(v1);
            }).then(Mono.fromCompletionStage(() -> {
                LOGGER.info("Replaying events not published in DB is finished for {}", this.topic);
                return eventStore.commitOrRollback(Option.none(), obj);
            })).doOnError(th -> {
                eventStore.commitOrRollback(Option.of(th), obj);
                LOGGER.error("Error replaying non published events to kafka for " + this.topic, th);
            });
        }).thenMany(this.eventsSource.transform(publishToKafka(eventStore, Option.none(), flux -> {
            return flux.bufferTimeout(50, Duration.ofMillis(20L)).map((v0) -> {
                return List.ofAll(v0);
            });
        }))).doOnError(th -> {
            LOGGER.error("Error publishing events to kafka", th);
        }).doOnComplete(() -> {
            LOGGER.info("Closing publishing to {}", this.topic);
        }).retryWhen(Retry.backoff(Long.MAX_VALUE, this.restartInterval).maxBackoff(this.maxRestartInterval)).subscribe();
    }

    private <TxCtx> Function<Flux<EventEnvelope<E, Meta, Context>>, Flux<EventEnvelope<E, Meta, Context>>> publishToKafka(EventStore<TxCtx, E, Meta, Context> eventStore, Option<TxCtx> option, Function<Flux<SenderResult<EventEnvelope<E, Meta, Context>>>, Flux<List<SenderResult<EventEnvelope<E, Meta, Context>>>>> function) {
        Function function2 = flux -> {
            return this.kafkaSender.send(flux);
        };
        return flux2 -> {
            return flux2.map(this::toKafkaMessage).transform(function2).transform(function).flatMap(list -> {
                return (Publisher) option.fold(() -> {
                    return Mono.fromCompletionStage(() -> {
                        return eventStore.markAsPublished(list.map((v0) -> {
                            return v0.correlationMetadata();
                        }));
                    });
                }, obj -> {
                    return Mono.fromCompletionStage(() -> {
                        return eventStore.markAsPublished(obj, list.map((v0) -> {
                            return v0.correlationMetadata();
                        }));
                    });
                });
            }).flatMapIterable(list2 -> {
                return list2;
            });
        };
    }

    public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> list) {
        LOGGER.debug("Publishing event in memory : \n{} ", list);
        Flux fromIterable = Flux.fromIterable(list);
        Sinks.Many<EventEnvelope<E, Meta, Context>> many = this.queue;
        Objects.requireNonNull(many);
        return fromIterable.map((v1) -> {
            return r1.tryEmitNext(v1);
        }).collectList().thenReturn(Tuple.empty()).toFuture();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (Objects.nonNull(this.killSwitch)) {
            this.killSwitch.dispose();
        }
        this.kafkaSender.close();
    }

    private SenderRecord<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>> toKafkaMessage(EventEnvelope<E, Meta, Context> eventEnvelope) {
        LOGGER.debug("Publishing to kafka topic {} : \n{}", this.topic, eventEnvelope);
        return SenderRecord.create(new ProducerRecord(this.topic, eventEnvelope.event.hash(), eventEnvelope), eventEnvelope);
    }

    private <Any> Flux<Integer> logProgress(Flux<Any> flux, int i) {
        return flux.scan(0, (num, obj) -> {
            return Integer.valueOf(num.intValue() + 1);
        }).doOnNext(num2 -> {
            if (num2.intValue() % i == 0) {
                LOGGER.info("Replayed {} events on {}", num2, this.topic);
            }
        });
    }
}
