package weka.distributed.hadoop;

import distributed.core.DistributedJobConfig;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.zip.GZIPInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import weka.core.Instances;
import weka.core.Utils;
import weka.distributed.DistributedWekaException;
import weka.distributed.KMeansMapTask;
import weka.distributed.KMeansReduceTask;

/* loaded from: input_file:weka/distributed/hadoop/KMeansHadoopReducer.class */
public class KMeansHadoopReducer extends Reducer<Text, BytesWritable, Text, Text> {
    public static final String KMEANS_WRITE_PATH = "*weka.distributed.kmeans_write_path";
    public static final String KMEANS_REDUCE_FILE_PREFIX = "reduce_run";
    protected String m_outputDestination;
    protected int m_iterationNumber;

    public void setup(Reducer<Text, BytesWritable, Text, Text>.Context context) throws IOException {
        Configuration configuration = context.getConfiguration();
        this.m_outputDestination = configuration.get(KMEANS_WRITE_PATH);
        if (DistributedJobConfig.isEmpty(this.m_outputDestination)) {
            throw new IOException("No output path for centroids supplied!");
        }
        String str = configuration.get(KMeansHadoopMapper.KMEANS_MAP_TASK_OPTIONS);
        try {
            if (!DistributedJobConfig.isEmpty(str)) {
                String option = Utils.getOption("iteration", Utils.splitOptions(str));
                if (DistributedJobConfig.isEmpty(option)) {
                    throw new IOException("Unable to continue without knowing the current iteration number!");
                }
                try {
                    this.m_iterationNumber = Integer.parseInt(option);
                } catch (NumberFormatException e) {
                    throw new IOException(e);
                }
            }
        } catch (Exception e2) {
            throw new IOException(e2);
        }
    }

    public void reduce(Text text, Iterable<BytesWritable> iterable, Reducer<Text, BytesWritable, Text, Text>.Context context) throws IOException {
        try {
            int parseInt = Integer.parseInt(text.toString().replace("run", ""));
            Instances instances = null;
            ArrayList arrayList = new ArrayList();
            try {
                Iterator<BytesWritable> it = iterable.iterator();
                while (it.hasNext()) {
                    KMeansMapTask deserialize = deserialize(it.next().getBytes());
                    if (instances == null) {
                        instances = deserialize.getTransformedHeader();
                    }
                    arrayList.add(deserialize.getCentroidStats());
                }
                if (instances == null || arrayList.size() <= 0) {
                    if (instances == null) {
                        throw new IOException("Was unable to get the transformed header from the KMeansMapTasks!");
                    }
                    if (arrayList.size() == 0) {
                        throw new IOException("There were no custer summaries to aggregate!");
                    }
                    return;
                }
                try {
                    KMeansReduceTask reduceClusters = new KMeansReduceTask().reduceClusters(parseInt, this.m_iterationNumber, instances, arrayList);
                    writeReduceTaskToDestination(reduceClusters, this.m_outputDestination, parseInt, context.getConfiguration());
                    System.err.println("Wrote reducer for run: " + parseInt + ". Total within clust err: " + reduceClusters.getTotalWithinClustersError());
                } catch (DistributedWekaException e) {
                    throw new IOException((Throwable) e);
                }
            } catch (Exception e2) {
                throw new IOException(e2);
            }
        } catch (NumberFormatException e3) {
            throw new IOException(e3);
        }
    }

    protected static void writeReduceTaskToDestination(KMeansReduceTask kMeansReduceTask, String str, int i, Configuration configuration) throws IOException {
        ObjectOutputStream objectOutputStream = null;
        try {
            Path path = new Path(str + "/" + KMEANS_REDUCE_FILE_PREFIX + i);
            FileSystem fileSystem = FileSystem.get(configuration);
            if (fileSystem.exists(path)) {
                fileSystem.delete(path, true);
            }
            objectOutputStream = new ObjectOutputStream(new BufferedOutputStream(fileSystem.create(path)));
            objectOutputStream.writeObject(kMeansReduceTask);
            if (objectOutputStream != null) {
                objectOutputStream.flush();
                objectOutputStream.close();
            }
        } catch (Throwable th) {
            if (objectOutputStream != null) {
                objectOutputStream.flush();
                objectOutputStream.close();
            }
            throw th;
        }
    }

    protected static KMeansMapTask deserialize(byte[] bArr) throws IOException, ClassNotFoundException {
        ObjectInputStream objectInputStream = null;
        try {
            objectInputStream = new ObjectInputStream(new BufferedInputStream(new GZIPInputStream(new ByteArrayInputStream(bArr))));
            Object readObject = objectInputStream.readObject();
            if (!(readObject instanceof KMeansMapTask)) {
                throw new IOException("Object deserialized was not a KMeansMapTask object!");
            }
            if (objectInputStream != null) {
                objectInputStream.close();
            }
            return (KMeansMapTask) readObject;
        } catch (Throwable th) {
            if (objectInputStream != null) {
                objectInputStream.close();
            }
            throw th;
        }
    }

    public /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
        reduce((Text) obj, (Iterable<BytesWritable>) iterable, (Reducer<Text, BytesWritable, Text, Text>.Context) context);
    }
}
