/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.state;

import com.google.common.truth.Truth;
import io.confluent.csid.utils.KafkaTestUtils;
import io.confluent.csid.utils.LongPollingMockConsumer;
import io.confluent.csid.utils.Range;
import io.confluent.parallelconsumer.FakeRuntimeException;
import io.confluent.parallelconsumer.ManagedTruth;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.internal.PCModuleTestEnv;
import io.confluent.parallelconsumer.state.PartitionState;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import java.time.Duration;
import java.time.temporal.TemporalAmount;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.assertj.core.api.AbstractListAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectAssert;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.extra.MutableClock;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;

@Execution(value=ExecutionMode.SAME_THREAD)
public class WorkManagerTest {
    /** Class logger; the success listener installed by setupWorkManager logs through it. */
    private static final Logger log = LoggerFactory.getLogger(WorkManagerTest.class);
    /** Topic name used for the records and topic partitions created by the helpers in this class. */
    public static final String INPUT_TOPIC = "input";
    /** Output topic name; not referenced elsewhere in this file. */
    public static final String OUTPUT_TOPIC = "output";
    /** Work manager under test; replaced on every call to setupWorkManager. */
    WorkManager<String, String> wm;
    /** Next offset handed out by makeRec; reset to 0 by setupWorkManager. */
    int offset;
    /** Test module that supplies the work manager, the options and the mutable clock. */
    PCModuleTestEnv module;
    /** Work containers reported by the success listener, in the order they were reported. */
    protected List<WorkContainer<String, String>> successfulWork = new ArrayList<WorkContainer<String, String>>();

    /**
     * Creates a work manager built from default options before each test. Tests that need a
     * particular configuration call {@code setupWorkManager} again with their own options.
     */
    @BeforeEach
    public void setup() {
        setupWorkManager(ParallelConsumerOptions.builder().build());
    }

    /** Returns the mutable clock held by the test module; the advanceClock* helpers move it forward. */
    private MutableClock getClock() {
        return module.getMutableClock();
    }

    /**
     * (Re)builds the module and work manager under test from the given options.
     *
     * <p>The offset counter is reset, the options are copied with a Kafka {@link MockConsumer}
     * installed as the consumer, and a listener is registered that records every successful
     * work container in {@code successfulWork}.
     *
     * @param options base options; the consumer they carry (if any) is overridden
     */
    private void setupWorkManager(ParallelConsumerOptions options) {
        offset = 0;

        Consumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
        ParallelConsumerOptions withMockConsumer = options.toBuilder().consumer(consumer).build();

        module = new PCModuleTestEnv((ParallelConsumerOptions<String, String>) withMockConsumer);
        wm = module.workManager();

        // remember everything the work manager reports as successful, in reporting order
        wm.getSuccessfulWorkListeners().add(work -> {
            log.debug("Heard some successful work: {}", work);
            successfulWork.add((WorkContainer<String, String>) work);
        });

        module.setWorkManager(wm);
    }

    /**
     * Notifies the work manager that the given partition of the input topic has been assigned.
     *
     * @param partition partition number on {@link #INPUT_TOPIC}
     */
    private void assignPartition(int partition) {
        TopicPartition assigned = topicPartitionOf(partition);
        wm.onPartitionsAssigned(UniLists.of(assigned));
    }

    /**
     * Builds the {@link TopicPartition} for the given partition number of {@link #INPUT_TOPIC}.
     *
     * @param partition partition number
     * @return a new topic partition instance, never {@code null}
     */
    @NotNull
    private TopicPartition topicPartitionOf(int partition) {
        TopicPartition tp = new TopicPartition(INPUT_TOPIC, partition);
        return tp;
    }

    /** Registers the standard three-record batch on partition 0. */
    private void registerSomeWork() {
        final int defaultPartition = 0;
        registerSomeWork(defaultPartition);
    }

    /**
     * Assigns the given partition and registers three records on it with the work manager.
     *
     * <p>All three records share the key {@code "key-0"}, carry the values {@code "0"},
     * {@code "1"} and {@code "2"}, and take consecutive offsets from the running offset counter.
     *
     * @param partition partition of {@link #INPUT_TOPIC} to assign and fill
     */
    private void registerSomeWork(int partition) {
        assignPartition(partition);

        final String sharedKey = "key-0";
        // arguments are evaluated left to right, so the offsets are handed out in value order
        List<ConsumerRecord<String, String>> batch = UniLists.of(
                makeRec("0", sharedKey, partition),
                makeRec("1", sharedKey, partition),
                makeRec("2", sharedKey, partition));

        Map<TopicPartition, List<ConsumerRecord<String, String>>> byPartition = new HashMap<>();
        byPartition.put(topicPartitionOf(partition), batch);

        ConsumerRecords<String, String> polled = new ConsumerRecords<>(byPartition);
        wm.registerWork(new EpochAndRecordsMap<>(polled, wm.getPm()));
    }

    /**
     * Creates a record on {@link #INPUT_TOPIC} at the current value of the offset counter and
     * then increments the counter.
     *
     * @param value     record value
     * @param key       record key
     * @param partition partition the record belongs to
     * @return the newly created record
     */
    private ConsumerRecord<String, String> makeRec(String value, String key, int partition) {
        long assignedOffset = offset++;
        return new ConsumerRecord<>(INPUT_TOPIC, partition, assignedOffset, key, value);
    }

    /**
     * Smoke test across every processing order: three same-key records are registered on one
     * partition. UNORDERED is expected to hand out all three at once; every other order is
     * expected to hand out one record at a time, releasing the next only after a success.
     *
     * @param order processing order under test, supplied by {@code @EnumSource}
     */
    @ParameterizedTest
    @EnumSource
    void basic(ParallelConsumerOptions.ProcessingOrder order) {
        setupWorkManager(ParallelConsumerOptions.builder().ordering(order).build());
        registerSomeWork();

        final boolean unordered = order == ParallelConsumerOptions.ProcessingOrder.UNORDERED;

        // first request
        List<WorkContainer<String, String>> gottenWork = wm.getWorkIfAvailable();
        int expectedFirstSize = unordered ? 3 : 1;
        List<Integer> expectedFirstOffsets = unordered ? UniLists.of(0, 1, 2) : UniLists.of(0);
        Assertions.assertThat(gottenWork).hasSize(expectedFirstSize);
        assertOffsets(gottenWork, expectedFirstOffsets);

        // complete the first container only
        wm.onSuccessResult(gottenWork.get(0));

        // second request
        gottenWork = wm.getWorkIfAvailable();
        if (unordered) {
            Assertions.assertThat(gottenWork).isEmpty();
        } else {
            Assertions.assertThat(gottenWork).hasSize(1);
            assertOffsets(gottenWork, UniLists.of(1));
        }

        // third request: nothing further may be released
        gottenWork = wm.getWorkIfAvailable();
        Assertions.assertThat(gottenWork).isEmpty();
    }

    /**
     * UNORDERED mode with a failure: the failed record (offset 1) must not hold back offset 2,
     * and is expected to be handed out again only after the retry delay has elapsed on the
     * test clock. The final success order is asserted to be 0, 2, 1.
     */
    @Test
    void testUnorderedAndDelayed() {
        setupWorkManager(ParallelConsumerOptions.builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED)
                .build());
        registerSomeWork();

        final int max = 2;

        // first batch: 0 succeeds, 1 fails
        List<WorkContainer<String, String>> batch = wm.getWorkIfAvailable(max);
        Assertions.assertThat(batch).hasSize(2);
        assertOffsets(batch, UniLists.of(0, 1));
        succeed(batch.get(0));
        fail(batch.get(1));

        // second batch: only the untouched record is available
        batch = wm.getWorkIfAvailable(max);
        assertOffsets(batch, UniLists.of(2), "no order restriction, 1's delay won't have passed - should get remaining in queue not yet failed");
        succeed(batch.get(0));

        // nothing is available while the failed record is still delayed
        batch = wm.getWorkIfAvailable(max);
        assertOffsets(batch, UniLists.of(), "delay won't have passed so should not retrieve anything");

        // part of the delay passes - still nothing
        advanceClockBySlightlyLessThanDelay();
        batch = wm.getWorkIfAvailable(max);
        assertOffsets(batch, UniLists.of());

        // the full delay passes - the failed record comes back
        advanceClockByDelay();
        batch = wm.getWorkIfAvailable(max);
        assertOffsets(batch, UniLists.of(1), "should retrieve 1 given clock has been advanced and retry delay should be over");
        succeed(batch.get(0));

        Assertions.assertThat(successfulWork)
                .extracting(x -> (int) x.getCr().offset())
                .isEqualTo(UniLists.of(0, 2, 1));
    }

    /**
     * Marks a single container as successfully processed: first on the container itself, then
     * by reporting the success to the work manager.
     *
     * @param succeed the container to complete
     */
    private void succeed(WorkContainer<String, String> succeed) {
        // container state first, then the manager is told about the result
        succeed.onUserFunctionSuccess();
        wm.onSuccessResult(succeed);
    }

    /**
     * Marks every container in the given iterable as successfully processed, in iteration order.
     *
     * @param succeed the containers to complete
     */
    private void succeed(Iterable<WorkContainer<String, String>> succeed) {
        for (WorkContainer<String, String> container : succeed) {
            succeed(container);
        }
    }

    /**
     * Asserts that the record offsets of the given containers, in list order, equal the expected
     * offsets.
     *
     * <p>NOTE(review): the method is flagged {@code @Deprecated}; the intended replacement is
     * not visible in this file.
     *
     * @param works    containers whose record offsets are checked
     * @param expected expected offsets, in order
     * @param msg      description attached to the assertion
     * @return the list assertion, for further chaining
     */
    @Deprecated
    private AbstractListAssert<?, List<? extends Integer>, Integer, ObjectAssert<Integer>> assertOffsets(List<WorkContainer<String, String>> works, List<Integer> expected, String msg) {
        ListAssert<WorkContainer<String, String>> described = Assertions.assertThat(works).as(msg);
        // offsets are narrowed to int so they compare against the Integer lists used by callers
        return described
                .extracting(x -> (int) x.getCr().offset())
                .isEqualTo(expected);
    }

    /**
     * Asserts the offsets of the given containers using the default description
     * {@code "offsets of work given"}.
     *
     * @param works    containers whose record offsets are checked
     * @param expected expected offsets, in order
     * @return the list assertion, for further chaining
     */
    private AbstractListAssert<?, List<? extends Integer>, Integer, ObjectAssert<Integer>> assertOffsets(List<WorkContainer<String, String>> works, List<Integer> expected) {
        final String defaultDescription = "offsets of work given";
        return assertOffsets(works, expected, defaultDescription);
    }

    /**
     * PARTITION ordering: while offset 0 is in flight no further work is expected from the same
     * partition; once it succeeds, offset 1 is expected to become available.
     */
    @Test
    public void testOrderedInFlightShouldBlockQueue() {
        setupWorkManager(ParallelConsumerOptions.builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.PARTITION)
                .build());
        Assertions.assertThat(wm.getOptions().getOrdering())
                .isEqualTo(ParallelConsumerOptions.ProcessingOrder.PARTITION);

        registerSomeWork();

        final int max = 2;

        // only the head of the partition is released
        List<WorkContainer<String, String>> works = wm.getWorkIfAvailable(max);
        assertOffsets(works, UniLists.of(0));
        WorkContainer<String, String> inFlight = works.get(0);

        // head still in flight - queue is blocked
        works = wm.getWorkIfAvailable(max);
        assertOffsets(works, UniLists.of());

        // completing the head unblocks the next record
        succeed(inFlight);
        works = wm.getWorkIfAvailable(max);
        assertOffsets(works, UniLists.of(1));
    }

    /**
     * PARTITION ordering with failures: offset 0 fails twice and is expected to be re-offered
     * only after its retry delay has passed each time; records 1 and 2 follow strictly after 0
     * has succeeded. The final success order is asserted to be 0, 1, 2.
     */
    @Test
    void testOrderedAndDelayed() {
        setupWorkManager(ParallelConsumerOptions.builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.PARTITION)
                .build());
        Assertions.assertThat(wm.getOptions().getOrdering())
                .isEqualTo(ParallelConsumerOptions.ProcessingOrder.PARTITION);

        registerSomeWork();

        final int maxWorkToGet = 2;

        // first attempt at 0 fails
        List<WorkContainer<String, String>> works = wm.getWorkIfAvailable(maxWorkToGet);
        assertOffsets(works, UniLists.of(0));
        WorkContainer<String, String> head = works.get(0);
        fail(head);

        // delayed - nothing available
        works = wm.getWorkIfAvailable(maxWorkToGet);
        assertOffsets(works, UniLists.of());

        // delay passes - 0 is offered again and fails a second time
        advanceClockByDelay();
        works = wm.getWorkIfAvailable(maxWorkToGet);
        assertOffsets(works, UniLists.of(0));
        head = works.get(0);
        fail(head);

        // one second short of the configured retry delay - still blocked
        advanceClock(head.getRetryDelayConfig().minus(Duration.ofSeconds(1L)));
        works = wm.getWorkIfAvailable(maxWorkToGet);
        assertOffsets(works, UniLists.of());

        // move the clock well beyond the delay - 0 is offered a third time and succeeds
        advanceClock(head.getRetryDelayConfig().plus(Duration.ofSeconds(1L)));
        works = wm.getWorkIfAvailable(maxWorkToGet);
        assertOffsets(works, UniLists.of(0));
        succeed(works.get(0));
        assertOffsets(successfulWork, UniLists.of(0));

        // the remaining records follow one by one
        works = wm.getWorkIfAvailable(maxWorkToGet);
        assertOffsets(works, UniLists.of(1));
        succeed(works.get(0));

        works = wm.getWorkIfAvailable(maxWorkToGet);
        assertOffsets(works, UniLists.of(2));
        succeed(works.get(0));

        assertOffsets(successfulWork, UniLists.of(0, 1, 2));
    }

    /**
     * Checks the delay flag of a single container: passed before any failure, not passed
     * directly after a failure nor after part of the retry delay, and passed again once the
     * clock has additionally been advanced by the full retry delay.
     */
    @Test
    void containerDelay() {
        ConsumerRecord mockedRecord = (ConsumerRecord) Mockito.mock(ConsumerRecord.class);
        WorkContainer wc = new WorkContainer(0L, mockedRecord, (PCModule) module);

        // fresh container: no delay pending
        Assertions.assertThat(wc.isDelayPassed()).isTrue();

        // a failure starts the delay
        wc.onUserFunctionFailure(new FakeRuntimeException(""));
        Assertions.assertThat(wc.isDelayPassed()).isFalse();

        // part of the delay elapsed: still waiting
        advanceClockBySlightlyLessThanDelay();
        Assertions.assertThat(wc.isDelayPassed()).isFalse();

        // a further full delay elapsed: ready again
        advanceClockByDelay();
        ManagedTruth.assertThat(wc).isDelayPassed();
    }

    /**
     * Advances the test clock by an amount shorter than the default message retry delay.
     *
     * <p>Despite the name, the step is exactly half of the default retry delay.
     */
    private void advanceClockBySlightlyLessThanDelay() {
        Duration halfOfRetryDelay = module.options().getDefaultMessageRetryDelay().dividedBy(2L);
        getClock().add(halfOfRetryDelay);
    }

    /** Advances the test clock by exactly the default message retry delay from the module options. */
    private void advanceClockByDelay() {
        Duration fullRetryDelay = module.options().getDefaultMessageRetryDelay();
        getClock().add(fullRetryDelay);
    }

    /**
     * Advances the test clock by an arbitrary duration.
     *
     * @param by amount of time to add to the clock
     */
    private void advanceClock(Duration by) {
        MutableClock clock = getClock();
        clock.add(by);
    }

    /**
     * UNORDERED mode: after the standard batch (offsets 0-2), a second batch with offsets 6, 8
     * and 10 is registered. Work is expected in ascending offset order, and the two failed
     * records (offsets 1 and 6) are expected back, in offset order, once the retry delay passes.
     */
    @Test
    void insertWrongOrderPreservesOffsetOrdering() {
        setupWorkManager(ParallelConsumerOptions.builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED)
                .build());
        Assertions.assertThat(wm.getOptions().getOrdering())
                .isEqualTo(ParallelConsumerOptions.ProcessingOrder.UNORDERED);

        registerSomeWork();

        final String key = "key";
        final int partition = 0;
        ConsumerRecord<String, String> atTen = new ConsumerRecord<>(INPUT_TOPIC, partition, 10L, key, "value");
        ConsumerRecord<String, String> atSix = new ConsumerRecord<>(INPUT_TOPIC, partition, 6L, key, "value");
        ConsumerRecord<String, String> atEight = new ConsumerRecord<>(INPUT_TOPIC, partition, 8L, key, "value");

        Map<TopicPartition, List<ConsumerRecord<String, String>>> byPartition = new HashMap<>();
        byPartition.put(topicPartitionOf(partition), UniLists.of(atSix, atEight, atTen));
        registerWork(new ConsumerRecords<>(byPartition));

        final int max = 10;

        // a limit of four yields the four lowest offsets
        List<WorkContainer<String, String>> works = wm.getWorkIfAvailable(4);
        assertOffsets(works, UniLists.of(0, 1, 2, 6));

        // fail offsets 1 and 6
        fail(works.get(1));
        fail(works.get(3));

        // the failed ones are delayed, so only the untouched records are returned
        works = wm.getWorkIfAvailable(max);
        assertOffsets(works, UniLists.of(8, 10));

        // after the delay the failed records return in offset order
        advanceClockByDelay();
        works = wm.getWorkIfAvailable(max);
        assertOffsets(works, UniLists.of(1, 6));
    }

    /**
     * Registers the given polled records with the work manager, wrapped in an
     * {@link EpochAndRecordsMap} built against the manager's partition state manager.
     *
     * @param recs records to register
     */
    private void registerWork(ConsumerRecords<String, String> recs) {
        EpochAndRecordsMap<String, String> wrapped = new EpochAndRecordsMap<>(recs, wm.getPm());
        wm.registerWork(wrapped);
    }

    /**
     * Marks a container as failed: first on the container itself (with no failure cause
     * supplied), then by reporting the failure to the work manager.
     *
     * @param wc the container to fail
     */
    private void fail(WorkContainer<String, String> wc) {
        // no cause is passed on purpose; these tests only exercise the retry-delay handling
        wc.onUserFunctionFailure(null);
        wm.onFailureResult(wc);
    }

    /**
     * With default options and three same-key records registered, the first request is expected
     * to return exactly one container and an immediate second request none.
     */
    @Test
    public void maxInFlight() {
        setupWorkManager(ParallelConsumerOptions.builder().build());
        registerSomeWork();

        List<WorkContainer<String, String>> firstRequest = wm.getWorkIfAvailable();
        Assertions.assertThat(firstRequest).hasSize(1);

        List<WorkContainer<String, String>> secondRequest = wm.getWorkIfAvailable();
        Assertions.assertThat(secondRequest).isEmpty();
    }

    /**
     * PARTITION ordering across two partitions: partition 0 holds offsets 0-2 and partition 2
     * holds offsets 6, 8 and 10. Each request is expected to return one record per partition,
     * advancing both partitions in step.
     */
    @Test
    void orderedByPartitionsParallel() {
        setupWorkManager(ParallelConsumerOptions.builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.PARTITION)
                .build());
        registerSomeWork();

        final int partition = 2;
        assignPartition(partition);

        ConsumerRecord<String, String> atTen = new ConsumerRecord<>(INPUT_TOPIC, partition, 10L, "66", "value");
        ConsumerRecord<String, String> atSix = new ConsumerRecord<>(INPUT_TOPIC, partition, 6L, "66", "value");
        ConsumerRecord<String, String> atEight = new ConsumerRecord<>(INPUT_TOPIC, partition, 8L, "66", "value");

        Map<TopicPartition, List<ConsumerRecord<String, String>>> byPartition = new HashMap<>();
        byPartition.put(topicPartitionOf(partition), UniLists.of(atSix, atEight, atTen));
        registerWork(new ConsumerRecords<>(byPartition));

        // round 1: heads of both partitions
        List<WorkContainer<String, String>> works = wm.getWorkIfAvailable();
        assertOffsets(works, UniLists.of(0, 6));
        successAll(works);

        // round 2
        works = wm.getWorkIfAvailable();
        assertOffsets(works, UniLists.of(1, 8));
        successAll(works);

        // round 3
        works = wm.getWorkIfAvailable();
        assertOffsets(works, UniLists.of(2, 10));
        successAll(works);
    }

    /**
     * Reports every given container to the work manager as successful, in list order.
     *
     * <p>Only the manager is notified here; unlike {@code succeed}, the container's own
     * {@code onUserFunctionSuccess} is not invoked.
     *
     * @param works containers to report as successful
     */
    private void successAll(List<WorkContainer<String, String>> works) {
        works.forEach(work -> wm.onSuccessResult(work));
    }

    /**
     * KEY ordering: besides the standard batch (key {@code key-0}, offsets 0-2 on partition 0),
     * partition 2 receives records for keys a, b and c. Each request is expected to return at
     * most one record per key; the returned work is sorted before the offsets are compared.
     */
    @Test
    void orderedByKeyParallel() {
        setupWorkManager(ParallelConsumerOptions.builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
                .build());
        Assertions.assertThat(wm.getOptions().getOrdering())
                .isEqualTo(ParallelConsumerOptions.ProcessingOrder.KEY);

        registerSomeWork();

        final int partition = 2;
        assignPartition(partition);

        List<ConsumerRecord<String, String>> keyed = UniLists.of(
                new ConsumerRecord<>(INPUT_TOPIC, partition, 6L, "key-a", "value"),
                new ConsumerRecord<>(INPUT_TOPIC, partition, 8L, "key-b", "value"),
                new ConsumerRecord<>(INPUT_TOPIC, partition, 10L, "key-a", "value"),
                new ConsumerRecord<>(INPUT_TOPIC, partition, 12L, "key-c", "value"),
                new ConsumerRecord<>(INPUT_TOPIC, partition, 15L, "key-a", "value"),
                new ConsumerRecord<>(INPUT_TOPIC, partition, 20L, "key-c", "value"));

        Map<TopicPartition, List<ConsumerRecord<String, String>>> byPartition = new HashMap<>();
        byPartition.put(topicPartitionOf(partition), keyed);
        registerWork(new ConsumerRecords<>(byPartition));

        // round 1: the first record of each of the four keys
        List<WorkContainer<String, String>> works = wm.getWorkIfAvailable();
        works.sort(Comparator.naturalOrder());
        assertOffsets(works, UniLists.of(0, 6, 8, 12));
        successAll(works);

        // round 2: key-b is exhausted
        works = wm.getWorkIfAvailable();
        works.sort(Comparator.naturalOrder());
        assertOffsets(works, UniLists.of(1, 10, 20));
        successAll(works);

        // round 3: only key-0 and key-a have records left
        works = wm.getWorkIfAvailable();
        works.sort(Comparator.naturalOrder());
        assertOffsets(works, UniLists.of(2, 15));
        successAll(works);

        // everything has been handed out
        works = wm.getWorkIfAvailable();
        assertOffsets(works, UniLists.of());
    }

    /**
     * KEY ordering over generated records: records are generated for a list of 100 keys,
     * flattened onto partition 0 and registered. The amount of work queued in the shards is
     * asserted to equal {@code quantity}, and the work handed out by one request is asserted to
     * have the same size as the generated per-key record map.
     *
     * @param quantity record quantity passed to the record generator
     */
    @ParameterizedTest
    @ValueSource(ints={1, 2, 5, 10, 20, 30, 50, 1000})
    void highVolumeKeyOrder(int quantity) {
        final int uniqueKeys = 100;

        setupWorkManager(ParallelConsumerOptions.builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
                .build());

        KafkaTestUtils ktu = new KafkaTestUtils(INPUT_TOPIC, null, new LongPollingMockConsumer(OffsetResetStrategy.EARLIEST));
        List keys = Range.listOfIntegers(uniqueKeys);
        HashMap<Integer, List<ConsumerRecord<String, String>>> records = ktu.generateRecords(keys, quantity);
        List flattened = ktu.flatten(records.values());

        final int partition = 0;
        TopicPartition target = topicPartitionOf(partition);
        ConsumerRecords recs = new ConsumerRecords(UniMaps.of(target, flattened));

        assignPartition(partition);
        registerWork((ConsumerRecords<String, String>) recs);

        // everything registered must be waiting in the shards
        long awaiting = wm.getSm().getNumberOfWorkQueuedInShardsAwaitingSelection();
        Assertions.assertThat(awaiting).isEqualTo((long) quantity);

        // one request is compared against the size of the per-key record map
        List work = wm.getWorkIfAvailable();
        ManagedTruth.assertTruth(work).hasSameSizeAs(records);
    }

    /**
     * Sanity check that a {@link TreeMap} keyed by record offset exposes its keys in ascending
     * order (0 through 9 for ten generated records).
     */
    @Test
    void treeMapOrderingCorrect() {
        KafkaTestUtils ktu = new KafkaTestUtils(INPUT_TOPIC, null, new LongPollingMockConsumer(OffsetResetStrategy.EARLIEST));

        final int recordCount = 10;
        List<ConsumerRecord<String, String>> records = ktu.generateRecords(recordCount);

        TreeMap<Long, WorkContainer> byOffset = new TreeMap<Long, WorkContainer>();
        for (ConsumerRecord<String, String> record : records) {
            WorkContainer container = new WorkContainer(0L, record, (PCModule) Mockito.mock(PCModuleTestEnv.class));
            byOffset.put(record.offset(), container);
        }

        Object[] ascendingKeys = byOffset.navigableKeySet().toArray();
        Assertions.assertThat(ascendingKeys).containsExactly(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
    }

    /**
     * After all three registered records have succeeded in UNORDERED mode, the shard queues and
     * the incomplete-offset count are expected to be empty, and the commit data is expected to
     * contain one entry with offset 3, empty metadata and no incomplete offsets for partition 0.
     */
    @Test
    void workQueuesEmptyWhenAllWorkComplete() {
        setupWorkManager(ParallelConsumerOptions.builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED)
                .build());
        registerSomeWork();

        // take and complete everything
        List<WorkContainer<String, String>> work = wm.getWorkIfAvailable();
        Assertions.assertThat(work).hasSize(3);
        succeed(work);

        // nothing left queued or incomplete
        long stillQueued = wm.getSm().getNumberOfWorkQueuedInShardsAwaitingSelection();
        Assertions.assertThat(stillQueued).isZero();
        Assertions.assertThat(wm.getNumberOfIncompleteOffsets())
                .as("Partition commit queues are now empty")
                .isZero();

        // drain commit queue
        Map<TopicPartition, OffsetAndMetadata> completedFutureOffsets = wm.collectCommitDataForDirtyPartitions();
        Assertions.assertThat(completedFutureOffsets).hasSize(1);

        Optional<OffsetAndMetadata> firstCommit = completedFutureOffsets.values().stream().findFirst();
        OffsetAndMetadata sync = firstCommit.get();
        Truth.assertThat(sync.offset()).isEqualTo(3);
        Truth.assertThat(sync.metadata()).isEmpty();

        PartitionState<String, String> state = wm.getPm().getPartitionState(topicPartitionOf(0));
        Truth.assertThat(state.getAllIncompleteOffsets()).isEmpty();
    }

    /**
     * Requests work one container at a time across three partitions and checks that consecutive
     * requests move on to the next partition (0, then 1, then 2). The fourth request is expected
     * to be empty for PARTITION ordering and to return offset 1 of partition 0 otherwise.
     * The KEY order is skipped via an assumption.
     *
     * @param order processing order under test, supplied by {@code @EnumSource}
     */
    @ParameterizedTest
    @EnumSource
    void resumesFromNextShard(ParallelConsumerOptions.ProcessingOrder order) {
        Assumptions.assumeFalse(order == ParallelConsumerOptions.ProcessingOrder.KEY);

        setupWorkManager(ParallelConsumerOptions.builder().ordering(order).build());

        // partition 0 gets the standard batch, partitions 1 and 2 one record each
        registerSomeWork();
        assignPartition(1);
        assignPartition(2);

        Map<TopicPartition, List<ConsumerRecord<String, String>>> byPartition = new HashMap<>();
        ConsumerRecord<String, String> onPartitionOne = new ConsumerRecord<>(INPUT_TOPIC, 1, 11L, "11", "value");
        byPartition.put(topicPartitionOf(1), UniLists.of(onPartitionOne));
        ConsumerRecord<String, String> onPartitionTwo = new ConsumerRecord<>(INPUT_TOPIC, 2, 21L, "21", "value");
        byPartition.put(topicPartitionOf(2), UniLists.of(onPartitionTwo));
        registerWork(new ConsumerRecords<>(byPartition));

        // four single-container requests, all issued before anything is asserted
        List<WorkContainer<String, String>> workContainersOne = wm.getWorkIfAvailable(1);
        List<WorkContainer<String, String>> workContainersTwo = wm.getWorkIfAvailable(1);
        List<WorkContainer<String, String>> workContainersThree = wm.getWorkIfAvailable(1);
        List<WorkContainer<String, String>> workContainersFour = wm.getWorkIfAvailable(1);

        Truth.assertThat(workContainersOne).hasSize(1);
        Truth.assertThat(workContainersOne.get(0).getTopicPartition().partition()).isEqualTo(0);

        Truth.assertThat(workContainersTwo).hasSize(1);
        Truth.assertThat(workContainersTwo.get(0).getTopicPartition().partition()).isEqualTo(1);

        Truth.assertThat(workContainersThree).hasSize(1);
        Truth.assertThat(workContainersThree.get(0).getTopicPartition().partition()).isEqualTo(2);

        if (order == ParallelConsumerOptions.ProcessingOrder.PARTITION) {
            // every partition already has a record in flight
            Truth.assertThat(workContainersFour).isEmpty();
            return;
        }

        // otherwise selection wraps around to partition 0 and continues with its next record
        Truth.assertThat(workContainersFour).hasSize(1);
        WorkContainer<String, String> wrapped = workContainersFour.get(0);
        Truth.assertThat(wrapped.getTopicPartition().partition()).isEqualTo(0);
        Truth.assertThat(wrapped.offset()).isEqualTo(1);
        Truth.assertThat(wrapped.getCr().value()).isEqualTo("1");
    }

    /**
     * Guards against one partition starving the others under PARTITION ordering.
     *
     * <p>Three partitions each hold three records. A first request limited to two containers is
     * expected to return work from two different partitions; a second request is expected to
     * return exactly one more container. Across both requests no topic partition may appear
     * twice.
     */
    @Test
    void starvation() {
        setupWorkManager(ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.PARTITION).build());
        registerSomeWork(0);
        registerSomeWork(1);
        registerSomeWork(2);

        List<WorkContainer<String, String>> allWork = new ArrayList<>();

        // first request: two containers, from two distinct partitions
        List<WorkContainer<String, String>> work = wm.getWorkIfAvailable(2);
        allWork.addAll(work);
        Truth.assertWithMessage("Should be able to get 2 records of work, one from each partition shard").that(work).hasSize(2);
        TopicPartition tpOne = work.get(0).getTopicPartition();
        TopicPartition tpTwo = work.get(1).getTopicPartition();
        Truth.assertWithMessage("The partitions should be different").that(tpOne).isNotEqualTo(tpTwo);

        // second request: only the remaining partition can still supply work
        work = wm.getWorkIfAvailable(2);
        Truth.assertWithMessage("Should be able to get only 1 more, from the third shard").that(work).hasSize(1);
        allWork.addAll(work);

        // the uniqueness check below covers the third partition, so no separate local is needed
        List<TopicPartition> partitionsSeen = allWork.stream()
                .map(WorkContainer::getTopicPartition)
                .collect(Collectors.toList());
        Truth.assertWithMessage("TPs all unique").that(partitionsSeen).containsNoDuplicates();
    }

    /**
     * Minimal FIFO queue wrapper around an {@link ArrayDeque} whose {@link #add(Collection)}
     * returns its argument, so that a collection can be enqueued and used in one expression.
     *
     * @param <T> element type
     */
    public static class FluentQueue<T>
    implements Iterable<T> {
        /** Backing deque; elements are appended at the tail and polled from the head. */
        ArrayDeque<T> work = new ArrayDeque<>();

        /**
         * Appends all elements of the given collection to the queue, in its iteration order.
         *
         * @param c elements to enqueue; must not be {@code null} nor contain {@code null}
         * @return the same collection instance that was passed in
         */
        Collection<T> add(Collection<T> c) {
            this.work.addAll(c);
            return c;
        }

        /**
         * Removes and returns the head of the queue.
         *
         * @return the oldest element, or {@code null} when the queue is empty
         */
        public T poll() {
            return this.work.poll();
        }

        /** Iterates over the queued elements from head to tail without removing them. */
        @Override
        public Iterator<T> iterator() {
            return this.work.iterator();
        }

        /** Returns the number of elements currently queued. */
        public int size() {
            return this.work.size();
        }
    }
}

