package weka.distributed.hadoop;

import distributed.core.DistributedJobConfig;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.zip.GZIPInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import weka.classifiers.Classifier;
import weka.core.Instances;
import weka.core.Utils;
import weka.distributed.CSVToARFFHeaderReduceTask;
import weka.distributed.DistributedWekaException;
import weka.distributed.WekaClassifierReduceTask;

/* loaded from: input_file:weka/distributed/hadoop/WekaClassifierHadoopReducer.class */
public class WekaClassifierHadoopReducer extends Reducer<Text, BytesWritable, Text, Text> {
    public static String CLASSIFIER_WRITE_PATH = "*weka.distributed.weka_classifier_write_path";
    public static String MIN_TRAINING_FRACTION = "*weka.distributed.weka_classifier_min_training_fraction";
    protected WekaClassifierReduceTask m_task = null;

    public void setup(Reducer<Text, BytesWritable, Text, Text>.Context context) throws IOException {
        this.m_task = new WekaClassifierReduceTask();
    }

    public void reduce(Text text, Iterable<BytesWritable> iterable, Reducer<Text, BytesWritable, Text, Text>.Context context) throws IOException {
        Configuration configuration = context.getConfiguration();
        String str = configuration.get(CLASSIFIER_WRITE_PATH);
        if (str == null || str.length() == 0) {
            throw new IOException("No destination given for aggregated classifier");
        }
        String str2 = configuration.get(MIN_TRAINING_FRACTION);
        if (!DistributedJobConfig.isEmpty(str2)) {
            double parseDouble = Double.parseDouble(str2);
            if (parseDouble > 1.0d) {
                parseDouble /= 100.0d;
            }
            this.m_task.setMinTrainingFraction(parseDouble);
        }
        try {
            boolean flag = Utils.getFlag("force-vote", Utils.splitOptions(configuration.get(WekaClassifierHadoopMapper.CLASSIFIER_MAP_TASK_OPTIONS)));
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            try {
                Iterator<BytesWritable> it = iterable.iterator();
                while (it.hasNext()) {
                    List<Object> deserialize = deserialize(it.next().getBytes());
                    arrayList.add((Classifier) deserialize.get(0));
                    arrayList2.add((Integer) deserialize.get(1));
                }
                try {
                    Classifier aggregate = this.m_task.aggregate(arrayList, arrayList2, flag);
                    writeClassifierToDestination(aggregate, str, configuration);
                    Text text2 = new Text();
                    text2.set("Summary:\n");
                    Text text3 = new Text();
                    StringBuffer stringBuffer = new StringBuffer();
                    stringBuffer.append("Number of training instances processed by each classifier: ");
                    Iterator it2 = arrayList2.iterator();
                    while (it2.hasNext()) {
                        stringBuffer.append((Integer) it2.next()).append(" ");
                    }
                    if (this.m_task.getDiscarded().size() > 0) {
                        stringBuffer.append("\nThere was one classifier not aggregated because it had seen less than " + (this.m_task.getMinTrainingFraction() * 100.0d) + "% of amount of data (" + this.m_task.getDiscarded().get(0) + " instances) that the others had\n");
                    }
                    text3.set("Number of classifiers aggregated: " + arrayList.size() + ". Final classifier is a " + aggregate.getClass().getName() + "\n" + stringBuffer.toString());
                    context.write(text2, text3);
                    text2.set("Aggregated model:\n");
                    text3.set(aggregate.toString());
                    context.write(text2, text3);
                } catch (Exception e) {
                    throw new IOException(e);
                }
            } catch (ClassNotFoundException e2) {
                throw new IOException(e2);
            }
        } catch (Exception e3) {
            throw new IOException(e3);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Finally extract failed */
    public static void writeClassifierToDestination(Classifier classifier, String str, Configuration configuration) throws DistributedWekaException, IOException {
        Instances instances = null;
        try {
            String[] splitOptions = Utils.splitOptions(configuration.get(WekaClassifierHadoopMapper.CLASSIFIER_MAP_TASK_OPTIONS));
            String option = Utils.getOption("arff-header", splitOptions);
            if (DistributedJobConfig.isEmpty(option)) {
                System.err.println("WekaClassifierHadoopReducer - unable to load training header from the distributed cache. Will only save the classifier.");
            } else {
                instances = CSVToARFFHeaderReduceTask.stripSummaryAtts(WekaClassifierHadoopMapper.loadTrainingHeader(option));
                WekaClassifierHadoopMapper.setClassIndex(splitOptions, instances, true);
            }
            ObjectOutputStream objectOutputStream = null;
            try {
                Path path = new Path(str);
                FileSystem fileSystem = FileSystem.get(configuration);
                if (fileSystem.exists(path)) {
                    fileSystem.delete(path, true);
                }
                objectOutputStream = new ObjectOutputStream(new BufferedOutputStream(fileSystem.create(path)));
                objectOutputStream.writeObject(classifier);
                if (instances != null) {
                    objectOutputStream.writeObject(instances);
                }
                if (objectOutputStream != null) {
                    objectOutputStream.flush();
                    objectOutputStream.close();
                }
                CSVToArffHeaderHadoopReducer.writeHeaderToDestination(instances, str.substring(0, str.lastIndexOf("/")) + "/" + str.substring(str.lastIndexOf("/") + 1, str.length()).replace(".model", "").replace(".MODEL", "") + "_arffHeader.arff", configuration);
            } catch (Throwable th) {
                if (objectOutputStream != null) {
                    objectOutputStream.flush();
                    objectOutputStream.close();
                }
                throw th;
            }
        } catch (Exception e) {
            throw new DistributedWekaException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<Object> deserialize(byte[] bArr) throws IOException, ClassNotFoundException {
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bArr);
        ObjectInputStream objectInputStream = null;
        ArrayList arrayList = new ArrayList(2);
        try {
            ObjectInputStream objectInputStream2 = new ObjectInputStream(new BufferedInputStream(new GZIPInputStream(byteArrayInputStream)));
            Object readObject = objectInputStream2.readObject();
            if (!(readObject instanceof Classifier)) {
                throw new IOException("Object deserialized was not a Classifier object!");
            }
            Integer num = (Integer) objectInputStream2.readObject();
            arrayList.add(readObject);
            arrayList.add(num);
            if (objectInputStream2 != null) {
                objectInputStream2.close();
            }
            return arrayList;
        } catch (Throwable th) {
            if (0 != 0) {
                objectInputStream.close();
            }
            throw th;
        }
    }

    public /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
        reduce((Text) obj, (Iterable<BytesWritable>) iterable, (Reducer<Text, BytesWritable, Text, Text>.Context) context);
    }
}
