/*
 * Decompiled with CFR 0.152.
 */
package io.activej.dataflow.collector;

import io.activej.dataflow.DataflowClient;
import io.activej.dataflow.dataset.Dataset;
import io.activej.dataflow.dataset.SortedDataset;
import io.activej.dataflow.graph.DataflowContext;
import io.activej.dataflow.graph.DataflowGraph;
import io.activej.dataflow.graph.Partition;
import io.activej.dataflow.graph.StreamId;
import io.activej.dataflow.node.NodeUpload;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.processor.StreamMerger;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

public class MergeCollector<K, T> {
    private final Dataset<T> input;
    private final DataflowClient client;
    private final Function<T, K> keyFunction;
    private final Comparator<K> keyComparator;
    private final boolean distinct;

    public MergeCollector(SortedDataset<K, T> input, DataflowClient client, boolean distinct) {
        this(input, client, input.keyFunction(), input.keyComparator(), distinct);
    }

    public MergeCollector(Dataset<T> input, DataflowClient client, Function<T, K> keyFunction, Comparator<K> keyComparator, boolean distinct) {
        this.input = input;
        this.client = client;
        this.keyFunction = keyFunction;
        this.keyComparator = keyComparator;
        this.distinct = distinct;
    }

    public StreamSupplier<T> compile(DataflowGraph graph) {
        DataflowContext context = DataflowContext.of(graph);
        List<StreamId> inputStreamIds = this.input.channels(context);
        StreamMerger merger = StreamMerger.create(this.keyFunction, this.keyComparator, (boolean)this.distinct);
        int index = context.generateNodeIndex();
        for (StreamId streamId : inputStreamIds) {
            NodeUpload<T> nodeUpload = new NodeUpload<T>(index, this.input.valueType(), streamId);
            Partition partition = graph.getPartition(streamId);
            graph.addNode(partition, nodeUpload);
            StreamSupplier<T> supplier = this.client.download(partition.getAddress(), streamId, this.input.valueType());
            supplier.streamTo(merger.newInput());
        }
        return merger.getOutput();
    }
}

