package io.confluent.parallelconsumer.examples.reactor;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.reactor.ReactorProcessor;
import java.util.Properties;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/confluent/parallelconsumer/examples/reactor/ReactorApp.class */
public class ReactorApp {
    private static final Logger log = LoggerFactory.getLogger(ReactorApp.class);
    static String inputTopic = "input-topic-" + RandomUtils.nextInt();
    ReactorProcessor<String, String> parallelConsumer;

    Consumer<String, String> getKafkaConsumer() {
        return new KafkaConsumer(new Properties());
    }

    Producer<String, String> getKafkaProducer() {
        return new KafkaProducer(new Properties());
    }

    void run() {
        Consumer<String, String> kafkaConsumer = getKafkaConsumer();
        this.parallelConsumer = new ReactorProcessor<>(ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.KEY).consumer(kafkaConsumer).producer(getKafkaProducer()).build());
        this.parallelConsumer.subscribe(UniLists.of(inputTopic));
        postSetup();
        getPort();
        this.parallelConsumer.react(consumerRecord -> {
            log.info("Concurrently constructing and returning RequestInfo from record: {}", consumerRecord);
            UniMaps.of("recordKey", (String) consumerRecord.key(), "payload", (String) consumerRecord.value());
            return Mono.just("something todo");
        });
    }

    protected int getPort() {
        return 8080;
    }

    void close() {
        this.parallelConsumer.closeDrainFirst();
    }

    protected void postSetup() {
    }
}
