package io.confluent.parallelconsumer.state;

import io.confluent.csid.utils.AdvancingWallClockProvider;
import io.confluent.csid.utils.KafkaTestUtils;
import io.confluent.csid.utils.ProgressTracker;
import io.confluent.csid.utils.Range;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.assertj.core.api.AbstractListAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

/* loaded from: input_file:io/confluent/parallelconsumer/state/WorkManagerTest.class */
class WorkManagerTest {
    private static final Logger log = LoggerFactory.getLogger(WorkManagerTest.class);
    public static final String INPUT_TOPIC = "input";
    public static final String OUTPUT_TOPIC = "output";
    WorkManager<String, String> wm;
    int offset;
    Instant time = Instant.now();
    AdvancingWallClockProvider clock = new AdvancingWallClockProvider() { // from class: io.confluent.parallelconsumer.state.WorkManagerTest.1
        public Instant getNow() {
            return WorkManagerTest.this.time;
        }
    };
    protected List<WorkContainer<String, String>> successfulWork = new ArrayList();

    /* loaded from: input_file:io/confluent/parallelconsumer/state/WorkManagerTest$FluentQueue.class */
    static class FluentQueue<T> implements Iterable<T> {
        ArrayDeque<T> work = new ArrayDeque<>();

        FluentQueue() {
        }

        Collection<T> add(Collection<T> collection) {
            this.work.addAll(collection);
            return collection;
        }

        public T poll() {
            return this.work.poll();
        }

        @Override // java.lang.Iterable
        public Iterator<T> iterator() {
            return this.work.iterator();
        }

        public int size() {
            return this.work.size();
        }
    }

    WorkManagerTest() {
    }

    @BeforeEach
    public void setup() {
        setupWorkManager(ParallelConsumerOptions.builder().build());
    }

    private void setupWorkManager(ParallelConsumerOptions parallelConsumerOptions) {
        this.offset = 0;
        this.wm = new WorkManager<>(parallelConsumerOptions, new MockConsumer(OffsetResetStrategy.EARLIEST));
        this.wm.setClock(this.clock);
        this.wm.getSuccessfulWorkListeners().add(workContainer -> {
            log.debug("Heard some successful work: {}", workContainer);
            this.successfulWork.add(workContainer);
        });
        assignPartition(0);
    }

    private void assignPartition(int i) {
        this.wm.onPartitionsAssigned(UniLists.of(new TopicPartition(INPUT_TOPIC, i)));
    }

    private void registerSomeWork() {
        ConsumerRecord<String, String> makeRec = makeRec("0", "key-0", 0);
        ConsumerRecord<String, String> makeRec2 = makeRec("1", "key-0", 0);
        ConsumerRecord<String, String> makeRec3 = makeRec("2", "key-0", 0);
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition(INPUT_TOPIC, 0), UniLists.of(makeRec, makeRec2, makeRec3));
        this.wm.registerWork(new ConsumerRecords(hashMap));
    }

    private ConsumerRecord<String, String> makeRec(String str, String str2, int i) {
        ConsumerRecord<String, String> consumerRecord = new ConsumerRecord<>(INPUT_TOPIC, i, this.offset, str2, str);
        this.offset++;
        return consumerRecord;
    }

    @Test
    void testRemovedUnordered() {
        setupWorkManager(ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED).build());
        registerSomeWork();
        List<WorkContainer<String, String>> maybeGetWork = this.wm.maybeGetWork(1);
        Assertions.assertThat(maybeGetWork).hasSize(1);
        assertOffsets(maybeGetWork, UniLists.of(0));
        this.wm.onSuccess(maybeGetWork.get(0));
        List<WorkContainer<String, String>> maybeGetWork2 = this.wm.maybeGetWork(1);
        Assertions.assertThat(maybeGetWork2).hasSize(1);
        assertOffsets(maybeGetWork2, UniLists.of(1));
    }

    @Test
    void testUnorderedAndDelayed() {
        setupWorkManager(ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED).build());
        registerSomeWork();
        List<WorkContainer<String, String>> maybeGetWork = this.wm.maybeGetWork(2);
        Assertions.assertThat(maybeGetWork).hasSize(2);
        assertOffsets(maybeGetWork, UniLists.of(0, 1));
        this.wm.onSuccess(maybeGetWork.get(0));
        this.wm.onFailure(maybeGetWork.get(1));
        List<WorkContainer<String, String>> maybeGetWork2 = this.wm.maybeGetWork(2);
        assertOffsets(maybeGetWork2, UniLists.of(2));
        this.wm.onSuccess(maybeGetWork2.get(0));
        assertOffsets(this.wm.maybeGetWork(2), UniLists.of());
        advanceClockBySlightlyLessThanDelay();
        assertOffsets(this.wm.maybeGetWork(2), UniLists.of());
        advanceClockByDelay();
        List<WorkContainer<String, String>> maybeGetWork3 = this.wm.maybeGetWork(2);
        assertOffsets(maybeGetWork3, UniLists.of(1));
        this.wm.onSuccess(maybeGetWork3.get(0));
        Assertions.assertThat(this.successfulWork).extracting(workContainer -> {
            return Integer.valueOf((int) workContainer.getCr().offset());
        }).isEqualTo(UniLists.of(0, 2, 1));
    }

    private AbstractListAssert<?, List<? extends Integer>, Integer, ObjectAssert<Integer>> assertOffsets(List<WorkContainer<String, String>> list, List<Integer> list2) {
        return Assertions.assertThat(list).as("offsets of work given", new Object[0]).extracting(workContainer -> {
            return Integer.valueOf((int) workContainer.getCr().offset());
        }).isEqualTo(list2);
    }

    @Test
    public void testOrderedInFlightShouldBlockQueue() {
        setupWorkManager(ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.PARTITION).build());
        Assertions.assertThat(this.wm.getOptions().getOrdering()).isEqualTo(ParallelConsumerOptions.ProcessingOrder.PARTITION);
        registerSomeWork();
        List<WorkContainer<String, String>> maybeGetWork = this.wm.maybeGetWork(2);
        assertOffsets(maybeGetWork, UniLists.of(0));
        WorkContainer<String, String> workContainer = maybeGetWork.get(0);
        assertOffsets(this.wm.maybeGetWork(2), UniLists.of());
        this.wm.onSuccess(workContainer);
        assertOffsets(this.wm.maybeGetWork(2), UniLists.of(1));
    }

    @Test
    void testOrderedAndDelayed() {
        setupWorkManager(ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.PARTITION).build());
        Assertions.assertThat(this.wm.getOptions().getOrdering()).isEqualTo(ParallelConsumerOptions.ProcessingOrder.PARTITION);
        registerSomeWork();
        List<WorkContainer<String, String>> maybeGetWork = this.wm.maybeGetWork(2);
        assertOffsets(maybeGetWork, UniLists.of(0));
        this.wm.onFailure(maybeGetWork.get(0));
        assertOffsets(this.wm.maybeGetWork(2), UniLists.of());
        advanceClockByDelay();
        List<WorkContainer<String, String>> maybeGetWork2 = this.wm.maybeGetWork(2);
        assertOffsets(maybeGetWork2, UniLists.of(0));
        WorkContainer<String, String> workContainer = maybeGetWork2.get(0);
        this.wm.onFailure(workContainer);
        advanceClock(workContainer.getRetryDelay().minus(Duration.ofSeconds(1L)));
        assertOffsets(this.wm.maybeGetWork(2), UniLists.of());
        advanceClock(workContainer.getRetryDelay().plus(Duration.ofSeconds(1L)));
        List<WorkContainer<String, String>> maybeGetWork3 = this.wm.maybeGetWork(2);
        assertOffsets(maybeGetWork3, UniLists.of(0));
        this.wm.onSuccess(maybeGetWork3.get(0));
        assertOffsets(this.successfulWork, UniLists.of(0));
        List<WorkContainer<String, String>> maybeGetWork4 = this.wm.maybeGetWork(2);
        assertOffsets(maybeGetWork4, UniLists.of(1));
        this.wm.onSuccess(maybeGetWork4.get(0));
        List<WorkContainer<String, String>> maybeGetWork5 = this.wm.maybeGetWork(2);
        assertOffsets(maybeGetWork5, UniLists.of(2));
        this.wm.onSuccess(maybeGetWork5.get(0));
        assertOffsets(this.successfulWork, UniLists.of(0, 1, 2));
    }

    @Test
    void containerDelay() {
        WorkContainer workContainer = new WorkContainer(0, (ConsumerRecord) null);
        Assertions.assertThat(workContainer.hasDelayPassed(this.clock)).isTrue();
        workContainer.fail(this.clock);
        Assertions.assertThat(workContainer.hasDelayPassed(this.clock)).isFalse();
        advanceClockBySlightlyLessThanDelay();
        Assertions.assertThat(workContainer.hasDelayPassed(this.clock)).isFalse();
        advanceClockByDelay();
        Assertions.assertThat(workContainer.hasDelayPassed(this.clock)).isTrue();
    }

    private void advanceClockBySlightlyLessThanDelay() {
        this.time = this.time.plus((TemporalAmount) WorkContainer.defaultRetryDelay.dividedBy(2L));
    }

    private void advanceClockByDelay() {
        this.time = this.time.plus((TemporalAmount) WorkContainer.defaultRetryDelay);
    }

    private void advanceClock(Duration duration) {
        this.time = this.time.plus((TemporalAmount) duration);
    }

    @Test
    public void insertWrongOrderPreservesOffsetOrdering() {
        setupWorkManager(ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED).build());
        Assertions.assertThat(this.wm.getOptions().getOrdering()).isEqualTo(ParallelConsumerOptions.ProcessingOrder.UNORDERED);
        registerSomeWork();
        ConsumerRecord consumerRecord = new ConsumerRecord(INPUT_TOPIC, 0, 10L, "key", "value");
        ConsumerRecord consumerRecord2 = new ConsumerRecord(INPUT_TOPIC, 0, 6L, "key", "value");
        ConsumerRecord consumerRecord3 = new ConsumerRecord(INPUT_TOPIC, 0, 8L, "key", "value");
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition(INPUT_TOPIC, 0), UniLists.of(consumerRecord2, consumerRecord3, consumerRecord));
        this.wm.registerWork(new ConsumerRecords(hashMap));
        List<WorkContainer<String, String>> maybeGetWork = this.wm.maybeGetWork(4);
        assertOffsets(maybeGetWork, UniLists.of(0, 1, 2, 6));
        this.wm.onFailure(maybeGetWork.get(1));
        this.wm.onFailure(maybeGetWork.get(3));
        assertOffsets(this.wm.maybeGetWork(10), UniLists.of(8, 10));
        advanceClockByDelay();
        assertOffsets(this.wm.maybeGetWork(10), UniLists.of(1, 6));
    }

    @Disabled
    @Test
    public void maxPerPartition() {
    }

    @Disabled
    @Test
    public void maxPerTopic() {
    }

    @Test
    public void maxInFlight() {
        setupWorkManager(ParallelConsumerOptions.builder().build());
        registerSomeWork();
        Assertions.assertThat(this.wm.maybeGetWork()).hasSize(1);
        Assertions.assertThat(this.wm.maybeGetWork()).isEmpty();
    }

    @Disabled
    @Test
    public void multipleFailures() {
    }

    @Disabled
    @Test
    public void delayedOrdered() {
    }

    @Disabled
    @Test
    public void delayedUnordered() {
    }

    @Test
    public void orderedByPartitionsParallel() {
        setupWorkManager(ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.PARTITION).build());
        registerSomeWork();
        assignPartition(2);
        ConsumerRecord consumerRecord = new ConsumerRecord(INPUT_TOPIC, 2, 10L, "66", "value");
        ConsumerRecord consumerRecord2 = new ConsumerRecord(INPUT_TOPIC, 2, 6L, "66", "value");
        ConsumerRecord consumerRecord3 = new ConsumerRecord(INPUT_TOPIC, 2, 8L, "66", "value");
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition(INPUT_TOPIC, 2), UniLists.of(consumerRecord2, consumerRecord3, consumerRecord));
        this.wm.registerWork(new ConsumerRecords(hashMap));
        List<WorkContainer<String, String>> maybeGetWork = this.wm.maybeGetWork();
        assertOffsets(maybeGetWork, UniLists.of(0, 6));
        successAll(maybeGetWork);
        List<WorkContainer<String, String>> maybeGetWork2 = this.wm.maybeGetWork();
        assertOffsets(maybeGetWork2, UniLists.of(1, 8));
        successAll(maybeGetWork2);
        List<WorkContainer<String, String>> maybeGetWork3 = this.wm.maybeGetWork();
        assertOffsets(maybeGetWork3, UniLists.of(2, 10));
        successAll(maybeGetWork3);
    }

    private void successAll(List<WorkContainer<String, String>> list) {
        Iterator<WorkContainer<String, String>> it = list.iterator();
        while (it.hasNext()) {
            this.wm.onSuccess(it.next());
        }
    }

    @Test
    void orderedByKeyParallel() {
        setupWorkManager(ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.KEY).build());
        Assertions.assertThat(this.wm.getOptions().getOrdering()).isEqualTo(ParallelConsumerOptions.ProcessingOrder.KEY);
        registerSomeWork();
        assignPartition(2);
        ConsumerRecord consumerRecord = new ConsumerRecord(INPUT_TOPIC, 2, 6L, "key-a", "value");
        ConsumerRecord consumerRecord2 = new ConsumerRecord(INPUT_TOPIC, 2, 8L, "key-b", "value");
        ConsumerRecord consumerRecord3 = new ConsumerRecord(INPUT_TOPIC, 2, 10L, "key-a", "value");
        ConsumerRecord consumerRecord4 = new ConsumerRecord(INPUT_TOPIC, 2, 12L, "key-c", "value");
        ConsumerRecord consumerRecord5 = new ConsumerRecord(INPUT_TOPIC, 2, 15L, "key-a", "value");
        ConsumerRecord consumerRecord6 = new ConsumerRecord(INPUT_TOPIC, 2, 20L, "key-c", "value");
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition(INPUT_TOPIC, 2), UniLists.of(consumerRecord, consumerRecord2, consumerRecord3, consumerRecord4, consumerRecord5, consumerRecord6));
        this.wm.registerWork(new ConsumerRecords(hashMap));
        List<WorkContainer<String, String>> maybeGetWork = this.wm.maybeGetWork();
        maybeGetWork.sort(Comparator.naturalOrder());
        assertOffsets(maybeGetWork, UniLists.of(0, 6, 8, 12));
        successAll(maybeGetWork);
        List<WorkContainer<String, String>> maybeGetWork2 = this.wm.maybeGetWork();
        maybeGetWork2.sort(Comparator.naturalOrder());
        assertOffsets(maybeGetWork2, UniLists.of(1, 10, 20));
        successAll(maybeGetWork2);
        List<WorkContainer<String, String>> maybeGetWork3 = this.wm.maybeGetWork();
        maybeGetWork3.sort(Comparator.naturalOrder());
        assertOffsets(maybeGetWork3, UniLists.of(2, 15));
        successAll(maybeGetWork3);
        assertOffsets(this.wm.maybeGetWork(), UniLists.of());
    }

    @Disabled
    @Test
    public void unorderedPartitionsGreedy() {
    }

    @ValueSource(ints = {1, 2, 5, 10, 20, 30, ProgressTracker.WARMED_UP_AFTER_X_MESSAGES, 1000})
    @ParameterizedTest
    void highVolumeKeyOrder(int i) {
        setupWorkManager(ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.KEY).build());
        KafkaTestUtils kafkaTestUtils = new KafkaTestUtils(INPUT_TOPIC, null, new MockConsumer(OffsetResetStrategy.EARLIEST));
        HashMap<Integer, List<ConsumerRecord<String, String>>> generateRecords = kafkaTestUtils.generateRecords(Range.range(100).list(), i);
        List flatten = kafkaTestUtils.flatten(generateRecords.values());
        flatten.sort(Comparator.comparingLong((v0) -> {
            return v0.offset();
        }));
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition(INPUT_TOPIC, 0), flatten);
        this.wm.registerWork(new ConsumerRecords(hashMap));
        Assertions.assertThat(this.wm.maybeGetWork()).hasSameSizeAs(generateRecords.keySet());
    }

    @Test
    void treeMapOrderingCorrect() {
        List<ConsumerRecord<String, String>> generateRecords = new KafkaTestUtils(INPUT_TOPIC, null, new MockConsumer(OffsetResetStrategy.EARLIEST)).generateRecords(10);
        TreeMap treeMap = new TreeMap();
        for (ConsumerRecord<String, String> consumerRecord : generateRecords) {
            treeMap.put(Long.valueOf(consumerRecord.offset()), new WorkContainer(0, consumerRecord));
        }
        Assertions.assertThat(treeMap.navigableKeySet().toArray()).containsExactly(new Object[]{0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L});
    }

    @Test
    public void workQueuesEmptyWhenAllWorkComplete() {
        setupWorkManager(ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED).build());
        registerSomeWork();
        List<WorkContainer> maybeGetWork = this.wm.maybeGetWork();
        Assertions.assertThat(maybeGetWork).hasSize(3);
        for (WorkContainer workContainer : maybeGetWork) {
            workContainer.onUserFunctionSuccess();
            this.wm.onSuccess(workContainer);
        }
        Assertions.assertThat(this.wm.getSm().getWorkQueuedInShardsCount()).isZero();
        Assertions.assertThat(this.wm.getNumberOfEntriesInPartitionQueues()).isEqualTo(3L);
        Assertions.assertThat(this.wm.findCompletedEligibleOffsetsAndRemove()).hasSize(1);
        Assertions.assertThat(this.wm.getNumberOfEntriesInPartitionQueues()).isEqualTo(0L);
    }
}
