package io.confluent.parallelconsumer.integrationTests;

import io.confluent.csid.utils.ThreadUtils;
import io.confluent.parallelconsumer.PCRetriableException;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.shaded.org.awaitility.core.ConditionFactory;
import org.testcontainers.shaded.org.hamcrest.Matchers;
import pl.tlinkowski.unij.api.UniSets;

/* loaded from: input_file:io/confluent/parallelconsumer/integrationTests/RetriesTest.class */
/**
 * Broker-backed integration test that observes the number of work containers queued in shards
 * awaiting selection while a subset of records is repeatedly failed with
 * {@link PCRetriableException}.
 */
public class RetriesTest extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(RetriesTest.class);

    /** Header key whose presence makes the user function fail while failing is enabled. */
    private static final String THROW_HEADER_KEY = "throw";

    /** Quantity argument passed to {@code produceMessagesWithThrowHeader}. */
    private static final long RECORDS_TO_PRODUCE = 4000L;

    /** Successful-processing count the test waits for before it starts sampling. */
    private static final int EXPECTED_SUCCESSES_BEFORE_SAMPLING = 2000;

    /** Inclusive upper bound asserted on the awaiting-selection queue size. */
    private static final long MAX_AWAITING_SELECTION = 2000L;

    Consumer<String, String> consumer;
    ParallelConsumerOptions<String, String> pcOpts;
    ParallelEoSStreamProcessor<String, String> pc;

    /**
     * Creates the topic, a consumer in a new group, and a key-ordered processor with a 200 ms
     * default retry delay, then subscribes the processor to the topic.
     */
    @BeforeEach
    void setUp() {
        setupTopic();
        this.consumer = getKcu().createNewConsumer(KafkaClientUtils.GroupOption.NEW_GROUP);
        // Explicit type arguments are required: inference does not flow into the receiver of a
        // chained call, so a bare builder() would be typed <Object, Object>.
        this.pcOpts = ParallelConsumerOptions.<String, String>builder()
                .consumer(this.consumer)
                .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
                .maxConcurrency(100)
                .defaultMessageRetryDelay(Duration.ofMillis(200L))
                .messageBufferSize(KafkaClientUtils.MAX_POLL_RECORDS)
                .build();
        this.pc = new ParallelEoSStreamProcessor<>(this.pcOpts);
        this.pc.subscribe(UniSets.of(this.topic));
    }

    /**
     * Fails every record carrying the {@code "throw"} header, waits until
     * {@value #EXPECTED_SUCCESSES_BEFORE_SAMPLING} records have succeeded, and then samples the
     * awaiting-selection queue size from a background thread - first while failures continue,
     * then after they are switched off - asserting it stays within
     * {@code [0, MAX_AWAITING_SELECTION]}.
     *
     * @throws InterruptedException if interrupted while waiting for the sampling thread to finish
     */
    @Test
    void awaitingWorkContainersSizeDoesNotExceedNumberOfFailedContainersInRetryLoop() throws InterruptedException {
        // NOTE(review): presumably half of the produced records carry the header, given the
        // 2000-success wait below - confirm against KafkaClientUtils.
        getKcu().produceMessagesWithThrowHeader(this.topic, RECORDS_TO_PRODUCE);

        AtomicInteger successfullyProcessed = new AtomicInteger(0);
        AtomicBoolean failFlaggedRecords = new AtomicBoolean(true);
        this.pc.poll(pollContext -> {
            ThreadUtils.sleepQuietly(7);
            boolean flagged = pollContext.getSingleConsumerRecord().headers().lastHeader(THROW_HEADER_KEY) != null;
            if (failFlaggedRecords.get() && flagged) {
                throw new PCRetriableException("THROW_EXCEPTION_FLAG_HAPPENED");
            }
            successfullyProcessed.incrementAndGet();
        });

        ConditionFactory successWait = Awaitility.await()
                .atMost(Duration.ofSeconds(10L))
                .pollInterval(Duration.ofMillis(100L));
        successWait.until(successfullyProcessed::get, Matchers.is(Matchers.equalTo(EXPECTED_SUCCESSES_BEFORE_SAMPLING)));

        AtomicBoolean boundViolated = new AtomicBoolean(false);
        AtomicBoolean keepSampling = new AtomicBoolean(true);
        CountDownLatch samplerFinished = new CountDownLatch(1);
        new Thread(() -> {
            try {
                while (keepSampling.get()) {
                    long awaitingSelection = this.pc.getWm().getSm().getNumberOfWorkQueuedInShardsAwaitingSelection();
                    log.debug("NumberOfWorkQueuedInShardsAwaitingSelection : {}", awaitingSelection);
                    Assertions.assertThat(awaitingSelection).isBetween(0L, MAX_AWAITING_SELECTION);
                    ThreadUtils.sleepQuietly(RandomUtils.nextInt(1, 10));
                }
            } catch (AssertionError e) {
                // An assertion failure on this thread would not fail the test by itself, so
                // record it for the test thread to check.
                boundViolated.set(true);
                throw e;
            } finally {
                // Released only once the sampler has really stopped, so the await() below
                // guarantees no check is still in flight when the result is asserted.
                samplerFinished.countDown();
            }
        }).start();

        // Sample while flagged records are still being retried...
        ThreadUtils.sleepQuietly(3000);
        failFlaggedRecords.set(false);
        // ...and while they are allowed to succeed.
        ThreadUtils.sleepQuietly(2000);
        keepSampling.set(false);

        samplerFinished.await();
        Assertions.assertThat(boundViolated.get()).isFalse();
    }
}
