package io.confluent.parallelconsumer.state;

import io.confluent.parallelconsumer.offsets.NoEncodingPossibleException;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import java.util.Collections;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/parallelconsumer/state/PartitionState.class */
public class PartitionState<K, V> {
    private static final Logger log;
    public static final long KAFKA_OFFSET_ABSENCE = -1;
    private final TopicPartition tp;
    private final ConcurrentSkipListSet<Long> incompleteOffsets;
    private boolean dirty;
    private long offsetHighestSeen;
    private long offsetHighestSucceeded;
    private boolean allowedMoreRecords = true;
    private final NavigableMap<Long, WorkContainer<K, V>> commitQueue = new ConcurrentSkipListMap();
    static final /* synthetic */ boolean $assertionsDisabled;

    private NavigableMap<Long, WorkContainer<K, V>> getCommitQueue() {
        return Collections.unmodifiableNavigableMap(this.commitQueue);
    }

    public PartitionState(TopicPartition topicPartition, OffsetMapCodecManager.HighestOffsetAndIncompletes highestOffsetAndIncompletes) {
        this.offsetHighestSucceeded = -1L;
        this.tp = topicPartition;
        this.offsetHighestSeen = highestOffsetAndIncompletes.getHighestSeenOffset().orElse(-1L).longValue();
        this.incompleteOffsets = new ConcurrentSkipListSet<>(highestOffsetAndIncompletes.getIncompleteOffsets());
        this.offsetHighestSucceeded = this.offsetHighestSeen;
    }

    private void maybeRaiseHighestSeenOffset(long j) {
        if (j >= this.offsetHighestSeen) {
            log.trace("Updating highest seen - was: {} now: {}", Long.valueOf(this.offsetHighestSeen), Long.valueOf(j));
            this.offsetHighestSeen = j;
        }
    }

    public void onOffsetCommitSuccess(OffsetAndMetadata offsetAndMetadata) {
        setClean();
    }

    private void setClean() {
        setDirty(false);
    }

    private void setDirty() {
        setDirty(true);
    }

    public boolean isRecordPreviouslyCompleted(ConsumerRecord<K, V> consumerRecord) {
        long offset = consumerRecord.offset();
        return !this.incompleteOffsets.contains(Long.valueOf(offset)) && offset <= this.offsetHighestSeen;
    }

    public boolean hasWorkInCommitQueue() {
        return !this.commitQueue.isEmpty();
    }

    public int getCommitQueueSize() {
        return this.commitQueue.size();
    }

    public void onSuccess(WorkContainer<K, V> workContainer) {
        long offset = workContainer.offset();
        WorkContainer workContainer2 = (WorkContainer) this.commitQueue.remove(Long.valueOf(workContainer.offset()));
        if (!$assertionsDisabled && workContainer2 == null) {
            throw new AssertionError();
        }
        boolean remove = this.incompleteOffsets.remove(Long.valueOf(offset));
        if (!$assertionsDisabled && !remove) {
            throw new AssertionError();
        }
        updateHighestSucceededOffsetSoFar(workContainer);
        setDirty();
    }

    public void onFailure(WorkContainer<K, V> workContainer) {
    }

    private void updateHighestSucceededOffsetSoFar(WorkContainer<K, V> workContainer) {
        long offsetHighestSucceeded = getOffsetHighestSucceeded();
        long offset = workContainer.offset();
        if (offset > offsetHighestSucceeded) {
            log.trace("Updating highest completed - was: {} now: {}", Long.valueOf(offsetHighestSucceeded), Long.valueOf(offset));
            this.offsetHighestSucceeded = offset;
        }
    }

    public void addWorkContainer(WorkContainer<K, V> workContainer) {
        maybeRaiseHighestSeenOffset(workContainer.offset());
        this.commitQueue.put(Long.valueOf(workContainer.offset()), workContainer);
        this.incompleteOffsets.add(Long.valueOf(workContainer.offset()));
    }

    public boolean isRemoved() {
        return false;
    }

    public Optional<OffsetAndMetadata> getCommitDataIfDirty() {
        return isDirty() ? Optional.of(createOffsetAndMetadata()) : Optional.empty();
    }

    private OffsetAndMetadata createOffsetAndMetadata() {
        Optional<String> tryToEncodeOffsets = tryToEncodeOffsets();
        long nextExpectedPolledOffset = getNextExpectedPolledOffset();
        return (OffsetAndMetadata) tryToEncodeOffsets.map(str -> {
            return new OffsetAndMetadata(nextExpectedPolledOffset, str);
        }).orElseGet(() -> {
            return new OffsetAndMetadata(nextExpectedPolledOffset);
        });
    }

    private long getNextExpectedPolledOffset() {
        return getOffsetHighestSequentialSucceeded() + 1;
    }

    public Set<Long> getAllIncompleteOffsets() {
        return Collections.unmodifiableSet((Set) this.incompleteOffsets.parallelStream().collect(Collectors.toSet()));
    }

    public Set<Long> getIncompleteOffsetsBelowHighestSucceeded() {
        long offsetHighestSucceeded = getOffsetHighestSucceeded();
        return Collections.unmodifiableSet((Set) this.incompleteOffsets.parallelStream().filter(l -> {
            return l.longValue() < offsetHighestSucceeded;
        }).collect(Collectors.toSet()));
    }

    public long getOffsetHighestSequentialSucceeded() {
        return this.incompleteOffsets.isEmpty() ? this.offsetHighestSeen : this.incompleteOffsets.first().longValue() - 1;
    }

    private Optional<String> tryToEncodeOffsets() {
        if (this.incompleteOffsets.isEmpty()) {
            setAllowedMoreRecords(true);
            return Optional.empty();
        }
        try {
            String makeOffsetMetadataPayload = new OffsetMapCodecManager(null).makeOffsetMetadataPayload(getNextExpectedPolledOffset(), this);
            return updateBlockFromEncodingResult(makeOffsetMetadataPayload) ? Optional.empty() : Optional.of(makeOffsetMetadataPayload);
        } catch (NoEncodingPossibleException e) {
            setAllowedMoreRecords(false);
            log.warn("No encodings could be used to encode the offset map, skipping. Warning: messages might be replayed on rebalance.", e);
            return Optional.empty();
        }
    }

    private boolean updateBlockFromEncodingResult(String str) {
        int length = str.length();
        boolean z = false;
        if (length > OffsetMapCodecManager.DefaultMaxMetadataSize) {
            z = true;
            setAllowedMoreRecords(false);
            log.warn("Offset map data too large (size: {}) to fit in metadata payload hard limit of {} - cannot include in commit. Warning: messages might be replayed on rebalance. See kafka.coordinator.group.OffsetConfig#DefaultMaxMetadataSize = {} and issue #47.", new Object[]{Integer.valueOf(length), Integer.valueOf(OffsetMapCodecManager.DefaultMaxMetadataSize), Integer.valueOf(OffsetMapCodecManager.DefaultMaxMetadataSize)});
        } else if (length > getPressureThresholdValue()) {
            setAllowedMoreRecords(false);
            log.warn("Payload size {} higher than threshold {}, but still lower than max {}. Will write payload, but will not allow further messages, in order to allow the offset data to shrink (via succeeding messages).", new Object[]{Integer.valueOf(length), Double.valueOf(getPressureThresholdValue()), Integer.valueOf(OffsetMapCodecManager.DefaultMaxMetadataSize)});
        } else {
            setAllowedMoreRecords(true);
            log.debug("Payload size {} within threshold {}", Integer.valueOf(length), Double.valueOf(getPressureThresholdValue()));
        }
        return z;
    }

    private double getPressureThresholdValue() {
        return OffsetMapCodecManager.DefaultMaxMetadataSize * PartitionStateManager.getUSED_PAYLOAD_THRESHOLD_MULTIPLIER();
    }

    public void onPartitionsRemoved(ShardManager<K, V> shardManager) {
        shardManager.removeAnyShardsReferencedBy(getCommitQueue());
    }

    public boolean isBlocked() {
        return !isAllowedMoreRecords();
    }

    public String toString() {
        return "PartitionState(tp=" + getTp() + ", incompleteOffsets=" + this.incompleteOffsets + ", dirty=" + isDirty() + ", offsetHighestSeen=" + getOffsetHighestSeen() + ", offsetHighestSucceeded=" + getOffsetHighestSucceeded() + ", allowedMoreRecords=" + isAllowedMoreRecords() + ")";
    }

    public TopicPartition getTp() {
        return this.tp;
    }

    private void setDirty(boolean z) {
        this.dirty = z;
    }

    private boolean isDirty() {
        return this.dirty;
    }

    public long getOffsetHighestSeen() {
        return this.offsetHighestSeen;
    }

    public long getOffsetHighestSucceeded() {
        return this.offsetHighestSucceeded;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isAllowedMoreRecords() {
        return this.allowedMoreRecords;
    }

    private void setAllowedMoreRecords(boolean z) {
        this.allowedMoreRecords = z;
    }

    static {
        $assertionsDisabled = !PartitionState.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(PartitionState.class);
    }
}
