package io.ray.streaming.runtime.master;

import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
import io.ray.api.ActorHandle;
import io.ray.api.Ray;
import io.ray.api.id.ActorId;
import io.ray.streaming.jobgraph.JobGraph;
import io.ray.streaming.runtime.config.StreamingConfig;
import io.ray.streaming.runtime.config.StreamingMasterConfig;
import io.ray.streaming.runtime.context.ContextBackend;
import io.ray.streaming.runtime.context.ContextBackendFactory;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.core.resource.Container;
import io.ray.streaming.runtime.generated.RemoteCall;
import io.ray.streaming.runtime.master.context.JobMasterRuntimeContext;
import io.ray.streaming.runtime.master.coordinator.CheckpointCoordinator;
import io.ray.streaming.runtime.master.coordinator.FailoverCoordinator;
import io.ray.streaming.runtime.master.coordinator.command.WorkerCommitReport;
import io.ray.streaming.runtime.master.coordinator.command.WorkerRollbackRequest;
import io.ray.streaming.runtime.master.graphmanager.GraphManager;
import io.ray.streaming.runtime.master.graphmanager.GraphManagerImpl;
import io.ray.streaming.runtime.master.resourcemanager.ResourceManager;
import io.ray.streaming.runtime.master.resourcemanager.ResourceManagerImpl;
import io.ray.streaming.runtime.master.scheduler.JobSchedulerImpl;
import io.ray.streaming.runtime.util.CheckpointStateUtil;
import io.ray.streaming.runtime.util.ResourceUtil;
import io.ray.streaming.runtime.util.Serializer;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ray/streaming/runtime/master/JobMaster.class */
public class JobMaster {
    private static final Logger LOG = LoggerFactory.getLogger(JobMaster.class);
    private JobMasterRuntimeContext runtimeContext;
    private ResourceManager resourceManager;
    private JobSchedulerImpl scheduler;
    private GraphManager graphManager;
    private StreamingMasterConfig conf;
    private ContextBackend contextBackend;
    private ActorHandle<JobMaster> jobMasterActor;
    private CheckpointCoordinator checkpointCoordinator;
    private FailoverCoordinator failoverCoordinator;

    /**
     * Creates the job master from a flat configuration map.
     *
     * <p>When running in cluster mode and the current actor was restarted, the previously saved
     * runtime context is loaded from the context backend (see {@link #loadMasterCheckpoint()}).
     *
     * @param map raw streaming configuration key/value pairs
     */
    public JobMaster(Map<String, String> map) {
        LOG.info("Creating job master with conf: {}.", map);
        StreamingConfig streamingConfig = new StreamingConfig(map);
        this.conf = streamingConfig.masterConfig;
        this.contextBackend = ContextBackendFactory.getContextBackend(this.conf);
        this.runtimeContext = new JobMasterRuntimeContext(streamingConfig);
        if (!Ray.getRuntimeContext().isSingleProcess() && Ray.getRuntimeContext().wasCurrentActorRestarted()) {
            loadMasterCheckpoint();
        }
        LOG.info("Finished creating job master.");
    }

    /**
     * Builds the context-backend key under which the job master runtime context is stored.
     *
     * @param streamingMasterConfig master configuration providing the key prefix and job name
     * @return the checkpoint key prefix followed by the job name
     */
    public static String getJobMasterRuntimeContextKey(StreamingMasterConfig streamingMasterConfig) {
        return String.valueOf(streamingMasterConfig.checkpointConfig.jobMasterContextCpPrefixKey()) + streamingMasterConfig.commonConfig.jobName();
    }

    /**
     * Restores the runtime context saved by {@link #saveContext()} and re-initializes the master.
     * If nothing was saved, checkpoint id 0 is recorded and initialization is skipped.
     */
    private void loadMasterCheckpoint() {
        LOG.info("Start to load JobMaster's checkpoint.");
        byte[] bArr = CheckpointStateUtil.get(this.contextBackend, getJobMasterRuntimeContextKey(getConf()));
        if (bArr == null) {
            LOG.warn("JobMaster got empty checkpoint from state backend. Skip loading checkpoint.");
            this.runtimeContext.checkpointIds.add(0L);
            return;
        }
        this.runtimeContext = (JobMasterRuntimeContext) Serializer.decode(bArr);
        LOG.info("JobMaster recover runtime context[{}] from state backend.", this.runtimeContext);
        // submitJob() never runs on the restart path, so graphManager is still null here and
        // init(true) would throw a NullPointerException. Rebuild it from the recovered context
        // exactly as submitJob() does.
        // NOTE(review): resourceManager is not restored here either; requestJobWorkerRollback
        // reads it, so confirm whether it should be rebuilt on recovery as well.
        this.graphManager = new GraphManagerImpl(this.runtimeContext);
        init(true);
    }

    /**
     * Starts the checkpoint and failover coordinators and persists the runtime context.
     *
     * @param z {@code true} when initializing after recovery from a saved context
     * @return {@code false} if the runtime context has no execution graph, {@code true} otherwise
     * @throws IllegalArgumentException if the graph manager exposes no execution graph
     */
    public Boolean init(boolean z) {
        LOG.info("Initializing job master, isRecover={}.", z);
        if (this.runtimeContext.getExecutionGraph() == null) {
            LOG.error("Init job master failed. Job graphs is null.");
            return false;
        }
        Preconditions.checkArgument(this.graphManager.getExecutionGraph() != null, "no execution graph");
        this.checkpointCoordinator = new CheckpointCoordinator(this);
        this.checkpointCoordinator.start();
        this.failoverCoordinator = new FailoverCoordinator(this, z);
        this.failoverCoordinator.start();
        saveContext();
        LOG.info("Finished initializing job master.");
        return true;
    }

    /**
     * Builds the execution graph for {@code jobGraph}, records it in the runtime context and
     * schedules it.
     *
     * @param actorHandle handle of this job master actor
     * @param jobGraph logical job plan to execute
     * @return {@code true} if scheduling succeeded, {@code false} if the scheduler threw
     */
    public boolean submitJob(ActorHandle<JobMaster> actorHandle, JobGraph jobGraph) {
        LOG.info("Begin submitting job using logical plan: {}.", jobGraph);
        this.jobMasterActor = actorHandle;
        this.graphManager = new GraphManagerImpl(this.runtimeContext);
        this.resourceManager = new ResourceManagerImpl(this.runtimeContext);
        ExecutionGraph buildExecutionGraph = this.graphManager.buildExecutionGraph(jobGraph);
        this.runtimeContext.setJobGraph(jobGraph);
        this.runtimeContext.setExecutionGraph(buildExecutionGraph);
        try {
            this.scheduler = new JobSchedulerImpl(this);
            this.scheduler.scheduleJob(this.graphManager.getExecutionGraph());
            return true;
        } catch (Exception e) {
            LOG.error("Failed to submit job.", e);
            return false;
        }
    }

    /**
     * Serializes the runtime context into the context backend. Does nothing if the context or
     * the configuration is missing.
     */
    public synchronized void saveContext() {
        if (this.runtimeContext == null || getConf() == null) {
            return;
        }
        LOG.debug("Save JobMaster context.");
        CheckpointStateUtil.put(this.contextBackend, getJobMasterRuntimeContextKey(getConf()), Serializer.encode(this.runtimeContext));
    }

    /**
     * Handles a worker's checkpoint-commit report.
     *
     * @param bArr serialized {@code RemoteCall.BaseWorkerCmd} whose detail packs a
     *     {@code RemoteCall.WorkerCommitReport}
     * @return serialized {@code RemoteCall.BoolResult}; {@code false} if the request could not be
     *     parsed or the coordinator did not return {@code true}
     */
    public byte[] reportJobWorkerCommit(byte[] bArr) {
        Boolean committed = false;
        try {
            RemoteCall.BaseWorkerCmd workerCmd = RemoteCall.BaseWorkerCmd.parseFrom(bArr);
            ActorId actorId = ActorId.fromBytes(workerCmd.getActorId().toByteArray());
            long remoteCallCostMs = System.currentTimeMillis() - workerCmd.getTimestamp();
            LOG.info("Vertex {}, request job worker commit cost {}ms, actorId={}.",
                    getExecutionVertex(actorId), remoteCallCostMs, actorId);
            RemoteCall.WorkerCommitReport commitReport =
                    workerCmd.getDetail().unpack(RemoteCall.WorkerCommitReport.class);
            committed = this.checkpointCoordinator.reportJobWorkerCommit(
                    new WorkerCommitReport(actorId, commitReport.getCommitCheckpointId()));
        } catch (InvalidProtocolBufferException e) {
            LOG.error("Parse job worker commit has exception.", e);
        }
        // Null-safe unboxing: a null coordinator result is reported as false, not an NPE.
        return toBoolResultBytes(Boolean.TRUE.equals(committed));
    }

    /**
     * Handles a worker's rollback request.
     *
     * <p>All parsing and dispatch happens inside one try block. Any failure, including a
     * malformed request, is logged and answered with {@code false}. Previously the locals were
     * read after a try/catch that might not have assigned them, and the nested
     * {@code WorkerRollbackRequest.parseFrom} call ran outside any try; neither compiled.
     *
     * @param bArr serialized {@code RemoteCall.BaseWorkerCmd} whose detail value is a serialized
     *     {@code RemoteCall.WorkerRollbackRequest}
     * @return serialized {@code RemoteCall.BoolResult}; {@code false} for unknown actors, parse
     *     failures, or a non-{@code true} coordinator answer
     */
    public byte[] requestJobWorkerRollback(byte[] bArr) {
        Boolean accepted = false;
        try {
            RemoteCall.BaseWorkerCmd workerCmd = RemoteCall.BaseWorkerCmd.parseFrom(bArr);
            ActorId actorId = ActorId.fromBytes(workerCmd.getActorId().toByteArray());
            long remoteCallCostMs = System.currentTimeMillis() - workerCmd.getTimestamp();
            if (!this.graphManager.getExecutionGraph().getActorById(actorId).isPresent()) {
                LOG.warn("Skip this invalid rollback, actor id {} is not found.", actorId);
                return toBoolResultBytes(false);
            }
            ExecutionVertex executionVertex = getExecutionVertex(actorId);
            LOG.info("Vertex {}, request job worker rollback cost {}ms, actorId={}.",
                    executionVertex, remoteCallCostMs, actorId);
            RemoteCall.WorkerRollbackRequest rollbackRequest =
                    RemoteCall.WorkerRollbackRequest.parseFrom(workerCmd.getDetail().getValue());
            executionVertex.setPid(rollbackRequest.getWorkerPid());
            // Look up the container that hosted the vertex so the coordinator knows its host.
            Optional<Container> container = ResourceUtil.getContainerById(
                    this.resourceManager.getRegisteredContainers(), executionVertex.getContainerId());
            String hostname = container.isPresent() ? container.get().getHostname() : "";
            accepted = this.failoverCoordinator.requestJobWorkerRollback(new WorkerRollbackRequest(
                    actorId, rollbackRequest.getExceptionMsg(), hostname, executionVertex.getPid()));
            LOG.info("Vertex {} request rollback, exception msg : {}.",
                    executionVertex, rollbackRequest.getExceptionMsg());
        } catch (Exception e) {
            LOG.error("Parse job worker rollback has exception.", e);
        }
        return toBoolResultBytes(Boolean.TRUE.equals(accepted));
    }

    /** Serializes {@code value} as a {@code RemoteCall.BoolResult} message. */
    private static byte[] toBoolResultBytes(boolean value) {
        // m129build() is the decompiled name of the generated builder's build method.
        return RemoteCall.BoolResult.newBuilder().setBoolRes(value).m129build().toByteArray();
    }

    private ExecutionVertex getExecutionVertex(ActorId actorId) {
        return this.graphManager.getExecutionGraph().getExecutionVertexByActorId(actorId);
    }

    /** @return handle of this job master actor, as passed to {@link #submitJob}; null before that */
    public ActorHandle<JobMaster> getJobMasterActor() {
        return this.jobMasterActor;
    }

    public JobMasterRuntimeContext getRuntimeContext() {
        return this.runtimeContext;
    }

    public ResourceManager getResourceManager() {
        return this.resourceManager;
    }

    public GraphManager getGraphManager() {
        return this.graphManager;
    }

    public StreamingMasterConfig getConf() {
        return this.conf;
    }
}
