package io.confluent.parallelconsumer.integrationTests;

import io.confluent.csid.utils.GeneralTestUtils;
import io.confluent.csid.utils.KafkaTestUtils;
import io.confluent.csid.utils.ProgressBarUtils;
import io.confluent.csid.utils.Range;
import io.confluent.csid.utils.ThreadUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import me.tongfei.progressbar.ProgressBar;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.assertj.core.api.Assertions;
import org.assertj.core.util.Lists;
import org.awaitility.Awaitility;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.com.google.common.collect.Maps;
import pl.tlinkowski.unij.api.UniLists;

/* loaded from: input_file:io/confluent/parallelconsumer/integrationTests/LargeVolumeInMemoryTests.class */
public class LargeVolumeInMemoryTests extends ParallelEoSStreamProcessorTestBase {
    private static final Logger log = LoggerFactory.getLogger(LargeVolumeInMemoryTests.class);
    KafkaTestUtils ku = new KafkaTestUtils(this.consumerSpy);

    @EnumSource(ParallelConsumerOptions.CommitMode.class)
    @ParameterizedTest
    public void load(ParallelConsumerOptions.CommitMode commitMode) {
        setupClients();
        setupParallelConsumerInstance(ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED).commitMode(commitMode).build());
        this.ku.send(this.consumerSpy, this.ku.generateRecords(500));
        CountDownLatch countDownLatch = new CountDownLatch(500);
        this.parallelConsumer.pollAndProduceMany(consumerRecord -> {
            return UniLists.of((ProducerRecord) Mockito.mock(ProducerRecord.class));
        }, consumeProduceResult -> {
            countDownLatch.countDown();
        });
        countDownLatch.await(defaultTimeoutSeconds, TimeUnit.SECONDS);
        this.parallelConsumer.waitForProcessedNotCommitted(defaultTimeout.multipliedBy(10L));
        this.parallelConsumer.close();
        Assertions.assertThat(this.producerSpy.history()).hasSize(500);
        if (commitMode.equals(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER)) {
            assertCommitsAlwaysIncrease();
            Assertions.assertThat(this.producerSpy.consumerGroupOffsetsHistory()).isNotEmpty();
            Assertions.assertThat(findMostRecentCommitOffset(this.producerSpy)).isEqualTo(500);
        } else {
            CopyOnWriteArrayList<Map<TopicPartition, OffsetAndMetadata>> commitHistoryInt = this.consumerSpy.getCommitHistoryInt();
            Assertions.assertThat(commitHistoryInt).isNotEmpty();
            Assertions.assertThat(((OffsetAndMetadata) ((List) commitHistoryInt.get(commitHistoryInt.size() - 1).values().stream().collect(Collectors.toList())).get(0)).offset()).isEqualTo(500);
        }
    }

    private void assertCommitsAlwaysIncrease() {
        HashMap hashMap = new HashMap();
        Iterator it = this.producerSpy.consumerGroupOffsetsHistory().iterator();
        while (it.hasNext()) {
            for (Map.Entry entry : ((Map) ((Map) it.next()).get(ParallelEoSStreamProcessorTestBase.CONSUMER_GROUP_ID)).entrySet()) {
                ((List) hashMap.computeIfAbsent((TopicPartition) entry.getKey(), topicPartition -> {
                    return new ArrayList();
                })).add(Long.valueOf(((OffsetAndMetadata) entry.getValue()).offset()));
            }
        }
        log.trace("Sorted offset commit history: {}", hashMap);
        Iterator it2 = hashMap.entrySet().iterator();
        while (it2.hasNext()) {
            List<Long> list = (List) ((Map.Entry) it2.next()).getValue();
            long longValue = ((Long) list.get(0)).longValue();
            for (Long l : list) {
                if (longValue > l.longValue()) {
                    throw new AssertionError("Offsets not in incrementing order: last seen: " + longValue + " vs current: " + l);
                }
            }
        }
    }

    private long findMostRecentCommitOffset(MockProducer<?, ?> mockProducer) {
        List consumerGroupOffsetsHistory = mockProducer.consumerGroupOffsetsHistory();
        Assertions.assertThat(consumerGroupOffsetsHistory).as("No offsets committed", new Object[0]).hasSizeGreaterThan(0);
        return ((OffsetAndMetadata) ((Map) ((Map) consumerGroupOffsetsHistory.get(consumerGroupOffsetsHistory.size() - 1)).get(ParallelEoSStreamProcessorTestBase.CONSUMER_GROUP_ID)).get(new TopicPartition("input", 0))).offset();
    }

    @EnumSource(ParallelConsumerOptions.CommitMode.class)
    @ParameterizedTest
    public void timingOfDifferentOrderingTypes(ParallelConsumerOptions.CommitMode commitMode) {
        int i = 1000;
        int i2 = 20;
        ParallelConsumerOptions build = ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED).commitMode(commitMode).build();
        setupParallelConsumerInstance(build);
        Duration duration = null;
        Iterator it = Range.range(2L).iterator();
        while (it.hasNext()) {
            Integer num = (Integer) it.next();
            setupParallelConsumerInstance(build.toBuilder().ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED).build());
            log.debug("No order");
            duration = GeneralTestUtils.time(() -> {
                testTiming(i2, i);
            });
            log.info("Duration for Unordered processing in round {} with {} keys was {}", new Object[]{num, 20, duration});
        }
        TreeMap newTreeMap = Maps.newTreeMap();
        for (Integer num2 : UniLists.of(1, 2, 5, 10, 20, 50, 100, 1000)) {
            setupParallelConsumerInstance(build.toBuilder().ordering(ParallelConsumerOptions.ProcessingOrder.KEY).build());
            log.debug("By key, {} keys", num2);
            Duration time = GeneralTestUtils.time(() -> {
                testTiming(num2.intValue(), i);
            });
            log.info("Duration for Key order processing {} keys was {}", num2, time);
            newTreeMap.put(num2, time);
        }
        setupParallelConsumerInstance(build.toBuilder().ordering(ParallelConsumerOptions.ProcessingOrder.PARTITION).build());
        log.debug("By partition");
        Duration time2 = GeneralTestUtils.time(() -> {
            testTiming(i2, i);
        });
        log.info("Duration for Partition order processing {} keys was {}", 20, time2);
        log.info("Key duration results:\n{}", newTreeMap);
        log.info("Unordered duration: {}", duration);
        Duration duration2 = (Duration) newTreeMap.get(5);
        Assertions.assertThat(duration).as("UNORDERED should be faster than PARTITION order", new Object[0]).isLessThan(time2);
        if (commitMode.equals(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC)) {
            Assertions.assertThat(duration).as("Committing synchronously from the controller causes a large overhead, making UNORDERED very close in speed to KEY order, keySize of: 5", new Object[0]).isCloseTo(duration2, duration2.plus(duration2.dividedBy(5L)));
        } else {
            Assertions.assertThat(duration).as("UNORDERED should be faster than KEY order, keySize of: 5", new Object[0]).isLessThan(duration2);
        }
        Assertions.assertThat(duration2).as("KEY order is faster than PARTITION order", new Object[0]).isLessThan(time2);
    }

    private void testTiming(int i, int i2) {
        log.info("Running test for {} keys and {} messages", Integer.valueOf(i), Integer.valueOf(i2));
        this.ku.send(this.consumerSpy, this.ku.generateRecords(Range.range(i).list(), i2));
        ProgressBar newMessagesBar = ProgressBarUtils.getNewMessagesBar(log, i2);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        this.parallelConsumer.pollAndProduceMany(consumerRecord -> {
            concurrentLinkedQueue.add(consumerRecord);
            ThreadUtils.sleepQuietly(3);
            ProducerRecord producerRecord = new ProducerRecord("output", "sk:" + ((String) consumerRecord.key()), "SourceV: " + ((String) consumerRecord.value()));
            newMessagesBar.stepTo(this.producerSpy.history().size());
            return UniLists.of(producerRecord);
        }, consumeProduceResult -> {
        });
        Awaitility.waitAtMost(defaultTimeout.multipliedBy(10L)).untilAsserted(() -> {
            Assertions.assertThat(this.successfulWork.size()).as("All messages expected messages were processed and successful", new Object[0]).isEqualTo(i2);
            Assertions.assertThat(this.producerSpy.history().size()).as("All messages expected messages were processed and results produced", new Object[0]).isEqualTo(i2);
        });
        newMessagesBar.close();
        log.info("Closing async client");
        this.parallelConsumer.close();
        assertCommitsAlwaysIncrease();
        if (concurrentLinkedQueue.size() != i2) {
            int i3 = 0;
            ArrayList newArrayList = Lists.newArrayList(concurrentLinkedQueue.iterator());
            newArrayList.sort(Comparator.comparing(consumerRecord2 -> {
                return Integer.valueOf(Integer.parseInt((String) consumerRecord2.value()));
            }));
            log.error("Expectation mismatch - where are my messages?");
            Iterator it = newArrayList.iterator();
            while (it.hasNext()) {
                int parseInt = Integer.parseInt((String) ((ConsumerRecord) it.next()).value());
                if (i3 != parseInt) {
                    log.error("bad step: {} vs {}", Integer.valueOf(i3), Integer.valueOf(parseInt));
                    throw new RuntimeException("bad process step, expected message is missing: " + i3 + " vs " + parseInt);
                }
                i3++;
            }
        }
        List history = this.producerSpy.history();
        ArrayList arrayList = new ArrayList();
        if (history.size() == i2) {
            Assertions.assertThat(this.producerSpy.history().size()).as("Finally, all messages expected messages were produced", new Object[0]).isEqualTo(i2);
            if (isUsingTransactionalProducer()) {
                Assertions.assertThat(this.producerSpy.consumerGroupOffsetsHistory()).as("No offsets committed", new Object[0]).hasSizeGreaterThan(0);
            } else {
                Assertions.assertThat(this.consumerSpy.getCommitHistoryInt()).as("No offsets committed", new Object[0]).hasSizeGreaterThan(0);
            }
            this.successfulWork.clear();
            return;
        }
        int i4 = 0;
        history.sort(Comparator.comparing(producerRecord -> {
            Objects.requireNonNull(producerRecord);
            return Integer.valueOf(Integer.parseInt((String) producerRecord.value()));
        }));
        log.error("Expectation mismatch - where are my messages?");
        Iterator it2 = history.iterator();
        while (it2.hasNext()) {
            int parseInt2 = Integer.parseInt((String) ((ProducerRecord) it2.next()).value());
            if (i4 != parseInt2) {
                log.error("bad step: {} vs {}", Integer.valueOf(i4), Integer.valueOf(parseInt2));
                arrayList.add(Integer.valueOf(parseInt2));
                i4++;
            }
            i4++;
        }
        if (!arrayList.isEmpty()) {
            log.error("Missing: {}", arrayList);
        }
        throw new RuntimeException("bad step, expected message(s) is missing: " + arrayList);
    }
}
