package io.ray.streaming.runtime.master.scheduler.controller;

import io.ray.api.ActorHandle;
import io.ray.api.BaseActorHandle;
import io.ray.api.Ray;
import io.ray.api.function.PyActorClass;
import io.ray.streaming.api.Language;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.python.GraphPbBuilder;
import io.ray.streaming.runtime.rpc.RemoteCallWorker;
import io.ray.streaming.runtime.worker.JobWorker;
import io.ray.streaming.runtime.worker.context.JobWorkerContext;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ray/streaming/runtime/master/scheduler/controller/WorkerLifecycleController.class */
public class WorkerLifecycleController {
    private static final Logger LOG = LoggerFactory.getLogger(WorkerLifecycleController.class);

    public boolean createWorkers(List<ExecutionVertex> list) {
        return asyncBatchExecute(this::createWorker, list);
    }

    private boolean createWorker(ExecutionVertex executionVertex) {
        LOG.info("Start to create worker actor for vertex: {} with resource: {}, workeConfig: {}.", new Object[]{executionVertex.getExecutionVertexName(), executionVertex.getResource(), executionVertex.getWorkerConfig()});
        ActorHandle remote = Language.JAVA == executionVertex.getLanguage() ? Ray.actor(JobWorker::new, executionVertex).setResources(executionVertex.getResource()).setMaxRestarts(-1).remote() : Ray.actor(PyActorClass.of("ray.streaming.runtime.worker", "JobWorker"), new GraphPbBuilder().buildVertex(executionVertex).toByteArray()).setResources(executionVertex.getResource()).setMaxRestarts(-1).remote();
        if (remote == null) {
            LOG.error("Create worker actor failed.");
            return false;
        }
        executionVertex.setWorkerActor(remote);
        LOG.info("Worker actor created, actor: {}, vertex: {}.", executionVertex.getWorkerActorId(), executionVertex.getExecutionVertexName());
        return true;
    }

    public boolean initWorkers(Map<ExecutionVertex, JobWorkerContext> map, int i) {
        LOG.info("Begin initiating workers: {}.", map);
        long currentTimeMillis = System.currentTimeMillis();
        HashMap hashMap = new HashMap();
        map.entrySet().forEach(entry -> {
            ExecutionVertex executionVertex = (ExecutionVertex) entry.getKey();
            hashMap.put(RemoteCallWorker.initWorker(executionVertex.getWorkerActor(), (JobWorkerContext) entry.getValue()), executionVertex.getWorkerActorId());
        });
        ArrayList arrayList = new ArrayList(hashMap.keySet());
        LOG.info("Waiting for workers' initialization.");
        if (Ray.wait(arrayList, arrayList.size(), i).getReady().size() != arrayList.size()) {
            LOG.error("Initializing workers timeout[{} ms].", Integer.valueOf(i));
            return false;
        }
        LOG.info("Finished waiting workers' initialization.");
        LOG.info("Workers initialized. Cost {} ms.", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
        return true;
    }

    public boolean startWorkers(ExecutionGraph executionGraph, long j, int i) {
        LOG.info("Begin starting workers.");
        long currentTimeMillis = System.currentTimeMillis();
        ArrayList arrayList = new ArrayList();
        executionGraph.getSourceActors().forEach(baseActorHandle -> {
            arrayList.add(RemoteCallWorker.rollback(baseActorHandle, Long.valueOf(j)));
        });
        executionGraph.getNonSourceActors().forEach(baseActorHandle2 -> {
            arrayList.add(RemoteCallWorker.rollback(baseActorHandle2, Long.valueOf(j)));
        });
        if (Ray.wait(arrayList, arrayList.size(), i).getReady().size() != arrayList.size()) {
            LOG.error("Starting workers timeout[{} ms].", Integer.valueOf(i));
            return false;
        }
        LOG.info("Workers started. Cost {} ms.", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
        return true;
    }

    public boolean destroyWorkers(List<ExecutionVertex> list) {
        return asyncBatchExecute(this::destroyWorker, list);
    }

    private boolean destroyWorker(ExecutionVertex executionVertex) {
        BaseActorHandle workerActor = executionVertex.getWorkerActor();
        LOG.info("Begin destroying worker[vertex={}, actor={}].", executionVertex.getExecutionVertexName(), workerActor.getId());
        if (RemoteCallWorker.shutdownWithoutReconstruction(workerActor).booleanValue()) {
            LOG.info("Worker destroyed, actor: {}.", workerActor);
            return true;
        }
        LOG.error("Failed to destroy JobWorker[{}]'s actor: {}.", executionVertex.getExecutionVertexName(), workerActor);
        return false;
    }

    private boolean asyncBatchExecute(Function<ExecutionVertex, Boolean> function, List<ExecutionVertex> list) {
        Object asyncContext = Ray.getAsyncContext();
        if (!((List) ((List) list.stream().map(executionVertex -> {
            return CompletableFuture.supplyAsync(() -> {
                Ray.setAsyncContext(asyncContext);
                return (Boolean) function.apply(executionVertex);
            });
        }).collect(Collectors.toList())).stream().map((v0) -> {
            return v0.join();
        }).collect(Collectors.toList())).stream().anyMatch(bool -> {
            return !bool.booleanValue();
        })) {
            return true;
        }
        LOG.error("Not all futures return true, check ResourceManager'log the detail.");
        return false;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1818100338:
                if (implMethodName.equals("<init>")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("io/ray/api/function/RayFunc1") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("io/ray/streaming/runtime/worker/JobWorker") && serializedLambda.getImplMethodSignature().equals("(Lio/ray/streaming/runtime/core/graph/executiongraph/ExecutionVertex;)V")) {
                    return JobWorker::new;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
