package io.confluent.parallelconsumer.offsets;

import com.google.common.truth.Truth;
import io.confluent.csid.utils.KafkaTestUtils;
import io.confluent.parallelconsumer.ManagedTruth;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.integrationTests.TransactionTimeoutsTest;
import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.state.PartitionState;
import io.confluent.parallelconsumer.state.PartitionStateManager;
import io.confluent.parallelconsumer.state.WorkManager;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.assertj.core.api.Assertions;
import org.hamcrest.Matchers;
import org.junit.Assume;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.ResourceAccessMode;
import org.junit.jupiter.api.parallel.ResourceLock;
import org.junit.jupiter.api.parallel.ResourceLocks;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniSets;

/**
 * Tests that offset encodings round-trip (encode, pack, unwrap, decode) and that a freshly
 * created {@link WorkManager} can be fed the same records after offsets were committed with a
 * forced codec.
 *
 * <p>Several tests flip the static switches {@code OffsetSimultaneousEncoder.compressionForced}
 * and {@code OffsetMapCodecManager.forcedCodec}. Those are always restored in a {@code finally}
 * block so that a failing assertion cannot leak forced state into other tests.
 *
 * <p>NOTE(review): the {@code <String, String>} type arguments used on project types below are
 * assumed from the string keys and values the records are built with - confirm against the
 * declarations of {@code consumerSpy}, {@code parallelConsumer} and {@code ktu} in the base class.
 */
public class OffsetEncodingTests extends ParallelEoSStreamProcessorTestBase {
    private static final Logger log = LoggerFactory.getLogger(OffsetEncodingTests.class);

    /** Key shared by every {@code @ResourceLock} in this class; only its identity matters. */
    private static final String STATIC_ENCODER_STATE_LOCK = "Value doesn't matter, just needs a constant";

    /**
     * Feeds the run-length decoder a buffer with a single byte left after the leading byte has
     * been consumed, and expects an empty result.
     */
    @Test
    void runLengthDeserialise() {
        ByteBuffer source = ByteBuffer.allocate(3);
        source.put((byte) 0);
        source.putShort((short) 1);
        source.rewind();

        // Copy only the first two of the three bytes written above.
        byte[] truncated = new byte[2];
        source.get(truncated);

        ByteBuffer reader = ByteBuffer.wrap(truncated);
        reader.get(); // consume the leading byte, leaving exactly one byte for the slice
        ByteBuffer payload = reader.slice();

        Assertions.assertThat(OffsetRunLength.runLengthDeserialise(payload)).isEmpty();
    }

    /**
     * Encodes three incomplete offsets with a base offset of 123 and checks that the smallest
     * packing, and every encoding that was produced, decodes back to the same offsets.
     *
     * @param j value passed as the second constructor argument of the encoder
     */
    @ValueSource(longs = {10000, 100000, 1000000})
    @ResourceLock(value = STATIC_ENCODER_STATE_LOCK, mode = ResourceAccessMode.READ_WRITE)
    @ParameterizedTest
    void largeIncompleteOffsetValues(long j) {
        long baseOffset = 123L;
        HashSet<Long> incompletes = new HashSet<>(UniSets.of(baseOffset, 2345L, 8765L));

        OffsetSimultaneousEncoder encoder = new OffsetSimultaneousEncoder(baseOffset, j, incompletes);
        OffsetSimultaneousEncoder.compressionForced = true;
        try {
            encoder.invoke();
            assertSmallestPackingRoundTrips(encoder, baseOffset, incompletes);
            assertEachEncodingRoundTrips(encoder, baseOffset, incompletes);
        } finally {
            OffsetSimultaneousEncoder.compressionForced = false;
        }
    }

    /**
     * Commits offsets for a sparse, widely spread set of records using the given forced codec,
     * then builds a second {@link WorkManager}, registers the same records with it and checks
     * what it hands out as work.
     *
     * <p>For codecs listed in {@code codecsNotExpectedToWork} the restored-state assertions are
     * skipped and only the weaker checks on the polled offsets are made.
     *
     * @param offsetEncoding codec to force; the two {@code ByteArray} variants are skipped
     */
    @EnumSource(OffsetEncoding.class)
    @ParameterizedTest
    @ResourceLocks({@ResourceLock(value = STATIC_ENCODER_STATE_LOCK, mode = ResourceAccessMode.READ), @ResourceLock(value = STATIC_ENCODER_STATE_LOCK, mode = ResourceAccessMode.READ_WRITE)})
    void ensureEncodingGracefullyWorksWhenOffsetsAreVeryLargeAndNotSequential(OffsetEncoding offsetEncoding) {
        Assume.assumeThat("Codec skipped, not applicable", offsetEncoding, Matchers.not(Matchers.in(UniLists.of(OffsetEncoding.ByteArray, OffsetEncoding.ByteArrayCompressed))));

        List<OffsetEncoding> codecsNotExpectedToWork = UniLists.of(
                OffsetEncoding.BitSet,
                OffsetEncoding.BitSetCompressed,
                OffsetEncoding.BitSetV2,
                OffsetEncoding.RunLength,
                OffsetEncoding.RunLengthCompressed);
        boolean codecExpectedToWork = assumeWorkingCodec(offsetEncoding, codecsNotExpectedToWork);

        OffsetMapCodecManager.forcedCodec = Optional.of(offsetEncoding);
        OffsetSimultaneousEncoder.compressionForced = true;
        try {
            // --- test data: sparse offsets, four of which get completed ---
            long highestOffset = 72767 + 2 + 1;
            long[] offsets = {0L, 1L, 4L, 5L, 69L, 100L, 1000L, 20000L, 25000L, 30000L, 40000L, 72767 + 2, highestOffset};
            List<ConsumerRecord<String, String>> records = new ArrayList<>();
            for (long offset : offsets) {
                records.add(recordAt(offset));
            }

            List<Long> completedOffsets = UniLists.of(0L, 69L, 25000L, highestOffset);
            List<Long> expectedIncompletes = records.stream()
                    .map(ConsumerRecord::offset)
                    .filter(offset -> !completedOffsets.contains(offset))
                    .sorted()
                    .collect(Collectors.toList());

            this.ktu.send(this.consumerSpy, records);

            ParallelConsumerOptions<String, String> options = this.parallelConsumer.getWm().getOptions();
            TopicPartition topicPartition = new TopicPartition(this.INPUT_TOPIC, 0);
            Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsByPartition = new HashMap<>();
            recordsByPartition.put(topicPartition, new ArrayList<>(records));
            ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsByPartition);

            // --- first work manager: take all work, complete some of it, commit ---
            WorkManager<String, String> firstManager =
                    new WorkManager<>(new PCModule<>(options.toBuilder().consumer(this.consumerSpy).build()));
            firstManager.onPartitionsAssigned(UniSets.of(topicPartition));
            firstManager.registerWork(new EpochAndRecordsMap<>(consumerRecords, firstManager.getPm()));

            var firstWork = firstManager.getWorkIfAvailable();
            Assertions.assertThat(firstWork).hasSameSizeAs(records);
            for (long completed : completedOffsets) {
                KafkaTestUtils.completeWork(firstManager, firstWork, completed);
            }

            Map<TopicPartition, OffsetAndMetadata> commitData = firstManager.collectCommitDataForDirtyPartitions();
            Assertions.assertThat(commitData.get(topicPartition).offset()).isEqualTo(1L);
            this.consumerSpy.commitSync(commitData);

            // The forced codec is lifted before the payload is built directly.
            OffsetMapCodecManager<String, String> codecManager = new OffsetMapCodecManager<>(this.consumerSpy);
            OffsetMapCodecManager.forcedCodec = Optional.empty();
            Assertions.assertThat(codecManager.makeOffsetMetadataPayload(1L, firstManager.getPm().getPartitionState(topicPartition))).isNotEmpty();

            Map<TopicPartition, OffsetAndMetadata> committedOffsets = this.consumerSpy.committed(UniSets.of(topicPartition));
            OffsetAndMetadata committed = committedOffsets.get(topicPartition);
            Assertions.assertThat(committed.offset()).isEqualTo(1L);
            if (codecExpectedToWork) {
                Assertions.assertThat(committed.metadata()).isNotBlank();
            }

            // --- second work manager: same records, state comes from what was committed ---
            WorkManager<String, String> secondManager = new WorkManager<>(new PCModule<>(options));
            secondManager.onPartitionsAssigned(UniSets.of(topicPartition));
            secondManager.registerWork(new EpochAndRecordsMap<>(consumerRecords, secondManager.getPm()));
            PartitionStateManager<String, String> pm = secondManager.getPm();
            PartitionState<String, String> partitionState = pm.getPartitionState(topicPartition);

            if (codecExpectedToWork) {
                assertRestoredState(partitionState, highestOffset, expectedIncompletes);
            }

            // Index 3 is the record at offset 5, which was never completed.
            ConsumerRecord<String, String> neverCompleted = records.get(3);
            Truth.assertThat(pm.isRecordPreviouslyCompleted(neverCompleted)).isFalse();

            // Re-check after the query above: the state must be unchanged.
            if (codecExpectedToWork) {
                assertRestoredState(partitionState, highestOffset, expectedIncompletes);
                Truth.assertThat(pm.isRecordPreviouslyCompleted(neverCompleted)).isFalse();
            }

            var secondWork = secondManager.getWorkIfAvailable();
            List<Long> polledOffsets = secondWork.stream()
                    .map(work -> work.offset())
                    .collect(Collectors.toList());
            ManagedTruth.assertTruth(secondWork).isNotEmpty();

            if (codecExpectedToWork) {
                Truth.assertWithMessage("Contains only incomplete records").that(polledOffsets).containsExactlyElementsIn(expectedIncompletes).inOrder();
            } else {
                // NOTE(review): 2500 is not among the offsets produced above, so this check can
                // never fail - possibly 25000 was intended. Left as-is; confirm before changing.
                Assertions.assertThat(polledOffsets).doesNotContain(2500L);
                Assertions.assertThat(polledOffsets).doesNotContainSequence(expectedIncompletes);
            }
        } finally {
            OffsetMapCodecManager.forcedCodec = Optional.empty();
            OffsetSimultaneousEncoder.compressionForced = false;
        }
    }

    /**
     * Reports whether a codec is treated as working by the caller.
     *
     * @param offsetEncoding codec under test
     * @param list           codecs that are not expected to work
     * @return {@code true} when {@code offsetEncoding} is absent from {@code list}
     */
    private boolean assumeWorkingCodec(OffsetEncoding offsetEncoding, List<OffsetEncoding> list) {
        return !list.contains(offsetEncoding);
    }

    /**
     * Encodes four non-sequential incomplete offsets with base 0, checks whether the
     * {@code BitSet} encoding is present depending on {@code BitSetEncoder.MAX_LENGTH_ENCODABLE},
     * and checks that every produced encoding round-trips.
     */
    @Test
    @ResourceLock(value = STATIC_ENCODER_STATE_LOCK, mode = ResourceAccessMode.READ_WRITE)
    void ensureEncodingGracefullyWorksWhenOffsetsArentSequentialTwo() {
        long baseOffset = 0L;
        long upperOffset = 101L;
        HashSet<Long> incompletes = new HashSet<>(UniSets.of(1L, 4L, 5L, 100L));

        OffsetSimultaneousEncoder encoder = new OffsetSimultaneousEncoder(baseOffset, upperOffset, incompletes);
        OffsetSimultaneousEncoder.compressionForced = true;
        try {
            encoder.invoke();
            Map<OffsetEncoding, byte[]> encodingMap = encoder.getEncodingMap();

            assertSmallestPackingRoundTrips(encoder, baseOffset, incompletes);

            if (upperOffset - baseOffset > BitSetEncoder.MAX_LENGTH_ENCODABLE.intValue()) {
                Assertions.assertThat(encodingMap.keySet()).as("Gracefully ignores that BitSet can't be supported").doesNotContain(OffsetEncoding.BitSet);
            } else {
                Assertions.assertThat(encodingMap.keySet()).contains(OffsetEncoding.BitSet);
            }

            assertEachEncodingRoundTrips(encoder, baseOffset, incompletes);
        } finally {
            OffsetSimultaneousEncoder.compressionForced = false;
        }
    }

    /** Builds a record on partition 0 of the input topic at the given offset. */
    private ConsumerRecord<String, String> recordAt(long offset) {
        return new ConsumerRecord<>(this.INPUT_TOPIC, 0, offset, "akey", "avalue");
    }

    /** Asserts that the encoder's smallest packing decodes back to exactly {@code incompletes}. */
    private void assertSmallestPackingRoundTrips(OffsetSimultaneousEncoder encoder, long baseOffset, HashSet<Long> incompletes) {
        Assertions.assertThat(EncodedOffsetPair.unwrap(encoder.packSmallest()).getDecodedIncompletes(baseOffset).getIncompleteOffsets())
                .containsExactlyInAnyOrderElementsOf(incompletes);
    }

    /**
     * For every {@link OffsetEncoding}, packs and decodes the bytes the encoder produced for it
     * and asserts the result equals {@code incompletes}; encodings with no entry in the
     * encoder's map are only logged.
     */
    private void assertEachEncodingRoundTrips(OffsetSimultaneousEncoder encoder, long baseOffset, HashSet<Long> incompletes) {
        Map<OffsetEncoding, byte[]> encodingMap = encoder.getEncodingMap();
        for (OffsetEncoding encoding : OffsetEncoding.values()) {
            log.info("Testing {}", encoding);
            byte[] encoded = encodingMap.get(encoding);
            if (encoded == null) {
                log.info("Encoding not performed: {}", encoding);
                continue;
            }
            byte[] packed = encoder.packEncoding(new EncodedOffsetPair(encoding, ByteBuffer.wrap(encoded)));
            Assertions.assertThat(EncodedOffsetPair.unwrap(packed).getDecodedIncompletes(baseOffset).getIncompleteOffsets())
                    .as(encoding.toString())
                    .containsExactlyInAnyOrderElementsOf(incompletes);
        }
    }

    /**
     * Asserts the partition state seen by the second work manager: highest sequential succeeded
     * offset is 0, highest succeeded and highest seen both equal {@code highestOffset}, and the
     * incomplete offsets below the highest succeeded are exactly {@code expectedIncompletes}.
     */
    private void assertRestoredState(PartitionState<String, String> partitionState, long highestOffset, List<Long> expectedIncompletes) {
        Assertions.assertThat(partitionState.getOffsetHighestSequentialSucceeded()).isEqualTo(0L);
        Assertions.assertThat(partitionState.getOffsetHighestSucceeded()).isEqualTo(highestOffset);
        Assertions.assertThat(partitionState.getOffsetHighestSeen()).isEqualTo(highestOffset);
        Truth.assertThat(partitionState.getIncompleteOffsetsBelowHighestSucceeded()).containsExactlyElementsIn(expectedIncompletes);
    }
}
