/*
 * Decompiled with CFR 0.152.
 */
package cz.seznam.euphoria.core.executor;

import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.UnaryPredicate;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.executor.FlowValidator;
import cz.seznam.euphoria.core.executor.graph.DAG;
import cz.seznam.euphoria.core.executor.graph.Node;
import cz.seznam.euphoria.shadow.com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Converts a {@link Flow} into a {@link DAG} of operators and rewrites that DAG so that
 * every operator the caller does not want to keep is replaced by the operators returned
 * from its {@code getBasicOps()}, recursively.
 */
public class FlowUnfolder {

    /**
     * Unfolds the flow, keeping exactly those operators whose runtime class is contained
     * in {@code operatorClasses}; all other operators are expanded into their basic operators.
     *
     * @param flow the flow to unfold
     * @param operatorClasses operator classes that are kept as they are
     * @return the DAG produced by {@link #translate(DAG, UnaryPredicate)}
     * @throws IllegalArgumentException when an operator that is not kept cannot be expanded
     *         any further (see {@link #translate(DAG, UnaryPredicate)})
     */
    public static DAG<Operator<?, ?>> unfold(Flow flow, Set<Class<? extends Operator<?, ?>>> operatorClasses) throws IllegalArgumentException {
        return FlowUnfolder.unfold(flow, op -> operatorClasses.contains(op.getClass()));
    }

    /**
     * Builds the operator DAG of the flow and translates it using the given predicate.
     *
     * @param flow the flow to unfold
     * @param wantTranslate returns {@code true} for operators that are kept as they are
     * @return the DAG produced by {@link #translate(DAG, UnaryPredicate)}
     */
    public static DAG<Operator<?, ?>> unfold(Flow flow, UnaryPredicate<Operator<?, ?>> wantTranslate) {
        DAG<Operator<?, ?>> dag = FlowUnfolder.toDAG(flow);
        return FlowUnfolder.translate(dag, wantTranslate);
    }

    /**
     * Rewrites {@code dag} into a new DAG. {@link InputOperator}s are copied as roots,
     * operators accepted by {@code wantTranslate} are copied with their parents, and every
     * other operator is replaced by the recursive translation of its {@code getBasicOps()}.
     * The input is passed through {@code FlowValidator.preTranslate} first and the result
     * through {@code FlowValidator.postTranslate} last.
     *
     * @param dag the DAG to translate
     * @param wantTranslate returns {@code true} for operators that are kept as they are
     * @return the translated DAG
     * @throws IllegalArgumentException when an operator rejected by {@code wantTranslate}
     *         expands only to a single operator of its own class
     */
    public static DAG<Operator<?, ?>> translate(DAG<Operator<?, ?>> dag, UnaryPredicate<Operator<?, ?>> wantTranslate) throws IllegalArgumentException {
        DAG<Operator<?, ?>> validated = FlowValidator.preTranslate(dag);
        DAG<Operator<?, ?>> ret = DAG.empty();

        // Maps each dataset to the operator producing it in the translated DAG.
        // An empty Optional marks a dataset that is consumed but has no known producer.
        Map<Dataset<?>, Optional<Operator<?, ?>>> datasetProducers = new HashMap<>();
        validated.nodes()
                .flatMap(n -> n.listInputs().stream())
                .forEach(d -> datasetProducers.put((Dataset<?>) d, Optional.empty()));
        validated.nodes().forEach(n -> datasetProducers.put(n.output(), Optional.of(n)));

        // NOTE(review): the lookups below rely on producers having been registered by
        // nodes visited earlier, so traverse() presumably yields parents before children.
        validated.traverse().forEach(n -> {
            Operator<?, ?> op = n.get();
            if (op instanceof InputOperator) {
                // Explicit empty array: same varargs call as before, without the
                // undefined type variable the decompiler emitted in the cast.
                ret.add(op, new Operator<?, ?>[0]);
            } else if (wantTranslate.apply(op)) {
                ret.add(op, FlowUnfolder.getParents(n, datasetProducers));
            } else {
                FlowUnfolder.expand(op, wantTranslate, ret, datasetProducers);
            }
        });
        return FlowValidator.postTranslate(ret);
    }

    /**
     * Replaces {@code op} in {@code ret} by the translation of its basic operators and
     * registers the single leaf of that translation as the producer of {@code op}'s output.
     */
    private static void expand(Operator<?, ?> op, UnaryPredicate<Operator<?, ?>> wantTranslate, DAG<Operator<?, ?>> ret, Map<Dataset<?>, Optional<Operator<?, ?>>> datasetProducers) {
        DAG<Operator<?, ?>> basicOps = op.getBasicOps();
        // An operator whose only basic operator has its own class cannot be broken down further.
        if (basicOps.size() == 1 && basicOps.nodes().findFirst().get().getClass() == op.getClass()) {
            throw new IllegalArgumentException("Operator " + op + " cannot be executed with given executor!");
        }
        DAG<Operator<?, ?>> modified = FlowUnfolder.translate(basicOps, wantTranslate);
        modified.traverse().forEach(m -> {
            ret.add(m.get(), FlowUnfolder.getParents(m, datasetProducers));
            datasetProducers.put(m.get().output(), Optional.of(m.get()));
        });
        Operator<?, ?> leaf = Iterables.getOnlyElement(modified.getLeafs()).get();
        // Consumers of the replaced operator's output are wired to the leaf from now on.
        datasetProducers.put(op.output(), Optional.of(leaf));
        FlowUnfolder.copyOutputSink(op, leaf);
    }

    /**
     * Persists the output of {@code leaf} into the sink attached to the output of
     * {@code replaced}, if there is one. Raw types are needed because the element types
     * of the two datasets are unrelated wildcards.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static void copyOutputSink(Operator<?, ?> replaced, Operator<?, ?> leaf) {
        Dataset replacedOutput = replaced.output();
        if (replacedOutput.getOutputSink() != null) {
            leaf.output().persist(replacedOutput.getOutputSink());
        }
    }

    /**
     * Resolves the operators that {@code node} has to be attached to in the translated DAG.
     * For a node without parents, the producers of the operator's input datasets are used
     * and inputs without a producer are skipped. Otherwise the registered producer of each
     * parent's output is used; an empty entry there makes {@link Optional#get()} throw.
     *
     * @param node the node whose parents are resolved
     * @param datasetProducents producers of datasets registered so far
     * @return the parent operators in iteration order of the inputs or parents
     * @throws IllegalStateException when a required dataset has no entry in {@code datasetProducents}
     */
    private static List<Operator<?, ?>> getParents(Node<Operator<?, ?>> node, Map<Dataset<?>, Optional<Operator<?, ?>>> datasetProducents) {
        List<Operator<?, ?>> parents = new ArrayList<>();
        if (node.getParents().isEmpty()) {
            Operator<?, ?> op = node.get();
            for (Dataset<?> input : op.listInputs()) {
                Optional<Operator<?, ?>> producer = datasetProducents.get(input);
                if (producer == null) {
                    throw new IllegalStateException("Inputs of operator " + op + " are inconsistent: " + op.listInputs());
                }
                producer.ifPresent(parents::add);
            }
            return parents;
        }
        for (Node<Operator<?, ?>> parent : node.getParents()) {
            Optional<Operator<?, ?>> producer = datasetProducents.get(parent.get().output());
            if (producer == null) {
                throw new IllegalStateException("Output of " + node.get() + " should have been stored into 'datasetProducents");
            }
            parents.add(producer.get());
        }
        return parents;
    }

    /**
     * Builds a DAG from the operators of the flow. Every source dataset is represented by
     * an {@link InputOperator} root; operators are added once all of their inputs have a
     * known producer, repeating passes over the operators until all of them are placed.
     *
     * @throws IllegalStateException when a whole pass places no operator although some remain
     */
    private static DAG<Operator<?, ?>> toDAG(Flow flow) {
        FlowUnfolder.applySinkTransforms(flow);
        Collection<Operator<?, ?>> operators = flow.operators();
        Set<Operator<?, ?>> resolvedOperators = new HashSet<>();
        Map<Dataset<?>, Operator<?, ?>> datasets = new HashMap<>();
        flow.sources().forEach(d -> datasets.put(d, new InputOperator<>(d)));

        List<Operator<?, ?>> roots = new ArrayList<>(datasets.values());
        DAG<Operator<?, ?>> ret = DAG.of(roots);
        while (resolvedOperators.size() != operators.size()) {
            boolean anyAdded = false;
            for (Operator<?, ?> op : operators) {
                if (resolvedOperators.contains(op)) {
                    continue;
                }
                if (!op.listInputs().stream().allMatch(datasets::containsKey)) {
                    // Some input is not produced yet; retry in a later pass.
                    continue;
                }
                resolvedOperators.add(op);
                List<Operator<?, ?>> parents = new ArrayList<>();
                for (Dataset<?> input : op.listInputs()) {
                    parents.add(datasets.get(input));
                }
                ret.add(op, parents);
                datasets.put(op.output(), op);
                anyAdded = true;
            }
            if (!anyAdded) {
                throw new IllegalStateException("Given flow is not a valid DAG!");
            }
        }
        return ret;
    }

    /**
     * Gives every output sink of the flow the chance to prepare its dataset; when
     * {@code prepareDataset} returns {@code true}, the sink is detached from the dataset
     * by calling {@code persist(null)}.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static void applySinkTransforms(Flow flow) {
        // Outputs are collected before any sink is invoked.
        // NOTE(review): presumably because prepareDataset may modify the flow - confirm.
        List<Dataset<?>> outputs = new ArrayList<>();
        for (Operator<?, ?> op : flow.operators()) {
            Dataset<?> output = op.output();
            if (output.getOutputSink() != null) {
                outputs.add(output);
            }
        }
        // Raw type: the sink and the dataset share an element type the wildcard hides.
        for (Dataset output : outputs) {
            if (output.getOutputSink().prepareDataset(output)) {
                output.persist(null);
            }
        }
    }

    /**
     * Operator without inputs whose output is a given dataset; used as the root node
     * standing for a source dataset of the flow.
     *
     * @param <T> element type of the wrapped dataset
     */
    public static final class InputOperator<T>
    extends Operator<T, T> {
        private final Dataset<T> ds;

        InputOperator(Dataset<T> ds) {
            super("InputOperator", ds.getFlow());
            this.ds = ds;
        }

        /** Returns an empty list: this operator has no inputs. */
        @Override
        public Collection<Dataset<T>> listInputs() {
            return Collections.emptyList();
        }

        /** Returns the wrapped dataset. */
        @Override
        public Dataset<T> output() {
            return this.ds;
        }
    }
}

