package io.confluent.parallelconsumer.integrationTests;

import io.confluent.csid.utils.GeneralTestUtils;
import io.confluent.csid.utils.ProgressBarUtils;
import io.confluent.csid.utils.Range;
import io.confluent.parallelconsumer.AbstractParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
import me.tongfei.progressbar.ProgressBar;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.assertj.core.api.Assertions;
import org.assertj.core.util.Lists;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.junit.jupiter.Testcontainers;
import pl.tlinkowski.unij.api.UniLists;

@Testcontainers
/* loaded from: input_file:io/confluent/parallelconsumer/integrationTests/LoadTest.class */
public class LoadTest extends DbTest {
    private static final Logger log = LoggerFactory.getLogger(LoadTest.class);
    static int total = 4000;

    public void setupTestData() {
        setupTopic();
        publishMessages(total / 100, total, this.topic);
    }

    @Test
    void timedNormalKafkaConsumerTest() {
        setupTestData();
        this.kcu.consumer.subscribe(UniLists.of(this.topic));
        readRecordsPlainConsumer(total, this.topic);
    }

    @Test
    void asyncConsumeAndProcess() {
        setupTestData();
        ParallelEoSStreamProcessor parallelEoSStreamProcessor = new ParallelEoSStreamProcessor(ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.KEY).commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER).producer(this.kcu.createNewProducer(true)).consumer(this.kcu.createNewConsumer()).maxConcurrency(3).build());
        parallelEoSStreamProcessor.subscribe(Pattern.compile(this.topic));
        AtomicInteger atomicInteger = new AtomicInteger(0);
        ProgressBar newMessagesBar = ProgressBarUtils.getNewMessagesBar(log, total);
        try {
            parallelEoSStreamProcessor.poll(consumerRecord -> {
                sleepABit();
                atomicInteger.getAndIncrement();
            });
            Awaitility.await().atMost(Duration.ofSeconds(60L)).until(() -> {
                newMessagesBar.stepTo(atomicInteger.get());
                return Boolean.valueOf(atomicInteger.get() >= total);
            });
            if (newMessagesBar != null) {
                newMessagesBar.close();
            }
            parallelEoSStreamProcessor.close();
        } finally {
        }
    }

    private void sleepABit() {
        try {
            Thread.sleep(RandomUtils.nextInt(0, 5));
        } catch (Exception e) {
        }
    }

    private void readRecordsPlainConsumer(int i, String str) {
        log.info("Starting to read back");
        ArrayList newArrayList = Lists.newArrayList();
        AtomicInteger atomicInteger = new AtomicInteger();
        GeneralTestUtils.time(() -> {
            ProgressBar newMessagesBar = ProgressBarUtils.getNewMessagesBar(log, i);
            Executors.newCachedThreadPool().submit(() -> {
                while (newArrayList.size() < i) {
                    ConsumerRecords poll = this.kcu.consumer.poll(Duration.ofMillis(500L));
                    log.info("Polled batch of {} messages", Integer.valueOf(poll.count()));
                    Iterable records = poll.records(str);
                    records.forEach(consumerRecord -> {
                        sleepABit();
                        newMessagesBar.step();
                    });
                    ArrayList newArrayList2 = Lists.newArrayList(records);
                    newArrayList.addAll(newArrayList2);
                    atomicInteger.getAndAdd(newArrayList2.size());
                }
            });
            try {
                Awaitility.await().atMost(Duration.ofSeconds(60L)).untilAsserted(() -> {
                    Assertions.assertThat(atomicInteger).hasValue(i);
                });
                if (newMessagesBar != null) {
                    newMessagesBar.close();
                }
            } catch (Throwable th) {
                if (newMessagesBar != null) {
                    try {
                        newMessagesBar.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        });
        Assertions.assertThat(newArrayList).hasSize(i);
    }

    private void publishMessages(int i, int i2, String str) {
        List list = Range.range(i).list();
        ArrayList newArrayList = Lists.newArrayList(IntStream.range(0, i2).iterator());
        LinkedList linkedList = new LinkedList();
        log.info("Start publishing...");
        GeneralTestUtils.time(() -> {
            for (Integer num : ProgressBar.wrap(newArrayList, "Publishing async")) {
                try {
                    linkedList.add(this.kcu.producer.send(new ProducerRecord(str, ((Integer) list.get(RandomUtils.nextInt(0, list.size()))).toString(), RandomStringUtils.randomAlphabetic(AbstractParallelEoSStreamProcessorTestBase.DEFAULT_BROKER_POLL_FREQUENCY_MS))));
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        });
        HashSet hashSet = new HashSet();
        Iterator it = ProgressBar.wrap(linkedList, "Joining").iterator();
        while (it.hasNext()) {
            hashSet.add(Integer.valueOf(((RecordMetadata) ((Future) it.next()).get()).partition()));
        }
        if (this.numPartitions > 100000) {
            Assertions.assertThat(hashSet.stream().distinct()).as("All partitions are made use of", new Object[0]).hasSize(this.numPartitions);
        }
    }
}
