package fr.maif.eventsourcing.impl;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.kafka.ProducerMessage;
import akka.kafka.ProducerSettings;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.BroadcastHub;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RestartSource;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SourceQueueWithComplete;
import fr.maif.eventsourcing.Event;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.EventPublisher;
import fr.maif.eventsourcing.EventStore;
import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.collection.List;
import io.vavr.concurrent.Future;
import io.vavr.control.Option;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:fr/maif/eventsourcing/impl/KafkaEventPublisher.class */
/**
 * {@link EventPublisher} that forwards event envelopes to a Kafka topic through an Akka Streams pipeline.
 *
 * <p>Envelopes handed to {@link #publish(List)} are offered to an in-memory queue whose output is exposed
 * through a broadcast hub. Once {@link #start(EventStore, EventStore.ConcurrentReplayStrategy)} has been
 * called, a restartable stream first replays the events the {@link EventStore} reports as unpublished,
 * then consumes the hub. Each successfully produced batch is marked as published in the event store.
 *
 * <p>Note: the compiler-generated {@code $deserializeLambda$} method must not be declared in source;
 * {@code javac} emits it automatically for serializable lambdas.
 *
 * @param <E>       event type
 * @param <Meta>    envelope metadata type
 * @param <Context> envelope context type
 */
public class KafkaEventPublisher<E extends Event, Meta, Context> implements EventPublisher<E, Meta, Context>, Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaEventPublisher.class);

    /** Queue capacity used when the caller passes {@code null}. */
    private static final int DEFAULT_QUEUE_BUFFER_SIZE = 10000;
    /** Initial restart back-off used when the caller passes {@code null}. */
    private static final Duration DEFAULT_RESTART_INTERVAL = Duration.of(10L, ChronoUnit.SECONDS);
    /** Upper bound of the restart back-off used when the caller passes {@code null}. */
    private static final Duration DEFAULT_MAX_RESTART_INTERVAL = Duration.of(30L, ChronoUnit.MINUTES);
    /** Buffer size given to the broadcast hub. */
    private static final int HUB_BUFFER_SIZE = 256;
    /** Number of Kafka results grouped together before marking them published during replay. */
    private static final int REPLAY_BATCH_SIZE = 1000;
    /** Maximum number of Kafka results grouped together for events coming from the queue. */
    private static final int LIVE_BATCH_SIZE = 50;
    /** Maximum time a group of queue events is held before being flushed. */
    private static final Duration LIVE_BATCH_WINDOW = Duration.ofMillis(20L);
    /** A progress line is logged each time the replay counter is a multiple of this value. */
    private static final int REPLAY_PROGRESS_STEP = 100;

    private final Materializer materializer;
    private final String topic;
    private final Producer<String, EventEnvelope<E, Meta, Context>> kafkaProducer;
    private final SourceQueueWithComplete<EventEnvelope<E, Meta, Context>> queue;
    private final ProducerSettings<String, EventEnvelope<E, Meta, Context>> producerSettings;
    private final Source<EventEnvelope<E, Meta, Context>, NotUsed> eventsSource;
    private final Duration restartInterval;
    private final Duration maxRestartInterval;
    private final Flow<ProducerMessage.Results<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>, List<ProducerMessage.Results<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>>, NotUsed> groupFlow;

    /**
     * Creates a publisher with the default queue size and restart intervals.
     *
     * @param actorSystem      actor system used to create the stream materializer
     * @param producerSettings Kafka producer settings
     * @param topic            destination topic
     */
    public KafkaEventPublisher(ActorSystem actorSystem, ProducerSettings<String, EventEnvelope<E, Meta, Context>> producerSettings, String topic) {
        this(actorSystem, producerSettings, topic, null);
    }

    /**
     * Creates a publisher with the default restart intervals.
     *
     * @param actorSystem      actor system used to create the stream materializer
     * @param producerSettings Kafka producer settings
     * @param topic            destination topic
     * @param queueBufferSize  capacity of the in-memory queue, or {@code null} for the default (10000)
     */
    public KafkaEventPublisher(ActorSystem actorSystem, ProducerSettings<String, EventEnvelope<E, Meta, Context>> producerSettings, String topic, Integer queueBufferSize) {
        this(actorSystem, producerSettings, topic, queueBufferSize, DEFAULT_RESTART_INTERVAL, DEFAULT_MAX_RESTART_INTERVAL);
    }

    /**
     * Creates a publisher, materializes the in-memory queue and broadcast hub, and creates the Kafka producer.
     *
     * @param actorSystem        actor system used to create the stream materializer
     * @param producerSettings   Kafka producer settings
     * @param topic              destination topic
     * @param queueBufferSize    capacity of the in-memory queue, or {@code null} for the default (10000)
     * @param restartInterval    initial restart back-off, or {@code null} for the default (10 seconds)
     * @param maxRestartInterval maximum restart back-off, or {@code null} for the default (30 minutes)
     * @throws NullPointerException if {@code actorSystem}, {@code producerSettings} or {@code topic} is {@code null}
     */
    public KafkaEventPublisher(ActorSystem actorSystem, ProducerSettings<String, EventEnvelope<E, Meta, Context>> producerSettings, String topic, Integer queueBufferSize, Duration restartInterval, Duration maxRestartInterval) {
        Objects.requireNonNull(actorSystem, "actorSystem");
        Objects.requireNonNull(producerSettings, "producerSettings");
        this.topic = Objects.requireNonNull(topic, "topic");

        this.groupFlow = Flow.<ProducerMessage.Results<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>>create()
                .grouped(REPLAY_BATCH_SIZE)
                .map(List::ofAll);
        this.materializer = Materializer.createMaterializer(actorSystem);
        this.restartInterval = restartInterval == null ? DEFAULT_RESTART_INTERVAL : restartInterval;
        this.maxRestartInterval = maxRestartInterval == null ? DEFAULT_MAX_RESTART_INTERVAL : maxRestartInterval;
        int bufferSize = queueBufferSize == null ? DEFAULT_QUEUE_BUFFER_SIZE : queueBufferSize;

        // BroadcastHub.of needs a Class token for the element type; generics are erased, hence the unchecked cast.
        @SuppressWarnings("unchecked")
        Class<EventEnvelope<E, Meta, Context>> envelopeClass =
                (Class<EventEnvelope<E, Meta, Context>>) EventEnvelope.builder().build().getClass();

        Pair<SourceQueueWithComplete<EventEnvelope<E, Meta, Context>>, Source<EventEnvelope<E, Meta, Context>, NotUsed>> queueAndHub =
                Source.<EventEnvelope<E, Meta, Context>>queue(bufferSize, OverflowStrategy.backpressure())
                        .toMat(BroadcastHub.of(envelopeClass, HUB_BUFFER_SIZE), Keep.both())
                        .run(this.materializer);

        // A single producer instance is created here and shared by every (re)materialization of the Kafka flow.
        this.kafkaProducer = producerSettings.createKafkaProducer();
        this.producerSettings = producerSettings.withProducer(this.kafkaProducer);
        this.queue = queueAndHub.first();
        this.eventsSource = queueAndHub.second();
    }

    /**
     * Starts the publishing stream. The stream is restarted with back-off on failure. On every (re)start it
     * opens a transaction on the event store, replays the unpublished events to Kafka, commits or rolls back
     * that transaction when the replay terminates, and then publishes the events coming from the in-memory queue.
     *
     * @param eventStore               store used to load unpublished events and to mark events as published
     * @param concurrentReplayStrategy strategy passed to the store when loading unpublished events;
     *                                 {@code null} means {@link EventStore.ConcurrentReplayStrategy#WAIT}
     * @param <TxCtx>                  transaction context type of the event store
     */
    public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, EventStore.ConcurrentReplayStrategy concurrentReplayStrategy) {
        EventStore.ConcurrentReplayStrategy strategy =
                concurrentReplayStrategy == null ? EventStore.ConcurrentReplayStrategy.WAIT : concurrentReplayStrategy;

        // Events coming from the queue are flushed in small, time-bounded batches.
        Flow<ProducerMessage.Results<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>, List<ProducerMessage.Results<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>>, NotUsed> liveBatches =
                Flow.<ProducerMessage.Results<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>>create()
                        .groupedWithin(LIVE_BATCH_SIZE, LIVE_BATCH_WINDOW)
                        .map(List::ofAll);

        RestartSource.onFailuresWithBackoff(this.restartInterval, this.maxRestartInterval, 0.0d, () -> {
            LOGGER.info("Starting/Restarting publishing event to kafka on topic {}", this.topic);
            return Source.completionStage(eventStore.openTransaction().toCompletableFuture())
                    .flatMapConcat(tx -> {
                        LOGGER.info("Replaying not published in DB for {}", this.topic);
                        return eventStore.loadEventsUnpublished(tx, strategy)
                                .via(publishToKafka(eventStore, Option.some(tx), this.groupFlow))
                                .alsoTo(logProgress(REPLAY_PROGRESS_STEP))
                                .watchTermination((notUsed, termination) -> termination.whenComplete((done, error) -> {
                                    // error is null on normal completion, so Option.of(error) is empty in that case.
                                    eventStore.commitOrRollback(Option.of(error), tx);
                                    if (error != null) {
                                        LOGGER.error("Error replaying non published events to kafka for {}", this.topic, error);
                                    } else {
                                        LOGGER.info("Replaying events not published in DB is finished for {}", this.topic);
                                    }
                                }));
                    })
                    .concat(this.eventsSource.via(publishToKafka(eventStore, Option.<TxCtx>none(), liveBatches)))
                    .watchTermination((notUsed, termination) -> {
                        termination.whenComplete((done, error) -> {
                            if (error != null) {
                                LOGGER.error("Error publishing events to kafka", error);
                            } else {
                                LOGGER.info("Closing publishing to {}", this.topic);
                            }
                        });
                        return termination;
                    });
        }).toMat(Sink.ignore(), Keep.both()).run(this.materializer);
    }

    /**
     * Builds the flow that sends envelopes to Kafka, groups the producer results with {@code batching},
     * marks each group as published in the event store and re-emits the envelopes returned by the store.
     *
     * @param eventStore  store used to mark events as published
     * @param transaction transaction context to mark events within; when empty the store's
     *                    non-transactional {@code markAsPublished} overload is used
     * @param batching    flow grouping producer results into batches
     * @param <TxCtx>     transaction context type of the event store
     * @return the publishing flow
     */
    private <TxCtx> Flow<EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>, NotUsed> publishToKafka(EventStore<TxCtx, E, Meta, Context> eventStore, Option<TxCtx> transaction, Flow<ProducerMessage.Results<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>, List<ProducerMessage.Results<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>>, NotUsed> batching) {
        return Flow.<EventEnvelope<E, Meta, Context>>create()
                .map(this::toKafkaMessage)
                .via(akka.kafka.javadsl.Producer.<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>flexiFlow(this.producerSettings))
                .via(batching)
                // Parallelism 1: one batch is marked as published at a time.
                .mapAsync(1, batch -> {
                    List<EventEnvelope<E, Meta, Context>> produced = batch.map(ProducerMessage.Results::passThrough);
                    return transaction.fold(
                            () -> eventStore.markAsPublished(produced).toCompletableFuture(),
                            tx -> eventStore.markAsPublished(tx, produced).toCompletableFuture());
                })
                .mapConcat(marked -> marked);
    }

    /**
     * Offers the given envelopes, one at a time and in order, to the in-memory queue.
     *
     * @param events envelopes to enqueue
     * @return a future completed once every envelope has been offered to the queue
     */
    @Override // fr.maif.eventsourcing.EventPublisher
    public Future<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> events) {
        LOGGER.debug("Publishing event in memory : \n{} ", events);
        CompletionStage<?> allOffered = Source.from(events)
                .mapAsync(1, this.queue::offer)
                .runWith(Sink.ignore(), this.materializer);
        return Future.fromCompletableFuture(allOffered.toCompletableFuture()).map(done -> Tuple.empty());
    }

    /**
     * Completes the in-memory queue so the queue/hub stream can terminate, then closes the Kafka producer.
     *
     * @throws IOException declared by {@link Closeable}
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.queue.complete();
        this.kafkaProducer.close();
    }

    /**
     * Wraps an envelope into a producer message for {@link #topic}, keyed by {@code event.hash()},
     * carrying the envelope itself as pass-through.
     */
    private ProducerMessage.Envelope<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>> toKafkaMessage(EventEnvelope<E, Meta, Context> eventEnvelope) {
        LOGGER.debug("Publishing to kafka topic {} : \n{}", this.topic, eventEnvelope);
        return new ProducerMessage.Message<>(new ProducerRecord<>(this.topic, eventEnvelope.event.hash(), eventEnvelope), eventEnvelope);
    }

    /**
     * Sink that counts the elements it receives and logs the running count each time it is a multiple of {@code step}.
     *
     * @param step logging interval, in number of elements
     */
    private <Any> Sink<Any, NotUsed> logProgress(int step) {
        return Flow.<Any>create()
                .scan(0, (count, ignored) -> count + 1)
                .to(Sink.foreach(count -> {
                    if (count % step == 0) {
                        LOGGER.info("Replayed {} events on {}", count, this.topic);
                    }
                }));
    }
}
