package io.streamthoughts.azkarra.api.streams;

import io.streamthoughts.azkarra.api.StreamsLifecycleInterceptor;
import io.streamthoughts.azkarra.api.config.Conf;
import io.streamthoughts.azkarra.api.streams.State;
import io.streamthoughts.azkarra.api.streams.consumer.MonitorOffsetsConsumerInterceptor;
import io.streamthoughts.azkarra.api.streams.errors.DelegatingUncaughtExceptionHandler;
import io.streamthoughts.azkarra.api.streams.errors.StreamThreadExceptionHandler;
import io.streamthoughts.azkarra.api.streams.listener.CompositeStateListener;
import io.streamthoughts.azkarra.api.streams.listener.CompositeStateRestoreListener;
import io.streamthoughts.azkarra.api.streams.listener.CompositeUncaughtExceptionHandler;
import io.streamthoughts.azkarra.api.streams.rocksdb.DefaultRocksDBConfigSetter;
import io.streamthoughts.azkarra.api.streams.topology.TopologyDefinition;
import io.streamthoughts.azkarra.api.time.Time;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.StateRestoreListener;

/* loaded from: input_file:io/streamthoughts/azkarra/api/streams/KafkaStreamsContainerBuilder.class */
public class KafkaStreamsContainerBuilder {
    private static final List<StreamsConfigDecorator> CONFIG_DECORATORS = List.of(new RocksDBConfigDecorator(), new MonitorConsumerInterceptorConfigDecorator());
    private TopologyDefinition topologyDefinition;
    private KafkaStreamsFactory kafkaStreamsFactory;
    private Conf streamsConfig;
    private List<StateRestoreListener> restoreListeners = Collections.emptyList();
    private List<KafkaStreams.StateListener> stateListeners = Collections.emptyList();
    private List<StreamThreadExceptionHandler> exceptionHandlers = Collections.emptyList();
    private List<StreamsLifecycleInterceptor> interceptors = Collections.emptyList();

    /* loaded from: input_file:io/streamthoughts/azkarra/api/streams/KafkaStreamsContainerBuilder$DelegatingKafkaStreamsFactory.class */
    private class DelegatingKafkaStreamsFactory implements KafkaStreamsFactory {
        private final KafkaStreamsFactory factory;
        private DefaultKafkaStreamsContainer container;

        DelegatingKafkaStreamsFactory(KafkaStreamsFactory kafkaStreamsFactory) {
            this.factory = (KafkaStreamsFactory) Objects.requireNonNull(kafkaStreamsFactory, "factory cannot be null");
        }

        void setKafkaStreamsContainer(DefaultKafkaStreamsContainer defaultKafkaStreamsContainer) {
            this.container = defaultKafkaStreamsContainer;
        }

        @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsFactory
        public KafkaStreams make(Topology topology, Conf conf) {
            KafkaStreams make = this.factory.make(topology, conf);
            make.setStateListener(getStateListener());
            make.setUncaughtExceptionHandler(getUncaughtExceptionHandler());
            make.setGlobalStateRestoreListener(new CompositeStateRestoreListener(KafkaStreamsContainerBuilder.this.restoreListeners));
            return make;
        }

        private CompositeStateListener getStateListener() {
            CompositeStateListener compositeStateListener = new CompositeStateListener(KafkaStreamsContainerBuilder.this.stateListeners);
            compositeStateListener.addListener((state, state2) -> {
                this.container.stateChanges(new StateChangeEvent(Time.SYSTEM.milliseconds(), State.Standards.valueOf(state.name()), State.Standards.valueOf(state2.name())));
            });
            return compositeStateListener;
        }

        private CompositeUncaughtExceptionHandler getUncaughtExceptionHandler() {
            CompositeUncaughtExceptionHandler compositeUncaughtExceptionHandler = new CompositeUncaughtExceptionHandler();
            compositeUncaughtExceptionHandler.addHandler((thread, th) -> {
                this.container.logger().error("Handling uncaught streams thread exception: {}", th.getMessage());
                this.container.setException(th);
            });
            if (KafkaStreamsContainerBuilder.this.exceptionHandlers != null) {
                Stream<R> map = KafkaStreamsContainerBuilder.this.exceptionHandlers.stream().map(streamThreadExceptionHandler -> {
                    return new DelegatingUncaughtExceptionHandler(this.container, streamThreadExceptionHandler);
                });
                Objects.requireNonNull(compositeUncaughtExceptionHandler);
                map.forEach((v1) -> {
                    r1.addHandler(v1);
                });
            }
            return compositeUncaughtExceptionHandler;
        }
    }

    /* loaded from: input_file:io/streamthoughts/azkarra/api/streams/KafkaStreamsContainerBuilder$MonitorConsumerInterceptorConfigDecorator.class */
    private static class MonitorConsumerInterceptorConfigDecorator implements StreamsConfigDecorator {
        private static final String INTERCEPTORS_CONFIG_KEY = "main.consumer.interceptor.classes";

        private MonitorConsumerInterceptorConfigDecorator() {
        }

        @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainerBuilder.StreamsConfigDecorator
        public Conf apply(Conf conf) {
            String name = MonitorOffsetsConsumerInterceptor.class.getName();
            if (conf.hasPath(INTERCEPTORS_CONFIG_KEY)) {
                name = "," + conf.getString(INTERCEPTORS_CONFIG_KEY);
            }
            return Conf.of(Conf.of(INTERCEPTORS_CONFIG_KEY, name), conf);
        }
    }

    /* loaded from: input_file:io/streamthoughts/azkarra/api/streams/KafkaStreamsContainerBuilder$RocksDBConfigDecorator.class */
    public static class RocksDBConfigDecorator implements StreamsConfigDecorator {
        @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainerBuilder.StreamsConfigDecorator
        public Conf apply(Conf conf) {
            return conf.hasPath("rocksdb.config.setter") ? conf : Conf.of(Conf.of("rocksdb.config.setter", DefaultRocksDBConfigSetter.class.getName()), conf);
        }
    }

    @FunctionalInterface
    /* loaded from: input_file:io/streamthoughts/azkarra/api/streams/KafkaStreamsContainerBuilder$StreamsConfigDecorator.class */
    private interface StreamsConfigDecorator {
        Conf apply(Conf conf);
    }

    public KafkaStreamsContainerBuilder withStreamsConfig(Conf conf) {
        this.streamsConfig = conf;
        return this;
    }

    public KafkaStreamsContainerBuilder withInterceptors(List<StreamsLifecycleInterceptor> list) {
        this.interceptors = list;
        return this;
    }

    public KafkaStreamsContainerBuilder withKafkaStreamsFactory(KafkaStreamsFactory kafkaStreamsFactory) {
        this.kafkaStreamsFactory = kafkaStreamsFactory;
        return this;
    }

    public KafkaStreamsContainerBuilder withTopologyDefinition(TopologyDefinition topologyDefinition) {
        this.topologyDefinition = topologyDefinition;
        return this;
    }

    public KafkaStreamsContainerBuilder withRestoreListeners(List<StateRestoreListener> list) {
        this.restoreListeners = list;
        return this;
    }

    public KafkaStreamsContainerBuilder withStreamThreadExceptionHandlers(List<StreamThreadExceptionHandler> list) {
        this.exceptionHandlers = list;
        return this;
    }

    public KafkaStreamsContainerBuilder withStateListeners(List<KafkaStreams.StateListener> list) {
        this.stateListeners = list;
        return this;
    }

    public KafkaStreamsContainer build() {
        Conf conf = this.streamsConfig;
        Iterator<StreamsConfigDecorator> it = CONFIG_DECORATORS.iterator();
        while (it.hasNext()) {
            conf = it.next().apply(conf);
        }
        DelegatingKafkaStreamsFactory delegatingKafkaStreamsFactory = new DelegatingKafkaStreamsFactory(this.kafkaStreamsFactory);
        DefaultKafkaStreamsContainer defaultKafkaStreamsContainer = new DefaultKafkaStreamsContainer(conf, this.topologyDefinition, delegatingKafkaStreamsFactory, this.interceptors);
        delegatingKafkaStreamsFactory.setKafkaStreamsContainer(defaultKafkaStreamsContainer);
        return defaultKafkaStreamsContainer;
    }
}
