/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer;

import io.confluent.csid.utils.GeneralTestUtils;
import io.confluent.csid.utils.JavaUtils;
import io.confluent.csid.utils.KafkaTestUtils;
import io.confluent.csid.utils.KafkaUtils;
import io.confluent.csid.utils.LatchTestUtils;
import io.confluent.csid.utils.LongPollingMockConsumer;
import io.confluent.csid.utils.ProgressBarUtils;
import io.confluent.csid.utils.Range;
import io.confluent.parallelconsumer.FakeRuntimeException;
import io.confluent.parallelconsumer.ParallelConsumerException;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.PollContext;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import me.tongfei.progressbar.ProgressBar;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractDurationAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.MapAssert;
import org.assertj.core.api.ObjectAssert;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.internal.verification.VerificationModeFactory;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

@Timeout(value=3L, unit=TimeUnit.MINUTES)
public class ParallelEoSStreamProcessorTest
extends ParallelEoSStreamProcessorTestBase {
    private static final Logger log = LoggerFactory.getLogger(ParallelEoSStreamProcessorTest.class);

    /**
     * Runs before every test and delegates to {@code primeFirstRecord()}.
     */
    @BeforeEach
    public void setupData() {
        primeFirstRecord();
    }

    /**
     * Registers a user function that always throws, waits for a few control loop cycles, closes the
     * consumer and then asserts against an empty list of commits.
     *
     * @param commitMode commit mode under test; supplied once per {@code CommitMode} constant
     */
    @ParameterizedTest
    @EnumSource(value = ParallelConsumerOptions.CommitMode.class)
    public void failingActionNothingCommitted(ParallelConsumerOptions.CommitMode commitMode) {
        setupParallelConsumerInstance(commitMode);

        parallelConsumer.poll(context -> {
            throw new FakeRuntimeException("My user's function error");
        });

        awaitForSomeLoopCycles(3);
        parallelConsumer.close();

        List<Integer> noCommits = UniLists.of();
        assertCommits(noCommits, "All erroring, so nothing committed except initial");
    }

    /**
     * Processes two records under {@code UNORDERED} ordering, each one blocked on its own latch.
     * <p>
     * Offset 1 is released first, after which the commit list is asserted to be empty; offset 0 is
     * released afterwards and the test waits for a commit of offset 2 before draining and closing.
     *
     * @param commitMode commit mode under test; supplied once per {@code CommitMode} constant
     */
    @ParameterizedTest
    @EnumSource(value = ParallelConsumerOptions.CommitMode.class)
    void offsetsAreNeverCommittedForMessagesStillInFlightSimplest(ParallelConsumerOptions.CommitMode commitMode) {
        ParallelConsumerOptions unorderedOptions = getBaseOptions(commitMode).toBuilder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED)
                .build();
        setupParallelConsumerInstance(unorderedOptions);
        parallelConsumer.setTimeBetweenCommits(Duration.ofSeconds(1L));
        primeFirstRecord();
        sendSecondRecord(consumerSpy);
        Assertions.assertThat((Comparable) parallelConsumer.getWm().getOptions().getOrdering())
                .isEqualTo(ParallelConsumerOptions.ProcessingOrder.UNORDERED);

        List<CountDownLatch> latches = LatchTestUtils.constructLatches(2);
        Map<Integer, Boolean> completed = new LinkedHashMap<>();
        CountDownLatch processingStarted = new CountDownLatch(1);

        parallelConsumer.poll(recordContext -> {
            log.debug("msg: {}", recordContext);
            processingStarted.countDown();
            int offset = (int) recordContext.offset();
            // block this record until the test releases the latch matching its offset
            LatchTestUtils.awaitLatch(latches, offset);
            completed.put(offset, true);
        });

        // wait until at least one record has entered the user function
        LatchTestUtils.awaitLatch(processingStarted);
        Assertions.assertThat((long) parallelConsumer.getWm().getNumberOfWorkQueuedInShardsAwaitingSelection()).isZero();
        Assertions.assertThat((int) parallelConsumer.getWm().getNumberRecordsOutForProcessing()).isEqualTo(2);

        // finish offset 1 while offset 0 is still in flight
        releaseAndWait(latches, 1);
        parallelConsumer.requestCommitAsap();
        awaitForCommitExact(0);
        List<Integer> noCommits = UniLists.of();
        assertCommits(noCommits, "Partition is blocked");

        // finish offset 0
        releaseAndWait(latches, 0);
        parallelConsumer.requestCommitAsap();
        awaitForCommitExact(2);

        log.debug("Closing...");
        parallelConsumer.closeDrainFirst();
        Assertions.assertThat(completed)
                .as("sanity - all expected messages are processed")
                .containsValues(true, true);
    }

    /**
     * Sets up the parallel consumer from the base options for the given commit mode and then calls
     * {@code primeFirstRecord()}.
     *
     * @param commitMode commit mode to build the options with
     */
    private void setupParallelConsumerInstance(ParallelConsumerOptions.CommitMode commitMode) {
        ParallelConsumerOptions baseOptions = getBaseOptions(commitMode);
        setupParallelConsumerInstance(baseOptions);
        primeFirstRecord();
    }

    /**
     * Builds options wired to the consumer and producer spies of this test.
     *
     * @param commitMode commit mode to set on the options
     * @return options carrying the given commit mode, {@code consumerSpy} and {@code producerSpy}
     */
    private ParallelConsumerOptions getBaseOptions(ParallelConsumerOptions.CommitMode commitMode) {
        return ParallelConsumerOptions.builder()
                .commitMode(commitMode)
                .consumer((Consumer) consumerSpy)
                .producer((Producer) producerSpy)
                .build();
    }

    /**
     * Runs {@link #offsetsAreNeverCommittedForMessagesStillInFlightSimplest} and then waits until the
     * commit list consists of the single offset 2.
     *
     * @param commitMode commit mode under test; supplied once per {@code CommitMode} constant
     */
    @ParameterizedTest
    @EnumSource(value = ParallelConsumerOptions.CommitMode.class)
    void offsetsAreNeverCommittedForMessagesStillInFlightShort(ParallelConsumerOptions.CommitMode commitMode) {
        offsetsAreNeverCommittedForMessagesStillInFlightSimplest(commitMode);
        log.info("Test start");

        List<Integer> expectedCommits = UniLists.of(2);
        Awaitility.await().untilAsserted(() -> assertCommits(
                expectedCommits,
                "Only one of the two offsets committed, as they were coalesced for efficiency"));
    }

    /**
     * Longer variant of the in-flight commit test (currently {@code @Disabled}): six records are each
     * blocked on their own latch and released out of order, while the producer spy is checked for the
     * number of committed transactions and the offsets recorded in its consumer group offset history.
     *
     * @param commitMode commit mode under test; supplied once per {@code CommitMode} constant
     */
    @Disabled
    @ParameterizedTest
    @EnumSource(value = ParallelConsumerOptions.CommitMode.class)
    void offsetsAreNeverCommittedForMessagesStillInFlightLong(ParallelConsumerOptions.CommitMode commitMode) {
        setupParallelConsumerInstance(commitMode);
        sendSecondRecord(consumerSpy);
        consumerSpy.addRecord(ktu.makeRecord("0", "v2"));
        consumerSpy.addRecord(ktu.makeRecord("0", "v3"));
        consumerSpy.addRecord(ktu.makeRecord("0", "v4"));
        consumerSpy.addRecord(ktu.makeRecord("0", "v5"));

        List<CountDownLatch> locks = LatchTestUtils.constructLatches(6);
        CountDownLatch startLatch = new CountDownLatch(1);
        parallelConsumer.poll(context -> {
            int offset = (int) context.offset();
            CountDownLatch latchForMsg = locks.get(offset);
            startLatch.countDown();
            try {
                latchForMsg.await();
            } catch (InterruptedException e) {
                // keep the interrupt visible to the calling worker instead of silently dropping it
                Thread.currentThread().interrupt();
                log.error("Interrupted while waiting on the latch for offset {}", offset, e);
            }
        });

        // Wait for processing to begin. Previously the test thread counted the latch down itself,
        // which made the latch a no-op and let the test race ahead of the workers.
        LatchTestUtils.awaitLatch(startLatch);

        // finish offset 1 - nothing may be committed while offset 0 is still in flight
        releaseAndWait(locks, 1);
        awaitForSomeLoopCycles(1);
        Mockito.verify(producerSpy, Mockito.after((long) verificationWaitDelay).never()).commitTransaction();

        // finish offset 2 - still nothing committed
        releaseAndWait(locks, 2);
        awaitForSomeLoopCycles(1);
        Mockito.verify(producerSpy, Mockito.after((long) verificationWaitDelay).never()).commitTransaction();

        // finish offset 0 - first commit expected
        releaseAndWait(locks, 0);
        awaitForOneLoopCycle();
        Mockito.verify(producerSpy, Mockito.after((long) verificationWaitDelay).times(1)).commitTransaction();
        List maps = producerSpy.consumerGroupOffsetsHistory();
        Assertions.assertThat((List) maps).hasSize(1);
        OffsetAndMetadata offsets = (OffsetAndMetadata) ((Map) ((Map) maps.get(0)).get(CONSUMER_GROUP_ID))
                .get(KafkaUtils.toTopicPartition((ConsumerRecord) firstRecord));
        Assertions.assertThat(offsets.offset()).isEqualTo(2L);

        // finish offset 3 - second commit expected
        releaseAndWait(locks, 3);
        Mockito.verify(producerSpy, Mockito.after((long) verificationWaitDelay).times(2)).commitTransaction();
        maps = producerSpy.consumerGroupOffsetsHistory();
        Assertions.assertThat((List) maps).hasSize(2);
        offsets = (OffsetAndMetadata) ((Map) ((Map) maps.get(1)).get(CONSUMER_GROUP_ID))
                .get(KafkaUtils.toTopicPartition((ConsumerRecord) firstRecord));
        Assertions.assertThat(offsets.offset()).isEqualTo(3L);

        // finish offsets 4 and 5 - at least one more commit expected
        releaseAndWait(locks, UniLists.of(4, 5));
        Mockito.verify(producerSpy, Mockito.after((long) verificationWaitDelay).atLeast(3)).commitTransaction();
        maps = producerSpy.consumerGroupOffsetsHistory();
        Assertions.assertThat((List) maps).hasSizeGreaterThanOrEqualTo(3);
        offsets = (OffsetAndMetadata) ((Map) ((Map) maps.get(2)).get(CONSUMER_GROUP_ID))
                .get(KafkaUtils.toTopicPartition((ConsumerRecord) firstRecord));
        Assertions.assertThat(offsets.offset()).isEqualTo(5L);

        assertCommits(UniLists.of(2, 3, 5));
        parallelConsumer.close();
    }

    /**
     * Adds two records via {@code makeRecord("0", ...)}-style priming and two more via
     * {@code makeRecord(1, ...)}, blocks each on its own latch, releases them in the order 1, 2, 0, 3
     * and checks the per-partition commit lists after every release.
     * <p>
     * The assumption compares against {@code ParallelEoSStreamProcessor}: an exact-class check against
     * {@code AbstractParallelEoSStreamProcessor} can never match an instance created as a
     * {@code ParallelEoSStreamProcessor}, which caused this test to be skipped unconditionally.
     *
     * @param commitMode commit mode under test; supplied once per {@code CommitMode} constant
     */
    @ParameterizedTest
    @EnumSource(value = ParallelConsumerOptions.CommitMode.class)
    void offsetCommitsAreIsolatedPerPartition(ParallelConsumerOptions.CommitMode commitMode) {
        Assumptions.assumeThat((Object) parallelConsumer)
                .as("Should only test on core PC - this test is very complicated to get to work with vert.x thread system, as the event and locking system needed is quite different")
                .isExactlyInstanceOf(ParallelEoSStreamProcessor.class);

        setupParallelConsumerInstance(getBaseOptions(commitMode).toBuilder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED)
                .build());
        primeFirstRecord();
        sendSecondRecord(consumerSpy);
        consumerSpy.addRecord(ktu.makeRecord(1, "0", "v2"));
        consumerSpy.addRecord(ktu.makeRecord(1, "0", "v3"));

        // one latch per record, indexed by record offset
        List<CountDownLatch> locks = UniLists.of(
                new CountDownLatch(1),
                new CountDownLatch(1),
                new CountDownLatch(1),
                new CountDownLatch(1));

        parallelConsumer.poll(context -> {
            int offset = (int) context.offset();
            CountDownLatch latchForMsg = locks.get(offset);
            try {
                latchForMsg.await();
            } catch (InterruptedException e) {
                // restore the interrupt status for the worker and keep the stack trace in the log
                Thread.currentThread().interrupt();
                log.error(e.toString(), e);
            }
        });

        List<Integer> noCommits = UniLists.of();

        // finish offset 1
        releaseAndWait(locks, 1);
        parallelConsumer.requestCommitAsap();
        awaitForSomeLoopCycles(50);
        assertCommitLists(UniLists.of(noCommits, UniLists.of(2)));

        // finish offset 2
        releaseAndWait(locks, 2);
        parallelConsumer.requestCommitAsap();
        Awaitility.await().untilAsserted(() ->
                assertCommitLists(UniLists.of(noCommits, UniLists.of(2, 3))));

        // finish offset 0
        releaseAndWait(locks, 0);
        parallelConsumer.requestCommitAsap();
        awaitForOneLoopCycle();
        if (isUsingAsyncCommits()) {
            awaitForSomeLoopCycles(3);
        }
        assertCommitLists(UniLists.of(UniLists.of(2), UniLists.of(2, 3)));

        // finish offset 3
        releaseAndWait(locks, 3);
        if (isUsingAsyncCommits()) {
            awaitForSomeLoopCycles(3);
        }
        Awaitility.await().untilAsserted(() ->
                assertCommitLists(UniLists.of(UniLists.of(2), UniLists.of(2, 3, 4))));
    }

    /**
     * Installs a loop-end callback that always throws and asserts that a draining close fails with a
     * message containing "Error", "poll", "thread" and "fake control".
     *
     * @param commitMode commit mode under test; supplied once per {@code CommitMode} constant
     */
    @ParameterizedTest
    @EnumSource(value = ParallelConsumerOptions.CommitMode.class)
    void controlFlowException(ParallelConsumerOptions.CommitMode commitMode) {
        instantiateConsumerProducer();
        parentParallelConsumer = initPollingAsyncConsumer(
                (ParallelConsumerOptions<String, String>) getBaseOptions(commitMode));
        subscribeParallelConsumerAndMockConsumerTo(INPUT_TOPIC);
        setupData();

        parallelConsumer.addLoopEndCallBack(() -> {
            throw new FakeRuntimeException("My fake control loop error");
        });
        parallelConsumer.poll(context -> log.info("Ignoring {}", context));

        Duration drainTimeout = Duration.ofSeconds(10L);
        Assertions.assertThatThrownBy(() -> parallelConsumer.closeDrainFirst(drainTimeout))
                .hasMessageContainingAll("Error", "poll", "thread", "fake control");
    }

    /**
     * Processes one record through {@code myRecordProcessingAction}, closes the consumer and verifies
     * the commit of offset 1 and a single invocation of the action. For
     * {@code PERIODIC_TRANSACTIONAL_PRODUCER} it also verifies the producer spy committed a transaction
     * and sent offsets to it at least once.
     *
     * @param commitMode commit mode under test; supplied once per {@code CommitMode} constant
     */
    @ParameterizedTest
    @EnumSource(value = ParallelConsumerOptions.CommitMode.class)
    void testVoidPollMethod(ParallelConsumerOptions.CommitMode commitMode) {
        setupParallelConsumerInstance(commitMode);

        int expectedInvocations = 1;
        CountDownLatch processedLatch = new CountDownLatch(expectedInvocations);
        parallelConsumer.poll(pollContext -> {
            log.debug("Processing test context...");
            ConsumerRecord singleRecord = pollContext.getSingleConsumerRecord();
            myRecordProcessingAction.apply((ConsumerRecord<String, String>) singleRecord);
            processedLatch.countDown();
        });

        LatchTestUtils.awaitLatch(processedLatch);
        awaitForSomeLoopCycles(2);
        parallelConsumer.close();

        assertCommits(UniLists.of(1));
        Mockito.verify(myRecordProcessingAction, VerificationModeFactory.times(expectedInvocations))
                .apply(ArgumentMatchers.any());

        boolean transactional = commitMode.equals(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER);
        if (transactional) {
            Mockito.verify(producerSpy, Mockito.atLeastOnce()).commitTransaction();
            Mockito.verify(producerSpy, Mockito.atLeastOnce())
                    .sendOffsetsToTransaction(Mockito.anyMap(), (ConsumerGroupMetadata) ArgumentMatchers.any());
        }
    }

    /**
     * Key-ordered processing test (currently {@code @Disabled}): nine records are each blocked on their
     * own latch, the latches are released in a fixed sequence, and after each step the processed
     * flags and the committed offsets are checked.
     * <p>
     * The processed-state map and the list of polled records are written from threads other than the
     * test thread, so both use concurrent collections.
     *
     * @param commitMode commit mode under test; supplied once per {@code CommitMode} constant
     */
    @ParameterizedTest
    @EnumSource(value = ParallelConsumerOptions.CommitMode.class)
    @Disabled
    public void processInKeyOrder(ParallelConsumerOptions.CommitMode commitMode) {
        setupParallelConsumerInstance(ParallelConsumerOptions.builder()
                .commitMode(commitMode)
                .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
                .build());
        primeFirstRecord();
        Assertions.assertThat((Comparable) parallelConsumer.getWm().getOptions().getOrdering())
                .isEqualTo(ParallelConsumerOptions.ProcessingOrder.KEY);

        sendSecondRecord(consumerSpy);
        consumerSpy.addRecord(ktu.makeRecord("key-1", "v2"));
        consumerSpy.addRecord(ktu.makeRecord("key-1", "v3"));
        consumerSpy.addRecord(ktu.makeRecord(1, "key-2", "v4"));
        consumerSpy.addRecord(ktu.makeRecord(1, "key-3", "v5"));
        consumerSpy.addRecord(ktu.makeRecord(1, "key-3", "v6"));
        consumerSpy.addRecord(ktu.makeRecord(1, "key-3", "v7"));
        consumerSpy.addRecord(ktu.makeRecord(1, "key-4", "v8"));

        CountDownLatch msg0Lock = new CountDownLatch(1);
        CountDownLatch msg1Lock = new CountDownLatch(1);
        CountDownLatch msg2Lock = new CountDownLatch(1);
        CountDownLatch msg3Lock = new CountDownLatch(1);
        CountDownLatch msg4Lock = new CountDownLatch(1);
        CountDownLatch msg5Lock = new CountDownLatch(1);
        CountDownLatch msg6Lock = new CountDownLatch(1);
        CountDownLatch msg7Lock = new CountDownLatch(1);
        CountDownLatch msg8Lock = new CountDownLatch(1);

        // updated by the processing callbacks and read by the test thread
        Map<Integer, Boolean> processedState = new ConcurrentHashMap<>();
        for (Long msgIndex : Range.range(8L)) {
            processedState.put(msgIndex.intValue(), false);
        }

        List<CountDownLatch> locks = UniLists.of(
                msg0Lock, msg1Lock, msg2Lock, msg3Lock, msg4Lock, msg5Lock, msg6Lock, msg7Lock, msg8Lock);

        // record everything the spied consumer hands out, for the sanity check below
        Queue<ConsumerRecord<?, ?>> polled = new ConcurrentLinkedQueue<>();
        Mockito.doAnswer(invocation -> {
            ConsumerRecords<?, ?> records = (ConsumerRecords<?, ?>) invocation.callRealMethod();
            for (ConsumerRecord<?, ?> record : records) {
                polled.add(record);
            }
            return records;
        }).when(consumerSpy).poll((Duration) ArgumentMatchers.any());

        parallelConsumer.poll(context -> {
            int offset = (int) context.offset();
            CountDownLatch latchForMsg = locks.get(offset);
            try {
                log.debug("Started msg {} processing, locking on latch to simulate long process times...", offset);
                latchForMsg.await();
            } catch (InterruptedException e) {
                // keep the interrupt visible to the calling worker instead of silently dropping it
                Thread.currentThread().interrupt();
                log.error("Interrupted while waiting on the latch for offset {}", offset, e);
            }
            log.debug("Finished msg {} processing after waking...", offset);
            processedState.put(offset, true);
        });

        // offsets 6 and 8 are unlocked up front
        msg6Lock.countDown();
        msg8Lock.countDown();

        log.debug("Unlocking 1...");
        msg1Lock.countDown();
        awaitForOneLoopCycle();
        Assertions.assertThat(polled).as("sanity check input data").hasSameSizeAs(locks);
        Assertions.assertThat(processedState.get(1))
                .as("blocked by 0 (1 shouldn't be run until 0 is complete, due to key order processing)")
                .isFalse();
        assertCommits(UniLists.of());

        log.debug("Unlocking 2...");
        msg2Lock.countDown();
        awaitForSomeLoopCycles(2);
        Assertions.assertThat(processedState.get(2)).isTrue();
        Mockito.verify(producerSpy, Mockito.after((long) verificationWaitDelay).never()).commitTransaction();
        awaitForOneLoopCycle();
        assertCommits(UniLists.of());

        log.debug("Unlocking 0...");
        msg0Lock.countDown();
        awaitForCommitExact(0, 0);
        awaitForCommitExact(0, 2);
        assertCommits(UniLists.of(0, 2));

        log.debug("Unlocking 3...");
        msg3Lock.countDown();
        log.debug("Unlocking 5...");
        msg5Lock.countDown();
        awaitUntilTrue(() -> processedState.get(5));
        Assertions.assertThat(processedState.get(5)).as("5 should processed").isTrue();
        awaitForCommitExact(0, 3);
        assertCommits(UniLists.of(0, 2, 3));

        log.debug("Unlocking 4...");
        msg4Lock.countDown();
        awaitUntilTrue(() -> processedState.get(6));
        Assertions.assertThat(processedState.get(6)).as("6 should processed").isTrue();
        awaitForSomeLoopCycles(1);
        awaitForCommitExact(1, 6);
        assertCommits(UniLists.of(0, 2, 3, 6));
        Assertions.assertThat(processedState.get(7)).isFalse();
        Assertions.assertThat(processedState.get(8)).isTrue();

        releaseAndWait(locks, 7);
        awaitForCommitExact(1, 8);
        assertCommits(UniLists.of(0, 2, 3, 6, 8));
    }

    /**
     * Under {@code KEY} ordering with {@code PERIODIC_CONSUMER_SYNC} commits, blocks the record at
     * offset 1 on a latch and checks the committed offsets before and after the latch is released.
     * Each check accepts two commit lists: one with a leading offset 0 and one without it.
     * <p>
     * The polled-records collection is filled from the spied {@code poll} call and read by the test
     * thread, so it is a concurrent queue.
     */
    @Test
    void processInKeyOrderWorkNotReturnedDoesntBreakCommits() {
        ParallelConsumerOptions options = ParallelConsumerOptions.builder()
                .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC)
                .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
                .build();
        setupParallelConsumerInstance(options);
        primeFirstRecord();
        sendSecondRecord(consumerSpy);
        Assertions.assertThat((Comparable) parallelConsumer.getWm().getOptions().getOrdering())
                .isEqualTo(ParallelConsumerOptions.ProcessingOrder.KEY);
        consumerSpy.addRecord(ktu.makeRecord("key-1", "v2"));

        // only offset 1 is blocked; every other offset has no latch and completes immediately
        CountDownLatch msg1latch = new CountDownLatch(1);
        Map<Integer, CountDownLatch> locks = new HashMap<>();
        locks.put(1, msg1latch);

        CountDownLatch twoLoopLatch = new CountDownLatch(2);
        CountDownLatch fourLoopLatch = new CountDownLatch(4);
        parallelConsumer.addLoopEndCallBack(() -> {
            log.trace("Control loop cycle - {}, {}", twoLoopLatch.getCount(), fourLoopLatch.getCount());
            twoLoopLatch.countDown();
            fourLoopLatch.countDown();
        });

        // record everything the spied consumer hands out, for the sanity check below
        Queue<ConsumerRecord<?, ?>> polled = new ConcurrentLinkedQueue<>();
        Mockito.doAnswer(invocation -> {
            ConsumerRecords<?, ?> records = (ConsumerRecords<?, ?>) invocation.callRealMethod();
            for (ConsumerRecord<?, ?> record : records) {
                polled.add(record);
            }
            return records;
        }).when(consumerSpy).poll((Duration) ArgumentMatchers.any());

        parallelConsumer.poll(context -> {
            int offset = (int) context.offset();
            CountDownLatch latch = locks.get(offset);
            if (latch != null) {
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    // await() can only fail by interruption; restore the flag for the worker thread
                    Thread.currentThread().interrupt();
                    log.error(e.getMessage(), e);
                }
            }
            log.debug("Message offset {} processed...", offset);
        });

        Awaitility.await().untilAsserted(() -> Assertions.assertThat(polled)
                .as("sanity check - the records have been polled")
                .hasSize(3));

        LatchTestUtils.awaitLatch(twoLoopLatch);
        awaitForOneLoopCycle();
        Awaitility.await().untilAsserted(() -> {
            try {
                assertCommits(UniLists.of(0, 1), "Only 0 should be committed, as even though 2 is also finished, 1 should be blocking the partition");
            } catch (AssertionError e) {
                // fall back to the commit list without the leading offset 0
                assertCommits(UniLists.of(1), "Bootstrap commit is optional. See msg in code above");
            }
        });

        msg1latch.countDown();
        LatchTestUtils.awaitLatch(fourLoopLatch);
        awaitForOneLoopCycle();
        Awaitility.await().untilAsserted(() -> {
            try {
                assertCommits(UniLists.of(0, 1, 3), "Remaining two records should be committed as a single offset");
            } catch (AssertionError e) {
                // fall back to the commit list without the leading offset 0
                assertCommits(UniLists.of(1, 3), "Bootstrap commit is optional. See msg in code above");
            }
        });
    }

    /**
     * Processes a single record, waits for offset 1 to be committed, then times {@code close()} and
     * asserts it took less than the larger of the configured time between commits and one second.
     *
     * @param commitMode commit mode under test; supplied once per {@code CommitMode} constant
     */
    @ParameterizedTest
    @EnumSource(value = ParallelConsumerOptions.CommitMode.class)
    public void closeAfterSingleMessageShouldBeEventBasedFast(ParallelConsumerOptions.CommitMode commitMode) {
        setupParallelConsumerInstance(commitMode);
        Duration commitInterval = parallelConsumer.getTimeBetweenCommits();

        CountDownLatch processedLatch = new CountDownLatch(1);
        parallelConsumer.poll(context -> {
            log.info("Message processed: {} - noop", context.offset());
            processedLatch.countDown();
        });
        LatchTestUtils.awaitLatch(processedLatch);

        awaitForOneLoopCycle();
        parallelConsumer.requestCommitAsap();
        awaitForOneLoopCycle();
        Awaitility.await().untilAsserted(() -> assertCommits(UniLists.of(1)));

        Duration closeDuration = GeneralTestUtils.time(() -> parallelConsumer.close());
        Duration upperBound = JavaUtils.max(commitInterval, Duration.ofSeconds(1L));
        Assertions.assertThat(closeDuration)
                .as("Should be fast")
                .isLessThan(upperBound);
    }

    /**
     * Sets up the parallel consumer and closes it without draining, without ever calling
     * {@code poll}.
     *
     * @param commitMode commit mode under test; supplied once per {@code CommitMode} constant
     */
    @ParameterizedTest
    @EnumSource(value = ParallelConsumerOptions.CommitMode.class)
    public void closeWithoutRunningShouldBeEventBasedFast(ParallelConsumerOptions.CommitMode commitMode) {
        ParallelConsumerOptions baseOptions = getBaseOptions(commitMode);
        setupParallelConsumerInstance(baseOptions);
        parallelConsumer.closeDontDrainFirst();
    }

    /**
     * Asserts that a second call to {@code poll} on the same instance throws
     * {@link IllegalStateException}.
     */
    @Test
    public void ensureLibraryCantBeUsedTwice() {
        parallelConsumer.poll(firstContext -> {
        });
        Assertions.assertThatIllegalStateException()
                .isThrownBy(() -> parallelConsumer.poll(secondContext -> {
                }));
    }

    /**
     * Builds options with a consumer but no producer. For {@code PERIODIC_TRANSACTIONAL_PRODUCER}
     * initialisation must fail with an {@link IllegalArgumentException} mentioning "Producer" and
     * "Transaction"; for every other mode one record is consumed and offset 1 is committed.
     *
     * @param commitMode commit mode under test; supplied once per {@code CommitMode} constant
     */
    @ParameterizedTest
    @EnumSource(value = ParallelConsumerOptions.CommitMode.class)
    void consumeFlowDoesntRequireProducer(ParallelConsumerOptions.CommitMode commitMode) {
        setupClients();
        ParallelConsumerOptions optionsWithClients = ParallelConsumerOptions.builder()
                .consumer((Consumer) consumerSpy)
                .commitMode(commitMode)
                .build();

        boolean transactional = commitMode.equals(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER);
        if (transactional) {
            Assertions.assertThatThrownBy(() -> {
                        parallelConsumer = initPollingAsyncConsumer((ParallelConsumerOptions<String, String>) optionsWithClients);
                    })
                    .isInstanceOf(IllegalArgumentException.class)
                    .hasMessageContainingAll("Producer", "Transaction");
            return;
        }

        parallelConsumer = initPollingAsyncConsumer((ParallelConsumerOptions<String, String>) optionsWithClients);
        attachLoopCounter((AbstractParallelEoSStreamProcessor) parallelConsumer);
        subscribeParallelConsumerAndMockConsumerTo(INPUT_TOPIC);
        setupData();

        parallelConsumer.poll(context -> log.debug("Test record processor - rec: {}", context));
        parallelConsumer.requestCommitAsap();
        awaitForCommitExact(1);
        parallelConsumer.closeDrainFirst();
        assertCommits(UniLists.of(1));
    }

    /**
     * Asserts that initialising with {@code PERIODIC_TRANSACTIONAL_PRODUCER} and no producer fails
     * with an {@link IllegalArgumentException} mentioning "Producer" and "Transaction".
     */
    @Test
    void optionsProduceMessageFlowRequiresProducer() {
        setupClients();
        ParallelConsumerOptions optionsWithClients = ParallelConsumerOptions.builder()
                .consumer((Consumer) consumerSpy)
                .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER)
                .build();

        Assertions.assertThatThrownBy(() -> {
                    parallelConsumer = initPollingAsyncConsumer((ParallelConsumerOptions<String, String>) optionsWithClients);
                })
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContainingAll("Producer", "Transaction");
    }

    /**
     * Checks consumer configuration validation with real {@code KafkaConsumer} instances:
     * <ol>
     *     <li>no {@code group.id}: {@link IllegalArgumentException} mentioning "Consumer" and "GroupId";</li>
     *     <li>{@code group.id} set, {@code enable.auto.commit} left unset: {@code ParallelConsumerException}
     *     mentioning "auto", "commit", "disabled";</li>
     *     <li>{@code enable.auto.commit=true}: same exception;</li>
     *     <li>{@code enable.auto.commit=false}: no exception.</li>
     * </ol>
     * Each consumer that is rejected is closed again so the test does not leak client resources.
     */
    @Test
    void optionsGroupIdRequiredAndAutoCommitDisabled() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        Deserializer deserializer = Serdes.String().deserializer();

        // 1. missing group id
        KafkaConsumer realConsumer = new KafkaConsumer(properties, deserializer, deserializer);
        ParallelConsumerOptions.ParallelConsumerOptionsBuilder optionsBuilder = ParallelConsumerOptions.builder()
                .consumer((Consumer) realConsumer)
                .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS);
        try {
            ParallelConsumerOptions optionsWithClients = optionsBuilder.build();
            Assertions.assertThatThrownBy(() -> {
                        parallelConsumer = initPollingAsyncConsumer((ParallelConsumerOptions<String, String>) optionsWithClients);
                    })
                    .as("Should error on missing group id")
                    .isInstanceOf(IllegalArgumentException.class)
                    .hasMessageContainingAll("Consumer", "GroupId");
        } finally {
            // zero timeout: do not wait for pending requests, there is no broker behind this consumer
            realConsumer.close(Duration.ZERO);
        }

        // 2. group id present, auto commit not configured
        properties.setProperty("group.id", "dummy-group");
        KafkaConsumer defaultAutoCommitConsumer = new KafkaConsumer(properties, deserializer, deserializer);
        try {
            optionsBuilder.consumer((Consumer) defaultAutoCommitConsumer);
            Assertions.assertThat(Assertions.catchThrowable(() -> {
                        parallelConsumer = initPollingAsyncConsumer((ParallelConsumerOptions<String, String>) optionsBuilder.build());
                    }))
                    .as("Should error on auto commit enabled by default")
                    .isInstanceOf(ParallelConsumerException.class)
                    .hasMessageContainingAll("auto", "commit", "disabled");
        } finally {
            defaultAutoCommitConsumer.close(Duration.ZERO);
        }

        // 3. auto commit explicitly enabled
        properties.setProperty("enable.auto.commit", "true");
        KafkaConsumer autoCommitConsumer = new KafkaConsumer(properties, deserializer, deserializer);
        try {
            optionsBuilder.consumer((Consumer) autoCommitConsumer);
            Assertions.assertThat(Assertions.catchThrowable(() -> {
                        parallelConsumer = initPollingAsyncConsumer((ParallelConsumerOptions<String, String>) optionsBuilder.build());
                    }))
                    .as("Should error on auto commit enabled")
                    .isInstanceOf(ParallelConsumerException.class)
                    .hasMessageContainingAll("auto", "commit", "disabled");
        } finally {
            autoCommitConsumer.close(Duration.ZERO);
        }

        // 4. auto commit disabled - accepted; this consumer stays with the created parallel consumer
        // NOTE(review): presumably released by the base class teardown - confirm
        properties.setProperty("enable.auto.commit", "false");
        optionsBuilder.consumer((Consumer) new KafkaConsumer(properties, deserializer, deserializer));
        Assertions.assertThatNoException().isThrownBy(() -> {
            parallelConsumer = initPollingAsyncConsumer((ParallelConsumerOptions<String, String>) optionsBuilder.build());
        });
    }

    /**
     * Asserts that {@code pollAndProduce} on an instance built without a producer fails with an
     * {@link IllegalArgumentException} mentioning "Producer" and "options".
     */
    @Test
    void cantUseProduceFlowWithWrongOptions() throws InterruptedException {
        setupClients();
        ParallelConsumerOptions optionsWithClients = ParallelConsumerOptions.builder()
                .consumer((Consumer) consumerSpy)
                .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS)
                .build();
        setupParallelConsumerInstance(optionsWithClients);
        subscribeParallelConsumerAndMockConsumerTo(INPUT_TOPIC);
        setupData();

        ParallelEoSStreamProcessor<String, String> processor =
                initPollingAsyncConsumer((ParallelConsumerOptions<String, String>) optionsWithClients);

        Assertions.assertThatThrownBy(
                        () -> processor.pollAndProduce(context -> new ProducerRecord(INPUT_TOPIC, "hi there")))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContainingAll("Producer", "options");
    }

    /**
     * Runs {@code pollAndProduce} with a function that emits one record per input, waits for offset 1
     * to be committed, drains and closes, then asserts the producer spy's history has one entry.
     *
     * @param commitMode commit mode under test; supplied once per {@code CommitMode} constant
     */
    @ParameterizedTest
    @EnumSource(value = ParallelConsumerOptions.CommitMode.class)
    void produceMessageFlow(ParallelConsumerOptions.CommitMode commitMode) {
        setupParallelConsumerInstance(commitMode);

        parallelConsumer.pollAndProduce(context -> new ProducerRecord("Hello", "there"));

        awaitForSomeLoopCycles(2);
        parallelConsumer.requestCommitAsap();
        Awaitility.await().untilAsserted(() -> assertCommits(UniLists.of(1)));

        parallelConsumer.closeDrainFirst();
        Assertions.assertThat(producerSpy.history()).hasSize(1);
    }

    /**
     * Sends {@code total} records spread over {@code keySetSize} keys through a {@code KEY}-ordered
     * consumer configured with a max concurrency of 100, collects the processed contexts per key and
     * hands them to {@code KafkaTestUtils.checkExactOrdering} together with the generated input.
     * <p>
     * The progress bar is closed in a {@code finally} block so it is released even when the wait for
     * all records or the draining close fails.
     */
    @Test
    void lessKeysThanThreads() {
        setupParallelConsumerInstance(ParallelConsumerOptions.builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
                .maxConcurrency(100)
                .build());

        int keySetSize = 4;
        int total = 100000;
        List keys = Range.range((long) keySetSize).listAsIntegers();

        log.debug("Generating {} records against {} keys...", total, keySetSize);
        HashMap records = ktu.generateRecords(keys, total);
        for (Object entryObject : records.entrySet()) {
            Map.Entry<?, ?> entry = (Map.Entry<?, ?>) entryObject;
            log.debug("Key {} has {} records", entry.getKey(), ((List<?>) entry.getValue()).size());
        }

        log.debug("Sending...");
        ktu.send((MockConsumer<String, String>) consumerSpy, records);

        ConcurrentHashMap<String, Queue<PollContext<String, String>>> results = new ConcurrentHashMap<>();
        AtomicLong counter = new AtomicLong();

        ProgressBar bar = ProgressBarUtils.getNewMessagesBar(log, (long) total);
        try {
            log.debug("Consuming...");
            parallelConsumer.poll(recordContexts -> {
                counter.incrementAndGet();
                bar.step();
                log.trace("Consumed {}", recordContexts);
                results.computeIfAbsent((String) recordContexts.key(), ignore -> new ConcurrentLinkedQueue<>())
                        .add(recordContexts);
            });

            Awaitility.await()
                    .atMost(3L, TimeUnit.MINUTES)
                    .untilAsserted(() -> Assertions.assertThat(counter.get()).isEqualTo((long) total));
            parallelConsumer.closeDrainFirst();
        } finally {
            bar.close();
        }

        log.debug("Testing...");
        KafkaTestUtils.checkExactOrdering(results, records);
    }

    /**
     * Record-processing function used by the tests: logs the key of the record it receives and
     * returns the constant {@code "my-result"}.
     */
    public static class MyAction implements Function<ConsumerRecord<String, String>, String> {

        /**
         * Logs the record's key at info level.
         *
         * @param record the record being consumed
         * @return always {@code "my-result"}
         */
        @Override
        public String apply(ConsumerRecord<String, String> record) {
            String recordKey = record.key();
            log.info("User client function - consuming a record... {}", recordKey);
            return "my-result";
        }
    }
}

