package cz.o2.proxima.direct.kafka;

import cz.o2.proxima.direct.commitlog.CommitLogObserver;
import cz.o2.proxima.direct.commitlog.ObserverUtils;
import cz.o2.proxima.functional.BiConsumer;
import cz.o2.proxima.functional.Factory;
import cz.o2.proxima.internal.shaded.com.google.common.base.MoreObjects;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.TopicPartition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.time.WatermarkSupplier;
import java.lang.invoke.SerializedLambda;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/direct/kafka/ElementConsumers.class */
class ElementConsumers {

    /** Logger shared by the consumer implementations nested in this class. */
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ElementConsumers.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cz/o2/proxima/direct/kafka/ElementConsumers$BulkConsumer.class */
    /**
     * {@link ElementConsumer} that forwards elements to a {@link CommitLogObserver} and, whenever
     * an element is confirmed successfully, commits the offsets of every partition that had been
     * seen at the time the element was handed over.
     *
     * <p>No explicit {@code $deserializeLambda$} method is declared here: the compiler synthesizes
     * it for serializable lambdas, and the decompiled copy was not valid Java.
     *
     * @param <K> key type of the underlying {@link KafkaConsumer}
     * @param <V> value type of the underlying {@link KafkaConsumer}
     */
    public static final class BulkConsumer<K, V> extends ConsumerBase<K, V> {
        /** Observer receiving the consumed elements and lifecycle callbacks. */
        private final CommitLogObserver observer;
        /** Callback invoked with (partition, offset) for every committed entry. */
        private final BiConsumer<TopicPartition, Long> commit;
        /** Supplier of the offsets returned from {@link #prepareOffsetsForCommit()}. */
        private final Factory<Map<TopicPartition, OffsetAndMetadata>> prepareCommit;
        /** Hook executed from {@link #onStart()}. */
        private final Runnable onStart;

        /**
         * Creates the consumer.
         *
         * @param commitLogObserver observer to deliver elements to
         * @param biConsumer callback receiving each (partition, committed offset) pair
         * @param factory supplier of offsets prepared for commit
         * @param runnable hook run when consumption starts
         */
        public BulkConsumer(CommitLogObserver commitLogObserver, BiConsumer<TopicPartition, Long> biConsumer, Factory<Map<TopicPartition, OffsetAndMetadata>> factory, Runnable runnable) {
            super();
            this.observer = commitLogObserver;
            this.commit = biConsumer;
            this.prepareCommit = factory;
            this.onStart = runnable;
        }

        /**
         * Records the offset as being processed, updates the watermark and passes the element
         * (if any) to the observer.
         *
         * @param streamElement element to process, may be {@code null}
         * @param topicPartition partition the element was read from
         * @param j offset of the element within the partition
         * @param watermarkSupplier supplier of the current watermark
         * @param consumer handler receiving the error of a failed confirmation
         * @return the observer's {@code onNext} result, or {@code true} when there is no element
         */
        @Override
        public boolean consumeWithConfirm(@Nullable StreamElement streamElement, TopicPartition topicPartition, long j, WatermarkSupplier watermarkSupplier, Consumer<Throwable> consumer) {
            this.processing.put(topicPartition, j);
            this.watermark = watermarkSupplier.getWatermark();
            if (streamElement == null) {
                return true;
            }
            return this.observer.onNext(streamElement, context(topicPartition, j, watermarkSupplier, consumer));
        }

        /**
         * Builds the {@code OnNextContext} for a single element.
         *
         * <p>The offsets being processed are snapshotted when the context is created, so a later
         * confirmation commits exactly what had been read up to this element.
         */
        private CommitLogObserver.OnNextContext context(TopicPartition partition, long offset, WatermarkSupplier watermarkSupplier, Consumer<Throwable> errorHandler) {
            Map<TopicPartition, Long> toCommit = new HashMap<>(this.processing);
            return ObserverUtils.asOnNextContext((success, error) -> {
                if (success) {
                    // Never move a committed offset backwards; the committed value is the
                    // processed offset plus one.
                    toCommit.forEach((part, processed) ->
                        this.committed.compute(part, (key, current) ->
                            Math.max(MoreObjects.firstNonNull(current, 0L), processed + 1)));
                    this.committed.forEach(this.commit::accept);
                } else if (error != null) {
                    errorHandler.accept(error);
                }
            }, new TopicOffset(new PartitionWithTopic(partition.topic(), partition.partition()), offset, watermarkSupplier.getWatermark()));
        }

        @Override
        CommitLogObserver observer() {
            return this.observer;
        }

        /** Returns whatever the {@code prepareCommit} factory supplies. */
        @Override
        public Map<TopicPartition, OffsetAndMetadata> prepareOffsetsForCommit() {
            return this.prepareCommit.apply();
        }

        /**
         * Resets the tracked offsets, notifies the observer about the assigned partitions and
         * seeks the Kafka consumer to the given offsets.
         */
        @Override
        public void onAssign(KafkaConsumer<K, V> kafkaConsumer, Collection<TopicOffset> collection) {
            super.onAssign(kafkaConsumer, collection);
            // NOTE(review): raw cast kept from the decompiled source; confirm the parameter type
            // of asRepartitionContext before tightening it.
            this.observer.onRepartition(ObserverUtils.asRepartitionContext((Collection) collection.stream().map(TopicOffset::m56getPartition).collect(Collectors.toList())));
            Utils.seekToOffsets(collection, kafkaConsumer);
        }

        /** Runs the start hook supplied at construction time. */
        @Override
        public void onStart() {
            this.onStart.run();
        }

        /** Forwards the idle notification to the observer. */
        @Override
        public void onIdle(WatermarkSupplier watermarkSupplier) {
            this.observer.onIdle(ObserverUtils.asOnIdleContext(watermarkSupplier));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/o2/proxima/direct/kafka/ElementConsumers$ConsumerBase.class */
    /**
     * Shared state and default callbacks of the consumers in this file: it tracks committed and
     * in-flight offsets per partition plus the last observed watermark, and delegates lifecycle
     * events to the observer supplied by the subclass.
     *
     * @param <K> key type of the underlying {@link KafkaConsumer}
     * @param <V> value type of the underlying {@link KafkaConsumer}
     */
    public abstract static class ConsumerBase<K, V> implements ElementConsumer<K, V> {
        /** Offsets considered committed, keyed by partition. */
        final Map<TopicPartition, Long> committed = Collections.synchronizedMap(new HashMap<>());
        /** Offsets most recently handed to processing, keyed by partition. */
        final Map<TopicPartition, Long> processing = Collections.synchronizedMap(new HashMap<>());
        /** Holder for a future with end offsets; not used inside this class. */
        final AtomicReference<CompletableFuture<Map<PartitionWithTopic, Long>>> endOffsetsFuture = new AtomicReference<>();
        /** Watermark written by subclasses and attached to the reported offsets. */
        long watermark;

        private ConsumerBase() {
            // state is initialized by the field initializers above
        }

        /** Converts the partition of the given offset into a Kafka {@link TopicPartition}. */
        private static TopicPartition asTopicPartition(TopicOffset topicOffset) {
            return new TopicPartition(topicOffset.m56getPartition().getTopic(), topicOffset.m56getPartition().getId());
        }

        /** Propagates completion to the observer. */
        @Override
        public void onCompleted() {
            observer().onCompleted();
        }

        /** Propagates cancellation to the observer. */
        @Override
        public void onCancelled() {
            observer().onCancelled();
        }

        /**
         * Propagates the error to the observer.
         *
         * @param th the error
         * @return whatever the observer's {@code onError} returns
         */
        @Override
        public boolean onError(Throwable th) {
            return observer().onError(th);
        }

        /**
         * Replaces the tracked offsets with the newly assigned ones: the committed map receives
         * each assigned offset as is, the processing map receives the offset minus one.
         *
         * @param kafkaConsumer the Kafka consumer (unused here)
         * @param collection offsets of the assigned partitions
         */
        @Override
        public void onAssign(KafkaConsumer<K, V> kafkaConsumer, Collection<TopicOffset> collection) {
            this.committed.clear();
            Map<TopicPartition, Long> assigned = collection.stream()
                .collect(Collectors.toMap(ConsumerBase::asTopicPartition, TopicOffset::getOffset));
            this.committed.putAll(assigned);

            this.processing.clear();
            collection.forEach(topicOffset ->
                this.processing.put(asTopicPartition(topicOffset), topicOffset.getOffset() - 1));
        }

        /** Returns the in-flight offsets combined with the current watermark. */
        @Override
        public List<TopicOffset> getCurrentOffsets() {
            return TopicOffset.fromMap(this.processing, this.watermark);
        }

        /** Returns the committed offsets combined with the current watermark. */
        @Override
        public List<TopicOffset> getCommittedOffsets() {
            return TopicOffset.fromMap(this.committed, this.watermark);
        }

        /** Returns the observer that lifecycle callbacks are delegated to. */
        abstract CommitLogObserver observer();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cz/o2/proxima/direct/kafka/ElementConsumers$OnlineConsumer.class */
    /**
     * {@link ElementConsumer} that forwards elements to a {@link CommitLogObserver} and confirms
     * each offset individually through an {@code OffsetCommitter}.
     *
     * <p>No explicit {@code $deserializeLambda$} method is declared here: the compiler synthesizes
     * it for serializable lambdas, and the decompiled copy was not valid Java.
     *
     * @param <K> key type of the underlying {@link KafkaConsumer}
     * @param <V> value type of the underlying {@link KafkaConsumer}
     */
    public static final class OnlineConsumer<K, V> extends ConsumerBase<K, V> {
        /** Observer receiving the consumed elements and lifecycle callbacks. */
        private final CommitLogObserver observer;
        /** Committer notified about every confirmed offset. */
        private final OffsetCommitter<TopicPartition> committer;
        /** Supplier of the offsets returned from {@link #prepareOffsetsForCommit()}. */
        private final Factory<Map<TopicPartition, OffsetAndMetadata>> prepareCommit;

        /**
         * Creates the consumer.
         *
         * @param commitLogObserver observer to deliver elements to
         * @param offsetCommitter committer receiving confirmations of single offsets
         * @param factory supplier of offsets prepared for commit
         */
        public OnlineConsumer(CommitLogObserver commitLogObserver, OffsetCommitter<TopicPartition> offsetCommitter, Factory<Map<TopicPartition, OffsetAndMetadata>> factory) {
            super();
            this.observer = commitLogObserver;
            this.committer = offsetCommitter;
            this.prepareCommit = factory;
        }

        /**
         * Records the offset as being processed and updates the watermark. An element that is
         * present while the watermark is below {@link Long#MAX_VALUE} goes to the observer and
         * its offset is confirmed once the observer reports success. Otherwise the offset is
         * confirmed immediately.
         *
         * @param streamElement element to process, may be {@code null}
         * @param topicPartition partition the element was read from
         * @param j offset of the element within the partition
         * @param watermarkSupplier supplier of the current watermark
         * @param consumer handler receiving the error of a failed confirmation
         * @return the observer's {@code onNext} result when the element was delivered, otherwise
         *     whether the watermark is still below {@link Long#MAX_VALUE}
         */
        @Override
        public boolean consumeWithConfirm(@Nullable StreamElement streamElement, TopicPartition topicPartition, long j, WatermarkSupplier watermarkSupplier, Consumer<Throwable> consumer) {
            this.processing.put(topicPartition, j);
            this.watermark = watermarkSupplier.getWatermark();
            if (streamElement != null && this.watermark < Long.MAX_VALUE) {
                return this.observer.onNext(streamElement, ObserverUtils.asOnNextContext((success, error) -> {
                    if (success) {
                        confirmOffset(topicPartition, j);
                    } else if (error != null) {
                        consumer.accept(error);
                    }
                }, new TopicOffset(new PartitionWithTopic(topicPartition.topic(), topicPartition.partition()), j, this.watermark)));
            }
            // Nothing was handed to the observer, so the offset is confirmed right away.
            confirmOffset(topicPartition, j);
            return this.watermark < Long.MAX_VALUE;
        }

        /**
         * Advances the committed offset of the partition to {@code offset + 1} unless a higher
         * value is already stored, then notifies the committer.
         */
        private void confirmOffset(TopicPartition partition, long offset) {
            this.committed.compute(partition, (key, current) ->
                (current == null || current <= offset) ? offset + 1 : current);
            this.committer.confirm(partition, offset);
        }

        /** Returns whatever the {@code prepareCommit} factory supplies. */
        @Override
        public Map<TopicPartition, OffsetAndMetadata> prepareOffsetsForCommit() {
            return this.prepareCommit.apply();
        }

        @Override
        CommitLogObserver observer() {
            return this.observer;
        }

        /**
         * Resets the tracked offsets, clears the committer and notifies the observer about the
         * assigned partitions.
         */
        @Override
        public void onAssign(KafkaConsumer<K, V> kafkaConsumer, Collection<TopicOffset> collection) {
            super.onAssign(kafkaConsumer, collection);
            this.committer.clear();
            // NOTE(review): raw cast kept from the decompiled source; confirm the parameter type
            // of asRepartitionContext before tightening it.
            this.observer.onRepartition(ObserverUtils.asRepartitionContext((Collection) collection.stream().map(TopicOffset::m56getPartition).collect(Collectors.toList())));
        }

        /** Clears the committer when consumption starts. */
        @Override
        public void onStart() {
            this.committer.clear();
        }

        /** Forwards the idle notification to the observer. */
        @Override
        public void onIdle(WatermarkSupplier watermarkSupplier) {
            this.observer.onIdle(ObserverUtils.asOnIdleContext(watermarkSupplier));
        }
    }

    /** Private constructor: this class only hosts the nested consumer types and is never instantiated here. */
    private ElementConsumers() {
    }
}
