package io.quarkus.kafka.streams.runtime;

import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.jboss.logging.Logger;

/**
 * Manages the lifecycle of the application's {@link KafkaStreams} instance.
 *
 * <p>On {@link StartupEvent} the manager builds a {@code KafkaStreams} from the injected
 * {@link Topology} and, on a background thread, polls the broker until every configured
 * topic exists before starting the pipeline. On {@link ShutdownEvent} the background
 * wait is cancelled and the pipeline is closed.
 *
 * <p>If no {@code Topology} bean is available, no executor is created and
 * {@link #onStart(StartupEvent)} does nothing.
 */
@ApplicationScoped
public class KafkaStreamsTopologyManager {
    private static final Logger LOGGER = Logger.getLogger(KafkaStreamsTopologyManager.class.getName());
    private static final Pattern COMMA_PATTERN = Pattern.compile(",");

    private static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
    private static final String APPLICATION_ID_KEY = "application.id";
    private static final String APPLICATION_SERVER_KEY = "application.server";
    /** Prefix under which admin-client specific overrides are looked up in the configured properties. */
    private static final String ADMIN_CLIENT_PREFIX = "admin.";

    /** Maximum time to wait for a single topic listing request. */
    private static final long LIST_TOPICS_TIMEOUT_SECONDS = 10L;
    /** Pause between two consecutive topic listing attempts. */
    private static final long TOPIC_POLL_INTERVAL_MILLIS = 1000L;

    /** Runs the "wait for topics, then start" task; {@code null} when there is no topology to run. */
    private final ExecutorService executor;
    private final Instance<Topology> topology;
    private KafkaStreams streams;
    private KafkaStreamsRuntimeConfig runtimeConfig;
    private Properties properties;

    /** No-arg constructor; leaves the manager without an executor, so it never starts a pipeline. */
    KafkaStreamsTopologyManager() {
        this.executor = null;
        this.topology = null;
    }

    /**
     * Creates the manager for the given topology lookup.
     *
     * @param instance lookup handle for the application's {@link Topology}; when unsatisfied,
     *                 Kafka Streams is not started
     */
    @Inject
    public KafkaStreamsTopologyManager(Instance<Topology> instance) {
        if (instance.isUnsatisfied()) {
            LOGGER.debug("No Topology producer; Kafka Streams will not be started");
            this.executor = null;
            this.topology = null;
        } else {
            this.executor = Executors.newSingleThreadExecutor();
            this.topology = instance;
        }
    }

    /**
     * Builds the properties passed to {@link KafkaStreams}: a copy of the configured base
     * properties overlaid with the bootstrap servers, application id and, when present,
     * the application server.
     */
    private static Properties getStreamsProperties(Properties properties, String bootstrapServers,
            KafkaStreamsRuntimeConfig kafkaStreamsRuntimeConfig) {
        final Properties streamsProperties = new Properties();
        streamsProperties.putAll(properties);
        streamsProperties.put(BOOTSTRAP_SERVERS_KEY, bootstrapServers);
        streamsProperties.put(APPLICATION_ID_KEY, kafkaStreamsRuntimeConfig.applicationId);
        if (kafkaStreamsRuntimeConfig.applicationServer.isPresent()) {
            streamsProperties.put(APPLICATION_SERVER_KEY, kafkaStreamsRuntimeConfig.applicationServer.get());
        }
        return streamsProperties;
    }

    /**
     * Renders the addresses as a comma-separated {@code host:port} list.
     *
     * <p>{@link InetSocketAddress#toString()} is deliberately not used: for resolved addresses
     * it yields {@code host/ip:port}, and on newer JDKs unresolved addresses are rendered as
     * {@code host/<unresolved>:port}; neither is a usable bootstrap server entry.
     */
    private static String asString(List<InetSocketAddress> list) {
        return list.stream()
                .map(KafkaStreamsTopologyManager::toHostPort)
                .collect(Collectors.joining(","));
    }

    /** Formats a single address as {@code host:port} without triggering a reverse lookup. */
    private static String toHostPort(InetSocketAddress address) {
        return address.getHostString() + ":" + address.getPort();
    }

    /**
     * Splits a comma-separated topic list into an unmodifiable set, trimming surrounding
     * whitespace and dropping empty entries so that inputs such as {@code "a, b"} or
     * {@code "a,,b"} do not produce names that can never exist on the broker.
     */
    private static Set<String> parseTopics(String commaSeparatedTopics) {
        final Set<String> topics = new HashSet<>();
        for (String candidate : COMMA_PATTERN.split(commaSeparatedTopics)) {
            final String topicName = candidate.trim();
            if (!topicName.isEmpty()) {
                topics.add(topicName);
            }
        }
        return Collections.unmodifiableSet(topics);
    }

    /**
     * Creates the {@link KafkaStreams} instance and schedules its start on the background
     * executor once all configured topics exist. Does nothing when there is no topology.
     */
    void onStart(@Observes StartupEvent startupEvent) {
        if (this.executor == null) {
            return;
        }
        final String bootstrapServers = asString(this.runtimeConfig.bootstrapServers);
        final Properties streamsProperties = getStreamsProperties(this.properties, bootstrapServers, this.runtimeConfig);
        final Set<String> topicsToAwait = this.runtimeConfig.topics
                .map(KafkaStreamsTopologyManager::parseTopics)
                .orElseGet(Collections::emptySet);

        this.streams = new KafkaStreams(this.topology.get(), streamsProperties);

        // Waiting for topics may block indefinitely, so it must not run on the startup thread.
        this.executor.execute(() -> {
            try {
                waitForTopicsToBeCreated(topicsToAwait, bootstrapServers);
                LOGGER.debug("Starting Kafka Streams pipeline");
                this.streams.start();
            } catch (InterruptedException e) {
                // Interrupted while waiting (e.g. by onStop): restore the flag and skip the start.
                Thread.currentThread().interrupt();
            }
        });
    }

    /**
     * Cancels a still-pending start and closes the pipeline.
     */
    void onStop(@Observes ShutdownEvent shutdownEvent) {
        if (this.executor != null) {
            // Interrupts the topic-wait loop and releases the worker thread, which would
            // otherwise outlive the application shutdown.
            this.executor.shutdownNow();
        }
        if (this.streams != null) {
            LOGGER.debug("Stopping Kafka Streams pipeline");
            this.streams.close();
        }
    }

    /**
     * Exposes the managed {@link KafkaStreams} instance as a bean.
     *
     * @return the instance created in {@link #onStart(StartupEvent)}, or {@code null} if
     *         none has been created
     */
    @Singleton
    @Produces
    public KafkaStreams getStreams() {
        return this.streams;
    }

    /**
     * Blocks until the broker reports every topic in {@code topicsToAwait}, polling once per
     * second. Failed or timed-out listing requests are logged and retried after the same pause.
     *
     * @param topicsToAwait    names of the topics that must exist
     * @param bootstrapServers comma-separated {@code host:port} list used by the admin client
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void waitForTopicsToBeCreated(Set<String> topicsToAwait, String bootstrapServers)
            throws InterruptedException {
        final HashMap<String, Object> adminConfig = buildAdminClientConfig(bootstrapServers);

        try (AdminClient adminClient = AdminClient.create(adminConfig)) {
            while (true) {
                try {
                    final Set<String> existingTopics = adminClient.listTopics().names()
                            .get(LIST_TOPICS_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                    if (existingTopics.containsAll(topicsToAwait)) {
                        LOGGER.debug("All expected topics created");
                        return;
                    }
                    final Set<String> missingTopics = new HashSet<>(topicsToAwait);
                    missingTopics.removeAll(existingTopics);
                    LOGGER.debug("Waiting for topic(s) to be created: " + missingTopics);
                } catch (ExecutionException | TimeoutException e) {
                    LOGGER.error("Failed to get topic names from broker", e);
                }
                // Pause before every retry, including after a failure, so a broker that
                // rejects requests immediately does not cause a hot loop.
                Thread.sleep(TOPIC_POLL_INTERVAL_MILLIS);
            }
        }
    }

    /**
     * Collects the admin client configuration: the bootstrap servers plus every known admin
     * client option found in the configured properties, preferring the {@code admin.}-prefixed
     * key over the plain one.
     */
    private HashMap<String, Object> buildAdminClientConfig(String bootstrapServers) {
        final HashMap<String, Object> adminConfig = new HashMap<>();
        adminConfig.put(BOOTSTRAP_SERVERS_KEY, bootstrapServers);
        for (String configName : AdminClientConfig.configNames()) {
            final String prefixedName = ADMIN_CLIENT_PREFIX + configName;
            if (this.properties.containsKey(prefixedName)) {
                adminConfig.put(configName, this.properties.get(prefixedName));
            } else if (this.properties.containsKey(configName)) {
                adminConfig.put(configName, this.properties.get(configName));
            }
        }
        return adminConfig;
    }

    /**
     * Sets the runtime configuration read by {@link #onStart(StartupEvent)}.
     *
     * @param kafkaStreamsRuntimeConfig the runtime configuration to use
     */
    public void setRuntimeConfig(KafkaStreamsRuntimeConfig kafkaStreamsRuntimeConfig) {
        this.runtimeConfig = kafkaStreamsRuntimeConfig;
    }

    /**
     * Sets the base properties used for both the streams and the admin client configuration.
     *
     * @param properties the base properties
     */
    public void configure(Properties properties) {
        this.properties = properties;
    }
}
