/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.datagraph.dataset.impl;

import io.datakernel.datagraph.dataset.LocallySortedDataset;
import io.datakernel.datagraph.graph.DataGraph;
import io.datakernel.datagraph.graph.Partition;
import io.datakernel.datagraph.graph.StreamId;
import io.datakernel.datagraph.node.NodeDownload;
import io.datakernel.datagraph.node.NodeReduce;
import io.datakernel.datagraph.node.NodeShard;
import io.datakernel.datagraph.node.NodeUpload;
import io.datakernel.stream.processor.StreamReducers;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class DatasetUtils {
    private DatasetUtils() {
    }

    public static <K, I, O> List<StreamId> repartitionAndReduce(DataGraph graph, LocallySortedDataset<K, I> input, StreamReducers.Reducer<K, I, O, ?> reducer, List<Partition> partitions) {
        Function<I, K> keyFunction = input.keyFunction();
        ArrayList<StreamId> outputStreamIds = new ArrayList<StreamId>();
        ArrayList<NodeShard<K, I>> sharders = new ArrayList<NodeShard<K, I>>();
        for (StreamId inputStreamId : input.channels(graph)) {
            Partition partition = graph.getPartition(inputStreamId);
            NodeShard<K, I> sharder = new NodeShard<K, I>(keyFunction, inputStreamId);
            graph.addNode(partition, sharder);
            sharders.add(sharder);
        }
        for (Partition partition : partitions) {
            NodeReduce streamReducer = new NodeReduce(input.keyComparator());
            graph.addNode(partition, streamReducer);
            for (NodeShard nodeShard : sharders) {
                StreamId sharderOutput = nodeShard.newPartition();
                graph.addNodeStream(nodeShard, sharderOutput);
                StreamId reducerInput = DatasetUtils.forwardChannel(graph, input.valueType(), sharderOutput, partition);
                streamReducer.addInput(reducerInput, keyFunction, reducer);
            }
            outputStreamIds.add(streamReducer.getOutput());
        }
        return outputStreamIds;
    }

    public static <K, T> List<StreamId> repartitionAndSort(DataGraph graph, LocallySortedDataset<K, T> input, List<Partition> partitions) {
        return DatasetUtils.repartitionAndReduce(graph, input, StreamReducers.mergeSortReducer(), partitions);
    }

    public static <T> StreamId forwardChannel(DataGraph graph, Class<T> type, StreamId sourceStreamId, Partition targetPartition) {
        Partition sourcePartition = graph.getPartition(sourceStreamId);
        return DatasetUtils.forwardChannel(graph, type, sourcePartition, targetPartition, sourceStreamId);
    }

    private static <T> StreamId forwardChannel(DataGraph graph, Class<T> type, Partition sourcePartition, Partition targetPartition, StreamId sourceStreamId) {
        NodeUpload<T> nodeUpload = new NodeUpload<T>(type, sourceStreamId);
        NodeDownload<T> nodeDownload = new NodeDownload<T>(type, sourcePartition.getAddress(), sourceStreamId);
        graph.addNode(sourcePartition, nodeUpload);
        graph.addNode(targetPartition, nodeDownload);
        return nodeDownload.getOutput();
    }
}

