package org.graylog.testing.kafka;

import com.github.dockerjava.api.command.InspectContainerResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Base64;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.graylog2.shared.utilities.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:org/graylog/testing/kafka/KafkaContainer.class */
public class KafkaContainer extends GenericContainer<KafkaContainer> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaContainer.class);
    private static final Version DEFAULT_VERSION = Version.V34;
    private static final String KAFKA_ADVERTISED_LISTENERS_FILE = "/.env-kafka-advertised-listeners";
    private Admin adminClient;
    private final Network network;

    /* loaded from: input_file:org/graylog/testing/kafka/KafkaContainer$Version.class */
    public enum Version {
        V34("3.4.0");

        private final String version;

        Version(String str) {
            this.version = str;
        }

        public String getVersion() {
            return this.version;
        }
    }

    public static KafkaContainer create() {
        return new KafkaContainer(DEFAULT_VERSION, Network.newNetwork());
    }

    public static KafkaContainer create(Version version) {
        return new KafkaContainer(version, Network.newNetwork());
    }

    private KafkaContainer(Version version, Network network) {
        super(DockerImageName.parse(StringUtils.f("bitnami/kafka:%s", new Object[]{version.getVersion()})));
        this.adminClient = null;
        this.network = network;
        withExposedPorts(new Integer[]{9092});
        withNetwork((Network) Objects.requireNonNull(network, "network cannot be null"));
        withEnv("KAFKA_HEAP_OPTS", "-Xmx256m -Xms256m");
        withEnv("ALLOW_PLAINTEXT_LISTENER", "yes");
        withEnv("KAFKA_KRAFT_CLUSTER_ID", generateKraftClusterId());
        withEnv("KAFKA_CFG_NODE_ID", "1");
        withEnv("KAFKA_CFG_CONTROLLER_QUORUM_VOTERS", "1@127.0.0.1:9093");
        withEnv("KAFKA_CFG_PROCESS_ROLES", "broker,controller");
        withEnv("KAFKA_CFG_CONTROLLER_LISTENER_NAMES", "CONTROLLER");
        withEnv("KAFKA_CFG_LISTENERS", "EXTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,INTERNAL://0.0.0.0:9094");
        withEnv("KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP", "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT");
        withEnv("KAFKA_CFG_INTER_BROKER_LISTENER_NAME", "INTERNAL");
        withEnv("KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
        withEnv("KAFKA_CFG_OFFSETS_TOPIC_NUM_PARTITIONS", "1");
        withEnv("KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR", "1");
        withEnv("KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1");
        withEnv("KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR", "1");
        withCreateContainerCmdModifier(createContainerCmd -> {
            createContainerCmd.withEntrypoint(new String[]{"sh"});
        });
        withCommand(new String[]{"-c", StringUtils.f("while [ ! -f %s ]; do sleep 0.1; done; export KAFKA_CFG_ADVERTISED_LISTENERS=\"$(cat %s)\"; /entrypoint.sh /run.sh", new Object[]{KAFKA_ADVERTISED_LISTENERS_FILE, KAFKA_ADVERTISED_LISTENERS_FILE})});
        waitingFor(Wait.forLogMessage(".*Kafka Server started.*", 1).withStartupTimeout(Duration.ofSeconds(60L)));
    }

    private static String generateKraftClusterId() {
        return Base64.getEncoder().encodeToString(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8)).substring(0, 22);
    }

    public int getKafkaPort() {
        return getMappedPort(9092).intValue();
    }

    protected void containerIsStarting(InspectContainerResponse inspectContainerResponse) {
        super.containerIsStarting(inspectContainerResponse);
        copyFileToContainer(Transferable.of(StringUtils.f("EXTERNAL://%s:%d,INTERNAL://%s:9094", new Object[]{getHost(), Integer.valueOf(getKafkaPort()), inspectContainerResponse.getConfig().getHostName()})), KAFKA_ADVERTISED_LISTENERS_FILE);
    }

    protected void containerIsStarted(InspectContainerResponse inspectContainerResponse) {
        super.containerIsStarted(inspectContainerResponse);
        this.adminClient = Admin.create(Map.of("bootstrap.servers", "localhost:" + getKafkaPort()));
    }

    public KafkaProducer<String, byte[]> createByteArrayProducer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:" + getKafkaPort());
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("client.id", "graylog-node-" + UUID.randomUUID());
        properties.put("acks", "1");
        properties.put("linger.ms", 0);
        return new KafkaProducer<>(properties);
    }

    public void createTopic(String str) throws Exception {
        LOG.info("Creating topic: {}", str);
        adminClient().createTopics(Set.of(new NewTopic(str, 1, (short) 1))).all().get(30L, TimeUnit.SECONDS);
    }

    public Set<String> listTopics() throws Exception {
        return (Set) adminClient().listTopics().names().get(30L, TimeUnit.SECONDS);
    }

    public Admin adminClient() {
        return (Admin) Objects.requireNonNull(this.adminClient, "adminClient is not initialized yet");
    }

    public void close() {
        super.close();
        this.network.close();
    }
}
