package io.ray.streaming.runtime.master.resourcemanager;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.ray.api.Ray;
import io.ray.api.id.UniqueId;
import io.ray.api.runtimecontext.NodeInfo;
import io.ray.streaming.runtime.config.master.ResourceConfig;
import io.ray.streaming.runtime.config.types.ResourceAssignStrategyType;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import io.ray.streaming.runtime.core.resource.Container;
import io.ray.streaming.runtime.core.resource.Resources;
import io.ray.streaming.runtime.master.JobRuntimeContext;
import io.ray.streaming.runtime.master.resourcemanager.strategy.ResourceAssignStrategy;
import io.ray.streaming.runtime.master.resourcemanager.strategy.ResourceAssignStrategyFactory;
import io.ray.streaming.runtime.util.RayUtils;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ray/streaming/runtime/master/resourcemanager/ResourceManagerImpl.class */
/**
 * Default {@link ResourceManager}: keeps the registered {@link Container} set in sync with the
 * nodes reported alive by {@link RayUtils#getAliveNodeInfoMap()} and delegates resource
 * assignment to the configured {@link ResourceAssignStrategy}.
 *
 * <p>The container set is refreshed once synchronously during construction and then periodically
 * on a single-threaded scheduler named {@code resource-update-thread}.
 *
 * <p>NOTE(review): {@link Resources} is touched from both the scheduler thread and callers of the
 * public methods; whether it is safe for concurrent access is not visible from this class.
 */
public class ResourceManagerImpl implements ResourceManager {
    private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerImpl.class);

    /** Resource key set to 1.0 on every node that gets registered as a container. */
    private static final String CONTAINER_ENGAGED_KEY = "CONTAINER_ENGAGED_KEY";

    private final JobRuntimeContext runtimeContext;
    private final ResourceConfig resourceConfig;
    private final ResourceAssignStrategy resourceAssignStrategy;

    /** Configured actor count per container, read from {@link ResourceConfig}. */
    private final int actorNumPerContainer;

    /** Single-threaded scheduler that runs the periodic resource refresh. */
    private final ScheduledExecutorService resourceUpdater =
        new ScheduledThreadPoolExecutor(
            1, new ThreadFactoryBuilder().setNameFormat("resource-update-thread").build());

    private final Resources resources = new Resources();

    /**
     * Creates the manager, performs an initial resource refresh and starts the periodic refresh.
     *
     * @param jobRuntimeContext runtime context whose master config supplies the resource config
     */
    public ResourceManagerImpl(JobRuntimeContext jobRuntimeContext) {
        this.runtimeContext = jobRuntimeContext;
        this.resourceConfig = jobRuntimeContext.getConf().masterConfig.resourceConfig;
        LOG.info("ResourceManagerImpl begin init, conf is {}, resources are {}.",
            this.resourceConfig, this.resources);

        this.actorNumPerContainer = this.resourceConfig.actorNumPerContainer();
        this.resourceAssignStrategy =
            ResourceAssignStrategyFactory.getStrategy(
                ResourceAssignStrategyType.PIPELINE_FIRST_STRATEGY);
        LOG.info("Slot assign strategy: {}.", this.resourceAssignStrategy.getName());

        initResource();
        checkAndUpdateResourcePeriodically();
        LOG.info("ResourceManagerImpl init success.");
    }

    /**
     * Delegates to the configured strategy.
     *
     * @param list containers available for assignment
     * @param executionGraph execution graph to assign resources for
     * @return the assignment produced by the strategy
     */
    @Override
    public ResourceAssignmentView assignResource(List<Container> list,
                                                 ExecutionGraph executionGraph) {
        return this.resourceAssignStrategy.assignResource(list, executionGraph);
    }

    /**
     * @return the name of the configured assignment strategy
     */
    @Override
    public String getName() {
        return this.resourceAssignStrategy.getName();
    }

    /**
     * Logs the current resource detail and returns the registered containers.
     *
     * @return the containers currently registered in {@link Resources}
     */
    @Override
    public ImmutableList<Container> getRegisteredContainers() {
        LOG.info("Current resource detail: {}.", this.resources);
        return this.resources.getRegisteredContainers();
    }

    /**
     * Diffs the alive nodes against the registered containers, then unregisters containers whose
     * node is no longer alive and registers a container for every newly seen node.
     */
    private void checkAndUpdateResource() {
        Map<UniqueId, NodeInfo> aliveNodeInfoMap = RayUtils.getAliveNodeInfoMap();

        // Alive nodes that have no registered container yet.
        List<UniqueId> addedNodeIds = aliveNodeInfoMap.keySet().stream()
            .filter(this::isAddedNode)
            .collect(Collectors.toList());

        // Registered containers whose node is no longer reported alive.
        List<UniqueId> deletedNodeIds = this.resources.getRegisteredContainerMap().keySet().stream()
            .filter(nodeId -> !aliveNodeInfoMap.containsKey(nodeId))
            .collect(Collectors.toList());

        LOG.info("Latest node infos: {}, current containers: {}, add nodes: {}, delete nodes: {}.",
            aliveNodeInfoMap, this.resources.getRegisteredContainers(), addedNodeIds,
            deletedNodeIds);

        if (addedNodeIds.isEmpty() && deletedNodeIds.isEmpty()) {
            return;
        }

        LOG.info("Latest node infos from GCS: {}", aliveNodeInfoMap);
        LOG.info("Resource details: {}.", this.resources);
        LOG.info("Get add nodes info: {}, del nodes info: {}.", addedNodeIds, deletedNodeIds);

        unregisterDeletedContainer(deletedNodeIds);

        // Every added id was taken from aliveNodeInfoMap's key set, so the lookup is non-null
        // (assuming the map holds no null values).
        List<NodeInfo> addedNodeInfos = addedNodeIds.stream()
            .map(aliveNodeInfoMap::get)
            .collect(Collectors.toList());
        registerNewContainers(addedNodeInfos);
    }

    /**
     * Registers one container per node info; a null or empty list is skipped.
     *
     * @param list node infos of the newly added nodes
     */
    private void registerNewContainers(List<NodeInfo> list) {
        LOG.info("Start to register containers. new add node infos are: {}.", list);
        if (list == null || list.isEmpty()) {
            LOG.info("NodeInfos is null or empty, skip registry.");
            return;
        }
        for (NodeInfo nodeInfo : list) {
            registerContainer(nodeInfo);
        }
    }

    /**
     * Builds a container from the node info, publishes its capacity as Ray resources on that
     * node and records it in {@link Resources}.
     *
     * @param nodeInfo node to register as a container
     */
    private void registerContainer(NodeInfo nodeInfo) {
        LOG.info("Register container {}.", nodeInfo);
        Container container = Container.from(nodeInfo);

        // Remaining capacity: configured per-container actor count minus what the container
        // already reports as allocated.
        double availableCapacity = this.actorNumPerContainer - container.getAllocatedActorNum();

        // Publish the capacity under the container's name and mark the node as engaged.
        Ray.setResource(container.getNodeId(), container.getName(), availableCapacity);
        Ray.setResource(container.getNodeId(), CONTAINER_ENGAGED_KEY, 1.0d);

        // Mirror the published capacity in the container's own available-resource map.
        container.getAvailableResources().put(container.getName(), availableCapacity);
        this.resources.registerContainer(container);
    }

    /**
     * Unregisters the containers of the given nodes; a null or empty list is a no-op.
     *
     * @param list ids of nodes that are no longer alive
     */
    private void unregisterDeletedContainer(List<UniqueId> list) {
        LOG.info("Unregister container, deleted node ids are: {}.", list);
        if (list == null || list.isEmpty()) {
            return;
        }
        this.resources.unRegisterContainer(list);
    }

    /** Performs the initial, synchronous resource refresh; failures propagate to the caller. */
    private void initResource() {
        LOG.info("Init resource.");
        checkAndUpdateResource();
    }

    /**
     * Schedules the resource refresh at a fixed rate of
     * {@code resourceConfig.resourceCheckIntervalSecond()} seconds, starting immediately.
     */
    private void checkAndUpdateResourcePeriodically() {
        // NOTE(review): Ray.wrapRunnable presumably carries the Ray runtime context over to the
        // scheduler thread -- confirm against the Ray API.
        this.resourceUpdater.scheduleAtFixedRate(
            Ray.wrapRunnable(this::checkAndUpdateResourceGuarded),
            0L,
            this.resourceConfig.resourceCheckIntervalSecond(),
            TimeUnit.SECONDS);
    }

    /**
     * Scheduler entry point for the periodic refresh.
     *
     * <p>{@code scheduleAtFixedRate} suppresses all subsequent executions once a run throws, so
     * a single failed refresh would silently stop resource updates for good. Failures are logged
     * here instead and the next scheduled run proceeds normally.
     */
    private void checkAndUpdateResourceGuarded() {
        try {
            checkAndUpdateResource();
        } catch (RuntimeException e) {
            LOG.error("Failed to check and update resource, will retry on next schedule.", e);
        }
    }

    /**
     * @param uniqueId node id to test
     * @return {@code true} if no container is registered for the node yet
     */
    private boolean isAddedNode(UniqueId uniqueId) {
        return !this.resources.getRegisteredContainerMap().containsKey(uniqueId);
    }
}
