package fr.maif.reactor.projections;

import fr.maif.eventsourcing.Event;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.format.JacksonEventFormat;
import fr.maif.eventsourcing.format.JacksonSimpleFormat;
import fr.maif.kafka.JsonDeserializer;
import fr.maif.kafka.reactor.consumer.ResilientKafkaConsumer;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.apache.kafka.common.serialization.StringDeserializer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

/* loaded from: input_file:fr/maif/reactor/projections/EventuallyConsistentProjection.class */
public abstract class EventuallyConsistentProjection<E extends Event, Meta, Context> extends ResilientKafkaConsumer<String, EventEnvelope<E, Meta, Context>> {

    /* loaded from: input_file:fr/maif/reactor/projections/EventuallyConsistentProjection$Config.class */
    public static class Config<E extends Event, Meta, Context> {
        public final String topic;
        public final String groupId;
        public final String bootstrapServers;
        public final Function<ReceiverOptions<String, EventEnvelope<E, Meta, Context>>, ReceiverOptions<String, EventEnvelope<E, Meta, Context>>> completeConfig;
        public final Duration minBackoff;
        public final Duration maxBackoff;
        public final Double randomFactor;
        public final Integer commitSize;

        /* loaded from: input_file:fr/maif/reactor/projections/EventuallyConsistentProjection$Config$ConfigBuilder.class */
        public static class ConfigBuilder<E extends Event, Meta, Context> {
            String topic;
            String groupId;
            String bootstrapServers;
            Function<ReceiverOptions<String, EventEnvelope<E, Meta, Context>>, ReceiverOptions<String, EventEnvelope<E, Meta, Context>>> completeConfig;
            Duration minBackoff;
            Duration maxBackoff;
            Double randomFactor;
            Integer commitSize;

            public ConfigBuilder<E, Meta, Context> topic(String str) {
                this.topic = str;
                return this;
            }

            public ConfigBuilder<E, Meta, Context> groupId(String str) {
                this.groupId = str;
                return this;
            }

            public ConfigBuilder<E, Meta, Context> bootstrapServers(String str) {
                this.bootstrapServers = str;
                return this;
            }

            public ConfigBuilder<E, Meta, Context> completeConfig(Function<ReceiverOptions<String, EventEnvelope<E, Meta, Context>>, ReceiverOptions<String, EventEnvelope<E, Meta, Context>>> function) {
                this.completeConfig = function;
                return this;
            }

            public ConfigBuilder<E, Meta, Context> minBackoff(Duration duration) {
                this.minBackoff = duration;
                return this;
            }

            public ConfigBuilder<E, Meta, Context> maxBackoff(Duration duration) {
                this.maxBackoff = duration;
                return this;
            }

            public ConfigBuilder<E, Meta, Context> randomFactor(Double d) {
                this.randomFactor = d;
                return this;
            }

            public ConfigBuilder<E, Meta, Context> commitSize(Integer num) {
                this.commitSize = num;
                return this;
            }

            public Config<E, Meta, Context> build() {
                return new Config<>(this.topic, this.groupId, this.bootstrapServers, this.completeConfig, this.minBackoff, this.maxBackoff, this.randomFactor, this.commitSize);
            }
        }

        private Config(String str, String str2, String str3, Function<ReceiverOptions<String, EventEnvelope<E, Meta, Context>>, ReceiverOptions<String, EventEnvelope<E, Meta, Context>>> function, Duration duration, Duration duration2, Double d, Integer num) {
            this.topic = str;
            this.groupId = str2;
            this.bootstrapServers = str3;
            this.completeConfig = function;
            this.minBackoff = duration;
            this.maxBackoff = duration2;
            this.randomFactor = d;
            this.commitSize = num;
        }

        public static <E extends Event, Meta, Context> Config<E, Meta, Context> create(String str, String str2, String str3) {
            return builder().bootstrapServers(str3).groupId(str2).topic(str).build();
        }

        public static <E extends Event, Meta, Context> ConfigBuilder<E, Meta, Context> builder() {
            return new ConfigBuilder<>();
        }

        public ConfigBuilder<E, Meta, Context> toBuilder() {
            return builder().topic(this.topic).groupId(this.groupId).bootstrapServers(this.bootstrapServers).completeConfig(this.completeConfig).minBackoff(this.minBackoff).maxBackoff(this.maxBackoff).randomFactor(this.randomFactor).commitSize(this.commitSize);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:fr/maif/reactor/projections/EventuallyConsistentProjection$MessageHandling.class */
    public interface MessageHandling<E extends Event, Meta, Context> extends Function<Flux<ReceiverRecord<String, EventEnvelope<E, Meta, Context>>>, Flux<ReceiverRecord<String, EventEnvelope<E, Meta, Context>>>> {
    }

    public EventuallyConsistentProjection(JacksonEventFormat<?, E> jacksonEventFormat, JacksonSimpleFormat<Meta> jacksonSimpleFormat, JacksonSimpleFormat<Context> jacksonSimpleFormat2, Config<E, Meta, Context> config) {
        super(ResilientKafkaConsumer.Config.builder().minBackoff(config.minBackoff).maxBackoff(config.maxBackoff).randomFactor((Double) defaultIfNull(config.randomFactor, Double.valueOf(0.0d))).commitSize(config.commitSize).topics(List.of(config.topic)).groupId(config.groupId).receiverOptions((ReceiverOptions) ((Function) defaultIfNull(config.completeConfig, receiverOptions -> {
            return receiverOptions;
        })).apply(ReceiverOptions.create(Map.of("bootstrap.servers", config.bootstrapServers, "auto.offset.reset", "latest")).withKeyDeserializer(new StringDeserializer()).withValueDeserializer(JsonDeserializer.of(jacksonEventFormat, jacksonSimpleFormat, jacksonSimpleFormat2)))).build());
    }

    protected abstract String name();

    public abstract Function<Flux<ReceiverRecord<String, EventEnvelope<E, Meta, Context>>>, Flux<ReceiverRecord<String, EventEnvelope<E, Meta, Context>>>> messageHandling();

    public static <E extends Event, Meta, Context> EventuallyConsistentProjection<E, Meta, Context> create(String str, Config<E, Meta, Context> config, JacksonEventFormat<?, E> jacksonEventFormat, MessageHandling<E, Meta, Context> messageHandling) {
        return create(str, config, jacksonEventFormat, JacksonSimpleFormat.empty(), JacksonSimpleFormat.empty(), messageHandling);
    }

    public static <E extends Event, Meta, Context> EventuallyConsistentProjection<E, Meta, Context> create(final String str, Config<E, Meta, Context> config, JacksonEventFormat<?, E> jacksonEventFormat, JacksonSimpleFormat<Meta> jacksonSimpleFormat, JacksonSimpleFormat<Context> jacksonSimpleFormat2, final MessageHandling<E, Meta, Context> messageHandling) {
        return (EventuallyConsistentProjection<E, Meta, Context>) new EventuallyConsistentProjection<E, Meta, Context>(jacksonEventFormat, jacksonSimpleFormat, jacksonSimpleFormat2, config) { // from class: fr.maif.reactor.projections.EventuallyConsistentProjection.1
            @Override // fr.maif.reactor.projections.EventuallyConsistentProjection
            protected String name() {
                return str;
            }

            @Override // fr.maif.reactor.projections.EventuallyConsistentProjection
            public Function<Flux<ReceiverRecord<String, EventEnvelope<E, Meta, Context>>>, Flux<ReceiverRecord<String, EventEnvelope<E, Meta, Context>>>> messageHandling() {
                return messageHandling;
            }
        };
    }

    public static <E extends Event, Meta, Context> EventuallyConsistentProjection<E, Meta, Context> simpleHandler(String str, Config<E, Meta, Context> config, JacksonEventFormat<?, E> jacksonEventFormat, Function<EventEnvelope<E, Meta, Context>, CompletionStage<Void>> function) {
        return simpleHandler(str, config, jacksonEventFormat, JacksonSimpleFormat.empty(), JacksonSimpleFormat.empty(), function);
    }

    public static <E extends Event, Meta, Context> EventuallyConsistentProjection<E, Meta, Context> simpleHandler(String str, Config<E, Meta, Context> config, JacksonEventFormat<?, E> jacksonEventFormat, JacksonSimpleFormat<Meta> jacksonSimpleFormat, JacksonSimpleFormat<Context> jacksonSimpleFormat2, Function<EventEnvelope<E, Meta, Context>, CompletionStage<Void>> function) {
        return create(str, config, jacksonEventFormat, jacksonSimpleFormat, jacksonSimpleFormat2, flux -> {
            return flux.concatMap(receiverRecord -> {
                return Mono.fromCompletionStage(() -> {
                    return ((CompletionStage) function.apply((EventEnvelope) receiverRecord.value())).thenApply(r3 -> {
                        return receiverRecord;
                    });
                });
            });
        });
    }
}
