/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.offsets;

import com.google.common.truth.Truth;
import com.google.common.truth.Truth8;
import io.confluent.csid.utils.JavaUtils;
import io.confluent.csid.utils.LatchTestUtils;
import io.confluent.csid.utils.ThreadUtils;
import io.confluent.parallelconsumer.FakeRuntimeException;
import io.confluent.parallelconsumer.ManagedTruth;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.offsets.OffsetDecodingError;
import io.confluent.parallelconsumer.offsets.OffsetEncoding;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import io.confluent.parallelconsumer.state.PartitionState;
import io.confluent.parallelconsumer.state.PartitionStateManager;
import io.confluent.parallelconsumer.state.ShardManager;
import io.confluent.parallelconsumer.state.WorkManager;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Isolated;
import org.junit.jupiter.api.parallel.ResourceAccessMode;
import org.junit.jupiter.api.parallel.ResourceLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

@Isolated
class OffsetEncodingBackPressureTest
extends ParallelEoSStreamProcessorTestBase {
    private static final Logger log = LoggerFactory.getLogger(OffsetEncodingBackPressureTest.class);

    OffsetEncodingBackPressureTest() {
    }

    @AfterAll
    static void cleanup() {
        PartitionStateManager.setUSED_PAYLOAD_THRESHOLD_MULTIPLIER((double)0.75);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @ResourceLock(value="Value doesn't matter, just needs a constant", mode=ResourceAccessMode.READ_WRITE)
    void backPressureShouldPreventTooManyMessagesBeingQueuedForProcessing() throws OffsetDecodingError {
        int numberOfRecordsToPrimeWith = 100;
        this.parallelConsumer.setTimeBetweenCommits(Duration.ofSeconds(1L));
        int realMax = OffsetMapCodecManager.DefaultMaxMetadataSize;
        OffsetMapCodecManager.DefaultMaxMetadataSize = 40;
        OffsetMapCodecManager.forcedCodec = Optional.of(OffsetEncoding.BitSetV2);
        List<ConsumerRecord<String, String>> records = this.ktu.generateRecords(100);
        this.ktu.send((MockConsumer<String, String>)this.consumerSpy, records);
        AtomicInteger userFuncFinishedCount = new AtomicInteger();
        AtomicInteger userFuncStartCount = new AtomicInteger();
        CountDownLatch finalMsgLock = new CountDownLatch(1);
        CountDownLatch msgLockTwo = new CountDownLatch(1);
        CountDownLatch msgLockThree = new CountDownLatch(1);
        AtomicInteger attempts = new AtomicInteger(0);
        long offsetToBlock = 0L;
        List blockedOffsets = UniLists.of((Object)0L, (Object)2L);
        int numberOfBlockedMessages = blockedOffsets.size();
        WorkManager wm = this.parallelConsumer.getWm();
        PartitionState partitionState = wm.getPm().getPartitionState(this.topicPartition);
        ConcurrentLinkedQueue seen = new ConcurrentLinkedQueue();
        this.parallelConsumer.poll(recordContext -> {
            log.debug("Processing {}", (Object)recordContext.offset());
            seen.add(recordContext.offset());
            userFuncStartCount.incrementAndGet();
            if (recordContext.offset() == offsetToBlock) {
                int attemptNumber = attempts.incrementAndGet();
                if (attemptNumber == 1) {
                    log.debug("Force first message to 'never' complete, causing a large offset encoding (lots of messages completing above the low water mark. Waiting for msgLock countdown.");
                    int timeout = 120;
                    LatchTestUtils.awaitLatch(finalMsgLock, timeout);
                    log.debug("Very slow message awoken, throwing exception");
                    throw new FakeRuntimeException("Fake error");
                }
                log.debug("Second attempt, waiting for msgLockTwo countdown");
                LatchTestUtils.awaitLatch(msgLockTwo, 60);
                log.debug("Second attempt, unlocked, succeeding");
            } else if (recordContext.offset() == 2L) {
                LatchTestUtils.awaitLatch(msgLockThree);
                log.debug("// msg 2L unblocked");
            } else {
                ThreadUtils.sleepQuietly(1);
            }
            userFuncFinishedCount.incrementAndGet();
        });
        ShardManager sm = wm.getSm();
        try {
            Awaitility.waitAtMost((Duration)Duration.ofSeconds(120L)).failFast("PC died - check logs", () -> ((ParallelEoSStreamProcessor)this.parallelConsumer).isClosedOrFailed()).pollInterval(1L, TimeUnit.SECONDS).untilAsserted(() -> Assertions.assertThat((int)userFuncFinishedCount.get()).isEqualTo(100 - numberOfBlockedMessages));
            this.awaitForSomeLoopCycles(1);
            this.parallelConsumer.requestCommitAsap();
            this.awaitForSomeLoopCycles(1);
            this.assertThatConsumer("Initial commit has been executed").hasCommittedToAnyPartition().offset(0L);
            List<OffsetAndMetadata> offsetAndMetadataList = this.extractAllPartitionsOffsetsAndMetadataSequentially();
            OffsetAndMetadata mostRecentCommit = (OffsetAndMetadata)JavaUtils.getLast(offsetAndMetadataList).get();
            Assertions.assertThat((long)mostRecentCommit.offset()).isZero();
            String metadata = mostRecentCommit.metadata();
            OffsetMapCodecManager.HighestOffsetAndIncompletes decodedOffsetPayload = OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64((long)0L, (String)metadata);
            Long highestSeenOffset = (Long)decodedOffsetPayload.getHighestSeenOffset().get();
            SortedSet incompletes = decodedOffsetPayload.getIncompleteOffsets();
            ((AbstractCollectionAssert)((AbstractCollectionAssert)Assertions.assertThat((Collection)incompletes).isNotEmpty()).contains((Object[])new Long[]{offsetToBlock})).doesNotContain((Object[])new Long[]{1L, 50L, 99L, 100L - (long)numberOfBlockedMessages});
            int expectedHighestSeenOffset = 99;
            ((AbstractLongAssert)Assertions.assertThat((Long)highestSeenOffset).as("offset 99 is encoded as having been seen", new Object[0])).isEqualTo((long)expectedHighestSeenOffset);
            ManagedTruth.assertTruth(partitionState).isAllowedMoreRecords();
            log.debug("// feed more messages in order to threshold block - as Bitset requires linearly as much space as we are feeding messages into it, it's guaranteed to block");
            int bytesNeededToCrossThreshold = 5;
            int extraRecordsToBlockWithThresholdBlocks = 8 * bytesNeededToCrossThreshold;
            ManagedTruth.assertTruth(partitionState).isAllowedMoreRecords();
            this.ktu.send((MockConsumer<String, String>)this.consumerSpy, this.ktu.generateRecords(extraRecordsToBlockWithThresholdBlocks));
            this.awaitForOneLoopCycle();
            log.debug("// assert partition now blocked from threshold");
            Awaitility.waitAtMost((Duration)Duration.ofSeconds(10L)).untilAsserted(() -> ManagedTruth.assertWithMessage("Partition SHOULD be blocked due to back pressure").that(partitionState).isBlocked());
            Long partitionOffsetHighWaterMarks = wm.getPm().getHighestSeenOffset(this.topicPartition);
            Assertions.assertThat((Long)partitionOffsetHighWaterMarks).isGreaterThan(100L);
            this.parallelConsumer.requestCommitAsap();
            this.awaitForOneLoopCycle();
            log.debug("// assert blocked, but can still write payload");
            Awaitility.waitAtMost((Duration)defaultTimeout).untilAsserted(() -> {
                OffsetAndMetadata partitionCommit = this.getLastCommit();
                Assertions.assertThat((long)partitionCommit.offset()).isZero();
                String meta = partitionCommit.metadata();
                OffsetMapCodecManager.HighestOffsetAndIncompletes incompletes = OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64((long)0L, (String)meta);
                Truth.assertWithMessage((String)"The only incomplete record now is offset zero, which we are blocked on").that((Iterable)incompletes.getIncompleteOffsets()).containsExactlyElementsIn((Iterable)blockedOffsets);
                int expectedHighestSeen = 100 + extraRecordsToBlockWithThresholdBlocks - 1;
                Truth8.assertThat((Optional)incompletes.getHighestSeenOffset()).hasValue((Object)expectedHighestSeen);
            });
            log.debug("// test max payload exceeded, payload dropped");
            log.debug("Force system to allow more records to be processed beyond the safety threshold setting (i.e. the actual system attempts to never allow the payload to grow this big) i.e. effectively this disables blocking mechanism for the partition");
            PartitionStateManager.setUSED_PAYLOAD_THRESHOLD_MULTIPLIER((double)30.0);
            OffsetMapCodecManager.DefaultMaxMetadataSize = 30;
            log.debug("// unlock record to make the state dirty to get a commit");
            msgLockThree.countDown();
            this.parallelConsumer.requestCommitAsap();
            this.awaitForSomeLoopCycles(2);
            ManagedTruth.assertTruth(partitionState).isBlocked();
            log.debug("// assert payload missing from commit now");
            Awaitility.await().untilAsserted(() -> {
                ManagedTruth.assertTruth(partitionState).isBlocked();
                OffsetAndMetadata partitionCommit = this.getLastCommit();
                ManagedTruth.assertTruth(partitionCommit).hasOffsetEqualTo(0L);
                ManagedTruth.assertTruth(partitionCommit).getMetadata().isEmpty();
            });
            log.debug("Test that failed messages can retry, causing partition to un-block");
            finalMsgLock.countDown();
            this.awaitForOneLoopCycle();
            ThreadUtils.sleepQuietly(ParallelConsumerOptions.DEFAULT_STATIC_RETRY_DELAY.toMillis());
            Awaitility.await().until(() -> attempts.get() >= 2);
            this.awaitForOneLoopCycle();
            Awaitility.await().untilAsserted(() -> Assertions.assertThat((boolean)wm.getPm().isAllowedMoreRecords(this.topicPartition)).isFalse());
            msgLockTwo.countDown();
            this.awaitForOneLoopCycle();
            Awaitility.await().untilAsserted(() -> ManagedTruth.assertTruth(partitionState).isAllowedMoreRecords());
            Awaitility.await().untilAsserted(() -> {
                List<Integer> offsets = this.extractAllPartitionsOffsetsSequentially(false);
                Assertions.assertThat(offsets).contains((Object[])new Integer[]{userFuncFinishedCount.get()});
            });
            Awaitility.await().untilAsserted(() -> ManagedTruth.assertTruth(partitionState).isAllowedMoreRecords());
        }
        finally {
            OffsetMapCodecManager.DefaultMaxMetadataSize = realMax;
            OffsetMapCodecManager.forcedCodec = Optional.empty();
        }
    }

    private OffsetAndMetadata getLastCommit() {
        List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> commitHistory = this.getCommitHistory();
        Map lastCommit = (Map)JavaUtils.getLast(commitHistory).get();
        Map allPartitionCommits = (Map)JavaUtils.getOnlyOne((Map)lastCommit).get();
        return (OffsetAndMetadata)allPartitionCommits.get(this.topicPartition);
    }
}

