package io.confluent.parallelconsumer.integrationTests;

import io.confluent.csid.utils.ProgressBarUtils;
import io.confluent.csid.utils.ProgressTracker;
import io.confluent.csid.utils.StringUtils;
import io.confluent.csid.utils.TrimListRepresentation;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import me.tongfei.progressbar.ProgressBar;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.BooleanAssert;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.internal.StandardComparisonStrategy;
import org.assertj.core.util.IterableUtil;
import org.awaitility.Awaitility;
import org.awaitility.core.TerminalFailureException;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import pl.tlinkowski.unij.api.UniLists;

/* loaded from: input_file:io/confluent/parallelconsumer/integrationTests/MultiInstanceRebalanceTest.class */
public class MultiInstanceRebalanceTest extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(MultiInstanceRebalanceTest.class);
    static final int DEFAULT_MAX_POLL = 500;
    public static final int CHAOS_FREQUENCY = 500;
    public static final int DEFAULT_POLL_DELAY = 150;
    ProgressBar overallProgress;
    AtomicInteger count = new AtomicInteger();
    Set<String> overallConsumedKeys = new ConcurrentSkipListSet();
    int pcInstanceCount = 0;

    /* loaded from: input_file:io/confluent/parallelconsumer/integrationTests/MultiInstanceRebalanceTest$ParallelConsumerRunnable.class */
    public class ParallelConsumerRunnable implements Runnable {
        private final int instanceId;
        private final int maxPoll;
        private final ParallelConsumerOptions.CommitMode commitMode;
        private final ParallelConsumerOptions.ProcessingOrder order;
        private final String inputTopic;
        private final int expectedMessageCount;
        private final ProgressBar bar;
        private final int pollDelayMs;
        private ParallelEoSStreamProcessor<String, String> parallelConsumer;
        private boolean started = false;
        private final Queue<String> consumedKeys = new ConcurrentLinkedQueue();

        public ParallelConsumerRunnable(int i, ParallelConsumerOptions.CommitMode commitMode, ParallelConsumerOptions.ProcessingOrder processingOrder, String str, int i2, int i3) {
            this.maxPoll = i;
            this.commitMode = commitMode;
            this.order = processingOrder;
            this.inputTopic = str;
            this.expectedMessageCount = i2;
            this.pollDelayMs = i3;
            this.instanceId = MultiInstanceRebalanceTest.this.pcInstanceCount;
            MultiInstanceRebalanceTest.this.pcInstanceCount++;
            this.bar = ProgressBarUtils.getNewMessagesBar("PC" + this.instanceId, MultiInstanceRebalanceTest.log, i2);
        }

        @Override // java.lang.Runnable
        public void run() {
            MDC.put("pcId", "Runner-" + this.instanceId);
            this.started = true;
            MultiInstanceRebalanceTest.log.info("Running consumer!");
            Properties properties = new Properties();
            properties.put("max.poll.records", Integer.valueOf(this.maxPoll));
            this.parallelConsumer = new ParallelEoSStreamProcessor<>(ParallelConsumerOptions.builder().ordering(this.order).consumer(MultiInstanceRebalanceTest.this.kcu.createNewConsumer(false, properties)).commitMode(this.commitMode).maxConcurrency(10).build());
            this.parallelConsumer.setTimeBetweenCommits(Duration.ofSeconds(1L));
            this.parallelConsumer.setMyId(Optional.of("PC-" + this.instanceId));
            this.parallelConsumer.subscribe(UniLists.of(this.inputTopic));
            this.parallelConsumer.poll(pollContext -> {
                try {
                    Thread.sleep(this.pollDelayMs);
                } catch (InterruptedException e) {
                }
                MultiInstanceRebalanceTest.this.count.incrementAndGet();
                this.bar.step();
                MultiInstanceRebalanceTest.this.overallProgress.step();
                this.consumedKeys.add((String) pollContext.key());
                MultiInstanceRebalanceTest.this.overallConsumedKeys.add((String) pollContext.key());
            });
        }

        public void stop() {
            MultiInstanceRebalanceTest.log.info("Stopping {}", Integer.valueOf(this.instanceId));
            this.started = false;
            this.parallelConsumer.close();
        }

        public void start(ExecutorService executorService) {
            Exception failureCause = getParallelConsumer().getFailureCause();
            if (failureCause != null) {
                throw new RuntimeException("Error starting PC, pc died from previous error: " + failureCause.getMessage(), failureCause);
            }
            MultiInstanceRebalanceTest.log.info("Starting {}", this);
            executorService.submit(this);
        }

        public void close() {
            MultiInstanceRebalanceTest.log.info("Stopping {}", this);
            stop();
            this.bar.close();
        }

        public void toggle(ExecutorService executorService) {
            if (this.started) {
                stop();
            } else {
                start(executorService);
            }
        }

        public int getInstanceId() {
            return this.instanceId;
        }

        public int getMaxPoll() {
            return this.maxPoll;
        }

        public ParallelConsumerOptions.CommitMode getCommitMode() {
            return this.commitMode;
        }

        public ParallelConsumerOptions.ProcessingOrder getOrder() {
            return this.order;
        }

        public String getInputTopic() {
            return this.inputTopic;
        }

        public int getExpectedMessageCount() {
            return this.expectedMessageCount;
        }

        public ProgressBar getBar() {
            return this.bar;
        }

        public int getPollDelayMs() {
            return this.pollDelayMs;
        }

        public ParallelEoSStreamProcessor<String, String> getParallelConsumer() {
            return this.parallelConsumer;
        }

        public boolean isStarted() {
            return this.started;
        }

        public Queue<String> getConsumedKeys() {
            return this.consumedKeys;
        }

        public String toString() {
            return "MultiInstanceRebalanceTest.ParallelConsumerRunnable(instanceId=" + getInstanceId() + ", maxPoll=" + getMaxPoll() + ", commitMode=" + getCommitMode() + ", order=" + getOrder() + ", inputTopic=" + getInputTopic() + ", expectedMessageCount=" + getExpectedMessageCount() + ", bar=" + getBar() + ", pollDelayMs=" + getPollDelayMs() + ", parallelConsumer=" + getParallelConsumer() + ", started=" + isStarted() + ")";
        }
    }

    @EnumSource(ParallelConsumerOptions.ProcessingOrder.class)
    @ParameterizedTest
    void consumeWithMultipleInstancesPeriodicConsumerSync(ParallelConsumerOptions.ProcessingOrder processingOrder) {
        this.numPartitions = 2;
        runTest(500, ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC, processingOrder, processingOrder == ParallelConsumerOptions.ProcessingOrder.PARTITION ? 100 : 1000, 2, 1.0d, DEFAULT_POLL_DELAY);
    }

    @EnumSource(ParallelConsumerOptions.ProcessingOrder.class)
    @ParameterizedTest
    void consumeWithMultipleInstancesPeriodicConsumerAsynchronous(ParallelConsumerOptions.ProcessingOrder processingOrder) {
        this.numPartitions = 2;
        runTest(500, ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS, processingOrder, processingOrder == ParallelConsumerOptions.ProcessingOrder.PARTITION ? 100 : 1000, 2, 1.0d, DEFAULT_POLL_DELAY);
    }

    @Disabled
    @Test
    void largeNumberOfInstances() {
        this.numPartitions = 80;
        runTest(500, ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS, ParallelConsumerOptions.ProcessingOrder.UNORDERED, 500000, 12, 0.3d, 1);
    }

    private void runTest(int i, ParallelConsumerOptions.CommitMode commitMode, ParallelConsumerOptions.ProcessingOrder processingOrder, final int i2, int i3, double d, int i4) {
        final String str = setupTopic(getClass().getSimpleName() + "-input-" + RandomUtils.nextInt());
        this.overallProgress = ProgressBarUtils.getNewMessagesBar("overall", log, i2);
        final ExecutorService newWorkStealingPool = Executors.newWorkStealingPool();
        final ProgressBar newMessagesBar = ProgressBarUtils.getNewMessagesBar("sending", log, i2);
        final ConcurrentSkipListSet concurrentSkipListSet = new ConcurrentSkipListSet();
        log.info("Producing {} messages before starting test", Integer.valueOf(i2));
        final ArrayList arrayList = new ArrayList();
        final int i5 = (int) (i2 * d);
        KafkaProducer createNewProducer = this.kcu.createNewProducer(false);
        for (int i6 = 0; i6 < i5; i6++) {
            try {
                String str2 = "key-" + i6;
                arrayList.add(createNewProducer.send(new ProducerRecord(str, str2, "value-" + i6), (recordMetadata, exc) -> {
                    if (exc != null) {
                        log.error("Error sending, ", exc);
                    }
                    newMessagesBar.step();
                }));
                concurrentSkipListSet.add(str2);
            } finally {
            }
        }
        log.debug("Finished sending test data");
        if (createNewProducer != null) {
            createNewProducer.close();
        }
        log.debug("Waiting for broker acks");
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((Future) it.next()).get();
        }
        Assertions.assertThat(arrayList).hasSizeGreaterThanOrEqualTo(i5);
        log.info("Running first instance of pc");
        int i7 = i2 / i3;
        ParallelConsumerRunnable parallelConsumerRunnable = new ParallelConsumerRunnable(i, commitMode, processingOrder, str, i7, i4);
        newWorkStealingPool.submit(parallelConsumerRunnable);
        Awaitility.waitAtMost(Duration.ofSeconds(10L)).until(() -> {
            return Boolean.valueOf(parallelConsumerRunnable.getConsumedKeys().size() > 1);
        });
        newWorkStealingPool.submit(new Runnable() { // from class: io.confluent.parallelconsumer.integrationTests.MultiInstanceRebalanceTest.1
            @Override // java.lang.Runnable
            public void run() {
                MultiInstanceRebalanceTest.log.info("Producing {} messages before starting test", Integer.valueOf(i2));
                KafkaProducer createNewProducer2 = MultiInstanceRebalanceTest.this.kcu.createNewProducer(false);
                try {
                    for (int i8 = i5; i8 < i2; i8++) {
                        String str3 = "key-" + i8;
                        MultiInstanceRebalanceTest.log.debug("sending {}", str3);
                        ProducerRecord producerRecord = new ProducerRecord(str, str3, "value-" + i8);
                        ProgressBar progressBar = newMessagesBar;
                        Future send = createNewProducer2.send(producerRecord, (recordMetadata2, exc2) -> {
                            if (exc2 != null) {
                                MultiInstanceRebalanceTest.log.error("Error sending, ", exc2);
                            }
                            progressBar.step();
                        });
                        send.get();
                        arrayList.add(send);
                        concurrentSkipListSet.add(str3);
                    }
                    MultiInstanceRebalanceTest.log.info("Finished sending test data");
                    if (createNewProducer2 != null) {
                        createNewProducer2.close();
                    }
                } finally {
                }
            }
        });
        final List synchronizedList = Collections.synchronizedList((List) IntStream.range(1, i3).mapToObj(i8 -> {
            try {
                Thread.sleep((int) (Math.random() * 2));
            } catch (InterruptedException e) {
                log.error(e.getMessage(), e);
            }
            log.info("Running pc instance {}", Integer.valueOf(i8));
            ParallelConsumerRunnable parallelConsumerRunnable2 = new ParallelConsumerRunnable(i, commitMode, processingOrder, str, i7, i4);
            newWorkStealingPool.submit(parallelConsumerRunnable2);
            return parallelConsumerRunnable2;
        }).collect(Collectors.toList()));
        final List<ParallelConsumerRunnable> synchronizedList2 = Collections.synchronizedList(new ArrayList());
        synchronizedList2.add(parallelConsumerRunnable);
        synchronizedList2.addAll(synchronizedList);
        ParallelConsumerRunnable[] parallelConsumerRunnableArr = (ParallelConsumerRunnable[]) synchronizedList2.toArray(new ParallelConsumerRunnable[0]);
        newWorkStealingPool.submit(new Runnable() { // from class: io.confluent.parallelconsumer.integrationTests.MultiInstanceRebalanceTest.2
            @Override // java.lang.Runnable
            public void run() {
                while (MultiInstanceRebalanceTest.this.noneHaveFailed(synchronizedList2)) {
                    try {
                        Thread.sleep((int) (500.0d * Math.random()));
                        if (Math.random() > 0.2d) {
                            int size = synchronizedList.size();
                            int random = (int) (Math.random() * size * 0.6d);
                            if (random > 0) {
                                MultiInstanceRebalanceTest.log.info("Will mess with {} instances", Integer.valueOf(random));
                                IntStream range = IntStream.range(0, random);
                                List list = synchronizedList;
                                ExecutorService executorService = newWorkStealingPool;
                                range.forEach(i9 -> {
                                    ParallelConsumerRunnable parallelConsumerRunnable2 = (ParallelConsumerRunnable) list.get((int) ((size - 1) * Math.random()));
                                    MultiInstanceRebalanceTest.log.info("Victim is instance: " + parallelConsumerRunnable2.instanceId);
                                    parallelConsumerRunnable2.toggle(executorService);
                                });
                            }
                        }
                    } catch (Throwable th) {
                        MultiInstanceRebalanceTest.log.error("Error in chaos loop", th);
                        throw new RuntimeException(th);
                    }
                }
                MultiInstanceRebalanceTest.log.error("Ending chaos as a PC instance has died");
            }
        });
        Assertions.useRepresentation(new TrimListRepresentation());
        String msg = StringUtils.msg("All keys sent to input-topic should be processed, within time (expected: {} commit: {} order: {} max poll: {})", new Object[]{Integer.valueOf(i2), commitMode, processingOrder, Integer.valueOf(i)});
        ProgressTracker progressTracker = new ProgressTracker(this.count);
        try {
            try {
                Awaitility.waitAtMost(Duration.ofMinutes(5L)).failFast("A PC has died - check logs", () -> {
                    return Boolean.valueOf(!noneHaveFailed(synchronizedList2));
                }).alias(msg).pollInterval(1L, TimeUnit.SECONDS).untilAsserted(() -> {
                    log.trace("Processed-count: {}", Integer.valueOf(getAllConsumedKeys(parallelConsumerRunnableArr).size()));
                    if (progressTracker.hasProgressNotBeenMade()) {
                        concurrentSkipListSet.removeAll(getAllConsumedKeys(parallelConsumerRunnableArr));
                        throw progressTracker.constructError(StringUtils.msg("No progress, missing keys: {}.", new Object[]{concurrentSkipListSet}));
                    }
                    SoftAssertions softAssertions = new SoftAssertions();
                    ((BooleanAssert) softAssertions.assertThat(this.overallConsumedKeys.containsAll(concurrentSkipListSet)).as("contains all: all expected are consumed at least once", new Object[0])).isTrue();
                    softAssertions.assertThat(this.overallConsumedKeys).as("size: all expected are consumed only once", new Object[0]).hasSizeGreaterThanOrEqualTo(concurrentSkipListSet.size());
                    softAssertions.assertAll();
                });
                this.overallProgress.close();
                newMessagesBar.close();
                synchronizedList2.forEach((v0) -> {
                    v0.close();
                });
                Assertions.assertThat(parallelConsumerRunnable.consumedKeys).hasSizeGreaterThan(0);
                Assertions.assertThat(getAllConsumedKeys((ParallelConsumerRunnable[]) synchronizedList.toArray(new ParallelConsumerRunnable[0]))).as("Second PC should have taken over some of the work and consumed some records", new Object[0]).hasSizeGreaterThan(0);
                newWorkStealingPool.shutdown();
                Collection collection = IterableUtil.toCollection(StandardComparisonStrategy.instance().duplicatesFrom(getAllConsumedKeys(parallelConsumerRunnableArr)));
                log.info("Duplicate consumed keys (at least one is expected due to the rebalance): {}", collection);
                Assertions.assertThat(collection).as("There should be few duplicate keys", new Object[0]).hasSizeLessThan((int) (i2 * 0.2d));
            } catch (Throwable th) {
                List<Exception> checkForFailure = checkForFailure(synchronizedList2);
                if (!(th instanceof TerminalFailureException)) {
                    throw new RuntimeException(StringUtils.msg("{} \n Assertion error. PC reported exception states: {} \n {}", new Object[]{msg, checkForFailure, th}), th);
                }
                throw new RuntimeException(StringUtils.msg("{} \n Terminal failure in one or more of the PCs. Reported exception states are: {} \n {}", new Object[]{msg, checkForFailure, th}), checkForFailure.stream().findAny().orElse(null));
            }
        } catch (Throwable th2) {
            this.overallProgress.close();
            newMessagesBar.close();
            throw th2;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean noneHaveFailed(List<ParallelConsumerRunnable> list) {
        return checkForFailure(list).isEmpty();
    }

    private List<Exception> checkForFailure(List<ParallelConsumerRunnable> list) {
        return (List) list.stream().filter(parallelConsumerRunnable -> {
            ParallelEoSStreamProcessor<String, String> parallelConsumer = parallelConsumerRunnable.getParallelConsumer();
            if (parallelConsumer != null && parallelConsumer.isClosedOrFailed()) {
                return parallelConsumer.getFailureCause() != null;
            }
            return false;
        }).map(parallelConsumerRunnable2 -> {
            return parallelConsumerRunnable2.getParallelConsumer().getFailureCause();
        }).collect(Collectors.toList());
    }

    List<String> getAllConsumedKeys(ParallelConsumerRunnable... parallelConsumerRunnableArr) {
        return (List) Arrays.stream(parallelConsumerRunnableArr).flatMap(parallelConsumerRunnable -> {
            return parallelConsumerRunnable.consumedKeys.stream();
        }).collect(Collectors.toList());
    }

    static {
        MDC.put("pcId", "Test-Thread");
    }
}
