/*
 * Decompiled with CFR 0.152.
 */
package io.kroxylicious.testing.kafka.testcontainers;

import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.command.CreateVolumeCmd;
import com.github.dockerjava.api.command.CreateVolumeResponse;
import com.github.dockerjava.api.command.InspectContainerCmd;
import com.github.dockerjava.api.command.InspectContainerResponse;
import com.github.dockerjava.api.command.StopContainerCmd;
import com.github.dockerjava.api.command.WaitContainerCmd;
import com.github.dockerjava.api.exception.NotFoundException;
import com.github.dockerjava.api.model.Bind;
import com.github.dockerjava.api.model.Version;
import com.github.dockerjava.api.model.VersionComponent;
import com.github.dockerjava.api.model.Volume;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.kroxylicious.testing.kafka.api.KafkaCluster;
import io.kroxylicious.testing.kafka.api.TerminationStyle;
import io.kroxylicious.testing.kafka.clients.CloseableAdmin;
import io.kroxylicious.testing.kafka.common.KafkaClusterConfig;
import io.kroxylicious.testing.kafka.common.PortAllocator;
import io.kroxylicious.testing.kafka.common.Utils;
import io.kroxylicious.testing.kafka.internal.AdminSource;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.IntPredicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.TestInfo;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.startupcheck.OneShotStartupCheckStrategy;
import org.testcontainers.containers.startupcheck.StartupCheckStrategy;
import org.testcontainers.dockerclient.DockerClientProviderStrategy;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;

public class TestcontainersKafkaCluster
implements Startable,
KafkaCluster,
KafkaClusterConfig.KafkaEndpoints,
AdminSource {
    private static final System.Logger LOGGER = System.getLogger(TestcontainersKafkaCluster.class.getName());
    public static final int CLIENT_PORT = 9093;
    public static final int ANON_PORT = 9094;
    private static final int INTER_BROKER_PORT = 9092;
    private static final int CONTROLLER_PORT = 9091;
    private static final int ZOOKEEPER_PORT = 2181;
    private static final String QUAY_KAFKA_IMAGE_REPO = "quay.io/ogunalp/kafka-native";
    private static final String QUAY_ZOOKEEPER_IMAGE_REPO = "quay.io/ogunalp/zookeeper-native";
    private static final int CONTAINER_STARTUP_ATTEMPTS = 3;
    private static final Duration STARTUP_TIMEOUT = Duration.ofMinutes(2L);
    private static final Duration RESTART_BACKOFF_DELAY = Duration.ofMillis(2500L);
    private static final DateTimeFormatter NAME_DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss");
    private static final Duration MINIMUM_RUNNING_DURATION = Duration.ofMillis(500L);
    private static final boolean CONTAINER_ENGINE_PODMAN = TestcontainersKafkaCluster.isContainerEnginePodman();
    private static final String KAFKA_CONTAINER_MOUNT_POINT = "/kafka";
    public static final String WILDCARD_BIND_ADDRESS = "0.0.0.0";
    private static final DockerImageName LATEST_KAFKA_IMAGE = DockerImageName.parse((String)"quay.io/ogunalp/kafka-native").withTag("latest");
    private static final DockerImageName LATEST_ZOOKEEPER_IMAGE = DockerImageName.parse((String)"quay.io/ogunalp/zookeeper-native").withTag("latest");
    private static final String KAFKA_CONTAINER_UID = "1001";
    private static final int READY_TIMEOUT_SECONDS = 120;
    private final DockerImageName kafkaImage;
    private final DockerImageName zookeeperImage;
    private final KafkaClusterConfig clusterConfig;
    private final String logDirVolumeName = TestcontainersKafkaCluster.createNamedVolume();
    private final Network network = Network.newNetwork();
    private final String name;
    private final ZookeeperContainer zookeeper;
    private final Map<Integer, KafkaContainer> nodes = new TreeMap<Integer, KafkaContainer>();
    private final Set<Integer> stoppedBrokers = new HashSet<Integer>();
    private final PortAllocator portsAllocator = new PortAllocator();
    private static final Set<String> volumesPendingCleanup = ConcurrentHashMap.newKeySet();

    public TestcontainersKafkaCluster(KafkaClusterConfig clusterConfig) {
        this(null, null, clusterConfig);
    }

    public TestcontainersKafkaCluster(DockerImageName kafkaImage, DockerImageName zookeeperImage, KafkaClusterConfig clusterConfig) {
        DockerImageName actualKafkaImage = Optional.ofNullable(kafkaImage).orElse(LATEST_KAFKA_IMAGE);
        DockerImageName actualZookeeperImage = Optional.ofNullable(zookeeperImage).orElse(LATEST_ZOOKEEPER_IMAGE);
        this.kafkaImage = this.overrideContainerImageTagIfNecessary(actualKafkaImage, clusterConfig.getKafkaVersion());
        this.zookeeperImage = this.overrideContainerImageTagIfNecessary(actualZookeeperImage, clusterConfig.getKafkaVersion());
        this.clusterConfig = clusterConfig;
        this.name = Optional.ofNullable(clusterConfig.getTestInfo()).map(TestInfo::getDisplayName).map(s -> s.replaceFirst("\\(\\)$", "")).map(s -> String.format("%s.%s", s, NAME_DATE_TIME_FORMAT.format(OffsetDateTime.now(Clock.systemUTC())))).orElse(null);
        this.zookeeper = this.clusterConfig.isKraftMode() ? null : (ZookeeperContainer)((ZookeeperContainer)((ZookeeperContainer)((ZookeeperContainer)new ZookeeperContainer(this.zookeeperImage).withName(this.name).withNetwork(this.network)).withMinimumRunningDuration(MINIMUM_RUNNING_DURATION)).withStartupAttempts(3)).withNetworkAliases(new String[]{"zookeeper"});
        try (PortAllocator.PortAllocationSession portAllocationSession = this.portsAllocator.allocationSession();){
            portAllocationSession.allocate(Set.of(KafkaClusterConfig.KafkaEndpoints.Listener.EXTERNAL, KafkaClusterConfig.KafkaEndpoints.Listener.ANON), 0, clusterConfig.getBrokersNum());
            portAllocationSession.allocate(Set.of(KafkaClusterConfig.KafkaEndpoints.Listener.CONTROLLER), 0, clusterConfig.getKraftControllers());
        }
        clusterConfig.getBrokerConfigs(() -> this).forEach(holder -> this.nodes.put(holder.getBrokerNum(), this.buildKafkaContainer((KafkaClusterConfig.ConfigHolder)holder)));
    }

    @NonNull
    private KafkaContainer buildKafkaContainer(KafkaClusterConfig.ConfigHolder holder) {
        String netAlias = "broker-" + holder.getBrokerNum();
        Properties properties = new Properties();
        properties.putAll((Map<?, ?>)holder.getProperties());
        properties.put("log.dir", this.getBrokerLogDirectory(holder.getBrokerNum()));
        KafkaContainer kafkaContainer = (KafkaContainer)((KafkaContainer)new KafkaContainer(this.kafkaImage).withName(this.name).withNetwork(this.network)).withNetworkAliases(new String[]{netAlias});
        TestcontainersKafkaCluster.copyHostKeyStoreToContainer(kafkaContainer, properties, "ssl.truststore.location");
        TestcontainersKafkaCluster.copyHostKeyStoreToContainer(kafkaContainer, properties, "ssl.keystore.location");
        ((KafkaContainer)((KafkaContainer)((KafkaContainer)((KafkaContainer)((KafkaContainer)((KafkaContainer)kafkaContainer.withEnv("SERVER_PROPERTIES_FILE", "/cnf/server.properties")).withEnv("SERVER_CLUSTER_ID", holder.getKafkaKraftClusterId())).withEnv("SERVER_AUTO_CONFIGURE", "false")).withCopyToContainer(Transferable.of((byte[])this.propertiesToBytes(properties), (int)420), "/cnf/server.properties")).withStartupAttempts(3)).withMinimumRunningDuration(MINIMUM_RUNNING_DURATION)).withStartupTimeout(STARTUP_TIMEOUT);
        if (this.clusterConfig.isSaslScram() && !this.clusterConfig.getUsers().isEmpty()) {
            kafkaContainer.withEnv("SERVER_SCRAM_CREDENTIALS", this.buildScramUsersEnvVar());
        }
        if (holder.isBroker()) {
            kafkaContainer.addFixedExposedPort(holder.getExternalPort(), 9093);
            kafkaContainer.addFixedExposedPort(holder.getAnonPort(), 9094);
        }
        kafkaContainer.addGenericBind(new Bind(this.logDirVolumeName, new Volume(KAFKA_CONTAINER_MOUNT_POINT)));
        if (!this.clusterConfig.isKraftMode()) {
            kafkaContainer.dependsOn(new Startable[]{this.zookeeper});
        }
        return kafkaContainer;
    }

    @NonNull
    private String getBrokerLogDirectory(int brokerNum) {
        return "/kafka/broker-" + brokerNum;
    }

    private static void copyHostKeyStoreToContainer(KafkaContainer container, Properties properties, String key) {
        if (properties.get(key) != null) {
            try {
                Path hostPath = Path.of(String.valueOf(properties.get(key)), new String[0]);
                Path containerPath = Path.of("/cnf", hostPath.getFileName().toString());
                properties.put(key, containerPath.toString());
                container.withCopyToContainer(Transferable.of((byte[])Files.readAllBytes(hostPath), (int)420), containerPath.toString());
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    private byte[] propertiesToBytes(Properties properties) {
        byte[] byArray;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try {
            properties.store(byteArrayOutputStream, "server.properties");
            byArray = byteArrayOutputStream.toByteArray();
        }
        catch (Throwable throwable) {
            try {
                try {
                    byteArrayOutputStream.close();
                }
                catch (Throwable throwable2) {
                    throwable.addSuppressed(throwable2);
                }
                throw throwable;
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        byteArrayOutputStream.close();
        return byArray;
    }

    public synchronized String getBootstrapServers() {
        return this.buildBrokerList(nodeId -> this.getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener.EXTERNAL, (int)nodeId));
    }

    private synchronized String buildBrokerList(Function<Integer, KafkaClusterConfig.KafkaEndpoints.EndpointPair> endpointFunc) {
        return this.nodes.keySet().stream().filter(this::isBroker).map(endpointFunc).map(KafkaClusterConfig.KafkaEndpoints.EndpointPair::connectAddress).collect(Collectors.joining(","));
    }

    private DockerImageName overrideContainerImageTagIfNecessary(@NonNull DockerImageName image, @Nullable String overrideVersion) {
        if (overrideVersion == null || overrideVersion.equalsIgnoreCase(image.getVersionPart())) {
            return image;
        }
        if (overrideVersion.equalsIgnoreCase("latest-snapshot")) {
            return image.withTag("latest-snapshot");
        }
        if (Pattern.matches("\\d+(\\.\\d+(\\.\\d+)?)?", overrideVersion)) {
            return image.withTag("latest-kafka-" + overrideVersion);
        }
        return image;
    }

    public String getKafkaVersion() {
        String v = this.kafkaImage.getVersionPart();
        if (v != null) {
            if ("latest".equalsIgnoreCase(v)) {
                return "latest";
            }
            if ("latest-snapshot".equalsIgnoreCase(v)) {
                return "latest-snapshot";
            }
            v = v.replaceFirst("^latest-kafka-", "");
        }
        return v;
    }

    private synchronized Stream<GenericContainer<?>> allContainers() {
        return Stream.concat(this.nodes.values().stream(), Stream.ofNullable(this.zookeeper));
    }

    public synchronized void start() {
        try {
            if (this.zookeeper != null) {
                this.zookeeper.start();
            }
            Startables.deepStart(this.nodes.values().stream()).get(120L, TimeUnit.SECONDS);
            Utils.awaitExpectedBrokerCountInClusterViaTopic(this.clusterConfig.getAnonConnectConfigForCluster(this.buildBrokerList(nodeId -> this.getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener.ANON, (int)nodeId))), 120, TimeUnit.SECONDS, this.clusterConfig.getBrokersNum());
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            this.stop();
            throw new RuntimeException("startup failed or timed out", e);
        }
    }

    public synchronized int addBroker() {
        OptionalInt first = IntStream.rangeClosed(0, this.getNumOfBrokers()).filter(cand -> !this.nodes.containsKey(cand)).findFirst();
        if (first.isEmpty()) {
            throw new IllegalStateException("Could not determine new nodeId, existing set " + this.nodes.keySet());
        }
        int newNodeId = first.getAsInt();
        LOGGER.log(System.Logger.Level.DEBUG, "Adding broker with node.id {0} to cluster with existing nodes {1}.", newNodeId, this.nodes.keySet());
        try (PortAllocator.PortAllocationSession portAllocationSession = this.portsAllocator.allocationSession();){
            portAllocationSession.allocate(Set.of(KafkaClusterConfig.KafkaEndpoints.Listener.EXTERNAL, KafkaClusterConfig.KafkaEndpoints.Listener.ANON), newNodeId);
        }
        KafkaClusterConfig.ConfigHolder configHolder = this.clusterConfig.generateConfigForSpecificNode(this, newNodeId);
        KafkaContainer kafkaContainer = this.buildKafkaContainer(configHolder);
        try {
            Startables.deepStart((Startable[])new Startable[]{kafkaContainer}).get(120L, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            kafkaContainer.stop();
            throw new RuntimeException(e);
        }
        this.nodes.put(configHolder.getBrokerNum(), kafkaContainer);
        Utils.awaitExpectedBrokerCountInClusterViaTopic(this.clusterConfig.getAnonConnectConfigForCluster(this.buildBrokerList(nodeId -> this.getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener.ANON, (int)nodeId))), 120, TimeUnit.SECONDS, this.getNumOfBrokers());
        return configHolder.getBrokerNum();
    }

    public synchronized void removeBroker(int nodeId) throws UnsupportedOperationException, IllegalArgumentException, IllegalStateException {
        if (!this.nodes.containsKey(nodeId)) {
            throw new IllegalArgumentException("Broker node " + nodeId + " is not a member of the cluster.");
        }
        if (this.clusterConfig.isKraftMode() && this.isController(nodeId)) {
            throw new UnsupportedOperationException("Cannot remove controller node " + nodeId + " from a kraft cluster.");
        }
        if (this.nodes.size() < 2) {
            throw new IllegalArgumentException("Cannot remove a node from a cluster with only %d nodes".formatted(this.nodes.size()));
        }
        if (!this.stoppedBrokers.isEmpty()) {
            throw new IllegalStateException("Cannot remove nodes from a cluster with stopped nodes.");
        }
        Optional<Integer> target = this.nodes.keySet().stream().filter(n -> n != nodeId).findFirst();
        if (target.isEmpty()) {
            throw new IllegalStateException("Could not identify a node to be the re-assignment target");
        }
        Utils.awaitReassignmentOfKafkaInternalTopicsIfNecessary(this.clusterConfig.getAnonConnectConfigForCluster(this.buildBrokerList(id -> this.getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener.ANON, (int)id))), nodeId, target.get(), 120, TimeUnit.SECONDS);
        this.portsAllocator.deallocate(nodeId);
        this.gracefulStop(this.nodes.remove(nodeId));
        try (OneShotContainer cleanBrokerLogDir = new OneShotContainer();){
            cleanBrokerLogDir.withName("cleanBrokerLogDir").addGenericBind(new Bind(this.logDirVolumeName, new Volume(KAFKA_CONTAINER_MOUNT_POINT))).withCommand(new String[]{"rm", "-rf", this.getBrokerLogDirectory(nodeId)});
            cleanBrokerLogDir.start();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void gracefulStop(KafkaContainer kafkaContainer) {
        String containerId = kafkaContainer.getContainerId();
        try (StopContainerCmd stopContainerCmd = kafkaContainer.getDockerClient().stopContainerCmd(containerId);
             WaitContainerCmd waitCmd = kafkaContainer.getDockerClient().waitContainerCmd(containerId);){
            stopContainerCmd.exec();
            Integer statusCode = waitCmd.start().awaitStatusCode(10L, TimeUnit.SECONDS);
            LOGGER.log(System.Logger.Level.DEBUG, "Shut-down broker {0}, exit status {1}", containerId, statusCode);
        }
        catch (Exception e) {
            LOGGER.log(System.Logger.Level.WARNING, "Ignoring exception whilst shutting down broker {0}", containerId, e);
        }
        finally {
            kafkaContainer.stop();
        }
    }

    public synchronized void stopNodes(IntPredicate nodeIdPredicate, TerminationStyle terminationStyle) {
        List<Map.Entry> kafkaContainersToStop = this.nodes.entrySet().stream().filter(e -> nodeIdPredicate.test((Integer)e.getKey())).filter(e -> !this.stoppedBrokers.contains(e.getKey())).toList();
        if (kafkaContainersToStop.isEmpty()) {
            return;
        }
        LOGGER.log(System.Logger.Level.DEBUG, "Stopping {0}/{1} nodes(s)", kafkaContainersToStop.size(), this.getNumOfBrokers());
        Map<Integer, InspectContainerCmd> inspectCommands = kafkaContainersToStop.stream().collect(Collectors.toMap(Map.Entry::getKey, e -> this.buildInspectionCommandFor((KafkaContainer)((Object)((Object)e.getValue())))));
        kafkaContainersToStop.stream().map(Map.Entry::getValue).forEach(kc -> {
            if (terminationStyle == TerminationStyle.ABRUPT) {
                kc.stop();
            } else {
                this.gracefulStop((KafkaContainer)((Object)kc));
            }
        });
        inspectCommands.forEach((nodeId, inspectContainer) -> Utils.awaitCondition(Long.valueOf(STARTUP_TIMEOUT.toMillis()).intValue(), TimeUnit.MILLISECONDS).until(() -> {
            try {
                inspectContainer.exec();
            }
            catch (NotFoundException e) {
                this.stoppedBrokers.add((Integer)nodeId);
                return true;
            }
            return false;
        }));
        if (this.zookeeper != null) {
            Optional<KafkaClusterConfig.ConfigHolder> config = this.clusterConfig.getBrokerConfigs(() -> this).findFirst();
            Optional<Long> zkSessionTimeout = config.map(KafkaClusterConfig.ConfigHolder::getProperties).map(p -> p.getProperty("zookeeper.session.timeout.ms", "0")).map(Long::parseLong);
            zkSessionTimeout.filter(timeout -> timeout > 0L).ifPresent(timeOut -> {
                try {
                    LOGGER.log(System.Logger.Level.DEBUG, "Awaiting zookeeper session timeout {0}ms so that the broker ephemeral nodes expire.", timeOut);
                    Thread.sleep(timeOut);
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        }
    }

    public synchronized void startNodes(IntPredicate nodeIdPredicate) {
        List<Map.Entry> kafkaContainersToStart = this.nodes.entrySet().stream().filter(e -> nodeIdPredicate.test((Integer)e.getKey())).filter(e -> this.stoppedBrokers.contains(e.getKey())).toList();
        if (kafkaContainersToStart.isEmpty()) {
            return;
        }
        LOGGER.log(System.Logger.Level.DEBUG, "Starting {0}/{1} node(s)", kafkaContainersToStart.size(), this.getNumOfBrokers());
        kafkaContainersToStart.forEach(entry -> {
            Integer nodeId = (Integer)entry.getKey();
            KafkaContainer kc = (KafkaContainer)((Object)((Object)entry.getValue()));
            int originalStartupAttempts = kc.getStartupAttempts();
            try {
                kc.withStartupAttempts(1);
                Awaitility.waitAtMost((Duration)STARTUP_TIMEOUT).pollDelay(RESTART_BACKOFF_DELAY).until(() -> {
                    try {
                        kc.start();
                        return true;
                    }
                    catch (Exception e) {
                        kc.stop();
                        LOGGER.log(System.Logger.Level.DEBUG, "Failed to restart container", (Throwable)e);
                        return false;
                    }
                });
            }
            finally {
                kc.setStartupAttempts(originalStartupAttempts);
                this.stoppedBrokers.remove(nodeId);
            }
        });
    }

    public void close() {
        this.stop();
    }

    public synchronized int getNumOfBrokers() {
        return this.nodes.size();
    }

    public synchronized Set<Integer> getStoppedBrokers() {
        return Set.copyOf(this.stoppedBrokers);
    }

    public synchronized void stop() {
        try {
            ((Stream)this.allContainers().parallel()).forEach(GenericContainer::stop);
        }
        finally {
            try {
                this.network.close();
            }
            finally {
                TestcontainersKafkaCluster.removeNamedVolume(this.logDirVolumeName);
            }
        }
    }

    public String getClusterId() {
        return this.clusterConfig.clusterId();
    }

    public Map<String, Object> getKafkaClientConfiguration() {
        return this.clusterConfig.getConnectConfigForCluster(this.getBootstrapServers());
    }

    public Map<String, Object> getKafkaClientConfiguration(String user, String password) {
        return this.clusterConfig.getConnectConfigForCluster(this.getBootstrapServers(), user, password);
    }

    @Override
    public synchronized KafkaClusterConfig.KafkaEndpoints.EndpointPair getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener listener, int nodeId) {
        switch (listener) {
            case EXTERNAL: {
                return this.buildExposedEndpoint(listener, nodeId, 9093);
            }
            case ANON: {
                return this.buildExposedEndpoint(listener, nodeId, 9094);
            }
            case INTERNAL: {
                return KafkaClusterConfig.KafkaEndpoints.EndpointPair.builder().bind(new KafkaClusterConfig.KafkaEndpoints.Endpoint(WILDCARD_BIND_ADDRESS, 9092)).connect(new KafkaClusterConfig.KafkaEndpoints.Endpoint(String.format("broker-%d", nodeId), 9092)).build();
            }
            case CONTROLLER: {
                KafkaClusterConfig.KafkaEndpoints.EndpointPair result = this.clusterConfig.isKraftMode() ? KafkaClusterConfig.KafkaEndpoints.EndpointPair.builder().bind(new KafkaClusterConfig.KafkaEndpoints.Endpoint(WILDCARD_BIND_ADDRESS, 9091)).connect(new KafkaClusterConfig.KafkaEndpoints.Endpoint(String.format("broker-%d", nodeId), 9091)).build() : KafkaClusterConfig.KafkaEndpoints.EndpointPair.builder().bind(new KafkaClusterConfig.KafkaEndpoints.Endpoint(WILDCARD_BIND_ADDRESS, 2181)).connect(new KafkaClusterConfig.KafkaEndpoints.Endpoint("zookeeper", 2181)).build();
                return result;
            }
        }
        throw new IllegalStateException("Unexpected value: " + listener);
    }

    private KafkaClusterConfig.KafkaEndpoints.EndpointPair buildExposedEndpoint(KafkaClusterConfig.KafkaEndpoints.Listener listener, int nodeId, int bindPort) {
        return KafkaClusterConfig.KafkaEndpoints.EndpointPair.builder().bind(new KafkaClusterConfig.KafkaEndpoints.Endpoint(WILDCARD_BIND_ADDRESS, bindPort)).connect(new KafkaClusterConfig.KafkaEndpoints.Endpoint("localhost", this.portsAllocator.getPort(listener, nodeId))).build();
    }

    private InspectContainerCmd buildInspectionCommandFor(KafkaContainer kc) {
        return kc.getDockerClient().inspectContainerCmd(kc.getContainerId());
    }

    private static DockerClient createDockerClient() {
        ArrayList configurationStrategies = new ArrayList();
        ServiceLoader.load(DockerClientProviderStrategy.class).forEach(configurationStrategies::add);
        DockerClientProviderStrategy firstValidStrategy = DockerClientProviderStrategy.getFirstValidStrategy(configurationStrategies);
        return firstValidStrategy.getDockerClient();
    }

    /*
     * Enabled aggressive exception aggregation
     */
    private static String createNamedVolume() {
        try (DockerClient dockerClient = TestcontainersKafkaCluster.createDockerClient();){
            String string;
            block22: {
                CreateVolumeCmd volumeCmd = dockerClient.createVolumeCmd();
                try {
                    if (CONTAINER_ENGINE_PODMAN) {
                        volumeCmd.withDriverOpts(Map.of("o", "uid=1001"));
                    }
                    String volumeName = ((CreateVolumeResponse)volumeCmd.exec()).getName();
                    volumesPendingCleanup.add(volumeName);
                    if (!CONTAINER_ENGINE_PODMAN) {
                        try (OneShotContainer c = new OneShotContainer();){
                            ((OneShotContainer)c.withName("prepareKafkaVolume").addGenericBind(new Bind(volumeName, new Volume(KAFKA_CONTAINER_MOUNT_POINT))).withCommand(new String[]{"chown", "-R", KAFKA_CONTAINER_UID, KAFKA_CONTAINER_MOUNT_POINT})).withStartupCheckStrategy((StartupCheckStrategy)new OneShotStartupCheckStrategy());
                            c.start();
                        }
                    }
                    string = volumeName;
                    if (volumeCmd == null) break block22;
                }
                catch (Throwable throwable) {
                    if (volumeCmd != null) {
                        try {
                            volumeCmd.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                    }
                    throw throwable;
                }
                volumeCmd.close();
            }
            return string;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static void removeNamedVolume(String name) {
        try (DockerClient dockerClient = TestcontainersKafkaCluster.createDockerClient();){
            dockerClient.removeVolumeCmd(name).exec();
        }
        catch (NotFoundException ignored) {
            volumesPendingCleanup.remove(name);
        }
        catch (Throwable t) {
            LOGGER.log(System.Logger.Level.WARNING, "Failed to remove container volume {0}.", name, t);
            LOGGER.log(System.Logger.Level.WARNING, "Please run `(podman|docker) volume ls` and check for orphaned resources.");
        }
    }

    private static boolean isContainerEnginePodman() {
        boolean bl;
        block8: {
            DockerClient dockerClient = TestcontainersKafkaCluster.createDockerClient();
            try {
                Version ver = (Version)dockerClient.versionCmd().exec();
                boolean hasComponentNamedPodman = Optional.ofNullable(ver.getComponents()).stream().flatMap(Collection::stream).map(VersionComponent::getName).filter(Objects::nonNull).map(s -> s.toLowerCase(Locale.ROOT)).anyMatch(n -> n.contains("podman"));
                LOGGER.log(System.Logger.Level.INFO, "Detected container engine as Podman : {0}", hasComponentNamedPodman);
                bl = hasComponentNamedPodman;
                if (dockerClient == null) break block8;
            }
            catch (Throwable throwable) {
                try {
                    if (dockerClient != null) {
                        try {
                            dockerClient.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                    }
                    throw throwable;
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            dockerClient.close();
        }
        return bl;
    }

    private boolean isController(Integer key) {
        return this.portsAllocator.hasRegisteredPort(KafkaClusterConfig.KafkaEndpoints.Listener.CONTROLLER, key);
    }

    private boolean isBroker(Integer key) {
        return this.portsAllocator.hasRegisteredPort(KafkaClusterConfig.KafkaEndpoints.Listener.EXTERNAL, key);
    }

    @Override
    @NonNull
    public Admin createAdmin() {
        return CloseableAdmin.create(this.clusterConfig.getAnonConnectConfigForCluster(this.buildBrokerList(nodeId -> this.getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener.ANON, (int)nodeId))));
    }

    @NonNull
    private String buildScramUsersEnvVar() {
        return this.clusterConfig.getUsers().entrySet().stream().map(e -> "%s=[name=%s,password=%s]".formatted(this.clusterConfig.getSaslMechanism(), e.getKey(), e.getValue())).collect(Collectors.joining(";"));
    }

    static {
        if (!System.getenv().containsKey("TESTCONTAINERS_RYUK_DISABLED")) {
            LOGGER.log(System.Logger.Level.WARNING, "As per https://github.com/containers/podman/issues/7927#issuecomment-731525556 if using podman, set env var TESTCONTAINERS_RYUK_DISABLED=true");
        }
        Runtime.getRuntime().addShutdownHook(new Thread(() -> Set.copyOf(volumesPendingCleanup).forEach(TestcontainersKafkaCluster::removeNamedVolume)));
    }

    public static class ZookeeperContainer
    extends LoggingGenericContainer<ZookeeperContainer> {
        public ZookeeperContainer(DockerImageName zookeeperImage) {
            super(zookeeperImage);
        }
    }

    public static class LoggingGenericContainer<C extends GenericContainer<C>>
    extends GenericContainer<C> {
        private static final String CONTAINER_LOGS_DIR = "container.logs.dir";
        private String name;

        public LoggingGenericContainer(DockerImageName dockerImageName) {
            super(dockerImageName);
        }

        protected void containerIsStarting(InspectContainerResponse containerInfo) {
            super.containerIsStarting(containerInfo);
            Optional.ofNullable(System.getProperty(CONTAINER_LOGS_DIR)).ifPresent(logDir -> {
                Path target = Path.of(logDir, new String[0]);
                if (this.name != null) {
                    target = target.resolve(this.name);
                }
                target = target.resolve(String.format("%s.%s.%s", this.getContainerName().replaceFirst(File.separator, ""), this.getContainerId(), "log"));
                target.getParent().toFile().mkdirs();
                try (FileWriter writer = new FileWriter(target.toFile());){
                    LOGGER.log(System.Logger.Level.DEBUG, "writing logs for {0} to {1}", this.getContainerName(), target);
                    super.followOutput(outputFrame -> {
                        try {
                            if (outputFrame.equals(OutputFrame.END)) {
                                writer.close();
                            } else {
                                writer.write(outputFrame.getUtf8String());
                                writer.flush();
                            }
                        }
                        catch (IOException iOException) {
                            // empty catch block
                        }
                    });
                }
                catch (IOException e) {
                    this.logger().warn("Failed to create container log file: {}", (Object)target);
                }
            });
        }

        public LoggingGenericContainer<C> withName(String name) {
            this.name = name;
            return this;
        }

        public LoggingGenericContainer<C> addGenericBind(Bind bind) {
            super.getBinds().add(bind);
            return this;
        }
    }

    public static class KafkaContainer
    extends LoggingGenericContainer<KafkaContainer> {
        public KafkaContainer(DockerImageName dockerImageName) {
            super(dockerImageName);
        }

        protected void addFixedExposedPort(int hostPort, int containerPort) {
            super.addFixedExposedPort(hostPort, containerPort);
        }
    }

    public static class OneShotContainer
    extends LoggingGenericContainer<OneShotContainer> {
        public OneShotContainer() {
            super(DockerImageName.parse((String)"registry.access.redhat.com/ubi9/ubi-minimal"));
            this.withStartupAttempts(3);
            this.withStartupCheckStrategy((StartupCheckStrategy)new OneShotStartupCheckStrategy());
        }
    }
}

