/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred.nativetask.handlers;

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.nativetask.Command;
import org.apache.hadoop.mapred.nativetask.CommandDispatcher;
import org.apache.hadoop.mapred.nativetask.DataChannel;
import org.apache.hadoop.mapred.nativetask.ICombineHandler;
import org.apache.hadoop.mapred.nativetask.INativeHandler;
import org.apache.hadoop.mapred.nativetask.NativeBatchProcessor;
import org.apache.hadoop.mapred.nativetask.TaskContext;
import org.apache.hadoop.mapred.nativetask.handlers.BufferPuller;
import org.apache.hadoop.mapred.nativetask.handlers.BufferPusher;
import org.apache.hadoop.mapred.nativetask.handlers.NativeCollectorOnlyHandler;
import org.apache.hadoop.mapred.nativetask.serde.SerializationFramework;
import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
import org.apache.hadoop.mapreduce.TaskCounter;

/**
 * Runs a Java combiner on behalf of a native handler.
 *
 * <p>Records are read from the native side through a {@link BufferPuller}, fed to a
 * {@link Task.CombinerRunner}, and the combined output is written back through a
 * {@link BufferPusher}. The native side triggers a combine pass by dispatching the
 * {@link #COMBINE} command to {@link #onCall(Command, ReadWriteBuffer)}.
 *
 * @param <K> key type handed to the combiner
 * @param <V> value type handed to the combiner
 */
class CombinerHandler<K, V> implements ICombineHandler, CommandDispatcher {
    /** Name passed to {@link NativeBatchProcessor#create} when creating the native handler. */
    public static final String NAME = "NativeTask.CombineHandler";
    // Bound to this class so that log output is attributed to the combiner handler.
    private static final Log LOG = LogFactory.getLog(CombinerHandler.class);
    /** Command with id 1 and description "Load"; not dispatched by this class. */
    public static final Command LOAD = new Command(1, "Load");
    /** Command with id 4 and description "Combine"; triggers {@link #combine()}. */
    public static final Command COMBINE = new Command(4, "Combine");
    /** Combiner executed by {@link #combine()}. */
    public final Task.CombinerRunner<K, V> combinerRunner;
    private final INativeHandler nativeHandler;
    private final BufferPuller puller;
    private final BufferPusher<K, V> kvPusher;
    private boolean closed = false;

    /**
     * Builds a combiner handler for the given task, if a combiner is configured.
     *
     * <p>The combiner class name is looked up under {@code mapred.combiner.class} first and
     * {@code mapreduce.job.combine.class} second. The configuration used is a copy of the
     * task's configuration with {@code SerializationFramework} set to the writable
     * serialization type.
     *
     * @param context task context supplying configuration, reporter, attempt id and the
     *                input key/value classes
     * @return a new handler, or {@code null} when neither configuration key is set
     * @throws IOException declared by the factory; raised by the underlying setup calls
     * @throws ClassNotFoundException declared by the factory; raised by the underlying setup calls
     */
    public static <K, V> ICombineHandler create(TaskContext context) throws IOException, ClassNotFoundException {
        JobConf conf = new JobConf((Configuration) context.getConf());
        conf.set("SerializationFramework", String.valueOf(SerializationFramework.WRITABLE_SERIALIZATION.getType()));
        String combinerClazz = conf.get("mapred.combiner.class");
        if (null == combinerClazz) {
            combinerClazz = conf.get("mapreduce.job.combine.class");
        }
        if (null == combinerClazz) {
            // No combiner configured: callers receive null rather than a no-op handler.
            return null;
        }
        LOG.info("NativeTask Combiner is enabled, class = " + combinerClazz);
        Counters.Counter combineInputCounter = context.getTaskReporter().getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
        Task.CombinerRunner<K, V> combinerRunner = Task.CombinerRunner.create(conf, (TaskAttemptID) context.getTaskAttemptId(), combineInputCounter, (Task.TaskReporter) context.getTaskReporter(), null);
        INativeHandler nativeHandler = NativeBatchProcessor.create(NAME, (Configuration) conf, DataChannel.INOUT);
        // The context only exposes the key/value classes without K/V type information,
        // so the casts cannot be checked at compile time.
        @SuppressWarnings("unchecked")
        BufferPusher<K, V> pusher = new BufferPusher<K, V>((Class<K>) context.getInputKeyClass(), (Class<V>) context.getInputValueClass(), nativeHandler);
        BufferPuller puller = new BufferPuller(nativeHandler);
        return new CombinerHandler<K, V>(nativeHandler, combinerRunner, puller, pusher);
    }

    /**
     * Wires the handler to its native counterpart: registers this object as the native
     * handler's command dispatcher and {@code puller} as its data receiver.
     *
     * @param nativeHandler native handler this combiner serves
     * @param combiner      combiner to run on each combine pass
     * @param puller        source of records for the combiner
     * @param kvPusher      sink receiving the combiner's output
     * @throws IOException declared by the constructor signature
     */
    public CombinerHandler(INativeHandler nativeHandler, Task.CombinerRunner<K, V> combiner, BufferPuller puller, BufferPusher<K, V> kvPusher) throws IOException {
        this.nativeHandler = nativeHandler;
        this.combinerRunner = combiner;
        this.puller = puller;
        this.kvPusher = kvPusher;
        nativeHandler.setCommandDispatcher(this);
        nativeHandler.setDataReceiver(puller);
    }

    /**
     * Handles a command dispatched to this handler. Only {@link #COMBINE} has an effect;
     * a {@code null} command and every other command are ignored.
     *
     * @param command   command to handle; may be {@code null}
     * @param parameter command payload; not used by this handler
     * @return always {@code null}
     * @throws IOException if the combine pass fails
     */
    @Override
    public ReadWriteBuffer onCall(Command command, ReadWriteBuffer parameter) throws IOException {
        if (null == command) {
            return null;
        }
        if (command.equals(COMBINE)) {
            this.combine();
        }
        return null;
    }

    /**
     * Runs one combine pass: resets the puller, runs the combiner with the puller as input
     * and the pusher as output, then flushes the pusher.
     *
     * @throws IOException wrapping any exception raised during the pass
     */
    @Override
    public void combine() throws IOException {
        try {
            this.puller.reset();
            this.combinerRunner.combine((RawKeyValueIterator) this.puller, this.kvPusher);
            this.kvPusher.flush();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Wrapping would otherwise hide the interruption from code further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new IOException(e);
        }
    }

    /**
     * @return the value reported by the native handler's {@code getNativeHandler()}
     */
    @Override
    public long getId() {
        return this.nativeHandler.getNativeHandler();
    }

    /**
     * Closes the puller, the pusher and the native handler, in that order. Every resource
     * is attempted even if an earlier one fails; if several fail, the exception from the
     * last failing close is the one that propagates. Calls after the first are no-ops.
     *
     * @throws IOException if closing any of the resources fails
     */
    @Override
    public void close() throws IOException {
        if (this.closed) {
            return;
        }
        // Marked up front: every resource below is attempted exactly once, so a second
        // close() has nothing left to do even when this call ends with an exception.
        this.closed = true;
        try {
            if (null != this.puller) {
                this.puller.close();
            }
        } finally {
            try {
                if (null != this.kvPusher) {
                    this.kvPusher.close();
                }
            } finally {
                if (null != this.nativeHandler) {
                    this.nativeHandler.close();
                }
            }
        }
    }
}

