package org.aksw.jena_sparql_api_sparql_path2;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.stream.StreamSupport;
import org.aksw.jena_sparql_api.utils.Pair;
import org.aksw.jena_sparql_api_sparql_path.spark.NfaExecutionSpark;
import org.apache.jena.graph.Node;
import org.apache.jena.sparql.path.Path;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

/* loaded from: input_file:org/aksw/jena_sparql_api_sparql_path2/SparqlKShortestPathFinderSpark.class */
/**
 * {@link SparqlKShortestPathFinder} implementation that evaluates a property path on Spark.
 *
 * <p>The path is compiled to an NFA via {@code PathCompiler.compileToNfa} and a frontier RDD is
 * advanced round by round through {@code NfaExecutionSpark} until enough paths were collected or
 * the frontier is exhausted.
 */
public class SparqlKShortestPathFinderSpark implements SparqlKShortestPathFinder {
    protected final JavaSparkContext sparkContext;
    // Edge RDDs handed to NfaExecutionSpark.advanceFrontier with the direction flag false (fwd)
    // and true (bwd) respectively; presumably the forward and reverse adjacency of the same graph.
    protected final JavaPairRDD<Node, Tuple2<Node, Node>> fwdRdd;
    protected final JavaPairRDD<Node, Tuple2<Node, Node>> bwdRdd;

    /**
     * Creates a finder bound to a Spark context and a pair of edge RDDs.
     *
     * @param javaSparkContext context used to broadcast lookup tables and create the seed frontier
     * @param javaPairRDD edge RDD used for frontier steps taken with the direction flag {@code false};
     *        its partitioner is reused for the seed frontier, so it has to be partitioned
     * @param javaPairRDD2 edge RDD used for frontier steps taken with the direction flag {@code true}
     */
    public SparqlKShortestPathFinderSpark(JavaSparkContext javaSparkContext, JavaPairRDD<Node, Tuple2<Node, Node>> javaPairRDD, JavaPairRDD<Node, Tuple2<Node, Node>> javaPairRDD2) {
        this.sparkContext = javaSparkContext;
        this.fwdRdd = javaPairRDD;
        this.bwdRdd = javaPairRDD2;
    }

    /**
     * Collects paths from {@code node} to {@code node2} that match {@code path} and returns the
     * {@code l} shortest of them.
     *
     * <p>Each round counts the current frontier, collects the paths it yields, merges them into the
     * accumulated result and then advances the frontier over whichever edge RDDs
     * {@code NfaExecutionSpark.analyzeFrontierDir} reports a positive count for. Iteration stops as
     * soon as more than {@code l} paths were accumulated or the next frontier is absent or empty.
     * The accumulated paths are sorted by ascending length and cut off after {@code l} entries.
     *
     * <p>The returned RDD is evaluated and cached before this method returns, so consuming it does
     * not depend on the broadcasts, which are destroyed on exit.
     *
     * @param javaSparkContext context used for broadcasting and for creating the seed frontier
     * @param javaPairRDD edge RDD for steps with direction flag {@code false}; must carry a
     *        partitioner, because {@code partitioner().get()} is called on it unconditionally
     * @param javaPairRDD2 edge RDD for steps with direction flag {@code true}
     * @param node start node that seeds the frontier
     * @param node2 target node, broadcast to the executors for path collection
     * @param path property path to match
     * @param l maximum number of paths to return; must not be {@code null} because it is unboxed
     * @return cached RDD holding at most {@code l} paths, selected in order of ascending length
     */
    public static JavaRDD<NestedPath<Node, Node>> exec(JavaSparkContext javaSparkContext, JavaPairRDD<Node, Tuple2<Node, Node>> javaPairRDD, JavaPairRDD<Node, Tuple2<Node, Node>> javaPairRDD2, Node node, Node node2, Path path, final Long l) {
        // Single id under which the NFA, the target node and the frontier items are registered.
        final int queryId = 1;

        Nfa nfa = PathCompiler.compileToNfa(path);
        HashMap nfaByQuery = new HashMap();
        nfaByQuery.put(queryId, nfa);
        HashMap targetByQuery = new HashMap();
        targetByQuery.put(queryId, node2);

        Function<LabeledEdge<Integer, PredicateClass>, PredicateClass> edgeToPredicateClass = new Function<LabeledEdge<Integer, PredicateClass>, PredicateClass>() {
            private static final long serialVersionUID = 23901355501L;

            public PredicateClass call(LabeledEdge<Integer, PredicateClass> labeledEdge) throws Exception {
                return (PredicateClass) labeledEdge.getLabel();
            }
        };

        Broadcast nfaBroadcast = null;
        Broadcast targetBroadcast = null;
        JavaPairRDD frontier = null;
        JavaPairRDD foundPaths = null;
        try {
            nfaBroadcast = javaSparkContext.broadcast(nfaByQuery);
            targetBroadcast = javaSparkContext.broadcast(targetByQuery);

            // Seed frontier: one item at the start node in the NFA's start states, laid out with
            // the same partitioner as the first edge RDD.
            frontier = javaSparkContext
                    .parallelizePairs(Collections.singletonList(new FrontierItem(queryId, nfa.getStartStates(), node, false, Node.class)))
                    .partitionBy((Partitioner) javaPairRDD.partitioner().get())
                    .persist(StorageLevel.MEMORY_AND_DISK_SER());

            do {
                // Forces evaluation of the current frontier before it is used several times below.
                frontier.count();

                JavaPairRDD roundPaths = NfaExecutionSpark.collectPaths(frontier, nfaBroadcast, targetBroadcast, edgeToPredicateClass);
                JavaPairRDD accumulated = foundPaths == null
                        ? roundPaths
                        : roundPaths.union(foundPaths).persist(StorageLevel.MEMORY_AND_DISK_SER());
                long foundCount = accumulated.count();
                if (foundPaths != null) {
                    foundPaths.unpersist(false);
                }
                foundPaths = accumulated;

                // Sum the per-entry pairs: the key total gates the step over javaPairRDD, the
                // value total gates the step over javaPairRDD2.
                Pair directionTotals = (Pair) NfaExecutionSpark.analyzeFrontierDir(frontier, nfaBroadcast, edgeToPredicateClass)
                        .values()
                        .stream()
                        .reduce(new Pair(0L, 0L), (left, right) -> addCounts((Pair) left, (Pair) right));
                boolean stepOverFirst = ((Number) directionTotals.getKey()).longValue() > 0;
                boolean stepOverSecond = ((Number) directionTotals.getValue()).longValue() > 0;

                JavaPairRDD viaFirst = stepOverFirst
                        ? NfaExecutionSpark.advanceFrontier(javaSparkContext, queryId, frontier, javaPairRDD, false, nfaBroadcast, edgeToPredicateClass)
                        : null;
                JavaPairRDD viaSecond = stepOverSecond
                        ? NfaExecutionSpark.advanceFrontier(javaSparkContext, queryId, frontier, javaPairRDD2, true, nfaBroadcast, edgeToPredicateClass)
                        : null;

                boolean bothDirections = viaFirst != null && viaSecond != null;
                JavaPairRDD nextFrontier;
                if (bothDirections) {
                    nextFrontier = viaFirst.union(viaSecond).persist(StorageLevel.MEMORY_AND_DISK_SER());
                } else {
                    // At most one side exists; the result is null when neither direction applies.
                    nextFrontier = viaFirst != null ? viaFirst : viaSecond;
                }

                frontier.unpersist(false);
                if (bothDirections) {
                    viaFirst.unpersist(false);
                    viaSecond.unpersist(false);
                }
                frontier = nextFrontier;

                if (foundCount > l.longValue() || frontier == null) {
                    break;
                }
            } while (!frontier.isEmpty());

            JavaRDD<NestedPath<Node, Node>> shortest = foundPaths.map(new Function<Tuple2<Integer, NestedPath<Node, Node>>, NestedPath<Node, Node>>() {
                private static final long serialVersionUID = 234902531915L;

                public NestedPath<Node, Node> call(Tuple2<Integer, NestedPath<Node, Node>> tuple2) throws Exception {
                    return (NestedPath) tuple2._2;
                }
            }).sortBy(new Function<NestedPath<Node, Node>, Integer>() {
                private static final long serialVersionUID = 331039331952L;

                public Integer call(NestedPath<Node, Node> nestedPath) throws Exception {
                    return Integer.valueOf(nestedPath.getLength());
                }
            }, true, 10).zipWithIndex().filter(new Function<Tuple2<NestedPath<Node, Node>, Long>, Boolean>() {
                private static final long serialVersionUID = 47129253090252L;

                public Boolean call(Tuple2<NestedPath<Node, Node>, Long> tuple2) throws Exception {
                    return Boolean.valueOf(((Long) tuple2._2()).longValue() < l.longValue());
                }
            }).keys().persist(StorageLevel.MEMORY_AND_DISK_SER());

            // persist() only marks the RDD; evaluate it now, while the broadcasts still exist, so
            // that callers read cached data instead of recomputing stages that use the broadcasts.
            shortest.count();
            return shortest;
        } finally {
            // Release intermediate caches and the broadcasts on every exit path, including failures.
            if (foundPaths != null) {
                foundPaths.unpersist(false);
            }
            if (frontier != null) {
                frontier.unpersist(false);
            }
            if (nfaBroadcast != null) {
                nfaBroadcast.destroy();
            }
            if (targetBroadcast != null) {
                targetBroadcast.destroy();
            }
        }
    }

    /** Adds two count pairs component-wise, yielding a pair of {@code Long} totals. */
    private static Pair addCounts(Pair left, Pair right) {
        long keyTotal = ((Number) left.getKey()).longValue() + ((Number) right.getKey()).longValue();
        long valueTotal = ((Number) left.getValue()).longValue() + ((Number) right.getValue()).longValue();
        return new Pair(Long.valueOf(keyTotal), Long.valueOf(valueTotal));
    }

    /**
     * Runs {@link #exec} with this finder's context and edge RDDs and exposes the result as a
     * driver-side iterator.
     *
     * @param node start node
     * @param node2 target node
     * @param path property path to match
     * @param l maximum number of paths; must not be {@code null}
     * @return iterator over the selected paths, each converted with {@code asSimpleDirectedPath()}
     */
    public Iterator<TripletPath<Node, Directed<Node>>> findPaths(Node node, Node node2, Path path, Long l) {
        Iterator<NestedPath<Node, Node>> localIterator = exec(this.sparkContext, this.fwdRdd, this.bwdRdd, node, node2, path, l).toLocalIterator();
        // Single-use Iterable wrapper, only needed to obtain a Spliterator for the stream.
        Iterable<NestedPath<Node, Node>> onceIterable = () -> localIterator;
        return StreamSupport.stream(onceIterable.spliterator(), false)
                .map(nestedPath -> nestedPath.asSimpleDirectedPath())
                .iterator();
    }
}
