/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.GroupedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.KTableAggregate;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableReduce;
import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.kstream.internals.graph.GroupedTableOperationRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;

public class KGroupedTableImpl<K, V>
extends AbstractStream<K, V>
implements KGroupedTable<K, V> {
    private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-";
    private static final String REDUCE_NAME = "KTABLE-REDUCE-";
    private final String userProvidedRepartitionTopicName;
    private final Initializer<Long> countInitializer = () -> 0L;
    private final Aggregator<K, V, Long> countAdder = (aggKey, value, aggregate) -> aggregate + 1L;
    private final Aggregator<K, V, Long> countSubtractor = (aggKey, value, aggregate) -> aggregate - 1L;
    private StreamsGraphNode repartitionGraphNode;

    KGroupedTableImpl(InternalStreamsBuilder builder, String name, Set<String> sourceNodes, GroupedInternal<K, V> groupedInternal, StreamsGraphNode streamsGraphNode) {
        super(name, groupedInternal.keySerde(), groupedInternal.valueSerde(), sourceNodes, streamsGraphNode, builder);
        this.userProvidedRepartitionTopicName = groupedInternal.name();
    }

    private <T> KTable<K, T> doAggregate(ProcessorSupplier<K, Change<V>> aggregateSupplier, String functionName, MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) {
        String sinkName = this.builder.newProcessorName("KSTREAM-SINK-");
        String sourceName = this.builder.newProcessorName("KSTREAM-SOURCE-");
        String funcName = this.builder.newProcessorName(functionName);
        String repartitionTopic = (this.userProvidedRepartitionTopicName != null ? this.userProvidedRepartitionTopicName : materialized.storeName()) + "-repartition";
        if (this.repartitionGraphNode == null || this.userProvidedRepartitionTopicName == null) {
            this.repartitionGraphNode = this.createRepartitionNode(sinkName, sourceName, repartitionTopic);
        }
        this.builder.addGraphNode(this.streamsGraphNode, this.repartitionGraphNode);
        StatefulProcessorNode<K, Change<V>> statefulProcessorNode = new StatefulProcessorNode<K, Change<V>>(funcName, new ProcessorParameters<K, Change<V>>(aggregateSupplier, funcName), new KeyValueStoreMaterializer<K, T>(materialized).materialize(), false);
        this.builder.addGraphNode(this.repartitionGraphNode, statefulProcessorNode);
        return new KTableImpl(funcName, materialized.keySerde(), materialized.valueSerde(), Collections.singleton(sourceName), materialized.queryableStoreName(), aggregateSupplier, statefulProcessorNode, this.builder);
    }

    private GroupedTableOperationRepartitionNode<K, V> createRepartitionNode(String sinkName, String sourceName, String topic) {
        return GroupedTableOperationRepartitionNode.groupedTableOperationNodeBuilder().withRepartitionTopic(topic).withSinkName(sinkName).withSourceName(sourceName).withKeySerde(this.keySerde).withValueSerde(this.valSerde).withNodeName(sourceName).build();
    }

    @Override
    public KTable<K, V> reduce(Reducer<V> adder, Reducer<V> subtractor, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(adder, "adder can't be null");
        Objects.requireNonNull(subtractor, "subtractor can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>(materialized, this.builder, AGGREGATE_NAME);
        if (materializedInternal.keySerde() == null) {
            materializedInternal.withKeySerde(this.keySerde);
        }
        if (materializedInternal.valueSerde() == null) {
            materializedInternal.withValueSerde(this.valSerde);
        }
        KTableReduce aggregateSupplier = new KTableReduce(materializedInternal.storeName(), adder, subtractor);
        return this.doAggregate(aggregateSupplier, REDUCE_NAME, materializedInternal);
    }

    @Override
    public KTable<K, V> reduce(Reducer<V> adder, Reducer<V> subtractor) {
        return this.reduce(adder, subtractor, Materialized.with(this.keySerde, this.valSerde));
    }

    @Override
    public KTable<K, Long> count(Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
        MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>>(materialized, this.builder, AGGREGATE_NAME);
        if (materializedInternal.keySerde() == null) {
            materializedInternal.withKeySerde(this.keySerde);
        }
        if (materializedInternal.valueSerde() == null) {
            materializedInternal.withValueSerde(Serdes.Long());
        }
        KTableAggregate<K, V, Long> aggregateSupplier = new KTableAggregate<K, V, Long>(materializedInternal.storeName(), this.countInitializer, this.countAdder, this.countSubtractor);
        return this.doAggregate(aggregateSupplier, AGGREGATE_NAME, materializedInternal);
    }

    @Override
    public KTable<K, Long> count() {
        return this.count(Materialized.with(this.keySerde, Serdes.Long()));
    }

    @Override
    public <VR> KTable<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> adder, Aggregator<? super K, ? super V, VR> subtractor, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(initializer, "initializer can't be null");
        Objects.requireNonNull(adder, "adder can't be null");
        Objects.requireNonNull(subtractor, "subtractor can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>>(materialized, this.builder, AGGREGATE_NAME);
        if (materializedInternal.keySerde() == null) {
            materializedInternal.withKeySerde(this.keySerde);
        }
        KTableAggregate<? super K, ? super V, VR> aggregateSupplier = new KTableAggregate<K, V, VR>(materializedInternal.storeName(), initializer, adder, subtractor);
        return this.doAggregate(aggregateSupplier, AGGREGATE_NAME, materializedInternal);
    }

    @Override
    public <T> KTable<K, T> aggregate(Initializer<T> initializer, Aggregator<? super K, ? super V, T> adder, Aggregator<? super K, ? super V, T> subtractor) {
        return this.aggregate(initializer, adder, subtractor, Materialized.with(this.keySerde, null));
    }
}

