package io.confluent.parallelconsumer.offsets;

import com.google.common.truth.Truth;
import com.google.common.truth.Truth8;
import io.confluent.csid.utils.JavaUtils;
import io.confluent.csid.utils.LatchTestUtils;
import io.confluent.csid.utils.ThreadUtils;
import io.confluent.parallelconsumer.FakeRuntimeException;
import io.confluent.parallelconsumer.ManagedTruth;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import io.confluent.parallelconsumer.state.PartitionState;
import io.confluent.parallelconsumer.state.PartitionStateManager;
import io.confluent.parallelconsumer.state.WorkManager;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Isolated;
import org.junit.jupiter.api.parallel.ResourceAccessMode;
import org.junit.jupiter.api.parallel.ResourceLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

/**
 * Checks that a partition is blocked from taking more records (back pressure) while the encoded
 * offset payload is too large, that the payload is dropped from the commit once it exceeds the
 * maximum metadata size, and that the partition is allowed more records again after the blocking
 * record finally succeeds.
 *
 * <p>The test temporarily overrides the static {@code OffsetMapCodecManager.DefaultMaxMetadataSize}
 * and {@code OffsetMapCodecManager.forcedCodec} settings as well as the
 * {@code PartitionStateManager} payload threshold multiplier, which is why it is annotated
 * {@link Isolated}.
 */
@Isolated
class OffsetEncodingBackPressureTest extends ParallelEoSStreamProcessorTestBase {

    private static final Logger log = LoggerFactory.getLogger(OffsetEncodingBackPressureTest.class);

    /**
     * Resets the payload threshold multiplier that the test raises to {@code 30.0}.
     */
    @AfterAll
    static void cleanup() {
        PartitionStateManager.setUSED_PAYLOAD_THRESHOLD_MULTIPLIER(0.75d);
    }

    /**
     * Drives the processor through four phases: (1) everything except offsets 0 and 2 completes and
     * the commit carries an incomplete-offset payload; (2) extra records push the partition into the
     * blocked state while the payload is still written; (3) the maximum metadata size is lowered so
     * the payload is dropped from the commit; (4) offset 0 is retried and succeeds, after which the
     * partition is allowed more records again.
     *
     * @throws OffsetDecodingError if committed offset metadata cannot be decoded
     */
    @Test
    @ResourceLock(value = "Value doesn't matter, just needs a constant", mode = ResourceAccessMode.READ_WRITE)
    void backPressureShouldPreventTooManyMessagesBeingQueuedForProcessing() throws OffsetDecodingError {
        final int originalMaxMetadataSize = OffsetMapCodecManager.DefaultMaxMetadataSize;
        // Every static override happens inside the try so that the finally block restores the
        // settings even when the set-up itself (sending records, registering the function) fails.
        try {
            this.parallelConsumer.setTimeBetweenCommits(Duration.ofSeconds(1L));
            OffsetMapCodecManager.DefaultMaxMetadataSize = 40;
            OffsetMapCodecManager.forcedCodec = Optional.of(OffsetEncoding.BitSetV2);

            final int numberOfRecords = 100;
            this.ktu.send(this.consumerSpy, this.ktu.generateRecords(numberOfRecords));

            final AtomicInteger processedCount = new AtomicInteger();
            // Not asserted on; handy when inspecting a failing run in a debugger.
            final AtomicInteger startedCount = new AtomicInteger();
            final ConcurrentLinkedQueue<Long> seenOffsets = new ConcurrentLinkedQueue<>();

            final CountDownLatch msgLock = new CountDownLatch(1);
            final CountDownLatch msgLockTwo = new CountDownLatch(1);
            final CountDownLatch msgLockThree = new CountDownLatch(1);
            final AtomicInteger attemptsOnBlockedOffset = new AtomicInteger(0);
            final long offsetToBlock = 0;
            final List<Long> blockedOffsets = UniLists.of(0L, 2L);
            final int numberOfBlockedMessages = blockedOffsets.size();

            final WorkManager wm = this.parallelConsumer.getWm();
            final PartitionState partitionState = wm.getPm().getPartitionState(this.topicPartition);

            this.parallelConsumer.poll(pollContext -> {
                final long offset = pollContext.offset();
                log.debug("Processing {}", offset);
                seenOffsets.add(offset);
                startedCount.incrementAndGet();
                if (offset == offsetToBlock) {
                    if (attemptsOnBlockedOffset.incrementAndGet() == 1) {
                        log.debug("Force first message to 'never' complete, causing a large offset encoding (lots of messages completing above the low water mark. Waiting for msgLock countdown.");
                        LatchTestUtils.awaitLatch(msgLock, 120);
                        log.debug("Very slow message awoken, throwing exception");
                        throw new FakeRuntimeException("Fake error");
                    }
                    log.debug("Second attempt, waiting for msgLockTwo countdown");
                    LatchTestUtils.awaitLatch(msgLockTwo, 60);
                    log.debug("Second attempt, unlocked, succeeding");
                } else if (offset == 2) {
                    LatchTestUtils.awaitLatch(msgLockThree);
                    log.debug("// msg 2L unblocked");
                } else {
                    ThreadUtils.sleepQuietly(1);
                }
                processedCount.incrementAndGet();
            });

            // Phase 1: wait until every record except the blocked ones has been processed.
            Awaitility.waitAtMost(Duration.ofSeconds(120L))
                    .failFast("PC died - check logs", this.parallelConsumer::isClosedOrFailed)
                    .pollInterval(1L, TimeUnit.SECONDS)
                    .untilAsserted(() ->
                            Assertions.assertThat(processedCount.get()).isEqualTo(numberOfRecords - numberOfBlockedMessages));

            awaitForSomeLoopCycles(1);
            this.parallelConsumer.requestCommitAsap();
            awaitForSomeLoopCycles(1);

            assertThatConsumer("Initial commit has been executed").hasCommittedToAnyPartition().offset(0L);
            OffsetAndMetadata initialCommit = (OffsetAndMetadata) JavaUtils.getLast(extractAllPartitionsOffsetsAndMetadataSequentially()).get();
            Assertions.assertThat(initialCommit.offset()).isZero();

            OffsetMapCodecManager.HighestOffsetAndIncompletes initialIncompletes =
                    OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(0L, initialCommit.metadata());
            Long highestSeenOffset = (Long) initialIncompletes.getHighestSeenOffset().get();
            Assertions.assertThat(initialIncompletes.getIncompleteOffsets())
                    .isNotEmpty()
                    .contains(0L)
                    .doesNotContain(1L, 50L, 99L, Long.valueOf(numberOfRecords - numberOfBlockedMessages));
            Assertions.assertThat(highestSeenOffset)
                    .as("offset 99 is encoded as having been seen")
                    .isEqualTo(numberOfRecords - 1);

            ManagedTruth.assertTruth(partitionState).isAllowedMoreRecords();

            // Phase 2: push the partition over the payload threshold.
            log.debug("// feed more messages in order to threshold block - as Bitset requires linearly as much space as we are feeding messages into it, it's guaranteed to block");
            // NOTE(review): 8 * 5 presumably means five bytes' worth of bitset bits - confirm.
            final int extraRecordsToBlock = 8 * 5;
            ManagedTruth.assertTruth(partitionState).isAllowedMoreRecords();
            this.ktu.send(this.consumerSpy, this.ktu.generateRecords(extraRecordsToBlock));
            awaitForOneLoopCycle();

            log.debug("// assert partition now blocked from threshold");
            Awaitility.waitAtMost(Duration.ofSeconds(10L)).untilAsserted(() ->
                    ManagedTruth.assertWithMessage("Partition SHOULD be blocked due to back pressure").that(partitionState).isBlocked());
            Assertions.assertThat(Long.valueOf(wm.getPm().getHighestSeenOffset(this.topicPartition))).isGreaterThan(100L);

            this.parallelConsumer.requestCommitAsap();
            awaitForOneLoopCycle();

            log.debug("// assert blocked, but can still write payload");
            Awaitility.waitAtMost(defaultTimeout).untilAsserted(() -> {
                OffsetAndMetadata blockedCommit = getLastCommit();
                Assertions.assertThat(blockedCommit.offset()).isZero();
                OffsetMapCodecManager.HighestOffsetAndIncompletes blockedIncompletes =
                        OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(0L, blockedCommit.metadata());
                Truth.assertWithMessage("The only incomplete record now is offset zero, which we are blocked on")
                        .that(blockedIncompletes.getIncompleteOffsets())
                        .containsExactlyElementsIn(blockedOffsets);
                Truth8.assertThat(blockedIncompletes.getHighestSeenOffset())
                        .hasValue(numberOfRecords + extraRecordsToBlock - 1);
            });

            // Phase 3: shrink the allowed metadata size so the payload no longer fits.
            log.debug("// test max payload exceeded, payload dropped");
            log.debug("Force system to allow more records to be processed beyond the safety threshold setting (i.e. the actual system attempts to never allow the payload to grow this big) i.e. effectively this disables blocking mechanism for the partition");
            PartitionStateManager.setUSED_PAYLOAD_THRESHOLD_MULTIPLIER(30.0d);
            OffsetMapCodecManager.DefaultMaxMetadataSize = 30;

            log.debug("// unlock record to make the state dirty to get a commit");
            msgLockThree.countDown();
            this.parallelConsumer.requestCommitAsap();
            awaitForSomeLoopCycles(2);
            ManagedTruth.assertTruth(partitionState).isBlocked();

            log.debug("// assert payload missing from commit now");
            Awaitility.await().untilAsserted(() -> {
                ManagedTruth.assertTruth(partitionState).isBlocked();
                OffsetAndMetadata payloadlessCommit = getLastCommit();
                ManagedTruth.assertTruth(payloadlessCommit).hasOffsetEqualTo(0L);
                ManagedTruth.assertTruth(payloadlessCommit).getMetadata().isEmpty();
            });

            // Phase 4: let offset 0 fail once, retry, and finally succeed.
            log.debug("Test that failed messages can retry, causing partition to un-block");
            msgLock.countDown();
            awaitForOneLoopCycle();
            ThreadUtils.sleepQuietly(ParallelConsumerOptions.DEFAULT_STATIC_RETRY_DELAY.toMillis());
            Awaitility.await().until(() -> attemptsOnBlockedOffset.get() >= 2);
            awaitForOneLoopCycle();

            // Second attempt is in flight but not finished: still no more records allowed.
            Awaitility.await().untilAsserted(() ->
                    Assertions.assertThat(wm.getPm().isAllowedMoreRecords(this.topicPartition)).isFalse());

            msgLockTwo.countDown();
            awaitForOneLoopCycle();
            Awaitility.await().untilAsserted(() ->
                    ManagedTruth.assertTruth(partitionState).isAllowedMoreRecords());
            Awaitility.await().untilAsserted(() ->
                    Assertions.assertThat(extractAllPartitionsOffsetsSequentially(false)).contains(processedCount.get()));
            Awaitility.await().untilAsserted(() ->
                    ManagedTruth.assertTruth(partitionState).isAllowedMoreRecords());
        } finally {
            // Static settings are shared with every other test in the JVM; always put them back.
            OffsetMapCodecManager.DefaultMaxMetadataSize = originalMaxMetadataSize;
            OffsetMapCodecManager.forcedCodec = Optional.empty();
        }
    }

    /**
     * Returns the entry for {@code topicPartition} from the most recent element of the commit
     * history.
     *
     * @return the offset and metadata last committed for the test's topic partition
     */
    // Raw types are kept on purpose: the generic signatures of the helpers are declared outside this
    // file, so the casts mirror exactly what the calls are known to accept.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private OffsetAndMetadata getLastCommit() {
        Map latestCommit = (Map) JavaUtils.getLast(getCommitHistory()).get();
        Map partitionCommits = (Map) JavaUtils.getOnlyOne(latestCommit).get();
        return (OffsetAndMetadata) partitionCommits.get(this.topicPartition);
    }
}
