/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.io.kafka;

import cz.o2.proxima.core.functional.BiConsumer;
import cz.o2.proxima.core.functional.Factory;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.core.time.WatermarkSupplier;
import cz.o2.proxima.direct.core.LogObserver;
import cz.o2.proxima.direct.core.commitlog.CommitLogObserver;
import cz.o2.proxima.direct.core.commitlog.ObserverUtils;
import cz.o2.proxima.direct.core.commitlog.Offset;
import cz.o2.proxima.direct.io.kafka.ElementConsumer;
import cz.o2.proxima.direct.io.kafka.OffsetCommitter;
import cz.o2.proxima.direct.io.kafka.PartitionWithTopic;
import cz.o2.proxima.direct.io.kafka.TopicOffset;
import cz.o2.proxima.internal.com.google.common.base.MoreObjects;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.TopicPartition;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ElementConsumers {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ElementConsumers.class);

    private ElementConsumers() {
    }

    static final class BulkConsumer<K, V>
    extends ConsumerBase<K, V> {
        private final CommitLogObserver observer;
        private final BiConsumer<TopicPartition, Long> commit;
        private final Factory<Map<TopicPartition, OffsetAndMetadata>> prepareCommit;
        private final Runnable onStart;

        BulkConsumer(CommitLogObserver observer, BiConsumer<TopicPartition, Long> commit, Factory<Map<TopicPartition, OffsetAndMetadata>> prepareCommit, Runnable onStart) {
            this.observer = observer;
            this.commit = commit;
            this.prepareCommit = prepareCommit;
            this.onStart = onStart;
        }

        @Override
        public boolean consumeWithConfirm(@Nullable StreamElement element, TopicPartition tp, long offset, WatermarkSupplier watermarkSupplier, Consumer<Throwable> errorHandler) {
            this.processing.put(tp, offset);
            this.watermark = watermarkSupplier.getWatermark();
            if (element != null) {
                return this.observer.onNext(element, (LogObserver.OnNextContext)this.context(tp, offset, watermarkSupplier, errorHandler));
            }
            return true;
        }

        private CommitLogObserver.OnNextContext context(TopicPartition tp, long offset, WatermarkSupplier watermarkSupplier, Consumer<Throwable> errorHandler) {
            HashMap toCommit = new HashMap(this.processing);
            return ObserverUtils.asOnNextContext((CommitLogObserver.OffsetCommitter & Serializable)(succ, err) -> {
                if (succ) {
                    toCommit.forEach((part, off) -> this.committed.compute(part, (k, v) -> Math.max((Long)MoreObjects.firstNonNull((Object)v, (Object)0L), off + 1L)));
                    this.committed.forEach((arg_0, arg_1) -> this.commit.accept(arg_0, arg_1));
                } else if (err != null) {
                    errorHandler.accept(err);
                }
            }, (Offset)new TopicOffset(new PartitionWithTopic(tp.topic(), tp.partition()), offset, watermarkSupplier.getWatermark()));
        }

        @Override
        CommitLogObserver observer() {
            return this.observer;
        }

        @Override
        public Map<TopicPartition, OffsetAndMetadata> prepareOffsetsForCommit() {
            return (Map)this.prepareCommit.apply();
        }

        @Override
        public void onAssign(KafkaConsumer<K, V> consumer, Collection<TopicOffset> offsets) {
            super.onAssign(consumer, offsets);
            this.observer.onRepartition(ObserverUtils.asRepartitionContext((Collection)offsets.stream().map(TopicOffset::getPartition).collect(Collectors.toList())));
        }

        @Override
        public void onStart() {
            this.onStart.run();
        }

        @Override
        public void onIdle(WatermarkSupplier watermarkSupplier) {
            this.watermark = watermarkSupplier.getWatermark();
            this.observer.onIdle(ObserverUtils.asOnIdleContext((WatermarkSupplier & Serializable)() -> this.watermark));
        }
    }

    static final class OnlineConsumer<K, V>
    extends ConsumerBase<K, V> {
        private final CommitLogObserver observer;
        private final OffsetCommitter<TopicPartition> committer;
        private final Factory<Map<TopicPartition, OffsetAndMetadata>> prepareCommit;

        OnlineConsumer(CommitLogObserver observer, OffsetCommitter<TopicPartition> committer, Factory<Map<TopicPartition, OffsetAndMetadata>> prepareCommit) {
            this.observer = observer;
            this.committer = committer;
            this.prepareCommit = prepareCommit;
        }

        @Override
        public boolean consumeWithConfirm(@Nullable StreamElement element, TopicPartition tp, long offset, WatermarkSupplier watermarkSupplier, Consumer<Throwable> errorHandler) {
            this.processing.put(tp, offset);
            this.watermark = watermarkSupplier.getWatermark();
            if (element != null && this.watermark < Long.MAX_VALUE) {
                return this.observer.onNext(element, (LogObserver.OnNextContext)ObserverUtils.asOnNextContext((CommitLogObserver.OffsetCommitter & Serializable)(succ, exc) -> {
                    if (succ) {
                        this.confirmOffset(tp, offset);
                    } else {
                        errorHandler.accept(Objects.requireNonNullElseGet(exc, () -> new IllegalStateException("Either confirm processing or return exception.")));
                    }
                }, (Offset)new TopicOffset(new PartitionWithTopic(tp.topic(), tp.partition()), offset, this.watermark)));
            }
            this.confirmOffset(tp, offset);
            return this.watermark < Long.MAX_VALUE;
        }

        private void confirmOffset(TopicPartition tp, long offset) {
            this.committed.compute(tp, (k, v) -> v == null || v <= offset ? offset + 1L : v);
            this.committer.confirm(tp, offset);
        }

        @Override
        public Map<TopicPartition, OffsetAndMetadata> prepareOffsetsForCommit() {
            return (Map)this.prepareCommit.apply();
        }

        @Override
        CommitLogObserver observer() {
            return this.observer;
        }

        @Override
        public void onAssign(KafkaConsumer<K, V> consumer, Collection<TopicOffset> offsets) {
            super.onAssign(consumer, offsets);
            this.committer.clear();
            this.observer.onRepartition(ObserverUtils.asRepartitionContext((Collection)offsets.stream().map(TopicOffset::getPartition).collect(Collectors.toList())));
        }

        @Override
        public void onStart() {
            this.committer.clear();
        }

        @Override
        public void onIdle(WatermarkSupplier watermarkSupplier) {
            this.watermark = watermarkSupplier.getWatermark();
            this.observer.onIdle(ObserverUtils.asOnIdleContext((WatermarkSupplier & Serializable)() -> this.watermark));
        }
    }

    private static abstract class ConsumerBase<K, V>
    implements ElementConsumer<K, V> {
        final Map<TopicPartition, Long> committed = Collections.synchronizedMap(new HashMap());
        final Map<TopicPartition, Long> processing = Collections.synchronizedMap(new HashMap());
        long watermark;

        private ConsumerBase() {
        }

        @Override
        public void onCompleted() {
            this.observer().onCompleted();
        }

        @Override
        public void onCancelled() {
            this.observer().onCancelled();
        }

        @Override
        public boolean onError(Throwable err) {
            return this.observer().onError(err);
        }

        @Override
        public void onAssign(KafkaConsumer<K, V> consumer, Collection<TopicOffset> offsets) {
            this.committed.clear();
            this.committed.putAll(offsets.stream().collect(Collectors.toMap(o -> new TopicPartition(o.getPartition().getTopic(), o.getPartition().getId()), TopicOffset::getOffset)));
            this.processing.clear();
            offsets.forEach(tp -> this.processing.put(new TopicPartition(tp.getPartition().getTopic(), tp.getPartition().getId()), tp.getOffset() - 1L));
            this.watermark = Long.MIN_VALUE;
        }

        @Override
        public List<TopicOffset> getCurrentOffsets() {
            return TopicOffset.fromMap(this.processing, this.watermark);
        }

        @Override
        public List<TopicOffset> getCommittedOffsets() {
            return TopicOffset.fromMap(this.committed, this.watermark);
        }

        abstract CommitLogObserver observer();
    }
}

