/*
 * Decompiled with CFR 0.152.
 */
package io.activej.dataflow.node;

import io.activej.common.HashUtils;
import io.activej.dataflow.graph.StreamId;
import io.activej.dataflow.graph.Task;
import io.activej.dataflow.node.AbstractNode;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.processor.StreamSplitter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;

/**
 * Dataflow node that distributes the items of a single input stream across
 * a list of output streams.
 *
 * <p>For every item, a key is obtained from {@code keyFunction}; the key's
 * {@code hashCode()} plus {@code nonce} is mixed with
 * {@link HashUtils#murmur3hash} and the result selects the output index
 * (see {@link #createAndBind(Task)}).
 *
 * @param <K> type of the key extracted from each item
 * @param <T> type of the items flowing through this node
 */
public final class NodeShard<K, T> extends AbstractNode {
    private final Function<T, K> keyFunction;
    private final int nonce;
    private final StreamId input;
    private final List<StreamId> outputs;

    /**
     * Creates a shard node that initially has no outputs; outputs are added
     * with {@link #newPartition()}.
     *
     * @param index       index passed to {@link AbstractNode}
     * @param keyFunction extracts the sharding key from an item
     * @param input       id of the stream this node reads from
     * @param nonce       value added to the key hash before it is mixed
     */
    public NodeShard(int index, Function<T, K> keyFunction, StreamId input, int nonce) {
        this(index, keyFunction, input, new ArrayList<>(), nonce);
    }

    /**
     * Creates a shard node with an explicit list of outputs.
     *
     * <p>The {@code outputs} list is stored by reference, not copied:
     * {@link #newPartition()} appends to it (so it must support {@code add}
     * if that method is used) and {@link #getOutputs()} returns it as is.
     *
     * @param index       index passed to {@link AbstractNode}
     * @param keyFunction extracts the sharding key from an item
     * @param input       id of the stream this node reads from
     * @param outputs     ids of the output streams, in partition order
     * @param nonce       value added to the key hash before it is mixed
     */
    public NodeShard(int index, Function<T, K> keyFunction, StreamId input, List<StreamId> outputs, int nonce) {
        super(index);
        this.keyFunction = keyFunction;
        this.input = input;
        this.outputs = outputs;
        this.nonce = nonce;
    }

    /**
     * Creates a new {@link StreamId}, appends it to the list of outputs and
     * returns it.
     *
     * @return the id of the newly added output
     */
    public StreamId newPartition() {
        StreamId newOutput = new StreamId();
        this.outputs.add(newOutput);
        return newOutput;
    }

    /**
     * Returns the output id stored at the given position.
     *
     * @param partition zero-based position in the outputs list
     * @return the output id at that position
     * @throws IndexOutOfBoundsException if {@code partition} is out of range
     *                                   (as specified by {@link List#get(int)})
     */
    public StreamId getOutput(int partition) {
        return this.outputs.get(partition);
    }

    /** Returns the function used to extract the sharding key from an item. */
    public Function<T, K> getKeyFunction() {
        return this.keyFunction;
    }

    /** Returns the id of the stream this node reads from. */
    public StreamId getInput() {
        return this.input;
    }

    /** Returns a single-element list containing {@link #getInput()}. */
    @Override
    public Collection<StreamId> getInputs() {
        return Collections.singletonList(this.input);
    }

    /**
     * Returns the list of output ids. This is the node's internal list
     * itself, not a copy.
     */
    public List<StreamId> getOutputs() {
        return this.outputs;
    }

    /** Returns the value added to each key hash before it is mixed. */
    public int getNonce() {
        return this.nonce;
    }

    /**
     * Creates a {@link StreamSplitter}, binds its input to this node's input
     * id through {@code task.bindChannel}, and exports one newly created
     * splitter output per output id (in list order) through
     * {@code task.export}.
     *
     * <p>The output index for an item is derived from {@link #hashOf(Object)}:
     * <ul>
     *   <li>if the number of outputs is a power of two, the hash is masked
     *       with {@code partitions - 1};</li>
     *   <li>otherwise the absolute value of the hash (with
     *       {@code Integer.MIN_VALUE} mapped to {@code Integer.MAX_VALUE})
     *       is taken modulo the number of outputs.</li>
     * </ul>
     *
     * @param task the task to bind the input channel to and export outputs to
     */
    @Override
    public void createAndBind(Task task) {
        int partitions = this.outputs.size();
        int mask = partitions - 1;

        // NOTE(review): with zero outputs the mask branch is selected
        // (0 & -1 == 0); assuming the acceptors array has one entry per
        // output created below, any item would then index an empty array.
        // Presumably at least one partition is always registered - confirm.
        BiConsumer<T, StreamDataAcceptor<T>[]> splitter;
        if ((partitions & mask) == 0) {
            splitter = (item, acceptors) -> acceptors[hashOf(item) & mask].accept(item);
        } else {
            splitter = (item, acceptors) -> {
                int hash = hashOf(item);
                // -Integer.MIN_VALUE overflows back to itself, so it is mapped explicitly.
                int hashAbs = hash < 0 ? (hash == Integer.MIN_VALUE ? Integer.MAX_VALUE : -hash) : hash;
                acceptors[hashAbs % partitions].accept(item);
            };
        }

        StreamSplitter<T, T> streamSharder = StreamSplitter.create(splitter);
        task.bindChannel(this.input, streamSharder.getInput());
        for (StreamId streamId : this.outputs) {
            StreamSupplier<T> supplier = streamSharder.newOutput();
            task.export(streamId, supplier);
        }
    }

    /** Mixes the hash code of the item's key, offset by the nonce, with murmur3. */
    private int hashOf(T item) {
        return HashUtils.murmur3hash(this.keyFunction.apply(item).hashCode() + this.nonce);
    }

    @Override
    public String toString() {
        return "NodeShard{keyFunction=" + this.keyFunction.getClass().getSimpleName() + ", input=" + this.input + ", outputs=" + this.outputs + '}';
    }
}

