package com.expedia.www.haystack.pipes.commons.kafka;

import com.expedia.www.haystack.commons.config.Configuration;
import com.expedia.www.haystack.pipes.commons.SystemExitUncaughtExceptionHandler;
import com.expedia.www.haystack.pipes.commons.health.HealthController;
import com.netflix.servo.util.VisibleForTesting;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.cfg4j.provider.ConfigurationProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/haystack-pipes-commons-2.0.0.jar:com/expedia/www/haystack/pipes/commons/kafka/KafkaStreamStarter.class */
public class KafkaStreamStarter {
    private final long MAX_CLOSE_TIMEOUT_SEC = 30;

    @VisibleForTesting
    static Factory factory = new Factory();

    @VisibleForTesting
    static Logger logger = LoggerFactory.getLogger((Class<?>) KafkaStreamStarter.class);

    @VisibleForTesting
    static ConfigurationProvider CONFIGURATION_PROVIDER = new Configuration().createMergeConfigurationProvider();
    static final String STARTING_MSG_WITH_TO_TOPIC = "Attempting to start stream pointing at Kafka [%s] from topic [%s] to topic [%s]";
    static final String STARTING_MSG_WITHOUT_TO_TOPIC = "Attempting to start stream pointing at Kafka [%s] from topic [%s]";
    static final String STARTED_MSG = "Now started Stream %s";
    private final HealthController healthController;
    public final Class<? extends KafkaStreamBuilder> containingClass;
    public final String clientId;
    private final StreamsConfig streamsConfig;

    @VisibleForTesting
    static KafkaConfig kafkaConfig;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/haystack-pipes-commons-2.0.0.jar:com/expedia/www/haystack/pipes/commons/kafka/KafkaStreamStarter$Factory.class */
    public static class Factory {
        Factory() {
        }

        KStreamBuilder createKStreamBuilder() {
            return new KStreamBuilder();
        }

        KafkaStreams createKafkaStreams(KStreamBuilder kStreamBuilder, KafkaStreamStarter kafkaStreamStarter) {
            return new KafkaStreams(kStreamBuilder, kafkaStreamStarter.streamsConfig);
        }

        SystemExitUncaughtExceptionHandler createSystemExitUncaughtExceptionHandler(KafkaStreams kafkaStreams, HealthController healthController) {
            return new SystemExitUncaughtExceptionHandler(kafkaStreams, healthController);
        }
    }

    public KafkaStreamStarter(Class<? extends KafkaStreamBuilder> cls, String str, KafkaConfig kafkaConfig2, HealthController healthController) {
        this.containingClass = cls;
        this.clientId = str;
        this.healthController = healthController;
        kafkaConfig = kafkaConfig2;
        this.streamsConfig = new StreamsConfig(getProperties());
    }

    public void createAndStartStream(KafkaStreamBuilder kafkaStreamBuilder) {
        KStreamBuilder createKStreamBuilder = factory.createKStreamBuilder();
        kafkaStreamBuilder.buildStreamTopology(createKStreamBuilder);
        startKafkaStreams(createKStreamBuilder);
    }

    private void startKafkaStreams(KStreamBuilder kStreamBuilder) {
        KafkaStreams createKafkaStreams = factory.createKafkaStreams(kStreamBuilder, this);
        createKafkaStreams.setUncaughtExceptionHandler(factory.createSystemExitUncaughtExceptionHandler(createKafkaStreams, this.healthController));
        String toTopic = getToTopic();
        if (StringUtils.isEmpty(toTopic)) {
            logger.info(String.format(STARTING_MSG_WITHOUT_TO_TOPIC, getIpAnPort(), getFromTopic()));
        } else {
            logger.info(String.format(STARTING_MSG_WITH_TO_TOPIC, getIpAnPort(), getFromTopic(), toTopic));
        }
        createKafkaStreams.start();
        this.healthController.setHealthy();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            createKafkaStreams.close(30L, TimeUnit.SECONDS);
        }));
        logger.info(String.format(STARTED_MSG, kStreamBuilder.getClass().getSimpleName()));
    }

    Properties getProperties() {
        Properties properties = new Properties();
        properties.put("client.id", this.clientId);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.containingClass.getName());
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, this.containingClass.getSimpleName());
        properties.put("bootstrap.servers", getIpAnPort());
        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), Integer.valueOf(getConsumerSessionTimeout()));
        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, Integer.valueOf(getThreadCount()));
        return properties;
    }

    private String getIpAnPort() {
        return kafkaConfig.brokers() + ":" + kafkaConfig.port();
    }

    private String getFromTopic() {
        return kafkaConfig.fromtopic();
    }

    private String getToTopic() {
        return kafkaConfig.totopic();
    }

    private int getThreadCount() {
        return kafkaConfig.threadcount();
    }

    private int getConsumerSessionTimeout() {
        return kafkaConfig.sessiontimeout();
    }
}
