/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.offsets;

import com.google.common.truth.Truth;
import io.confluent.csid.utils.KafkaTestUtils;
import io.confluent.parallelconsumer.ManagedTruth;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.internal.PCModuleTestEnv;
import io.confluent.parallelconsumer.offsets.BitSetEncoder;
import io.confluent.parallelconsumer.offsets.EncodedOffsetPair;
import io.confluent.parallelconsumer.offsets.OffsetEncoding;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import io.confluent.parallelconsumer.offsets.OffsetRunLength;
import io.confluent.parallelconsumer.offsets.OffsetSimultaneousEncoder;
import io.confluent.parallelconsumer.state.PartitionState;
import io.confluent.parallelconsumer.state.PartitionStateManager;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assume;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.ResourceAccessMode;
import org.junit.jupiter.api.parallel.ResourceLock;
import org.junit.jupiter.api.parallel.ResourceLocks;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;
import pl.tlinkowski.unij.api.UniSets;

public class OffsetEncodingTests
extends ParallelEoSStreamProcessorTestBase {
    private static final Logger log = LoggerFactory.getLogger(OffsetEncodingTests.class);
    PCModuleTestEnv module = new PCModuleTestEnv();

    @Test
    void runLengthDeserialise() {
        ByteBuffer sb = ByteBuffer.allocate(3);
        sb.put((byte)0);
        sb.putShort((short)1);
        byte[] array = new byte[2];
        sb.rewind();
        sb.get(array);
        ByteBuffer wrap = ByteBuffer.wrap(array);
        byte b = wrap.get();
        ByteBuffer slice = wrap.slice();
        List integers = OffsetRunLength.runLengthDeserialise((ByteBuffer)slice);
        Assertions.assertThat((List)integers).isEmpty();
    }

    @ParameterizedTest
    @ValueSource(longs={10000L, 100000L, 1000000L})
    @ResourceLock(value="Value doesn't matter, just needs a constant", mode=ResourceAccessMode.READ_WRITE)
    void largeIncompleteOffsetValues(long nextExpectedOffset) {
        long lowWaterMark = 123L;
        TreeSet incompletes = new TreeSet(UniSets.of((Object)lowWaterMark, (Object)2345L, (Object)8765L));
        OffsetSimultaneousEncoder encoder = new OffsetSimultaneousEncoder(lowWaterMark, nextExpectedOffset, incompletes);
        OffsetSimultaneousEncoder.compressionForced = true;
        encoder.invoke();
        Map encodingMap = encoder.getEncodingMap();
        byte[] smallestBytes = encoder.packSmallest();
        EncodedOffsetPair unwrap = EncodedOffsetPair.unwrap((byte[])smallestBytes);
        OffsetMapCodecManager.HighestOffsetAndIncompletes decodedIncompletes = unwrap.getDecodedIncompletes(lowWaterMark);
        Assertions.assertThat((Collection)decodedIncompletes.getIncompleteOffsets()).containsExactlyInAnyOrderElementsOf(incompletes);
        for (OffsetEncoding encodingToUse : OffsetEncoding.values()) {
            log.info("Testing {}", (Object)encodingToUse);
            byte[] bitsetBytes = (byte[])encodingMap.get(encodingToUse);
            if (bitsetBytes != null) {
                EncodedOffsetPair bitsetUnwrap = EncodedOffsetPair.unwrap((byte[])encoder.packEncoding(new EncodedOffsetPair(encodingToUse, ByteBuffer.wrap(bitsetBytes))));
                OffsetMapCodecManager.HighestOffsetAndIncompletes decodedBitsets = bitsetUnwrap.getDecodedIncompletes(lowWaterMark);
                ((AbstractCollectionAssert)Assertions.assertThat((Collection)decodedBitsets.getIncompleteOffsets()).as(encodingToUse.toString(), new Object[0])).containsExactlyInAnyOrderElementsOf(incompletes);
                continue;
            }
            log.info("Encoding not performed: " + encodingToUse);
        }
        OffsetSimultaneousEncoder.compressionForced = false;
    }

    @ParameterizedTest
    @EnumSource(value=OffsetEncoding.class)
    @ResourceLocks(value={@ResourceLock(value="Value doesn't matter, just needs a constant", mode=ResourceAccessMode.READ), @ResourceLock(value="Value doesn't matter, just needs a constant", mode=ResourceAccessMode.READ_WRITE)})
    void ensureEncodingGracefullyWorksWhenOffsetsAreVeryLargeAndNotSequential(OffsetEncoding encoding) {
        Assume.assumeThat((String)"Codec skipped, not applicable", (Object)encoding, (Matcher)Matchers.not((Matcher)Matchers.in((Collection)UniLists.of((Object)OffsetEncoding.ByteArray, (Object)OffsetEncoding.ByteArrayCompressed, (Object)OffsetEncoding.KafkaStreams, (Object)OffsetEncoding.KafkaStreamsV2))));
        List encodingsThatFail = UniLists.of((Object)OffsetEncoding.BitSet, (Object)OffsetEncoding.BitSetCompressed, (Object)OffsetEncoding.BitSetV2, (Object)OffsetEncoding.RunLength, (Object)OffsetEncoding.RunLengthCompressed);
        OffsetMapCodecManager.forcedCodec = Optional.of(encoding);
        OffsetSimultaneousEncoder.compressionForced = true;
        ArrayList<ConsumerRecord<String, String>> records = new ArrayList<ConsumerRecord<String, String>>();
        boolean FIRST_SUCCEEDED_OFFSET = false;
        records.add(new ConsumerRecord(this.INPUT_TOPIC, 0, 0L, (Object)"akey", (Object)"avalue"));
        records.add(new ConsumerRecord(this.INPUT_TOPIC, 0, 1L, (Object)"akey", (Object)"avalue"));
        records.add(new ConsumerRecord(this.INPUT_TOPIC, 0, 4L, (Object)"akey", (Object)"avalue"));
        records.add(new ConsumerRecord(this.INPUT_TOPIC, 0, 5L, (Object)"akey", (Object)"avalue"));
        records.add(new ConsumerRecord(this.INPUT_TOPIC, 0, 69L, (Object)"akey", (Object)"avalue"));
        records.add(new ConsumerRecord(this.INPUT_TOPIC, 0, 100L, (Object)"akey", (Object)"avalue"));
        records.add(new ConsumerRecord(this.INPUT_TOPIC, 0, 1000L, (Object)"akey", (Object)"avalue"));
        records.add(new ConsumerRecord(this.INPUT_TOPIC, 0, 20000L, (Object)"akey", (Object)"avalue"));
        records.add(new ConsumerRecord(this.INPUT_TOPIC, 0, 25000L, (Object)"akey", (Object)"avalue"));
        records.add(new ConsumerRecord(this.INPUT_TOPIC, 0, 30000L, (Object)"akey", (Object)"avalue"));
        records.add(new ConsumerRecord(this.INPUT_TOPIC, 0, 40000L, (Object)"akey", (Object)"avalue"));
        int avoidOffByOne = 2;
        records.add(new ConsumerRecord(this.INPUT_TOPIC, 0, (long)(72767 + avoidOffByOne), (Object)"akey", (Object)"avalue"));
        int highestSucceeded = 72767 + avoidOffByOne + 1;
        records.add(new ConsumerRecord(this.INPUT_TOPIC, 0, (long)highestSucceeded, (Object)"akey", (Object)"avalue"));
        ArrayList incompleteRecords = new ArrayList(records);
        incompleteRecords.remove(incompleteRecords.stream().filter(x -> x.offset() == 0L).findFirst().get());
        incompleteRecords.remove(incompleteRecords.stream().filter(x -> x.offset() == 69L).findFirst().get());
        incompleteRecords.remove(incompleteRecords.stream().filter(x -> x.offset() == 25000L).findFirst().get());
        incompleteRecords.remove(incompleteRecords.stream().filter(x -> x.offset() == (long)highestSucceeded).findFirst().get());
        List expected = incompleteRecords.stream().map(ConsumerRecord::offset).sorted().collect(Collectors.toList());
        this.ktu.send((MockConsumer<String, String>)this.consumerSpy, records);
        ParallelConsumerOptions options = this.parallelConsumer.getWm().getOptions();
        HashMap<TopicPartition, ArrayList<ConsumerRecord<String, String>>> recordsMap = new HashMap<TopicPartition, ArrayList<ConsumerRecord<String, String>>>();
        TopicPartition tp = new TopicPartition(this.INPUT_TOPIC, 0);
        recordsMap.put(tp, new ArrayList<ConsumerRecord<String, String>>(records));
        ConsumerRecords testRecords = new ConsumerRecords(recordsMap);
        ParallelConsumerOptions newOptions = options.toBuilder().consumer((Consumer)this.consumerSpy).build();
        long FIRST_COMMITTED_OFFSET = 1L;
        PCModule moduleTwo = new PCModule(newOptions);
        WorkManager wmm = moduleTwo.workManager();
        wmm.onPartitionsAssigned((Collection)UniSets.of((Object)new TopicPartition(this.INPUT_TOPIC, 0)));
        wmm.registerWork(new EpochAndRecordsMap(testRecords, wmm.getPm()));
        List work = wmm.getWorkIfAvailable();
        Assertions.assertThat((List)work).hasSameSizeAs(records);
        KafkaTestUtils.completeWork((WorkManager<String, String>)wmm, work, 0L);
        KafkaTestUtils.completeWork((WorkManager<String, String>)wmm, work, 69L);
        KafkaTestUtils.completeWork((WorkManager<String, String>)wmm, work, 25000L);
        KafkaTestUtils.completeWork((WorkManager<String, String>)wmm, work, highestSucceeded);
        Map completedEligibleOffsets = wmm.collectCommitDataForDirtyPartitions();
        Assertions.assertThat((long)((OffsetAndMetadata)completedEligibleOffsets.get(tp)).offset()).isEqualTo(1L);
        this.consumerSpy.commitSync(completedEligibleOffsets);
        OffsetMapCodecManager om = new OffsetMapCodecManager((PCModule)this.module);
        OffsetMapCodecManager.forcedCodec = Optional.empty();
        PartitionState state = wmm.getPm().getPartitionState(tp);
        String bestPayload = om.makeOffsetMetadataPayload(1L, state);
        Assertions.assertThat((String)bestPayload).isNotEmpty();
        OffsetAndMetadata committed = (OffsetAndMetadata)this.consumerSpy.committed(UniSets.of((Object)tp)).get(tp);
        Assertions.assertThat((long)committed.offset()).isEqualTo(1L);
        if (this.assumeWorkingCodec(encoding, encodingsThatFail)) {
            Assertions.assertThat((String)committed.metadata()).isNotBlank();
        }
        PCModule moduleThree = new PCModule(options);
        WorkManager newWm = moduleThree.workManager();
        newWm.onPartitionsAssigned((Collection)UniSets.of((Object)tp));
        PartitionStateManager pm = newWm.getPm();
        PartitionState partitionState = pm.getPartitionState(tp);
        if (this.assumeWorkingCodec(encoding, encodingsThatFail)) {
            ManagedTruth.assertTruth(partitionState).getOffsetHighestSucceeded().isEqualTo((Object)highestSucceeded);
        }
        ConsumerRecords testRecordsWithBaseCommittedRecordRemoved = new ConsumerRecords(UniMaps.of((Object)tp, testRecords.records(tp).stream().filter(x -> x.offset() >= 1L).collect(Collectors.toList())));
        EpochAndRecordsMap epochAndRecordsMap = new EpochAndRecordsMap(testRecordsWithBaseCommittedRecordRemoved, newWm.getPm());
        newWm.registerWork(epochAndRecordsMap);
        if (this.assumeWorkingCodec(encoding, encodingsThatFail)) {
            ManagedTruth.assertTruth(partitionState).getOffsetHighestSucceeded().isEqualTo((Object)highestSucceeded);
        }
        if (this.assumeWorkingCodec(encoding, encodingsThatFail)) {
            ManagedTruth.assertTruth(partitionState).getOffsetHighestSequentialSucceeded().isEqualTo((Object)0);
            ManagedTruth.assertTruth(partitionState).getOffsetHighestSucceeded().isEqualTo((Object)highestSucceeded);
            long offsetHighestSeen = partitionState.getOffsetHighestSeen();
            Assertions.assertThat((long)offsetHighestSeen).isEqualTo((long)highestSucceeded);
            SortedSet incompletes = partitionState.getIncompleteOffsetsBelowHighestSucceeded();
            Truth.assertThat((Iterable)incompletes).containsExactlyElementsIn(expected);
        }
        ConsumerRecord<String, String> anIncompleteRecord = records.get(3);
        Assertions.assertThat((boolean)partitionState.isRecordPreviouslyCompleted(anIncompleteRecord)).isFalse();
        if (this.assumeWorkingCodec(encoding, encodingsThatFail)) {
            long offsetHighestSequentialSucceeded = partitionState.getOffsetHighestSequentialSucceeded();
            Assertions.assertThat((long)offsetHighestSequentialSucceeded).isEqualTo(0L);
            long offsetHighestSucceeded = partitionState.getOffsetHighestSucceeded();
            Assertions.assertThat((long)offsetHighestSucceeded).isEqualTo((long)highestSucceeded);
            long offsetHighestSeen = partitionState.getOffsetHighestSeen();
            Assertions.assertThat((long)offsetHighestSeen).isEqualTo((long)highestSucceeded);
            SortedSet incompletes = partitionState.getIncompleteOffsetsBelowHighestSucceeded();
            Truth.assertThat((Iterable)incompletes).containsExactlyElementsIn(expected);
            Assertions.assertThat((boolean)partitionState.isRecordPreviouslyCompleted(anIncompleteRecord)).isFalse();
        }
        List workRetrieved = newWm.getWorkIfAvailable();
        List workRetrievedOffsets = workRetrieved.stream().map(WorkContainer::offset).collect(Collectors.toList());
        ManagedTruth.assertTruth(workRetrieved).isNotEmpty();
        switch (encoding) {
            case BitSet: 
            case BitSetCompressed: 
            case BitSetV2: 
            case RunLength: 
            case RunLengthCompressed: {
                Assertions.assertThat(workRetrievedOffsets).doesNotContain((Object[])new Long[]{2500L});
                Assertions.assertThat(workRetrievedOffsets).doesNotContainSequence(expected);
                break;
            }
            default: {
                Truth.assertWithMessage((String)"Contains only incomplete records").that(workRetrievedOffsets).containsExactlyElementsIn(expected).inOrder();
            }
        }
        OffsetSimultaneousEncoder.compressionForced = false;
    }

    private boolean assumeWorkingCodec(OffsetEncoding encoding, List<OffsetEncoding> encodingsThatFail) {
        return !encodingsThatFail.contains(encoding);
    }

    @Test
    @ResourceLock(value="Value doesn't matter, just needs a constant", mode=ResourceAccessMode.READ_WRITE)
    void ensureEncodingGracefullyWorksWhenOffsetsArentSequentialTwo() {
        long nextExpectedOffset = 101L;
        long lowWaterMark = 0L;
        TreeSet incompletes = new TreeSet(UniSets.of((Object)1L, (Object)4L, (Object)5L, (Object)100L));
        OffsetSimultaneousEncoder encoder = new OffsetSimultaneousEncoder(lowWaterMark, nextExpectedOffset, incompletes);
        OffsetSimultaneousEncoder.compressionForced = true;
        encoder.invoke();
        Map encodingMap = encoder.getEncodingMap();
        byte[] smallestBytes = encoder.packSmallest();
        EncodedOffsetPair unwrap = EncodedOffsetPair.unwrap((byte[])smallestBytes);
        OffsetMapCodecManager.HighestOffsetAndIncompletes decodedIncompletes = unwrap.getDecodedIncompletes(lowWaterMark);
        Assertions.assertThat((Collection)decodedIncompletes.getIncompleteOffsets()).containsExactlyInAnyOrderElementsOf(incompletes);
        if (nextExpectedOffset - lowWaterMark > (long)BitSetEncoder.MAX_LENGTH_ENCODABLE.intValue()) {
            ((AbstractCollectionAssert)Assertions.assertThat(encodingMap.keySet()).as("Gracefully ignores that BitSet can't be supported", new Object[0])).doesNotContain((Object[])new OffsetEncoding[]{OffsetEncoding.BitSet});
        } else {
            Assertions.assertThat(encodingMap.keySet()).contains((Object[])new OffsetEncoding[]{OffsetEncoding.BitSet});
        }
        for (OffsetEncoding encodingToUse : OffsetEncoding.values()) {
            log.info("Testing {}", (Object)encodingToUse);
            byte[] bitsetBytes = (byte[])encodingMap.get(encodingToUse);
            if (bitsetBytes != null) {
                EncodedOffsetPair bitsetUnwrap = EncodedOffsetPair.unwrap((byte[])encoder.packEncoding(new EncodedOffsetPair(encodingToUse, ByteBuffer.wrap(bitsetBytes))));
                OffsetMapCodecManager.HighestOffsetAndIncompletes decodedBitsets = bitsetUnwrap.getDecodedIncompletes(lowWaterMark);
                ((AbstractCollectionAssert)Assertions.assertThat((Collection)decodedBitsets.getIncompleteOffsets()).as(encodingToUse.toString(), new Object[0])).containsExactlyInAnyOrderElementsOf(incompletes);
                continue;
            }
            log.info("Encoding not performed: " + encodingToUse);
        }
        OffsetSimultaneousEncoder.compressionForced = false;
    }
}

