package weka.distributed.hadoop;

import distributed.core.DistributedJob;
import distributed.core.DistributedJobConfig;
import distributed.hadoop.HDFSUtils;
import distributed.hadoop.MapReduceJobConfig;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.Job;
import weka.core.ClassloaderUtil;
import weka.core.Environment;
import weka.core.Option;
import weka.core.OptionHandler;
import weka.core.Utils;
import weka.core.WekaPackageManager;
import weka.distributed.DistributedWekaException;

/* loaded from: input_file:weka/distributed/hadoop/HadoopJob.class */
public abstract class HadoopJob extends DistributedJob implements OptionHandler {
    private static final long serialVersionUID = -9026086203818342364L;
    public static final String DISTRIBUTED_WEKA_HADOOP_JAR = WekaPackageManager.PACKAGES_DIR.toString() + File.separator + "distributedWekaHadoop" + File.separator + "distributedWekaHadoop.jar";
    public static final String DISTRIBUTED_WEKA_BASE_JAR = WekaPackageManager.PACKAGES_DIR.toString() + File.separator + "distributedWekaBase" + File.separator + "distributedWekaBase.jar";
    public static final String OPEN_CSV_JAR = WekaPackageManager.PACKAGES_DIR.toString() + File.separator + "distributedWekaBase" + File.separator + "lib" + File.separator + "opencsv-2.3.jar";
    public static final String JFREECHART_JAR = WekaPackageManager.PACKAGES_DIR.toString() + File.separator + "distributedWekaBase" + File.separator + "lib" + File.separator + "jfreechart-1.0.13.jar";
    public static final String JCOMMON_JAR = WekaPackageManager.PACKAGES_DIR.toString() + File.separator + "distributedWekaBase" + File.separator + "lib" + File.separator + "jcommon-1.0.16.jar";
    public static final String COLT_JAR = WekaPackageManager.PACKAGES_DIR.toString() + File.separator + "distributedWekaBase" + File.separator + "lib" + File.separator + "colt-1.2.0.jar";
    public static final String LA4J_JAR = WekaPackageManager.PACKAGES_DIR.toString() + File.separator + "distributedWekaBase" + File.separator + "lib" + File.separator + "la4j-0.4.5.jar";
    protected static String DEFAULT_WEKA_JAR_PATH;
    protected String m_pathToWekaJar;
    protected MapReduceJobConfig m_mrConfig;
    protected String m_loggingInterval;

    public Enumeration<Option> listOptions() {
        Vector vector = new Vector();
        Enumeration<Option> listOptions = this.m_mrConfig.listOptions();
        vector.addElement(new Option("\tPath to the weka.jar file", "weka-jar", 1, "-weka-jar <path to weka.jar>"));
        vector.addElement(new Option("\tAdditional Weka packages to use.", "weka-packages", 1, "-weka-packages <comma-separated list of package names>"));
        vector.addElement(new Option("\tLogging interval in seconds (default = 15).", "logging-interval", 1, "-logging-interval <seconds>"));
        while (listOptions.hasMoreElements()) {
            vector.addElement(listOptions.nextElement());
        }
        return vector.elements();
    }

    public void setOptions(String[] strArr) throws Exception {
        this.m_mrConfig.setOptions(strArr);
        String option = Utils.getOption("weka-jar", strArr);
        if (!DistributedJobConfig.isEmpty(option)) {
            setPathToWekaJar(option);
        }
        setAdditionalWekaPackages(Utils.getOption("weka-packages", strArr));
        setLoggingInterval(Utils.getOption("logging-interval", strArr));
    }

    public String[] getBaseOptionsOnly() {
        ArrayList arrayList = new ArrayList();
        if (!DistributedJobConfig.isEmpty(getPathToWekaJar())) {
            arrayList.add("-weka-jar");
            arrayList.add(getPathToWekaJar());
        }
        if (!DistributedJobConfig.isEmpty(getAdditionalWekaPackages())) {
            arrayList.add("-weka-packages");
            arrayList.add(getAdditionalWekaPackages());
        }
        if (!DistributedJobConfig.isEmpty(getLoggingInterval())) {
            arrayList.add("-logging-interval");
            arrayList.add(getLoggingInterval());
        }
        return (String[]) arrayList.toArray(new String[arrayList.size()]);
    }

    public String[] getOptions() {
        ArrayList arrayList = new ArrayList();
        for (String str : getBaseOptionsOnly()) {
            arrayList.add(str);
        }
        for (String str2 : this.m_mrConfig.getOptions()) {
            arrayList.add(str2);
        }
        return (String[]) arrayList.toArray(new String[arrayList.size()]);
    }

    public HadoopJob(String str, String str2) {
        super(str, str2);
        this.m_pathToWekaJar = DEFAULT_WEKA_JAR_PATH;
        this.m_mrConfig = new MapReduceJobConfig();
        this.m_loggingInterval = "10";
    }

    public void setMapReduceJobConfig(MapReduceJobConfig mapReduceJobConfig) {
        this.m_mrConfig = mapReduceJobConfig;
    }

    public MapReduceJobConfig getMapReduceJobConfig() {
        return this.m_mrConfig;
    }

    public String pathToWekaJarTipText() {
        return "The path to the weka jar file. This will get installed inHDFS and placed into the classpath for map and reduce tasks";
    }

    public void setPathToWekaJar(String str) {
        this.m_pathToWekaJar = str;
    }

    public String getPathToWekaJar() {
        return this.m_pathToWekaJar;
    }

    public String additionalWekaPackagesTipText() {
        return "A list of comma separated weka package names to use with the job. Any jar files in the main package directory and the lib directory of each package will get installed in HDFS and placed in the classpath of map and reduce tasks.";
    }

    public void setAdditionalWekaPackages(String str) {
        this.m_mrConfig.setUserSuppliedProperty("*weka.distributed.job.additional.packages", str);
    }

    public String getAdditionalWekaPackages() {
        return this.m_mrConfig.getUserSuppliedProperty("*weka.distributed.job.additional.packages");
    }

    public String loggingIntervalTipText() {
        return "The interval (in seconds) between output of logging information from running jobs";
    }

    public void setLoggingInterval(String str) {
        this.m_loggingInterval = str;
    }

    public String getLoggingInterval() {
        return this.m_loggingInterval;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void installWekaLibrariesInHDFS(Configuration configuration) throws IOException {
        if (this.m_env == null) {
            this.m_env = Environment.getSystemWide();
        }
        if (this.m_pathToWekaJar == null || DistributedJobConfig.isEmpty(this.m_pathToWekaJar.toString())) {
            throw new IOException("No path to weka.jar file provided. We need to install the weka.jar in HDFS so that it is available to running Jobs");
        }
        statusMessage("Installing libraries in HDFS...");
        ArrayList arrayList = new ArrayList();
        logMessage("Copying " + environmentSubstitute(this.m_pathToWekaJar) + " to HDFS");
        arrayList.add(environmentSubstitute(this.m_pathToWekaJar));
        logMessage("Copying " + DISTRIBUTED_WEKA_BASE_JAR + " to HSFS");
        arrayList.add(DISTRIBUTED_WEKA_BASE_JAR);
        logMessage("Copying " + DISTRIBUTED_WEKA_HADOOP_JAR + " to HSFS");
        arrayList.add(DISTRIBUTED_WEKA_HADOOP_JAR);
        logMessage("Copying " + OPEN_CSV_JAR + " to HDFS");
        arrayList.add(OPEN_CSV_JAR);
        logMessage("Copying " + JFREECHART_JAR + " to HDFS");
        arrayList.add(JFREECHART_JAR);
        logMessage("Copying " + JCOMMON_JAR + " to HDFS");
        arrayList.add(JCOMMON_JAR);
        logMessage("Copying " + COLT_JAR + " to HDFS");
        arrayList.add(COLT_JAR);
        logMessage("Copying " + LA4J_JAR + " to HDFS");
        arrayList.add(LA4J_JAR);
        HDFSUtils.copyFilesToWekaHDFSInstallationDirectory(arrayList, this.m_mrConfig.getHDFSConfig(), this.m_env, true);
        addWekaLibrariesToClasspath(configuration);
        installWekaPackageLibrariesInHDFS(getAdditionalWekaPackageNames(this.m_mrConfig), configuration);
    }

    private void installWekaPackageLibrariesInHDFS(List<String> list, Configuration configuration) throws IOException {
        if (list == null || list.size() == 0) {
            return;
        }
        File file = WekaPackageManager.PACKAGES_DIR;
        ArrayList arrayList = new ArrayList();
        for (String str : list) {
            File file2 = new File(file.toString() + File.separator + str);
            if (file2.exists() && file2.isDirectory()) {
                for (File file3 : file2.listFiles()) {
                    if (file3.isFile() && file3.toString().toLowerCase().endsWith(".jar")) {
                        logMessage("Copying package '" + str + "': " + file3.getName() + " to HDFS");
                        arrayList.add(file3.toString());
                    }
                }
                File file4 = new File(file2.toString() + File.separator + "lib");
                if (file4.exists() && file4.isDirectory()) {
                    for (File file5 : file4.listFiles()) {
                        if (file5.isFile() && file5.toString().toLowerCase().endsWith(".jar")) {
                            logMessage("Copying package '" + str + "': " + file5.getName() + " to HDFS");
                            arrayList.add(file5.toString());
                        }
                    }
                }
            }
        }
        HDFSUtils.copyFilesToWekaHDFSInstallationDirectory(arrayList, this.m_mrConfig.getHDFSConfig(), this.m_env, true);
        addWekaPackageLibrariesToClasspath(arrayList, configuration);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addWekaLibrariesToClasspath(Configuration configuration) throws IOException {
        if (this.m_env == null) {
            this.m_env = Environment.getSystemWide();
        }
        statusMessage("Adding Weka libraries to the distributed cache and classpath for the job");
        ArrayList arrayList = new ArrayList();
        arrayList.add(new File(this.m_pathToWekaJar).getName());
        arrayList.add(new File(DISTRIBUTED_WEKA_BASE_JAR).getName());
        arrayList.add(new File(DISTRIBUTED_WEKA_HADOOP_JAR).getName());
        arrayList.add(new File(OPEN_CSV_JAR).getName());
        arrayList.add(new File(JFREECHART_JAR).getName());
        arrayList.add(new File(JCOMMON_JAR).getName());
        arrayList.add(new File(COLT_JAR).getName());
        arrayList.add(new File(LA4J_JAR).getName());
        HDFSUtils.addWekaInstalledFilesToClasspath(this.m_mrConfig.getHDFSConfig(), configuration, arrayList, this.m_env);
    }

    private void addWekaPackageLibrariesToClasspath(List<String> list, Configuration configuration) throws IOException {
        if (list == null || list.size() == 0) {
            return;
        }
        ArrayList arrayList = new ArrayList();
        statusMessage("Adding Weka package libraries to the distributed cache and classpath");
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(new File(it.next()).getName());
        }
        HDFSUtils.addWekaInstalledFilesToClasspath(this.m_mrConfig.getHDFSConfig(), configuration, arrayList, this.m_env);
    }

    public void cleanOutputDirectory(Job job) throws IOException {
        if (DistributedJobConfig.isEmpty(this.m_mrConfig.getOutputPath())) {
            throw new IOException("No output directory set!");
        }
        this.m_mrConfig.deleteOutputDirectory(job, this.m_env);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean runJob(Job job) throws DistributedWekaException {
        try {
            this.m_stopRunningJob = false;
            if (DistributedJobConfig.isEmpty(getLoggingInterval())) {
                this.m_loggingInterval = "10";
            }
            int parseInt = Integer.parseInt(this.m_loggingInterval);
            System.out.println("Setting logging interval to " + parseInt);
            job.submit();
            int i = 0;
            while (!this.m_stopRunningJob && !job.isComplete()) {
                try {
                    if (parseInt >= 1) {
                        printJobStatus(job);
                        i += logTaskMessages(job, i);
                        Thread.sleep(parseInt * 1000);
                    } else {
                        Thread.sleep(60000L);
                    }
                } catch (InterruptedException e) {
                    logMessage(e.getMessage());
                    this.m_stopRunningJob = true;
                }
            }
            if (this.m_stopRunningJob && !job.isComplete()) {
                job.killJob();
            }
            this.m_stopRunningJob = false;
            return job.isSuccessful();
        } catch (Exception e2) {
            throw new DistributedWekaException(e2);
        }
    }

    protected void printJobStatus(Job job) throws IOException {
        String str = getJobName() + " Setup: " + (job.setupProgress() * 100.0f) + " Map: " + (job.mapProgress() * 100.0f) + " Reduce: " + (job.reduceProgress() * 100.0f);
        statusMessage(str);
        logMessage(str);
    }

    protected int logTaskMessages(Job job, int i) throws IOException {
        TaskCompletionEvent[] taskCompletionEvents = job.getTaskCompletionEvents(i);
        StringBuilder sb = new StringBuilder();
        for (TaskCompletionEvent taskCompletionEvent : taskCompletionEvents) {
            sb.append(taskCompletionEvent.toString()).append("\n");
        }
        logMessage(sb.toString());
        return taskCompletionEvents.length;
    }

    public static int getMapReduceNumber(String str, String str2) {
        if (str.indexOf(str2) < 0) {
            return -1;
        }
        String substring = str.substring(str.indexOf(str2) + str2.length());
        return Integer.parseInt(substring.substring(0, substring.indexOf("_")));
    }

    public static int getMapNumber(String str) {
        return getMapReduceNumber(str, "_m_");
    }

    public static int getReduceNumber(String str) {
        return getMapReduceNumber(str, "_r");
    }

    static {
        DEFAULT_WEKA_JAR_PATH = System.getProperty("user.home") + File.separator + "weka.jar";
        try {
            ClassLoader classLoader = ClassloaderUtil.class.getClassLoader();
            if (classLoader instanceof URLClassLoader) {
                for (URL url : ((URLClassLoader) classLoader).getURLs()) {
                    if (url.toString().endsWith("weka.jar")) {
                        DEFAULT_WEKA_JAR_PATH = new File(url.toURI()).toString();
                    }
                }
            }
        } catch (Exception e) {
        }
    }
}
