package edu.iu.dsc.tws.examples.batch.kmeans;

import edu.iu.dsc.tws.api.comms.messaging.types.MessageTypes;
import edu.iu.dsc.tws.api.compute.IFunction;
import edu.iu.dsc.tws.api.compute.IMessage;
import edu.iu.dsc.tws.api.compute.TaskContext;
import edu.iu.dsc.tws.api.compute.executor.IExecutor;
import edu.iu.dsc.tws.api.compute.graph.ComputeGraph;
import edu.iu.dsc.tws.api.compute.graph.OperationMode;
import edu.iu.dsc.tws.api.compute.modifiers.Collector;
import edu.iu.dsc.tws.api.compute.modifiers.IONames;
import edu.iu.dsc.tws.api.compute.modifiers.Receptor;
import edu.iu.dsc.tws.api.compute.nodes.BaseCompute;
import edu.iu.dsc.tws.api.compute.nodes.BaseSource;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.dataset.DataPartition;
import edu.iu.dsc.tws.api.resource.IPersistentVolume;
import edu.iu.dsc.tws.api.resource.IVolatileVolume;
import edu.iu.dsc.tws.api.resource.IWorker;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.dataset.partition.EntityPartition;
import edu.iu.dsc.tws.examples.batch.cdfw.CDFConstants;
import edu.iu.dsc.tws.examples.comms.Constants;
import edu.iu.dsc.tws.task.ComputeEnvironment;
import edu.iu.dsc.tws.task.impl.ComputeGraphBuilder;
import edu.iu.dsc.tws.task.impl.TaskExecutor;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/examples/batch/kmeans/KMeansComputeJob.class */
public class KMeansComputeJob implements IWorker {
    private static final Logger LOG = Logger.getLogger(KMeansComputeJob.class.getName());

    /* loaded from: input_file:edu/iu/dsc/tws/examples/batch/kmeans/KMeansComputeJob$CentroidAggregator.class */
    public static class CentroidAggregator implements IFunction {
        private static final long serialVersionUID = -254264120110286748L;

        public Object onMessage(Object obj, Object obj2) throws ArrayIndexOutOfBoundsException {
            double[][] dArr = (double[][]) obj;
            double[][] dArr2 = (double[][]) obj2;
            double[][] dArr3 = new double[dArr.length][dArr[0].length];
            if (dArr.length != dArr2.length) {
                throw new RuntimeException("Center sizes not equal " + dArr.length + " != " + dArr2.length);
            }
            for (int i = 0; i < dArr.length; i++) {
                for (int i2 = 0; i2 < dArr[0].length; i2++) {
                    dArr3[i][i2] = dArr[i][i2] + dArr2[i][i2];
                }
            }
            return dArr3;
        }
    }

    /* loaded from: input_file:edu/iu/dsc/tws/examples/batch/kmeans/KMeansComputeJob$KMeansAllReduceTask.class */
    public static class KMeansAllReduceTask extends BaseCompute implements Collector {
        private static final long serialVersionUID = -5190777711234234L;
        private double[][] newCentroids;

        public boolean execute(IMessage iMessage) {
            double[][] dArr = (double[][]) iMessage.getContent();
            this.newCentroids = new double[dArr.length][dArr[0].length - 1];
            for (int i = 0; i < dArr.length; i++) {
                for (int i2 = 0; i2 < dArr[0].length - 1; i2++) {
                    this.newCentroids[i][i2] = dArr[i][i2] / dArr[i][dArr[0].length - 1];
                }
            }
            return true;
        }

        public DataPartition<double[][]> get() {
            return new EntityPartition(this.newCentroids);
        }

        public IONames getCollectibleNames() {
            return IONames.declare(new String[]{"centroids"});
        }

        public void prepare(Config config, TaskContext taskContext) {
            super.prepare(config, taskContext);
        }
    }

    /* loaded from: input_file:edu/iu/dsc/tws/examples/batch/kmeans/KMeansComputeJob$KMeansSourceTask.class */
    public static class KMeansSourceTask extends BaseSource implements Receptor {
        private static final long serialVersionUID = -254264120110286748L;
        private DataPartition<?> dataPartition = null;
        private DataPartition<?> centroidPartition = null;

        public void execute() {
            this.context.writeEnd("all-reduce", KMeansUtils.findNearestCenter(this.config.getIntegerValue("dim", 2).intValue(), (double[][]) this.dataPartition.first(), (double[][]) this.centroidPartition.first()));
        }

        public void add(String str, DataPartition<?> dataPartition) {
            if ("points".equals(str)) {
                this.dataPartition = dataPartition;
            }
            if ("centroids".equals(str)) {
                this.centroidPartition = dataPartition;
            }
        }

        public IONames getReceivableNames() {
            return IONames.declare(new String[]{"points", "centroids"});
        }
    }

    public void execute(Config config, int i, IWorkerController iWorkerController, IPersistentVolume iPersistentVolume, IVolatileVolume iVolatileVolume) {
        LOG.log(Level.FINE, "Task worker starting: " + i);
        ComputeEnvironment init = ComputeEnvironment.init(config, i, iWorkerController, iPersistentVolume, iVolatileVolume);
        TaskExecutor taskExecutor = init.getTaskExecutor();
        int intValue = config.getIntegerValue(CDFConstants.ARGS_PARALLELISM_VALUE).intValue();
        int intValue2 = config.getIntegerValue("dim").intValue();
        int intValue3 = config.getIntegerValue(Constants.ARGS_NUMBER_OF_FILES).intValue();
        int intValue4 = config.getIntegerValue(CDFConstants.ARGS_DSIZE).intValue();
        int intValue5 = config.getIntegerValue(CDFConstants.ARGS_CSIZE).intValue();
        int intValue6 = config.getIntegerValue(CDFConstants.ARGS_ITERATIONS).intValue();
        String str = config.getStringValue(CDFConstants.ARGS_DINPUT) + i;
        String str2 = config.getStringValue(CDFConstants.ARGS_CINPUT) + i;
        KMeansUtils.generateDataPoints(config, intValue2, intValue3, intValue4, intValue5, str, str2);
        long currentTimeMillis = System.currentTimeMillis();
        ComputeGraph buildDataPointsTG = buildDataPointsTG(str, intValue4, intValue, intValue2, config);
        ComputeGraph buildCentroidsTG = buildCentroidsTG(str2, intValue5, intValue, intValue2, config);
        ComputeGraph buildKMeansTG = buildKMeansTG(intValue, config);
        taskExecutor.execute(buildDataPointsTG, taskExecutor.plan(buildDataPointsTG));
        taskExecutor.execute(buildCentroidsTG, taskExecutor.plan(buildCentroidsTG));
        long currentTimeMillis2 = System.currentTimeMillis();
        IExecutor createExecution = taskExecutor.createExecution(buildKMeansTG);
        int i2 = 0;
        while (i2 < intValue6) {
            createExecution.execute(i2 == intValue6 - 1);
            i2++;
        }
        init.close();
        long currentTimeMillis3 = System.currentTimeMillis();
        LOG.info("Total K-Means Execution Time: " + (currentTimeMillis3 - currentTimeMillis) + "\tData Load time : " + (currentTimeMillis2 - currentTimeMillis) + "\tCompute Time : " + (currentTimeMillis3 - currentTimeMillis2));
    }

    public static ComputeGraph buildDataPointsTG(String str, int i, int i2, int i3, Config config) {
        PointDataSource pointDataSource = new PointDataSource("direct", str, "points", i3);
        ComputeGraphBuilder newBuilder = ComputeGraphBuilder.newBuilder(config);
        newBuilder.addSource("datapointsource", pointDataSource, i2);
        newBuilder.setMode(OperationMode.BATCH);
        newBuilder.setTaskGraphName("datapointsTG");
        return newBuilder.build();
    }

    public static ComputeGraph buildCentroidsTG(String str, int i, int i2, int i3, Config config) {
        PointDataSource pointDataSource = new PointDataSource("direct", str, "centroids", i3);
        ComputeGraphBuilder newBuilder = ComputeGraphBuilder.newBuilder(config);
        newBuilder.addSource("centroidsource", pointDataSource, i2);
        newBuilder.setMode(OperationMode.BATCH);
        newBuilder.setTaskGraphName("centTG");
        return newBuilder.build();
    }

    public static ComputeGraph buildKMeansTG(int i, Config config) {
        KMeansSourceTask kMeansSourceTask = new KMeansSourceTask();
        KMeansAllReduceTask kMeansAllReduceTask = new KMeansAllReduceTask();
        ComputeGraphBuilder newBuilder = ComputeGraphBuilder.newBuilder(config);
        newBuilder.addSource("kmeanssource", kMeansSourceTask, i);
        newBuilder.addCompute("kmeanssink", kMeansAllReduceTask, i).allreduce("kmeanssource").viaEdge("all-reduce").withReductionFunction(new CentroidAggregator()).withDataType(MessageTypes.OBJECT);
        newBuilder.setMode(OperationMode.BATCH);
        newBuilder.setTaskGraphName("kmeansTG");
        return newBuilder.build();
    }
}
