package weka.distributed.hadoop;

import distributed.core.DistributedJob;
import distributed.core.DistributedJobConfig;
import distributed.hadoop.HDFSUtils;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Random;
import java.util.Vector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import weka.core.CommandlineRunnable;
import weka.core.Environment;
import weka.core.Instances;
import weka.core.Option;
import weka.core.Utils;
import weka.distributed.CSVToARFFHeaderMapTask;
import weka.distributed.DistributedWekaException;
import weka.gui.beans.InstancesProducer;

/* loaded from: input_file:weka/distributed/hadoop/ArffHeaderHadoopJob.class */
/**
 * Hadoop job that produces a single, consolidated ARFF header for CSV input.
 *
 * <p>The header is either taken from an existing header file (local or in HDFS),
 * built directly from user supplied attribute names when the underlying
 * {@link CSVToARFFHeaderMapTask} reports that this is possible, or computed by
 * running a map-reduce job that uses {@link CSVToArffHeaderHadoopMapper} and
 * {@link CSVToArffHeaderHadoopReducer}.
 */
public class ArffHeaderHadoopJob extends HadoopJob implements InstancesProducer, CommandlineRunnable {
    private static final long serialVersionUID = -8812941574014567344L;

    /** Subdirectory appended to the configured output path; the header file is written below it. */
    public static final String OUTPUT_SUBDIR = "/arff";

    /** Path to a previously generated header file (option -existing-header). */
    protected String m_pathToExistingHeader;

    /** Comma separated list of attribute names (option -A). */
    protected String m_attributeNames;

    /** Location of a file to read attribute names from (option -names-file). */
    protected String m_attributeNamesFile;

    /** Options, as a single string, that are handed to the CSV to ARFF map task. */
    protected String m_wekaCsvToArffMapTaskOpts;

    /** HDFS path of the header that was produced or located by the last run. */
    protected String m_hdfsPathToAggregatedHeader;

    /** File name (not a path) for the generated header (option -header-file-name). */
    protected String m_outputHeaderFileName;

    /** The header loaded or generated by the last run; null until one is available. */
    protected Instances m_finalHeader;

    /**
     * Creates the job with empty option values and registers the mapper and
     * reducer classes on the map-reduce configuration.
     */
    public ArffHeaderHadoopJob() {
        super("ARFF instances header job", "Create a consolidated ARFF header for the job");
        this.m_pathToExistingHeader = "";
        this.m_attributeNames = "";
        this.m_attributeNamesFile = "";
        this.m_wekaCsvToArffMapTaskOpts = "";
        this.m_hdfsPathToAggregatedHeader = "";
        this.m_outputHeaderFileName = "";
        this.m_mrConfig.setMapperClass(CSVToArffHeaderHadoopMapper.class.getName());
        this.m_mrConfig.setReducerClass(CSVToArffHeaderHadoopReducer.class.getName());
    }

    /**
     * Returns a description of this job.
     *
     * @return the help text for this job
     */
    public String globalInfo() {
        return "Creates a unified ARFF header from input CSV files by determining column types automatically (if not specified by the user); it also computes summary statistics for each attribute that get stored in additional \"meta\" attributes. This job creates a header (with all nominal values determined) that gets used by other distributed jobs to ensure that  models created on data processed by separate map tasks are consistent and can be aggregated.";
    }

    /**
     * Returns the HDFS path of the aggregated header.
     *
     * @return the path recorded by the last run, or the empty string if none was recorded
     */
    public String getAggregatedHeaderPath() {
        return this.m_hdfsPathToAggregatedHeader;
    }

    /**
     * Lists the options of this job, followed by those of the CSV to ARFF map
     * task and the general Hadoop job options of the superclass.
     *
     * @return an enumeration of all available options
     */
    @Override
    public Enumeration<Option> listOptions() {
        Vector<Option> options = new Vector<Option>();
        options.add(new Option("\tPath to header file to use. Set this if you have\n\trun previous jobs that have generated a header already. Setting this\n\tprevents this job from running.", "existing-header", 1, "-existing-header"));
        options.add(new Option("\tComma separated list of attribute names to use.\n\tUse either this option, -names-file or neither (in which case\n\tattribute names will be generated).", "A", 1, "-A <attribute names>"));
        options.add(new Option("\tLocation of a names file to source attribute names\n\tfrom. Can exist locally or in HDFS. Use either this option, -A or neither (in which case\n\tattribute names will be generated).", "names-file", 1, "-names-file <path to file>"));
        options.add(new Option("\tFile name for output ARFF header. Note that this is a name only\n\tand not a path. This file will be created in the output directory\n\tspecified by the -output-path option. (default is a randomly generated name)", "header-file-name", 1, "-header-file-name <name>"));

        Enumeration<Option> mapTaskOptions = new CSVToARFFHeaderMapTask().listOptions();
        while (mapTaskOptions.hasMoreElements()) {
            options.addElement(mapTaskOptions.nextElement());
        }

        options.add(new Option("", "", 0, "\nGeneral Hadoop job configuration options:"));
        Enumeration<Option> hadoopOptions = super.listOptions();
        while (hadoopOptions.hasMoreElements()) {
            options.addElement(hadoopOptions.nextElement());
        }
        return options.elements();
    }

    /**
     * Parses the supplied options. The job specific options are read first, then
     * the array is passed to the superclass, and finally to a temporary
     * {@link CSVToARFFHeaderMapTask} whose resulting options are stored as the
     * map task option string. The order of these lookups is kept deliberately,
     * because each step works on the same array.
     *
     * @param options the option array to parse
     * @throws Exception if an option cannot be parsed
     */
    @Override
    public void setOptions(String[] options) throws Exception {
        setPathToExistingHeader(Utils.getOption("existing-header", options));
        setAttributeNames(Utils.getOption('A', options));
        setAttributeNamesFile(Utils.getOption("names-file", options));
        setOutputHeaderFileName(Utils.getOption("header-file-name", options));
        super.setOptions(options);

        CSVToARFFHeaderMapTask mapTask = new CSVToARFFHeaderMapTask();
        mapTask.setOptions(options);
        String mapTaskOptions = Utils.joinOptions(mapTask.getOptions());
        if (!DistributedJobConfig.isEmpty(mapTaskOptions)) {
            setCsvToArffTaskOptions(mapTaskOptions);
        }
    }

    /**
     * Returns only the options that are specific to this job, without the
     * superclass options and without the map task options.
     *
     * @return the job specific options that are currently set
     */
    public String[] getJobOptionsOnly() {
        List<String> options = new ArrayList<String>();
        appendJobSpecificOptions(options);
        return options.toArray(new String[options.size()]);
    }

    /**
     * Returns the job specific options, followed by the superclass options and
     * the options of the CSV to ARFF map task.
     *
     * @return the full set of options that are currently set
     */
    @Override
    public String[] getOptions() {
        List<String> options = new ArrayList<String>();
        appendJobSpecificOptions(options);
        for (String option : super.getOptions()) {
            options.add(option);
        }
        if (!DistributedJobConfig.isEmpty(getCsvToArffTaskOptions())) {
            try {
                for (String option : Utils.splitOptions(getCsvToArffTaskOptions())) {
                    options.add(option);
                }
            } catch (Exception e) {
                // Best effort: a malformed map task option string must not stop
                // the remaining options from being reported.
                e.printStackTrace();
            }
        }
        return options.toArray(new String[options.size()]);
    }

    /** Appends each non-empty job specific option, as a flag/value pair, to the supplied list. */
    private void appendJobSpecificOptions(List<String> options) {
        if (!DistributedJobConfig.isEmpty(getPathToExistingHeader())) {
            options.add("-existing-header");
            options.add(getPathToExistingHeader());
        }
        if (!DistributedJobConfig.isEmpty(getAttributeNames())) {
            options.add("-A");
            options.add(getAttributeNames());
        }
        if (!DistributedJobConfig.isEmpty(getAttributeNamesFile())) {
            options.add("-names-file");
            options.add(getAttributeNamesFile());
        }
        if (!DistributedJobConfig.isEmpty(getOutputHeaderFileName())) {
            options.add("-header-file-name");
            options.add(getOutputHeaderFileName());
        }
    }

    /**
     * Returns the header produced or loaded by the last run.
     *
     * @return the final header, or null if none is available
     */
    public Instances getFinalHeader() {
        return this.m_finalHeader;
    }

    /**
     * Returns the final header; see {@link #getFinalHeader()}.
     *
     * @return the final header, or null if none is available
     */
    @Override
    public Instances getInstances() {
        return getFinalHeader();
    }

    /**
     * Returns the tip text for the existing header path property.
     *
     * @return the tip text
     */
    public String pathToExistingHeaderTipText() {
        return "The path to an existing ARFF header to use (if set this prevents the ARFF header job from running).";
    }

    /**
     * Sets the path to an existing header file.
     *
     * @param path the path to the existing header
     */
    public void setPathToExistingHeader(String path) {
        this.m_pathToExistingHeader = path;
    }

    /**
     * Returns the path to an existing header file.
     *
     * @return the path to the existing header
     */
    public String getPathToExistingHeader() {
        return this.m_pathToExistingHeader;
    }

    /**
     * Returns the tip text for the output header file name property.
     *
     * @return the tip text
     */
    public String outputHeaderFileNameTipText() {
        return "The name of the header file to create in the output directory for the job. If not specified then a name is generated automatically.";
    }

    /**
     * Sets the file name to use for the generated header.
     *
     * @param name the header file name
     */
    public void setOutputHeaderFileName(String name) {
        this.m_outputHeaderFileName = name;
    }

    /**
     * Returns the file name to use for the generated header.
     *
     * @return the header file name
     */
    public String getOutputHeaderFileName() {
        return this.m_outputHeaderFileName;
    }

    /**
     * Returns the tip text for the attribute names property.
     *
     * @return the tip text
     */
    public String attributeNamesTipText() {
        return "A comma-separated list of attribute names for the data";
    }

    /**
     * Sets the comma separated list of attribute names.
     *
     * @param names the attribute names
     */
    public void setAttributeNames(String names) {
        this.m_attributeNames = names;
    }

    /**
     * Returns the comma separated list of attribute names.
     *
     * @return the attribute names
     */
    public String getAttributeNames() {
        return this.m_attributeNames;
    }

    /**
     * Returns the tip text for the attribute names file property.
     *
     * @return the tip text
     */
    public String attributeNamesFileTipText() {
        return "Path to the file to load attribute names from";
    }

    /**
     * Sets the path of the file to read attribute names from.
     *
     * @param path the path to the names file
     */
    public void setAttributeNamesFile(String path) {
        this.m_attributeNamesFile = path;
    }

    /**
     * Returns the path of the file to read attribute names from.
     *
     * @return the path to the names file
     */
    public String getAttributeNamesFile() {
        return this.m_attributeNamesFile;
    }

    /**
     * Returns the tip text for the CSV to ARFF task options property.
     *
     * @return the tip text
     */
    public String csvToArffTaskOptionsTipText() {
        return "Options to pass on to the underlying csv to arff task";
    }

    /**
     * Sets the options to pass on to the CSV to ARFF map task.
     *
     * @param options the options as a single string
     */
    public void setCsvToArffTaskOptions(String options) {
        this.m_wekaCsvToArffMapTaskOpts = options;
    }

    /**
     * Returns the options that are passed on to the CSV to ARFF map task.
     *
     * @return the options as a single string
     */
    public String getCsvToArffTaskOptions() {
        return this.m_wekaCsvToArffMapTaskOpts;
    }

    /**
     * Adds the attribute names file to the distributed cache via
     * {@link HDFSUtils#addFileToDistributedCache}.
     *
     * @param configuration the Hadoop configuration to register the file with
     * @return the value returned by the distributed cache helper
     * @throws IOException if the file cannot be added
     */
    protected String handleNamesFile(Configuration configuration) throws IOException {
        return HDFSUtils.addFileToDistributedCache(this.m_mrConfig.getHDFSConfig(), configuration, environmentSubstitute(getAttributeNamesFile()), this.m_env);
    }

    /**
     * Loads the header from the configured existing header path. A file found
     * on the local file system is first copied into HDFS; otherwise the path is
     * looked up in HDFS directly. On success the aggregated header path and the
     * final header are updated.
     *
     * @throws DistributedWekaException if no path is set, the file cannot be
     *           found in either location, or reading it fails
     */
    protected void handleExistingHeaderFile() throws DistributedWekaException {
        String existingPath = getPathToExistingHeader();
        try {
            existingPath = environmentSubstitute(existingPath);
        } catch (Exception ex) {
            // Deliberately best effort: fall back to the path exactly as given,
            // but leave a trace instead of failing silently.
            logMessage("Unable to substitute environment variables in '" + existingPath + "' (reason: " + ex.getMessage() + "). Using the path as given.");
        }
        if (DistributedJobConfig.isEmpty(existingPath)) {
            throw new DistributedWekaException("No path to an existing header file has been specified");
        }

        boolean found = false;
        try {
            File localFile = new File(existingPath);
            if (localFile.exists()) {
                String hdfsCopy = HDFSUtils.WEKA_TEMP_DISTRIBUTED_CACHE_FILES + localFile.getName();
                HDFSUtils.copyToHDFS(existingPath, hdfsCopy, this.m_mrConfig.getHDFSConfig(), this.m_env, true);
                this.m_hdfsPathToAggregatedHeader = hdfsCopy;
                Configuration conf = new Configuration();
                this.m_mrConfig.getHDFSConfig().configureForHadoop(conf, this.m_env);
                getFinalHeaderFromHDFS(conf, hdfsCopy);
                found = true;
            } else {
                Path hdfsPath = new Path(existingPath);
                Configuration conf = new Configuration();
                this.m_mrConfig.getHDFSConfig().configureForHadoop(conf, this.m_env);
                if (FileSystem.get(conf).exists(hdfsPath)) {
                    this.m_hdfsPathToAggregatedHeader = existingPath;
                    getFinalHeaderFromHDFS(conf, existingPath);
                    found = true;
                }
            }
        } catch (IOException ex) {
            throw new DistributedWekaException(ex);
        }

        if (!found) {
            throw new DistributedWekaException("Was unable to find '" + existingPath + "' on either the local file system or in HDFS");
        }
    }

    /**
     * Runs the job. If an existing header path is set and can be loaded, nothing
     * else is done. Otherwise the header is written directly when attribute
     * names were supplied and the map task can produce it without seeing data,
     * or a map-reduce job is submitted and the resulting header is read back
     * from HDFS.
     *
     * @return true if a header was obtained or the submitted job succeeded
     * @throws DistributedWekaException if any step of the run fails
     */
    public boolean runJob() throws DistributedWekaException {
        if (!DistributedJobConfig.isEmpty(getPathToExistingHeader())) {
            try {
                handleExistingHeaderFile();
                return true;
            } catch (DistributedWekaException e) {
                // Not fatal: fall through and generate the header instead.
                logMessage("Unable to load existing header file from '" + getPathToExistingHeader() + "' (reason: " + e.getMessage() + "). Running job to create header...");
            }
        }

        this.m_finalHeader = null;
        try {
            setJobStatus(DistributedJob.JobStatus.RUNNING);
            if (this.m_env == null) {
                this.m_env = Environment.getSystemWide();
            }

            // NOTE(review): the subdirectory is appended to the stored output
            // path on every call, so repeated runs on the same instance nest
            // deeper - confirm callers reset the output path between runs.
            String outputDir = this.m_mrConfig.getOutputPath() + OUTPUT_SUBDIR;
            this.m_mrConfig.setOutputPath(outputDir);

            String headerPath = outputDir + "/" + chooseHeaderFileName();
            this.m_hdfsPathToAggregatedHeader = headerPath;
            this.m_mrConfig.setUserSuppliedProperty(CSVToArffHeaderHadoopReducer.CSV_TO_ARFF_HEADER_WRITE_PATH, headerPath);

            CSVToARFFHeaderMapTask mapTask = new CSVToARFFHeaderMapTask();
            if (!DistributedJobConfig.isEmpty(getCsvToArffTaskOptions())) {
                mapTask.setOptions(Utils.splitOptions(getCsvToArffTaskOptions()));
            }

            // Shortcut: with names supplied and no summary statistics requested
            // the map task may be able to build the header without any data.
            List<String> names = readUserSuppliedAttributeNames();
            if (names != null && !mapTask.getComputeSummaryStats() && mapTask.headerAvailableImmediately(names.size(), names, new StringBuffer())) {
                Instances header = mapTask.getHeader(names.size(), names);
                this.m_finalHeader = header;
                Configuration writeConf = new Configuration();
                this.m_mrConfig.getHDFSConfig().configureForHadoop(writeConf, this.m_env);
                CSVToArffHeaderHadoopReducer.writeHeaderToDestination(header, headerPath, writeConf);
                // The run is complete; do not leave the status at RUNNING.
                setJobStatus(DistributedJob.JobStatus.FINISHED);
                return true;
            }

            Configuration conf = new Configuration();
            String mapTaskOptions = buildMapTaskOptionString(conf);
            if (mapTaskOptions.length() > 0) {
                this.m_mrConfig.setUserSuppliedProperty(CSVToArffHeaderHadoopMapper.CSV_TO_ARFF_HEADER_MAP_TASK_OPTIONS, environmentSubstitute(mapTaskOptions));
                // NOTE(review): the option string is appended to the job name on
                // every run of the same instance.
                setJobName(getJobName() + " " + mapTaskOptions);
            }

            installWekaLibrariesInHDFS(conf);
            Job job = this.m_mrConfig.configureForHadoop(environmentSubstitute(getJobName()), conf, this.m_env);
            cleanOutputDirectory(job);
            statusMessage("Submitting job: " + getJobName());
            logMessage("Submitting job: " + getJobName());

            boolean success = runJob(job);
            if (success) {
                getFinalHeaderFromHDFS(conf, headerPath);
            }
            setJobStatus(success ? DistributedJob.JobStatus.FINISHED : DistributedJob.JobStatus.FAILED);
            return success;
        } catch (Exception e) {
            setJobStatus(DistributedJob.JobStatus.FAILED);
            throw new DistributedWekaException(e);
        }
    }

    /**
     * Picks the header file name: the user supplied name if set, otherwise a
     * random non-negative number; ".arff" is appended when it is missing.
     */
    private String chooseHeaderFileName() {
        String fileName;
        if (!DistributedJobConfig.isEmpty(getOutputHeaderFileName())) {
            fileName = environmentSubstitute(getOutputHeaderFileName());
        } else {
            // nextInt(bound) is never negative, unlike Math.abs(nextInt()),
            // which stays negative for Integer.MIN_VALUE.
            fileName = String.valueOf(new Random().nextInt(Integer.MAX_VALUE));
        }
        if (!fileName.toLowerCase().endsWith(".arff")) {
            fileName = fileName + ".arff";
        }
        return fileName;
    }

    /**
     * Collects the attribute names given via -A or read from the names file
     * (HDFS or local); returns null when neither source is configured.
     */
    private List<String> readUserSuppliedAttributeNames() throws Exception {
        if (!DistributedJobConfig.isEmpty(getAttributeNames())) {
            List<String> names = new ArrayList<String>();
            for (String rawName : environmentSubstitute(getAttributeNames()).split(",")) {
                String name = rawName.trim();
                if (name.length() > 0) {
                    names.add(name);
                }
            }
            return names;
        }
        if (DistributedJobConfig.isEmpty(getAttributeNamesFile())) {
            return null;
        }

        String namesFile = environmentSubstitute(getAttributeNamesFile());
        BufferedReader reader;
        if (namesFile.startsWith("hdfs://")) {
            // Drop the scheme and everything up to the first slash that follows it.
            String withoutScheme = namesFile.replace("hdfs://", "");
            String hdfsPath = HDFSUtils.resolvePath(withoutScheme.substring(withoutScheme.indexOf("/")), this.m_env);
            Configuration conf = new Configuration();
            this.m_mrConfig.getHDFSConfig().configureForHadoop(conf, this.m_env);
            reader = new BufferedReader(new InputStreamReader(FileSystem.get(conf).open(new Path(hdfsPath))));
        } else {
            reader = new BufferedReader(new FileReader(new File(new URI(namesFile).getPath())));
        }
        try {
            return CSVToArffHeaderHadoopMapper.readNames(reader);
        } finally {
            // Closing twice is harmless should readNames already have closed it.
            closeQuietly(reader);
        }
    }

    /**
     * Builds the option string handed to the mapper: the attribute names or the
     * distributed cache entry of the names file, followed by the map task options.
     */
    private String buildMapTaskOptionString(Configuration conf) throws IOException {
        StringBuilder options = new StringBuilder();
        if (!DistributedJobConfig.isEmpty(getAttributeNames())) {
            options.append(" -A ").append(environmentSubstitute(getAttributeNames()));
        } else if (!DistributedJobConfig.isEmpty(getAttributeNamesFile())) {
            options.append(" -names-file ").append(handleNamesFile(conf));
        }
        if (!DistributedJobConfig.isEmpty(getCsvToArffTaskOptions())) {
            options.append(" ").append(getCsvToArffTaskOptions());
        }
        return options.toString();
    }

    /**
     * Reads the header at the given HDFS path and stores it as the final header.
     * The underlying stream is closed whether or not reading succeeds.
     *
     * @param configuration the Hadoop configuration used to obtain the file system
     * @param path the HDFS path of the header file
     * @throws DistributedWekaException if the header cannot be opened or parsed
     */
    protected void getFinalHeaderFromHDFS(Configuration configuration, String path) throws DistributedWekaException {
        BufferedReader reader = null;
        try {
            reader = new BufferedReader(new InputStreamReader(FileSystem.get(configuration).open(new Path(path))));
            this.m_finalHeader = new Instances(reader);
        } catch (Exception e) {
            throw new DistributedWekaException(e);
        } finally {
            closeQuietly(reader);
        }
    }

    /** Closes the reader if it is non-null, ignoring any failure raised by the close itself. */
    private static void closeQuietly(BufferedReader reader) {
        if (reader != null) {
            try {
                reader.close();
            } catch (IOException ignored) {
                // Nothing useful can be done about a failed close of a reader.
            }
        }
    }

    /**
     * Command line entry point.
     *
     * @param args the command line options
     */
    public static void main(String[] args) {
        ArffHeaderHadoopJob job = new ArffHeaderHadoopJob();
        job.run(job, args);
    }

    /**
     * Configures and runs the supplied job. With the -h flag the option summary
     * is printed to standard error and the JVM exits with status 1. Exceptions
     * raised while setting options or running are printed, not rethrown.
     *
     * @param toRun the job to run; must be an ArffHeaderHadoopJob
     * @param options the command line options
     * @throws IllegalArgumentException if toRun is not an ArffHeaderHadoopJob
     */
    public void run(Object toRun, String[] options) throws IllegalArgumentException {
        if (!(toRun instanceof ArffHeaderHadoopJob)) {
            throw new IllegalArgumentException("Object to run is not an ArffHeaderHadoopJob!");
        }
        try {
            ArffHeaderHadoopJob job = (ArffHeaderHadoopJob) toRun;
            if (Utils.getFlag('h', options)) {
                System.err.println(DistributedJob.makeOptionsStr(job));
                System.exit(1);
            }
            job.setOptions(options);
            job.runJob();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
