package weka.distributed.hadoop;

import distributed.core.DistributedJobConfig;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import weka.classifiers.Classifier;
import weka.classifiers.UpdateableClassifier;
import weka.core.Attribute;
import weka.core.Environment;
import weka.core.Instance;
import weka.core.Instances;
import weka.core.Utils;
import weka.distributed.CSVToARFFHeaderMapTask;
import weka.distributed.CSVToARFFHeaderReduceTask;
import weka.distributed.WekaClassifierMapTask;
import weka.filters.Filter;
import weka.filters.PreconstructedFilter;

/* loaded from: input_file:weka/distributed/hadoop/WekaClassifierHadoopMapper.class */
/**
 * Hadoop mapper that feeds the rows of its input split to a
 * {@link WekaClassifierMapTask} and, once the split is exhausted, emits the
 * serialized classifier held by that task.
 *
 * <p>Each input value is parsed as one row by a {@link CSVToARFFHeaderMapTask},
 * turned into an {@link Instance} against the training header and handed to the
 * map task. In {@link #cleanup} the classifier and the number of training
 * instances seen are serialized (gzip-compressed Java serialization) and
 * written under the single key {@code "classifier"}.
 */
public class WekaClassifierHadoopMapper extends Mapper<LongWritable, Text, Text, BytesWritable> {
    /** Configuration key under which the option string for the classifier map task is stored. */
    public static final String CLASSIFIER_MAP_TASK_OPTIONS = "*weka.distributed.weka_classifier_map_task_opts";

    /** The underlying map task that does the actual training; created in {@link #setup}. */
    protected WekaClassifierMapTask m_task = null;

    /** Helper used to parse incoming rows and to build instances from them; created in {@link #setup}. */
    protected CSVToARFFHeaderMapTask m_rowHelper = null;

    /** Training header loaded in {@link #setup}, after summary attributes have been stripped. */
    protected Instances m_trainingHeader;

    /**
     * Loads an ARFF header from a local file.
     *
     * @param str path of the ARFF header file
     * @return the {@link Instances} read from the file
     * @throws IOException if the file does not exist or cannot be read
     */
    public static Instances loadTrainingHeader(String str) throws IOException {
        if (!new File(str).exists()) {
            throw new IOException("The ARFF header file '" + str + "' does not seem to exist in the distributed cache!");
        }
        // try-with-resources guarantees the reader is closed even when parsing fails.
        try (BufferedReader reader = new BufferedReader(new FileReader(str))) {
            return new Instances(reader);
        }
    }

    /**
     * Deserializes a classifier from a local file.
     *
     * @param str path of the serialized classifier
     * @return the deserialized classifier
     * @throws Exception if the file does not exist, cannot be read, or does not
     *         contain a {@link Classifier}
     */
    public static Classifier loadClassifier(String str) throws Exception {
        File modelFile = new File(str);
        if (!modelFile.exists()) {
            throw new IOException("The classifier model file '" + str + "' does not seem to exist in the distributed cache!");
        }
        return (Classifier) readSerializedObject(modelFile);
    }

    /**
     * Deserializes a filter from a local file and checks that it is a
     * {@link PreconstructedFilter}.
     *
     * @param str path of the serialized filter
     * @return the deserialized filter
     * @throws Exception if the file does not exist, cannot be read, or the
     *         object it contains is not a {@link PreconstructedFilter}
     */
    public static Filter loadPreconstructedFilter(String str) throws Exception {
        File filterFile = new File(str);
        if (!filterFile.exists()) {
            throw new IOException("The pre-constructed filter '" + str + "' does not seem to exist in the distributed cache!");
        }
        Filter filter = (Filter) readSerializedObject(filterFile);
        if (!(filter instanceof PreconstructedFilter)) {
            throw new Exception("The filter in file '" + str + "' is not a PreconstructedFilter!");
        }
        return filter;
    }

    /**
     * Reads the first serialized object from a file, closing every stream
     * involved whether or not reading succeeds.
     */
    private static Object readSerializedObject(File file) throws IOException, ClassNotFoundException {
        // The file stream is its own resource so it is closed even if the
        // ObjectInputStream constructor fails while reading the stream header.
        try (FileInputStream fileIn = new FileInputStream(file);
             ObjectInputStream objectIn = new ObjectInputStream(new BufferedInputStream(fileIn))) {
            return objectIn.readObject();
        }
    }

    /**
     * Sets the class attribute of a header from a textual specification.
     *
     * <p>The specification is interpreted, in this order, as: a 1-based
     * attribute index, the name of an attribute in the header, or one of the
     * keywords {@code first} / {@code last} (case-insensitive). Surrounding
     * whitespace is ignored.
     *
     * @param str the class specification; may be empty
     * @param instances the header whose class attribute is to be set
     * @param z if {@code true} and {@code str} is empty, the last attribute is
     *        used as the class; if {@code false} and {@code str} is empty,
     *        nothing is changed
     * @throws Exception if the specification matches neither an index, an
     *         attribute name, nor {@code first}/{@code last}
     */
    public static void setClassIndex(String str, Instances instances, boolean z) throws Exception {
        if (DistributedJobConfig.isEmpty(str)) {
            if (z) {
                instances.setClassIndex(instances.numAttributes() - 1);
            }
            return;
        }

        // Trim once so that the numeric, by-name and keyword lookups all see the same text.
        String spec = str.trim();
        try {
            instances.setClassIndex(Integer.parseInt(spec) - 1);
            return;
        } catch (NumberFormatException ignored) {
            // Not a number: fall through and treat the specification as a name or keyword.
        }

        Attribute named = instances.attribute(spec);
        if (named != null) {
            instances.setClass(named);
        } else if (spec.equalsIgnoreCase("first")) {
            // equalsIgnoreCase avoids the locale-sensitive behaviour of toLowerCase().
            instances.setClassIndex(0);
        } else if (spec.equalsIgnoreCase("last")) {
            instances.setClassIndex(instances.numAttributes() - 1);
        } else {
            throw new Exception("Can't find class attribute: " + str + " in ARFF header!");
        }
    }

    /**
     * Sets the class attribute of a header using the value of the
     * {@code class} option extracted from an option array.
     *
     * @param strArr options from which the {@code class} option is read
     * @param instances the header whose class attribute is to be set
     * @param z whether to default to the last attribute when no class is specified
     * @throws Exception if the option cannot be read or the class cannot be resolved
     */
    public static void setClassIndex(String[] strArr, Instances instances, boolean z) throws Exception {
        setClassIndex(Utils.getOption("class", strArr), instances, z);
    }

    /**
     * Configures the row parser and the classifier map task from the job
     * configuration.
     *
     * <p>Reads the CSV parsing options and the classifier map task options,
     * loads the ARFF header named by the {@code arff-header} option, sets its
     * class attribute, optionally loads a pre-constructed filter and (when
     * {@code continue-training-updateable} is set) a previously saved
     * classifier, then initializes the map task with the header.
     *
     * @param context the Hadoop task context
     * @throws IOException if a required option is missing or any part of the
     *         initialization fails
     */
    @Override
    public void setup(Mapper<LongWritable, Text, Text, BytesWritable>.Context context) throws IOException {
        this.m_task = new WekaClassifierMapTask();
        this.m_rowHelper = new CSVToARFFHeaderMapTask();

        Configuration conf = context.getConfiguration();
        String taskOptions = conf.get(CLASSIFIER_MAP_TASK_OPTIONS);
        String csvOptions = conf.get(CSVToArffHeaderHadoopMapper.CSV_TO_ARFF_HEADER_MAP_TASK_OPTIONS);

        try {
            if (!DistributedJobConfig.isEmpty(csvOptions)) {
                this.m_rowHelper.setOptions(Utils.splitOptions(csvOptions));
            }
            if (DistributedJobConfig.isEmpty(taskOptions)) {
                throw new IOException("Can't continue without the name of the ARFF header file!");
            }

            String[] options = Utils.splitOptions(taskOptions);
            String headerPath = Utils.getOption("arff-header", options);
            if (DistributedJobConfig.isEmpty(headerPath)) {
                throw new IOException("Can't continue without the name of the ARFF header file!");
            }

            this.m_trainingHeader = CSVToARFFHeaderReduceTask.stripSummaryAtts(loadTrainingHeader(headerPath));
            setClassIndex(options, this.m_trainingHeader, true);
            this.m_rowHelper.initParserOnly(
                CSVToARFFHeaderMapTask.instanceHeaderToAttributeNameList(this.m_trainingHeader));

            boolean continueTraining = Utils.getFlag("continue-training-updateable", options);
            this.m_task.setContinueTrainingUpdateableClassifier(continueTraining);

            String filterPath = Utils.getOption("preconstructed-filter", options);
            if (!DistributedJobConfig.isEmpty(filterPath)) {
                this.m_task.addPreconstructedFilterToUse(loadPreconstructedFilter(filterPath));
            }

            // Make the configured number of map tasks available to the map task as a variable.
            Environment env = new Environment();
            env.addVariable("total.num.maps", String.valueOf(conf.getInt("mapred.map.tasks", 1)));
            this.m_task.setEnvironment(env);
            this.m_task.setOptions(options);

            if (continueTraining) {
                String modelPath = Utils.getOption("model-file-name", options);
                if (DistributedJobConfig.isEmpty(modelPath)) {
                    throw new IOException("Continued training of incremental classifier has been specified but no file name for the classifier to load has been specified. Can't continue.");
                }
                System.err.println("WekaClassifierHadoopMapper - loading staged model...");
                Classifier staged = loadClassifier(modelPath);
                if (!(staged instanceof UpdateableClassifier)) {
                    throw new Exception("The classifier loaded is not an UpdateableClassifier (incremental)");
                }
                this.m_task.setClassifier(staged);
            }

            this.m_task.setup(this.m_trainingHeader);
        } catch (IOException e) {
            // Already the declared type: rethrow as-is instead of wrapping it a second time.
            throw e;
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    /**
     * Builds an instance from parsed row values using the supplied row helper.
     *
     * <p>The boolean handed to the helper is {@code z && !z2}; in
     * {@link #processRow} that means "the classifier is updateable and batch
     * learning is not being forced". NOTE(review): the exact effect of that
     * flag is defined by {@code CSVToARFFHeaderMapTask.makeInstance} - confirm there.
     *
     * @param cSVToARFFHeaderMapTask the row helper that constructs the instance
     * @param instances the header the instance is built against
     * @param z whether the classifier is updateable
     * @param z2 whether batch learning is forced for updateable classifiers
     * @param strArr the parsed values of one row
     * @return the constructed instance
     * @throws Exception if the helper fails to build the instance
     */
    public static Instance makeInstance(CSVToARFFHeaderMapTask cSVToARFFHeaderMapTask, Instances instances, boolean z, boolean z2, String[] strArr) throws Exception {
        return cSVToARFFHeaderMapTask.makeInstance(instances, z && !z2, strArr);
    }

    /**
     * Processes one input record by passing its text to {@link #processRow}.
     * Null values are ignored.
     *
     * @param longWritable the record key (unused)
     * @param text the record value holding one row of data
     * @param context the Hadoop task context (unused)
     * @throws IOException if the row cannot be parsed or processed
     */
    @Override
    public void map(LongWritable longWritable, Text text, Mapper<LongWritable, Text, Text, BytesWritable>.Context context) throws IOException {
        if (text != null) {
            processRow(text.toString());
        }
    }

    /**
     * Parses a single row, converts it to an instance and passes it to the
     * classifier map task. A null row is ignored.
     *
     * @param str the raw row text
     * @throws IOException if the number of parsed values differs from the
     *         number of attributes in the training header, or if processing fails
     */
    protected void processRow(String str) throws IOException {
        if (str == null) {
            return;
        }

        String[] values = this.m_rowHelper.parseRowOnly(str);
        if (values.length != this.m_trainingHeader.numAttributes()) {
            throw new IOException("Parsed a row that contains a different number of values than there are attributes in the training ARFF header: " + str);
        }

        try {
            boolean updateable = this.m_task.getClassifier() instanceof UpdateableClassifier;
            boolean forceBatch = this.m_task.getForceBatchLearningForUpdateableClassifiers();
            this.m_task.processInstance(
                makeInstance(this.m_rowHelper, this.m_trainingHeader, updateable, forceBatch, values));
        } catch (IOException e) {
            // Already the declared type: rethrow as-is instead of wrapping it a second time.
            throw e;
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    /**
     * Serializes a classifier followed by an instance count into a
     * gzip-compressed byte array.
     *
     * @param classifier the classifier to serialize
     * @param i the number of training instances, written after the classifier
     *        as an {@link Integer}
     * @return the compressed serialized form
     * @throws IOException if serialization fails
     */
    public static byte[] classifierToBytes(Classifier classifier, int i) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new BufferedOutputStream(new GZIPOutputStream(buffer)))) {
            out.writeObject(classifier);
            out.writeObject(Integer.valueOf(i));
            out.flush();
        }
        // Read the buffer only after the streams are closed, so the gzip trailer is included.
        return buffer.toByteArray();
    }

    /**
     * Finalizes the map task and emits the serialized classifier together with
     * the number of training instances under the key {@code "classifier"}.
     *
     * @param context the Hadoop task context the result is written to
     * @throws IOException if finalizing, serializing or writing fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    public void cleanup(Mapper<LongWritable, Text, Text, BytesWritable>.Context context) throws IOException, InterruptedException {
        try {
            this.m_task.finalizeTask();
            byte[] payload = classifierToBytes(this.m_task.getClassifier(), this.m_task.getNumTrainingInstances());

            Text key = new Text();
            key.set("classifier");
            BytesWritable value = new BytesWritable();
            value.set(payload, 0, payload.length);
            context.write(key, value);
        } catch (IOException | InterruptedException e) {
            // Both are declared by this method; propagate them unchanged so an
            // interruption is not disguised as an I/O failure.
            throw e;
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}
