/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer;

import com.google.common.truth.Truth;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.PollContext;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.consumer.Consumer;
import org.awaitility.Awaitility;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ParallelEoSStreamProcessorPauseResumeTest
extends ParallelEoSStreamProcessorTestBase {
    private static final Logger log = LoggerFactory.getLogger(ParallelEoSStreamProcessorPauseResumeTest.class);
    private static final AtomicInteger MY_ID_GENERATOR = new AtomicInteger();
    private static final AtomicInteger RECORD_SET_KEY_GENERATOR = new AtomicInteger();

    ParallelEoSStreamProcessorPauseResumeTest() {
    }

    private ParallelConsumerOptions<String, String> getBaseOptions(ParallelConsumerOptions.CommitMode commitMode, int maxConcurrency) {
        return ParallelConsumerOptions.builder().commitMode(commitMode).consumer((Consumer)this.consumerSpy).ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED).maxConcurrency(maxConcurrency).build();
    }

    private void addRecordsWithSetKey(int numRecords) {
        long recordSetKey = RECORD_SET_KEY_GENERATOR.incrementAndGet();
        log.debug("Producing {} records with set key {}.", (Object)numRecords, (Object)recordSetKey);
        for (int i = 0; i < numRecords; ++i) {
            this.consumerSpy.addRecord(this.ktu.makeRecord("key-" + recordSetKey + i, "v0-test-" + i));
        }
        log.debug("Finished producing {} records with set key {}.", (Object)numRecords, (Object)recordSetKey);
    }

    private void setupParallelConsumerInstance(ParallelConsumerOptions.CommitMode commitMode, int maxConcurrency) {
        this.setupParallelConsumerInstance(this.getBaseOptions(commitMode, maxConcurrency));
        String myId = "p/r-test-" + MY_ID_GENERATOR.incrementAndGet();
        this.parallelConsumer.setMyId(Optional.of(myId));
    }

    private TestUserFunction createTestSetup(ParallelConsumerOptions.CommitMode commitMode, int maxConcurrency) {
        this.setupParallelConsumerInstance(commitMode, maxConcurrency);
        TestUserFunction testUserFunction = new TestUserFunction();
        this.parallelConsumer.poll((java.util.function.Consumer)testUserFunction);
        return testUserFunction;
    }

    @ParameterizedTest
    @EnumSource(value=ParallelConsumerOptions.CommitMode.class)
    void pausingAndResumingProcessingShouldWork(ParallelConsumerOptions.CommitMode commitMode) {
        int numTestRecordsPerSet = 1000;
        int totalRecordsExpected = 2 * numTestRecordsPerSet;
        TestUserFunction testUserFunction = this.createTestSetup(commitMode, 3);
        this.addRecordsWithSetKey(numTestRecordsPerSet);
        Awaitility.waitAtMost((Duration)defaultTimeout).alias(numTestRecordsPerSet + " records should be processed").untilAsserted(() -> Truth.assertThat((Integer)testUserFunction.numProcessedRecords.get()).isEqualTo((Object)numTestRecordsPerSet));
        this.awaitForCommit(numTestRecordsPerSet);
        testUserFunction.reset();
        this.parallelConsumer.pauseIfRunning();
        this.awaitForOneLoopCycle();
        this.addRecordsWithSetKey(numTestRecordsPerSet);
        this.awaitForSomeLoopCycles(2);
        Truth.assertThat((Integer)testUserFunction.numProcessedRecords.get()).isEqualTo((Object)0L);
        this.awaitForCommit(numTestRecordsPerSet);
        this.parallelConsumer.resumeIfPaused();
        Awaitility.waitAtMost((Duration)defaultTimeout).alias(numTestRecordsPerSet + " records should be processed").untilAsserted(() -> Truth.assertThat((Integer)testUserFunction.numProcessedRecords.get()).isEqualTo((Object)numTestRecordsPerSet));
        this.awaitForCommit(totalRecordsExpected);
    }

    @ParameterizedTest
    @EnumSource(value=ParallelConsumerOptions.CommitMode.class)
    void testThatInFlightWorkIsFinishedSuccessfullyAndOffsetsAreCommitted(ParallelConsumerOptions.CommitMode commitMode) {
        int degreeOfParallelism = 3;
        int numTestRecordsPerSet = 1000;
        TestUserFunction testUserFunction = this.createTestSetup(commitMode, degreeOfParallelism);
        testUserFunction.lockProcessing();
        this.addRecordsWithSetKey(numTestRecordsPerSet);
        Awaitility.waitAtMost((Duration)defaultTimeout).alias(degreeOfParallelism + " records should be in flight processed").untilAsserted(() -> Truth.assertThat((Integer)testUserFunction.numInFlightRecords.get()).isEqualTo((Object)degreeOfParallelism));
        this.assertCommits().isEmpty();
        this.parallelConsumer.pauseIfRunning();
        this.awaitForOneLoopCycle();
        testUserFunction.unlockProcessing();
        Awaitility.waitAtMost((Duration)defaultTimeout).alias("at least " + degreeOfParallelism + " records should be processed").untilAsserted(() -> Truth.assertThat((Integer)testUserFunction.numProcessedRecords.get()).isGreaterThan((Comparable)Integer.valueOf(degreeOfParallelism)));
        this.awaitForCommit(testUserFunction.numProcessedRecords.get());
        Truth.assertThat((Integer)testUserFunction.numInFlightRecords.get()).isEqualTo((Object)0);
        Truth.assertThat((Integer)this.parallelConsumer.getWm().getNumberRecordsOutForProcessing()).isEqualTo((Object)0);
        this.parallelConsumer.resumeIfPaused();
        Awaitility.waitAtMost((Duration)defaultTimeout).alias(numTestRecordsPerSet + " records should be processed").untilAsserted(() -> Truth.assertThat((Integer)testUserFunction.numProcessedRecords.get()).isEqualTo((Object)numTestRecordsPerSet));
        this.awaitForCommit(numTestRecordsPerSet);
    }

    private static class TestUserFunction
    implements java.util.function.Consumer<PollContext<String, String>> {
        private final AtomicInteger numProcessedRecords = new AtomicInteger();
        private final AtomicInteger numInFlightRecords = new AtomicInteger();
        private final ReentrantLock mutex = new ReentrantLock();

        private TestUserFunction() {
        }

        public void lockProcessing() {
            this.mutex.lock();
        }

        public void unlockProcessing() {
            log.debug("Unlocking processing");
            this.mutex.unlock();
        }

        @Override
        public void accept(PollContext<String, String> t) {
            log.debug("Received: {}", t);
            this.numInFlightRecords.incrementAndGet();
            try {
                this.lockProcessing();
                int numProcessed = this.numProcessedRecords.incrementAndGet();
                log.debug("Processed complete, incremented to {}", (Object)numProcessed);
            }
            finally {
                this.unlockProcessing();
                this.numInFlightRecords.decrementAndGet();
            }
        }

        public void reset() {
            this.numProcessedRecords.set(0);
        }
    }
}

