package io.specmesh.kafka;

import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.specmesh.kafka.schema.SchemaRegistryContainer;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.acl.AclBinding;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:io/specmesh/kafka/DockerKafkaEnvironment.class */
public final class DockerKafkaEnvironment implements KafkaEnvironment, BeforeAllCallback, BeforeEachCallback, AfterEachCallback, AfterAllCallback {
    /** Number of start-up attempts passed to each container. */
    private final int startUpAttempts;

    /** Start-up timeout passed to each container. */
    private final Duration startUpTimeout;

    /** Docker image used for the Kafka broker container. */
    private final DockerImageName kafkaDockerImage;

    /** Immutable copy of the environment variables applied to the Kafka broker container. */
    private final Map<String, String> kafkaEnv;

    /** Docker image of the Schema Registry; empty when no Schema Registry should be started. */
    private final Optional<DockerImageName> srDockerImage;

    /** Immutable copy of the environment variables applied to the Schema Registry container. */
    private final Map<String, String> srEnv;

    /** ACL bindings created on the broker once the containers have started. */
    private final Set<AclBinding> aclBindings;

    /** Credentials used by {@link #adminClient()}; empty when SASL authentication is not configured. */
    private final Optional<Credentials> adminUser;

    /** Caller-supplied network; when present it is used as-is and never closed by this class. */
    private final Optional<Network> explicitNetwork;

    /** Network in use while the environment is running; {@code null} otherwise. */
    private Network network;

    /** Running Kafka broker container; {@code null} while the environment is stopped. */
    private KafkaContainer kafkaBroker;

    /** Running Schema Registry container; {@code null} while stopped or when not configured. */
    private SchemaRegistryContainer schemaRegistry;

    /** Set by {@code beforeAll}; while {@code true} the per-test hooks do nothing. */
    private boolean invokedStatically = false;

    /**
     * Counts outstanding set-up requests. It starts at 1 because the constructor calls
     * {@code tearDown()}, which decrements it to 0 before the first {@code setUp()}.
     * The reference itself is never reassigned, hence {@code final}.
     */
    private final AtomicInteger setUpCount = new AtomicInteger(1);

    /* loaded from: input_file:io/specmesh/kafka/DockerKafkaEnvironment$Builder.class */
    /**
     * Fluent builder of {@link DockerKafkaEnvironment} instances.
     *
     * <p>Defaults: three start-up attempts, a 30 second start-up timeout, the
     * {@code confluentinc/cp-kafka:7.5.3} and {@code confluentinc/cp-schema-registry:7.5.3}
     * images, topic auto-creation disabled, no SASL authentication and no ACLs.
     */
    public static final class Builder {
        private static final int DEFAULT_CONTAINER_STARTUP_ATTEMPTS = 3;
        private static final Duration DEFAULT_CONTAINER_STARTUP_TIMEOUT = Duration.ofSeconds(30);
        private static final String DEFAULT_KAFKA_DOCKER_IMAGE = "confluentinc/cp-kafka:7.5.3";
        private static final String DEFAULT_SCHEMA_REG_IMAGE = "confluentinc/cp-schema-registry:7.5.3";
        private static final Map<String, String> DEFAULT_KAFKA_ENV = Map.of("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");

        private int startUpAttempts = DEFAULT_CONTAINER_STARTUP_ATTEMPTS;
        private Duration startUpTimeout = DEFAULT_CONTAINER_STARTUP_TIMEOUT;
        private DockerImageName kafkaDockerImage = DockerImageName.parse(DEFAULT_KAFKA_DOCKER_IMAGE);
        private final Map<String, String> kafkaEnv = new HashMap<>(DEFAULT_KAFKA_ENV);
        private Optional<DockerImageName> srImage = Optional.of(DockerImageName.parse(DEFAULT_SCHEMA_REG_IMAGE));
        private final Map<String, String> srEnv = new HashMap<>();
        // Insertion-ordered: the first user registered is treated as the admin user.
        private final Map<String, String> userPasswords = new LinkedHashMap<>();
        private boolean enableAcls = false;
        private final Set<AclBinding> aclBindings = new HashSet<>();
        private Optional<Network> explicitNetwork = Optional.empty();

        /**
         * Use an existing network rather than creating a new one.
         *
         * @param network the network to attach the containers to; must not be {@code null}.
         * @return this builder.
         */
        public Builder withNetwork(Network network) {
            this.explicitNetwork = Optional.of(network);
            return this;
        }

        /**
         * Set how many times each container start is attempted.
         *
         * @param attempts the number of attempts; must be positive.
         * @return this builder.
         * @throws IllegalArgumentException if {@code attempts} is zero or negative.
         */
        public Builder withContainerStartUpAttempts(int attempts) {
            if (attempts <= 0) {
                throw new IllegalArgumentException("container startup attempts must be positive, but was: " + attempts);
            }
            this.startUpAttempts = attempts;
            return this;
        }

        /**
         * Set the start-up timeout applied to each container.
         *
         * @param timeout the timeout; must not be {@code null}.
         * @return this builder.
         */
        public Builder withContainerStartUpTimeout(Duration timeout) {
            this.startUpTimeout = Objects.requireNonNull(timeout, "timeout");
            return this;
        }

        /**
         * Override the Kafka broker Docker image.
         *
         * @param imageName the image name, in a form accepted by {@code DockerImageName.parse}.
         * @return this builder.
         */
        public Builder withKafkaImage(String imageName) {
            this.kafkaDockerImage = DockerImageName.parse(imageName);
            return this;
        }

        /**
         * Add a single environment variable to the Kafka broker container.
         *
         * @param name the variable name.
         * @param value the variable value.
         * @return this builder.
         */
        public Builder withKafkaEnv(String name, String value) {
            return withKafkaEnv(Map.of(name, value));
        }

        /**
         * Add environment variables to the Kafka broker container, replacing existing entries
         * with the same name.
         *
         * @param env the variables to add.
         * @return this builder.
         */
        public Builder withKafkaEnv(Map<String, String> env) {
            this.kafkaEnv.putAll(env);
            return this;
        }

        /**
         * Do not start a Schema Registry container.
         *
         * @return this builder.
         */
        public Builder withoutSchemaRegistry() {
            this.srImage = Optional.empty();
            return this;
        }

        /**
         * Override the Schema Registry Docker image (also re-enables the Schema Registry if it
         * was previously disabled).
         *
         * @param imageName the image name, in a form accepted by {@code DockerImageName.parse}.
         * @return this builder.
         */
        public Builder withSchemaRegistryImage(String imageName) {
            this.srImage = Optional.of(DockerImageName.parse(imageName));
            return this;
        }

        /**
         * Add a single environment variable to the Schema Registry container.
         *
         * @param name the variable name.
         * @param value the variable value.
         * @return this builder.
         */
        public Builder withSchemaRegistryEnv(String name, String value) {
            return withSchemaRegistryEnv(Map.of(name, value));
        }

        /**
         * Add environment variables to the Schema Registry container, replacing existing entries
         * with the same name.
         *
         * @param env the variables to add.
         * @return this builder.
         */
        public Builder withSchemaRegistryEnv(Map<String, String> env) {
            this.srEnv.putAll(env);
            return this;
        }

        /**
         * Enable SASL authentication and register users.
         *
         * <p>The first user ever registered on this builder becomes the admin user.
         *
         * @param adminUser the user name to register first in this call.
         * @param adminPassword the password of {@code adminUser}.
         * @param additionalUsers further users as alternating name, password pairs.
         * @return this builder.
         * @throws IllegalArgumentException if {@code additionalUsers} has an odd length.
         */
        public Builder withSaslAuthentication(String adminUser, String adminPassword, String... additionalUsers) {
            if (additionalUsers.length % 2 != 0) {
                throw new IllegalArgumentException("additional users format user1, password1, ... userN, passwordN");
            }
            this.userPasswords.put(adminUser, adminPassword);
            // Entries come in (name, password) pairs, so step two at a time.
            for (int i = 0; i < additionalUsers.length; i += 2) {
                this.userPasswords.put(additionalUsers[i], additionalUsers[i + 1]);
            }
            return this;
        }

        /**
         * Enable ACLs on the broker and register bindings to create at start-up.
         *
         * @param aclBindings the bindings to create.
         * @return this builder.
         */
        public Builder withKafkaAcls(AclBinding... aclBindings) {
            return withKafkaAcls(List.of(aclBindings));
        }

        /**
         * Enable ACLs on the broker and register bindings to create at start-up.
         *
         * @param aclBindings the bindings to create; may be empty to only enable the authorizer.
         * @return this builder.
         */
        public Builder withKafkaAcls(Collection<? extends AclBinding> aclBindings) {
            this.enableAcls = true;
            this.aclBindings.addAll(aclBindings);
            return this;
        }

        /**
         * Build the environment. SASL and ACL related broker settings are added to the Kafka
         * environment variables first, if they were requested.
         *
         * @return the new, not yet started, environment.
         */
        public DockerKafkaEnvironment build() {
            maybeEnableSasl();
            maybeEnableAcls();
            return new DockerKafkaEnvironment(this.startUpAttempts, this.startUpTimeout, this.kafkaDockerImage, this.kafkaEnv, this.srImage, this.srEnv, this.aclBindings, adminUser(), this.explicitNetwork);
        }

        /** Returns the first registered user as the admin user, or empty if no users exist. */
        private Optional<Credentials> adminUser() {
            if (this.userPasswords.isEmpty()) {
                return Optional.empty();
            }
            Map.Entry<String, String> first = this.userPasswords.entrySet().iterator().next();
            return Optional.of(new Credentials(first.getKey(), first.getValue()));
        }

        /** Adds the authorizer settings to the broker environment when ACLs were requested. */
        private void maybeEnableAcls() {
            if (!this.enableAcls) {
                return;
            }
            // Super users: the admin user, when there is one, followed by ANONYMOUS.
            String superUsers = adminUser().map(admin -> "User:" + admin.userName + ";").orElse("") + "User:ANONYMOUS";
            withKafkaEnv("KAFKA_SUPER_USERS", superUsers);
            withKafkaEnv("KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND", "false");
            withKafkaEnv("KAFKA_AUTHORIZER_CLASS_NAME", "kafka.security.authorizer.AclAuthorizer");
        }

        /** Adds the SASL/PLAIN listener settings to the broker environment when users exist. */
        private void maybeEnableSasl() {
            adminUser().ifPresent(admin -> {
                withKafkaEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:SASL_PLAINTEXT");
                withKafkaEnv("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", buildJaasConfig(admin));
                withKafkaEnv("KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS", "PLAIN");
            });
        }

        /**
         * Builds the broker-side JAAS config: the client JAAS config of the admin user with its
         * last character removed, followed by a {@code user_<name>="<password>"} entry for every
         * registered user and a closing {@code ;}.
         */
        private String buildJaasConfig(Credentials admin) {
            String clientConfig = Clients.clientSaslAuthProperties(admin.userName, admin.password).get("sasl.jaas.config").toString();
            String userEntries = this.userPasswords.entrySet().stream()
                    .map(user -> " user_" + user.getKey() + "=\"" + user.getValue() + "\"")
                    .collect(Collectors.joining());
            return clientConfig.substring(0, clientConfig.length() - 1) + userEntries + ";";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/specmesh/kafka/DockerKafkaEnvironment$Credentials.class */
    /** Immutable user name and password pair; neither value may be {@code null}. */
    public static class Credentials {
        final String userName;
        final String password;

        /**
         * @param userName the user name; must not be {@code null}.
         * @param password the password; must not be {@code null}.
         */
        Credentials(String userName, String password) {
            Objects.requireNonNull(userName, "userName");
            Objects.requireNonNull(password, "password");
            this.userName = userName;
            this.password = password;
        }
    }

    /**
     * Creates a new builder pre-populated with the default settings.
     *
     * @return a fresh {@link Builder}.
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Creates a stopped environment. Instances are obtained through {@link #builder()}.
     *
     * <p>The env maps and the ACL set are defensively copied into immutable collections.
     *
     * @param startUpAttempts number of start attempts for each container.
     * @param startUpTimeout start-up timeout for each container.
     * @param kafkaDockerImage Docker image of the Kafka broker.
     * @param kafkaEnv environment variables for the Kafka broker container.
     * @param srDockerImage Docker image of the Schema Registry, or empty for none.
     * @param srEnv environment variables for the Schema Registry container.
     * @param aclBindings ACL bindings to create after start-up.
     * @param adminUser credentials used by the admin client, or empty without SASL.
     * @param explicitNetwork caller-supplied network, or empty to create one on set-up.
     * @throws NullPointerException if any reference argument is {@code null}.
     */
    private DockerKafkaEnvironment(int startUpAttempts, Duration startUpTimeout, DockerImageName kafkaDockerImage, Map<String, String> kafkaEnv, Optional<DockerImageName> srDockerImage, Map<String, String> srEnv, Set<AclBinding> aclBindings, Optional<Credentials> adminUser, Optional<Network> explicitNetwork) {
        this.startUpTimeout = Objects.requireNonNull(startUpTimeout, "startUpTimeout");
        this.startUpAttempts = startUpAttempts;
        this.kafkaDockerImage = Objects.requireNonNull(kafkaDockerImage, "kafkaDockerImage");
        this.kafkaEnv = Map.copyOf(Objects.requireNonNull(kafkaEnv, "kafkaEnv"));
        this.srDockerImage = Objects.requireNonNull(srDockerImage, "srDockerImage");
        this.srEnv = Map.copyOf(Objects.requireNonNull(srEnv, "srEnv"));
        this.aclBindings = Set.copyOf(Objects.requireNonNull(aclBindings, "aclBindings"));
        this.adminUser = Objects.requireNonNull(adminUser, "credentials");
        this.explicitNetwork = Objects.requireNonNull(explicitNetwork, "explicitNetwork");
        // Brings setUpCount from its initial 1 down to 0, so the first setUp() call
        // increments it to 1 and actually starts the containers.
        tearDown();
    }

    /**
     * Class-level set-up hook: flags the environment as class-scoped, which turns the
     * per-test hooks into no-ops, then requests set-up.
     *
     * @param context the extension context (unused).
     */
    public void beforeAll(ExtensionContext context) {
        invokedStatically = true;
        setUp();
    }

    /**
     * Per-test set-up hook: requests set-up unless the environment is class-scoped, i.e.
     * {@code beforeAll} has already handled it.
     *
     * @param context the extension context (unused).
     */
    public void beforeEach(ExtensionContext context) {
        if (!invokedStatically) {
            setUp();
        }
    }

    /**
     * Per-test tear-down hook: requests tear-down unless the environment is class-scoped, in
     * which case {@code afterAll} is responsible for it.
     *
     * @param context the extension context (unused).
     */
    public void afterEach(ExtensionContext context) {
        if (!invokedStatically) {
            tearDown();
        }
    }

    /**
     * Class-level tear-down hook: always requests tear-down.
     *
     * @param context the extension context (unused).
     */
    public void afterAll(ExtensionContext context) {
        tearDown();
    }

    /**
     * Returns the bootstrap servers reported by the running Kafka broker container.
     *
     * @return the bootstrap servers string.
     * @throws IllegalStateException if the environment is not running.
     */
    @Override // io.specmesh.kafka.KafkaEnvironment
    public String kafkaBootstrapServers() {
        // Fail with the same explicit error as network() rather than a bare NPE.
        if (this.kafkaBroker == null) {
            throw new IllegalStateException("Environment not running");
        }
        return this.kafkaBroker.getBootstrapServers();
    }

    /**
     * Returns the host-network URL of the running Schema Registry container.
     *
     * @return the Schema Registry URL as a string.
     * @throws IllegalStateException if no Schema Registry is running, either because the
     *     environment is stopped or because it was built without a Schema Registry.
     */
    @Override // io.specmesh.kafka.KafkaEnvironment
    public String schemeRegistryServer() {
        // The field is null both before set-up and when no Schema Registry image is configured.
        if (this.schemaRegistry == null) {
            throw new IllegalStateException("Schema Registry not running");
        }
        return this.schemaRegistry.hostNetworkUrl().toString();
    }

    /**
     * Creates a new admin client connected to the running broker, with a random client id and,
     * when an admin user is configured, that user's SASL client properties.
     *
     * <p>Each call creates a new client; the caller is responsible for closing it.
     *
     * @return a new admin client.
     */
    @Override // io.specmesh.kafka.KafkaEnvironment
    public Admin adminClient() {
        Map<String, Object> properties = new HashMap<>();
        properties.put("client.id", UUID.randomUUID().toString());
        properties.put("bootstrap.servers", kafkaBootstrapServers());
        this.adminUser.ifPresent(admin ->
                properties.putAll(Clients.clientSaslAuthProperties(admin.userName, admin.password)));
        return AdminClient.create(properties);
    }

    /**
     * Returns the network the containers are attached to.
     *
     * @return the network currently in use.
     * @throws IllegalStateException if the environment is not running.
     */
    public Network network() {
        Network current = network;
        if (current != null) {
            return current;
        }
        throw new IllegalStateException("Environment not running");
    }

    /**
     * Starts the environment on the first outstanding set-up request: resolves the network,
     * configures the Kafka broker and, if an image is configured, the Schema Registry, starts
     * them and then installs the ACLs. Further requests only increment the counter.
     */
    private void setUp() {
        if (setUpCount.incrementAndGet() != 1) {
            // Already set up by an earlier request.
            return;
        }

        network = explicitNetwork.orElseGet(Network::newNetwork);
        kafkaBroker = new KafkaContainer(kafkaDockerImage)
                .withNetwork(network)
                .withNetworkAliases("kafka")
                .withStartupAttempts(startUpAttempts)
                .withStartupTimeout(startUpTimeout)
                .withEnv(kafkaEnv);

        // With a Schema Registry only that container is started explicitly.
        // NOTE(review): presumably withKafka(...) makes it start the broker first - confirm.
        Startable toStart = kafkaBroker;
        if (srDockerImage.isPresent()) {
            // NOTE(review): m4withNetworkAliases is a decompiler-renamed method; kept as-is.
            schemaRegistry = (SchemaRegistryContainer) ((SchemaRegistryContainer) ((SchemaRegistryContainer) new SchemaRegistryContainer(srDockerImage.get())
                    .withKafka(kafkaBroker)
                    .m4withNetworkAliases("schema-registry")
                    .withStartupAttempts(startUpAttempts))
                    .withStartupTimeout(startUpTimeout))
                    .withEnv(srEnv);
            toStart = schemaRegistry;
        }
        toStart.start();

        installAcls();
    }

    /**
     * Stops the environment once the last outstanding set-up request is released: closes the
     * Schema Registry, then the broker, then the network (only if this class created it), and
     * clears the class-scoped flag. Otherwise only the counter is decremented.
     */
    private void tearDown() {
        if (setUpCount.decrementAndGet() != 0) {
            return;
        }

        SchemaRegistryContainer registry = schemaRegistry;
        if (registry != null) {
            registry.close();
            schemaRegistry = null;
        }

        KafkaContainer broker = kafkaBroker;
        if (broker != null) {
            broker.close();
            kafkaBroker = null;
        }

        // A caller-supplied network is owned by the caller and must not be closed here.
        if (network != null && explicitNetwork.isEmpty()) {
            network.close();
        }
        network = null;

        invokedStatically = false;
    }

    /**
     * Creates the configured ACL bindings on the broker, waiting up to 30 seconds.
     *
     * <p>The admin client is always closed, including when creation fails.
     *
     * @throws AssertionError if the ACLs could not be created in time; the cause is preserved.
     */
    private void installAcls() {
        try (Admin admin = adminClient()) {
            admin.createAcls(this.aclBindings).all().get(30L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new AssertionError("Failed to create ACLs", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new AssertionError("Failed to create ACLs", e);
        }
    }

    /**
     * Returns a bootstrap address built from the broker container's first network alias and
     * port 9092, in the form {@code PLAINTEXT://<alias>:9092}.
     *
     * @return the bootstrap address.
     * @throws IllegalStateException if the environment is not running.
     */
    public String testNetworkKafkaBootstrapServers() {
        // Fail with the same explicit error as network() rather than a bare NPE.
        if (this.kafkaBroker == null) {
            throw new IllegalStateException("Environment not running");
        }
        return "PLAINTEXT://" + this.kafkaBroker.getNetworkAliases().get(0) + ":9092";
    }

    /**
     * Returns a Schema Registry URL built from the registry container's first network alias
     * and port 8081, in the form {@code http://<alias>:8081}.
     *
     * @return the Schema Registry URL.
     * @throws IllegalStateException if no Schema Registry is running, either because the
     *     environment is stopped or because it was built without a Schema Registry.
     */
    public String testNetworkSchemeRegistryServer() {
        // The field is null both before set-up and when no Schema Registry image is configured.
        if (this.schemaRegistry == null) {
            throw new IllegalStateException("Schema Registry not running");
        }
        return "http://" + this.schemaRegistry.getNetworkAliases().get(0) + ":8081";
    }

    /**
     * Creates a new Schema Registry client pointed at {@link #schemeRegistryServer()}, with
     * Protobuf, Avro and JSON schema providers registered and no extra client configuration.
     *
     * <p>Each call creates a new client instance.
     *
     * @return a new cached Schema Registry client.
     */
    @Override // io.specmesh.kafka.KafkaEnvironment
    /* renamed from: srClient, reason: merged with bridge method [inline-methods] */
    public CachedSchemaRegistryClient mo0srClient() {
        // NOTE(review): the literal 5 is presumably the client's cache capacity - confirm against the constructor docs.
        return new CachedSchemaRegistryClient(schemeRegistryServer(), 5, List.of(new ProtobufSchemaProvider(), new AvroSchemaProvider(), new JsonSchemaProvider()), Map.of());
    }
}
