/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.integrationTests;

import io.confluent.csid.utils.EnumCartesianProductTestSets;
import io.confluent.csid.utils.ProgressBarUtils;
import io.confluent.csid.utils.ProgressTracker;
import io.confluent.csid.utils.StringUtils;
import io.confluent.csid.utils.TrimListRepresentation;
import io.confluent.parallelconsumer.AbstractParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.integrationTests.BrokerIntegrationTest;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import me.tongfei.progressbar.ProgressBar;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.presentation.Representation;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.CartesianProductTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

@Tag(value="transactions")
class TransactionAndCommitModeTest
extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(TransactionAndCommitModeTest.class);
    int LOW_MAX_POLL_RECORDS_CONFIG = 1;
    int DEFAULT_MAX_POLL_RECORDS_CONFIG = 500;
    int HIGH_MAX_POLL_RECORDS_CONFIG = 10000;

    TransactionAndCommitModeTest() {
    }

    @CartesianProductTest(factory="enumSets")
    void testDefaultMaxPoll(ParallelConsumerOptions.CommitMode commitMode, ParallelConsumerOptions.ProcessingOrder order) {
        int numMessages = 5000;
        if (order.equals((Object)ParallelConsumerOptions.ProcessingOrder.PARTITION)) {
            numMessages = 1000;
        }
        this.runTest(this.DEFAULT_MAX_POLL_RECORDS_CONFIG, commitMode, order, numMessages);
    }

    @Test
    void testDefaultMaxPollConsumerSyncSlow() {
        this.runTest(this.DEFAULT_MAX_POLL_RECORDS_CONFIG, ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC, ParallelConsumerOptions.ProcessingOrder.UNORDERED);
    }

    static CartesianProductTest.Sets enumSets() {
        return new EnumCartesianProductTestSets().add(ParallelConsumerOptions.CommitMode.class).add(new Object[]{ParallelConsumerOptions.ProcessingOrder.class});
    }

    @RepeatedTest(value=5)
    void testTransactionalDefaultMaxPoll() {
        this.runTest(this.DEFAULT_MAX_POLL_RECORDS_CONFIG, ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER, ParallelConsumerOptions.ProcessingOrder.KEY);
    }

    @CartesianProductTest(factory="enumSets")
    public void testLowMaxPoll(ParallelConsumerOptions.CommitMode commitMode, ParallelConsumerOptions.ProcessingOrder order) {
        int numMessages = 5000;
        if (order.equals((Object)ParallelConsumerOptions.ProcessingOrder.PARTITION)) {
            numMessages = 1000;
        }
        this.runTest(this.LOW_MAX_POLL_RECORDS_CONFIG, commitMode, order, numMessages);
    }

    @CartesianProductTest(factory="enumSets")
    public void testHighMaxPollEnum(ParallelConsumerOptions.CommitMode commitMode, ParallelConsumerOptions.ProcessingOrder order) {
        int numMessages = 10000;
        if (order.equals((Object)ParallelConsumerOptions.ProcessingOrder.PARTITION)) {
            numMessages = 1000;
        }
        this.runTest(this.HIGH_MAX_POLL_RECORDS_CONFIG, commitMode, order, numMessages);
    }

    private void runTest(int maxPoll, ParallelConsumerOptions.CommitMode commitMode, ParallelConsumerOptions.ProcessingOrder order) {
        int expectedMessageCount = 30000;
        this.runTest(maxPoll, commitMode, order, expectedMessageCount);
    }

    private void runTest(int maxPoll, ParallelConsumerOptions.CommitMode commitMode, ParallelConsumerOptions.ProcessingOrder order, int expectedCount) {
        String inputName = this.setupTopic(this.getClass().getSimpleName() + "-input-" + RandomUtils.nextInt());
        String outputName = this.setupTopic(this.getClass().getSimpleName() + "-output-" + RandomUtils.nextInt());
        int expectedMessageCount = expectedCount;
        ProgressBar bar = ProgressBarUtils.getNewMessagesBar(log, expectedMessageCount);
        ArrayList<String> expectedKeys = new ArrayList<String>();
        log.info("Producing {} messages before starting test", (Object)expectedMessageCount);
        ArrayList<Future> sends = new ArrayList<Future>();
        try (KafkaProducer kafkaProducer = this.getKcu().createNewProducer(false);){
            for (int i = 0; i < expectedMessageCount; ++i) {
                String key = "key-" + i;
                Future send = kafkaProducer.send(new ProducerRecord(inputName, (Object)key, (Object)("value-" + i)), (meta, exception) -> {
                    if (exception != null) {
                        log.error("Error sending, ", (Throwable)exception);
                    }
                });
                sends.add(send);
                expectedKeys.add(key);
            }
            log.debug("Finished sending test data");
        }
        log.debug("Waiting for broker acks");
        for (Future send : sends) {
            send.get();
        }
        Assertions.assertThat(sends).hasSize(expectedMessageCount);
        log.debug("Starting test");
        KafkaProducer<String, String> newProducer = this.getKcu().createNewProducer(commitMode);
        Properties consumerProps = new Properties();
        consumerProps.put("max.poll.records", (Object)maxPoll);
        KafkaConsumer newConsumer = this.getKcu().createNewConsumer(true, consumerProps);
        int numThreads = 64;
        ParallelEoSStreamProcessor pc = new ParallelEoSStreamProcessor(ParallelConsumerOptions.builder().ordering(order).consumer(newConsumer).producer(newProducer).commitMode(commitMode).maxConcurrency(numThreads).build());
        pc.subscribe((Collection)UniLists.of((Object)inputName));
        pc.setTimeBetweenCommits(Duration.ofSeconds(1L));
        TopicPartition tp = new TopicPartition(inputName, 0);
        Map beginOffsets = newConsumer.beginningOffsets((Collection)UniLists.of((Object)tp));
        Map endOffsets = newConsumer.endOffsets((Collection)UniLists.of((Object)tp));
        Assertions.assertThat((Map)endOffsets).containsEntry((Object)tp, (Object)expectedMessageCount);
        Assertions.assertThat((Long)((Long)beginOffsets.get(tp))).isZero();
        List consumedKeys = Collections.synchronizedList(new ArrayList());
        List producedKeysAcknowledged = Collections.synchronizedList(new ArrayList());
        AtomicInteger processedCount = new AtomicInteger(0);
        AtomicInteger producedCount = new AtomicInteger(0);
        pc.pollAndProduce(record -> {
            log.debug("Polled {}", (Object)record.offset());
            consumedKeys.add((String)record.key());
            processedCount.incrementAndGet();
            return new ProducerRecord(outputName, (Object)((String)record.key()), (Object)"data");
        }, consumeProduceResult -> {
            log.debug("Produced {}", (Object)consumeProduceResult.getOut());
            producedCount.incrementAndGet();
            producedKeysAcknowledged.add((String)consumeProduceResult.getIn().key());
            bar.step();
        });
        Assertions.useRepresentation((Representation)new TrimListRepresentation());
        int roundsAllowed = 10;
        ProgressTracker progressTracker = new ProgressTracker(processedCount, null, AbstractParallelEoSStreamProcessorTestBase.defaultTimeout);
        String failureMessage = StringUtils.msg((String)"All keys sent to input-topic should be processed and produced, within time (expected: {} commit: {} order: {} max poll: {})", (Object[])new Object[]{expectedMessageCount, commitMode, order, maxPoll});
        try {
            Awaitility.waitAtMost((Duration)AbstractParallelEoSStreamProcessorTestBase.defaultTimeout).failFast("PC died, check logs.", () -> pc.isClosedOrFailed() || producedCount.get() > expectedMessageCount).alias(failureMessage).untilAsserted(() -> {
                log.trace("Processed-count: {}, Produced-count: {}", (Object)processedCount.get(), (Object)producedCount.get());
                int delta = producedCount.get() - processedCount.get();
                if (delta == numThreads && progressTracker.getRounds().get() > 1) {
                    log.error("Here we go fishy...");
                }
                progressTracker.checkForProgressExceptionally();
                SoftAssertions all = new SoftAssertions();
                ((ListAssert)all.assertThat(new ArrayList(consumedKeys)).as("all expected are consumed", new Object[0])).hasSameSizeAs((Iterable)expectedKeys);
                ((ListAssert)all.assertThat(new ArrayList(producedKeysAcknowledged)).as("all consumed are produced ok ", new Object[0])).hasSameSizeAs((Iterable)expectedKeys);
                all.assertAll();
            });
        }
        catch (ConditionTimeoutException e) {
            log.debug("Expected keys (size {})", (Object)expectedKeys.size());
            log.debug("Consumed keys ack'd (size {})", (Object)consumedKeys.size());
            log.debug("Produced keys (size {})", (Object)producedKeysAcknowledged.size());
            expectedKeys.removeAll(consumedKeys);
            log.info("Missing keys from consumed: {}", expectedKeys);
            Assertions.fail((String)(failureMessage + "\n" + e.getMessage()));
        }
        pc.closeDrainFirst();
        ((AbstractIntegerAssert)Assertions.assertThat((int)processedCount.get()).as("messages processed and produced by parallel-consumer should be equal", new Object[0])).isEqualTo(producedCount.get());
        Assertions.assertThat((int)expectedMessageCount).isEqualTo(processedCount.get());
        Assertions.assertThat(producedKeysAcknowledged).hasSameSizeAs(expectedKeys);
        Assertions.assertThat((int)progressTracker.getHighestRoundCountSeen()).isLessThan(40);
        bar.close();
    }
}

