/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.io.kafka;

import cz.o2.proxima.core.functional.BiConsumer;
import cz.o2.proxima.core.functional.Factory;
import cz.o2.proxima.core.storage.AbstractStorage;
import cz.o2.proxima.core.storage.Partition;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.core.storage.commitlog.Position;
import cz.o2.proxima.core.time.PartitionedWatermarkEstimator;
import cz.o2.proxima.core.time.WatermarkEstimator;
import cz.o2.proxima.core.util.ExceptionUtils;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.direct.core.commitlog.CommitLogObserver;
import cz.o2.proxima.direct.core.commitlog.CommitLogReader;
import cz.o2.proxima.direct.core.commitlog.ObserveHandle;
import cz.o2.proxima.direct.core.commitlog.Offset;
import cz.o2.proxima.direct.core.commitlog.OffsetExternalizer;
import cz.o2.proxima.direct.core.time.MinimalPartitionWatermarkEstimator;
import cz.o2.proxima.direct.io.kafka.ElementConsumer;
import cz.o2.proxima.direct.io.kafka.ElementConsumers;
import cz.o2.proxima.direct.io.kafka.ElementSerializer;
import cz.o2.proxima.direct.io.kafka.KafkaAccessor;
import cz.o2.proxima.direct.io.kafka.KafkaConsumerFactory;
import cz.o2.proxima.direct.io.kafka.KafkaThroughputLimiter;
import cz.o2.proxima.direct.io.kafka.OffsetCommitter;
import cz.o2.proxima.direct.io.kafka.PartitionWithTopic;
import cz.o2.proxima.direct.io.kafka.TopicOffset;
import cz.o2.proxima.direct.io.kafka.TopicOffsetExternalizer;
import cz.o2.proxima.direct.io.kafka.Utils;
import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.com.google.common.base.Preconditions;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecords;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.PartitionInfo;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.TopicPartition;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.errors.RebalanceInProgressException;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaLogReader
extends AbstractStorage
implements CommitLogReader {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaLogReader.class);
    /** Accessor this reader was created from; queried for topic, timeouts and factories. */
    final KafkaAccessor accessor;
    /** Context supplying the executor service on which observers are run. */
    private final Context context;
    /** Poll timeout handed to the Kafka consumer, in milliseconds. */
    private final long consumerPollInterval;
    /** Limit passed to the throughput limiter of each poll loop. */
    private final long maxBytesPerSec;
    /** Configuration map taken from the accessor. */
    private final Map<String, Object> cfg;
    /** Serializer used to turn consumer records into stream elements. */
    private final ElementSerializer<Object, Object> serializer;

    /**
     * Creates a reader bound to the given accessor.
     *
     * @param accessor accessor providing entity descriptor, URI and consumer settings
     * @param context context providing the executor service for observers
     */
    KafkaLogReader(KafkaAccessor accessor, Context context) {
        super(accessor.getEntityDescriptor(), accessor.getUri());
        this.accessor = accessor;
        this.context = context;
        this.consumerPollInterval = accessor.getConsumerPollInterval();
        this.maxBytesPerSec = accessor.getMaxBytesPerSec();
        this.cfg = accessor.getCfg();
        this.serializer = accessor.getSerializer();
        log.debug("Created {} for accessor {}", getClass().getSimpleName(), accessor);
    }

    /**
     * Tells whether sequential IDs are restored by this reader.
     *
     * @return the value reported by the serializer's {@code storesSequentialId()}
     */
    public boolean restoresSequentialIds() {
        final boolean stored = serializer.storesSequentialId();
        return stored;
    }

    /**
     * Observes the commit log as a named consumer, without restricting partitions
     * and without stopping at current data.
     *
     * @param name consumer name
     * @param position position to start reading from
     * @param observer observer receiving the data
     * @return handle of the running observation
     */
    public ObserveHandle observe(String name, Position position, CommitLogObserver observer) {
        final Collection<Partition> anyPartitions = null;
        final boolean stopAtCurrent = false;
        return observeKafka(name, anyPartitions, position, stopAtCurrent, observer);
    }

    /**
     * Observes the given partitions. The {@code name} argument is not forwarded;
     * the observation is started with a {@code null} name.
     *
     * @param name not used by this implementation
     * @param partitions partitions to observe
     * @param position position to start reading from
     * @param stopAtCurrent whether to stop once current data has been read
     * @param observer observer receiving the data
     * @return handle of the running observation
     */
    public ObserveHandle observePartitions(String name, @Nullable Collection<Partition> partitions, Position position, boolean stopAtCurrent, CommitLogObserver observer) {
        final String unnamed = null;
        return observeKafka(unnamed, partitions, position, stopAtCurrent, observer);
    }

    /**
     * Starts a bulk observation as a named consumer without explicit offsets.
     *
     * @param name consumer name
     * @param position position to start reading from
     * @param stopAtCurrent whether to stop once current data has been read
     * @param observer observer receiving the data
     * @return handle of the running observation
     */
    public ObserveHandle observeBulk(String name, Position position, boolean stopAtCurrent, CommitLogObserver observer) {
        final Collection<Offset> noOffsets = null;
        return observeKafkaBulk(name, noOffsets, position, stopAtCurrent, observer);
    }

    /**
     * Starts a bulk observation of the given partitions. The {@code name} argument
     * is not forwarded; partitions are converted to default offsets first.
     *
     * @param name not used by this implementation
     * @param partitions partitions to observe
     * @param position position to start reading from
     * @param stopAtCurrent whether to stop once current data has been read
     * @param observer observer receiving the data
     * @return handle of the running observation
     */
    public ObserveHandle observeBulkPartitions(String name, Collection<Partition> partitions, Position position, boolean stopAtCurrent, CommitLogObserver observer) {
        final Collection<Offset> startOffsets = KafkaLogReader.createDefaultOffsets(partitions);
        return observeKafkaBulk(null, startOffsets, position, stopAtCurrent, observer);
    }

    /**
     * Starts an unnamed bulk observation from the given offsets, using
     * {@link Position#CURRENT}.
     *
     * @param offsets offsets to start reading from
     * @param stopAtCurrent whether to stop once current data has been read
     * @param observer observer receiving the data
     * @return handle of the running observation
     */
    public ObserveHandle observeBulkOffsets(Collection<Offset> offsets, boolean stopAtCurrent, CommitLogObserver observer) {
        final Position startAt = Position.CURRENT;
        return observeKafkaBulk(null, offsets, startAt, stopAtCurrent, observer);
    }

    /**
     * Lists partitions of the accessor's topic using a short-lived consumer.
     *
     * @return one {@link PartitionWithTopic} per partition reported by Kafka
     * @throws UnsupportedOperationException when the accessor uses a regex topic
     */
    public List<Partition> getPartitions() {
        if (accessor.isTopicRegex()) {
            throw new UnsupportedOperationException(String.format("Partitions of URI %s are unstable and should not be used.", getUri()));
        }
        try (KafkaConsumer<Object, Object> consumer = createConsumer()) {
            List<Partition> result = new ArrayList<>();
            for (PartitionInfo info : consumer.partitionsFor(accessor.getTopic())) {
                result.add(new PartitionWithTopic(info.topic(), info.partition()));
            }
            return result;
        }
    }

    /**
     * Fetches beginning or end offsets of the given partitions.
     *
     * @param position either {@link Position#OLDEST} (beginning offsets) or
     *     {@link Position#NEWEST} (end offsets)
     * @param partitions partitions to query; each must be a {@link PartitionWithTopic}
     * @return map from partition to a {@link TopicOffset} carrying the fetched offset
     *     and {@code Long.MIN_VALUE} as watermark
     * @throws IllegalArgumentException for any other position
     */
    public Map<Partition, Offset> fetchOffsets(Position position, List<Partition> partitions) {
        boolean wellDefined = position == Position.NEWEST || position == Position.OLDEST;
        Preconditions.checkArgument(wellDefined, "Position %s does not have well defined offsets.", position);

        List<TopicPartition> topicPartitions = new ArrayList<>();
        for (Partition partition : partitions) {
            PartitionWithTopic withTopic = (PartitionWithTopic) partition;
            topicPartitions.add(new TopicPartition(withTopic.getTopic(), withTopic.getPartition()));
        }

        final Map<TopicPartition, Long> fetched;
        try (KafkaConsumer<Object, Object> consumer = createConsumer()) {
            if (position == Position.OLDEST) {
                fetched = consumer.beginningOffsets(topicPartitions);
            } else {
                fetched = consumer.endOffsets(topicPartitions);
            }
        }

        return fetched.entrySet().stream().collect(Collectors.toMap(
            entry -> new PartitionWithTopic(entry.getKey().topic(), entry.getKey().partition()),
            entry -> new TopicOffset(
                new PartitionWithTopic(entry.getKey().topic(), entry.getKey().partition()),
                entry.getValue(),
                Long.MIN_VALUE)));
    }

    /**
     * Validates arguments and starts an online (per-element) observation.
     * Offsets are committed to Kafka only when a consumer name is given.
     *
     * @param name consumer name, or {@code null} when observing explicit partitions
     * @param partitions partitions to observe, or {@code null} for a named consumer
     * @param position position to start reading from
     * @param stopAtCurrent whether to stop once current data has been read
     * @param observer observer receiving the data
     * @return handle of the running observation
     * @throws IllegalArgumentException when both name and partitions are null, or
     *     when partitions are given for a regex topic
     * @throws RuntimeException wrapping {@link InterruptedException} when interrupted
     *     while the observer starts (the interrupt flag is restored)
     */
    @VisibleForTesting
    ObserveHandle observeKafka(@Nullable String name, @Nullable Collection<Partition> partitions, Position position, boolean stopAtCurrent, CommitLogObserver observer) {
        boolean hasNameOrPartitions = name != null || partitions != null;
        Preconditions.checkArgument(hasNameOrPartitions, "Either name or offsets have to be non null");
        boolean partitionsAllowed = !accessor.isTopicRegex() || partitions == null;
        Preconditions.checkArgument(partitionsAllowed, "Regex URI %s cannot observe specific partitions, because these cannot be made stable.", getUri());
        final boolean commitToKafka = name != null;
        try {
            Collection<Offset> offsets = KafkaLogReader.createDefaultOffsets(partitions);
            return processConsumer(name, offsets, position, stopAtCurrent, commitToKafka, observer, context.getExecutorService());
        }
        catch (InterruptedException interrupted) {
            log.warn("Interrupted waiting for kafka observer to start", interrupted);
            Thread.currentThread().interrupt();
            throw new RuntimeException(interrupted);
        }
    }

    /**
     * Validates arguments and starts a bulk observation.
     * Offsets are committed to Kafka only when a consumer name is given.
     *
     * @param name consumer name, or {@code null} when observing explicit offsets
     * @param offsets offsets to start from, or {@code null} for a named consumer
     * @param position position to start reading from; must not be {@code null}
     * @param stopAtCurrent whether to stop once current data has been read
     * @param observer observer receiving the data
     * @return handle of the running observation
     * @throws IllegalArgumentException when both name and offsets are null, position
     *     is null, or offsets are given for a regex topic
     * @throws RuntimeException wrapping {@link InterruptedException} when interrupted
     *     while the observer starts (the interrupt flag is restored)
     */
    private ObserveHandle observeKafkaBulk(@Nullable String name, @Nullable Collection<Offset> offsets, Position position, boolean stopAtCurrent, CommitLogObserver observer) {
        boolean hasNameOrOffsets = name != null || offsets != null;
        Preconditions.checkArgument(hasNameOrOffsets, "Either name or offsets have to be non null");
        boolean hasPosition = position != null;
        Preconditions.checkArgument(hasPosition, "Position cannot be null");
        boolean offsetsAllowed = !accessor.isTopicRegex() || offsets == null;
        Preconditions.checkArgument(offsetsAllowed, "Regex URI %s cannot observe specific offsets, because these cannot be made stable.", getUri());
        final boolean commitToKafka = name != null;
        try {
            return processConsumerBulk(name, offsets, position, stopAtCurrent, commitToKafka, observer, context.getExecutorService());
        }
        catch (InterruptedException interrupted) {
            log.warn("Interrupted waiting for kafka observer to start", interrupted);
            Thread.currentThread().interrupt();
            throw new RuntimeException(interrupted);
        }
    }

    /**
     * Wires up an online (per-element) consumer and submits it for execution.
     *
     * <p>Every polled record is registered with a fresh {@code OffsetCommitter}; the
     * registered callback, when {@code commitToKafka} is set, stores {@code offset + 1}
     * for the record's partition in a shared commit map, keeping the highest offset
     * seen. The {@code OnlineConsumer} obtains offsets to commit through a factory that
     * snapshots and clears that map.
     *
     * @param name consumer name, may be {@code null}
     * @param offsets offsets to start from, may be {@code null}
     * @param position position to start reading from
     * @param stopAtCurrent whether to stop once current data has been read
     * @param commitToKafka whether confirmed offsets are collected for commit to Kafka
     * @param observer observer receiving the data
     * @param executor executor running the poll loop
     * @return handle delegating to whichever handle the poll loop most recently published
     * @throws InterruptedException when interrupted while waiting for the loop to become ready
     */
    @VisibleForTesting
    ObserveHandle processConsumer(@Nullable String name, @Nullable Collection<Offset> offsets, Position position, boolean stopAtCurrent, boolean commitToKafka, CommitLogObserver observer, ExecutorService executor) throws InterruptedException {
        // Offsets waiting to be committed to Kafka, keyed by partition.
        Map kafkaCommitMap = Collections.synchronizedMap(new HashMap());
        OffsetCommitter<TopicPartition> offsetCommitter = this.createOffsetCommitter();
        // Invoked by the poll loop for each record before it is deserialized.
        BiConsumer & Serializable preWrite = (BiConsumer & Serializable)(tp, r) -> {
            long offset = r.offset();
            // NOTE(review): the literal 1 is presumably the number of confirmations expected
            // for this offset - confirm against OffsetCommitter.
            offsetCommitter.register((TopicPartition)tp, offset, 1, () -> {
                OffsetAndMetadata mtd = new OffsetAndMetadata(offset + 1L);
                if (commitToKafka) {
                    // Never replace an already stored higher offset with a lower one.
                    kafkaCommitMap.compute(tp, (k, v) -> v == null || v.offset() <= offset ? mtd : v);
                }
            });
        };
        ElementConsumers.OnlineConsumer<Object, Object> onlineConsumer = new ElementConsumers.OnlineConsumer<Object, Object>(observer, offsetCommitter, (Factory<Map<TopicPartition, OffsetAndMetadata>>)(Factory & Serializable)() -> {
            Map map = kafkaCommitMap;
            // Copy and clear under the map's own lock so that no update is lost in between.
            synchronized (map) {
                HashMap clone = new HashMap(kafkaCommitMap);
                kafkaCommitMap.clear();
                return clone;
            }
        });
        // Filled in by the poll loop; replaced when the loop is re-submitted after an error.
        AtomicReference<ObserveHandle> handle = new AtomicReference<ObserveHandle>();
        this.submitConsumerWithObserver(name, offsets, position, stopAtCurrent, (BiConsumer<TopicPartition, ConsumerRecord<Object, Object>>)preWrite, onlineConsumer, executor, handle);
        return KafkaLogReader.dynamicHandle(handle);
    }

    /**
     * Wires up a bulk consumer and submits it for execution.
     *
     * <p>The {@code BulkConsumer} is given a commit callback that, when
     * {@code commitToKafka} is set, stores the passed offset for the partition in a
     * shared commit map, a factory that snapshots and clears that map, and a
     * callback that clears the map. No per-record pre-write action is performed.
     *
     * @param name consumer name, may be {@code null}
     * @param offsets offsets to start from, may be {@code null}
     * @param position position to start reading from
     * @param stopAtCurrent whether to stop once current data has been read
     * @param commitToKafka whether committed offsets are collected for commit to Kafka
     * @param observer observer receiving the data
     * @param executor executor running the poll loop
     * @return handle delegating to whichever handle the poll loop most recently published
     * @throws InterruptedException when interrupted while waiting for the loop to become ready
     */
    @VisibleForTesting
    ObserveHandle processConsumerBulk(@Nullable String name, @Nullable Collection<Offset> offsets, Position position, boolean stopAtCurrent, boolean commitToKafka, CommitLogObserver observer, ExecutorService executor) throws InterruptedException {
        // Offsets waiting to be committed to Kafka, keyed by partition.
        Map kafkaCommitMap = Collections.synchronizedMap(new HashMap());
        ElementConsumers.BulkConsumer<Object, Object> bulkConsumer = new ElementConsumers.BulkConsumer<Object, Object>(observer, (BiConsumer<TopicPartition, Long>)(BiConsumer & Serializable)(tp, o) -> {
            if (commitToKafka) {
                // The passed offset is stored as-is (no +1 adjustment is made here).
                OffsetAndMetadata off = new OffsetAndMetadata((long)o);
                kafkaCommitMap.put(tp, off);
            }
        }, (Factory<Map<TopicPartition, OffsetAndMetadata>>)(Factory & Serializable)() -> {
            Map map = kafkaCommitMap;
            // Copy and clear under the map's own lock so that no update is lost in between.
            synchronized (map) {
                HashMap clone = new HashMap(kafkaCommitMap);
                kafkaCommitMap.clear();
                return clone;
            }
        }, kafkaCommitMap::clear);
        // Filled in by the poll loop; replaced when the loop is re-submitted after an error.
        AtomicReference<ObserveHandle> handle = new AtomicReference<ObserveHandle>();
        this.submitConsumerWithObserver(name, offsets, position, stopAtCurrent, (BiConsumer<TopicPartition, ConsumerRecord<Object, Object>>)(BiConsumer & Serializable)(tp, r) -> {}, bulkConsumer, executor, handle);
        return KafkaLogReader.dynamicHandle(handle);
    }

    /**
     * Submits the Kafka poll loop to {@code executor} and blocks until the loop
     * signals readiness (or terminates).
     *
     * <p>The submitted task publishes a fresh {@link ObserveHandle} into {@code handle},
     * calls {@code consumer.onStart()}, creates a Kafka consumer and then repeatedly
     * polls records, deserializes them and passes them to {@code consumer} until
     * shutdown is requested through the handle, consumption completes, or the thread
     * is interrupted. When processing throws, {@code consumer.onError} decides whether
     * the whole task is submitted again.
     *
     * @param name consumer name; the rebalance listener is handed to the Kafka consumer
     *     only when this is non-null
     * @param offsets offsets to start from, may be {@code null}
     * @param position position to start reading from
     * @param stopAtCurrent whether to terminate once the end offsets found at start are reached
     * @param preWrite action invoked for each record before it is deserialized
     * @param consumer consumer receiving deserialized elements and lifecycle callbacks
     * @param executor executor running the poll loop
     * @param handle reference that receives the handle of the (re)started loop
     * @throws InterruptedException when interrupted while waiting for readiness
     * @throws IllegalArgumentException when {@code stopAtCurrent} is used with a regex topic
     */
    private void submitConsumerWithObserver(@Nullable String name, @Nullable Collection<Offset> offsets, Position position, boolean stopAtCurrent, BiConsumer<TopicPartition, ConsumerRecord<Object, Object>> preWrite, ElementConsumer<Object, Object> consumer, ExecutorService executor, AtomicReference<ObserveHandle> handle) throws InterruptedException {
        // readyLatch: released once the loop is ready to process data (or has ended).
        // completedLatch: released once the loop has terminated; awaited by ObserveHandle.close().
        CountDownLatch readyLatch = new CountDownLatch(1);
        CountDownLatch completedLatch = new CountDownLatch(1);
        AtomicBoolean completed = new AtomicBoolean();
        AtomicBoolean shutdown = new AtomicBoolean();
        // Offsets requested through ObserveHandle.resetOffsets, applied by the poll loop.
        List seekOffsets = Collections.synchronizedList(new ArrayList());
        Preconditions.checkArgument((!this.accessor.isTopicRegex() || !stopAtCurrent ? 1 : 0) != 0, (Object)"Cannot use stopAtCurrent with regex URI");
        executor.submit(() -> {
            // Labeled block emitted by the decompiler; `break block27` leaves the task body.
            block27: {
                AtomicReference<KafkaConsumer<Object, Object>> consumerRef = new AtomicReference<KafkaConsumer<Object, Object>>();
                AtomicReference<Object> watermarkEstimator = new AtomicReference<Object>(null);
                // Per partition: number of consecutive polls that returned no record for it.
                ConcurrentHashMap<TopicPartition, Integer> emptyPollCount = new ConcurrentHashMap<TopicPartition, Integer>();
                HashMap<TopicPartition, Integer> topicPartitionToId = new HashMap<TopicPartition, Integer>();
                Duration pollDuration = Duration.ofMillis(this.consumerPollInterval);
                KafkaThroughputLimiter throughputLimiter = new KafkaThroughputLimiter(this.maxBytesPerSec);
                handle.set(this.createObserveHandle(shutdown, seekOffsets, consumer, readyLatch, completedLatch));
                consumer.onStart();
                ConsumerRebalanceListener listener = this.listener(name, consumerRef, consumer, emptyPollCount, topicPartitionToId, watermarkEstimator);
                try (KafkaConsumer<Object, Object> kafka = this.createConsumer(name, offsets, name != null ? listener : null, position);){
                    Map<TopicPartition, Long> endOffsets;
                    ConsumerRecords<Object, Object> poll;
                    consumerRef.set(kafka);
                    // Initial poll; for regex topics keep polling until something is assigned,
                    // data arrives, or termination is requested.
                    do {
                        poll = kafka.poll(pollDuration);
                        Map<TopicPartition, Long> map = endOffsets = stopAtCurrent ? this.findNonEmptyEndOffsets(kafka) : null;
                        if (!log.isDebugEnabled()) continue;
                        log.debug("End offsets of current assignment {}: {}", kafka.assignment(), endOffsets);
                    } while (poll.isEmpty() && this.accessor.isTopicRegex() && kafka.assignment().isEmpty() && !shutdown.get() && !Thread.currentThread().isInterrupted());
                    Set<TopicPartition> assignment = kafka.assignment();
                    if (!assignment.isEmpty()) {
                        // Replay the current assignment through the listener so that watermark
                        // estimators and partition ids are (re)initialized for it.
                        listener.onPartitionsRevoked(assignment);
                        listener.onPartitionsAssigned(assignment);
                    }
                    readyLatch.countDown();
                    // Receives errors reported asynchronously via the confirm callback.
                    AtomicReference<Throwable> error = new AtomicReference<Throwable>();
                    long pollTimeMs = 0L;
                    do {
                        if (poll.isEmpty()) {
                            Optional.ofNullable(watermarkEstimator.get()).ifPresent(consumer::onIdle);
                        }
                        this.logConsumerWatermark(name, offsets, watermarkEstimator, poll.count());
                        // If a reset was requested, the polled records are dropped in favor of an empty batch.
                        poll = this.seekToNewOffsetsIfNeeded(seekOffsets, consumer, watermarkEstimator, kafka, poll);
                        long bytesPolled = 0L;
                        // Assume every partition was empty in this poll; reset below for those with data.
                        emptyPollCount.replaceAll((k, v) -> v + 1);
                        for (ConsumerRecord<Object, Object> consumerRecord : poll) {
                            Long end;
                            bytesPolled += (long)(consumerRecord.serializedKeySize() + consumerRecord.serializedValueSize());
                            TopicPartition tp = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
                            emptyPollCount.put(tp, 0);
                            preWrite.accept((Object)tp, consumerRecord);
                            StreamElement ingest = this.serializer.read(consumerRecord, this.getEntityDescriptor());
                            if (ingest != null) {
                                ((PartitionedWatermarkEstimator)watermarkEstimator.get()).update(Objects.requireNonNull((Integer)topicPartitionToId.get(tp)).intValue(), ingest);
                            }
                            if (log.isDebugEnabled()) {
                                log.debug("Processing element {} with {}", (Object)ingest, (Object)(ingest == null ? null : ingest.getParsed()));
                            }
                            boolean cont = consumer.consumeWithConfirm(ingest, tp, consumerRecord.offset(), watermarkEstimator.get(), error::set);
                            if (!cont) {
                                log.info("Terminating consumption by request");
                                completed.set(true);
                                shutdown.set(true);
                                break;
                            }
                            // With stopAtCurrent, drop the partition from endOffsets once its last
                            // record (end offset - 1) has been passed to the consumer.
                            if (!stopAtCurrent || (end = endOffsets.get(tp)) == null || end - 1L > consumerRecord.offset()) continue;
                            log.debug("Reached end of partition {} at offset {}", (Object)tp, (Object)consumerRecord.offset());
                            endOffsets.remove(tp);
                        }
                        this.increaseWatermarkOnEmptyPolls(emptyPollCount, topicPartitionToId, watermarkEstimator);
                        if (!this.flushCommits(kafka, consumer)) {
                            // The commit hit a rebalance: rewind to the last committed positions.
                            this.handleRebalanceInOffsetCommit(kafka, listener);
                        }
                        this.rethrowErrorIfPresent(name, error);
                        this.terminateIfConsumed(stopAtCurrent, kafka, endOffsets, emptyPollCount, completed);
                        throughputLimiter.sleepToLimitThroughput(bytesPolled, pollTimeMs);
                        long startTime = System.currentTimeMillis();
                        poll = kafka.poll(pollDuration);
                        pollTimeMs = System.currentTimeMillis() - startTime;
                    } while (!shutdown.get() && !completed.get() && !Thread.currentThread().isInterrupted());
                    if (log.isDebugEnabled()) {
                        log.debug("Terminating poll loop for assignment {}: shutdown: {}, completed: {}, interrupted: {}", new Object[]{kafka.assignment(), shutdown.get(), completed.get(), Thread.currentThread().isInterrupted()});
                    }
                    // Completion is reported only when neither interrupt nor shutdown ended the loop.
                    if (!Thread.currentThread().isInterrupted() && !shutdown.get()) {
                        consumer.onCompleted();
                    } else {
                        consumer.onCancelled();
                    }
                    completedLatch.countDown();
                }
                catch (InterruptedException ex) {
                    log.info("Interrupted while polling kafka. Terminating consumption.", (Throwable)ex);
                    Thread.currentThread().interrupt();
                    consumer.onCancelled();
                    completedLatch.countDown();
                }
                catch (Throwable err) {
                    completedLatch.countDown();
                    log.error("Error processing consumer {}", (Object)name, (Object)err);
                    // Restart only when the consumer asks for it and no shutdown was requested.
                    if (!consumer.onError(err) || shutdown.get()) break block27;
                    try {
                        // NOTE(review): this call waits for the new task's ready latch on the current
                        // executor thread - confirm the executor can run the new task concurrently.
                        this.submitConsumerWithObserver(name, offsets, position, stopAtCurrent, preWrite, consumer, executor, handle);
                    }
                    catch (InterruptedException ex) {
                        log.warn("Interrupted while restarting observer");
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(ex);
                    }
                }
                finally {
                    // Make sure the submitting thread is released even when startup failed.
                    readyLatch.countDown();
                }
            }
        });
        readyLatch.await();
    }

    /**
     * Recovers after an offset commit was rejected because of a rebalance: replays
     * the current assignment through the rebalance listener and rewinds the
     * consumer to the last committed offsets.
     *
     * <p>{@code KafkaConsumer#committed} maps a partition to {@code null} when no
     * offset has been committed for it; such partitions are skipped and keep their
     * current position, because seeking to a {@code null} offset would throw.
     *
     * @param kafka consumer whose commit failed
     * @param listener rebalance listener to notify about the current assignment
     */
    private void handleRebalanceInOffsetCommit(KafkaConsumer<Object, Object> kafka, ConsumerRebalanceListener listener) {
        Set<TopicPartition> assigned = kafka.assignment();
        listener.onPartitionsRevoked(assigned);
        listener.onPartitionsAssigned(assigned);
        Map<TopicPartition, OffsetAndMetadata> committed = kafka.committed(assigned);
        committed.forEach((partition, offset) -> {
            // Partitions without any committed offset have nothing to rewind to.
            if (offset != null) {
                kafka.seek(partition, offset);
            }
        });
    }

    /**
     * Applies offsets requested through {@code ObserveHandle.resetOffsets}, if any.
     *
     * <p>When {@code seekOffsets} is non-empty, the Kafka consumer is moved to those
     * offsets, {@code consumer.onAssign} is called with the resulting positions of
     * all assigned partitions, the request list is cleared and an empty batch is
     * returned in place of the already polled records. All of this happens while
     * holding the monitor of {@code seekOffsets}.
     *
     * @param seekOffsets pending seek requests; cleared once applied
     * @param consumer element consumer to notify about the new positions
     * @param watermarkEstimator holder of the estimator supplying the watermark for the offsets
     * @param kafka Kafka consumer to seek
     * @param poll records returned by the last poll
     * @return {@code poll} when nothing was requested, an empty batch otherwise
     */
    private ConsumerRecords<Object, Object> seekToNewOffsetsIfNeeded(List<TopicOffset> seekOffsets, ElementConsumer<Object, Object> consumer, AtomicReference<PartitionedWatermarkEstimator> watermarkEstimator, KafkaConsumer<Object, Object> kafka, ConsumerRecords<Object, Object> poll) {
        synchronized (seekOffsets) {
            if (seekOffsets.isEmpty()) {
                return poll;
            }
            Utils.seekToOffsets(seekOffsets, kafka);
            List<TopicOffset> positions = new ArrayList<>();
            for (TopicPartition tp : kafka.assignment()) {
                PartitionWithTopic partition = new PartitionWithTopic(tp.topic(), tp.partition());
                positions.add(new TopicOffset(partition, kafka.position(tp), watermarkEstimator.get().getWatermark()));
            }
            consumer.onAssign(kafka, positions);
            log.info("Seeked consumer to offsets {} as requested", seekOffsets);
            seekOffsets.clear();
            return ConsumerRecords.empty();
        }
    }

    /**
     * Logs the consumer's current watermark at debug level; does nothing when debug
     * logging is disabled.
     *
     * @param name consumer name, may be {@code null}
     * @param offsets offsets the consumer was started with, may be {@code null}
     * @param watermarkEstimator holder of the estimator; {@code Long.MIN_VALUE} is
     *     logged while it is empty
     * @param polledCount number of records returned by the last poll
     */
    private void logConsumerWatermark(@Nullable String name, @Nullable Collection<Offset> offsets, AtomicReference<PartitionedWatermarkEstimator> watermarkEstimator, int polledCount) {
        if (!log.isDebugEnabled()) {
            return;
        }
        PartitionedWatermarkEstimator estimator = watermarkEstimator.get();
        long watermark = estimator == null ? Long.MIN_VALUE : estimator.getWatermark();
        log.debug("Current watermark of consumer name {} with offsets {} on {} poll'd records is {}", name, offsets, polledCount, watermark);
    }

    /**
     * Takes the pending error out of {@code error} (resetting it to {@code null})
     * and, if there was one, logs it and rethrows it wrapped in a
     * {@link RuntimeException}.
     *
     * @param consumerName consumer name used in the log message, may be {@code null}
     * @param error holder of an error reported during processing
     */
    private void rethrowErrorIfPresent(@Nullable String consumerName, AtomicReference<Throwable> error) {
        final Throwable pending = error.getAndSet(null);
        if (pending == null) {
            return;
        }
        log.warn("Error during processing {}", consumerName, pending);
        throw new RuntimeException(pending);
    }

    /**
     * Marks consumption as completed once all tracked end offsets were reached.
     * Does nothing unless {@code stopAtCurrent} is set.
     *
     * <p>When every partition has had at least as many consecutive empty polls as
     * there are tracked partitions, {@code endOffsets} is refreshed from Kafka
     * before the check.
     *
     * @param stopAtCurrent whether the observation should stop at current data
     * @param consumer Kafka consumer used to refresh end offsets
     * @param endOffsets end offsets still to be reached; modified in place
     * @param emptyPollCount consecutive empty polls per partition
     * @param completed flag set to {@code true} when no end offset remains
     */
    private void terminateIfConsumed(boolean stopAtCurrent, KafkaConsumer<?, ?> consumer, Map<TopicPartition, Long> endOffsets, Map<TopicPartition, Integer> emptyPollCount, AtomicBoolean completed) {
        if (!stopAtCurrent) {
            return;
        }
        boolean allIdle = true;
        for (Integer emptyPolls : emptyPollCount.values()) {
            if (emptyPolls < emptyPollCount.size()) {
                allIdle = false;
                break;
            }
        }
        if (allIdle) {
            endOffsets.clear();
            endOffsets.putAll(findNonEmptyEndOffsets(consumer));
        }
        if (!endOffsets.isEmpty()) {
            return;
        }
        log.info("Assignment {} reached end of current data. Terminating consumption.", consumer.assignment());
        completed.set(true);
    }

    /**
     * Synchronously commits the offsets prepared by {@code consumer}, if any.
     *
     * @param kafka Kafka consumer performing the commit
     * @param consumer element consumer supplying the offsets to commit
     * @return {@code true} when the commit succeeded or there was nothing to commit,
     *     {@code false} when it was rejected with {@link RebalanceInProgressException}
     */
    private boolean flushCommits(KafkaConsumer<Object, Object> kafka, ElementConsumer<?, ?> consumer) {
        try {
            Map<TopicPartition, OffsetAndMetadata> pending = consumer.prepareOffsetsForCommit();
            if (pending.isEmpty()) {
                return true;
            }
            kafka.commitSync(pending);
            return true;
        }
        catch (RebalanceInProgressException rebalancing) {
            String exceptionName = rebalancing.getClass().getSimpleName();
            log.info("Caught {}. Resetting the consumer to the last committed position", exceptionName);
            return false;
        }
    }

    /**
     * Reports partitions as idle to the watermark estimator when their count of
     * consecutive empty polls is at least the number of tracked partitions.
     *
     * @param emptyPollCount consecutive empty polls per partition
     * @param topicPartitionToId mapping of partitions to the ids used by the estimator
     * @param watermarkEstimator holder of the estimator to notify
     */
    private void increaseWatermarkOnEmptyPolls(Map<TopicPartition, Integer> emptyPollCount, Map<TopicPartition, Integer> topicPartitionToId, AtomicReference<PartitionedWatermarkEstimator> watermarkEstimator) {
        final int threshold = emptyPollCount.size();
        for (Map.Entry<TopicPartition, Integer> entry : emptyPollCount.entrySet()) {
            if (entry.getValue() < threshold) {
                continue;
            }
            int partitionId = topicPartitionToId.get(entry.getKey()).intValue();
            watermarkEstimator.get().idle(partitionId);
        }
    }

    /**
     * Creates the handle through which callers control a single run of the poll loop.
     *
     * @param shutdown flag read by the poll loop; set by {@code close()}
     * @param seekOffsets list the poll loop drains to apply requested offset resets
     * @param consumer element consumer queried for committed and current offsets
     * @param readyLatch latch released once the poll loop is ready
     * @param completedLatch latch released once the poll loop has terminated
     * @return handle backed by the given state
     */
    private ObserveHandle createObserveHandle(final AtomicBoolean shutdown, final List<TopicOffset> seekOffsets, final ElementConsumer<?, ?> consumer, final CountDownLatch readyLatch, final CountDownLatch completedLatch) {
        return new ObserveHandle(){

            /** Requests shutdown and waits for the poll loop to signal completion. */
            public void close() {
                shutdown.set(true);
                ExceptionUtils.ignoringInterrupted(completedLatch::await);
            }

            /** Returns the committed offsets reported by the element consumer. */
            public List<Offset> getCommittedOffsets() {
                return consumer.getCommittedOffsets();
            }

            /** Queues the offsets; the poll loop seeks to them on a following iteration. */
            public void resetOffsets(List<Offset> offsets) {
                seekOffsets.addAll(offsets);
            }

            /** Returns the current offsets reported by the element consumer. */
            public List<Offset> getCurrentOffsets() {
                return consumer.getCurrentOffsets();
            }

            /** Blocks until the ready latch is released. */
            public void waitUntilReady() throws InterruptedException {
                readyLatch.await();
            }
        };
    }

    /**
     * Finds end offsets of assigned partitions that currently hold data, i.e.
     * whose beginning offset is lower than their end offset.
     *
     * @param kafka consumer whose assignment is inspected
     * @return mutable map from partition to end offset, without empty partitions
     */
    private Map<TopicPartition, Long> findNonEmptyEndOffsets(KafkaConsumer<?, ?> kafka) {
        Set<TopicPartition> assigned = kafka.assignment();
        Map<TopicPartition, Long> starts = kafka.beginningOffsets(assigned);
        Map<TopicPartition, Long> ends = kafka.endOffsets(assigned);
        Map<TopicPartition, Long> nonEmpty = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> end : ends.entrySet()) {
            if (starts.get(end.getKey()) < end.getValue()) {
                nonEmpty.put(end.getKey(), end.getValue());
            }
        }
        return nonEmpty;
    }

    /**
     * Creates a throw-away consumer with a random group name, positioned at
     * {@link Position#NEWEST}, with neither offsets nor rebalance listener.
     *
     * @return the new consumer; the caller is responsible for closing it
     */
    private KafkaConsumer<Object, Object> createConsumer() {
        final String randomGroup = UUID.randomUUID().toString();
        return createConsumer(randomGroup, null, null, Position.NEWEST);
    }

    /**
     * Creates a Kafka consumer and moves it to the requested starting position.
     *
     * <p>A named consumer is created as a member of the group {@code name}; an
     * unnamed one is created for the partitions of the given offsets. Unless the
     * accessor uses a regex topic, the topic is validated to exist. Positioning:
     * <ul>
     *   <li>{@link Position#OLDEST} without offsets: seek to the beginning when the
     *       group has no committed offsets at all; otherwise, if the initial poll
     *       returned data, seek back to the committed offsets (partitions without a
     *       committed offset are left untouched);</li>
     *   <li>{@link Position#OLDEST} with offsets: seek the given partitions to the beginning;</li>
     *   <li>{@link Position#CURRENT}: seek to the given offsets (which are required);</li>
     *   <li>otherwise nothing is sought.</li>
     * </ul>
     *
     * <p>If validation or positioning fails, the consumer is closed before the
     * failure is propagated, so that it does not leak.
     *
     * @param name consumer group name, or {@code null} for an unnamed consumer
     * @param offsets offsets to start from; required when {@code name} is {@code null}
     * @param listener rebalance listener; allowed only together with {@code name}
     * @param position position to start reading from
     * @return the positioned consumer; the caller is responsible for closing it
     * @throws IllegalArgumentException when a listener is given without a name, the name
     *     is empty, neither name nor offsets are given, the topic has no partitions, or
     *     {@link Position#CURRENT} is used without offsets
     */
    @VisibleForTesting
    KafkaConsumer<Object, Object> createConsumer(@Nullable String name, @Nullable Collection<Offset> offsets, @Nullable ConsumerRebalanceListener listener, Position position) {
        final KafkaConsumer<Object, Object> consumer;
        Preconditions.checkArgument((name != null || listener == null ? 1 : 0) != 0, (Object)"Please use either named group (with listener) or offsets without listener");
        KafkaConsumerFactory factory = this.accessor.createConsumerFactory();
        if ("".equals(name)) {
            throw new IllegalArgumentException("Consumer group cannot be empty string");
        }
        if (name != null) {
            consumer = factory.create(name, position, listener);
        } else if (offsets != null) {
            List<Partition> partitions = offsets.stream().map(Offset::getPartition).collect(Collectors.toList());
            consumer = factory.create(position, partitions);
        } else {
            throw new IllegalArgumentException("Need either name or offsets to observe");
        }
        try {
            if (!this.accessor.isTopicRegex()) {
                this.validateTopic(consumer, this.accessor.getTopic());
            }
            if (position == Position.OLDEST) {
                if (offsets == null) {
                    boolean emptyPoll = true;
                    if (consumer.assignment().isEmpty()) {
                        // Poll once so that the group coordinator assigns partitions.
                        emptyPoll = consumer.poll(Duration.ofMillis(this.accessor.getAssignmentTimeoutMillis())).isEmpty();
                    }
                    Set<TopicPartition> assignment = consumer.assignment();
                    Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(assignment);
                    if (committed.values().stream().allMatch(Objects::isNull)) {
                        log.info("Seeking consumer name {} to beginning of partitions {}", (Object)name, assignment);
                        consumer.seekToBeginning(assignment);
                    } else if (!emptyPoll) {
                        // The poll above consumed records; rewind so that they are not lost.
                        log.info("Seeking consumer name {} to committed offsets {}", (Object)name, committed);
                        committed.forEach((partition, offset) -> {
                            // A partition without a committed offset maps to null; nothing to seek to.
                            if (offset != null) {
                                consumer.seek(partition, offset);
                            }
                        });
                    }
                } else {
                    List<TopicPartition> tps = offsets.stream().map(TopicOffset.class::cast).map(p -> new TopicPartition(p.getPartition().getTopic(), p.getPartition().getId())).collect(Collectors.toList());
                    log.info("Seeking given partitions {} to the beginning", tps);
                    consumer.seekToBeginning(tps);
                }
            } else if (position == Position.CURRENT) {
                Preconditions.checkArgument((offsets != null ? 1 : 0) != 0, (String)"Please use %s only with specified offsets", (Object)position);
                log.info("Seeking to given offsets {}", offsets);
                Utils.seekToOffsets(offsets, consumer);
            } else {
                log.info("Starting to process kafka partitions from newest data");
            }
            return consumer;
        }
        catch (RuntimeException | Error failure) {
            // The consumer was never handed to the caller, so it has to be released here.
            try {
                consumer.close();
            }
            catch (RuntimeException closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Verifies that Kafka reports at least one partition for the topic.
     *
     * @param consumer consumer used to query partition metadata
     * @param topicToValidate topic expected to exist
     * @throws IllegalArgumentException when the partition list is {@code null} or empty
     */
    @VisibleForTesting
    void validateTopic(KafkaConsumer<?, ?> consumer, String topicToValidate) {
        List<PartitionInfo> found = consumer.partitionsFor(topicToValidate);
        boolean hasPartitions = found != null && !found.isEmpty();
        Preconditions.checkArgument(hasPartitions, "Received null or empty partitions for topic [%s]. Please check that the topic exists and has at least one partition.", topicToValidate);
    }

    /**
     * Tells whether offsets of this reader can be externalized.
     *
     * @return always {@code true}
     */
    public boolean hasExternalizableOffsets() {
        final boolean externalizable = true;
        return externalizable;
    }

    /**
     * Provides the externalizer for offsets of this reader.
     *
     * @return a new {@link TopicOffsetExternalizer}
     */
    public OffsetExternalizer getOffsetExternalizer() {
        final OffsetExternalizer externalizer = new TopicOffsetExternalizer();
        return externalizer;
    }

    /**
     * Converts this reader into a serializable factory. The accessor and context
     * are copied into locals so that the returned lambda captures those values
     * rather than this reader instance.
     *
     * @return factory creating a new {@link KafkaLogReader} for the same accessor and context
     */
    public CommitLogReader.Factory asFactory() {
        final KafkaAccessor capturedAccessor = this.accessor;
        final Context capturedContext = this.context;
        return (CommitLogReader.Factory & Serializable)repo -> new KafkaLogReader(capturedAccessor, capturedContext);
    }

    /**
     * Wraps partitions into placeholder offsets with offset {@code -1} and
     * watermark {@code Long.MIN_VALUE}.
     *
     * @param partitions partitions to wrap (each must be a {@link PartitionWithTopic}),
     *     or {@code null}
     * @return list of offsets, or {@code null} when {@code partitions} is {@code null}
     */
    private static Collection<Offset> createDefaultOffsets(Collection<Partition> partitions) {
        if (partitions == null) {
            return null;
        }
        List<Offset> defaults = new ArrayList<>();
        for (Partition partition : partitions) {
            defaults.add(new TopicOffset((PartitionWithTopic) partition, -1L, Long.MIN_VALUE));
        }
        return defaults;
    }

    /**
     * Creates a handle that forwards every call to whatever handle is currently
     * stored in {@code proxy}, looked up anew on each call.
     *
     * @param proxy reference holding the handle to delegate to
     * @return delegating handle
     */
    private static ObserveHandle dynamicHandle(final AtomicReference<ObserveHandle> proxy) {
        return new ObserveHandle(){

            /** Resolves the handle currently held by the reference. */
            private ObserveHandle current() {
                return proxy.get();
            }

            public void close() {
                current().close();
            }

            public List<Offset> getCommittedOffsets() {
                return current().getCommittedOffsets();
            }

            public void resetOffsets(List<Offset> offsets) {
                current().resetOffsets(offsets);
            }

            public List<Offset> getCurrentOffsets() {
                return current().getCurrentOffsets();
            }

            public void waitUntilReady() throws InterruptedException {
                current().waitUntilReady();
            }
        };
    }

    /**
     * Creates an offset committer configured with the accessor's stale-commit
     * logging interval and auto-commit interval.
     *
     * @return new committer keyed by topic partition
     */
    private OffsetCommitter<TopicPartition> createOffsetCommitter() {
        final long staleCommitLogMs = accessor.getLogStaleCommitIntervalMs();
        final long autoCommitMs = accessor.getAutoCommitIntervalMs();
        return new OffsetCommitter<TopicPartition>(staleCommitLogMs, autoCommitMs);
    }

    /**
     * Creates the rebalance listener that keeps the poll loop's per-partition state
     * in sync with the consumer's assignment.
     *
     * <p>On every assignment the listener rebuilds {@code emptyPollCount} and
     * {@code topicPartitionToId} for all currently assigned partitions, installs a
     * new watermark estimator and, when the Kafka consumer is already available,
     * notifies {@code consumer} through {@code onAssign}.
     *
     * @param name consumer name; selects committed (non-null) vs. current (null) offsets
     *     for the {@code onAssign} notification
     * @param kafka reference to the Kafka consumer; may still be empty when called
     * @param consumer element consumer to notify about assignments
     * @param emptyPollCount per-partition empty poll counters, reset on assignment
     * @param topicPartitionToId partition-to-id mapping, rebuilt on assignment
     * @param watermarkEstimator holder receiving the newly created estimator
     * @return the listener
     */
    private ConsumerRebalanceListener listener(final String name, final AtomicReference<KafkaConsumer<Object, Object>> kafka, final ElementConsumer<Object, Object> consumer, final Map<TopicPartition, Integer> emptyPollCount, final Map<TopicPartition, Integer> topicPartitionToId, final AtomicReference<PartitionedWatermarkEstimator> watermarkEstimator) {
        return new ConsumerRebalanceListener(){
            // Partitions assigned so far and not yet revoked.
            private final Set<TopicPartition> currentlyAssigned = new HashSet<TopicPartition>();

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> parts) {
                this.currentlyAssigned.removeAll(parts);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> parts) {
                this.currentlyAssigned.addAll(parts);
                log.info("Consumer {} has assigned partitions {}", (Object)name, this.currentlyAssigned);
                // Ids are re-numbered from zero for the whole current assignment.
                emptyPollCount.clear();
                topicPartitionToId.clear();
                AtomicInteger id = new AtomicInteger();
                this.currentlyAssigned.forEach(p -> {
                    topicPartitionToId.put(p, id.getAndIncrement());
                    emptyPollCount.put(p, 0);
                });
                if (this.currentlyAssigned.isEmpty()) {
                    // Nothing assigned: use the estimator that always reports Long.MAX_VALUE.
                    watermarkEstimator.set(KafkaLogReader.createWatermarkEstimatorForEmptyParts());
                } else {
                    // One freshly created estimator per assigned partition, keyed by partition id.
                    watermarkEstimator.set(new MinimalPartitionWatermarkEstimator(this.currentlyAssigned.stream().collect(Collectors.toMap(topicPartitionToId::get, item -> this.createWatermarkEstimator()))));
                }
                Optional.ofNullable((KafkaConsumer)kafka.get()).ifPresent(c -> consumer.onAssign(c, (Collection<TopicOffset>)(name != null ? this.getCommittedTopicOffsets((Collection<TopicPartition>)this.currentlyAssigned, (KafkaConsumer<Object, Object>)c) : this.getCurrentTopicOffsets((Collection<TopicPartition>)this.currentlyAssigned, (KafkaConsumer<Object, Object>)c))));
            }

            /** Builds offsets from the consumer's current position of each partition. */
            List<TopicOffset> getCurrentTopicOffsets(Collection<TopicPartition> parts, KafkaConsumer<Object, Object> c) {
                return parts.stream().map(tp -> new TopicOffset(new PartitionWithTopic(tp.topic(), tp.partition()), c.position((TopicPartition)tp), ((PartitionedWatermarkEstimator)watermarkEstimator.get()).getWatermark())).collect(Collectors.toList());
            }

            /** Builds offsets from committed offsets; partitions without one get offset 0. */
            List<TopicOffset> getCommittedTopicOffsets(Collection<TopicPartition> parts, KafkaConsumer<Object, Object> c) {
                HashMap<TopicPartition, OffsetAndMetadata> committed = new HashMap<TopicPartition, OffsetAndMetadata>(c.committed(new HashSet<TopicPartition>(parts)));
                // Make sure every requested partition has an entry, even if it maps to null.
                for (TopicPartition tp : parts) {
                    committed.putIfAbsent(tp, null);
                }
                return committed.entrySet().stream().map(entry -> {
                    long offset = entry.getValue() == null ? 0L : ((OffsetAndMetadata)entry.getValue()).offset();
                    return new TopicOffset(new PartitionWithTopic(((TopicPartition)entry.getKey()).topic(), ((TopicPartition)entry.getKey()).partition()), offset, ((PartitionedWatermarkEstimator)watermarkEstimator.get()).getWatermark());
                }).collect(Collectors.toList());
            }

            /** Creates a single-partition estimator from the accessor's watermark configuration. */
            private WatermarkEstimator createWatermarkEstimator() {
                return KafkaLogReader.this.accessor.getWatermarkConfiguration().getWatermarkEstimatorFactory().create();
            }
        };
    }

    /**
     * Creates the estimator used while no partition is assigned.
     *
     * @return serializable estimator whose watermark is always {@code Long.MAX_VALUE}
     */
    private static PartitionedWatermarkEstimator createWatermarkEstimatorForEmptyParts() {
        final PartitionedWatermarkEstimator alwaysMax = (PartitionedWatermarkEstimator & Serializable)() -> Long.MAX_VALUE;
        return alwaysMax;
    }

    /**
     * Returns the accessor this reader was created from.
     *
     * @return the accessor
     */
    @Generated
    public KafkaAccessor getAccessor() {
        return accessor;
    }

    /**
     * Returns the context this reader was created with.
     *
     * @return the context
     */
    @Generated
    public Context getContext() {
        return context;
    }
}

