/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Objects;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindowedKStream;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.GroupedInternal;
import org.apache.kafka.streams.kstream.internals.GroupedStreamAggregateBuilder;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.KStreamAggProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KStreamAggregate;
import org.apache.kafka.streams.kstream.internals.KStreamReduce;
import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.kstream.internals.SessionWindowedKStreamImpl;
import org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.state.KeyValueStore;

/**
 * Package-private implementation of {@link KGroupedStream}.
 *
 * <p>Every key-value aggregation exposed here ({@code reduce}, {@code aggregate},
 * {@code count}) resolves its {@link Materialized} configuration into a
 * {@link MaterializedInternal}, fills in serdes that the caller left unset, and then
 * hands the work to a shared {@link GroupedStreamAggregateBuilder} via
 * {@link #doAggregate}. The windowed variants wrap the same builder in a
 * {@link TimeWindowedKStreamImpl} or {@link SessionWindowedKStreamImpl}.
 *
 * @param <K> key type of the grouped stream
 * @param <V> value type of the grouped stream
 */
class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedStream<K, V> {
    /** Name prefix passed to the builder and to {@link MaterializedInternal} for reduce operations. */
    static final String REDUCE_NAME = "KSTREAM-REDUCE-";
    /** Name prefix passed to the builder and to {@link MaterializedInternal} for aggregate and count operations. */
    static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";

    /** Builder shared by all aggregations (plain and windowed) created from this grouped stream. */
    private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;

    /**
     * Creates the grouped stream and the aggregate builder that backs all of its operations.
     *
     * @param name                name forwarded to the superclass and to the aggregate builder
     * @param sourceNodes         source node names forwarded to the superclass and to the aggregate builder
     * @param groupedInternal     supplies the key and value serdes for this stream
     * @param repartitionRequired forwarded unchanged to the aggregate builder
     * @param streamsGraphNode    graph node forwarded to the superclass and to the aggregate builder
     * @param builder             the internal streams builder this stream belongs to
     */
    KGroupedStreamImpl(String name, Set<String> sourceNodes, GroupedInternal<K, V> groupedInternal, boolean repartitionRequired, StreamsGraphNode streamsGraphNode, InternalStreamsBuilder builder) {
        super(name, groupedInternal.keySerde(), groupedInternal.valueSerde(), sourceNodes, streamsGraphNode, builder);
        this.aggregateBuilder = new GroupedStreamAggregateBuilder<K, V>(builder, groupedInternal, repartitionRequired, sourceNodes, name, streamsGraphNode);
    }

    /**
     * Reduces the grouped values using this stream's own key and value serdes.
     *
     * @param reducer function combining two values into one
     * @return the table produced by {@link #reduce(Reducer, Materialized)}
     * @throws NullPointerException if {@code reducer} is {@code null}
     */
    @Override
    public KTable<K, V> reduce(Reducer<V> reducer) {
        return this.reduce(reducer, Materialized.with(this.keySerde, this.valSerde));
    }

    /**
     * Reduces the grouped values, materializing the result as configured.
     *
     * <p>Serdes missing from {@code materialized} default to this stream's key and value serdes.
     *
     * @param reducer      function combining two values into one
     * @param materialized store configuration for the result
     * @return the table built by the aggregate builder
     * @throws NullPointerException if {@code reducer} or {@code materialized} is {@code null}
     */
    @Override
    public KTable<K, V> reduce(Reducer<V> reducer, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(reducer, "reducer can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>(materialized, this.builder, REDUCE_NAME);
        if (materializedInternal.keySerde() == null) {
            materializedInternal.withKeySerde(this.keySerde);
        }
        if (materializedInternal.valueSerde() == null) {
            materializedInternal.withValueSerde(this.valSerde);
        }
        // Diamond instead of a raw type so the supplier's type arguments are checked against doAggregate.
        return this.doAggregate(new KStreamReduce<>(materializedInternal.storeName(), reducer), REDUCE_NAME, materializedInternal);
    }

    /**
     * Aggregates the grouped values, materializing the result as configured.
     *
     * <p>Only the key serde is defaulted from this stream; the value serde is left exactly as
     * configured because the result type {@code VR} is independent of this stream's value type.
     *
     * @param initializer  supplies the initial aggregate
     * @param aggregator   folds a record into the current aggregate
     * @param materialized store configuration for the result
     * @param <VR>         type of the aggregate value
     * @return the table built by the aggregate builder
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public <VR> KTable<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> aggregator, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(initializer, "initializer can't be null");
        Objects.requireNonNull(aggregator, "aggregator can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>>(materialized, this.builder, AGGREGATE_NAME);
        if (materializedInternal.keySerde() == null) {
            materializedInternal.withKeySerde(this.keySerde);
        }
        return this.doAggregate(new KStreamAggregate<K, V, VR>(materializedInternal.storeName(), initializer, aggregator), AGGREGATE_NAME, materializedInternal);
    }

    /**
     * Aggregates the grouped values using this stream's key serde and no explicit value serde.
     *
     * @param initializer supplies the initial aggregate
     * @param aggregator  folds a record into the current aggregate
     * @param <VR>        type of the aggregate value
     * @return the table produced by {@link #aggregate(Initializer, Aggregator, Materialized)}
     * @throws NullPointerException if {@code initializer} or {@code aggregator} is {@code null}
     */
    @Override
    public <VR> KTable<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> aggregator) {
        return this.aggregate(initializer, aggregator, Materialized.with(this.keySerde, null));
    }

    /**
     * Counts records per key using this stream's key serde and a {@code Long} value serde.
     *
     * @return the table of per-key counts
     */
    @Override
    public KTable<K, Long> count() {
        return this.doCount(Materialized.with(this.keySerde, Serdes.Long()));
    }

    /**
     * Counts records per key, materializing the result as configured.
     *
     * @param materialized store configuration for the result
     * @return the table of per-key counts
     * @throws NullPointerException if {@code materialized} is {@code null}
     */
    @Override
    public KTable<K, Long> count(Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(materialized, "materialized can't be null");
        // The generated name is intentionally discarded; only the call itself is kept.
        // NOTE(review): presumably this advances the builder's name counter so that generated
        // node/store names stay stable for existing topologies -- confirm before removing.
        if (new MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>>(materialized).storeName() == null) {
            this.builder.newStoreName(AGGREGATE_NAME);
        }
        return this.doCount(materialized);
    }

    /**
     * Shared implementation of both {@code count} overloads.
     *
     * <p>Defaults a missing key serde to this stream's key serde and a missing value serde to
     * {@code Serdes.Long()}, then aggregates with the builder's count initializer and aggregator.
     *
     * @param materialized store configuration for the result
     * @return the table of per-key counts
     */
    private KTable<K, Long> doCount(Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
        MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>>(materialized, this.builder, AGGREGATE_NAME);
        if (materializedInternal.keySerde() == null) {
            materializedInternal.withKeySerde(this.keySerde);
        }
        if (materializedInternal.valueSerde() == null) {
            materializedInternal.withValueSerde(Serdes.Long());
        }
        // Diamond instead of a raw type so the supplier's type arguments are checked against doAggregate.
        return this.doAggregate(new KStreamAggregate<>(materializedInternal.storeName(), this.aggregateBuilder.countInitializer, this.aggregateBuilder.countAggregator), AGGREGATE_NAME, materializedInternal);
    }

    /**
     * Creates a time-windowed view of this grouped stream that shares its aggregate builder.
     *
     * @param windows window specification
     * @param <W>     window type
     * @return a new {@link TimeWindowedKStreamImpl} over this stream
     */
    @Override
    public <W extends Window> TimeWindowedKStream<K, V> windowedBy(Windows<W> windows) {
        return new TimeWindowedKStreamImpl<K, V, W>(windows, this.builder, this.sourceNodes, this.name, this.keySerde, this.valSerde, this.aggregateBuilder, this.streamsGraphNode);
    }

    /**
     * Creates a session-windowed view of this grouped stream that shares its aggregate builder.
     *
     * @param windows session window specification
     * @return a new {@link SessionWindowedKStreamImpl} over this stream
     */
    @Override
    public SessionWindowedKStream<K, V> windowedBy(SessionWindows windows) {
        return new SessionWindowedKStreamImpl<K, V>(windows, this.builder, this.sourceNodes, this.name, this.keySerde, this.valSerde, this.aggregateBuilder, this.streamsGraphNode);
    }

    /**
     * Hands an aggregation to the shared aggregate builder.
     *
     * <p>The store builder is produced by a {@link KeyValueStoreMaterializer}; the queryable store
     * name and both serdes are read from {@code materializedInternal}.
     *
     * @param aggregateSupplier    processor supplier implementing the aggregation
     * @param functionName         name prefix passed to the builder ({@link #REDUCE_NAME} or {@link #AGGREGATE_NAME})
     * @param materializedInternal resolved store configuration
     * @param <T>                  value type of the resulting table
     * @return the table returned by the aggregate builder
     */
    private <T> KTable<K, T> doAggregate(KStreamAggProcessorSupplier<K, K, V, T> aggregateSupplier, String functionName, MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) {
        return this.aggregateBuilder.build(functionName, new KeyValueStoreMaterializer<K, T>(materializedInternal).materialize(), aggregateSupplier, materializedInternal.queryableStoreName(), materializedInternal.keySerde(), materializedInternal.valueSerde());
    }
}

