/*
 * Decompiled with CFR 0.152.
 */
package com.github.sakserv.minicluster.impl;

import com.github.sakserv.minicluster.MiniCluster;
import com.github.sakserv.minicluster.systemtime.LocalSystemTime;
import com.github.sakserv.minicluster.util.FileUtils;
import java.lang.reflect.Constructor;
import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

public class KafkaLocalBroker
implements MiniCluster {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalBroker.class);
    private KafkaServer kafkaServer;
    private KafkaConfig kafkaConfig;
    private String kafkaHostname;
    private Integer kafkaPort;
    private Integer kafkaBrokerId;
    private Properties kafkaProperties;
    private String kafkaTempDir;
    private String zookeeperConnectionString;

    public String getKafkaHostname() {
        return this.kafkaHostname;
    }

    public Integer getKafkaPort() {
        return this.kafkaPort;
    }

    public Integer getKafkaBrokerId() {
        return this.kafkaBrokerId;
    }

    public Properties getKafkaProperties() {
        return this.kafkaProperties;
    }

    public String getKafkaTempDir() {
        return this.kafkaTempDir;
    }

    public String getZookeeperConnectionString() {
        return this.zookeeperConnectionString;
    }

    private KafkaLocalBroker(Builder builder) {
        this.kafkaHostname = builder.kafkaHostname;
        this.kafkaPort = builder.kafkaPort;
        this.kafkaBrokerId = builder.kafkaBrokerId;
        this.kafkaProperties = builder.kafkaProperties;
        this.kafkaTempDir = builder.kafkaTempDir;
        this.zookeeperConnectionString = builder.zookeeperConnectionString;
    }

    public void start() throws Exception {
        LOG.info("KAFKA: Starting Kafka on port: {}", (Object)this.kafkaPort);
        this.configure();
        Class<KafkaServer> kafkaServerClazz = KafkaServer.class;
        Constructor<?>[] kafkaServerConstructors = kafkaServerClazz.getConstructors();
        if (kafkaServerConstructors.length != 1) {
            throw new Exception("kafka.server.KafkaServer has more than one constructor, expected only 1");
        }
        Constructor<?> kafkaServerConstructor = kafkaServerConstructors[0];
        if (kafkaServerConstructor.getParameterTypes().length == 2) {
            this.kafkaServer = (KafkaServer)kafkaServerConstructor.newInstance(this.kafkaConfig, new LocalSystemTime());
        } else if (kafkaServerConstructor.getParameterTypes().length == 3) {
            Option threadPrefixName = Option.apply((Object)"kafka-mini-cluster");
            this.kafkaServer = (KafkaServer)kafkaServerConstructor.newInstance(this.kafkaConfig, new LocalSystemTime(), threadPrefixName);
        }
        this.kafkaServer.startup();
    }

    public void stop() throws Exception {
        this.stop(true);
    }

    public void stop(boolean cleanUp) throws Exception {
        LOG.info("KAFKA: Stopping Kafka on port: {}", (Object)this.kafkaPort);
        this.kafkaServer.shutdown();
        if (cleanUp) {
            this.cleanUp();
        }
    }

    public void configure() throws Exception {
        this.kafkaProperties.put("advertised.host.name", this.kafkaHostname);
        this.kafkaProperties.put("port", this.kafkaPort + "");
        this.kafkaProperties.put("broker.id", this.kafkaBrokerId + "");
        this.kafkaProperties.put("log.dir", this.kafkaTempDir);
        this.kafkaProperties.put("enable.zookeeper", "true");
        this.kafkaProperties.put("zookeeper.connect", this.zookeeperConnectionString);
        this.kafkaConfig = KafkaConfig.fromProps((Properties)this.kafkaProperties);
    }

    public void cleanUp() throws Exception {
        FileUtils.deleteFolder((String)this.kafkaTempDir);
    }

    public static class Builder {
        private String kafkaHostname;
        private Integer kafkaPort;
        private Integer kafkaBrokerId;
        private Properties kafkaProperties;
        private String kafkaTempDir;
        private String zookeeperConnectionString;

        public Builder setKafkaHostname(String kafkaHostname) {
            this.kafkaHostname = kafkaHostname;
            return this;
        }

        public Builder setKafkaPort(Integer kafkaPort) {
            this.kafkaPort = kafkaPort;
            return this;
        }

        public Builder setKafkaBrokerId(Integer kafkaBrokerId) {
            this.kafkaBrokerId = kafkaBrokerId;
            return this;
        }

        public Builder setKafkaProperties(Properties kafkaProperties) {
            this.kafkaProperties = kafkaProperties;
            return this;
        }

        public Builder setKafkaTempDir(String kafkaTempDir) {
            this.kafkaTempDir = kafkaTempDir;
            return this;
        }

        public Builder setZookeeperConnectionString(String zookeeperConnectionString) {
            this.zookeeperConnectionString = zookeeperConnectionString;
            return this;
        }

        public KafkaLocalBroker build() {
            KafkaLocalBroker kafkaLocalBroker = new KafkaLocalBroker(this);
            this.validateObject(kafkaLocalBroker);
            return kafkaLocalBroker;
        }

        public void validateObject(KafkaLocalBroker kafkaLocalBroker) {
            if (kafkaLocalBroker.kafkaHostname == null) {
                throw new IllegalArgumentException("ERROR: Missing required config: Kafka Hostname");
            }
            if (kafkaLocalBroker.kafkaPort == null) {
                throw new IllegalArgumentException("ERROR: Missing required config: Kafka Port");
            }
            if (kafkaLocalBroker.kafkaBrokerId == null) {
                throw new IllegalArgumentException("ERROR: Missing required config: Kafka Broker Id");
            }
            if (kafkaLocalBroker.kafkaProperties == null) {
                throw new IllegalArgumentException("ERROR: Missing required config: Kafka Properties");
            }
            if (kafkaLocalBroker.kafkaTempDir == null) {
                throw new IllegalArgumentException("ERROR: Missing required config: Kafka Temp Dir");
            }
            if (kafkaLocalBroker.zookeeperConnectionString == null) {
                throw new IllegalArgumentException("ERROR: Missing required config: Zookeeper Connection String");
            }
        }
    }
}

