package io.quarkus.kafka.streams.runtime;

import io.quarkus.arc.Unremovable;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.Startup;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.jboss.logging.Logger;

@Singleton
/* loaded from: input_file:io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.class */
public class KafkaStreamsProducer {
    /** Logger shared by the producer and its background start-up task. */
    private static final Logger LOGGER = Logger.getLogger(KafkaStreamsProducer.class.getName());
    /**
     * Set to {@code true} by {@link #onStop} and reset to {@code false} by the constructor.
     * Polled by the background start-up task and the topic wait loop so they can bail out.
     */
    private static volatile boolean shutdown = false;
    /** Executor that runs the "wait for topics, then start" task; {@code null} when no Topology bean exists. */
    private final ExecutorService executorService;
    /** The produced Kafka Streams instance; {@code null} when no Topology bean exists. */
    private final KafkaStreams kafkaStreams;
    /** The produced topology manager; {@code null} when no Topology bean exists. */
    private final KafkaStreamsTopologyManager kafkaStreamsTopologyManager;
    /** Admin client used to list topics and handed to the topology manager; {@code null} when no Topology bean exists. */
    private final Admin kafkaAdminClient;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/quarkus/kafka/streams/runtime/KafkaStreamsProducer$DurationToSecondsFunction.class */
    /**
     * Renders a {@link Duration} as the decimal string of its whole-second count
     * (the value of {@link Duration#getSeconds()}; any sub-second part is not included).
     */
    public static final class DurationToSecondsFunction implements Function<Duration, String> {
        /** Single shared instance; the function holds no state. */
        private static final DurationToSecondsFunction INSTANCE = new DurationToSecondsFunction();

        private DurationToSecondsFunction() {
            // Not instantiable from outside; use INSTANCE.
        }

        /**
         * @param duration the duration to convert; must not be {@code null}
         * @return the number of whole seconds in {@code duration}, as a string
         */
        @Override
        public String apply(Duration duration) {
            final long wholeSeconds = duration.getSeconds();
            return Long.toString(wholeSeconds);
        }
    }

    @Inject
    public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStreamsRuntimeConfig kafkaStreamsRuntimeConfig, Instance<Topology> instance, Instance<KafkaClientSupplier> instance2, Instance<KafkaStreams.StateListener> instance3, Instance<StateRestoreListener> instance4) {
        shutdown = false;
        if (instance.isUnsatisfied()) {
            LOGGER.debug("No Topology producer; Kafka Streams will not be started");
            this.executorService = null;
            this.kafkaStreams = null;
            this.kafkaStreamsTopologyManager = null;
            this.kafkaAdminClient = null;
            return;
        }
        Properties streamsProperties = getStreamsProperties(kafkaStreamsSupport.getProperties(), asString(kafkaStreamsRuntimeConfig.bootstrapServers), kafkaStreamsRuntimeConfig);
        this.kafkaAdminClient = Admin.create(getAdminClientConfig(streamsProperties));
        this.executorService = Executors.newSingleThreadExecutor();
        this.kafkaStreams = initializeKafkaStreams(streamsProperties, kafkaStreamsRuntimeConfig, this.kafkaAdminClient, (Topology) instance.get(), instance2, instance3, instance4, this.executorService);
        this.kafkaStreamsTopologyManager = new KafkaStreamsTopologyManager(this.kafkaAdminClient);
    }

    /**
     * CDI producer exposing the {@link KafkaStreams} instance built by the constructor.
     *
     * @return the streams instance, or {@code null} when no Topology bean was available
     */
    @Produces
    @Singleton
    @Startup
    @Unremovable
    public KafkaStreams getKafkaStreams() {
        return kafkaStreams;
    }

    /**
     * CDI producer exposing the {@link KafkaStreamsTopologyManager} built by the constructor.
     *
     * @return the topology manager, or {@code null} when no Topology bean was available
     */
    @Produces
    @Singleton
    @Startup
    @Unremovable
    public KafkaStreamsTopologyManager kafkaStreamsTopologyManager() {
        return kafkaStreamsTopologyManager;
    }

    /**
     * Shutdown hook: flags the background start-up task to stop, then releases the
     * executor, the Kafka Streams instance and the admin client.
     *
     * <p>The admin client is closed in a {@code finally} block so that it is released
     * even if shutting down the executor or closing Kafka Streams throws.
     *
     * @param shutdownEvent the Quarkus shutdown event (unused, only triggers the observer)
     */
    void onStop(@Observes ShutdownEvent shutdownEvent) {
        // Must be set first: the start-up task and the topic wait loop poll this flag.
        shutdown = true;
        try {
            if (this.executorService != null) {
                this.executorService.shutdown();
            }
            if (this.kafkaStreams != null) {
                LOGGER.debug("Stopping Kafka Streams pipeline");
                this.kafkaStreams.close();
            }
        } finally {
            if (this.kafkaAdminClient != null) {
                this.kafkaAdminClient.close();
            }
        }
    }

    /**
     * Creates the {@link KafkaStreams} instance, attaches the optional listeners and
     * schedules a background task that starts the pipeline once the configured topics exist.
     *
     * @param properties the streams configuration
     * @param kafkaStreamsRuntimeConfig runtime configuration; supplies the topics to wait for
     * @param admin admin client used to list existing topics
     * @param topology the topology to run
     * @param instance lookup for an optional {@link KafkaClientSupplier}
     * @param instance2 lookup for an optional {@link KafkaStreams.StateListener}
     * @param instance3 lookup for an optional {@link StateRestoreListener}
     * @param executorService executor that runs the deferred start
     * @return the created (not yet started) streams instance
     */
    private static KafkaStreams initializeKafkaStreams(Properties properties, final KafkaStreamsRuntimeConfig kafkaStreamsRuntimeConfig, final Admin admin, Topology topology, Instance<KafkaClientSupplier> instance, Instance<KafkaStreams.StateListener> instance2, Instance<StateRestoreListener> instance3, ExecutorService executorService) {
        final KafkaStreams streams;
        if (instance.isUnsatisfied()) {
            streams = new KafkaStreams(topology, properties);
        } else {
            streams = new KafkaStreams(topology, properties, instance.get());
        }

        if (!instance2.isUnsatisfied()) {
            streams.setStateListener(instance2.get());
        }
        if (!instance3.isUnsatisfied()) {
            streams.setGlobalStateRestoreListener(instance3.get());
        }

        executorService.execute(() -> {
            try {
                waitForTopicsToBeCreated(admin, kafkaStreamsRuntimeConfig.getTrimmedTopics());
                // The wait loop also exits on shutdown; do not start the pipeline in that case.
                if (!shutdown) {
                    LOGGER.debug("Starting Kafka Streams pipeline");
                    streams.start();
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
        });
        return streams;
    }

    /**
     * Assembles the Kafka Streams configuration.
     *
     * <p>Entries are applied in this order, later ones overwriting earlier ones with the same key:
     * the given base properties, the Quarkus-level properties, the application-level properties,
     * then the explicit runtime settings (bootstrap servers, application id, application server,
     * schema registry, security protocol, SASL and SSL).
     *
     * @param properties base properties to start from
     * @param str comma-separated bootstrap servers
     * @param kafkaStreamsRuntimeConfig runtime configuration to read the explicit settings from
     * @return a new {@link Properties} object holding the merged configuration
     */
    private static Properties getStreamsProperties(Properties properties, String str, KafkaStreamsRuntimeConfig kafkaStreamsRuntimeConfig) {
        final Properties merged = new Properties();
        merged.putAll(properties);
        merged.putAll(KafkaStreamsPropertiesUtil.quarkusKafkaStreamsProperties());
        merged.putAll(KafkaStreamsPropertiesUtil.appKafkaStreamsProperties());

        merged.put("bootstrap.servers", str);
        merged.put("application.id", kafkaStreamsRuntimeConfig.applicationId);
        kafkaStreamsRuntimeConfig.applicationServer
                .ifPresent(server -> merged.put("application.server", server));
        kafkaStreamsRuntimeConfig.schemaRegistryUrl
                .ifPresent(url -> merged.put(kafkaStreamsRuntimeConfig.schemaRegistryKey, url));

        setProperty(kafkaStreamsRuntimeConfig.securityProtocol, merged, "security.protocol");
        applySaslSettings(kafkaStreamsRuntimeConfig.sasl, merged);
        applySslSettings(kafkaStreamsRuntimeConfig.ssl, merged);
        return merged;
    }

    /** Copies every SASL option that is present into {@code target}; does nothing for a null config. */
    private static void applySaslSettings(SaslConfig saslConfig, Properties target) {
        if (saslConfig == null) {
            return;
        }
        setProperty(saslConfig.mechanism, target, "sasl.mechanism");
        setProperty(saslConfig.jaasConfig, target, "sasl.jaas.config");
        setProperty(saslConfig.clientCallbackHandlerClass, target, "sasl.client.callback.handler.class");
        setProperty(saslConfig.loginCallbackHandlerClass, target, "sasl.login.callback.handler.class");
        setProperty(saslConfig.loginClass, target, "sasl.login.class");
        setProperty(saslConfig.kerberosServiceName, target, "sasl.kerberos.service.name");
        setProperty(saslConfig.kerberosKinitCmd, target, "sasl.kerberos.kinit.cmd");
        setProperty(saslConfig.kerberosTicketRenewWindowFactor, target, "sasl.kerberos.ticket.renew.window.factor");
        setProperty(saslConfig.kerberosTicketRenewJitter, target, "sasl.kerberos.ticket.renew.jitter");
        setProperty(saslConfig.kerberosMinTimeBeforeRelogin, target, "sasl.kerberos.min.time.before.relogin");
        setProperty(saslConfig.loginRefreshWindowFactor, target, "sasl.login.refresh.window.factor");
        setProperty(saslConfig.loginRefreshWindowJitter, target, "sasl.login.refresh.window.jitter");
        // These two are durations in the config and are written as whole seconds.
        setProperty(saslConfig.loginRefreshMinPeriod, target, "sasl.login.refresh.min.period.seconds", DurationToSecondsFunction.INSTANCE);
        setProperty(saslConfig.loginRefreshBuffer, target, "sasl.login.refresh.buffer.seconds", DurationToSecondsFunction.INSTANCE);
    }

    /** Copies every SSL option that is present into {@code target}; does nothing for a null config. */
    private static void applySslSettings(SslConfig sslConfig, Properties target) {
        if (sslConfig == null) {
            return;
        }
        setProperty(sslConfig.protocol, target, "ssl.protocol");
        setProperty(sslConfig.provider, target, "ssl.provider");
        setProperty(sslConfig.cipherSuites, target, "ssl.cipher.suites");
        setProperty(sslConfig.enabledProtocols, target, "ssl.enabled.protocols");
        setStoreConfig(sslConfig.truststore, target, "ssl.truststore");
        setStoreConfig(sslConfig.keystore, target, "ssl.keystore");
        setStoreConfig(sslConfig.key, target, "ssl.key");
        setProperty(sslConfig.keymanagerAlgorithm, target, "ssl.keymanager.algorithm");
        setProperty(sslConfig.trustmanagerAlgorithm, target, "ssl.trustmanager.algorithm");
        // Always written: an absent value is stored as the empty string rather than being skipped.
        final String endpointIdentification = sslConfig.endpointIdentificationAlgorithm.orElse("");
        setProperty(Optional.of(endpointIdentification), target, "ssl.endpoint.identification.algorithm");
        setProperty(sslConfig.secureRandomImplementation, target, "ssl.secure.random.implementation");
    }

    /**
     * Writes the type, location and password of a key/trust store under
     * {@code <str>.type}, {@code <str>.location} and {@code <str>.password}.
     * Absent values are skipped; a {@code null} store config is ignored entirely.
     *
     * @param storeConfig the store settings, may be {@code null}
     * @param properties the properties to write into
     * @param str the key prefix, e.g. {@code ssl.truststore}
     */
    private static void setStoreConfig(StoreConfig storeConfig, Properties properties, String str) {
        if (storeConfig == null) {
            return;
        }
        final String typeKey = str + ".type";
        final String locationKey = str + ".location";
        final String passwordKey = str + ".password";
        setProperty(storeConfig.type, properties, typeKey);
        setProperty(storeConfig.location, properties, locationKey);
        setProperty(storeConfig.password, properties, passwordKey);
    }

    /**
     * Stores the value of {@code optional}, converted with {@link Objects#toString(Object)},
     * under {@code str}; does nothing when the optional is empty.
     *
     * @param optional the value to store, if present
     * @param properties the properties to write into
     * @param str the property key
     */
    private static <T> void setProperty(Optional<T> optional, Properties properties, String str) {
        final Function<T, String> toText = value -> Objects.toString(value);
        setProperty(optional, properties, str, toText);
    }

    /**
     * Stores the value of {@code optional}, converted with {@code function},
     * under {@code str}; does nothing when the optional is empty.
     *
     * @param optional the value to store, if present
     * @param properties the properties to write into
     * @param str the property key
     * @param function converts the value to the string that is stored
     */
    private static <T> void setProperty(Optional<T> optional, Properties properties, String str, Function<T, String> function) {
        optional.ifPresent(value -> properties.put(str, function.apply(value)));
    }

    /**
     * Joins the given addresses into a comma-separated {@code host:port} list.
     *
     * @param list the addresses to render
     * @return the joined string; empty when {@code list} is empty
     */
    private static String asString(List<InetSocketAddress> list) {
        final StringBuilder joined = new StringBuilder();
        boolean first = true;
        for (InetSocketAddress address : list) {
            if (!first) {
                joined.append(',');
            }
            joined.append(toHostPort(address));
            first = false;
        }
        return joined.toString();
    }

    /**
     * Renders a socket address as {@code host:port}, using
     * {@link InetSocketAddress#getHostString()} for the host part.
     *
     * @param inetSocketAddress the address to render
     * @return the {@code host:port} form of the address
     */
    private static String toHostPort(InetSocketAddress inetSocketAddress) {
        final String host = inetSocketAddress.getHostString();
        final int port = inetSocketAddress.getPort();
        return new StringBuilder().append(host).append(':').append(port).toString();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Blocks until every topic in {@code collection} is reported by the broker, or until
     * shutdown has been requested.
     *
     * <p>Each iteration lists the broker's topics (waiting at most 10 seconds for the answer)
     * and then sleeps for one second, whether the iteration succeeded, found topics missing,
     * or failed. A failed listing is logged and retried.
     *
     * @param admin admin client used to list topics
     * @param collection names of the topics that must exist
     * @throws InterruptedException if the thread is interrupted while waiting for the
     *         listing or while sleeping
     */
    public static void waitForTopicsToBeCreated(Admin admin, Collection<String> collection) throws InterruptedException {
        Set<String> lastMissingTopics = null;
        while (!shutdown) {
            try {
                Set<String> existingTopics = admin.listTopics().names().get(10L, TimeUnit.SECONDS);
                if (existingTopics.containsAll(collection)) {
                    LOGGER.debug("All expected topics created: " + collection);
                    return;
                }
                Set<String> missingTopics = new HashSet<>(collection);
                missingTopics.removeAll(existingTopics);
                // Warn only when the set of missing topics changes; repeat at debug level
                // so a slow topic creation does not flood the log.
                if (missingTopics.equals(lastMissingTopics)) {
                    LOGGER.debug("Waiting for topic(s) to be created: " + missingTopics);
                } else {
                    LOGGER.warn("Waiting for topic(s) to be created: " + missingTopics);
                    lastMissingTopics = missingTopics;
                }
            } catch (ExecutionException | TimeoutException e) {
                // The listing failed; there is no topic set to inspect, so just retry.
                LOGGER.error("Failed to get topic names from broker", e);
            } finally {
                Thread.sleep(1000L);
            }
        }
    }

    /**
     * Derives the admin client configuration from the streams configuration.
     *
     * <p>The returned object uses {@code properties} as its {@link Properties} defaults and
     * additionally holds, as direct entries, every known admin client config name found in
     * {@code properties}. A value stored under {@code admin.<name>} takes precedence over one
     * stored under the plain {@code <name>}.
     *
     * @param properties the streams configuration
     * @return the admin client configuration
     */
    private static Properties getAdminClientConfig(Properties properties) {
        final Properties adminConfig = new Properties(properties);
        for (String configName : AdminClientConfig.configNames()) {
            final String prefixedName = "admin." + configName;
            final String sourceKey;
            if (properties.containsKey(prefixedName)) {
                sourceKey = prefixedName;
            } else if (properties.containsKey(configName)) {
                sourceKey = configName;
            } else {
                continue;
            }
            adminConfig.put(configName, properties.get(sourceKey));
        }
        return adminConfig;
    }
}
