package org.commonjava.indy.subsys.kafka.boot;

import java.time.Duration;
import java.util.Properties;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.commonjava.indy.action.BootupAction;
import org.commonjava.indy.action.IndyLifecycleException;
import org.commonjava.indy.action.ShutdownAction;
import org.commonjava.indy.subsys.kafka.conf.KafkaConfig;
import org.commonjava.indy.subsys.kafka.handler.ServiceEventHandler;
import org.commonjava.indy.subsys.kafka.trace.TracingKafkaClientSupplier;
import org.commonjava.indy.subsys.trace.config.IndyTraceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Lifecycle action that builds and starts a Kafka Streams consumer at boot time and closes it
 * again at shutdown.
 *
 * <p>On {@link #init()} one source stream is registered per configured topic, and every injected
 * {@link ServiceEventHandler} that reports it can handle the topic is handed that stream. On
 * {@link #stop()} the streams instance, if one was created, is closed with a bounded wait.
 */
@ApplicationScoped
public class KafkaStreamBooter implements BootupAction, ShutdownAction {
    /** Maximum time, in milliseconds, to wait for the streams instance to close in {@link #stop()}. */
    private static final long DEFAULT_KAFKA_STREAM_CLOSE_TIMEOUT = 900000;

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Inject
    Instance<ServiceEventHandler> serviceEventHandlers;

    @Inject
    KafkaConfig config;

    @Inject
    IndyTraceConfiguration traceConfig;

    @Inject
    TracingKafkaClientSupplier traceClientSupplier;

    /** The running streams instance; stays {@code null} until {@link #init()} creates one. */
    private KafkaStreams streams;

    /**
     * Builds the stream topology from the configured topics and starts consuming.
     *
     * <p>Does nothing (beyond logging a warning) when Kafka is disabled in the configuration or
     * when no topics are configured.
     *
     * @throws IndyLifecycleException if the Kafka properties cannot be assembled, or the streams
     *         instance cannot be created or started
     */
    public void init() throws IndyLifecycleException {
        if (!this.config.isEnabled()) {
            this.logger.warn("Kafka stream is disabled, this will effect communicating with the microservices.");
            return;
        }
        this.logger.info("Start init kafka streaming");
        if (this.config.getTopics() == null) {
            this.logger.warn("Don't find topics configured in config, will not start kafka consumer streaming.");
            return;
        }

        final Serde<String> stringSerde = Serdes.String();
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        // Tracks whether the loop registered at least one source, so that an empty (but non-null)
        // topic collection is treated the same way as a missing one.
        boolean hasSource = false;
        for (String topic : this.config.getTopics()) {
            final KStream<String, String> stream =
                    streamsBuilder.stream(topic, Consumed.with(stringSerde, stringSerde));
            hasSource = true;
            for (ServiceEventHandler handler : this.serviceEventHandlers) {
                if (handler.canHandle(topic)) {
                    handler.dispatchEvent(stream, topic);
                }
            }
        }
        if (!hasSource) {
            // Nothing to consume from: skip start-up instead of building a source-less topology.
            this.logger.warn("Don't find topics configured in config, will not start kafka consumer streaming.");
            return;
        }

        try {
            // Property assembly and construction live inside the try block so that configuration
            // problems are reported as a lifecycle failure, just like a failing start().
            this.streams = createStreams(streamsBuilder, setKafkaProps());
            this.streams.start();
        } catch (Throwable th) {
            throw new IndyLifecycleException("Failed to start Kafka consumer streaming.", th);
        }
    }

    /**
     * Creates the streams instance for the given builder, using the tracing client supplier when
     * tracing is enabled both globally and for Kafka.
     */
    private KafkaStreams createStreams(StreamsBuilder streamsBuilder, Properties kafkaProps) {
        if (this.traceConfig.isEnabled() && this.config.isTracing()) {
            this.logger.info("The trace is enabled for Kafka client, so inject otel instrumentation to Kafka client.");
            return new KafkaStreams(streamsBuilder.build(), kafkaProps, this.traceClientSupplier);
        }
        return new KafkaStreams(streamsBuilder.build(), kafkaProps);
    }

    /** Returns the identifier of this lifecycle action. */
    public String getId() {
        return "kafka streaming boot";
    }

    /** Returns the boot priority of this action. */
    public int getBootPriority() {
        return 100;
    }

    /**
     * Assembles the Kafka Streams properties (application id, bootstrap servers and buffered
     * records per partition) from the injected configuration and logs them.
     */
    private Properties setKafkaProps() {
        final Properties properties = new Properties();
        // The Properties object is freshly created, so a plain put is equivalent to putIfAbsent.
        properties.put("application.id", this.config.getGroup());
        properties.put("bootstrap.servers", this.config.getBootstrapServers());
        properties.put("buffered.records.per.partition", this.config.getRecordsPerPartition());
        this.logger.info("Kafka props: {}", properties);
        return properties;
    }

    /**
     * Closes the streams instance, waiting at most {@link #DEFAULT_KAFKA_STREAM_CLOSE_TIMEOUT}
     * milliseconds. Failures are logged and never propagated.
     */
    public void stop() {
        if (!this.config.isEnabled()) {
            this.logger.info("Kafka stream is disabled, no need to close it.");
            return;
        }
        this.logger.info("Closing Kafka streams...");
        if (this.streams != null) {
            try {
                // close(Duration) reports through its return value whether shutdown completed in time.
                if (!this.streams.close(Duration.ofMillis(DEFAULT_KAFKA_STREAM_CLOSE_TIMEOUT))) {
                    this.logger.warn("Kafka streams did not finish closing within {} ms.",
                            DEFAULT_KAFKA_STREAM_CLOSE_TIMEOUT);
                    return;
                }
            } catch (Exception e) {
                this.logger.error("Kafka streams closing failed. Cause: ", e);
                return;
            }
        }
        this.logger.info("Kafka streams for message consuming stopped.");
    }

    /** Returns the shutdown priority of this action. */
    public int getShutdownPriority() {
        return 100;
    }
}
