/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.tsched.batch.datalocalityaware;

import edu.iu.dsc.tws.api.compute.graph.ComputeGraph;
import edu.iu.dsc.tws.api.compute.graph.Vertex;
import edu.iu.dsc.tws.api.compute.schedule.ITaskScheduler;
import edu.iu.dsc.tws.api.compute.schedule.elements.Resource;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskInstanceId;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskInstancePlan;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskSchedulePlan;
import edu.iu.dsc.tws.api.compute.schedule.elements.Worker;
import edu.iu.dsc.tws.api.compute.schedule.elements.WorkerPlan;
import edu.iu.dsc.tws.api.compute.schedule.elements.WorkerSchedulePlan;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.data.FileStatus;
import edu.iu.dsc.tws.api.data.FileSystem;
import edu.iu.dsc.tws.api.data.Path;
import edu.iu.dsc.tws.data.utils.DataNodeLocatorUtils;
import edu.iu.dsc.tws.data.utils.FileSystemUtils;
import edu.iu.dsc.tws.tsched.spi.common.TaskSchedulerContext;
import edu.iu.dsc.tws.tsched.spi.taskschedule.TaskInstanceMapCalculation;
import edu.iu.dsc.tws.tsched.utils.DataTransferTimeCalculator;
import edu.iu.dsc.tws.tsched.utils.TaskAttributes;
import edu.iu.dsc.tws.tsched.utils.TaskVertexParser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

public class DataLocalityBatchTaskScheduler
implements ITaskScheduler {
    private static final Logger LOG = Logger.getLogger(DataLocalityBatchTaskScheduler.class.getName());
    private int gTaskId = 0;
    private Double instanceRAM;
    private Double instanceDisk;
    private Double instanceCPU;
    private Config config;
    private int workerId;
    private Map<Integer, List<TaskInstanceId>> dataLocalityAwareAllocation;
    private TaskAttributes taskAttributes;
    private DataNodeLocatorUtils dataNodeLocatorUtils;

    public void initialize(Config cfg) {
        this.config = cfg;
        this.instanceRAM = TaskSchedulerContext.taskInstanceRam(this.config);
        this.instanceDisk = TaskSchedulerContext.taskInstanceDisk(this.config);
        this.instanceCPU = TaskSchedulerContext.taskInstanceCpu(this.config);
        this.dataNodeLocatorUtils = new DataNodeLocatorUtils(this.config);
        this.dataLocalityAwareAllocation = new HashMap<Integer, List<TaskInstanceId>>();
        this.taskAttributes = new TaskAttributes();
    }

    public void initialize(Config cfg, int workerid) {
        this.initialize(cfg);
        this.workerId = workerid;
    }

    public TaskSchedulePlan schedule(ComputeGraph graph, WorkerPlan workerPlan) {
        LinkedHashMap<Integer, WorkerSchedulePlan> containerPlans = new LinkedHashMap<Integer, WorkerSchedulePlan>();
        for (int i = 0; i < workerPlan.getNumberOfWorkers(); ++i) {
            this.dataLocalityAwareAllocation.put(i, new ArrayList());
        }
        LinkedHashSet<Vertex> taskVertexSet = new LinkedHashSet<Vertex>(graph.getTaskVertexSet());
        TaskVertexParser taskVertexParser = new TaskVertexParser();
        List<Set<Vertex>> taskVertexList = taskVertexParser.parseVertexSet(graph);
        for (Set<Vertex> vertexSet : taskVertexList) {
            Map<Integer, List<TaskInstanceId>> containerInstanceMap;
            if (vertexSet.size() > 1) {
                containerInstanceMap = this.dataLocalityBatchSchedulingAlgorithm(graph, vertexSet, workerPlan);
            } else {
                Vertex vertex = vertexSet.iterator().next();
                containerInstanceMap = this.dataLocalityBatchSchedulingAlgorithm(graph, vertex, workerPlan);
            }
            TaskInstanceMapCalculation instanceMapCalculation = new TaskInstanceMapCalculation(this.instanceRAM, this.instanceCPU, this.instanceDisk);
            Map<Integer, Map<TaskInstanceId, Double>> instancesRamMap = instanceMapCalculation.getInstancesRamMapInContainer(containerInstanceMap, taskVertexSet);
            Map<Integer, Map<TaskInstanceId, Double>> instancesDiskMap = instanceMapCalculation.getInstancesDiskMapInContainer(containerInstanceMap, taskVertexSet);
            Map<Integer, Map<TaskInstanceId, Double>> instancesCPUMap = instanceMapCalculation.getInstancesCPUMapInContainer(containerInstanceMap, taskVertexSet);
            for (int containerId : containerInstanceMap.keySet()) {
                WorkerSchedulePlan taskWorkerSchedulePlan;
                double containerRAMValue = TaskSchedulerContext.containerRamPadding(this.config);
                double containerDiskValue = TaskSchedulerContext.containerDiskPadding(this.config);
                double containerCpuValue = TaskSchedulerContext.containerCpuPadding(this.config);
                List<TaskInstanceId> taskTaskInstanceIds = containerInstanceMap.get(containerId);
                HashMap<TaskInstanceId, TaskInstancePlan> taskInstancePlanMap = new HashMap<TaskInstanceId, TaskInstancePlan>();
                for (TaskInstanceId id : taskTaskInstanceIds) {
                    double instanceRAMValue = instancesRamMap.get(containerId).get(id);
                    double instanceDiskValue = instancesDiskMap.get(containerId).get(id);
                    double instanceCPUValue = instancesCPUMap.get(containerId).get(id);
                    Resource instanceResource = new Resource(Double.valueOf(instanceRAMValue), Double.valueOf(instanceDiskValue), Double.valueOf(instanceCPUValue));
                    taskInstancePlanMap.put(id, new TaskInstancePlan(id.getTaskName(), id.getTaskId(), id.getTaskIndex(), instanceResource));
                    containerRAMValue += instanceRAMValue;
                    containerDiskValue += instanceDiskValue;
                    containerCpuValue += instanceDiskValue;
                }
                Worker worker = workerPlan.getWorker(containerId);
                Resource containerResource = worker != null && worker.getCpu() > 0 && worker.getDisk() > 0 && worker.getRam() > 0 ? new Resource(Double.valueOf(worker.getRam()), Double.valueOf(worker.getDisk()), Double.valueOf(worker.getCpu())) : new Resource(Double.valueOf(containerRAMValue), Double.valueOf(containerDiskValue), Double.valueOf(containerCpuValue));
                if (containerPlans.containsKey(containerId)) {
                    taskWorkerSchedulePlan = (WorkerSchedulePlan)containerPlans.get(containerId);
                    taskWorkerSchedulePlan.getTaskInstances().addAll(taskInstancePlanMap.values());
                    continue;
                }
                taskWorkerSchedulePlan = new WorkerSchedulePlan(containerId, new HashSet(taskInstancePlanMap.values()), containerResource);
                containerPlans.put(containerId, taskWorkerSchedulePlan);
            }
        }
        TaskSchedulePlan taskSchedulePlan = new TaskSchedulePlan(0, new HashSet(containerPlans.values()));
        Map containersMap = taskSchedulePlan.getContainersMap();
        for (Map.Entry entry : containersMap.entrySet()) {
            Integer integer = (Integer)entry.getKey();
            WorkerSchedulePlan workerSchedulePlan = (WorkerSchedulePlan)entry.getValue();
            Set containerPlanTaskInstances = workerSchedulePlan.getTaskInstances();
            LOG.fine("Task Details for Container Id:" + integer + "\tsize:" + containerPlanTaskInstances.size());
            for (TaskInstancePlan ip : containerPlanTaskInstances) {
                LOG.fine("TaskId:" + ip.getTaskId() + "\tTask Index" + ip.getTaskIndex() + "\tTask Name:" + ip.getTaskName());
            }
        }
        return taskSchedulePlan;
    }

    private Map<Integer, List<TaskInstanceId>> dataLocalityBatchSchedulingAlgorithm(ComputeGraph graph, Vertex vertex, WorkerPlan workerPlan) {
        if (!graph.getGraphConstraints().isEmpty()) {
            Map<String, Integer> parallelTaskMap = !graph.getNodeConstraints().isEmpty() ? this.taskAttributes.getParallelTaskMap(vertex, (Map<String, Map<String, String>>)graph.getNodeConstraints()) : this.taskAttributes.getParallelTaskMap(vertex);
            this.dataLocalityAwareAllocation = this.attributeBasedAllocation(parallelTaskMap, graph, workerPlan);
        } else {
            Map<String, Integer> parallelTaskMap = this.taskAttributes.getParallelTaskMap(vertex);
            this.dataLocalityAwareAllocation = this.nonAttributeBasedAllocation(parallelTaskMap, workerPlan);
        }
        return this.dataLocalityAwareAllocation;
    }

    private Map<Integer, List<TaskInstanceId>> dataLocalityBatchSchedulingAlgorithm(ComputeGraph graph, Set<Vertex> vertexSet, WorkerPlan workerPlan) {
        if (!graph.getGraphConstraints().isEmpty()) {
            Map<String, Integer> parallelTaskMap = !graph.getNodeConstraints().isEmpty() ? this.taskAttributes.getParallelTaskMap(vertexSet, (Map<String, Map<String, String>>)graph.getNodeConstraints()) : this.taskAttributes.getParallelTaskMap(vertexSet);
            this.dataLocalityAwareAllocation = this.attributeBasedAllocation(parallelTaskMap, graph, workerPlan);
        } else {
            Map<String, Integer> parallelTaskMap = this.taskAttributes.getParallelTaskMap(vertexSet);
            this.dataLocalityAwareAllocation = this.nonAttributeBasedAllocation(parallelTaskMap, workerPlan);
        }
        return this.dataLocalityAwareAllocation;
    }

    private Map<Integer, List<TaskInstanceId>> attributeBasedAllocation(Map<String, Integer> parallelTaskMap, ComputeGraph graph, WorkerPlan workerPlan) {
        List<DataTransferTimeCalculator> workerNodeList = this.getWorkerNodeList(workerPlan);
        int containerIndex = Integer.parseInt(workerNodeList.get(0).getNodeName());
        int instancesPerContainer = this.taskAttributes.getInstancesPerWorker(graph.getGraphConstraints());
        for (Map.Entry<String, Integer> e : parallelTaskMap.entrySet()) {
            String task = e.getKey();
            int taskParallelism = e.getValue();
            int maxTaskObject = 0;
            for (int taskIndex = 0; taskIndex < taskParallelism; ++taskIndex) {
                this.dataLocalityAwareAllocation.get(containerIndex).add(new TaskInstanceId(task, this.gTaskId, taskIndex));
                if (++maxTaskObject != instancesPerContainer) continue;
                ++containerIndex;
            }
            containerIndex = 0;
            ++this.gTaskId;
        }
        return this.dataLocalityAwareAllocation;
    }

    private Map<Integer, List<TaskInstanceId>> nonAttributeBasedAllocation(Map<String, Integer> parallelTaskMap, WorkerPlan workerPlan) {
        List<DataTransferTimeCalculator> workerNodeList = this.getWorkerNodeList(workerPlan);
        int instancesPerContainer = TaskSchedulerContext.defaultTaskInstancesPerContainer(this.config);
        int containerIndex = Integer.parseInt(workerNodeList.get(0).getNodeName());
        for (Map.Entry<String, Integer> e : parallelTaskMap.entrySet()) {
            String task = e.getKey();
            int taskParallelism = e.getValue();
            int maxTaskObject = 0;
            for (int taskIndex = 0; taskIndex < taskParallelism; ++taskIndex) {
                this.dataLocalityAwareAllocation.get(containerIndex).add(new TaskInstanceId(task, this.gTaskId, taskIndex));
                if (++maxTaskObject != instancesPerContainer) continue;
                ++containerIndex;
            }
            containerIndex = 0;
            ++this.gTaskId;
        }
        return this.dataLocalityAwareAllocation;
    }

    private List<DataTransferTimeCalculator> getWorkerNodeList(WorkerPlan workerPlan) {
        int index = 0;
        List<String> inputDataList = this.getInputFilesList();
        List datanodesList = this.dataNodeLocatorUtils.findDataNodesLocation(inputDataList);
        Map<String, List<DataTransferTimeCalculator>> workerPlanMap = this.calculateDistance(datanodesList, workerPlan, index);
        List<DataTransferTimeCalculator> workerNodeList = DataLocalityBatchTaskScheduler.findBestWorkerNode(workerPlanMap);
        return workerNodeList;
    }

    private List<String> getInputFilesList() {
        ArrayList<String> inputDataList = new ArrayList<String>();
        String directory = null;
        if (this.config.get("dinput") != null) {
            directory = String.valueOf(this.config.get("dinput"));
        }
        Path path = new Path(directory);
        try {
            FileSystem fileSystem = FileSystemUtils.get((Path)path);
            if (this.config.get("filesys").equals("hdfs")) {
                FileStatus pathFile = fileSystem.getFileStatus(path);
                inputDataList.add(String.valueOf(pathFile.getPath()));
            } else if (this.config.get("filesys").equals("local")) {
                for (FileStatus file : fileSystem.listFiles(path)) {
                    String filename = String.valueOf(file.getPath());
                    if (filename == null) continue;
                    inputDataList.add(filename);
                }
            }
        }
        catch (IOException e) {
            throw new RuntimeException("IOException Occured");
        }
        return inputDataList;
    }

    private Map<String, List<DataTransferTimeCalculator>> calculateDistance(List<String> datanodesList, WorkerPlan workerPlan, int taskIndex) {
        HashMap<String, List<DataTransferTimeCalculator>> workerPlanMap = new HashMap<String, List<DataTransferTimeCalculator>>();
        double calculateDistance = 0.0;
        for (String nodesList : datanodesList) {
            ArrayList<DataTransferTimeCalculator> calculatedVal = new ArrayList<DataTransferTimeCalculator>();
            for (int i = 0; i < workerPlan.getNumberOfWorkers(); ++i) {
                double workerLatency;
                double workerBandwidth;
                Worker worker = workerPlan.getWorker(i);
                DataTransferTimeCalculator calculateDataTransferTime = new DataTransferTimeCalculator(nodesList, calculateDistance);
                if (worker.getProperty("bandwidth") != null && worker.getProperty("latency") != null) {
                    workerBandwidth = (Double)worker.getProperty("bandwidth");
                    workerLatency = (Double)worker.getProperty("latency");
                } else {
                    workerBandwidth = TaskSchedulerContext.containerInstanceBandwidth(this.config);
                    workerLatency = TaskSchedulerContext.containerInstanceLatency(this.config);
                }
                double datanodeBandwidth = TaskSchedulerContext.datanodeInstanceBandwidth(this.config);
                double datanodeLatency = TaskSchedulerContext.datanodeInstanceLatency(this.config);
                calculateDistance = Math.abs(2.0 * workerBandwidth * workerLatency - 2.0 * datanodeBandwidth * datanodeLatency);
                calculateDataTransferTime.setRequiredDataTransferTime(calculateDistance);
                calculateDataTransferTime.setNodeName(worker.getId() + "");
                calculateDataTransferTime.setTaskIndex(taskIndex);
                calculatedVal.add(calculateDataTransferTime);
            }
            workerPlanMap.put(nodesList, calculatedVal);
        }
        return workerPlanMap;
    }

    private static List<DataTransferTimeCalculator> findBestWorkerNode(Map<String, List<DataTransferTimeCalculator>> workerPlanMap) {
        ArrayList<DataTransferTimeCalculator> cal = new ArrayList<DataTransferTimeCalculator>();
        for (Map.Entry<String, List<DataTransferTimeCalculator>> entry : workerPlanMap.entrySet()) {
            String key = entry.getKey();
            List<DataTransferTimeCalculator> value = entry.getValue();
            for (DataTransferTimeCalculator aValue : value) {
                cal.add(new DataTransferTimeCalculator(aValue.getNodeName(), aValue.getRequiredDataTransferTime(), key));
            }
        }
        return cal;
    }
}

