package io.confluent.parallelconsumer.state;

import io.confluent.csid.utils.KafkaUtils;
import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
import io.confluent.parallelconsumer.internal.InternalRuntimeError;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import java.time.Clock;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/parallelconsumer/state/PartitionStateManager.class */
/**
 * Keeps one {@link PartitionState} and one assignment epoch counter per {@link TopicPartition}, and keeps both
 * up to date in response to consumer rebalance callbacks.
 * <p>
 * The epoch of a partition is bumped on every assignment and on every removal (revocation or loss), which lets
 * callers detect records and work containers that were created under an earlier assignment of the partition.
 * Removed partitions are not deleted from the state map; their entry is replaced with the
 * {@link RemovedPartitionState} singleton.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class PartitionStateManager<K, V> implements ConsumerRebalanceListener {

    /** Initial value of the mutable static payload threshold multiplier. */
    public static final double USED_PAYLOAD_THRESHOLD_MULTIPLIER_DEFAULT = 0.75d;

    private static final Logger log = LoggerFactory.getLogger(PartitionStateManager.class);

    /** Process-wide multiplier exposed through the static accessor pair below; it is not read within this class. */
    private static double USED_PAYLOAD_THRESHOLD_MULTIPLIER = USED_PAYLOAD_THRESHOLD_MULTIPLIER_DEFAULT;

    private final Consumer<K, V> consumer;
    private final ShardManager<K, V> sm;
    private final ParallelConsumerOptions<K, V> options;

    /** State per partition; entries of removed partitions hold the {@link RemovedPartitionState} singleton. */
    private final Map<TopicPartition, PartitionState<K, V>> partitionStates = new ConcurrentHashMap<>();

    /** Assignment epoch per partition; starts at 0 and is incremented on every assignment and removal. */
    private final Map<TopicPartition, Long> partitionsAssignmentEpochs = new ConcurrentHashMap<>();

    private final Clock clock;

    /**
     * @param consumer                consumer handed to the {@link OffsetMapCodecManager} when loading state
     * @param shardManager            shard manager that receives new work and is passed to removed partition states
     * @param parallelConsumerOptions options; the retry delay provider is read when creating work containers
     * @param clock                   clock passed to newly created work containers
     */
    public PartitionStateManager(Consumer<K, V> consumer, ShardManager<K, V> shardManager, ParallelConsumerOptions<K, V> parallelConsumerOptions, Clock clock) {
        this.consumer = consumer;
        this.sm = shardManager;
        this.options = parallelConsumerOptions;
        this.clock = clock;
    }

    /**
     * @return the state tracked for the partition, or {@code null} if the partition has never been tracked
     */
    public PartitionState<K, V> getPartitionState(TopicPartition topicPartition) {
        return this.partitionStates.get(topicPartition);
    }

    /**
     * Bumps the epoch of every assigned partition and loads their state through an {@link OffsetMapCodecManager}.
     * Partitions that are already tracked are only logged: at trace level when their state is marked removed,
     * at warn level otherwise.
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        log.debug("Partitions assigned: {}", collection);
        for (TopicPartition assigned : collection) {
            PartitionState<K, V> existing = this.partitionStates.get(assigned);
            if (existing == null) {
                continue;
            }
            if (existing.isRemoved()) {
                log.trace("Reassignment of previously revoked partition {} - state: {}", assigned, existing);
            } else {
                log.warn("New assignment of partition which already exists and isn't recorded as removed in partition state. Could be a state bug - was the partition revocation somehow missed, or is this a race? Please file a GH issue. Partition: {}, state: {}", assigned, existing);
            }
        }
        incrementPartitionAssignmentEpoch(collection);
        try {
            // NOTE(review): raw instantiation kept from the original - parameterize once the generic signature
            // of OffsetMapCodecManager is confirmed.
            this.partitionStates.putAll(new OffsetMapCodecManager(this.consumer).loadPartitionStateForAssignment(collection));
        } catch (Exception e) {
            log.error("Error in onPartitionsAssigned", e);
            throw e;
        }
    }

    /**
     * Handles revocation by delegating to {@link #onPartitionsRemoved(Collection)}; failures are logged and rethrown.
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        log.info("Partitions revoked: {}", collection);
        try {
            onPartitionsRemoved(collection);
        } catch (Exception e) {
            log.error("Error in onPartitionsRevoked", e);
            throw e;
        }
    }

    /**
     * Common path for revoked and lost partitions: bumps their epochs, then swaps their state for the removed
     * marker and lets the previous state clean up.
     */
    void onPartitionsRemoved(Collection<TopicPartition> collection) {
        incrementPartitionAssignmentEpoch(collection);
        resetOffsetMapAndRemoveWork(collection);
    }

    /**
     * Handles lost partitions by delegating to {@link #onPartitionsRemoved(Collection)}; failures are logged and
     * rethrown.
     */
    @Override
    public void onPartitionsLost(Collection<TopicPartition> collection) {
        try {
            log.info("Lost partitions: {}", collection);
            onPartitionsRemoved(collection);
        } catch (Exception e) {
            log.error("Error in onPartitionsLost", e);
            throw e;
        }
    }

    /**
     * Forwards each committed offset to the state of its partition.
     */
    public void onOffsetCommitSuccess(Map<TopicPartition, OffsetAndMetadata> map) {
        map.forEach((topicPartition, offsetAndMetadata) ->
                getPartitionState(topicPartition).onOffsetCommitSuccess(offsetAndMetadata));
    }

    /**
     * Replaces the state of each given partition with the {@link RemovedPartitionState} singleton and notifies the
     * state that was previously stored, if there was one.
     */
    private void resetOffsetMapAndRemoveWork(Collection<TopicPartition> collection) {
        for (TopicPartition removed : collection) {
            PartitionState<K, V> previous = this.partitionStates.put(removed, RemovedPartitionState.getSingleton());
            // A removal can arrive for a partition whose state was never stored (e.g. loading failed during
            // assignment); there is nothing to clean up in that case.
            if (previous != null) {
                previous.onPartitionsRemoved(this.sm);
            }
        }
    }

    /**
     * @return the current epoch of the partition the record belongs to
     * @throws InternalRuntimeError if no epoch is recorded for the record's partition
     */
    public Long getEpochOfPartitionForRecord(ConsumerRecord<K, V> consumerRecord) {
        Long epoch = this.partitionsAssignmentEpochs.get(KafkaUtils.toTopicPartition(consumerRecord));
        if (epoch == null) {
            throw new InternalRuntimeError(StringUtils.msg("Received message for a partition which is not assigned: {}", consumerRecord));
        }
        return epoch;
    }

    /**
     * @return the current epoch of the partition, or {@code null} if none is recorded
     */
    public Long getEpochOfPartition(TopicPartition topicPartition) {
        return this.partitionsAssignmentEpochs.get(topicPartition);
    }

    /**
     * Atomically advances the epoch of each given partition: a partition seen for the first time gets epoch 0,
     * every other partition gets its previous epoch plus one.
     */
    private void incrementPartitionAssignmentEpoch(Collection<TopicPartition> collection) {
        for (TopicPartition topicPartition : collection) {
            this.partitionsAssignmentEpochs.merge(topicPartition, 0L, (current, ignored) -> current + 1);
        }
    }

    /**
     * Checks whether the work container belongs to an outdated assignment of its partition.
     *
     * @return {@code true} if the container's epoch differs from the partition's current epoch, if no epoch is
     *         recorded for the partition, or if the partition is removed or was never assigned
     */
    public boolean checkIfWorkIsStale(WorkContainer<?, ?> workContainer) {
        Long currentEpoch = this.partitionsAssignmentEpochs.get(workContainer.getTopicPartition());
        long workEpoch = workContainer.getEpoch();
        boolean removedOrNeverAssigned = isPartitionRemovedOrNeverAssigned(workContainer.getCr());
        // A missing epoch means the partition was never assigned - treat it as a mismatch instead of unboxing null.
        boolean epochMismatch = currentEpoch == null || currentEpoch.longValue() != workEpoch;
        if (!epochMismatch && !removedOrNeverAssigned) {
            return false;
        }
        log.debug("Epoch mismatch {} vs {} for record {}. Skipping message - it's partition has already assigned to a different consumer.", workEpoch, currentEpoch, workContainer);
        return true;
    }

    /**
     * Asks the state of the record's partition whether the record was completed previously, and traces the answer.
     */
    public boolean isRecordPreviouslyCompleted(ConsumerRecord<K, V> consumerRecord) {
        boolean previouslyCompleted = getPartitionState(KafkaUtils.toTopicPartition(consumerRecord)).isRecordPreviouslyCompleted(consumerRecord);
        log.trace("Record {} previously completed? {}", consumerRecord.offset(), previouslyCompleted);
        return previouslyCompleted;
    }

    /**
     * @return whether the state of the partition allows more records
     */
    public boolean isAllowedMoreRecords(TopicPartition topicPartition) {
        return getPartitionState(topicPartition).isAllowedMoreRecords();
    }

    /**
     * @return whether the state of the work container's partition allows more records
     */
    public boolean isAllowedMoreRecords(WorkContainer<?, ?> workContainer) {
        return isAllowedMoreRecords(workContainer.getTopicPartition());
    }

    /**
     * @return {@code true} if at least one partition that is not removed reports work in its commit queue
     */
    public boolean hasWorkInCommitQueues() {
        return getAssignedPartitions().values().stream().anyMatch(PartitionState::hasWorkInCommitQueue);
    }

    /**
     * @return the sum of the commit queue sizes of all partitions that are not removed; 0 if there are none
     */
    public long getNumberOfEntriesInPartitionQueues() {
        return getAssignedPartitions().values().stream()
                .mapToLong(PartitionState::getCommitQueueSize)
                .sum();
    }

    /**
     * @return the highest seen offset as reported by the partition's state
     */
    public long getHighestSeenOffset(TopicPartition topicPartition) {
        return getPartitionState(topicPartition).getOffsetHighestSeen();
    }

    /**
     * Hands the work container to the state of its partition.
     */
    public void addWorkContainer(WorkContainer<K, V> workContainer) {
        getPartitionState(workContainer.getTopicPartition()).addWorkContainer(workContainer);
    }

    /**
     * @return the negation of {@link #isAllowedMoreRecords(TopicPartition)}
     */
    public boolean isBlocked(TopicPartition topicPartition) {
        return !isAllowedMoreRecords(topicPartition);
    }

    /**
     * @return {@code true} if no state is tracked for the record's partition or the tracked state is marked removed
     */
    public boolean isPartitionRemovedOrNeverAssigned(ConsumerRecord<?, ?> consumerRecord) {
        PartitionState<K, V> state = getPartitionState(KafkaUtils.toTopicPartition(consumerRecord));
        return state == null || state.isRemoved();
    }

    /**
     * Reports the successful work container to the state of its partition.
     */
    public void onSuccess(WorkContainer<K, V> workContainer) {
        getPartitionState(workContainer.getTopicPartition()).onSuccess(workContainer);
    }

    /**
     * Reports the failed work container to the state of its partition.
     */
    public void onFailure(WorkContainer<K, V> workContainer) {
        getPartitionState(workContainer.getTopicPartition()).onFailure(workContainer);
    }

    /**
     * Walks every record of every partition in the given batch and registers it as work where applicable, using
     * the epoch that was captured for the partition together with the records.
     */
    public void maybeRegisterNewRecordAsWork(EpochAndRecordsMap<K, V> epochAndRecordsMap) {
        for (TopicPartition topicPartition : epochAndRecordsMap.partitions()) {
            EpochAndRecordsMap<K, V>.RecordsAndEpoch recordsAndEpoch = epochAndRecordsMap.records(topicPartition);
            Long epochOfPartitionAtPoll = recordsAndEpoch.getEpochOfPartitionAtPoll();
            for (ConsumerRecord<K, V> consumerRecord : recordsAndEpoch.getRecords()) {
                maybeRegisterNewRecordAsWork(epochOfPartitionAtPoll, consumerRecord);
            }
        }
    }

    /**
     * Registers a single record as work unless it is outdated or already done. A record is skipped when its
     * inbound epoch differs from the partition's current epoch, when its partition is removed or was never
     * assigned, or when it was completed previously. Otherwise a work container is created and added to both
     * the shard manager and the partition state.
     *
     * @throws InternalRuntimeError if no epoch is recorded for the record's partition
     */
    private void maybeRegisterNewRecordAsWork(Long l, ConsumerRecord<K, V> consumerRecord) {
        Long currentEpoch = getEpochOfPartitionForRecord(consumerRecord);
        if (!Objects.equals(l, currentEpoch)) {
            log.debug("Inbound record of work has epoch ({}) not matching currently assigned epoch for the applicable partition ({}), skipping", l, currentEpoch);
            return;
        }
        if (isPartitionRemovedOrNeverAssigned(consumerRecord)) {
            log.debug("Record in buffer for a partition no longer assigned. Dropping. TP: {} rec: {}", KafkaUtils.toTopicPartition(consumerRecord), consumerRecord);
            // Actually drop the record: continuing would dereference a possibly missing partition state and
            // hand work for an unassigned partition to the shard manager.
            return;
        }
        if (isRecordPreviouslyCompleted(consumerRecord)) {
            log.trace("Record previously completed, skipping. offset: {}", consumerRecord.offset());
            return;
        }
        WorkContainer<K, V> workContainer = new WorkContainer<>(l.longValue(), consumerRecord, this.options.getRetryDelayProvider(), this.clock);
        this.sm.addWorkContainer(workContainer);
        addWorkContainer(workContainer);
    }

    /**
     * Collects the commit data of every partition that is not removed and reports dirty commit data.
     *
     * @return a new mutable map from partition to the commit data its state supplied
     */
    public Map<TopicPartition, OffsetAndMetadata> collectDirtyCommitData() {
        Map<TopicPartition, OffsetAndMetadata> dirtyCommitData = new HashMap<>();
        for (PartitionState<K, V> partitionState : getAssignedPartitions().values()) {
            partitionState.getCommitDataIfDirty()
                    .ifPresent(offsetAndMetadata -> dirtyCommitData.put(partitionState.getTp(), offsetAndMetadata));
        }
        return dirtyCommitData;
    }

    /**
     * @return an unmodifiable snapshot of all tracked partitions whose state is not marked removed
     */
    private Map<TopicPartition, PartitionState<K, V>> getAssignedPartitions() {
        Map<TopicPartition, PartitionState<K, V>> assigned = this.partitionStates.entrySet().stream()
                .filter(entry -> !entry.getValue().isRemoved())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        return Collections.unmodifiableMap(assigned);
    }

    /**
     * Decides whether the work container may be picked up for processing.
     *
     * @return {@code false} if the work is stale, or if its partition does not allow more records and the work's
     *         offset is not below the partition's highest succeeded offset; {@code true} otherwise
     */
    public boolean couldBeTakenAsWork(WorkContainer<?, ?> workContainer) {
        if (checkIfWorkIsStale(workContainer)) {
            log.debug("Work is in queue with stale epoch or no longer assigned. Skipping. Shard it came from will/was removed during partition revocation. WC: {}", workContainer);
            return false;
        }
        if (isAllowedMoreRecords(workContainer) || isBlockingProgress(workContainer)) {
            return true;
        }
        log.debug("Not allowed more records for the partition ({}) as set from previous encode run (blocked), that this record ({}) belongs to, due to offset encoding back pressure, is within the encoded payload already (offset lower than highest succeeded, not in flight ({}), continuing on to next container in shardEntry.", workContainer.getTopicPartition(), workContainer.offset(), workContainer.isNotInFlight());
        return false;
    }

    /**
     * @return {@code true} if the work's offset is lower than the highest succeeded offset of its partition
     */
    private boolean isBlockingProgress(WorkContainer<?, ?> workContainer) {
        return workContainer.offset() < getPartitionState(workContainer.getTopicPartition()).getOffsetHighestSucceeded();
    }

    /**
     * @return the current value of the static payload threshold multiplier
     */
    public static double getUSED_PAYLOAD_THRESHOLD_MULTIPLIER() {
        return USED_PAYLOAD_THRESHOLD_MULTIPLIER;
    }

    /**
     * Overwrites the static payload threshold multiplier for the whole process.
     */
    public static void setUSED_PAYLOAD_THRESHOLD_MULTIPLIER(double d) {
        USED_PAYLOAD_THRESHOLD_MULTIPLIER = d;
    }
}
