package edu.iu.dsc.tws.examples.batch.sortop;

import edu.iu.dsc.tws.api.JobConfig;
import edu.iu.dsc.tws.api.Twister2Job;
import edu.iu.dsc.tws.api.comms.messaging.types.MessageTypes;
import edu.iu.dsc.tws.api.comms.packing.MessageSchema;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.resource.IPersistentVolume;
import edu.iu.dsc.tws.api.resource.IVolatileVolume;
import edu.iu.dsc.tws.api.resource.IWorker;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.api.resource.WorkerEnvironment;
import edu.iu.dsc.tws.comms.batch.BKeyedGather;
import edu.iu.dsc.tws.comms.utils.LogicalPlanBuilder;
import edu.iu.dsc.tws.examples.utils.bench.BenchmarkMetadata;
import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.logging.Logger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

/* loaded from: input_file:edu/iu/dsc/tws/examples/batch/sortop/SortJob.class */
/**
 * Example batch worker that pushes byte-array records through a
 * {@link BKeyedGather}, supplying a byte-wise key comparator, and logs how long
 * the operation took on this worker.
 *
 * <p>{@link #main(String[])} parses the command line, copies the relevant
 * options into a {@link JobConfig} and submits this class as the worker of a
 * Twister2 job.
 */
public class SortJob implements IWorker {
    private static final Logger LOG = Logger.getLogger(SortJob.class.getName());

    // Command-line option names; most of them double as job-configuration keys.
    public static final String ARG_INPUT_FILE = "inputFile";
    public static final String ARG_OUTPUT_FOLDER = "outputFolder";
    public static final String ARG_SIZE = "size";
    public static final String ARG_KEY_SIZE = "keySize";
    public static final String ARG_VALUE_SIZE = "valueSize";
    public static final String ARG_KEY_SEED = "keySeed";
    public static final String ARG_RESOURCE_CPU = "instanceCPUs";
    public static final String ARG_RESOURCE_MEMORY = "instanceMemory";
    public static final String ARG_RESOURCE_INSTANCES = "instances";
    public static final String ARG_TASKS_SOURCES = "sources";
    public static final String ARG_TASKS_SINKS = "sinks";
    private static final String ARG_FIXED_SCHEMA = "fixedSchema";
    public static final String ARG_TUNE_MAX_BYTES_IN_MEMORY = "memoryBytesLimit";
    public static final String ARG_TUNE_MAX_SHUFFLE_FILE_SIZE = "fileSizeBytes";

    /** Gather operation created in {@link #execute} and driven by {@link #progress()}. */
    private BKeyedGather gather;

    /** Task counts recorded by {@link #execute}: sources first, then sinks. */
    private final List<Integer> taskStages = new ArrayList<>();

    /**
     * Orders {@code byte[]} keys lexicographically; when one key is a prefix of
     * the other, the shorter key sorts first.
     *
     * <p>Declared {@code static} because it uses no state of the enclosing worker.
     */
    private static final class IntegerComparator implements Comparator<Object> {

        /**
         * Compares two keys.
         *
         * @param obj first key, must be a {@code byte[]}
         * @param obj2 second key, must be a {@code byte[]}
         * @return a negative, zero or positive value as the first key sorts
         *     before, equal to or after the second
         * @throws ClassCastException if either argument is not a {@code byte[]}
         */
        @Override // java.util.Comparator
        public int compare(Object obj, Object obj2) {
            byte[] left = (byte[]) obj;
            byte[] right = (byte[]) obj2;
            int commonLength = Math.min(left.length, right.length);
            for (int idx = 0; idx < commonLength; idx++) {
                // NOTE(review): bytes are compared as signed values, so 0x80..0xFF
                // sort before 0x00..0x7F - confirm this is the intended key order.
                int diff = Byte.compare(left[idx], right[idx]);
                if (diff != 0) {
                    return diff;
                }
            }
            return Integer.compare(left.length, right.length);
        }
    }

    /**
     * Builds the logical plan and the keyed gather, runs one record source on
     * this worker, then drives the gather until it reports completion and logs
     * the elapsed wall-clock time in milliseconds.
     *
     * @param config job configuration; the task counts, key/value sizes and the
     *     fixed-schema flag are read from it, falling back to 4 sources, 4 sinks,
     *     10-byte keys, 90-byte values and no fixed schema
     * @param i id of this worker
     * @param iWorkerController worker controller passed on to the worker environment
     * @param iPersistentVolume persistent volume passed on to the worker environment
     * @param iVolatileVolume volatile volume passed on to the worker environment
     */
    public void execute(Config config, int i, IWorkerController iWorkerController, IPersistentVolume iPersistentVolume, IVolatileVolume iVolatileVolume) {
        WorkerEnvironment workerEnv = WorkerEnvironment.init(config, i, iWorkerController, iPersistentVolume, iVolatileVolume);

        int sourceTasks = config.getIntegerValue(ARG_TASKS_SOURCES, 4);
        int sinkTasks = config.getIntegerValue(ARG_TASKS_SINKS, 4);
        this.taskStages.add(sourceTasks);
        this.taskStages.add(sinkTasks);
        LogicalPlanBuilder planBuilder = LogicalPlanBuilder.plan(sourceTasks, sinkTasks, workerEnv).withFairDistribution();

        int valueSize = config.getIntegerValue(ARG_VALUE_SIZE, 90);
        int keySize = config.getIntegerValue(ARG_KEY_SIZE, 10);
        MessageSchema schema = MessageSchema.noSchema();
        if (config.getBooleanValue(ARG_FIXED_SCHEMA, false)) {
            int messageSize = keySize + valueSize;
            LOG.info("Using fixed schema feature with message size : " + messageSize + " and key size : " + keySize);
            schema = MessageSchema.ofSize(messageSize, keySize);
        }

        this.gather = new BKeyedGather(workerEnv.getCommunicator(), planBuilder, MessageTypes.BYTE_ARRAY, MessageTypes.BYTE_ARRAY, new RecordSave(), new ByteSelector(), true, new IntegerComparator(), true, schema);

        // NOTE(review): only the first source assigned to this worker is run, and
        // next() throws NoSuchElementException when the worker has none - confirm
        // that deployments always place at least one source on every worker.
        int sourceId = ((Integer) planBuilder.getSourcesOnThisWorker().iterator().next()).intValue();
        RecordSource recordSource = new RecordSource(config, i, this.gather, sourceId);

        long startMillis = System.currentTimeMillis();
        recordSource.run();
        progress();
        LOG.info("Time: " + (System.currentTimeMillis() - startMillis));
    }

    /** Keeps progressing the gather's channel until the gather reports completion. */
    private void progress() {
        while (!this.gather.isComplete()) {
            this.gather.progressChannel();
        }
    }

    /**
     * Creates a command-line option.
     *
     * @param str option name
     * @param z whether the option takes an argument
     * @param str2 description shown in the help text
     * @param z2 whether the option is mandatory
     * @return the configured option
     */
    private static Option createOption(String str, boolean z, String str2, boolean z2) {
        Option option = new Option(str, z, str2);
        option.setRequired(z2);
        return option;
    }

    /**
     * Parses the command line and submits the sort job.
     *
     * @param strArr command-line arguments
     * @throws ParseException if the arguments cannot be parsed, a required option
     *     is missing, or neither {@code inputFile} nor {@code size} is given
     * @throws NumberFormatException if a numeric option value is not a valid number
     */
    public static void main(String[] strArr) throws ParseException {
        Config loadedConfig = ResourceAllocator.loadConfig(new HashMap<>());
        JobConfig jobConfig = new JobConfig();

        Options options = new Options();
        options.addOption(createOption(ARG_INPUT_FILE, true, "Path to the file containing input tuples. Path can be specified with %d, where it will be replaced by task index. For example,input-%d, will be considered as input-0 in source task having index 0.", false));
        options.addOption(createOption(ARG_SIZE, true, "Data Size in GigaBytes. A source will generate this much of data. Including size of both key and value.", false));
        options.addOption(createOption(ARG_KEY_SIZE, true, "Size of the key in bytes of a single Tuple", true));
        // NOTE(review): this description duplicates the keySize one, and the parsed
        // keySeed value is never copied into the job configuration below - confirm
        // the intended meaning and whether it should be forwarded to the workers.
        options.addOption(createOption(ARG_KEY_SEED, true, "Size of the key in bytes of a single Tuple", false));
        options.addOption(createOption(ARG_VALUE_SIZE, true, "Size of the value in bytes of a single Tuple", true));
        options.addOption(createOption(ARG_RESOURCE_CPU, true, "Amount of CPUs to allocate per instance", true));
        options.addOption(createOption(ARG_RESOURCE_MEMORY, true, "Amount of Memory in mega bytes to allocate per instance", true));
        options.addOption(createOption(ARG_RESOURCE_INSTANCES, true, "No. of instances", true));
        options.addOption(createOption(ARG_TASKS_SOURCES, true, "No of source tasks", true));
        options.addOption(createOption(ARG_TASKS_SINKS, true, "No of sink tasks", true));
        options.addOption(createOption(ARG_TUNE_MAX_BYTES_IN_MEMORY, true, "Maximum bytes to keep in memory", false));
        // The value feeds twister2.network.shuffle.file.bytes.max, so it is a byte
        // limit on shuffle files rather than an in-memory record count.
        options.addOption(createOption(ARG_TUNE_MAX_SHUFFLE_FILE_SIZE, true, "Maximum size of a shuffle file in bytes", false));
        options.addOption(createOption(BenchmarkMetadata.ARG_BENCHMARK_METADATA, true, "Auto generated argument by benchmark suite", false));
        options.addOption(createOption(ARG_OUTPUT_FOLDER, true, "Folder to save output files", false));
        options.addOption(createOption(ARG_FIXED_SCHEMA, false, "Use fixed schema feature", false));

        CommandLine commandLine = new DefaultParser().parse(options, strArr);

        if (commandLine.hasOption(ARG_INPUT_FILE)) {
            jobConfig.put(ARG_INPUT_FILE, commandLine.getOptionValue(ARG_INPUT_FILE));
        } else {
            // Both options are optional for the parser, so without this check a
            // missing size surfaced as a NullPointerException from Double.valueOf.
            if (!commandLine.hasOption(ARG_SIZE)) {
                throw new ParseException("Either " + ARG_INPUT_FILE + " or " + ARG_SIZE + " must be specified");
            }
            jobConfig.put(ARG_SIZE, Double.valueOf(commandLine.getOptionValue(ARG_SIZE)));
            jobConfig.put(ARG_VALUE_SIZE, Integer.valueOf(commandLine.getOptionValue(ARG_VALUE_SIZE)));
            jobConfig.put(ARG_KEY_SIZE, Integer.valueOf(commandLine.getOptionValue(ARG_KEY_SIZE)));
        }

        jobConfig.put(ARG_TASKS_SOURCES, Integer.valueOf(commandLine.getOptionValue(ARG_TASKS_SOURCES)));
        jobConfig.put(ARG_TASKS_SINKS, Integer.valueOf(commandLine.getOptionValue(ARG_TASKS_SINKS)));
        jobConfig.put(ARG_RESOURCE_INSTANCES, Integer.valueOf(commandLine.getOptionValue(ARG_RESOURCE_INSTANCES)));

        // Each tuning value is stored under both the framework key and the option name.
        if (commandLine.hasOption(ARG_TUNE_MAX_BYTES_IN_MEMORY)) {
            Long maxBytesInMemory = Long.valueOf(commandLine.getOptionValue(ARG_TUNE_MAX_BYTES_IN_MEMORY));
            jobConfig.put("twister2.network.shuffle.memory.bytes.max", maxBytesInMemory);
            jobConfig.put(ARG_TUNE_MAX_BYTES_IN_MEMORY, maxBytesInMemory);
        }
        if (commandLine.hasOption(ARG_TUNE_MAX_SHUFFLE_FILE_SIZE)) {
            Long maxShuffleFileBytes = Long.valueOf(commandLine.getOptionValue(ARG_TUNE_MAX_SHUFFLE_FILE_SIZE));
            jobConfig.put("twister2.network.shuffle.file.bytes.max", maxShuffleFileBytes);
            jobConfig.put(ARG_TUNE_MAX_SHUFFLE_FILE_SIZE, maxShuffleFileBytes);
        }

        if (commandLine.hasOption(BenchmarkMetadata.ARG_BENCHMARK_METADATA)) {
            jobConfig.put(BenchmarkMetadata.ARG_BENCHMARK_METADATA, commandLine.getOptionValue(BenchmarkMetadata.ARG_BENCHMARK_METADATA));
            jobConfig.put(BenchmarkMetadata.ARG_RUN_BENCHMARK, true);
        }
        if (commandLine.hasOption(ARG_OUTPUT_FOLDER)) {
            jobConfig.put(ARG_OUTPUT_FOLDER, commandLine.getOptionValue(ARG_OUTPUT_FOLDER));
        }
        if (commandLine.hasOption(ARG_FIXED_SCHEMA)) {
            jobConfig.put(ARG_FIXED_SCHEMA, true);
        }

        int cpusPerInstance = Integer.parseInt(commandLine.getOptionValue(ARG_RESOURCE_CPU));
        int memoryPerInstance = Integer.parseInt(commandLine.getOptionValue(ARG_RESOURCE_MEMORY));
        int instances = Integer.parseInt(commandLine.getOptionValue(ARG_RESOURCE_INSTANCES));

        Twister2Submitter.submitJob(
                Twister2Job.newBuilder()
                        .setJobName(SortJob.class.getName())
                        .setWorkerClass(SortJob.class.getName())
                        .addComputeResource(cpusPerInstance, memoryPerInstance, instances)
                        .setConfig(jobConfig)
                        .build(),
                loadedConfig);
    }
}
