/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.integrationTests;

import io.confluent.csid.utils.ProgressBarUtils;
import io.confluent.csid.utils.StringUtils;
import io.confluent.csid.utils.TrimListRepresentation;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.integrationTests.BrokerIntegrationTest;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import me.tongfei.progressbar.ProgressBar;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.presentation.Representation;
import org.assertj.core.util.Lists;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

class MultiInstanceHighVolumeTest
extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(MultiInstanceHighVolumeTest.class);
    public List<String> consumedKeys = Collections.synchronizedList(new ArrayList());
    public List<String> producedKeysAcknowledged = Collections.synchronizedList(new ArrayList());
    public AtomicInteger processedCount = new AtomicInteger(0);
    public AtomicInteger producedCount = new AtomicInteger(0);
    int maxPoll = 500;
    ParallelConsumerOptions.CommitMode commitMode = ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC;
    ParallelConsumerOptions.ProcessingOrder order = ParallelConsumerOptions.ProcessingOrder.KEY;
    Integer barId = 0;

    MultiInstanceHighVolumeTest() {
    }

    @Test
    void multiInstance() {
        this.numPartitions = 12;
        String inputTopicName = this.setupTopic(this.getClass().getSimpleName() + "-input");
        int expectedMessageCount = 3000000;
        log.info("Producing {} messages before starting test", (Object)expectedMessageCount);
        List<String> expectedKeys = this.getKcu().produceMessages(inputTopicName, expectedMessageCount);
        ParallelEoSStreamProcessor<String, String> pcOne = this.buildPc(inputTopicName, this.maxPoll, this.order, this.commitMode);
        ParallelEoSStreamProcessor<String, String> pcTwo = this.buildPc(inputTopicName, this.maxPoll, this.order, this.commitMode);
        ParallelEoSStreamProcessor<String, String> pcThree = this.buildPc(inputTopicName, this.maxPoll, this.order, this.commitMode);
        List<ConsumerRecord<?, ?>> consumedByOne = Collections.synchronizedList(new ArrayList());
        List<ConsumerRecord<?, ?>> consumedByTwo = Collections.synchronizedList(new ArrayList());
        List<ConsumerRecord<?, ?>> consumedByThree = Collections.synchronizedList(new ArrayList());
        List bars = Lists.list((Object[])new ProgressBar[0]);
        bars.add(this.run(expectedMessageCount / 3, pcOne, consumedByOne));
        bars.add(this.run(expectedMessageCount / 3, pcTwo, consumedByTwo));
        bars.add(this.run(expectedMessageCount / 3, pcThree, consumedByThree));
        Assertions.useRepresentation((Representation)new TrimListRepresentation());
        String failureMessage = StringUtils.msg((String)"All keys sent to input-topic should be processed and produced, within time (expected: {} commit: {} order: {} max poll: {})", (Object[])new Object[]{expectedMessageCount, this.commitMode, this.order, this.maxPoll});
        try {
            Awaitility.waitAtMost((Duration)Duration.ofSeconds(60L)).failFast("PC died - check logs", () -> pcThree.isClosedOrFailed()).alias(failureMessage).pollInterval(1L, TimeUnit.SECONDS).untilAsserted(() -> {
                log.trace("Processed-count: {}, Produced-count: {}", (Object)this.processedCount.get(), (Object)this.producedCount.get());
                SoftAssertions all = new SoftAssertions();
                ((ListAssert)all.assertThat(new ArrayList<String>(this.consumedKeys)).as("all expected are consumed", new Object[0])).hasSameSizeAs((Iterable)expectedKeys);
                all.assertAll();
            });
        }
        catch (ConditionTimeoutException e) {
            Assertions.fail((String)(failureMessage + "\n" + e.getMessage()));
        }
        ((AbstractIntegerAssert)Assertions.assertThat((int)this.processedCount.get()).as("messages processed and produced by parallel-consumer should be equal", new Object[0])).isEqualTo(expectedMessageCount);
        Assertions.assertThat((int)expectedMessageCount).isEqualTo(this.processedCount.get());
        bars.forEach(ProgressBar::close);
    }

    private ParallelEoSStreamProcessor<String, String> buildPc(String inputTopicName, int maxPoll, ParallelConsumerOptions.ProcessingOrder order, ParallelConsumerOptions.CommitMode commitMode) {
        ParallelEoSStreamProcessor<String, String> pc = this.getKcu().buildPc(order, commitMode, maxPoll);
        pc.subscribe((Collection)UniLists.of((Object)inputTopicName));
        return pc;
    }

    private ProgressBar run(int expectedMessageCount, ParallelEoSStreamProcessor<String, String> pc, List<ConsumerRecord<?, ?>> consumed) {
        ProgressBar bar = ProgressBarUtils.getNewMessagesBar(log, expectedMessageCount);
        bar.setExtraMessage("#" + this.barId);
        pc.setMyId(Optional.of("id: " + this.barId));
        Integer n = this.barId;
        this.barId = this.barId + 1;
        pc.poll(record -> this.processRecord(bar, (ConsumerRecord<String, String>)record.getSingleConsumerRecord(), consumed));
        return bar;
    }

    private void processRecord(ProgressBar bar, ConsumerRecord<String, String> record, List<ConsumerRecord<?, ?>> consumed) {
        bar.stepBy(1L);
        this.consumedKeys.add((String)record.key());
        this.processedCount.incrementAndGet();
        consumed.add(record);
    }
}

