package io.kipe.streams.kafka.processors;

import io.kipe.streams.kafka.factories.TopicNamesFactory;
import io.kipe.streams.recordtypes.GenericRecord;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

/* loaded from: input_file:io/kipe/streams/kafka/processors/StatsBuilder.class */
/**
 * Fluent builder that groups a stream of {@link GenericRecord}s by the string values of
 * selected fields and folds every group into a single aggregate record by applying the
 * registered {@link StatsExpression}s.
 *
 * <p>Typical call order: {@link #groupBy(String...)}, then one or more
 * {@link #with(StatsExpression)} calls (each optionally followed by {@link #as(String)}),
 * and finally {@link #asKTable()} or {@link #build()}.
 *
 * @param <K> key type of the incoming stream
 */
public class StatsBuilder<K> extends AbstractTopologyPartBuilder<K, GenericRecord> {
    /** Names of the record fields whose values form the grouping key; empty until {@link #groupBy} is called. */
    private String[] groupFields;

    /** Aggregation expressions in registration order. */
    private final List<StatsExpression> expressions;

    /**
     * Creates a builder on top of the given stream; all arguments are forwarded unchanged
     * to the superclass constructor.
     *
     * @param streamsBuilder forwarded to the superclass
     * @param kStream        forwarded to the superclass
     * @param serde          forwarded to the superclass
     * @param serde2         forwarded to the superclass
     * @param str            forwarded to the superclass
     */
    public StatsBuilder(StreamsBuilder streamsBuilder, KStream<K, GenericRecord> kStream, Serde<K> serde, Serde<GenericRecord> serde2, String str) {
        super(streamsBuilder, kStream, serde, serde2, str);
        this.groupFields = new String[0];
        this.expressions = new ArrayList<>();
    }

    /**
     * Sets the fields whose values make up the grouping key, replacing any previous selection.
     *
     * @param strArr names of the fields to group by
     * @return this builder
     * @throws NullPointerException if the array itself is {@code null}
     */
    public StatsBuilder<K> groupBy(String... strArr) {
        Objects.requireNonNull(strArr, "groupFields");
        // Defensive copy: later changes to the caller's array must not alter the grouping.
        this.groupFields = strArr.clone();
        return this;
    }

    /**
     * Registers an aggregation expression.
     *
     * @param statsExpression the expression to add
     * @return this builder
     * @throws NullPointerException if {@code statsExpression} is {@code null}
     */
    public StatsBuilder<K> with(StatsExpression statsExpression) {
        Objects.requireNonNull(statsExpression, "expression");
        this.expressions.add(statsExpression);
        return this;
    }

    /**
     * Sets the field name on the most recently registered expression.
     *
     * @param str the field name passed to {@code setFieldName} of the last expression
     * @return this builder
     * @throws NullPointerException  if {@code str} is {@code null}
     * @throws IllegalStateException if no expression has been registered yet
     */
    public StatsBuilder<K> as(String str) {
        Objects.requireNonNull(str, "fieldName");
        if (this.expressions.isEmpty()) {
            throw new IllegalStateException("no aggregation function was added before");
        }
        this.expressions.get(this.expressions.size() - 1).setFieldName(str);
        return this;
    }

    /**
     * Builds the grouped aggregation and returns it as a table.
     *
     * <p>The new key is the concatenation of <code>{value}</code> for each group field, in
     * the order given to {@link #groupBy(String...)}. For the first record of a group the
     * aggregate is created and seeded with the group field values of that record; afterwards
     * every registered expression is applied to each incoming record.
     *
     * @param serde serde for the string group key; may be {@code null}, in which case a
     *              warning is logged and {@code null} is handed on to Kafka Streams
     * @return the table of aggregate records keyed by group key
     * @throws NullPointerException if {@code getTopicsBaseName()} returns {@code null}
     */
    public KTable<String, GenericRecord> asKTable(Serde<String> serde) {
        if (serde == null) {
            LOG.warn("The default keySerde is being used. To customize serdes, provide a specific serde to override this behavior.");
        }
        Objects.requireNonNull(getTopicsBaseName(), "topicBaseName");
        return this.stream
                .groupBy(
                        (key, value) -> {
                            StringBuilder groupKey = new StringBuilder();
                            for (String field : this.groupFields) {
                                groupKey.append("{").append(value.getString(field)).append("}");
                            }
                            return groupKey.toString();
                        },
                        // Explicit type witnesses: without them the chained with*Serde calls do not type-check.
                        Grouped.<String, GenericRecord>as(getTopicsBaseName())
                                .withKeySerde(serde)
                                .withValueSerde(this.valueSerde))
                .aggregate(
                        // A null initial aggregate marks "first record of this group" for the aggregator below.
                        () -> null,
                        (key, value, aggregate) -> {
                            GenericRecord result = aggregate;
                            if (result == null) {
                                result = new GenericRecord();
                                // Seed the aggregate with the group field values taken from the incoming record.
                                for (String field : this.groupFields) {
                                    result.set(field, value.get(field));
                                }
                            }
                            // NOTE(review): argument order (key, value, aggregate) mirrors Kafka's Aggregator;
                            // confirm against StatsExpression.update.
                            for (StatsExpression expression : this.expressions) {
                                expression.update(key, value, result);
                            }
                            return result;
                        },
                        Materialized.<String, GenericRecord, KeyValueStore<Bytes, byte[]>>as(
                                        TopicNamesFactory.getProcessorStoreTopicName(getTopicsBaseName()))
                                .withKeySerde(serde)
                                .withValueSerde(this.valueSerde)
                                .withCachingDisabled());
    }

    /**
     * Same as {@link #asKTable(Serde)} with a {@code null} key serde.
     *
     * @return the table of aggregate records keyed by group key
     */
    public KTable<String, GenericRecord> asKTable() {
        return asKTable(null);
    }

    /**
     * Builds the aggregation, converts the resulting table to a stream and wraps it in a
     * new {@code KipesBuilder}.
     *
     * @param serde serde for the string group key; may be {@code null} (see {@link #asKTable(Serde)})
     * @return a builder for the stream of aggregate records
     */
    public KipesBuilder<String, GenericRecord> build(Serde<String> serde) {
        return createKipesBuilder(asKTable(serde).toStream(), serde, this.valueSerde);
    }

    /**
     * Same as {@link #build(Serde)} with a {@code null} key serde.
     *
     * @return a builder for the stream of aggregate records
     */
    public KipesBuilder<String, GenericRecord> build() {
        return build(null);
    }
}
