package io.ray.streaming.runtime.master.resourcemanager.strategy.impl;

import io.ray.streaming.runtime.config.types.ResourceAssignStrategyType;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionJobVertex;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.core.resource.Container;
import io.ray.streaming.runtime.core.resource.ResourceType;
import io.ray.streaming.runtime.master.resourcemanager.ResourceAssignmentView;
import io.ray.streaming.runtime.master.resourcemanager.ViewBuilder;
import io.ray.streaming.runtime.master.resourcemanager.strategy.ResourceAssignStrategy;
import io.ray.streaming.runtime.master.scheduler.ScheduleException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ray/streaming/runtime/master/resourcemanager/strategy/impl/PipelineFirstStrategy.class */
public class PipelineFirstStrategy implements ResourceAssignStrategy {
    public static final Logger LOG = LoggerFactory.getLogger(PipelineFirstStrategy.class);
    private int currentContainerIndex = 0;

    @Override // io.ray.streaming.runtime.master.resourcemanager.strategy.ResourceAssignStrategy
    public ResourceAssignmentView assignResource(List<Container> list, ExecutionGraph executionGraph) {
        Map<Integer, ExecutionJobVertex> executionJobVertexMap = executionGraph.getExecutionJobVertexMap();
        HashMap hashMap = new HashMap();
        executionJobVertexMap.forEach((num, executionJobVertex) -> {
            hashMap.put(num, Integer.valueOf(executionJobVertex.getExecutionVertices().size()));
        });
        int sum = hashMap.values().stream().mapToInt((v0) -> {
            return v0.intValue();
        }).sum();
        int size = list.size();
        int max = Math.max(sum / size, 1);
        updateContainerCapacity(list, max);
        int i = 0;
        boolean z = false;
        if (max * size < sum) {
            i = max * size;
            LOG.info("Need to enlarge capacity per container, threshold: {}.", Integer.valueOf(i));
        }
        LOG.info("Total execution vertices num: {}, container num: {}, capacity per container: {}.", new Object[]{Integer.valueOf(sum), Integer.valueOf(size), Integer.valueOf(max)});
        int maxParallelism = executionGraph.getMaxParallelism();
        int i2 = 0;
        for (int i3 = 0; i3 < maxParallelism; i3++) {
            Iterator<ExecutionJobVertex> it = executionJobVertexMap.values().iterator();
            while (it.hasNext()) {
                List<ExecutionVertex> executionVertices = it.next().getExecutionVertices();
                if (executionVertices.size() > i3) {
                    ExecutionVertex executionVertex = executionVertices.get(i3);
                    Map<String, Double> resource = executionVertex.getResource();
                    if (resource.containsKey(ResourceType.CPU.getValue())) {
                        LOG.info("Required resource contain {} value : {}, no limitation by default.", ResourceType.CPU, resource.get(ResourceType.CPU.getValue()));
                        resource.remove(ResourceType.CPU.getValue());
                    }
                    findMatchedContainer(resource, list).allocateActor(executionVertex);
                    i2++;
                    if (!z && i > 0 && i2 >= i) {
                        updateContainerCapacity(list, max + 1);
                        z = true;
                        LOG.info("Enlarge capacity per container to: {}.", Integer.valueOf(list.get(0).getCapacity()));
                    }
                }
            }
        }
        ResourceAssignmentView buildResourceAssignmentView = ViewBuilder.buildResourceAssignmentView(list);
        LOG.info("Assigning resource finished, allocating map: {}.", buildResourceAssignmentView);
        return buildResourceAssignmentView;
    }

    @Override // io.ray.streaming.runtime.master.resourcemanager.strategy.ResourceAssignStrategy
    public String getName() {
        return ResourceAssignStrategyType.PIPELINE_FIRST_STRATEGY.getName();
    }

    private void updateContainerCapacity(List<Container> list, int i) {
        list.forEach(container -> {
            container.updateCapacity(i);
        });
    }

    private Container findMatchedContainer(Map<String, Double> map, List<Container> list) {
        LOG.info("Check resource, required: {}.", map);
        int i = 0;
        while (!hasEnoughResource(map, getCurrentContainer(list))) {
            i++;
            forwardToNextContainer(list);
            if (i >= list.size()) {
                throw new ScheduleException(String.format("No enough resource left, required resource: %s, available resource: %s.", map, list));
            }
        }
        return getCurrentContainer(list);
    }

    private boolean hasEnoughResource(Map<String, Double> map, Container container) {
        LOG.info("Check resource for index: {}, container: {}", Integer.valueOf(this.currentContainerIndex), container);
        if (map == null) {
            return true;
        }
        if (container.isFull()) {
            LOG.info("Container {} is full.", container);
            return false;
        }
        Map<String, Double> availableResources = container.getAvailableResources();
        for (Map.Entry<String, Double> entry : map.entrySet()) {
            if (!availableResources.containsKey(entry.getKey())) {
                LOG.warn("No enough resource for container {}. required: {}, available: {}.", new Object[]{container.getAddress(), map, availableResources});
                return false;
            }
            if (availableResources.get(entry.getKey()).doubleValue() < entry.getValue().doubleValue()) {
                LOG.warn("No enough resource for container {}. required: {}, available: {}.", new Object[]{container.getAddress(), map, availableResources});
                return false;
            }
        }
        return true;
    }

    private Container forwardToNextContainer(List<Container> list) {
        this.currentContainerIndex = (this.currentContainerIndex + 1) % list.size();
        return getCurrentContainer(list);
    }

    private Container getCurrentContainer(List<Container> list) {
        return list.get(this.currentContainerIndex);
    }
}
