package edu.iu.dsc.tws.examples.batch.wordcount.tset;

import edu.iu.dsc.tws.api.JobConfig;
import edu.iu.dsc.tws.api.Twister2Job;
import edu.iu.dsc.tws.api.comms.structs.Tuple;
import edu.iu.dsc.tws.api.data.Path;
import edu.iu.dsc.tws.api.resource.Twister2Worker;
import edu.iu.dsc.tws.api.resource.WorkerEnvironment;
import edu.iu.dsc.tws.api.tset.TSetContext;
import edu.iu.dsc.tws.api.tset.fn.BaseApplyFunc;
import edu.iu.dsc.tws.api.tset.fn.BaseSourceFunc;
import edu.iu.dsc.tws.data.api.formatters.LocalTextInputPartitioner;
import edu.iu.dsc.tws.data.api.splits.FileInputSplit;
import edu.iu.dsc.tws.data.fs.io.InputSplit;
import edu.iu.dsc.tws.dataset.DataSource;
import edu.iu.dsc.tws.examples.batch.cdfw.CDFConstants;
import edu.iu.dsc.tws.examples.comms.Constants;
import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
import edu.iu.dsc.tws.tset.env.BatchEnvironment;
import edu.iu.dsc.tws.tset.env.TSetEnvironment;
import edu.iu.dsc.tws.tset.fn.HashingPartitioner;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;

/* loaded from: input_file:edu/iu/dsc/tws/examples/batch/wordcount/tset/FileBasedWordCount.class */
public class FileBasedWordCount implements Twister2Worker, Serializable {
    private static final Logger LOG = Logger.getLogger(FileBasedWordCount.class.getName());

    /* loaded from: input_file:edu/iu/dsc/tws/examples/batch/wordcount/tset/FileBasedWordCount$WordCountFileSource.class */
    /**
     * Source function that feeds records of the configured input file into the pipeline.
     *
     * <p>The input path is read from the {@code "INPUT_FILE"} config entry. Splits are requested
     * from a {@link DataSource} using the index reported by the TSet context.
     */
    static class WordCountFileSource extends BaseSourceFunc<String> {
        private DataSource<String, FileInputSplit<String>> dataSource;
        private InputSplit<String> dataSplit;

        WordCountFileSource() {
        }

        /**
         * Creates the data source over the configured input file and fetches the first split.
         *
         * @param tSetContext context supplying the config, the parallelism and this task's index
         */
        public void prepare(TSetContext tSetContext) {
            super.prepare(tSetContext);
            String inputFile = (String) tSetContext.getConfig().get("INPUT_FILE");
            int parallelism = tSetContext.getParallelism();
            this.dataSource = new DataSource<>(tSetContext.getConfig(),
                    new LocalTextInputPartitioner(new Path(inputFile), parallelism), parallelism);
            this.dataSplit = this.dataSource.getNextSplit(tSetContext.getIndex());
        }

        /**
         * Reports whether another record can be read, moving on to the next split when the
         * current one is missing or exhausted.
         *
         * @return {@code true} if the current split still has unread data
         * @throws RuntimeException wrapping the {@link IOException} if the split cannot be queried
         */
        public boolean hasNext() {
            try {
                if (this.dataSplit == null || this.dataSplit.reachedEnd()) {
                    this.dataSplit = this.dataSource.getNextSplit(getTSetContext().getIndex());
                }
                return this.dataSplit != null && !this.dataSplit.reachedEnd();
            } catch (IOException e) {
                // Keep the cause so the underlying I/O failure is visible in the stack trace.
                throw new RuntimeException("Unable read data split!", e);
            }
        }

        /**
         * Reads the next record from the current split.
         *
         * <p>NOTE(review): the decompiler renamed this method from {@code next} (merged with a
         * bridge method); presumably it is meant to be the source function's {@code next()} -
         * confirm before relying on the current name.
         *
         * @return the next record of the current split
         * @throws RuntimeException wrapping the {@link IOException} if the record cannot be read
         */
        public String m32next() {
            try {
                // No reusable record instance is supplied.
                return this.dataSplit.nextRecord(null);
            } catch (IOException e) {
                throw new RuntimeException("Unable read data split!", e);
            }
        }
    }

    /* loaded from: input_file:edu/iu/dsc/tws/examples/batch/wordcount/tset/FileBasedWordCount$WordcountFileWriter.class */
    /**
     * Apply function that writes every {@code (word, count)} tuple as a {@code "word count"}
     * line to a per-task output file.
     *
     * <p>The file name is the {@code "OUTPUT_FILE"} config entry followed by {@code "."} and the
     * index reported by the TSet context.
     */
    static class WordcountFileWriter extends BaseApplyFunc<Tuple<String, Integer>> {
        private BufferedWriter writer;

        WordcountFileWriter() {
        }

        /**
         * Opens the output file (truncating any existing content), creating missing parent
         * directories first.
         *
         * @param tSetContext context supplying the config and this task's index
         * @throws RuntimeException wrapping the {@link IOException} if the writer cannot be created
         */
        public void prepare(TSetContext tSetContext) {
            super.prepare(tSetContext);
            try {
                File file = new File(tSetContext.getConfig().get("OUTPUT_FILE") + "." + getTSetContext().getIndex());
                // A bare file name has no parent; only create directories when there is one.
                File parent = file.getParentFile();
                if (parent != null) {
                    parent.mkdirs();
                }
                this.writer = new BufferedWriter(new FileWriter(file, false));
            } catch (IOException e) {
                throw new RuntimeException("Unable to create file writer!", e);
            }
        }

        /**
         * Writes one tuple as a single line: key, a space, then value.
         *
         * @param tuple the word and its count
         * @throws RuntimeException wrapping the {@link IOException} if the write fails
         */
        public void apply(Tuple<String, Integer> tuple) {
            try {
                this.writer.write(tuple.getKey() + " " + tuple.getValue());
                this.writer.newLine();
            } catch (IOException e) {
                throw new RuntimeException("Unable to write!", e);
            }
        }

        /**
         * Closes the output file; does nothing if the writer was never opened.
         *
         * @throws RuntimeException wrapping the {@link IOException} if closing fails
         */
        public void close() {
            if (this.writer == null) {
                return;
            }
            try {
                this.writer.close();
            } catch (IOException e) {
                throw new RuntimeException("Unable to close the writer!", e);
            }
        }
    }

    /**
     * Builds and runs the word count pipeline on a batch TSet environment.
     *
     * <p>Stages, in order: a single-task file source, a hash partition to the parallelism stored
     * under the {@code "PAR"} config entry, tokenizing each record into words, pairing each word
     * with the count 1, summing the counts per word, an identity map, and a gather whose results
     * are handed to a {@link WordcountFileWriter}.
     *
     * @param workerEnvironment the worker environment used to initialise the batch environment
     */
    public void execute(WorkerEnvironment workerEnvironment) {
        BatchEnvironment env = TSetEnvironment.initBatch(workerEnvironment);
        env.createSource(new WordCountFileSource(), 1)
                .partition(new HashingPartitioner(), ((Integer) env.getConfig().get("PAR")).intValue())
                .flatmap((line, collector) -> {
                    // Emit every whitespace-delimited token of the record as its own word.
                    for (StringTokenizer words = new StringTokenizer(line); words.hasMoreTokens();) {
                        collector.collect(words.nextToken());
                    }
                })
                .mapToTuple(word -> new Tuple(word, 1))
                .keyedReduce((left, right) -> Integer.sum(left, right))
                .map(wordCount -> wordCount)
                .gather()
                .forEach(new WordcountFileWriter());
    }

    /**
     * Downloads the Project Gutenberg text used as the default input and stores it at the given
     * path, replacing any existing file.
     *
     * <p>Missing parent directories are created first. The download is best-effort: an I/O
     * failure is logged and the method returns normally.
     *
     * @param str destination file path
     */
    private static void downloadFile(String str) {
        // Fully qualified because the simple name Path is already imported from the Twister2 API.
        java.nio.file.Path target = Paths.get(str);
        try {
            java.nio.file.Path parent = target.getParent();
            if (parent != null) {
                Files.createDirectories(parent);
            }
            // try-with-resources so the network stream is closed even when the copy fails.
            try (InputStream in = new URL("https://www.gutenberg.org/files/1342/1342-0.txt").openStream()) {
                Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
            }
        } catch (IOException e) {
            LOG.log(Level.SEVERE, "Unable to download the default input to " + str, e);
        }
    }

    /**
     * Command line entry point: submits the word count job and optionally validates its output.
     *
     * <p>Options: the input file (the default text is downloaded to {@code /tmp/wc/wordcount.in}
     * when absent), {@code output} (default {@code /tmp/wc/wordcount.out}), the parallelism
     * (default 4) and {@code validate} (default {@code true}).
     *
     * <p>Validation recounts the words of the input file locally and compares them with the
     * contents of {@code <output>.0}. Every differing word is logged, including words missing
     * from the job output. On a mismatch the locally computed counts are written to
     * {@code <output>.trusted}.
     *
     * @param strArr command line arguments
     * @throws Exception if argument parsing, job submission or validation I/O fails
     */
    public static void main(String[] strArr) throws Exception {
        LOG.log(Level.INFO, "Starting wordcount Job");
        Options options = new Options();
        options.addOption(Constants.ARGS_INPUT_DIRECTORY, true, "Input file");
        options.addOption("output", true, "Output file");
        options.addOption(CDFConstants.ARGS_PARALLELISM_VALUE, true, "Parallelism");
        options.addOption("validate", true, "validate?");
        CommandLine cmd = new DefaultParser().parse(options, strArr);

        String input = cmd.getOptionValue(Constants.ARGS_INPUT_DIRECTORY);
        if (input == null) {
            LOG.warning("No input is provided! Downloading Pride and Prejudice text from Gutenberg project");
            input = "/tmp/wc/wordcount.in";
            downloadFile(input);
        }
        String output = cmd.getOptionValue("output", "/tmp/wc/wordcount.out");
        int parallelism = Integer.parseInt(cmd.getOptionValue(CDFConstants.ARGS_PARALLELISM_VALUE, "4"));
        boolean validate = Boolean.parseBoolean(cmd.getOptionValue("validate", "true"));
        LOG.info("Wordcount input: " + input + " output: " + output + " parallelism: " + parallelism);

        long start = System.nanoTime();
        JobConfig jobConfig = new JobConfig();
        jobConfig.put("INPUT_FILE", input);
        jobConfig.put("OUTPUT_FILE", output);
        jobConfig.put("PAR", Integer.valueOf(parallelism));
        Twister2Job.Twister2JobBuilder jobBuilder = Twister2Job.newBuilder();
        jobBuilder.setJobName("tset-wordcount");
        jobBuilder.setWorkerClass(FileBasedWordCount.class);
        jobBuilder.addComputeResource(1.0d, 512, parallelism);
        jobBuilder.setConfig(jobConfig);
        Twister2Submitter.submitJob(jobBuilder.build(), ResourceAllocator.getDefaultConfig());
        LOG.info("time elapsed ms " + ((System.nanoTime() - start) * 1.0E-6d));

        if (!validate) {
            return;
        }
        LOG.info("validating results!");
        Map<String, Integer> trusted = countWords(input);
        Map<String, Integer> results = readCounts(output + ".0");

        for (Map.Entry<String, Integer> expected : trusted.entrySet()) {
            // A word missing from the job output yields null here; report it as a mismatch
            // instead of failing on unboxing.
            Integer actual = results.get(expected.getKey());
            if (!expected.getValue().equals(actual)) {
                LOG.severe(String.format("Expected: %s %d Got: %s %d",
                        expected.getKey(), expected.getValue(), expected.getKey(), actual));
            }
        }
        if (results.equals(trusted)) {
            LOG.info("RESULTS VALID!");
            return;
        }
        LOG.severe("UNSUCCESSFUL!");
        try (BufferedWriter trustedWriter = new BufferedWriter(new FileWriter(output + ".trusted", false))) {
            for (Map.Entry<String, Integer> entry : trusted.entrySet()) {
                trustedWriter.write(String.format("%s %d\n", entry.getKey(), entry.getValue()));
            }
        }
    }

    /** Counts the whitespace-delimited tokens of the given text file, keyed by token. */
    private static Map<String, Integer> countWords(String path) throws IOException {
        Map<String, Integer> counts = new TreeMap<>();
        try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
            String line;
            while ((line = reader.readLine()) != null) {
                StringTokenizer tokens = new StringTokenizer(line);
                while (tokens.hasMoreTokens()) {
                    counts.merge(tokens.nextToken(), 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    /** Reads {@code "word count"} lines from a result file, stopping at the first empty line. */
    private static Map<String, Integer> readCounts(String path) throws IOException {
        Map<String, Integer> counts = new TreeMap<>();
        try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
            String line;
            while ((line = reader.readLine()) != null && !line.isEmpty()) {
                String[] parts = line.split(" ");
                if (parts.length < 2) {
                    throw new IOException("Malformed result line in " + path + ": " + line);
                }
                counts.put(parts[0].trim(), Integer.valueOf(Integer.parseInt(parts[1])));
            }
        }
        return counts;
    }

    /**
     * Recreates one of the serializable lambdas used in {@code execute} from its
     * {@link SerializedLambda} description.
     *
     * <p>The implementation method name is first mapped to a selector (hash code switch followed
     * by an {@code equals} check). The selected branch then re-checks the method kind, functional
     * interface, interface method name and signature, implementing class and implementation
     * signature before returning the matching lambda.
     *
     * <p>NOTE(review): this method is marked synthetic, and its selector mixes {@code boolean}
     * and {@code int} values ({@code boolean z = -1}, repeated {@code case true} labels). It looks
     * like decompiler output rather than hand-written source - confirm whether it should be kept
     * in source at all.
     *
     * @param serializedLambda serialized form of the lambda to recreate
     * @return the lambda matching the serialized description
     * @throws IllegalArgumentException if no known lambda matches the description
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // Step 1: map the implementation method name to a selector value.
        switch (implMethodName.hashCode()) {
            case -1880731031:
                if (implMethodName.equals("lambda$execute$2960c211$1")) {
                    z = 3;
                    break;
                }
                break;
            case -1073687404:
                if (implMethodName.equals("lambda$execute$c230688e$1")) {
                    z = false;
                    break;
                }
                break;
            case 114251:
                if (implMethodName.equals("sum")) {
                    z = 2;
                    break;
                }
                break;
            case 438951263:
                if (implMethodName.equals("lambda$execute$5fafd741$1")) {
                    z = true;
                    break;
                }
                break;
        }
        // Step 2: verify the full description for the selected lambda and rebuild it.
        switch (z) {
            case false:
                // FlatMapFunc: splits a string into tokens and collects each one.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("edu/iu/dsc/tws/api/tset/fn/FlatMapFunc") && serializedLambda.getFunctionalInterfaceMethodName().equals("flatMap") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ledu/iu/dsc/tws/api/tset/fn/RecordCollector;)V") && serializedLambda.getImplClass().equals("edu/iu/dsc/tws/examples/batch/wordcount/tset/FileBasedWordCount") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ledu/iu/dsc/tws/api/tset/fn/RecordCollector;)V")) {
                    return (str, recordCollector) -> {
                        StringTokenizer stringTokenizer = new StringTokenizer(str);
                        while (stringTokenizer.hasMoreTokens()) {
                            recordCollector.collect(stringTokenizer.nextToken());
                        }
                    };
                }
                break;
            case true:
                // MapFunc: identity mapping over tuples.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("edu/iu/dsc/tws/api/tset/fn/MapFunc") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("edu/iu/dsc/tws/examples/batch/wordcount/tset/FileBasedWordCount") && serializedLambda.getImplMethodSignature().equals("(Ledu/iu/dsc/tws/api/comms/structs/Tuple;)Ledu/iu/dsc/tws/api/comms/structs/Tuple;")) {
                    return tuple -> {
                        return tuple;
                    };
                }
                break;
            case true:
                // ReduceFunc: integer addition implemented by java.lang.Integer.sum.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("edu/iu/dsc/tws/api/tset/fn/ReduceFunc") && serializedLambda.getFunctionalInterfaceMethodName().equals("reduce") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("(II)I")) {
                    return (v0, v1) -> {
                        return Integer.sum(v0, v1);
                    };
                }
                break;
            case true:
                // MapFunc: pairs a string with the count 1.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("edu/iu/dsc/tws/api/tset/fn/MapFunc") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("edu/iu/dsc/tws/examples/batch/wordcount/tset/FileBasedWordCount") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ledu/iu/dsc/tws/api/comms/structs/Tuple;")) {
                    return str2 -> {
                        return new Tuple(str2, 1);
                    };
                }
                break;
        }
        // No branch matched the serialized description.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
