/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.offsets;

import com.google.common.truth.Truth;
import io.confluent.csid.utils.Range;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.internal.PCModuleTestEnv;
import io.confluent.parallelconsumer.offsets.EncodedOffsetPair;
import io.confluent.parallelconsumer.offsets.KafkaStreamsEncodingNotSupported;
import io.confluent.parallelconsumer.offsets.OffsetCodecTestUtils;
import io.confluent.parallelconsumer.offsets.OffsetEncoding;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import io.confluent.parallelconsumer.offsets.OffsetRunLength;
import io.confluent.parallelconsumer.offsets.OffsetSimpleSerialisation;
import io.confluent.parallelconsumer.offsets.OffsetSimultaneousEncoder;
import io.confluent.parallelconsumer.state.PartitionState;
import io.confluent.parallelconsumer.state.WorkManager;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.assertj.core.api.AbstractByteArrayAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.SnappyOutputStream;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniSets;

/**
 * Round-trip and sanity tests for the offset-map encoding helpers
 * ({@link OffsetMapCodecManager}, {@link OffsetSimultaneousEncoder},
 * {@link OffsetSimpleSerialisation}, {@link OffsetRunLength}).
 *
 * <p>Bitmap strings used throughout mark an incomplete offset with {@code 'o'} and a completed
 * offset with {@code 'x'}; see the expectations in {@link #decodeOffsetMap()}.
 *
 * <p>NOTE(review): this source was produced by a decompiler, so several widening casts and raw
 * types on project / assertion-library calls are left untouched where the generic signatures or
 * overloads of the callee are not visible from this file.
 */
@ExtendWith(value={MockitoExtension.class})
class WorkManagerOffsetMapCodecManagerTest {
    private static final Logger log = LoggerFactory.getLogger(WorkManagerOffsetMapCodecManagerTest.class);

    /** Test module, created fresh in {@link #setup()}. */
    PCModuleTestEnv module;
    /** Work manager obtained from {@link #module} in {@link #setup()}. */
    WorkManager<String, String> wm;
    /** Codec manager under test, created in {@link #setup()}. */
    OffsetMapCodecManager<String, String> offsetCodecManager;
    TopicPartition tp = new TopicPartition("myTopic", 0);
    /** Offsets the fixture state starts out with as incomplete. */
    TreeSet<Long> incompleteOffsets = new TreeSet<>(UniSets.of(0L, 2L, 3L));
    long finalOffsetForPartition = 0L;
    long highestSucceeded = 4L;
    // NOTE(review): field initializers run at construction time, before setup() assigns `module`,
    // so this PartitionState is built with a null module - confirm that is intended.
    PartitionState<String, String> state = new PartitionState(0L, (PCModule)this.module, this.tp, new OffsetMapCodecManager.HighestOffsetAndIncompletes(Optional.of(this.highestSucceeded), this.incompleteOffsets));
    @Mock
    ConsumerRecord<String, String> mockCr;
    /** Hand-written bitmap strings; copied into {@link #inputsToCompress} by {@link #data()}. */
    static List<String> simpleSampleInputsToCompress = UniLists.of((Object[])new String[]{"", "o", "x", "ooo", "xxx", "xox", "oxo", "xooxo", "ooxxoxox", "xxxxxxoooooxoxoxoooooxxxxooooo", "oooooooooooooooooooooooooooooo", "ooooooooooooooxxxxxxxxxxxxxxxx", "oxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", "xxxxxxoooooxoxoxoooooxxxxoooooxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxoxoxooxoxoxoxoxoxoxoxoxoxoxoxo"});
    /** All bitmap inputs (fixed and random) used by the encoding tests; filled by {@link #data()}. */
    static List<String> inputsToCompress = new ArrayList<String>();

    WorkManagerOffsetMapCodecManagerTest() {
    }

    /**
     * Records a success at {@link #highestSucceeded} on the fixture state before every test.
     * Only touches {@link #state} and {@link #mockCr}, so it does not depend on {@link #setup()}.
     */
    @BeforeEach
    void setupMock() {
        this.injectSucceededWorkAtOffset(this.highestSucceeded);
    }

    /**
     * Stubs the mocked record to report {@code offset}, registers it on {@link #state} as a new
     * incomplete record and then immediately marks that offset as succeeded.
     *
     * @param offset the offset to register and complete
     */
    private void injectSucceededWorkAtOffset(long offset) {
        Mockito.doReturn(offset).when(this.mockCr).offset();
        this.state.addNewIncompleteRecord(this.mockCr);
        this.state.onSuccess(offset);
    }

    /**
     * Builds a fresh module around a {@link MockConsumer}, assigns {@link #tp} to the work manager
     * and creates the codec manager under test.
     */
    @BeforeEach
    void setup() {
        MockConsumer mockConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
        ParallelConsumerOptions options = ParallelConsumerOptions.builder().consumer((Consumer)mockConsumer).build();
        this.module = new PCModuleTestEnv((ParallelConsumerOptions<String, String>)options);
        this.wm = this.module.workManager();
        this.wm.onPartitionsAssigned((Collection)UniLists.of((Object)this.tp));
        this.offsetCodecManager = new OffsetMapCodecManager((PCModule)this.module);
    }

    /**
     * Populates {@link #inputsToCompress} once per class: the simple samples, a fixed 100 character
     * sample, that sample repeated eleven times, and random bitmaps of 100, 1000, 10000 and 30000
     * entries (in that order).
     */
    @BeforeAll
    static void data() {
        String input100 = "xxxxxxoooooxoxoxoooooxxxxoooooxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxoxoxooxoxoxoxoxoxoxoxoxoxoxoxo";
        String random100 = generateRandomData(100).toString();
        inputsToCompress.addAll(simpleSampleInputsToCompress);
        inputsToCompress.add(input100);
        inputsToCompress.add(input100 + input100 + input100 + input100 + input100 + input100 + input100 + input100 + input100 + input100 + input100);
        inputsToCompress.add(random100);
        inputsToCompress.add(generateRandomData(1000).toString());
        inputsToCompress.add(generateRandomData(10000).toString());
        inputsToCompress.add(generateRandomData(30000).toString());
    }

    /**
     * Generates a random bitmap string.
     *
     * @param entries number of characters to generate
     * @return a buffer of {@code entries} characters, each randomly {@code 'x'} or {@code 'o'}
     */
    private static StringBuffer generateRandomData(int entries) {
        StringBuffer randomInput = new StringBuffer();
        Range.range((long)entries).toStream()
                .mapToObj(ignored -> RandomUtils.nextBoolean())
                .forEach(complete -> randomInput.append(complete ? 'x' : 'o'));
        return randomInput;
    }

    /** Serialises the fixture state to Base64 and checks the incompletes survive deserialisation. */
    @Test
    void serialiseCycle() {
        String serialised = this.offsetCodecManager.serialiseIncompleteOffsetMapToBase64(this.finalOffsetForPartition, this.state);
        log.info("Size: {}", serialised.length());
        OffsetMapCodecManager.HighestOffsetAndIncompletes highestOffsetAndIncompletes = OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64((long)this.finalOffsetForPartition, (String)serialised);
        SortedSet deserializedIncompletes = highestOffsetAndIncompletes.getIncompleteOffsets();
        Assertions.assertThat((Object[])deserializedIncompletes.toArray()).containsExactly(this.incompleteOffsets.toArray());
    }

    /**
     * Encodes two single-element sets with the Java object stream encoder and checks that the
     * outputs are identical once the trailing {@code payloadLength} characters are stripped.
     */
    @Test
    void javaSerialisationComparison() {
        TreeSet<Long> one = new TreeSet<>(UniSets.of(1L));
        TreeSet<Long> two = new TreeSet<>(UniSets.of(2L));
        String oneS = OffsetSimpleSerialisation.encodeAsJavaObjectStream(one);
        int payloadLength = 5;
        String oneStringPreamble = oneS.substring(0, oneS.length() - payloadLength);
        String twoS = OffsetSimpleSerialisation.encodeAsJavaObjectStream(two);
        String twoStringPreamble = twoS.substring(0, twoS.length() - payloadLength);
        Assertions.assertThat((String)oneStringPreamble).isEqualTo(twoStringPreamble);
    }

    /**
     * Runs {@link #compareCompression(String)} over a set of bitmap strings and their textual
     * run-length forms; the sizes are only logged, nothing is asserted.
     *
     * @throws IOException if compressing any input fails
     */
    @Test
    void runLengthEncodingCompression() throws IOException {
        List<String> inputs = UniLists.of("xxxxxxoooooxoxoxoooooxxxxooooo", "6x,5o,1x,1o,1x,1o,1x,5o,4x,5o", "oooooooooooooooooooooooooooooo", "30o", "ooooooooooooooxxxxxxxxxxxxxxxx", "15o,15x", "x", "1x", "");
        for (String input : inputs) {
            this.compareCompression(input);
        }
    }

    /**
     * Compresses {@code input} with gzip, zstd and snappy and logs the raw and Base64 sizes of each.
     *
     * @param input text to compress (encoded as UTF-8)
     * @return the gzip-compressed bytes
     * @throws IOException if compression fails
     */
    private byte[] compareCompression(String input) throws IOException {
        log.info("testing input of {}", input);
        byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8);
        byte[] outg = OffsetSimpleSerialisation.compressGzip(inputBytes);
        byte[] outz = OffsetSimpleSerialisation.compressZstd(inputBytes);
        ByteArrayOutputStream outs = new ByteArrayOutputStream();
        // Closing the snappy stream (even on failure) flushes its output into `outs`,
        // so `outs` must only be read after this block.
        try (SnappyOutputStream snap = new SnappyOutputStream(outs)) {
            snap.write(inputBytes);
        }
        Base64.Encoder base64 = Base64.getEncoder();
        String g64 = base64.encodeToString(outg);
        String z64 = base64.encodeToString(outz);
        String s64 = base64.encodeToString(outs.toByteArray());
        String raw64 = base64.encodeToString(inputBytes);
        log.info("g {}", outg.length);
        log.info("z {}", outz.length);
        log.info("s {}", outs.size());
        log.info("64");
        log.info("r {}", raw64.length());
        log.info("g {}", g64.length());
        log.info("z {}", z64.length());
        log.info("s {}", s64.length());
        return outg;
    }

    /** Sanity check that a Base64 encode/decode of a UTF-8 string is lossless. */
    @Test
    void base64Encoding() {
        String originalString = "TEST";
        byte[] stringBytes = originalString.getBytes(StandardCharsets.UTF_8);
        String base64Bytes = Base64.getEncoder().encodeToString(stringBytes);
        byte[] base64DecodedBytes = Base64.getDecoder().decode(base64Bytes);
        Assertions.assertThat((byte[])stringBytes).isEqualTo((Object)base64DecodedBytes);
        String decodedString = new String(base64DecodedBytes, StandardCharsets.UTF_8);
        Assertions.assertThat((String)originalString).isEqualTo(decodedString);
    }

    /** Encodes the fixture state to compressed bytes and checks the decoded incompletes match. */
    @Test
    void loadCompressedRunLengthEncoding() {
        byte[] bytes = this.offsetCodecManager.encodeOffsetsCompressed(this.finalOffsetForPartition, this.state);
        OffsetMapCodecManager.HighestOffsetAndIncompletes longs = OffsetMapCodecManager.decodeCompressedOffsets((long)this.finalOffsetForPartition, (byte[])bytes);
        Assertions.assertThat((Object[])longs.getIncompleteOffsets().toArray()).containsExactly(this.incompleteOffsets.toArray());
    }

    /**
     * Checks the test utility that turns a bitmap string into incomplete offsets relative to a base
     * offset: each {@code 'o'} yields an incomplete offset, each {@code 'x'} yields none.
     */
    @Test
    void decodeOffsetMap() {
        TreeSet<Long> set = OffsetCodecTestUtils.bitmapStringToIncomplete(2L, "ooxx");
        Assertions.assertThat(set).containsExactly((Object[])new Long[]{2L, 3L});
        Assertions.assertThat(OffsetCodecTestUtils.bitmapStringToIncomplete(2L, "ooxxoxox")).containsExactly((Object[])new Long[]{2L, 3L, 6L, 8L});
        Assertions.assertThat(OffsetCodecTestUtils.bitmapStringToIncomplete(2L, "o")).containsExactly((Object[])new Long[]{2L});
        Assertions.assertThat(OffsetCodecTestUtils.bitmapStringToIncomplete(2L, "x")).containsExactly((Object[])new Long[0]);
        Assertions.assertThat(OffsetCodecTestUtils.bitmapStringToIncomplete(2L, "")).containsExactly((Object[])new Long[0]);
        Assertions.assertThat(OffsetCodecTestUtils.bitmapStringToIncomplete(2L, "ooo")).containsExactly((Object[])new Long[]{2L, 3L, 4L});
        Assertions.assertThat(OffsetCodecTestUtils.bitmapStringToIncomplete(2L, "xxx")).containsExactly((Object[])new Long[0]);
    }

    /** After an extra success at offset 6, the fixture state must render as {@code "oxooxx"}. */
    @Test
    void binaryArrayConstruction() {
        this.injectSucceededWorkAtOffset(6L);
        String encoding = OffsetCodecTestUtils.incompletesToBitmapString(this.finalOffsetForPartition, this.state);
        Assertions.assertThat((String)encoding).isEqualTo("oxooxx");
    }

    /** Gzip compress followed by decompress must return the original bytes. */
    @Test
    void compressDecompressSanityGzip() {
        byte[] input = "Lilan".getBytes(StandardCharsets.UTF_8);
        byte[] compressedInput = OffsetSimpleSerialisation.compressGzip((byte[])input);
        byte[] decompressedInput = OffsetSimpleSerialisation.decompressGzip((ByteBuffer)ByteBuffer.wrap(compressedInput));
        Assertions.assertThat((byte[])decompressedInput).isEqualTo((Object)input);
    }

    /** Same as the gzip sanity check, with a Base64 encode/decode of the compressed bytes in between. */
    @Test
    void compressDecompressWithBase64SanityGzip() {
        byte[] input = "Lilan".getBytes(StandardCharsets.UTF_8);
        byte[] compressedInput = OffsetSimpleSerialisation.compressGzip((byte[])input);
        byte[] b64input = Base64.getEncoder().encode(compressedInput);
        byte[] b64Output = Base64.getDecoder().decode(b64input);
        byte[] decompressedInput = OffsetSimpleSerialisation.decompressGzip((ByteBuffer)ByteBuffer.wrap(b64Output));
        Assertions.assertThat((byte[])decompressedInput).isEqualTo((Object)input);
    }

    /** Zstd compress followed by decompress must return a buffer equal to the original bytes. */
    @Test
    void compressDecompressSanityZstd() {
        byte[] input = "Lilan".getBytes(StandardCharsets.UTF_8);
        byte[] compressedInput = OffsetSimpleSerialisation.compressZstd((byte[])input);
        ByteBuffer decompressedInput = OffsetSimpleSerialisation.decompressZstd((ByteBuffer)ByteBuffer.wrap(compressedInput));
        Assertions.assertThat((Comparable)decompressedInput).isEqualTo((Object)ByteBuffer.wrap(input));
    }

    /** With a success injected at offset 200, the compressed encoding must stay under 10 bytes. */
    @Test
    void largeOffsetMap() {
        this.injectSucceededWorkAtOffset(200L);
        byte[] encoded = this.offsetCodecManager.encodeOffsetsCompressed(0L, this.state);
        int smallestCompressionObserved = 10;
        ((AbstractByteArrayAssert)Assertions.assertThat((byte[])encoded).as("very small", new Object[0])).hasSizeLessThan(smallestCompressionObserved);
    }

    /**
     * Runs the simultaneous encoder over every input and looks up the ByteArray, BitSet and
     * RunLength encodings.
     *
     * <p>NOTE(review): the looked-up encodings are never asserted on or compared; this test
     * currently only verifies that encoding does not throw.
     */
    @Test
    void stringVsByteVsBitSetEncoding() {
        for (String inputString : inputsToCompress) {
            TreeSet<Long> offsets = OffsetCodecTestUtils.bitmapStringToIncomplete(this.finalOffsetForPartition, inputString);
            OffsetSimultaneousEncoder simultaneousEncoder = new OffsetSimultaneousEncoder(this.finalOffsetForPartition, this.highestSucceeded, offsets).invoke();
            byte[] byteByte = (byte[])simultaneousEncoder.getEncodingMap().get(OffsetEncoding.ByteArray);
            byte[] bitsBytes = (byte[])simultaneousEncoder.getEncodingMap().get(OffsetEncoding.BitSet);
            byte[] runlengthBytes = (byte[])simultaneousEncoder.getEncodingMap().get(OffsetEncoding.RunLength);
            log.info("in: {}", inputString);
        }
    }

    /** Packs the smallest encoding of a bitmap and checks it unwraps back to the same bitmap string. */
    @Test
    void deserialiseBitSet() {
        String input = "oxxooooooo";
        // Named differently from the `highestSucceeded` field to avoid shadowing it.
        long highestSucceededOffset = input.length() - 1;
        int nextExpectedOffset = 0;
        TreeSet<Long> incompletes = OffsetCodecTestUtils.bitmapStringToIncomplete(nextExpectedOffset, input);
        OffsetSimultaneousEncoder encoder = new OffsetSimultaneousEncoder((long)nextExpectedOffset, highestSucceededOffset, incompletes);
        encoder.invoke();
        byte[] pack = encoder.packSmallest();
        EncodedOffsetPair encodedOffsetPair = EncodedOffsetPair.unwrap((byte[])pack);
        String deserialisedBitSet = encodedOffsetPair.getDecodedString();
        Assertions.assertThat((String)deserialisedBitSet).isEqualTo(input);
    }

    /** A payload whose first byte is 1 must be rejected with {@link KafkaStreamsEncodingNotSupported}. */
    @Test
    void deserialiseKafkaStreamsV1() {
        ByteBuffer input = ByteBuffer.allocate(32);
        input.put((byte)1);
        input.putLong(System.currentTimeMillis());
        EncodedOffsetPair encodedOffsetPair = EncodedOffsetPair.unwrap((byte[])input.array());
        Assertions.assertThatThrownBy(() -> encodedOffsetPair.getDecodedIncompletes(0L)).isInstanceOf(KafkaStreamsEncodingNotSupported.class);
    }

    /** A payload whose first byte is 2 must be rejected with {@link KafkaStreamsEncodingNotSupported}. */
    @Test
    void deserialiseKafkaStreamsV2() {
        ByteBuffer input = ByteBuffer.allocate(32);
        input.put((byte)2);
        input.putLong(System.currentTimeMillis());
        input.putInt(1);
        input.putInt(1);
        input.put((byte)97);
        input.putLong(1L);
        EncodedOffsetPair encodedOffsetPair = EncodedOffsetPair.unwrap((byte[])input.array());
        Assertions.assertThatThrownBy(() -> encodedOffsetPair.getDecodedIncompletes(0L)).isInstanceOf(KafkaStreamsEncodingNotSupported.class);
    }

    /** Compressed encode followed by decode must reproduce the fixture's incomplete offsets. */
    @Test
    void compressionCycle() {
        byte[] serialised = this.offsetCodecManager.encodeOffsetsCompressed(this.finalOffsetForPartition, this.state);
        OffsetMapCodecManager.HighestOffsetAndIncompletes deserialised = OffsetMapCodecManager.decodeCompressedOffsets((long)this.finalOffsetForPartition, (byte[])serialised);
        Assertions.assertThat((Collection)deserialised.getIncompleteOffsets()).isEqualTo(this.incompleteOffsets);
    }

    /**
     * Run-length encodes the fixture's bitmap string, expects the runs {@code 1, 1, 2}, and checks
     * that decoding the runs reproduces the bitmap string.
     */
    @Test
    void runLengthEncoding() {
        String stringMap = OffsetCodecTestUtils.incompletesToBitmapString(this.finalOffsetForPartition, this.state);
        List integers = OffsetRunLength.runLengthEncode((String)stringMap);
        ((ListAssert)Assertions.assertThat((List)integers).as("encoding of map: " + stringMap, new Object[0])).containsExactlyElementsOf((Iterable)UniLists.of((Object)1, (Object)1, (Object)2));
        Assertions.assertThat((String)OffsetRunLength.runLengthDecodeToString((List)integers)).isEqualTo(stringMap);
    }

    /**
     * Argument factory for {@link #differentInputsAndCompressions(String)}; matched by name through
     * the bare {@code @MethodSource}.
     *
     * @return the shared list of bitmap inputs
     */
    static List<String> differentInputsAndCompressions() {
        return inputsToCompress;
    }

    /**
     * For one bitmap input, packs every encoding the simultaneous encoder produced and checks that
     * each decodes back to the same incompletes and the same bitmap string.
     *
     * @param input bitmap string to encode
     */
    @ParameterizedTest
    @MethodSource
    void differentInputsAndCompressions(String input) {
        long highestSeen = input.length() - 1;
        log.debug("Testing round - size: {} input: '{}'", input.length(), input);
        TreeSet<Long> inputIncompletes = OffsetCodecTestUtils.bitmapStringToIncomplete(this.finalOffsetForPartition, input);
        // Sanity check of the test utilities themselves before exercising the encoders.
        String sanityEncoding = OffsetCodecTestUtils.incompletesToBitmapString(this.finalOffsetForPartition, highestSeen + 1L, inputIncompletes);
        Truth.assertThat((String)sanityEncoding).isEqualTo((Object)input);
        OffsetSimultaneousEncoder encoder = new OffsetSimultaneousEncoder(this.finalOffsetForPartition, highestSeen, inputIncompletes);
        encoder.invoke();
        for (EncodedOffsetPair encoding : encoder.sortedEncodings) {
            byte[] packedEncoding = encoder.packEncoding(encoding);
            OffsetMapCodecManager.HighestOffsetAndIncompletes recoveredIncompleteAndOffset = OffsetMapCodecManager.decodeCompressedOffsets((long)this.finalOffsetForPartition, (byte[])packedEncoding);
            SortedSet recoveredIncompletes = recoveredIncompleteAndOffset.getIncompleteOffsets();
            Assertions.assertThat((Collection)recoveredIncompletes).containsExactlyInAnyOrderElementsOf(inputIncompletes);
            String simple = OffsetCodecTestUtils.incompletesToBitmapString(this.finalOffsetForPartition, highestSeen + 1L, recoveredIncompletes);
            Truth.assertWithMessage((String)encoding.encoding.name()).that(simple).isEqualTo((Object)input);
        }
    }

    /**
     * @return the shared list of bitmap inputs populated by {@link #data()}
     */
    public static List<String> getInputsToCompress() {
        return inputsToCompress;
    }
}

