package com.github.charithe.kafka;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.server.KafkaConfig;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

/* loaded from: input_file:com/github/charithe/kafka/EphemeralKafkaCluster.class */
public class EphemeralKafkaCluster implements AutoCloseable {
    private static final int ALLOCATE_RANDOM_PORT = -1;
    private int numBroker;
    private TestingServer zookeeper;
    private final List<EphemeralKafkaBroker> brokers = new ArrayList();

    private EphemeralKafkaCluster(int i, int i2) throws Exception {
        this.zookeeper = new TestingServer(i2);
        this.numBroker = i;
        for (int i3 = 0; i3 < i; i3++) {
            addBroker();
        }
    }

    public static EphemeralKafkaCluster create(int i) throws Exception {
        return create(i, ALLOCATE_RANDOM_PORT);
    }

    public static EphemeralKafkaCluster create(int i, int i2) throws Exception {
        return new EphemeralKafkaCluster(i, i2);
    }

    public boolean isHealthy() {
        return this.brokers.stream().filter(ephemeralKafkaBroker -> {
            return !ephemeralKafkaBroker.isRunning();
        }).count() == 0;
    }

    public boolean isRunning() {
        return this.brokers.stream().filter((v0) -> {
            return v0.isRunning();
        }).count() > 0;
    }

    public void stop() throws IOException, ExecutionException, InterruptedException {
        CompletableFuture.allOf((CompletableFuture[]) this.brokers.stream().map((v0) -> {
            return v0.stopBrokerAsync();
        }).toArray(i -> {
            return new CompletableFuture[i];
        })).get();
        this.brokers.clear();
        this.zookeeper.stop();
    }

    private EphemeralKafkaBroker addBroker() throws Exception {
        int randomPort = InstanceSpec.getRandomPort();
        Properties properties = new Properties();
        properties.setProperty(KafkaConfig.BrokerIdProp(), this.brokers.size() + "");
        properties.setProperty(KafkaConfig.ZkConnectProp(), this.zookeeper.getConnectString());
        properties.setProperty(KafkaConfig.ControlledShutdownEnableProp(), "false");
        properties.setProperty(KafkaConfig.ControlledShutdownMaxRetriesProp(), "1");
        properties.setProperty(KafkaConfig.DeleteTopicEnableProp(), "true");
        properties.setProperty(KafkaConfig.PortProp(), "" + randomPort);
        properties.setProperty(KafkaConfig.SslEnabledProtocolsProp(), "false");
        properties.setProperty(KafkaConfig.AutoCreateTopicsEnableProp(), "true");
        properties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "300");
        properties.setProperty(KafkaConfig.ReplicaFetchWaitMaxMsProp(), "100");
        properties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "10");
        properties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), this.numBroker + "");
        properties.setProperty(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp(), "1");
        properties.setProperty(KafkaConfig.ZkSessionTimeoutMsProp(), "200");
        properties.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsDoc(), "200");
        properties.setProperty(KafkaConfig.AdvertisedHostNameProp(), "localhost");
        properties.setProperty(KafkaConfig.AdvertisedPortProp(), randomPort + "");
        properties.setProperty(KafkaConfig.AdvertisedListenersProp(), "PLAINTEXT://localhost:" + randomPort);
        properties.setProperty(KafkaConfig.HostNameProp(), "localhost");
        properties.setProperty(KafkaConfig.MinInSyncReplicasProp(), Math.max(1, this.numBroker - 1) + "");
        EphemeralKafkaBroker ephemeralKafkaBroker = new EphemeralKafkaBroker(this.zookeeper, randomPort, properties);
        ephemeralKafkaBroker.start().get();
        this.brokers.add(ephemeralKafkaBroker);
        return ephemeralKafkaBroker;
    }

    public List<EphemeralKafkaBroker> getBrokers() {
        return Collections.unmodifiableList(this.brokers);
    }

    public String connectionString() {
        return (String) this.brokers.stream().filter((v0) -> {
            return v0.isRunning();
        }).map((v0) -> {
            return v0.getBrokerList();
        }).map((v0) -> {
            return v0.get();
        }).collect(Collectors.joining(","));
    }

    public Properties producerConfig() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", connectionString());
        properties.put("acks", "all");
        properties.put("batch.size", "100");
        properties.put("client.id", "kafka-junit");
        properties.put("request.timeout.ms", "5000");
        properties.put("max.in.flight.requests.per.connection", "1");
        properties.put("retries", Integer.MAX_VALUE);
        properties.put("linger.ms", 0);
        return properties;
    }

    public Properties consumerConfig() {
        return consumerConfig(true);
    }

    public Properties consumerConfig(boolean z) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", connectionString());
        properties.put("group.id", "kafka-junit-consumer");
        properties.put("enable.auto.commit", String.valueOf(z));
        properties.put("auto.commit.interval.ms", "100");
        properties.put("auto.offset.reset", "earliest");
        properties.put("heartbeat.interval.ms", "100");
        properties.put("session.timeout.ms", "200");
        properties.put("fetch.max.wait.ms", "500");
        properties.put("metadata.max.age.ms", "100");
        return properties;
    }

    public void createTopics(String... strArr) throws ExecutionException, InterruptedException {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", connectionString());
        AdminClient create = AdminClient.create(hashMap);
        Throwable th = null;
        try {
            try {
                create.createTopics((List) Stream.of((Object[]) strArr).map(str -> {
                    return new NewTopic(str, this.numBroker, (short) this.numBroker);
                }).collect(Collectors.toList())).all().get();
                if (create != null) {
                    if (0 == 0) {
                        create.close();
                        return;
                    }
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (create != null) {
                if (th != null) {
                    try {
                        create.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    create.close();
                }
            }
            throw th4;
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        stop();
    }
}
