package org.apache.hadoop.mapred;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskTracker;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.StringUtils;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.DefaultFileRegion;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.FileRegion;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.jboss.netty.util.CharsetUtil;

/* loaded from: input_file:org/apache/hadoop/mapred/ShuffleHandler.class */
public class ShuffleHandler extends SimpleChannelUpstreamHandler {
    public static final Log LOG = LogFactory.getLog(ShuffleHandler.class);
    private final NettyMapOutputAttributes attributes;
    private int port;
    private final TaskTracker.ShuffleServerMetrics shuffleMetrics;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/mapred/ShuffleHandler$ChanneFutureListenerMetrics.class */
    public class ChanneFutureListenerMetrics implements ChannelFutureListener {
        private final FileRegion partition;

        private ChanneFutureListenerMetrics(FileRegion fileRegion) {
            this.partition = fileRegion;
        }

        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            this.partition.releaseExternalResources();
            ShuffleHandler.this.shuffleMetrics.successOutput();
        }
    }

    public ShuffleHandler(NettyMapOutputAttributes nettyMapOutputAttributes, int i) {
        this.attributes = nettyMapOutputAttributes;
        this.port = i;
        this.shuffleMetrics = nettyMapOutputAttributes.getShuffleServerMetrics();
    }

    private List<String> splitMaps(List<String> list) {
        if (null == list) {
            return null;
        }
        ArrayList arrayList = new ArrayList();
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            Collections.addAll(arrayList, it.next().split(StringUtils.COMMA_STR));
        }
        return arrayList;
    }

    public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
        HttpRequest httpRequest = (HttpRequest) messageEvent.getMessage();
        if (httpRequest.getMethod() != HttpMethod.GET) {
            sendError(channelHandlerContext, HttpResponseStatus.METHOD_NOT_ALLOWED);
            return;
        }
        Map parameters = new QueryStringDecoder(httpRequest.getUri()).getParameters();
        List<String> splitMaps = splitMaps((List) parameters.get("map"));
        List list = (List) parameters.get("reduce");
        List list2 = (List) parameters.get("job");
        if (LOG.isDebugEnabled()) {
            LOG.debug("RECV: " + httpRequest.getUri() + "\n  mapId: " + splitMaps + "\n  reduceId: " + list + "\n  jobId: " + list2);
        }
        if (splitMaps == null || list == null || list2 == null) {
            sendError(channelHandlerContext, "Required param job, map and reduce", HttpResponseStatus.BAD_REQUEST);
            return;
        }
        if (list.size() != 1 || list2.size() != 1) {
            sendError(channelHandlerContext, "Too many job/reduce parameters", HttpResponseStatus.BAD_REQUEST);
            return;
        }
        try {
            int parseInt = Integer.parseInt((String) list.get(0));
            String str = (String) list2.get(0);
            if (null == httpRequest.getUri()) {
                sendError(channelHandlerContext, HttpResponseStatus.FORBIDDEN);
                return;
            }
            if (splitMaps.size() > 1) {
                throw new IllegalArgumentException("Doesn't support more than one map id.  Current requst is asking for " + splitMaps.size());
            }
            Channel channel = messageEvent.getChannel();
            ChannelFuture channelFuture = null;
            Iterator<String> it = splitMaps.iterator();
            while (it.hasNext()) {
                try {
                    channelFuture = sendMapOutput(channelHandlerContext, channel, str, it.next(), parseInt);
                    if (null == channelFuture) {
                        sendError(channelHandlerContext, HttpResponseStatus.NOT_FOUND);
                        return;
                    }
                } catch (IOException e) {
                    LOG.error("Shuffle error ", e);
                    this.shuffleMetrics.failedOutput();
                    sendError(channelHandlerContext, e.getMessage(), HttpResponseStatus.INTERNAL_SERVER_ERROR);
                    return;
                }
            }
            channelFuture.addListener(ChannelFutureListener.CLOSE);
        } catch (NumberFormatException e2) {
            sendError(channelHandlerContext, "Bad reduce parameter", HttpResponseStatus.BAD_REQUEST);
        } catch (IllegalArgumentException e3) {
            sendError(channelHandlerContext, "Bad job parameter", HttpResponseStatus.BAD_REQUEST);
        }
    }

    protected ChannelFuture sendMapOutput(ChannelHandlerContext channelHandlerContext, Channel channel, String str, String str2, int i) throws IOException {
        LocalDirAllocator localDirAllocator = this.attributes.getLocalDirAllocator();
        ((LocalFileSystem) this.attributes.getLocalFS()).getRaw();
        TaskTracker.ShuffleServerMetrics shuffleServerMetrics = this.attributes.getShuffleServerMetrics();
        TaskTracker taskTracker = this.attributes.getTaskTracker();
        Path localPathToRead = localDirAllocator.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(str, str2) + "/file.out.index", this.attributes.getJobConf());
        Path localPathToRead2 = localDirAllocator.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(str, str2) + "/file.out", this.attributes.getJobConf());
        IndexRecord indexInformation = taskTracker.getIndexInformation(str2, i, localPathToRead);
        DefaultHttpResponse defaultHttpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        defaultHttpResponse.setHeader(MRConstants.FROM_MAP_TASK, str2);
        defaultHttpResponse.setHeader(MRConstants.RAW_MAP_OUTPUT_LENGTH, Long.toString(indexInformation.rawLength));
        defaultHttpResponse.setHeader(MRConstants.MAP_OUTPUT_LENGTH, Long.toString(indexInformation.partLength));
        defaultHttpResponse.setHeader(MRConstants.FOR_REDUCE_TASK, Integer.toString(i));
        channel.write(defaultHttpResponse);
        File file = new File(localPathToRead2.toString());
        try {
            DefaultFileRegion defaultFileRegion = new DefaultFileRegion(new RandomAccessFile(file, "r").getChannel(), indexInformation.startOffset, indexInformation.partLength);
            ChannelFuture write = channel.write(defaultFileRegion);
            write.addListener(new ChanneFutureListenerMetrics(defaultFileRegion));
            shuffleServerMetrics.outputBytes(indexInformation.partLength);
            LOG.info("Sending out " + indexInformation.partLength + " bytes for reduce: " + i + " from map: " + str2 + " given " + indexInformation.partLength + "/" + indexInformation.rawLength);
            return write;
        } catch (FileNotFoundException e) {
            LOG.info(file + " not found");
            return null;
        }
    }

    private void sendError(ChannelHandlerContext channelHandlerContext, HttpResponseStatus httpResponseStatus) {
        sendError(channelHandlerContext, NodeBase.ROOT, httpResponseStatus);
    }

    private void sendError(ChannelHandlerContext channelHandlerContext, String str, HttpResponseStatus httpResponseStatus) {
        DefaultHttpResponse defaultHttpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus);
        defaultHttpResponse.setHeader("Content-Type", "text/plain; charset=UTF-8");
        defaultHttpResponse.setContent(ChannelBuffers.copiedBuffer(str, CharsetUtil.UTF_8));
        channelHandlerContext.getChannel().write(defaultHttpResponse).addListener(ChannelFutureListener.CLOSE);
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) throws Exception {
        Channel channel = exceptionEvent.getChannel();
        Throwable cause = exceptionEvent.getCause();
        if (cause instanceof TooLongFrameException) {
            sendError(channelHandlerContext, HttpResponseStatus.BAD_REQUEST);
            return;
        }
        LOG.error("Shuffle error: ", cause);
        this.shuffleMetrics.failedOutput();
        if (channel.isConnected()) {
            LOG.error("Shuffle error " + exceptionEvent);
            sendError(channelHandlerContext, HttpResponseStatus.INTERNAL_SERVER_ERROR);
        }
    }
}
