package com.github.charithe.kafka;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileAttribute;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/charithe/kafka/EphemeralKafkaBroker.class */
/**
 * An in-process Kafka broker intended for tests.
 *
 * <p>On {@link #start()} the broker creates a temporary log directory, builds a {@link KafkaConfig}
 * pointing at a ZooKeeper {@link TestingServer} and starts a {@link KafkaServerStartable}
 * asynchronously. On {@link #stop()} the Kafka server is shut down, the ZooKeeper server is closed
 * (only if this class created it) and the log directory is deleted.
 *
 * <p>{@link #start()} and {@link #stop()} check the volatile {@code brokerStarted} flag and then
 * re-check it while holding this instance's monitor before doing any work.
 */
public class EphemeralKafkaBroker {
    private static final Logger LOGGER = LoggerFactory.getLogger(EphemeralKafkaBroker.class);

    /** Sentinel port value: a concrete port is chosen when the broker is started. */
    private static final int ALLOCATE_RANDOM_PORT = -1;
    private static final String LOCALHOST = "localhost";

    private int kafkaPort;
    private int zookeeperPort;
    /** Optional broker settings applied on top of the defaults; may be {@code null}. */
    private final Properties overrideBrokerProperties;
    private TestingServer zookeeper;
    /** {@code true} when the ZooKeeper server was supplied by the caller; it is then never closed here. */
    private final boolean managedZk;
    private KafkaServerStartable kafkaServer;
    private Path kafkaLogDir;
    private volatile boolean brokerStarted;

    /**
     * Creates a broker that picks both the Kafka and the ZooKeeper port at start-up.
     *
     * @return a new, not yet started broker
     */
    public static EphemeralKafkaBroker create() {
        return create(ALLOCATE_RANDOM_PORT);
    }

    /**
     * Creates a broker with the given Kafka port and a ZooKeeper port picked at start-up.
     *
     * @param i Kafka port, or {@code -1} to pick one at start-up
     * @return a new, not yet started broker
     */
    public static EphemeralKafkaBroker create(int i) {
        return create(i, ALLOCATE_RANDOM_PORT);
    }

    /**
     * Creates a broker with the given Kafka and ZooKeeper ports and no broker property overrides.
     *
     * @param i Kafka port, or {@code -1} to pick one at start-up
     * @param i2 ZooKeeper port, or {@code -1} to pick one at start-up
     * @return a new, not yet started broker
     */
    public static EphemeralKafkaBroker create(int i, int i2) {
        return create(i, i2, null);
    }

    /**
     * Creates a broker with the given ports and broker property overrides.
     *
     * @param i Kafka port, or {@code -1} to pick one at start-up
     * @param i2 ZooKeeper port, or {@code -1} to pick one at start-up
     * @param properties broker properties layered over the defaults; may be {@code null}
     * @return a new, not yet started broker
     */
    public static EphemeralKafkaBroker create(int i, int i2, Properties properties) {
        return new EphemeralKafkaBroker(i, i2, properties);
    }

    /**
     * Creates a broker that will start (and later close) its own ZooKeeper server.
     *
     * @param i Kafka port, or {@code -1} to pick one at start-up
     * @param i2 ZooKeeper port, or {@code -1} to pick one at start-up
     * @param properties broker properties layered over the defaults; may be {@code null}
     */
    EphemeralKafkaBroker(int i, int i2, Properties properties) {
        this.kafkaPort = i;
        this.zookeeperPort = i2;
        this.overrideBrokerProperties = properties;
        this.managedZk = false;
    }

    /**
     * Creates a broker that uses a ZooKeeper server owned by the caller; that server is not closed
     * when this broker stops.
     *
     * <p>NOTE(review): a decompiler annotation on the original indicated this constructor was
     * package-private upstream; the visible {@code public} modifier is kept unchanged.
     *
     * @param testingServer the ZooKeeper server to connect to
     * @param i Kafka port, or {@code -1} to pick one at start-up
     * @param properties broker properties layered over the defaults; may be {@code null}
     */
    public EphemeralKafkaBroker(TestingServer testingServer, int i, Properties properties) {
        this.zookeeper = testingServer;
        this.kafkaPort = i;
        this.overrideBrokerProperties = properties;
        this.managedZk = true;
    }

    /**
     * Starts the broker if it is not already marked as started.
     *
     * @return a future tracking the asynchronous Kafka server start-up, or an already completed
     *     future when the broker was already started
     * @throws Exception if ZooKeeper, the configuration or the log directory cannot be set up
     */
    public CompletableFuture<Void> start() throws Exception {
        if (!this.brokerStarted) {
            synchronized (this) {
                if (!this.brokerStarted) {
                    return startBroker();
                }
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Sets up ZooKeeper (when owned by this class), resolves the ports, builds the configuration and
     * launches the Kafka server on an asynchronous task.
     */
    private CompletableFuture<Void> startBroker() throws Exception {
        if (this.managedZk) {
            // Record the port of the caller-supplied server so getZookeeperPort() reports the real
            // value instead of the field's default.
            this.zookeeperPort = this.zookeeper.getPort();
        } else if (this.zookeeperPort == ALLOCATE_RANDOM_PORT) {
            this.zookeeper = new TestingServer(true);
            this.zookeeperPort = this.zookeeper.getPort();
        } else {
            this.zookeeper = new TestingServer(this.zookeeperPort, true);
        }

        if (this.kafkaPort == ALLOCATE_RANDOM_PORT) {
            this.kafkaPort = InstanceSpec.getRandomPort();
        }

        KafkaConfig config = buildKafkaConfig(this.zookeeper.getConnectString());
        LOGGER.info("Starting Kafka server with config: {}", config.props());
        KafkaServerStartable server = new KafkaServerStartable(config);
        this.kafkaServer = server;
        // The flag is raised before the server is actually up so that a later stop() still runs the
        // clean-up for everything allocated so far.
        this.brokerStarted = true;

        Integer brokerId = server.serverConfig().getInt(KafkaConfig.BrokerIdProp());
        if (brokerId != null) {
            byte[] meta = ("version=0\nbroker.id=" + brokerId).getBytes(StandardCharsets.UTF_8);
            Files.write(this.kafkaLogDir.resolve("meta.properties"), meta);
        }
        return CompletableFuture.runAsync(server::startup);
    }

    /**
     * Stops the broker if it is marked as started and waits for the clean-up to finish.
     *
     * @throws ExecutionException if the asynchronous clean-up task fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void stop() throws ExecutionException, InterruptedException {
        if (this.brokerStarted) {
            synchronized (this) {
                if (this.brokerStarted) {
                    stopBroker();
                    this.brokerStarted = false;
                }
            }
        }
    }

    /** Runs the asynchronous clean-up and blocks until it completes. */
    private void stopBroker() throws ExecutionException, InterruptedException {
        stopBrokerAsync().get();
    }

    /**
     * Shuts down Kafka, closes ZooKeeper (when owned by this class) and deletes the log directory on
     * an asynchronous task. Failures are logged, not propagated; each step runs even if an earlier
     * one failed. This method does not change the started flag.
     *
     * @return a future that completes when the clean-up has run
     */
    public CompletableFuture<Void> stopBrokerAsync() {
        return CompletableFuture.runAsync(this::cleanUp);
    }

    /** Performs the three clean-up steps independently so one failure cannot leak the others. */
    private void cleanUp() {
        try {
            if (this.kafkaServer != null) {
                LOGGER.info("Shutting down Kafka Server");
                this.kafkaServer.shutdown();
                this.kafkaServer.awaitShutdown();
            }
        } catch (Exception e) {
            LOGGER.error("Failed to clean-up Kafka", e);
        }

        try {
            if (this.zookeeper != null && !this.managedZk) {
                LOGGER.info("Shutting down Zookeeper");
                this.zookeeper.close();
            }
        } catch (Exception e) {
            LOGGER.error("Failed to clean-up Kafka", e);
        }

        try {
            // kafkaLogDir is null when the broker was never started.
            if (this.kafkaLogDir != null && Files.exists(this.kafkaLogDir)) {
                LOGGER.info("Deleting the log dir:  {}", this.kafkaLogDir);
                deleteRecursively(this.kafkaLogDir);
            }
        } catch (Exception e) {
            LOGGER.error("Failed to clean-up Kafka", e);
        }
    }

    /** Deletes {@code root} and everything beneath it, files first and directories afterwards. */
    private static void deleteRecursively(Path root) throws IOException {
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path path, BasicFileAttributes basicFileAttributes) throws IOException {
                Files.deleteIfExists(path);
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path path, IOException iOException) throws IOException {
                Files.deleteIfExists(path);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    /**
     * Creates a fresh temporary log directory and builds the broker configuration: single-broker
     * defaults first, then any override properties on top.
     */
    private KafkaConfig buildKafkaConfig(String zookeeperConnect) throws IOException {
        this.kafkaLogDir = Files.createTempDirectory("kafka_junit");

        Properties brokerProps = new Properties();
        brokerProps.put("advertised.listeners", "PLAINTEXT://" + LOCALHOST + ":" + this.kafkaPort);
        brokerProps.put("listeners", "PLAINTEXT://0.0.0.0:" + this.kafkaPort);
        brokerProps.put("port", String.valueOf(this.kafkaPort));
        brokerProps.put("broker.id", "1");
        brokerProps.put("log.dirs", this.kafkaLogDir.toAbsolutePath().toString());
        brokerProps.put("zookeeper.connect", zookeeperConnect);
        brokerProps.put("leader.imbalance.check.interval.seconds", "1");
        brokerProps.put("offsets.topic.num.partitions", "1");
        brokerProps.put("offsets.topic.replication.factor", "1");
        brokerProps.put("default.replication.factor", "1");
        brokerProps.put("num.partitions", "1");
        brokerProps.put("group.min.session.timeout.ms", "100");
        if (this.overrideBrokerProperties != null) {
            // Overrides are applied last so they win over the defaults above.
            brokerProps.putAll(this.overrideBrokerProperties);
        }
        return new KafkaConfig(brokerProps);
    }

    /** Returns the {@code host:port} address clients use to reach this broker. */
    private String bootstrapServers() {
        return LOCALHOST + ":" + this.kafkaPort;
    }

    /**
     * Builds default producer properties pointing at this broker.
     *
     * @return a new, mutable {@link Properties} instance
     */
    public Properties producerConfig() {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", bootstrapServers());
        producerProps.put("acks", "1");
        producerProps.put("batch.size", "10");
        producerProps.put("client.id", "kafka-junit");
        producerProps.put("request.timeout.ms", "500");
        return producerProps;
    }

    /**
     * Builds default consumer properties with auto-commit enabled.
     *
     * @return a new, mutable {@link Properties} instance
     */
    public Properties consumerConfig() {
        return consumerConfig(true);
    }

    /**
     * Builds default consumer properties pointing at this broker.
     *
     * @param z value written to {@code enable.auto.commit}
     * @return a new, mutable {@link Properties} instance
     */
    public Properties consumerConfig(boolean z) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", bootstrapServers());
        consumerProps.put("group.id", "kafka-junit-consumer");
        consumerProps.put("enable.auto.commit", String.valueOf(z));
        consumerProps.put("auto.commit.interval.ms", "10");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("heartbeat.interval.ms", "100");
        consumerProps.put("session.timeout.ms", "200");
        consumerProps.put("fetch.max.wait.ms", "200");
        consumerProps.put("metadata.max.age.ms", "100");
        return consumerProps;
    }

    /**
     * Creates a producer from {@link #producerConfig()} plus the given overrides. Both serializers
     * are configured with the merged settings before the producer is built.
     *
     * @param serializer key serializer
     * @param serializer2 value serializer
     * @param properties settings layered over the defaults; may be {@code null}
     * @param <K> key type
     * @param <V> value type
     * @return a new producer
     */
    public <K, V> KafkaProducer<K, V> createProducer(Serializer<K> serializer, Serializer<V> serializer2, Properties properties) {
        Properties merged = producerConfig();
        if (properties != null) {
            merged.putAll(properties);
        }
        serializer.configure(Maps.fromProperties(merged), true);
        serializer2.configure(Maps.fromProperties(merged), false);
        return new KafkaProducer<>(merged, serializer, serializer2);
    }

    /**
     * Creates a consumer from {@link #consumerConfig()} plus the given overrides. Both deserializers
     * are configured with the merged settings before the consumer is built.
     *
     * @param deserializer key deserializer
     * @param deserializer2 value deserializer
     * @param properties settings layered over the defaults; may be {@code null}
     * @param <K> key type
     * @param <V> value type
     * @return a new consumer
     */
    public <K, V> KafkaConsumer<K, V> createConsumer(Deserializer<K> deserializer, Deserializer<V> deserializer2, Properties properties) {
        Properties merged = consumerConfig();
        if (properties != null) {
            merged.putAll(properties);
        }
        deserializer.configure(Maps.fromProperties(merged), true);
        deserializer2.configure(Maps.fromProperties(merged), false);
        return new KafkaConsumer<>(merged, deserializer, deserializer2);
    }

    /**
     * @return the Kafka port, or empty while the broker is not marked as started
     */
    public Optional<Integer> getKafkaPort() {
        return this.brokerStarted ? Optional.of(this.kafkaPort) : Optional.empty();
    }

    /**
     * @return the ZooKeeper port, or empty while the broker is not marked as started
     */
    public Optional<Integer> getZookeeperPort() {
        return this.brokerStarted ? Optional.of(this.zookeeperPort) : Optional.empty();
    }

    /**
     * @return the Kafka log directory path, or empty while the broker is not marked as started
     */
    public Optional<String> getLogDir() {
        return this.brokerStarted ? Optional.of(this.kafkaLogDir.toString()) : Optional.empty();
    }

    /**
     * @return the ZooKeeper connect string, or empty while the broker is not marked as started
     */
    public Optional<String> getZookeeperConnectString() {
        return this.brokerStarted ? Optional.of(this.zookeeper.getConnectString()) : Optional.empty();
    }

    /**
     * @return the {@code localhost:port} broker list, or empty while the broker is not marked as started
     */
    public Optional<String> getBrokerList() {
        return this.brokerStarted ? Optional.of(bootstrapServers()) : Optional.empty();
    }

    /**
     * @return {@code true} once {@link #start()} has marked the broker as started and until
     *     {@link #stop()} completes
     */
    public boolean isRunning() {
        return this.brokerStarted;
    }
}
