package io.confluent.parallelconsumer.state;

import io.confluent.csid.utils.JavaUtils;
import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.metrics.PCMetrics;
import io.confluent.parallelconsumer.metrics.PCMetricsDef;
import io.confluent.parallelconsumer.offsets.NoEncodingPossibleException;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Tag;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/parallelconsumer/state/PartitionState.class */
/**
 * Offset-tracking state for a single {@link TopicPartition}.
 *
 * <p>The class keeps a sorted map of offsets that have been polled but not yet completed
 * ("incomplete offsets"), the highest offset seen and the highest offset that succeeded, a dirty flag
 * describing whether there is state that has not been committed yet, and a flag describing whether the
 * partition may take more records (cleared when the encoded offset payload gets too large).
 *
 * <p>NOTE(review): most fields are plain (non-volatile) while the incomplete offsets live in a
 * {@link ConcurrentSkipListMap}; the threading contract is not visible from this file - confirm against
 * the callers before relying on any cross-thread visibility.
 *
 * @param <K> key type of the tracked {@link ConsumerRecord}s
 * @param <V> value type of the tracked {@link ConsumerRecord}s
 */
public class PartitionState<K, V> {
    private static final Logger log = LoggerFactory.getLogger(PartitionState.class);

    /** Sentinel used by this class for "no offset known". */
    public static final long KAFKA_OFFSET_ABSENCE = -1;

    private final PCModule<K, V> module;

    @NonNull
    private final TopicPartition tp;

    /** Offsets polled but not yet succeeded, mapped to their record when the record itself is known. */
    @NonNull
    private ConcurrentSkipListMap<Long, Optional<ConsumerRecord<K, V>>> incompleteOffsets;

    /** {@code true} until the first polled batch has been compared against the loaded commit data. */
    private boolean bootstrapPhase = true;

    /** {@code true} when there is state that {@link #getCommitDataIfDirty()} should hand out. */
    private boolean dirty;

    private long offsetHighestSeen;
    private long offsetHighestSucceeded = KAFKA_OFFSET_ABSENCE;

    /** Cleared when the encoded offset payload crosses the pressure threshold or cannot be produced. */
    private boolean allowedMoreRecords = true;

    private final long partitionsAssignmentEpoch;
    private long lastCommittedOffset;

    private Gauge lastCommittedOffsetGauge;
    private Gauge highestSeenOffsetGauge;
    private Gauge highestCompletedOffsetGauge;
    private Gauge highestSequentialSucceededOffsetGauge;
    private Gauge numberOfIncompletesGauge;
    private Gauge ephochGauge;
    private DistributionSummary ratioPayloadUsedDistributionSummary;
    private DistributionSummary ratioMetadataSpaceUsedDistributionSummary;
    private final PCMetrics pcMetrics;

    /**
     * Set by {@link #setDirty()} and cleared when commit data is handed out, so that a commit which
     * succeeds after further changes were made does not mark the state clean.
     */
    private boolean stateChangedSinceCommitStart = false;

    /**
     * Creates the state for a partition, seeding it from previously stored offset data and registering
     * its metrics.
     *
     * @param j                           assignment epoch this state belongs to
     * @param pCModule                    module used to reach the metrics, the work manager and the offset codec
     * @param topicPartition              partition being tracked
     * @param highestOffsetAndIncompletes offset data to initialise from
     */
    public PartitionState(long j, PCModule<K, V> pCModule, TopicPartition topicPartition, OffsetMapCodecManager.HighestOffsetAndIncompletes highestOffsetAndIncompletes) {
        this.module = pCModule;
        this.tp = topicPartition;
        this.partitionsAssignmentEpoch = j;
        this.pcMetrics = this.module.pcMetrics();
        initStateFromOffsetData(highestOffsetAndIncompletes);
        initMetrics();
    }

    /**
     * (Re)initialises the tracked offsets from the given offset data. The highest succeeded offset is
     * set to the highest seen offset, and every listed incomplete offset is tracked without a record.
     */
    private void initStateFromOffsetData(OffsetMapCodecManager.HighestOffsetAndIncompletes offsetData) {
        this.offsetHighestSeen = offsetData.getHighestSeenOffset().orElse(KAFKA_OFFSET_ABSENCE);
        this.incompleteOffsets = new ConcurrentSkipListMap<>();
        offsetData.getIncompleteOffsets().forEach(offset -> {
            this.incompleteOffsets.put(offset, Optional.empty());
        });
        this.offsetHighestSucceeded = this.offsetHighestSeen;
    }

    /** Raises the highest seen offset when {@code offset} is at or above the current value. */
    private void maybeRaiseHighestSeenOffset(long offset) {
        if (offset >= this.offsetHighestSeen) {
            log.trace("Updating highest seen - was: {} now: {}", this.offsetHighestSeen, offset);
            this.offsetHighestSeen = offset;
        }
    }

    /**
     * Records the committed offset and marks the state clean, unless it changed again since the commit
     * data was handed out.
     *
     * @param offsetAndMetadata the data that was committed
     */
    public void onOffsetCommitSuccess(OffsetAndMetadata offsetAndMetadata) {
        this.lastCommittedOffset = offsetAndMetadata.offset();
        setClean();
    }

    /** Clears the dirty flag only when nothing changed since the commit data was produced. */
    private void setClean() {
        if (!this.stateChangedSinceCommitStart) {
            setDirty(false);
        }
    }

    /** Marks the state dirty and remembers that it changed since the last commit data was produced. */
    private void setDirty() {
        this.stateChangedSinceCommitStart = true;
        setDirty(true);
    }

    /**
     * @param consumerRecord record to check
     * @return {@code true} when the record's offset is not tracked as incomplete and is not above the
     *         highest succeeded offset
     */
    public boolean isRecordPreviouslyCompleted(ConsumerRecord<K, V> consumerRecord) {
        long offset = consumerRecord.offset();
        return !this.incompleteOffsets.containsKey(offset) && offset <= this.offsetHighestSucceeded;
    }

    /** @return {@code true} when at least one offset is tracked as incomplete */
    public boolean hasIncompleteOffsets() {
        return !this.incompleteOffsets.isEmpty();
    }

    /** @return number of offsets currently tracked as incomplete */
    public int getNumberOfIncompleteOffsets() {
        return this.incompleteOffsets.size();
    }

    /**
     * Marks an offset as succeeded: stops tracking it as incomplete, raises the highest succeeded offset
     * if needed and flags the state as dirty.
     *
     * @param j the offset that succeeded; expected to be tracked as incomplete (checked with {@code assert})
     */
    public void onSuccess(long j) {
        boolean removedFromIncompletes = this.incompleteOffsets.remove(j) != null;
        assert removedFromIncompletes : "Succeeded offset was not tracked as incomplete: " + j;
        updateHighestSucceededOffsetSoFar(j);
        setDirty();
    }

    /**
     * Failure hook; intentionally a no-op in this class.
     *
     * @param workContainer the work that failed
     */
    public void onFailure(WorkContainer<K, V> workContainer) {
        // nothing to track on failure - the offset simply stays incomplete
    }

    /** Raises the highest succeeded offset when {@code offset} is strictly above the current value. */
    private void updateHighestSucceededOffsetSoFar(long offset) {
        long previousHighest = getOffsetHighestSucceeded();
        if (offset > previousHighest) {
            log.trace("Updating highest completed - was: {} now: {}", previousHighest, offset);
            this.offsetHighestSucceeded = offset;
        }
    }

    /** @return {@code true} when the batch's epoch differs from this state's assignment epoch */
    private boolean epochIsStale(EpochAndRecordsMap<K, V>.RecordsAndEpoch recordsAndEpoch) {
        return !Objects.equals(recordsAndEpoch.getEpochOfPartitionAtPoll(), getPartitionsAssignmentEpoch());
    }

    /**
     * Registers the records of a polled batch as work, unless the batch belongs to a different epoch.
     * Tracked offsets are first truncated / pruned against the batch, then every record that was not
     * previously completed is added to the shard manager and tracked as incomplete.
     *
     * @param recordsAndEpoch the polled records together with the epoch they were polled in
     * @throws NullPointerException when {@code recordsAndEpoch} is {@code null}
     */
    public void maybeRegisterNewPollBatchAsWork(@NonNull EpochAndRecordsMap<K, V>.RecordsAndEpoch recordsAndEpoch) {
        if (recordsAndEpoch == null) {
            throw new NullPointerException("recordsAndEpoch is marked non-null but is null");
        }
        if (epochIsStale(recordsAndEpoch)) {
            log.debug("Inbound record of work has epoch ({}) not matching currently assigned epoch for the applicable partition ({}), skipping", recordsAndEpoch.getEpochOfPartitionAtPoll(), getPartitionsAssignmentEpoch());
            return;
        }
        maybeTruncateOrPruneTrackedOffsets(recordsAndEpoch);
        long epochOfBatch = recordsAndEpoch.getEpochOfPartitionAtPoll().longValue();
        for (ConsumerRecord<K, V> polledRecord : recordsAndEpoch.getRecords()) {
            if (isRecordPreviouslyCompleted(polledRecord)) {
                log.trace("Record previously completed, skipping. offset: {}", polledRecord.offset());
                continue;
            }
            getShardManager().addWorkContainer(epochOfBatch, polledRecord);
            addNewIncompleteRecord(polledRecord);
        }
    }

    private ShardManager<K, V> getShardManager() {
        return this.module.workManager().getSm();
    }

    /** @return always {@code false} in this class */
    public boolean isPartitionRemovedOrNeverAssigned() {
        return false;
    }

    /**
     * Tracks the record's offset as incomplete and raises the highest seen offset if needed.
     *
     * @param consumerRecord the newly polled record
     */
    public void addNewIncompleteRecord(ConsumerRecord<K, V> consumerRecord) {
        long offset = consumerRecord.offset();
        maybeRaiseHighestSeenOffset(offset);
        this.incompleteOffsets.put(offset, Optional.of(consumerRecord));
    }

    /**
     * On the first polled batch only, compares the first polled offset with the offset that was expected
     * from the loaded commit data. If the poll started above the expected offset, tracked incompletes
     * below it are dropped; if it started below, all tracked state is reset.
     *
     * @param bootstrapPolledOffset offset of the first record in the polled batch
     */
    private void maybeTruncateBelowOrAbove(long bootstrapPolledOffset) {
        if (!this.bootstrapPhase) {
            return;
        }
        this.bootstrapPhase = false;

        long expectedBootstrapOffset = getOffsetToCommit();
        if (bootstrapPolledOffset > expectedBootstrapOffset) {
            log.warn("Truncating state - removing records lower than {} from partition {} of topic {}. Offsets have been removed from the partition by the broker or committed offset has been raised. Bootstrap polled {} but expected {} from loaded commit data. Could be caused by record retention or compaction and offset reset policy LATEST.",
                    bootstrapPolledOffset, this.tp.partition(), this.tp.topic(), bootstrapPolledOffset, expectedBootstrapOffset);
            NavigableSet<Long> offsetsBelowPolled = this.incompleteOffsets.keySet().headSet(bootstrapPolledOffset, false);
            // the receiver is bound once, here, so the removals target the map the view was taken from
            offsetsBelowPolled.forEach(this.incompleteOffsets::remove);
        } else if (bootstrapPolledOffset < expectedBootstrapOffset) {
            log.warn("Bootstrap polled offset has been reset to an earlier offset ({}) for partition {} of topic {} - truncating state - all records above (including this) will be replayed. Was expecting {} but bootstrap poll was {}. Could be caused by record retention or compaction and offset reset policy EARLIEST.",
                    bootstrapPolledOffset, this.tp.partition(), this.tp.topic(), expectedBootstrapOffset, bootstrapPolledOffset);
            initStateFromOffsetData(OffsetMapCodecManager.HighestOffsetAndIncompletes.of());
        }
    }

    /** @return always {@code false} in this class */
    public boolean isRemoved() {
        return false;
    }

    /**
     * @return the data to commit when the state is dirty, otherwise an empty {@link Optional}. Handing
     *         out commit data resets the "changed since commit start" marker.
     */
    public Optional<OffsetAndMetadata> getCommitDataIfDirty() {
        if (!isDirty()) {
            return Optional.empty();
        }
        this.stateChangedSinceCommitStart = false;
        return Optional.of(createOffsetAndMetadata());
    }

    /**
     * Builds the commit data: the offset to commit plus, when it could be encoded and fits, the encoded
     * incomplete-offset payload as metadata.
     */
    protected OffsetAndMetadata createOffsetAndMetadata() {
        // encode first: encoding also updates whether the partition is allowed more records
        Optional<String> encodedOffsets = tryToEncodeOffsets();
        long offsetToCommit = getOffsetToCommit();
        return encodedOffsets
                .map(metadata -> new OffsetAndMetadata(offsetToCommit, metadata))
                .orElseGet(() -> new OffsetAndMetadata(offsetToCommit));
    }

    /** @return the offset directly after the highest sequentially succeeded offset */
    protected long getOffsetToCommit() {
        return getOffsetHighestSequentialSucceeded() + 1;
    }

    /** @return an unmodifiable snapshot of all offsets tracked as incomplete, in ascending order */
    public List<Long> getAllIncompleteOffsets() {
        return Collections.unmodifiableList(new ArrayList<>(this.incompleteOffsets.keySet()));
    }

    /** @return a sorted copy of the incomplete offsets that are strictly below the highest succeeded offset */
    public SortedSet<Long> getIncompleteOffsetsBelowHighestSucceeded() {
        long highestSucceeded = getOffsetHighestSucceeded();
        // the key set is sorted, so the exclusive head set is exactly "offset < highestSucceeded"
        return this.incompleteOffsets.keySet().headSet(highestSucceeded, false).stream()
                .collect(JavaUtils.toTreeSet());
    }

    /**
     * @return the offset directly below the lowest incomplete offset, or the highest succeeded offset
     *         when no offset is tracked as incomplete
     */
    public long getOffsetHighestSequentialSucceeded() {
        // read the field before looking at the incompletes, in the same order as the original code;
        // NOTE(review): presumably deliberate to stay pessimistic under concurrent additions - confirm
        long highestSucceededSnapshot = this.offsetHighestSucceeded;
        Long lowestIncomplete = this.incompleteOffsets.keySet().ceiling(KAFKA_OFFSET_ABSENCE);
        if (lowestIncomplete == null) {
            return highestSucceededSnapshot;
        }
        return lowestIncomplete - 1;
    }

    /**
     * Tries to encode the incomplete offsets into a metadata payload and updates whether the partition
     * is allowed more records as a result.
     *
     * @return the payload to include in the commit, or empty when there is nothing to encode, no
     *         encoding was possible, or the payload exceeds the maximum metadata size
     */
    private Optional<String> tryToEncodeOffsets() {
        if (this.incompleteOffsets.isEmpty()) {
            setAllowedMoreRecords(true);
            return Optional.empty();
        }
        try {
            // NOTE(review): used raw as in the original - the codec's type parameters are not visible here
            OffsetMapCodecManager codecManager = new OffsetMapCodecManager(this.module);
            long offsetToCommit = getOffsetToCommit();
            long encodedOffsetRange = getOffsetHighestSucceeded() - offsetToCommit;
            String payload = codecManager.makeOffsetMetadataPayload(offsetToCommit, this);
            int payloadLength = payload.length();

            // Ratios must use floating point division: integer division truncates them and throws
            // ArithmeticException when the range is zero. A non-positive range has no meaningful ratio.
            if (encodedOffsetRange > 0) {
                this.ratioPayloadUsedDistributionSummary.record(payloadLength / (double) encodedOffsetRange);
            }
            this.ratioMetadataSpaceUsedDistributionSummary.record(payloadLength / (double) OffsetMapCodecManager.DefaultMaxMetadataSize);

            boolean payloadTooLarge = updateBlockFromEncodingResult(payload);
            return payloadTooLarge ? Optional.empty() : Optional.of(payload);
        } catch (NoEncodingPossibleException e) {
            setAllowedMoreRecords(false);
            log.warn("No encodings could be used to encode the offset map, skipping. Warning: messages might be replayed on rebalance.", e);
            return Optional.empty();
        }
    }

    /**
     * Updates the "allowed more records" flag from the size of the encoded payload.
     *
     * @param payload the encoded offset payload
     * @return {@code true} when the payload is larger than the maximum metadata size and so must not be
     *         included in the commit
     */
    private boolean updateBlockFromEncodingResult(String payload) {
        int payloadLength = payload.length();
        int maxMetadataSize = OffsetMapCodecManager.DefaultMaxMetadataSize;

        if (payloadLength > maxMetadataSize) {
            setAllowedMoreRecords(false);
            log.warn("Offset map data too large (size: {}) to fit in metadata payload hard limit of {} - cannot include in commit. Warning: messages might be replayed on rebalance. See kafka.coordinator.group.OffsetConfig#DefaultMaxMetadataSize = {} and issue #47.",
                    payloadLength, maxMetadataSize, maxMetadataSize);
            return true;
        }

        double pressureThreshold = getPressureThresholdValue();
        if (payloadLength > pressureThreshold) {
            setAllowedMoreRecords(false);
            log.warn("Payload size {} higher than threshold {}, but still lower than max {}. Will write payload, but will not allow further messages, in order to allow the offset data to shrink (via succeeding messages).",
                    payloadLength, pressureThreshold, maxMetadataSize);
        } else {
            if (!this.allowedMoreRecords) {
                setAllowedMoreRecords(true);
            }
            log.debug("Payload size {} within threshold {}", payloadLength, pressureThreshold);
        }
        return false;
    }

    /** @return the payload size above which the partition stops accepting more records */
    private double getPressureThresholdValue() {
        return OffsetMapCodecManager.DefaultMaxMetadataSize * PartitionStateManager.getUSED_PAYLOAD_THRESHOLD_MULTIPLIER();
    }

    /**
     * Removes the shard entries that reference this partition's incomplete records and deregisters the
     * partition's metrics.
     *
     * @param shardManager shard manager holding the work of this partition
     */
    public void onPartitionsRemoved(ShardManager<K, V> shardManager) {
        shardManager.removeAnyShardEntriesReferencedFrom(this.incompleteOffsets.values());
        deregisterMetrics();
    }

    /** @return {@code true} when the partition is currently not allowed more records */
    public boolean isBlocked() {
        return !isAllowedMoreRecords();
    }

    /**
     * Reconciles the tracked incomplete offsets with a polled batch: runs the one-off bootstrap
     * truncation against the batch's first offset, then stops tracking every incomplete offset that lies
     * within the batch's offset range but is missing from the batch.
     */
    private void maybeTruncateOrPruneTrackedOffsets(EpochAndRecordsMap<?, ?>.RecordsAndEpoch recordsAndEpoch) {
        List<? extends ConsumerRecord<?, ?>> records = recordsAndEpoch.getRecords();
        if (records.isEmpty()) {
            log.warn("Polled an empty batch of records? {}", recordsAndEpoch);
            return;
        }

        // get() is safe on both lookups below - the batch was just checked to be non-empty
        long lowestPolledOffset = JavaUtils.getFirst(records).get().offset();
        maybeTruncateBelowOrAbove(lowestPolledOffset);

        // built once so each tracked incomplete can be checked with a hash lookup
        Set<Long> polledOffsets = records.stream()
                .map(polledRecord -> polledRecord.offset())
                .collect(Collectors.toSet());
        long highestPolledOffset = JavaUtils.getLast(records).get().offset();

        NavigableSet<Long> incompletesWithinBatchRange =
                this.incompleteOffsets.keySet().subSet(lowestPolledOffset, true, highestPolledOffset, true);
        List<Long> offsetsMissingFromBatch = new ArrayList<>();
        for (Long incompleteOffset : incompletesWithinBatchRange) {
            if (!polledOffsets.contains(incompleteOffset)) {
                offsetsMissingFromBatch.add(incompleteOffset);
            }
        }
        if (offsetsMissingFromBatch.isEmpty()) {
            return;
        }

        log.warn("Offsets {} have been removed from partition {} (as they were not been returned within a polled batch which should have contained them - batch offset range is {} to {}), so they be removed from tracking state, as they will never be sent again to be retried. This can be caused by PC rebalancing across a partition which has been compacted on offsets above the committed base offset, after initial load and before a rebalance.",
                offsetsMissingFromBatch, getTp(), lowestPolledOffset, highestPolledOffset);
        offsetsMissingFromBatch.forEach(this.incompleteOffsets::remove);
    }

    /** @return {@code true} when the work's offset is below the highest succeeded offset */
    private boolean isBlockingProgress(WorkContainer<?, ?> workContainer) {
        return workContainer.offset() < getOffsetHighestSucceeded();
    }

    /**
     * Decides whether the given work may be taken for processing.
     *
     * @param workContainer the candidate work
     * @return {@code false} when the work is stale; otherwise {@code true} when the partition is allowed
     *         more records or the work's offset is below the highest succeeded offset
     */
    public boolean couldBeTakenAsWork(WorkContainer<K, V> workContainer) {
        if (checkIfWorkIsStale(workContainer)) {
            log.debug("Work is in queue with stale epoch or no longer assigned. Skipping. Shard it came from will/was removed during partition revocation. WC: {}", workContainer);
            return false;
        }
        if (isAllowedMoreRecords()) {
            log.debug("Partition is allowed more records. Taking work. WC: {}", workContainer);
            return true;
        }
        if (isBlockingProgress(workContainer)) {
            log.debug("Partition is blocked, but this record is blocking progress. Taking work. WC: {}", workContainer);
            return true;
        }
        log.debug("Not allowed more records for the partition ({}) as set from previous encode run (blocked), that this record ({}) belongs to, due to offset encoding back pressure, is within the encoded payload already (offset lower than highest succeeded, not in flight ({}), continuing on to next container in shardEntry.",
                workContainer.getTopicPartition(), workContainer.offset(), workContainer.isNotInFlight());
        return false;
    }

    /**
     * @param workContainer the work to check
     * @return {@code true} when the work's epoch differs from this state's assignment epoch, or when
     *         {@link #isPartitionRemovedOrNeverAssigned()} reports {@code true}
     */
    public boolean checkIfWorkIsStale(WorkContainer<?, ?> workContainer) {
        long currentEpoch = getPartitionsAssignmentEpoch();
        long workEpoch = workContainer.getEpoch();
        boolean partitionRemovedOrNeverAssigned = isPartitionRemovedOrNeverAssigned();
        if (currentEpoch == workEpoch && !partitionRemovedOrNeverAssigned) {
            return false;
        }
        log.debug("Epoch mismatch {} vs {} for record {}. Skipping message - it's partition has already assigned to a different consumer.", workEpoch, currentEpoch, workContainer);
        return true;
    }

    /** Registers the gauges and distribution summaries of this partition; skipped when there is no partition. */
    private void initMetrics() {
        TopicPartition partition = getTp();
        if (partition == null) {
            return;
        }
        Tag[] tags = {Tag.of("topic", partition.topic()), Tag.of("partition", String.valueOf(partition.partition()))};
        this.lastCommittedOffsetGauge = this.pcMetrics.gaugeFromMetricDef(PCMetricsDef.PARTITION_LAST_COMMITTED_OFFSET, this,
                state -> state.lastCommittedOffset, tags);
        this.highestSeenOffsetGauge = this.pcMetrics.gaugeFromMetricDef(PCMetricsDef.PARTITION_HIGHEST_SEEN_OFFSET, this,
                state -> state.getOffsetHighestSeen(), tags);
        this.highestCompletedOffsetGauge = this.pcMetrics.gaugeFromMetricDef(PCMetricsDef.PARTITION_HIGHEST_COMPLETED_OFFSET, this,
                state -> state.getOffsetHighestSucceeded(), tags);
        this.highestSequentialSucceededOffsetGauge = this.pcMetrics.gaugeFromMetricDef(PCMetricsDef.PARTITION_HIGHEST_SEQUENTIAL_SUCCEEDED_OFFSET, this,
                state -> state.getOffsetHighestSequentialSucceeded(), tags);
        this.numberOfIncompletesGauge = this.pcMetrics.gaugeFromMetricDef(PCMetricsDef.PARTITION_INCOMPLETE_OFFSETS, this,
                state -> state.incompleteOffsets.size(), tags);
        this.ephochGauge = this.pcMetrics.gaugeFromMetricDef(PCMetricsDef.PARTITION_ASSIGNMENT_EPOCH, this,
                state -> state.getPartitionsAssignmentEpoch(), tags);
        this.ratioMetadataSpaceUsedDistributionSummary = this.pcMetrics.getDistributionSummaryFromMetricDef(PCMetricsDef.METADATA_SPACE_USED, tags);
        this.ratioPayloadUsedDistributionSummary = this.pcMetrics.getDistributionSummaryFromMetricDef(PCMetricsDef.PAYLOAD_RATIO_USED, tags);
    }

    /** Removes every meter registered by {@link #initMetrics()}, in the original removal order. */
    private void deregisterMetrics() {
        Meter[] registeredMeters = {
                this.lastCommittedOffsetGauge,
                this.highestSeenOffsetGauge,
                this.highestCompletedOffsetGauge,
                this.highestSequentialSucceededOffsetGauge,
                this.numberOfIncompletesGauge,
                this.ephochGauge,
                this.ratioMetadataSpaceUsedDistributionSummary,
                this.ratioPayloadUsedDistributionSummary
        };
        for (Meter meter : registeredMeters) {
            this.pcMetrics.removeMeter(meter);
        }
    }

    @Override
    public String toString() {
        return "PartitionState(module=" + this.module
                + ", tp=" + getTp()
                + ", incompleteOffsets=" + this.incompleteOffsets
                + ", bootstrapPhase=" + this.bootstrapPhase
                + ", dirty=" + isDirty()
                + ", offsetHighestSeen=" + getOffsetHighestSeen()
                + ", offsetHighestSucceeded=" + getOffsetHighestSucceeded()
                + ", allowedMoreRecords=" + isAllowedMoreRecords()
                + ", partitionsAssignmentEpoch=" + getPartitionsAssignmentEpoch()
                + ", lastCommittedOffset=" + this.lastCommittedOffset
                + ", lastCommittedOffsetGauge=" + this.lastCommittedOffsetGauge
                + ", highestSeenOffsetGauge=" + this.highestSeenOffsetGauge
                + ", highestCompletedOffsetGauge=" + this.highestCompletedOffsetGauge
                + ", highestSequentialSucceededOffsetGauge=" + this.highestSequentialSucceededOffsetGauge
                + ", numberOfIncompletesGauge=" + this.numberOfIncompletesGauge
                + ", ephochGauge=" + this.ephochGauge
                + ", ratioPayloadUsedDistributionSummary=" + this.ratioPayloadUsedDistributionSummary
                + ", ratioMetadataSpaceUsedDistributionSummary=" + this.ratioMetadataSpaceUsedDistributionSummary
                + ", pcMetrics=" + this.pcMetrics
                + ", stateChangedSinceCommitStart=" + this.stateChangedSinceCommitStart
                + ")";
    }

    /** @return the partition this state tracks */
    @NonNull
    public TopicPartition getTp() {
        return this.tp;
    }

    /**
     * Replaces the tracked incomplete offsets.
     *
     * @param concurrentSkipListMap the new incomplete offsets; must not be {@code null}
     * @throws NullPointerException when the argument is {@code null}
     */
    void setIncompleteOffsets(@NonNull ConcurrentSkipListMap<Long, Optional<ConsumerRecord<K, V>>> concurrentSkipListMap) {
        if (concurrentSkipListMap == null) {
            throw new NullPointerException("incompleteOffsets is marked non-null but is null");
        }
        this.incompleteOffsets = concurrentSkipListMap;
    }

    private void setDirty(boolean z) {
        this.dirty = z;
    }

    /** @return {@code true} when there is state that has not been committed yet */
    public boolean isDirty() {
        return this.dirty;
    }

    /** @return the highest offset seen so far, or {@link #KAFKA_OFFSET_ABSENCE} when none is known */
    public long getOffsetHighestSeen() {
        return this.offsetHighestSeen;
    }

    /** @return the highest offset recorded as succeeded so far */
    public long getOffsetHighestSucceeded() {
        return this.offsetHighestSucceeded;
    }

    /** @return {@code true} when the partition may currently take more records */
    public boolean isAllowedMoreRecords() {
        return this.allowedMoreRecords;
    }

    private void setAllowedMoreRecords(boolean z) {
        this.allowedMoreRecords = z;
    }

    /** @return the assignment epoch this state was created for */
    public long getPartitionsAssignmentEpoch() {
        return this.partitionsAssignmentEpoch;
    }
}
