package weka.distributed.hadoop;

import distributed.core.DistributedJobConfig;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.zip.GZIPInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import weka.clusterers.CentroidSketch;
import weka.core.Instances;
import weka.core.Utils;
import weka.distributed.KMeansMapTask;
import weka.distributed.KMeansReduceTask;

/* loaded from: input_file:weka/distributed/hadoop/KMeansCentroidSketchHadoopReducer.class */
public class KMeansCentroidSketchHadoopReducer extends Reducer<Text, BytesWritable, Text, Text> {
    public static final String SKETCH_WRITE_PATH = "*weka.distributed.centroid_sketch_write_path";
    protected String m_outputDestination;
    protected boolean m_isFirstIteration;
    protected Instances m_transformedHeaderNoSummary;

    public void setup(Reducer<Text, BytesWritable, Text, Text>.Context context) throws IOException {
        Configuration configuration = context.getConfiguration();
        this.m_outputDestination = configuration.get(SKETCH_WRITE_PATH);
        if (DistributedJobConfig.isEmpty(this.m_outputDestination)) {
            throw new IOException("No output path for centroid sketches supplied!");
        }
        String str = configuration.get(KMeansCentroidSketchHadoopMapper.CENTROID_SKETCH_MAP_TASK_OPTIONS);
        try {
            if (!DistributedJobConfig.isEmpty(str)) {
                String[] splitOptions = Utils.splitOptions(str);
                String option = Utils.getOption("arff-header", splitOptions);
                if (DistributedJobConfig.isEmpty(option)) {
                    throw new IOException("Can't continue without the name of the ARFF header file!");
                }
                Instances loadTrainingHeader = WekaClassifierHadoopMapper.loadTrainingHeader(option);
                this.m_isFirstIteration = Utils.getFlag("first-iteration", splitOptions);
                KMeansMapTask kMeansMapTask = new KMeansMapTask();
                kMeansMapTask.setOptions(splitOptions);
                this.m_transformedHeaderNoSummary = kMeansMapTask.init(loadTrainingHeader);
            }
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    public void reduce(Text text, Iterable<BytesWritable> iterable, Reducer<Text, BytesWritable, Text, Text>.Context context) throws IOException {
        try {
            int parseInt = Integer.parseInt(text.toString().replace("run", ""));
            CentroidSketch centroidSketch = null;
            ArrayList arrayList = new ArrayList();
            try {
                Iterator<BytesWritable> it = iterable.iterator();
                while (it.hasNext()) {
                    CentroidSketch deserialize = deserialize(it.next().getBytes());
                    if (centroidSketch == null) {
                        centroidSketch = deserialize;
                    } else {
                        centroidSketch.aggregateReservoir(deserialize.getReservoirSample());
                    }
                    if (this.m_isFirstIteration) {
                        arrayList.add(deserialize.getDistanceFunction());
                    }
                }
                centroidSketch.addReservoirToCurrentSketch();
                if (this.m_isFirstIteration) {
                    centroidSketch.getDistanceFunction().setInstances(KMeansReduceTask.computeDistancePrimingDataFromDistanceFunctions(arrayList, this.m_transformedHeaderNoSummary));
                }
                writeSketchToDestination(centroidSketch, this.m_outputDestination, parseInt, context.getConfiguration());
                System.err.println("Number of instances in sketch for run " + parseInt + ": " + centroidSketch.getCurrentSketch().numInstances());
                Text text2 = new Text();
                text2.set("Summary:\n");
                Text text3 = new Text();
                text3.set("Number of instances in sketch for run " + parseInt + ": " + centroidSketch.getCurrentSketch().numInstances());
                context.write(text2, text3);
            } catch (Exception e) {
                throw new IOException(e);
            }
        } catch (NumberFormatException e2) {
            throw new IOException(e2);
        }
    }

    protected static void writeSketchToDestination(CentroidSketch centroidSketch, String str, int i, Configuration configuration) throws IOException {
        ObjectOutputStream objectOutputStream = null;
        try {
            Path path = new Path(str + "/sketch_run" + i);
            FileSystem fileSystem = FileSystem.get(configuration);
            if (fileSystem.exists(path)) {
                fileSystem.delete(path, true);
            }
            objectOutputStream = new ObjectOutputStream(new BufferedOutputStream(fileSystem.create(path)));
            objectOutputStream.writeObject(centroidSketch);
            if (objectOutputStream != null) {
                objectOutputStream.flush();
                objectOutputStream.close();
            }
        } catch (Throwable th) {
            if (objectOutputStream != null) {
                objectOutputStream.flush();
                objectOutputStream.close();
            }
            throw th;
        }
    }

    protected static CentroidSketch deserialize(byte[] bArr) throws IOException, ClassNotFoundException {
        ObjectInputStream objectInputStream = null;
        try {
            objectInputStream = new ObjectInputStream(new BufferedInputStream(new GZIPInputStream(new ByteArrayInputStream(bArr))));
            Object readObject = objectInputStream.readObject();
            if (!(readObject instanceof CentroidSketch)) {
                throw new IOException("Object deserialized was not a CentroidSketch object!");
            }
            if (objectInputStream != null) {
                objectInputStream.close();
            }
            return (CentroidSketch) readObject;
        } catch (Throwable th) {
            if (objectInputStream != null) {
                objectInputStream.close();
            }
            throw th;
        }
    }

    public /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
        reduce((Text) obj, (Iterable<BytesWritable>) iterable, (Reducer<Text, BytesWritable, Text, Text>.Context) context);
    }
}
