/*
 * Decompiled with CFR 0.152.
 */
package io.activej.dataflow.dataset;

import io.activej.dataflow.dataset.LocallySortedDataset;
import io.activej.dataflow.graph.DataflowContext;
import io.activej.dataflow.graph.DataflowGraph;
import io.activej.dataflow.graph.Partition;
import io.activej.dataflow.graph.StreamId;
import io.activej.dataflow.node.NodeDownload;
import io.activej.dataflow.node.NodeReduce;
import io.activej.dataflow.node.NodeShard;
import io.activej.dataflow.node.NodeUpload;
import io.activej.datastream.processor.StreamReducers;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.IntStream;

public class DatasetUtils {
    public static <K, I, O, A> List<StreamId> repartitionAndReduce(DataflowContext context, LocallySortedDataset<K, I> input, StreamReducers.Reducer<K, I, O, A> reducer, List<Partition> partitions) {
        DataflowGraph graph = context.getGraph();
        int nonce = context.getNonce();
        Function<I, K> keyFunction = input.keyFunction();
        ArrayList<StreamId> outputStreamIds = new ArrayList<StreamId>();
        ArrayList<NodeShard<K, I>> sharders = new ArrayList<NodeShard<K, I>>();
        int sharderIndex = context.generateNodeIndex();
        for (StreamId inputStreamId : input.channels(context.withoutFixedNonce())) {
            Partition partition = graph.getPartition(inputStreamId);
            NodeShard<K, I> sharder = new NodeShard<K, I>(sharderIndex, keyFunction, inputStreamId, nonce);
            graph.addNode(partition, sharder);
            sharders.add(sharder);
        }
        int reducerIndex = context.generateNodeIndex();
        int[] downloadIndexes = DatasetUtils.generateIndexes(context, sharders.size());
        int[] uploadIndexes = DatasetUtils.generateIndexes(context, partitions.size());
        for (int i = 0; i < partitions.size(); ++i) {
            Partition partition = partitions.get(i);
            NodeReduce<K, O, A> nodeReduce = new NodeReduce<K, O, A>(reducerIndex, input.keyComparator());
            graph.addNode(partition, nodeReduce);
            for (int j = 0; j < sharders.size(); ++j) {
                NodeShard sharder = (NodeShard)sharders.get(j);
                StreamId sharderOutput = sharder.newPartition();
                graph.addNodeStream(sharder, sharderOutput);
                StreamId reducerInput = DatasetUtils.forwardChannel(context, input.valueType(), sharderOutput, partition, uploadIndexes[i], downloadIndexes[j]);
                nodeReduce.addInput(reducerInput, keyFunction, reducer);
            }
            outputStreamIds.add(nodeReduce.getOutput());
        }
        return outputStreamIds;
    }

    public static <K, T> List<StreamId> repartitionAndSort(DataflowContext context, LocallySortedDataset<K, T> input, List<Partition> partitions) {
        return DatasetUtils.repartitionAndReduce(context, input, StreamReducers.mergeSortReducer(), partitions);
    }

    public static <T> StreamId forwardChannel(DataflowContext context, Class<T> type, StreamId sourceStreamId, Partition targetPartition, int uploadIndex, int downloadIndex) {
        Partition sourcePartition = context.getGraph().getPartition(sourceStreamId);
        return DatasetUtils.forwardChannel(context, type, sourcePartition, targetPartition, sourceStreamId, uploadIndex, downloadIndex);
    }

    private static <T> StreamId forwardChannel(DataflowContext context, Class<T> type, Partition sourcePartition, Partition targetPartition, StreamId sourceStreamId, int uploadIndex, int downloadIndex) {
        DataflowGraph graph = context.getGraph();
        NodeUpload<T> nodeUpload = new NodeUpload<T>(uploadIndex, type, sourceStreamId);
        NodeDownload<T> nodeDownload = new NodeDownload<T>(downloadIndex, type, sourcePartition.getAddress(), sourceStreamId);
        graph.addNode(sourcePartition, nodeUpload);
        graph.addNode(targetPartition, nodeDownload);
        return nodeDownload.getOutput();
    }

    public static int[] generateIndexes(DataflowContext context, int size) {
        return IntStream.generate(context::generateNodeIndex).limit(size).toArray();
    }
}

