package io.confluent.parallelconsumer.state;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.DynamicLoadFactor;
import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
import io.confluent.parallelconsumer.internal.PCModule;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

/* loaded from: input_file:io/confluent/parallelconsumer/state/WorkManager.class */
public class WorkManager<K, V> implements ConsumerRebalanceListener {
    private static final Logger log = LoggerFactory.getLogger(WorkManager.class);
    private final ParallelConsumerOptions<K, V> options;
    final PartitionStateManager<K, V> pm;
    private final ShardManager<K, V> sm;
    private final DynamicLoadFactor dynamicLoadFactor;
    private int numberRecordsOutForProcessing;
    private final List<Consumer<WorkContainer<K, V>>> successfulWorkListeners;

    public WorkManager(PCModule<K, V> pCModule) {
        this(pCModule, new DynamicLoadFactor());
    }

    public WorkManager(PCModule<K, V> pCModule, DynamicLoadFactor dynamicLoadFactor) {
        this.numberRecordsOutForProcessing = 0;
        this.successfulWorkListeners = new ArrayList();
        this.options = pCModule.options();
        this.dynamicLoadFactor = dynamicLoadFactor;
        this.sm = new ShardManager<>(this.options, this);
        this.pm = new PartitionStateManager<>(pCModule, this.sm);
    }

    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        this.pm.onPartitionsAssigned(collection);
    }

    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        this.pm.onPartitionsRevoked(collection);
        onPartitionsRemoved(collection);
    }

    public void onPartitionsLost(Collection<TopicPartition> collection) {
        this.pm.onPartitionsLost(collection);
        onPartitionsRemoved(collection);
    }

    void onPartitionsRemoved(Collection<TopicPartition> collection) {
    }

    public void registerWork(EpochAndRecordsMap<K, V> epochAndRecordsMap) {
        this.pm.maybeRegisterNewRecordAsWork(epochAndRecordsMap);
    }

    public List<WorkContainer<K, V>> getWorkIfAvailable() {
        return getWorkIfAvailable(Integer.MAX_VALUE);
    }

    public List<WorkContainer<K, V>> getWorkIfAvailable(int i) {
        if (i < 1) {
            return UniLists.of();
        }
        List<WorkContainer<K, V>> workIfAvailable = this.sm.getWorkIfAvailable(i);
        log.debug("Got {} of {} requested records of work. In-flight: {}, Awaiting in commit (partition) queues: {}", new Object[]{Integer.valueOf(workIfAvailable.size()), Integer.valueOf(i), Integer.valueOf(getNumberRecordsOutForProcessing()), Long.valueOf(getNumberOfEntriesInPartitionQueues())});
        this.numberRecordsOutForProcessing += workIfAvailable.size();
        return workIfAvailable;
    }

    public void onSuccessResult(WorkContainer<K, V> workContainer) {
        log.trace("Work success ({}), removing from processing shard queue", workContainer);
        workContainer.endFlight();
        this.pm.onSuccess(workContainer);
        this.sm.onSuccess(workContainer);
        this.successfulWorkListeners.forEach(consumer -> {
            consumer.accept(workContainer);
        });
        this.numberRecordsOutForProcessing--;
    }

    public void onOffsetCommitSuccess(Map<TopicPartition, OffsetAndMetadata> map) {
        this.pm.onOffsetCommitSuccess(map);
    }

    public void onFailureResult(WorkContainer<K, V> workContainer) {
        workContainer.endFlight();
        this.pm.onFailure(workContainer);
        this.sm.onFailure(workContainer);
        this.numberRecordsOutForProcessing--;
    }

    public long getNumberOfEntriesInPartitionQueues() {
        return this.pm.getNumberOfEntriesInPartitionQueues();
    }

    public Map<TopicPartition, OffsetAndMetadata> collectCommitDataForDirtyPartitions() {
        return this.pm.collectDirtyCommitData();
    }

    public boolean checkIfWorkIsStale(List<WorkContainer<K, V>> list) {
        Iterator<WorkContainer<K, V>> it = list.iterator();
        while (it.hasNext()) {
            if (checkIfWorkIsStale(it.next())) {
                return true;
            }
        }
        return false;
    }

    public boolean checkIfWorkIsStale(WorkContainer<K, V> workContainer) {
        return this.pm.checkIfWorkIsStale(workContainer);
    }

    public boolean shouldThrottle() {
        return isSufficientlyLoaded();
    }

    public boolean isSufficientlyLoaded() {
        return getNumberOfWorkQueuedInShardsAwaitingSelection() > ((long) this.options.getTargetAmountOfRecordsInFlight()) * ((long) getLoadingFactor());
    }

    private int getLoadingFactor() {
        return this.dynamicLoadFactor.getCurrentFactor();
    }

    public boolean workIsWaitingToBeProcessed() {
        return this.sm.workIsWaitingToBeProcessed();
    }

    public boolean hasWorkInFlight() {
        return getNumberRecordsOutForProcessing() != 0;
    }

    public boolean isWorkInFlightMeetingTarget() {
        return getNumberRecordsOutForProcessing() >= this.options.getTargetAmountOfRecordsInFlight();
    }

    public long getNumberOfWorkQueuedInShardsAwaitingSelection() {
        return this.sm.getNumberOfWorkQueuedInShardsAwaitingSelection();
    }

    public boolean hasWorkInCommitQueues() {
        return this.pm.hasWorkInCommitQueues();
    }

    public boolean isRecordsAwaitingProcessing() {
        return this.sm.getNumberOfWorkQueuedInShardsAwaitingSelection() > 0;
    }

    public boolean isRecordsAwaitingToBeCommitted() {
        return getNumberOfEntriesInPartitionQueues() > 0;
    }

    public void handleFutureResult(WorkContainer<K, V> workContainer) {
        if (checkIfWorkIsStale(workContainer)) {
            log.debug("Work result received, but from an old generation. Dropping work from revoked partition {}", workContainer);
            return;
        }
        Optional<Boolean> maybeUserFunctionSucceeded = workContainer.getMaybeUserFunctionSucceeded();
        if (!maybeUserFunctionSucceeded.isPresent()) {
            throw new IllegalStateException("Work returned, but without a success flag - report a bug");
        }
        if (Boolean.TRUE.equals(maybeUserFunctionSucceeded.get())) {
            onSuccessResult(workContainer);
        } else {
            onFailureResult(workContainer);
        }
    }

    public boolean isNoRecordsOutForProcessing() {
        return getNumberRecordsOutForProcessing() == 0;
    }

    public Optional<Duration> getLowestRetryTime() {
        return this.sm.getLowestRetryTime();
    }

    public boolean isDirty() {
        return this.pm.isDirty();
    }

    public ParallelConsumerOptions<K, V> getOptions() {
        return this.options;
    }

    public PartitionStateManager<K, V> getPm() {
        return this.pm;
    }

    public ShardManager<K, V> getSm() {
        return this.sm;
    }

    public int getNumberRecordsOutForProcessing() {
        return this.numberRecordsOutForProcessing;
    }

    public List<Consumer<WorkContainer<K, V>>> getSuccessfulWorkListeners() {
        return this.successfulWorkListeners;
    }
}
