package io.ray.streaming.runtime.master.scheduler;

import io.ray.api.ActorHandle;
import io.ray.streaming.runtime.config.StreamingConfig;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.core.resource.Container;
import io.ray.streaming.runtime.master.JobMaster;
import io.ray.streaming.runtime.master.graphmanager.GraphManager;
import io.ray.streaming.runtime.master.resourcemanager.ResourceManager;
import io.ray.streaming.runtime.master.resourcemanager.ViewBuilder;
import io.ray.streaming.runtime.master.scheduler.controller.WorkerLifecycleController;
import io.ray.streaming.runtime.worker.context.JobWorkerContext;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ray/streaming/runtime/master/scheduler/JobSchedulerImpl.class */
/**
 * {@link JobScheduler} implementation that assigns resources for an execution graph, then
 * creates, initiates and starts its workers through a {@link WorkerLifecycleController}.
 */
public class JobSchedulerImpl implements JobScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerImpl.class);
    private final JobMaster jobMaster;
    private final ResourceManager resourceManager;
    private final GraphManager graphManager;
    private final WorkerLifecycleController workerLifecycleController = new WorkerLifecycleController();
    private final StreamingConfig jobConfig;

    /**
     * Creates a scheduler bound to the given job master. The graph manager, resource manager and
     * job configuration are all read from the job master at construction time.
     *
     * @param jobMaster the job master this scheduler works for
     */
    public JobSchedulerImpl(JobMaster jobMaster) {
        this.jobMaster = jobMaster;
        this.graphManager = jobMaster.getGraphManager();
        this.resourceManager = jobMaster.getResourceManager();
        this.jobConfig = jobMaster.getRuntimeContext().getConf();
        LOG.info("Scheduler initiated.");
    }

    /**
     * Schedules the job described by the execution graph: assigns resources and creates workers,
     * generates the actor mappings on the graph, then initiates and starts the workers.
     *
     * @param executionGraph the graph to schedule
     * @return {@code true} only if both worker initiation and worker start reported success;
     *     {@code false} otherwise
     */
    @Override // io.ray.streaming.runtime.master.scheduler.JobScheduler
    public boolean scheduleJob(ExecutionGraph executionGraph) {
        LOG.info("Begin scheduling. Job: {}.", executionGraph.getJobName());
        prepareResourceAndCreateWorker(executionGraph);
        // Must run after worker creation and before the workers are initiated/started.
        executionGraph.generateActorMappings();
        boolean scheduled = initAndStart(executionGraph);
        if (!scheduled) {
            LOG.error("Failed to schedule job: {}.", executionGraph.getJobName());
        }
        return scheduled;
    }

    /**
     * Assigns the registered containers' resources to the execution graph, logs the resulting
     * assignment view and creates the workers.
     *
     * @param executionGraph the graph whose vertices need resources and workers
     */
    protected void prepareResourceAndCreateWorker(ExecutionGraph executionGraph) {
        List<Container> registeredContainers = this.resourceManager.getRegisteredContainers();
        this.resourceManager.assignResource(registeredContainers, executionGraph);
        LOG.info("Allocating map is: {}.", ViewBuilder.buildResourceAssignmentView(registeredContainers));
        // NOTE(review): the creation result cannot be propagated through this void signature
        // without breaking subclasses; a failure is only visible in the log of createWorkers.
        createWorkers(executionGraph);
    }

    /**
     * Initiates the workers, initiates the master and starts the workers, in that order. All three
     * steps always run; a failed step does not prevent the following ones.
     *
     * @param executionGraph the graph whose workers are initiated and started
     * @return {@code true} if both worker initiation and worker start reported success
     */
    private boolean initAndStart(ExecutionGraph executionGraph) {
        boolean initiated = initWorkers(buildWorkersContext(executionGraph));
        initMaster();
        boolean started = startWorkers(executionGraph, this.jobMaster.getRuntimeContext().lastCheckpointId);
        return initiated && started;
    }

    /**
     * Creates workers for all added execution vertices of the graph and logs the elapsed time.
     *
     * @param executionGraph the graph providing the added execution vertices
     * @return the result reported by the worker lifecycle controller
     */
    public boolean createWorkers(ExecutionGraph executionGraph) {
        LOG.info("Begin creating workers.");
        long startTimeMs = System.currentTimeMillis();
        boolean created = this.workerLifecycleController.createWorkers(executionGraph.getAllAddedExecutionVertices());
        long costMs = System.currentTimeMillis() - startTimeMs;
        if (created) {
            LOG.info("Finished creating workers. Cost {} ms.", costMs);
        } else {
            LOG.error("Failed to create workers. Cost {} ms.", costMs);
        }
        return created;
    }

    /**
     * Initiates the workers with their contexts, passing the scheduler config value
     * {@code workerInitiationWaitTimeoutMs()} to the lifecycle controller.
     *
     * @param map worker context per execution vertex
     * @return the controller's result, or {@code false} if it threw an exception (which is logged)
     */
    protected boolean initWorkers(Map<ExecutionVertex, JobWorkerContext> map) {
        try {
            return this.workerLifecycleController.initWorkers(map, this.jobConfig.masterConfig.schedulerConfig.workerInitiationWaitTimeoutMs());
        } catch (Exception e) {
            LOG.error("Failed to initiate workers.", e);
            return false;
        }
    }

    /**
     * Starts the workers of the graph, passing the scheduler config value
     * {@code workerStartingWaitTimeoutMs()} to the lifecycle controller.
     *
     * @param executionGraph the graph whose workers are started
     * @param j value forwarded to the controller; {@code scheduleJob} supplies the runtime
     *     context's {@code lastCheckpointId}
     * @return the controller's result, or {@code false} if it threw an exception (which is logged)
     */
    public boolean startWorkers(ExecutionGraph executionGraph, long j) {
        try {
            return this.workerLifecycleController.startWorkers(executionGraph, j, this.jobConfig.masterConfig.schedulerConfig.workerStartingWaitTimeoutMs());
        } catch (Exception e) {
            LOG.error("Failed to start workers.", e);
            return false;
        }
    }

    /**
     * Builds one {@link JobWorkerContext} for every execution vertex of the graph.
     *
     * @param executionGraph the graph providing the execution vertices
     * @return a new mutable map from execution vertex to its worker context
     */
    protected Map<ExecutionVertex, JobWorkerContext> buildWorkersContext(ExecutionGraph executionGraph) {
        ActorHandle<JobMaster> jobMasterActor = this.jobMaster.getJobMasterActor();
        Map<ExecutionVertex, JobWorkerContext> workerContexts = new HashMap<>();
        executionGraph.getAllExecutionVertices().forEach(executionVertex ->
            workerContexts.put(executionVertex, buildJobWorkerContext(executionVertex, jobMasterActor)));
        return workerContexts;
    }

    /**
     * Builds the worker context for a single execution vertex.
     *
     * @param executionVertex the vertex the context belongs to
     * @param actorHandle handle of the job master actor
     * @return the new worker context
     */
    private JobWorkerContext buildJobWorkerContext(ExecutionVertex executionVertex, ActorHandle<JobMaster> actorHandle) {
        return new JobWorkerContext(actorHandle, executionVertex);
    }

    /**
     * Destroys the workers of the given execution vertices.
     *
     * @param list execution vertices whose workers should be destroyed
     * @return the controller's result, or {@code false} if it threw an exception (which is logged)
     */
    public boolean destroyWorkers(List<ExecutionVertex> list) {
        try {
            return this.workerLifecycleController.destroyWorkers(list);
        } catch (Exception e) {
            LOG.error("Failed to destroy workers.", e);
            return false;
        }
    }

    /** Initiates the job master by calling {@code init(false)} on it. */
    private void initMaster() {
        this.jobMaster.init(false);
    }
}
