package io.goodforgod.testcontainers.extensions.kafka;

import io.goodforgod.testcontainers.extensions.ContainerMode;
import io.goodforgod.testcontainers.extensions.kafka.ContainerKafkaConnection;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionConfigurationException;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.junit.jupiter.api.extension.ParameterResolver;
import org.junit.platform.commons.support.AnnotationSupport;
import org.junit.platform.commons.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

@ApiStatus.Internal
/* loaded from: input_file:io/goodforgod/testcontainers/extensions/kafka/TestcontainersKafkaExtension.class */
final class TestcontainersKafkaExtension implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback, AfterEachCallback, ParameterResolver {
    /** Environment variable whose presence switches the extension to an externally provided Kafka. */
    private static final String EXTERNAL_TEST_KAFKA_BOOTSTRAP = "EXTERNAL_TEST_KAFKA_BOOTSTRAP_SERVERS";

    /** Prefix of the environment variables that are converted into external connection properties. */
    private static final String EXTERNAL_TEST_KAFKA_PREFIX = "EXTERNAL_TEST_KAFKA_";

    /** JUnit store namespace under which this extension keeps its containers and connection pool. */
    private static final ExtensionContext.Namespace NAMESPACE = ExtensionContext.Namespace.create(TestcontainersKafkaExtension.class);

    /** Containers started in {@link ContainerMode#PER_RUN} mode, keyed by Docker image name. */
    private static final Map<String, ExtensionContainerImpl> IMAGE_TO_SHARED_CONTAINER = new ConcurrentHashMap<>();

    /** Lazily resolved external connection properties; stays {@code null} while none were found. */
    private Properties externalConnection;

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /* loaded from: input_file:io/goodforgod/testcontainers/extensions/kafka/TestcontainersKafkaExtension$ExtensionContainerImpl.class */
    /**
     * Holder that keeps a Kafka container together with the connection
     * properties that were computed for it.
     */
    private static final class ExtensionContainerImpl implements ExtensionContainer {

        private final KafkaContainer container;
        private final Properties properties;

        /**
         * @param kafkaContainer container to wrap
         * @param properties     connection properties associated with the container
         */
        ExtensionContainerImpl(KafkaContainer kafkaContainer, Properties properties) {
            this.properties = properties;
            this.container = kafkaContainer;
        }

        /** Stops the wrapped container. */
        @Override
        public void stop() {
            container.stop();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/goodforgod/testcontainers/extensions/kafka/TestcontainersKafkaExtension$KafkaConnectionPool.class */
    /**
     * Keeps track of every {@link KafkaConnectionImpl} handed out by the extension so
     * that all of them can be cleared or closed together.
     * <p>
     * Clearing and closing are best-effort: a failure on one connection is logged and
     * does not prevent the remaining connections from being processed.
     */
    public static final class KafkaConnectionPool {

        private static final Logger logger = LoggerFactory.getLogger(KafkaConnectionPool.class);

        private final List<KafkaConnectionImpl> connections = new ArrayList<>();

        private KafkaConnectionPool() {
        }

        /** Registers a connection so that later {@link #clear()} / {@link #close()} calls reach it. */
        private void add(KafkaConnectionImpl kafkaConnectionImpl) {
            this.connections.add(kafkaConnectionImpl);
        }

        /** Invokes {@code clear()} on every registered connection; connections stay registered. */
        private void clear() {
            for (KafkaConnectionImpl connection : this.connections) {
                try {
                    connection.clear();
                } catch (Exception e) {
                    // Best-effort: keep going, but leave a trace instead of hiding the failure.
                    logger.debug("Failed to clear Kafka connection, continuing with remaining connections", e);
                }
            }
        }

        /** Invokes {@code close()} on every registered connection and then forgets all of them. */
        private void close() {
            for (KafkaConnectionImpl connection : this.connections) {
                try {
                    connection.close();
                } catch (Exception e) {
                    // Best-effort: one failing connection must not stop the others from closing.
                    logger.warn("Failed to close Kafka connection, continuing with remaining connections", e);
                }
            }
            this.connections.clear();
        }
    }

    // Package-private no-argument constructor; it performs no initialization of its own.
    TestcontainersKafkaExtension() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns a snapshot of the containers currently registered in the shared
     * image-to-container map.
     *
     * @return a new list, independent of the internal map
     */
    public static List<ExtensionContainer> getSharedContainers() {
        final List<ExtensionContainer> snapshot = new ArrayList<>(IMAGE_TO_SHARED_CONTAINER.values());
        return snapshot;
    }

    /**
     * Searches for an annotation on the test class of the given context and, when it is
     * not found there, on the test classes of the parent contexts.
     * <p>
     * Contexts without a test class are skipped rather than queried through
     * {@code getRequiredTestClass()}, so that reaching such a context yields an empty
     * result instead of an exception.
     *
     * @param cls              annotation type to look for
     * @param extensionContext context to start the search from
     * @param <T>              annotation type
     * @return the first annotation found while walking up the context chain, or empty
     */
    private <T extends Annotation> Optional<T> findAnnotation(Class<T> cls, ExtensionContext extensionContext) {
        Optional<ExtensionContext> current = Optional.of(extensionContext);
        while (current.isPresent()) {
            final ExtensionContext context = current.get();
            final Optional<T> annotation = context.getTestClass()
                    .flatMap(testClass -> AnnotationSupport.findAnnotation(testClass, cls));
            if (annotation.isPresent()) {
                return annotation;
            }
            current = context.getParent();
        }
        return Optional.empty();
    }

    /**
     * Looks up a user-declared Kafka container: the first non-synthetic field of the
     * test class hierarchy that carries the container annotation.
     * <p>
     * The field value is read from the current test instance; when the context has no
     * test instance the result is empty.
     *
     * @param extensionContext current extension context
     * @return the container held by the annotated field, or empty when there is no such
     *         field or no test instance
     * @throws IllegalArgumentException if the field value is {@code null} or not a {@link KafkaContainer}
     * @throws IllegalStateException    if the field value can not be read
     */
    private Optional<KafkaContainer> getContainerFromField(ExtensionContext extensionContext) {
        this.logger.debug("Looking for Kafka Container...");
        final Class<? extends Annotation> annotationContainer = getAnnotationContainer();
        final Optional<Field> containerField = ReflectionUtils.findFields(
                extensionContext.getRequiredTestClass(),
                candidate -> !candidate.isSynthetic() && candidate.getAnnotation(annotationContainer) != null,
                ReflectionUtils.HierarchyTraversalMode.TOP_DOWN)
                .stream()
                .findFirst();

        return containerField.flatMap(field -> extensionContext.getTestInstance().map(testInstance -> {
            try {
                field.setAccessible(true);
                final Object value = field.get(testInstance);
                final Class<KafkaContainer> containerType = getContainerType();
                // isInstance is false for null, so an unset field fails with the same
                // descriptive error instead of a bare NullPointerException.
                if (!containerType.isInstance(value)) {
                    throw new IllegalArgumentException(String.format("Field '%s' annotated with @%s value must be instance of %s", field.getName(), annotationContainer.getSimpleName(), containerType));
                }
                this.logger.debug("Found Kafka Container in field: {}", field.getName());
                return containerType.cast(value);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(String.format("Failed retrieving value from field '%s' annotated with @%s", field.getName(), annotationContainer.getSimpleName()), e);
            }
        }));
    }

    /** Returns the container class that an annotated container field value must be an instance of. */
    private Class<KafkaContainer> getContainerType() {
        return KafkaContainer.class;
    }

    /** Returns the annotation type that marks a field holding a user-declared container. */
    private Class<? extends Annotation> getAnnotationContainer() {
        return ContainerKafka.class;
    }

    /** Returns the annotation type that marks fields and parameters to receive a Kafka connection. */
    private Class<? extends Annotation> getAnnotationConnection() {
        return ContainerKafkaConnection.class;
    }

    /**
     * Builds the container used when the test class does not declare its own one.
     *
     * @param containerMetadata metadata supplying the Docker image name
     * @return a configured, not yet started container
     */
    @NotNull
    private KafkaContainer getDefaultContainer(@NotNull ContainerMetadata containerMetadata) {
        final DockerImageName image = DockerImageName.parse(containerMetadata.image())
                .asCompatibleSubstituteFor(DockerImageName.parse("confluentinc/cp-kafka"));
        final String networkAlias = "kafka-" + System.currentTimeMillis();
        final Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(LoggerFactory.getLogger(KafkaContainer.class));

        return new KafkaContainer(image)
                .withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
                .withEnv("AUTO_CREATE_TOPICS", "true")
                .withNetworkAliases(networkAlias)
                .withNetwork(Network.SHARED)
                .withKraft()
                .withLogConsumer(logConsumer)
                .withStartupTimeout(Duration.ofMinutes(3L));
    }

    /**
     * Reads the {@code @TestcontainersKafka} annotation reachable from the context and
     * converts its image and mode into container metadata.
     *
     * @param extensionContext current extension context
     * @return metadata, or empty when the annotation was not found
     */
    @NotNull
    private Optional<ContainerMetadata> findMetadata(@NotNull ExtensionContext extensionContext) {
        final Optional<TestcontainersKafka> annotation = findAnnotation(TestcontainersKafka.class, extensionContext);
        return annotation.map(found -> new ContainerMetadata(found.image(), found.mode()));
    }

    /**
     * Creates connection properties for a container.
     *
     * @param kafkaContainer container to connect to
     * @return properties containing {@code bootstrap.servers} taken from the container
     */
    @NotNull
    private Properties getPropertiesForContainer(@NotNull KafkaContainer kafkaContainer) {
        final String bootstrapServers = kafkaContainer.getBootstrapServers();
        final Properties containerProperties = new Properties();
        containerProperties.put("bootstrap.servers", bootstrapServers);
        return containerProperties;
    }

    /**
     * Builds connection properties for an externally provided Kafka from environment
     * variables.
     * <p>
     * Nothing is built unless {@code EXTERNAL_TEST_KAFKA_BOOTSTRAP_SERVERS} is set. When
     * it is, every variable starting with {@code EXTERNAL_TEST_KAFKA_} becomes a
     * property: the prefix is removed, {@code _} is replaced by {@code .} and the name is
     * lower-cased, e.g. {@code EXTERNAL_TEST_KAFKA_BOOTSTRAP_SERVERS} becomes
     * {@code bootstrap.servers}.
     *
     * @return external connection properties, or empty when no external Kafka is configured
     */
    @NotNull
    private Optional<Properties> getConnectionExternal() {
        if (System.getenv(EXTERNAL_TEST_KAFKA_BOOTSTRAP) == null) {
            return Optional.empty();
        }
        final Properties properties = new Properties();
        System.getenv().forEach((name, value) -> {
            if (name.startsWith(EXTERNAL_TEST_KAFKA_PREFIX)) {
                // Strip only the leading prefix, and lower-case independently of the
                // default locale so keys such as "group.id" are stable everywhere.
                final String key = name.substring(EXTERNAL_TEST_KAFKA_PREFIX.length())
                        .replace('_', '.')
                        .toLowerCase(Locale.ROOT);
                // A variable named exactly like the prefix carries no property name.
                if (!key.isEmpty()) {
                    properties.put(key, value);
                }
            }
        });
        return Optional.of(properties);
    }

    /**
     * Returns the external connection properties, resolving them from the environment
     * on demand and remembering them once they were found.
     * <p>
     * Only the bootstrap servers are logged, because the remaining environment-derived
     * properties may carry credentials.
     *
     * @return external connection properties, or {@code null} when no external Kafka is configured
     */
    @Nullable
    private Properties getPropertiesExternalCached() {
        if (this.externalConnection == null) {
            this.externalConnection = getConnectionExternal().orElse(null);
        }
        if (this.externalConnection != null) {
            this.logger.debug("Found external connection to Kafka, no containers will be created during tests: {}",
                    this.externalConnection.getProperty("bootstrap.servers"));
        }
        return this.externalConnection;
    }

    /**
     * Resolves the container metadata for the context.
     *
     * @param extensionContext current extension context
     * @return metadata found for the context
     * @throws ExtensionConfigurationException if no metadata could be found
     */
    private ContainerMetadata getMetadata(@NotNull ExtensionContext extensionContext) {
        final Optional<ContainerMetadata> metadata = findMetadata(extensionContext);
        if (!metadata.isPresent()) {
            throw new ExtensionConfigurationException("Extension annotation not found");
        }
        return metadata.get();
    }

    /**
     * Creates a Kafka connection for every injectable field of the current test
     * instance and assigns it to that field.
     * <p>
     * Injectable fields are the non-synthetic, non-final, non-static fields of the test
     * class hierarchy that carry the connection annotation. Each connection starts from
     * the given properties and is then overridden by the properties declared on the
     * field annotation. Every created connection is registered in the pool kept in the
     * extension store. Nothing is injected when the context has no test instance.
     *
     * @param properties       base connection properties
     * @param extensionContext current extension context
     * @throws IllegalStateException if a field can not be assigned
     */
    private void injectKafkaConnection(Properties properties, ExtensionContext extensionContext) {
        final Class<? extends Annotation> connectionAnnotation = getAnnotationConnection();
        final List<Field> injectableFields = ReflectionUtils.findFields(
                extensionContext.getRequiredTestClass(),
                candidate -> !candidate.isSynthetic()
                        && !Modifier.isFinal(candidate.getModifiers())
                        && !Modifier.isStatic(candidate.getModifiers())
                        && candidate.getAnnotation(connectionAnnotation) != null,
                ReflectionUtils.HierarchyTraversalMode.TOP_DOWN);

        this.logger.debug("Starting @ContainerKafkaConnection field injection for container properties: {}", properties);
        final KafkaConnectionPool pool = extensionContext.getStore(NAMESPACE).get(KafkaConnectionPool.class, KafkaConnectionPool.class);

        extensionContext.getTestInstance().ifPresent(testInstance -> {
            for (Field target : injectableFields) {
                try {
                    final ContainerKafkaConnection declared = target.getAnnotation(ContainerKafkaConnection.class);
                    final Properties merged = new Properties();
                    merged.putAll(properties);
                    // Annotation-level properties take precedence over the base ones.
                    for (ContainerKafkaConnection.Property override : declared.properties()) {
                        merged.put(override.name(), override.value());
                    }
                    final KafkaConnectionImpl connection = new KafkaConnectionImpl(merged);
                    pool.add(connection);
                    target.setAccessible(true);
                    target.set(testInstance, connection);
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(String.format("Field '%s' annotated with @%s can't set kafka connection", target.getName(), connectionAnnotation.getSimpleName()), e);
                }
            }
        });
    }

    /**
     * Prepares the class-level state of the extension.
     * <ul>
     * <li>Always stores a fresh connection pool in the extension store.</li>
     * <li>When an external Kafka is configured, no container is started and its
     * properties are used for field injection.</li>
     * <li>In {@code PER_RUN} mode a container is started once per image name, kept in
     * the shared map and reused by later test classes.</li>
     * <li>In {@code PER_CLASS} mode a container is started for this test class.</li>
     * </ul>
     * In every mode other than {@code PER_METHOD} the started (or reused) container is
     * put into the store under its mode and field injection is attempted.
     *
     * @param extensionContext class-level extension context
     */
    @Override
    public void beforeAll(ExtensionContext extensionContext) throws Exception {
        final ContainerMetadata metadata = getMetadata(extensionContext);
        final ExtensionContext.Store store = extensionContext.getStore(NAMESPACE);
        store.put(KafkaConnectionPool.class, new KafkaConnectionPool());

        final Properties external = getPropertiesExternalCached();
        if (external != null) {
            injectKafkaConnection(external, extensionContext);
            return;
        }

        if (metadata.runMode() == ContainerMode.PER_RUN) {
            final Optional<KafkaContainer> containerFromField = getContainerFromField(extensionContext);
            // A user-declared container is shared under its own image name, otherwise
            // the image from the annotation is the key.
            final String imageKey = containerFromField
                    .map(KafkaContainer::getDockerImageName)
                    .orElseGet(metadata::image);
            final ExtensionContainerImpl shared = IMAGE_TO_SHARED_CONTAINER.computeIfAbsent(imageKey, ignoredKey -> {
                final KafkaContainer sharedKafka = containerFromField.orElseGet(() -> {
                    this.logger.debug("Getting default Kafka Container for image: {}", metadata.image());
                    return getDefaultContainer(metadata);
                });
                this.logger.debug("Starting in mode '{}' Kafka Container: {}", metadata.runMode(), sharedKafka);
                sharedKafka.withReuse(true).start();
                this.logger.debug("Started successfully in mode '{}' Kafka Container: {}", metadata.runMode(), sharedKafka);
                return new ExtensionContainerImpl(sharedKafka, getPropertiesForContainer(sharedKafka));
            });
            store.put(ContainerMode.PER_RUN, shared);
            injectKafkaConnection(shared.properties, extensionContext);
            return;
        }

        if (metadata.runMode() == ContainerMode.PER_CLASS) {
            final KafkaContainer classKafka = getContainerFromField(extensionContext).orElseGet(() -> {
                this.logger.debug("Getting default Kafka Container for image: {}", metadata.image());
                return getDefaultContainer(metadata);
            });
            this.logger.debug("Starting in mode '{}' Kafka Container: {}", metadata.runMode(), classKafka);
            classKafka.start();
            this.logger.debug("Started successfully in mode '{}' Kafka Container: {}", metadata.runMode(), classKafka);
            final ExtensionContainerImpl classContainer = new ExtensionContainerImpl(classKafka, getPropertiesForContainer(classKafka));
            store.put(ContainerMode.PER_CLASS, classContainer);
            injectKafkaConnection(classContainer.properties, extensionContext);
        }
    }

    /**
     * In {@code PER_METHOD} mode starts a container for the test method, puts it into
     * the store and injects connections into the test instance. Does nothing in the
     * other modes.
     * <p>
     * NOTE(review): connection fields are injected here only in {@code PER_METHOD} mode;
     * for the other modes injection is attempted in {@code beforeAll}, where a test
     * instance may not be available yet - confirm this is intended.
     *
     * @param extensionContext method-level extension context
     */
    @Override
    public void beforeEach(ExtensionContext extensionContext) throws Exception {
        final ContainerMetadata metadata = getMetadata(extensionContext);
        final ExtensionContext.Store store = extensionContext.getStore(NAMESPACE);
        if (metadata.runMode() != ContainerMode.PER_METHOD) {
            return;
        }

        final Optional<KafkaContainer> declared = getContainerFromField(extensionContext);
        final KafkaContainer methodKafka;
        if (declared.isPresent()) {
            methodKafka = declared.get();
        } else {
            this.logger.debug("Getting default Kafka Container for image: {}", metadata.image());
            methodKafka = getDefaultContainer(metadata);
        }

        this.logger.debug("Starting in mode '{}' Kafka Container: {}", metadata.runMode(), methodKafka);
        methodKafka.start();
        this.logger.debug("Started successfully in mode '{}' Kafka Container: {}", metadata.runMode(), methodKafka);

        final ExtensionContainerImpl methodContainer = new ExtensionContainerImpl(methodKafka, getPropertiesForContainer(methodKafka));
        store.put(ContainerMode.PER_METHOD, methodContainer);
        injectKafkaConnection(methodContainer.properties, extensionContext);
    }

    /**
     * Stops the {@code PER_METHOD} container, if one is stored, and clears all pooled
     * connections.
     * <p>
     * Clearing happens in a {@code finally} block so that a failure while stopping the
     * container does not skip it.
     *
     * @param extensionContext method-level extension context
     */
    @Override
    public void afterEach(ExtensionContext extensionContext) throws Exception {
        final ContainerMetadata metadata = getMetadata(extensionContext);
        final ExtensionContext.Store store = extensionContext.getStore(NAMESPACE);
        try {
            if (metadata.runMode() == ContainerMode.PER_METHOD) {
                final ExtensionContainerImpl methodContainer = store.get(ContainerMode.PER_METHOD, ExtensionContainerImpl.class);
                if (methodContainer != null) {
                    this.logger.debug("Stopping in mode '{}' Kafka Container: {}", metadata.runMode(), methodContainer.container);
                    methodContainer.stop();
                    this.logger.debug("Stopped successfully in mode '{}' Kafka Container: {}", metadata.runMode(), methodContainer.container);
                }
            }
        } finally {
            final KafkaConnectionPool pool = store.get(KafkaConnectionPool.class, KafkaConnectionPool.class);
            // Guarded so that a missing pool can not mask an exception thrown above.
            if (pool != null) {
                pool.clear();
            }
        }
    }

    /**
     * Stops the {@code PER_CLASS} container, if one is stored, and closes all pooled
     * connections.
     * <p>
     * Closing happens in a {@code finally} block so that a failure while stopping the
     * container does not leave connections open.
     *
     * @param extensionContext class-level extension context
     */
    @Override
    public void afterAll(ExtensionContext extensionContext) throws Exception {
        final ContainerMetadata metadata = getMetadata(extensionContext);
        final ExtensionContext.Store store = extensionContext.getStore(NAMESPACE);
        try {
            if (metadata.runMode() == ContainerMode.PER_CLASS) {
                final ExtensionContainerImpl classContainer = store.get(ContainerMode.PER_CLASS, ExtensionContainerImpl.class);
                if (classContainer != null) {
                    this.logger.debug("Stopping in mode '{}' Kafka Container: {}", metadata.runMode(), classContainer.container);
                    classContainer.stop();
                    this.logger.debug("Stopped successfully in mode '{}' Kafka Container: {}", metadata.runMode(), classContainer.container);
                }
            }
        } finally {
            final KafkaConnectionPool pool = store.get(KafkaConnectionPool.class, KafkaConnectionPool.class);
            // Guarded so that a missing pool can not mask an exception thrown above.
            if (pool != null) {
                pool.close();
            }
        }
    }

    /**
     * Tells whether a parameter is resolved by this extension: it must belong to a
     * method and carry the connection annotation.
     *
     * @param parameterContext parameter being inspected
     * @param extensionContext current extension context
     * @return {@code true} for an annotated method parameter of type {@link KafkaConnection},
     *         {@code false} when the parameter is not an annotated method parameter
     * @throws ExtensionConfigurationException if the annotated parameter has a different type
     */
    @Override
    public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
        final Class<? extends Annotation> connectionAnnotation = getAnnotationConnection();
        if (!(parameterContext.getDeclaringExecutable() instanceof Method)
                || parameterContext.getParameter().getAnnotation(connectionAnnotation) == null) {
            return false;
        }

        final Class<?> parameterType = parameterContext.getParameter().getType();
        if (!parameterType.equals(KafkaConnection.class)) {
            throw new ExtensionConfigurationException(String.format("Parameter '%s' annotated @%s is not of type %s", parameterContext.getParameter().getName(), connectionAnnotation.getSimpleName(), KafkaConnection.class));
        }
        return true;
    }

    /**
     * Creates a Kafka connection for an annotated parameter.
     * <p>
     * Base properties come from the external connection when one is configured,
     * otherwise from the container stored for the current run mode ({@code PER_METHOD},
     * {@code PER_CLASS}, or {@code PER_RUN} for anything else). Properties declared on
     * the parameter annotation override the base ones. The created connection is
     * registered in the pool kept in the extension store.
     *
     * @param parameterContext parameter to resolve
     * @param extensionContext current extension context
     * @return the created connection
     * @throws ParameterResolutionException if no container is stored for the current run mode
     */
    @Override
    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
        final ExtensionContext.Store store = extensionContext.getStore(NAMESPACE);
        final KafkaConnectionPool pool = store.get(KafkaConnectionPool.class, KafkaConnectionPool.class);
        final ContainerMetadata metadata = getMetadata(extensionContext);
        final ContainerKafkaConnection declared = parameterContext.getParameter().getAnnotation(ContainerKafkaConnection.class);

        final Properties properties = new Properties();
        final Properties external = getPropertiesExternalCached();
        if (external != null) {
            properties.putAll(external);
        } else {
            final ContainerMode mode = metadata.runMode();
            final ContainerMode storeKey = (mode == ContainerMode.PER_METHOD || mode == ContainerMode.PER_CLASS)
                    ? mode
                    : ContainerMode.PER_RUN;
            final ExtensionContainerImpl container = store.get(storeKey, ExtensionContainerImpl.class);
            if (container == null) {
                // Fail with context instead of a bare NullPointerException, e.g. when a
                // parameter is requested before a container for this mode was started.
                throw new ParameterResolutionException(String.format("No Kafka Container is available in mode '%s' to resolve parameter '%s'", mode, parameterContext.getParameter().getName()));
            }
            properties.putAll(container.properties);
        }

        // Annotation-level properties take precedence over the base ones.
        for (ContainerKafkaConnection.Property override : declared.properties()) {
            properties.put(override.name(), override.value());
        }

        final KafkaConnectionImpl connection = new KafkaConnectionImpl(properties);
        pool.add(connection);
        return connection;
    }
}
