package io.quarkus.kafka.streams.runtime;

import io.quarkus.arc.Unremovable;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.event.Event;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;

@Singleton
/* loaded from: input_file:io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.class */
public class KafkaStreamsProducer {
    private static final Logger LOGGER = Logger.getLogger(KafkaStreamsProducer.class.getName());
    private static volatile boolean shutdown = false;
    private final ExecutorService executorService;
    private final StreamsConfig streamsConfig;
    private final KafkaStreams kafkaStreams;
    private final KafkaStreamsTopologyManager kafkaStreamsTopologyManager;
    private final Admin kafkaAdminClient;
    private final Duration topicsTimeout;
    private final List<String> trimmedTopics;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/quarkus/kafka/streams/runtime/KafkaStreamsProducer$DurationToSecondsFunction.class */
    public static final class DurationToSecondsFunction implements Function<Duration, String> {
        private static final DurationToSecondsFunction INSTANCE = new DurationToSecondsFunction();

        private DurationToSecondsFunction() {
        }

        @Override // java.util.function.Function
        public String apply(Duration duration) {
            return String.valueOf(duration.getSeconds());
        }
    }

    @Inject
    public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStreamsRuntimeConfig kafkaStreamsRuntimeConfig, ExecutorService executorService, Instance<Topology> instance, Instance<KafkaClientSupplier> instance2, @Identifier("default-kafka-broker") Instance<Map<String, Object>> instance3, Instance<KafkaStreams.StateListener> instance4, Instance<StateRestoreListener> instance5, Instance<StreamsUncaughtExceptionHandler> instance6) {
        shutdown = false;
        if (instance.isUnsatisfied()) {
            LOGGER.warn("No Topology producer; Kafka Streams will not be started");
            this.executorService = null;
            this.streamsConfig = null;
            this.kafkaStreams = null;
            this.kafkaStreamsTopologyManager = null;
            this.kafkaAdminClient = null;
            this.topicsTimeout = null;
            this.trimmedTopics = null;
            return;
        }
        Properties properties = kafkaStreamsSupport.getProperties();
        String asString = asString(kafkaStreamsRuntimeConfig.bootstrapServers);
        Properties streamsProperties = getStreamsProperties(properties, instance3.isUnsatisfied() ? Collections.emptyMap() : (Map) instance3.get(), KafkaStreamsRuntimeConfig.DEFAULT_KAFKA_BROKER.equalsIgnoreCase(asString) ? (String) ConfigProvider.getConfig().getOptionalValue("kafka.bootstrap.servers", String.class).orElse(asString) : asString, kafkaStreamsRuntimeConfig);
        this.kafkaAdminClient = Admin.create(getAdminClientConfig(streamsProperties));
        this.executorService = executorService;
        this.topicsTimeout = kafkaStreamsRuntimeConfig.topicsTimeout;
        this.trimmedTopics = isTopicsCheckEnabled() ? kafkaStreamsRuntimeConfig.getTrimmedTopics() : Collections.emptyList();
        this.streamsConfig = new StreamsConfig(streamsProperties);
        this.kafkaStreams = initializeKafkaStreams(this.streamsConfig, (Topology) instance.get(), instance2, instance4, instance5, instance6);
        this.kafkaStreamsTopologyManager = new KafkaStreamsTopologyManager(this.kafkaAdminClient);
    }

    private boolean isTopicsCheckEnabled() {
        return this.topicsTimeout.compareTo(Duration.ZERO) > 0;
    }

    public void onStartup(@Observes StartupEvent startupEvent, Event<KafkaStreams> event) {
        if (this.kafkaStreams != null) {
            event.fire(this.kafkaStreams);
            this.executorService.execute(() -> {
                if (isTopicsCheckEnabled()) {
                    try {
                        waitForTopicsToBeCreated(this.kafkaAdminClient, this.trimmedTopics, this.topicsTimeout);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                if (shutdown) {
                    return;
                }
                LOGGER.debug("Starting Kafka Streams pipeline");
                this.kafkaStreams.start();
            });
        }
    }

    @Singleton
    @Unremovable
    @Produces
    public KafkaStreams getKafkaStreams() {
        return this.kafkaStreams;
    }

    @Singleton
    @Unremovable
    @Produces
    public StreamsConfig getStreamsConfig() {
        return this.streamsConfig;
    }

    @Singleton
    @Unremovable
    @Produces
    public KafkaStreamsTopologyManager kafkaStreamsTopologyManager() {
        return this.kafkaStreamsTopologyManager;
    }

    void onStop(@Observes ShutdownEvent shutdownEvent) {
        shutdown = true;
        if (this.kafkaStreams != null) {
            LOGGER.debug("Stopping Kafka Streams pipeline");
            this.kafkaStreams.close();
        }
        if (this.kafkaAdminClient != null) {
            this.kafkaAdminClient.close(Duration.ZERO);
        }
    }

    private static KafkaStreams initializeKafkaStreams(StreamsConfig streamsConfig, Topology topology, Instance<KafkaClientSupplier> instance, Instance<KafkaStreams.StateListener> instance2, Instance<StateRestoreListener> instance3, Instance<StreamsUncaughtExceptionHandler> instance4) {
        KafkaStreams kafkaStreams = instance.isUnsatisfied() ? new KafkaStreams(topology, streamsConfig) : new KafkaStreams(topology, streamsConfig, (KafkaClientSupplier) instance.get());
        if (!instance2.isUnsatisfied()) {
            kafkaStreams.setStateListener((KafkaStreams.StateListener) instance2.get());
        }
        if (!instance3.isUnsatisfied()) {
            kafkaStreams.setGlobalStateRestoreListener((StateRestoreListener) instance3.get());
        }
        if (!instance4.isUnsatisfied()) {
            kafkaStreams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) instance4.get());
        }
        return kafkaStreams;
    }

    private static Properties getStreamsProperties(Properties properties, Map<String, Object> map, String str, KafkaStreamsRuntimeConfig kafkaStreamsRuntimeConfig) {
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        properties2.putAll(map);
        properties2.putAll(KafkaStreamsPropertiesUtil.quarkusKafkaStreamsProperties());
        properties2.putAll(KafkaStreamsPropertiesUtil.appKafkaStreamsProperties());
        properties2.put("bootstrap.servers", str);
        properties2.put("application.id", kafkaStreamsRuntimeConfig.applicationId);
        if (kafkaStreamsRuntimeConfig.applicationServer.isPresent()) {
            properties2.put("application.server", kafkaStreamsRuntimeConfig.applicationServer.get());
        }
        if (kafkaStreamsRuntimeConfig.schemaRegistryUrl.isPresent()) {
            properties2.put(kafkaStreamsRuntimeConfig.schemaRegistryKey, kafkaStreamsRuntimeConfig.schemaRegistryUrl.get());
        }
        setProperty(kafkaStreamsRuntimeConfig.securityProtocol, properties2, "security.protocol");
        SaslConfig saslConfig = kafkaStreamsRuntimeConfig.sasl;
        if (saslConfig != null) {
            setProperty(saslConfig.mechanism, properties2, "sasl.mechanism");
            setProperty(saslConfig.jaasConfig, properties2, "sasl.jaas.config");
            setProperty(saslConfig.clientCallbackHandlerClass, properties2, "sasl.client.callback.handler.class");
            setProperty(saslConfig.loginCallbackHandlerClass, properties2, "sasl.login.callback.handler.class");
            setProperty(saslConfig.loginClass, properties2, "sasl.login.class");
            setProperty(saslConfig.kerberosServiceName, properties2, "sasl.kerberos.service.name");
            setProperty(saslConfig.kerberosKinitCmd, properties2, "sasl.kerberos.kinit.cmd");
            setProperty(saslConfig.kerberosTicketRenewWindowFactor, properties2, "sasl.kerberos.ticket.renew.window.factor");
            setProperty(saslConfig.kerberosTicketRenewJitter, properties2, "sasl.kerberos.ticket.renew.jitter");
            setProperty(saslConfig.kerberosMinTimeBeforeRelogin, properties2, "sasl.kerberos.min.time.before.relogin");
            setProperty(saslConfig.loginRefreshWindowFactor, properties2, "sasl.login.refresh.window.factor");
            setProperty(saslConfig.loginRefreshWindowJitter, properties2, "sasl.login.refresh.window.jitter");
            setProperty(saslConfig.loginRefreshMinPeriod, properties2, "sasl.login.refresh.min.period.seconds", DurationToSecondsFunction.INSTANCE);
            setProperty(saslConfig.loginRefreshBuffer, properties2, "sasl.login.refresh.buffer.seconds", DurationToSecondsFunction.INSTANCE);
        }
        SslConfig sslConfig = kafkaStreamsRuntimeConfig.ssl;
        if (sslConfig != null) {
            setProperty(sslConfig.protocol, properties2, "ssl.protocol");
            setProperty(sslConfig.provider, properties2, "ssl.provider");
            setProperty(sslConfig.cipherSuites, properties2, "ssl.cipher.suites");
            setProperty(sslConfig.enabledProtocols, properties2, "ssl.enabled.protocols");
            setTrustStoreConfig(sslConfig.truststore, properties2);
            setKeyStoreConfig(sslConfig.keystore, properties2);
            setKeyConfig(sslConfig.key, properties2);
            setProperty(sslConfig.keymanagerAlgorithm, properties2, "ssl.keymanager.algorithm");
            setProperty(sslConfig.trustmanagerAlgorithm, properties2, "ssl.trustmanager.algorithm");
            setProperty(Optional.of(sslConfig.endpointIdentificationAlgorithm.orElse("")), properties2, "ssl.endpoint.identification.algorithm");
            setProperty(sslConfig.secureRandomImplementation, properties2, "ssl.secure.random.implementation");
        }
        return properties2;
    }

    private static void setTrustStoreConfig(TrustStoreConfig trustStoreConfig, Properties properties) {
        if (trustStoreConfig != null) {
            setProperty(trustStoreConfig.type, properties, "ssl.truststore.type");
            setProperty(trustStoreConfig.location, properties, "ssl.truststore.location");
            setProperty(trustStoreConfig.password, properties, "ssl.truststore.password");
            setProperty(trustStoreConfig.certificates, properties, "ssl.truststore.certificates");
        }
    }

    private static void setKeyStoreConfig(KeyStoreConfig keyStoreConfig, Properties properties) {
        if (keyStoreConfig != null) {
            setProperty(keyStoreConfig.type, properties, "ssl.keystore.type");
            setProperty(keyStoreConfig.location, properties, "ssl.keystore.location");
            setProperty(keyStoreConfig.password, properties, "ssl.keystore.password");
            setProperty(keyStoreConfig.key, properties, "ssl.keystore.key");
            setProperty(keyStoreConfig.certificateChain, properties, "ssl.keystore.certificate.chain");
        }
    }

    private static void setKeyConfig(KeyConfig keyConfig, Properties properties) {
        if (keyConfig != null) {
            setProperty(keyConfig.password, properties, "ssl.key.password");
        }
    }

    private static <T> void setProperty(Optional<T> optional, Properties properties, String str) {
        setProperty(optional, properties, str, Objects::toString);
    }

    private static <T> void setProperty(Optional<T> optional, Properties properties, String str, Function<T, String> function) {
        if (optional.isPresent()) {
            properties.put(str, function.apply(optional.get()));
        }
    }

    private static String asString(List<InetSocketAddress> list) {
        return (String) list.stream().map(KafkaStreamsProducer::toHostPort).collect(Collectors.joining(","));
    }

    private static String toHostPort(InetSocketAddress inetSocketAddress) {
        return inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort();
    }

    private static void waitForTopicsToBeCreated(Admin admin, Collection<String> collection, Duration duration) throws InterruptedException {
        Set set;
        Object obj = null;
        while (!shutdown) {
            try {
                try {
                    set = (Set) admin.listTopics().names().get(duration.toMillis(), TimeUnit.MILLISECONDS);
                } catch (ExecutionException | TimeoutException e) {
                    LOGGER.error("Failed to get topic names from broker", e);
                    Thread.sleep(1000L);
                }
                if (set.containsAll(collection)) {
                    LOGGER.debug("All expected topics created: " + collection);
                    Thread.sleep(1000L);
                    return;
                }
                HashSet hashSet = new HashSet(collection);
                hashSet.removeAll(set);
                if (hashSet.equals(obj)) {
                    LOGGER.debug("Waiting for topic(s) to be created: " + hashSet);
                } else {
                    LOGGER.warn("Waiting for topic(s) to be created: " + hashSet);
                    obj = hashSet;
                }
                Thread.sleep(1000L);
            } catch (Throwable th) {
                Thread.sleep(1000L);
                throw th;
            }
        }
    }

    private static Properties getAdminClientConfig(Properties properties) {
        Properties properties2 = new Properties(properties);
        if (properties.containsKey("tls-configuration-name")) {
            properties2.put("tls-configuration-name", properties.get("tls-configuration-name"));
        }
        for (String str : AdminClientConfig.configNames()) {
            if (properties.containsKey("admin." + str)) {
                properties2.put(str, properties.get("admin." + str));
            } else if (properties.containsKey(str)) {
                properties2.put(str, properties.get(str));
            }
        }
        return properties2;
    }
}
