/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.integrationTests;

import io.confluent.parallelconsumer.ManagedTruth;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.integrationTests.BrokerIntegrationTest;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.shaded.org.hamcrest.Matcher;
import org.testcontainers.shaded.org.hamcrest.Matchers;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniSets;

/**
 * Integration test exercising a consumer group rebalance while a
 * {@link ParallelEoSStreamProcessor} is subscribed to the test topic.
 *
 * <p>The topic is created with two partitions and the processor is configured with
 * {@link ParallelConsumerOptions.ProcessingOrder#PARTITION} ordering.
 *
 * <p>NOTE(review): {@code pc} is not closed anywhere in this class - confirm that the
 * base class or test infrastructure shuts it down.
 */
class RebalanceTest
extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(RebalanceTest.class);

    /** Consumer handed to the parallel consumer under test; created fresh for every test. */
    Consumer<String, String> consumer;

    /** The parallel consumer under test. */
    ParallelEoSStreamProcessor<String, String> pc;

    /**
     * One day. Used as the time between commits so that, within the lifetime of a test,
     * a time-based commit is not expected to fire.
     */
    public static final Duration INFINITE = Duration.ofDays(1L);

    RebalanceTest() {
        this.numPartitions = 2;
    }

    /**
     * Creates the topic, a consumer in a new group, and a parallel consumer subscribed to the topic.
     */
    @BeforeEach
    void setup() {
        setupTopic();
        consumer = getKcu().createNewConsumer(KafkaClientUtils.GroupOption.NEW_GROUP);

        ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
                .consumer(consumer)
                .ordering(ParallelConsumerOptions.ProcessingOrder.PARTITION)
                .build();
        pc = new ParallelEoSStreamProcessor<>(options);
        pc.subscribe(UniSets.of(topic));
    }

    /**
     * Processes every produced record with time-based commits pushed out to {@link #INFINITE},
     * then joins a second consumer to the same group and asserts that its first poll returns
     * no records.
     *
     * <p>Presumably the second member can only see zero records if the processed offsets were
     * committed when partitions were taken away from the parallel consumer - hence the test name.
     */
    @Test
    void commitUponRevoke() {
        long numberOfRecordsToProduce = 20L;
        AtomicLong count = new AtomicLong();

        getKcu().produceMessages(topic, numberOfRecordsToProduce);

        // Push the periodic commit far beyond the duration of the test.
        pc.setTimeBetweenCommits(INFINITE);

        // Consume everything that was produced, counting each processed record.
        pc.poll(recordContexts -> {
            count.getAndIncrement();
            log.debug("Processed record, count now {} - offset: {}", count, recordContexts.offset());
        });
        Awaitility.await().untilAtomic(count, Matchers.is(Matchers.equalTo(numberOfRecordsToProduce)));
        log.debug("All records consumed");

        // Add a second member to the same group and let it poll once.
        Duration newPollTimeout = Duration.ofSeconds(5L);
        log.debug("Creating new consumer in same group and subscribing to same topic set with a no record timeout of {}, expect this phase to take entire timeout...", newPollTimeout);

        // try-with-resources: the extra group member must be closed even if the assertion fails,
        // otherwise its network resources and group membership leak past the end of the test.
        try (KafkaConsumer<String, String> newConsumer = getKcu().createNewConsumer(KafkaClientUtils.GroupOption.REUSE_GROUP)) {
            newConsumer.subscribe(UniLists.of(topic));
            log.debug("Polling with new group member for records with timeout {}...", newPollTimeout);
            ConsumerRecords<String, String> newConsumersPollResult = newConsumer.poll(newPollTimeout);
            log.debug("Poll complete");

            // The new member must not be handed any already-processed records.
            ManagedTruth.assertThat(newConsumersPollResult).hasCountEqualTo(0);
        }
        log.debug("Test finished");
    }
}

