package org.cg.eventbus;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import kafka.admin.TopicCommand;
import kafka.cluster.Broker;
import kafka.cluster.Cluster;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.log4j.Logger;
import scala.collection.Iterator;
import scala.collection.Seq;

/* loaded from: input_file:org/cg/eventbus/EventBusManager.class */
/**
 * Administrative helper for a Kafka-based "EventBus".
 *
 * <p>All information is read from (and topic changes are written to) ZooKeeper through a single
 * {@link ZkClient} that is opened in the constructor and released by {@link #shutdown()}.
 * Callers are responsible for invoking {@link #shutdown()} when they are done with an instance.
 */
public class EventBusManager {
    private static final Logger logger = Logger.getLogger(EventBusManager.class);

    /** ZooKeeper session timeout handed to {@link ZkClient}, in milliseconds. */
    private static final int ZK_SESSION_TIMEOUT_MS = 30000;

    /** ZooKeeper connection timeout handed to {@link ZkClient}, in milliseconds. */
    private static final int ZK_CONNECTION_TIMEOUT_MS = 30000;

    private final ZkClient zkClient;

    /**
     * Host/port pair describing how to reach one broker.
     *
     * <p>Kept as a (non-static) inner class so that existing callers that instantiate it through
     * an {@code EventBusManager} instance keep compiling.
     */
    public class Connection {
        private String host;
        private int port;

        /** Creates an empty connection; host is {@code null} and port is {@code 0}. */
        public Connection() {
        }

        /**
         * @param str broker host name
         * @param i   broker port
         */
        public Connection(String str, int i) {
            this.host = str;
            this.port = i;
        }

        /** @return the broker host name */
        public String getHost() {
            return this.host;
        }

        /** @param str the broker host name */
        public void setHost(String str) {
            this.host = str;
        }

        /** @return the broker port */
        public int getPort() {
            return this.port;
        }

        /** @param i the broker port */
        public void setPort(int i) {
            this.port = i;
        }

        @Override
        public String toString() {
            return "Connection [host=" + this.host + ", port=" + this.port + "]";
        }
    }

    /**
     * Opens a ZooKeeper client against the given connect string.
     *
     * @param str ZooKeeper connect string passed straight to {@link ZkClient}
     */
    public EventBusManager(String str) {
        this.zkClient = new ZkClient(str, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, ZKStringSerializer$.MODULE$);
    }

    /**
     * @return {@code true} when at least one broker is registered
     */
    public boolean isEventBusUp() {
        return 0 < getEventBusSize();
    }

    /**
     * @return the number of brokers in the cluster as reported by {@code ZkUtils.getCluster}
     */
    public int getEventBusSize() {
        return ZkUtils.getCluster(this.zkClient).size();
    }

    /**
     * Lists every registered broker, keyed by its broker id.
     *
     * <p>The brokers are enumerated directly instead of probing ids {@code 0..size-1}: broker ids
     * are not guaranteed to be contiguous or to start at zero, and probing a missing id fails.
     *
     * @return broker id to connection details, or {@code null} when no broker is registered
     */
    public Map<Integer, Connection> getEventBusInfo() {
        Seq<Broker> brokers = ZkUtils.getAllBrokersInCluster(this.zkClient);
        if (brokers.isEmpty()) {
            logger.error("empty EventBus, size=" + brokers.size());
            return null;
        }
        Map<Integer, Connection> info = new HashMap<Integer, Connection>();
        Iterator<Broker> it = brokers.iterator();
        while (it.hasNext()) {
            Broker broker = it.next();
            info.put(Integer.valueOf(broker.id()), new Connection(broker.host(), broker.port()));
        }
        return info;
    }

    /**
     * @param i broker id to look for
     * @return {@code true} when a broker with that id is currently registered
     */
    public boolean isBrokerUp(int i) {
        Map<Integer, Connection> eventBusInfo = getEventBusInfo();
        if (null == eventBusInfo) {
            logger.error("Failed to get EventBus info brokerId=" + i);
            return false;
        }
        if (eventBusInfo.containsKey(Integer.valueOf(i))) {
            return true;
        }
        logger.info("No broker in the EventBus brokerId=" + i);
        return false;
    }

    /**
     * @return the names of all topics; the list is a fixed-size view over an array
     * @throws Exception wrapping any failure raised while reading the topics
     */
    public List<String> getAllTopics() throws Exception {
        try {
            Seq<String> allTopics = ZkUtils.getAllTopics(this.zkClient);
            String[] names = new String[allTopics.size()];
            allTopics.copyToArray(names);
            return Arrays.asList(names);
        } catch (Exception e) {
            throw new Exception("Failed to get all the topics.", e);
        }
    }

    /**
     * @param str topic name
     * @return {@code true} when the topic is present in {@link #getAllTopics()}
     * @throws Exception when the topic list cannot be read
     */
    public boolean hasTopic(String str) throws Exception {
        return getAllTopics().contains(str);
    }

    /**
     * @param str topic name
     * @return the number of partitions of the topic, or {@code -1} when it has no entry in
     *         {@link #getAllPartitions()}
     */
    public int getPartitionByTopic(String str) {
        Integer partitions = getAllPartitions().get(str);
        return partitions != null ? partitions.intValue() : -1;
    }

    /**
     * Creates a topic, or grows an already existing topic to the requested partition count.
     *
     * @param str topic name
     * @param i   replication factor (only used when the topic is created)
     * @param i2  requested number of partitions
     * @return {@code true} when the topic was created or its partitions were increased;
     *         {@code false} when the topic exists and was left unchanged
     * @throws Exception when the topic list cannot be read
     */
    public boolean createTopic(String str, int i, int i2) throws Exception {
        if (getAllTopics().contains(str)) {
            logger.error("topic [" + str + "] is existing in this EventBus");
            // Read the current count once; each lookup is a ZooKeeper round trip.
            int currentPartitions = getPartitionByTopic(str);
            if (i2 <= currentPartitions) {
                return false;
            }
            logger.info("increase the partition number for " + str + "  from " + currentPartitions + " to " + i2);
            // Report what actually happened instead of unconditionally claiming success.
            return increasePartitionByTopic(str, i2);
        }
        String[] args = {
            "--create",
            "--replication-factor", Integer.toString(i),
            "--partitions", Integer.toString(i2),
            "--topic", str
        };
        TopicCommand.createTopic(this.zkClient, new TopicCommand.TopicCommandOptions(args));
        return true;
    }

    /**
     * Raises the partition count of an existing topic.
     *
     * @param str topic name
     * @param i   new number of partitions; must be greater than the current count
     * @return {@code true} when the alter command was issued; {@code false} when the topic is
     *         unknown or already has at least {@code i} partitions
     */
    public boolean increasePartitionByTopic(String str, int i) {
        Integer current = getAllPartitions().get(str);
        if (current == null) {
            logger.error("Failed to find topic [" + str + "]");
            return false;
        }
        if (current.intValue() >= i) {
            logger.error("topic [" + str + "] has " + current.intValue() + " partitions, which is more than partition size setting [" + i + "]");
            return false;
        }
        String[] args = {
            "--alter",
            "--partitions", Integer.toString(i),
            "--topic", str
        };
        TopicCommand.alterTopic(this.zkClient, new TopicCommand.TopicCommandOptions(args));
        return true;
    }

    /**
     * @return topic name to number of partitions, for every topic that has a partition assignment
     */
    public Map<String, Integer> getAllPartitions() {
        // The Scala map's value type does not translate cleanly to Java generics, so it is
        // handled as a raw type and cast at the point of use.
        scala.collection.mutable.Map assignments = ZkUtils.getPartitionAssignmentForTopics(this.zkClient, ZkUtils.getAllTopics(this.zkClient));
        Map<String, Integer> partitionCounts = new HashMap<String, Integer>();
        Iterator topics = assignments.keysIterator();
        while (topics.hasNext()) {
            String topic = (String) topics.next();
            int count = ((scala.collection.Map) assignments.get(topic).get()).size();
            partitionCounts.put(topic, Integer.valueOf(count));
        }
        return partitionCounts;
    }

    /**
     * Removes the topic's ZooKeeper path recursively.
     *
     * <p>NOTE(review): only the ZooKeeper node is removed here; whether brokers drop the
     * corresponding data is not visible from this class - confirm before relying on it.
     *
     * @param str topic name
     * @return {@code true} when the topic existed and is no longer listed afterwards
     * @throws Exception when the topic list cannot be read
     */
    public boolean deleteTopic(String str) throws Exception {
        if (hasTopic(str)) {
            this.zkClient.deleteRecursive(ZkUtils.getTopicPath(str));
            return !hasTopic(str);
        }
        logger.error("This EventBus does not have this topic [" + str + "]");
        return false;
    }

    /**
     * One-shot availability check: connects, asks whether any broker is registered, disconnects.
     *
     * @param str ZooKeeper connect string
     * @return {@code true} when at least one broker is registered
     */
    public static boolean pingServer(String str) {
        EventBusManager manager = new EventBusManager(str);
        try {
            return manager.isEventBusUp();
        } finally {
            // Always release the temporary ZooKeeper connection, even if the check throws.
            manager.shutdown();
        }
    }

    /** Closes the underlying ZooKeeper client. */
    public void shutdown() {
        this.zkClient.close();
    }
}
