package io.simplesource.saga.shared.streams;

import io.simplesource.kafka.spec.TopicSpec;
import io.simplesource.saga.shared.topics.TopicCreation;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

/* loaded from: input_file:io/simplesource/saga/shared/streams/StreamAppUtils.class */
/**
 * Static helpers for bootstrapping a Kafka Streams application: creating any
 * topics that do not yet exist, starting a {@link KafkaStreams} instance, and
 * wiring up JVM shutdown handling.
 */
public final class StreamAppUtils {

    /** Callback invoked from a JVM shutdown hook; see {@link #addShutdownHook(ShutdownHandler)}. */
    @FunctionalInterface
    public interface ShutdownHandler {
        void shutDown();
    }

    /**
     * Creates every topic in {@code list} that does not already exist, using a
     * short-lived {@link AdminClient} built from {@code properties}, and waits
     * up to 30 seconds for the creation to complete.
     *
     * @param properties configuration passed to {@link AdminClient#create(Properties)}
     * @param list       the topics the application requires
     * @throws RuntimeException if the topics cannot be listed or created, the wait
     *                          times out, or the calling thread is interrupted
     */
    public static void createMissingTopics(Properties properties, List<TopicCreation> list) {
        // try-with-resources: the admin client owns network resources and must be
        // closed whether or not topic creation succeeds.
        try (AdminClient adminClient = AdminClient.create(properties)) {
            createMissingTopics(adminClient, list).all().get(30L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Unable to add missing topics", e);
        } catch (Exception e) {
            throw new RuntimeException("Unable to add missing topics", e);
        }
    }

    /**
     * Requests creation of every topic in {@code list} whose name is not among the
     * topic names currently reported by {@code adminClient}. The caller keeps
     * ownership of {@code adminClient}; it is not closed here.
     *
     * @param adminClient the admin client used to list and create topics
     * @param list        the topics the application requires
     * @return the result handle of the create-topics request (creation may still be in flight)
     * @throws RuntimeException if listing or requesting creation fails, or the
     *                          calling thread is interrupted; the cause is preserved
     */
    public static CreateTopicsResult createMissingTopics(AdminClient adminClient, List<TopicCreation> list) {
        try {
            Set<String> existingTopics = adminClient.listTopics().names().get();
            List<NewTopic> missingTopics = list.stream()
                    .filter(topicCreation -> !existingTopics.contains(topicCreation.topicName))
                    .map(topicCreation -> {
                        TopicSpec topicSpec = topicCreation.topicSpec;
                        return new NewTopic(topicCreation.topicName, topicSpec.partitionCount(), topicSpec.replicaCount())
                                .configs(topicSpec.config());
                    })
                    .collect(Collectors.toList());
            return adminClient.createTopics(missingTopics);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Unable to create missing topics", e);
        } catch (Exception e) {
            // Keep the cause: without it the underlying broker/config error is lost.
            throw new RuntimeException("Unable to create missing topics", e);
        }
    }

    /**
     * Builds a {@link KafkaStreams} instance for {@code topology}, calls
     * {@code cleanUp()} on it, starts it, and registers a JVM shutdown hook that
     * closes it.
     *
     * @param properties the Kafka Streams configuration
     * @param topology   the topology to run
     */
    public static void runStreamApp(Properties properties, Topology topology) {
        KafkaStreams kafkaStreams = new KafkaStreams(topology, properties);
        // NOTE(review): cleanUp() runs on every start; per the Kafka Streams docs it
        // clears local state for this application id - confirm this is intended.
        kafkaStreams.cleanUp();
        kafkaStreams.start();
        addShutdownHook(kafkaStreams::close);
    }

    /**
     * Shuts down {@code executorService}, waiting up to 2000 ms for running tasks
     * to finish before forcing termination with {@code shutdownNow()}.
     *
     * <p>If the calling thread is interrupted while waiting, termination is forced
     * immediately and the thread's interrupt status is restored.
     *
     * @param executorService the executor to stop
     */
    public static void shutdownExecutorService(ExecutorService executorService) {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(2000L, TimeUnit.MILLISECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            // Do not swallow the interrupt: re-assert it for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Registers a JVM shutdown hook that invokes {@code shutdownHandler.shutDown()}
     * on a new thread.
     *
     * @param shutdownHandler the callback to run at JVM shutdown; must not be null
     * @throws NullPointerException if {@code shutdownHandler} is null
     */
    public static void addShutdownHook(ShutdownHandler shutdownHandler) {
        Objects.requireNonNull(shutdownHandler, "shutdownHandler");
        Runtime.getRuntime().addShutdownHook(new Thread(shutdownHandler::shutDown));
    }
}
