package io.ray.streaming.runtime.master.graphmanager;

import io.ray.api.BaseActorHandle;
import io.ray.streaming.jobgraph.JobGraph;
import io.ray.streaming.jobgraph.JobVertex;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionEdge;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionJobEdge;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionJobVertex;
import io.ray.streaming.runtime.master.context.JobMasterRuntimeContext;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ray/streaming/runtime/master/graphmanager/GraphManagerImpl.class */
/**
 * {@link GraphManager} implementation that derives an {@link ExecutionGraph} from a
 * {@link JobGraph} and exposes the graphs held by the job master runtime context.
 */
public class GraphManagerImpl implements GraphManager {
    private static final Logger LOG = LoggerFactory.getLogger(GraphManagerImpl.class);

    /** Runtime context from which the current job graph and execution graph are read. */
    protected final JobMasterRuntimeContext runtimeContext;

    /**
     * Creates a graph manager backed by the given runtime context.
     *
     * @param jobMasterRuntimeContext context queried by {@link #getJobGraph()} and
     *     {@link #getExecutionGraph()}
     */
    public GraphManagerImpl(JobMasterRuntimeContext jobMasterRuntimeContext) {
        this.runtimeContext = jobMasterRuntimeContext;
    }

    /**
     * Builds an execution graph for the given job graph: creates the vertex/edge structure,
     * then records the largest vertex parallelism and the job configuration on the result.
     *
     * @param jobGraph the logical job graph to expand
     * @return the newly built execution graph
     * @throws java.util.NoSuchElementException if the job graph has no vertices, because no
     *     maximum parallelism can be computed
     */
    @Override
    public ExecutionGraph buildExecutionGraph(JobGraph jobGraph) {
        LOG.info("Begin build execution graph with job graph {}.", jobGraph);
        ExecutionGraph executionGraph = setupStructure(jobGraph);

        // Primitive stream avoids boxing; getAsInt() fails on an empty vertex list exactly
        // like the previous Optional.get() did.
        int maxParallelism = jobGraph.getJobVertices().stream()
            .mapToInt(JobVertex::getParallelism)
            .max()
            .getAsInt();
        executionGraph.setMaxParallelism(maxParallelism);
        executionGraph.setJobConfig(jobGraph.getJobConfig());

        LOG.info("Build execution graph success.");
        return executionGraph;
    }

    /**
     * Creates the execution graph skeleton: one {@link ExecutionJobVertex} per job vertex, one
     * {@link ExecutionJobEdge} per job edge, and one {@link ExecutionEdge} for every
     * (source execution vertex, target execution vertex) pair of each job edge.
     *
     * <p>Only execution vertices visited while wiring edges are registered in the execution
     * vertex map.
     *
     * @param jobGraph the logical job graph to expand
     * @return the execution graph with its job-vertex and execution-vertex maps populated
     */
    private ExecutionGraph setupStructure(JobGraph jobGraph) {
        ExecutionGraph executionGraph = new ExecutionGraph(jobGraph.getJobName());
        long buildTime = executionGraph.getBuildTime();

        // LinkedHashMap keeps the job vertices in the order the job graph returns them.
        Map<Integer, ExecutionJobVertex> jobVertexById = new LinkedHashMap<>();
        // TODO(review): parameterize this map once the execution-vertex class is imported in
        // this file; it is left raw here to avoid guessing at a type name not visible in scope.
        Map vertexById = new HashMap();

        for (JobVertex jobVertex : jobGraph.getJobVertices()) {
            ExecutionJobVertex executionJobVertex = new ExecutionJobVertex(
                jobVertex,
                jobGraph.getJobConfig(),
                executionGraph.getExecutionVertexIdGenerator(),
                buildTime);
            jobVertexById.put(jobVertex.getVertexId(), executionJobVertex);
        }

        jobGraph.getJobEdges().forEach(jobEdge -> {
            ExecutionJobVertex sourceJobVertex = jobVertexById.get(jobEdge.getSrcVertexId());
            ExecutionJobVertex targetJobVertex = jobVertexById.get(jobEdge.getTargetVertexId());

            ExecutionJobEdge executionJobEdge =
                new ExecutionJobEdge(sourceJobVertex, targetJobVertex, jobEdge);
            sourceJobVertex.getOutputEdges().add(executionJobEdge);
            targetJobVertex.getInputEdges().add(executionJobEdge);

            // Connect every source execution vertex with every target execution vertex.
            // The two lambda parameters need distinct names: reusing one name is rejected by
            // the compiler and hides which side of the edge each vertex is on.
            sourceJobVertex.getExecutionVertices().forEach(sourceVertex -> {
                targetJobVertex.getExecutionVertices().forEach(targetVertex -> {
                    vertexById.put(sourceVertex.getExecutionVertexId(), sourceVertex);
                    vertexById.put(targetVertex.getExecutionVertexId(), targetVertex);

                    ExecutionEdge executionEdge =
                        new ExecutionEdge(sourceVertex, targetVertex, executionJobEdge);
                    sourceVertex.getOutputEdges().add(executionEdge);
                    targetVertex.getInputEdges().add(executionEdge);
                });
            });
        });

        executionGraph.setExecutionJobVertexMap(jobVertexById);
        executionGraph.setExecutionVertexMap(vertexById);
        return executionGraph;
    }

    /**
     * Adds an actor handle to the set stored under the given key, creating the set on first use.
     * This helper is not referenced anywhere else in this class.
     *
     * @param map grouping of actor handles by key; mutated in place
     * @param str key under which the actor is grouped
     * @param baseActorHandle actor handle to add
     */
    private void addActorToChannelGroupedActors(
            Map<String, Set<BaseActorHandle>> map, String str, BaseActorHandle baseActorHandle) {
        map.computeIfAbsent(str, key -> new HashSet<>()).add(baseActorHandle);
    }

    /**
     * Returns the job graph currently held by the runtime context.
     *
     * @return the runtime context's job graph
     */
    @Override
    public JobGraph getJobGraph() {
        return this.runtimeContext.getJobGraph();
    }

    /**
     * Returns the execution graph currently held by the runtime context.
     *
     * @return the runtime context's execution graph
     */
    @Override
    public ExecutionGraph getExecutionGraph() {
        return this.runtimeContext.getExecutionGraph();
    }
}
