/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.datagraph.node;

import io.datakernel.datagraph.graph.StreamId;
import io.datakernel.datagraph.graph.TaskContext;
import io.datakernel.datagraph.node.Node;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.processor.Sharders;
import io.datakernel.stream.processor.StreamSharder;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public final class NodeShard<K, T>
implements Node {
    private Function<T, K> keyFunction;
    private StreamId input;
    private List<StreamId> outputs;

    public StreamId newPartition() {
        StreamId newOutput = new StreamId();
        this.outputs.add(newOutput);
        return newOutput;
    }

    public StreamId getOutput(int partition) {
        return this.outputs.get(partition);
    }

    public NodeShard() {
    }

    public NodeShard(Function<T, K> keyFunction, StreamId input) {
        this.keyFunction = keyFunction;
        this.input = input;
        this.outputs = new ArrayList<StreamId>();
    }

    public NodeShard(Function<T, K> keyFunction, StreamId input, List<StreamId> outputs) {
        this.keyFunction = keyFunction;
        this.input = input;
        this.outputs = outputs;
    }

    public Function<T, K> getKeyFunction() {
        return this.keyFunction;
    }

    public void setKeyFunction(Function<T, K> keyFunction) {
        this.keyFunction = keyFunction;
    }

    public StreamId getInput() {
        return this.input;
    }

    public void setInput(StreamId input) {
        this.input = input;
    }

    public List<StreamId> getOutputs() {
        return this.outputs;
    }

    public void setOutputs(List<StreamId> outputs) {
        this.outputs = outputs;
    }

    @Override
    public void createAndBind(TaskContext taskContext) {
        Sharders.HashSharder hashSharder = new Sharders.HashSharder(this.outputs.size());
        StreamSharder streamSharder = StreamSharder.create(object -> hashSharder.shard(this.keyFunction.apply(object)));
        taskContext.bindChannel(this.input, streamSharder.getInput());
        for (StreamId streamId : this.outputs) {
            StreamProducer producer = streamSharder.newOutput();
            taskContext.export(streamId, producer);
        }
    }
}

