package io.activej.dataflow.dataset.impl;

import io.activej.dataflow.dataset.Dataset;
import io.activej.dataflow.dataset.DatasetUtils;
import io.activej.dataflow.graph.DataflowContext;
import io.activej.dataflow.graph.DataflowGraph;
import io.activej.dataflow.graph.Partition;
import io.activej.dataflow.graph.StreamId;
import io.activej.dataflow.node.NodeReduce;
import io.activej.dataflow.node.NodeReduceSimple;
import io.activej.dataflow.node.NodeShard;
import io.activej.dataflow.node.NodeSort;
import io.activej.datastream.processor.StreamReducers;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

/* loaded from: input_file:io/activej/dataflow/dataset/impl/DatasetSplitSortReduceRepartitionReduce.class */
public final class DatasetSplitSortReduceRepartitionReduce<K, I, O, A> extends Dataset<O> {
    private final Dataset<I> input;
    private final Function<I, K> inputKeyFunction;
    private final Function<A, K> accumulatorKeyFunction;
    private final Comparator<K> keyComparator;
    private final StreamReducers.ReducerToResult<K, I, O, A> reducer;
    private final Class<A> accumulatorType;

    public DatasetSplitSortReduceRepartitionReduce(Dataset<I> dataset, Function<I, K> function, Function<A, K> function2, Comparator<K> comparator, StreamReducers.ReducerToResult<K, I, O, A> reducerToResult, Class<O> cls, Class<A> cls2) {
        super(cls);
        this.input = dataset;
        this.inputKeyFunction = function;
        this.accumulatorKeyFunction = function2;
        this.keyComparator = comparator;
        this.reducer = reducerToResult;
        this.accumulatorType = cls2;
    }

    @Override // io.activej.dataflow.dataset.Dataset
    public List<StreamId> channels(DataflowContext dataflowContext) {
        DataflowGraph graph = dataflowContext.getGraph();
        int nonce = dataflowContext.getNonce();
        ArrayList arrayList = new ArrayList();
        ArrayList<NodeShard> arrayList2 = new ArrayList();
        for (StreamId streamId : this.input.channels(dataflowContext.withoutFixedNonce())) {
            Partition partition = graph.getPartition(streamId);
            NodeShard nodeShard = new NodeShard(this.inputKeyFunction, streamId, nonce);
            graph.addNode(partition, nodeShard);
            arrayList2.add(nodeShard);
        }
        for (Partition partition2 : graph.getAvailablePartitions()) {
            NodeReduce nodeReduce = new NodeReduce(this.keyComparator);
            graph.addNode(partition2, nodeReduce);
            for (NodeShard nodeShard2 : arrayList2) {
                StreamId newPartition = nodeShard2.newPartition();
                graph.addNodeStream(nodeShard2, newPartition);
                nodeReduce.addInput(sortReduceForward(graph, newPartition, partition2), this.accumulatorKeyFunction, this.reducer.accumulatorToOutput());
            }
            arrayList.add(nodeReduce.getOutput());
        }
        return arrayList;
    }

    private StreamId sortReduceForward(DataflowGraph dataflowGraph, StreamId streamId, Partition partition) {
        Partition partition2 = dataflowGraph.getPartition(streamId);
        NodeSort nodeSort = new NodeSort(this.input.valueType(), this.inputKeyFunction, this.keyComparator, false, 1000000, streamId);
        dataflowGraph.addNode(partition2, nodeSort);
        NodeReduceSimple nodeReduceSimple = new NodeReduceSimple(this.inputKeyFunction, this.keyComparator, this.reducer.inputToAccumulator());
        nodeReduceSimple.addInput(nodeSort.getOutput());
        dataflowGraph.addNode(partition2, nodeReduceSimple);
        return DatasetUtils.forwardChannel(dataflowGraph, this.accumulatorType, nodeReduceSimple.getOutput(), partition);
    }
}
