package edu.iu.dsc.tws.examples.task.batch;

import edu.iu.dsc.tws.api.JobConfig;
import edu.iu.dsc.tws.api.Twister2Job;
import edu.iu.dsc.tws.api.comms.messaging.types.MessageTypes;
import edu.iu.dsc.tws.api.compute.IMessage;
import edu.iu.dsc.tws.api.compute.executor.ExecutionPlan;
import edu.iu.dsc.tws.api.compute.executor.IExecutor;
import edu.iu.dsc.tws.api.compute.graph.ComputeGraph;
import edu.iu.dsc.tws.api.compute.graph.OperationMode;
import edu.iu.dsc.tws.api.compute.modifiers.Collector;
import edu.iu.dsc.tws.api.compute.modifiers.Receptor;
import edu.iu.dsc.tws.api.compute.nodes.BaseCompute;
import edu.iu.dsc.tws.api.compute.nodes.BaseSource;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.dataset.DataObject;
import edu.iu.dsc.tws.api.dataset.DataPartition;
import edu.iu.dsc.tws.api.resource.IPersistentVolume;
import edu.iu.dsc.tws.api.resource.IVolatileVolume;
import edu.iu.dsc.tws.api.resource.IWorker;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.dataset.DataObjectImpl;
import edu.iu.dsc.tws.dataset.partition.EntityPartition;
import edu.iu.dsc.tws.examples.comms.Constants;
import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
import edu.iu.dsc.tws.task.ComputeEnvironment;
import edu.iu.dsc.tws.task.impl.ComputeGraphBuilder;
import edu.iu.dsc.tws.task.impl.TaskExecutor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/examples/task/batch/IterativeJob.class */
public class IterativeJob implements IWorker {
    private static final Logger LOG = Logger.getLogger(IterativeJob.class.getName());

    /* loaded from: input_file:edu/iu/dsc/tws/examples/task/batch/IterativeJob$IterativeSourceTask.class */
    /**
     * Batch source that emits a fixed run of {@code "Hello"} messages on the
     * {@code "partition"} edge and then marks the end of the stream.
     *
     * <p>Messages with index 0..998 are sent with {@code context.write}; the message with
     * index 999 is sent with {@code context.writeEnd}. An index only advances when the context
     * call returns {@code true}, so a rejected message is attempted again on the next
     * {@link #execute()} call. Once the final message has been accepted, further calls do nothing
     * until {@link #reset()} is invoked.
     */
    private static class IterativeSourceTask extends BaseSource implements Receptor {
        private static final long serialVersionUID = -254264120110286748L;

        /** Name of the outgoing edge every message is written to. */
        private static final String EDGE = "partition";

        /** Payload sent with every message. */
        private static final String PAYLOAD = "Hello";

        /** Index of the last message; it is sent through {@code writeEnd} instead of {@code write}. */
        private static final int LAST_MESSAGE_INDEX = 999;

        /** Data object most recently passed to {@link #add}; stored but never read in this class. */
        private DataObjectImpl<Object> input;

        /** Number of messages the context has accepted since construction or the last reset. */
        private int count = 0;

        private IterativeSourceTask() {
        }

        /**
         * Attempts to send the next message of the run, if any remain.
         */
        public void execute() {
            // Everything, including the end marker, has already been sent.
            if (this.count > LAST_MESSAGE_INDEX) {
                return;
            }

            boolean accepted;
            if (this.count == LAST_MESSAGE_INDEX) {
                accepted = this.context.writeEnd(EDGE, PAYLOAD);
            } else {
                accepted = this.context.write(EDGE, PAYLOAD);
            }

            // Only advance when the message was taken; otherwise retry the same index later.
            if (accepted) {
                this.count++;
            }
        }

        /**
         * Stores the supplied input data object.
         *
         * @param name name under which the input was registered (unused here)
         * @param data the input; it is cast to {@code DataObjectImpl}, so any other
         *             implementation results in a {@code ClassCastException}
         */
        public void add(String name, DataObject<?> data) {
            this.input = (DataObjectImpl) data;
        }

        /**
         * Rewinds the message counter so the full run is emitted again.
         */
        public void reset() {
            this.count = 0;
        }
    }

    /* loaded from: input_file:edu/iu/dsc/tws/examples/task/batch/IterativeJob$PartitionTask.class */
    /**
     * Sink task that gathers the string form of every received element and exposes the
     * gathered list as a data partition.
     *
     * <p>NOTE(review): {@link #reset()} zeroes {@code count} but leaves {@code list} untouched,
     * so elements collected in earlier runs remain in the partition returned by {@link #get()}.
     * Confirm whether accumulating across iterations is intended.
     */
    private static class PartitionTask extends BaseCompute implements Collector {
        private static final long serialVersionUID = -5190777711234234L;

        /** String representations of all elements received so far. */
        private List<String> list = new ArrayList<>();

        /** Number of elements received since construction or the last reset. */
        private int count;

        private PartitionTask() {
        }

        /**
         * Drains the message content when it is an {@link Iterator}, recording each element,
         * then logs the running element count.
         *
         * @param iMessage the incoming message; content that is not an {@code Iterator} is ignored
         * @return always {@code true}
         */
        public boolean execute(IMessage iMessage) {
            Object content = iMessage.getContent();
            if (content instanceof Iterator) {
                Iterator<?> elements = (Iterator<?>) content;
                while (elements.hasNext()) {
                    Object element = elements.next();
                    this.count++;
                    this.list.add(element.toString());
                }
            }
            LOG.info("RECEIVE Count: " + this.count);
            return true;
        }

        /**
         * Resets the received-element counter to zero.
         */
        public void reset() {
            this.count = 0;
        }

        /**
         * Wraps the collected list in a partition identified by this task's index.
         *
         * @return a new partition backed by the collected list (the list is not copied)
         */
        public DataPartition<Object> get() {
            return new EntityPartition<>(this.context.taskIndex(), this.list);
        }
    }

    /**
     * Worker entry point: builds a two-node batch graph and runs it repeatedly.
     *
     * <p>The graph has a {@code "source"} node ({@code IterativeSourceTask}) and a
     * {@code "sink"} node ({@code PartitionTask}), each with parallelism 4, joined by a
     * partition operation over the {@code "partition"} edge carrying {@code OBJECT} data.
     * The graph is planned and its execution created once; the same execution is then run
     * for 10 iterations. Each iteration registers a fresh {@code DataObjectImpl} as input of
     * the source, executes the graph, and fetches the sink's output partitions (the result is
     * not used further).
     *
     * <p>NOTE(review): {@code closeExecution()} is only reached when every iteration completes
     * normally; if an iteration throws, it is skipped. Confirm whether it should be moved into
     * a {@code finally} block.
     *
     * @param config the job configuration
     * @param i the id of this worker (used for logging and environment initialisation)
     * @param iWorkerController passed through to {@code ComputeEnvironment.init}
     * @param iPersistentVolume passed through to {@code ComputeEnvironment.init}
     * @param iVolatileVolume passed through to {@code ComputeEnvironment.init}
     */
    public void execute(Config config, int i, IWorkerController iWorkerController, IPersistentVolume iPersistentVolume, IVolatileVolume iVolatileVolume) {
        final int parallelism = 4;
        final int iterations = 10;

        LOG.info("Task worker starting: " + i);
        TaskExecutor executor = ComputeEnvironment
            .init(config, i, iWorkerController, iPersistentVolume, iVolatileVolume)
            .getTaskExecutor();

        IterativeSourceTask source = new IterativeSourceTask();
        PartitionTask sink = new PartitionTask();

        // Describe the graph: source --(partition, edge "partition")--> sink.
        ComputeGraphBuilder graphBuilder = ComputeGraphBuilder.newBuilder(config);
        graphBuilder.addSource("source", source, parallelism);
        graphBuilder.addCompute("sink", sink, parallelism)
            .partition("source")
            .viaEdge("partition")
            .withDataType(MessageTypes.OBJECT);
        graphBuilder.setMode(OperationMode.BATCH);

        // Plan and create the execution once; it is reused by every iteration below.
        ComputeGraph graph = graphBuilder.build();
        ExecutionPlan executionPlan = executor.plan(graph);
        IExecutor execution = executor.createExecution(graph, executionPlan);

        for (int iteration = 0; iteration < iterations; iteration++) {
            LOG.info("Starting iteration: " + iteration);
            executor.addInput(graph, executionPlan, "source", Constants.ARGS_INPUT_DIRECTORY, new DataObjectImpl(config));
            execution.execute();
            // The output is retrieved but intentionally not used any further here.
            executor.getOutput(graph, executionPlan, "sink").getPartitions();
        }

        execution.closeExecution();
    }

    /**
     * Submits this class as a Twister2 job named {@code "iterative-job"}.
     *
     * <p>The job configuration carries a single entry setting
     * {@code "twister2.exector.worker.threads"} to 8, and one compute resource is requested
     * through {@code addComputeResource(4.0, 1024, 4)}.
     *
     * @param strArr command-line arguments (ignored)
     */
    public static void main(String[] strArr) {
        LOG.info("Iterative job");

        // Base configuration loaded with no overrides.
        Config baseConfig = ResourceAllocator.loadConfig(new HashMap<>());

        // Job-level settings are collected in a map first and then copied into the JobConfig.
        HashMap<String, Object> jobSettings = new HashMap<>();
        jobSettings.put("twister2.exector.worker.threads", 8);
        JobConfig jobConfig = new JobConfig();
        jobConfig.putAll(jobSettings);

        Twister2Job.Twister2JobBuilder jobBuilder = Twister2Job.newBuilder();
        jobBuilder.setJobName("iterative-job");
        jobBuilder.setWorkerClass(IterativeJob.class.getName());
        jobBuilder.addComputeResource(4.0, 1024, 4);
        jobBuilder.setConfig(jobConfig);

        Twister2Job job = jobBuilder.build();
        Twister2Submitter.submitJob(job, baseConfig);
    }
}
