/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.state;

import com.google.common.truth.Truth;
import io.confluent.parallelconsumer.ManagedTruth;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.internal.PCModuleTestEnv;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import io.confluent.parallelconsumer.state.ModelUtils;
import io.confluent.parallelconsumer.state.PartitionState;
import io.confluent.parallelconsumer.state.PolledTestBatch;
import java.util.List;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;
import one.util.streamex.LongStreamEx;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniSets;

class PartitionStateCommittedOffsetTest {
    private static final Logger log = LoggerFactory.getLogger(PartitionStateCommittedOffsetTest.class);
    ModelUtils mu = new ModelUtils(new PCModuleTestEnv());
    TopicPartition tp = new TopicPartition("topic", 0);
    long unexpectedlyHighOffset = 20L;
    long previouslyCommittedOffset = 11L;
    final long highestSeenOffset = 101L;
    List<Long> trackedIncompletes = UniLists.of((Object)this.previouslyCommittedOffset, (Object)15L, (Object)this.unexpectedlyHighOffset, (Object)60L, (Object)80L, (Object)95L, (Object)96L, (Object)97L, (Object)98L, (Object)100L);
    List<Long> expectedTruncatedIncompletes = this.trackedIncompletes.stream().filter(offset -> offset >= this.unexpectedlyHighOffset).collect(Collectors.toList());
    OffsetMapCodecManager.HighestOffsetAndIncompletes offsetData = new OffsetMapCodecManager.HighestOffsetAndIncompletes(Optional.of(101L), new TreeSet<Long>(this.trackedIncompletes));
    PartitionState<String, String> state = new PartitionState(0L, (PCModule)this.mu.getModule(), this.tp, this.offsetData);

    PartitionStateCommittedOffsetTest() {
    }

    @Test
    void concurrentSkipListMapSanityCheck() {
        ConcurrentSkipListMap<Long, Boolean> incompletes = new ConcurrentSkipListMap<Long, Boolean>();
        incompletes.put(2L, true);
        incompletes.put(3L, true);
        incompletes.put(4L, true);
        incompletes.put(5L, true);
        incompletes.put(6L, true);
        ConcurrentSkipListMap<Long, Boolean> polled = new ConcurrentSkipListMap<Long, Boolean>();
        polled.put(3L, true);
        polled.put(5L, true);
        long lowPoll = (Long)polled.firstKey();
        long highPoll = (Long)polled.lastKey();
        NavigableSet<Long> polledRange = incompletes.keySet().subSet(lowPoll, true, highPoll, true);
        polledRange.forEach(x -> {
            if (polled.containsKey(x)) {
                log.warn("Found: {}", x);
            } else {
                log.warn("Not found, dropping: {}", x);
                incompletes.remove(x);
            }
        });
        ManagedTruth.assertThat((SortedSet)incompletes.keySet()).containsExactly(new Object[]{2L, 3L, 5L, 6L});
    }

    @Test
    void compactedTopic() {
        Set missingOffsets = UniSets.of((Object)80L, (Object)95L, (Object)97L);
        long slightlyLowerRange = 99L;
        List polledOffsetsWithCompactedRemoved = LongStreamEx.range((long)this.previouslyCommittedOffset, (long)slightlyLowerRange).filter(offset -> !missingOffsets.contains(offset)).boxed().toList();
        PolledTestBatch polledTestBatchWithoutMissingOffsets = new PolledTestBatch(this.mu, this.tp, polledOffsetsWithCompactedRemoved);
        this.addPollToState(this.state, polledTestBatchWithoutMissingOffsets);
        OffsetAndMetadata offsetAndMetadata = this.state.createOffsetAndMetadata();
        ManagedTruth.assertThat(offsetAndMetadata).getOffset().isEqualTo((Object)this.previouslyCommittedOffset);
        List incompletesWithoutMissingOffsets = this.trackedIncompletes.stream().filter(offset -> !missingOffsets.contains(offset)).collect(Collectors.toList());
        ManagedTruth.assertThat(this.state).getAllIncompleteOffsets().containsExactlyElementsIn(incompletesWithoutMissingOffsets);
    }

    @Test
    void committedOffsetLower() {
        long randomlyChosenStepBackwards = 5L;
        long unexpectedLowerOffset = this.previouslyCommittedOffset - randomlyChosenStepBackwards;
        PolledTestBatch polledTestBatch = new PolledTestBatch(this.mu, this.tp, unexpectedLowerOffset, 101L);
        this.addPollToState(this.state, polledTestBatch);
        OffsetAndMetadata offsetAndMetadata = this.state.createOffsetAndMetadata();
        ManagedTruth.assertThat(offsetAndMetadata).getOffset().isEqualTo((Object)unexpectedLowerOffset);
        ManagedTruth.assertThat(this.state).getAllIncompleteOffsets().containsExactlyElementsIn((Iterable)LongStreamEx.range((long)unexpectedLowerOffset, (long)102L).boxed().toList());
    }

    private void addPollToState(PartitionState<String, String> state, PolledTestBatch polledTestBatch) {
        state.maybeRegisterNewPollBatchAsWork(polledTestBatch.polledRecordBatch.records(state.getTp()));
    }

    @Test
    void bootstrapPollOffsetHigherDueToRetentionOrCompaction() {
        PolledTestBatch polledTestBatch = new PolledTestBatch(this.mu, this.tp, this.unexpectedlyHighOffset, 101L);
        this.addPollToState(this.state, polledTestBatch);
        Truth.assertThat((Long)this.state.getOffsetToCommit()).isEqualTo((Object)this.unexpectedlyHighOffset);
        OffsetAndMetadata offsetAndMetadata = this.state.createOffsetAndMetadata();
        ManagedTruth.assertThat(offsetAndMetadata).getOffset().isEqualTo((Object)this.unexpectedlyHighOffset);
        ManagedTruth.assertThat(this.state).getAllIncompleteOffsets().containsExactlyElementsIn(this.expectedTruncatedIncompletes);
    }
}

