package io.confluent.parallelconsumer.offsets;

import io.confluent.csid.utils.JavaUtils;
import io.confluent.csid.utils.LatchTestUtils;
import io.confluent.csid.utils.ThreadUtils;
import io.confluent.csid.utils.TrimListRepresentation;
import io.confluent.parallelconsumer.FakeRuntimeError;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Isolated;
import org.junit.jupiter.api.parallel.ResourceAccessMode;
import org.junit.jupiter.api.parallel.ResourceLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises how the parallel consumer behaves when the offset-encoding payload is artificially
 * constrained: {@link OffsetMapCodecManager#DefaultMaxMetadataSize} is shrunk and a single codec is
 * forced, then one record (offset 0) is held incomplete while many later records complete.
 *
 * <p>The test mutates static configuration on {@link OffsetMapCodecManager}, which is why it is
 * marked {@link Isolated} and holds a {@link ResourceLock}. All such overrides are applied inside a
 * {@code try}/{@code finally} so they are restored even when setup or an assertion fails.
 */
@Isolated
class OffsetEncodingBackPressureTest extends ParallelEoSStreamProcessorTestBase {
    private static final Logger log = LoggerFactory.getLogger(OffsetEncodingBackPressureTest.class);

    /**
     * Holds offset 0 incomplete, lets the records above it complete, and checks the partition's
     * blocked / allowed-more-records flags and the committed offset metadata at each stage.
     *
     * @throws OffsetDecodingError if committed metadata cannot be decoded while asserting on it
     */
    @Test
    @ResourceLock(value = "Value doesn't matter, just needs a constant", mode = ResourceAccessMode.READ_WRITE)
    void backPressureShouldPreventTooManyMessagesBeingQueuedForProcessing() throws OffsetDecodingError {
        this.parallelConsumer.setLongPollTimeout(Duration.ofMillis(200L));

        // Number of records whose user function ran to completion (i.e. did not throw).
        AtomicInteger processedCount = new AtomicInteger(0);
        // Number of times the user function has been invoked for offset 0.
        AtomicInteger offsetZeroAttempts = new AtomicInteger(0);
        // Released to let the first attempt at offset 0 wake up and fail.
        CountDownLatch firstAttemptLatch = new CountDownLatch(1);
        // Released to let the retry of offset 0 finish successfully.
        CountDownLatch secondAttemptLatch = new CountDownLatch(1);

        // Remember the real limit so it can be restored no matter how the test ends.
        int originalMaxMetadataSize = OffsetMapCodecManager.DefaultMaxMetadataSize;
        try {
            // Static overrides live inside the try so a failure during setup cannot leak them
            // into other tests.
            OffsetMapCodecManager.DefaultMaxMetadataSize = 40;
            OffsetMapCodecManager.forcedCodec = Optional.of(OffsetEncoding.BitSetV2);

            this.ktu.send(this.consumerSpy, this.ktu.generateRecords(100));

            this.parallelConsumer.poll(consumerRecord -> {
                if (consumerRecord.offset() == 0) {
                    if (offsetZeroAttempts.incrementAndGet() == 1) {
                        log.debug("force first message to 'never' complete, causing a large offset encoding (lots of messages completing above the low water mark");
                        LatchTestUtils.awaitLatch(firstAttemptLatch, 60);
                        log.debug("very slow message awoken, throwing exception");
                        throw new FakeRuntimeError("Fake error");
                    }
                    log.debug("Second attempt, sleeping");
                    LatchTestUtils.awaitLatch(secondAttemptLatch, 60);
                    log.debug("Second attempt, unlocked, succeeding");
                } else {
                    ThreadUtils.sleepQuietly(1);
                }
                processedCount.getAndIncrement();
            });

            Assertions.useRepresentation(new TrimListRepresentation());

            // Everything except the parked offset-0 record should complete
            // (assuming the 100 generated records occupy offsets 0..99).
            Awaitility.waitAtMost(Duration.ofSeconds(120L))
                    .failFast("PC died - check logs", this.parallelConsumer::isClosedOrFailed)
                    .pollInterval(1L, TimeUnit.SECONDS)
                    .untilAsserted(() -> Assertions.assertThat(processedCount.get()).isEqualTo(99L));

            waitForSomeLoopCycles(1);
            this.parallelConsumer.requestCommitAsap();
            waitForSomeLoopCycles(1);

            // The most recent commit must still sit at offset 0, with the completed records
            // described only through the encoded metadata.
            List<OffsetAndMetadata> committed = extractAllPartitionsOffsetsAndMetadataSequentially();
            Assertions.assertThat(committed).isNotEmpty();
            OffsetAndMetadata mostRecentCommit = committed.get(committed.size() - 1);
            Assertions.assertThat(mostRecentCommit.offset()).isZero();
            OffsetMapCodecManager.HighestOffsetAndIncompletes decoded =
                    OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(0L, mostRecentCommit.metadata());
            Long highestSeenOffset = decoded.getHighestSeenOffset();
            Assertions.assertThat(decoded.getIncompleteOffsets())
                    .isNotEmpty()
                    .contains(0L)
                    .doesNotContain(1L, 50L, 99L);
            Assertions.assertThat(highestSeenOffset).isEqualTo(99L);

            WorkManager wm = this.parallelConsumer.getWm();
            Assertions.assertThat(wm.getPm().isBlocked(this.topicPartition)).isFalse();

            // Feed more records and wait until the processed count passes this threshold
            // (the original 100 plus half of the 50 extra).
            int processedThreshold = 100 + (50 / 2);
            this.ktu.send(this.consumerSpy, this.ktu.generateRecords(50));
            waitForOneLoopCycle();
            Assertions.assertThat(wm.getPm().isAllowedMoreRecords(this.topicPartition)).isTrue();
            Awaitility.await().untilAsserted(
                    () -> Assertions.assertThat(processedCount.get()).isGreaterThan(processedThreshold));
            waitForOneLoopCycle();

            Assertions.assertThat(wm.getPm().getHighestSeenOffset(this.topicPartition)).isGreaterThan(processedThreshold);
            // NOTE(review): return value is unused; call retained as-is - confirm whether it is needed.
            wm.getPm().getState(this.topicPartition);
            Assertions.assertThat(wm.getPm().isBlocked(this.topicPartition))
                    .as("Partition SHOULD be blocked due to back pressure")
                    .isTrue();

            // While blocked, the last commit is still offset 0 with only offset 0 incomplete.
            Awaitility.await().untilAsserted(() -> {
                OffsetAndMetadata lastCommit = getLastCommit();
                Assertions.assertThat(lastCommit.offset()).isZero();
                OffsetMapCodecManager.HighestOffsetAndIncompletes lastDecoded =
                        OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(0L, lastCommit.metadata());
                Assertions.assertThat(lastDecoded.getIncompleteOffsets()).containsOnly(0L);
                Assertions.assertThat(lastDecoded.getHighestSeenOffset()).isEqualTo(processedCount.get());
            });

            // Raise the payload threshold multiplier and send more records; the test then
            // expects the committed metadata to end up blank.
            wm.getPm().setUSED_PAYLOAD_THRESHOLD_MULTIPLIER(2.0d);
            this.ktu.send(this.consumerSpy, this.ktu.generateRecords(125));
            Awaitility.await().atMost(Duration.ofSeconds(5L)).untilAsserted(
                    () -> Assertions.assertThat(processedCount.get()).isGreaterThan(processedThreshold));
            Awaitility.await().untilAsserted(() -> {
                OffsetAndMetadata lastCommit = getLastCommit();
                Assertions.assertThat(lastCommit.offset()).isZero();
                Assertions.assertThat(lastCommit.metadata()).isBlank();
            });

            // Let the first attempt at offset 0 fail, then wait for the retry to start.
            Duration retryDelay = Duration.ofMillis(100L);
            WorkContainer.setDefaultRetryDelay(retryDelay);
            firstAttemptLatch.countDown();
            waitForOneLoopCycle();
            ThreadUtils.sleepQuietly(retryDelay.toMillis());
            Awaitility.await().until(() -> offsetZeroAttempts.get() >= 2);
            waitForOneLoopCycle();

            // Retry is parked on the second latch: no more records may be taken yet.
            Awaitility.await().untilAsserted(
                    () -> Assertions.assertThat(wm.getPm().isAllowedMoreRecords(this.topicPartition)).isFalse());

            // Let the retry succeed; the partition should accept records again and the
            // committed offsets should include the processed count.
            secondAttemptLatch.countDown();
            waitForOneLoopCycle();
            Awaitility.await().untilAsserted(
                    () -> Assertions.assertThat(wm.getPm().isAllowedMoreRecords(this.topicPartition)).isTrue());
            Awaitility.await().untilAsserted(
                    () -> Assertions.assertThat(extractAllPartitionsOffsetsSequentially()).contains(processedCount.get()));
            Awaitility.await().untilAsserted(
                    () -> Assertions.assertThat(wm.getPm().isAllowedMoreRecords(this.topicPartition)).isTrue());
        } finally {
            // Always release any worker still parked on a latch so a failed test does not have
            // to wait out the latch timeout.
            firstAttemptLatch.countDown();
            secondAttemptLatch.countDown();
            // Restore the static configuration mutated at the top of the try block.
            OffsetMapCodecManager.DefaultMaxMetadataSize = originalMaxMetadataSize;
            OffsetMapCodecManager.forcedCodec = Optional.empty();
        }
    }

    /**
     * Returns the committed {@link OffsetAndMetadata} for {@code topicPartition} taken from the
     * last entry of the commit history.
     *
     * <p>NOTE(review): raw {@code Map} types are kept because the generic signatures of
     * {@code getCommitHistory()} and the {@link JavaUtils} helpers are not visible from this file.
     * Both {@code Optional.get()} calls throw if the history is empty or the helper yields nothing.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private OffsetAndMetadata getLastCommit() {
        Map latestHistoryEntry = (Map) JavaUtils.getLast(getCommitHistory()).get();
        Map offsetsByPartition = (Map) JavaUtils.getOnlyOne(latestHistoryEntry).get();
        return (OffsetAndMetadata) offsetsByPartition.get(this.topicPartition);
    }
}
