package org.janusgraph.graphdb.olap.computer;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.search.path.ShortestPathVertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.HaltedTraverserStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
import org.janusgraph.core.JanusGraphComputer;
import org.janusgraph.core.JanusGraphException;
import org.janusgraph.core.JanusGraphTransaction;
import org.janusgraph.core.JanusGraphVertex;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.core.schema.SchemaInspector;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.configuration.Configuration;
import org.janusgraph.diskstorage.keycolumnvalue.scan.ScanMetrics;
import org.janusgraph.diskstorage.keycolumnvalue.scan.StandardScanner;
import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
import org.janusgraph.graphdb.database.StandardJanusGraph;
import org.janusgraph.graphdb.database.cache.StandardSchemaCache;
import org.janusgraph.graphdb.olap.computer.VertexMapJob;
import org.janusgraph.graphdb.olap.computer.VertexProgramScanJob;
import org.janusgraph.graphdb.util.WorkerPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/janusgraph/graphdb/olap/computer/FulgoraGraphComputer.class */
public class FulgoraGraphComputer implements JanusGraphComputer {
    /** Vertex program to execute; stays {@code null} when only map-reduce jobs are configured. */
    private VertexProgram<?> vertexProgram;
    /** Graph this computer reads from and, depending on the persist mode, writes back to. */
    private final StandardJanusGraph graph;
    /** Global computer memory; created in {@code initializeMemory()} on submission. */
    private FulgoraMemory memory;
    /** Per-vertex state; created in {@code executeVertexProgram()} only when a vertex program is set. */
    private FulgoraVertexMemory vertexMemory;
    /** Work block size handed to scan jobs; the constructor sets it to ten times {@link #writeBatchSize}. */
    private final int readBatchSize;
    /** Number of properties after which a write batch is submitted; read from the buffer-size option. */
    private final int writeBatchSize;
    /** Id of the most recently configured scan job ({@code name + "#" + iteration} or {@code name + "#map"}). */
    private String jobId;
    private static final Logger log = LoggerFactory.getLogger(FulgoraGraphComputer.class);
    /** Source of the running number appended to {@link #name}. */
    private static final AtomicInteger computerCounter = new AtomicInteger(0);
    /** Map-reduce jobs registered explicitly plus those contributed by the vertex program. */
    private final Set<MapReduce> mapReduces = new HashSet<>();
    // NOTE(review): presumably the initial size hint for the vertex memory; the constant name comes
    // from the decompiler matching the literal value - confirm against the original sources.
    private final int expectedNumVertices = StandardSchemaCache.MAX_CACHED_TYPES_DEFAULT;
    /** Set on the first call to {@code submit()}; a second submission is rejected. */
    private boolean executed = false;
    /** Number of processing threads used for scan jobs and worker pools. */
    private int numThreads = 1;
    /** Requested result graph mode; {@code null} until set or defaulted during validation. */
    private GraphComputer.ResultGraph resultGraphMode = null;
    /** Requested persist mode; {@code null} until set or defaulted during validation. */
    private GraphComputer.Persist persistMode = null;
    /** Collects the vertex, edge and vertex-property filters passed to this computer. */
    private final GraphFilter graphFilter = new GraphFilter();
    /** Unique name of this computer instance, used as prefix for job ids. */
    private final String name = "compute" + computerCounter.incrementAndGet();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/janusgraph/graphdb/olap/computer/FulgoraGraphComputer$VertexPropertyWriter.class */
    /**
     * Writes one batch of computed vertex properties into the graph inside its own
     * batch-loading transaction.
     *
     * <p>Failures do not propagate out of {@link #run()}: they are logged and counted in the shared
     * {@code failures} counter so that the submitting code can check the counter afterwards.
     */
    public class VertexPropertyWriter implements Runnable {
        /** Entries of (vertex id, property key/value map) handled by this writer. */
        private final List<Map.Entry<Long, Map<String, Object>>> properties;
        /** Counter shared between writers; incremented once per failed batch. */
        private final AtomicInteger failures;

        /**
         * @param list non-empty batch of (vertex id, properties) entries
         * @param atomicInteger shared failure counter
         */
        private VertexPropertyWriter(List<Map.Entry<Long, Map<String, Object>>> list, AtomicInteger atomicInteger) {
            // Plain assert instead of the decompiler's synthetic $assertionsDisabled field, which
            // cannot be declared inside a non-static inner class.
            assert list != null && !list.isEmpty() && atomicInteger != null;
            this.properties = list;
            this.failures = atomicInteger;
        }

        /**
         * Sets every property of the batch with single cardinality on its vertex and commits.
         * Vertex ids that cannot be resolved in the transaction are skipped.
         */
        @Override
        public void run() {
            final JanusGraphTransaction tx = FulgoraGraphComputer.this.graph.buildTransaction().enableBatchLoading().start();
            try {
                for (Map.Entry<Long, Map<String, Object>> vertexProperties : this.properties) {
                    final JanusGraphVertex vertex = tx.getVertex(vertexProperties.getKey());
                    if (vertex == null) {
                        continue;
                    }
                    for (Map.Entry<String, Object> property : vertexProperties.getValue().entrySet()) {
                        vertex.property(VertexProperty.Cardinality.single, property.getKey(), property.getValue());
                    }
                }
                tx.commit();
            } catch (Throwable th) {
                // Deliberate best effort: record the failure and let the submitter decide.
                this.failures.incrementAndGet();
                FulgoraGraphComputer.log.error("Encountered exception while trying to write properties: ", th);
            } finally {
                // Still open means the commit did not complete; discard the partial batch.
                if (tx != null && tx.isOpen()) {
                    tx.rollback();
                }
            }
        }
    }

    public FulgoraGraphComputer(StandardJanusGraph standardJanusGraph, Configuration configuration) {
        this.graph = standardJanusGraph;
        this.writeBatchSize = ((Integer) configuration.get(GraphDatabaseConfiguration.BUFFER_SIZE, new String[0])).intValue();
        this.readBatchSize = this.writeBatchSize * 10;
    }

    /**
     * Stores the given traversal as the vertex filter of this computer's graph filter.
     *
     * @param traversal vertex filter traversal
     * @return this computer, for chaining
     */
    public GraphComputer vertices(Traversal<Vertex, Vertex> traversal) {
        graphFilter.setVertexFilter(traversal);
        return this;
    }

    /**
     * Stores the given traversal as the edge filter of this computer's graph filter.
     *
     * @param traversal edge filter traversal
     * @return this computer, for chaining
     */
    public GraphComputer edges(Traversal<Vertex, Edge> traversal) {
        graphFilter.setEdgeFilter(traversal);
        return this;
    }

    /**
     * Stores the given traversal as the vertex-property filter of this computer's graph filter.
     *
     * @param traversal vertex-property filter traversal
     * @return this computer, for chaining
     */
    public GraphComputer vertexProperties(Traversal<Vertex, ? extends Property<?>> traversal) {
        graphFilter.setVertexPropertyFilter(traversal);
        return this;
    }

    /**
     * Selects the result graph mode.
     *
     * @param resultGraph mode to use; must not be {@code null}
     * @return this computer, for chaining
     * @throws NullPointerException if {@code resultGraph} is {@code null}
     */
    public GraphComputer result(GraphComputer.ResultGraph resultGraph) {
        resultGraphMode = Preconditions.checkNotNull(resultGraph, "Need to specify mode");
        return this;
    }

    /**
     * Selects the persist mode.
     *
     * @param persist mode to use; must not be {@code null}
     * @return this computer, for chaining
     * @throws NullPointerException if {@code persist} is {@code null}
     */
    public GraphComputer persist(GraphComputer.Persist persist) {
        persistMode = Preconditions.checkNotNull(persist, "Need to specify mode");
        return this;
    }

    /**
     * Sets the number of processing threads.
     *
     * @param i thread count; must be positive
     * @return this computer, for chaining
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    @Override // org.janusgraph.core.JanusGraphComputer
    /* renamed from: workers */
    public JanusGraphComputer mo7workers(int i) {
        final boolean positive = i > 0;
        Preconditions.checkArgument(positive, "Invalid number of threads: %s", i);
        numThreads = i;
        return this;
    }

    /**
     * Sets the vertex program to execute. Only one program may be set per computer.
     *
     * @param vertexProgram program to run
     * @return this computer, for chaining
     * @throws IllegalStateException if a vertex program has already been set
     */
    public GraphComputer program(VertexProgram vertexProgram) {
        final boolean notYetSet = null == this.vertexProgram;
        Preconditions.checkState(notYetSet, "A vertex program has already been set");
        this.vertexProgram = vertexProgram;
        return this;
    }

    /**
     * Registers an additional map-reduce job.
     *
     * @param mapReduce job to add to the set of registered jobs
     * @return this computer, for chaining
     */
    public GraphComputer mapReduce(MapReduce mapReduce) {
        mapReduces.add(mapReduce);
        return this;
    }

    /**
     * Validates the configuration on the calling thread, then runs the computation asynchronously.
     *
     * @return future completing with the computer result
     */
    public Future<ComputerResult> submit() {
        // Validation and memory setup happen synchronously so that misconfiguration fails fast.
        guardAgainstDuplicateSubmission();
        ensureSettingsAreValid();
        initializeMemory();
        return CompletableFuture.supplyAsync(() -> submitAsync());
    }

    /** Marks this computer as executed, or throws if it has already been submitted once. */
    private void guardAgainstDuplicateSubmission() {
        if (!executed) {
            executed = true;
            return;
        }
        throw GraphComputer.Exceptions.computerHasAlreadyBeenSubmittedAVertexProgram();
    }

    /**
     * Checks that the computer is runnable and resolves the effective persist and result graph modes.
     *
     * <p>Requires a vertex program or at least one map-reduce job, validates the program against
     * this computer, adds the program's own map-reducers, and rejects unsupported mode combinations
     * and worker counts above the supported maximum.
     */
    private void ensureSettingsAreValid() {
        final boolean hasProgram = null != this.vertexProgram;
        if (!hasProgram && mapReduces.isEmpty()) {
            throw GraphComputer.Exceptions.computerHasNoVertexProgramNorMapReducers();
        }
        if (hasProgram) {
            GraphComputerHelper.validateProgramOnComputer(this, this.vertexProgram);
            mapReduces.addAll(this.vertexProgram.getMapReducers());
        }
        persistMode = GraphComputerHelper.getPersistState(Optional.ofNullable(this.vertexProgram), Optional.ofNullable(persistMode));
        resultGraphMode = GraphComputerHelper.getResultGraphState(Optional.ofNullable(this.vertexProgram), Optional.ofNullable(resultGraphMode));

        final GraphComputer.Features supported = features();
        if (!supported.supportsResultGraphPersistCombination(resultGraphMode, persistMode)) {
            throw GraphComputer.Exceptions.resultGraphPersistCombinationNotSupported(resultGraphMode, persistMode);
        }
        final int maxWorkers = supported.getMaxWorkers();
        if (numThreads > maxWorkers) {
            throw GraphComputer.Exceptions.computerRequiresMoreWorkersThanSupported(numThreads, features().getMaxWorkers());
        }
    }

    /** Creates the global memory from the configured vertex program and map-reduce jobs. */
    private void initializeMemory() {
        memory = new FulgoraMemory(vertexProgram, mapReduces);
    }

    /**
     * Runs the whole computation: vertex program, map-reduce jobs, then result persistence.
     *
     * @return result pairing the result graph with the completed memory
     */
    private ComputerResult submitAsync() {
        final long startedAt = System.currentTimeMillis();

        executeVertexProgram();
        final Map<MapReduce, FulgoraMapEmitter> mapJobs = collectMapJobs();
        executeMapJobs(mapJobs);
        final Graph resultGraph = writeMutatedPropertiesBackIntoGraph();

        final long elapsedMillis = System.currentTimeMillis() - startedAt;
        memory.setRuntime(elapsedMillis);
        memory.complete();
        return new DefaultComputerResult(resultGraph, memory);
    }

    /**
     * Runs the vertex program, if one is set, iteration by iteration until it reports termination.
     *
     * <p>Each iteration completes a memory sub-round, executes the scan job, completes another
     * sub-round and then asks the program whether to terminate. The memory's iteration counter is
     * advanced after every termination check, including when that check throws.
     */
    private void executeVertexProgram() {
        if (null == this.vertexProgram) {
            return;
        }
        this.vertexMemory = new FulgoraVertexMemory(this.expectedNumVertices, this.graph.getIDManager(), this.vertexProgram);
        this.vertexProgram.setup(this.memory);
        // try-with-resources closes the job on every path and keeps a close() failure from masking
        // the primary exception (it is attached as suppressed instead).
        try (VertexProgramScanJob.Executor job = VertexProgramScanJob.getVertexProgramScanJob(this.graph, this.memory, this.vertexMemory, this.vertexProgram)) {
            for (int iteration = 1; ; iteration++) {
                this.memory.completeSubRound();
                executeIterationOfJob(job, iteration);
                this.memory.completeSubRound();
                try {
                    if (this.vertexProgram.terminate(this.memory)) {
                        break;
                    }
                } finally {
                    this.memory.incrIteration();
                }
            }
        }
    }

    /**
     * Executes one iteration of the vertex program: first the scan over non-partitioned vertices,
     * then the partitioned vertices, and finally completes the iteration in vertex memory.
     *
     * @param executor scan job executing the vertex program
     * @param i one-based iteration number, used for the job id and in error messages
     * @throws JanusGraphException if the iteration fails; exceptions that already are
     *         {@code JanusGraphException}s are rethrown unchanged, all others are wrapped
     */
    private void executeIterationOfJob(VertexProgramScanJob.Executor executor, int i) {
        initializeVertexMemoryForIteration();
        StandardScanner.Builder scanBuilder = createScanBuilderForJob(executor, i);
        try {
            // The partitioned executor is created before the scan runs, as in the original call order.
            PartitionedVertexProgramExecutor partitionedExecutor =
                    new PartitionedVertexProgramExecutor(this.graph, this.memory, this.vertexMemory, this.vertexProgram);
            ScanMetrics scanMetrics = executeOnNonPartitionedVertices(i, scanBuilder);
            executeOnPartitionedVertices(i, partitionedExecutor, scanMetrics);
            this.vertexMemory.completeIteration();
        } catch (JanusGraphException e) {
            // Already carries a descriptive message; avoid double wrapping (same as executeMapJob).
            throw e;
        } catch (InterruptedException e) {
            // Preserve the interrupt status for code further up the stack.
            Thread.currentThread().interrupt();
            throw new JanusGraphException(e);
        } catch (Exception e) {
            throw new JanusGraphException(e);
        }
    }

    /**
     * Prepares vertex memory for the next iteration with the message scopes to use.
     *
     * <p>For a {@code ShortestPathVertexProgram} a fixed pair of scopes is used (a local scope over
     * {@code bothE()} plus the global scope); every other program supplies its own scopes.
     */
    private void initializeVertexMemoryForIteration() {
        if (this.vertexProgram instanceof ShortestPathVertexProgram) {
            final Set<MessageScope> scopes = new HashSet<>();
            scopes.add(MessageScope.Local.of(() -> __.bothE()));
            scopes.add(MessageScope.Global.instance());
            this.vertexMemory.nextIteration(scopes);
        } else {
            this.vertexMemory.nextIteration(this.vertexProgram.getMessageScopes(this.memory));
        }
    }

    /**
     * Builds the edge scan job for one vertex program iteration and records its job id.
     *
     * @param executor job to run in the scan
     * @param i iteration number, appended to the computer name to form the job id
     * @return configured scan builder, not yet executed
     */
    private StandardScanner.Builder createScanBuilderForJob(VertexProgramScanJob.Executor executor, int i) {
        jobId = name + "#" + i;
        final StandardScanner.Builder scan = graph.getBackend().buildEdgeScanJob();
        scan.setJobId(jobId);
        scan.setNumProcessingThreads(numThreads);
        scan.setWorkBlockSize(readBatchSize);
        scan.setJob(executor);
        return scan;
    }

    /**
     * Executes the scan job, waits for its result and fails if the metrics report any failure.
     *
     * @param i iteration number, used in the error message
     * @param builder configured scan job
     * @return metrics of the finished scan
     * @throws JanusGraphException if the failure metric is greater than zero
     */
    private ScanMetrics executeOnNonPartitionedVertices(int i, StandardScanner.Builder builder) throws InterruptedException, ExecutionException, BackendException {
        final ScanMetrics metrics = builder.execute().get();
        final long failedVertices = metrics.get(ScanMetrics.Metric.FAILURE);
        if (failedVertices <= 0) {
            return metrics;
        }
        throw new JanusGraphException("Failed to process [" + failedVertices + "] vertices in vertex program iteration [" + i + "]. Computer is aborting.");
    }

    /**
     * Runs the partitioned-vertex executor and fails if it recorded any failure in the metrics.
     *
     * @param i iteration number, used in the error message
     * @param partitionedVertexProgramExecutor executor for partitioned vertices
     * @param scanMetrics metrics passed to the executor and inspected afterwards
     * @throws JanusGraphException if the partitioned-vertex failure counter is greater than zero
     */
    private void executeOnPartitionedVertices(int i, PartitionedVertexProgramExecutor partitionedVertexProgramExecutor, ScanMetrics scanMetrics) {
        partitionedVertexProgramExecutor.run(numThreads, scanMetrics);
        final long failedPartitioned = scanMetrics.getCustom(PartitionedVertexProgramExecutor.PARTITION_VERTEX_POSTFAIL);
        if (failedPartitioned <= 0) {
            return;
        }
        throw new JanusGraphException("Failed to process [" + failedPartitioned + "] partitioned vertices in vertex program iteration [" + i + "]. Computer is aborting.");
    }

    /**
     * Creates a map emitter for every registered map-reduce job that has a map stage.
     *
     * @return map from job to its emitter; jobs without a map stage are left out
     */
    private Map<MapReduce, FulgoraMapEmitter> collectMapJobs() {
        final Map<MapReduce, FulgoraMapEmitter> jobs = new HashMap<>(mapReduces.size());
        for (MapReduce candidate : mapReduces) {
            if (!candidate.doStage(MapReduce.Stage.MAP)) {
                continue;
            }
            final boolean hasReduceStage = candidate.doStage(MapReduce.Stage.REDUCE);
            jobs.put(candidate, new FulgoraMapEmitter(hasReduceStage));
        }
        return jobs;
    }

    /**
     * Runs the map phase and the reduce phase for the given jobs, then attaches reference elements
     * in memory to the graph.
     *
     * @param map map-reduce jobs with their map emitters
     */
    private void executeMapJobs(Map<MapReduce, FulgoraMapEmitter> map) {
        this.jobId = this.name + "#map";
        // try-with-resources closes the job exactly once, on success as well as on failure.
        try (VertexMapJob.Executor job = VertexMapJob.getVertexMapJob(this.graph, this.vertexMemory, map)) {
            executeMapJob(job);
            executeReducePhase(map);
        }
        this.memory.attachReferenceElements(this.graph);
    }

    /**
     * Executes the map phase as an edge scan job and fails if the metrics report any failure.
     *
     * @param executor map job to run in the scan
     * @throws JanusGraphException if vertices or individual map jobs failed, or if the scan itself
     *         fails (other exceptions are wrapped)
     */
    private void executeMapJob(VertexMapJob.Executor executor) {
        final StandardScanner.Builder scan = graph.getBackend().buildEdgeScanJob();
        scan.setJobId(jobId);
        scan.setNumProcessingThreads(numThreads);
        scan.setWorkBlockSize(readBatchSize);
        scan.setJob(executor);
        try {
            final ScanMetrics metrics = scan.execute().get();

            final long failedVertices = metrics.get(ScanMetrics.Metric.FAILURE);
            if (failedVertices > 0) {
                throw new JanusGraphException("Failed to process [" + failedVertices + "] vertices in map phase. Computer is aborting.");
            }

            final long failedMapJobs = metrics.getCustom(VertexMapJob.MAP_JOB_FAILURE);
            if (failedMapJobs > 0) {
                throw new JanusGraphException("Failed to process [" + failedMapJobs + "] individual map jobs. Computer is aborting.");
            }
        } catch (JanusGraphException alreadyDescriptive) {
            // Thrown by the checks above (or below us); pass through without wrapping.
            throw alreadyDescriptive;
        } catch (Exception other) {
            throw new JanusGraphException(other);
        }
    }

    /**
     * Completes every map emitter and, for jobs with a reduce stage, runs the reduce calls on a
     * worker pool before adding the results to memory. Jobs without a reduce stage add their map
     * output to memory directly.
     *
     * @param map map-reduce jobs with their (already filled) map emitters
     * @throws JanusGraphException if the reduce stage of a job fails
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void executeReducePhase(Map<MapReduce, FulgoraMapEmitter> map) {
        for (Map.Entry<MapReduce, FulgoraMapEmitter> mapJob : map.entrySet()) {
            final FulgoraMapEmitter mapEmitter = mapJob.getValue();
            final MapReduce mapReduce = mapJob.getKey();
            mapEmitter.complete(mapReduce);
            if (!mapReduce.doStage(MapReduce.Stage.REDUCE)) {
                mapReduce.addResultToMemory(this.memory, mapEmitter.mapQueue.iterator());
                continue;
            }
            final FulgoraReduceEmitter reduceEmitter = new FulgoraReduceEmitter();
            try {
                // Closing the pool at the end of this block happens before the results are read.
                try (WorkerPool workers = new WorkerPool(this.numThreads)) {
                    workers.submit(() -> mapReduce.workerStart(MapReduce.Stage.REDUCE));
                    for (final Object rawEntry : mapEmitter.reduceMap.entrySet()) {
                        if (null == rawEntry) {
                            break;
                        }
                        // A fresh final variable per iteration so the lambda can capture it.
                        final Map.Entry<?, ?> queueEntry = (Map.Entry<?, ?>) rawEntry;
                        workers.submit(() -> mapReduce.reduce(queueEntry.getKey(), ((Iterable) queueEntry.getValue()).iterator(), reduceEmitter));
                    }
                    workers.submit(() -> mapReduce.workerEnd(MapReduce.Stage.REDUCE));
                }
                reduceEmitter.complete(mapReduce);
                mapReduce.addResultToMemory(this.memory, reduceEmitter.reduceQueue.iterator());
            } catch (Exception e) {
                throw new JanusGraphException("Exception while executing reduce phase", e);
            }
        }
    }

    /**
     * Persists the vertex properties computed by the vertex program according to the persist and
     * result graph modes.
     *
     * <ul>
     *   <li>Persist {@code NOTHING} with result graph {@code NEW}: nothing is written and an empty
     *       graph is returned.</li>
     *   <li>Persist {@code NOTHING} otherwise, no vertex program, or no vertex compute keys: nothing
     *       is written and the original graph is returned.</li>
     *   <li>Result graph {@code ORIGINAL}: properties are written in batches to the original graph,
     *       which is returned.</li>
     *   <li>Result graph {@code NEW}: properties are set in a new transaction, which is returned.</li>
     * </ul>
     *
     * @return graph holding the result of the computation
     * @throws JanusGraphException if persisting the results fails
     */
    private Graph writeMutatedPropertiesBackIntoGraph() {
        if (this.persistMode == GraphComputer.Persist.NOTHING && this.resultGraphMode == GraphComputer.ResultGraph.NEW) {
            return EmptyGraph.instance();
        }
        if (this.persistMode == GraphComputer.Persist.NOTHING
                || this.vertexProgram == null
                || this.vertexProgram.getVertexComputeKeys().isEmpty()) {
            return this.graph;
        }

        createMissingComputeKeys();
        final Map<Long, Map<String, Object>> mutatedProperties = collectPersistableProperties();

        if (this.resultGraphMode == GraphComputer.ResultGraph.ORIGINAL) {
            persistIntoOriginalGraph(mutatedProperties);
            return this.graph;
        }
        if (this.resultGraphMode == GraphComputer.ResultGraph.NEW) {
            return persistIntoNewTransaction(mutatedProperties);
        }
        return this.graph;
    }

    /** Ensures a property key exists for every vertex compute key, warning about keys that must be created. */
    private void createMissingComputeKeys() {
        final JanusGraphManagement management = this.graph.openManagement();
        try {
            for (VertexComputeKey computeKey : this.vertexProgram.getVertexComputeKeys()) {
                final String key = computeKey.getKey();
                if (!management.containsPropertyKey(key)) {
                    log.warn("Property key [{}] is not part of the schema and will be created. It is advised to initialize all keys.", key);
                }
                management.getOrCreatePropertyKey(key);
            }
            management.commit();
        } finally {
            // Still open means the commit did not complete; discard the schema changes.
            if (management != null && management.isOpen()) {
                management.rollback();
            }
        }
    }

    /**
     * Returns a view of the mutable vertex properties without transient compute keys, in which
     * halted traversers are replaced by the output of the reference halted-traverser strategy.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Map<Long, Map<String, Object>> collectPersistableProperties() {
        final HaltedTraverserStrategy haltedTraverserStrategy = HaltedTraverserStrategy.reference();
        final Map<Long, Map<String, Object>> vertexProperties = this.vertexMemory.getMutableVertexProperties();
        return Maps.transformValues(vertexProperties, properties ->
                Maps.transformEntries(
                        Maps.filterKeys(properties, key ->
                                !VertexProgramHelper.isTransientVertexComputeKey(key, this.vertexProgram.getVertexComputeKeys())),
                        (key, value) -> {
                            if (!"gremlin.traversalVertexProgram.haltedTraversers".equals(key) || !(value instanceof TraverserSet)) {
                                return value;
                            }
                            final TraverserSet<Object> halted = new TraverserSet<>();
                            for (Traverser.Admin<Object> traverser : (TraverserSet<Object>) value) {
                                halted.add(haltedTraverserStrategy.halt(traverser));
                            }
                            return halted;
                        }));
    }

    /** Writes the properties to the original graph in batches of roughly {@code writeBatchSize} properties. */
    private void persistIntoOriginalGraph(Map<Long, Map<String, Object>> mutatedProperties) {
        final AtomicInteger failures = new AtomicInteger(0);
        try (WorkerPool workers = new WorkerPool(this.numThreads)) {
            List<Map.Entry<Long, Map<String, Object>>> batch =
                    new ArrayList<>(this.writeBatchSize / this.vertexProgram.getVertexComputeKeys().size());
            int batchedProperties = 0;
            for (Map.Entry<Long, Map<String, Object>> entry : mutatedProperties.entrySet()) {
                batch.add(entry);
                batchedProperties += entry.getValue().size();
                if (batchedProperties >= this.writeBatchSize) {
                    workers.submit(new VertexPropertyWriter(batch, failures));
                    batch = new ArrayList<>(batch.size());
                    batchedProperties = 0;
                }
            }
            if (!batch.isEmpty()) {
                workers.submit(new VertexPropertyWriter(batch, failures));
            }
        } catch (Exception e) {
            throw new JanusGraphException("Exception while attempting to persist result into graph", e);
        }
        // Checked after the pool is closed and outside the try, so the message is not re-wrapped.
        if (failures.get() > 0) {
            throw new JanusGraphException("Could not persist program results to graph. Check log for details.");
        }
    }

    /**
     * Sets the properties on the vertices of a new transaction: list values element by element
     * with list cardinality, everything else with single cardinality.
     *
     * @return the transaction holding the written properties
     */
    private Graph persistIntoNewTransaction(Map<Long, Map<String, Object>> mutatedProperties) {
        final Graph resultGraph = this.graph.newTransaction();
        for (Map.Entry<Long, Map<String, Object>> vertexEntry : mutatedProperties.entrySet()) {
            final Vertex vertex = resultGraph.vertices(vertexEntry.getKey()).next();
            for (Map.Entry<String, Object> property : vertexEntry.getValue().entrySet()) {
                final String key = property.getKey();
                final Object value = property.getValue();
                if (value instanceof List) {
                    ((List<?>) value).forEach(element -> vertex.property(VertexProperty.Cardinality.list, key, element));
                } else {
                    vertex.property(VertexProperty.Cardinality.single, key, value);
                }
            }
        }
        return resultGraph;
    }

    /** Returns the string representation produced by {@code StringFactory.graphComputerString}. */
    @Override
    public String toString() {
        return StringFactory.graphComputerString(this);
    }

    /**
     * Describes what this computer supports: of the capabilities listed here only vertex property
     * addition is reported as supported. A new features object is created on every call.
     *
     * @return features of this computer
     */
    public GraphComputer.Features features() {
        return new GraphComputer.Features() { // from class: org.janusgraph.graphdb.olap.computer.FulgoraGraphComputer.1
            @Override
            public boolean supportsVertexAddition() {
                return false;
            }

            @Override
            public boolean supportsVertexRemoval() {
                return false;
            }

            @Override
            public boolean supportsVertexPropertyAddition() {
                return true;
            }

            @Override
            public boolean supportsVertexPropertyRemoval() {
                return false;
            }

            @Override
            public boolean supportsEdgeAddition() {
                return false;
            }

            @Override
            public boolean supportsEdgeRemoval() {
                return false;
            }

            @Override
            public boolean supportsEdgePropertyAddition() {
                return false;
            }

            @Override
            public boolean supportsEdgePropertyRemoval() {
                return false;
            }

            @Override
            public boolean supportsGraphFilter() {
                return false;
            }
        };
    }
}
