package io.confluent.parallelconsumer.integrationTests;

import io.confluent.csid.utils.ProgressBarUtils;
import io.confluent.csid.utils.StringUtils;
import io.confluent.csid.utils.TrimListRepresentation;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import me.tongfei.progressbar.ProgressBar;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.SoftAssertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

/* loaded from: input_file:io/confluent/parallelconsumer/integrationTests/MultiInstanceHighVolumeTest.class */
/**
 * High-volume integration test: pre-produces a large batch of records to a single input topic,
 * starts three parallel-consumer instances subscribed to that topic, and verifies that between
 * them every produced record gets processed within a time limit.
 */
class MultiInstanceHighVolumeTest extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(MultiInstanceHighVolumeTest.class);

    /** Keys of every record passed through {@link #processRecord}, across all instances. */
    public List<String> consumedKeys = Collections.synchronizedList(new ArrayList<>());

    /** Keys recorded by {@link #callBack}; nothing in this class currently registers that callback. */
    public List<String> producedKeysAcknowledged = Collections.synchronizedList(new ArrayList<>());

    /** Number of records passed through {@link #processRecord}, across all instances. */
    public AtomicInteger processedCount = new AtomicInteger(0);

    /** Number of invocations of {@link #callBack}. */
    public AtomicInteger producedCount = new AtomicInteger(0);

    /** Value used for the consumer's {@code max.poll.records} setting. */
    int maxPoll = 500;

    ParallelConsumerOptions.CommitMode commitMode = ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC;
    ParallelConsumerOptions.ProcessingOrder order = ParallelConsumerOptions.ProcessingOrder.KEY;

    /** Sequence number used to label the next progress bar / processor instance. */
    Integer barId = 0;

    MultiInstanceHighVolumeTest() {
    }

    /**
     * Produces the full data set up front, runs three processor instances over it and waits
     * (up to 60 seconds, polling once per second) until the number of consumed keys matches the
     * number of keys produced.
     */
    @Test
    void multiInstance() {
        this.numPartitions = 12;
        String inputTopicName = setupTopic(getClass().getSimpleName() + "-input");

        // Pre-produce everything before any consumer starts.
        int expectedMessageCount = 10_000_000;
        List<String> expectedKeys = new ArrayList<>();
        log.info("Producing {} messages before starting test", expectedMessageCount);
        produceMessages(inputTopicName, expectedKeys, expectedMessageCount);

        ParallelEoSStreamProcessor<String, String> pcOne = buildPc(inputTopicName, expectedMessageCount);
        ParallelEoSStreamProcessor<String, String> pcTwo = buildPc(inputTopicName, expectedMessageCount);
        ParallelEoSStreamProcessor<String, String> pcThree = buildPc(inputTopicName, expectedMessageCount);

        List<ConsumerRecord<?, ?>> consumedByOne = Collections.synchronizedList(new ArrayList<>());
        List<ConsumerRecord<?, ?>> consumedByTwo = Collections.synchronizedList(new ArrayList<>());
        List<ConsumerRecord<?, ?>> consumedByThree = Collections.synchronizedList(new ArrayList<>());

        // Each progress bar is sized for a third of the data set.
        // NOTE(review): the processors and progress bars are never closed here - confirm whether
        // the base class or test harness is expected to clean them up.
        int perInstanceEstimate = expectedMessageCount / 3;
        run(perInstanceEstimate, pcOne, consumedByOne);
        run(perInstanceEstimate, pcTwo, consumedByTwo);
        run(perInstanceEstimate, pcThree, consumedByThree);

        Assertions.useRepresentation(new TrimListRepresentation());
        String failureMessage = StringUtils.msg(
                "All keys sent to input-topic should be processed and produced, within time (expected: {} commit: {} order: {} max poll: {})",
                expectedMessageCount, this.commitMode, this.order, this.maxPoll);
        try {
            Awaitility.waitAtMost(Duration.ofSeconds(60L))
                    .alias(failureMessage)
                    .pollInterval(1L, TimeUnit.SECONDS)
                    .untilAsserted(() -> {
                        log.trace("Processed-count: {}, Produced-count: {}", this.processedCount.get(), this.producedCount.get());
                        SoftAssertions softly = new SoftAssertions();
                        // Snapshot the shared list so the assertion works on a stable copy.
                        softly.assertThat(new ArrayList<>(this.consumedKeys))
                                .as("all expected are consumed")
                                .hasSameSizeAs(expectedKeys);
                        softly.assertAll();
                    });
        } catch (ConditionTimeoutException e) {
            // Keep the timeout exception as the cause so its stack trace is not lost.
            Assertions.fail(failureMessage + "\n" + e.getMessage(), e);
        }

        Assertions.assertThat(this.processedCount.get())
                .as("messages processed and produced by parallel-consumer should be equal")
                .isEqualTo(expectedMessageCount);
    }

    /**
     * Labels the given processor with the next bar id and starts it polling, routing every record
     * to {@link #processRecord}.
     *
     * @param i                          size passed to the progress bar factory
     * @param parallelEoSStreamProcessor processor to start
     * @param list                       per-instance sink for the records this processor handles
     * @return the progress bar tracking this processor
     */
    private ProgressBar run(int i, ParallelEoSStreamProcessor<String, String> parallelEoSStreamProcessor, List<ConsumerRecord<?, ?>> list) {
        ProgressBar bar = ProgressBarUtils.getNewMessagesBar(log, i);
        bar.setExtraMessage("#" + this.barId);
        parallelEoSStreamProcessor.setMyId(Optional.of("id: " + this.barId));
        this.barId = this.barId + 1;
        parallelEoSStreamProcessor.poll(consumerRecord -> processRecord(bar, consumerRecord, list));
        return bar;
    }

    /**
     * Counts a produce result and remembers the key of the record it originated from.
     * Not referenced by any other method in this class.
     */
    private void callBack(ParallelStreamProcessor.ConsumeProduceResult<String, String, String, String> consumeProduceResult) {
        this.producedCount.incrementAndGet();
        this.producedKeysAcknowledged.add((String) consumeProduceResult.getIn().key());
    }

    /**
     * Per-record handler: advances the progress bar, records the key in the shared list, bumps the
     * shared counter and appends the record to the per-instance list.
     */
    private void processRecord(ProgressBar progressBar, ConsumerRecord<String, String> consumerRecord, List<ConsumerRecord<?, ?>> list) {
        progressBar.stepBy(1L);
        this.consumedKeys.add(consumerRecord.key());
        this.processedCount.incrementAndGet();
        list.add(consumerRecord);
    }

    /**
     * Builds a processor using this test's ordering, commit mode and {@code max.poll.records},
     * with a max concurrency of 100, and subscribes it to the given topic.
     *
     * @param str topic to subscribe to
     * @param i   currently unused
     * @return the subscribed (but not yet polling) processor
     */
    private ParallelEoSStreamProcessor<String, String> buildPc(String str, int i) {
        log.debug("Starting test");
        Properties consumerProps = new Properties();
        consumerProps.put("max.poll.records", this.maxPoll);
        ParallelEoSStreamProcessor<String, String> processor = new ParallelEoSStreamProcessor<>(
                ParallelConsumerOptions.builder()
                        .ordering(this.order)
                        .consumer(this.kcu.createNewConsumer(false, consumerProps))
                        .commitMode(this.commitMode)
                        .maxConcurrency(100)
                        .build());
        processor.subscribe(UniLists.of(str));
        return processor;
    }

    /**
     * Sends {@code i} records ({@code key-N} / {@code value-N}) to the topic, appending each key to
     * {@code list}, then blocks until every send has completed and asserts each has an offset.
     *
     * @param str  target topic
     * @param list receives the key of every record sent, in send order
     * @param i    number of records to send
     * @throws AssertionError if waiting for a send is interrupted or a send fails
     */
    private void produceMessages(String str, List<String> list, int i) {
        log.info("Producing {} messages to {}", i, str);
        List<Future<RecordMetadata>> sends = new ArrayList<>();
        try (KafkaProducer<String, String> producer = this.kcu.createNewProducer(false)) {
            for (int n = 0; n < i; n++) {
                String key = "key-" + n;
                sends.add(producer.send(new ProducerRecord<>(str, key, "value-" + n), (recordMetadata, exc) -> {
                    if (exc != null) {
                        log.error("Error sending, ", exc);
                    }
                }));
                list.add(key);
            }
            log.debug("Finished sending test data");
        }
        log.debug("Waiting for broker acks");
        for (Future<RecordMetadata> send : sends) {
            Assertions.assertThat(awaitAck(send).hasOffset()).isTrue();
        }
        Assertions.assertThat(sends).hasSize(i);
    }

    /**
     * Blocks until the given send completes, converting checked failures into
     * {@link AssertionError}s so callers need no {@code throws} clause.
     */
    private static RecordMetadata awaitAck(Future<RecordMetadata> send) {
        try {
            return send.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before failing the test.
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for broker acks", e);
        } catch (ExecutionException e) {
            throw new AssertionError("Sending a record to the broker failed", e);
        }
    }
}
