package io.confluent.parallelconsumer.state;

import io.confluent.csid.utils.BackportUtils;
import io.confluent.csid.utils.KafkaUtils;
import io.confluent.csid.utils.LoopingResumingIterator;
import io.confluent.csid.utils.StringUtils;
import io.confluent.csid.utils.WallClock;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.DynamicLoadFactor;
import io.confluent.parallelconsumer.internal.RateLimiter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;

/* loaded from: input_file:io/confluent/parallelconsumer/state/WorkManager.class */
/**
 * Coordinates work between the record mailbox ({@link WorkMailBoxManager}), the processing shards
 * ({@link ShardManager}) and the per-partition state ({@link PartitionMonitor}).
 *
 * <p>Records arrive through {@link #registerWork(ConsumerRecords)}, are moved into shards on demand by
 * {@link #maybeGetWork(int)}, and have their results fed back through
 * {@link #handleFutureResult(WorkContainer)}. Committable offsets are gathered by
 * {@link #findCompletedEligibleOffsetsAndRemove()}.
 *
 * <p>NOTE(review): no synchronization is visible in this class apart from the atomic dirty flag; the
 * in-flight counter and the resume point are plain fields - confirm the intended threading model with callers.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class WorkManager<K, V> implements ConsumerRebalanceListener {
    private static final Logger log = LoggerFactory.getLogger(WorkManager.class);

    private final ParallelConsumerOptions options;
    final PartitionMonitor<K, V> pm;
    private final ShardManager<K, V> sm;
    private final DynamicLoadFactor dynamicLoadFactor;
    private final WorkMailBoxManager<K, V> wmbm;

    /** Shard key at which the next call to {@link #maybeGetWork(int)} asks the shard iterator to resume. */
    private Optional<Object> iterationResumePoint = Optional.empty();

    /** Number of work containers handed out by {@link #maybeGetWork(int)} and not yet reported back. */
    private int numberRecordsOutForProcessing = 0;

    /** Callbacks invoked with each work container that completes successfully. */
    private final List<Consumer<WorkContainer<K, V>>> successfulWorkListeners = new ArrayList<>();

    private WallClock clock = new WallClock();
    org.apache.kafka.clients.consumer.Consumer<K, V> consumer;

    /** Set once any work succeeds; gates {@link #findCompletedEligibleOffsetsAndRemove()}. */
    private final AtomicBoolean workStateIsDirtyNeedsCommitting = new AtomicBoolean(false);

    /** Work that cannot be taken and has a time-in-flight above this (compared in whole seconds) is counted as slow. */
    private final Duration thresholdForTimeSpentInQueueWarning = Duration.ofSeconds(10L);

    /** Limits how often the slow-work warning is logged. */
    private final RateLimiter slowWarningRateLimit = new RateLimiter(5);

    /**
     * Creates a work manager with a freshly constructed {@link DynamicLoadFactor}.
     *
     * @param parallelConsumerOptions options controlling ordering and concurrency
     * @param consumer                the Kafka consumer handed to the partition monitor
     */
    public WorkManager(ParallelConsumerOptions<K, V> parallelConsumerOptions, org.apache.kafka.clients.consumer.Consumer<K, V> consumer) {
        this(parallelConsumerOptions, consumer, new DynamicLoadFactor());
    }

    /**
     * Creates a work manager and its collaborating mailbox, shard and partition managers.
     *
     * @param parallelConsumerOptions options controlling ordering and concurrency
     * @param consumer                the Kafka consumer handed to the partition monitor
     * @param dynamicLoadFactor       supplies the current loading factor used by {@link #isSufficientlyLoaded()}
     */
    public WorkManager(ParallelConsumerOptions<K, V> parallelConsumerOptions, org.apache.kafka.clients.consumer.Consumer<K, V> consumer, DynamicLoadFactor dynamicLoadFactor) {
        this.options = parallelConsumerOptions;
        this.consumer = consumer;
        this.dynamicLoadFactor = dynamicLoadFactor;
        this.wmbm = new WorkMailBoxManager<>();
        // The partition monitor needs the shard manager, so the shard manager is built first.
        this.sm = new ShardManager<>(this.options);
        this.pm = new PartitionMonitor<>(consumer, this.sm);
    }

    /**
     * Forwards the assignment to the partition monitor.
     *
     * @param collection the newly assigned partitions
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        pm.onPartitionsAssigned(collection);
    }

    /**
     * Forwards the revocation to the partition monitor, then to the mailbox.
     *
     * @param collection the revoked partitions
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        pm.onPartitionsRevoked(collection);
        onPartitionsRemoved(collection);
    }

    /**
     * Forwards the loss to the partition monitor, then to the mailbox.
     *
     * @param collection the lost partitions
     */
    @Override
    public void onPartitionsLost(Collection<TopicPartition> collection) {
        pm.onPartitionsLost(collection);
        onPartitionsRemoved(collection);
    }

    /**
     * Tells the mailbox that the given partitions are no longer owned.
     *
     * @param collection the partitions that were revoked or lost
     */
    void onPartitionsRemoved(Collection<TopicPartition> collection) {
        wmbm.onPartitionsRemoved(collection);
    }

    /**
     * Hands freshly polled records to the mailbox. They are only turned into work later, on demand, by
     * {@link #maybeGetWork(int)}.
     *
     * @param consumerRecords the records returned from a poll
     */
    public void registerWork(ConsumerRecords<K, V> consumerRecords) {
        wmbm.registerWork(consumerRecords);
    }

    /**
     * Drains records from the mailbox into the shards until {@code i} records have been registered or the
     * mailbox yields {@code null}.
     *
     * @param i the number of records to try to register
     */
    private void ingestPolledRecordsIntoQueues(int i) {
        log.debug("Will attempt to register the requested {} - {} available in internal mailbox", i, wmbm.internalFlattenedMailQueueSize());
        int registered = 0;
        ConsumerRecord<K, V> polled;
        // do-while: one poll is always attempted, even when i is zero or negative.
        do {
            polled = wmbm.internalFlattenedMailQueuePoll();
            if (maybeRegisterNewRecordAsWork(polled)) {
                registered++;
            }
        } while (polled != null && registered < i);
        log.debug("{} new records were registered.", registered);
    }

    /**
     * Wraps a record in a {@link WorkContainer} and adds it to both the shard manager and the partition
     * monitor, unless the record is {@code null}, belongs to a partition that is no longer assigned, or was
     * previously processed.
     *
     * @param consumerRecord the record to register; may be {@code null}
     * @return {@code true} if the record was registered as new work
     */
    private boolean maybeRegisterNewRecordAsWork(ConsumerRecord<K, V> consumerRecord) {
        if (consumerRecord == null) {
            return false;
        }
        if (!pm.isPartitionAssigned(consumerRecord)) {
            log.debug("Record in buffer for a partition no longer assigned. Dropping. TP: {} rec: {}", KafkaUtils.toTP(consumerRecord), consumerRecord);
            return false;
        }
        if (pm.isRecordPreviouslyProcessed(consumerRecord)) {
            log.trace("Record previously processed, skipping. offset: {}", consumerRecord.offset());
            return false;
        }
        WorkContainer<K, V> newWork = new WorkContainer<>(pm.getEpoch(consumerRecord, KafkaUtils.toTP(consumerRecord)), consumerRecord);
        sm.addWorkContainer(newWork);
        pm.addWorkContainer(newWork);
        return true;
    }

    /**
     * Takes as much work as is available, i.e. {@link #maybeGetWork(int)} with {@link Integer#MAX_VALUE}.
     *
     * @return the work containers taken for execution
     */
    public <R> List<WorkContainer<K, V>> maybeGetWork() {
        return maybeGetWork(Integer.MAX_VALUE);
    }

    /**
     * Collects up to {@code i} work containers from the shards and marks them as queued for execution.
     *
     * <p>Shards are walked starting from the saved resume point. Within a shard, containers are visited in
     * map order; when the configured ordering is anything other than {@code UNORDERED}, at most one
     * container per shard is considered before moving on. Containers with a stale epoch are removed from
     * their shard after the walk.
     *
     * @param i the maximum number of work containers to take; values below 1 return an empty list
     * @return the work containers taken, possibly empty
     */
    public List<WorkContainer<K, V>> maybeGetWork(int i) {
        if (i < 1) {
            return UniLists.of();
        }
        tryToEnsureAvailableCapacity(i);

        List<WorkContainer<K, V>> workTaken = new ArrayList<>();
        List<WorkContainer<K, V>> staleWork = new ArrayList<>();
        int slowWorkCount = 0;

        LoopingResumingIterator<Object, NavigableMap<Long, WorkContainer<K, V>>> shardWalker = sm.getIterator(iterationResumePoint);
        Iterator<Map.Entry<Object, NavigableMap<Long, WorkContainer<K, V>>>> shards = shardWalker.iterator();
        while (shards.hasNext()) {
            Map.Entry<Object, NavigableMap<Long, WorkContainer<K, V>>> shard = shards.next();
            Object shardKey = shard.getKey();
            log.trace("Looking for work on shard: {}", shardKey);
            if (workTaken.size() >= i) {
                iterationResumePoint = Optional.of(shardKey);
                log.debug("Work taken is now over max, stopping (saving iteration resume point {})", iterationResumePoint);
                break;
            }

            List<WorkContainer<K, V>> takenFromShard = new ArrayList<>();
            // A for-each ends when the shard is exhausted; the previous while(true) form had no exit for that case.
            for (Map.Entry<Long, WorkContainer<K, V>> queued : shard.getValue().entrySet()) {
                int takenSoFar = workTaken.size() + takenFromShard.size();
                if (takenSoFar >= i) {
                    log.trace("Work taken ({}) exceeds max ({})", takenSoFar, i);
                    break;
                }

                WorkContainer<K, V> work = queued.getValue();
                if (checkEpochIsStale(work)) {
                    log.debug("Work is in queue with stale epoch. Will remove now. Was it not removed properly on revoke? Or are we in a race state? {}", work);
                    // Removal is deferred until after the walk so the shard is not modified while iterating.
                    staleWork.add(work);
                    continue;
                }

                TopicPartition topicPartition = work.getTopicPartition();
                boolean partitionBlocked = pm.isBlocked(topicPartition);
                boolean neverAttempted = !work.hasPreviouslyFailed();
                if (partitionBlocked && neverAttempted && work.isNotInFlight()) {
                    // A blocked partition takes no fresh work; previously failed work is still allowed through.
                    log.debug("Not allowed more records ({}) for the partition ({}) as set from previous encode run, that this record ({}) belongs to due to offset encoding back pressure, has never been attemtped before ({}), not in flight ({}), continuing on to next container in shard.", partitionBlocked, topicPartition, work.offset(), neverAttempted, work.isNotInFlight());
                    continue;
                }

                boolean notSucceededYet = !work.isUserFunctionSucceeded();
                boolean delayPassed = work.hasDelayPassed(clock);
                if (delayPassed && work.isNotInFlight() && notSucceededYet) {
                    log.trace("Taking {} as work", work);
                    work.queueingForExecution();
                    takenFromShard.add(work);
                } else {
                    Duration timeInFlight = work.getTimeInFlight();
                    if (BackportUtils.toSeconds(timeInFlight) > BackportUtils.toSeconds(thresholdForTimeSpentInQueueWarning)) {
                        slowWorkCount++;
                        log.trace("Work has spent over {} in queue! Can't take as work: Work ({}). Must all be true: Delay passed= {}. Is not in flight= {}. Has not succeeded already= {}. Time spent in execution queue: {}.", thresholdForTimeSpentInQueueWarning, work, delayPassed, work.isNotInFlight(), notSucceededYet, timeInFlight);
                    } else {
                        log.trace("Can't take as work: Work ({}). Must all be true: Delay passed= {}. Is not in flight= {}. Has not succeeded already= {}. Time spent in execution queue: {}.", work, delayPassed, work.isNotInFlight(), notSucceededYet, timeInFlight);
                    }
                }

                if (options.getOrdering() != ParallelConsumerOptions.ProcessingOrder.UNORDERED) {
                    // With an ordering in force, only the first eligible container of a shard is considered.
                    log.trace("Processing by {}, so have cannot get more messages on this ({}) shard.", options.getOrdering(), shardKey);
                    break;
                }
            }
            workTaken.addAll(takenFromShard);
        }

        if (slowWorkCount > 0) {
            final int slowCount = slowWorkCount;
            slowWarningRateLimit.performIfNotLimited(() -> {
                log.warn("Warning: {} records in the queue have been waiting longer than {}.", slowCount, BackportUtils.toSeconds(thresholdForTimeSpentInQueueWarning));
            });
        }

        for (WorkContainer<K, V> stale : staleWork) {
            sm.removeWorkFromShard(stale);
        }

        log.debug("Got {} records of work. In-flight: {}, Awaiting in commit queues: {}", workTaken.size(), getNumberRecordsOutForProcessing(), getNumberOfEntriesInPartitionQueues());
        numberRecordsOutForProcessing += workTaken.size();
        return workTaken;
    }

    /**
     * Tops up the shards from the mailbox by the difference between the requested amount and what the shards
     * already hold.
     *
     * @param i the amount of work requested
     */
    private void tryToEnsureAvailableCapacity(int i) {
        int availableInShards = sm.getWorkQueuedInShardsCount();
        // May be zero or negative when the shards already hold at least the requested amount.
        int neededFromMailbox = i - availableInShards;
        log.debug("Requested: {}, available in shards: {}, will try to process from mailbox the delta of: {}", i, availableInShards, neededFromMailbox);
        ingestPolledRecordsIntoQueues(neededFromMailbox);
    }

    /**
     * Records a successful result: marks state as dirty, removes the work from its shard (dropping the shard
     * itself when it is empty under {@code KEY} ordering), notifies the success listeners and decrements the
     * in-flight counter.
     *
     * @param workContainer the work that succeeded
     * @throws NullPointerException if no shard exists for the work's shard key
     */
    public void onSuccess(WorkContainer<K, V> workContainer) {
        log.trace("Processing success...");
        workStateIsDirtyNeedsCommitting.set(true);
        ConsumerRecord<K, V> record = workContainer.getCr();
        log.trace("Work success ({}), removing from processing shard queue", workContainer);
        workContainer.succeed();

        Object shardKey = sm.computeShardKey(record);
        NavigableMap<Long, WorkContainer<K, V>> shard = sm.getShard(shardKey);
        if (shard == null) {
            throw new NullPointerException(StringUtils.msg("Shard is missing for key {}", shardKey));
        }
        shard.remove(record.offset());
        if (options.getOrdering().equals(ParallelConsumerOptions.ProcessingOrder.KEY) && shard.isEmpty()) {
            log.trace("Removing empty shard (key: {})", shardKey);
            sm.removeShard(shardKey);
        }

        successfulWorkListeners.forEach(listener -> listener.accept(workContainer));
        numberRecordsOutForProcessing--;
    }

    /**
     * Forwards a successful offset commit to the partition monitor.
     *
     * @param map the offsets that were committed
     */
    public void onOffsetCommitSuccess(Map<TopicPartition, OffsetAndMetadata> map) {
        pm.onOffsetCommitSuccess(map);
    }

    /**
     * Records a failed result: marks the work as failed against the current clock and returns it to its shard.
     *
     * @param workContainer the work that failed
     */
    public void onFailure(WorkContainer<K, V> workContainer) {
        workContainer.fail(clock);
        putBack(workContainer);
    }

    /**
     * Re-inserts failed work into its shard under its offset and decrements the in-flight counter.
     *
     * @param workContainer the failed work to return
     * @throws NullPointerException if no shard exists for the work's shard key
     */
    private void putBack(WorkContainer<K, V> workContainer) {
        log.debug("Work FAILED, returning to shard");
        ConsumerRecord<K, V> record = workContainer.getCr();
        Object shardKey = sm.computeShardKey(record);
        NavigableMap<Long, WorkContainer<K, V>> shard = sm.getShard(shardKey);
        if (shard == null) {
            // Same failure mode as before (NPE), but with the message style used by onSuccess.
            throw new NullPointerException(StringUtils.msg("Shard is missing for key {}", shardKey));
        }
        shard.put(record.offset(), workContainer);
        numberRecordsOutForProcessing--;
    }

    /**
     * @return the number of entries in the partition queues, as reported by the partition monitor
     */
    public long getNumberOfEntriesInPartitionQueues() {
        return pm.getNumberOfEntriesInPartitionQueues();
    }

    /**
     * @return the amount of work queued in the mailbox, as reported by the mailbox manager
     */
    public Integer getWorkQueuedInMailboxCount() {
        return wmbm.getWorkQueuedInMailboxCount();
    }

    /**
     * Finds committable offsets and removes the corresponding completed work from the commit queues.
     *
     * @return the offsets to commit per partition; empty when nothing has succeeded yet
     */
    public Map<TopicPartition, OffsetAndMetadata> findCompletedEligibleOffsetsAndRemove() {
        return findCompletedEligibleOffsetsAndRemove(true);
    }

    /**
     * @return {@code true} once the state has been marked dirty by a successful piece of work
     */
    public boolean hasCommittableOffsets() {
        return isDirty();
    }

    /**
     * Scans each partition's commit queue in order and works out the highest offset that can be committed.
     *
     * <p>For each partition, succeeded work found before the first incomplete or failed entry advances the
     * committable offset to {@code offset + 1}. Incomplete and failed offsets are collected and passed to
     * {@link PartitionMonitor#addEncodedOffsets} together with the result map.
     *
     * @param z when {@code true}, work whose offset became committable is removed from the commit queue
     * @return the offsets to commit per partition; empty when the state is not dirty
     */
    <R> Map<TopicPartition, OffsetAndMetadata> findCompletedEligibleOffsetsAndRemove(boolean z) {
        if (!isDirty()) {
            return UniMaps.of();
        }

        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        int totalInQueues = 0;
        int totalRemoved = 0;
        log.trace("Scanning for in order in-flight work that has completed...");

        for (Map.Entry<TopicPartition, PartitionState<K, V>> partition : pm.getPartitionStates().entrySet()) {
            NavigableMap<Long, WorkContainer<K, V>> commitQueue = partition.getValue().getCommitQueues();
            TopicPartition topicPartition = partition.getKey();
            log.trace("Starting scan of partition: {}", topicPartition);
            totalInQueues += commitQueue.size();

            List<WorkContainer<K, V>> committable = new ArrayList<>();
            LinkedHashSet<Long> incompleteOffsets = new LinkedHashSet<>();
            long lowestIncompleteOffset = -1;
            boolean pastCommittablePoint = false;

            for (WorkContainer<K, V> work : commitQueue.values()) {
                long offset = work.getCr().offset();
                if (!work.isUserFunctionComplete()) {
                    lowestIncompleteOffset = work.offset();
                    pastCommittablePoint = true;
                    log.trace("Offset ({}) is incomplete, holding up the queue ({}) of size {}.", offset, topicPartition, commitQueue.size());
                    incompleteOffsets.add(offset);
                    continue;
                }

                // Only read for completed work; presumably the result is always present by then - verify in WorkContainer.
                boolean succeeded = work.getUserFunctionSucceeded().get();
                if (!succeeded) {
                    log.trace("Offset {} is complete, but failed processing. Will track in offset map as not complete. Can't do normal offset commit past this point.", offset);
                    pastCommittablePoint = true;
                    incompleteOffsets.add(offset);
                } else if (pastCommittablePoint) {
                    log.trace("Offset {} is complete and succeeded, but we've iterated past the lowest committable offset ({}). Will mark as complete in the offset map.", offset, lowestIncompleteOffset);
                } else {
                    log.trace("Found offset candidate ({}) to add to offset commit map", work);
                    committable.add(work);
                    // Later candidates overwrite earlier ones, leaving the highest contiguous offset + 1.
                    offsetsToCommit.put(topicPartition, new OffsetAndMetadata(offset + 1));
                }
            }

            pm.addEncodedOffsets(offsetsToCommit, topicPartition, incompleteOffsets);

            if (z) {
                totalRemoved += committable.size();
                for (WorkContainer<K, V> done : committable) {
                    commitQueue.remove(done.getCr().offset());
                }
            }
        }

        log.debug("Scan finished, {} were in flight, {} completed offsets removed, coalesced to {} offset(s) ({}) to be committed", totalInQueues, totalRemoved, offsetsToCommit.size(), offsetsToCommit);
        return offsetsToCommit;
    }

    /**
     * @param workContainer the work to check
     * @return the partition monitor's verdict on whether the work's epoch is stale
     */
    public boolean checkEpochIsStale(WorkContainer<K, V> workContainer) {
        return pm.checkEpochIsStale(workContainer);
    }

    /**
     * @return {@code true} when {@link #isSufficientlyLoaded()} holds
     */
    public boolean shouldThrottle() {
        return isSufficientlyLoaded();
    }

    /**
     * @return {@code true} when the mailbox holds more work than max concurrency times the current loading factor
     */
    public boolean isSufficientlyLoaded() {
        int loadTarget = options.getMaxConcurrency() * getLoadingFactor();
        return getWorkQueuedInMailboxCount() > loadTarget;
    }

    /**
     * @return the current factor from the {@link DynamicLoadFactor}
     */
    private int getLoadingFactor() {
        return dynamicLoadFactor.getCurrentFactor();
    }

    /**
     * @return whether the shard manager reports work waiting to be processed
     */
    public boolean workIsWaitingToBeProcessed() {
        return sm.workIsWaitingToBeProcessed();
    }

    /**
     * @return {@code true} when the in-flight counter is non-zero
     */
    public boolean hasWorkInFlight() {
        return getNumberRecordsOutForProcessing() != 0;
    }

    /**
     * @return {@code true} when the state has not been marked dirty
     */
    public boolean isClean() {
        return !isDirty();
    }

    /**
     * Reads the dirty flag, which {@link #onSuccess(WorkContainer)} sets.
     *
     * <p>NOTE(review): nothing in this class resets the flag to {@code false} - confirm whether that is intended.
     *
     * @return {@code true} when the flag is set
     */
    private boolean isDirty() {
        return workStateIsDirtyNeedsCommitting.get();
    }

    /**
     * @return the work queued in shards plus the work queued in the mailbox
     */
    public int getTotalWorkWaitingProcessing() {
        return sm.getWorkQueuedInShardsCount() + getWorkQueuedInMailboxCount();
    }

    /**
     * @return {@code true} when the mailbox count is above zero
     */
    public boolean hasWorkInMailboxes() {
        return getWorkQueuedInMailboxCount() > 0;
    }

    /**
     * @return whether the partition monitor reports work in its commit queues
     */
    public boolean hasWorkInCommitQueues() {
        return pm.hasWorkInCommitQueues();
    }

    /**
     * @return {@code true} when either the shards or the mailbox hold work
     */
    public boolean isRecordsAwaitingProcessing() {
        return sm.getWorkQueuedInShardsCount() > 0 || hasWorkInMailboxes();
    }

    /**
     * @return {@code true} when the partition queues hold at least one entry
     */
    public boolean isRecordsAwaitingToBeCommitted() {
        return getNumberOfEntriesInPartitionQueues() > 0;
    }

    /**
     * Routes a finished piece of work to {@link #onSuccess(WorkContainer)} or
     * {@link #onFailure(WorkContainer)}. Work whose epoch is stale is dropped without touching any state.
     *
     * @param workContainer the work whose result is being reported
     */
    public void handleFutureResult(WorkContainer<K, V> workContainer) {
        if (checkEpochIsStale(workContainer)) {
            log.debug("Work result received, but from an old generation. Dropping work from revoked partition {}", workContainer);
            return;
        }
        boolean succeeded = workContainer.getUserFunctionSucceeded().get();
        if (succeeded) {
            onSuccess(workContainer);
        } else {
            onFailure(workContainer);
        }
    }

    /**
     * @return the options this manager was constructed with
     */
    public ParallelConsumerOptions getOptions() {
        return options;
    }

    /**
     * @return the partition monitor
     */
    public PartitionMonitor<K, V> getPm() {
        return pm;
    }

    /**
     * @return the shard manager
     */
    public ShardManager<K, V> getSm() {
        return sm;
    }

    /**
     * @return the number of work containers handed out and not yet reported back
     */
    public int getNumberRecordsOutForProcessing() {
        return numberRecordsOutForProcessing;
    }

    /**
     * @return the live list of success listeners; entries added to it are invoked by {@link #onSuccess(WorkContainer)}
     */
    public List<Consumer<WorkContainer<K, V>>> getSuccessfulWorkListeners() {
        return successfulWorkListeners;
    }

    /**
     * Replaces the clock used for retry-delay checks and failure timestamps.
     *
     * @param wallClock the clock to use from now on
     */
    void setClock(WallClock wallClock) {
        this.clock = wallClock;
    }
}
