/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.integrationTests;

import io.confluent.csid.utils.GeneralTestUtils;
import io.confluent.csid.utils.ProgressBarUtils;
import io.confluent.csid.utils.Range;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.integrationTests.DbTest;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
import me.tongfei.progressbar.ProgressBar;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.util.Lists;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.junit.jupiter.Testcontainers;
import pl.tlinkowski.unij.api.UniLists;

@Testcontainers
public class LoadTest
extends DbTest {
    private static final Logger log = LoggerFactory.getLogger(LoadTest.class);
    static int total = 4000;

    public void setupTestData() {
        this.setupTopic();
        this.publishMessages(total / 100, total, this.topic);
    }

    @Test
    void timedNormalKafkaConsumerTest() {
        this.setupTestData();
        this.getKcu().getConsumer().subscribe((Collection)UniLists.of((Object)this.topic));
        this.readRecordsPlainConsumer(total, this.topic);
    }

    @Test
    void asyncConsumeAndProcess() {
        ProgressBar pb;
        this.setupTestData();
        KafkaConsumer newConsumer = this.getKcu().createNewConsumer();
        boolean tx = true;
        ParallelConsumerOptions options = ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.KEY).commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER).producer(this.getKcu().createNewProducer(tx)).consumer(newConsumer).maxConcurrency(3).build();
        ParallelEoSStreamProcessor async = new ParallelEoSStreamProcessor(options);
        async.subscribe(Pattern.compile(this.topic));
        AtomicInteger msgCount = new AtomicInteger(0);
        try (ProgressBar progressBar = pb = ProgressBarUtils.getNewMessagesBar(log, total);){
            async.poll(r -> {
                this.sleepABit();
                msgCount.getAndIncrement();
            });
            Awaitility.await().atMost(Duration.ofSeconds(60L)).until(() -> {
                pb.stepTo((long)msgCount.get());
                return msgCount.get() >= total;
            });
        }
        async.close();
    }

    private void sleepABit() {
        int simulatedCPUMessageProcessingDelay = RandomUtils.nextInt((int)0, (int)5);
        try {
            Thread.sleep(simulatedCPUMessageProcessingDelay);
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private void readRecordsPlainConsumer(int total, String topic) {
        log.info("Starting to read back");
        ArrayList allRecords = Lists.newArrayList();
        AtomicInteger count = new AtomicInteger();
        GeneralTestUtils.time(() -> {
            ProgressBar pb = ProgressBarUtils.getNewMessagesBar(log, total);
            Executors.newCachedThreadPool().submit(() -> {
                while (allRecords.size() < total) {
                    ConsumerRecords poll = this.getKcu().getConsumer().poll(Duration.ofMillis(500L));
                    log.info("Polled batch of {} messages", (Object)poll.count());
                    Iterable records = poll.records(topic);
                    records.forEach(x -> {
                        this.sleepABit();
                        pb.step();
                    });
                    ArrayList c = Lists.newArrayList((Iterable)records);
                    allRecords.addAll(c);
                    count.getAndAdd(c.size());
                }
            });
            try (ProgressBar progressBar = pb;){
                Awaitility.await().atMost(Duration.ofSeconds(60L)).untilAsserted(() -> Assertions.assertThat((AtomicInteger)count).hasValue(total));
            }
        });
        Assertions.assertThat((List)allRecords).hasSize(total);
    }

    private void publishMessages(int keyRange, int total, String topic) {
        List keys = Range.listOfIntegers((int)keyRange);
        ArrayList integers = Lists.newArrayList((Iterator)IntStream.range(0, total).iterator());
        LinkedList futureMetadataResultsFromPublishing = new LinkedList();
        log.info("Start publishing...");
        GeneralTestUtils.time(() -> {
            for (Integer x : ProgressBar.wrap((Iterable)integers, (String)"Publishing async")) {
                String key = ((Integer)keys.get(RandomUtils.nextInt((int)0, (int)keys.size()))).toString();
                int messageSizeInBytes = 500;
                String value = RandomStringUtils.randomAlphabetic((int)messageSizeInBytes);
                ProducerRecord producerRecord = new ProducerRecord(topic, (Object)key, (Object)value);
                try {
                    Future meta = this.getKcu().getProducer().send(producerRecord);
                    futureMetadataResultsFromPublishing.add(meta);
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        });
        HashSet<Integer> usedPartitions = new HashSet<Integer>();
        for (Future meta : ProgressBar.wrap(futureMetadataResultsFromPublishing, (String)"Joining")) {
            RecordMetadata recordMetadata = (RecordMetadata)meta.get();
            int partition = recordMetadata.partition();
            usedPartitions.add(partition);
        }
        if (this.numPartitions > 100000) {
            ((ListAssert)Assertions.assertThat(usedPartitions.stream().distinct()).as("All partitions are made use of", new Object[0])).hasSize(this.numPartitions);
        }
    }
}

