package edu.iu.dsc.tws.tsched.streaming.datalocalityaware;

import edu.iu.dsc.tws.api.compute.exceptions.TaskSchedulerException;
import edu.iu.dsc.tws.api.compute.graph.ComputeGraph;
import edu.iu.dsc.tws.api.compute.graph.Vertex;
import edu.iu.dsc.tws.api.compute.schedule.ITaskScheduler;
import edu.iu.dsc.tws.api.compute.schedule.elements.Resource;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskInstanceId;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskInstancePlan;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskSchedulePlan;
import edu.iu.dsc.tws.api.compute.schedule.elements.Worker;
import edu.iu.dsc.tws.api.compute.schedule.elements.WorkerPlan;
import edu.iu.dsc.tws.api.compute.schedule.elements.WorkerSchedulePlan;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.data.FileStatus;
import edu.iu.dsc.tws.api.data.FileSystem;
import edu.iu.dsc.tws.api.data.Path;
import edu.iu.dsc.tws.data.utils.DataNodeLocatorUtils;
import edu.iu.dsc.tws.data.utils.FileSystemUtils;
import edu.iu.dsc.tws.tsched.spi.common.TaskSchedulerContext;
import edu.iu.dsc.tws.tsched.spi.taskschedule.TaskInstanceMapCalculation;
import edu.iu.dsc.tws.tsched.utils.DataTransferTimeCalculator;
import edu.iu.dsc.tws.tsched.utils.TaskAttributes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/tsched/streaming/datalocalityaware/DataLocalityStreamingTaskScheduler.class */
public class DataLocalityStreamingTaskScheduler implements ITaskScheduler {
    private Double instanceRAM;
    private Double instanceDisk;
    private Double instanceCPU;
    private Config config;
    private int workerId = 0;
    private static final Logger LOG = Logger.getLogger(DataLocalityStreamingTaskScheduler.class.getName());
    private static int globalTaskIndex = 0;
    private static List<Integer> allocatedWorkers = new ArrayList();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:edu/iu/dsc/tws/tsched/streaming/datalocalityaware/DataLocalityStreamingTaskScheduler$GetDistanceCalculation.class */
    public class GetDistanceCalculation {
        private WorkerPlan workers;
        private int index;
        private double calculateDistance;
        private String nodesList;
        private ArrayList<DataTransferTimeCalculator> calculatedVal;
        private List<Integer> assignedWorkers;

        GetDistanceCalculation(WorkerPlan workerPlan, int i, double d, String str, List<Integer> list) {
            this.workers = workerPlan;
            this.index = i;
            this.calculateDistance = d;
            this.nodesList = str;
            this.assignedWorkers = list;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public double getCalculateDistance() {
            return this.calculateDistance;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public ArrayList<DataTransferTimeCalculator> getCalculatedVal() {
            return this.calculatedVal;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public GetDistanceCalculation invoke() {
            double containerInstanceBandwidth;
            double containerInstanceLatency;
            double containerInstanceBandwidth2;
            double containerInstanceLatency2;
            this.calculatedVal = new ArrayList<>();
            for (int i = 0; i < this.workers.getNumberOfWorkers(); i++) {
                Worker worker = this.workers.getWorker(i);
                double datanodeInstanceBandwidth = TaskSchedulerContext.datanodeInstanceBandwidth(DataLocalityStreamingTaskScheduler.this.config);
                double datanodeInstanceLatency = TaskSchedulerContext.datanodeInstanceLatency(DataLocalityStreamingTaskScheduler.this.config);
                if (this.index == 0) {
                    if (worker.getProperty("bandwidth") == null || worker.getProperty("latency") == null) {
                        containerInstanceBandwidth2 = TaskSchedulerContext.containerInstanceBandwidth(DataLocalityStreamingTaskScheduler.this.config);
                        containerInstanceLatency2 = TaskSchedulerContext.containerInstanceLatency(DataLocalityStreamingTaskScheduler.this.config);
                    } else {
                        containerInstanceBandwidth2 = ((Double) worker.getProperty("bandwidth")).doubleValue();
                        containerInstanceLatency2 = ((Double) worker.getProperty("latency")).doubleValue();
                    }
                    this.calculatedVal.add(getDistance(worker, containerInstanceBandwidth2, containerInstanceLatency2, datanodeInstanceBandwidth, datanodeInstanceLatency));
                } else if (!this.assignedWorkers.contains(Integer.valueOf(worker.getId()))) {
                    if (worker.getProperty("bandwidth") == null || worker.getProperty("latency") == null) {
                        containerInstanceBandwidth = TaskSchedulerContext.containerInstanceBandwidth(DataLocalityStreamingTaskScheduler.this.config);
                        containerInstanceLatency = TaskSchedulerContext.containerInstanceLatency(DataLocalityStreamingTaskScheduler.this.config);
                    } else {
                        containerInstanceBandwidth = ((Double) worker.getProperty("bandwidth")).doubleValue();
                        containerInstanceLatency = ((Double) worker.getProperty("latency")).doubleValue();
                    }
                    this.calculatedVal.add(getDistance(worker, containerInstanceBandwidth, containerInstanceLatency, datanodeInstanceBandwidth, datanodeInstanceLatency));
                }
            }
            return this;
        }

        private DataTransferTimeCalculator getDistance(Worker worker, double d, double d2, double d3, double d4) {
            DataTransferTimeCalculator dataTransferTimeCalculator = new DataTransferTimeCalculator(this.nodesList, this.calculateDistance);
            this.calculateDistance = Math.abs(((2.0d * d) * d2) - ((2.0d * d3) * d4));
            dataTransferTimeCalculator.setRequiredDataTransferTime(Double.valueOf(this.calculateDistance));
            dataTransferTimeCalculator.setNodeName(worker.getId() + "");
            dataTransferTimeCalculator.setTaskIndex(this.index);
            return dataTransferTimeCalculator;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:edu/iu/dsc/tws/tsched/streaming/datalocalityaware/DataLocalityStreamingTaskScheduler$VertexComparator.class */
    public static class VertexComparator implements Comparator<Vertex> {
        private VertexComparator() {
        }

        @Override // java.util.Comparator
        public int compare(Vertex vertex, Vertex vertex2) {
            return vertex.getName().compareTo(vertex2.getName());
        }
    }

    public void initialize(Config config) {
        this.config = config;
        this.instanceRAM = Double.valueOf(TaskSchedulerContext.taskInstanceRam(this.config));
        this.instanceDisk = Double.valueOf(TaskSchedulerContext.taskInstanceDisk(this.config));
        this.instanceCPU = Double.valueOf(TaskSchedulerContext.taskInstanceCpu(this.config));
    }

    public void initialize(Config config, int i) {
        initialize(config);
        this.workerId = i;
    }

    public TaskSchedulePlan schedule(ComputeGraph computeGraph, WorkerPlan workerPlan) {
        HashSet hashSet = new HashSet();
        Set<Vertex> taskVertexSet = computeGraph.getTaskVertexSet();
        Map<Integer, List<TaskInstanceId>> dataLocalityStreamingSchedulingAlgorithm = dataLocalityStreamingSchedulingAlgorithm(computeGraph, workerPlan.getNumberOfWorkers(), workerPlan);
        TaskInstanceMapCalculation taskInstanceMapCalculation = new TaskInstanceMapCalculation(this.instanceRAM, this.instanceCPU, this.instanceDisk);
        Map<Integer, Map<TaskInstanceId, Double>> instancesRamMapInContainer = taskInstanceMapCalculation.getInstancesRamMapInContainer(dataLocalityStreamingSchedulingAlgorithm, taskVertexSet);
        Map<Integer, Map<TaskInstanceId, Double>> instancesDiskMapInContainer = taskInstanceMapCalculation.getInstancesDiskMapInContainer(dataLocalityStreamingSchedulingAlgorithm, taskVertexSet);
        Map<Integer, Map<TaskInstanceId, Double>> instancesCPUMapInContainer = taskInstanceMapCalculation.getInstancesCPUMapInContainer(dataLocalityStreamingSchedulingAlgorithm, taskVertexSet);
        Iterator<Integer> it = dataLocalityStreamingSchedulingAlgorithm.keySet().iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            double containerRamPadding = TaskSchedulerContext.containerRamPadding(this.config);
            double containerDiskPadding = TaskSchedulerContext.containerDiskPadding(this.config);
            double containerCpuPadding = TaskSchedulerContext.containerCpuPadding(this.config);
            List<TaskInstanceId> list = dataLocalityStreamingSchedulingAlgorithm.get(Integer.valueOf(intValue));
            HashMap hashMap = new HashMap();
            for (TaskInstanceId taskInstanceId : list) {
                double doubleValue = instancesRamMapInContainer.get(Integer.valueOf(intValue)).get(taskInstanceId).doubleValue();
                double doubleValue2 = instancesDiskMapInContainer.get(Integer.valueOf(intValue)).get(taskInstanceId).doubleValue();
                hashMap.put(taskInstanceId, new TaskInstancePlan(taskInstanceId.getTaskName(), taskInstanceId.getTaskId(), taskInstanceId.getTaskIndex(), new Resource(Double.valueOf(doubleValue), Double.valueOf(doubleValue2), Double.valueOf(instancesCPUMapInContainer.get(Integer.valueOf(intValue)).get(taskInstanceId).doubleValue()))));
                containerRamPadding += doubleValue;
                containerDiskPadding += doubleValue2;
                containerCpuPadding += doubleValue2;
            }
            Worker worker = workerPlan.getWorker(intValue);
            hashSet.add(new WorkerSchedulePlan(intValue, new HashSet(hashMap.values()), (worker == null || worker.getCpu() <= 0 || worker.getDisk() <= 0 || worker.getRam() <= 0) ? new Resource(Double.valueOf(containerRamPadding), Double.valueOf(containerDiskPadding), Double.valueOf(containerCpuPadding)) : new Resource(Double.valueOf(worker.getRam()), Double.valueOf(worker.getDisk()), Double.valueOf(worker.getCpu()))));
        }
        return new TaskSchedulePlan(0, hashSet);
    }

    private Map<Integer, List<TaskInstanceId>> dataLocalityStreamingSchedulingAlgorithm(ComputeGraph computeGraph, int i, WorkerPlan workerPlan) {
        TaskAttributes taskAttributes = new TaskAttributes();
        Set<Vertex> taskVertexSet = computeGraph.getTaskVertexSet();
        int instancesPerWorker = !computeGraph.getGraphConstraints().isEmpty() ? taskAttributes.getInstancesPerWorker(computeGraph.getGraphConstraints()) : TaskSchedulerContext.defaultTaskInstancesPerContainer(this.config);
        int i2 = instancesPerWorker * i;
        int i3 = 0;
        int i4 = 0;
        int totalNumberOfInstances = !computeGraph.getNodeConstraints().isEmpty() ? taskAttributes.getTotalNumberOfInstances(taskVertexSet, computeGraph.getNodeConstraints()) : taskAttributes.getTotalNumberOfInstances(taskVertexSet);
        HashMap hashMap = new HashMap();
        if (i2 < totalNumberOfInstances) {
            throw new TaskSchedulerException("Task scheduling couldn't be performed for the container capacity of " + i2 + " and " + totalNumberOfInstances + " task instances");
        }
        LOG.info("Task scheduling could be performed for the container capacity of " + i2 + " and " + totalNumberOfInstances + " task instances");
        for (int i5 = 0; i5 < i; i5++) {
            hashMap.put(Integer.valueOf(i5), new ArrayList());
        }
        new TreeSet(new VertexComparator()).addAll(taskVertexSet);
        for (Map.Entry<String, Integer> entry : (!computeGraph.getNodeConstraints().isEmpty() ? taskAttributes.getParallelTaskMap(taskVertexSet, computeGraph.getNodeConstraints()) : taskAttributes.getParallelTaskMap(taskVertexSet)).entrySet()) {
            for (Vertex vertex : taskVertexSet) {
                if (entry.getKey().equals(vertex.getName())) {
                    int parallelism = vertex.getParallelism();
                    int i6 = 0;
                    List<DataTransferTimeCalculator> dTTimecalculatorList = dTTimecalculatorList(i3, workerPlan, hashMap, i4, instancesPerWorker);
                    for (int i7 = 0; i7 < parallelism; i7++) {
                        i4 = Integer.parseInt(((DataTransferTimeCalculator) Collections.min(dTTimecalculatorList)).getNodeName().trim());
                        if (i6 >= instancesPerWorker) {
                            throw new TaskSchedulerException("Task Scheduling couldn't be possible for the present configuration, please check the number of workers, maximum instances per worker");
                        }
                        hashMap.get(Integer.valueOf(i4)).add(new TaskInstanceId(vertex.getName(), globalTaskIndex, i7));
                        i6++;
                    }
                    globalTaskIndex++;
                    i3++;
                }
            }
        }
        return hashMap;
    }

    private List<DataTransferTimeCalculator> dTTimecalculatorList(int i, WorkerPlan workerPlan, Map<Integer, List<TaskInstanceId>> map, int i2, int i3) {
        List<String> inputFilesList = getInputFilesList();
        List<DataTransferTimeCalculator> list = null;
        DataNodeLocatorUtils dataNodeLocatorUtils = new DataNodeLocatorUtils(this.config);
        if (inputFilesList.size() > 0) {
            if (i == 0) {
                list = findBestWorkerNode(distanceCalculator(dataNodeLocatorUtils.findDataNodesLocation(inputFilesList), workerPlan, i, allocatedWorkers));
            } else {
                List<String> findDataNodesLocation = dataNodeLocatorUtils.findDataNodesLocation(inputFilesList);
                Worker worker = workerPlan.getWorker(i2);
                if (map.get(Integer.valueOf(i2)).size() >= i3) {
                    allocatedWorkers.add(Integer.valueOf(worker.getId()));
                }
                list = findBestWorkerNode(distanceCalculator(findDataNodesLocation, workerPlan, i, allocatedWorkers));
            }
        }
        return list;
    }

    private List<String> getInputFilesList() {
        ArrayList arrayList = new ArrayList();
        Path path = new Path(this.config.get("dinput") != null ? String.valueOf(this.config.get("dinput")) : null);
        try {
            FileSystem fileSystem = FileSystemUtils.get(path);
            if (this.config.get("filesys").equals("hdfs")) {
                arrayList.add(String.valueOf(fileSystem.getFileStatus(path).getPath()));
            } else if (this.config.get("filesys").equals("local")) {
                for (FileStatus fileStatus : fileSystem.listFiles(path)) {
                    String valueOf = String.valueOf(fileStatus.getPath());
                    if (valueOf != null) {
                        arrayList.add(valueOf);
                    }
                }
            }
            return arrayList;
        } catch (IOException e) {
            throw new TaskSchedulerException("Not able to get the input files", e);
        }
    }

    private Map<String, List<DataTransferTimeCalculator>> distanceCalculator(List<String> list, WorkerPlan workerPlan, int i, List<Integer> list2) {
        HashMap hashMap = new HashMap();
        double d = 0.0d;
        for (String str : list) {
            GetDistanceCalculation invoke = new GetDistanceCalculation(workerPlan, i, d, str, list2).invoke();
            d = invoke.getCalculateDistance();
            hashMap.put(str, invoke.getCalculatedVal());
        }
        return hashMap;
    }

    private static List<DataTransferTimeCalculator> findBestWorkerNode(Map<String, List<DataTransferTimeCalculator>> map) {
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<String, List<DataTransferTimeCalculator>> entry : map.entrySet()) {
            String key = entry.getKey();
            List<DataTransferTimeCalculator> value = entry.getValue();
            arrayList.add(new DataTransferTimeCalculator(((DataTransferTimeCalculator) Collections.min(value)).getNodeName(), ((DataTransferTimeCalculator) Collections.min(value)).getRequiredDataTransferTime().doubleValue(), key));
        }
        return arrayList;
    }
}
