package io.simplesource.saga.shared.streams;

import io.simplesource.saga.shared.streams.StreamAppUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/simplesource/saga/shared/streams/StreamApp.class */
public class StreamApp<I> {
    private final I streamAppInput;
    private final Logger logger = LoggerFactory.getLogger(StreamApp.class);
    private final List<StreamBuildStep> buildSteps = new ArrayList();

    public StreamApp(I i) {
        this.streamAppInput = i;
    }

    public final StreamApp<I> withBuildStep(StreamBuildStep<I> streamBuildStep) {
        this.buildSteps.add(streamBuildStep);
        return this;
    }

    public StreamBuildResult build(Properties properties) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        StreamBuildContext of = StreamBuildContext.of(this.streamAppInput, properties);
        List list = (List) this.buildSteps.stream().map(streamBuildStep -> {
            return streamBuildStep.applyStep(of);
        }).collect(Collectors.toList());
        this.logger.info("Expected topics:");
        return new StreamBuildResult((List) list.stream().flatMap(streamBuildSpec -> {
            return streamBuildSpec.topics.stream();
        }).collect(Collectors.toList()), () -> {
            Topology build = streamsBuilder.build();
            this.logger.info("Topology description {}", build.describe());
            return build;
        }, (List) list.stream().map(streamBuildSpec2 -> {
            return streamBuildSpec2.topologyBuildStep.apply(streamsBuilder);
        }).filter(optional -> {
            return optional.isPresent();
        }).map(optional2 -> {
            return (StreamAppUtils.ShutdownHandler) optional2.orElse(null);
        }).collect(Collectors.toList()));
    }

    public void run(StreamAppConfig streamAppConfig) {
        Properties config = StreamAppConfig.getConfig(streamAppConfig);
        StreamBuildResult build = build(config);
        Stream<R> map = build.topicCreations.stream().map(topicCreation -> {
            return topicCreation.topicName;
        });
        Logger logger = this.logger;
        Objects.requireNonNull(logger);
        map.forEach(logger::info);
        StreamAppUtils.createMissingTopics(config, build.topicCreations);
        StreamAppUtils.runStreamApp(config, build.topologySupplier.get());
        build.shutdownHandlers.forEach(StreamAppUtils::addShutdownHook);
    }
}
