package io.kroxylicious.testing.kafka.invm;

import io.kroxylicious.testing.kafka.api.KafkaCluster;
import io.kroxylicious.testing.kafka.common.KafkaClusterConfig;
import io.kroxylicious.testing.kafka.common.ListeningSocketPreallocator;
import io.kroxylicious.testing.kafka.common.Utils;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.System;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRaftServer;
import kafka.server.KafkaServer;
import kafka.server.Server;
import kafka.tools.StorageTool;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.jetbrains.annotations.NotNull;
import scala.Option;

/* loaded from: input_file:io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.class */
public class InVMKafkaCluster implements KafkaCluster {
    private static final System.Logger LOGGER = System.getLogger(InVMKafkaCluster.class.getName());
    private final KafkaClusterConfig clusterConfig;
    private final Path tempDirectory;
    private final ServerCnxnFactory zooFactory;
    private final ZooKeeperServer zooServer;
    private final List<Server> servers;
    private final KafkaClusterConfig.KafkaEndpoints kafkaEndpoints;
    private final List<ServerSocket> externalPorts;
    private final List<ServerSocket> anonPorts;
    private final List<ServerSocket> interBrokerPorts;
    private final List<ServerSocket> controllerPorts;

    public InVMKafkaCluster(KafkaClusterConfig kafkaClusterConfig) {
        this.clusterConfig = kafkaClusterConfig;
        try {
            this.tempDirectory = Files.createTempDirectory("kafka", new FileAttribute[0]);
            this.tempDirectory.toFile().deleteOnExit();
            ListeningSocketPreallocator listeningSocketPreallocator = new ListeningSocketPreallocator();
            try {
                this.externalPorts = (List) listeningSocketPreallocator.preAllocateListeningSockets(kafkaClusterConfig.getBrokersNum().intValue()).collect(Collectors.toUnmodifiableList());
                this.anonPorts = (List) listeningSocketPreallocator.preAllocateListeningSockets(kafkaClusterConfig.getBrokersNum().intValue()).collect(Collectors.toUnmodifiableList());
                this.interBrokerPorts = (List) listeningSocketPreallocator.preAllocateListeningSockets(kafkaClusterConfig.getBrokersNum().intValue()).collect(Collectors.toUnmodifiableList());
                this.controllerPorts = allocateControllerPorts(kafkaClusterConfig, listeningSocketPreallocator);
                listeningSocketPreallocator.close();
                if (kafkaClusterConfig.isKraftMode()) {
                    this.zooFactory = null;
                    this.zooServer = null;
                } else {
                    this.zooFactory = ServerCnxnFactory.createFactory(new InetSocketAddress("localhost", Integer.valueOf(this.controllerPorts.get(0).getLocalPort()).intValue()), 1024);
                    Path resolve = this.tempDirectory.resolve("zoo");
                    Path resolve2 = resolve.resolve("snapshot");
                    Path resolve3 = resolve.resolve("log");
                    resolve2.toFile().mkdirs();
                    resolve3.toFile().mkdirs();
                    this.zooServer = new ZooKeeperServer(resolve2.toFile(), resolve3.toFile(), 500);
                }
                this.kafkaEndpoints = new KafkaClusterConfig.KafkaEndpoints() { // from class: io.kroxylicious.testing.kafka.invm.InVMKafkaCluster.1
                    @Override // io.kroxylicious.testing.kafka.common.KafkaClusterConfig.KafkaEndpoints
                    public KafkaClusterConfig.KafkaEndpoints.EndpointPair getClientEndpoint(int i) {
                        return buildEndpointPair(InVMKafkaCluster.this.externalPorts, i);
                    }

                    @Override // io.kroxylicious.testing.kafka.common.KafkaClusterConfig.KafkaEndpoints
                    public KafkaClusterConfig.KafkaEndpoints.EndpointPair getAnonEndpoint(int i) {
                        return buildEndpointPair(InVMKafkaCluster.this.anonPorts, i);
                    }

                    @Override // io.kroxylicious.testing.kafka.common.KafkaClusterConfig.KafkaEndpoints
                    public KafkaClusterConfig.KafkaEndpoints.EndpointPair getInterBrokerEndpoint(int i) {
                        return buildEndpointPair(InVMKafkaCluster.this.interBrokerPorts, i);
                    }

                    @Override // io.kroxylicious.testing.kafka.common.KafkaClusterConfig.KafkaEndpoints
                    public KafkaClusterConfig.KafkaEndpoints.EndpointPair getControllerEndpoint(int i) {
                        return buildEndpointPair(InVMKafkaCluster.this.controllerPorts, i);
                    }

                    private KafkaClusterConfig.KafkaEndpoints.EndpointPair buildEndpointPair(List<ServerSocket> list, int i) {
                        ServerSocket serverSocket = list.get(i);
                        return KafkaClusterConfig.KafkaEndpoints.EndpointPair.builder().bind(new KafkaClusterConfig.KafkaEndpoints.Endpoint("0.0.0.0", serverSocket.getLocalPort())).connect(new KafkaClusterConfig.KafkaEndpoints.Endpoint("localhost", serverSocket.getLocalPort())).build();
                    }
                };
                this.servers = (List) kafkaClusterConfig.getBrokerConfigs(() -> {
                    return this.kafkaEndpoints;
                }).map(this::buildKafkaServer).collect(Collectors.toList());
            } finally {
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private List<ServerSocket> allocateControllerPorts(KafkaClusterConfig kafkaClusterConfig, ListeningSocketPreallocator listeningSocketPreallocator) {
        return kafkaClusterConfig.isKraftMode() ? (List) listeningSocketPreallocator.preAllocateListeningSockets(kafkaClusterConfig.getBrokersNum().intValue()).collect(Collectors.toUnmodifiableList()) : (List) listeningSocketPreallocator.preAllocateListeningSockets(1).collect(Collectors.toUnmodifiableList());
    }

    @NotNull
    private Server buildKafkaServer(KafkaClusterConfig.ConfigHolder configHolder) {
        KafkaConfig buildBrokerConfig = buildBrokerConfig(configHolder, this.tempDirectory);
        Option apply = Option.apply((Object) null);
        if (!this.clusterConfig.isKraftMode()) {
            return new KafkaServer(buildBrokerConfig, Time.SYSTEM, apply, false);
        }
        StorageTool.formatCommand(System.out, StorageTool.configToLogDirectories(buildBrokerConfig), StorageTool.buildMetadataProperties(configHolder.getKafkaKraftClusterId(), buildBrokerConfig), MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, true);
        return new KafkaRaftServer(buildBrokerConfig, Time.SYSTEM, apply);
    }

    @NotNull
    private KafkaConfig buildBrokerConfig(KafkaClusterConfig.ConfigHolder configHolder, Path path) {
        Properties properties = new Properties();
        properties.putAll(configHolder.getProperties());
        properties.setProperty(KafkaConfig.LogDirProp(), path.resolve(String.format("broker-%d", Integer.valueOf(configHolder.getBrokerNum()))).toAbsolutePath().toString());
        LOGGER.log(System.Logger.Level.DEBUG, "Generated config {0}", new Object[]{properties});
        return new KafkaConfig(properties);
    }

    public void start() {
        if (this.zooFactory != null) {
            try {
                this.zooFactory.startup(this.zooServer);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e2);
            }
        }
        ((Stream) this.servers.stream().parallel()).forEach((v0) -> {
            v0.startup();
        });
        Utils.awaitExpectedBrokerCountInCluster(this.clusterConfig.getAnonConnectConfigForCluster(this.kafkaEndpoints), 120, TimeUnit.SECONDS, this.clusterConfig.getBrokersNum());
    }

    public String getClusterId() {
        return this.clusterConfig.clusterId();
    }

    public String getBootstrapServers() {
        return this.clusterConfig.buildClientBootstrapServers(this.kafkaEndpoints);
    }

    public Map<String, Object> getKafkaClientConfiguration() {
        return this.clusterConfig.getConnectConfigForCluster(getBootstrapServers());
    }

    public Map<String, Object> getKafkaClientConfiguration(String str, String str2) {
        return this.clusterConfig.getConnectConfigForCluster(getBootstrapServers(), str, str2);
    }

    /* JADX WARN: Finally extract failed */
    public void close() throws Exception {
        Stream<R> map;
        try {
            try {
                ((Stream) this.servers.stream().parallel()).forEach((v0) -> {
                    v0.shutdown();
                });
                if (this.zooServer != null) {
                    this.zooServer.shutdown(true);
                }
                releaseAllPorts();
                if (this.tempDirectory.toFile().exists()) {
                    map = Files.walk(this.tempDirectory, new FileVisitOption[0]).sorted(Comparator.reverseOrder()).map((v0) -> {
                        return v0.toFile();
                    });
                    try {
                        map.forEach((v0) -> {
                            v0.delete();
                        });
                        if (map != 0) {
                            map.close();
                        }
                    } finally {
                    }
                }
            } catch (Throwable th) {
                releaseAllPorts();
                if (this.tempDirectory.toFile().exists()) {
                    map = Files.walk(this.tempDirectory, new FileVisitOption[0]).sorted(Comparator.reverseOrder()).map((v0) -> {
                        return v0.toFile();
                    });
                    try {
                        map.forEach((v0) -> {
                            v0.delete();
                        });
                        if (map != 0) {
                            map.close();
                        }
                    } finally {
                    }
                }
                throw th;
            }
        } catch (Throwable th2) {
            if (this.zooServer != null) {
                this.zooServer.shutdown(true);
            }
            throw th2;
        }
    }

    public int getNumOfBrokers() {
        return this.clusterConfig.getBrokersNum().intValue();
    }

    private void releaseAllPorts() {
        releasePorts(this.controllerPorts);
        releasePorts(this.interBrokerPorts);
        releasePorts(this.anonPorts);
        releasePorts(this.externalPorts);
    }

    private void releasePorts(List<ServerSocket> list) {
        list.forEach(serverSocket -> {
            try {
                serverSocket.close();
            } catch (IOException e) {
                LOGGER.log(System.Logger.Level.WARNING, "failed to close socket: {0} due to: {1}", new Object[]{serverSocket, e.getMessage(), e});
            }
        });
    }
}
