package io.confluent.parallelconsumer.examples.metrics;

import com.sun.net.httpserver.HttpServer;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

/* loaded from: input_file:io/confluent/parallelconsumer/examples/metrics/CoreApp.class */
public class CoreApp {
    private static final Logger log = LoggerFactory.getLogger(CoreApp.class);
    public static final String METRICS_ENDPOINT = "/prometheus";
    private KafkaClientMetrics kafkaClientMetrics;
    ParallelStreamProcessor<String, String> parallelConsumer;
    final PrometheusMeterRegistry meterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
    String inputTopic = "input-topic";
    String outputTopic = "output-topic-" + RandomUtils.nextInt();
    private final Map<String, String> envVars = System.getenv();
    private final ExecutorService metricsEndpointExecutor = Executors.newSingleThreadExecutor();

    Consumer<String, String> getKafkaConsumer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.envVars.getOrDefault("BOOTSTRAP_SERVERS", "kafka:9092"));
        properties.put("group.id", this.envVars.getOrDefault("GROUP_ID", "pc-instance"));
        properties.put("enable.auto.commit", false);
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer(properties);
    }

    void setupPrometheusEndpoint() {
        try {
            HttpServer create = HttpServer.create(new InetSocketAddress(7001), 0);
            create.createContext(METRICS_ENDPOINT, httpExchange -> {
                String scrape = this.meterRegistry.scrape();
                httpExchange.sendResponseHeaders(200, scrape.getBytes().length);
                OutputStream responseBody = httpExchange.getResponseBody();
                try {
                    responseBody.write(scrape.getBytes());
                    if (responseBody != null) {
                        responseBody.close();
                    }
                } catch (Throwable th) {
                    if (responseBody != null) {
                        try {
                            responseBody.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            });
            ExecutorService executorService = this.metricsEndpointExecutor;
            Objects.requireNonNull(create);
            executorService.submit(create::start);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    void run() {
        this.parallelConsumer = setupParallelConsumer();
        postSetup();
        this.parallelConsumer.poll(pollContext -> {
            log.info("Concurrently processing a record: {}", pollContext);
        });
    }

    protected void postSetup() {
        setupPrometheusEndpoint();
    }

    ParallelStreamProcessor<String, String> setupParallelConsumer() {
        Consumer<String, String> kafkaConsumer = getKafkaConsumer();
        ParallelStreamProcessor<String, String> createEosStreamProcessor = ParallelStreamProcessor.createEosStreamProcessor(ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.KEY).maxConcurrency(1000).consumer(kafkaConsumer).meterRegistry(this.meterRegistry).metricsTags(Tags.of(new Tag[]{Tag.of("common-tag", "tag1")})).pcInstanceTag(UUID.randomUUID().toString()).build());
        createEosStreamProcessor.subscribe(UniLists.of(this.inputTopic));
        this.kafkaClientMetrics = new KafkaClientMetrics(kafkaConsumer);
        this.kafkaClientMetrics.bindTo(this.meterRegistry);
        return createEosStreamProcessor;
    }

    void close() {
        this.kafkaClientMetrics.close();
        this.parallelConsumer.close();
        this.metricsEndpointExecutor.shutdownNow();
    }
}
