package io.confluent.parallelconsumer.state;

import com.google.common.truth.Truth;
import io.confluent.parallelconsumer.ManagedTruth;
import io.confluent.parallelconsumer.internal.PCModuleTestEnv;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;
import one.util.streamex.LongStreamEx;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniSets;

/* loaded from: input_file:io/confluent/parallelconsumer/state/PartitionStateCommittedOffsetTest.class */
/**
 * Exercises {@link PartitionState} when the first polled batch does not line up with the offset data the state was
 * constructed from.
 * <p>
 * Every test starts from the same {@link #state}, which is built from {@link #offsetData} (highest seen offset
 * {@link #highestSeenOffset} plus the {@link #trackedIncompletes}). A test then registers a polled batch through
 * {@link #addPollToState} and checks the offset that would be committed and the incomplete offsets that remain.
 */
class PartitionStateCommittedOffsetTest {
    private static final Logger log = LoggerFactory.getLogger(PartitionStateCommittedOffsetTest.class);

    ModelUtils mu = new ModelUtils(new PCModuleTestEnv());

    TopicPartition tp = new TopicPartition("topic", 0);

    /** First offset of the batch polled in {@link #bootstrapPollOffsetHigherDueToRetentionOrCompaction()}. */
    long unexpectedlyHighOffset = 20L;

    /** Lowest of the tracked incomplete offsets; the tests use it as the baseline commit offset. */
    long previouslyCommittedOffset = 11L;

    /** Highest seen offset handed to {@link #offsetData}. */
    final long highestSeenOffset = 101L;

    /** Incomplete offsets the partition state is seeded with. */
    List<Long> trackedIncompletes = UniLists.of(previouslyCommittedOffset, 15L, unexpectedlyHighOffset, 60L, 80L, 95L, 96L, 97L, 98L, 100L);

    /** The tracked incompletes that are at or above {@link #unexpectedlyHighOffset}. */
    List<Long> expectedTruncatedIncompletes = trackedIncompletes.stream()
            .filter(offset -> offset >= unexpectedlyHighOffset)
            .collect(Collectors.toList());

    OffsetMapCodecManager.HighestOffsetAndIncompletes offsetData =
            new OffsetMapCodecManager.HighestOffsetAndIncompletes(Optional.of(highestSeenOffset), new TreeSet<>(trackedIncompletes));

    PartitionState<String, String> state = new PartitionState<>(0, mu.getModule(), tp, offsetData);

    /**
     * Checks that entries can be removed from a {@link ConcurrentSkipListMap} while iterating over a bounded
     * {@code subSet} view of its key set, and that keys outside the bounds are left untouched.
     */
    @Test
    void concurrentSkipListMapSanityCheck() {
        ConcurrentSkipListMap<Long, Boolean> offsets = new ConcurrentSkipListMap<>();
        offsets.put(2L, true);
        offsets.put(3L, true);
        offsets.put(4L, true);
        offsets.put(5L, true);
        offsets.put(6L, true);

        ConcurrentSkipListMap<Long, Boolean> offsetsToKeep = new ConcurrentSkipListMap<>();
        offsetsToKeep.put(3L, true);
        offsetsToKeep.put(5L, true);

        // Only keys within [first, last] of offsetsToKeep are visited, so 2 and 6 are never considered for removal.
        offsets.keySet()
                .subSet(offsetsToKeep.firstKey(), true, offsetsToKeep.lastKey(), true)
                .forEach(offset -> {
                    if (offsetsToKeep.containsKey(offset)) {
                        log.warn("Found: {}", offset);
                    } else {
                        log.warn("Not found, dropping: {}", offset);
                        offsets.remove(offset);
                    }
                });

        SortedSet<Long> remainingOffsets = offsets.keySet();
        ManagedTruth.assertThat(remainingOffsets).containsExactly(2L, 3L, 5L, 6L);
    }

    /**
     * Polls a batch in which offsets 80, 95 and 97 are absent, and expects those offsets to be dropped from the
     * incompletes while the commit offset stays at {@link #previouslyCommittedOffset}.
     */
    @Test
    void compactedTopic() {
        Set<Long> missingOffsets = UniSets.of(80L, 95L, 97L);

        // The batch ends below the highest seen offset, so tracked incomplete 100 lies outside the polled range
        // and is still expected to be reported as incomplete.
        long batchEndExclusive = highestSeenOffset - 2;
        List<Long> polledOffsets = LongStreamEx.range(previouslyCommittedOffset, batchEndExclusive)
                .filter(offset -> !missingOffsets.contains(offset))
                .boxed()
                .toList();

        addPollToState(state, new PolledTestBatch(mu, tp, polledOffsets));

        ManagedTruth.assertThat(state.createOffsetAndMetadata()).getOffset().isEqualTo(previouslyCommittedOffset);

        List<Long> expectedIncompletes = trackedIncompletes.stream()
                .filter(offset -> !missingOffsets.contains(offset))
                .collect(Collectors.toList());
        ManagedTruth.assertThat(state).getAllIncompleteOffsets().containsExactlyElementsIn(expectedIncompletes);
    }

    /**
     * Polls a batch that starts five offsets below {@link #previouslyCommittedOffset}, and expects the commit offset
     * to move down to the batch start with every offset from there up to {@link #highestSeenOffset} incomplete.
     */
    @Test
    void committedOffsetLower() {
        long unexpectedLowerOffset = previouslyCommittedOffset - 5;

        addPollToState(state, new PolledTestBatch(mu, tp, unexpectedLowerOffset, highestSeenOffset));

        ManagedTruth.assertThat(state.createOffsetAndMetadata()).getOffset().isEqualTo(unexpectedLowerOffset);

        // The range end is exclusive, so +1 makes the expectation include highestSeenOffset itself.
        List<Long> expectedIncompletes = LongStreamEx.range(unexpectedLowerOffset, highestSeenOffset + 1).boxed().toList();
        ManagedTruth.assertThat(state).getAllIncompleteOffsets().containsExactlyElementsIn(expectedIncompletes);
    }

    /**
     * Registers the records of {@code polledTestBatch} that belong to the state's own topic partition.
     *
     * @param partitionState  state that receives the records
     * @param polledTestBatch batch whose records for {@code partitionState.getTp()} are registered
     */
    private void addPollToState(PartitionState<String, String> partitionState, PolledTestBatch polledTestBatch) {
        partitionState.maybeRegisterNewPollBatchAsWork(polledTestBatch.polledRecordBatch.records(partitionState.getTp()));
    }

    /**
     * Polls a batch that starts at {@link #unexpectedlyHighOffset}, above the lowest tracked incomplete, and expects
     * the commit offset to move up to the batch start with the incompletes below it discarded.
     */
    @Test
    void bootstrapPollOffsetHigherDueToRetentionOrCompaction() {
        addPollToState(state, new PolledTestBatch(mu, tp, unexpectedlyHighOffset, highestSeenOffset));

        Truth.assertThat(state.getOffsetToCommit()).isEqualTo(unexpectedlyHighOffset);
        ManagedTruth.assertThat(state.createOffsetAndMetadata()).getOffset().isEqualTo(unexpectedlyHighOffset);
        ManagedTruth.assertThat(state).getAllIncompleteOffsets().containsExactlyElementsIn(expectedTruncatedIncompletes);
    }
}
