package io.confluent.parallelconsumer.offsets;

import com.google.common.truth.Truth;
import io.confluent.parallelconsumer.ManagedTruth;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
import io.confluent.parallelconsumer.state.PartitionState;
import io.confluent.parallelconsumer.state.PartitionStateManager;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import one.util.streamex.LongStreamEx;
import one.util.streamex.StreamEx;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.ResourceAccessMode;
import org.junit.jupiter.api.parallel.ResourceLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;

/* loaded from: input_file:io/confluent/parallelconsumer/offsets/OffsetEncodingBackPressureUnitTest.class */
/**
 * Unit test for the back pressure applied to a partition as its encoded offset payload grows.
 *
 * <p>The test shrinks {@code OffsetMapCodecManager.DefaultMaxMetadataSize} and forces the
 * {@code BitSetV2} codec through public static fields, so it holds a read/write
 * {@link ResourceLock} while it runs and restores both fields in a {@code finally} block.
 */
class OffsetEncodingBackPressureUnitTest extends ParallelEoSStreamProcessorTestBase {
    private static final Logger log = LoggerFactory.getLogger(OffsetEncodingBackPressureUnitTest.class);

    /**
     * Resets the payload threshold multiplier that the test raises to {@code 2.0}.
     * NOTE(review): 0.75 is presumably the production default - confirm against PartitionStateManager.
     */
    @AfterAll
    static void cleanup() {
        PartitionStateManager.setUSED_PAYLOAD_THRESHOLD_MULTIPLIER(0.75d);
    }

    /**
     * Walks a partition through three phases: under the threshold (more records allowed), over the
     * threshold (blocked, but the payload is still written), and over the maximum (payload dropped
     * from the commit). It then checks that a failed record is offered again and that completing it
     * unblocks the partition.
     *
     * @throws OffsetDecodingError declared by the original test signature
     */
    @Test
    @ResourceLock(value = "Value doesn't matter, just needs a constant", mode = ResourceAccessMode.READ_WRITE)
    void backPressureShouldPreventTooManyMessagesBeingQueuedForProcessing() throws OffsetDecodingError {
        final int numberOfRecords = 100;
        final int numberOfBlockedMessages = 2;
        final int extraRecordsToBlockWithThresholdBlocks = numberOfRecords / 2;

        // Remember the real limit so it can be restored whatever happens below.
        final int realMax = OffsetMapCodecManager.DefaultMaxMetadataSize;
        OffsetMapCodecManager.DefaultMaxMetadataSize = 40;
        OffsetMapCodecManager.forcedCodec = Optional.of(OffsetEncoding.BitSetV2);

        // The guarded region starts immediately after the static overrides, so that a failure during
        // setup (not only during the assertions) cannot leak the overrides into other tests.
        try {
            WorkManager<String, String> wm = parallelConsumer.getWm();
            PartitionState<String, String> partitionState = wm.getPm().getPartitionState(topicPartition);
            sendRecordsToWM(numberOfRecords, wm);

            final long lastSampledCompleteOffset = numberOfRecords - numberOfBlockedMessages;
            List<Long> samplingOfShouldBeCompleteOffsets = UniLists.of(1L, 50L, 99L, lastSampledCompleteOffset);
            List<Long> blockedOffsets = UniLists.of(0L, 2L);

            List<WorkContainer<String, String>> workIfAvailable = wm.getWorkIfAvailable();
            ManagedTruth.assertTruth(workIfAvailable).hasSize(numberOfRecords);

            // Complete everything from the first batch except the offsets we keep blocked.
            List<WorkContainer<String, String>> toSucceed = workIfAvailable.stream()
                    .filter(work -> !blockedOffsets.contains(work.offset()))
                    .collect(Collectors.toList());
            toSucceed.forEach(wm::onSuccessResult);

            partitionState.getCommitDataIfDirty();
            ManagedTruth.assertTruth(partitionState).isAllowedMoreRecords();
            ManagedTruth.assertTruth(partitionState).getOffsetHighestSeen().isEqualTo(numberOfRecords - 1);
            ManagedTruth.assertTruth(partitionState).getCommitDataIfDirty().hasOffsetEqualTo(0L);

            log.debug("// feed more messages in order to threshold block - as Bitset requires linearly as much space as we are feeding messages into it, it's guaranteed to block");
            sendRecordsToWM(extraRecordsToBlockWithThresholdBlocks, wm);
            succeedExcept(wm, blockedOffsets);
            partitionState.getCommitDataIfDirty();

            log.debug("// assert partition now blocked from threshold");
            ManagedTruth.assertTruth(partitionState).isNotAllowedMoreRecords();

            log.debug("// assert blocked, but can still write payload");
            ManagedTruth.assertTruth(partitionState).getCommitDataIfDirty().hasOffsetEqualTo(0L);
            ManagedTruth.assertTruth(partitionState).getOffsetHighestSeen()
                    .isEqualTo(numberOfRecords + extraRecordsToBlockWithThresholdBlocks - 1);
            ManagedTruth.assertTruth(partitionState).getCommitDataIfDirty().getMetadata().isNotEmpty();
            ManagedTruth.assertTruth(partitionState).getAllIncompleteOffsets()
                    .containsNoneIn(samplingOfShouldBeCompleteOffsets);
            ManagedTruth.assertWithMessage("The only incomplete record now is offset zero, which we are blocked on")
                    .that(partitionState).getAllIncompleteOffsets().containsExactlyElementsIn(blockedOffsets);

            log.debug("// test max payload exceeded, payload dropped");
            final int processedBeforePartitionBlock =
                    extraRecordsToBlockWithThresholdBlocks + numberOfRecords - blockedOffsets.size();
            final int extraMessages = numberOfRecords + extraRecordsToBlockWithThresholdBlocks / 2;
            log.debug("// messages already sent {}, sending {} more", processedBeforePartitionBlock, extraMessages);

            log.debug("// force system to allow more records (i.e. the actual system attempts to never allow the payload to grow this big)");
            PartitionStateManager.setUSED_PAYLOAD_THRESHOLD_MULTIPLIER(2.0d);
            unblock(wm, workIfAvailable, 2L);

            log.debug("// unlock to make state dirty to get a commit");
            partitionState.getCommitDataIfDirty();

            log.debug("// send {} more messages", extraMessages);
            sendRecordsToWM(extraMessages, wm);
            succeedExcept(wm, UniLists.of(0L));

            log.debug("// assert payload missing from commit now");
            ManagedTruth.assertTruth(partitionState).getCommitDataIfDirty().hasOffsetEqualTo(0L);
            ManagedTruth.assertTruth(partitionState).getCommitDataIfDirty().getMetadata().isEmpty();

            log.debug("// test failed messages can retry");
            ManagedTruth.assertTruth(offsetsOfAvailableWork(wm)).doesNotContain(0L);
            // NOTE(review): this static retry delay is never restored by this class - confirm that is intended.
            WorkContainer.setDefaultRetryDelay(Duration.ofMillis(100L));
            wm.onFailureResult(findWC(workIfAvailable, 0L));
            ManagedTruth.assertTruth(offsetsOfAvailableWork(wm)).contains(0L);

            // Completing the last blocked offset lets the committed offset move past everything sent.
            unblock(wm, workIfAvailable, 0L);
            partitionState.getCommitDataIfDirty();
            ManagedTruth.assertTruth(partitionState).isAllowedMoreRecords();
            ManagedTruth.assertTruth(partitionState).getCommitDataIfDirty().getOffset()
                    .isEqualTo(processedBeforePartitionBlock + extraMessages + numberOfBlockedMessages);
            ManagedTruth.assertTruth(partitionState).isAllowedMoreRecords();
        } finally {
            OffsetMapCodecManager.DefaultMaxMetadataSize = realMax;
            OffsetMapCodecManager.forcedCodec = Optional.empty();
        }
    }

    /**
     * Asks the work manager for available work and returns the offsets of what it hands out.
     *
     * @param workManager the work manager to query
     * @return offsets of the returned work containers, in the order they were returned
     */
    private List<Long> offsetsOfAvailableWork(WorkManager<String, String> workManager) {
        return StreamEx.of(workManager.getWorkIfAvailable()).map(WorkContainer::offset).toList();
    }

    /**
     * Takes all currently available work and reports success for each item whose offset is not in
     * {@code offsetsToLeave}.
     *
     * @param workManager    the work manager to take work from and report results to
     * @param offsetsToLeave offsets that must not be marked as succeeded
     */
    private void succeedExcept(WorkManager<String, String> workManager, List<Long> offsetsToLeave) {
        List<WorkContainer<String, String>> toSucceed = workManager.getWorkIfAvailable().stream()
                .filter(work -> !offsetsToLeave.contains(work.offset()))
                .collect(Collectors.toList());
        toSucceed.forEach(workManager::onSuccessResult);
    }

    /**
     * Reports success for the work container with the given offset.
     *
     * @param workManager the work manager to report the result to
     * @param candidates  work containers to search for the offset
     * @param offset      offset of the record to complete
     */
    private void unblock(WorkManager<String, String> workManager,
                         List<WorkContainer<String, String>> candidates,
                         long offset) {
        workManager.onSuccessResult(findWC(candidates, offset));
    }

    /**
     * Finds the first work container with the given offset.
     *
     * @param candidates work containers to search
     * @param offset     offset to look for
     * @return the first matching work container
     * @throws java.util.NoSuchElementException if no container has that offset
     */
    private WorkContainer<String, String> findWC(List<WorkContainer<String, String>> candidates, long offset) {
        return candidates.stream()
                .filter(work -> work.offset() == offset)
                .findFirst()
                // Same exception type as Optional.get(), but the message names the missing offset.
                .orElseThrow(() -> new java.util.NoSuchElementException(
                        "No work container found for offset " + offset));
    }

    /**
     * Generates {@code numberOfRecords} records for the test topic partition, registers them with
     * the work manager, and asserts that exactly that many are queued awaiting selection.
     *
     * @param numberOfRecords how many records to generate and register
     * @param workManager     the work manager that receives the records
     */
    private void sendRecordsToWM(int numberOfRecords, WorkManager<String, String> workManager) {
        log.debug("~Sending {} more records", numberOfRecords);
        workManager.registerWork(new EpochAndRecordsMap<>(
                new ConsumerRecords<>(UniMaps.of(topicPartition, ktu.generateRecords(numberOfRecords))),
                workManager.getPm()));
        Truth.assertThat(workManager.getNumberOfWorkQueuedInShardsAwaitingSelection()).isEqualTo(numberOfRecords);
    }
}
