package io.confluent.parallelconsumer.offsets;

import com.google.common.truth.Truth;
import io.confluent.csid.utils.Range;
import io.confluent.csid.utils.TimeUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import io.confluent.parallelconsumer.state.PartitionState;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.assertj.core.api.AbstractByteArrayAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.SnappyOutputStream;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;
import pl.tlinkowski.unij.api.UniSets;

@ExtendWith({MockitoExtension.class})
/* loaded from: input_file:io/confluent/parallelconsumer/offsets/WorkManagerOffsetMapCodecManagerTest.class */
class WorkManagerOffsetMapCodecManagerTest {
    /** Work manager instance; assigned in {@code setup()} before every test. */
    WorkManager<String, String> wm;

    /** Codec manager exercised by the tests; assigned in {@code setup()} before every test. */
    OffsetMapCodecManager<String, String> offsetCodecManager;

    /** The single topic partition all tests operate on. */
    TopicPartition tp = new TopicPartition("myTopic", 0);

    /** Offsets handed to {@code state} as incomplete, and the expected result of the decode round trips. */
    TreeSet<Long> incompleteOffsets = new TreeSet<>(UniSets.of(0L, 2L, 3L));

    /** Offset passed as the first argument to the encode/decode calls. */
    long finalOffsetForPartition = 0L;

    /** Highest succeeded offset used to seed {@code state}. */
    long highestSucceeded = 4L;

    /**
     * Partition state seeded from the fields above. It must stay declared after them because
     * instance field initialisers run in declaration order.
     */
    PartitionState<String, String> state = new PartitionState<>(
            tp,
            new OffsetMapCodecManager.HighestOffsetAndIncompletes(Optional.of(highestSucceeded), incompleteOffsets));

    // Mock consumer record supplied through the MockitoExtension registered on the class;
    // its offset() is stubbed in injectSucceededWorkAtOffset.
    @Mock
    ConsumerRecord<String, String> mockCr;
    // Logger shared by all tests in this class.
    private static final Logger log = LoggerFactory.getLogger(WorkManagerOffsetMapCodecManagerTest.class);
    // Hand-written bitmap strings made of 'o' and 'x' characters, ranging from the empty string to
    // longer runs. data() copies them into inputsToCompress as the first parameterized inputs.
    static List<String> simpleSampleInputsToCompress = UniLists.of(new String[]{"", "o", "x", "ooo", "xxx", "xox", "oxo", "xooxo", "ooxxoxox", "xxxxxxoooooxoxoxoooooxxxxooooo", "oooooooooooooooooooooooooooooo", "ooooooooooooooxxxxxxxxxxxxxxxx", "oxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", "xxxxxxoooooxoxoxoooooxxxxoooooxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxoxoxooxoxoxoxoxoxoxoxoxoxoxoxo"});
    /** Accumulated bitmap-string inputs for the compression tests; filled once by {@code data()}. */
    static List<String> inputsToCompress = new ArrayList<>();

    // Explicit package-private no-arg constructor with an empty body; all fixture state comes from
    // the field initialisers and the @BeforeEach methods.
    WorkManagerOffsetMapCodecManagerTest() {
    }

    /**
     * Before every test, records a succeeded work container at {@code highestSucceeded} in
     * {@code state}.
     */
    @BeforeEach
    void setupMock() {
        final long seedOffset = highestSucceeded;
        injectSucceededWorkAtOffset(seedOffset);
    }

    /**
     * Wraps the shared mock record in a new work container, stubs the mock's {@code offset()} to
     * return {@code offset}, then adds the container to {@code state} and reports it as succeeded.
     *
     * @param offset the value the mock record's {@code offset()} will return from now on
     */
    private void injectSucceededWorkAtOffset(long offset) {
        // The container is created before the stub is installed; that ordering is kept as-is.
        WorkContainer container = new WorkContainer(0L, mockCr, (Function) null, TimeUtils.getClock());
        Mockito.doReturn(offset).when(mockCr).offset();
        state.addWorkContainer(container);
        state.onSuccess(container);
    }

    /**
     * Before every test, builds a fresh work manager and codec manager on top of one shared
     * {@link MockConsumer}, and assigns partition {@code tp} to the work manager.
     */
    @BeforeEach
    void setup() {
        MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST);

        wm = new WorkManager<>(ParallelConsumerOptions.builder().build(), consumer);
        wm.onPartitionsAssigned(UniLists.of(tp));

        offsetCodecManager = new OffsetMapCodecManager<>(consumer);
    }

    /**
     * Populates {@code inputsToCompress} once for the whole class: first the hand-written samples,
     * then two fixed bitmap strings, then randomly generated bitmap strings of increasing size.
     */
    @BeforeAll
    static void data() {
        // Generated up front, appended after the fixed inputs below.
        final String firstRandomInput = generateRandomData(100).toString();

        inputsToCompress.addAll(simpleSampleInputsToCompress);
        inputsToCompress.add("xxxxxxoooooxoxoxoooooxxxxoooooxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxoxoxooxoxoxoxoxoxoxoxoxoxoxoxo");
        inputsToCompress.add("xxxxxxoooooxoxoxoooooxxxxoooooxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxoxoxooxoxoxoxoxoxoxoxoxoxoxoxoxxxxxxoooooxoxoxoooooxxxxoooooxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxoxoxooxoxoxoxoxoxoxoxoxoxoxoxoxxxxxxoooooxoxoxoooooxxxxoooooxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxoxoxooxoxoxoxoxoxoxoxoxoxoxoxoxxxxxxoooooxoxoxoooooxxxxoooooxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxoxoxooxoxoxoxoxoxoxoxoxoxoxoxoxxxxxxoooooxoxoxoooooxxxxoooooxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxoxoxooxoxoxoxoxoxoxoxoxoxoxoxoxxxxxxoooooxoxoxoooooxxxxoooooxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxoxoxooxoxoxoxoxoxoxoxoxoxoxoxoxxxxxxoooooxoxoxoooooxxxxoooooxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxoxoxooxoxoxoxoxoxoxoxoxoxoxoxoxxxxxxoooooxoxoxoooooxxxxoooooxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxoxoxooxoxoxoxoxoxoxoxoxoxoxoxoxxxxxxoooooxoxoxoooooxxxxoooooxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxoxoxooxoxoxoxoxoxoxoxoxoxoxoxoxxxxxxoooooxoxoxoooooxxxxoooooxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxoxoxooxoxoxoxoxoxoxoxoxoxoxoxoxxxxxxoooooxoxoxoooooxxxxoooooxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxoxoxooxoxoxoxoxoxoxoxoxoxoxoxo");
        inputsToCompress.add(firstRandomInput);

        // Remaining random inputs, appended in this exact order.
        for (int size : new int[]{1000, KafkaClientUtils.MAX_POLL_RECORDS, 30000}) {
            inputsToCompress.add(generateRandomData(size).toString());
        }
    }

    /**
     * Builds a random bitmap string: for every element streamed from {@code Range.range(size)}, one
     * character is appended, {@code 'x'} when {@code RandomUtils.nextBoolean()} is true and
     * {@code 'o'} otherwise.
     *
     * @param size passed to {@code Range.range}; presumably the number of characters produced
     * @return the buffer holding the generated characters
     */
    private static StringBuffer generateRandomData(int size) {
        StringBuffer bitmap = new StringBuffer();
        Range.range(size).toStream()
                .forEach(ignored -> bitmap.append(RandomUtils.nextBoolean() ? 'x' : 'o'));
        return bitmap;
    }

    /**
     * Serialises {@code state} to a Base64 string, deserialises it again and checks that the decoded
     * incomplete offsets are exactly {@code incompleteOffsets}, in the same order.
     */
    @Test
    void serialiseCycle() {
        String encoded = offsetCodecManager.serialiseIncompleteOffsetMapToBase64(finalOffsetForPartition, state);
        log.info("Size: {}", encoded.length());

        Assertions.assertThat(
                        OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(finalOffsetForPartition, encoded)
                                .getIncompleteOffsets()
                                .toArray())
                .containsExactly(incompleteOffsets.toArray());
    }

    /**
     * Encodes two single-element offset sets ({@code {1}} and {@code {2}}) with
     * {@code OffsetSimpleSerialisation.encodeAsJavaObjectStream} and checks that the two encoded
     * strings are identical once the last five characters of each are dropped.
     */
    @Test
    void javaSerialisationComparison() {
        // Parameterised instead of raw TreeSet so the element type is checked by the compiler.
        TreeSet<Long> onlyOne = new TreeSet<>(UniSets.of(1L));
        TreeSet<Long> onlyTwo = new TreeSet<>(UniSets.of(2L));

        String encodedOne = OffsetSimpleSerialisation.encodeAsJavaObjectStream(onlyOne);
        String encodedTwo = OffsetSimpleSerialisation.encodeAsJavaObjectStream(onlyTwo);

        // Only the trailing five characters are allowed to differ between the two encodings.
        final int ignoredSuffixLength = 5;
        String prefixOne = encodedOne.substring(0, encodedOne.length() - ignoredSuffixLength);
        String prefixTwo = encodedTwo.substring(0, encodedTwo.length() - ignoredSuffixLength);

        Assertions.assertThat(prefixOne).isEqualTo(prefixTwo);
    }

    /**
     * Feeds a handful of bitmap strings and their textual run-length forms through
     * {@code compareCompression}, which logs the compressed sizes. There are no assertions; the test
     * only fails if compression throws.
     *
     * @throws IOException propagated from {@code compareCompression}
     */
    @Test
    void runLengthEncodingCompression() throws IOException {
        List<String> inputs = UniLists.of(
                "xxxxxxoooooxoxoxoooooxxxxooooo",
                "6x,5o,1x,1o,1x,1o,1x,5o,4x,5o",
                "oooooooooooooooooooooooooooooo",
                "30o",
                "ooooooooooooooxxxxxxxxxxxxxxxx",
                "15o,15x",
                "x",
                "1x",
                "");
        for (String input : inputs) {
            compareCompression(input);
        }
    }

    /**
     * Compresses the UTF-8 bytes of {@code str} with gzip, zstd and snappy and logs the size of each
     * result, both raw and after Base64 encoding, alongside the Base64 size of the uncompressed input.
     *
     * @param str the text to compress
     * @return the gzip-compressed bytes
     * @throws IOException if writing to or closing the snappy stream fails
     */
    private byte[] compareCompression(String str) throws IOException {
        log.info("testing input of {}", str);
        byte[] rawBytes = str.getBytes(StandardCharsets.UTF_8);

        byte[] gzipBytes = OffsetSimpleSerialisation.compressGzip(rawBytes);
        byte[] zstdBytes = OffsetSimpleSerialisation.compressZstd(rawBytes);

        ByteArrayOutputStream snappySink = new ByteArrayOutputStream();
        // try-with-resources closes the stream even when write() throws; the sink is only read
        // after the close has run.
        try (SnappyOutputStream snappyStream = new SnappyOutputStream(snappySink)) {
            snappyStream.write(rawBytes);
        }

        Base64.Encoder base64 = Base64.getEncoder();
        String gzipBase64 = base64.encodeToString(gzipBytes);
        String zstdBase64 = base64.encodeToString(zstdBytes);
        String snappyBase64 = base64.encodeToString(snappySink.toByteArray());
        String rawBase64 = base64.encodeToString(rawBytes);

        // Compressed sizes in bytes.
        log.info("g {}", gzipBytes.length);
        log.info("z {}", zstdBytes.length);
        log.info("s {}", snappySink.size());

        // Sizes after Base64 encoding ("r" is the uncompressed input).
        log.info("64");
        log.info("r {}", rawBase64.length());
        log.info("g {}", gzipBase64.length());
        log.info("z {}", zstdBase64.length());
        log.info("s {}", snappyBase64.length());

        return gzipBytes;
    }

    /**
     * Disabled placeholder: reports an empty offset-commit success to the work manager and then
     * fails unconditionally.
     */
    @Disabled("TODO: Blocker: Not implemented yet")
    @Test
    void truncationOnCommit() {
        wm.onOffsetCommitSuccess(UniMaps.of());

        // Always-failing assertion standing in for the real checks.
        Assertions.assertThat(true)
                .isFalse();
    }

    /**
     * Sanity check of the JDK Base64 codec: encoding the UTF-8 bytes of {@code "TEST"} and decoding
     * them again must give back the same bytes and the same string.
     */
    @Test
    void base64Encoding() {
        String original = "TEST";
        byte[] originalBytes = original.getBytes(StandardCharsets.UTF_8);

        String encoded = Base64.getEncoder().encodeToString(originalBytes);
        byte[] decodedBytes = Base64.getDecoder().decode(encoded);

        // The decoded value is the actual, the original the expected, so a failure message labels
        // the two sides correctly.
        Assertions.assertThat(decodedBytes).isEqualTo(originalBytes);
        Assertions.assertThat(new String(decodedBytes, StandardCharsets.UTF_8)).isEqualTo(original);
    }

    /**
     * Encodes {@code state} with {@code encodeOffsetsCompressed}, decodes the result and checks that
     * the decoded incomplete offsets are exactly {@code incompleteOffsets}, in order.
     */
    @Test
    void loadCompressedRunLengthEncoding() {
        byte[] compressed = offsetCodecManager.encodeOffsetsCompressed(finalOffsetForPartition, state);

        Assertions.assertThat(
                        OffsetMapCodecManager.decodeCompressedOffsets(finalOffsetForPartition, compressed)
                                .getIncompleteOffsets()
                                .toArray())
                .containsExactly(incompleteOffsets.toArray());
    }

    /**
     * Checks {@code OffsetCodecTestUtils.bitmapStringToIncomplete} against a base offset of 2: per
     * the expectations below, each {@code 'o'} yields the offset at base + index and each
     * {@code 'x'} yields nothing.
     */
    @Test
    void decodeOffsetMap() {
        final long base = 2L;

        Assertions.assertThat(OffsetCodecTestUtils.bitmapStringToIncomplete(base, "ooxx"))
                .containsExactly(2L, 3L);
        Assertions.assertThat(OffsetCodecTestUtils.bitmapStringToIncomplete(base, "ooxxoxox"))
                .containsExactly(2L, 3L, 6L, 8L);
        Assertions.assertThat(OffsetCodecTestUtils.bitmapStringToIncomplete(base, "o"))
                .containsExactly(2L);
        Assertions.assertThat(OffsetCodecTestUtils.bitmapStringToIncomplete(base, "x"))
                .containsExactly();
        Assertions.assertThat(OffsetCodecTestUtils.bitmapStringToIncomplete(base, ""))
                .containsExactly();
        Assertions.assertThat(OffsetCodecTestUtils.bitmapStringToIncomplete(base, "ooo"))
                .containsExactly(2L, 3L, 4L);
        Assertions.assertThat(OffsetCodecTestUtils.bitmapStringToIncomplete(base, "xxx"))
                .containsExactly();
    }

    /**
     * After additionally marking offset 6 as succeeded, the bitmap string rendered for
     * {@code state} must be {@code "oxooxx"}.
     */
    @Test
    void binaryArrayConstruction() {
        injectSucceededWorkAtOffset(6L);

        Assertions.assertThat(OffsetCodecTestUtils.incompletesToBitmapString(finalOffsetForPartition, state))
                .isEqualTo("oxooxx");
    }

    /**
     * Gzip round trip: compressing and then decompressing a short input must return the original
     * bytes.
     */
    @Test
    void compressDecompressSanityGzip() {
        // Explicit charset so the input bytes do not depend on the platform default.
        byte[] input = "Lilan".getBytes(StandardCharsets.UTF_8);
        byte[] compressed = OffsetSimpleSerialisation.compressGzip(input);

        Assertions.assertThat(OffsetSimpleSerialisation.decompressGzip(ByteBuffer.wrap(compressed)))
                .isEqualTo(input);
    }

    /**
     * Gzip round trip with a Base64 encode/decode step in between: the decompressed bytes must equal
     * the original input.
     */
    @Test
    void compressDecompressWithBase64SanityGzip() {
        // Explicit charset so the input bytes do not depend on the platform default.
        byte[] input = "Lilan".getBytes(StandardCharsets.UTF_8);

        byte[] compressed = OffsetSimpleSerialisation.compressGzip(input);
        byte[] base64Encoded = Base64.getEncoder().encode(compressed);
        byte[] base64Decoded = Base64.getDecoder().decode(base64Encoded);

        Assertions.assertThat(OffsetSimpleSerialisation.decompressGzip(ByteBuffer.wrap(base64Decoded)))
                .isEqualTo(input);
    }

    /**
     * Zstd round trip: the result of compressing and then decompressing a short input must equal a
     * {@link ByteBuffer} wrapping the original bytes.
     */
    @Test
    void compressDecompressSanityZstd() {
        // Explicit charset so the input bytes do not depend on the platform default.
        byte[] input = "Lilan".getBytes(StandardCharsets.UTF_8);
        byte[] compressed = OffsetSimpleSerialisation.compressZstd(input);

        Assertions.assertThat(OffsetSimpleSerialisation.decompressZstd(ByteBuffer.wrap(compressed)))
                .isEqualTo(ByteBuffer.wrap(input));
    }

    /**
     * After marking offset 200 as succeeded, the compressed encoding of {@code state} must be
     * shorter than 10 bytes.
     */
    @Test
    void largeOffsetMap() {
        injectSucceededWorkAtOffset(200L);

        byte[] compressed = offsetCodecManager.encodeOffsetsCompressed(0L, state);

        Assertions.assertThat(compressed)
                .as("very small")
                .hasSizeLessThan(10);
    }

    /**
     * Smoke test: runs {@code OffsetSimultaneousEncoder} over every entry of
     * {@code inputsToCompress} and logs the input. Nothing is asserted, so the test only fails if
     * building or invoking the encoder throws.
     */
    @Test
    void stringVsByteVsBitSetEncoding() {
        for (String input : inputsToCompress) {
            Set<Long> incompletes = OffsetCodecTestUtils.bitmapStringToIncomplete(finalOffsetForPartition, input);
            // The encoder is invoked for its side effects only; its result is not inspected here.
            new OffsetSimultaneousEncoder(finalOffsetForPartition, highestSucceeded, incompletes).invoke();
            log.info("in: {}", input);
        }
    }

    /**
     * Encodes the bitmap string {@code "oxxooooooo"}, packs the smallest encoding and checks that
     * unwrapping it decodes back to the same string.
     */
    @Test
    void deserialiseBitSet() {
        OffsetSimultaneousEncoder encoder = new OffsetSimultaneousEncoder(
                0,
                "oxxooooooo".length() - 1,
                OffsetCodecTestUtils.bitmapStringToIncomplete(0, "oxxooooooo"));
        encoder.invoke();

        Assertions.assertThat(EncodedOffsetPair.unwrap(encoder.packSmallest()).getDecodedString())
                .isEqualTo("oxxooooooo");
    }

    /**
     * Encodes {@code state} with {@code encodeOffsetsCompressed}, decodes it and checks that the
     * decoded incomplete offsets equal {@code incompleteOffsets}.
     */
    @Test
    void compressionCycle() {
        byte[] compressed = offsetCodecManager.encodeOffsetsCompressed(finalOffsetForPartition, state);

        Assertions.assertThat(
                        OffsetMapCodecManager.decodeCompressedOffsets(finalOffsetForPartition, compressed)
                                .getIncompleteOffsets())
                .isEqualTo(incompleteOffsets);
    }

    /**
     * Run-length encodes the bitmap string of {@code state}, expects the runs {@code 1, 1, 2}, and
     * checks that decoding those runs gives back the same bitmap string.
     */
    @Test
    void runLengthEncoding() {
        String bitmap = OffsetCodecTestUtils.incompletesToBitmapString(finalOffsetForPartition, state);
        List runs = OffsetRunLength.runLengthEncode(bitmap);

        Assertions.assertThat(runs)
                .as("encoding of map: " + bitmap)
                .containsExactlyElementsOf(UniLists.of(1, 1, 2));

        Assertions.assertThat(OffsetRunLength.runLengthDecodeToString(runs))
                .isEqualTo(bitmap);
    }

    /**
     * Argument source for the parameterized test of the same name. The test's {@code @MethodSource}
     * has no explicit value, so JUnit looks up a factory method named after the test method.
     *
     * @return the shared {@code inputsToCompress} list itself, not a copy
     */
    static List<String> differentInputsAndCompressions() {
        return inputsToCompress;
    }

    /**
     * For one bitmap string: converts it to incomplete offsets and back, then runs the simultaneous
     * encoder and, for every encoding it produced, checks that packing and decoding gives back the
     * same incomplete offsets and the same bitmap string.
     *
     * @param str bitmap string of {@code 'o'}/{@code 'x'} characters to round-trip
     */
    @MethodSource
    @ParameterizedTest
    void differentInputsAndCompressions(String str) {
        long highestOffset = str.length() - 1;
        long bitmapLength = highestOffset + 1;
        log.debug("Testing round - size: {} input: '{}'", str.length(), str);

        // Sanity check of the test helpers themselves: string -> offsets -> string.
        Set<Long> expectedIncompletes = OffsetCodecTestUtils.bitmapStringToIncomplete(finalOffsetForPartition, str);
        Truth.assertThat(OffsetCodecTestUtils.incompletesToBitmapString(finalOffsetForPartition, bitmapLength, expectedIncompletes))
                .isEqualTo(str);

        OffsetSimultaneousEncoder encoder = new OffsetSimultaneousEncoder(finalOffsetForPartition, highestOffset, expectedIncompletes);
        encoder.invoke();

        for (EncodedOffsetPair candidate : encoder.sortedEncodings) {
            Set decodedIncompletes = OffsetMapCodecManager
                    .decodeCompressedOffsets(finalOffsetForPartition, encoder.packEncoding(candidate))
                    .getIncompleteOffsets();

            Assertions.assertThat(decodedIncompletes).containsExactlyInAnyOrderElementsOf(expectedIncompletes);
            // The encoding name is used as the failure message to identify the failing codec.
            Truth.assertWithMessage(candidate.encoding.name())
                    .that(OffsetCodecTestUtils.incompletesToBitmapString(finalOffsetForPartition, bitmapLength, decodedIncompletes))
                    .isEqualTo(str);
        }
    }

    /**
     * Public accessor for the shared input list.
     *
     * @return the {@code inputsToCompress} list itself, not a copy; it is only populated once
     *         {@code data()} has run
     */
    public static List<String> getInputsToCompress() {
        return inputsToCompress;
    }
}
