package org.creekservice.internal.kafka.streams.test.extension.testsuite;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.creekservice.api.base.type.Preconditions;
import org.creekservice.api.system.test.extension.component.definition.ServiceDefinition;
import org.creekservice.api.system.test.extension.test.env.suite.service.ConfigurableServiceInstance;
import org.creekservice.api.system.test.extension.test.env.suite.service.ServiceInstance;

/* loaded from: input_file:org/creekservice/internal/kafka/streams/test/extension/testsuite/KafkaContainerDef.class */
final class KafkaContainerDef implements ServiceDefinition {
    public static final int TEST_NETWORK_PORT = 9093;
    public static final int SERVICE_NETWORK_PORT = 9092;
    private static final int ZOOKEEPER_PORT = 2181;
    private static final String DEFAULT_INTERNAL_PARTITION_COUNT = "1";
    private static final String DEFAULT_INTERNAL_TOPIC_RF = "1";
    private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(90);
    private static final Duration SHUTDOWN_TIMEOUT = Duration.ofSeconds(1);
    private final String name;
    private final String kafkaDockerImage;

    /* loaded from: input_file:org/creekservice/internal/kafka/streams/test/extension/testsuite/KafkaContainerDef$FailedToConfigureKafkaException.class */
    private static final class FailedToConfigureKafkaException extends RuntimeException {
        FailedToConfigureKafkaException(ServiceInstance.ExecResult execResult) {
            super("Failed to configure Kafka's advertised listeners. 'kafka-configs''s exec result: " + execResult);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaContainerDef(String str, String str2) {
        this.name = "kafka-" + Preconditions.requireNonBlank(str, "clusterName");
        this.kafkaDockerImage = Preconditions.requireNonBlank(str2, "kafkaDockerImage");
    }

    public String name() {
        return this.name;
    }

    public String dockerImage() {
        return this.kafkaDockerImage;
    }

    public void configureInstance(ConfigurableServiceInstance configurableServiceInstance) {
        ArrayList arrayList = new ArrayList();
        arrayList.add("#!/bin/bash");
        arrayList.addAll(setUpZooKeeper(configurableServiceInstance));
        arrayList.addAll(setUpKafka(configurableServiceInstance));
        configurableServiceInstance.setCommand(new String[]{"sh", "-c", String.join(System.lineSeparator(), arrayList)});
        configurableServiceInstance.setStartupTimeout(STARTUP_TIMEOUT);
        configurableServiceInstance.setShutdownTimeout(SHUTDOWN_TIMEOUT);
    }

    public void instanceStarted(ServiceInstance serviceInstance) {
        String serviceNetworkListener = serviceNetworkListener(serviceInstance);
        ServiceInstance.ExecResult execOnInstance = serviceInstance.execOnInstance(new String[]{"kafka-configs", "--alter", "--bootstrap-server", serviceNetworkListener, "--entity-type", "brokers", "--entity-name", "1", "--add-config", "advertised.listeners=[" + testNetworkListener(serviceInstance) + "," + serviceNetworkListener + "]"});
        if (execOnInstance.exitCode() != 0) {
            throw new FailedToConfigureKafkaException(execOnInstance);
        }
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        KafkaContainerDef kafkaContainerDef = (KafkaContainerDef) obj;
        return Objects.equals(this.name, kafkaContainerDef.name) && Objects.equals(this.kafkaDockerImage, kafkaContainerDef.kafkaDockerImage);
    }

    public int hashCode() {
        return Objects.hash(this.name, this.kafkaDockerImage);
    }

    private static List<String> setUpZooKeeper(ConfigurableServiceInstance configurableServiceInstance) {
        configurableServiceInstance.addEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181");
        return List.of("echo 'clientPort=2181' > zookeeper.properties", "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties", "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties", "zookeeper-server-start ./zookeeper.properties &");
    }

    private static List<String> setUpKafka(ConfigurableServiceInstance configurableServiceInstance) {
        configurableServiceInstance.addExposedPorts(new int[]{TEST_NETWORK_PORT}).addEnv("KAFKA_LISTENERS", "SERVICE_NETWORK://0.0.0.0:9092,TEST_NETWORK://0.0.0.0:9093").addEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "SERVICE_NETWORK:PLAINTEXT,TEST_NETWORK:PLAINTEXT").addEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "SERVICE_NETWORK").addEnv("KAFKA_ADVERTISED_LISTENERS", serviceNetworkListener(configurableServiceInstance)).addEnv("KAFKA_BROKER_ID", "1").addEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1").addEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1").addEnv("KAFKA_TRANSACTION_STATE_LOG_NUM_PARTITIONS", "1").addEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1").addEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1").addEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", "9223372036854775807").addEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0").addEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");
        return List.of("echo '' > /etc/confluent/docker/ensure", "/etc/confluent/docker/run");
    }

    private static String testNetworkListener(ServiceInstance serviceInstance) {
        return "TEST_NETWORK://" + serviceInstance.testNetworkHostname() + ":" + serviceInstance.testNetworkPort(TEST_NETWORK_PORT);
    }

    private static String serviceNetworkListener(ServiceInstance serviceInstance) {
        return "SERVICE_NETWORK://" + serviceInstance.serviceNetworkHostname() + ":9092";
    }
}
