package edu.iu.dsc.tws.examples.task.dataparallel;

import edu.iu.dsc.tws.api.compute.TaskContext;
import edu.iu.dsc.tws.api.compute.nodes.BaseSource;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.data.FileSystem;
import edu.iu.dsc.tws.api.data.Path;
import edu.iu.dsc.tws.data.api.formatters.LocalTextInputPartitioner;
import edu.iu.dsc.tws.data.api.formatters.SharedTextInputPartitioner;
import edu.iu.dsc.tws.data.api.out.TextOutputWriter;
import edu.iu.dsc.tws.data.fs.io.InputSplit;
import edu.iu.dsc.tws.dataset.DataSink;
import edu.iu.dsc.tws.dataset.DataSource;
import edu.iu.dsc.tws.examples.comms.Constants;
import edu.iu.dsc.tws.executor.core.ExecutionRuntime;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/examples/task/dataparallel/DataParallelTask.class */
public class DataParallelTask extends BaseSource {
    private static final Logger LOG = Logger.getLogger(DataParallelTask.class.getName());
    private static final long serialVersionUID = -1;
    private DataSource<String, ?> source;
    private DataSink<String> sink;

    public void execute() {
        InputSplit nextSplit = this.source.getNextSplit(this.context.taskIndex());
        int i = 0;
        int i2 = 0;
        while (nextSplit != null) {
            int i3 = 0;
            while (!nextSplit.reachedEnd()) {
                try {
                    String str = (String) nextSplit.nextRecord((Object) null);
                    if (str != null) {
                        LOG.fine("We read value: " + str);
                        this.sink.add(this.context.taskIndex(), str);
                        i3++;
                        i2++;
                    }
                } catch (IOException e) {
                    LOG.log(Level.SEVERE, "Failed to read the input", (Throwable) e);
                }
            }
            i++;
            nextSplit = this.source.getNextSplit(this.context.taskIndex());
            LOG.info("Finished: " + this.context.taskIndex() + " count: " + i3 + " split: " + i + " total count: " + i2);
        }
        this.sink.persist();
    }

    public void prepare(Config config, TaskContext taskContext) {
        super.prepare(config, taskContext);
        String stringValue = config.getStringValue(Constants.ARGS_INPUT_DIRECTORY);
        ExecutionRuntime executionRuntime = (ExecutionRuntime) this.config.get("_twister2.runtime_");
        String stringValue2 = config.getStringValue("output");
        if (config.getBooleanValue(Constants.ARGS_SHARED_FILE_SYSTEM).booleanValue()) {
            this.source = executionRuntime.createInput(config, taskContext, new SharedTextInputPartitioner(new Path(stringValue)));
        } else {
            this.source = executionRuntime.createInput(config, taskContext, new LocalTextInputPartitioner(new Path(stringValue), taskContext.getParallelism()));
        }
        this.sink = new DataSink<>(config, new TextOutputWriter(FileSystem.WriteMode.OVERWRITE, new Path(stringValue2)));
    }
}
