package io.confluent.parallelconsumer.integrationTests;

import io.confluent.csid.utils.ThreadUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.Consumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import pl.tlinkowski.unij.api.UniSets;

/* loaded from: input_file:io/confluent/parallelconsumer/integrationTests/BrokerPollerBackpressureTest.class */
/**
 * Integration tests that observe {@link ParallelEoSStreamProcessor#getPausedPartitionSize()} while
 * record processing is deliberately stalled, and again after processing is released.
 *
 * <p>Each test blocks every invocation of the user function on a {@link CountDownLatch}, waits for
 * the work manager to report an expected number of records queued in shards, asserts that exactly
 * one partition is reported as paused (and stays so), then releases the latch and asserts that all
 * records are processed, the paused-partition count returns to zero, and newly produced records are
 * consumed as well.
 */
public class BrokerPollerBackpressureTest extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(BrokerPollerBackpressureTest.class);

    /** Number of records produced before processing is released. */
    private static final int INITIAL_RECORD_COUNT = 200;

    /** Number of records produced after processing has been released. */
    private static final int FOLLOW_UP_RECORD_COUNT = 10;

    /** Upper bound for every asynchronous wait in these tests. */
    private static final Duration AWAIT_TIMEOUT = Duration.ofSeconds(10L);

    /** Poll interval used while waiting for assertions to become true. */
    private static final Duration FAST_POLL_INTERVAL = Duration.ofMillis(20L);

    /** Poll interval used while waiting for the shard queue depth to settle. */
    private static final Duration SHARD_QUEUE_POLL_INTERVAL = Duration.ofMillis(200L);

    /** How long the paused state must persist before it is re-checked. */
    private static final int PAUSE_STABILITY_WAIT_MS = 1000;

    Consumer<String, String> consumer;
    ParallelConsumerOptions<String, String> pcOpts;
    ParallelEoSStreamProcessor<String, String> pc;

    /**
     * Creates a fresh topic, a consumer in a new group, and a parallel consumer subscribed to that
     * topic with KEY ordering and a max concurrency of 10.
     */
    @BeforeEach
    void setUp() {
        setupTopic();
        this.consumer = getKcu().createNewConsumer(KafkaClientUtils.GroupOption.NEW_GROUP);
        // NOTE(review): the buffer size is taken from an unrelated-looking constant
        // (a "poll delay"); this may be a constant-folding artifact - confirm the intended value.
        this.pcOpts = ParallelConsumerOptions.builder()
                .consumer(this.consumer)
                .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
                .maxConcurrency(10)
                .messageBufferSize(MultiInstanceRebalanceTest.DEFAULT_POLL_DELAY)
                .build();
        this.pc = new ParallelEoSStreamProcessor<>(this.pcOpts);
        this.pc.subscribe(UniSets.of(this.topic));
    }

    /**
     * Stalls processing after producing {@value #INITIAL_RECORD_COUNT} records and expects the
     * shard queues to drain to zero while one partition is reported as paused.
     */
    @Test
    void brokerPollPausedWithEmptyShardsButHighInFlight() {
        verifyPausedWhileStalledThenResumed(
                () -> getKcu().produceMessages(this.topic, (long) INITIAL_RECORD_COUNT),
                0L);
    }

    /**
     * Stalls processing after producing {@value #INITIAL_RECORD_COUNT} records via the
     * three-argument producer overload and expects 195 records to remain queued in shards while one
     * partition is reported as paused.
     */
    @Test
    void brokerPollPausedWithHighNumberInShardsButLowInFlight() {
        // NOTE(review): 195 == 200 - 5; presumably the third argument is the number of distinct
        // keys, leaving one record per key in flight under KEY ordering - confirm against
        // KafkaClientUtils.produceMessages.
        verifyPausedWhileStalledThenResumed(
                () -> getKcu().produceMessages(this.topic, (long) INITIAL_RECORD_COUNT, 5L),
                195L);
    }

    /**
     * Shared scenario for both tests.
     *
     * @param produceInitialRecords action that produces the initial {@value #INITIAL_RECORD_COUNT}
     *     records to the topic
     * @param expectedQueuedInShards value that
     *     {@code getWm().getNumberOfWorkQueuedInShardsAwaitingSelection()} must reach while
     *     processing is stalled
     */
    private void verifyPausedWhileStalledThenResumed(Runnable produceInitialRecords, long expectedQueuedInShards) {
        CountDownLatch releaseProcessing = new CountDownLatch(1);
        AtomicInteger processedCount = new AtomicInteger(0);
        try {
            Assertions.assertThat(this.pc.getPausedPartitionSize()).isEqualTo(0);
            produceInitialRecords.run();

            // Every invocation counts itself and then parks until the latch is released.
            this.pc.poll(pollContext -> {
                processedCount.incrementAndGet();
                awaitRelease(releaseProcessing);
            });

            Awaitility.await()
                    .atMost(AWAIT_TIMEOUT)
                    .pollInterval(SHARD_QUEUE_POLL_INTERVAL)
                    .until(() -> this.pc.getWm().getNumberOfWorkQueuedInShardsAwaitingSelection() == expectedQueuedInShards);

            awaitPausedPartitionSize(1);
            // The paused state must be stable, not a transient blip.
            ThreadUtils.sleepQuietly(PAUSE_STABILITY_WAIT_MS);
            Assertions.assertThat(this.pc.getPausedPartitionSize()).isEqualTo(1);

            releaseProcessing.countDown();

            Awaitility.await()
                    .atMost(AWAIT_TIMEOUT)
                    .pollInterval(FAST_POLL_INTERVAL)
                    .untilAsserted(() -> {
                        Assertions.assertThat(processedCount.get()).isEqualTo(INITIAL_RECORD_COUNT);
                        Assertions.assertThat(this.pc.getWm().getNumberRecordsOutForProcessing()).isEqualTo(0);
                    });
            awaitPausedPartitionSize(0);

            // After resuming, freshly produced records must be picked up too.
            getKcu().produceMessages(this.topic, (long) FOLLOW_UP_RECORD_COUNT);
            Awaitility.await()
                    .atMost(AWAIT_TIMEOUT)
                    .pollInterval(FAST_POLL_INTERVAL)
                    .untilAsserted(() -> Assertions.assertThat(processedCount.get())
                            .isEqualTo(INITIAL_RECORD_COUNT + FOLLOW_UP_RECORD_COUNT));
        } finally {
            // Always unblock parked workers, even when an assertion above failed;
            // counting down an already-released latch is a no-op.
            releaseProcessing.countDown();
        }
    }

    /** Waits until the parallel consumer reports exactly {@code expected} paused partitions. */
    private void awaitPausedPartitionSize(int expected) {
        Awaitility.await()
                .atMost(AWAIT_TIMEOUT)
                .pollInterval(FAST_POLL_INTERVAL)
                .untilAsserted(() -> Assertions.assertThat(this.pc.getPausedPartitionSize()).isEqualTo(expected));
    }

    /**
     * Blocks the calling thread until {@code latch} reaches zero.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
     *     interrupted; the thread's interrupt flag is restored first
     */
    private static void awaitRelease(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
