package io.confluent.parallelconsumer;

import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.internal.PCModuleTestEnv;
import io.confluent.parallelconsumer.internal.TestParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import io.confluent.parallelconsumer.state.ModelUtils;
import io.confluent.parallelconsumer.state.PartitionState;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import java.util.ArrayList;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

/* loaded from: input_file:io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorConfigurationTest.class */
/**
 * Tests of {@link TestParallelEoSStreamProcessor} that check values derived from
 * {@link ParallelConsumerOptions} (the target load) and the mailbox success / failure
 * counters reported after {@code runUserFunc} has processed a batch of work.
 */
class AbstractParallelEoSStreamProcessorConfigurationTest {
    private static final Logger log = LoggerFactory.getLogger(AbstractParallelEoSStreamProcessorConfigurationTest.class);

    PartitionState<String, String> state;
    WorkManager<String, String> wm;
    final MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
    // The explicit type witness is required: without it the builder chain infers <Object, Object>
    // and neither accepts the typed consumer nor yields ParallelConsumerOptions<String, String>.
    final ParallelConsumerOptions<String, String> testOptions =
            ParallelConsumerOptions.<String, String>builder().consumer(consumer).build();
    ModelUtils mu = new ModelUtils();
    String topic = "myTopic";
    int partition = 0;
    TopicPartition tp = new TopicPartition(topic, partition);
    // Module handed to the work containers built by the tests. Note that the work manager in
    // setup() comes from mu.getModule(), i.e. a different module instance than this one.
    PCModule<String, String> module = new PCModuleTestEnv();

    /**
     * Creates a fresh partition state for {@link #tp} and obtains the work manager from the
     * {@link ModelUtils} module, then assigns {@link #tp} to that work manager.
     */
    @BeforeEach
    public void setup() {
        state = new PartitionState<>(0L, mu.getModule(), tp, OffsetMapCodecManager.HighestOffsetAndIncompletes.of());
        wm = mu.getModule().workManager();
        wm.onPartitionsAssigned(UniLists.of(tp));
    }

    /**
     * With a batch size of 10 and a max concurrency of 2 the processor must report a target
     * load of 40.
     * NOTE(review): presumably batchSize * maxConcurrency * a load factor of 2 - confirm
     * against the processor implementation.
     */
    @Test
    void queueTargetLoad() {
        ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
                .batchSize(10)
                .maxConcurrency(2)
                .consumer(new MockConsumer<>(OffsetResetStrategy.LATEST))
                .build();
        try (TestParallelEoSStreamProcessor<String, String> processor = new TestParallelEoSStreamProcessor<>(options)) {
            Assertions.assertEquals(40, processor.getTargetLoad());
        }
    }

    /**
     * Runs a batch whose two work containers carry different epochs (0 and 1) and expects
     * exactly one mailbox success and one mailbox failure.
     * NOTE(review): presumably the container whose epoch does not match the partition's
     * current epoch is treated as stale and split off - confirm against runUserFunc.
     */
    @Test
    void testHandleStaleWorkSplit() {
        ArrayList<WorkContainer<String, String>> batch = new ArrayList<>();
        batch.add(workContainer(0L, 0L, "test_v1"));
        batch.add(workContainer(1L, 1L, "test_v2"));

        try (TestParallelEoSStreamProcessor<String, String> processor = new TestParallelEoSStreamProcessor<>(testOptions)) {
            processor.setWm(wm);
            // The user function returns an empty result list; the per-result callback is a no-op.
            processor.runUserFunc(context -> new ArrayList<>(), result -> { }, batch);

            // JUnit's contract is assertEquals(expected, actual).
            Assertions.assertEquals(1L, processor.getMailBoxSuccessCnt());
            Assertions.assertEquals(1L, processor.getMailBoxFailedCnt());
        }
    }

    /**
     * Runs a batch whose two work containers share the same epoch (0) and expects two mailbox
     * successes and no mailbox failure.
     */
    @Test
    void testHandleStaleWorkNoSplit() {
        ArrayList<WorkContainer<String, String>> batch = new ArrayList<>();
        batch.add(workContainer(0L, 0L, "test_v1"));
        batch.add(workContainer(0L, 1L, "test_v2"));

        try (TestParallelEoSStreamProcessor<String, String> processor = new TestParallelEoSStreamProcessor<>(testOptions)) {
            processor.setWm(wm);
            // The user function returns an empty result list; the per-result callback is a no-op.
            processor.runUserFunc(context -> new ArrayList<>(), result -> { }, batch);

            // JUnit's contract is assertEquals(expected, actual).
            Assertions.assertEquals(2L, processor.getMailBoxSuccessCnt());
            Assertions.assertEquals(0L, processor.getMailBoxFailedCnt());
        }
    }

    /**
     * Builds a work container for a record on the test topic / partition with key
     * {@code "test_k"}.
     *
     * @param epoch  epoch passed to the work container
     * @param offset offset of the wrapped consumer record
     * @param value  value of the wrapped consumer record
     * @return the new work container, bound to {@link #module}
     */
    private WorkContainer<String, String> workContainer(long epoch, long offset, String value) {
        ConsumerRecord<String, String> record = new ConsumerRecord<>(topic, partition, offset, "test_k", value);
        return new WorkContainer<>(epoch, record, module);
    }
}
