package io.kroxylicious.testing.kafka.testcontainers;

import com.github.dockerjava.api.command.InspectContainerResponse;
import io.kroxylicious.testing.kafka.api.KafkaCluster;
import io.kroxylicious.testing.kafka.common.KafkaClusterConfig;
import io.kroxylicious.testing.kafka.common.ListeningSocketPreallocator;
import io.kroxylicious.testing.kafka.common.Utils;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.System;
import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster.class */
public class TestcontainersKafkaCluster implements Startable, KafkaCluster {
    /** Port the client listener binds to inside each broker container; published to a pre-allocated host port. */
    public static final int CLIENT_PORT = 9093;
    /** Port the "anon" listener binds to inside each broker container; published to a pre-allocated host port. */
    public static final int ANON_PORT = 9094;
    /** Port of the inter-broker listener; reached through the {@code broker-N} network alias. */
    private static final int INTER_BROKER_PORT = 9092;
    /** Port of the controller listener used when the cluster runs in KRaft mode. */
    private static final int CONTROLLER_PORT = 9091;
    /** Port of the {@code zookeeper} network alias used when the cluster does not run in KRaft mode. */
    public static final int ZOOKEEPER_PORT = 2181;
    /** Image repository of the native Kafka image. */
    private static final String QUAY_KAFKA_IMAGE_REPO = "quay.io/ogunalp/kafka-native";
    /** Image repository of the native ZooKeeper image. */
    private static final String QUAY_ZOOKEEPER_IMAGE_REPO = "quay.io/ogunalp/zookeeper-native";
    /** Seconds to wait for the expected number of brokers to join the cluster during {@code start()}. */
    private static final int READY_TIMEOUT_SECONDS = 120;
    /** Image every broker container is created from. */
    private final DockerImageName kafkaImage;
    /** Image the ZooKeeper container is created from (only used outside KRaft mode). */
    private final DockerImageName zookeeperImage;
    /** Configuration this cluster was built from. */
    private final KafkaClusterConfig clusterConfig;
    /** Network shared by all containers of this cluster. */
    private final Network network;
    /** ZooKeeper container, or {@code null} when the cluster runs in KRaft mode. */
    private final ZookeeperContainer zookeeper;
    /** One container per broker configuration supplied by {@link #clusterConfig}. */
    private final Collection<KafkaContainer> brokers;
    /** Endpoint lookup handed to {@link #clusterConfig} when building broker and client configuration. */
    private final KafkaClusterConfig.KafkaEndpoints kafkaEndpoints;
    /** Pre-allocated host sockets, indexed by broker, whose local ports back the client endpoints. */
    private List<ServerSocket> clientPorts;
    /** Pre-allocated host sockets, indexed by broker, whose local ports back the anon endpoints. */
    private List<ServerSocket> anonPorts;
    private static final System.Logger LOGGER = System.getLogger(TestcontainersKafkaCluster.class.getName());
    // NOTE(review): the two defaults below are reassigned by setDefaultKafkaImage(...) on every
    // construction, i.e. they are shared mutable static state — confirm that concurrent construction
    // with different Kafka versions is not a supported scenario.
    private static DockerImageName DEFAULT_KAFKA_IMAGE = DockerImageName.parse("quay.io/ogunalp/kafka-native:latest-snapshot");
    private static DockerImageName DEFAULT_ZOOKEEPER_IMAGE = DockerImageName.parse("quay.io/ogunalp/zookeeper-native:latest-snapshot");

    /* loaded from: input_file:io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster$KafkaContainer.class */
    public static class KafkaContainer extends LoggingGenericContainer<KafkaContainer> {
        public KafkaContainer(DockerImageName dockerImageName) {
            super(dockerImageName);
        }

        protected void addFixedExposedPort(int i, int i2) {
            super.addFixedExposedPort(i, i2);
        }
    }

    /* loaded from: input_file:io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster$LoggingGenericContainer.class */
    public static class LoggingGenericContainer<C extends GenericContainer<C>> extends GenericContainer<C> {
        private static final String CONTAINER_LOGS_DIR = "container.logs.dir";
        private String name;

        public LoggingGenericContainer(DockerImageName dockerImageName) {
            super(dockerImageName);
        }

        protected void containerIsStarting(InspectContainerResponse inspectContainerResponse) {
            super.containerIsStarting(inspectContainerResponse);
            Optional.ofNullable(System.getProperty(CONTAINER_LOGS_DIR)).ifPresent(str -> {
                Path of = Path.of(str, new String[0]);
                if (this.name != null) {
                    of = of.resolve(this.name);
                }
                Path resolve = of.resolve(String.format("%s.%s.%s", getContainerName().replaceFirst(File.separator, ""), getContainerId(), "log"));
                resolve.getParent().toFile().mkdirs();
                try {
                    FileWriter fileWriter = new FileWriter(resolve.toFile());
                    super.followOutput(outputFrame -> {
                        try {
                            if (outputFrame.equals(OutputFrame.END)) {
                                fileWriter.close();
                            } else {
                                fileWriter.write(outputFrame.getUtf8String());
                            }
                        } catch (IOException e) {
                        }
                    });
                } catch (IOException e) {
                    logger().warn("Failed to create container log file: {}", resolve);
                }
            });
        }

        public LoggingGenericContainer<C> withName(String str) {
            this.name = str;
            return this;
        }
    }

    /* loaded from: input_file:io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster$ZookeeperContainer.class */
    /**
     * Container running the ZooKeeper instance used when the cluster is not in KRaft mode.
     */
    public static class ZookeeperContainer extends LoggingGenericContainer<ZookeeperContainer> {

        /**
         * @param image image the ZooKeeper container is created from
         */
        public ZookeeperContainer(DockerImageName image) {
            super(image);
        }
    }

    /**
     * Creates a cluster that uses the default Kafka and ZooKeeper images.
     *
     * @param kafkaClusterConfig configuration describing the cluster to create
     */
    public TestcontainersKafkaCluster(final KafkaClusterConfig kafkaClusterConfig) {
        // null selects the default image for the respective container
        this((DockerImageName) null, (DockerImageName) null, kafkaClusterConfig);
    }

    /**
     * Creates (but does not start) the containers that make up the cluster.
     *
     * <p>Host ports for the client and anon listeners are pre-allocated here, one pair per broker,
     * and every broker container is configured to publish its listeners on those ports.
     *
     * @param kafkaImage         image for the brokers, or {@code null} to use the default image
     * @param zookeeperImage     image for ZooKeeper, or {@code null} to use the default image
     * @param kafkaClusterConfig configuration describing the cluster to create
     */
    public TestcontainersKafkaCluster(DockerImageName kafkaImage, DockerImageName zookeeperImage, final KafkaClusterConfig kafkaClusterConfig) {
        this.network = Network.newNetwork();
        setDefaultKafkaImage(kafkaClusterConfig.getKafkaVersion());
        this.kafkaImage = kafkaImage != null ? kafkaImage : DEFAULT_KAFKA_IMAGE;
        this.zookeeperImage = zookeeperImage != null ? zookeeperImage : DEFAULT_ZOOKEEPER_IMAGE;
        this.clusterConfig = kafkaClusterConfig;

        // Sub-directory name for container logs: "<test display name without trailing ()>.<now>",
        // or null when no test info is available.
        final String name = Optional.ofNullable(kafkaClusterConfig.getTestInfo())
                .map(testInfo -> testInfo.getDisplayName())
                .map(displayName -> displayName.replaceFirst("\\(\\)$", ""))
                .map(displayName -> String.format("%s.%s", displayName, OffsetDateTime.now()))
                .orElse(null);

        if (this.clusterConfig.isKraftMode()) {
            this.zookeeper = null;
        } else {
            this.zookeeper = new ZookeeperContainer(this.zookeeperImage)
                    .withName(name)
                    .withNetwork(this.network)
                    .withNetworkAliases("zookeeper");
        }

        // The preallocator is only needed while the sockets are handed out; it is closed exactly once,
        // whether or not allocation succeeds.
        try (ListeningSocketPreallocator preallocator = new ListeningSocketPreallocator()) {
            final int brokerCount = kafkaClusterConfig.getBrokersNum().intValue();
            this.clientPorts = preallocator.preAllocateListeningSockets(brokerCount).collect(Collectors.toList());
            this.anonPorts = preallocator.preAllocateListeningSockets(brokerCount).collect(Collectors.toList());
        }

        this.kafkaEndpoints = new KafkaClusterConfig.KafkaEndpoints() {
            @Override
            public KafkaClusterConfig.KafkaEndpoints.EndpointPair getClientEndpoint(int brokerId) {
                return buildExposedEndpoint(brokerId, TestcontainersKafkaCluster.CLIENT_PORT, TestcontainersKafkaCluster.this.clientPorts);
            }

            @Override
            public KafkaClusterConfig.KafkaEndpoints.EndpointPair getAnonEndpoint(int brokerId) {
                return buildExposedEndpoint(brokerId, TestcontainersKafkaCluster.ANON_PORT, TestcontainersKafkaCluster.this.anonPorts);
            }

            @Override
            public KafkaClusterConfig.KafkaEndpoints.EndpointPair getInterBrokerEndpoint(int brokerId) {
                return endpointPair(TestcontainersKafkaCluster.INTER_BROKER_PORT, brokerAlias(brokerId), TestcontainersKafkaCluster.INTER_BROKER_PORT);
            }

            @Override
            public KafkaClusterConfig.KafkaEndpoints.EndpointPair getControllerEndpoint(int brokerId) {
                if (kafkaClusterConfig.isKraftMode()) {
                    return endpointPair(TestcontainersKafkaCluster.CONTROLLER_PORT, brokerAlias(brokerId), TestcontainersKafkaCluster.CONTROLLER_PORT);
                }
                return endpointPair(TestcontainersKafkaCluster.ZOOKEEPER_PORT, "zookeeper", TestcontainersKafkaCluster.ZOOKEEPER_PORT);
            }

            /** Endpoint bound inside the container and reached from the host via a pre-allocated port. */
            private KafkaClusterConfig.KafkaEndpoints.EndpointPair buildExposedEndpoint(int brokerId, int containerPort, List<ServerSocket> hostSockets) {
                return endpointPair(containerPort, "localhost", hostSockets.get(brokerId).getLocalPort());
            }

            /** Pair that binds on all interfaces at {@code bindPort} and is reached at {@code connectHost:connectPort}. */
            private KafkaClusterConfig.KafkaEndpoints.EndpointPair endpointPair(int bindPort, String connectHost, int connectPort) {
                return KafkaClusterConfig.KafkaEndpoints.EndpointPair.builder()
                        .bind(new KafkaClusterConfig.KafkaEndpoints.Endpoint("0.0.0.0", bindPort))
                        .connect(new KafkaClusterConfig.KafkaEndpoints.Endpoint(connectHost, connectPort))
                        .build();
            }

            /** Network alias under which the given broker is reachable from the other containers. */
            private String brokerAlias(int brokerId) {
                return String.format("broker-%d", brokerId);
            }
        };

        final Supplier<KafkaClusterConfig.KafkaEndpoints> endpointSupplier = () -> this.kafkaEndpoints;
        this.brokers = kafkaClusterConfig.getBrokerConfigs(endpointSupplier).map(holder -> {
            final Properties brokerProperties = holder.getProperties();
            final KafkaContainer broker = new KafkaContainer(this.kafkaImage)
                    .withName(name)
                    .withNetwork(this.network)
                    .withNetworkAliases("broker-" + holder.getBrokerNum());
            // Key stores referenced by host path are copied into the container and the properties are
            // rewritten to the in-container location before server.properties is serialised below.
            copyHostKeyStoreToContainer(broker, brokerProperties, "ssl.truststore.location");
            copyHostKeyStoreToContainer(broker, brokerProperties, "ssl.keystore.location");
            broker.withEnv("SERVER_PROPERTIES_FILE", "/cnf/server.properties")
                    .withEnv("SERVER_CLUSTER_ID", holder.getKafkaKraftClusterId())
                    // 0644: owner read/write, everyone else read-only
                    .withCopyToContainer(Transferable.of(propertiesToBytes(brokerProperties), 0644), "/cnf/server.properties")
                    .withStartupTimeout(Duration.ofMinutes(2));
            broker.addFixedExposedPort(holder.getExternalPort().intValue(), CLIENT_PORT);
            broker.addFixedExposedPort(holder.getAnonPort().intValue(), ANON_PORT);
            if (!this.clusterConfig.isKraftMode()) {
                broker.dependsOn(this.zookeeper);
            }
            return broker;
        }).collect(Collectors.toList());
    }

    /**
     * Points the default Kafka and ZooKeeper images at the {@code <version>-snapshot} tag of their
     * repositories.
     *
     * <p>NOTE(review): this reassigns static fields, so the defaults are shared by every instance
     * created afterwards — confirm that constructing clusters with different versions concurrently
     * is not expected to work.
     *
     * @param str Kafka version used as the tag prefix
     */
    private static void setDefaultKafkaImage(String str) {
        final String tag = str + "-snapshot";
        DEFAULT_KAFKA_IMAGE = DockerImageName.parse(QUAY_KAFKA_IMAGE_REPO + ":" + tag);
        DEFAULT_ZOOKEEPER_IMAGE = DockerImageName.parse(QUAY_ZOOKEEPER_IMAGE_REPO + ":" + tag);
    }

    /**
     * Copies the key store referenced by a broker property from the host into the container and
     * rewrites the property to the in-container location ({@code /cnf/<file name>}).
     *
     * <p>Does nothing when the property is not set. The property is only rewritten once the host file
     * has been read successfully.
     *
     * @param kafkaContainer container that receives the file
     * @param properties     broker properties; updated in place
     * @param str            name of the property holding the host path of the key store
     * @throws UncheckedIOException if the host file cannot be read
     */
    private static void copyHostKeyStoreToContainer(KafkaContainer kafkaContainer, Properties properties, String str) {
        final Object configured = properties.get(str);
        if (configured == null) {
            return;
        }
        final Path hostPath = Path.of(String.valueOf(configured));
        final byte[] content;
        try {
            content = Files.readAllBytes(hostPath);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to read " + str + " from " + hostPath, e);
        }
        // The target is a path inside the container, so it is assembled with '/' explicitly instead of
        // through the host's Path implementation (which would use '\' on Windows).
        final String containerPath = "/cnf/" + hostPath.getFileName();
        // 0644: owner read/write, everyone else read-only
        kafkaContainer.withCopyToContainer(Transferable.of(content, 0644), containerPath);
        properties.put(str, containerPath);
    }

    /**
     * Serialises the given properties in {@link Properties#store(java.io.OutputStream, String)}
     * format, using {@code server.properties} as the header comment.
     *
     * @param properties properties to serialise
     * @return the serialised bytes
     * @throws UncheckedIOException if writing to the in-memory buffer fails
     */
    private byte[] propertiesToBytes(Properties properties) {
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
            properties.store(buffer, "server.properties");
            return buffer.toByteArray();
        } catch (IOException ioFailure) {
            throw new UncheckedIOException(ioFailure);
        }
    }

    /**
     * @return the client bootstrap servers string that the cluster configuration builds from this
     *         cluster's endpoints
     */
    public String getBootstrapServers() {
        final KafkaClusterConfig.KafkaEndpoints endpoints = this.kafkaEndpoints;
        return clusterConfig.buildClientBootstrapServers(endpoints);
    }

    /**
     * @return the version part of the Kafka image the brokers are created from
     */
    public String getKafkaVersion() {
        final DockerImageName brokerImage = this.kafkaImage;
        return brokerImage.getVersionPart();
    }

    /**
     * @return every container of the cluster: all brokers followed by ZooKeeper, if there is one
     */
    private Stream<GenericContainer<?>> allContainers() {
        final Stream<KafkaContainer> brokerContainers = brokers.stream();
        // empty when running in KRaft mode, where no ZooKeeper container exists
        final Stream<ZookeeperContainer> zookeeperContainer = Stream.ofNullable(zookeeper);
        return Stream.concat(brokerContainers, zookeeperContainer);
    }

    /**
     * Starts ZooKeeper (if any) and all brokers, then waits until the expected number of brokers
     * has joined the cluster.
     *
     * <p>If start-up fails for any reason the containers are stopped again before the failure is
     * reported, so that a failed start does not leave containers running.
     *
     * @throws RuntimeException if start-up fails, is interrupted or times out
     */
    @Override
    public void start() {
        try {
            createNetwork();
            if (this.zookeeper != null) {
                this.zookeeper.start();
            }
            Startables.deepStart(this.brokers.stream()).get(READY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            Utils.awaitExpectedBrokerCountInCluster(this.clusterConfig.getAnonConnectConfigForCluster(this.kafkaEndpoints), READY_TIMEOUT_SECONDS, TimeUnit.SECONDS,
                    this.clusterConfig.getBrokersNum());
        } catch (InterruptedException e) {
            // restore the interrupt flag for callers further up the stack
            Thread.currentThread().interrupt();
            stopAfterFailedStart(e);
            throw new RuntimeException("startup failed or timed out", e);
        } catch (ExecutionException | TimeoutException e) {
            stopAfterFailedStart(e);
            throw new RuntimeException("startup failed or timed out", e);
        } catch (RuntimeException e) {
            stopAfterFailedStart(e);
            throw e;
        }
    }

    /** Stops all containers after a failed start without masking the original failure. */
    private void stopAfterFailedStart(Exception startFailure) {
        try {
            stop();
        } catch (RuntimeException stopFailure) {
            startFailure.addSuppressed(stopFailure);
        }
    }

    /**
     * Makes sure the container network exists, trying up to three times.
     *
     * <p>NOTE(review): {@code getId()} presumably creates the Docker network on first use and
     * {@code close()} presumably resets it so that the next {@code getId()} creates it afresh —
     * confirm against the Testcontainers {@code Network} implementation.
     */
    private void createNetwork() {
        Unreliables.retryUntilSuccess(3, () -> {
            try {
                this.network.getId();
                return null;
            } catch (RuntimeException e) {
                this.network.close();
                // Rethrow so that the retry helper sees the failure; returning normally here would be
                // treated as success and no further attempt would ever be made.
                throw e;
            }
        });
    }

    /**
     * Stops the cluster; equivalent to {@link #stop()}.
     *
     * <p>NOTE(review): the network created in the constructor is not closed here — confirm whether
     * that is intentional.
     */
    @Override
    public void close() {
        this.stop();
    }

    /**
     * Stops every container of the cluster (brokers and, if present, ZooKeeper) in parallel.
     */
    @Override
    public void stop() {
        // Kept fully typed: with a raw Stream the element type degrades to Object and stop() is not visible.
        allContainers().parallel().forEach(GenericContainer::stop);
    }

    /**
     * @return the cluster id reported by the cluster configuration
     */
    public String getClusterId() {
        final String clusterId = clusterConfig.clusterId();
        return clusterId;
    }

    /**
     * @return the client configuration that the cluster configuration derives from this cluster's
     *         bootstrap servers
     */
    public Map<String, Object> getKafkaClientConfiguration() {
        final String bootstrapServers = getBootstrapServers();
        return clusterConfig.getConnectConfigForCluster(bootstrapServers);
    }

    /**
     * Variant of {@link #getKafkaClientConfiguration()} that forwards two additional values to the
     * cluster configuration.
     *
     * @param str  first value passed through unchanged (presumably a user name — confirm against
     *             {@code KafkaClusterConfig.getConnectConfigForCluster})
     * @param str2 second value passed through unchanged (presumably a password — confirm likewise)
     * @return the client configuration derived from this cluster's bootstrap servers and the two values
     */
    public Map<String, Object> getKafkaClientConfiguration(String str, String str2) {
        final String bootstrapServers = getBootstrapServers();
        return clusterConfig.getConnectConfigForCluster(bootstrapServers, str, str2);
    }

    // Warns once, at class-load time, when TESTCONTAINERS_RYUK_DISABLED is not set in the environment.
    // Written as a guarded block because a return statement is not permitted in a static initializer.
    static {
        if (!System.getenv().containsKey("TESTCONTAINERS_RYUK_DISABLED")) {
            LOGGER.log(System.Logger.Level.WARNING, "As per https://github.com/containers/podman/issues/7927#issuecomment-731525556 if using podman, set env var TESTCONTAINERS_RYUK_DISABLED=true");
        }
    }
}
