/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer;

import io.confluent.csid.utils.LatchTestUtils;
import io.confluent.parallelconsumer.JStreamParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTest;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.assertj.core.api.Assertions;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.internal.verification.VerificationModeFactory;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class JStreamParallelEoSStreamProcessorTest
extends ParallelEoSStreamProcessorTestBase {
    private static final Logger log = LoggerFactory.getLogger(JStreamParallelEoSStreamProcessorTest.class);
    JStreamParallelEoSStreamProcessor<String, String> streaming;

    JStreamParallelEoSStreamProcessorTest() {
    }

    @BeforeEach
    public void setupData() {
        super.primeFirstRecord();
    }

    protected ParallelEoSStreamProcessor<String, String> initAsyncConsumer(ParallelConsumerOptions<String, String> options) {
        this.streaming = new JStreamParallelEoSStreamProcessor(options);
        return this.streaming;
    }

    @Test
    void testStream() {
        CountDownLatch latch = new CountDownLatch(1);
        Stream streamedResults = this.streaming.pollProduceAndStream(record -> {
            ProducerRecord mock = (ProducerRecord)Mockito.mock(ProducerRecord.class);
            log.info("Consumed and produced record ({}), and returning a derivative result to produce to output topic: {}", record, (Object)mock);
            this.myRecordProcessingAction.apply((ConsumerRecord<String, String>)record.getSingleConsumerRecord());
            latch.countDown();
            return Lists.list((Object[])new ProducerRecord[]{mock});
        });
        LatchTestUtils.awaitLatch(latch);
        this.awaitForSomeLoopCycles(2);
        ((ParallelEoSStreamProcessorTest.MyAction)Mockito.verify((Object)this.myRecordProcessingAction, (VerificationMode)VerificationModeFactory.times((int)1))).apply((ConsumerRecord<String, String>)((ConsumerRecord)ArgumentMatchers.any()));
        Assertions.assertThat((Stream)streamedResults).hasSize(1);
    }

    @Test
    void testConsumeAndProduce() {
        CountDownLatch latch = new CountDownLatch(1);
        Stream stream = this.streaming.pollProduceAndStream(record -> {
            String apply = this.myRecordProcessingAction.apply((ConsumerRecord<String, String>)record.getSingleConsumerRecord());
            ProducerRecord result = new ProducerRecord(this.OUTPUT_TOPIC, (Object)"akey", (Object)apply);
            log.info("Consumed a record ({}), and returning a derivative result record to be produced: {}", record, (Object)result);
            List result1 = Lists.list((Object[])new ProducerRecord[]{result});
            latch.countDown();
            return result1;
        });
        LatchTestUtils.awaitLatch(latch);
        this.resumeControlLoop();
        this.awaitForSomeLoopCycles(1);
        ((ParallelEoSStreamProcessorTest.MyAction)Mockito.verify((Object)this.myRecordProcessingAction, (VerificationMode)VerificationModeFactory.times((int)1))).apply((ConsumerRecord<String, String>)((ConsumerRecord)ArgumentMatchers.any()));
        Stream<ParallelStreamProcessor.ConsumeProduceResult> myResultStream = stream.peek(x -> {
            if (x != null) {
                ConsumerRecord left = x.getIn().getSingleConsumerRecord();
                log.info("{}:{}:{}:{}", new Object[]{left.key(), left.value(), x.getOut(), x.getMeta()});
            } else {
                log.info("null");
            }
        });
        Assertions.assertThat(myResultStream).hasSize(1);
    }

    @Test
    void testFlatMapProduce() {
        CountDownLatch latch = new CountDownLatch(1);
        Stream myResultStream = this.streaming.pollProduceAndStream(record -> {
            String apply1 = this.myRecordProcessingAction.apply((ConsumerRecord<String, String>)record.getSingleConsumerRecord());
            String apply2 = this.myRecordProcessingAction.apply((ConsumerRecord<String, String>)record.getSingleConsumerRecord());
            List list = Lists.list((Object[])new ProducerRecord[]{new ProducerRecord(this.OUTPUT_TOPIC, (Object)"key", (Object)apply1), new ProducerRecord(this.OUTPUT_TOPIC, (Object)"key", (Object)apply2)});
            latch.countDown();
            return list;
        });
        LatchTestUtils.awaitLatch(latch);
        this.awaitForSomeLoopCycles(1);
        ((ParallelEoSStreamProcessorTest.MyAction)Mockito.verify((Object)this.myRecordProcessingAction, (VerificationMode)VerificationModeFactory.times((int)2))).apply((ConsumerRecord<String, String>)((ConsumerRecord)ArgumentMatchers.any()));
        Assertions.assertThat((Stream)myResultStream).hasSize(2);
    }
}

