package org.apache.kafka.streams.kstream.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.state.StoreBuilder;

/* loaded from: input_file:lib/kafka-streams-2.6.0.jar:org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.class */
class CogroupedStreamAggregateBuilder<K, VOut> {
    private final InternalStreamsBuilder builder;
    private final Map<KGroupedStreamImpl<K, ?>, StreamsGraphNode> parentNodes = new LinkedHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    public CogroupedStreamAggregateBuilder(InternalStreamsBuilder internalStreamsBuilder) {
        this.builder = internalStreamsBuilder;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <KR, VIn, W extends Window> KTable<KR, VOut> build(Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> map, Initializer<VOut> initializer, NamedInternal namedInternal, StoreBuilder<?> storeBuilder, Serde<KR> serde, Serde<VOut> serde2, String str, Windows<W> windows, SessionWindows sessionWindows, Merger<? super K, VOut> merger) {
        for (KGroupedStreamImpl<K, ?> kGroupedStreamImpl : map.keySet()) {
            if (kGroupedStreamImpl.repartitionRequired) {
                OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, ?> optimizableRepartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
                createRepartitionSource(kGroupedStreamImpl.userProvidedRepartitionTopicName != null ? kGroupedStreamImpl.userProvidedRepartitionTopicName : storeBuilder.name(), optimizableRepartitionNodeBuilder, kGroupedStreamImpl.keySerde, kGroupedStreamImpl.valSerde);
                if (!this.parentNodes.containsKey(kGroupedStreamImpl)) {
                    OptimizableRepartitionNode<K, ?> build = optimizableRepartitionNodeBuilder.build();
                    this.builder.addGraphNode(kGroupedStreamImpl.streamsGraphNode, build);
                    this.parentNodes.put(kGroupedStreamImpl, build);
                }
            } else {
                this.parentNodes.put(kGroupedStreamImpl, kGroupedStreamImpl.streamsGraphNode);
            }
        }
        Collection<? extends AbstractStream<K, ?>> arrayList = new ArrayList<>(this.parentNodes.keySet());
        AbstractStream abstractStream = (AbstractStream) arrayList.iterator().next();
        arrayList.remove(abstractStream);
        abstractStream.ensureCopartitionWith(arrayList);
        ArrayList arrayList2 = new ArrayList();
        boolean z = false;
        int i = 0;
        for (Map.Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> entry : map.entrySet()) {
            int i2 = i;
            i++;
            StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(entry.getValue(), initializer, namedInternal.suffixWithOrElseGet("-cogroup-agg-" + i2, this.builder, "COGROUPKSTREAM-AGGREGATE-"), z, storeBuilder, windows, sessionWindows, merger);
            z = true;
            arrayList2.add(statefulProcessorNode);
            this.builder.addGraphNode(this.parentNodes.get(entry.getKey()), statefulProcessorNode);
        }
        String suffixWithOrElseGet = namedInternal.suffixWithOrElseGet("-cogroup-merge", this.builder, "COGROUPKSTREAM-MERGE-");
        PassThrough passThrough = new PassThrough();
        ProcessorGraphNode processorGraphNode = new ProcessorGraphNode(suffixWithOrElseGet, new ProcessorParameters(passThrough, suffixWithOrElseGet));
        this.builder.addGraphNode(arrayList2, processorGraphNode);
        return new KTableImpl(suffixWithOrElseGet, serde, serde2, Collections.singleton(processorGraphNode.nodeName()), str, passThrough, processorGraphNode, this.builder);
    }

    private <W extends Window> StatefulProcessorNode<K, ?> getStatefulProcessorNode(Aggregator<? super K, Object, VOut> aggregator, Initializer<VOut> initializer, String str, boolean z, StoreBuilder<?> storeBuilder, Windows<W> windows, SessionWindows sessionWindows, Merger<? super K, VOut> merger) {
        KStreamAggProcessorSupplier kStreamSessionWindowAggregate;
        if (windows == null && sessionWindows == null) {
            kStreamSessionWindowAggregate = new KStreamAggregate(storeBuilder.name(), initializer, aggregator);
        } else if (windows != null && sessionWindows == null) {
            kStreamSessionWindowAggregate = new KStreamWindowAggregate(windows, storeBuilder.name(), initializer, aggregator);
        } else {
            if (windows != null || merger == null) {
                throw new IllegalArgumentException("must include windows OR sessionWindows + sessionMerger OR all must be null");
            }
            kStreamSessionWindowAggregate = new KStreamSessionWindowAggregate(sessionWindows, storeBuilder.name(), initializer, aggregator, merger);
        }
        return !z ? new StatefulProcessorNode<>(str, new ProcessorParameters(kStreamSessionWindowAggregate, str), storeBuilder) : new StatefulProcessorNode<>(str, new ProcessorParameters(kStreamSessionWindowAggregate, str), new String[]{storeBuilder.name()});
    }

    private <VIn> void createRepartitionSource(String str, OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, ?> optimizableRepartitionNodeBuilder, Serde<K> serde, Serde<?> serde2) {
        KStreamImpl.createRepartitionedSource(this.builder, serde, serde2, str, null, optimizableRepartitionNodeBuilder);
    }
}
