/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.internal.PCModuleTestEnv;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

class ParallelEoSSStreamProcessorRebalancedTest
extends ParallelEoSStreamProcessorTestBase {
    private static final Logger log = LoggerFactory.getLogger(ParallelEoSSStreamProcessorRebalancedTest.class);
    private static final AtomicInteger RECORD_SET_KEY_GENERATOR = new AtomicInteger();

    ParallelEoSSStreamProcessorRebalancedTest() {
    }

    @Override
    @BeforeEach
    public void setupAsyncConsumerTestBase() {
        this.setupTopicNames();
        this.setupClients();
    }

    @Override
    @AfterEach
    public void close() {
    }

    @ParameterizedTest
    @EnumSource(value=ParallelConsumerOptions.CommitMode.class)
    void pausingAndResumingProcessingShouldWork(ParallelConsumerOptions.CommitMode commitMode) {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        PCModuleTestEnv pcModuleTestEnv = new PCModuleTestEnv(this.getBaseOptions(commitMode), countDownLatch);
        this.parallelConsumer = new ParallelEoSStreamProcessor(this.getBaseOptions(commitMode), (PCModule)pcModuleTestEnv);
        this.parentParallelConsumer = this.parallelConsumer;
        this.parallelConsumer.subscribe((Collection)UniLists.of((Object)this.INPUT_TOPIC));
        this.consumerSpy.subscribeWithRebalanceAndAssignment(UniLists.of((Object)this.INPUT_TOPIC), 2);
        this.attachLoopCounter((AbstractParallelEoSStreamProcessor)this.parallelConsumer);
        this.parallelConsumer.poll(context -> {});
        this.addRecordsWithSetKeyForEachPartition();
        this.awaitUntilTrue(() -> this.parallelConsumer.getWm().getNumberRecordsOutForProcessing() > 0);
        this.consumerSpy.revoke(UniLists.of((Object)new TopicPartition(this.INPUT_TOPIC, 0)));
        this.consumerSpy.rebalanceWithoutAssignment(this.consumerSpy.assignment());
        this.consumerSpy.assign(UniLists.of((Object)new TopicPartition(this.INPUT_TOPIC, 0)));
        this.addRecordsWithSetKeyForEachPartition();
        countDownLatch.countDown();
        this.awaitForCommit(4);
    }

    private void addRecordsWithSetKeyForEachPartition() {
        long recordSetKey = RECORD_SET_KEY_GENERATOR.incrementAndGet();
        log.debug("Producing {} records with set key {}.", (Object)2, (Object)recordSetKey);
        this.consumerSpy.addRecord(this.ktu.makeRecord(0, "key-" + recordSetKey + 0, "v0-test-0"));
        this.consumerSpy.addRecord(this.ktu.makeRecord(1, "key-" + recordSetKey + 0, "v0-test-0"));
        log.debug("Finished producing {} records with set key {}.", (Object)2, (Object)recordSetKey);
    }

    private ParallelConsumerOptions<String, String> getBaseOptions(ParallelConsumerOptions.CommitMode commitMode) {
        ParallelConsumerOptions.ParallelConsumerOptionsBuilder optionsBuilder = ParallelConsumerOptions.builder().commitMode(commitMode).consumer((Consumer)this.consumerSpy).batchSize(Integer.valueOf(2)).maxConcurrency(1);
        if (commitMode == ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER) {
            optionsBuilder.producer((Producer)this.producerSpy);
        }
        return optionsBuilder.build();
    }
}

