/*
 * Decompiled with CFR 0.152.
 */
package io.activej.dataflow.node;

import io.activej.dataflow.graph.StreamId;
import io.activej.dataflow.graph.Task;
import io.activej.dataflow.node.AbstractNode;
import io.activej.dataflow.stats.NodeStat;
import io.activej.dataflow.stats.TestNodeStat;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.processor.StreamReducer;
import io.activej.datastream.processor.StreamReducers;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public final class NodeReduce<K, O, A>
extends AbstractNode {
    private final Comparator<K> keyComparator;
    private final Map<StreamId, Input<K, O, A>> inputs;
    private final StreamId output;

    public NodeReduce(int index, Comparator<K> keyComparator) {
        this(index, keyComparator, new LinkedHashMap<StreamId, Input<K, O, A>>(), new StreamId());
    }

    public NodeReduce(int index, Comparator<K> keyComparator, Map<StreamId, Input<K, O, A>> inputs, StreamId output) {
        super(index);
        this.keyComparator = keyComparator;
        this.inputs = inputs;
        this.output = output;
    }

    public <I> void addInput(StreamId streamId, Function<I, K> keyFunction, StreamReducers.Reducer<K, I, O, A> reducer) {
        this.inputs.put(streamId, new Input<K, O, A>(reducer, keyFunction));
    }

    @Override
    public Collection<StreamId> getInputs() {
        return this.inputs.keySet();
    }

    @Override
    public Collection<StreamId> getOutputs() {
        return Collections.singletonList(this.output);
    }

    @Override
    public void createAndBind(Task task) {
        StreamReducer streamReducer = StreamReducer.create(this.keyComparator);
        for (Map.Entry<StreamId, Input<K, O, A>> entry : this.inputs.entrySet()) {
            Input<K, O, A> koaInput = entry.getValue();
            StreamConsumer input = streamReducer.newInput(((Input)koaInput).keyFunction, ((Input)koaInput).reducer);
            task.bindChannel(entry.getKey(), input);
        }
        task.export(this.output, streamReducer.getOutput());
    }

    public Comparator<K> getKeyComparator() {
        return this.keyComparator;
    }

    public Map<StreamId, Input<K, O, A>> getInputMap() {
        return this.inputs;
    }

    public StreamId getOutput() {
        return this.output;
    }

    @Override
    public NodeStat getStats() {
        return new TestNodeStat(this.getIndex());
    }

    public String toString() {
        return "NodeReduce{keyComparator=" + this.keyComparator.getClass().getSimpleName() + ", inputs=" + this.inputs + ", output=" + this.output + '}';
    }

    public static class Input<K, O, A> {
        private final StreamReducers.Reducer<K, ?, O, A> reducer;
        private final Function<?, K> keyFunction;

        public Input(StreamReducers.Reducer<K, ?, O, A> reducer, Function<?, K> keyFunction) {
            this.reducer = reducer;
            this.keyFunction = keyFunction;
        }

        public StreamReducers.Reducer<K, ?, O, A> getReducer() {
            return this.reducer;
        }

        public Function<?, K> getKeyFunction() {
            return this.keyFunction;
        }

        public String toString() {
            return "Input{reducer=" + this.reducer.getClass().getSimpleName() + ", keyFunction=" + this.keyFunction.getClass().getSimpleName() + '}';
        }
    }
}

