package io.confluent.parallelconsumer;

import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTest;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.assertj.core.api.Assertions;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.internal.verification.VerificationModeFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/parallelconsumer/JStreamParallelEoSStreamProcessorTest.class */
public class JStreamParallelEoSStreamProcessorTest extends ParallelEoSStreamProcessorTestBase {
    private static final Logger log = LoggerFactory.getLogger(JStreamParallelEoSStreamProcessorTest.class);
    JStreamParallelEoSStreamProcessor<String, String> streaming;

    @BeforeEach
    public void setupData() {
        super.primeFirstRecord();
    }

    @Override // io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase
    protected ParallelEoSStreamProcessor initAsyncConsumer(ParallelConsumerOptions parallelConsumerOptions) {
        this.streaming = new JStreamParallelEoSStreamProcessor<>(this.consumerSpy, this.producerSpy, ParallelConsumerOptions.builder().build());
        return this.streaming;
    }

    @Test
    public void testStream() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Stream pollProduceAndStream = this.streaming.pollProduceAndStream(consumerRecord -> {
            ProducerRecord producerRecord = (ProducerRecord) Mockito.mock(ProducerRecord.class);
            log.info("Consumed and produced record ({}), and returning a derivative result to produce to output topic: {}", consumerRecord, producerRecord);
            this.myRecordProcessingAction.apply((ConsumerRecord<String, String>) consumerRecord);
            countDownLatch.countDown();
            return Lists.list(new ProducerRecord[]{producerRecord});
        });
        awaitLatch(countDownLatch);
        waitForSomeLoopCycles(2);
        ((ParallelEoSStreamProcessorTest.MyAction) Mockito.verify(this.myRecordProcessingAction, VerificationModeFactory.times(1))).apply((ConsumerRecord<String, String>) ArgumentMatchers.any());
        Assertions.assertThat(pollProduceAndStream.peek(consumeProduceResult -> {
            log.info("streaming test {}", consumeProduceResult.getIn().value());
        })).hasSize(1);
    }

    @Test
    public void testConsumeAndProduce() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Stream pollProduceAndStream = this.streaming.pollProduceAndStream(consumerRecord -> {
            ProducerRecord producerRecord = new ProducerRecord("output", "akey", this.myRecordProcessingAction.apply((ConsumerRecord<String, String>) consumerRecord));
            log.info("Consumed a record ({}), and returning a derivative result record to be produced: {}", consumerRecord, producerRecord);
            List list = Lists.list(new ProducerRecord[]{producerRecord});
            countDownLatch.countDown();
            return list;
        });
        awaitLatch(countDownLatch);
        resumeControlLoop();
        waitForSomeLoopCycles(1);
        ((ParallelEoSStreamProcessorTest.MyAction) Mockito.verify(this.myRecordProcessingAction, VerificationModeFactory.times(1))).apply((ConsumerRecord<String, String>) ArgumentMatchers.any());
        Assertions.assertThat((List) pollProduceAndStream.peek(consumeProduceResult -> {
            if (consumeProduceResult == null) {
                log.info("null");
            } else {
                ConsumerRecord in = consumeProduceResult.getIn();
                log.info("{}:{}:{}:{}", new Object[]{in.key(), in.value(), consumeProduceResult.getOut(), consumeProduceResult.getMeta()});
            }
        }).collect(Collectors.toList())).hasSize(1);
    }

    @Test
    public void testFlatMapProduce() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Stream pollProduceAndStream = this.streaming.pollProduceAndStream(consumerRecord -> {
            List list = Lists.list(new ProducerRecord[]{new ProducerRecord("output", "key", this.myRecordProcessingAction.apply((ConsumerRecord<String, String>) consumerRecord)), new ProducerRecord("output", "key", this.myRecordProcessingAction.apply((ConsumerRecord<String, String>) consumerRecord))});
            countDownLatch.countDown();
            return list;
        });
        awaitLatch(countDownLatch);
        waitForSomeLoopCycles(1);
        ((ParallelEoSStreamProcessorTest.MyAction) Mockito.verify(this.myRecordProcessingAction, VerificationModeFactory.times(2))).apply((ConsumerRecord<String, String>) ArgumentMatchers.any());
        Assertions.assertThat(pollProduceAndStream).hasSize(2);
    }
}
