package kr.jm.utils.kafka;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kr.jm.utils.enums.OS;
import kr.jm.utils.helper.JMLog;
import kr.jm.utils.helper.JMString;
import kr.jm.utils.helper.JMThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kr/jm/utils/kafka/JMKafkaBroker.class */
public class JMKafkaBroker extends KafkaServerStartable {
    private static final Logger log = LoggerFactory.getLogger(JMKafkaBroker.class);
    private ExecutorService kafkaBrokerThreadPool;
    private int port;

    public JMKafkaBroker(Properties properties) {
        super(new KafkaConfig(properties));
        this.kafkaBrokerThreadPool = JMThread.newSingleThreadPool();
    }

    public JMKafkaBroker(String str) {
        this(str, OS.getHostname(), OS.getAvailableLocalPort());
    }

    public JMKafkaBroker(String str, String str2, int i) {
        this(str, str2, i, "kafka-broker-log");
    }

    public JMKafkaBroker(String str, String str2, int i, String str3) {
        this(buildProperties(str, str2, i, str3));
        this.port = i;
    }

    public static Properties buildProperties(String str, String str2, int i, String str3) {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", str);
        properties.put("listeners", "PLAINTEXT://" + JMString.buildIpOrHostnamePortPair(str2, i));
        properties.put("brokerid", str2 + "-" + System.currentTimeMillis());
        properties.put("log.dir", str3);
        return properties;
    }

    public void startup() {
        JMThread.runAsync(() -> {
            Thread.currentThread().setName("JMKafkaBroker-" + OS.getHostname());
            JMLog.info(log, "startup");
            super.startup();
        }, this.kafkaBrokerThreadPool);
    }

    public JMKafkaBroker start() {
        startup();
        return this;
    }

    public void stop() {
        log.info("shutdown starting ms - " + System.currentTimeMillis());
        this.kafkaBrokerThreadPool.shutdown();
        shutdown();
        log.info("shutdown completely ms - " + System.currentTimeMillis());
    }

    public int getPort() {
        return this.port;
    }
}
