package weka.distributed.hadoop;

import distributed.core.DistributedJobConfig;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import weka.classifiers.Classifier;
import weka.classifiers.UpdateableClassifier;
import weka.core.Environment;
import weka.core.Instance;
import weka.core.Utils;
import weka.distributed.CSVToARFFHeaderMapTask;
import weka.distributed.CSVToARFFHeaderReduceTask;
import weka.distributed.WekaClassifierMapTask;
import weka.filters.Filter;
import weka.filters.PreconstructedFilter;

/* loaded from: input_file:weka/distributed/hadoop/WekaFoldBasedClassifierHadoopMapper.class */
/**
 * Hadoop mapper that maintains one {@link WekaClassifierMapTask} per fold.
 *
 * <p>During {@link #setup} the mapper reads its configuration, loads the training
 * header and builds {@code m_totalFolds} map tasks (fold numbers are 1-based).
 * Every row handed to {@link #processRow} is parsed once and passed to each fold's
 * task. {@link #cleanup} finalizes each task and writes its serialized classifier
 * under the key {@code classifier_fold_<n>}.
 */
public class WekaFoldBasedClassifierHadoopMapper extends WekaClassifierHadoopMapper {
    /** One map task per fold; element {@code i} is configured with fold number {@code i + 1}. */
    protected WekaClassifierMapTask[] m_tasks;

    /** Total number of folds; replaced by the "total-folds" task option when it is supplied. */
    protected int m_totalFolds = 1;

    /** True when the configured classifier implements {@link UpdateableClassifier}. */
    protected boolean m_isUpdateableClassifier;

    /** Setting reported by the map task's getForceBatchLearningForUpdateableClassifiers(). */
    protected boolean m_forceBatchForUpdateable;

    /**
     * Reads the job configuration and prepares one classifier map task per fold.
     *
     * @param context the Hadoop mapper context supplying the job configuration
     * @throws IOException if the task options or ARFF header name are missing, the
     *         fold count is less than one, a classifier loaded for continued training
     *         is not an {@link UpdateableClassifier}, or any other problem occurs
     *         (non-I/O failures are wrapped as the cause)
     */
    @Override
    public void setup(Mapper<LongWritable, Text, Text, BytesWritable>.Context context) throws IOException {
        this.m_rowHelper = new CSVToARFFHeaderMapTask();
        Configuration conf = context.getConfiguration();
        String taskOptsSpec = conf.get(WekaClassifierHadoopMapper.CLASSIFIER_MAP_TASK_OPTIONS);
        String csvOptsSpec = conf.get(CSVToArffHeaderHadoopMapper.CSV_TO_ARFF_HEADER_MAP_TASK_OPTIONS);
        try {
            if (!DistributedJobConfig.isEmpty(csvOptsSpec)) {
                this.m_rowHelper.setOptions(Utils.splitOptions(csvOptsSpec));
            }

            // The ARFF header name travels inside the task options, so missing
            // task options and a missing header name are reported the same way.
            if (DistributedJobConfig.isEmpty(taskOptsSpec)) {
                throw new IOException("Can't continue without the name of the ARFF header file!");
            }
            String[] taskOpts = Utils.splitOptions(taskOptsSpec);
            String arffHeaderFileName = Utils.getOption("arff-header", taskOpts);
            if (DistributedJobConfig.isEmpty(arffHeaderFileName)) {
                throw new IOException("Can't continue without the name of the ARFF header file!");
            }

            this.m_trainingHeader = CSVToARFFHeaderReduceTask.stripSummaryAtts(loadTrainingHeader(arffHeaderFileName));
            setClassIndex(taskOpts, this.m_trainingHeader, true);
            this.m_rowHelper.initParserOnly(CSVToARFFHeaderMapTask.instanceHeaderToAttributeNameList(this.m_trainingHeader));

            String totalFoldsSpec = Utils.getOption("total-folds", taskOpts);
            if (!DistributedJobConfig.isEmpty(totalFoldsSpec)) {
                this.m_totalFolds = Integer.parseInt(totalFoldsSpec);
            }
            // Fail with a clear message instead of a NegativeArraySizeException
            // (negative) or a mapper that silently emits nothing (zero).
            if (this.m_totalFolds < 1) {
                throw new IOException("The total number of folds must be at least 1, but was " + this.m_totalFolds);
            }

            // The value is deliberately discarded: every task gets its own fold
            // number via setFoldNumber() below.
            // NOTE(review): presumably this call is kept so the option is consumed
            // from taskOpts before they are copied for the tasks - confirm against
            // the weka.core.Utils.getOption contract.
            Utils.getOption("fold-number", taskOpts);

            boolean continueTraining = Utils.getFlag("continue-training-updateable", taskOpts);
            String filterFileName = Utils.getOption("preconstructed-filter", taskOpts);
            Filter preconstructedFilter = null;
            if (!DistributedJobConfig.isEmpty(filterFileName)) {
                preconstructedFilter = loadPreconstructedFilter(filterFileName);
            }

            Environment env = new Environment();
            env.addVariable("total.num.maps", String.valueOf(conf.getInt("mapred.map.tasks", 1)));

            // Snapshot the options at this point; each task receives its own copy.
            String[] remainingOpts = taskOpts.clone();

            // A throw-away task is configured only to find out what kind of
            // classifier the options describe.
            WekaClassifierMapTask probeTask = new WekaClassifierMapTask();
            probeTask.setOptions(remainingOpts.clone());
            this.m_isUpdateableClassifier = probeTask.getClassifier() instanceof UpdateableClassifier;
            this.m_forceBatchForUpdateable = probeTask.getForceBatchLearningForUpdateableClassifiers();

            this.m_tasks = new WekaClassifierMapTask[this.m_totalFolds];
            for (int fold = 0; fold < this.m_totalFolds; fold++) {
                WekaClassifierMapTask task = new WekaClassifierMapTask();
                task.setEnvironment(env);
                task.setOptions(remainingOpts.clone());
                task.setFoldNumber(fold + 1);
                task.setTotalNumFolds(this.m_totalFolds);
                task.setContinueTrainingUpdateableClassifier(continueTraining);
                if (preconstructedFilter != null) {
                    task.addPreconstructedFilterToUse((PreconstructedFilter) preconstructedFilter);
                }
                this.m_tasks[fold] = task;
            }

            if (continueTraining) {
                // Read from taskOpts only after the snapshot above was taken, so the
                // per-task option copies are unaffected by this lookup.
                String modelFileName = Utils.getOption("model-file-name", taskOpts);
                if (DistributedJobConfig.isEmpty(modelFileName)) {
                    throw new IOException("Continued training of incremental classifier has been specified but no file name for the classifier to load has been specified. Can't continue.");
                }
                for (int fold = 0; fold < this.m_totalFolds; fold++) {
                    // Per-fold models are stored as "<foldNumber>_<modelFileName>".
                    Classifier loaded = loadClassifier((fold + 1) + "_" + modelFileName);
                    if (!(loaded instanceof UpdateableClassifier)) {
                        throw new IOException("The classifier loaded is not an UpdateableClassifier (incremental)");
                    }
                    this.m_tasks[fold].setClassifier(loaded);
                }
            }

            // Tasks are set up last, after any previously trained classifier has
            // been installed.
            for (WekaClassifierMapTask task : this.m_tasks) {
                task.setup(this.m_trainingHeader);
            }
        } catch (IOException e) {
            // Already the declared type: rethrow as-is rather than double-wrapping.
            throw e;
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    /**
     * Parses one input row and passes the resulting instance to every fold's task.
     *
     * @param str the raw row text; {@code null} is ignored
     * @throws IOException if the row does not have exactly as many values as the
     *         training header has attributes, or if building or processing the
     *         instance fails (the failure is wrapped as the cause)
     */
    @Override
    protected void processRow(String str) throws IOException {
        if (str == null) {
            return;
        }
        String[] parsed = this.m_rowHelper.parseRowOnly(str);
        if (parsed.length != this.m_trainingHeader.numAttributes()) {
            throw new IOException("Parsed a row that contains a different number of values than there are attributes in the training ARFF header: " + str);
        }
        try {
            // The instance is built once and shared by all fold tasks.
            Instance instance = makeInstance(this.m_rowHelper, this.m_trainingHeader, this.m_isUpdateableClassifier, this.m_forceBatchForUpdateable, parsed);
            for (int fold = 0; fold < this.m_totalFolds; fold++) {
                this.m_tasks[fold].processInstance(instance);
            }
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    /**
     * Finalizes each fold's task and writes its serialized classifier to the
     * context under the key {@code classifier_fold_<n>}, where {@code n} is the
     * 1-based fold number. Each task reference is cleared once it has been written.
     *
     * @param context the Hadoop mapper context to write to
     * @throws IOException if finalizing, serializing or writing a classifier fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    public void cleanup(Mapper<LongWritable, Text, Text, BytesWritable>.Context context) throws IOException, InterruptedException {
        for (int fold = 0; fold < this.m_totalFolds; fold++) {
            try {
                WekaClassifierMapTask task = this.m_tasks[fold];
                task.finalizeTask();
                Classifier model = task.getClassifier();
                System.err.println("Model after continued training on fold " + (fold + 1) + ":\n" + model.toString());
                byte[] modelBytes = classifierToBytes(model, task.getNumTrainingInstances());
                Text key = new Text();
                key.set("classifier_fold_" + (fold + 1));
                BytesWritable value = new BytesWritable();
                value.set(modelBytes, 0, modelBytes.length);
                context.write(key, value);
                // Drop the reference so the finished task can be garbage collected.
                this.m_tasks[fold] = null;
            } catch (IOException e) {
                throw e;
            } catch (InterruptedException e) {
                // Propagate the interrupt as declared instead of hiding it in an IOException.
                throw e;
            } catch (Exception e) {
                throw new IOException(e);
            }
        }
    }
}
