package io.confluent.parallelconsumer.integrationTests;

import io.confluent.csid.utils.EnumCartesianProductTestSets;
import io.confluent.csid.utils.ProgressBarUtils;
import io.confluent.csid.utils.ProgressTracker;
import io.confluent.csid.utils.StringUtils;
import io.confluent.csid.utils.TrimListRepresentation;
import io.confluent.parallelconsumer.AbstractParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import me.tongfei.progressbar.ProgressBar;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.SoftAssertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.CartesianProductTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

/**
 * Broker integration tests that run a {@link ParallelEoSStreamProcessor} over a pre-populated input topic
 * for different combinations of commit mode, processing order and consumer {@code max.poll.records},
 * asserting that the number of processed and produced records matches the number of records that were
 * sent to the input topic.
 */
@Tag("transactions")
class TransactionAndCommitModeTest extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(TransactionAndCommitModeTest.class);

    /** Message count used when a test does not specify one. */
    private static final int DEFAULT_MESSAGE_COUNT = 30000;

    /** Message count the parameterised tests use for {@code ProcessingOrder.PARTITION}. */
    private static final int PARTITION_ORDER_MESSAGE_COUNT = 1000;

    /** Value passed to {@code maxConcurrency} on the options builder. */
    private static final int MAX_CONCURRENCY = 64;

    /** Exclusive upper bound asserted on {@code ProgressTracker#getHighestRoundCountSeen()}. */
    private static final int MAX_ACCEPTABLE_ROUND_COUNT = 40;

    static final int LOW_MAX_POLL_RECORDS_CONFIG = 1;
    static final int DEFAULT_MAX_POLL_RECORDS_CONFIG = 500;
    static final int HIGH_MAX_POLL_RECORDS_CONFIG = KafkaClientUtils.MAX_POLL_RECORDS;

    TransactionAndCommitModeTest() {
    }

    /** Every commit mode / processing order combination with {@code max.poll.records} = 500. */
    @CartesianProductTest(factory = "enumSets")
    void testDefaultMaxPoll(ParallelConsumerOptions.CommitMode commitMode, ParallelConsumerOptions.ProcessingOrder processingOrder) {
        runTest(DEFAULT_MAX_POLL_RECORDS_CONFIG, commitMode, processingOrder, messageCountFor(processingOrder, 5000));
    }

    /** Periodic synchronous consumer commits, unordered processing, default message count. */
    @Test
    void testDefaultMaxPollConsumerSyncSlow() {
        runTest(DEFAULT_MAX_POLL_RECORDS_CONFIG, ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC, ParallelConsumerOptions.ProcessingOrder.UNORDERED);
    }

    /** Factory for the cartesian product tests: all commit modes crossed with all processing orders. */
    static CartesianProductTest.Sets enumSets() {
        return new EnumCartesianProductTestSets()
                .add(ParallelConsumerOptions.CommitMode.class)
                .add(ParallelConsumerOptions.ProcessingOrder.class);
    }

    /** Transactional producer commits with key ordering, repeated five times. */
    @RepeatedTest(5)
    void testTransactionalDefaultMaxPoll() {
        runTest(DEFAULT_MAX_POLL_RECORDS_CONFIG, ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER, ParallelConsumerOptions.ProcessingOrder.KEY);
    }

    /** Every commit mode / processing order combination with {@code max.poll.records} = 1. */
    @CartesianProductTest(factory = "enumSets")
    public void testLowMaxPoll(ParallelConsumerOptions.CommitMode commitMode, ParallelConsumerOptions.ProcessingOrder processingOrder) {
        runTest(LOW_MAX_POLL_RECORDS_CONFIG, commitMode, processingOrder, messageCountFor(processingOrder, 5000));
    }

    /** Every commit mode / processing order combination with {@link KafkaClientUtils#MAX_POLL_RECORDS}. */
    @CartesianProductTest(factory = "enumSets")
    public void testHighMaxPollEnum(ParallelConsumerOptions.CommitMode commitMode, ParallelConsumerOptions.ProcessingOrder processingOrder) {
        runTest(HIGH_MAX_POLL_RECORDS_CONFIG, commitMode, processingOrder, messageCountFor(processingOrder, 10000));
    }

    /**
     * Picks the message count for a parameterised run.
     *
     * @param processingOrder the ordering under test
     * @param defaultCount    count to use for every ordering other than {@code PARTITION}
     * @return {@link #PARTITION_ORDER_MESSAGE_COUNT} for {@code PARTITION}, otherwise {@code defaultCount}
     */
    private static int messageCountFor(ParallelConsumerOptions.ProcessingOrder processingOrder, int defaultCount) {
        return processingOrder == ParallelConsumerOptions.ProcessingOrder.PARTITION
                ? PARTITION_ORDER_MESSAGE_COUNT
                : defaultCount;
    }

    /** Runs the scenario with {@link #DEFAULT_MESSAGE_COUNT} messages. */
    private void runTest(int maxPoll, ParallelConsumerOptions.CommitMode commitMode, ParallelConsumerOptions.ProcessingOrder order) {
        runTest(maxPoll, commitMode, order, DEFAULT_MESSAGE_COUNT);
    }

    /**
     * Pre-produces {@code expectedMessageCount} records to a fresh input topic, runs a parallel consumer
     * that forwards each record to a fresh output topic, and asserts that all records are processed and
     * produced within {@code AbstractParallelEoSStreamProcessorTestBase.defaultTimeout}.
     *
     * @param maxPoll              value for the consumer's {@code max.poll.records} setting
     * @param commitMode           commit mode handed to the producer factory and the consumer options
     * @param order                processing order handed to the consumer options
     * @param expectedMessageCount number of records to pre-produce and expect back
     */
    private void runTest(int maxPoll, ParallelConsumerOptions.CommitMode commitMode, ParallelConsumerOptions.ProcessingOrder order, int expectedMessageCount) {
        String inputTopic = setupTopic(getClass().getSimpleName() + "-input-" + RandomUtils.nextInt());
        String outputTopic = setupTopic(getClass().getSimpleName() + "-output-" + RandomUtils.nextInt());

        ProgressBar bar = ProgressBarUtils.getNewMessagesBar(log, expectedMessageCount);
        try {
            List<String> expectedKeys = produceInputMessages(inputTopic, expectedMessageCount);

            log.debug("Starting test");
            KafkaProducer<String, String> newProducer = getKcu().createNewProducer(commitMode);
            Properties consumerProps = new Properties();
            consumerProps.put("max.poll.records", maxPoll);
            KafkaConsumer<String, String> newConsumer = getKcu().createNewConsumer(true, consumerProps);

            ParallelEoSStreamProcessor<String, String> pc = new ParallelEoSStreamProcessor<>(
                    ParallelConsumerOptions.<String, String>builder()
                            .ordering(order)
                            .consumer(newConsumer)
                            .producer(newProducer)
                            .commitMode(commitMode)
                            .maxConcurrency(MAX_CONCURRENCY)
                            .build());
            pc.subscribe(UniLists.of(inputTopic));
            pc.setTimeBetweenCommits(Duration.ofSeconds(1L));

            // Sanity check the input topic before consuming: partition 0 holds exactly what was produced.
            TopicPartition inputPartition = new TopicPartition(inputTopic, 0);
            Map<TopicPartition, Long> beginningOffsets = newConsumer.beginningOffsets(UniLists.of(inputPartition));
            Map<TopicPartition, Long> endOffsets = newConsumer.endOffsets(UniLists.of(inputPartition));
            Assertions.assertThat(endOffsets).containsEntry(inputPartition, Long.valueOf(expectedMessageCount));
            Assertions.assertThat(beginningOffsets.get(inputPartition)).isZero();

            // The callbacks below are invoked by the parallel consumer, hence the synchronized/atomic holders.
            List<String> consumedKeys = Collections.synchronizedList(new ArrayList<>());
            List<String> producedKeysAcknowledged = Collections.synchronizedList(new ArrayList<>());
            AtomicInteger processedCount = new AtomicInteger(0);
            AtomicInteger producedCount = new AtomicInteger(0);

            pc.pollAndProduce(context -> {
                log.debug("Polled {}", context.offset());
                consumedKeys.add(context.key());
                processedCount.incrementAndGet();
                return new ProducerRecord<>(outputTopic, context.key(), "data");
            }, result -> {
                log.debug("Produced {}", result.getOut());
                producedCount.incrementAndGet();
                producedKeysAcknowledged.add(result.getIn().key());
                bar.step();
            });

            // NOTE: this changes AssertJ's default representation globally, not just for this test.
            Assertions.useRepresentation(new TrimListRepresentation());

            ProgressTracker progressTracker = new ProgressTracker(processedCount, null, AbstractParallelEoSStreamProcessorTestBase.defaultTimeout);
            String failureMessage = StringUtils.msg("All keys sent to input-topic should be processed and produced, within time (expected: {} commit: {} order: {} max poll: {})",
                    expectedMessageCount, commitMode, order, maxPoll);
            try {
                Awaitility.waitAtMost(AbstractParallelEoSStreamProcessorTestBase.defaultTimeout)
                        // Abort early if the consumer stopped or produced more records than were sent.
                        .failFast("PC died, check logs.", () -> pc.isClosedOrFailed() || producedCount.get() > expectedMessageCount)
                        .alias(failureMessage)
                        .untilAsserted(() -> {
                            log.trace("Processed-count: {}, Produced-count: {}", processedCount.get(), producedCount.get());
                            // NOTE(review): produced is counted after processed, so this delta looks like it
                            // can never equal the concurrency level - confirm the intended direction.
                            int delta = producedCount.get() - processedCount.get();
                            if (delta == MAX_CONCURRENCY && progressTracker.getRounds().get() > 1) {
                                log.error("Here we go fishy...");
                            }
                            progressTracker.checkForProgressExceptionally();
                            SoftAssertions softly = new SoftAssertions();
                            softly.assertThat(new ArrayList<>(consumedKeys)).as("all expected are consumed").hasSameSizeAs(expectedKeys);
                            softly.assertThat(new ArrayList<>(producedKeysAcknowledged)).as("all consumed are produced ok ").hasSameSizeAs(expectedKeys);
                            softly.assertAll();
                        });
            } catch (ConditionTimeoutException e) {
                log.debug("Expected keys (size {})", expectedKeys.size());
                log.debug("Consumed keys ack'd (size {})", consumedKeys.size());
                log.debug("Produced keys (size {})", producedKeysAcknowledged.size());
                // Work on a copy so the expected keys stay intact for anything that still references them.
                List<String> missingKeys = new ArrayList<>(expectedKeys);
                missingKeys.removeAll(consumedKeys);
                log.info("Missing keys from consumed: {}", missingKeys);
                Assertions.fail(failureMessage + "\n" + e.getMessage(), e);
            }

            pc.closeDrainFirst();

            Assertions.assertThat(processedCount.get())
                    .as("messages processed and produced by parallel-consumer should be equal")
                    .isEqualTo(producedCount.get());
            Assertions.assertThat(processedCount.get()).isEqualTo(expectedMessageCount);
            Assertions.assertThat(producedKeysAcknowledged).hasSameSizeAs(expectedKeys);
            Assertions.assertThat(progressTracker.getHighestRoundCountSeen()).isLessThan(MAX_ACCEPTABLE_ROUND_COUNT);
        } finally {
            // Close the bar on failure paths too, not only after a successful run.
            bar.close();
        }
    }

    /**
     * Sends {@code messageCount} records ({@code key-N} / {@code value-N}) to {@code inputTopic} and waits
     * for every send to complete.
     *
     * @param inputTopic   topic to send to
     * @param messageCount number of records to send
     * @return the keys that were sent, in send order
     */
    private List<String> produceInputMessages(String inputTopic, int messageCount) {
        log.info("Producing {} messages before starting test", messageCount);
        List<String> expectedKeys = new ArrayList<>(messageCount);
        List<Future<?>> sends = new ArrayList<>(messageCount);
        // try-with-resources so the producer is closed even if a send throws.
        try (KafkaProducer<String, String> producer = getKcu().createNewProducer(false)) {
            for (int i = 0; i < messageCount; i++) {
                String key = "key-" + i;
                sends.add(producer.send(new ProducerRecord<>(inputTopic, key, "value-" + i), (metadata, exception) -> {
                    if (exception != null) {
                        log.error("Error sending, ", exception);
                    }
                }));
                expectedKeys.add(key);
            }
            log.debug("Finished sending test data");
        }
        log.debug("Waiting for broker acks");
        awaitAll(sends);
        Assertions.assertThat(sends).hasSize(messageCount);
        return expectedKeys;
    }

    /**
     * Blocks until every future has completed.
     *
     * @param sends the pending send results
     * @throws IllegalStateException if a send failed or the waiting thread was interrupted
     */
    private static void awaitAll(List<Future<?>> sends) {
        for (Future<?> send : sends) {
            try {
                send.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for broker acks", e);
            } catch (java.util.concurrent.ExecutionException e) {
                throw new IllegalStateException("Failed to send test data to the input topic", e);
            }
        }
    }
}
