package edu.iu.dsc.tws.tsched.batch.datalocalityaware;

import edu.iu.dsc.tws.api.compute.graph.ComputeGraph;
import edu.iu.dsc.tws.api.compute.graph.Vertex;
import edu.iu.dsc.tws.api.compute.schedule.ITaskScheduler;
import edu.iu.dsc.tws.api.compute.schedule.elements.Resource;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskInstanceId;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskInstancePlan;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskSchedulePlan;
import edu.iu.dsc.tws.api.compute.schedule.elements.Worker;
import edu.iu.dsc.tws.api.compute.schedule.elements.WorkerPlan;
import edu.iu.dsc.tws.api.compute.schedule.elements.WorkerSchedulePlan;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.data.FileStatus;
import edu.iu.dsc.tws.api.data.FileSystem;
import edu.iu.dsc.tws.api.data.Path;
import edu.iu.dsc.tws.data.utils.DataNodeLocatorUtils;
import edu.iu.dsc.tws.data.utils.FileSystemUtils;
import edu.iu.dsc.tws.tsched.spi.common.TaskSchedulerContext;
import edu.iu.dsc.tws.tsched.spi.taskschedule.TaskInstanceMapCalculation;
import edu.iu.dsc.tws.tsched.utils.DataTransferTimeCalculator;
import edu.iu.dsc.tws.tsched.utils.TaskAttributes;
import edu.iu.dsc.tws.tsched.utils.TaskVertexParser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/tsched/batch/datalocalityaware/DataLocalityBatchTaskScheduler.class */
public class DataLocalityBatchTaskScheduler implements ITaskScheduler {
    private static final Logger LOG = Logger.getLogger(DataLocalityBatchTaskScheduler.class.getName());
    private int gTaskId = 0;
    private Double instanceRAM;
    private Double instanceDisk;
    private Double instanceCPU;
    private Config config;
    private int workerId;
    private Map<Integer, List<TaskInstanceId>> dataLocalityAwareAllocation;
    private TaskAttributes taskAttributes;
    private DataNodeLocatorUtils dataNodeLocatorUtils;

    public void initialize(Config config) {
        this.config = config;
        this.instanceRAM = Double.valueOf(TaskSchedulerContext.taskInstanceRam(this.config));
        this.instanceDisk = Double.valueOf(TaskSchedulerContext.taskInstanceDisk(this.config));
        this.instanceCPU = Double.valueOf(TaskSchedulerContext.taskInstanceCpu(this.config));
        this.dataNodeLocatorUtils = new DataNodeLocatorUtils(this.config);
        this.dataLocalityAwareAllocation = new HashMap();
        this.taskAttributes = new TaskAttributes();
    }

    public void initialize(Config config, int i) {
        initialize(config);
        this.workerId = i;
    }

    public TaskSchedulePlan schedule(ComputeGraph computeGraph, WorkerPlan workerPlan) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (int i = 0; i < workerPlan.getNumberOfWorkers(); i++) {
            this.dataLocalityAwareAllocation.put(Integer.valueOf(i), new ArrayList());
        }
        LinkedHashSet linkedHashSet = new LinkedHashSet(computeGraph.getTaskVertexSet());
        for (Set<Vertex> set : new TaskVertexParser().parseVertexSet(computeGraph)) {
            Map<Integer, List<TaskInstanceId>> dataLocalityBatchSchedulingAlgorithm = set.size() > 1 ? dataLocalityBatchSchedulingAlgorithm(computeGraph, set, workerPlan) : dataLocalityBatchSchedulingAlgorithm(computeGraph, set.iterator().next(), workerPlan);
            TaskInstanceMapCalculation taskInstanceMapCalculation = new TaskInstanceMapCalculation(this.instanceRAM, this.instanceCPU, this.instanceDisk);
            Map<Integer, Map<TaskInstanceId, Double>> instancesRamMapInContainer = taskInstanceMapCalculation.getInstancesRamMapInContainer(dataLocalityBatchSchedulingAlgorithm, linkedHashSet);
            Map<Integer, Map<TaskInstanceId, Double>> instancesDiskMapInContainer = taskInstanceMapCalculation.getInstancesDiskMapInContainer(dataLocalityBatchSchedulingAlgorithm, linkedHashSet);
            Map<Integer, Map<TaskInstanceId, Double>> instancesCPUMapInContainer = taskInstanceMapCalculation.getInstancesCPUMapInContainer(dataLocalityBatchSchedulingAlgorithm, linkedHashSet);
            Iterator<Integer> it = dataLocalityBatchSchedulingAlgorithm.keySet().iterator();
            while (it.hasNext()) {
                int intValue = it.next().intValue();
                double containerRamPadding = TaskSchedulerContext.containerRamPadding(this.config);
                double containerDiskPadding = TaskSchedulerContext.containerDiskPadding(this.config);
                double containerCpuPadding = TaskSchedulerContext.containerCpuPadding(this.config);
                List<TaskInstanceId> list = dataLocalityBatchSchedulingAlgorithm.get(Integer.valueOf(intValue));
                HashMap hashMap = new HashMap();
                for (TaskInstanceId taskInstanceId : list) {
                    double doubleValue = instancesRamMapInContainer.get(Integer.valueOf(intValue)).get(taskInstanceId).doubleValue();
                    double doubleValue2 = instancesDiskMapInContainer.get(Integer.valueOf(intValue)).get(taskInstanceId).doubleValue();
                    hashMap.put(taskInstanceId, new TaskInstancePlan(taskInstanceId.getTaskName(), taskInstanceId.getTaskId(), taskInstanceId.getTaskIndex(), new Resource(Double.valueOf(doubleValue), Double.valueOf(doubleValue2), Double.valueOf(instancesCPUMapInContainer.get(Integer.valueOf(intValue)).get(taskInstanceId).doubleValue()))));
                    containerRamPadding += doubleValue;
                    containerDiskPadding += doubleValue2;
                    containerCpuPadding += doubleValue2;
                }
                Worker worker = workerPlan.getWorker(intValue);
                Resource resource = (worker == null || worker.getCpu() <= 0 || worker.getDisk() <= 0 || worker.getRam() <= 0) ? new Resource(Double.valueOf(containerRamPadding), Double.valueOf(containerDiskPadding), Double.valueOf(containerCpuPadding)) : new Resource(Double.valueOf(worker.getRam()), Double.valueOf(worker.getDisk()), Double.valueOf(worker.getCpu()));
                if (linkedHashMap.containsKey(Integer.valueOf(intValue))) {
                    ((WorkerSchedulePlan) linkedHashMap.get(Integer.valueOf(intValue))).getTaskInstances().addAll(hashMap.values());
                } else {
                    linkedHashMap.put(Integer.valueOf(intValue), new WorkerSchedulePlan(intValue, new HashSet(hashMap.values()), resource));
                }
            }
        }
        TaskSchedulePlan taskSchedulePlan = new TaskSchedulePlan(0, new HashSet(linkedHashMap.values()));
        for (Map.Entry entry : taskSchedulePlan.getContainersMap().entrySet()) {
            Integer num = (Integer) entry.getKey();
            Set<TaskInstancePlan> taskInstances = ((WorkerSchedulePlan) entry.getValue()).getTaskInstances();
            LOG.fine("Task Details for Container Id:" + num + "\tsize:" + taskInstances.size());
            for (TaskInstancePlan taskInstancePlan : taskInstances) {
                LOG.fine("TaskId:" + taskInstancePlan.getTaskId() + "\tTask Index" + taskInstancePlan.getTaskIndex() + "\tTask Name:" + taskInstancePlan.getTaskName());
            }
        }
        return taskSchedulePlan;
    }

    private Map<Integer, List<TaskInstanceId>> dataLocalityBatchSchedulingAlgorithm(ComputeGraph computeGraph, Vertex vertex, WorkerPlan workerPlan) {
        if (computeGraph.getGraphConstraints().isEmpty()) {
            this.dataLocalityAwareAllocation = nonAttributeBasedAllocation(this.taskAttributes.getParallelTaskMap(vertex), workerPlan);
        } else {
            this.dataLocalityAwareAllocation = attributeBasedAllocation(!computeGraph.getNodeConstraints().isEmpty() ? this.taskAttributes.getParallelTaskMap(vertex, computeGraph.getNodeConstraints()) : this.taskAttributes.getParallelTaskMap(vertex), computeGraph, workerPlan);
        }
        return this.dataLocalityAwareAllocation;
    }

    private Map<Integer, List<TaskInstanceId>> dataLocalityBatchSchedulingAlgorithm(ComputeGraph computeGraph, Set<Vertex> set, WorkerPlan workerPlan) {
        if (computeGraph.getGraphConstraints().isEmpty()) {
            this.dataLocalityAwareAllocation = nonAttributeBasedAllocation(this.taskAttributes.getParallelTaskMap(set), workerPlan);
        } else {
            this.dataLocalityAwareAllocation = attributeBasedAllocation(!computeGraph.getNodeConstraints().isEmpty() ? this.taskAttributes.getParallelTaskMap(set, computeGraph.getNodeConstraints()) : this.taskAttributes.getParallelTaskMap(set), computeGraph, workerPlan);
        }
        return this.dataLocalityAwareAllocation;
    }

    private Map<Integer, List<TaskInstanceId>> attributeBasedAllocation(Map<String, Integer> map, ComputeGraph computeGraph, WorkerPlan workerPlan) {
        int parseInt = Integer.parseInt(getWorkerNodeList(workerPlan).get(0).getNodeName());
        int instancesPerWorker = this.taskAttributes.getInstancesPerWorker(computeGraph.getGraphConstraints());
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            String key = entry.getKey();
            int intValue = entry.getValue().intValue();
            int i = 0;
            for (int i2 = 0; i2 < intValue; i2++) {
                this.dataLocalityAwareAllocation.get(Integer.valueOf(parseInt)).add(new TaskInstanceId(key, this.gTaskId, i2));
                i++;
                if (i == instancesPerWorker) {
                    parseInt++;
                }
            }
            parseInt = 0;
            this.gTaskId++;
        }
        return this.dataLocalityAwareAllocation;
    }

    private Map<Integer, List<TaskInstanceId>> nonAttributeBasedAllocation(Map<String, Integer> map, WorkerPlan workerPlan) {
        List<DataTransferTimeCalculator> workerNodeList = getWorkerNodeList(workerPlan);
        int defaultTaskInstancesPerContainer = TaskSchedulerContext.defaultTaskInstancesPerContainer(this.config);
        int parseInt = Integer.parseInt(workerNodeList.get(0).getNodeName());
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            String key = entry.getKey();
            int intValue = entry.getValue().intValue();
            int i = 0;
            for (int i2 = 0; i2 < intValue; i2++) {
                this.dataLocalityAwareAllocation.get(Integer.valueOf(parseInt)).add(new TaskInstanceId(key, this.gTaskId, i2));
                i++;
                if (i == defaultTaskInstancesPerContainer) {
                    parseInt++;
                }
            }
            parseInt = 0;
            this.gTaskId++;
        }
        return this.dataLocalityAwareAllocation;
    }

    private List<DataTransferTimeCalculator> getWorkerNodeList(WorkerPlan workerPlan) {
        return findBestWorkerNode(calculateDistance(this.dataNodeLocatorUtils.findDataNodesLocation(getInputFilesList()), workerPlan, 0));
    }

    private List<String> getInputFilesList() {
        ArrayList arrayList = new ArrayList();
        Path path = new Path(this.config.get("dinput") != null ? String.valueOf(this.config.get("dinput")) : null);
        try {
            FileSystem fileSystem = FileSystemUtils.get(path);
            if (this.config.get("filesys").equals("hdfs")) {
                arrayList.add(String.valueOf(fileSystem.getFileStatus(path).getPath()));
            } else if (this.config.get("filesys").equals("local")) {
                for (FileStatus fileStatus : fileSystem.listFiles(path)) {
                    String valueOf = String.valueOf(fileStatus.getPath());
                    if (valueOf != null) {
                        arrayList.add(valueOf);
                    }
                }
            }
            return arrayList;
        } catch (IOException e) {
            throw new RuntimeException("IOException Occured");
        }
    }

    private Map<String, List<DataTransferTimeCalculator>> calculateDistance(List<String> list, WorkerPlan workerPlan, int i) {
        double containerInstanceBandwidth;
        double containerInstanceLatency;
        HashMap hashMap = new HashMap();
        double d = 0.0d;
        for (String str : list) {
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < workerPlan.getNumberOfWorkers(); i2++) {
                Worker worker = workerPlan.getWorker(i2);
                DataTransferTimeCalculator dataTransferTimeCalculator = new DataTransferTimeCalculator(str, d);
                if (worker.getProperty("bandwidth") == null || worker.getProperty("latency") == null) {
                    containerInstanceBandwidth = TaskSchedulerContext.containerInstanceBandwidth(this.config);
                    containerInstanceLatency = TaskSchedulerContext.containerInstanceLatency(this.config);
                } else {
                    containerInstanceBandwidth = ((Double) worker.getProperty("bandwidth")).doubleValue();
                    containerInstanceLatency = ((Double) worker.getProperty("latency")).doubleValue();
                }
                d = Math.abs(((2.0d * containerInstanceBandwidth) * containerInstanceLatency) - ((2.0d * TaskSchedulerContext.datanodeInstanceBandwidth(this.config)) * TaskSchedulerContext.datanodeInstanceLatency(this.config)));
                dataTransferTimeCalculator.setRequiredDataTransferTime(Double.valueOf(d));
                dataTransferTimeCalculator.setNodeName(worker.getId() + "");
                dataTransferTimeCalculator.setTaskIndex(i);
                arrayList.add(dataTransferTimeCalculator);
            }
            hashMap.put(str, arrayList);
        }
        return hashMap;
    }

    private static List<DataTransferTimeCalculator> findBestWorkerNode(Map<String, List<DataTransferTimeCalculator>> map) {
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<String, List<DataTransferTimeCalculator>> entry : map.entrySet()) {
            String key = entry.getKey();
            for (DataTransferTimeCalculator dataTransferTimeCalculator : entry.getValue()) {
                arrayList.add(new DataTransferTimeCalculator(dataTransferTimeCalculator.getNodeName(), dataTransferTimeCalculator.getRequiredDataTransferTime().doubleValue(), key));
            }
        }
        return arrayList;
    }
}
