package org.spring.beet.test.embedded.kafka;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.kafka.common.utils.Time;
import org.spring.beet.test.TestUtils;
import scala.Some;
import scala.collection.immutable.Vector$;

/* loaded from: input_file:org/spring/beet/test/embedded/kafka/EmbeddedKafkaCluster.class */
public class EmbeddedKafkaCluster {
    private final List<Integer> ports;
    private final String zkConnection;
    private final Map<String, String> baseProperties;
    private final String brokerList;
    private final List<KafkaServer> brokers = new ArrayList();
    private final List<File> logDirs = new ArrayList();

    public EmbeddedKafkaCluster(String str, Map<String, String> map, List<Integer> list) {
        this.zkConnection = str;
        this.ports = resolvePorts(list);
        this.baseProperties = map;
        this.brokerList = constructBrokerList(this.ports);
    }

    private List<Integer> resolvePorts(List<Integer> list) {
        ArrayList arrayList = new ArrayList();
        Iterator<Integer> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(Integer.valueOf(resolvePort(it.next().intValue())));
        }
        return arrayList;
    }

    private int resolvePort(int i) {
        return i == -1 ? TestUtils.getAvailablePort() : i;
    }

    private String constructBrokerList(List<Integer> list) {
        StringBuilder sb = new StringBuilder();
        for (Integer num : list) {
            if (sb.length() > 0) {
                sb.append(",");
            }
            sb.append("localhost:").append(num);
        }
        return sb.toString();
    }

    public void startup() {
        for (int i = 0; i < this.ports.size(); i++) {
            Integer num = this.ports.get(i);
            File constructTempDir = TestUtils.constructTempDir("kafka-local");
            HashMap hashMap = new HashMap();
            hashMap.putAll(this.baseProperties);
            hashMap.put("zookeeper.connect", this.zkConnection);
            hashMap.put("broker.id", String.valueOf(i + 1));
            hashMap.put("host.name", "localhost");
            hashMap.put("port", Integer.toString(num.intValue()));
            hashMap.put("log.dir", constructTempDir.getAbsolutePath());
            hashMap.put("log.flush.interval.messages", String.valueOf(1));
            hashMap.put("advertised.host.name", "localhost");
            this.brokers.add(startBroker(hashMap));
            this.logDirs.add(constructTempDir);
        }
    }

    private KafkaServer startBroker(Map<String, String> map) {
        KafkaServer kafkaServer = new KafkaServer(new KafkaConfig(map), Time.SYSTEM, Some.apply("embedded-kafka-cluster"), Vector$.MODULE$.empty());
        kafkaServer.startup();
        return kafkaServer;
    }

    public String getBrokerList() {
        return this.brokerList;
    }

    public List<Integer> getPorts() {
        return this.ports;
    }

    public String getZkConnection() {
        return this.zkConnection;
    }

    public void shutdown() {
        for (KafkaServer kafkaServer : this.brokers) {
            try {
                kafkaServer.shutdown();
                kafkaServer.awaitShutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        Iterator<File> it = this.logDirs.iterator();
        while (it.hasNext()) {
            TestUtils.deleteFile(it.next());
        }
    }

    public void awaitShutdown() {
        Iterator<KafkaServer> it = this.brokers.iterator();
        while (it.hasNext()) {
            it.next().awaitShutdown();
        }
    }

    public String toString() {
        StringBuilder sb = new StringBuilder("EmbeddedKafkaCluster{");
        sb.append("boostrapServers='").append(this.brokerList).append('\'');
        sb.append('}');
        return sb.toString();
    }
}
