package fr.maif.projections;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.kafka.CommitterSettings;
import akka.kafka.ConsumerMessage;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Committer;
import akka.kafka.javadsl.Consumer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.RestartSource;
import akka.stream.javadsl.Sink;
import akka.stream.scaladsl.Source;
import fr.maif.eventsourcing.Event;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.format.JacksonEventFormat;
import fr.maif.eventsourcing.format.JacksonSimpleFormat;
import fr.maif.kafka.JsonDeserializer;
import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.concurrent.Future;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:fr/maif/projections/EventuallyConsistentProjection.class */
public abstract class EventuallyConsistentProjection<E extends Event, Meta, Context> {
    /** Class-wide SLF4J logger; this is what {@link #logger()} returns unless a subclass overrides it. */
    private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentProjection.class);
    /** Actor system received by the constructor; used to create the consumer and committer settings. */
    protected final ActorSystem actorSystem;
    /** Materializer created from {@link #actorSystem}; runs the stream built in {@link #start()}. */
    protected final Materializer materializer;
    /** Kafka topic the projection subscribes to (copied from the config). */
    protected final String topic;
    /** Kafka consumer group id (copied from the config). */
    protected final String groupId;
    /** Kafka bootstrap servers (copied from the config). */
    protected final String bootstrapServers;
    /** Minimum restart backoff; 30 seconds when the config leaves it null. */
    protected final Duration minBackoff;
    /** Maximum restart backoff; 30 minutes when the config leaves it null. */
    protected final Duration maxBackoff;
    /** Random factor passed to the restart backoff; 0.2 when the config leaves it null. */
    protected final double randomFactor;
    /**
     * Commit size taken from the config; 10 when the config leaves it null.
     * NOTE(review): no code visible in this file reads this field (the committer batch in
     * {@code start()} is a literal 100) - confirm whether subclasses rely on it.
     */
    protected final Integer commitSize;
    /** Consumer settings built in the constructor, after the optional {@code completeConfig} hook was applied. */
    protected final ConsumerSettings<String, EventEnvelope<E, Meta, Context>> consumerSettings;
    /** Control handle of the currently materialized Kafka source; cleared by {@link #stopConsumingKafka()}. */
    protected final AtomicReference<Consumer.Control> controlRef = new AtomicReference<>();
    /** Last status recorded through {@link #updateStatus(Status)}; initially {@code Status.stopped}. */
    protected final AtomicReference<Status> innerStatus = new AtomicReference<>(Status.stopped);

    /* loaded from: input_file:fr/maif/projections/EventuallyConsistentProjection$Config.class */
    /**
     * Immutable bundle of settings handed to the {@link EventuallyConsistentProjection} constructor.
     *
     * <p>{@link #create(String, String, String)} only sets {@code topic}, {@code groupId} and
     * {@code bootstrapServers}. Every other field may stay {@code null}; the projection constructor
     * then substitutes its own defaults (and, for {@code completeConfig}, leaves the consumer
     * settings untouched).
     *
     * @param <E> event type carried by the consumed envelopes
     * @param <Meta> envelope metadata type
     * @param <Context> envelope context type
     */
    public static class Config<E extends Event, Meta, Context> {
        /** Kafka topic to subscribe to. */
        public final String topic;
        /** Kafka consumer group id. */
        public final String groupId;
        /** Kafka bootstrap servers. */
        public final String bootstrapServers;
        /** Optional hook applied to the consumer settings built by the projection; may be null. */
        public final Function<ConsumerSettings<String, EventEnvelope<E, Meta, Context>>, ConsumerSettings<String, EventEnvelope<E, Meta, Context>>> completeConfig;
        /** Optional minimum restart backoff; may be null. */
        public final Duration minBackoff;
        /** Optional maximum restart backoff; may be null. */
        public final Duration maxBackoff;
        /**
         * Optional backoff random factor; may be null.
         * NOTE(review): typed as Integer although the projection stores it as a double, so a
         * fractional factor cannot be configured here - confirm this is intended.
         */
        public final Integer randomFactor;
        /** Optional commit size; may be null. */
        public final Integer commitSize;

        /**
         * Fluent, mutable builder for {@link Config}. All values start out {@code null}.
         */
        public static class ConfigBuilder<E extends Event, Meta, Context> {
            private String topic;
            private String groupId;
            private String bootstrapServers;
            private Function<ConsumerSettings<String, EventEnvelope<E, Meta, Context>>, ConsumerSettings<String, EventEnvelope<E, Meta, Context>>> completeConfig;
            private Duration minBackoff;
            private Duration maxBackoff;
            private Integer randomFactor;
            private Integer commitSize;

            ConfigBuilder() {
            }

            /** Sets the Kafka topic. */
            public ConfigBuilder<E, Meta, Context> topic(String str) {
                this.topic = str;
                return this;
            }

            /** Sets the Kafka consumer group id. */
            public ConfigBuilder<E, Meta, Context> groupId(String str) {
                this.groupId = str;
                return this;
            }

            /** Sets the Kafka bootstrap servers. */
            public ConfigBuilder<E, Meta, Context> bootstrapServers(String str) {
                this.bootstrapServers = str;
                return this;
            }

            /** Sets the hook applied to the consumer settings built by the projection. */
            public ConfigBuilder<E, Meta, Context> completeConfig(Function<ConsumerSettings<String, EventEnvelope<E, Meta, Context>>, ConsumerSettings<String, EventEnvelope<E, Meta, Context>>> function) {
                this.completeConfig = function;
                return this;
            }

            /** Sets the minimum restart backoff. */
            public ConfigBuilder<E, Meta, Context> minBackoff(Duration duration) {
                this.minBackoff = duration;
                return this;
            }

            /** Sets the maximum restart backoff. */
            public ConfigBuilder<E, Meta, Context> maxBackoff(Duration duration) {
                this.maxBackoff = duration;
                return this;
            }

            /** Sets the backoff random factor. */
            public ConfigBuilder<E, Meta, Context> randomFactor(Integer num) {
                this.randomFactor = num;
                return this;
            }

            /** Sets the commit size. */
            public ConfigBuilder<E, Meta, Context> commitSize(Integer num) {
                this.commitSize = num;
                return this;
            }

            /** Creates a {@link Config} from the values collected so far; nothing is validated. */
            public Config<E, Meta, Context> build() {
                return new Config<>(this.topic, this.groupId, this.bootstrapServers, this.completeConfig, this.minBackoff, this.maxBackoff, this.randomFactor, this.commitSize);
            }

            @Override
            public String toString() {
                return "EventuallyConsistentProjection.Config.ConfigBuilder(topic=" + this.topic + ", groupId=" + this.groupId + ", bootstrapServers=" + this.bootstrapServers + ", completeConfig=" + this.completeConfig + ", minBackoff=" + this.minBackoff + ", maxBackoff=" + this.maxBackoff + ", randomFactor=" + this.randomFactor + ", commitSize=" + this.commitSize + ")";
            }
        }

        /**
         * Creates a config holding only the mandatory connection values.
         *
         * @param str Kafka topic
         * @param str2 Kafka consumer group id
         * @param str3 Kafka bootstrap servers
         * @return a config whose optional fields are all {@code null}
         */
        public static <E extends Event, Meta, Context> Config<E, Meta, Context> create(String str, String str2, String str3) {
            // The explicit type witness is required: a bare builder() at the head of a call chain
            // has no target type, so its type arguments would not be inferred as <E, Meta, Context>.
            return Config.<E, Meta, Context>builder().bootstrapServers(str3).groupId(str2).topic(str).build();
        }

        /** Returns a new, empty builder. */
        public static <E extends Event, Meta, Context> ConfigBuilder<E, Meta, Context> builder() {
            return new ConfigBuilder<>();
        }

        /** Returns a builder pre-populated with every value of this config. */
        public ConfigBuilder<E, Meta, Context> toBuilder() {
            // Explicit type arguments instead of a raw ConfigBuilder keep the chain type-checked.
            return new ConfigBuilder<E, Meta, Context>().topic(this.topic).groupId(this.groupId).bootstrapServers(this.bootstrapServers).completeConfig(this.completeConfig).minBackoff(this.minBackoff).maxBackoff(this.maxBackoff).randomFactor(this.randomFactor).commitSize(this.commitSize);
        }

        private Config(String str, String str2, String str3, Function<ConsumerSettings<String, EventEnvelope<E, Meta, Context>>, ConsumerSettings<String, EventEnvelope<E, Meta, Context>>> function, Duration duration, Duration duration2, Integer num, Integer num2) {
            this.topic = str;
            this.groupId = str2;
            this.bootstrapServers = str3;
            this.completeConfig = function;
            this.minBackoff = duration;
            this.maxBackoff = duration2;
            this.randomFactor = num;
            this.commitSize = num2;
        }
    }

    public EventuallyConsistentProjection(ActorSystem actorSystem, Config<E, Meta, Context> config) {
        this.actorSystem = actorSystem;
        this.materializer = Materializer.createMaterializer(actorSystem);
        this.topic = config.topic;
        this.groupId = config.groupId;
        this.bootstrapServers = config.bootstrapServers;
        this.minBackoff = Objects.isNull(config.minBackoff) ? Duration.ofSeconds(30L) : config.minBackoff;
        this.maxBackoff = Objects.isNull(config.maxBackoff) ? Duration.ofMinutes(30L) : config.maxBackoff;
        this.randomFactor = Objects.isNull(config.randomFactor) ? 0.2d : config.randomFactor.intValue();
        this.commitSize = Integer.valueOf(Objects.isNull(config.commitSize) ? 10 : config.commitSize.intValue());
        this.consumerSettings = (Objects.isNull(config.completeConfig) ? consumerSettings -> {
            return consumerSettings;
        } : config.completeConfig).apply(ConsumerSettings.create(actorSystem, new StringDeserializer(), deserializer()).withGroupId(this.groupId).withProperty("auto.offset.reset", "latest").withProperty("enable.auto.commit", "true").withBootstrapServers(this.bootstrapServers));
        start();
    }

    /**
     * Creates a projection driven by a message-handling flow, passing
     * {@code JacksonSimpleFormat.empty()} for both the meta and the context format.
     *
     * @param actorSystem actor system used by the projection
     * @param str value returned by the projection's {@code name()}
     * @param config projection settings
     * @param jacksonEventFormat format used to read events
     * @param flow flow turning each committable message into the offset to commit
     * @return the new projection
     */
    public static <E extends Event, Meta, Context> EventuallyConsistentProjection<E, Meta, Context> create(ActorSystem actorSystem, String str, Config<E, Meta, Context> config, JacksonEventFormat<?, E> jacksonEventFormat, Flow<ConsumerMessage.CommittableMessage<String, EventEnvelope<E, Meta, Context>>, ConsumerMessage.CommittableOffset, NotUsed> flow) {
        final JacksonSimpleFormat<Meta> defaultMetaFormat = JacksonSimpleFormat.empty();
        final JacksonSimpleFormat<Context> defaultContextFormat = JacksonSimpleFormat.empty();
        return create(actorSystem, str, config, jacksonEventFormat, defaultMetaFormat, defaultContextFormat, flow);
    }

    /**
     * Creates a projection whose abstract hooks simply return the supplied arguments.
     *
     * <p>The projection constructor calls {@code start()}, so the returned instance has already
     * been asked to start consuming.
     *
     * @param actorSystem actor system used by the projection
     * @param str value returned by {@code name()}
     * @param config projection settings
     * @param jacksonEventFormat value returned by {@code eventFormat()}
     * @param jacksonSimpleFormat value returned by {@code metaFormat()}
     * @param jacksonSimpleFormat2 value returned by {@code contextFormat()}
     * @param flow value returned by {@code messageHandling()}
     * @return the new projection
     */
    public static <E extends Event, Meta, Context> EventuallyConsistentProjection<E, Meta, Context> create(ActorSystem actorSystem, final String str, Config<E, Meta, Context> config, final JacksonEventFormat<?, E> jacksonEventFormat, final JacksonSimpleFormat<Meta> jacksonSimpleFormat, final JacksonSimpleFormat<Context> jacksonSimpleFormat2, final Flow<ConsumerMessage.CommittableMessage<String, EventEnvelope<E, Meta, Context>>, ConsumerMessage.CommittableOffset, NotUsed> flow) {
        return new EventuallyConsistentProjection<E, Meta, Context>(actorSystem, config) {
            @Override
            public Flow<ConsumerMessage.CommittableMessage<String, EventEnvelope<E, Meta, Context>>, ConsumerMessage.CommittableOffset, NotUsed> messageHandling() {
                return flow;
            }

            @Override
            protected String name() {
                return str;
            }

            @Override
            protected JacksonEventFormat<?, E> eventFormat() {
                return jacksonEventFormat;
            }

            @Override
            protected JacksonSimpleFormat<Meta> metaFormat() {
                return jacksonSimpleFormat;
            }

            @Override
            protected JacksonSimpleFormat<Context> contextFormat() {
                return jacksonSimpleFormat2;
            }
        };
    }

    /**
     * Creates a projection driven by a per-envelope handler function, passing
     * {@code JacksonSimpleFormat.empty()} for both the meta and the context format.
     *
     * @param actorSystem actor system used by the projection
     * @param str value returned by the projection's {@code name()}
     * @param config projection settings
     * @param jacksonEventFormat format used to read events
     * @param function handler invoked with each consumed envelope
     * @return the new projection
     */
    public static <E extends Event, Meta, Context> EventuallyConsistentProjection<E, Meta, Context> create(ActorSystem actorSystem, String str, Config<E, Meta, Context> config, JacksonEventFormat<?, E> jacksonEventFormat, Function<EventEnvelope<E, Meta, Context>, Future<Tuple0>> function) {
        final JacksonSimpleFormat<Meta> defaultMetaFormat = JacksonSimpleFormat.empty();
        final JacksonSimpleFormat<Context> defaultContextFormat = JacksonSimpleFormat.empty();
        return create(actorSystem, str, config, jacksonEventFormat, defaultMetaFormat, defaultContextFormat, function);
    }

    /**
     * Creates a projection that hands every consumed envelope to {@code function} and emits the
     * message's committable offset once the returned future has completed.
     *
     * <p>Messages are processed one at a time per partition stream: the handler flow uses
     * {@code flatMapConcat}, so the next message is only handled after the previous future is done.
     *
     * @param actorSystem actor system used by the projection
     * @param str value returned by the projection's {@code name()}
     * @param config projection settings
     * @param jacksonEventFormat format used to read events
     * @param jacksonSimpleFormat format used to read envelope metadata
     * @param jacksonSimpleFormat2 format used to read envelope context
     * @param function handler invoked with each consumed envelope
     * @return the new projection
     */
    public static <E extends Event, Meta, Context> EventuallyConsistentProjection<E, Meta, Context> create(ActorSystem actorSystem, String str, Config<E, Meta, Context> config, JacksonEventFormat<?, E> jacksonEventFormat, JacksonSimpleFormat<Meta> jacksonSimpleFormat, JacksonSimpleFormat<Context> jacksonSimpleFormat2, Function<EventEnvelope<E, Meta, Context>, Future<Tuple0>> function) {
        // The type witness on Flow.create() fixes the flow's input type; without it the element
        // type is Object and record()/committableOffset() cannot be resolved. It also removes the
        // need for a raw Future cast on the handler result.
        final Flow<ConsumerMessage.CommittableMessage<String, EventEnvelope<E, Meta, Context>>, ConsumerMessage.CommittableOffset, NotUsed> handlingFlow =
                Flow.<ConsumerMessage.CommittableMessage<String, EventEnvelope<E, Meta, Context>>>create()
                        .flatMapConcat(committableMessage -> Source.completionStage(
                                function.apply(committableMessage.record().value())
                                        .map(tuple0 -> committableMessage.committableOffset())
                                        .toCompletableFuture()));
        return create(actorSystem, str, config, jacksonEventFormat, jacksonSimpleFormat, jacksonSimpleFormat2, handlingFlow);
    }

    /**
     * Name of this projection. Within this class it is only used in log messages.
     * Called while the superclass constructor is still running (through {@code start()}).
     */
    protected abstract String name();

    /** Event format handed to {@code JsonDeserializer.of} by {@link #deserializer()}. */
    protected abstract JacksonEventFormat<?, E> eventFormat();

    /** Metadata format handed to {@code JsonDeserializer.of} by {@link #deserializer()}. */
    protected abstract JacksonSimpleFormat<Meta> metaFormat();

    /** Context format handed to {@code JsonDeserializer.of} by {@link #deserializer()}. */
    protected abstract JacksonSimpleFormat<Context> contextFormat();

    /**
     * Builds the Kafka value deserializer from the three format hooks.
     * The constructor calls this once when it creates the consumer settings.
     *
     * @return deserializer producing event envelopes
     */
    protected Deserializer<EventEnvelope<E, Meta, Context>> deserializer() {
        final JacksonEventFormat<?, E> events = eventFormat();
        final JacksonSimpleFormat<Meta> metas = metaFormat();
        final JacksonSimpleFormat<Context> contexts = contextFormat();
        return JsonDeserializer.of(events, metas, contexts);
    }

    /**
     * Flow applied to every partition stream in {@code start()}: it receives each committable
     * message and emits the offsets that are then passed on to the committer flow.
     */
    public abstract Flow<ConsumerMessage.CommittableMessage<String, EventEnvelope<E, Meta, Context>>, ConsumerMessage.CommittableOffset, NotUsed> messageHandling();

    /**
     * Logger used for every message emitted by this class; subclasses may override it.
     *
     * @return the class-wide logger
     */
    protected Logger logger() {
        return EventuallyConsistentProjection.log;
    }

    /**
     * Records {@code status} as the current status.
     *
     * @param status the new status
     * @return the same {@code status}, so the call can be used as an expression
     */
    protected Status updateStatus(Status status) {
        final AtomicReference<Status> holder = this.innerStatus;
        holder.set(status);
        return status;
    }

    /**
     * Returns the most recently recorded status.
     *
     * @return the current status
     */
    public Status status() {
        final Status current = this.innerStatus.get();
        return current;
    }

    /**
     * Starts consuming the topic unless the projection is already starting or started.
     *
     * <p>The stream is a restart-with-backoff source around a partitioned committable Kafka
     * source. Each partition stream goes through {@link #messageHandling()} and then through a
     * committer flow. Both the inner source and the restarting wrapper report their termination
     * to {@code handleTerminaison}.
     *
     * <p>NOTE(review): the committer batch size is the literal 100; the {@code commitSize} field
     * is not used here - confirm whether it was meant to drive this value.
     *
     * @return the current status when already starting/started, otherwise {@code Status.starting}
     */
    public Status start() {
        final Status current = status();
        final boolean alreadyRunning = Status.starting.equals(current) || Status.started.equals(current);
        if (alreadyRunning) {
            logger().info("{} already started", name());
            return current;
        }

        updateStatus(Status.starting);
        final CommitterSettings committerSettings = CommitterSettings.create(this.actorSystem);
        logger().info("Starting {} @{} on topic '{}' with group id '{}'", name(), Integer.toHexString(hashCode()), this.topic, this.groupId);

        RestartSource
                .onFailuresWithBackoff(this.minBackoff, this.maxBackoff, this.randomFactor, () -> {
                    // Runs on every (re)start of the inner Kafka source.
                    logger().info("Stream for {} is starting", name());
                    return Consumer
                            .committablePartitionedSource(this.consumerSettings, Subscriptions.topics(this.topic))
                            .flatMapMerge(100, partition -> partition.second()
                                    .via(messageHandling())
                                    .via(Committer.flow(committerSettings.withMaxBatch(100L))))
                            .mapMaterializedValue(kafkaControl -> {
                                updateStatus(Status.started);
                                logger().info("Stream for {} has started", name());
                                // Keep the control handle so stopConsumingKafka() can shut the consumer down.
                                this.controlRef.set(kafkaControl);
                                return kafkaControl;
                            })
                            .watchTermination((kafkaControl, innerTermination) -> handleTerminaison(innerTermination));
                })
                .watchTermination((notUsed, outerTermination) -> handleTerminaison(outerTermination))
                .runWith(Sink.ignore(), this.materializer);

        return Status.starting;
    }

    /**
     * Reacts to the end of a stream: records the resulting status, then shuts the Kafka consumer down.
     *
     * <p>Normal completion records {@code Status.stopped}; a failure is logged and records
     * {@code Status.failed}. If recording the status itself fails, that error is logged and
     * {@code Status.failed} is used without recording it again.
     *
     * @param completionStage termination signal of the stream
     * @return future of the resulting status, completed after {@code stopConsumingKafka()} has finished
     */
    private CompletableFuture<Status> handleTerminaison(CompletionStage<Done> completionStage) {
        final Future<Status> recordedStatus = Future.fromCompletableFuture(completionStage.toCompletableFuture())
                .map(done -> {
                    logger().info("Stopping {}", name());
                    return updateStatus(Status.stopped);
                })
                .recoverWith(streamFailure -> {
                    logger().error("Error during " + name(), streamFailure);
                    return Future.successful(updateStatus(Status.failed));
                })
                .recover(statusFailure -> {
                    // Reached only when one of the updateStatus calls above threw.
                    logger().error("Error persisting " + name() + " status in db", statusFailure);
                    return Status.failed;
                });

        return recordedStatus
                .flatMap(finalStatus -> stopConsumingKafka().map(ignored -> finalStatus))
                .toCompletableFuture();
    }

    /**
     * Shuts down the Kafka consumer of the currently running stream, if there is one.
     *
     * <p>The stored control handle is atomically taken and cleared, so calling this again before
     * another stream has started does nothing and returns an already-successful future.
     *
     * @return future completed once the consumer reports it is shut down; it fails if the
     *         shutdown fails
     */
    public Future<Tuple0> stopConsumingKafka() {
        final Consumer.Control control = this.controlRef.getAndSet(null);
        if (control == null) {
            // Nothing is running (or someone else already took the handle).
            return Future.successful(Tuple.empty());
        }
        return Future.fromCompletableFuture(control.shutdown().toCompletableFuture())
                // Pass the throwable as the last argument so SLF4J logs the cause and stack trace
                // instead of silently dropping it.
                .onFailure(th -> logger().error("Error shutting down kafka consumer for {}", name(), th))
                .onSuccess(done -> logger().info("Kafka consumer for {} is shutdown", name()))
                .flatMap(done -> Future.fromCompletableFuture(control.isShutdown().toCompletableFuture()))
                .map(done -> Tuple.empty());
    }

    /*
     * Decompiler rendering of the synthetic lambda-deserialization hook (note the "synthetic"
     * marker in the signature). It looks up the serialized lambda by implementation method name:
     * the first switch maps the name (via its hash code) to a selector, the second switch checks
     * the recorded method kind / functional interface / signatures and rebuilds the matching
     * lambda from the captured arguments. Anything unrecognised is rejected with
     * IllegalArgumentException.
     *
     * NOTE(review): as written this is not valid Java source - the selector is declared
     * "boolean z = -1" and switched on with "case false:" / "case true:" labels (several of them
     * duplicated), and the rebuilt lambdas call instance members (handleTerminaison, logger,
     * name, this.consumerSettings, ...) from a static method while the captured instance locals
     * are never used. It appears to be decompilation residue rather than hand-written code;
     * confirm whether it should be dropped when this file is used as source.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1430643804:
                if (implMethodName.equals("lambda$start$8e88fd4a$1")) {
                    z = 5;
                    break;
                }
                break;
            case -1057521007:
                if (implMethodName.equals("lambda$null$6dfda823$1")) {
                    z = 2;
                    break;
                }
                break;
            case -623320356:
                if (implMethodName.equals("lambda$null$c6f929ac$1")) {
                    z = false;
                    break;
                }
                break;
            case 227509528:
                if (implMethodName.equals("lambda$start$b64f1c0d$1")) {
                    z = 3;
                    break;
                }
                break;
            case 292652772:
                if (implMethodName.equals("lambda$create$6bc531d4$1")) {
                    z = true;
                    break;
                }
                break;
            case 2059684542:
                if (implMethodName.equals("lambda$null$2fb574e2$1")) {
                    z = 4;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function2") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("fr/maif/projections/EventuallyConsistentProjection") && serializedLambda.getImplMethodSignature().equals("(Lakka/kafka/javadsl/Consumer$Control;Ljava/util/concurrent/CompletionStage;)Ljava/util/concurrent/CompletableFuture;")) {
                    EventuallyConsistentProjection eventuallyConsistentProjection = (EventuallyConsistentProjection) serializedLambda.getCapturedArg(0);
                    return (control2, completionStage) -> {
                        return handleTerminaison(completionStage);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("fr/maif/projections/EventuallyConsistentProjection") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/function/Function;Lakka/kafka/ConsumerMessage$CommittableMessage;)Lakka/stream/Graph;")) {
                    Function function = (Function) serializedLambda.getCapturedArg(0);
                    return committableMessage -> {
                        return Source.completionStage(((Future) function.apply(committableMessage.record().value())).map(tuple0 -> {
                            return committableMessage.committableOffset();
                        }).toCompletableFuture());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("fr/maif/projections/EventuallyConsistentProjection") && serializedLambda.getImplMethodSignature().equals("(Lakka/kafka/CommitterSettings;Lakka/japi/Pair;)Lakka/stream/Graph;")) {
                    EventuallyConsistentProjection eventuallyConsistentProjection2 = (EventuallyConsistentProjection) serializedLambda.getCapturedArg(0);
                    CommitterSettings committerSettings = (CommitterSettings) serializedLambda.getCapturedArg(1);
                    return pair -> {
                        return ((akka.stream.javadsl.Source) pair.second()).via(messageHandling()).via(Committer.flow(committerSettings.withMaxBatch(100L)));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Creator") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("fr/maif/projections/EventuallyConsistentProjection") && serializedLambda.getImplMethodSignature().equals("(Lakka/kafka/CommitterSettings;)Lakka/stream/javadsl/Source;")) {
                    EventuallyConsistentProjection eventuallyConsistentProjection3 = (EventuallyConsistentProjection) serializedLambda.getCapturedArg(0);
                    CommitterSettings committerSettings2 = (CommitterSettings) serializedLambda.getCapturedArg(1);
                    return () -> {
                        logger().info("Stream for {} is starting", name());
                        return Consumer.committablePartitionedSource(this.consumerSettings, Subscriptions.topics(new String[]{this.topic})).flatMapMerge(100, pair2 -> {
                            return ((akka.stream.javadsl.Source) pair2.second()).via(messageHandling()).via(Committer.flow(committerSettings2.withMaxBatch(100L)));
                        }).mapMaterializedValue(control -> {
                            updateStatus(Status.started);
                            logger().info("Stream for {} has started", name());
                            this.controlRef.set(control);
                            return control;
                        }).watchTermination((control22, completionStage2) -> {
                            return handleTerminaison(completionStage2);
                        });
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("fr/maif/projections/EventuallyConsistentProjection") && serializedLambda.getImplMethodSignature().equals("(Lakka/kafka/javadsl/Consumer$Control;)Lakka/kafka/javadsl/Consumer$Control;")) {
                    EventuallyConsistentProjection eventuallyConsistentProjection4 = (EventuallyConsistentProjection) serializedLambda.getCapturedArg(0);
                    return control -> {
                        updateStatus(Status.started);
                        logger().info("Stream for {} has started", name());
                        this.controlRef.set(control);
                        return control;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function2") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("fr/maif/projections/EventuallyConsistentProjection") && serializedLambda.getImplMethodSignature().equals("(Lakka/NotUsed;Ljava/util/concurrent/CompletionStage;)Ljava/util/concurrent/CompletableFuture;")) {
                    EventuallyConsistentProjection eventuallyConsistentProjection5 = (EventuallyConsistentProjection) serializedLambda.getCapturedArg(0);
                    return (notUsed, completionStage2) -> {
                        return handleTerminaison(completionStage2);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
