package ai.grakn.graph.internal.computer;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Stream;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.ComputerSubmissionHelper;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
import org.apache.tinkerpop.gremlin.spark.structure.Spark;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputOutputHelper;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.OutputFormatRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage;
import org.apache.tinkerpop.gremlin.structure.io.Storage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/grakn/graph/internal/computer/GraknSparkComputer.class */
public final class GraknSparkComputer extends AbstractHadoopGraphComputer {
    // Populated in the constructor as a copy of the graph's configuration; workers() may rewrite
    // its "spark.master" entry and each submission appends the job group id to its output location.
    private final Configuration sparkConfiguration;
    // Set to true once workers(int) has been called; consulted when partitioning the loaded graph RDD.
    private boolean workersSet;
    // HadoopConfiguration built from sparkConfiguration in the constructor.
    private Configuration apacheConfiguration;
    // Hadoop-native view of apacheConfiguration, produced by ConfUtil.makeHadoopConfiguration.
    private org.apache.hadoop.conf.Configuration hadoopConfiguration;
    // Random numeric id assigned per submission; used as the Spark job group id (see cancelJobs()).
    private String jobGroupId;
    private static final Logger LOGGER = LoggerFactory.getLogger(GraknSparkComputer.class);
    // Shared across all computer instances; created by getGraphRDD() and dropped by clear().
    private static GraknGraphRDD graknGraphRDD = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ai/grakn/graph/internal/computer/GraknSparkComputer$GraknGraphRDD.class */
    public static class GraknGraphRDD {
        private static boolean commit;
        private Storage fileSystemStorage;
        private Storage sparkContextStorage;
        private boolean outputToHDFS;
        private boolean outputToSpark;
        private String outputLocation;
        private SparkConf sparkConf;
        private JavaSparkContext sparkContext;
        private JavaPairRDD<Object, VertexWritable> loadedGraphRDD;
        private boolean inputFromSpark;
        static final /* synthetic */ boolean $assertionsDisabled;

        private GraknGraphRDD(GraknSparkComputer graknSparkComputer) {
            this.fileSystemStorage = FileSystemStorage.open(graknSparkComputer.hadoopConfiguration);
            this.sparkContextStorage = SparkContextStorage.open(graknSparkComputer.apacheConfiguration);
            this.inputFromSpark = PersistedInputRDD.class.isAssignableFrom(graknSparkComputer.hadoopConfiguration.getClass("gremlin.spark.graphInputRDD", Object.class));
            this.outputToHDFS = FileOutputFormat.class.isAssignableFrom(graknSparkComputer.hadoopConfiguration.getClass("gremlin.hadoop.graphOutputFormat", Object.class));
            this.outputToSpark = PersistedOutputRDD.class.isAssignableFrom(graknSparkComputer.hadoopConfiguration.getClass("gremlin.spark.graphOutputRDD", Object.class));
            this.outputLocation = graknSparkComputer.hadoopConfiguration.get("gremlin.hadoop.outputLocation", (String) null);
            if (null != this.outputLocation) {
                if (this.outputToHDFS && this.fileSystemStorage.exists(this.outputLocation)) {
                    this.fileSystemStorage.rm(this.outputLocation);
                }
                if (this.outputToSpark && this.sparkContextStorage.exists(this.outputLocation)) {
                    this.sparkContextStorage.rm(this.outputLocation);
                }
            }
            this.sparkConf = new SparkConf();
            this.sparkConf.setAppName("HadoopGremlin(Spark): ");
            graknSparkComputer.hadoopConfiguration.forEach(entry -> {
                this.sparkConf.set((String) entry.getKey(), (String) entry.getValue());
            });
            this.sparkContext = new JavaSparkContext(SparkContext.getOrCreate(this.sparkConf));
            GraknSparkComputer.loadJars(this.sparkContext, graknSparkComputer.hadoopConfiguration);
            Spark.create(this.sparkContext.sc());
            GraknSparkComputer.updateLocalConfiguration(this.sparkContext, this.sparkConf);
            boolean z = false;
            try {
                this.loadedGraphRDD = ((InputRDD) graknSparkComputer.hadoopConfiguration.getClass("gremlin.spark.graphInputRDD", InputFormatRDD.class, InputRDD.class).newInstance()).readGraphRDD(graknSparkComputer.apacheConfiguration, this.sparkContext);
                if (this.loadedGraphRDD.partitioner().isPresent()) {
                    GraknSparkComputer.LOGGER.info("Using the existing partitioner associated with the loaded graphRDD: " + this.loadedGraphRDD.partitioner().get());
                } else {
                    this.loadedGraphRDD = this.loadedGraphRDD.partitionBy(new HashPartitioner(graknSparkComputer.workersSet ? graknSparkComputer.workers : this.loadedGraphRDD.partitions().size()));
                    z = true;
                }
                if (!$assertionsDisabled && !this.loadedGraphRDD.partitioner().isPresent()) {
                    throw new AssertionError();
                }
                if (graknSparkComputer.workersSet) {
                    if (this.loadedGraphRDD.partitions().size() > graknSparkComputer.workers) {
                        this.loadedGraphRDD = this.loadedGraphRDD.coalesce(graknSparkComputer.workers);
                    } else if (this.loadedGraphRDD.partitions().size() < graknSparkComputer.workers) {
                        this.loadedGraphRDD = this.loadedGraphRDD.repartition(graknSparkComputer.workers);
                    }
                }
                if (!this.inputFromSpark || z) {
                    this.loadedGraphRDD = this.loadedGraphRDD.persist(StorageLevel.fromString(graknSparkComputer.hadoopConfiguration.get("gremlin.spark.graphStorageLevel", "MEMORY_ONLY")));
                }
                commit = false;
            } catch (IllegalAccessException | InstantiationException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }

        static {
            $assertionsDisabled = !GraknSparkComputer.class.desiredAssertionStatus();
            commit = false;
        }
    }

    public GraknSparkComputer(HadoopGraph hadoopGraph) {
        super(hadoopGraph);
        this.workersSet = false;
        this.apacheConfiguration = null;
        this.hadoopConfiguration = null;
        this.jobGroupId = null;
        this.sparkConfiguration = new HadoopConfiguration();
        ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration);
        this.apacheConfiguration = new HadoopConfiguration(this.sparkConfiguration);
        this.apacheConfiguration.setProperty("gremlin.hadoop.graphInputFormat.hasEdges", false);
        this.hadoopConfiguration = ConfUtil.makeHadoopConfiguration(this.apacheConfiguration);
        if (this.hadoopConfiguration.get("gremlin.spark.graphInputRDD", (String) null) == null && this.hadoopConfiguration.get("gremlin.hadoop.graphInputFormat", (String) null) != null && FileInputFormat.class.isAssignableFrom(this.hadoopConfiguration.getClass("gremlin.hadoop.graphInputFormat", InputFormat.class))) {
            try {
                String path = FileSystem.get(this.hadoopConfiguration).getFileStatus(new Path(this.hadoopConfiguration.get("gremlin.hadoop.inputLocation"))).getPath().toString();
                this.apacheConfiguration.setProperty("mapreduce.input.fileinputformat.inputdir", path);
                this.hadoopConfiguration.set("mapreduce.input.fileinputformat.inputdir", path);
            } catch (IOException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
    }

    /**
     * Records the requested worker count and, when the configured Spark master is a local one,
     * rewrites it to {@code local[<workers>]}.
     *
     * @param i number of workers
     * @return this computer
     */
    public GraphComputer workers(int i) {
        super.workers(i);
        final String masterKey = "spark.master";
        final boolean localMaster = this.sparkConfiguration.containsKey(masterKey)
                && this.sparkConfiguration.getString(masterKey).startsWith("local");
        if (localMaster) {
            this.sparkConfiguration.setProperty(masterKey, "local[" + this.workers + "]");
        }
        this.workersSet = true;
        return this;
    }

    /**
     * Validates the computer's state and then hands the job to
     * {@code ComputerSubmissionHelper.runWithBackgroundThread} under the name "SparkSubmitter".
     *
     * @return future holding the computation result
     */
    public Future<ComputerResult> submit() {
        this.validateStatePriorToExecution();
        return ComputerSubmissionHelper.runWithBackgroundThread(
                executor -> this.submitWithExecutor(executor),
                "SparkSubmitter");
    }

    /**
     * Cancels the Spark job group of this computer's most recent submission, if there is one.
     * Does nothing when no job was submitted or no graph RDD / Spark context is available.
     */
    public void cancelJobs() {
        // Snapshot the shared references once: clear() may null the static field concurrently,
        // so re-reading it between the check and the call could throw a NullPointerException.
        final String groupId = this.jobGroupId;
        final GraknGraphRDD graphRDD = graknGraphRDD;
        if (groupId == null || graphRDD == null) {
            return;
        }
        final JavaSparkContext context = graphRDD.sparkContext;
        if (context != null) {
            context.cancelJobGroup(groupId);
        }
    }

    /**
     * Runs the configured vertex program (if any) followed by all map-reduce jobs on the shared
     * graph RDD, asynchronously on the given executor.
     *
     * <p>Before scheduling, a fresh job group id is generated and appended to the configured
     * output location in all three configurations.
     *
     * @param executor executor on which the Spark work is performed
     * @return future completing with the output graph and the immutable result memory
     * @throws IllegalStateException (from the async task) if the configured {@code OutputRDD}
     *                               class cannot be instantiated
     */
    private Future<ComputerResult> submitWithExecutor(Executor executor) {
        getGraphRDD(this);
        this.jobGroupId = Integer.toString(ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));
        final String jobDescription = this.vertexProgram == null
                ? this.mapReducers.toString()
                : this.vertexProgram + "+" + this.mapReducers;

        // Give every job its own output directory: <configured location>/<job group id>.
        this.sparkConfiguration.setProperty("gremlin.hadoop.outputLocation", this.sparkConfiguration.getString("gremlin.hadoop.outputLocation") + "/" + this.jobGroupId);
        this.apacheConfiguration.setProperty("gremlin.hadoop.outputLocation", this.sparkConfiguration.getString("gremlin.hadoop.outputLocation"));
        this.hadoopConfiguration.set("gremlin.hadoop.outputLocation", this.sparkConfiguration.getString("gremlin.hadoop.outputLocation"));

        return CompletableFuture.supplyAsync(() -> {
            graknGraphRDD.sparkContext.setJobGroup(this.jobGroupId, jobDescription);
            final long startTime = System.currentTimeMillis();

            // Declared as GraknSparkMemory (not Memory) because broadcastMemory/setInTask are called on it.
            GraknSparkMemory memory = null;
            JavaPairRDD<Object, VertexWritable> computedGraphRDD = null;

            //////////////////////////////// vertex program ////////////////////////////////
            if (this.vertexProgram != null) {
                this.mapReducers.addAll(this.vertexProgram.getMapReducers());
                memory = new GraknSparkMemory(this.vertexProgram, this.mapReducers, graknGraphRDD.sparkContext);
                this.vertexProgram.setup(memory);
                memory.broadcastMemory(graknGraphRDD.sparkContext);

                // Store the program's state and merge it into the job configurations.
                final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
                this.vertexProgram.storeState(vertexProgramConfiguration);
                ConfigurationUtils.copy(vertexProgramConfiguration, this.apacheConfiguration);
                ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, this.hadoopConfiguration);

                // Iterate until the program reports termination; each round feeds the previous view back in.
                JavaPairRDD viewIncomingRDD = null;
                while (true) {
                    memory.setInTask(true);
                    viewIncomingRDD = GraknSparkExecutor.executeVertexProgramIteration(graknGraphRDD.loadedGraphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
                    memory.setInTask(false);
                    if (this.vertexProgram.terminate(memory)) {
                        break;
                    }
                    memory.incrIteration();
                    memory.broadcastMemory(graknGraphRDD.sparkContext);
                }

                computedGraphRDD = GraknSparkExecutor.prepareFinalGraphRDD(graknGraphRDD.loadedGraphRDD, viewIncomingRDD, (String[]) this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]));

                // Write the computed graph only if an output is configured and persistence was requested.
                final boolean outputConfigured =
                        this.hadoopConfiguration.get("gremlin.hadoop.graphOutputFormat", (String) null) != null
                        || this.hadoopConfiguration.get("gremlin.spark.graphOutputRDD", (String) null) != null;
                if (outputConfigured && !this.persist.equals(GraphComputer.Persist.NOTHING)) {
                    newOutputRDD().writeGraphRDD(this.apacheConfiguration, computedGraphRDD);
                }
            }

            // Without a vertex program the map-reduce jobs run directly on the loaded graph.
            if (computedGraphRDD == null) {
                computedGraphRDD = graknGraphRDD.loadedGraphRDD;
            }
            final MapMemory finalMemory = memory == null ? new MapMemory() : new MapMemory(memory);

            //////////////////////////////// map-reduce jobs ////////////////////////////////
            for (MapReduce mapReduce : this.mapReducers) {
                final HadoopConfiguration jobConfiguration = new HadoopConfiguration(this.apacheConfiguration);
                mapReduce.storeState(jobConfiguration);

                final JavaPairRDD mapRDD = GraknSparkExecutor.executeMap(computedGraphRDD, mapReduce, jobConfiguration);
                final JavaPairRDD combineRDD = mapReduce.doStage(MapReduce.Stage.COMBINE)
                        ? GraknSparkExecutor.executeCombine(mapRDD, jobConfiguration)
                        : mapRDD;

                // The output RDD is instantiated before the reduce stage runs, as in the original ordering.
                final OutputRDD outputRDD = newOutputRDD();
                mapReduce.addResultToMemory(finalMemory, outputRDD.writeMemoryRDD(this.apacheConfiguration, mapReduce.getMemoryKey(), mapReduce.doStage(MapReduce.Stage.REDUCE) ? GraknSparkExecutor.executeReduce(combineRDD, mapReduce, jobConfiguration) : combineRDD));
            }

            //////////////////////////////// clean-up ////////////////////////////////
            // Unpersist the graph used for the jobs unless it is being kept as a persisted Spark output.
            if (!graknGraphRDD.outputToSpark || this.persist.equals(GraphComputer.Persist.NOTHING)) {
                computedGraphRDD.unpersist();
            }

            // With Persist.NOTHING, remove whatever was written to this job's output location.
            final String jobOutputLocation = this.sparkConfiguration.getString("gremlin.hadoop.outputLocation");
            if (jobOutputLocation != null && this.persist.equals(GraphComputer.Persist.NOTHING)) {
                if (graknGraphRDD.outputToHDFS) {
                    graknGraphRDD.fileSystemStorage.rm(jobOutputLocation);
                }
                if (graknGraphRDD.outputToSpark) {
                    graknGraphRDD.sparkContextStorage.rm(jobOutputLocation);
                }
            }

            finalMemory.setRuntime(System.currentTimeMillis() - startTime);
            return new DefaultComputerResult(InputOutputHelper.getOutputGraph(this.apacheConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
        }, executor);
    }

    /** Instantiates the configured {@code OutputRDD}, defaulting to {@code OutputFormatRDD}. */
    private OutputRDD newOutputRDD() {
        try {
            return this.hadoopConfiguration.getClass("gremlin.spark.graphOutputRDD", OutputFormatRDD.class, OutputRDD.class).newInstance();
        } catch (IllegalAccessException | InstantiationException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Adds every {@code .jar} file found in the directories listed in {@code HADOOP_GREMLIN_LIBS}
     * (system property first, then environment variable; entries separated by ':') to the Spark
     * context. Does nothing when "gremlin.hadoop.jarsInDistributedCache" is set to false.
     *
     * <p>Entries that are missing, are not directories, or cannot be listed are skipped with a
     * warning instead of failing the whole load.
     *
     * @param javaSparkContext Spark context that receives the jars
     * @param configuration    hadoop configuration holding the distributed-cache switch
     */
    public static void loadJars(JavaSparkContext javaSparkContext, org.apache.hadoop.conf.Configuration configuration) {
        if (!configuration.getBoolean("gremlin.hadoop.jarsInDistributedCache", true)) {
            return;
        }
        final String fromProperty = System.getProperty("HADOOP_GREMLIN_LIBS");
        final String libs = fromProperty != null ? fromProperty : System.getenv("HADOOP_GREMLIN_LIBS");
        if (libs == null) {
            LOGGER.warn("HADOOP_GREMLIN_LIBS is not set -- proceeding regardless");
            return;
        }
        for (String location : libs.split(":")) {
            // listFiles returns null for a missing path, a regular file, or an I/O error,
            // so one null check covers every "not a usable directory" case.
            final File[] jars = new File(location).listFiles((dir, name) -> name.endsWith(".jar"));
            if (jars == null) {
                LOGGER.warn(location + " does not reference a valid directory -- proceeding regardless");
                continue;
            }
            for (File jar : jars) {
                javaSparkContext.addJar(jar.getAbsolutePath());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Copies a fixed set of job-related properties from the Spark configuration onto the Spark
     * context as local properties, logging each one that is present.
     *
     * @param javaSparkContext context that receives the local properties
     * @param sparkConf        configuration the values are read from
     */
    public static void updateLocalConfiguration(JavaSparkContext javaSparkContext, SparkConf sparkConf) {
        Stream.of("spark.job.description", "spark.jobGroup.id", "spark.job.interruptOnCancel", "spark.scheduler.pool")
                .filter(sparkConf::contains)
                .forEach(key -> {
                    LOGGER.info("Setting Thread Local SparkContext Property - " + key + " : " + sparkConf.get(key));
                    javaSparkContext.setLocalProperty(key, sparkConf.get(key));
                });
    }

    /**
     * Command-line entry point: loads the properties file named by the first argument, creates
     * the vertex program it describes and runs it to completion.
     *
     * @param strArr command-line arguments; element 0 is the properties file
     * @throws Exception if the configuration cannot be loaded or the job fails
     */
    public static void main(String[] strArr) throws Exception {
        final PropertiesConfiguration configuration = new PropertiesConfiguration(strArr[0]);
        // The computer and the vertex program are each given their own freshly opened graph.
        final GraknSparkComputer computer = new GraknSparkComputer(HadoopGraph.open(configuration));
        computer.program(VertexProgram.createVertexProgram(HadoopGraph.open(configuration), configuration))
                .submit()
                .get();
    }

    /**
     * Ensures the shared graph RDD exists and is current, rebuilding it from the given computer's
     * configuration when none is cached, a refresh was requested, or it has no Spark context.
     *
     * @param graknSparkComputer computer whose configuration is used if a rebuild is needed
     */
    private static synchronized void getGraphRDD(GraknSparkComputer graknSparkComputer) {
        final boolean stale = graknGraphRDD == null
                || GraknGraphRDD.commit
                || graknGraphRDD.sparkContext == null;
        if (stale) {
            LOGGER.info("Creating a new Grakn Graph RDD");
            // The only GraknGraphRDD constructor requires the computer that supplies the configuration.
            graknGraphRDD = new GraknGraphRDD(graknSparkComputer);
        }
    }

    /**
     * Requests that the shared graph RDD be rebuilt on the next submission by raising the commit
     * flag, unless it is already raised.
     */
    public static void refresh() {
        if (!GraknGraphRDD.commit) {
            setCommitFlag();
        }
    }

    /** Raises the commit flag under the class lock if it is not already raised. */
    private static synchronized void setCommitFlag() {
        if (!GraknGraphRDD.commit) {
            GraknGraphRDD.commit = true;
        }
    }

    /**
     * Drops the shared graph RDD (releasing its reference to the loaded RDD first) and then
     * closes Spark via {@code Spark.close()}.
     */
    public static synchronized void clear() {
        final GraknGraphRDD cached = graknGraphRDD;
        if (cached != null) {
            cached.loadedGraphRDD = null;
            graknGraphRDD = null;
        }
        Spark.close();
    }
}
