package io.kroxylicious.testing.kafka.testcontainers;

import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.command.CreateVolumeCmd;
import com.github.dockerjava.api.command.CreateVolumeResponse;
import com.github.dockerjava.api.command.InspectContainerCmd;
import com.github.dockerjava.api.command.InspectContainerResponse;
import com.github.dockerjava.api.command.StopContainerCmd;
import com.github.dockerjava.api.command.WaitContainerCmd;
import com.github.dockerjava.api.exception.NotFoundException;
import com.github.dockerjava.api.model.Bind;
import com.github.dockerjava.api.model.Volume;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.kroxylicious.testing.kafka.api.KafkaCluster;
import io.kroxylicious.testing.kafka.api.TerminationStyle;
import io.kroxylicious.testing.kafka.clients.CloseableAdmin;
import io.kroxylicious.testing.kafka.common.KafkaClusterConfig;
import io.kroxylicious.testing.kafka.common.PortAllocator;
import io.kroxylicious.testing.kafka.common.Utils;
import io.kroxylicious.testing.kafka.common.Version;
import io.kroxylicious.testing.kafka.internal.AdminSource;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.System;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.IntPredicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.Admin;
import org.awaitility.Awaitility;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.startupcheck.OneShotStartupCheckStrategy;
import org.testcontainers.dockerclient.DockerClientProviderStrategy;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster.class */
public class TestcontainersKafkaCluster implements Startable, KafkaCluster, KafkaClusterConfig.KafkaEndpoints, AdminSource {
    /** Container-side port bound by the EXTERNAL listener; for broker nodes a host port is mapped onto it. */
    public static final int CLIENT_PORT = 9093;
    /** Container-side port bound by the ANON listener; for broker nodes a host port is mapped onto it. */
    public static final int ANON_PORT = 9094;
    /** Port used for both the bind and the connect address of the INTERNAL listener. */
    private static final int INTER_BROKER_PORT = 9092;
    /** Port of the CONTROLLER listener when the cluster runs in KRaft mode. */
    private static final int CONTROLLER_PORT = 9091;
    /** Port reported for the CONTROLLER listener when the cluster is not in KRaft mode (connect host "zookeeper"). */
    private static final int ZOOKEEPER_PORT = 2181;
    /** Number of startup attempts configured on the containers created by this class. */
    private static final int CONTAINER_STARTUP_ATTEMPTS = 3;
    /** Path inside the Kafka containers at which the shared log-dir volume is mounted. */
    private static final String KAFKA_CONTAINER_MOUNT_POINT = "/kafka";
    /** Bind host used for every listener's bind address. */
    public static final String WILDCARD_BIND_ADDRESS = "0.0.0.0";
    // NOTE(review): not referenced in the visible part of the file; presumably the uid the Kafka image runs as - confirm.
    private static final String KAFKA_CONTAINER_UID = "1001";
    /** Seconds allowed for cluster-level readiness waits (expected broker count, topic re-assignment). */
    private static final int READY_TIMEOUT_SECONDS = 120;
    /** Kafka image actually used, after any tag override derived from the configured Kafka version. */
    private final DockerImageName kafkaImage;
    /** Zookeeper image actually used, after any tag override derived from the configured Kafka version. */
    private final DockerImageName zookeeperImage;
    private final KafkaClusterConfig clusterConfig;
    /** Name of the volume that every Kafka container mounts at {@link #KAFKA_CONTAINER_MOUNT_POINT}. */
    private final String logDirVolumeName;
    /** Network shared by all containers of this cluster. */
    private final Network network;
    /** Display-name derived label with a UTC timestamp suffix; {@code null} when the config carries no test info. */
    private final String name;
    /** Zookeeper container; {@code null} when the cluster runs in KRaft mode. */
    private final ZookeeperContainer zookeeper;
    /** Kafka containers keyed by node id. */
    private final Map<Integer, KafkaContainer> nodes;
    /** Ids of nodes that were stopped through {@code stopNodes} and not yet restarted. */
    private final Set<Integer> stoppedBrokers;
    private final PortAllocator portsAllocator;
    private static final System.Logger LOGGER = System.getLogger(TestcontainersKafkaCluster.class.getName());
    /** Per-container startup timeout; also bounds the waits performed when stopping and restarting nodes. */
    private static final Duration STARTUP_TIMEOUT = Duration.ofMinutes(2);
    /** Poll delay applied before a node restart is attempted. */
    private static final Duration RESTART_BACKOFF_DELAY = Duration.ofMillis(2500);
    /** Format of the timestamp appended to the cluster name. */
    private static final DateTimeFormatter NAME_DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss");
    /** Minimum running duration configured on the Kafka and Zookeeper containers. */
    private static final Duration MINIMUM_RUNNING_DURATION = Duration.ofMillis(500);
    // Evaluated once at class initialisation; isContainerEnginePodman() is defined outside the visible part of the file.
    private static final boolean CONTAINER_ENGINE_PODMAN = isContainerEnginePodman();
    private static final String QUAY_KAFKA_IMAGE_REPO = "quay.io/ogunalp/kafka-native";
    /** Default Kafka image, used when the caller supplies none. */
    private static final DockerImageName LATEST_KAFKA_IMAGE = DockerImageName.parse(QUAY_KAFKA_IMAGE_REPO).withTag(Version.LATEST_RELEASE);
    private static final String QUAY_ZOOKEEPER_IMAGE_REPO = "quay.io/ogunalp/zookeeper-native";
    /** Default Zookeeper image, used when the caller supplies none. */
    private static final DockerImageName LATEST_ZOOKEEPER_IMAGE = DockerImageName.parse(QUAY_ZOOKEEPER_IMAGE_REPO).withTag(Version.LATEST_RELEASE);
    // NOTE(review): not referenced in the visible part of the file; presumably volume names awaiting removal - confirm.
    private static final Set<String> volumesPendingCleanup = ConcurrentHashMap.newKeySet();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.kroxylicious.testing.kafka.testcontainers.TestcontainersKafkaCluster$1, reason: invalid class name */
    /* loaded from: input_file:io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster$1.class */
    /**
     * Compiler-generated lookup table for a {@code switch} over
     * {@link KafkaClusterConfig.KafkaEndpoints.Listener}.
     *
     * <p>The array is indexed by {@code Listener.ordinal()} and holds the case index (1..4)
     * assigned to each constant; constants that are absent at runtime keep the default 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$kroxylicious$testing$kafka$common$KafkaClusterConfig$KafkaEndpoints$Listener = new int[KafkaClusterConfig.KafkaEndpoints.Listener.values().length];

        static {
            // Each assignment is guarded separately: a constant missing from the enum at runtime
            // raises NoSuchFieldError, which must not prevent the remaining entries from being set.
            try {
                $SwitchMap$io$kroxylicious$testing$kafka$common$KafkaClusterConfig$KafkaEndpoints$Listener[KafkaClusterConfig.KafkaEndpoints.Listener.EXTERNAL.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant not present at runtime; its slot stays 0
            }
            try {
                $SwitchMap$io$kroxylicious$testing$kafka$common$KafkaClusterConfig$KafkaEndpoints$Listener[KafkaClusterConfig.KafkaEndpoints.Listener.ANON.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant not present at runtime; its slot stays 0
            }
            try {
                // Plain case index 3. It was previously spelled as the startup-attempts constant, which
                // merely shares the value; tying the two together would corrupt the table if that
                // constant were ever changed.
                $SwitchMap$io$kroxylicious$testing$kafka$common$KafkaClusterConfig$KafkaEndpoints$Listener[KafkaClusterConfig.KafkaEndpoints.Listener.INTERNAL.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant not present at runtime; its slot stays 0
            }
            try {
                $SwitchMap$io$kroxylicious$testing$kafka$common$KafkaClusterConfig$KafkaEndpoints$Listener[KafkaClusterConfig.KafkaEndpoints.Listener.CONTROLLER.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // constant not present at runtime; its slot stays 0
            }
        }
    }

    /* loaded from: input_file:io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster$KafkaContainer.class */
    /**
     * Container type used for the Kafka nodes of the cluster.
     */
    public static class KafkaContainer extends LoggingGenericContainer<KafkaContainer> {
        /**
         * @param kafkaImageName image the container is created from
         */
        public KafkaContainer(DockerImageName kafkaImageName) {
            super(kafkaImageName);
        }

        /**
         * Re-declares the inherited fixed port mapping and delegates to it unchanged
         * (presumably so the enclosing class can call it - confirm).
         *
         * @param hostPort port on the host
         * @param containerPort port inside the container
         */
        protected void addFixedExposedPort(int hostPort, int containerPort) {
            super.addFixedExposedPort(hostPort, containerPort);
        }
    }

    /* loaded from: input_file:io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster$LoggingGenericContainer.class */
    /**
     * A {@link GenericContainer} that can mirror the container's output into a file.
     *
     * <p>File logging is enabled by setting the {@code container.logs.dir} system property. The log
     * file is placed in that directory (inside a sub-directory named after {@link #withName(String)}
     * when a name was given) and is called {@code <containerName>.<containerId>.log}.
     *
     * @param <C> self type of the concrete container
     */
    public static class LoggingGenericContainer<C extends GenericContainer<C>> extends GenericContainer<C> {
        /** System property holding the directory container logs are written to. */
        private static final String CONTAINER_LOGS_DIR = "container.logs.dir";
        /** Optional sub-directory name for this container's log file. */
        private String name;

        /**
         * @param dockerImageName image the container is created from
         */
        public LoggingGenericContainer(DockerImageName dockerImageName) {
            super(dockerImageName);
        }

        /**
         * Starts following the container output into a log file when {@code container.logs.dir} is set.
         *
         * <p>The writer is deliberately left open when this method returns: frames are delivered
         * to the consumer after registration, and the consumer closes the writer when it receives
         * {@link OutputFrame#END}. Closing the writer here would make every later write fail.
         *
         * @param inspectContainerResponse container details passed through to the superclass
         */
        protected void containerIsStarting(InspectContainerResponse inspectContainerResponse) {
            super.containerIsStarting(inspectContainerResponse);
            String logsDir = System.getProperty(CONTAINER_LOGS_DIR);
            if (logsDir == null) {
                return;
            }
            Path directory = Path.of(logsDir);
            if (this.name != null) {
                directory = directory.resolve(this.name);
            }
            // Container names carry a leading "/". It is matched literally rather than via
            // File.separator, which is "\" on Windows and not a valid regular expression.
            String fileName = String.format("%s.%s.%s", getContainerName().replaceFirst("/", ""), getContainerId(), "log");
            Path logFile = directory.resolve(fileName);
            logFile.getParent().toFile().mkdirs();

            final FileWriter writer;
            try {
                writer = new FileWriter(logFile.toFile());
            } catch (IOException e) {
                logger().warn("Failed to create container log file: {}", logFile, e);
                return;
            }
            TestcontainersKafkaCluster.LOGGER.log(System.Logger.Level.DEBUG, "writing logs for {0} to {1}", getContainerName(), logFile);
            try {
                super.followOutput(frame -> writeFrame(writer, frame));
            } catch (RuntimeException | Error e) {
                // The consumer was never attached, so nothing else will ever close the writer.
                closeQuietly(writer);
                throw e;
            }
        }

        /** Appends one output frame to the log file, closing the file on the end-of-output marker. */
        private static void writeFrame(FileWriter writer, OutputFrame frame) {
            try {
                if (frame.equals(OutputFrame.END)) {
                    writer.close();
                } else {
                    writer.write(frame.getUtf8String());
                    writer.flush();
                }
            } catch (IOException e) {
                // Log capture is best effort and must never disturb the container itself.
                TestcontainersKafkaCluster.LOGGER.log(System.Logger.Level.DEBUG, "Failed to write container log output", e);
            }
        }

        /** Closes the writer, recording but not propagating any failure. */
        private static void closeQuietly(FileWriter writer) {
            try {
                writer.close();
            } catch (IOException e) {
                TestcontainersKafkaCluster.LOGGER.log(System.Logger.Level.DEBUG, "Failed to close container log file", e);
            }
        }

        /**
         * Sets the sub-directory name used for this container's log file.
         *
         * @param str name, may be {@code null} to write directly into the logs directory
         * @return this container
         */
        public LoggingGenericContainer<C> withName(String str) {
            this.name = str;
            return this;
        }

        /**
         * Adds a bind to the container's bind list.
         *
         * @param bind bind to add
         * @return this container
         */
        public LoggingGenericContainer<C> addGenericBind(Bind bind) {
            super.getBinds().add(bind);
            return this;
        }
    }

    /* loaded from: input_file:io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster$OneShotContainer.class */
    /**
     * Container based on {@code ubi-minimal} for commands that run once and exit; it uses
     * the one-shot startup check strategy.
     */
    public static class OneShotContainer extends LoggingGenericContainer<OneShotContainer> {
        /** Creates the container with the cluster-wide number of startup attempts. */
        public OneShotContainer() {
            super(DockerImageName.parse("registry.access.redhat.com/ubi9/ubi-minimal"));
            this.withStartupAttempts(TestcontainersKafkaCluster.CONTAINER_STARTUP_ATTEMPTS);
            this.withStartupCheckStrategy(new OneShotStartupCheckStrategy());
        }
    }

    /* loaded from: input_file:io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster$ZookeeperContainer.class */
    /**
     * Container type used for the Zookeeper node of a non-KRaft cluster.
     */
    public static class ZookeeperContainer extends LoggingGenericContainer<ZookeeperContainer> {
        /**
         * @param zookeeperImageName image the container is created from
         */
        public ZookeeperContainer(DockerImageName zookeeperImageName) {
            super(zookeeperImageName);
        }
    }

    /**
     * Creates a cluster that uses the default Kafka and Zookeeper images.
     *
     * @param clusterConfig configuration of the cluster
     */
    public TestcontainersKafkaCluster(KafkaClusterConfig clusterConfig) {
        this((DockerImageName) null, (DockerImageName) null, clusterConfig);
    }

    /**
     * Creates the cluster definition: named log volume, network, optional Zookeeper container,
     * port allocations and one Kafka container per configured node. No container is started here.
     *
     * @param kafkaImage Kafka image, or {@code null} for the default image
     * @param zookeeperImage Zookeeper image, or {@code null} for the default image
     * @param clusterConfig configuration of the cluster, must not be {@code null}
     * @throws NullPointerException if {@code clusterConfig} is {@code null}
     */
    public TestcontainersKafkaCluster(DockerImageName kafkaImage, DockerImageName zookeeperImage, KafkaClusterConfig clusterConfig) {
        // Fail before any Docker resource (volume, network) has been created.
        Objects.requireNonNull(clusterConfig, "clusterConfig");
        this.logDirVolumeName = createNamedVolume();
        this.network = Network.newNetwork();
        this.nodes = new TreeMap<>();
        this.stoppedBrokers = new HashSet<>();
        this.portsAllocator = new PortAllocator();

        DockerImageName requestedKafkaImage = kafkaImage != null ? kafkaImage : LATEST_KAFKA_IMAGE;
        DockerImageName requestedZookeeperImage = zookeeperImage != null ? zookeeperImage : LATEST_ZOOKEEPER_IMAGE;
        this.kafkaImage = overrideContainerImageTagIfNecessary(requestedKafkaImage, clusterConfig.getKafkaVersion());
        this.zookeeperImage = overrideContainerImageTagIfNecessary(requestedZookeeperImage, clusterConfig.getKafkaVersion());
        this.clusterConfig = clusterConfig;

        // "<display name without trailing ()>.<UTC timestamp>", or null when there is no test info.
        this.name = Optional.ofNullable(clusterConfig.getTestInfo())
                .map(testInfo -> testInfo.getDisplayName())
                .map(displayName -> displayName.replaceFirst("\\(\\)$", ""))
                .map(displayName -> String.format("%s.%s", displayName, NAME_DATE_TIME_FORMAT.format(OffsetDateTime.now(Clock.systemUTC()))))
                .orElse(null);

        if (this.clusterConfig.isKraftMode()) {
            this.zookeeper = null;
        } else {
            this.zookeeper = new ZookeeperContainer(this.zookeeperImage)
                    .withName(this.name)
                    .withNetwork(this.network)
                    .withMinimumRunningDuration(MINIMUM_RUNNING_DURATION)
                    .withStartupAttempts(CONTAINER_STARTUP_ATTEMPTS)
                    .withNetworkAliases("zookeeper");
        }

        // The session only spans the allocations, so it is closed exactly once whether or not
        // building the node containers below fails.
        try (PortAllocator.PortAllocationSession session = this.portsAllocator.allocationSession()) {
            session.allocate(Set.of(KafkaClusterConfig.KafkaEndpoints.Listener.EXTERNAL, KafkaClusterConfig.KafkaEndpoints.Listener.ANON), 0, clusterConfig.getBrokersNum());
            session.allocate(Set.of(KafkaClusterConfig.KafkaEndpoints.Listener.CONTROLLER), 0, clusterConfig.getKraftControllers());
        }

        clusterConfig.getBrokerConfigs(() -> this)
                .forEach(configHolder -> this.nodes.put(configHolder.getBrokerNum(), buildKafkaContainer(configHolder)));
    }

    /**
     * Builds (without starting) the Kafka container for one node.
     *
     * <p>The node's properties are copied, its {@code log.dir} is pointed at the node's directory
     * on the shared volume, and any configured key/trust store is copied into the container.
     * The resulting properties are shipped as {@code /cnf/server.properties}.
     *
     * @param configHolder configuration of the node
     * @return the configured container
     */
    @NonNull
    private KafkaContainer buildKafkaContainer(KafkaClusterConfig.ConfigHolder configHolder) {
        final String networkAlias = "broker-" + configHolder.getBrokerNum();

        final Properties serverProperties = new Properties();
        serverProperties.putAll(configHolder.getProperties());
        serverProperties.put("log.dir", getBrokerLogDirectory(configHolder.getBrokerNum()));

        final KafkaContainer container = new KafkaContainer(this.kafkaImage)
                .withName(this.name)
                .withNetwork(this.network)
                .withNetworkAliases(networkAlias);

        // Must run before the properties are serialised: these calls rewrite the store locations.
        copyHostKeyStoreToContainer(container, serverProperties, "ssl.truststore.location");
        copyHostKeyStoreToContainer(container, serverProperties, "ssl.keystore.location");

        container.withEnv("SERVER_PROPERTIES_FILE", "/cnf/server.properties")
                .withEnv("SERVER_CLUSTER_ID", configHolder.getKafkaKraftClusterId())
                .withEnv("SERVER_AUTO_CONFIGURE", "false")
                // 0644 (octal) file mode
                .withCopyToContainer(Transferable.of(propertiesToBytes(serverProperties), 0644), "/cnf/server.properties")
                .withStartupAttempts(CONTAINER_STARTUP_ATTEMPTS)
                .withMinimumRunningDuration(MINIMUM_RUNNING_DURATION)
                .withStartupTimeout(STARTUP_TIMEOUT);

        if (configHolder.isBroker()) {
            container.addFixedExposedPort(configHolder.getExternalPort(), CLIENT_PORT);
            container.addFixedExposedPort(configHolder.getAnonPort(), ANON_PORT);
        }
        container.addGenericBind(new Bind(this.logDirVolumeName, new Volume(KAFKA_CONTAINER_MOUNT_POINT)));
        if (!this.clusterConfig.isKraftMode()) {
            container.dependsOn(this.zookeeper);
        }
        return container;
    }

    /**
     * Returns the in-container log directory of a node, located beneath the volume mount point.
     *
     * @param i node id
     * @return {@code /kafka/broker-<id>}
     */
    @NonNull
    private String getBrokerLogDirectory(int i) {
        final String nodeDirectory = "broker-" + i;
        return KAFKA_CONTAINER_MOUNT_POINT + "/" + nodeDirectory;
    }

    /**
     * Copies the store file referenced by a property into the container and rewrites the
     * property to the in-container location {@code /cnf/<file name>}.
     *
     * <p>Does nothing when the property is not set.
     *
     * @param kafkaContainer container that receives the file
     * @param properties properties to read the host location from and to update in place
     * @param str name of the property holding the store location
     * @throws UncheckedIOException if the host file cannot be read
     */
    private static void copyHostKeyStoreToContainer(KafkaContainer kafkaContainer, Properties properties, String str) {
        Object hostLocation = properties.get(str);
        if (hostLocation == null) {
            return;
        }
        try {
            Path hostPath = Path.of(String.valueOf(hostLocation));
            // The container path is assembled as plain text: going through the host's Path type
            // would produce backslash separators when the tests run on a Windows host.
            String containerPath = "/cnf/" + hostPath.getFileName();
            properties.put(str, containerPath);
            // 0644 (octal) file mode
            kafkaContainer.withCopyToContainer(Transferable.of(Files.readAllBytes(hostPath), 0644), containerPath);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Serialises properties in {@link Properties#store(java.io.OutputStream, String)} format,
     * using {@code server.properties} as the header comment.
     *
     * @param properties properties to serialise
     * @return the serialised bytes
     * @throws UncheckedIOException if writing fails
     */
    private byte[] propertiesToBytes(Properties properties) {
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
            properties.store(buffer, "server.properties");
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Returns the comma separated connect addresses of the EXTERNAL listener of every broker.
     *
     * @return bootstrap servers string
     */
    public synchronized String getBootstrapServers() {
        Function<Integer, KafkaClusterConfig.KafkaEndpoints.EndpointPair> externalEndpoint =
                nodeId -> getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener.EXTERNAL, nodeId);
        return buildBrokerList(externalEndpoint);
    }

    /**
     * Joins, with commas, the connect address of every node that passes {@code isBroker}.
     *
     * @param function resolves a node id to the endpoint pair whose connect address is listed
     * @return comma separated connect addresses, in node id order
     */
    private synchronized String buildBrokerList(Function<Integer, KafkaClusterConfig.KafkaEndpoints.EndpointPair> function) {
        return this.nodes.keySet().stream()
                .filter(this::isBroker)
                .map(function)
                .map(KafkaClusterConfig.KafkaEndpoints.EndpointPair::connectAddress)
                .collect(Collectors.joining(","));
    }

    /**
     * Derives the image to use from a requested Kafka version.
     *
     * <ul>
     * <li>no version, or a version equal (ignoring case) to the image's tag: image unchanged;</li>
     * <li>the snapshot marker: image tagged with the snapshot tag;</li>
     * <li>a numeric version of one to three dot separated parts: image tagged {@code latest-kafka-<version>};</li>
     * <li>anything else: image unchanged.</li>
     * </ul>
     *
     * @param dockerImageName image to start from
     * @param str requested Kafka version, may be {@code null}
     * @return the image to use
     */
    private DockerImageName overrideContainerImageTagIfNecessary(@NonNull DockerImageName dockerImageName, @Nullable String str) {
        if (str == null || str.equalsIgnoreCase(dockerImageName.getVersionPart())) {
            return dockerImageName;
        }
        if (str.equalsIgnoreCase(Version.LATEST_SNAPSHOT)) {
            return dockerImageName.withTag(Version.LATEST_SNAPSHOT);
        }
        if (Pattern.matches("\\d+(\\.\\d+(\\.\\d+)?)?", str)) {
            return dockerImageName.withTag("latest-kafka-" + str);
        }
        return dockerImageName;
    }

    /**
     * Returns the Kafka version implied by the tag of the Kafka image.
     *
     * @return the release or snapshot marker when the tag equals it (ignoring case), otherwise the
     *         tag with a leading {@code latest-kafka-} removed; {@code null} when the image has no version part
     */
    public String getKafkaVersion() {
        final String tag = this.kafkaImage.getVersionPart();
        if (tag == null) {
            return null;
        }
        if (Version.LATEST_RELEASE.equalsIgnoreCase(tag)) {
            return Version.LATEST_RELEASE;
        }
        if (Version.LATEST_SNAPSHOT.equalsIgnoreCase(tag)) {
            return Version.LATEST_SNAPSHOT;
        }
        return tag.replaceFirst("^latest-kafka-", "");
    }

    /**
     * Streams every container of the cluster: all Kafka nodes followed by the Zookeeper
     * container when there is one.
     *
     * @return stream of containers
     */
    private synchronized Stream<GenericContainer<?>> allContainers() {
        Stream<KafkaContainer> kafkaNodes = this.nodes.values().stream();
        Stream<ZookeeperContainer> optionalZookeeper = Stream.ofNullable(this.zookeeper);
        return Stream.concat(kafkaNodes, optionalZookeeper);
    }

    /**
     * Starts Zookeeper (when present) and all Kafka nodes, then waits until the expected
     * number of brokers is reported through the ANON listener.
     *
     * <p>When starting is interrupted, fails or times out, the cluster is stopped again.
     *
     * @throws RuntimeException if startup is interrupted, fails or times out
     */
    public synchronized void start() {
        try {
            if (this.zookeeper != null) {
                this.zookeeper.start();
            }
            Startables.deepStart(this.nodes.values().stream()).get(READY_TIMEOUT_SECONDS, TimeUnit.SECONDS);

            String anonBrokerList = buildBrokerList(
                    nodeId -> getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener.ANON, nodeId));
            Utils.awaitExpectedBrokerCountInClusterViaTopic(
                    this.clusterConfig.getAnonConnectConfigForCluster(anonBrokerList),
                    READY_TIMEOUT_SECONDS,
                    TimeUnit.SECONDS,
                    this.clusterConfig.getBrokersNum());
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            if (e instanceof InterruptedException) {
                // keep the interruption visible to the caller
                Thread.currentThread().interrupt();
            }
            stop();
            throw new RuntimeException("startup failed or timed out", e);
        }
    }

    /**
     * Adds a broker to the running cluster and waits until the cluster reports the new broker count.
     *
     * <p>The new node gets the lowest node id not yet in use. If the node cannot be started, or the
     * cluster does not reach the expected size in time, the container is stopped and the node's
     * registration and port allocations are rolled back, so a failed attempt leaves the cluster's
     * bookkeeping as it was.
     *
     * @return node id of the added broker
     * @throws IllegalStateException if no free node id can be determined
     * @throws RuntimeException wrapping the cause when the broker fails to start or join
     */
    public synchronized int addBroker() {
        int nodeId = IntStream.rangeClosed(0, getNumOfBrokers())
                .filter(candidate -> !this.nodes.containsKey(candidate))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("Could not determine new nodeId, existing set " + this.nodes.keySet()));
        LOGGER.log(System.Logger.Level.DEBUG, "Adding broker with node.id {0} to cluster with existing nodes {1}.", nodeId, this.nodes.keySet());

        // The session only spans the allocation, so it is closed exactly once even if a later step fails.
        try (PortAllocator.PortAllocationSession session = this.portsAllocator.allocationSession()) {
            session.allocate(Set.of(KafkaClusterConfig.KafkaEndpoints.Listener.EXTERNAL, KafkaClusterConfig.KafkaEndpoints.Listener.ANON), nodeId);
        }

        KafkaClusterConfig.ConfigHolder nodeConfig = this.clusterConfig.generateConfigForSpecificNode(this, nodeId);
        KafkaContainer container = buildKafkaContainer(nodeConfig);
        int brokerNum = nodeConfig.getBrokerNum();
        try {
            Startables.deepStart(container).get(READY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            // Registered before the wait: the broker list used for the wait is built from the node map.
            this.nodes.put(brokerNum, container);
            String anonBrokerList = buildBrokerList(
                    id -> getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener.ANON, id));
            Utils.awaitExpectedBrokerCountInClusterViaTopic(
                    this.clusterConfig.getAnonConnectConfigForCluster(anonBrokerList),
                    READY_TIMEOUT_SECONDS,
                    TimeUnit.SECONDS,
                    getNumOfBrokers());
            return brokerNum;
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // keep the interruption visible to the caller
                Thread.currentThread().interrupt();
            }
            // Do not leave a stopped container registered as a cluster member.
            this.nodes.remove(brokerNum, container);
            container.stop();
            this.portsAllocator.deallocate(nodeId);
            throw new RuntimeException(e);
        }
    }

    /**
     * Removes a node from the cluster.
     *
     * <p>Kafka internal topics are first re-assigned (if necessary) to another node; then the node's
     * ports are released, the container is stopped gracefully and the node's log directory is
     * deleted from the shared volume by a one-shot container.
     *
     * @param i id of the node to remove
     * @throws IllegalArgumentException if the node is not part of the cluster, or the cluster has fewer than two nodes
     * @throws UnsupportedOperationException if the node is a controller of a KRaft cluster
     * @throws IllegalStateException if the cluster has stopped nodes, or no re-assignment target exists
     */
    public synchronized void removeBroker(int i) throws UnsupportedOperationException, IllegalArgumentException, IllegalStateException {
        if (!this.nodes.containsKey(i)) {
            throw new IllegalArgumentException("Broker node " + i + " is not a member of the cluster.");
        }
        if (this.clusterConfig.isKraftMode() && isController(i)) {
            throw new UnsupportedOperationException("Cannot remove controller node " + i + " from a kraft cluster.");
        }
        if (this.nodes.size() < 2) {
            throw new IllegalArgumentException("Cannot remove a node from a cluster with only %d nodes".formatted(this.nodes.size()));
        }
        if (!this.stoppedBrokers.isEmpty()) {
            throw new IllegalStateException("Cannot remove nodes from a cluster with stopped nodes.");
        }
        int reassignmentTarget = this.nodes.keySet().stream()
                .filter(candidate -> candidate.intValue() != i)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("Could not identify a node to be the re-assignment target"));

        String anonBrokerList = buildBrokerList(
                nodeId -> getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener.ANON, nodeId));
        Utils.awaitReassignmentOfKafkaInternalTopicsIfNecessary(
                this.clusterConfig.getAnonConnectConfigForCluster(anonBrokerList),
                i,
                reassignmentTarget,
                READY_TIMEOUT_SECONDS,
                TimeUnit.SECONDS);

        this.portsAllocator.deallocate(i);
        gracefulStop(this.nodes.remove(i));

        // try-with-resources closes the helper container exactly once, on success and on failure.
        try (OneShotContainer cleaner = new OneShotContainer()) {
            cleaner.withName("cleanBrokerLogDir")
                    .addGenericBind(new Bind(this.logDirVolumeName, new Volume(KAFKA_CONTAINER_MOUNT_POINT)))
                    .withCommand("rm", "-rf", getBrokerLogDirectory(i));
            cleaner.start();
        }
    }

    /**
     * Stops a Kafka container by issuing a Docker stop command and waiting up to ten seconds
     * for its exit status, then releases the container through {@code stop()}.
     *
     * <p>Failures of the stop/wait commands are logged and otherwise ignored; {@code stop()} is
     * always called.
     *
     * @param kafkaContainer container to stop
     */
    private void gracefulStop(KafkaContainer kafkaContainer) {
        String containerId = kafkaContainer.getContainerId();
        try (StopContainerCmd stopCmd = kafkaContainer.getDockerClient().stopContainerCmd(containerId);
                WaitContainerCmd waitCmd = kafkaContainer.getDockerClient().waitContainerCmd(containerId)) {
            stopCmd.exec();
            Integer exitStatus = waitCmd.start().awaitStatusCode(10L, TimeUnit.SECONDS);
            LOGGER.log(System.Logger.Level.DEBUG, "Shut-down broker {0}, exit status {1}", containerId, exitStatus);
        } catch (Exception e) {
            // The exception goes in as the throwable argument. Passed as a message parameter it
            // would be matched against a placeholder that does not exist and silently dropped.
            LOGGER.log(System.Logger.Level.WARNING, "Ignoring exception whilst shutting down broker " + containerId, e);
        } finally {
            kafkaContainer.stop();
        }
    }

    /**
     * Stops the selected nodes that are not already stopped and waits until Docker no longer
     * knows their containers.
     *
     * <p>For a Zookeeper based cluster with a positive session timeout configured, the method
     * additionally sleeps for that timeout so that the brokers' ephemeral nodes expire.
     *
     * @param intPredicate selects the node ids to stop
     * @param terminationStyle {@code ABRUPT} stops the container directly, anything else stops gracefully
     * @throws RuntimeException if the thread is interrupted while waiting for the session timeout
     */
    public synchronized void stopNodes(IntPredicate intPredicate, TerminationStyle terminationStyle) {
        List<Map.Entry<Integer, KafkaContainer>> targets = this.nodes.entrySet().stream()
                .filter(entry -> intPredicate.test(entry.getKey()))
                .filter(entry -> !this.stoppedBrokers.contains(entry.getKey()))
                .toList();
        if (targets.isEmpty()) {
            return;
        }
        LOGGER.log(System.Logger.Level.DEBUG, "Stopping {0}/{1} nodes(s)", targets.size(), getNumOfBrokers());

        // The inspect commands are created while the containers still exist; they are executed
        // after the stop to detect that the containers have gone.
        Map<Integer, InspectContainerCmd> inspections = targets.stream()
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> buildInspectionCommandFor(entry.getValue())));

        for (Map.Entry<Integer, KafkaContainer> target : targets) {
            if (terminationStyle == TerminationStyle.ABRUPT) {
                target.getValue().stop();
            } else {
                gracefulStop(target.getValue());
            }
        }

        inspections.forEach((nodeId, inspectCmd) ->
                Utils.awaitCondition((int) STARTUP_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS).until(() -> {
                    try {
                        inspectCmd.exec();
                        return false;
                    } catch (NotFoundException e) {
                        // Docker no longer knows the container: the node is stopped.
                        this.stoppedBrokers.add(nodeId);
                        return true;
                    }
                }));

        if (this.zookeeper != null) {
            awaitZookeeperSessionTimeout();
        }
    }

    /** Sleeps for the configured Zookeeper session timeout, when one greater than zero is configured. */
    private void awaitZookeeperSessionTimeout() {
        this.clusterConfig.getBrokerConfigs(() -> this)
                .findFirst()
                .map(config -> config.getProperties())
                .map(properties -> properties.getProperty(KafkaConfig.ZkSessionTimeoutMsProp(), "0"))
                .map(Long::parseLong)
                .filter(timeoutMs -> timeoutMs > 0)
                .ifPresent(timeoutMs -> {
                    try {
                        LOGGER.log(System.Logger.Level.DEBUG, "Awaiting zookeeper session timeout {0}ms so that the broker ephemeral nodes expire.", timeoutMs);
                        Thread.sleep(timeoutMs);
                    } catch (InterruptedException e) {
                        // Restore the flag so callers further up can still observe the interruption.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                });
    }

    /**
     * Restarts the selected nodes that are currently stopped.
     *
     * <p>Each container is started with a single startup attempt per try and retried until
     * {@code STARTUP_TIMEOUT} elapses; the container's original number of startup attempts is
     * restored afterwards. A node is removed from the stopped set only once it has started. A node
     * whose restart failed has been stopped again, so it stays recorded as stopped.
     *
     * @param intPredicate selects the node ids to start
     */
    public synchronized void startNodes(IntPredicate intPredicate) {
        List<Map.Entry<Integer, KafkaContainer>> targets = this.nodes.entrySet().stream()
                .filter(entry -> intPredicate.test(entry.getKey()))
                .filter(entry -> this.stoppedBrokers.contains(entry.getKey()))
                .toList();
        if (targets.isEmpty()) {
            return;
        }
        LOGGER.log(System.Logger.Level.DEBUG, "Starting {0}/{1} node(s)", targets.size(), getNumOfBrokers());

        for (Map.Entry<Integer, KafkaContainer> target : targets) {
            KafkaContainer container = target.getValue();
            int originalStartupAttempts = container.getStartupAttempts();
            try {
                // One attempt per try; the retrying is done by the surrounding wait.
                container.withStartupAttempts(1);
                Awaitility.waitAtMost(STARTUP_TIMEOUT).pollDelay(RESTART_BACKOFF_DELAY).until(() -> {
                    try {
                        container.start();
                        return true;
                    } catch (Exception e) {
                        container.stop();
                        LOGGER.log(System.Logger.Level.DEBUG, "Failed to restart container", e);
                        return false;
                    }
                });
                this.stoppedBrokers.remove(target.getKey());
            } finally {
                container.setStartupAttempts(originalStartupAttempts);
            }
        }
    }

    /**
     * Closes the cluster; equivalent to {@link #stop()}.
     */
    public void close() {
        this.stop();
    }

    /**
     * Returns the number of entries in the node map; no filtering by node role is applied here.
     *
     * @return number of registered nodes
     */
    public synchronized int getNumOfBrokers() {
        final int registeredNodes = this.nodes.size();
        return registeredNodes;
    }

    /**
     * Returns an unmodifiable snapshot of the ids of the nodes currently recorded as stopped.
     *
     * @return ids of stopped nodes
     */
    public synchronized Set<Integer> getStoppedBrokers() {
        final Set<Integer> snapshot = Set.copyOf(this.stoppedBrokers);
        return snapshot;
    }

    /**
     * Stops every container of the cluster in parallel and then closes the network.
     *
     * <p>The network is closed even when stopping a container fails.
     */
    public synchronized void stop() {
        try {
            allContainers().parallel().forEach(GenericContainer::stop);
        } finally {
            this.network.close();
        }
    }

    /**
     * Returns the cluster id held by the cluster configuration.
     *
     * @return cluster id
     */
    public String getClusterId() {
        final String clusterId = this.clusterConfig.clusterId();
        return clusterId;
    }

    /**
     * Returns client configuration for connecting to this cluster through its bootstrap servers.
     *
     * @return client configuration produced by the cluster configuration
     */
    public Map<String, Object> getKafkaClientConfiguration() {
        final String bootstrapServers = getBootstrapServers();
        return this.clusterConfig.getConnectConfigForCluster(bootstrapServers);
    }

    /**
     * Returns client configuration for connecting to this cluster through its bootstrap servers,
     * passing the two given values on to the cluster configuration.
     *
     * @param str first value handed to the cluster configuration (presumably a user name - confirm)
     * @param str2 second value handed to the cluster configuration (presumably a password - confirm)
     * @return client configuration produced by the cluster configuration
     */
    public Map<String, Object> getKafkaClientConfiguration(String str, String str2) {
        final String bootstrapServers = getBootstrapServers();
        return this.clusterConfig.getConnectConfigForCluster(bootstrapServers, str, str2);
    }

    /**
     * Returns the bind and connect endpoints of a listener on a node.
     *
     * <ul>
     * <li>EXTERNAL / ANON: bound on the wildcard address inside the container, reachable on
     * {@code localhost} at the host port allocated for the node;</li>
     * <li>INTERNAL: reachable on the network alias {@code broker-<id>} at the inter-broker port;</li>
     * <li>CONTROLLER: in KRaft mode reachable on {@code broker-<id>} at the controller port,
     * otherwise the Zookeeper endpoint.</li>
     * </ul>
     *
     * @param listener listener to resolve
     * @param i node id
     * @return the endpoint pair
     * @throws IllegalStateException for a listener this cluster does not know
     */
    @Override // io.kroxylicious.testing.kafka.common.KafkaClusterConfig.KafkaEndpoints
    public synchronized KafkaClusterConfig.KafkaEndpoints.EndpointPair getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener listener, int i) {
        // The host name is built by concatenation, not "%d" formatting, so the digits can never be
        // localised and always match the network alias given to the container.
        final String nodeHost = "broker-" + i;
        return switch (listener) {
            case EXTERNAL -> buildExposedEndpoint(listener, i, CLIENT_PORT);
            case ANON -> buildExposedEndpoint(listener, i, ANON_PORT);
            case INTERNAL -> buildNetworkEndpoint(nodeHost, INTER_BROKER_PORT);
            case CONTROLLER -> this.clusterConfig.isKraftMode()
                    ? buildNetworkEndpoint(nodeHost, CONTROLLER_PORT)
                    : buildNetworkEndpoint("zookeeper", ZOOKEEPER_PORT);
            default -> throw new IllegalStateException("Unexpected value: " + listener);
        };
    }

    /** Builds an endpoint pair bound on the wildcard address and reachable at {@code host:port}. */
    private KafkaClusterConfig.KafkaEndpoints.EndpointPair buildNetworkEndpoint(String host, int port) {
        return KafkaClusterConfig.KafkaEndpoints.EndpointPair.builder()
                .bind(new KafkaClusterConfig.KafkaEndpoints.Endpoint(WILDCARD_BIND_ADDRESS, port))
                .connect(new KafkaClusterConfig.KafkaEndpoints.Endpoint(host, port))
                .build();
    }

    /**
     * Builds an endpoint pair that binds on the wildcard address at the given port and
     * is connected to through {@code localhost} on the port the allocator holds for the
     * listener/node combination.
     *
     * @param listener the listener the pair is for
     * @param i        the node id used to look up the allocated port
     * @param i2       the port used for the bind endpoint
     * @return the assembled endpoint pair
     */
    private KafkaClusterConfig.KafkaEndpoints.EndpointPair buildExposedEndpoint(KafkaClusterConfig.KafkaEndpoints.Listener listener, int i, int i2) {
        KafkaClusterConfig.KafkaEndpoints.Endpoint bindEndpoint =
                new KafkaClusterConfig.KafkaEndpoints.Endpoint(WILDCARD_BIND_ADDRESS, i2);
        KafkaClusterConfig.KafkaEndpoints.Endpoint connectEndpoint =
                new KafkaClusterConfig.KafkaEndpoints.Endpoint("localhost", this.portsAllocator.getPort(listener, i));
        return KafkaClusterConfig.KafkaEndpoints.EndpointPair.builder()
                .bind(bindEndpoint)
                .connect(connectEndpoint)
                .build();
    }

    /**
     * Creates an (unexecuted) inspect command for the given container, using that
     * container's own Docker client and container id.
     *
     * @param kafkaContainer the container to inspect
     * @return the inspect command; the caller decides when to execute it
     */
    private InspectContainerCmd buildInspectionCommandFor(KafkaContainer kafkaContainer) {
        final DockerClient client = kafkaContainer.getDockerClient();
        final String containerId = kafkaContainer.getContainerId();
        return client.inspectContainerCmd(containerId);
    }

    /**
     * Creates a Docker client from the first valid {@code DockerClientProviderStrategy}
     * discovered through {@link ServiceLoader}.
     *
     * @return a Docker client obtained from the selected strategy; callers in this class
     *         close it when they are done
     */
    private static DockerClient createDockerClient() {
        List<DockerClientProviderStrategy> strategies = new ArrayList<>();
        // Collect every strategy registered as a service before asking for the first valid one.
        ServiceLoader.load(DockerClientProviderStrategy.class).forEach(strategies::add);
        return DockerClientProviderStrategy.getFirstValidStrategy(strategies).getDockerClient();
    }

    /**
     * Creates a named container volume and registers it in {@code volumesPendingCleanup}.
     *
     * <p>When the engine is Podman the volume is created with the driver option
     * {@code o=uid=1001}. Otherwise a one-shot container named {@code prepareKafkaVolume}
     * is run with the volume mounted at {@code KAFKA_CONTAINER_MOUNT_POINT} to
     * {@code chown -R} the mount point to {@code KAFKA_CONTAINER_UID}.
     *
     * @return the name of the newly created volume
     * @throws RuntimeException wrapping the {@link IOException} if closing the Docker client fails
     */
    private static String createNamedVolume() {
        try (DockerClient dockerClient = createDockerClient();
                CreateVolumeCmd createVolumeCmd = dockerClient.createVolumeCmd()) {
            if (CONTAINER_ENGINE_PODMAN) {
                createVolumeCmd.withDriverOpts(Map.of("o", "uid=1001"));
            }
            CreateVolumeResponse response = createVolumeCmd.exec();
            String name = response.getName();
            // Register immediately so the volume is cleaned up even if preparation below fails.
            volumesPendingCleanup.add(name);
            if (!CONTAINER_ENGINE_PODMAN) {
                try (OneShotContainer oneShotContainer = new OneShotContainer()) {
                    ((OneShotContainer) oneShotContainer.withName("prepareKafkaVolume")
                            .addGenericBind(new Bind(name, new Volume(KAFKA_CONTAINER_MOUNT_POINT)))
                            .withCommand(new String[]{"chown", "-R", KAFKA_CONTAINER_UID, KAFKA_CONTAINER_MOUNT_POINT}))
                            .withStartupCheckStrategy(new OneShotStartupCheckStrategy());
                    oneShotContainer.start();
                }
            }
            return name;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Best-effort removal of a named container volume.
     *
     * <p>The volume is dropped from {@code volumesPendingCleanup} once it has been removed
     * or is found to no longer exist. Any other failure is logged as a warning, with the
     * causing exception attached, and is never propagated. In that case the volume stays
     * registered in {@code volumesPendingCleanup}.
     *
     * @param str the name of the volume to remove
     */
    private static void removeNamedVolume(String str) {
        try (DockerClient dockerClient = createDockerClient()) {
            dockerClient.removeVolumeCmd(str).exec();
            // Removed successfully, so nothing is left to clean up for this volume.
            volumesPendingCleanup.remove(str);
        } catch (NotFoundException e) {
            // Already gone: just forget about it.
            volumesPendingCleanup.remove(str);
        } catch (Throwable th) {
            // Use the (Level, String, Throwable) overload so the cause and its stack trace are logged.
            LOGGER.log(System.Logger.Level.WARNING, "Failed to remove container volume " + str + ".", th);
            LOGGER.log(System.Logger.Level.WARNING, "Please run `(podman|docker) volume ls` and check for orphaned resources.");
        }
    }

    /**
     * Detects whether the container engine behind the Docker client is Podman.
     *
     * <p>The engine's version information is queried and the engine is treated as Podman
     * when any reported component has a name containing {@code "podman"}
     * (case-insensitively). A missing component list or a component without a name counts
     * as "not Podman". The result is logged at INFO level.
     *
     * @return {@code true} if a component name contains "podman", {@code false} otherwise
     * @throws RuntimeException wrapping any exception raised while querying the engine or
     *                          closing the client
     */
    private static boolean isContainerEnginePodman() {
        // try-with-resources ensures the client is closed even when the version query fails.
        try (DockerClient dockerClient = createDockerClient()) {
            com.github.dockerjava.api.model.Version engineVersion = dockerClient.versionCmd().exec();
            boolean podman = Optional.ofNullable(engineVersion.getComponents()).stream()
                    .flatMap(components -> components.stream())
                    .map(component -> component.getName())
                    .filter(Objects::nonNull)
                    .map(name -> name.toLowerCase(Locale.ROOT))
                    .anyMatch(name -> name.contains("podman"));
            LOGGER.log(System.Logger.Level.INFO, "Detected container engine as Podman : {0}", new Object[]{Boolean.valueOf(podman)});
            return podman;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Tells whether the port allocator has a port registered for the {@code CONTROLLER}
     * listener of the given node.
     *
     * @param num the node id (must not be {@code null}; it is unboxed)
     * @return {@code true} if a CONTROLLER port is registered for that node
     */
    private boolean isController(Integer num) {
        final int nodeId = num;
        return portsAllocator.hasRegisteredPort(KafkaClusterConfig.KafkaEndpoints.Listener.CONTROLLER, nodeId);
    }

    /**
     * Tells whether the port allocator has a port registered for the {@code EXTERNAL}
     * listener of the given node.
     *
     * @param num the node id (must not be {@code null}; it is unboxed)
     * @return {@code true} if an EXTERNAL port is registered for that node
     */
    private boolean isBroker(Integer num) {
        final int nodeId = num;
        return portsAllocator.hasRegisteredPort(KafkaClusterConfig.KafkaEndpoints.Listener.EXTERNAL, nodeId);
    }

    /**
     * Creates an admin client that connects through each node's {@code ANON} listener
     * endpoint, configured with the cluster's anonymous connection settings.
     *
     * @return a new admin client; the caller owns it
     */
    @Override // io.kroxylicious.testing.kafka.internal.AdminSource
    @NonNull
    public Admin createAdmin() {
        final var anonBrokerList = buildBrokerList(
                nodeId -> getEndpointPair(KafkaClusterConfig.KafkaEndpoints.Listener.ANON, nodeId.intValue()));
        final var adminConfig = this.clusterConfig.getAnonConnectConfigForCluster(anonBrokerList);
        return CloseableAdmin.create(adminConfig);
    }

    static {
        if (!System.getenv().containsKey("TESTCONTAINERS_RYUK_DISABLED")) {
            LOGGER.log(System.Logger.Level.WARNING, "As per https://github.com/containers/podman/issues/7927#issuecomment-731525556 if using podman, set env var TESTCONTAINERS_RYUK_DISABLED=true");
        }
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            Set.copyOf(volumesPendingCleanup).forEach(TestcontainersKafkaCluster::removeNamedVolume);
        }));
    }
}
