package fr.maif.kafka.consumer;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.kafka.AutoSubscription;
import akka.kafka.CommitterSettings;
import akka.kafka.ConsumerMessage;
import akka.kafka.ConsumerSettings;
import akka.kafka.javadsl.Committer;
import akka.kafka.javadsl.Consumer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.FlowWithContext;
import akka.stream.javadsl.RestartSource;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:fr/maif/kafka/consumer/ResilientKafkaConsumer.class */
public abstract class ResilientKafkaConsumer<K, V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ResilientKafkaConsumer.class);
    protected final ActorSystem actorSystem;
    protected final Materializer materializer;
    protected final AutoSubscription subscription;
    protected final String groupId;
    protected final Duration minBackoff;
    protected final Duration maxBackoff;
    protected final double randomFactor;
    protected final Integer commitSize;
    protected final ConsumerSettings<K, V> consumerSettings;
    protected final BiFunction<Consumer.Control, Integer, CompletionStage<Done>> onStarted;
    protected final Supplier<CompletionStage<Done>> onStarting;
    protected final Supplier<CompletionStage<Done>> onStopped;
    protected final Function<Consumer.Control, CompletionStage<Done>> onStopping;
    protected final Function<Throwable, CompletionStage<Done>> onFailed;
    protected final AtomicReference<Consumer.Control> controlRef = new AtomicReference<>();
    protected final AtomicReference<Status> innerStatus = new AtomicReference<>(Status.Stopped);

    /* loaded from: input_file:fr/maif/kafka/consumer/ResilientKafkaConsumer$Config.class */
    public static class Config<K, V> {
        public final AutoSubscription subscription;
        public final String groupId;
        public final ConsumerSettings<K, V> consumerSettings;
        public Duration minBackoff;
        public Duration maxBackoff;
        public Double randomFactor;
        public Integer commitSize;
        public BiFunction<Consumer.Control, Integer, CompletionStage<Done>> onStarted;
        public Supplier<CompletionStage<Done>> onStarting;
        public Supplier<CompletionStage<Done>> onStopped;
        public Function<Consumer.Control, CompletionStage<Done>> onStopping;
        public Function<Throwable, CompletionStage<Done>> onFailed;

        /* loaded from: input_file:fr/maif/kafka/consumer/ResilientKafkaConsumer$Config$ConfigBuilder.class */
        public static class ConfigBuilder<K, V> {
            AutoSubscription subscription;
            String groupId;
            ConsumerSettings<K, V> consumerSettings;
            Duration minBackoff;
            Duration maxBackoff;
            Double randomFactor;
            Integer commitSize;
            BiFunction<Consumer.Control, Integer, CompletionStage<Done>> onStarted;
            Supplier<CompletionStage<Done>> onStarting;
            Supplier<CompletionStage<Done>> onStopped;
            Function<Consumer.Control, CompletionStage<Done>> onStopping;
            Function<Throwable, CompletionStage<Done>> onFailed;

            public ConfigBuilder<K, V> subscription(AutoSubscription autoSubscription) {
                this.subscription = autoSubscription;
                return this;
            }

            public ConfigBuilder<K, V> groupId(String str) {
                this.groupId = str;
                return this;
            }

            public ConfigBuilder<K, V> consumerSettings(ConsumerSettings<K, V> consumerSettings) {
                this.consumerSettings = consumerSettings;
                return this;
            }

            public ConfigBuilder<K, V> minBackoff(Duration duration) {
                this.minBackoff = duration;
                return this;
            }

            public ConfigBuilder<K, V> maxBackoff(Duration duration) {
                this.maxBackoff = duration;
                return this;
            }

            public ConfigBuilder<K, V> randomFactor(Double d) {
                this.randomFactor = d;
                return this;
            }

            public ConfigBuilder<K, V> commitSize(Integer num) {
                this.commitSize = num;
                return this;
            }

            public ConfigBuilder<K, V> onStarted(BiFunction<Consumer.Control, Integer, CompletionStage<Done>> biFunction) {
                this.onStarted = biFunction;
                return this;
            }

            public ConfigBuilder<K, V> onStarting(Supplier<CompletionStage<Done>> supplier) {
                this.onStarting = supplier;
                return this;
            }

            public ConfigBuilder<K, V> onStopped(Supplier<CompletionStage<Done>> supplier) {
                this.onStopped = supplier;
                return this;
            }

            public ConfigBuilder<K, V> onStopping(Function<Consumer.Control, CompletionStage<Done>> function) {
                this.onStopping = function;
                return this;
            }

            public ConfigBuilder<K, V> onFailed(Function<Throwable, CompletionStage<Done>> function) {
                this.onFailed = function;
                return this;
            }

            public Config<K, V> build() {
                return new Config<>(this.subscription, this.groupId, this.consumerSettings, this.minBackoff, this.maxBackoff, this.randomFactor, this.commitSize, this.onStarted, this.onStarting, this.onStopped, this.onStopping, this.onFailed);
            }
        }

        private Config(AutoSubscription autoSubscription, String str, ConsumerSettings<K, V> consumerSettings, Duration duration, Duration duration2, Double d, Integer num, BiFunction<Consumer.Control, Integer, CompletionStage<Done>> biFunction, Supplier<CompletionStage<Done>> supplier, Supplier<CompletionStage<Done>> supplier2, Function<Consumer.Control, CompletionStage<Done>> function, Function<Throwable, CompletionStage<Done>> function2) {
            this.subscription = autoSubscription;
            this.groupId = str;
            this.consumerSettings = consumerSettings;
            this.minBackoff = duration;
            this.maxBackoff = duration2;
            this.randomFactor = d;
            this.commitSize = num;
            this.onStarted = biFunction;
            this.onStarting = supplier;
            this.onStopped = supplier2;
            this.onStopping = function;
            this.onFailed = function2;
        }

        public static <K, V> Config<K, V> create(AutoSubscription autoSubscription, String str, ConsumerSettings<K, V> consumerSettings) {
            return builder().consumerSettings(consumerSettings).groupId(str).subscription(autoSubscription).build();
        }

        public static <K, V> ConfigBuilder<K, V> builder() {
            return new ConfigBuilder<>();
        }

        public ConfigBuilder<K, V> toBuilder() {
            return builder().subscription(this.subscription).groupId(this.groupId).consumerSettings(this.consumerSettings).minBackoff(this.minBackoff).maxBackoff(this.maxBackoff).randomFactor(this.randomFactor).commitSize(this.commitSize).onStarted(this.onStarted).onStarting(this.onStarting).onStopped(this.onStopped).onStopping(this.onStopping).onFailed(this.onFailed);
        }

        public Config<K, V> withMinBackoff(Duration duration) {
            return toBuilder().minBackoff(duration).build();
        }

        public Config<K, V> withMaxBackoff(Duration duration) {
            return toBuilder().maxBackoff(duration).build();
        }

        public Config<K, V> withRandomFactor(Double d) {
            return toBuilder().randomFactor(d).build();
        }

        public Config<K, V> withCommitSize(Integer num) {
            return toBuilder().commitSize(num).build();
        }

        public Config<K, V> withOnStarted(BiFunction<Consumer.Control, Integer, CompletionStage<Done>> biFunction) {
            return toBuilder().onStarted(biFunction).build();
        }

        public Config<K, V> withOnStarting(Supplier<CompletionStage<Done>> supplier) {
            return toBuilder().onStarting(supplier).build();
        }

        public Config<K, V> withOnStopped(Supplier<CompletionStage<Done>> supplier) {
            return toBuilder().onStopped(supplier).build();
        }

        public Config<K, V> withOnStopping(Function<Consumer.Control, CompletionStage<Done>> function) {
            return toBuilder().onStopping(function).build();
        }

        public Config<K, V> withOnFailed(Function<Throwable, CompletionStage<Done>> function) {
            return toBuilder().onFailed(function).build();
        }
    }

    public ResilientKafkaConsumer(ActorSystem actorSystem, Config<K, V> config) {
        this.actorSystem = actorSystem;
        this.materializer = Materializer.createMaterializer(actorSystem);
        this.subscription = config.subscription;
        this.groupId = config.groupId;
        this.minBackoff = Objects.isNull(config.minBackoff) ? Duration.ofSeconds(30L) : config.minBackoff;
        this.maxBackoff = Objects.isNull(config.maxBackoff) ? Duration.ofMinutes(30L) : config.maxBackoff;
        this.randomFactor = Objects.isNull(config.randomFactor) ? 0.2d : config.randomFactor.doubleValue();
        this.commitSize = Integer.valueOf(Objects.isNull(config.commitSize) ? 10 : config.commitSize.intValue());
        this.consumerSettings = config.consumerSettings.withGroupId(config.groupId).withProperty("enable.auto.commit", "false");
        this.onStarted = (BiFunction) defaultIfNull(config.onStarted, (control, num) -> {
            return CompletableFuture.completedFuture(Done.done());
        });
        this.onStarting = (Supplier) defaultIfNull(config.onStarting, () -> {
            return CompletableFuture.completedFuture(Done.done());
        });
        this.onStopped = (Supplier) defaultIfNull(config.onStopped, () -> {
            return CompletableFuture.completedFuture(Done.done());
        });
        this.onStopping = (Function) defaultIfNull(config.onStopping, control2 -> {
            return CompletableFuture.completedFuture(Done.done());
        });
        this.onFailed = (Function) defaultIfNull(config.onFailed, th -> {
            return CompletableFuture.completedFuture(Done.done());
        });
        start();
    }

    public static <K, V> ResilientKafkaConsumer<K, V> createFromFlow(ActorSystem actorSystem, final String str, Config<K, V> config, final Flow<ConsumerMessage.CommittableMessage<K, V>, ConsumerMessage.CommittableOffset, NotUsed> flow) {
        return new ResilientKafkaConsumer<K, V>(actorSystem, config) { // from class: fr.maif.kafka.consumer.ResilientKafkaConsumer.1
            @Override // fr.maif.kafka.consumer.ResilientKafkaConsumer
            protected String name() {
                return str;
            }

            @Override // fr.maif.kafka.consumer.ResilientKafkaConsumer
            public Flow<ConsumerMessage.CommittableMessage<K, V>, ConsumerMessage.CommittableOffset, NotUsed> messageHandling() {
                return flow;
            }
        };
    }

    public static <K, V> ResilientKafkaConsumer<K, V> createFromFlowCtx(ActorSystem actorSystem, final String str, Config<K, V> config, final FlowWithContext<ConsumerMessage.CommittableMessage<K, V>, ConsumerMessage.CommittableOffset, ?, ConsumerMessage.CommittableOffset, NotUsed> flowWithContext) {
        return new ResilientKafkaConsumer<K, V>(actorSystem, config) { // from class: fr.maif.kafka.consumer.ResilientKafkaConsumer.2
            @Override // fr.maif.kafka.consumer.ResilientKafkaConsumer
            protected String name() {
                return str;
            }

            @Override // fr.maif.kafka.consumer.ResilientKafkaConsumer
            public Flow<ConsumerMessage.CommittableMessage<K, V>, ConsumerMessage.CommittableOffset, NotUsed> messageHandling() {
                return Flow.create().map(committableMessage -> {
                    return Pair.create(committableMessage, committableMessage.committableOffset());
                }).via(flowWithContext.asFlow()).map((v0) -> {
                    return v0.second();
                });
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z = -1;
                switch (implMethodName.hashCode()) {
                    case -906279820:
                        if (implMethodName.equals("second")) {
                            z = true;
                            break;
                        }
                        break;
                    case 762980557:
                        if (implMethodName.equals("lambda$messageHandling$b88ccbf1$1")) {
                            z = false;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("fr/maif/kafka/consumer/ResilientKafkaConsumer$2") && serializedLambda.getImplMethodSignature().equals("(Lakka/kafka/ConsumerMessage$CommittableMessage;)Lakka/japi/Pair;")) {
                            return committableMessage -> {
                                return Pair.create(committableMessage, committableMessage.committableOffset());
                            };
                        }
                        break;
                    case true:
                        if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("akka/japi/Pair") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                            return (v0) -> {
                                return v0.second();
                            };
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        };
    }

    public static <K, V> ResilientKafkaConsumer<K, V> createFromFlowCtxAgg(ActorSystem actorSystem, final String str, Config<K, V> config, final FlowWithContext<ConsumerMessage.CommittableMessage<K, V>, ConsumerMessage.CommittableOffset, ?, List<ConsumerMessage.CommittableOffset>, NotUsed> flowWithContext) {
        return new ResilientKafkaConsumer<K, V>(actorSystem, config) { // from class: fr.maif.kafka.consumer.ResilientKafkaConsumer.3
            @Override // fr.maif.kafka.consumer.ResilientKafkaConsumer
            protected String name() {
                return str;
            }

            @Override // fr.maif.kafka.consumer.ResilientKafkaConsumer
            public Flow<ConsumerMessage.CommittableMessage<K, V>, ConsumerMessage.CommittableOffset, NotUsed> messageHandling() {
                return Flow.create().map(committableMessage -> {
                    return Pair.create(committableMessage, committableMessage.committableOffset());
                }).via(flowWithContext.asFlow()).mapConcat((v0) -> {
                    return v0.second();
                });
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z = -1;
                switch (implMethodName.hashCode()) {
                    case -906279820:
                        if (implMethodName.equals("second")) {
                            z = true;
                            break;
                        }
                        break;
                    case 762980557:
                        if (implMethodName.equals("lambda$messageHandling$b88ccbf1$1")) {
                            z = false;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("fr/maif/kafka/consumer/ResilientKafkaConsumer$3") && serializedLambda.getImplMethodSignature().equals("(Lakka/kafka/ConsumerMessage$CommittableMessage;)Lakka/japi/Pair;")) {
                            return committableMessage -> {
                                return Pair.create(committableMessage, committableMessage.committableOffset());
                            };
                        }
                        break;
                    case true:
                        if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("akka/japi/Pair") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                            return (v0) -> {
                                return v0.second();
                            };
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        };
    }

    public static <K, V> ResilientKafkaConsumer<K, V> create(ActorSystem actorSystem, final String str, Config<K, V> config, final Function<ConsumerMessage.CommittableMessage<K, V>, CompletionStage<Done>> function) {
        return new ResilientKafkaConsumer<K, V>(actorSystem, config) { // from class: fr.maif.kafka.consumer.ResilientKafkaConsumer.4
            @Override // fr.maif.kafka.consumer.ResilientKafkaConsumer
            protected String name() {
                return str;
            }

            @Override // fr.maif.kafka.consumer.ResilientKafkaConsumer
            public Flow<ConsumerMessage.CommittableMessage<K, V>, ConsumerMessage.CommittableOffset, NotUsed> messageHandling() {
                Flow create = Flow.create();
                Function function2 = function;
                return create.flatMapConcat(committableMessage -> {
                    return Source.completionStage((CompletionStage) function2.apply(committableMessage)).map(done -> {
                        return committableMessage.committableOffset();
                    });
                });
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z = -1;
                switch (implMethodName.hashCode()) {
                    case 1049774250:
                        if (implMethodName.equals("lambda$messageHandling$cde16ff1$1")) {
                            z = false;
                            break;
                        }
                        break;
                    case 1875472405:
                        if (implMethodName.equals("lambda$messageHandling$f4463f1a$1")) {
                            z = true;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("fr/maif/kafka/consumer/ResilientKafkaConsumer$4") && serializedLambda.getImplMethodSignature().equals("(Lakka/kafka/ConsumerMessage$CommittableMessage;Lakka/Done;)Lakka/kafka/ConsumerMessage$CommittableOffset;")) {
                            ConsumerMessage.CommittableMessage committableMessage = (ConsumerMessage.CommittableMessage) serializedLambda.getCapturedArg(0);
                            return done -> {
                                return committableMessage.committableOffset();
                            };
                        }
                        break;
                    case true:
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("fr/maif/kafka/consumer/ResilientKafkaConsumer$4") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/function/Function;Lakka/kafka/ConsumerMessage$CommittableMessage;)Lakka/stream/Graph;")) {
                            Function function2 = (Function) serializedLambda.getCapturedArg(0);
                            return committableMessage2 -> {
                                return Source.completionStage((CompletionStage) function2.apply(committableMessage2)).map(done2 -> {
                                    return committableMessage2.committableOffset();
                                });
                            };
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        };
    }

    public static <K, V> ResilientKafkaConsumer<K, V> create(ActorSystem actorSystem, final String str, Config<K, V> config, final Executor executor, final java.util.function.Consumer<ConsumerMessage.CommittableMessage<K, V>> consumer) {
        return new ResilientKafkaConsumer<K, V>(actorSystem, config) { // from class: fr.maif.kafka.consumer.ResilientKafkaConsumer.5
            @Override // fr.maif.kafka.consumer.ResilientKafkaConsumer
            protected String name() {
                return str;
            }

            @Override // fr.maif.kafka.consumer.ResilientKafkaConsumer
            public Flow<ConsumerMessage.CommittableMessage<K, V>, ConsumerMessage.CommittableOffset, NotUsed> messageHandling() {
                Flow create = Flow.create();
                java.util.function.Consumer consumer2 = consumer;
                Executor executor2 = executor;
                return create.flatMapConcat(committableMessage -> {
                    return Source.completionStage(CompletableFuture.supplyAsync(() -> {
                        consumer2.accept(committableMessage);
                        return Done.done();
                    }, executor2)).map(done -> {
                        return committableMessage.committableOffset();
                    });
                });
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z = -1;
                switch (implMethodName.hashCode()) {
                    case -161894995:
                        if (implMethodName.equals("lambda$messageHandling$d706ce1a$1")) {
                            z = false;
                            break;
                        }
                        break;
                    case 1049774250:
                        if (implMethodName.equals("lambda$messageHandling$cde16ff1$1")) {
                            z = true;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("fr/maif/kafka/consumer/ResilientKafkaConsumer$5") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/function/Consumer;Ljava/util/concurrent/Executor;Lakka/kafka/ConsumerMessage$CommittableMessage;)Lakka/stream/Graph;")) {
                            java.util.function.Consumer consumer2 = (java.util.function.Consumer) serializedLambda.getCapturedArg(0);
                            Executor executor2 = (Executor) serializedLambda.getCapturedArg(1);
                            return committableMessage -> {
                                return Source.completionStage(CompletableFuture.supplyAsync(() -> {
                                    consumer2.accept(committableMessage);
                                    return Done.done();
                                }, executor2)).map(done -> {
                                    return committableMessage.committableOffset();
                                });
                            };
                        }
                        break;
                    case true:
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("fr/maif/kafka/consumer/ResilientKafkaConsumer$5") && serializedLambda.getImplMethodSignature().equals("(Lakka/kafka/ConsumerMessage$CommittableMessage;Lakka/Done;)Lakka/kafka/ConsumerMessage$CommittableOffset;")) {
                            ConsumerMessage.CommittableMessage committableMessage2 = (ConsumerMessage.CommittableMessage) serializedLambda.getCapturedArg(0);
                            return done -> {
                                return committableMessage2.committableOffset();
                            };
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        };
    }

    public static <K, V> ResilientKafkaConsumer<K, V> create(ActorSystem actorSystem, String str, Config<K, V> config, java.util.function.Consumer<ConsumerMessage.CommittableMessage<K, V>> consumer) {
        return create(actorSystem, str, config, Executors.newCachedThreadPool(), consumer);
    }

    protected static <T> T defaultIfNull(T t, T t2) {
        return Objects.isNull(t) ? t2 : t;
    }

    protected abstract String name();

    protected abstract Flow<ConsumerMessage.CommittableMessage<K, V>, ConsumerMessage.CommittableOffset, NotUsed> messageHandling();

    private Status updateStatus(Status status) {
        this.innerStatus.set(status);
        return status;
    }

    public Status status() {
        return this.innerStatus.get();
    }

    public Status start() {
        Status status = status();
        if (Status.Starting.equals(status) || Status.Started.equals(status)) {
            LOGGER.info("{} already started", name());
            return status;
        }
        updateStatus(Status.Starting);
        CommitterSettings create = CommitterSettings.create(this.actorSystem);
        LOGGER.info("Starting {} on topic '{}' with group id '{}'", new Object[]{name(), this.subscription, this.groupId});
        AtomicInteger atomicInteger = new AtomicInteger(0);
        RestartSource.onFailuresWithBackoff(this.minBackoff, this.maxBackoff, this.randomFactor, () -> {
            this.onStarting.get();
            int incrementAndGet = atomicInteger.incrementAndGet();
            if (incrementAndGet > 1) {
                LOGGER.info("Stream for {} is restarting for the {} time", name(), Integer.valueOf(incrementAndGet));
            } else {
                LOGGER.info("Stream for {} is starting", name());
            }
            return Consumer.committablePartitionedSource(this.consumerSettings, this.subscription).flatMapMerge(100, pair -> {
                return ((Source) pair.second()).via(messageHandling()).via(Committer.flow(create.withMaxBatch(this.commitSize.intValue())));
            }).mapMaterializedValue(control -> {
                updateStatus(Status.Started);
                this.onStarted.apply(control, Integer.valueOf(incrementAndGet));
                LOGGER.info("Stream for {} has started", name());
                this.controlRef.set(control);
                return control;
            }).watchTermination((control2, completionStage) -> {
                return handleTerminaison(completionStage);
            });
        }).watchTermination((notUsed, completionStage) -> {
            return handleTerminaison(completionStage);
        }).runWith(Sink.ignore(), this.materializer);
        return Status.Starting;
    }

    public CompletionStage<Done> stop() {
        updateStatus(Status.Stopping);
        return this.onStopping.apply(this.controlRef.get()).exceptionally(th -> {
            return Done.done();
        }).thenCompose(done -> {
            return stopConsumingKafka();
        }).exceptionally(th2 -> {
            return Done.done();
        }).whenComplete((done2, th3) -> {
            updateStatus(Status.Stopped);
        }).thenCompose(done3 -> {
            return this.onStopped.get();
        }).exceptionally(th4 -> {
            return Done.done();
        });
    }

    protected CompletionStage<Status> handleTerminaison(CompletionStage<Done> completionStage) {
        return completionStage.thenApply(done -> {
            LOGGER.info("Stopping {}", name());
            updateStatus(Status.Stopped);
            return stopConsumingKafka().exceptionally(th -> {
                return Done.done();
            }).thenCompose(done -> {
                return this.onStopped.get();
            }).thenApply(done2 -> {
                return Status.Stopped;
            }).exceptionally(th2 -> {
                return Status.Stopped;
            });
        }).exceptionally(th -> {
            LOGGER.error("Error during " + name(), th);
            updateStatus(Status.Failed);
            return stopConsumingKafka().exceptionally(th -> {
                return Done.done();
            }).thenCompose(done2 -> {
                return this.onFailed.apply(th);
            }).thenApply(done3 -> {
                return Status.Failed;
            }).exceptionally(th2 -> {
                return Status.Failed;
            });
        }).thenCompose(Function.identity());
    }

    protected CompletionStage<Done> stopConsumingKafka() {
        Consumer.Control andSet = this.controlRef.getAndSet(null);
        return andSet != null ? andSet.shutdown().whenComplete((done, th) -> {
            if (th != null) {
                LOGGER.error("Error shutting down kafka consumer for {}", name());
            } else {
                LOGGER.info("Kafka consumer for {} is shutdown", name());
            }
        }).thenCompose(done2 -> {
            return andSet.isShutdown();
        }).thenApply(done3 -> {
            return Done.done();
        }) : CompletableFuture.completedFuture(Done.done());
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1134537213:
                if (implMethodName.equals("lambda$start$9656ab91$1")) {
                    z = false;
                    break;
                }
                break;
            case 386153035:
                if (implMethodName.equals("lambda$start$9734aa11$1")) {
                    z = 4;
                    break;
                }
                break;
            case 386153036:
                if (implMethodName.equals("lambda$start$9734aa11$2")) {
                    z = 3;
                    break;
                }
                break;
            case 1032372526:
                if (implMethodName.equals("lambda$start$5a82ea88$1")) {
                    z = true;
                    break;
                }
                break;
            case 1808624389:
                if (implMethodName.equals("lambda$start$1fc21a38$1")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("fr/maif/kafka/consumer/ResilientKafkaConsumer") && serializedLambda.getImplMethodSignature().equals("(ILakka/kafka/javadsl/Consumer$Control;)Lakka/kafka/javadsl/Consumer$Control;")) {
                    ResilientKafkaConsumer resilientKafkaConsumer = (ResilientKafkaConsumer) serializedLambda.getCapturedArg(0);
                    int intValue = ((Integer) serializedLambda.getCapturedArg(1)).intValue();
                    return control -> {
                        updateStatus(Status.Started);
                        this.onStarted.apply(control, Integer.valueOf(intValue));
                        LOGGER.info("Stream for {} has started", name());
                        this.controlRef.set(control);
                        return control;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("fr/maif/kafka/consumer/ResilientKafkaConsumer") && serializedLambda.getImplMethodSignature().equals("(Lakka/kafka/CommitterSettings;Lakka/japi/Pair;)Lakka/stream/Graph;")) {
                    ResilientKafkaConsumer resilientKafkaConsumer2 = (ResilientKafkaConsumer) serializedLambda.getCapturedArg(0);
                    CommitterSettings committerSettings = (CommitterSettings) serializedLambda.getCapturedArg(1);
                    return pair -> {
                        return ((Source) pair.second()).via(messageHandling()).via(Committer.flow(committerSettings.withMaxBatch(this.commitSize.intValue())));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Creator") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("fr/maif/kafka/consumer/ResilientKafkaConsumer") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lakka/kafka/CommitterSettings;)Lakka/stream/javadsl/Source;")) {
                    ResilientKafkaConsumer resilientKafkaConsumer3 = (ResilientKafkaConsumer) serializedLambda.getCapturedArg(0);
                    AtomicInteger atomicInteger = (AtomicInteger) serializedLambda.getCapturedArg(1);
                    CommitterSettings committerSettings2 = (CommitterSettings) serializedLambda.getCapturedArg(2);
                    return () -> {
                        this.onStarting.get();
                        int incrementAndGet = atomicInteger.incrementAndGet();
                        if (incrementAndGet > 1) {
                            LOGGER.info("Stream for {} is restarting for the {} time", name(), Integer.valueOf(incrementAndGet));
                        } else {
                            LOGGER.info("Stream for {} is starting", name());
                        }
                        return Consumer.committablePartitionedSource(this.consumerSettings, this.subscription).flatMapMerge(100, pair2 -> {
                            return ((Source) pair2.second()).via(messageHandling()).via(Committer.flow(committerSettings2.withMaxBatch(this.commitSize.intValue())));
                        }).mapMaterializedValue(control2 -> {
                            updateStatus(Status.Started);
                            this.onStarted.apply(control2, Integer.valueOf(incrementAndGet));
                            LOGGER.info("Stream for {} has started", name());
                            this.controlRef.set(control2);
                            return control2;
                        }).watchTermination((control22, completionStage) -> {
                            return handleTerminaison(completionStage);
                        });
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function2") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("fr/maif/kafka/consumer/ResilientKafkaConsumer") && serializedLambda.getImplMethodSignature().equals("(Lakka/NotUsed;Ljava/util/concurrent/CompletionStage;)Ljava/util/concurrent/CompletionStage;")) {
                    ResilientKafkaConsumer resilientKafkaConsumer4 = (ResilientKafkaConsumer) serializedLambda.getCapturedArg(0);
                    return (notUsed, completionStage) -> {
                        return handleTerminaison(completionStage);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function2") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("fr/maif/kafka/consumer/ResilientKafkaConsumer") && serializedLambda.getImplMethodSignature().equals("(Lakka/kafka/javadsl/Consumer$Control;Ljava/util/concurrent/CompletionStage;)Ljava/util/concurrent/CompletionStage;")) {
                    ResilientKafkaConsumer resilientKafkaConsumer5 = (ResilientKafkaConsumer) serializedLambda.getCapturedArg(0);
                    return (control22, completionStage2) -> {
                        return handleTerminaison(completionStage2);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
