/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.integrationTests;

import io.confluent.csid.utils.GeneralTestUtils;
import io.confluent.csid.utils.ProgressBarUtils;
import io.confluent.csid.utils.Range;
import io.confluent.csid.utils.ThreadUtils;
import io.confluent.parallelconsumer.FakeRuntimeException;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import me.tongfei.progressbar.ProgressBar;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.assertj.core.api.AbstractDurationAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.util.Lists;
import org.awaitility.Awaitility;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

/**
 * Volume tests that feed generated records to the parallel consumer through the consumer and
 * producer spies inherited from {@link ParallelEoSStreamProcessorTestBase}, then verify the number
 * of produced records and the committed offsets for each {@link ParallelConsumerOptions.CommitMode}.
 */
class LargeVolumeInMemoryTests extends ParallelEoSStreamProcessorTestBase {
    private static final Logger log = LoggerFactory.getLogger(LargeVolumeInMemoryTests.class);

    /** Prefix {@link #testTiming} puts in front of the source value when building a produced record. */
    private static final String PRODUCED_VALUE_PREFIX = "SourceV: ";

    LargeVolumeInMemoryTests() {
    }

    /**
     * Sends 500 records through an UNORDERED parallel consumer and checks that one record was
     * produced per input and that the most recent commit equals the number of records sent.
     *
     * @param commitMode commit mode under test; selects whether producer or consumer commits are inspected
     * @throws InterruptedException if the test thread is interrupted while waiting for processing to finish
     */
    @ParameterizedTest
    @EnumSource(value = ParallelConsumerOptions.CommitMode.class)
    void load(ParallelConsumerOptions.CommitMode commitMode) throws InterruptedException {
        this.setupClients();
        this.setupParallelConsumerInstance(ParallelConsumerOptions.builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED)
                .commitMode(commitMode)
                .build());

        int quantityOfMessagesToProduce = 500;
        List<ConsumerRecord<String, String>> records = this.ktu.generateRecords(quantityOfMessagesToProduce);
        this.ktu.send(this.consumerSpy, records);

        CountDownLatch allMessagesConsumedLatch = new CountDownLatch(quantityOfMessagesToProduce);
        this.parallelConsumer.pollAndProduceMany(rec -> {
            @SuppressWarnings("unchecked") // a mock of the raw class stands in for the parameterized record
            ProducerRecord<String, String> mock = Mockito.mock(ProducerRecord.class);
            return UniLists.of(mock);
        }, x -> allMessagesConsumedLatch.countDown());

        // Fail here, with a clear reason, rather than letting a timeout surface as a size mismatch below.
        boolean allConsumed = allMessagesConsumedLatch.await(defaultTimeoutSeconds, TimeUnit.SECONDS);
        Assertions.assertThat(allConsumed)
                .as("Timed out waiting for all messages to be consumed")
                .isTrue();

        this.parallelConsumer.close();

        List<ProducerRecord<String, String>> history = this.producerSpy.history();
        Assertions.assertThat(history).hasSize(quantityOfMessagesToProduce);

        if (commitMode == ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER) {
            this.assertCommitsAlwaysIncrease();
            List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> producerCommits =
                    this.producerSpy.consumerGroupOffsetsHistory();
            Assertions.assertThat(producerCommits).isNotEmpty();
            long mostRecentProducerCommitOffset = this.findMostRecentCommitOffset(this.producerSpy);
            Assertions.assertThat(mostRecentProducerCommitOffset).isEqualTo(quantityOfMessagesToProduce);
        } else {
            CopyOnWriteArrayList<Map<TopicPartition, OffsetAndMetadata>> consumerCommitHistory =
                    this.consumerSpy.getCommitHistoryInt();
            Assertions.assertThat(consumerCommitHistory).isNotEmpty();
            Map<TopicPartition, OffsetAndMetadata> lastCommit =
                    consumerCommitHistory.get(consumerCommitHistory.size() - 1);
            Assertions.assertThat(lastCommit).as("Most recent consumer commit has no offsets").isNotEmpty();
            // Only the first committed partition of the last commit is inspected.
            long mostRecentConsumerCommitOffset = lastCommit.values().iterator().next().offset();
            Assertions.assertThat(mostRecentConsumerCommitOffset).isEqualTo(quantityOfMessagesToProduce);
        }
    }

    /**
     * Verifies that, per partition, the offsets committed through the producer spy for
     * {@code CONSUMER_GROUP_ID} never go backwards from one commit to the next.
     *
     * @throws AssertionError if any committed offset is lower than the one committed before it
     */
    private void assertCommitsAlwaysIncrease() {
        Map<TopicPartition, List<Long>> offsetsByPartition = new HashMap<>();
        for (Map<String, Map<TopicPartition, OffsetAndMetadata>> commit : this.producerSpy.consumerGroupOffsetsHistory()) {
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : commit.get(this.CONSUMER_GROUP_ID).entrySet()) {
                offsetsByPartition.computeIfAbsent(entry.getKey(), ignore -> new ArrayList<>())
                        .add(entry.getValue().offset());
            }
        }
        log.trace("Sorted offset commit history: {}", offsetsByPartition);
        for (List<Long> offsets : offsetsByPartition.values()) {
            // Every list has at least one element: it is only created when an offset is added.
            long lastSeenOffset = offsets.get(0);
            for (long offset : offsets) {
                if (lastSeenOffset > offset) {
                    throw new AssertionError("Offsets not in incrementing order: last seen: " + lastSeenOffset + " vs current: " + offset);
                }
                // Advance the comparison point so each commit is checked against its predecessor.
                lastSeenOffset = offset;
            }
        }
    }

    /**
     * Returns the offset most recently committed through the given producer for partition 0 of
     * {@code INPUT_TOPIC} under {@code CONSUMER_GROUP_ID}.
     *
     * @param producerSpy producer whose consumer-group offset history is inspected
     * @return the offset recorded in the last entry of the commit history
     * @throws AssertionError if nothing was committed, or the last commit lacks the group or partition
     */
    private long findMostRecentCommitOffset(MockProducer<?, ?> producerSpy) {
        List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> commitHistory =
                producerSpy.consumerGroupOffsetsHistory();
        Assertions.assertThat(commitHistory).as("No offsets committed").hasSizeGreaterThan(0);

        Map<String, Map<TopicPartition, OffsetAndMetadata>> mostRecent = commitHistory.get(commitHistory.size() - 1);
        Map<TopicPartition, OffsetAndMetadata> groupOffsets = mostRecent.get(this.CONSUMER_GROUP_ID);
        Assertions.assertThat(groupOffsets)
                .as("Most recent commit has no offsets for group %s", this.CONSUMER_GROUP_ID)
                .isNotNull();

        TopicPartition partition = new TopicPartition(this.INPUT_TOPIC, 0);
        OffsetAndMetadata mostRecentTPCommit = groupOffsets.get(partition);
        Assertions.assertThat(mostRecentTPCommit)
                .as("Most recent commit has no offset for %s", partition)
                .isNotNull();
        return mostRecentTPCommit.offset();
    }

    /**
     * Times {@link #testTiming} under UNORDERED, KEY (for several key counts) and PARTITION ordering,
     * then asserts that UNORDERED and KEY ordering with 20 keys both finish faster than PARTITION ordering.
     *
     * @param commitMode commit mode applied to every parallel consumer instance created by this test
     */
    @ParameterizedTest
    @EnumSource(value = ParallelConsumerOptions.CommitMode.class)
    void timingOfDifferentOrderingTypes(ParallelConsumerOptions.CommitMode commitMode) {
        int quantityOfMessagesToProduce = 1000;
        int defaultNumKeys = 20;
        ParallelConsumerOptions baseOptions = ParallelConsumerOptions.builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED)
                .commitMode(commitMode)
                .build();
        // NOTE(review): this instance is replaced by the per-round setup below before it is used -
        // confirm whether this call is still required.
        this.setupParallelConsumerInstance(baseOptions);

        // Two unordered rounds; only the duration of the last round is kept for the comparison.
        Duration unorderedDuration = null;
        for (Object round : Range.range(2L)) {
            this.setupParallelConsumerInstance(baseOptions.toBuilder()
                    .ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED)
                    .build());
            log.debug("No order");
            unorderedDuration = GeneralTestUtils.time(() -> this.testTiming(defaultNumKeys, quantityOfMessagesToProduce));
            log.info("Duration for Unordered processing in round {} with {} keys was {}", round, defaultNumKeys, unorderedDuration);
        }

        TreeMap<Integer, Duration> keyOrderingSizeToResults = new TreeMap<>();
        for (Integer keySize : UniLists.of(1, 2, 5, 10, 20, 50, 100, 1000)) {
            this.setupParallelConsumerInstance(baseOptions.toBuilder()
                    .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
                    .build());
            log.debug("By key, {} keys", keySize);
            Duration keyOrderDuration = GeneralTestUtils.time(() -> this.testTiming(keySize, quantityOfMessagesToProduce));
            log.info("Duration for Key order processing {} keys was {}", keySize, keyOrderDuration);
            keyOrderingSizeToResults.put(keySize, keyOrderDuration);
        }

        this.setupParallelConsumerInstance(baseOptions.toBuilder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.PARTITION)
                .build());
        log.debug("By partition");
        Duration partitionOrderDuration = GeneralTestUtils.time(() -> this.testTiming(defaultNumKeys, quantityOfMessagesToProduce));
        log.info("Duration for Partition order processing {} keys was {}", defaultNumKeys, partitionOrderDuration);

        log.info("Key duration results:\n{}", keyOrderingSizeToResults);
        log.info("Unordered duration: {}", unorderedDuration);

        Assertions.assertThat(unorderedDuration)
                .as("UNORDERED should be faster than PARTITION order")
                .isLessThan(partitionOrderDuration);

        // Must be one of the key counts measured above, otherwise the lookup yields null.
        int numOfKeysToCompare = 20;
        Duration keyOrderDurationToCompare = keyOrderingSizeToResults.get(numOfKeysToCompare);
        Assertions.assertThat(keyOrderDurationToCompare)
                .as("KEY order should be faster than PARTITION order")
                .isLessThan(partitionOrderDuration);
    }

    /**
     * Runs one processing pass on the currently configured parallel consumer: sends the generated
     * records, waits until every record has been processed and produced, closes the consumer and
     * verifies the commits.
     *
     * @param numberOfKeys                number of distinct keys the generated records are spread over
     * @param quantityOfMessagesToProduce total number of records to generate and expect back
     * @throws FakeRuntimeException if processed or produced records are found to be missing
     */
    private void testTiming(int numberOfKeys, int quantityOfMessagesToProduce) {
        log.info("Running test for {} keys and {} messages", numberOfKeys, quantityOfMessagesToProduce);

        List<WorkContainer<String, String>> successfulWork = new ArrayList<>();
        super.injectWorkSuccessListener(this.parallelConsumer.getWm(), successfulWork);

        HashMap<Integer, List<ConsumerRecord<String, String>>> records =
                this.ktu.generateRecords(Range.listOfIntegers(numberOfKeys), quantityOfMessagesToProduce);
        this.ktu.send(this.consumerSpy, records);

        ConcurrentLinkedQueue<ConsumerRecord<String, String>> processingCheck = new ConcurrentLinkedQueue<>();
        ProgressBar bar = ProgressBarUtils.getNewMessagesBar(log, quantityOfMessagesToProduce);
        try {
            this.parallelConsumer.pollAndProduceMany(rec -> {
                processingCheck.add(rec.getSingleConsumerRecord());
                ThreadUtils.sleepQuietly(3);
                ProducerRecord<String, String> stub =
                        new ProducerRecord<>(this.OUTPUT_TOPIC, "sk:" + rec.key(), PRODUCED_VALUE_PREFIX + rec.value());
                bar.stepTo(this.producerSpy.history().size());
                return UniLists.of(stub);
            }, x -> {});

            Awaitility.waitAtMost(defaultTimeout.multipliedBy(15L)).untilAsserted(() -> {
                // NOTE(review): presumably the success listener appends under this same lock - confirm in the base class.
                synchronized (successfulWork) {
                    Assertions.assertThat(successfulWork)
                            .as("All expected messages were processed and successful")
                            .hasSize(quantityOfMessagesToProduce);
                }
                Assertions.assertThat(this.producerSpy.history())
                        .as("Expected number of produced messages")
                        .hasSize(quantityOfMessagesToProduce);
            });
        } finally {
            // Close the bar even when the wait above fails, so it is not left open after a failed run.
            bar.close();
        }

        log.info("Closing async client");
        this.parallelConsumer.close();
        this.assertCommitsAlwaysIncrease();

        if (processingCheck.size() != quantityOfMessagesToProduce) {
            this.failOnFirstProcessingGap(new ArrayList<>(processingCheck));
        }

        List<ProducerRecord<String, String>> history = this.producerSpy.history();
        if (history.size() != quantityOfMessagesToProduce) {
            log.error("Expectation mismatch - where are my messages?");
            List<Integer> missing = findMissingSourceValues(history, quantityOfMessagesToProduce);
            if (!missing.isEmpty()) {
                log.error("Missing: {}", missing);
            }
            throw new FakeRuntimeException("bad step, expected message(s) is missing: " + missing);
        }

        Assertions.assertThat(this.producerSpy.history())
                .as("Finally, all messages expected messages were produced")
                .hasSize(quantityOfMessagesToProduce);

        if (this.isUsingTransactionalProducer()) {
            List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> groupOffsetsHistory =
                    this.producerSpy.consumerGroupOffsetsHistory();
            Assertions.assertThat(groupOffsetsHistory).as("No offsets committed").hasSizeGreaterThan(0);
        } else {
            CopyOnWriteArrayList<Map<TopicPartition, OffsetAndMetadata>> commitHistory =
                    this.consumerSpy.getCommitHistoryInt();
            Assertions.assertThat(commitHistory).as("No offsets committed").hasSizeGreaterThan(0);
        }
    }

    /**
     * Sorts the processed records by their integer value and throws at the first position whose
     * value differs from its index (counting from zero).
     */
    private void failOnFirstProcessingGap(List<ConsumerRecord<String, String>> processed) {
        processed.sort(Comparator.comparingInt(record -> Integer.parseInt(record.value())));
        log.error("Expectation mismatch - where are my messages?");
        int stepIndex = 0;
        for (ConsumerRecord<String, String> rec : processed) {
            int i = Integer.parseInt(rec.value());
            if (stepIndex != i) {
                log.error("bad step: {} vs {}", stepIndex, i);
                throw new FakeRuntimeException("bad process step, expected message is missing: " + stepIndex + " vs " + i);
            }
            ++stepIndex;
        }
    }

    /**
     * Lists every source value in {@code [0, expectedCount)} that has no produced record in
     * {@code history}; duplicates and values outside that range are ignored.
     */
    private static List<Integer> findMissingSourceValues(List<ProducerRecord<String, String>> history, int expectedCount) {
        boolean[] seen = new boolean[expectedCount];
        for (ProducerRecord<String, String> produced : history) {
            int sourceValue = sourceValueOf(produced);
            if (sourceValue >= 0 && sourceValue < expectedCount) {
                seen[sourceValue] = true;
            }
        }
        List<Integer> missing = new ArrayList<>();
        for (int value = 0; value < expectedCount; value++) {
            if (!seen[value]) {
                missing.add(value);
            }
        }
        return missing;
    }

    /** Recovers the integer source value from a produced record by dropping the value prefix, if present. */
    private static int sourceValueOf(ProducerRecord<String, String> produced) {
        String value = produced.value();
        String digits = value.startsWith(PRODUCED_VALUE_PREFIX)
                ? value.substring(PRODUCED_VALUE_PREFIX.length())
                : value;
        return Integer.parseInt(digits);
    }
}

