package io.micronaut.configuration.kafka.embedded;

import io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration;
import io.micronaut.context.annotation.Requirements;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.event.BeanCreatedEvent;
import io.micronaut.context.event.BeanCreatedEventListener;
import io.micronaut.context.exceptions.ConfigurationException;
import io.micronaut.core.io.socket.SocketUtils;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import javax.inject.Singleton;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.utils.MockTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@Requirements({@Requires(env = {"test", "dev"}), @Requires(classes = {KafkaServer.class, ZkClient.class, TestUtils.class, org.apache.kafka.test.TestUtils.class}), @Requires(property = AbstractKafkaConfiguration.EMBEDDED)})
/* loaded from: input_file:io/micronaut/configuration/kafka/embedded/KafkaEmbedded.class */
public class KafkaEmbedded implements BeanCreatedEventListener<AbstractKafkaConfiguration>, AutoCloseable {
    private static final String ZKHOST = "127.0.0.1";
    private static final Logger LOG = LoggerFactory.getLogger(KafkaEmbedded.class);
    private EmbeddedZookeeper zkServer;
    private ZkClient zkClient;
    private ZkUtils zkUtils;
    private KafkaServer kafkaServer;
    private final KafkaEmbeddedConfiguration embeddedConfiguration;
    private final AtomicBoolean init = new AtomicBoolean(false);

    public KafkaEmbedded(KafkaEmbeddedConfiguration kafkaEmbeddedConfiguration) {
        this.embeddedConfiguration = kafkaEmbeddedConfiguration;
    }

    public synchronized AbstractKafkaConfiguration onCreated(BeanCreatedEvent<AbstractKafkaConfiguration> beanCreatedEvent) {
        boolean contains;
        ConfigurationException configurationException;
        AbstractKafkaConfiguration abstractKafkaConfiguration = (AbstractKafkaConfiguration) beanCreatedEvent.getBean();
        if (this.kafkaServer != null) {
            return abstractKafkaConfiguration;
        }
        String[] split = abstractKafkaConfiguration.getConfig().getProperty("bootstrap.servers").split(",")[0].split(":");
        int i = -1;
        if (split.length == 2) {
            try {
                i = Integer.parseInt(split[1]);
            } catch (NumberFormatException e) {
                return abstractKafkaConfiguration;
            }
        } else if (SocketUtils.isTcpPortAvailable(AbstractKafkaConfiguration.DEFAULT_KAFKA_PORT)) {
            i = 9092;
        }
        if (this.embeddedConfiguration.isEnabled() && this.kafkaServer == null && i > -1 && SocketUtils.isTcpPortAvailable(i) && this.init.compareAndSet(false, true)) {
            try {
                if (this.zkServer == null) {
                    initZooKeeper();
                }
                Properties properties = this.embeddedConfiguration.getProperties();
                properties.setProperty("zookeeper.connect", "127.0.0.1:" + this.zkServer.port());
                properties.putIfAbsent("broker.id", "0");
                properties.put("port", Integer.valueOf(i));
                properties.putIfAbsent("offsets.topic.replication.factor", "1");
                properties.computeIfAbsent("log.dirs", obj -> {
                    try {
                        return Files.createTempDirectory("kafka-", new FileAttribute[0]).toAbsolutePath().toString();
                    } catch (IOException e2) {
                        throw new ConfigurationException("Error creating log directory for embedded Kafka server: " + e2.getMessage(), e2);
                    }
                });
                properties.setProperty("listeners", "PLAINTEXT://127.0.0.1:" + i);
                this.kafkaServer = TestUtils.createServer(new KafkaConfig(properties), new MockTime());
                List<String> topics = this.embeddedConfiguration.getTopics();
                if (!topics.isEmpty()) {
                    Properties properties2 = new Properties();
                    properties2.put("bootstrap.servers", "127.0.0.1:" + i);
                    AdminClient.create(properties2).createTopics((Collection) topics.stream().map(str -> {
                        return new NewTopic(str, 1, (short) 1);
                    }).collect(Collectors.toList())).all().get();
                }
            } finally {
                if (!contains) {
                }
            }
        }
        return abstractKafkaConfiguration;
    }

    @Override // java.lang.AutoCloseable
    @PreDestroy
    public void close() {
        new Thread(() -> {
            if (this.kafkaServer != null) {
                try {
                    this.kafkaServer.shutdown();
                } catch (Exception e) {
                    if (LOG.isWarnEnabled()) {
                        LOG.warn("Error shutting down embedded Kafka Server: " + e.getMessage(), e);
                    }
                }
            }
            if (this.zkClient != null) {
                try {
                    this.zkClient.close();
                } catch (Exception e2) {
                    if (LOG.isWarnEnabled()) {
                        LOG.warn("Error shutting down embedded ZooKeeper Client: " + e2.getMessage(), e2);
                    }
                }
            }
            if (this.zkServer != null) {
                try {
                    this.zkServer.shutdown();
                } catch (Exception e3) {
                    if (LOG.isWarnEnabled()) {
                        LOG.warn("Error shutting down embedded ZooKeeper: " + e3.getMessage(), e3);
                    }
                }
            }
        }, "embedded-kafka-shutdown-thread").start();
    }

    public Optional<KafkaServer> getKafkaServer() {
        return Optional.ofNullable(this.kafkaServer);
    }

    public Optional<ZkUtils> getZkUtils() {
        return Optional.ofNullable(this.zkUtils);
    }

    private void initZooKeeper() {
        this.zkServer = new EmbeddedZookeeper();
        this.zkClient = new ZkClient("127.0.0.1:" + this.zkServer.port(), 30000, 30000, ZKStringSerializer$.MODULE$);
        this.zkUtils = ZkUtils.apply(this.zkClient, false);
    }

    /* renamed from: onCreated, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ Object m46onCreated(BeanCreatedEvent beanCreatedEvent) {
        return onCreated((BeanCreatedEvent<AbstractKafkaConfiguration>) beanCreatedEvent);
    }
}
