package io.confluent.parallelconsumer;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.PCModuleTestEnv;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

/* loaded from: input_file:io/confluent/parallelconsumer/ParallelEoSSStreamProcessorRebalancedTest.class */
/**
 * Runs a {@link ParallelEoSStreamProcessor} through a partition revocation and re-assignment
 * while records are still out for processing, once for every
 * {@link ParallelConsumerOptions.CommitMode}.
 */
class ParallelEoSSStreamProcessorRebalancedTest extends ParallelEoSStreamProcessorTestBase {
    private static final Logger log = LoggerFactory.getLogger(ParallelEoSSStreamProcessorRebalancedTest.class);

    /** Source of the numeric part of the record keys; shared by all invocations in this JVM. */
    private static final AtomicInteger RECORD_SET_KEY_GENERATOR = new AtomicInteger();

    /**
     * Prepares only the topic names and the clients. The processor under test is built inside the
     * test method because its options depend on the commit mode parameter.
     */
    @Override
    @BeforeEach
    public void setupAsyncConsumerTestBase() {
        setupTopicNames();
        setupClients();
    }

    /**
     * Intentionally empty override of the inherited tear-down hook.
     * NOTE(review): this skips whatever the base class does after each test (presumably closing
     * the processor) - confirm that this is deliberate and nothing leaks between invocations.
     */
    @Override
    @AfterEach
    public void close() {
    }

    /**
     * Produces a set of records, revokes and re-assigns partition 0 while work is still
     * outstanding, produces a second set, and then waits for a commit.
     *
     * @param commitMode the commit mode the processor is configured with for this invocation
     */
    @EnumSource(ParallelConsumerOptions.CommitMode.class)
    @ParameterizedTest
    void pausingAndResumingProcessingShouldWork(ParallelConsumerOptions.CommitMode commitMode) {
        // NOTE(review): the latch is handed to the test module and released near the end of the
        // test; presumably the module holds back part of the processing until then - confirm.
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // Options are built twice on purpose: the processor and the module each get their own instance.
        PCModuleTestEnv module = new PCModuleTestEnv(getBaseOptions(commitMode), countDownLatch);
        parallelConsumer = new ParallelEoSStreamProcessor<>(getBaseOptions(commitMode), module);
        parentParallelConsumer = parallelConsumer;

        parallelConsumer.subscribe(UniLists.of(INPUT_TOPIC));
        consumerSpy.subscribeWithRebalanceAndAssignment(UniLists.of(INPUT_TOPIC), 2);
        attachLoopCounter(parallelConsumer);

        // The user function is a no-op; this test only cares about the bookkeeping around it.
        parallelConsumer.poll(pollContext -> {
        });

        addRecordsWithSetKeyForEachPartition();
        // Do not rebalance before at least one record has been handed out for processing.
        awaitUntilTrue(() -> parallelConsumer.getWm().getNumberRecordsOutForProcessing() > 0);

        // Take partition 0 away, rebalance with the remaining assignment, then hand it back.
        TopicPartition partitionZero = new TopicPartition(INPUT_TOPIC, 0);
        consumerSpy.revoke(UniLists.of(partitionZero));
        consumerSpy.rebalanceWithoutAssignment(consumerSpy.assignment());
        consumerSpy.assign(UniLists.of(partitionZero));

        addRecordsWithSetKeyForEachPartition();
        countDownLatch.countDown();
        // NOTE(review): 4 matches the two record sets of two records each; the exact meaning of
        // the argument is defined by the base class helper - confirm.
        awaitForCommit(4);
    }

    /**
     * Adds one record to partition 0 and one to partition 1 of the mock consumer. Both records of
     * a set share the same key, which is derived from {@link #RECORD_SET_KEY_GENERATOR}.
     */
    private void addRecordsWithSetKeyForEachPartition() {
        long recordSetKey = RECORD_SET_KEY_GENERATOR.incrementAndGet();
        // String concatenation, not addition: the trailing 0 is appended as a character.
        String key = "key-" + recordSetKey + 0;
        log.debug("Producing {} records with set key {}.", 2, recordSetKey);
        consumerSpy.addRecord(ktu.makeRecord(0, key, "v0-test-0"));
        consumerSpy.addRecord(ktu.makeRecord(1, key, "v0-test-0"));
        log.debug("Finished producing {} records with set key {}.", 2, recordSetKey);
    }

    /**
     * Builds the processor options shared by every invocation: the mock consumer, a batch size of
     * 2 and a max concurrency of 1. The mock producer is attached only for
     * {@link ParallelConsumerOptions.CommitMode#PERIODIC_TRANSACTIONAL_PRODUCER}.
     *
     * @param commitMode the commit mode to configure
     * @return a freshly built options instance
     */
    private ParallelConsumerOptions<String, String> getBaseOptions(ParallelConsumerOptions.CommitMode commitMode) {
        // Explicit type arguments keep the builder parameterised, so build() needs no unchecked conversion.
        ParallelConsumerOptions.ParallelConsumerOptionsBuilder<String, String> optionsBuilder =
                ParallelConsumerOptions.<String, String>builder()
                        .commitMode(commitMode)
                        .consumer(consumerSpy)
                        .batchSize(2)
                        .maxConcurrency(1);
        if (commitMode == ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER) {
            optionsBuilder.producer(producerSpy);
        }
        return optionsBuilder.build();
    }
}
