package fr.maif.akka.projections;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.kafka.ConsumerMessage;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.stream.javadsl.Flow;
import akka.stream.scaladsl.Source;
import fr.maif.eventsourcing.Event;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.format.JacksonEventFormat;
import fr.maif.eventsourcing.format.JacksonSimpleFormat;
import fr.maif.kafka.JsonDeserializer;
import fr.maif.kafka.consumer.ResilientKafkaConsumer;
import io.vavr.Tuple0;
import io.vavr.concurrent.Future;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.function.Function;
import org.apache.kafka.common.serialization.StringDeserializer;

/* loaded from: input_file:fr/maif/akka/projections/EventuallyConsistentProjection.class */
/**
 * Projection fed by a Kafka topic of {@link EventEnvelope} records.
 *
 * <p>The class is a {@link ResilientKafkaConsumer} keyed by {@code String} whose values are
 * {@code EventEnvelope<E, Meta, Context>}. The constructor translates a {@link Config} into the
 * parent's configuration; subclasses (or the {@code create} factories below) supply the projection
 * name and the flow that turns each committable message into the offset to commit.
 *
 * @param <E>       event type carried by the envelopes
 * @param <Meta>    type of the envelope metadata
 * @param <Context> type of the envelope context
 */
public abstract class EventuallyConsistentProjection<E extends Event, Meta, Context> extends ResilientKafkaConsumer<String, EventEnvelope<E, Meta, Context>> {

    /**
     * Immutable settings of a projection. Build one with {@link #create(String, String, String)}
     * or {@link #builder()}; derive a modified copy with {@link #toBuilder()}.
     *
     * <p>No field is validated here: every value, including {@code null}, is stored as given.
     */
    public static class Config<E extends Event, Meta, Context> {
        /** Kafka topic the projection subscribes to. */
        public final String topic;
        /** Kafka consumer group id. */
        public final String groupId;
        /** Kafka bootstrap servers string. */
        public final String bootstrapServers;
        /**
         * Optional hook applied to the default {@link ConsumerSettings} before they are handed to
         * the parent consumer; when {@code null} the defaults are used unchanged.
         */
        public final Function<ConsumerSettings<String, EventEnvelope<E, Meta, Context>>, ConsumerSettings<String, EventEnvelope<E, Meta, Context>>> completeConfig;
        /** Forwarded as-is to {@code ResilientKafkaConsumer.Config.minBackoff}. */
        public final Duration minBackoff;
        /** Forwarded as-is to {@code ResilientKafkaConsumer.Config.maxBackoff}. */
        public final Duration maxBackoff;
        /** Forwarded to {@code ResilientKafkaConsumer.Config.randomFactor}; {@code null} becomes {@code 0.0}. */
        public final Double randomFactor;
        /** Forwarded as-is to {@code ResilientKafkaConsumer.Config.commitSize}. */
        public final Integer commitSize;

        /**
         * Fluent, mutable builder for {@link Config}. Every setter stores its argument and returns
         * the same builder; unset properties stay {@code null}.
         */
        public static class ConfigBuilder<E extends Event, Meta, Context> {
            String topic;
            String groupId;
            String bootstrapServers;
            Function<ConsumerSettings<String, EventEnvelope<E, Meta, Context>>, ConsumerSettings<String, EventEnvelope<E, Meta, Context>>> completeConfig;
            Duration minBackoff;
            Duration maxBackoff;
            Double randomFactor;
            Integer commitSize;

            /** Sets the Kafka topic. */
            public ConfigBuilder<E, Meta, Context> topic(String str) {
                this.topic = str;
                return this;
            }

            /** Sets the consumer group id. */
            public ConfigBuilder<E, Meta, Context> groupId(String str) {
                this.groupId = str;
                return this;
            }

            /** Sets the Kafka bootstrap servers. */
            public ConfigBuilder<E, Meta, Context> bootstrapServers(String str) {
                this.bootstrapServers = str;
                return this;
            }

            /** Sets the hook used to customise the default consumer settings (may be {@code null}). */
            public ConfigBuilder<E, Meta, Context> completeConfig(Function<ConsumerSettings<String, EventEnvelope<E, Meta, Context>>, ConsumerSettings<String, EventEnvelope<E, Meta, Context>>> function) {
                this.completeConfig = function;
                return this;
            }

            /** Sets the minimum backoff forwarded to the parent consumer. */
            public ConfigBuilder<E, Meta, Context> minBackoff(Duration duration) {
                this.minBackoff = duration;
                return this;
            }

            /** Sets the maximum backoff forwarded to the parent consumer. */
            public ConfigBuilder<E, Meta, Context> maxBackoff(Duration duration) {
                this.maxBackoff = duration;
                return this;
            }

            /** Sets the random factor forwarded to the parent consumer. */
            public ConfigBuilder<E, Meta, Context> randomFactor(Double d) {
                this.randomFactor = d;
                return this;
            }

            /** Sets the commit size forwarded to the parent consumer. */
            public ConfigBuilder<E, Meta, Context> commitSize(Integer num) {
                this.commitSize = num;
                return this;
            }

            /** Creates an immutable {@link Config} from the current state of this builder. */
            public Config<E, Meta, Context> build() {
                return new Config<>(
                        this.topic,
                        this.groupId,
                        this.bootstrapServers,
                        this.completeConfig,
                        this.minBackoff,
                        this.maxBackoff,
                        this.randomFactor,
                        this.commitSize);
            }
        }

        private Config(
                String topic,
                String groupId,
                String bootstrapServers,
                Function<ConsumerSettings<String, EventEnvelope<E, Meta, Context>>, ConsumerSettings<String, EventEnvelope<E, Meta, Context>>> completeConfig,
                Duration minBackoff,
                Duration maxBackoff,
                Double randomFactor,
                Integer commitSize) {
            this.topic = topic;
            this.groupId = groupId;
            this.bootstrapServers = bootstrapServers;
            this.completeConfig = completeConfig;
            this.minBackoff = minBackoff;
            this.maxBackoff = maxBackoff;
            this.randomFactor = randomFactor;
            this.commitSize = commitSize;
        }

        /**
         * Creates a configuration with only the mandatory Kafka coordinates set; every other
         * property is left {@code null}.
         *
         * @param str  the topic
         * @param str2 the consumer group id
         * @param str3 the bootstrap servers
         * @return the new configuration
         */
        public static <E extends Event, Meta, Context> Config<E, Meta, Context> create(String str, String str2, String str3) {
            // The explicit type witness is required: a bare builder() used as the receiver of a
            // call chain is inferred from its bounds only and would not yield Config<E, Meta, Context>.
            return Config.<E, Meta, Context>builder()
                    .bootstrapServers(str3)
                    .groupId(str2)
                    .topic(str)
                    .build();
        }

        /** Returns a new, empty builder. */
        public static <E extends Event, Meta, Context> ConfigBuilder<E, Meta, Context> builder() {
            return new ConfigBuilder<>();
        }

        /** Returns a builder pre-populated with every property of this configuration. */
        public ConfigBuilder<E, Meta, Context> toBuilder() {
            // Same type-witness requirement as in create(...); without it completeConfig(...)
            // would be checked against the erased-to-bounds builder type.
            return Config.<E, Meta, Context>builder()
                    .topic(this.topic)
                    .groupId(this.groupId)
                    .bootstrapServers(this.bootstrapServers)
                    .completeConfig(this.completeConfig)
                    .minBackoff(this.minBackoff)
                    .maxBackoff(this.maxBackoff)
                    .randomFactor(this.randomFactor)
                    .commitSize(this.commitSize);
        }
    }

    /**
     * Builds the parent consumer configuration from {@code config}.
     *
     * <p>Default consumer settings use a {@link StringDeserializer} for keys and a
     * {@link JsonDeserializer} built from the three formats for values, with the configured group
     * id and bootstrap servers, {@code auto.offset.reset=latest} and
     * {@code enable.auto.commit=false}. {@code config.completeConfig}, when non-null, is applied to
     * those defaults. A {@code null} {@code config.randomFactor} is replaced by {@code 0.0}.
     *
     * @param actorSystem          actor system passed to the parent and to {@link ConsumerSettings#create}
     * @param jacksonEventFormat   format used to deserialize events
     * @param jacksonSimpleFormat  format used to deserialize the envelope metadata
     * @param jacksonSimpleFormat2 format used to deserialize the envelope context
     * @param config               projection settings
     */
    public EventuallyConsistentProjection(ActorSystem actorSystem, JacksonEventFormat<?, E> jacksonEventFormat, JacksonSimpleFormat<Meta> jacksonSimpleFormat, JacksonSimpleFormat<Context> jacksonSimpleFormat2, Config<E, Meta, Context> config) {
        // NOTE(review): defaultIfNull is neither declared nor imported in this file; it is
        // presumably inherited from ResilientKafkaConsumer or was a static import - confirm.
        // NOTE(review): the raw (ConsumerSettings)/(Function) casts are decompiler residue; they
        // are kept because ResilientKafkaConsumer.Config's generic signature is not visible here.
        super(actorSystem, ResilientKafkaConsumer.Config.builder()
                .minBackoff(config.minBackoff)
                .maxBackoff(config.maxBackoff)
                .randomFactor((Double) defaultIfNull(config.randomFactor, Double.valueOf(0.0d)))
                .commitSize(config.commitSize)
                .consumerSettings((ConsumerSettings) ((Function) defaultIfNull(config.completeConfig, consumerSettings -> consumerSettings))
                        .apply(ConsumerSettings.create(actorSystem, new StringDeserializer(), JsonDeserializer.of(jacksonEventFormat, jacksonSimpleFormat, jacksonSimpleFormat2))
                                .withGroupId(config.groupId)
                                .withProperty("auto.offset.reset", "latest")
                                .withProperty("enable.auto.commit", "false")
                                .withBootstrapServers(config.bootstrapServers)))
                .subscription(Subscriptions.topics(config.topic))
                .groupId(config.groupId)
                .build());
    }

    /**
     * Creates a projection from a message-handling flow, using {@link JacksonSimpleFormat#empty()}
     * for both the metadata and the context formats.
     *
     * @param actorSystem        actor system used by the consumer
     * @param str                value returned by {@link #name()}
     * @param config             projection settings
     * @param jacksonEventFormat format used to deserialize events
     * @param flow               flow returned by {@link #messageHandling()}
     * @return the projection
     */
    public static <E extends Event, Meta, Context> EventuallyConsistentProjection<E, Meta, Context> create(ActorSystem actorSystem, String str, Config<E, Meta, Context> config, JacksonEventFormat<?, E> jacksonEventFormat, Flow<ConsumerMessage.CommittableMessage<String, EventEnvelope<E, Meta, Context>>, ConsumerMessage.CommittableOffset, NotUsed> flow) {
        return create(actorSystem, str, config, jacksonEventFormat, JacksonSimpleFormat.empty(), JacksonSimpleFormat.empty(), flow);
    }

    /**
     * Creates a projection from a message-handling flow and explicit metadata/context formats.
     *
     * @param actorSystem          actor system used by the consumer
     * @param str                  value returned by {@link #name()}
     * @param config               projection settings
     * @param jacksonEventFormat   format used to deserialize events
     * @param jacksonSimpleFormat  format used to deserialize the envelope metadata
     * @param jacksonSimpleFormat2 format used to deserialize the envelope context
     * @param flow                 flow returned by {@link #messageHandling()}
     * @return the projection
     */
    public static <E extends Event, Meta, Context> EventuallyConsistentProjection<E, Meta, Context> create(ActorSystem actorSystem, final String str, Config<E, Meta, Context> config, JacksonEventFormat<?, E> jacksonEventFormat, JacksonSimpleFormat<Meta> jacksonSimpleFormat, JacksonSimpleFormat<Context> jacksonSimpleFormat2, final Flow<ConsumerMessage.CommittableMessage<String, EventEnvelope<E, Meta, Context>>, ConsumerMessage.CommittableOffset, NotUsed> flow) {
        return new EventuallyConsistentProjection<E, Meta, Context>(actorSystem, jacksonEventFormat, jacksonSimpleFormat, jacksonSimpleFormat2, config) {
            @Override
            protected String name() {
                return str;
            }

            @Override
            public Flow<ConsumerMessage.CommittableMessage<String, EventEnvelope<E, Meta, Context>>, ConsumerMessage.CommittableOffset, NotUsed> messageHandling() {
                return flow;
            }
        };
    }

    /**
     * Creates a projection from a per-envelope handler, using {@link JacksonSimpleFormat#empty()}
     * for both the metadata and the context formats.
     *
     * @param actorSystem        actor system used by the consumer
     * @param str                value returned by {@link #name()}
     * @param config             projection settings
     * @param jacksonEventFormat format used to deserialize events
     * @param function           handler invoked with the value of every consumed record
     * @return the projection
     */
    public static <E extends Event, Meta, Context> EventuallyConsistentProjection<E, Meta, Context> create(ActorSystem actorSystem, String str, Config<E, Meta, Context> config, JacksonEventFormat<?, E> jacksonEventFormat, Function<EventEnvelope<E, Meta, Context>, Future<Tuple0>> function) {
        return create(actorSystem, str, config, jacksonEventFormat, JacksonSimpleFormat.empty(), JacksonSimpleFormat.empty(), function);
    }

    /**
     * Creates a projection from a per-envelope handler and explicit metadata/context formats.
     *
     * <p>The handler is wrapped in a flow that, for each committable message, applies
     * {@code function} to the record value and emits the message's committable offset once the
     * returned future completes. Messages are processed one after another
     * ({@code flatMapConcat}).
     *
     * @param actorSystem          actor system used by the consumer
     * @param str                  value returned by {@link #name()}
     * @param config               projection settings
     * @param jacksonEventFormat   format used to deserialize events
     * @param jacksonSimpleFormat  format used to deserialize the envelope metadata
     * @param jacksonSimpleFormat2 format used to deserialize the envelope context
     * @param function             handler invoked with the value of every consumed record
     * @return the projection
     */
    public static <E extends Event, Meta, Context> EventuallyConsistentProjection<E, Meta, Context> create(ActorSystem actorSystem, String str, Config<E, Meta, Context> config, JacksonEventFormat<?, E> jacksonEventFormat, JacksonSimpleFormat<Meta> jacksonSimpleFormat, JacksonSimpleFormat<Context> jacksonSimpleFormat2, Function<EventEnvelope<E, Meta, Context>, Future<Tuple0>> function) {
        // Flow.create() needs its element type spelled out: as the receiver of flatMapConcat it
        // gets no target type, and would otherwise be inferred as Flow<Object, Object, NotUsed>.
        Flow<ConsumerMessage.CommittableMessage<String, EventEnvelope<E, Meta, Context>>, ConsumerMessage.CommittableOffset, NotUsed> handling =
                Flow.<ConsumerMessage.CommittableMessage<String, EventEnvelope<E, Meta, Context>>>create()
                        .flatMapConcat(message -> Source.completionStage(
                                function.apply(message.record().value())
                                        .map(ignored -> message.committableOffset())
                                        .toCompletableFuture()));
        return create(actorSystem, str, config, jacksonEventFormat, jacksonSimpleFormat, jacksonSimpleFormat2, handling);
    }

    /** Returns the name of this projection. */
    protected abstract String name();

    /**
     * Returns the flow that processes each committable Kafka message and emits the offset
     * associated with it.
     */
    public abstract Flow<ConsumerMessage.CommittableMessage<String, EventEnvelope<E, Meta, Context>>, ConsumerMessage.CommittableOffset, NotUsed> messageHandling();

    // The decompiled "$deserializeLambda$(SerializedLambda)" method was intentionally dropped:
    // it is the synthetic hook javac emits for serializable lambdas (the flatMapConcat lambda
    // above targets akka.japi.function.Function), it is regenerated on every compilation, and its
    // decompiled form (boolean selector initialised to -1, switch on a boolean, lambda returned
    // as Object) is not valid Java source.
}
