package io.confluent.parallelconsumer;

import io.confluent.csid.utils.Java8StreamUtils;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/parallelconsumer/JStreamParallelEoSStreamProcessor.class */
public class JStreamParallelEoSStreamProcessor<K, V> extends ParallelEoSStreamProcessor<K, V> implements JStreamParallelStreamProcessor<K, V> {
    private static final Logger log = LoggerFactory.getLogger(JStreamParallelEoSStreamProcessor.class);
    private final Stream<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> stream;
    private final ConcurrentLinkedDeque<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> userProcessResultsStream;

    public JStreamParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> parallelConsumerOptions) {
        super(parallelConsumerOptions);
        this.userProcessResultsStream = new ConcurrentLinkedDeque<>();
        this.stream = Java8StreamUtils.setupStreamFromDeque(this.userProcessResultsStream);
    }

    @Override // io.confluent.parallelconsumer.JStreamParallelStreamProcessor
    public Stream<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> pollProduceAndStream(Function<PollContext<K, V>, List<ProducerRecord<K, V>>> function) {
        super.pollAndProduceMany(function, consumeProduceResult -> {
            log.trace("Wrapper callback applied, sending result to stream. Input: {}", consumeProduceResult);
            this.userProcessResultsStream.add(consumeProduceResult);
        });
        return this.stream;
    }
}
