package io.confluent.parallelconsumer.state;

import com.google.common.truth.Truth;
import io.confluent.csid.utils.ThreadUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.internal.PCModuleTestEnv;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import io.confluent.parallelconsumer.state.RetryQueue;
import java.time.Clock;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashSet;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.threeten.extra.MutableClock;
import pl.tlinkowski.unij.api.UniLists;

/* loaded from: input_file:io/confluent/parallelconsumer/state/ShardManagerTest.class */
class ShardManagerTest {
    PartitionState<String, String> state;
    WorkManager<String, String> wm;
    ModelUtils mu = new ModelUtils();
    String topic = "myTopic";
    int partition = 0;
    TopicPartition tp = new TopicPartition(this.topic, this.partition);
    ConcurrentSkipListMap<Long, Optional<ConsumerRecord<String, String>>> incompleteOffsets = new ConcurrentSkipListMap<>();

    ShardManagerTest() {
    }

    @BeforeEach
    void setup() {
        this.state = new PartitionState<>(0L, this.mu.getModule(), this.tp, OffsetMapCodecManager.HighestOffsetAndIncompletes.of());
        this.wm = this.mu.getModule().workManager();
        this.wm.onPartitionsAssigned(UniLists.of(this.tp));
    }

    @Test
    void testAssignedQuickRevokeNPE() {
        PCModuleTestEnv module = this.mu.getModule();
        ShardManager shardManager = new ShardManager(module, module.workManager());
        ConsumerRecord consumerRecord = new ConsumerRecord(this.topic, this.partition, 1L, (Object) null, "test1");
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        concurrentHashMap.put(ShardKey.ofKey(consumerRecord), new ProcessingShard(ShardKey.ofKey(consumerRecord), module.options(), this.wm.getPm()));
        shardManager.setProcessingShards(concurrentHashMap);
        this.incompleteOffsets.put(1L, Optional.of(consumerRecord));
        this.state.setIncompleteOffsets(this.incompleteOffsets);
        this.state.onPartitionsRemoved(shardManager);
        Truth.assertThat(shardManager.getShard(ShardKey.ofKey(consumerRecord))).isEmpty();
    }

    @Test
    void retryQueueOrdering() {
        PCModule pCModule = (PCModule) Mockito.mock(PCModule.class);
        Mockito.when(pCModule.clock()).thenReturn(MutableClock.of(Instant.now(), Clock.systemDefaultZone().getZone()));
        Mockito.when(pCModule.options()).thenReturn(ParallelConsumerOptions.builder().build());
        RetryQueue retryQueue = new RetryQueue();
        WorkContainer workContainer = new WorkContainer(0L, new ConsumerRecord("topic", 0, 0L, "k", "v"), pCModule);
        WorkContainer workContainer2 = new WorkContainer(0L, new ConsumerRecord("topic", 0, 1L, "k", "v"), pCModule);
        WorkContainer workContainer3 = new WorkContainer(0L, new ConsumerRecord("topic", 0, 2L, "k", "v"), pCModule);
        WorkContainer workContainer4 = new WorkContainer(0L, new ConsumerRecord("topic", 0, 3L, "k", "v"), pCModule);
        retryQueue.add(workContainer);
        retryQueue.add(workContainer2);
        retryQueue.add(workContainer3);
        retryQueue.add(workContainer4);
        Truth.assertThat(Integer.valueOf(retryQueue.size())).isEqualTo(4);
        Truth.assertThat(workContainer).isNotEqualTo(workContainer2);
        Truth.assertThat(workContainer2).isNotEqualTo(workContainer3);
        Truth.assertThat(Boolean.valueOf(retryQueue.remove(workContainer2))).isTrue();
        Truth.assertThat(Integer.valueOf(retryQueue.size())).isEqualTo(3);
        ((AbstractBooleanAssert) Assertions.assertThat(checkForNoDupes(retryQueue)).as("RetryQueue should not contain duplicates", new Object[0])).isTrue();
        Truth.assertThat(Boolean.valueOf(retryQueue.contains(workContainer))).isTrue();
        Truth.assertThat(Boolean.valueOf(retryQueue.contains(workContainer2))).isFalse();
        Truth.assertThat(Boolean.valueOf(retryQueue.contains(workContainer))).isTrue();
        Truth.assertThat(Boolean.valueOf(retryQueue.contains(workContainer2))).isFalse();
        Truth.assertThat(Boolean.valueOf(retryQueue.contains(workContainer3))).isTrue();
    }

    @Test
    void testRetryQueueOrdering() {
        RetryQueue retryQueue = new RetryQueue();
        PCModule pCModule = (PCModule) Mockito.mock(PCModule.class);
        MutableClock of = MutableClock.of(Instant.now(), Clock.systemDefaultZone().getZone());
        Mockito.when(pCModule.clock()).thenReturn(of);
        Mockito.when(pCModule.options()).thenReturn(ParallelConsumerOptions.builder().build());
        WorkContainer workContainer = new WorkContainer(0L, new ConsumerRecord("topic", 0, 0L, "k", "v"), pCModule);
        workContainer.onUserFunctionFailure(new Throwable("cause"));
        retryQueue.add(workContainer);
        of.add(10L, ChronoUnit.SECONDS);
        WorkContainer workContainer2 = new WorkContainer(0L, new ConsumerRecord("topic", 0, 0L, "k", "v"), pCModule);
        workContainer2.onUserFunctionFailure(new Throwable("cause"));
        retryQueue.add(workContainer2);
        Assertions.assertThat(retryQueue.size()).isEqualTo(1);
    }

    @Test
    void testRetryQueueOrderingMultipleTries() {
        for (int i = 0; i < 5; i++) {
            PCModule pCModule = (PCModule) Mockito.mock(PCModule.class);
            Mockito.when(pCModule.clock()).thenReturn(MutableClock.of(Instant.now(), Clock.systemDefaultZone().getZone()));
            Mockito.when(pCModule.options()).thenReturn(ParallelConsumerOptions.builder().build());
            RetryQueue retryQueue = new RetryQueue();
            WorkContainer workContainer = new WorkContainer(1L, new ConsumerRecord("topic", 0, 0L, "key0", "value0"), pCModule);
            pCModule.clock().setInstant(Instant.now());
            workContainer.onUserFunctionFailure(new RuntimeException("test1"));
            retryQueue.add(workContainer);
            WorkContainer workContainer2 = new WorkContainer(1L, new ConsumerRecord("topic", 0, 1L, "key1", "value0"), pCModule);
            ThreadUtils.sleepQuietly(10);
            pCModule.clock().setInstant(Instant.now());
            workContainer2.onUserFunctionFailure(new RuntimeException("test2"));
            retryQueue.add(workContainer2);
            WorkContainer workContainer3 = new WorkContainer(1L, new ConsumerRecord("topic", 0, 2L, "key2", "value0"), pCModule);
            ThreadUtils.sleepQuietly(10);
            pCModule.clock().setInstant(Instant.now());
            workContainer3.onUserFunctionFailure(new RuntimeException("test3"));
            retryQueue.add(workContainer3);
            ThreadUtils.sleepQuietly(10);
            pCModule.clock().setInstant(Instant.now());
            workContainer.onUserFunctionFailure(new RuntimeException("a"));
            for (int i2 = 0; retryQueue.size() < 4 && i2 < 100; i2++) {
                pCModule.clock().setInstant(Instant.now());
                workContainer.onUserFunctionFailure(new RuntimeException("a"));
                retryQueue.add(workContainer);
            }
            Assertions.assertThat(retryQueue.size()).as("Expecting to have 3 elements", new Object[0]).isEqualTo(3);
            retryQueue.remove(workContainer);
            retryQueue.remove(workContainer2);
            retryQueue.remove(workContainer3);
            Assertions.assertThat(retryQueue.size()).isEqualTo(0);
        }
    }

    private boolean checkForNoDupes(RetryQueue retryQueue) {
        HashSet hashSet = new HashSet();
        RetryQueue.RetryQueueIterator it = retryQueue.iterator();
        while (it.hasNext()) {
            try {
                WorkContainer next = it.next();
                if (!hashSet.add(next.getTopicPartition().topic() + "_" + next.getTopicPartition().partition() + "_" + next.getCr().offset())) {
                    if (it != null) {
                        it.close();
                    }
                    return false;
                }
            } catch (Throwable th) {
                if (it != null) {
                    try {
                        it.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (it == null) {
            return true;
        }
        it.close();
        return true;
    }
}
