package com.expedia.www.haystack.pipes.commons.kafka;

import com.expedia.open.tracing.Span;
import com.expedia.www.haystack.pipes.commons.serialization.SerdeFactory;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.ProcessorSupplier;

/* loaded from: input_file:BOOT-INF/lib/haystack-pipes-commons-2.0.0.jar:com/expedia/www/haystack/pipes/commons/kafka/KafkaStreamBuilderBase.class */
public abstract class KafkaStreamBuilderBase implements KafkaStreamBuilder, Main {
    private final KafkaStreamStarter kafkaStreamStarter;
    private final SerdeFactory serdeFactory;
    private final String application;
    private final KafkaConfig kafkaConfig;
    private final ForeachAction<String, Span> foreachAction;
    private final ProcessorSupplier<String, Span> processorSupplier;

    public KafkaStreamBuilderBase(KafkaStreamStarter kafkaStreamStarter, SerdeFactory serdeFactory, String str, KafkaConfig kafkaConfig, ForeachAction<String, Span> foreachAction) {
        this(kafkaStreamStarter, serdeFactory, str, kafkaConfig, foreachAction, null);
    }

    public KafkaStreamBuilderBase(KafkaStreamStarter kafkaStreamStarter, SerdeFactory serdeFactory, String str, KafkaConfigurationProvider kafkaConfigurationProvider, ProcessorSupplier<String, Span> processorSupplier) {
        this(kafkaStreamStarter, serdeFactory, str, kafkaConfigurationProvider, null, processorSupplier);
    }

    private KafkaStreamBuilderBase(KafkaStreamStarter kafkaStreamStarter, SerdeFactory serdeFactory, String str, KafkaConfig kafkaConfig, ForeachAction<String, Span> foreachAction, ProcessorSupplier<String, Span> processorSupplier) {
        this.kafkaStreamStarter = kafkaStreamStarter;
        this.serdeFactory = serdeFactory;
        this.application = str;
        this.kafkaConfig = kafkaConfig;
        this.foreachAction = foreachAction;
        this.processorSupplier = processorSupplier;
    }

    @Override // com.expedia.www.haystack.pipes.commons.kafka.KafkaStreamBuilder
    public void buildStreamTopology(KStreamBuilder kStreamBuilder) {
        KStream stream = kStreamBuilder.stream(Serdes.String(), this.serdeFactory.createJsonProtoSpanSerde(this.application), this.kafkaConfig.fromtopic());
        if (this.foreachAction != null) {
            stream.foreach(this.foreachAction);
        }
        if (this.processorSupplier != null) {
            stream.process(this.processorSupplier, new String[0]);
        }
    }

    @Override // com.expedia.www.haystack.pipes.commons.kafka.Main
    public void main() {
        this.kafkaStreamStarter.createAndStartStream(this);
    }
}
