package cz.o2.proxima.direct.kafka;

import cz.o2.proxima.direct.commitlog.CommitLogObserver;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.ObserveHandle;
import cz.o2.proxima.direct.commitlog.Offset;
import cz.o2.proxima.direct.commitlog.OffsetExternalizer;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.direct.kafka.ElementConsumers;
import cz.o2.proxima.direct.time.MinimalPartitionWatermarkEstimator;
import cz.o2.proxima.functional.BiConsumer;
import cz.o2.proxima.functional.Factory;
import cz.o2.proxima.internal.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.shaded.com.google.common.base.Preconditions;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecords;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.PartitionInfo;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.TopicPartition;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.errors.RebalanceInProgressException;
import cz.o2.proxima.storage.AbstractStorage;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Position;
import cz.o2.proxima.time.PartitionedWatermarkEstimator;
import cz.o2.proxima.time.WatermarkEstimator;
import cz.o2.proxima.time.WatermarkSupplier;
import cz.o2.proxima.util.ExceptionUtils;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/direct/kafka/KafkaLogReader.class */
public class KafkaLogReader extends AbstractStorage implements CommitLogReader {

    /** Class-wide SLF4J logger; carries Lombok's {@code @Generated} marker. */
    @Generated
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaLogReader.class);
    /** Accessor this reader was created from; source of the topic, URI and consumer factory. */
    final KafkaAccessor accessor;
    /** Context supplying the executor service on which consumer tasks are submitted. */
    private final Context context;
    /** Poll timeout taken from the accessor; converted via {@code Duration.ofMillis} when polling. */
    private final long consumerPollInterval;
    /** Throughput limit taken from the accessor and handed to {@code KafkaThroughputLimiter}. */
    private final long maxBytesPerSec;
    /** Configuration map obtained from the accessor. */
    private final Map<String, Object> cfg;
    /** Serializer used to turn consumed Kafka records into {@code StreamElement}s. */
    private final ElementSerializer<Object, Object> serializer;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaLogReader(KafkaAccessor kafkaAccessor, Context context) {
        super(kafkaAccessor.getEntityDescriptor(), kafkaAccessor.getUri());
        this.accessor = kafkaAccessor;
        this.context = context;
        this.consumerPollInterval = kafkaAccessor.getConsumerPollInterval();
        this.maxBytesPerSec = kafkaAccessor.getMaxBytesPerSec();
        this.cfg = kafkaAccessor.getCfg();
        this.serializer = kafkaAccessor.getSerializer();
        log.debug("Created {} for accessor {}", getClass().getSimpleName(), kafkaAccessor);
    }

    /**
     * Tells whether sequential IDs are restored when reading.
     *
     * @return whatever the configured serializer reports from {@code storesSequentialId()}
     */
    public boolean restoresSequentialIds() {
        final boolean storedBySerializer = this.serializer.storesSequentialId();
        return storedBySerializer;
    }

    /**
     * Starts a named observation of the whole topic.
     *
     * <p>Delegates to {@code observeKafka} with no explicit partitions and with
     * stop-at-current disabled.
     *
     * @param str name of the observer (used as the consumer name)
     * @param position position to start reading from
     * @param commitLogObserver observer receiving the data
     * @return handle of the running observation
     */
    public ObserveHandle observe(String str, Position position, CommitLogObserver commitLogObserver) {
        final Collection<Partition> noExplicitPartitions = null;
        final boolean stopAtCurrent = false;
        return observeKafka(str, noExplicitPartitions, position, stopAtCurrent, commitLogObserver);
    }

    /**
     * Observes the given partitions.
     *
     * <p>The supplied name is not forwarded: {@code observeKafka} is invoked with a
     * {@code null} name, so the observation is driven by the partitions only.
     *
     * @param str name of the observer (ignored by this implementation)
     * @param collection partitions to observe
     * @param position position to start reading from
     * @param z whether to stop once the current end of data is reached
     * @param commitLogObserver observer receiving the data
     * @return handle of the running observation
     */
    public ObserveHandle observePartitions(String str, @Nullable Collection<Partition> collection, Position position, boolean z, CommitLogObserver commitLogObserver) {
        final String unnamed = null;
        return observeKafka(unnamed, collection, position, z, commitLogObserver);
    }

    /**
     * Starts a named bulk observation of the whole topic.
     *
     * @param str name of the observer (used as the consumer name)
     * @param position position to start reading from
     * @param z whether to stop once the current end of data is reached
     * @param commitLogObserver observer receiving the data
     * @return handle of the running observation
     */
    public ObserveHandle observeBulk(String str, Position position, boolean z, CommitLogObserver commitLogObserver) {
        final Collection<Offset> noExplicitOffsets = null;
        return observeKafkaBulk(str, noExplicitOffsets, position, z, commitLogObserver);
    }

    /**
     * Starts a bulk observation of the given partitions.
     *
     * <p>The partitions are converted to offsets by {@code createDefaultOffsets}; the supplied
     * name is not forwarded ({@code null} is passed instead).
     *
     * @param str name of the observer (ignored by this implementation)
     * @param collection partitions to observe
     * @param position position to start reading from
     * @param z whether to stop once the current end of data is reached
     * @param commitLogObserver observer receiving the data
     * @return handle of the running observation
     */
    public ObserveHandle observeBulkPartitions(String str, Collection<Partition> collection, Position position, boolean z, CommitLogObserver commitLogObserver) {
        final String unnamed = null;
        return observeKafkaBulk(unnamed, createDefaultOffsets(collection), position, z, commitLogObserver);
    }

    /**
     * Starts an unnamed bulk observation from the given offsets.
     *
     * <p>Always uses {@code Position.CURRENT} as the position passed on.
     *
     * @param collection offsets to start reading from
     * @param z whether to stop once the current end of data is reached
     * @param commitLogObserver observer receiving the data
     * @return handle of the running observation
     */
    public ObserveHandle observeBulkOffsets(Collection<Offset> collection, boolean z, CommitLogObserver commitLogObserver) {
        final String unnamed = null;
        final Position startPosition = Position.CURRENT;
        return observeKafkaBulk(unnamed, collection, startPosition, z, commitLogObserver);
    }

    /**
     * Lists the partitions of the configured topic.
     *
     * <p>A short-lived consumer is created just for the metadata lookup and is closed by
     * try-with-resources. This closes it exactly once and, should both the lookup and
     * {@code close()} fail, keeps the lookup failure as the primary exception with the close
     * failure attached as suppressed.
     *
     * @return one {@code PartitionWithTopic} per partition reported for the topic
     * @throws UnsupportedOperationException when the accessor is configured with a topic regex
     */
    public List<Partition> getPartitions() {
        if (this.accessor.isTopicRegex()) {
            throw new UnsupportedOperationException(String.format("Partitions of URI %s are unstable and should not be used.", getUri()));
        }
        try (KafkaConsumer<Object, Object> consumer = createConsumer()) {
            return consumer.partitionsFor(this.accessor.getTopic()).stream()
                    .<Partition>map(info -> new PartitionWithTopic(info.topic(), info.partition()))
                    .collect(Collectors.toList());
        }
    }

    /**
     * Fetches the oldest or newest offsets of the given partitions.
     *
     * <p>Every partition must be a {@code PartitionWithTopic}. A short-lived consumer is created
     * for the lookup and closed by try-with-resources.
     *
     * @param position either {@code Position.OLDEST} (beginning offsets) or
     *     {@code Position.NEWEST} (end offsets)
     * @param list partitions to fetch the offsets for
     * @return map from partition to a {@code TopicOffset} carrying the fetched offset and
     *     {@code Long.MIN_VALUE} as its third constructor argument
     * @throws IllegalArgumentException when {@code position} is neither OLDEST nor NEWEST
     * @throws ClassCastException when a partition is not a {@code PartitionWithTopic}
     */
    public Map<Partition, Offset> fetchOffsets(Position position, List<Partition> list) {
        Preconditions.checkArgument(position == Position.NEWEST || position == Position.OLDEST, "Position %s does not have well defined offsets.", position);
        final List<TopicPartition> topicPartitions = list.stream()
                .map(PartitionWithTopic.class::cast)
                .map(partition -> new TopicPartition(partition.getTopic(), partition.getPartition()))
                .collect(Collectors.toList());
        try (KafkaConsumer<Object, Object> consumer = createConsumer()) {
            final Map<TopicPartition, Long> fetched = position == Position.OLDEST
                    ? consumer.beginningOffsets(topicPartitions)
                    : consumer.endOffsets(topicPartitions);
            final Map<Partition, Offset> result = new HashMap<>();
            for (Map.Entry<TopicPartition, Long> entry : fetched.entrySet()) {
                final TopicPartition topicPartition = entry.getKey();
                final long offset = entry.getValue();
                final PartitionWithTopic key = new PartitionWithTopic(topicPartition.topic(), topicPartition.partition());
                // The value carries its own partition instance, as in the original mapping.
                final PartitionWithTopic offsetPartition = new PartitionWithTopic(topicPartition.topic(), topicPartition.partition());
                result.put(key, new TopicOffset(offsetPartition, offset, Long.MIN_VALUE));
            }
            return result;
        }
    }

    /**
     * Starts a (non-bulk) observation identified either by name or by explicit partitions.
     *
     * @param str consumer name, or {@code null} when observing explicit partitions
     * @param collection partitions to observe, or {@code null} when observing by name
     * @param position position to start reading from
     * @param z whether to stop once the current end of data is reached
     * @param commitLogObserver observer receiving the data
     * @return handle of the running observation
     * @throws IllegalArgumentException when both name and partitions are {@code null}, or when
     *     partitions are given for a regex URI
     * @throws RuntimeException wrapping the {@code InterruptedException} when the calling thread
     *     is interrupted while the observer starts (the interrupt flag is restored)
     */
    @VisibleForTesting
    ObserveHandle observeKafka(@Nullable String str, @Nullable Collection<Partition> collection, Position position, boolean z, CommitLogObserver commitLogObserver) {
        final boolean named = str != null;
        Preconditions.checkArgument(named || collection != null, "Either name or offsets have to be non null");
        Preconditions.checkArgument(!this.accessor.isTopicRegex() || collection == null, "Regex URI %s cannot observe specific partitions, because these cannot be made stable.", getUri());
        try {
            // Offsets are only committed to Kafka for named observers.
            return processConsumer(str, createDefaultOffsets(collection), position, z, named, commitLogObserver, this.context.getExecutorService());
        } catch (InterruptedException interrupted) {
            log.warn("Interrupted waiting for kafka observer to start", (Throwable) interrupted);
            Thread.currentThread().interrupt();
            throw new RuntimeException(interrupted);
        }
    }

    /**
     * Starts a bulk observation identified either by name or by explicit offsets.
     *
     * @param str consumer name, or {@code null} when observing explicit offsets
     * @param collection offsets to start from, or {@code null} when observing by name
     * @param position position to start reading from; must not be {@code null}
     * @param z whether to stop once the current end of data is reached
     * @param commitLogObserver observer receiving the data
     * @return handle of the running observation
     * @throws IllegalArgumentException when both name and offsets are {@code null}, when the
     *     position is {@code null}, or when offsets are given for a regex URI
     * @throws RuntimeException wrapping the {@code InterruptedException} when the calling thread
     *     is interrupted while the observer starts (the interrupt flag is restored)
     */
    private ObserveHandle observeKafkaBulk(@Nullable String str, @Nullable Collection<Offset> collection, Position position, boolean z, CommitLogObserver commitLogObserver) {
        final boolean named = str != null;
        Preconditions.checkArgument(named || collection != null, "Either name or offsets have to be non null");
        Preconditions.checkArgument(position != null, "Position cannot be null");
        Preconditions.checkArgument(!this.accessor.isTopicRegex() || collection == null, "Regex URI %s cannot observe specific offsets, because these cannot be made stable.", getUri());
        try {
            // Offsets are only committed to Kafka for named observers.
            return processConsumerBulk(str, collection, position, z, named, commitLogObserver, this.context.getExecutorService());
        } catch (InterruptedException interrupted) {
            log.warn("Interrupted waiting for kafka observer to start", (Throwable) interrupted);
            Thread.currentThread().interrupt();
            throw new RuntimeException(interrupted);
        }
    }

    /**
     * Wires an online (per-element confirming) consumer and submits it for execution.
     *
     * <p>Every polled record is registered with the offset committer. Once the committer fires
     * the registered callback, {@code offset + 1} is stored for the record's partition in a
     * synchronized map, provided {@code z2} is set. The consumer drains that map whenever it
     * prepares offsets for commit.
     *
     * @param str consumer name, may be {@code null}
     * @param collection offsets to start from, may be {@code null}
     * @param position position to start reading from
     * @param z whether to stop once the current end of data is reached
     * @param z2 whether confirmed offsets should be collected for committing to Kafka
     * @param commitLogObserver observer receiving the data
     * @param executorService executor running the poll loop
     * @return handle delegating to the most recently created observe handle
     * @throws InterruptedException when interrupted while waiting for the consumer to start
     */
    @VisibleForTesting
    ObserveHandle processConsumer(@Nullable String str, @Nullable Collection<Offset> collection, Position position, boolean z, boolean z2, CommitLogObserver commitLogObserver, ExecutorService executorService) throws InterruptedException {
        final Map<TopicPartition, OffsetAndMetadata> pendingCommits = Collections.synchronizedMap(new HashMap<>());
        final OffsetCommitter<TopicPartition> committer = createOffsetCommitter();
        final BiConsumer<TopicPartition, ConsumerRecord<Object, Object>> registerRecord = (partition, record) -> {
            final long recordOffset = record.offset();
            committer.register(partition, recordOffset, 1, () -> {
                // Kafka expects the offset of the next record to be read.
                final OffsetAndMetadata next = new OffsetAndMetadata(recordOffset + 1);
                if (z2) {
                    pendingCommits.put(partition, next);
                }
            });
        };
        ElementConsumers.OnlineConsumer onlineConsumer = new ElementConsumers.OnlineConsumer(commitLogObserver, committer, () -> {
            // Snapshot and clear atomically with respect to concurrent put() calls.
            synchronized (pendingCommits) {
                final HashMap<TopicPartition, OffsetAndMetadata> snapshot = new HashMap<>(pendingCommits);
                pendingCommits.clear();
                return snapshot;
            }
        });
        final AtomicReference<ObserveHandle> handleRef = new AtomicReference<>();
        submitConsumerWithObserver(str, collection, position, z, registerRecord, onlineConsumer, executorService, handleRef);
        return dynamicHandle(handleRef);
    }

    /**
     * Wires a bulk consumer and submits it for execution.
     *
     * <p>Offsets reported by the bulk consumer are stored in a synchronized map when {@code z2}
     * is set. The map is drained by the snapshot factory and can be dropped entirely through
     * the {@code clear} callback handed to the consumer.
     *
     * <p>The callbacks are declared with explicit type arguments: with a raw {@code BiConsumer}
     * the lambda parameter is typed {@code Object}, so calling {@code longValue()} on it does
     * not compile.
     *
     * @param str consumer name, may be {@code null}
     * @param collection offsets to start from, may be {@code null}
     * @param position position to start reading from
     * @param z whether to stop once the current end of data is reached
     * @param z2 whether reported offsets should be collected for committing to Kafka
     * @param commitLogObserver observer receiving the data
     * @param executorService executor running the poll loop
     * @return handle delegating to the most recently created observe handle
     * @throws InterruptedException when interrupted while waiting for the consumer to start
     */
    @VisibleForTesting
    ObserveHandle processConsumerBulk(@Nullable String str, @Nullable Collection<Offset> collection, Position position, boolean z, boolean z2, CommitLogObserver commitLogObserver, ExecutorService executorService) throws InterruptedException {
        final Map<TopicPartition, OffsetAndMetadata> pendingCommits = Collections.synchronizedMap(new HashMap<>());
        final BiConsumer<TopicPartition, Long> recordCommit = (topicPartition, offset) -> {
            if (z2) {
                pendingCommits.put(topicPartition, new OffsetAndMetadata(offset.longValue()));
            }
        };
        final Factory<Map<TopicPartition, OffsetAndMetadata>> drainCommits = () -> {
            // Snapshot and clear atomically with respect to concurrent put() calls.
            synchronized (pendingCommits) {
                final Map<TopicPartition, OffsetAndMetadata> snapshot = new HashMap<>(pendingCommits);
                pendingCommits.clear();
                return snapshot;
            }
        };
        // NOTE(review): the consumer type is kept raw exactly as decompiled; add type arguments
        // once the declaration of ElementConsumers.BulkConsumer has been checked.
        ElementConsumers.BulkConsumer bulkConsumer = new ElementConsumers.BulkConsumer(commitLogObserver, recordCommit, drainCommits, pendingCommits::clear);
        final AtomicReference<ObserveHandle> handleRef = new AtomicReference<>();
        // Bulk processing needs no per-record hook, hence the no-op callback.
        submitConsumerWithObserver(str, collection, position, z, (topicPartition, consumerRecord) -> {
        }, bulkConsumer, executorService, handleRef);
        return dynamicHandle(handleRef);
    }

    /**
     * Submits the Kafka poll loop to the executor and blocks until the submitted task either
     * signals readiness or terminates.
     *
     * <p>The task publishes a fresh {@code ObserveHandle} into {@code atomicReference}, creates
     * the consumer, polls records, passes each of them to {@code biConsumer} and then to
     * {@code elementConsumer}, flushes prepared offsets, and finally reports completion or
     * cancellation. When processing fails and {@code elementConsumer.onError} returns
     * {@code true}, the whole observation is resubmitted through a recursive call.
     *
     * <p>NOTE(review): this is decompiler output. The lambdas referencing {@code r1} and
     * {@code r5} use names that are not declared anywhere; judging by the preceding
     * {@code getClass()} calls they stand for {@code elementConsumer::onIdle} and
     * {@code atomicReference4::set} — confirm against the original sources.
     *
     * @param str consumer name, may be {@code null}; the rebalance listener is only passed to
     *     the consumer when a name is given
     * @param collection offsets to start from, may be {@code null}
     * @param position position to start reading from
     * @param z stop-at-current flag; not allowed together with a regex URI
     * @param biConsumer hook invoked with the partition and raw record before the element is
     *     deserialized and consumed
     * @param elementConsumer consumer of the deserialized elements and of lifecycle callbacks
     * @param executorService executor running the poll loop
     * @param atomicReference receives the handle created for this (re)submission
     * @throws InterruptedException when interrupted while waiting for the task to start
     */
    private void submitConsumerWithObserver(@Nullable String str, @Nullable Collection<Offset> collection, Position position, boolean z, BiConsumer<TopicPartition, ConsumerRecord<Object, Object>> biConsumer, ElementConsumer<Object, Object> elementConsumer, ExecutorService executorService, AtomicReference<ObserveHandle> atomicReference) throws InterruptedException {
        // countDownLatch: released once the task is ready (or has ended); awaited below and by
        // ObserveHandle.waitUntilReady().
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // countDownLatch2: released when the task has finished; awaited by ObserveHandle.close().
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        // atomicBoolean is logged as "completed", atomicBoolean2 as "shutdown" (see the debug
        // message at the end of the poll loop).
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        AtomicBoolean atomicBoolean2 = new AtomicBoolean();
        // Offsets requested through ObserveHandle.resetOffsets(); applied by seekToNewOffsetsIfNeeded.
        List synchronizedList = Collections.synchronizedList(new ArrayList());
        Preconditions.checkArgument((this.accessor.isTopicRegex() && z) ? false : true, "Cannot use stopAtCurrent with regex URI");
        executorService.submit(() -> {
            ConsumerRecords<Object, Object> poll;
            Map<TopicPartition, Long> findNonEmptyEndOffsets;
            Long l;
            AtomicReference<KafkaConsumer<Object, Object>> atomicReference2 = new AtomicReference<>();
            AtomicReference<PartitionedWatermarkEstimator> atomicReference3 = new AtomicReference<>(null);
            // Per partition: number of consecutive loop iterations without a record (reset to 0
            // whenever a record of that partition is seen).
            ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
            // Per partition: integer index passed to the watermark estimator; presumably filled
            // by the rebalance listener — verify in listener(...).
            HashMap hashMap = new HashMap();
            Duration ofMillis = Duration.ofMillis(this.consumerPollInterval);
            KafkaThroughputLimiter kafkaThroughputLimiter = new KafkaThroughputLimiter(this.maxBytesPerSec);
            atomicReference.set(createObserveHandle(atomicBoolean2, synchronizedList, elementConsumer, countDownLatch, countDownLatch2));
            elementConsumer.onStart();
            ConsumerRebalanceListener listener = listener(str, atomicReference2, elementConsumer, concurrentHashMap, hashMap, atomicReference3);
            try {
                try {
                    try {
                        KafkaConsumer<Object, Object> createConsumer = createConsumer(str, collection, str != null ? listener : null, position);
                        Throwable th = null;
                        try {
                            try {
                                atomicReference2.set(createConsumer);
                                // Initial poll; repeated only while nothing was polled, the URI is a
                                // regex, the assignment is still empty and no shutdown/interrupt
                                // was requested.
                                do {
                                    poll = createConsumer.poll(ofMillis);
                                    findNonEmptyEndOffsets = z ? findNonEmptyEndOffsets(createConsumer) : null;
                                    if (log.isDebugEnabled()) {
                                        log.debug("End offsets of current assignment {}: {}", createConsumer.assignment(), findNonEmptyEndOffsets);
                                    }
                                    if (!poll.isEmpty() || !this.accessor.isTopicRegex() || !createConsumer.assignment().isEmpty() || atomicBoolean2.get()) {
                                        break;
                                    }
                                } while (!Thread.currentThread().isInterrupted());
                                // Replay the current assignment through the listener callbacks.
                                Set<TopicPartition> assignment = createConsumer.assignment();
                                if (!assignment.isEmpty()) {
                                    listener.onPartitionsRevoked(assignment);
                                    listener.onPartitionsAssigned(assignment);
                                }
                                // Signal readiness to the submitting thread.
                                countDownLatch.countDown();
                                // Receives errors reported by the element consumer; rethrown by
                                // rethrowErrorIfPresent after each batch.
                                AtomicReference<Throwable> atomicReference4 = new AtomicReference<>();
                                // Duration (ms) of the previous poll, fed to the throughput limiter.
                                long j = 0;
                                do {
                                    if (poll.isEmpty()) {
                                        Optional ofNullable = Optional.ofNullable(atomicReference3.get());
                                        elementConsumer.getClass();
                                        ofNullable.ifPresent((v1) -> {
                                            r1.onIdle(v1);
                                        });
                                    }
                                    logConsumerWatermark(str, collection, atomicReference3, poll.count());
                                    // If a seek was requested, the polled batch is replaced by an empty one.
                                    ConsumerRecords<Object, Object> seekToNewOffsetsIfNeeded = seekToNewOffsetsIfNeeded(synchronizedList, elementConsumer, atomicReference3, createConsumer, poll);
                                    // Serialized key + value bytes processed in this iteration.
                                    long j2 = 0;
                                    concurrentHashMap.replaceAll((topicPartition, num) -> {
                                        return Integer.valueOf(num.intValue() + 1);
                                    });
                                    Iterator<ConsumerRecord<Object, Object>> it = seekToNewOffsetsIfNeeded.iterator();
                                    while (true) {
                                        if (!it.hasNext()) {
                                            break;
                                        }
                                        ConsumerRecord<Object, Object> next = it.next();
                                        j2 += next.serializedKeySize() + next.serializedValueSize();
                                        TopicPartition topicPartition2 = new TopicPartition(next.topic(), next.partition());
                                        concurrentHashMap.put(topicPartition2, 0);
                                        biConsumer.accept(topicPartition2, next);
                                        // The serializer may return null; the (possibly null) element is
                                        // still handed to the consumer below, only the watermark update is skipped.
                                        StreamElement read = this.serializer.read(next, getEntityDescriptor());
                                        if (read != null) {
                                            atomicReference3.get().update(((Integer) Objects.requireNonNull(hashMap.get(topicPartition2))).intValue(), read);
                                        }
                                        long offset = next.offset();
                                        WatermarkSupplier watermarkSupplier = (WatermarkSupplier) atomicReference3.get();
                                        atomicReference4.getClass();
                                        if (!elementConsumer.consumeWithConfirm(read, topicPartition2, offset, watermarkSupplier, (v1) -> {
                                            r5.set(v1);
                                        })) {
                                            // The consumer asked to stop: both flags are set, which ends the
                                            // loop and leads to onCancelled() below.
                                            log.info("Terminating consumption by request");
                                            atomicBoolean.set(true);
                                            atomicBoolean2.set(true);
                                            break;
                                        } else if (z && (l = findNonEmptyEndOffsets.get(topicPartition2)) != null && l.longValue() - 1 <= next.offset()) {
                                            // Stop-at-current: this partition reached the end offset recorded earlier.
                                            log.debug("Reached end of partition {} at offset {}", topicPartition2, Long.valueOf(next.offset()));
                                            findNonEmptyEndOffsets.remove(topicPartition2);
                                        }
                                    }
                                    increaseWatermarkOnEmptyPolls(concurrentHashMap, hashMap, atomicReference3);
                                    // flushCommits returns false on RebalanceInProgressException.
                                    if (!flushCommits(createConsumer, elementConsumer)) {
                                        handleRebalanceInOffsetCommit(createConsumer, listener);
                                    }
                                    rethrowErrorIfPresent(str, atomicReference4);
                                    terminateIfConsumed(z, createConsumer, findNonEmptyEndOffsets, concurrentHashMap, atomicBoolean);
                                    kafkaThroughputLimiter.sleepToLimitThroughput(j2, j);
                                    long currentTimeMillis = System.currentTimeMillis();
                                    poll = createConsumer.poll(ofMillis);
                                    j = System.currentTimeMillis() - currentTimeMillis;
                                    if (atomicBoolean2.get() || atomicBoolean.get()) {
                                        break;
                                    }
                                } while (!Thread.currentThread().isInterrupted());
                                if (log.isDebugEnabled()) {
                                    log.debug("Terminating poll loop for assignment {}: shutdown: {}, completed: {}, interrupted: {}", createConsumer.assignment(), Boolean.valueOf(atomicBoolean2.get()), Boolean.valueOf(atomicBoolean.get()), Boolean.valueOf(Thread.currentThread().isInterrupted()));
                                }
                                // Interrupt or shutdown means cancellation; otherwise the data was consumed.
                                if (Thread.currentThread().isInterrupted() || atomicBoolean2.get()) {
                                    elementConsumer.onCancelled();
                                } else {
                                    elementConsumer.onCompleted();
                                }
                                countDownLatch2.countDown();
                                // The block below is the decompiled form of closing the consumer
                                // (desugared try-with-resources).
                                if (createConsumer != null) {
                                    if (0 != 0) {
                                        try {
                                            createConsumer.close();
                                        } catch (Throwable th2) {
                                            th.addSuppressed(th2);
                                        }
                                    } else {
                                        createConsumer.close();
                                    }
                                }
                                countDownLatch.countDown();
                            } catch (Throwable th3) {
                                th = th3;
                                throw th3;
                            }
                        } catch (Throwable th4) {
                            if (createConsumer != null) {
                                if (th != null) {
                                    try {
                                        createConsumer.close();
                                    } catch (Throwable th5) {
                                        th.addSuppressed(th5);
                                    }
                                } else {
                                    createConsumer.close();
                                }
                            }
                            throw th4;
                        }
                    } catch (Throwable th6) {
                        // Any failure: release close() waiters, report the error and, if the
                        // consumer asks for it, restart the observation. The recursive call waits
                        // on this worker thread until the new task signals readiness.
                        countDownLatch2.countDown();
                        log.error("Error processing consumer {}", str, th6);
                        if (elementConsumer.onError(th6)) {
                            try {
                                submitConsumerWithObserver(str, collection, position, z, biConsumer, elementConsumer, executorService, atomicReference);
                            } catch (InterruptedException e) {
                                log.warn("Interrupted while restarting observer");
                                Thread.currentThread().interrupt();
                                throw new RuntimeException(e);
                            }
                        }
                        countDownLatch.countDown();
                    }
                } catch (InterruptedException e2) {
                    // NOTE(review): as decompiled, exceptions from the body are already caught by
                    // the catch (Throwable) above — verify the original ordering of the handlers.
                    log.info("Interrupted while polling kafka. Terminating consumption.", (Throwable) e2);
                    Thread.currentThread().interrupt();
                    elementConsumer.onCancelled();
                    countDownLatch2.countDown();
                    countDownLatch.countDown();
                }
            } catch (Throwable th7) {
                // Never leave the submitting thread blocked on the readiness latch.
                countDownLatch.countDown();
                throw th7;
            }
        });
        // Block the caller until the task is ready or has terminated.
        countDownLatch.await();
    }

    /**
     * Resets the consumer after an offset commit failed because of a rebalance.
     *
     * <p>The current assignment is replayed through the listener (revoked, then assigned) and
     * the consumer is then positioned at the committed offset of every assigned partition.
     *
     * <p>NOTE(review): {@code KafkaConsumer#committed} is documented to map a partition to
     * {@code null} when nothing was committed for it; such an entry is passed to {@code seek}
     * unchanged here — confirm whether that can happen on this path.
     *
     * @param kafkaConsumer consumer whose commit was rejected
     * @param consumerRebalanceListener listener to notify about the (unchanged) assignment
     */
    private void handleRebalanceInOffsetCommit(KafkaConsumer<Object, Object> kafkaConsumer, ConsumerRebalanceListener consumerRebalanceListener) {
        final Set<TopicPartition> assigned = kafkaConsumer.assignment();
        consumerRebalanceListener.onPartitionsRevoked(assigned);
        consumerRebalanceListener.onPartitionsAssigned(assigned);
        final Map<TopicPartition, OffsetAndMetadata> lastCommitted = kafkaConsumer.committed(assigned);
        lastCommitted.forEach((partition, offsetAndMetadata) -> kafkaConsumer.seek(partition, offsetAndMetadata));
    }

    /**
     * Applies offsets requested through the observe handle, if there are any.
     *
     * <p>When offsets are pending, the consumer is moved to them, the element consumer is told
     * about the resulting positions of all assigned partitions, the request list is cleared and
     * the already polled batch is replaced by an empty one. The whole operation holds the
     * monitor of {@code list}.
     *
     * @param list pending seek requests; cleared once applied
     * @param elementConsumer consumer notified through {@code onAssign}
     * @param atomicReference holder of the watermark estimator used to stamp the new offsets
     * @param kafkaConsumer consumer to seek
     * @param consumerRecords records polled before the seek was detected
     * @return {@code consumerRecords} when nothing was requested, otherwise an empty batch
     */
    private ConsumerRecords<Object, Object> seekToNewOffsetsIfNeeded(List<TopicOffset> list, ElementConsumer<Object, Object> elementConsumer, AtomicReference<PartitionedWatermarkEstimator> atomicReference, KafkaConsumer<Object, Object> kafkaConsumer, ConsumerRecords<Object, Object> consumerRecords) {
        synchronized (list) {
            if (!list.isEmpty()) {
                Utils.seekToOffsets(list, kafkaConsumer);
                final List<TopicOffset> positions = kafkaConsumer.assignment().stream()
                        .map(assigned -> {
                            final PartitionWithTopic partition = new PartitionWithTopic(assigned.topic(), assigned.partition());
                            return new TopicOffset(partition, kafkaConsumer.position(assigned), atomicReference.get().getWatermark());
                        })
                        .collect(Collectors.toList());
                elementConsumer.onAssign(kafkaConsumer, positions);
                log.info("Seeked consumer to offsets {} as requested", list);
                list.clear();
                return ConsumerRecords.empty();
            }
            return consumerRecords;
        }
    }

    /**
     * Logs the consumer's current watermark at debug level.
     *
     * <p>{@code Long.MIN_VALUE} is reported while no watermark estimator has been set.
     *
     * @param str consumer name, may be {@code null}
     * @param collection offsets the consumer was started with, may be {@code null}
     * @param atomicReference holder of the watermark estimator, possibly still empty
     * @param i number of records returned by the last poll
     */
    private void logConsumerWatermark(@Nullable String str, @Nullable Collection<Offset> collection, AtomicReference<PartitionedWatermarkEstimator> atomicReference, int i) {
        if (!log.isDebugEnabled()) {
            return;
        }
        final PartitionedWatermarkEstimator estimator = atomicReference.get();
        final Long watermark = estimator == null ? Long.MIN_VALUE : estimator.getWatermark();
        log.debug("Current watermark of consumer name {} with offsets {} on {} poll'd records is {}", str, collection, Integer.valueOf(i), watermark);
    }

    /**
     * Rethrows an error stored in the given reference, clearing the reference in the process.
     *
     * @param str consumer name used in the log message, may be {@code null}
     * @param atomicReference holder of a pending error; reset to {@code null} by this call
     * @throws RuntimeException wrapping the stored error, when there is one
     */
    private void rethrowErrorIfPresent(@Nullable String str, AtomicReference<Throwable> atomicReference) {
        final Throwable pending = atomicReference.getAndSet(null);
        if (pending == null) {
            return;
        }
        log.warn("Error during processing {}", str, pending);
        throw new RuntimeException(pending);
    }

    /**
     * Marks the consumption as completed once all data available at start has been read.
     *
     * <p>Does nothing unless {@code z} is set. When every partition's empty-poll counter has
     * reached the number of tracked partitions, the remaining end offsets are refreshed from
     * the consumer. If no end offsets remain afterwards, {@code atomicBoolean} is set.
     *
     * @param z stop-at-current flag
     * @param kafkaConsumer consumer being polled
     * @param map end offsets still to be reached; modified in place
     * @param map2 number of consecutive empty polls per partition
     * @param atomicBoolean completion flag to set
     */
    private void terminateIfConsumed(boolean z, KafkaConsumer<?, ?> kafkaConsumer, Map<TopicPartition, Long> map, Map<TopicPartition, Integer> map2, AtomicBoolean atomicBoolean) {
        if (!z) {
            return;
        }
        boolean allPartitionsIdle = true;
        for (Integer emptyPolls : map2.values()) {
            if (emptyPolls.intValue() < map2.size()) {
                allPartitionsIdle = false;
                break;
            }
        }
        if (allPartitionsIdle) {
            // Re-read the end offsets so partitions that turned out to be empty get dropped.
            map.clear();
            map.putAll(findNonEmptyEndOffsets(kafkaConsumer));
        }
        if (map.isEmpty()) {
            log.info("Assignment {} reached end of current data. Terminating consumption.", kafkaConsumer.assignment());
            atomicBoolean.set(true);
        }
    }

    /**
     * Synchronously commits the offsets the element consumer has prepared.
     *
     * @param kafkaConsumer consumer used for the commit
     * @param elementConsumer source of the offsets to commit
     * @return {@code true} when there was nothing to commit or the commit succeeded,
     *     {@code false} when a {@code RebalanceInProgressException} was caught
     */
    private boolean flushCommits(KafkaConsumer<Object, Object> kafkaConsumer, ElementConsumer<?, ?> elementConsumer) {
        try {
            final Map<TopicPartition, OffsetAndMetadata> toCommit = elementConsumer.prepareOffsetsForCommit();
            if (!toCommit.isEmpty()) {
                kafkaConsumer.commitSync(toCommit);
            }
        } catch (RebalanceInProgressException rebalancing) {
            log.info("Caught {}. Resetting the consumer to the last committed position", rebalancing.getClass().getSimpleName());
            return false;
        }
        return true;
    }

    /**
     * Reports partitions without recent data to the watermark estimator as idle.
     *
     * <p>A partition counts as idle when its empty-poll counter is at least the number of
     * tracked partitions, measured once before the iteration starts.
     *
     * @param map number of consecutive empty polls per partition
     * @param map2 estimator index of each partition
     * @param atomicReference holder of the watermark estimator
     */
    private void increaseWatermarkOnEmptyPolls(Map<TopicPartition, Integer> map, Map<TopicPartition, Integer> map2, AtomicReference<PartitionedWatermarkEstimator> atomicReference) {
        final int trackedPartitions = map.size();
        for (Map.Entry<TopicPartition, Integer> counter : map.entrySet()) {
            if (counter.getValue().intValue() < trackedPartitions) {
                continue;
            }
            final int partitionIndex = map2.get(counter.getKey()).intValue();
            atomicReference.get().idle(partitionIndex);
        }
    }

    /**
     * Creates the handle through which callers control a running observation.
     *
     * <p>The decompiled synthetic {@code $deserializeLambda$} method is omitted: it is not
     * valid source and the compiler emits it on its own for serializable method references.
     *
     * @param atomicBoolean shutdown flag; set by {@code close()}
     * @param list list that collects offsets passed to {@code resetOffsets}
     * @param elementConsumer consumer answering the committed/current offset queries
     * @param countDownLatch latch awaited by {@code waitUntilReady()}
     * @param countDownLatch2 latch awaited by {@code close()}
     * @return handle bound to the given state
     */
    private ObserveHandle createObserveHandle(final AtomicBoolean atomicBoolean, final List<TopicOffset> list, final ElementConsumer<?, ?> elementConsumer, final CountDownLatch countDownLatch, final CountDownLatch countDownLatch2) {
        return new ObserveHandle() { // from class: cz.o2.proxima.direct.kafka.KafkaLogReader.1
            /** Requests shutdown, then waits on the second latch; interruption of the wait is ignored. */
            @Override
            public void close() {
                atomicBoolean.set(true);
                ExceptionUtils.ignoringInterrupted(countDownLatch2::await);
            }

            @Override
            public List<Offset> getCommittedOffsets() {
                return elementConsumer.getCommittedOffsets();
            }

            /**
             * Queues offsets to seek to.
             *
             * @throws ClassCastException when an offset is not a {@code TopicOffset}; nothing
             *     is queued in that case
             */
            @Override
            public void resetOffsets(List<Offset> list2) {
                // Convert first, so the shared list receives all offsets in one addAll() call.
                final List<TopicOffset> requested = list2.stream()
                        .map(TopicOffset.class::cast)
                        .collect(Collectors.toList());
                list.addAll(requested);
            }

            @Override
            public List<Offset> getCurrentOffsets() {
                return elementConsumer.getCurrentOffsets();
            }

            @Override
            public void waitUntilReady() throws InterruptedException {
                countDownLatch.await();
            }
        };
    }

    /**
     * Finds end offsets of the assigned partitions that actually contain data.
     *
     * <p>A partition is kept only when its beginning offset is strictly lower than its end
     * offset.
     *
     * @param kafkaConsumer consumer whose assignment is inspected
     * @return mutable map from non-empty partition to its end offset
     */
    private Map<TopicPartition, Long> findNonEmptyEndOffsets(KafkaConsumer<?, ?> kafkaConsumer) {
        final Set<TopicPartition> assigned = kafkaConsumer.assignment();
        final Map<TopicPartition, Long> beginnings = kafkaConsumer.beginningOffsets(assigned);
        final Map<TopicPartition, Long> ends = kafkaConsumer.endOffsets(assigned);
        final Map<TopicPartition, Long> nonEmpty = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> end : ends.entrySet()) {
            final long first = beginnings.get(end.getKey()).longValue();
            final long last = end.getValue().longValue();
            if (first < last) {
                nonEmpty.put(end.getKey(), end.getValue());
            }
        }
        return nonEmpty;
    }

    /**
     * Create a consumer under a freshly generated random group name, without explicit offsets or
     * rebalance listener, using {@link Position#NEWEST}.
     *
     * @return the created consumer
     */
    private KafkaConsumer<Object, Object> createConsumer() {
        final String randomGroup = UUID.randomUUID().toString();
        return createConsumer(randomGroup, null, null, Position.NEWEST);
    }

    /**
     * Create a Kafka consumer and position it according to {@code position}.
     *
     * <p>Exactly one of the following modes is used:
     *
     * <ul>
     *   <li>{@code str != null}: a consumer in the named group, optionally with a rebalance listener;
     *   <li>{@code str == null && collection != null}: a consumer created for the partitions of the
     *       given offsets (a listener is not allowed in this mode).
     * </ul>
     *
     * <p>Positioning:
     *
     * <ul>
     *   <li>{@link Position#OLDEST} without offsets: if no assigned partition has a committed offset,
     *       all assigned partitions are sought to the beginning; otherwise, when the initial poll
     *       returned data, partitions are sought back to their committed offsets and partitions
     *       without any committed offset are sought to the beginning;
     *   <li>{@link Position#OLDEST} with offsets: the given partitions are sought to the beginning;
     *   <li>{@link Position#CURRENT}: requires offsets and seeks to them;
     *   <li>anything else: the consumer is returned as created.
     * </ul>
     *
     * @param str name of the consumer group, or {@code null} when reading by explicit offsets
     * @param collection offsets to read from, or {@code null} when reading by group name
     * @param consumerRebalanceListener listener to register, only allowed with a named group
     * @param position requested starting position
     * @return the created and positioned consumer
     * @throws IllegalArgumentException if the group name is empty, if neither name nor offsets are
     *     given, if a listener is passed without a name, if {@link Position#CURRENT} is used without
     *     offsets, or if topic validation fails
     */
    @VisibleForTesting
    KafkaConsumer<Object, Object> createConsumer(@Nullable String str, @Nullable Collection<Offset> collection, @Nullable ConsumerRebalanceListener consumerRebalanceListener, Position position) {
        Preconditions.checkArgument(str != null || consumerRebalanceListener == null, "Please use either named group (with listener) or offsets without listener");
        KafkaConsumerFactory factory = this.accessor.createConsumerFactory();
        if ("".equals(str)) {
            throw new IllegalArgumentException("Consumer group cannot be empty string");
        }
        final KafkaConsumer<Object, Object> consumer;
        if (str != null) {
            consumer = factory.create(str, position, consumerRebalanceListener);
        } else if (collection != null) {
            List<Partition> partitions = collection.stream().map(Offset::getPartition).collect(Collectors.toList());
            consumer = factory.create(position, partitions);
        } else {
            throw new IllegalArgumentException("Need either name or offsets to observe");
        }
        // a regex topic has no single name that could be validated
        if (!this.accessor.isTopicRegex()) {
            validateTopic(consumer, this.accessor.getTopic());
        }
        if (position == Position.OLDEST) {
            if (collection == null) {
                boolean emptyPoll = true;
                if (consumer.assignment().isEmpty()) {
                    // NOTE(review): presumably this poll is what lets the group assign partitions
                    // to the consumer; the polled records themselves are discarded.
                    emptyPoll = consumer.poll(Duration.ofMillis(this.accessor.getAssignmentTimeoutMillis())).isEmpty();
                }
                Set<TopicPartition> assignment = consumer.assignment();
                Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(assignment);
                if (committed.values().stream().allMatch(Objects::isNull)) {
                    log.info("Seeking consumer name {} to beginning of partitions {}", str, assignment);
                    consumer.seekToBeginning(assignment);
                } else if (!emptyPoll) {
                    log.info("Seeking consumer name {} to committed offsets {}", str, committed);
                    // The committed map may mix real offsets with nulls (partitions never
                    // committed). Passing null metadata to seek() would fail, so such partitions
                    // are rewound to the beginning instead, matching the all-null branch above.
                    List<TopicPartition> uncommitted = new ArrayList<>();
                    committed.forEach((topicPartition, metadata) -> {
                        if (metadata != null) {
                            consumer.seek(topicPartition, metadata);
                        } else {
                            uncommitted.add(topicPartition);
                        }
                    });
                    if (!uncommitted.isEmpty()) {
                        consumer.seekToBeginning(uncommitted);
                    }
                }
            } else {
                List<TopicPartition> partitions = collection.stream()
                        .map(TopicOffset.class::cast)
                        .map(topicOffset -> new TopicPartition(topicOffset.m56getPartition().getTopic(), topicOffset.m56getPartition().getId()))
                        .collect(Collectors.toList());
                log.info("Seeking given partitions {} to the beginning", partitions);
                consumer.seekToBeginning(partitions);
            }
        } else if (position == Position.CURRENT) {
            Preconditions.checkArgument(collection != null, "Please use %s only with specified offsets", position);
            log.info("Seeking to given offsets {}", collection);
            Utils.seekToOffsets(collection, consumer);
        } else {
            log.info("Starting to process kafka partitions from newest data");
        }
        return consumer;
    }

    /**
     * Verify that the given topic reports at least one partition.
     *
     * @param kafkaConsumer consumer used to look up partitions of the topic
     * @param str name of the topic to check
     * @throws IllegalArgumentException when the partition list is {@code null} or empty
     */
    @VisibleForTesting
    void validateTopic(KafkaConsumer<?, ?> kafkaConsumer, String str) {
        final List<PartitionInfo> partitions = kafkaConsumer.partitionsFor(str);
        final boolean hasPartitions = partitions != null && !partitions.isEmpty();
        Preconditions.checkArgument(hasPartitions, "Received null or empty partitions for topic [%s]. Please check that the topic exists and has at least one partition.", str);
    }

    /**
     * Tell whether offsets of this reader can be externalized.
     *
     * @return always {@code true}
     */
    public boolean hasExternalizableOffsets() {
        return true;
    }

    /**
     * Provide the externalizer for offsets of this reader.
     *
     * @return a new {@link TopicOffsetExternalizer} on every call
     */
    public OffsetExternalizer getOffsetExternalizer() {
        return new TopicOffsetExternalizer();
    }

    /**
     * Convert this reader into a factory that creates a new {@link KafkaLogReader} with the same
     * accessor and context.
     *
     * <p>The fields are copied into locals first so that the returned lambda captures only the
     * accessor and the context, not this reader instance.
     *
     * @return factory producing new readers; its argument is ignored
     */
    public CommitLogReader.Factory asFactory() {
        final KafkaAccessor kafkaAccessor = accessor;
        final Context context = this.context;
        return ignored -> new KafkaLogReader(kafkaAccessor, context);
    }

    /**
     * Convert partitions into offsets carrying offset {@code -1} and watermark
     * {@link Long#MIN_VALUE}.
     *
     * @param collection partitions to convert, each is cast to {@link PartitionWithTopic}; may be
     *     {@code null}
     * @return list of created offsets, or {@code null} when the input is {@code null}
     */
    private static Collection<Offset> createDefaultOffsets(Collection<Partition> collection) {
        if (collection == null) {
            return null;
        }
        return collection.stream()
                .<Offset>map(part -> new TopicOffset((PartitionWithTopic) part, -1L, Long.MIN_VALUE))
                .collect(Collectors.toList());
    }

    /**
     * Create a handle that forwards every call to whatever handle is currently stored in the given
     * reference.
     *
     * <p>The reference is read anew on each call, so replacing its content redirects subsequent
     * calls to the new handle.
     *
     * @param atomicReference holder of the handle to delegate to
     * @return delegating handle
     */
    private static ObserveHandle dynamicHandle(final AtomicReference<ObserveHandle> atomicReference) {
        return new ObserveHandle() { // from class: cz.o2.proxima.direct.kafka.KafkaLogReader.2
            /** Fetch the handle that is stored in the reference right now. */
            private ObserveHandle delegate() {
                return atomicReference.get();
            }

            public void close() {
                delegate().close();
            }

            public List<Offset> getCommittedOffsets() {
                return delegate().getCommittedOffsets();
            }

            public void resetOffsets(List<Offset> list) {
                delegate().resetOffsets(list);
            }

            public List<Offset> getCurrentOffsets() {
                return delegate().getCurrentOffsets();
            }

            public void waitUntilReady() throws InterruptedException {
                delegate().waitUntilReady();
            }
        };
    }

    /**
     * Create a new offset committer configured from the accessor's stale-commit logging interval
     * and auto-commit interval.
     *
     * @return a fresh committer keyed by {@link TopicPartition}
     */
    private OffsetCommitter<TopicPartition> createOffsetCommitter() {
        return new OffsetCommitter<>(
                accessor.getLogStaleCommitIntervalNs(),
                accessor.getAutoCommitIntervalNs());
    }

    /**
     * Create the rebalance listener that keeps per-partition bookkeeping in sync with the
     * consumer's assignment.
     *
     * <p>On every assignment the listener:
     *
     * <ol>
     *   <li>clears both maps, then stores {@code 0} for each assigned partition in {@code map} and a
     *       sequential index (starting from zero) in {@code map2};
     *   <li>replaces the watermark estimator in {@code atomicReference2}: the empty-partition
     *       estimator when nothing is assigned, otherwise a
     *       {@link MinimalPartitionWatermarkEstimator} with one estimator per partition index;
     *   <li>if a consumer is present in {@code atomicReference}, notifies {@code elementConsumer} via
     *       {@code onAssign} with committed offsets (named consumer) or current positions (unnamed).
     * </ol>
     *
     * @param str name of the consumer, {@code null} for unnamed consumers
     * @param atomicReference holder of the consumer; may hold {@code null}
     * @param elementConsumer consumer of elements to be notified about the assignment
     * @param map per-partition counter map that is reset to zero on assignment
     * @param map2 map from partition to its sequential index, rebuilt on assignment
     * @param atomicReference2 holder of the current partitioned watermark estimator
     * @return the listener
     */
    private ConsumerRebalanceListener listener(final String str, final AtomicReference<KafkaConsumer<Object, Object>> atomicReference, final ElementConsumer<Object, Object> elementConsumer, final Map<TopicPartition, Integer> map, final Map<TopicPartition, Integer> map2, final AtomicReference<PartitionedWatermarkEstimator> atomicReference2) {
        return new ConsumerRebalanceListener() { // from class: cz.o2.proxima.direct.kafka.KafkaLogReader.3
            private final Set<TopicPartition> currentlyAssigned = new HashSet<>();

            @Override // cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRebalanceListener
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                this.currentlyAssigned.removeAll(collection);
            }

            @Override // cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRebalanceListener
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                this.currentlyAssigned.addAll(collection);
                KafkaLogReader.log.info("Consumer {} has assigned partitions {}", str, this.currentlyAssigned);
                map.clear();
                map2.clear();
                AtomicInteger nextIndex = new AtomicInteger();
                this.currentlyAssigned.forEach(topicPartition -> {
                    map2.put(topicPartition, nextIndex.getAndIncrement());
                    map.put(topicPartition, 0);
                });
                if (this.currentlyAssigned.isEmpty()) {
                    atomicReference2.set(KafkaLogReader.createWatermarkEstimatorForEmptyParts());
                } else {
                    // one estimator per partition, keyed by the index assigned just above
                    Map<Integer, WatermarkEstimator> estimators = this.currentlyAssigned.stream()
                            .collect(Collectors.toMap(map2::get, topicPartition -> createWatermarkEstimator()));
                    atomicReference2.set(new MinimalPartitionWatermarkEstimator(estimators));
                }
                Optional.ofNullable(atomicReference.get()).ifPresent(kafkaConsumer -> {
                    List<TopicOffset> offsets = str != null
                            ? getCommittedTopicOffsets(this.currentlyAssigned, kafkaConsumer)
                            : getCurrentTopicOffsets(this.currentlyAssigned, kafkaConsumer);
                    elementConsumer.onAssign(kafkaConsumer, offsets);
                });
            }

            /** Offsets built from the consumer's current position in each given partition. */
            List<TopicOffset> getCurrentTopicOffsets(Collection<TopicPartition> collection, KafkaConsumer<Object, Object> kafkaConsumer) {
                return collection.stream()
                        .map(topicPartition -> new TopicOffset(
                                new PartitionWithTopic(topicPartition.topic(), topicPartition.partition()),
                                kafkaConsumer.position(topicPartition),
                                atomicReference2.get().getWatermark()))
                        .collect(Collectors.toList());
            }

            /**
             * Offsets built from the committed offset of each given partition; partitions without
             * a committed offset get offset {@code 0}.
             */
            List<TopicOffset> getCommittedTopicOffsets(Collection<TopicPartition> collection, KafkaConsumer<Object, Object> kafkaConsumer) {
                Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>(kafkaConsumer.committed(new HashSet<>(collection)));
                // make sure every requested partition is present, even if nothing was returned for it
                for (TopicPartition topicPartition : collection) {
                    committed.putIfAbsent(topicPartition, null);
                }
                return committed.entrySet().stream()
                        .map(entry -> {
                            TopicPartition topicPartition = entry.getKey();
                            OffsetAndMetadata metadata = entry.getValue();
                            return new TopicOffset(
                                    new PartitionWithTopic(topicPartition.topic(), topicPartition.partition()),
                                    metadata == null ? 0L : metadata.offset(),
                                    atomicReference2.get().getWatermark());
                        })
                        .collect(Collectors.toList());
            }

            /** Create a single watermark estimator from the accessor's watermark configuration. */
            private WatermarkEstimator createWatermarkEstimator() {
                return KafkaLogReader.this.accessor.getWatermarkConfiguration().getWatermarkEstimatorFactory().create(KafkaLogReader.this.cfg, KafkaLogReader.this.accessor.getWatermarkConfiguration().getWatermarkIdlePolicyFactory());
            }
        };
    }

    /**
     * Create the estimator used when no partitions are assigned.
     *
     * @return estimator whose watermark is always {@link Long#MAX_VALUE}
     */
    private static PartitionedWatermarkEstimator createWatermarkEstimatorForEmptyParts() {
        return () -> Long.MAX_VALUE;
    }

    /**
     * @return the accessor this reader was created with
     */
    @Generated
    public KafkaAccessor getAccessor() {
        return accessor;
    }

    /**
     * @return the context this reader was created with
     */
    @Generated
    public Context getContext() {
        return context;
    }

    /**
     * Synthetic hook that rebuilds a serialized lambda of this class.
     *
     * <p>The implementation method name selects a branch; each branch verifies the recorded method
     * kind, functional interface, and signatures before recreating the lambda from its captured
     * arguments.
     *
     * <p>NOTE(review): the selector {@code z} is declared {@code boolean} but is assigned integers
     * and matched against repeated {@code case true} labels. This looks like decompiler output
     * where an int selector (0..6) was mistyped; as written it will not compile. Confirm against
     * the original sources before relying on this method.
     *
     * @param serializedLambda serialized form of the lambda to restore
     * @return the recreated lambda
     * @throws IllegalArgumentException when no branch matches the serialized form
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // first stage: map the implementation method name to a selector value
        switch (implMethodName.hashCode()) {
            case -1633855528:
                if (implMethodName.equals("lambda$processConsumer$85b00785$1")) {
                    z = 3;
                    break;
                }
                break;
            case -1623447777:
                if (implMethodName.equals("lambda$processConsumerBulk$694c2f35$1")) {
                    z = 2;
                    break;
                }
                break;
            case -1259861706:
                if (implMethodName.equals("lambda$processConsumerBulk$dc006aa6$1")) {
                    z = 6;
                    break;
                }
                break;
            case 403789579:
                if (implMethodName.equals("lambda$processConsumerBulk$39c40074$1")) {
                    z = 5;
                    break;
                }
                break;
            case 750164247:
                if (implMethodName.equals("lambda$asFactory$c19250ba$1")) {
                    z = false;
                    break;
                }
                break;
            case 1778328680:
                if (implMethodName.equals("lambda$processConsumer$ae30ef9$1")) {
                    z = 4;
                    break;
                }
                break;
            case 1834580840:
                if (implMethodName.equals("lambda$createWatermarkEstimatorForEmptyParts$6d1e51ee$1")) {
                    z = true;
                    break;
                }
                break;
        }
        // second stage: validate the serialized descriptor and rebuild the matching lambda
        switch (z) {
            // CommitLogReader.Factory capturing the accessor and the context (see asFactory)
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/direct/commitlog/CommitLogReader$Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/KafkaLogReader") && serializedLambda.getImplMethodSignature().equals("(Lcz/o2/proxima/direct/kafka/KafkaAccessor;Lcz/o2/proxima/direct/core/Context;Ljava/lang/Object;)Ljava/lang/Object;")) {
                    KafkaAccessor kafkaAccessor = (KafkaAccessor) serializedLambda.getCapturedArg(0);
                    Context context = (Context) serializedLambda.getCapturedArg(1);
                    return obj -> {
                        return new KafkaLogReader(kafkaAccessor, context);
                    };
                }
                break;
            // PartitionedWatermarkEstimator always returning Long.MAX_VALUE, no captured state
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/time/PartitionedWatermarkEstimator") && serializedLambda.getFunctionalInterfaceMethodName().equals("getWatermark") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()J") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/KafkaLogReader") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return () -> {
                        return Long.MAX_VALUE;
                    };
                }
                break;
            // BiConsumer storing (partition -> offset) into the captured map when the captured flag is set
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/BiConsumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/KafkaLogReader") && serializedLambda.getImplMethodSignature().equals("(ZLjava/util/Map;Lorg/apache/kafka/common/TopicPartition;Ljava/lang/Long;)V")) {
                    boolean booleanValue = ((Boolean) serializedLambda.getCapturedArg(0)).booleanValue();
                    Map map = (Map) serializedLambda.getCapturedArg(1);
                    return (topicPartition, l) -> {
                        if (booleanValue) {
                            map.put(topicPartition, new OffsetAndMetadata(l.longValue()));
                        }
                    };
                }
                break;
            // Factory that copies the captured map and clears it while synchronized on it
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/KafkaLogReader") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Map;)Ljava/util/Map;")) {
                    Map map2 = (Map) serializedLambda.getCapturedArg(0);
                    return () -> {
                        HashMap hashMap;
                        synchronized (map2) {
                            hashMap = new HashMap(map2);
                            map2.clear();
                        }
                        return hashMap;
                    };
                }
                break;
            // BiConsumer registering the record's offset with the captured committer; the registered
            // callback stores offset + 1 into the captured map when the captured flag is set
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/BiConsumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/KafkaLogReader") && serializedLambda.getImplMethodSignature().equals("(Lcz/o2/proxima/direct/kafka/OffsetCommitter;ZLjava/util/Map;Lorg/apache/kafka/common/TopicPartition;Lorg/apache/kafka/clients/consumer/ConsumerRecord;)V")) {
                    OffsetCommitter offsetCommitter = (OffsetCommitter) serializedLambda.getCapturedArg(0);
                    boolean booleanValue2 = ((Boolean) serializedLambda.getCapturedArg(1)).booleanValue();
                    Map map3 = (Map) serializedLambda.getCapturedArg(2);
                    return (topicPartition2, consumerRecord) -> {
                        long offset = consumerRecord.offset();
                        offsetCommitter.register(topicPartition2, offset, 1, () -> {
                            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset + 1);
                            if (booleanValue2) {
                                map3.put(topicPartition2, offsetAndMetadata);
                            }
                        });
                    };
                }
                break;
            // BiConsumer that does nothing
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/BiConsumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/KafkaLogReader") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/common/TopicPartition;Lorg/apache/kafka/clients/consumer/ConsumerRecord;)V")) {
                    return (topicPartition22, consumerRecord2) -> {
                    };
                }
                break;
            // Factory that copies the captured map and clears it while synchronized on it
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/KafkaLogReader") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Map;)Ljava/util/Map;")) {
                    Map map4 = (Map) serializedLambda.getCapturedArg(0);
                    return () -> {
                        HashMap hashMap;
                        synchronized (map4) {
                            hashMap = new HashMap(map4);
                            map4.clear();
                        }
                        return hashMap;
                    };
                }
                break;
        }
        // nothing matched: the serialized form does not belong to a lambda known to this class
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    /**
     * Synthetic accessor that forwards to the private
     * {@code createWatermarkEstimatorForEmptyParts()}; it is called from the anonymous rebalance
     * listener in this class.
     *
     * @return the estimator produced by {@code createWatermarkEstimatorForEmptyParts()}
     */
    static /* synthetic */ PartitionedWatermarkEstimator access$100() {
        return createWatermarkEstimatorForEmptyParts();
    }
}
