/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.offsets;

import com.google.common.truth.Truth;
import io.confluent.parallelconsumer.ManagedTruth;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
import io.confluent.parallelconsumer.offsets.OffsetDecodingError;
import io.confluent.parallelconsumer.offsets.OffsetEncoding;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import io.confluent.parallelconsumer.state.PartitionState;
import io.confluent.parallelconsumer.state.PartitionStateManager;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import one.util.streamex.LongStreamEx;
import one.util.streamex.StreamEx;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.ResourceAccessMode;
import org.junit.jupiter.api.parallel.ResourceLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;

/**
 * Unit test for the back pressure applied by the offset encoding payload size.
 *
 * <p>The test shrinks the maximum offset metadata size and forces the {@code BitSetV2} codec, then
 * feeds records into the {@link WorkManager} while keeping a few offsets incomplete. It asserts that
 * the partition first becomes blocked from accepting more records, that the payload is dropped from
 * the commit data once it grows too large, that failed work is offered again for retry, and that
 * the partition is unblocked once the blocking offsets complete.
 *
 * <p>The test mutates static configuration on {@link OffsetMapCodecManager} and
 * {@link PartitionStateManager}; it therefore takes a read-write {@link ResourceLock} and restores
 * the values when done.
 */
class OffsetEncodingBackPressureUnitTest extends ParallelEoSStreamProcessorTestBase {
    private static final Logger log = LoggerFactory.getLogger(OffsetEncodingBackPressureUnitTest.class);

    /**
     * Resets the payload threshold multiplier, which the test overrides to {@code 2.0}, back to
     * {@code 0.75} (presumably the production default - TODO confirm against
     * {@link PartitionStateManager}).
     */
    @AfterAll
    static void cleanup() {
        PartitionStateManager.setUSED_PAYLOAD_THRESHOLD_MULTIPLIER(0.75);
    }

    /**
     * Walks a single partition through the back pressure life cycle: not blocked, blocked by the
     * payload threshold, payload dropped after exceeding the maximum size, retry of failed work, and
     * finally unblocked with the committed offset advanced past every record sent.
     *
     * @throws OffsetDecodingError declared by the original test signature
     */
    @Test
    @ResourceLock(value = "Value doesn't matter, just needs a constant", mode = ResourceAccessMode.READ_WRITE)
    void backPressureShouldPreventTooManyMessagesBeingQueuedForProcessing() throws OffsetDecodingError {
        final int numberOfRecords = 100;
        final int numberOfBlockedMessages = 2;

        // Remember the static value so it can be restored, then shrink the maximum metadata size
        // and force the BitSetV2 codec for the duration of the test.
        final int realMax = OffsetMapCodecManager.DefaultMaxMetadataSize;
        OffsetMapCodecManager.DefaultMaxMetadataSize = 40;
        OffsetMapCodecManager.forcedCodec = Optional.of(OffsetEncoding.BitSetV2);

        // Everything after the static mutation runs inside try/finally so that a failure in the
        // set-up phase cannot leak the overridden statics into other tests.
        try {
            WorkManager<String, String> wm = this.parallelConsumer.getWm();
            PartitionStateManager<String, String> pm = wm.getPm();
            PartitionState<String, String> partitionState = pm.getPartitionState(this.topicPartition);

            this.sendRecordsToWM(numberOfRecords, wm);

            List<Long> samplingOfShouldBeCompleteOffsets = UniLists.of(1L, 50L, 99L, 98L);
            List<Long> blockedOffsets = UniLists.of(0L, 2L);

            List<WorkContainer<String, String>> workIfAvailable = wm.getWorkIfAvailable();
            ManagedTruth.assertTruth(workIfAvailable).hasSize(numberOfRecords);

            // Complete everything except the offsets we deliberately keep blocked.
            this.succeedAllExcept(wm, workIfAvailable, blockedOffsets);

            // Return value intentionally ignored. The call is retained from the original test;
            // presumably it triggers an encoding pass that refreshes the blocked state - TODO confirm.
            partitionState.getCommitDataIfDirty();

            ManagedTruth.assertTruth(partitionState).isAllowedMoreRecords();
            int expectedHighestSeenOffset = numberOfRecords - 1;
            ManagedTruth.assertTruth(partitionState).getOffsetHighestSeen().isEqualTo(expectedHighestSeenOffset);
            ManagedTruth.assertTruth(partitionState).getCommitDataIfDirty().hasOffsetEqualTo(0L);

            log.debug("// feed more messages in order to threshold block - as Bitset requires linearly as much space as we are feeding messages into it, it's guaranteed to block");
            int extraRecordsToBlockWithThresholdBlocks = numberOfRecords / 2;
            this.sendRecordsToWM(extraRecordsToBlockWithThresholdBlocks, wm);
            this.succeedExcept(wm, blockedOffsets);

            // Return value intentionally ignored; see the note on the first call above.
            partitionState.getCommitDataIfDirty();

            log.debug("// assert partition now blocked from threshold");
            ManagedTruth.assertTruth(partitionState).isNotAllowedMoreRecords();

            log.debug("// assert blocked, but can still write payload");
            ManagedTruth.assertTruth(partitionState).getCommitDataIfDirty().hasOffsetEqualTo(0L);
            ManagedTruth.assertTruth(partitionState).getOffsetHighestSeen()
                    .isEqualTo(numberOfRecords + extraRecordsToBlockWithThresholdBlocks - 1);
            ManagedTruth.assertTruth(partitionState).getCommitDataIfDirty().getMetadata().isNotEmpty();
            ManagedTruth.assertTruth(partitionState).getAllIncompleteOffsets()
                    .containsNoneIn(samplingOfShouldBeCompleteOffsets);
            ManagedTruth.assertWithMessage("The only incomplete records now are the blocked offsets")
                    .that(partitionState).getAllIncompleteOffsets().containsExactlyElementsIn(blockedOffsets);

            log.debug("// test max payload exceeded, payload dropped");
            int processedBeforePartitionBlock =
                    extraRecordsToBlockWithThresholdBlocks + numberOfRecords - blockedOffsets.size();
            int extraMessages = numberOfRecords + extraRecordsToBlockWithThresholdBlocks / 2;
            log.debug("// messages already sent {}, sending {} more", processedBeforePartitionBlock, extraMessages);

            log.debug("// force system to allow more records (i.e. the actual system attempts to never allow the payload to grow this big)");
            PartitionStateManager.setUSED_PAYLOAD_THRESHOLD_MULTIPLIER(2.0);

            // Complete offset 2, leaving offset 0 as the only blocked record.
            this.unblock(wm, workIfAvailable, 2L);

            log.debug("// unlock to make state dirty to get a commit");
            // Return value intentionally ignored; see the note on the first call above.
            partitionState.getCommitDataIfDirty();

            log.debug("// send {} more messages", extraMessages);
            this.sendRecordsToWM(extraMessages, wm);
            this.succeedExcept(wm, UniLists.of(0L));

            log.debug("// assert payload missing from commit now");
            ManagedTruth.assertTruth(partitionState).getCommitDataIfDirty().hasOffsetEqualTo(0L);
            ManagedTruth.assertTruth(partitionState).getCommitDataIfDirty().getMetadata().isEmpty();

            log.debug("// test failed messages can retry");
            // Offset 0 has not been reported as failed yet, so it must not be handed out again.
            List<Long> availableOffsets = StreamEx.of(wm.getWorkIfAvailable()).map(WorkContainer::offset).toList();
            ManagedTruth.assertTruth(availableOffsets).doesNotContain(0L);

            // After reporting the failure, offset 0 must be offered again.
            wm.onFailureResult(this.findWC(workIfAvailable, 0L));
            availableOffsets = StreamEx.of(wm.getWorkIfAvailable()).map(WorkContainer::offset).toList();
            ManagedTruth.assertTruth(availableOffsets).contains(0L);

            // Finally complete offset 0; nothing is left incomplete.
            this.unblock(wm, workIfAvailable, 0L);

            // Return value intentionally ignored; see the note on the first call above.
            partitionState.getCommitDataIfDirty();

            ManagedTruth.assertTruth(partitionState).isAllowedMoreRecords();
            // The expected offset equals the total number of records sent (100 + 50 + 125).
            ManagedTruth.assertTruth(partitionState).getCommitDataIfDirty().getOffset()
                    .isEqualTo(processedBeforePartitionBlock + extraMessages + numberOfBlockedMessages);
            ManagedTruth.assertTruth(partitionState).isAllowedMoreRecords();
        } finally {
            // Restore the static configuration overridden at the start of the test.
            OffsetMapCodecManager.DefaultMaxMetadataSize = realMax;
            OffsetMapCodecManager.forcedCodec = Optional.empty();
        }
    }

    /**
     * Fetches all currently available work from the manager and reports every container as
     * succeeded, except those whose offset is listed in {@code incomplete}.
     *
     * @param wm         the work manager to fetch work from and report results to
     * @param incomplete offsets that must be left unreported
     */
    private void succeedExcept(WorkManager<String, String> wm, List<Long> incomplete) {
        this.succeedAllExcept(wm, wm.getWorkIfAvailable(), incomplete);
    }

    /**
     * Reports every container in {@code work} as succeeded, except those whose offset is listed in
     * {@code excludedOffsets}.
     *
     * @param wm              the work manager to report results to
     * @param work            the containers to consider
     * @param excludedOffsets offsets that must be left unreported
     */
    private void succeedAllExcept(WorkManager<String, String> wm,
                                  List<WorkContainer<String, String>> work,
                                  List<Long> excludedOffsets) {
        // Collect first, then report, so results are never reported while streaming over the source list.
        List<WorkContainer<String, String>> toSucceed = work.stream()
                .filter(container -> !excludedOffsets.contains(container.offset()))
                .collect(Collectors.toList());
        toSucceed.forEach(wm::onSuccessResult);
    }

    /**
     * Reports the container with the given offset as succeeded.
     *
     * @param wm              the work manager to report the result to
     * @param from            the containers to search
     * @param offsetToUnblock the offset of the container to complete
     * @throws java.util.NoSuchElementException if {@code from} holds no container with that offset
     */
    private void unblock(WorkManager<String, String> wm, List<WorkContainer<String, String>> from, long offsetToUnblock) {
        wm.onSuccessResult(this.findWC(from, offsetToUnblock));
    }

    /**
     * Finds the first container in {@code from} with the given offset.
     *
     * @param from            the containers to search
     * @param offsetToUnblock the offset to look for
     * @return the first matching container
     * @throws java.util.NoSuchElementException if no container has that offset
     */
    private WorkContainer<String, String> findWC(List<WorkContainer<String, String>> from, long offsetToUnblock) {
        return from.stream()
                .filter(container -> container.offset() == offsetToUnblock)
                .findFirst()
                // Same exception type as Optional.get(), but with the missing offset in the message.
                .orElseThrow(() -> new java.util.NoSuchElementException(
                        "No work container found for offset " + offsetToUnblock));
    }

    /**
     * Generates {@code numberOfRecords} records for the test partition, registers them with the
     * work manager, and asserts that the manager reports exactly that many records queued in
     * shards awaiting selection.
     *
     * @param numberOfRecords how many records to generate and register
     * @param wm              the work manager to register the records with
     */
    private void sendRecordsToWM(int numberOfRecords, WorkManager<String, String> wm) {
        log.debug("~Sending {} more records", numberOfRecords);
        List<ConsumerRecord<String, String>> records = this.ktu.generateRecords(numberOfRecords);
        ConsumerRecords<String, String> polled = new ConsumerRecords<>(UniMaps.of(this.topicPartition, records));
        wm.registerWork(new EpochAndRecordsMap<>(polled, wm.getPm()));
        // Truth compares integral boxed values numerically, so comparing the queued count to an int is fine.
        Truth.assertThat(wm.getNumberOfWorkQueuedInShardsAwaitingSelection()).isEqualTo(numberOfRecords);
    }
}

