package edu.iu.dsc.tws.examples.batch.cdfw;

import edu.iu.dsc.tws.api.JobConfig;
import edu.iu.dsc.tws.api.Twister2Job;
import edu.iu.dsc.tws.api.comms.messaging.types.MessageTypes;
import edu.iu.dsc.tws.api.compute.IFunction;
import edu.iu.dsc.tws.api.compute.TaskContext;
import edu.iu.dsc.tws.api.compute.graph.OperationMode;
import edu.iu.dsc.tws.api.compute.nodes.BaseSource;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.dataset.partition.CollectionPartition;
import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
import edu.iu.dsc.tws.task.cdfw.BaseDriver;
import edu.iu.dsc.tws.task.cdfw.CDFWEnv;
import edu.iu.dsc.tws.task.cdfw.DataFlowGraph;
import edu.iu.dsc.tws.task.cdfw.DataFlowJobConfig;
import edu.iu.dsc.tws.task.cdfw.task.ConnectedSink;
import edu.iu.dsc.tws.task.cdfw.task.ConnectedSource;
import edu.iu.dsc.tws.task.impl.ComputeGraphBuilder;
import edu.iu.dsc.tws.task.impl.cdfw.CDFWWorker;
import java.util.HashMap;
import java.util.logging.Logger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

/* loaded from: input_file:edu/iu/dsc/tws/examples/batch/cdfw/ParallelDataFlowsExample.class */
public final class ParallelDataFlowsExample {
    /** Class-wide logger, named after this class; the nested source task uses it for FINE-level tracing. */
    private static final Logger LOG = Logger.getLogger(ParallelDataFlowsExample.class.getName());

    /* loaded from: input_file:edu/iu/dsc/tws/examples/batch/cdfw/ParallelDataFlowsExample$Aggregator.class */
    /**
     * Reduction function that merges two values by joining their string forms.
     */
    public static class Aggregator implements IFunction {
        private static final long serialVersionUID = -254264120110286748L;

        /**
         * Concatenates the textual form of both arguments, first argument first.
         *
         * @param obj  left-hand value; {@code toString()} is invoked on it, so it must not be null
         * @param obj2 right-hand value; {@code toString()} is invoked on it, so it must not be null
         * @return the text of {@code obj} immediately followed by the text of {@code obj2}
         * @throws ArrayIndexOutOfBoundsException declared by the signature; this body does not raise it explicitly
         */
        public Object onMessage(Object obj, Object obj2) throws ArrayIndexOutOfBoundsException {
            final String head = obj.toString();
            final String tail = obj2.toString();
            return head + tail;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:edu/iu/dsc/tws/examples/batch/cdfw/ParallelDataFlowsExample$FirstSourceTask.class */
    /**
     * Source task that emits a small, fixed collection of strings in one shot.
     *
     * <p>{@link #prepare(Config, TaskContext)} allocates an empty partition and
     * {@link #execute()} fills it with four entries before writing it out.
     */
    public static class FirstSourceTask extends BaseSource {
        private static final long serialVersionUID = -254264120110286748L;

        /** Buffer for the generated values; created in {@code prepare}, filled and written in {@code execute}. */
        private CollectionPartition<Object> collectionPartition;

        protected FirstSourceTask() {
        }

        /**
         * Delegates to the superclass and then creates the empty partition that
         * {@link #execute()} populates.
         *
         * @param config      configuration forwarded to the superclass
         * @param taskContext task context forwarded to the superclass
         */
        public void prepare(Config config, TaskContext taskContext) {
            super.prepare(config, taskContext);
            this.collectionPartition = new CollectionPartition<>();
        }

        /**
         * Logs the task id and index at FINE level, appends {@code "PartitionValue0"} through
         * {@code "PartitionValue3"} to the partition, and writes the partition with
         * {@code writeEnd} under the name {@code "partition"}.
         */
        public void execute() {
            final String trace = "Context task id and index:" + this.context.taskId() + "\t" + this.context.taskIndex();
            ParallelDataFlowsExample.LOG.fine(trace);

            int next = 0;
            while (next < 4) {
                this.collectionPartition.add("PartitionValue" + next);
                next++;
            }

            this.context.writeEnd("partition", this.collectionPartition);
        }
    }

    /* loaded from: input_file:edu/iu/dsc/tws/examples/batch/cdfw/ParallelDataFlowsExample$ParallelDataflowsDriver.class */
    /**
     * Driver that builds the two dataflow graphs of this example and executes them, first
     * graph first, on the supplied environment.
     */
    public static class ParallelDataflowsDriver extends BaseDriver {
        /** Parallelism passed to both graph generators (their third argument). */
        private static final int PARALLELISM = 2;

        /** Worker count passed to both graph generators (their fourth argument). */
        private static final int WORKERS = 2;

        /**
         * Generates both graphs with a shared {@link DataFlowJobConfig}, executes them in
         * order, and closes the environment.
         *
         * <p>The environment is closed in a {@code finally} block so that it is still closed
         * when graph generation or execution throws.
         *
         * @param cDFWEnv environment used to read the configuration and to execute the graphs
         */
        public void execute(CDFWEnv cDFWEnv) {
            try {
                Config config = cDFWEnv.getConfig();
                DataFlowJobConfig sharedJobConfig = new DataFlowJobConfig();

                DataFlowGraph firstGraph =
                        ParallelDataFlowsExample.generateFirstJob(config, PARALLELISM, WORKERS, sharedJobConfig);
                DataFlowGraph secondGraph =
                        ParallelDataFlowsExample.generateSecondJob(config, PARALLELISM, WORKERS, sharedJobConfig);

                cDFWEnv.executeDataFlowGraph(firstGraph);
                cDFWEnv.executeDataFlowGraph(secondGraph);
            } finally {
                // Always release the environment, even if one of the graphs failed.
                cDFWEnv.close();
            }
        }
    }

    /**
     * Private so the class cannot be instantiated from outside; its functionality is exposed
     * through static methods and nested static classes.
     */
    private ParallelDataFlowsExample() {
    }

    /**
     * Command-line entry point: parses the worker and parallelism options, builds the job
     * configuration, and submits the job with {@link ParallelDataflowsDriver} as its driver.
     *
     * <p>Both options take an argument. When an option is omitted its value falls back to
     * {@code 2} (the text registered with each option) instead of failing on a null value.
     *
     * @param strArr command-line arguments
     * @throws ParseException        if the arguments cannot be parsed against the declared options
     * @throws NumberFormatException if a supplied option value is not a valid integer
     */
    public static void main(String[] strArr) throws ParseException {
        // Fallback for absent options; mirrors the "2" registered with each option below.
        final String defaultCount = "2";

        Config loadedConfig = ResourceAllocator.loadConfig(new HashMap<String, Object>());

        HashMap<String, Object> settings = new HashMap<String, Object>();
        settings.put("twister2.exector.worker.threads", 1);

        Options options = new Options();
        options.addOption(CDFConstants.ARGS_PARALLELISM_VALUE, true, "2");
        options.addOption("workers", true, "2");
        CommandLine commandLine = new DefaultParser().parse(options, strArr);

        // getOptionValue(name, default) returns the default when the option was not given,
        // so parseInt never receives null.
        int workers = Integer.parseInt(commandLine.getOptionValue("workers", defaultCount));
        int parallelism = Integer.parseInt(
                commandLine.getOptionValue(CDFConstants.ARGS_PARALLELISM_VALUE, defaultCount));

        settings.put("workers", Integer.toString(workers));
        settings.put(CDFConstants.ARGS_PARALLELISM_VALUE, Integer.toString(parallelism));

        JobConfig jobConfig = new JobConfig();
        jobConfig.putAll(settings);

        Twister2Job job = Twister2Job.newBuilder()
                .setJobName(ParallelDataFlowsExample.class.getName())
                .setWorkerClass(CDFWWorker.class)
                .setDriverClass(ParallelDataflowsDriver.class.getName())
                .addComputeResource(1.0d, 512, workers, true)
                .setConfig(jobConfig)
                .build();

        // The driver class entry in the submission config is explicitly set to null, as before.
        Config submitConfig = Config.newBuilder()
                .putAll(loadedConfig)
                .put("twister2.resource.job.driver.class", (Object) null)
                .build();

        Twister2Submitter.submitJob(job, submitConfig);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the first sub-graph job, named {@code "first_graph"}.
     *
     * <p>The graph has a {@link FirstSourceTask} registered as {@code "source1"} and a
     * {@link ConnectedSink} (constructed with {@code "first_out"}) registered as
     * {@code "sink1"}. They are linked by a partition operation over the edge
     * {@code "partition"} with the OBJECT data type, and the graph runs in batch mode.
     *
     * @param config            configuration handed to the graph builder
     * @param i                 parallelism used for both the source and the sink
     * @param i2                worker count set on the returned graph
     * @param dataFlowJobConfig job configuration attached to the returned graph
     * @return the configured graph, with its graph type set to {@code "non-iterative"}
     */
    public static DataFlowGraph generateFirstJob(Config config, int i, int i2, DataFlowJobConfig dataFlowJobConfig) {
        final FirstSourceTask producer = new FirstSourceTask();
        final ConnectedSink collector = new ConnectedSink("first_out");

        final ComputeGraphBuilder graphBuilder = ComputeGraphBuilder.newBuilder(config);
        graphBuilder.addSource("source1", producer, i);
        graphBuilder.addCompute("sink1", collector, i)
                .partition("source1")
                .viaEdge("partition")
                .withDataType(MessageTypes.OBJECT);
        graphBuilder.setMode(OperationMode.BATCH);

        return DataFlowGraph.newSubGraphJob("first_graph", graphBuilder.build())
                .setWorkers(i2)
                .addDataFlowJobConfig(dataFlowJobConfig)
                .setGraphType("non-iterative");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the second sub-graph job, named {@code "second_graph"}.
     *
     * <p>The graph has a {@link ConnectedSource} (constructed with {@code "reduce"} and
     * {@code "first_out"}) registered as {@code "source1"} and a {@link ConnectedSink}
     * registered as {@code "sink1"} with a fixed parallelism of 1. They are linked by a
     * reduce operation over the edge {@code "reduce"}, using {@link Aggregator} as the
     * reduction function and the OBJECT data type. The graph runs in batch mode.
     *
     * @param config            configuration handed to the graph builder
     * @param i                 parallelism used for the source
     * @param i2                worker count set on the returned graph
     * @param dataFlowJobConfig job configuration attached to the returned graph
     * @return the configured graph, with its graph type set to {@code "non-iterative"}
     */
    public static DataFlowGraph generateSecondJob(Config config, int i, int i2, DataFlowJobConfig dataFlowJobConfig) {
        final ConnectedSource upstream = new ConnectedSource("reduce", "first_out");
        final ConnectedSink collector = new ConnectedSink();

        final ComputeGraphBuilder graphBuilder = ComputeGraphBuilder.newBuilder(config);
        graphBuilder.addSource("source1", upstream, i);
        graphBuilder.addCompute("sink1", collector, 1)
                .reduce("source1")
                .viaEdge("reduce")
                .withReductionFunction(new Aggregator())
                .withDataType(MessageTypes.OBJECT);
        graphBuilder.setMode(OperationMode.BATCH);

        return DataFlowGraph.newSubGraphJob("second_graph", graphBuilder.build())
                .setWorkers(i2)
                .addDataFlowJobConfig(dataFlowJobConfig)
                .setGraphType("non-iterative");
    }
}
