package weka.distributed.hadoop;

import distributed.core.DistributedJob;
import distributed.core.DistributedJobConfig;
import distributed.hadoop.HDFSUtils;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Vector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import weka.core.CommandlineRunnable;
import weka.core.Environment;
import weka.core.Option;
import weka.core.OptionHandler;
import weka.core.Utils;
import weka.distributed.DistributedWekaException;

/* loaded from: input_file:weka/distributed/hadoop/WekaScoringHadoopJob.class */
public class WekaScoringHadoopJob extends HadoopJob implements CommandlineRunnable {
    protected static final String OUTPUT_SUBDIR = "/scoring";
    private static final long serialVersionUID = 2899919003194014468L;
    protected String m_colsRange;
    protected String m_modelPath;
    protected ArffHeaderHadoopJob m_arffHeaderJob;
    protected String m_wekaCsvToArffMapTaskOpts;

    public WekaScoringHadoopJob() {
        super("Scoring job", "Score data with a Weka model");
        this.m_colsRange = "first-last";
        this.m_modelPath = "";
        this.m_arffHeaderJob = new ArffHeaderHadoopJob();
        this.m_wekaCsvToArffMapTaskOpts = "";
        this.m_mrConfig.setMapperClass(WekaScoringHadoopMapper.class.getName());
        this.m_mrConfig.setMapOutputKeyClass(LongWritable.class.getName());
        this.m_mrConfig.setMapOutputValueClass(Text.class.getName());
    }

    public String globalInfo() {
        return "Score new data using a previously trained model.";
    }

    public String modelPathTipText() {
        return "The path (HDFS or local) to the model to use for scoring";
    }

    public void setModelPath(String str) {
        this.m_modelPath = str;
    }

    public String getModelPath() {
        return this.m_modelPath;
    }

    public String columnsToOutputInScoredDataTipText() {
        return "The columns to output (as a comma-separated list of indexes) in the scored data. 'first' and 'last' may be used as well (e.g. 1,2,10-last)";
    }

    public void setColumnsToOutputInScoredData(String str) {
        this.m_colsRange = str;
    }

    public String getColumnsToOutputInScoredData() {
        return this.m_colsRange;
    }

    public void setCSVMapTaskOptions(String str) {
        this.m_wekaCsvToArffMapTaskOpts = str;
    }

    public String getCSVMapTaskOptions() {
        return this.m_wekaCsvToArffMapTaskOpts;
    }

    @Override // weka.distributed.hadoop.HadoopJob
    public Enumeration<Option> listOptions() {
        Vector vector = new Vector();
        vector.add(new Option("\tPath to model file to use for scoring (can be \n\tlocal or in HDFS", "model-file", 1, "-model-file <path to model file>"));
        vector.add(new Option("\tColumns to output in the scored data. Specify as\n\ta range, e.g. 1,4,5,10-last (default = first-last).", "columns-to-output", 1, "-columns-to-output"));
        vector.add(new Option("", "", 0, "\nOptions specific to ARFF training header creation:"));
        Enumeration<Option> listOptions = new ArffHeaderHadoopJob().listOptions();
        while (listOptions.hasMoreElements()) {
            vector.add(listOptions.nextElement());
        }
        return vector.elements();
    }

    @Override // weka.distributed.hadoop.HadoopJob
    public void setOptions(String[] strArr) throws Exception {
        String option = Utils.getOption("model-file", strArr);
        if (!DistributedJobConfig.isEmpty(option)) {
            setModelPath(option);
        }
        String option2 = Utils.getOption("columns-to-output", strArr);
        if (!DistributedJobConfig.isEmpty(option2)) {
            setColumnsToOutputInScoredData(option2);
        }
        String[] strArr2 = (String[]) strArr.clone();
        super.setOptions(strArr);
        this.m_arffHeaderJob.setOptions(strArr2);
        String joinOptions = Utils.joinOptions(this.m_arffHeaderJob.getOptions());
        if (DistributedJobConfig.isEmpty(joinOptions)) {
            return;
        }
        setCSVMapTaskOptions(joinOptions);
    }

    @Override // weka.distributed.hadoop.HadoopJob
    public String[] getOptions() {
        ArrayList arrayList = new ArrayList();
        if (!DistributedJobConfig.isEmpty(getModelPath())) {
            arrayList.add("-model-file");
            arrayList.add(getModelPath());
        }
        arrayList.add("-columns-to-output");
        arrayList.add(getColumnsToOutputInScoredData());
        if (!DistributedJobConfig.isEmpty(getCSVMapTaskOptions())) {
            try {
                for (String str : Utils.splitOptions(getCSVMapTaskOptions())) {
                    arrayList.add(str);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return (String[]) arrayList.toArray(new String[arrayList.size()]);
    }

    protected void loadClassifierAndSetJobName(InputStream inputStream) throws IOException {
        ObjectInputStream objectInputStream = null;
        try {
            try {
                objectInputStream = new ObjectInputStream(new BufferedInputStream(inputStream));
                Object readObject = objectInputStream.readObject();
                setJobName("Scoring job: " + readObject.getClass().toString() + (readObject instanceof OptionHandler ? " " + Utils.joinOptions(((OptionHandler) readObject).getOptions()) : ""));
                if (objectInputStream != null) {
                    objectInputStream.close();
                }
            } catch (Exception e) {
                throw new IOException(e);
            }
        } catch (Throwable th) {
            if (objectInputStream != null) {
                objectInputStream.close();
            }
            throw th;
        }
    }

    protected String handleModelFile(Configuration configuration) throws IOException {
        String str;
        String substring;
        boolean z;
        String modelPath = getModelPath();
        try {
            modelPath = environmentSubstitute(modelPath);
        } catch (Exception e) {
        }
        if (modelPath.startsWith("hdfs://")) {
            String replace = modelPath.replace("hdfs://", "");
            modelPath = replace.substring(replace.indexOf("/"));
        }
        File file = new File(modelPath);
        if (file.exists()) {
            substring = modelPath.lastIndexOf(File.separator) >= 0 ? modelPath.substring(modelPath.lastIndexOf(File.separator) + File.separator.length(), modelPath.length()) : modelPath;
            str = HDFSUtils.WEKA_TEMP_DISTRIBUTED_CACHE_FILES + substring;
            logMessage("Copying local model file (" + modelPath + ") to HDFS.");
            HDFSUtils.copyToHDFS(modelPath, str, this.m_mrConfig.getHDFSConfig(), this.m_env, true);
            loadClassifierAndSetJobName(new FileInputStream(file));
            z = true;
        } else {
            str = modelPath;
            substring = modelPath.substring(modelPath.lastIndexOf("/") + 1, modelPath.length());
            Configuration configuration2 = new Configuration();
            this.m_mrConfig.getHDFSConfig().configureForHadoop(configuration2, this.m_env);
            loadClassifierAndSetJobName(FileSystem.get(configuration2).open(new Path(str)));
            z = true;
        }
        if (!z) {
            throw new IOException("Unable to locate model file: " + modelPath + " on the local file system or in HDFS.");
        }
        System.err.println("Adding " + str + " to the distributed cache.");
        HDFSUtils.addFileToDistributedCache(this.m_mrConfig.getHDFSConfig(), configuration, str, this.m_env);
        return substring;
    }

    protected boolean initializeAndRunArffJob() throws DistributedWekaException {
        if (this.m_env == null) {
            this.m_env = Environment.getSystemWide();
        }
        this.m_arffHeaderJob.setEnvironment(this.m_env);
        if (!this.m_arffHeaderJob.runJob()) {
            statusMessage("Unable to continue - creating the ARFF header failed!");
            logMessage("Unable to continue - creating the ARFF header failed!");
            return false;
        }
        this.m_mrConfig.setOutputPath(environmentSubstitute(this.m_mrConfig.getOutputPath() + OUTPUT_SUBDIR));
        return true;
    }

    public boolean runJob() throws DistributedWekaException {
        if (DistributedJobConfig.isEmpty(getModelPath())) {
            throw new DistributedWekaException("No model file specified - can't continue");
        }
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
            setJobStatus(DistributedJob.JobStatus.RUNNING);
            if (!initializeAndRunArffJob()) {
                Thread.currentThread().setContextClassLoader(contextClassLoader);
                return false;
            }
            this.m_mrConfig.setNumberOfReducers("0");
            String environmentSubstitute = environmentSubstitute(this.m_arffHeaderJob.getAggregatedHeaderPath());
            Configuration configuration = new Configuration();
            try {
                HDFSUtils.addFileToDistributedCache(this.m_mrConfig.getHDFSConfig(), configuration, environmentSubstitute, this.m_env);
                String handleModelFile = handleModelFile(configuration);
                String substring = environmentSubstitute.substring(environmentSubstitute.lastIndexOf("/") + 1, environmentSubstitute.length());
                String environmentSubstitute2 = environmentSubstitute(getColumnsToOutputInScoredData());
                this.m_mrConfig.setUserSuppliedProperty(WekaScoringHadoopMapper.SCORING_MAP_TASK_OPTIONS, "-arff-header " + substring + " -model-file-name " + handleModelFile + (!DistributedJobConfig.isEmpty(environmentSubstitute2) ? " -columns-to-output " + environmentSubstitute2 : ""));
                this.m_mrConfig.setUserSuppliedProperty(CSVToArffHeaderHadoopMapper.CSV_TO_ARFF_HEADER_MAP_TASK_OPTIONS, environmentSubstitute(getCSVMapTaskOptions()));
                installWekaLibrariesInHDFS(configuration);
                Job configureForHadoop = this.m_mrConfig.configureForHadoop(getJobName(), configuration, this.m_env);
                cleanOutputDirectory(configureForHadoop);
                statusMessage("Submitting scoring job: " + getJobName());
                logMessage("Submitting scoring job: " + getJobName());
                boolean runJob = runJob(configureForHadoop);
                Thread.currentThread().setContextClassLoader(contextClassLoader);
                return runJob;
            } catch (IOException e) {
                throw new DistributedWekaException(e);
            } catch (ClassNotFoundException e2) {
                throw new DistributedWekaException(e2);
            }
        } catch (Throwable th) {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            throw th;
        }
    }

    public static void main(String[] strArr) {
        WekaScoringHadoopJob wekaScoringHadoopJob = new WekaScoringHadoopJob();
        wekaScoringHadoopJob.run(wekaScoringHadoopJob, strArr);
    }

    public void run(Object obj, String[] strArr) throws IllegalArgumentException {
        if (!(obj instanceof WekaScoringHadoopJob)) {
            throw new IllegalArgumentException("Object to run is not a WekaScoringHadoopJob");
        }
        try {
            WekaScoringHadoopJob wekaScoringHadoopJob = (WekaScoringHadoopJob) obj;
            if (Utils.getFlag('h', strArr)) {
                System.err.println(DistributedJob.makeOptionsStr(wekaScoringHadoopJob));
                System.exit(1);
            }
            wekaScoringHadoopJob.setOptions(strArr);
            wekaScoringHadoopJob.runJob();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
