/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.configuration.kafka.streams;

import io.micronaut.configuration.kafka.streams.AbstractKafkaStreamsConfiguration;
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.configuration.kafka.streams.InteractiveQueryService;
import io.micronaut.context.annotation.Context;
import io.micronaut.context.annotation.EachBean;
import io.micronaut.context.annotation.Factory;
import java.io.Closeable;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedDeque;
import javax.annotation.PreDestroy;
import javax.inject.Singleton;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStream;

/**
 * Bean factory for Kafka Streams.
 *
 * <p>Produces one {@link ConfiguredStreamBuilder} per {@link AbstractKafkaStreamsConfiguration},
 * builds and starts one {@link KafkaStreams} per builder, and keeps track of every instance it
 * created so that they can all be closed in {@link #close()}.
 */
@Factory
public class KafkaStreamsFactory implements Closeable {

    /** Timeout handed to {@link KafkaStreams#close(Duration)} for each tracked instance. */
    private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(3L);

    /** Every {@link KafkaStreams} created by this factory, held in a concurrent deque. */
    private final Collection<KafkaStreams> streams = new ConcurrentLinkedDeque<>();

    /**
     * Creates the stream builder for a single Kafka Streams configuration.
     *
     * @param configuration the configuration whose {@code getConfig()} value seeds the builder
     * @return a new builder wrapping that configuration
     */
    @EachBean(AbstractKafkaStreamsConfiguration.class)
    ConfiguredStreamBuilder streamsBuilder(AbstractKafkaStreamsConfiguration configuration) {
        return new ConfiguredStreamBuilder(configuration.getConfig());
    }

    /**
     * Builds a {@link KafkaStreams} from the given builder, registers it with this factory and
     * starts it.
     *
     * @param builder  the builder supplying both the topology and the streams configuration
     * @param kStreams not referenced in the body; presumably declared so that the container
     *                 resolves the {@link KStream} beans (which define the topology on the
     *                 builder) before the topology is built -- TODO confirm
     * @return the started instance
     */
    @EachBean(ConfiguredStreamBuilder.class)
    @Context
    KafkaStreams kafkaStreams(ConfiguredStreamBuilder builder, KStream ... kStreams) {
        KafkaStreams instance = new KafkaStreams(
                builder.build(builder.getConfiguration()),
                builder.getConfiguration());
        // Register before starting, so the instance is tracked (and later closed) even if
        // start() throws.
        this.streams.add(instance);
        instance.start();
        return instance;
    }

    /**
     * Creates the interactive query service.
     *
     * <p>The service is handed this factory's own collection of streams rather than a copy.
     *
     * @return a new {@link InteractiveQueryService} over the tracked streams
     */
    @Singleton
    InteractiveQueryService interactiveQueryService() {
        return new InteractiveQueryService(this.streams);
    }

    /**
     * Closes every tracked {@link KafkaStreams}, giving each one {@link #CLOSE_TIMEOUT}.
     *
     * <p>Failures are ignored per instance, so one stream failing to close does not stop the
     * remaining ones from being closed.
     */
    @Override
    @PreDestroy
    public void close() {
        for (KafkaStreams tracked : this.streams) {
            closeQuietly(tracked);
        }
    }

    /** Closes one stream with {@link #CLOSE_TIMEOUT}, discarding any exception it throws. */
    private static void closeQuietly(KafkaStreams stream) {
        try {
            stream.close(CLOSE_TIMEOUT);
        }
        catch (Exception ignored) {
            // Best-effort shutdown: nothing to do here, the caller moves on to the next stream.
        }
    }
}

