package io.confluent.parallelconsumer;

import io.confluent.csid.utils.KafkaUtils;
import io.confluent.csid.utils.LoopingResumingIterator;
import io.confluent.csid.utils.WallClock;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniSets;

/* loaded from: input_file:io/confluent/parallelconsumer/WorkManager.class */
/**
 * Tracks records handed to the parallel consumer from the moment they are registered until their
 * offsets can be committed.
 *
 * <p>Registered {@link ConsumerRecords} are first queued in an inbox. On the next call to
 * {@link #maybeGetWork(int)} they are wrapped in {@link WorkContainer}s and indexed twice:
 * <ul>
 *   <li>in {@code processingShards}, keyed by the shard key derived from the configured processing
 *       order (see {@code computeShardKey}), used to pick the next work to run; and</li>
 *   <li>in {@code partitionCommitQueues}, keyed by partition and ordered by offset, used to work out
 *       which offsets can be committed.</li>
 * </ul>
 *
 * <p>NOTE(review): {@code inFlightCount} and the package-private offset maps are plain (non-atomic,
 * non-concurrent) state - this class presumably expects {@code maybeGetWork}, {@code success} and
 * {@code failed} to be driven from a single thread; confirm against the caller.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class WorkManager<K, V> implements ConsumerRebalanceListener {
    private static final Logger log = LoggerFactory.getLogger(WorkManager.class);

    /**
     * Largest total offset-metadata payload (in characters) that is sent with a commit; above this
     * the payloads are dropped. Mirrors kafka.coordinator.group.OffsetConfig#DefaultMaxMetadataSize.
     */
    private static final int MAX_OFFSET_METADATA_SIZE = 4096;

    private final ParallelConsumerOptions options;
    Consumer<K, V> consumer;

    /** Work indexed by shard key (record key or partition, depending on ordering), then by offset. */
    private final Map<Object, NavigableMap<Long, WorkContainer<K, V>>> processingShards = new ConcurrentHashMap<>();

    /** Batches registered via {@link #registerWork} that have not been sharded yet. */
    private final LinkedBlockingQueue<ConsumerRecords<K, V>> workInbox = new LinkedBlockingQueue<>();

    /** Work indexed by partition, then by offset; entries stay until their offset is committable. */
    private final Map<TopicPartition, NavigableMap<Long, WorkContainer<K, V>>> partitionCommitQueues = new ConcurrentHashMap<>();

    /** Shard key at which the next {@link #maybeGetWork(int)} scan resumes. */
    private Optional<Object> iterationResumePoint = Optional.empty();

    /** Number of work containers handed out and not yet reported via {@code success}/{@code failed}. */
    private int inFlightCount = 0;

    /** Multiplier applied to the max-messages-to-queue option when deciding whether enough is loaded. */
    private final int loadingFactor = 3;

    private final List<java.util.function.Consumer<WorkContainer<K, V>>> successfulWorkListeners = new ArrayList<>();
    private WallClock clock = new WallClock();

    /** Per partition, offsets recorded as not completed (consulted when deciding whether to skip a record). */
    Map<TopicPartition, TreeSet<Long>> partitionIncompleteOffsets = new HashMap<>();

    /** Per partition, the highest offset seen so far. */
    Map<TopicPartition, Long> partitionOffsetHighWaterMarks = new HashMap<>();

    /** Value used when no high water mark has been recorded for a partition. */
    final long MISSING_HIGH_WATER_MARK = -1;

    /**
     * @param parallelConsumerOptions options controlling ordering and queue limits
     * @param consumer consumer passed on to {@code OffsetMapCodecManager}
     */
    public WorkManager(ParallelConsumerOptions parallelConsumerOptions, Consumer<K, V> consumer) {
        this.options = parallelConsumerOptions;
        this.consumer = consumer;
    }

    /**
     * Asks {@code OffsetMapCodecManager} to load the offset map for the newly assigned partitions.
     * Any exception is logged and rethrown.
     *
     * @param collection the partitions that were assigned
     */
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        try {
            log.debug("onPartitionsAssigned: {}", collection);
            // NOTE(review): OffsetMapCodecManager's generic signature is not visible here, so the raw type is kept.
            new OffsetMapCodecManager(this, this.consumer).loadOffsetMapForPartition(UniSets.copyOf(collection));
        } catch (Exception e) {
            log.error("Error in onPartitionsAssigned", e);
            throw e;
        }
    }

    /**
     * Drops all tracked state for the revoked partitions. Any exception is logged and rethrown.
     *
     * @param collection the partitions that were revoked
     */
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        try {
            log.debug("Partitions revoked: {}", collection);
            resetOffsetMapAndRemoveWork(collection);
        } catch (Exception e) {
            log.error("Error in onPartitionsRevoked", e);
            throw e;
        }
    }

    /**
     * Drops all tracked state for the lost partitions. Any exception is logged and rethrown.
     *
     * @param collection the partitions that were lost
     */
    public void onPartitionsLost(Collection<TopicPartition> collection) {
        try {
            log.warn("Partitions have been lost");
            log.debug("Lost partitions: {}", collection);
            resetOffsetMapAndRemoveWork(collection);
        } catch (Exception e) {
            log.error("Error in onPartitionsLost", e);
            throw e;
        }
    }

    /** Forgets incomplete offsets, high water marks, commit queues and shards of the given partitions. */
    private void resetOffsetMapAndRemoveWork(Collection<TopicPartition> collection) {
        for (TopicPartition topicPartition : collection) {
            this.partitionIncompleteOffsets.remove(topicPartition);
            this.partitionOffsetHighWaterMarks.remove(topicPartition);
            NavigableMap<Long, WorkContainer<K, V>> commitQueue = this.partitionCommitQueues.remove(topicPartition);
            if (commitQueue != null) {
                removeShardsFoundIn(commitQueue);
            } else {
                log.trace("Removing empty commit queue");
            }
        }
    }

    /**
     * Removes every processing shard referenced by work in the given commit queue.
     *
     * <p>Shards are looked up with the same key they were registered under ({@code computeShardKey}),
     * not the raw record key: under PARTITION / UNORDERED ordering the shard key is the partition.
     */
    private void removeShardsFoundIn(NavigableMap<Long, WorkContainer<K, V>> navigableMap) {
        for (WorkContainer<K, V> work : navigableMap.values()) {
            Object shardKey = computeShardKey(work.getCr());
            // ConcurrentHashMap rejects null keys, so a null shard key can never have been registered.
            if (shardKey != null) {
                this.processingShards.remove(shardKey);
            }
        }
    }

    /**
     * Queues several batches of records; see {@link #registerWork(ConsumerRecords)}.
     *
     * @param list batches to queue, in order
     */
    public void registerWork(List<ConsumerRecords<K, V>> list) {
        for (ConsumerRecords<K, V> records : list) {
            registerWork(records);
        }
    }

    /**
     * Queues a batch of records in the inbox. The records are only turned into work on the next call
     * to {@link #maybeGetWork(int)}.
     *
     * @param consumerRecords batch to queue
     */
    public void registerWork(ConsumerRecords<K, V> consumerRecords) {
        this.workInbox.add(consumerRecords);
    }

    /** Drains the inbox and shards every batch found in it. */
    private void processInbox() {
        List<ConsumerRecords<K, V>> batches = new ArrayList<>();
        this.workInbox.drainTo(batches);
        for (ConsumerRecords<K, V> batch : batches) {
            processInbox(batch);
        }
    }

    /**
     * Wraps each not-previously-processed record of the batch in a {@link WorkContainer}, raises the
     * partition's high water mark and indexes the container in both the shard map and the commit queue.
     */
    private void processInbox(ConsumerRecords<K, V> consumerRecords) {
        log.debug("Registering {} records of work", consumerRecords.count());
        for (ConsumerRecord<K, V> consumerRecord : consumerRecords) {
            if (isRecordPreviouslyProcessed(consumerRecord)) {
                log.trace("Record previously processed, skipping. offset: {}", consumerRecord.offset());
                continue;
            }
            Object shardKey = computeShardKey(consumerRecord);
            long offset = consumerRecord.offset();
            WorkContainer<K, V> workContainer = new WorkContainer<>(consumerRecord);
            TopicPartition tp = KafkaUtils.toTP(consumerRecord);
            raisePartitionHighWaterMark(offset, tp);
            this.processingShards.computeIfAbsent(shardKey, ignored -> new ConcurrentSkipListMap<>()).put(offset, workContainer);
            this.partitionCommitQueues.computeIfAbsent(tp, ignored -> new ConcurrentSkipListMap<>()).put(offset, workContainer);
        }
    }

    /**
     * Records {@code j} as the partition's high water mark when it is at least the current mark
     * (or when no mark exists yet), or when {@code j} equals {@link #MISSING_HIGH_WATER_MARK}.
     *
     * @param j offset that was seen
     * @param topicPartition partition the offset belongs to
     */
    public void raisePartitionHighWaterMark(long j, TopicPartition topicPartition) {
        long currentMark = this.partitionOffsetHighWaterMarks.getOrDefault(topicPartition, this.MISSING_HIGH_WATER_MARK);
        if (j >= currentMark || j == this.MISSING_HIGH_WATER_MARK) {
            this.partitionOffsetHighWaterMarks.put(topicPartition, j);
        }
    }

    /**
     * A record counts as previously processed when its offset is not listed as incomplete for its
     * partition and lies strictly below the partition's high water mark.
     */
    private boolean isRecordPreviouslyProcessed(ConsumerRecord<K, V> consumerRecord) {
        long offset = consumerRecord.offset();
        TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
        if (this.partitionIncompleteOffsets.getOrDefault(topicPartition, new TreeSet<>()).contains(offset)) {
            return false;
        }
        long highWaterMark = this.partitionOffsetHighWaterMarks.getOrDefault(topicPartition, this.MISSING_HIGH_WATER_MARK);
        return offset < highWaterMark;
    }

    /**
     * Derives the shard key for a record: the record key for KEY ordering, otherwise its partition.
     *
     * <p>NOTE(review): for KEY ordering this returns {@code consumerRecord.key()} as is; if that can be
     * null, the {@link ConcurrentHashMap} backing the shards will reject it - confirm upstream handling.
     */
    private Object computeShardKey(ConsumerRecord<K, V> consumerRecord) {
        switch (this.options.getOrdering()) {
            case KEY:
                return consumerRecord.key();
            case PARTITION:
            case UNORDERED:
                return new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
            default:
                throw new IllegalStateException("Unknown processing order: " + this.options.getOrdering());
        }
    }

    /**
     * Same as {@link #maybeGetWork(int)} with the configured max-messages-to-queue as the request size.
     *
     * @return the work taken, possibly empty
     */
    public <R> List<WorkContainer<K, V>> maybeGetWork() {
        return maybeGetWork(this.options.getMaxMessagesToQueue());
    }

    /**
     * Processes the inbox and then takes work that is ready to run.
     *
     * <p>The amount taken is capped by the smallest of {@code i}, the max-messages-to-queue option and
     * the max-messages-beyond-base-commit-offset option, minus the current in-flight count. Shards are
     * scanned starting at the saved resume point. Unless ordering is UNORDERED, only the first entry
     * of each shard is considered. Taken containers are marked via {@code takingAsWork()} and added to
     * the in-flight count.
     *
     * @param i maximum number of work containers requested
     * @return the work taken, possibly empty
     */
    public List<WorkContainer<K, V>> maybeGetWork(int i) {
        processInbox();
        int configuredLimit = Math.min(Math.min(i, this.options.getMaxMessagesToQueue()), this.options.getMaxNumberMessagesBeyondBaseCommitOffset());
        int workToGet = configuredLimit - getInFlightCount();
        if (workToGet < 1) {
            return UniLists.of();
        }

        List<WorkContainer<K, V>> work = new ArrayList<>();
        // NOTE(review): LoopingResumingIterator's generic signature is not visible here, so it is used raw.
        @SuppressWarnings({"unchecked", "rawtypes"})
        Iterator<Map.Entry<Object, NavigableMap<Long, WorkContainer<K, V>>>> shards =
                new LoopingResumingIterator(this.iterationResumePoint, this.processingShards).iterator();

        while (shards.hasNext()) {
            Map.Entry<Object, NavigableMap<Long, WorkContainer<K, V>>> shard = shards.next();
            log.trace("Looking for work on shard: {}", shard.getKey());
            if (work.size() >= workToGet) {
                this.iterationResumePoint = Optional.of(shard.getKey());
                log.debug("Work taken is now over max, stopping (saving iteration resume point {})", this.iterationResumePoint);
                break;
            }

            List<WorkContainer<K, V>> shardWork = new ArrayList<>();
            // Bounded iteration: the scan of a shard ends when its entries run out.
            for (WorkContainer<K, V> workContainer : shard.getValue().values()) {
                int taken = work.size() + shardWork.size();
                if (taken >= workToGet) {
                    log.trace("Work taken ({}) exceeds max ({})", taken, workToGet);
                    break;
                }

                boolean notYetSucceeded = !workContainer.isUserFunctionSucceeded();
                if (workContainer.hasDelayPassed(this.clock) && workContainer.isNotInFlight() && notYetSucceeded) {
                    log.trace("Taking {} as work", workContainer);
                    workContainer.takingAsWork();
                    shardWork.add(workContainer);
                } else {
                    log.trace("Work ({}) still delayed or is in flight, can't take...", workContainer);
                }

                if (this.options.getOrdering() != ParallelConsumerOptions.ProcessingOrder.UNORDERED) {
                    // Ordered processing: nothing further may be taken from this shard in this pass.
                    log.trace("Processing by {}, so have cannot get more messages on this ({}) shard.", this.options.getOrdering(), shard.getKey());
                    break;
                }
            }
            work.addAll(shardWork);
        }

        log.debug("Got {} records of work", work.size());
        this.inFlightCount += work.size();
        return work;
    }

    /**
     * Marks the work as succeeded, removes it from its processing shard (dropping the shard when it
     * becomes empty under KEY ordering), notifies the success listeners and decrements the in-flight
     * count. The work stays in its partition commit queue until its offset is collected for commit.
     *
     * @param workContainer the work that completed successfully
     */
    public void success(WorkContainer<K, V> workContainer) {
        ConsumerRecord<K, V> cr = workContainer.getCr();
        log.trace("Work success ({}), removing from processing shard queue", workContainer);
        workContainer.succeed();
        Object shardKey = computeShardKey(cr);
        NavigableMap<Long, WorkContainer<K, V>> shard = this.processingShards.get(shardKey);
        if (shard == null) {
            // The shard is removed when its partition is revoked or lost while work is still in flight.
            log.debug("No shard found for completed work (key: {}), nothing to remove", shardKey);
        } else {
            shard.remove(cr.offset());
            boolean keyOrdering = this.options.getOrdering() == ParallelConsumerOptions.ProcessingOrder.KEY;
            if (keyOrdering && shard.isEmpty()) {
                log.debug("Removing empty shard (key: {})", shardKey);
                this.processingShards.remove(shardKey);
            }
        }
        this.successfulWorkListeners.forEach(listener -> listener.accept(workContainer));
        this.inFlightCount--;
    }

    /**
     * Marks the work as failed and returns it to its shard so it can be taken again.
     *
     * @param workContainer the work whose processing failed
     */
    public void failed(WorkContainer<K, V> workContainer) {
        workContainer.fail(this.clock);
        putBack(workContainer);
    }

    /** Re-inserts the work into its shard (if the shard still exists) and decrements the in-flight count. */
    private void putBack(WorkContainer<K, V> workContainer) {
        log.debug("Work FAILED, returning to shard");
        ConsumerRecord<K, V> cr = workContainer.getCr();
        Object shardKey = computeShardKey(cr);
        NavigableMap<Long, WorkContainer<K, V>> shard = this.processingShards.get(shardKey);
        if (shard == null) {
            // The shard is removed when its partition is revoked or lost; there is nowhere to return the work to.
            log.debug("No shard found for failed work (key: {}), not returning it", shardKey);
        } else {
            shard.put(cr.offset(), workContainer);
        }
        this.inFlightCount--;
    }

    /**
     * @return total number of work containers across all partition commit queues
     */
    public int getPartitionWorkRemainingCount() {
        int total = 0;
        for (NavigableMap<Long, WorkContainer<K, V>> commitQueue : this.partitionCommitQueues.values()) {
            total += commitQueue.size();
        }
        return total;
    }

    /**
     * @return number of work containers in the processing shards plus the number of records still
     *         waiting in the inbox
     */
    public int getWorkRemainingCount() {
        int inboxRecords = this.workInbox.stream().mapToInt(ConsumerRecords::count).sum();
        return getMappedShardWorkRemainingCount() + inboxRecords;
    }

    /**
     * @return total number of work containers across all processing shards
     */
    public int getMappedShardWorkRemainingCount() {
        int total = 0;
        for (NavigableMap<Long, WorkContainer<K, V>> shard : this.processingShards.values()) {
            total += shard.size();
        }
        return total;
    }

    /**
     * @return {@code true} when any shard holds work or the inbox is not empty
     */
    public boolean isRecordsAwaitingProcessing() {
        return getMappedShardWorkRemainingCount() > 0 || !this.workInbox.isEmpty();
    }

    /**
     * @return {@code true} when any partition commit queue still holds work
     */
    public boolean isRecordsAwaitingToBeCommitted() {
        return getPartitionWorkRemainingCount() > 0;
    }

    /**
     * Looks up the work container registered for a record.
     *
     * @param consumerRecord record to look up
     * @return the container, or {@code null} when the record's shard or offset is not tracked
     */
    public WorkContainer<K, V> getWorkContainerForRecord(ConsumerRecord<K, V> consumerRecord) {
        NavigableMap<Long, WorkContainer<K, V>> shard = this.processingShards.get(computeShardKey(consumerRecord));
        return shard == null ? null : shard.get(consumerRecord.offset());
    }

    /**
     * Collects the committable offsets and removes the corresponding work from the commit queues.
     *
     * @return offsets to commit, per partition
     */
    public Map<TopicPartition, OffsetAndMetadata> findCompletedEligibleOffsetsAndRemove() {
        return findCompletedEligibleOffsetsAndRemove(true);
    }

    /**
     * @return {@code true} when a scan of the commit queues (without removing anything) yields at
     *         least one offset to commit
     */
    public boolean hasComittableOffsets() {
        return !findCompletedEligibleOffsetsAndRemove(false).isEmpty();
    }

    /**
     * Scans every partition commit queue in offset order and builds the offsets to commit.
     *
     * <p>For each partition, the committed offset is one past the last successful offset that has no
     * incomplete or failed offset before it. Offsets that are incomplete or failed are collected and,
     * when there are any, encoded by {@code OffsetMapCodecManager} into the commit metadata; if nothing
     * is committable the first such offset is used as the commit offset. If the combined metadata
     * exceeds {@value #MAX_OFFSET_METADATA_SIZE} characters, all metadata payloads are dropped.
     *
     * @param z when {@code true}, work covered by the committed offsets is removed from the commit queues
     * @return offsets to commit, per partition
     */
    <R> Map<TopicPartition, OffsetAndMetadata> findCompletedEligibleOffsetsAndRemove(boolean z) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
        int scannedCount = 0;
        int removedCount = 0;
        log.trace("Scanning for in order in-flight work that has completed...");
        int totalPayloadLength = 0;

        for (Map.Entry<TopicPartition, NavigableMap<Long, WorkContainer<K, V>>> entry : this.partitionCommitQueues.entrySet()) {
            TopicPartition tp = entry.getKey();
            log.trace("Starting scan of partition: {}", tp);
            NavigableMap<Long, WorkContainer<K, V>> commitQueue = entry.getValue();
            scannedCount += commitQueue.size();

            List<WorkContainer<K, V>> workToRemove = new ArrayList<>();
            LinkedHashSet<Long> incompleteOffsets = new LinkedHashSet<>();
            // Set once an incomplete or failed offset is met; nothing after it can be committed normally.
            boolean pastLowestCommittable = false;

            for (WorkContainer<K, V> work : commitQueue.values()) {
                boolean complete = work.isUserFunctionComplete();
                long offset = work.getCr().offset();
                if (!complete) {
                    pastLowestCommittable = true;
                    log.trace("Offset ({}) is incomplete, holding up the queue ({}) of size {}.", offset, tp, commitQueue.size());
                    incompleteOffsets.add(offset);
                    continue;
                }

                boolean succeeded = work.getUserFunctionSucceeded().get().booleanValue();
                if (succeeded && !pastLowestCommittable) {
                    log.trace("Found offset candidate ({}) to add to offset commit map", work);
                    workToRemove.add(work);
                    offsetsToSend.put(tp, new OffsetAndMetadata(offset + 1));
                } else if (succeeded) {
                    log.trace("Offset {} is complete and succeeded, but we've iterated past the lowest committable offset. Will mark as complete in the offset map.", offset);
                } else {
                    log.trace("Offset {} is complete, but failed processing. Will track in offset map as not complete. Can't do normal offset commit past this point.", offset);
                    pastLowestCommittable = true;
                    incompleteOffsets.add(offset);
                }
            }

            if (!incompleteOffsets.isEmpty()) {
                OffsetAndMetadata committable = offsetsToSend.get(tp);
                // Nothing committable on this partition: fall back to the first incomplete offset.
                long baseOffset = committable == null ? incompleteOffsets.iterator().next() : committable.offset();
                String payload = new OffsetMapCodecManager(this, this.consumer).makeOffsetMetadataPayload(baseOffset, tp, incompleteOffsets);
                totalPayloadLength += payload.length();
                offsetsToSend.put(tp, new OffsetAndMetadata(baseOffset, payload));
            }

            if (z) {
                removedCount += workToRemove.size();
                for (WorkContainer<K, V> work : workToRemove) {
                    commitQueue.remove(work.getCr().offset());
                }
            }
        }

        maybeStripOffsetPayload(offsetsToSend, totalPayloadLength);
        log.debug("Scan finished, {} were in flight, {} completed offsets removed, coalesced to {} offset(s) ({}) to be committed", scannedCount, removedCount, offsetsToSend.size(), offsetsToSend);
        return offsetsToSend;
    }

    /**
     * Replaces every entry with a metadata-free {@link OffsetAndMetadata} when the combined payload
     * length exceeds {@value #MAX_OFFSET_METADATA_SIZE}.
     *
     * @param map offsets about to be committed; modified in place
     * @param i combined length of all metadata payloads in {@code map}
     */
    private void maybeStripOffsetPayload(Map<TopicPartition, OffsetAndMetadata> map, int i) {
        if (i > MAX_OFFSET_METADATA_SIZE) {
            log.warn("Offset map data too large (size: {}) to fit in metadata payload - stripping offset map out. See kafka.coordinator.group.OffsetConfig#DefaultMaxMetadataSize = 4096", i);
            // Keep only the offset; the metadata payload is what makes the commit too large.
            map.replaceAll((topicPartition, offsetAndMetadata) -> new OffsetAndMetadata(offsetAndMetadata.offset()));
        }
    }

    /**
     * After a successful commit, forgets the tracked incomplete offsets below each committed offset.
     *
     * @param map the offsets that were committed, per partition
     */
    public void onOffsetCommitSuccess(Map<TopicPartition, OffsetAndMetadata> map) {
        for (Map.Entry<TopicPartition, OffsetAndMetadata> committed : map.entrySet()) {
            TreeSet<Long> incompleteOffsets = this.partitionIncompleteOffsets.get(committed.getKey());
            if (incompleteOffsets == null) {
                continue;
            }
            long committedOffset = committed.getValue().offset();
            incompleteOffsets.removeIf(incomplete -> incomplete < committedOffset);
        }
    }

    /**
     * @return {@code true} when enough work is already loaded; see {@link #isSufficientlyLoaded()}
     */
    public boolean shouldThrottle() {
        return isSufficientlyLoaded();
    }

    /**
     * @return {@code true} when the commit queues hold more work than the max-messages-to-queue option
     *         times the loading factor, or more than the max-messages-beyond-base-commit-offset option
     */
    public boolean isSufficientlyLoaded() {
        int remaining = getPartitionWorkRemainingCount();
        boolean loadedEnoughInPipeline = remaining > this.options.getMaxMessagesToQueue() * this.loadingFactor;
        boolean overMaxUncommitted = remaining > this.options.getMaxNumberMessagesBeyondBaseCommitOffset();
        boolean sufficientlyLoaded = loadedEnoughInPipeline || overMaxUncommitted;
        if (sufficientlyLoaded) {
            log.debug("loadedEnoughInPipeline {} || overMaxUncommitted {}", loadedEnoughInPipeline, overMaxUncommitted);
        }
        return sufficientlyLoaded;
    }

    /**
     * @return number of work containers handed out and not yet reported as succeeded or failed
     */
    public int getInFlightCount() {
        return this.inFlightCount;
    }

    /**
     * @return {@code true} when at least one processing shard is not empty
     */
    public boolean workIsWaitingToBeCompletedSuccessfully() {
        for (NavigableMap<Long, WorkContainer<K, V>> shard : this.processingShards.values()) {
            if (!shard.isEmpty()) {
                return true;
            }
        }
        return false;
    }

    /**
     * @return {@code true} when the in-flight count is not zero
     */
    public boolean hasWorkInFlight() {
        return getInFlightCount() != 0;
    }

    /**
     * @return the options this manager was created with
     */
    public ParallelConsumerOptions getOptions() {
        return this.options;
    }

    /** Returns the live, mutable list of listeners notified by {@link #success(WorkContainer)}. */
    List<java.util.function.Consumer<WorkContainer<K, V>>> getSuccessfulWorkListeners() {
        return this.successfulWorkListeners;
    }

    /** Replaces the clock passed to {@code WorkContainer} for delay checks and failure marking. */
    void setClock(WallClock wallClock) {
        this.clock = wallClock;
    }
}
