package io.confluent.parallelconsumer.examples.vertx;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.vertx.JStreamVertxParallelStreamProcessor;
import io.confluent.parallelconsumer.vertx.VertxParallelEoSStreamProcessor;
import java.util.Properties;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;

/* loaded from: input_file:io/confluent/parallelconsumer/examples/vertx/VertxApp.class */
public class VertxApp {
    private static final Logger log = LoggerFactory.getLogger(VertxApp.class);
    static String inputTopic = "input-topic-" + RandomUtils.nextInt();
    JStreamVertxParallelStreamProcessor<String, String> parallelConsumer;

    Consumer<String, String> getKafkaConsumer() {
        return new KafkaConsumer(new Properties());
    }

    Producer<String, String> getKafkaProducer() {
        return new KafkaProducer(new Properties());
    }

    void run() {
        Consumer<String, String> kafkaConsumer = getKafkaConsumer();
        this.parallelConsumer = JStreamVertxParallelStreamProcessor.createEosStreamProcessor(ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.KEY).consumer(kafkaConsumer).producer(getKafkaProducer()).build());
        this.parallelConsumer.subscribe(UniLists.of(inputTopic));
        postSetup();
        int port = getPort();
        this.parallelConsumer.vertxHttpReqInfoStream(consumerRecord -> {
            log.info("Concurrently constructing and returning RequestInfo from record: {}", consumerRecord);
            return new VertxParallelEoSStreamProcessor.RequestInfo("localhost", port, "/api", UniMaps.of("recordKey", (String) consumerRecord.key(), "payload", (String) consumerRecord.value()));
        }).forEach(vertxCPResult -> {
            log.info("From result stream: {}", vertxCPResult);
        });
    }

    protected int getPort() {
        return 8080;
    }

    void close() {
        this.parallelConsumer.closeDrainFirst();
    }

    protected void postSetup() {
    }
}
