package io.confluent.parallelconsumer.integrationTests;

import io.confluent.parallelconsumer.ManagedTruth;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.shaded.org.hamcrest.Matchers;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniSets;

/* loaded from: input_file:io/confluent/parallelconsumer/integrationTests/RebalanceTest.class */
/**
 * Integration tests exercising consumer group rebalances while a
 * {@link ParallelEoSStreamProcessor} is running against a two-partition topic.
 */
class RebalanceTest extends BrokerIntegrationTest<String, String> {
    /** Consumer handed to the processor built by {@link #setupPC()}; recreated before each test. */
    Consumer<String, String> consumer;

    /** Primary processor under test; closed in {@link #cleanup()}. */
    ParallelEoSStreamProcessor<String, String> pc;

    private static final Logger log = LoggerFactory.getLogger(RebalanceTest.class);

    /** Commit interval long enough that no time-based commit happens during a test run. */
    public static final Duration INFINITE = Duration.ofDays(1);

    RebalanceTest() {
        // Two partitions so that a second group member can be assigned one of them.
        this.numPartitions = 2;
    }

    /** Creates the topic and a consumer in a new group before each test. */
    @BeforeEach
    void setup() {
        setupTopic();
        this.consumer = getKcu().createNewConsumer(KafkaClientUtils.GroupOption.NEW_GROUP);
    }

    /** Closes the primary processor, if the test got far enough to create one. */
    @AfterEach
    void cleanup() {
        // Guard against a failure before pc was assigned, which would otherwise
        // surface as a NullPointerException and hide the real test failure.
        if (this.pc != null) {
            this.pc.close();
        }
    }

    /** Builds a processor with the default options used by these tests. */
    private ParallelEoSStreamProcessor<String, String> setupPC() {
        return setupPC(null);
    }

    /**
     * Builds a processor using {@link #consumer} and PARTITION ordering.
     *
     * @param function optional customiser applied to the options builder before it is built;
     *                 may be {@code null}, in which case the base options are used unchanged
     * @return a new, not yet subscribed, processor
     */
    private ParallelEoSStreamProcessor<String, String> setupPC(Function<ParallelConsumerOptions.ParallelConsumerOptionsBuilder<String, String>, ParallelConsumerOptions.ParallelConsumerOptionsBuilder<String, String>> function) {
        // Explicit type witness: without it the chained builder calls lose the <String, String> types.
        ParallelConsumerOptions.ParallelConsumerOptionsBuilder<String, String> optionsBuilder =
                ParallelConsumerOptions.<String, String>builder()
                        .consumer(this.consumer)
                        .ordering(ParallelConsumerOptions.ProcessingOrder.PARTITION);
        if (function != null) {
            optionsBuilder = function.apply(optionsBuilder);
        }
        return new ParallelEoSStreamProcessor<>(optionsBuilder.build());
    }

    /**
     * Processes a batch of records with time-based commits effectively disabled, then joins a
     * second consumer to the same group and asserts that it polls zero records.
     */
    @Test
    void commitUponRevoke() {
        final long recordCount = 20L;
        AtomicLong processedCount = new AtomicLong();

        this.pc = setupPC();
        this.pc.subscribe(UniSets.of(this.topic));
        getKcu().produceMessages(this.topic, recordCount);
        this.pc.setTimeBetweenCommits(INFINITE);
        this.pc.poll(pollContext -> {
            processedCount.getAndIncrement();
            log.debug("Processed record, count now {} - offset: {}", processedCount, pollContext.offset());
        });
        Awaitility.await().untilAtomic(processedCount, Matchers.is(Matchers.equalTo(recordCount)));
        log.debug("All records consumed");

        Duration newPollTimeout = Duration.ofSeconds(5L);
        log.debug("Creating new consumer in same group and subscribing to same topic set with a no record timeout of {}, expect this phase to take entire timeout...", newPollTimeout);
        // try-with-resources so the extra group member is closed even when the assertion fails.
        try (KafkaConsumer<String, String> newConsumer = getKcu().createNewConsumer(KafkaClientUtils.GroupOption.REUSE_GROUP)) {
            newConsumer.subscribe(UniLists.of(this.topic));
            log.debug("Polling with new group member for records with timeout {}...", newPollTimeout);
            ConsumerRecords<String, String> newConsumersPollResult = newConsumer.poll(newPollTimeout);
            log.debug("Poll complete");
            ManagedTruth.assertThat(newConsumersPollResult).hasCountEqualTo(0);
        }
        log.debug("Test finished");
    }

    /**
     * Argument source for {@link #rebalanceCompletesForCommitModeVariations}: display name,
     * commit mode, and producer mode ({@code null} meaning no producer is configured).
     */
    static Stream<Arguments> rebalanceTestCommitModes() {
        return Stream.of(
                Arguments.of("Consumer Async", ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS, null),
                Arguments.of("Consumer Sync", ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC, null),
                Arguments.of("Consumer Async + Producer Non-transactional", ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS, KafkaClientUtils.ProducerMode.NOT_TRANSACTIONAL),
                // Previously used the ASYNCHRONOUS mode, so the sync + producer combination was never run.
                Arguments.of("Consumer Sync + Producer Non-transactional", ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC, KafkaClientUtils.ProducerMode.NOT_TRANSACTIONAL),
                Arguments.of("Transactional Producer", ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER, KafkaClientUtils.ProducerMode.TRANSACTIONAL));
    }

    /**
     * Runs two processors in the same group through three batches of records: one processor alone,
     * both together after the second joins, then the second alone after the first is closed.
     *
     * @param str          display name of the variation (used only in the test name)
     * @param commitMode   commit mode applied to the first processor
     * @param producerMode producer mode for the first processor, or {@code null} for no producer
     */
    @MethodSource({"rebalanceTestCommitModes"})
    @ParameterizedTest(name = "[{index}] - {0}")
    void rebalanceCompletesForCommitModeVariations(String str, ParallelConsumerOptions.CommitMode commitMode, KafkaClientUtils.ProducerMode producerMode) {
        final long recordsPerBatch = 20L;
        // Shared by both processors, so it counts records processed across the whole group.
        AtomicLong processedCount = new AtomicLong();

        this.pc = setupPC(builder -> {
            ParallelConsumerOptions.ParallelConsumerOptionsBuilder<String, String> configured = builder.commitMode(commitMode);
            if (producerMode != null) {
                configured = configured.producer(getKcu().createNewProducer(producerMode));
            }
            return configured;
        });
        this.pc.subscribe(UniSets.of(this.topic));

        // Batch 1: only the first processor is in the group.
        getKcu().produceMessages(this.topic, recordsPerBatch);
        this.pc.poll(pollContext -> {
            processedCount.getAndIncrement();
            log.debug("PC1 - Processed record, count now {} - offset: {}", processedCount, pollContext.offset());
        });
        Awaitility.await().untilAtomic(processedCount, Matchers.is(Matchers.equalTo(recordsPerBatch)));
        log.debug("All records consumed");

        // Second processor joins the same group with its own consumer.
        KafkaConsumer<String, String> secondConsumer = getKcu().createNewConsumer(KafkaClientUtils.GroupOption.REUSE_GROUP);
        ParallelEoSStreamProcessor<String, String> secondPc = setupPC(builder -> builder.consumer(secondConsumer));
        secondPc.subscribe(UniSets.of(this.topic));
        secondPc.poll(pollContext -> {
            processedCount.getAndIncrement();
            log.debug("PC2 - Processed record, count now {} - offset: {}", processedCount, pollContext.offset());
        });
        Awaitility.await().untilAsserted(() ->
                ManagedTruth.assertThat(secondPc).getNumberOfAssignedPartitions().isEqualTo(1));

        // Batch 2: both processors are running.
        getKcu().produceMessages(this.topic, recordsPerBatch);
        Awaitility.await().untilAtomic(processedCount, Matchers.is(Matchers.equalTo(recordsPerBatch * 2)));

        // First processor leaves; the second should end up with both partitions.
        this.pc.closeDrainFirst();
        Awaitility.await().untilAsserted(() ->
                ManagedTruth.assertThat(secondPc).getNumberOfAssignedPartitions().isEqualTo(2));

        // Batch 3: only the second processor remains.
        getKcu().produceMessages(this.topic, recordsPerBatch);
        Awaitility.await().untilAtomic(processedCount, Matchers.is(Matchers.equalTo(recordsPerBatch * 3)));

        secondPc.closeDrainFirst();
        Awaitility.await().untilAsserted(() -> {
            ManagedTruth.assertThat(secondPc).isClosedOrFailed();
            Assertions.assertThat(processedCount).hasValue(recordsPerBatch * 3);
        });
        log.debug("Test finished");
    }
}
