package com.expedia.www.haystack.pipes.commons.kafka;

import com.expedia.www.haystack.commons.config.Configuration;
import com.expedia.www.haystack.pipes.commons.health.HealthController;
import com.netflix.servo.util.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.cfg4j.provider.ConfigurationProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/haystack-pipes-commons-2.0.0.jar:com/expedia/www/haystack/pipes/commons/kafka/KafkaConsumerStarter.class */
/**
 * Creates {@link ConsumerTask} instances, runs each one on its own daemon thread, and registers a
 * JVM shutdown hook that closes every task that was started.
 */
public class KafkaConsumerStarter {

    /** Logger for this class; non-final and package-visible so tests can substitute it. */
    @VisibleForTesting
    static Logger logger = LoggerFactory.getLogger(KafkaConsumerStarter.class);

    /** Merged configuration provider built when this class is loaded. */
    @VisibleForTesting
    static ConfigurationProvider CONFIGURATION_PROVIDER = new Configuration().createMergeConfigurationProvider();

    /** Format of the message logged after all consumer threads have been started. */
    static final String STARTED_MSG = "Now started Stream %s";

    private final HealthController healthController;
    public final Class<?> containingClass;
    public final String clientId;

    /** Tasks started by this instance; closed by the shutdown hook. */
    private final List<ConsumerTask> tasks = new ArrayList<>();
    private final KafkaConfig kafkaConfig;

    /**
     * @param cls              class whose simple name is reported in the start-up log message and
     *                         which is handed to every {@link ConsumerTask}
     * @param str              client identifier, exposed as {@link #clientId}
     * @param kafkaConfig      Kafka configuration; supplies the number of consumer threads
     * @param healthController health controller that is marked healthy once the consumers are started
     */
    public KafkaConsumerStarter(Class<?> cls, String str, KafkaConfig kafkaConfig, HealthController healthController) {
        this.containingClass = cls;
        this.clientId = str;
        this.kafkaConfig = kafkaConfig;
        this.healthController = healthController;
    }

    /**
     * Starts one daemon consumer thread per configured thread, marks the health controller healthy,
     * and registers a shutdown hook that closes the started tasks.
     *
     * @param spanProcessorSupplier supplier handed to every {@link ConsumerTask}
     */
    public void createAndStartConsumer(SpanProcessorSupplier spanProcessorSupplier) {
        // Read the configured count once; the bound is exclusive so exactly threadCount
        // consumers are started (an inclusive bound would start one extra thread).
        final int threadCount = getThreadCount();
        for (int i = 0; i < threadCount; i++) {
            ConsumerTask consumerTask = new ConsumerTask(this.kafkaConfig, this.containingClass, spanProcessorSupplier, this.healthController);
            this.tasks.add(consumerTask);
            Thread thread = new Thread(consumerTask);
            // Daemon threads do not keep the JVM alive on their own.
            thread.setDaemon(true);
            thread.start();
        }
        this.healthController.setHealthy();
        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
        logger.info(String.format(STARTED_MSG, this.containingClass.getSimpleName()));
    }

    /**
     * Closes every started task. A failure to close one task is logged and does not prevent the
     * remaining tasks from being closed.
     */
    private void close() {
        for (ConsumerTask consumerTask : this.tasks) {
            try {
                consumerTask.close();
            } catch (IOException e) {
                // Best effort during shutdown: report the failure and keep closing the others.
                logger.error("Failed to close consumer task during shutdown", e);
            }
        }
    }

    /** Returns the number of consumer threads configured in {@link KafkaConfig}. */
    private int getThreadCount() {
        return this.kafkaConfig.threadcount();
    }
}
