package io.confluent.parallelconsumer.examples.streams;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

/* loaded from: input_file:io/confluent/parallelconsumer/examples/streams/StreamsApp.class */
/**
 * Example that chains a Kafka Streams topology with a parallel consumer.
 *
 * <p>{@link #preprocess()} starts a Streams topology that reads {@link #inputTopic}, replaces each
 * value with the string form of its length and writes the result to {@link #outputTopicName}.
 * {@link #concurrentProcess()} then subscribes a {@link ParallelStreamProcessor} to that output
 * topic and counts every record handed to its callback in {@link #messageCount}.
 *
 * <p>The client factories and {@link #getServerConfig()} are package-private and non-final so they
 * can be overridden; as written they return placeholder values only.
 */
public class StreamsApp {
    private static final Logger log = LoggerFactory.getLogger(StreamsApp.class);

    /** Topic the Streams topology reads from; suffixed with a random int when the class loads. */
    static String inputTopic = "input-topic-" + RandomUtils.nextInt();

    /** Topic the Streams topology writes to and the parallel consumer subscribes to. */
    static String outputTopicName = "my-output-topic-" + RandomUtils.nextInt();

    /** Assigned by {@link #startStreams(Topology)}; {@code null} until then. */
    KafkaStreams streams;

    /** Assigned by {@link #concurrentProcess()}; {@code null} until then. */
    ParallelStreamProcessor<String, String> parallelConsumer;

    /** Number of records passed to the parallel consumer callback so far. */
    AtomicInteger messageCount = new AtomicInteger();

    /**
     * Creates the consumer handed to the parallel consumer.
     *
     * <p>NOTE(review): the properties are empty here, so this is presumably a placeholder that
     * real deployments or tests override with a configured client -- confirm.
     *
     * @return a new consumer built from empty {@link Properties}
     */
    Consumer<String, String> getKafkaConsumer() {
        return new KafkaConsumer<>(new Properties());
    }

    /**
     * Creates the producer handed to the parallel consumer.
     *
     * <p>NOTE(review): the properties are empty here, so this is presumably a placeholder that
     * real deployments or tests override with a configured client -- confirm.
     *
     * @return a new producer built from empty {@link Properties}
     */
    Producer<String, String> getKafkaProducer() {
        return new KafkaProducer<>(new Properties());
    }

    /** Starts the Streams preprocessing stage, then the parallel consumer stage. */
    void run() {
        preprocess();
        concurrentProcess();
    }

    /**
     * Builds and starts the Streams topology: {@link #inputTopic} -> value length as a string ->
     * {@link #outputTopicName}.
     */
    void preprocess() {
        StreamsBuilder builder = new StreamsBuilder();
        // The explicit <String, String> witness is required: without it the stream is typed
        // <Object, Object> and value.length() does not compile.
        builder.<String, String>stream(inputTopic)
                .mapValues((key, value) -> {
                    log.info("Streams preprocessing key: {} value: {}", key, value);
                    return String.valueOf(value.length());
                })
                .to(outputTopicName);
        startStreams(builder.build());
    }

    /**
     * Creates the {@link KafkaStreams} instance for the given topology, stores it in
     * {@link #streams} and starts it.
     *
     * @param topology the topology to run
     */
    void startStreams(Topology topology) {
        this.streams = new KafkaStreams(topology, getStreamsProperties());
        this.streams.start();
    }

    /**
     * Sets up the parallel consumer and registers a callback that logs each record context and
     * increments {@link #messageCount}.
     */
    void concurrentProcess() {
        setupParallelConsumer();
        this.parallelConsumer.poll(pollContext -> {
            log.info("Concurrently processing a record: {}", pollContext);
            this.messageCount.getAndIncrement();
        });
    }

    /**
     * Builds the parallel consumer with {@code KEY} processing order from the clients returned by
     * {@link #getKafkaConsumer()} and {@link #getKafkaProducer()}, then subscribes it to
     * {@link #outputTopicName}.
     */
    private void setupParallelConsumer() {
        Consumer<String, String> kafkaConsumer = getKafkaConsumer();
        // The explicit <String, String> witness keeps the builder chain typed; at the head of a
        // call chain there is no target type for inference to use.
        ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
                .consumer(kafkaConsumer)
                .producer(getKafkaProducer())
                .build();
        this.parallelConsumer = ParallelStreamProcessor.createEosStreamProcessor(options);
        this.parallelConsumer.subscribe(UniLists.of(outputTopicName));
    }

    /**
     * Builds the Streams configuration: application id (this object's runtime class name),
     * bootstrap servers from {@link #getServerConfig()}, and String serdes for keys and values.
     *
     * @return a fresh, mutable {@link Properties} instance
     */
    Properties getStreamsProperties() {
        Properties properties = new Properties();
        properties.put("application.id", getClass().getName());
        properties.put("bootstrap.servers", getServerConfig());
        properties.put("default.key.serde", Serdes.String().getClass());
        properties.put("default.value.serde", Serdes.String().getClass());
        return properties;
    }

    /**
     * Returns the value used for {@code bootstrap.servers}.
     *
     * @return a placeholder string; it is not a usable address as written
     */
    String getServerConfig() {
        return "add your server here";
    }

    /**
     * Closes the Streams instance and then the parallel consumer.
     *
     * <p>Either component may still be {@code null} if {@link #run()} was never called or failed
     * part-way, so both are null-checked. The parallel consumer is closed in a {@code finally}
     * block so that an exception from closing Streams does not leave it open.
     */
    void close() {
        try {
            if (this.streams != null) {
                this.streams.close();
            }
        } finally {
            if (this.parallelConsumer != null) {
                this.parallelConsumer.close();
            }
        }
    }
}
