package io.goodforgod.testcontainers.extensions.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import org.jetbrains.annotations.NotNull;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:io/goodforgod/testcontainers/extensions/kafka/KafkaContainerExtra.class */
public class KafkaContainerExtra extends KafkaContainer {
    private static final String EXTERNAL_TEST_KAFKA_BOOTSTRAP = "EXTERNAL_TEST_KAFKA_BOOTSTRAP_SERVERS";
    private static final String EXTERNAL_TEST_KAFKA_PREFIX = "EXTERNAL_TEST_KAFKA_";
    private volatile KafkaConnectionImpl connection;

    public KafkaContainerExtra(String str) {
        this(DockerImageName.parse(str));
    }

    public KafkaContainerExtra(DockerImageName dockerImageName) {
        super(dockerImageName);
        String str = "kafka-" + System.currentTimeMillis();
        withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger(KafkaContainerExtra.class)).withMdc("image", dockerImageName.asCanonicalNameString()).withMdc("alias", str));
        withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false");
        withEnv("AUTO_CREATE_TOPICS", "true");
        withEnv("KAFKA_LOG4J_LOGGERS", "org.apache.zookeeper=ERROR,org.kafka.zookeeper=ERROR,kafka.zookeeper=ERROR,org.apache.kafka=ERROR,kafka=ERROR,kafka.network=ERROR,kafka.cluster=ERROR,kafka.controller=ERROR,kafka.coordinator=INFO,kafka.log=ERROR,kafka.server=ERROR,state.change.logger=ERROR");
        withEnv("ZOOKEEPER_LOG4J_LOGGERS", "org.apache.zookeeper=ERROR,org.kafka.zookeeper=ERROR,org.kafka.zookeeper.server=ERROR,kafka.zookeeper=ERROR,org.apache.kafka=ERROR");
        withEmbeddedZookeeper();
        withExposedPorts(new Integer[]{9092, 9093});
        waitingFor(Wait.forListeningPort());
        withStartupTimeout(Duration.ofMinutes(5L));
        setNetworkAliases(new ArrayList(List.of(str)));
    }

    @NotNull
    public KafkaConnection connection() {
        if (this.connection == null) {
            Optional<KafkaConnection> connectionExternal = getConnectionExternal();
            if (connectionExternal.isEmpty() && !isRunning()) {
                throw new IllegalStateException("KafkaConnection can't be create for container that is not running");
            }
            this.connection = (KafkaConnectionImpl) connectionExternal.orElseGet(() -> {
                String str = (String) getNetworkAliases().get(getNetworkAliases().size() - 1);
                Properties properties = new Properties();
                properties.put("bootstrap.servers", getBootstrapServers());
                Properties properties2 = new Properties();
                properties2.put("bootstrap.servers", String.format("%s:%s", str, "9092"));
                return new KafkaConnectionImpl(properties, properties2);
            });
        }
        return this.connection;
    }

    public void start() {
        if (getConnectionExternal().isEmpty()) {
            super.start();
        }
    }

    public void stop() {
        this.connection.close();
        this.connection = null;
        super.stop();
    }

    @NotNull
    private static Optional<KafkaConnection> getConnectionExternal() {
        if (System.getenv(EXTERNAL_TEST_KAFKA_BOOTSTRAP) == null) {
            return Optional.empty();
        }
        Properties properties = new Properties();
        System.getenv().forEach((str, str2) -> {
            if (str.startsWith(EXTERNAL_TEST_KAFKA_PREFIX)) {
                properties.put(str.replace(EXTERNAL_TEST_KAFKA_PREFIX, "").replace("_", ".").toLowerCase(), str2);
            }
        });
        return Optional.of(new KafkaConnectionImpl(properties, null));
    }
}
