package io.confluent.parallelconsumer.examples.core;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import java.util.Properties;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

/* loaded from: input_file:io/confluent/parallelconsumer/examples/core/CoreApp.class */
/**
 * Minimal example of wiring a {@link ParallelStreamProcessor} between an input and an output topic.
 *
 * <p>Two entry points are provided: {@link #run()} only logs each consumed record, while
 * {@link #runPollAndProduce()} additionally publishes one derived record per consumed record to
 * {@link #outputTopic}. Both build the processor via {@link #setupParallelConsumer()} and then
 * invoke the {@link #postSetup()} hook.
 */
public class CoreApp {
    private static final Logger log = LoggerFactory.getLogger(CoreApp.class);

    /** Topic the processor subscribes to; the name carries a random numeric suffix. */
    String inputTopic = "input-topic-" + RandomUtils.nextInt();

    /** Topic that {@link #runPollAndProduce()} publishes to; the name carries a random numeric suffix. */
    String outputTopic = "output-topic-" + RandomUtils.nextInt();

    /** Processor created by {@link #run()} or {@link #runPollAndProduce()}; {@code null} until then. */
    ParallelStreamProcessor<String, String> parallelConsumer;

    /** Immutable holder for the payload computed from a consumed record. */
    public static final class Result {
        private final String payload;

        /**
         * @param str the payload to wrap; may be {@code null}
         */
        public Result(String str) {
            this.payload = str;
        }

        /**
         * @return the wrapped payload, possibly {@code null}
         */
        public String getPayload() {
            return this.payload;
        }

        /**
         * Two results are equal when their payloads are equal (two {@code null} payloads count as equal).
         */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof Result)) {
                return false;
            }
            Result other = (Result) obj;
            return this.payload == null ? other.payload == null : this.payload.equals(other.payload);
        }

        /**
         * Hash derived from the payload only; a {@code null} payload contributes the constant 43.
         */
        @Override
        public int hashCode() {
            return 59 + (this.payload == null ? 43 : this.payload.hashCode());
        }

        @Override
        public String toString() {
            return "CoreApp.Result(payload=" + this.payload + ")";
        }
    }

    /**
     * Creates the Kafka consumer handed to the parallel processor.
     *
     * <p>NOTE(review): the consumer is built from an empty {@link Properties}; presumably callers or
     * subclasses are expected to override this with a configured client — confirm.
     *
     * @return a new consumer instance
     */
    Consumer<String, String> getKafkaConsumer() {
        return new KafkaConsumer<>(new Properties());
    }

    /**
     * Creates the Kafka producer handed to the parallel processor.
     *
     * <p>NOTE(review): built from an empty {@link Properties}, like {@link #getKafkaConsumer()}.
     *
     * @return a new producer instance
     */
    Producer<String, String> getKafkaProducer() {
        return new KafkaProducer<>(new Properties());
    }

    /**
     * Sets up the processor, runs the {@link #postSetup()} hook and registers a callback that logs
     * every record it receives.
     */
    void run() {
        this.parallelConsumer = setupParallelConsumer();
        postSetup();
        this.parallelConsumer.poll(consumerRecord ->
                log.info("Concurrently processing a record: {}", consumerRecord));
    }

    /**
     * Hook invoked after the processor has been created and before polling is registered.
     * The default implementation does nothing.
     */
    protected void postSetup() {
    }

    /**
     * Builds a processor from this class's consumer and producer and subscribes it to
     * {@link #inputTopic}.
     *
     * <p>The options use {@code ProcessingOrder.KEY} and set both queue-related limits to 1000.
     *
     * @return the subscribed processor
     */
    ParallelStreamProcessor<String, String> setupParallelConsumer() {
        Consumer<String, String> kafkaConsumer = getKafkaConsumer();
        Producer<String, String> kafkaProducer = getKafkaProducer();

        ParallelConsumerOptions options = ParallelConsumerOptions.builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
                .maxMessagesToQueue(1000)
                .maxNumberMessagesBeyondBaseCommitOffset(1000)
                .consumer(kafkaConsumer)
                .producer(kafkaProducer)
                .build();

        ParallelStreamProcessor<String, String> processor =
                ParallelStreamProcessor.createEosStreamProcessor(options);
        processor.subscribe(UniLists.of(this.inputTopic));
        return processor;
    }

    /**
     * Closes the processor if one has been created; does nothing when neither {@link #run()} nor
     * {@link #runPollAndProduce()} has been called yet.
     */
    void close() {
        // Guard against close() being called before a processor was ever set up.
        if (this.parallelConsumer != null) {
            this.parallelConsumer.close();
        }
    }

    /**
     * Sets up the processor, runs the {@link #postSetup()} hook and registers a function that maps
     * each consumed record to a record for {@link #outputTopic} (same key, derived payload), plus a
     * callback that logs the produced record and its offset at debug level.
     */
    void runPollAndProduce() {
        this.parallelConsumer = setupParallelConsumer();
        postSetup();
        this.parallelConsumer.pollAndProduce(
                consumerRecord -> new ProducerRecord<>(
                        this.outputTopic,
                        consumerRecord.key(),
                        processBrokerRecord(consumerRecord).getPayload()),
                consumeProduceResult -> log.debug(
                        "Message {} saved to broker at offset {}",
                        consumeProduceResult.getOut(),
                        consumeProduceResult.getMeta().offset()));
    }

    /**
     * Derives the outgoing payload from a consumed record by prefixing its value.
     *
     * @param consumerRecord the record read from the input topic
     * @return a result wrapping {@code "Some payload from " + value}
     */
    private Result processBrokerRecord(ConsumerRecord<String, String> consumerRecord) {
        return new Result("Some payload from " + consumerRecord.value());
    }
}
