package org.apache.tez.auxservices;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.util.PureJavaCrc32;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.tez.auxservices.ShuffleHandler;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.socket.SocketChannel;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/auxservices/TestShuffleHandler.class */
public class TestShuffleHandler {
    static final long MiB = 1048576;
    private static final Logger LOG = LoggerFactory.getLogger(TestShuffleHandler.class);

    /* loaded from: input_file:org/apache/tez/auxservices/TestShuffleHandler$LastSocketAddress.class */
    /**
     * Mutable holder for the most recently observed remote socket address.
     *
     * <p>In this file it is written from the {@code sendMapOutput} override used by
     * {@code testKeepAlive} and read back by the test method itself.
     */
    static class LastSocketAddress {
        // volatile: the writer is a server-side callback that runs while the test thread
        // is blocked on client I/O, so the reader and the writer are different threads.
        volatile SocketAddress lastAddress;

        LastSocketAddress() {
        }

        /**
         * Records {@code socketAddress} as the latest address, replacing any previous one.
         *
         * @param socketAddress the address to remember; may be {@code null}
         */
        void setAddress(SocketAddress socketAddress) {
            this.lastAddress = socketAddress;
        }

        /**
         * Returns the address most recently passed to {@link #setAddress}.
         * (The method name's spelling is kept as-is because callers use it.)
         *
         * @return the last recorded address, or {@code null} if none was recorded
         */
        SocketAddress getSocketAddres() {
            return this.lastAddress;
        }
    }

    /* loaded from: input_file:org/apache/tez/auxservices/TestShuffleHandler$MockShuffleHandler.class */
    /**
     * {@link ShuffleHandler} whose {@code Shuffle} handler is stubbed out: request
     * verification and header population do nothing, no map-output info is looked up,
     * and {@code sendMapOutput} writes synthetic shuffle headers to the channel.
     */
    class MockShuffleHandler extends ShuffleHandler {
        MockShuffleHandler() {
        }

        protected ShuffleHandler.Shuffle getShuffle(Configuration configuration) {
            return new ShuffleHandler.Shuffle(configuration) {
                protected void verifyRequest(String str, ChannelHandlerContext ctx, HttpRequest request, HttpResponse response, URL requestUrl) throws IOException {
                    // Verification is deliberately skipped.
                }

                protected ShuffleHandler.Shuffle.MapOutputInfo getMapOutputInfo(String str, String str2, ShuffleHandler.Range range, String str3, String str4) throws IOException {
                    // No map-output info is produced by this stub.
                    return null;
                }

                protected void populateHeaders(List<String> list, String str, String str2, String str3, ShuffleHandler.Range range, HttpResponse response, boolean z, Map<String, ShuffleHandler.Shuffle.MapOutputInfo> map) throws IOException {
                    // No response headers are set by this stub.
                }

                protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel channel, String str, String str2, ShuffleHandler.Range range, ShuffleHandler.Shuffle.MapOutputInfo mapOutputInfo) throws IOException {
                    final ShuffleHeader header = new ShuffleHeader("attempt_12345_1_m_1_0", 5678L, 5678L, 1);

                    // First write: one serialized header on its own.
                    DataOutputBuffer single = new DataOutputBuffer();
                    header.write(single);
                    channel.write(ChannelBuffers.wrappedBuffer(single.getData(), 0, single.getLength()));

                    // Second write: the same header serialized 100 times into one buffer.
                    DataOutputBuffer repeated = new DataOutputBuffer();
                    int remaining = 100;
                    while (remaining > 0) {
                        header.write(repeated);
                        remaining--;
                    }
                    byte[] payload = repeated.getData();
                    int payloadLength = repeated.getLength();
                    return channel.write(ChannelBuffers.wrappedBuffer(payload, 0, payloadLength));
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/tez/auxservices/TestShuffleHandler$MockShuffleHandler2.class */
    /**
     * {@link ShuffleHandler} whose {@code verifyRequest} override records whether the
     * channel that carried the request has socket keep-alive set, instead of verifying
     * anything.
     */
    private static class MockShuffleHandler2 extends ShuffleHandler {
        // volatile: assigned from the request-handling callback and read by the test
        // thread through isSocketKeepAlive().
        volatile boolean socketKeepAlive;

        private MockShuffleHandler2() {
            this.socketKeepAlive = false;
        }

        protected ShuffleHandler.Shuffle getShuffle(Configuration configuration) {
            return new ShuffleHandler.Shuffle(configuration) {
                protected void verifyRequest(String str, ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, HttpResponse httpResponse, URL url) throws IOException {
                    // getChannel() is declared to return Channel, so an explicit narrowing
                    // cast is needed to reach the socket-level configuration.
                    SocketChannel channel = (SocketChannel) channelHandlerContext.getChannel();
                    MockShuffleHandler2.this.socketKeepAlive = channel.getConfig().isKeepAlive();
                }
            };
        }

        /**
         * @return the keep-alive setting captured by the last {@code verifyRequest} call,
         *         or {@code false} if no request has been seen yet
         */
        protected boolean isSocketKeepAlive() {
            return this.socketKeepAlive;
        }
    }

    /**
     * Checks that a port number survives a {@code serializeMetaData} /
     * {@code deserializeMetaData} round trip, for a positive, a negative and a typical value.
     */
    @Test(timeout = 10000)
    public void testSerializeMeta() throws Exception {
        final int[] ports = {1, -1, 8080};
        for (int port : ports) {
            long expected = port;
            Assert.assertEquals(expected, ShuffleHandler.deserializeMetaData(ShuffleHandler.serializeMetaData(port)));
        }
    }

    /**
     * Drives the handler's metrics object directly: two connections and 3 MiB of output
     * are recorded, then two operations complete against a mocked future that reports
     * success first and failure second.
     */
    @Test(timeout = 10000)
    public void testShuffleMetrics() throws Exception {
        MetricsSystemImpl metricsSystem = new MetricsSystemImpl();
        ShuffleHandler handler = new ShuffleHandler(metricsSystem);

        // The mocked future reports success on the first query and failure afterwards.
        ChannelFuture future = Mockito.mock(ChannelFuture.class);
        Mockito.when(future.isSuccess()).thenReturn(true).thenReturn(false);

        handler.metrics.shuffleConnections.incr();
        handler.metrics.shuffleOutputBytes.incr(MiB);
        handler.metrics.shuffleConnections.incr();
        handler.metrics.shuffleOutputBytes.incr(2 * MiB);
        checkShuffleMetrics(metricsSystem, 3 * MiB, 0, 0, 2);

        handler.metrics.operationComplete(future);
        handler.metrics.operationComplete(future);
        checkShuffleMetrics(metricsSystem, 3 * MiB, 1, 1, 0);
    }

    /**
     * Hook called by {@code testShuffleMetrics} with a metrics system and four expected
     * values. The body is empty, so this method currently verifies nothing.
     *
     * <p>NOTE(review): the arguments look like expected output bytes followed by three
     * counters, but their exact meaning cannot be confirmed from this file. Confirm
     * against the original source before adding assertions here.
     */
    static void checkShuffleMetrics(MetricsSystem metricsSystem, long j, int i, int i2, int i3) {
    }

    /**
     * Starts a handler with a stubbed {@code Shuffle}, reads only the first shuffle header
     * of a large response, closes the stream early, and asserts that neither
     * {@code sendError} overload was invoked.
     *
     * <p>The handler is stopped in a {@code finally} block so a failing assertion does not
     * leave it running.
     */
    @Test(timeout = 10000)
    public void testClientClosesConnection() throws Exception {
        // Filled (at most once) by the sendError overrides below.
        final List<Throwable> failures = new ArrayList<Throwable>(1);
        Configuration configuration = new Configuration();
        configuration.setInt("tez.shuffle.port", 0);
        ShuffleHandler shuffleHandler = new ShuffleHandler() {
            protected ShuffleHandler.Shuffle getShuffle(Configuration configuration2) {
                return new ShuffleHandler.Shuffle(configuration2) {
                    protected ShuffleHandler.Shuffle.MapOutputInfo getMapOutputInfo(String str, String str2, ShuffleHandler.Range range, String str3, String str4) throws IOException {
                        return null;
                    }

                    protected void populateHeaders(List<String> list, String str, String str2, String str3, ShuffleHandler.Range range, HttpResponse httpResponse, boolean z, Map<String, ShuffleHandler.Shuffle.MapOutputInfo> map) throws IOException {
                        super.setResponseHeaders(httpResponse, z, 100L);
                    }

                    protected void verifyRequest(String str, ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, HttpResponse httpResponse, URL url) throws IOException {
                    }

                    protected ChannelFuture sendMapOutput(ChannelHandlerContext channelHandlerContext, Channel channel, String str, String str2, ShuffleHandler.Range range, ShuffleHandler.Shuffle.MapOutputInfo mapOutputInfo) throws IOException {
                        ShuffleHeader shuffleHeader = new ShuffleHeader("attempt_12345_1_m_1_0", 5678L, 5678L, 1);
                        // One header on its own, then 100000 more in a single large buffer.
                        DataOutputBuffer first = new DataOutputBuffer();
                        shuffleHeader.write(first);
                        channel.write(ChannelBuffers.wrappedBuffer(first.getData(), 0, first.getLength()));
                        DataOutputBuffer bulk = new DataOutputBuffer();
                        for (int i = 0; i < 100000; i++) {
                            shuffleHeader.write(bulk);
                        }
                        return channel.write(ChannelBuffers.wrappedBuffer(bulk.getData(), 0, bulk.getLength()));
                    }

                    protected void sendError(ChannelHandlerContext channelHandlerContext, HttpResponseStatus httpResponseStatus) {
                        // Record only the first error and close the channel.
                        if (failures.size() == 0) {
                            failures.add(new Error());
                            channelHandlerContext.getChannel().close();
                        }
                    }

                    protected void sendError(ChannelHandlerContext channelHandlerContext, String str, HttpResponseStatus httpResponseStatus) {
                        // Same handling as the two-argument overload; the message is not used.
                        sendError(channelHandlerContext, httpResponseStatus);
                    }
                };
            }
        };
        shuffleHandler.init(configuration);
        try {
            shuffleHandler.start();
            HttpURLConnection connection = (HttpURLConnection) new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get("tez.shuffle.port") + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_1_0").openConnection();
            connection.setRequestProperty("name", "mapreduce");
            connection.setRequestProperty("version", "1.0.0");
            connection.connect();
            DataInputStream input = new DataInputStream(connection.getInputStream());
            try {
                Assert.assertEquals(200L, connection.getResponseCode());
                Assert.assertEquals("close", connection.getHeaderField("Connection"));
                new ShuffleHeader().readFields(input);
            } finally {
                // Closing here, before the full body is consumed, is the point of the test.
                input.close();
            }
        } finally {
            shuffleHandler.stop();
        }
        Assert.assertTrue("sendError called when client closed connection", failures.size() == 0);
    }

    /**
     * Issues two consecutive requests against a handler configured with HTTP keep-alive
     * enabled and a negative keep-alive timeout, checks the {@code Connection} and
     * {@code keep-alive} response headers on both, and asserts that both requests were
     * served from the same remote socket address.
     *
     * <p>The handler subclass is a named local class so that the stubbed
     * {@code populateHeaders} can refer to its enclosing handler instance explicitly.
     * The handler is stopped in a {@code finally} block.
     */
    @Test(timeout = 10000)
    public void testKeepAlive() throws Exception {
        // Filled (at most once) by the sendError overrides below.
        final List<Throwable> failures = new ArrayList<Throwable>(1);
        Configuration configuration = new Configuration();
        configuration.setInt("tez.shuffle.port", 0);
        configuration.setBoolean("tez.shuffle.connection-keep-alive.enable", true);
        configuration.setInt("tez.shuffle.connection-keep-alive.timeout", -100);
        final LastSocketAddress lastSocketAddress = new LastSocketAddress();

        class KeepAliveToggleHandler extends ShuffleHandler {
            protected ShuffleHandler.Shuffle getShuffle(Configuration configuration2) {
                return new ShuffleHandler.Shuffle(configuration2) {
                    protected ShuffleHandler.Shuffle.MapOutputInfo getMapOutputInfo(String str, String str2, ShuffleHandler.Range range, String str3, String str4) throws IOException {
                        return null;
                    }

                    protected void verifyRequest(String str, ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, HttpResponse httpResponse, URL url) throws IOException {
                    }

                    protected void populateHeaders(List<String> list, String str, String str2, String str3, ShuffleHandler.Range range, HttpResponse httpResponse, boolean z, Map<String, ShuffleHandler.Shuffle.MapOutputInfo> map) throws IOException {
                        // The announced content length is the size of 100000 serialized headers.
                        ShuffleHeader shuffleHeader = new ShuffleHeader("attempt_12345_1_m_1_0", 5678L, 5678L, 1);
                        DataOutputBuffer sizing = new DataOutputBuffer();
                        for (int i = 0; i < 100000; i++) {
                            shuffleHeader.write(sizing);
                        }
                        long length = sizing.getLength();
                        if (z) {
                            // When the boolean argument is set, turn off the enclosing
                            // handler's keep-alive flag before the response headers are built.
                            KeepAliveToggleHandler.this.connectionKeepAliveEnabled = false;
                        }
                        super.setResponseHeaders(httpResponse, z, length);
                    }

                    protected ChannelFuture sendMapOutput(ChannelHandlerContext channelHandlerContext, Channel channel, String str, String str2, ShuffleHandler.Range range, ShuffleHandler.Shuffle.MapOutputInfo mapOutputInfo) throws IOException {
                        // Remember which client socket this response goes to.
                        lastSocketAddress.setAddress(channel.getRemoteAddress());
                        ShuffleHeader shuffleHeader = new ShuffleHeader("attempt_12345_1_m_1_0", 5678L, 5678L, 1);
                        DataOutputBuffer first = new DataOutputBuffer();
                        shuffleHeader.write(first);
                        channel.write(ChannelBuffers.wrappedBuffer(first.getData(), 0, first.getLength()));
                        DataOutputBuffer bulk = new DataOutputBuffer();
                        for (int i = 0; i < 100000; i++) {
                            shuffleHeader.write(bulk);
                        }
                        return channel.write(ChannelBuffers.wrappedBuffer(bulk.getData(), 0, bulk.getLength()));
                    }

                    protected void sendError(ChannelHandlerContext channelHandlerContext, HttpResponseStatus httpResponseStatus) {
                        // Record only the first error and close the channel.
                        if (failures.size() == 0) {
                            failures.add(new Error());
                            channelHandlerContext.getChannel().close();
                        }
                    }

                    protected void sendError(ChannelHandlerContext channelHandlerContext, String str, HttpResponseStatus httpResponseStatus) {
                        // Same handling as the two-argument overload; the message is not used.
                        sendError(channelHandlerContext, httpResponseStatus);
                    }
                };
            }
        }

        ShuffleHandler shuffleHandler = new KeepAliveToggleHandler();
        shuffleHandler.init(configuration);
        try {
            shuffleHandler.start();
            String baseUrl = "http://127.0.0.1:" + shuffleHandler.getConfig().get("tez.shuffle.port");

            // First request: no keepAlive query parameter; the body is read to the end.
            HttpURLConnection firstConnection = (HttpURLConnection) new URL(baseUrl + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_1_0").openConnection();
            firstConnection.setRequestProperty("name", "mapreduce");
            firstConnection.setRequestProperty("version", "1.0.0");
            firstConnection.connect();
            DataInputStream firstInput = new DataInputStream(firstConnection.getInputStream());
            SocketAddress firstAddress;
            try {
                Assert.assertEquals("keep-alive", firstConnection.getHeaderField("Connection"));
                Assert.assertEquals("timeout=1", firstConnection.getHeaderField("keep-alive"));
                Assert.assertEquals(200L, firstConnection.getResponseCode());
                new ShuffleHeader().readFields(firstInput);
                byte[] scratch = new byte[1024];
                while (firstInput.read(scratch) != -1) {
                    // Discard the rest of the body.
                }
                firstAddress = lastSocketAddress.getSocketAddres();
            } finally {
                firstInput.close();
            }

            // Second request: keepAlive=true; only the first header is read.
            HttpURLConnection secondConnection = (HttpURLConnection) new URL(baseUrl + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_1_0&keepAlive=true").openConnection();
            secondConnection.setRequestProperty("name", "mapreduce");
            secondConnection.setRequestProperty("version", "1.0.0");
            secondConnection.connect();
            DataInputStream secondInput = new DataInputStream(secondConnection.getInputStream());
            try {
                Assert.assertEquals("keep-alive", secondConnection.getHeaderField("Connection"));
                Assert.assertEquals("timeout=1", secondConnection.getHeaderField("keep-alive"));
                Assert.assertEquals(200L, secondConnection.getResponseCode());
                new ShuffleHeader().readFields(secondInput);
            } finally {
                secondInput.close();
            }
            SocketAddress secondAddress = lastSocketAddress.getSocketAddres();

            Assert.assertNotNull("Initial shuffle address should not be null", firstAddress);
            Assert.assertNotNull("Keep-Alive shuffle address should not be null", secondAddress);
            Assert.assertEquals("Initial shuffle address and keep-alive shuffle address should be the same", firstAddress, secondAddress);
        } finally {
            shuffleHandler.stop();
        }
    }

    /**
     * Starts a {@code MockShuffleHandler2} with keep-alive enabled, performs one request,
     * and asserts that the handler observed the keep-alive option on the serving socket.
     *
     * <p>Cleanup runs exactly once in a {@code finally} block, whether or not the test
     * body throws.
     */
    @Test
    public void testSocketKeepAlive() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInt("tez.shuffle.port", 0);
        configuration.setBoolean("tez.shuffle.connection-keep-alive.enable", true);
        configuration.setInt("tez.shuffle.connection-keep-alive.timeout", -100);
        HttpURLConnection connection = null;
        MockShuffleHandler2 handler = new MockShuffleHandler2();
        try {
            handler.init(configuration);
            handler.start();
            String baseUrl = "http://127.0.0.1:" + handler.getConfig().get("tez.shuffle.port");
            connection = (HttpURLConnection) new URL(baseUrl + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_1_0").openConnection();
            connection.setRequestProperty("name", "mapreduce");
            connection.setRequestProperty("version", "1.0.0");
            connection.connect();
            // Forces the request to be sent, so the handler's verifyRequest override runs.
            connection.getInputStream();
            Assert.assertTrue("socket should be set KEEP_ALIVE", handler.isSocketKeepAlive());
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
            handler.stop();
        }
    }

    /**
     * Sends three requests whose {@code name}/{@code version} header pairs are
     * ("mapreduce", "1.0.1"), ("other", "1.0.0") and ("other", "1.0.1"), and expects an
     * HTTP 400 response for each.
     *
     * <p>The handler is stopped and closed in a {@code finally} block so a failing
     * assertion does not leave it running.
     */
    @Test(timeout = 10000)
    public void testIncompatibleShuffleVersion() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInt("tez.shuffle.port", 0);
        ShuffleHandler shuffleHandler = new ShuffleHandler();
        shuffleHandler.init(configuration);
        try {
            shuffleHandler.start();
            // NOTE(review): unlike the other tests in this class, this query string reads
            // "&&dag=1reduce=1" (doubled '&', and no '&' between dag and reduce). It is
            // left unchanged because this file does not show whether the expected 400
            // depends on it. Confirm against ShuffleHandler's request parsing.
            URL url = new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get("tez.shuffle.port") + "/mapOutput?job=job_12345_1&&dag=1reduce=1&map=attempt_12345_1_m_1_0");
            for (int i = 0; i < 3; i++) {
                HttpURLConnection connection = (HttpURLConnection) url.openConnection();
                connection.setRequestProperty("name", i == 0 ? "mapreduce" : "other");
                connection.setRequestProperty("version", i == 1 ? "1.0.0" : "1.0.1");
                connection.connect();
                Assert.assertEquals(400L, connection.getResponseCode());
            }
        } finally {
            shuffleHandler.stop();
            shuffleHandler.close();
        }
    }

    /**
     * Opens three connections against a handler configured with
     * {@code tez.shuffle.max.connections = 3}. The first two must answer with HTTP 200;
     * reading from the third is expected to fail with a {@link SocketException}.
     *
     * <p>The handler is stopped in a {@code finally} block, and an unexpected exception
     * type is included in the failure message.
     */
    @Test(timeout = 10000)
    public void testMaxConnections() throws Exception {
        final int connAttempts = 3;
        Configuration configuration = new Configuration();
        configuration.setInt("tez.shuffle.port", 0);
        configuration.setInt("tez.shuffle.max.connections", 3);
        ShuffleHandler shuffleHandler = new ShuffleHandler() {
            protected ShuffleHandler.Shuffle getShuffle(Configuration configuration2) {
                return new ShuffleHandler.Shuffle(configuration2) {
                    protected ShuffleHandler.Shuffle.MapOutputInfo getMapOutputInfo(String str, String str2, ShuffleHandler.Range range, String str3, String str4) throws IOException {
                        return null;
                    }

                    protected void populateHeaders(List<String> list, String str, String str2, String str3, ShuffleHandler.Range range, HttpResponse httpResponse, boolean z, Map<String, ShuffleHandler.Shuffle.MapOutputInfo> map) throws IOException {
                    }

                    protected void verifyRequest(String str, ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, HttpResponse httpResponse, URL url) throws IOException {
                    }

                    protected ChannelFuture sendMapOutput(ChannelHandlerContext channelHandlerContext, Channel channel, String str, String str2, ShuffleHandler.Range range, ShuffleHandler.Shuffle.MapOutputInfo mapOutputInfo) throws IOException {
                        ShuffleHeader shuffleHeader = new ShuffleHeader("dummy_header", 5678L, 5678L, 1);
                        // One header on its own, then 100000 more in a single large buffer.
                        DataOutputBuffer first = new DataOutputBuffer();
                        shuffleHeader.write(first);
                        channel.write(ChannelBuffers.wrappedBuffer(first.getData(), 0, first.getLength()));
                        DataOutputBuffer bulk = new DataOutputBuffer();
                        for (int i = 0; i < 100000; i++) {
                            shuffleHeader.write(bulk);
                        }
                        return channel.write(ChannelBuffers.wrappedBuffer(bulk.getData(), 0, bulk.getLength()));
                    }
                };
            }
        };
        shuffleHandler.init(configuration);
        try {
            shuffleHandler.start();
            HttpURLConnection[] connections = new HttpURLConnection[connAttempts];
            for (int i = 0; i < connAttempts; i++) {
                connections[i] = (HttpURLConnection) new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get("tez.shuffle.port") + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_" + i + "_0").openConnection();
                connections[i].setRequestProperty("name", "mapreduce");
                connections[i].setRequestProperty("version", "1.0.0");
            }
            // Connect all of them before reading any response.
            for (int i = 0; i < connAttempts; i++) {
                connections[i].connect();
            }
            connections[0].getInputStream();
            Assert.assertEquals(200L, connections[0].getResponseCode());
            connections[1].getInputStream();
            Assert.assertEquals(200L, connections[1].getResponseCode());
            try {
                connections[2].getInputStream();
                connections[2].getResponseCode();
                Assert.fail("Expected a SocketException");
            } catch (SocketException expected) {
                LOG.info("Expected - connection should not be open");
            } catch (Exception unexpected) {
                // Report what was actually thrown instead of discarding it.
                Assert.fail("Expected a SocketException but got " + unexpected);
            }
        } finally {
            shuffleHandler.stop();
        }
    }

    /**
     * Requests partitions {@code 0-1} of a single map attempt in one call and checks that
     * the response consists of a header count, that many shuffle headers (one per
     * partition, in order), the payload announced by each header, and then end of stream.
     *
     * <p>Payloads are consumed with {@code readFully}, so a short read from the socket
     * cannot leave unread payload bytes that would later be mistaken for extra data.
     *
     * @throws IOException if the fixture files or the HTTP exchange fail
     */
    @Test(timeout = 10000)
    public void testRangedFetch() throws IOException {
        Configuration configuration = new Configuration();
        configuration.setInt("tez.shuffle.port", 0);
        configuration.setInt("tez.shuffle.max.connections", 3);
        configuration.set("hadoop.security.authentication", "simple");
        UserGroupInformation.setConfiguration(configuration);
        File absoluteFile = new File("target", TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
        configuration.set("yarn.nodemanager.local-dirs", absoluteFile.getAbsolutePath());
        ApplicationId newInstance = ApplicationId.newInstance(12345L, 1);
        LOG.info(newInstance.toString());
        createShuffleHandlerFiles(absoluteFile, "randomUser", newInstance.toString(), "attempt_12345_1_m_1_0", configuration, new ArrayList<File>());
        ShuffleHandler shuffleHandler = new ShuffleHandler() {
            protected ShuffleHandler.Shuffle getShuffle(Configuration configuration2) {
                return new ShuffleHandler.Shuffle(configuration2) {
                    protected void verifyRequest(String str, ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, HttpResponse httpResponse, URL url) throws IOException {
                        // Request verification is skipped for this test.
                    }
                };
            }
        };
        shuffleHandler.init(configuration);
        try {
            shuffleHandler.start();
            // Register the application with a serialized token as its shuffle secret.
            DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
            dataOutputBuffer.reset();
            new Token("identifier".getBytes(), "password".getBytes(), new Text("randomUser"), new Text("shuffleService")).write(dataOutputBuffer);
            shuffleHandler.initializeApplication(new ApplicationInitializationContext("randomUser", newInstance, ByteBuffer.wrap(dataOutputBuffer.getData(), 0, dataOutputBuffer.getLength())));
            HttpURLConnection httpURLConnection = (HttpURLConnection) new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get("tez.shuffle.port") + "/mapOutput?job=job_12345_0001&dag=1&reduce=0-1&map=attempt_12345_1_m_1_0").openConnection();
            httpURLConnection.setRequestProperty("name", "mapreduce");
            httpURLConnection.setRequestProperty("version", "1.0.0");
            httpURLConnection.connect();
            // Set once every announced payload has been consumed; checked when EOF arrives.
            boolean allPayloadsRead = false;
            try {
                DataInputStream dataInputStream = new DataInputStream(httpURLConnection.getInputStream());
                int headerCount = WritableUtils.readVInt(dataInputStream);
                List<ShuffleHeader> headers = new ArrayList<ShuffleHeader>(2);
                for (int i = 0; i < headerCount; i++) {
                    ShuffleHeader shuffleHeader = new ShuffleHeader();
                    shuffleHeader.readFields(dataInputStream);
                    Assert.assertEquals("Incorrect map id", "attempt_12345_1_m_1_0", shuffleHeader.getMapId());
                    Assert.assertEquals("Incorrect reduce id", i, shuffleHeader.getPartition());
                    headers.add(shuffleHeader);
                }
                for (ShuffleHeader header : headers) {
                    // readFully blocks until the whole payload arrived (or EOF is hit),
                    // whereas read(byte[]) may return after only part of it.
                    dataInputStream.readFully(new byte[(int) header.getCompressedLength()]);
                }
                allPayloadsRead = true;
                // Any further byte means the server sent more than it announced.
                dataInputStream.readByte();
                Assert.fail("More fetch bytes that expected in stream");
            } catch (EOFException e) {
                Assert.assertTrue("Failed to copy ranged fetch", allPayloadsRead);
            }
        } finally {
            shuffleHandler.stop();
            FileUtil.fullyDelete(absoluteFile);
        }
    }

    /**
     * With Kerberos authentication configured, requests a map output registered for
     * {@code "randomUser"} and asserts that the response contains the owner-mismatch
     * message naming the actual owner of the index file. Skipped when native I/O is not
     * available.
     *
     * <p>Cleanup runs exactly once in a {@code finally} block, and the file stream used to
     * look up the owner is always closed.
     *
     * @throws IOException if the fixture files or the HTTP exchange fail
     */
    @Test(timeout = 100000)
    public void testMapFileAccess() throws IOException {
        Assume.assumeTrue(NativeIO.isAvailable());
        Configuration configuration = new Configuration();
        configuration.setInt("tez.shuffle.port", 0);
        configuration.setInt("tez.shuffle.max.connections", 3);
        configuration.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(configuration);
        File absoluteFile = new File("target", TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
        configuration.set("yarn.nodemanager.local-dirs", absoluteFile.getAbsolutePath());
        ApplicationId newInstance = ApplicationId.newInstance(12345L, 1);
        LOG.info(newInstance.toString());
        // Receives the created fixture files; element 0 is the index file.
        List<File> createdFiles = new ArrayList<File>();
        createShuffleHandlerFiles(absoluteFile, "randomUser", newInstance.toString(), "attempt_12345_1_m_1_0", configuration, createdFiles);
        ShuffleHandler shuffleHandler = new ShuffleHandler() {
            protected ShuffleHandler.Shuffle getShuffle(Configuration configuration2) {
                return new ShuffleHandler.Shuffle(configuration2) {
                    protected void verifyRequest(String str, ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, HttpResponse httpResponse, URL url) throws IOException {
                        // Request verification is skipped for this test.
                    }
                };
            }
        };
        shuffleHandler.init(configuration);
        try {
            shuffleHandler.start();
            // Register the application with a serialized token as its shuffle secret.
            DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
            dataOutputBuffer.reset();
            new Token("identifier".getBytes(), "password".getBytes(), new Text("randomUser"), new Text("shuffleService")).write(dataOutputBuffer);
            shuffleHandler.initializeApplication(new ApplicationInitializationContext("randomUser", newInstance, ByteBuffer.wrap(dataOutputBuffer.getData(), 0, dataOutputBuffer.getLength())));
            HttpURLConnection httpURLConnection = (HttpURLConnection) new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get("tez.shuffle.port") + "/mapOutput?job=job_12345_0001&dag=1&reduce=0&map=attempt_12345_1_m_1_0").openConnection();
            httpURLConnection.setRequestProperty("name", "mapreduce");
            httpURLConnection.setRequestProperty("version", "1.0.0");
            httpURLConnection.connect();
            byte[] responseBytes = new byte[10000];
            try {
                new DataInputStream(httpURLConnection.getInputStream()).readFully(responseBytes);
            } catch (EOFException ignored) {
                // A response shorter than the buffer is tolerated; whatever was read
                // before EOF stays in responseBytes and is inspected below.
            }
            File indexFile = createdFiles.get(0);
            String owner;
            FileInputStream fileInputStream = new FileInputStream(indexFile);
            try {
                owner = NativeIO.POSIX.getFstat(fileInputStream.getFD()).getOwner();
            } finally {
                fileInputStream.close();
            }
            String expectedMessage = "Owner '" + owner + "' for path " + indexFile.getAbsolutePath() + " did not match expected owner 'randomUser'";
            Assert.assertTrue(new String(responseBytes).contains(expectedMessage));
        } finally {
            shuffleHandler.stop();
            FileUtil.fullyDelete(absoluteFile);
        }
    }

    /**
     * Creates the attempt output directory below {@code file} and populates it with an
     * index file and a map output file, appending both files to {@code list}.
     *
     * <p>The directory is {@code <file>/usercache/<str>/appcache/<str2>/dag_1/output/<str3>}.
     * Its absolute path is printed to standard output.
     *
     * @param file          base directory under which the layout is created
     * @param str           path component placed after {@code usercache}
     * @param str2          path component placed after {@code appcache}
     * @param str3          final path component (below {@code dag_1/output})
     * @param configuration configuration forwarded to the file-writing helpers
     * @param list          receives the index file first, then the map output file
     * @throws IOException if writing either file fails
     */
    private static void createShuffleHandlerFiles(File file, String str, String str2, String str3, Configuration configuration, List<File> list) throws IOException {
        String[] pathParts = {file.getAbsolutePath(), "usercache", str, "appcache", str2, "dag_1/output", str3};
        File attemptDir = new File(StringUtils.join("/", pathParts));
        attemptDir.mkdirs();
        System.out.println(attemptDir.getAbsolutePath());

        // Each file is registered before it is written, so the list reflects
        // every file that creation was attempted for.
        File indexFile = new File(attemptDir, "file.out.index");
        list.add(indexFile);
        createIndexFile(indexFile, configuration);

        File mapOutputFile = new File(attemptDir, "file.out");
        list.add(mapOutputFile);
        createMapOutputFile(mapOutputFile, configuration);
    }

    /**
     * Writes a fixed placeholder message to {@code file}, creating or truncating it.
     *
     * @param file          destination file
     * @param configuration not used by this method
     * @throws IOException if the file cannot be opened or written
     */
    private static void createMapOutputFile(File file, Configuration configuration) throws IOException {
        // try-with-resources guarantees the stream is closed even when write() fails.
        try (FileOutputStream out = new FileOutputStream(file)) {
            out.write("Creating new dummy map output file. Used only for testing".getBytes());
        }
    }

    /**
     * Writes a two-entry spill index to {@code file}, deleting any file already present
     * at that location first.
     *
     * <p>The entries are {@code TezIndexRecord(0, 10, 10)} at index 0 and
     * {@code TezIndexRecord(10, 10, 10)} at index 1.
     *
     * @param file          index file to (re)create
     * @param configuration configuration passed to {@code TezSpillRecord.writeToFile}
     * @throws IOException if writing the spill record fails
     */
    private static void createIndexFile(File file, Configuration configuration) throws IOException {
        final int recordCount = 2;
        final long recordSpan = 10L;

        if (file.exists()) {
            System.out.println("Deleting existing file");
            file.delete();
        }

        PureJavaCrc32 checksum = new PureJavaCrc32();
        TezSpillRecord spillRecord = new TezSpillRecord(recordCount);
        for (int i = 0; i < recordCount; i++) {
            // First constructor argument advances by recordSpan per entry: 0, then 10.
            spillRecord.putIndex(new TezIndexRecord(i * recordSpan, recordSpan, recordSpan), i);
        }
        Path indexPath = new Path(file.getAbsolutePath());
        spillRecord.writeToFile(indexPath, configuration, checksum);
    }

    /**
     * Checks that shuffle authorization state survives handler restarts that share a
     * recovery path.
     *
     * <p>Sequence: register an application token and expect HTTP 200; restart and expect
     * 200 again; stop the application and expect 401; restart once more and still
     * expect 401.
     *
     * @throws IOException if handler setup, the HTTP requests, or token serialization fail
     */
    @Test
    public void testRecovery() throws IOException {
        final String user = "someuser";
        final ApplicationId appId = ApplicationId.newInstance(12345L, 1);
        // The result is unused; the call is retained from the original test body.
        JobID.downgrade(TypeConverter.fromYarn(appId));
        final File recoveryDir = new File(System.getProperty("test.build.data", System.getProperty("java.io.tmpdir")), TestShuffleHandler.class.getName());
        Configuration conf = new Configuration();
        conf.setInt("tez.shuffle.port", 0);
        conf.setInt("tez.shuffle.max.connections", 3);

        // A single variable tracks the currently live handler so that the finally
        // block always closes whichever instance is running when the test exits.
        ShuffleHandler shuffle = new ShuffleHandler();
        shuffle.setRecoveryPath(new Path(recoveryDir.toString()));
        recoveryDir.mkdirs();
        try {
            shuffle.init(conf);
            shuffle.start();

            // Register a shuffle token for the application.
            DataOutputBuffer outputBuffer = new DataOutputBuffer();
            outputBuffer.reset();
            Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>("identifier".getBytes(), "password".getBytes(), new Text(user), new Text("shuffleService"));
            jobToken.write(outputBuffer);
            shuffle.initializeApplication(new ApplicationInitializationContext(user, appId, ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength())));

            // The freshly registered application is authorized.
            Assert.assertEquals(HttpURLConnection.HTTP_OK, getShuffleResponseCode(shuffle, jobToken));

            // Restart the handler on the same recovery path.
            shuffle.close();
            shuffle = new ShuffleHandler();
            shuffle.setRecoveryPath(new Path(recoveryDir.toString()));
            shuffle.init(conf);
            shuffle.start();

            // Authorization is expected to survive the restart.
            Assert.assertEquals(HttpURLConnection.HTTP_OK, getShuffleResponseCode(shuffle, jobToken));

            // Once the application is stopped, requests must be rejected.
            shuffle.stopApplication(new ApplicationTerminationContext(appId));
            Assert.assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED, getShuffleResponseCode(shuffle, jobToken));

            // Restart again; the rejection is expected to persist.
            shuffle.close();
            shuffle = new ShuffleHandler();
            shuffle.setRecoveryPath(new Path(recoveryDir.toString()));
            shuffle.init(conf);
            shuffle.start();
            Assert.assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED, getShuffleResponseCode(shuffle, jobToken));
        } finally {
            shuffle.close();
            FileUtil.fullyDelete(recoveryDir);
        }
    }

    /**
     * Checks how the handler reacts to different stored state versions when it is
     * restarted on the same recovery path.
     *
     * <p>Sequence: register an application and expect HTTP 200; restart and expect the
     * current version to equal 1.0; store version 1.1 and restart, expecting the stored
     * version to read back as 1.0 and requests to still return 200; store version 2.1
     * and restart, expecting {@code start()} to fail with a
     * {@link ServiceStateException} mentioning an incompatible schema version.
     *
     * @throws IOException if handler setup, the HTTP requests, or token serialization fail
     */
    @Test
    public void testRecoveryFromOtherVersions() throws IOException {
        final String user = "someuser";
        final ApplicationId appId = ApplicationId.newInstance(12345L, 1);
        final File recoveryDir = new File(System.getProperty("test.build.data", System.getProperty("java.io.tmpdir")), TestShuffleHandler.class.getName());
        Configuration conf = new Configuration();
        conf.setInt("tez.shuffle.port", 0);
        conf.setInt("tez.shuffle.max.connections", 3);

        // A single variable tracks the currently live handler so that the finally
        // block always closes whichever instance is running when the test exits.
        ShuffleHandler shuffle = new ShuffleHandler();
        shuffle.setRecoveryPath(new Path(recoveryDir.toString()));
        recoveryDir.mkdirs();
        try {
            shuffle.init(conf);
            shuffle.start();

            // Register a shuffle token for the application.
            DataOutputBuffer outputBuffer = new DataOutputBuffer();
            outputBuffer.reset();
            Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>("identifier".getBytes(), "password".getBytes(), new Text(user), new Text("shuffleService"));
            jobToken.write(outputBuffer);
            shuffle.initializeApplication(new ApplicationInitializationContext(user, appId, ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength())));
            Assert.assertEquals(HttpURLConnection.HTTP_OK, getShuffleResponseCode(shuffle, jobToken));

            // Plain restart: still authorized, and the current version is 1.0.
            shuffle.close();
            shuffle = new ShuffleHandler();
            shuffle.setRecoveryPath(new Path(recoveryDir.toString()));
            shuffle.init(conf);
            shuffle.start();
            Assert.assertEquals(HttpURLConnection.HTTP_OK, getShuffleResponseCode(shuffle, jobToken));
            Version version10 = Version.newInstance(1, 0);
            Assert.assertEquals(version10, shuffle.getCurrentVersion());

            // Store version 1.1 before closing, then restart.
            Version version11 = Version.newInstance(1, 1);
            shuffle.storeVersion(version11);
            Assert.assertEquals(version11, shuffle.loadVersion());
            shuffle.close();
            shuffle = new ShuffleHandler();
            shuffle.setRecoveryPath(new Path(recoveryDir.toString()));
            shuffle.init(conf);
            shuffle.start();
            // After this restart the stored version is expected to read back as 1.0.
            Assert.assertEquals(version10, shuffle.loadVersion());
            Assert.assertEquals(HttpURLConnection.HTTP_OK, getShuffleResponseCode(shuffle, jobToken));

            // Store version 2.1 before closing; the next start is expected to fail.
            Version version21 = Version.newInstance(2, 1);
            shuffle.storeVersion(version21);
            Assert.assertEquals(version21, shuffle.loadVersion());
            shuffle.close();
            shuffle = new ShuffleHandler();
            shuffle.setRecoveryPath(new Path(recoveryDir.toString()));
            shuffle.init(conf);
            try {
                shuffle.start();
                Assert.fail("Incompatible version, should expect fail here.");
            } catch (ServiceStateException e) {
                Assert.assertTrue("Exception message mismatch", e.getMessage().contains("Incompatible version for state DB schema:"));
            }
        } finally {
            shuffle.close();
            FileUtil.fullyDelete(recoveryDir);
        }
    }

    /**
     * Issues a signed {@code /mapOutput} request against the given handler and returns
     * the HTTP status code of the response.
     *
     * <p>The port is read from the handler's configuration key {@code tez.shuffle.port}.
     * The {@code UrlHash} header is computed from the request URL using a secret key
     * derived from the token's password.
     *
     * @param shuffleHandler handler whose configured port is contacted on 127.0.0.1
     * @param token          token whose password is used to sign the request
     * @return the HTTP response code
     * @throws IOException if the connection cannot be opened or the response cannot be read
     */
    private static int getShuffleResponseCode(ShuffleHandler shuffleHandler, Token<JobTokenIdentifier> token) throws IOException {
        URL url = new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get("tez.shuffle.port") + "/mapOutput?job=job_12345_0001&dag=1&reduce=0&map=attempt_12345_1_m_1_0");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        try {
            JobTokenSecretManager secretManager = new JobTokenSecretManager(JobTokenSecretManager.createSecretKey(token.getPassword()));
            connection.addRequestProperty("UrlHash", SecureShuffleUtils.hashFromString(SecureShuffleUtils.buildMsgFrom(url), secretManager));
            connection.setRequestProperty("name", "mapreduce");
            connection.setRequestProperty("version", "1.0.0");
            connection.connect();
            return connection.getResponseCode();
        } finally {
            // Release the connection even when connecting or reading the status fails.
            connection.disconnect();
        }
    }

    /**
     * Starts a handler whose {@code Shuffle} has request verification disabled and
     * header population / map-output sending stubbed, performs one {@code /mapOutput}
     * request, and asserts that {@code sendError} was never invoked.
     *
     * @throws Exception if handler setup or the HTTP request fails
     */
    @Test(timeout = 100000)
    public void testGetMapOutputInfo() throws Exception {
        // Collects the first error reported through sendError, if any.
        final List<Throwable> failures = new ArrayList<>(1);
        final String user = "randomUser";
        final String attemptId = "attempt_12345_1_m_1_0";
        Configuration conf = new Configuration();
        conf.setInt("tez.shuffle.port", 0);
        conf.setInt("tez.shuffle.max.connections", 3);
        conf.set("hadoop.security.authentication", "simple");
        UserGroupInformation.setConfiguration(conf);
        final File localDir = new File("target", TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
        conf.set("yarn.nodemanager.local-dirs", localDir.getAbsolutePath());
        ApplicationId appId = ApplicationId.newInstance(12345L, 1);
        createShuffleHandlerFiles(localDir, user, appId.toString(), attemptId, conf, new ArrayList<File>());
        ShuffleHandler shuffleHandler = new ShuffleHandler() { // from class: org.apache.tez.auxservices.TestShuffleHandler.6
            protected ShuffleHandler.Shuffle getShuffle(Configuration configuration2) {
                return new ShuffleHandler.Shuffle(configuration2) { // from class: org.apache.tez.auxservices.TestShuffleHandler.6.1
                    protected void populateHeaders(List<String> list, String str, String str2, String str3, ShuffleHandler.Range range, HttpResponse httpResponse, boolean z, Map<String, ShuffleHandler.Shuffle.MapOutputInfo> map) throws IOException {
                        super.setResponseHeaders(httpResponse, z, 100L);
                    }

                    protected void verifyRequest(String str, ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, HttpResponse httpResponse, URL url) throws IOException {
                        // Intentionally empty: request verification is skipped in this test.
                    }

                    protected void sendError(ChannelHandlerContext channelHandlerContext, String str, HttpResponseStatus httpResponseStatus) {
                        // Only the first error is recorded; the channel is closed at that point.
                        if (failures.isEmpty()) {
                            failures.add(new Error(str));
                            channelHandlerContext.getChannel().close();
                        }
                    }

                    protected ChannelFuture sendMapOutput(ChannelHandlerContext channelHandlerContext, Channel channel, String str, String str2, ShuffleHandler.Range range, ShuffleHandler.Shuffle.MapOutputInfo mapOutputInfo) throws IOException {
                        // Reply with a serialized header only; no map output data is sent.
                        ShuffleHeader header = new ShuffleHeader("attempt_12345_1_m_1_0", 5678L, 5678L, 1);
                        DataOutputBuffer headerBuffer = new DataOutputBuffer();
                        header.write(headerBuffer);
                        return channel.write(ChannelBuffers.wrappedBuffer(headerBuffer.getData(), 0, headerBuffer.getLength()));
                    }
                };
            }
        };
        shuffleHandler.init(conf);
        try {
            shuffleHandler.start();
            DataOutputBuffer outputBuffer = new DataOutputBuffer();
            outputBuffer.reset();
            Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>("identifier".getBytes(), "password".getBytes(), new Text(user), new Text("shuffleService"));
            jobToken.write(outputBuffer);
            shuffleHandler.initializeApplication(new ApplicationInitializationContext(user, appId, ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength())));
            HttpURLConnection connection = (HttpURLConnection) new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get("tez.shuffle.port") + "/mapOutput?job=job_12345_0001&dag=1&reduce=0&map=attempt_12345_1_m_1_0").openConnection();
            connection.setRequestProperty("name", "mapreduce");
            connection.setRequestProperty("version", "1.0.0");
            connection.connect();
            // try-with-resources closes the stream even when readFields() throws.
            try (DataInputStream input = new DataInputStream(connection.getInputStream())) {
                new ShuffleHeader().readFields(input);
            } catch (EOFException ignored) {
                // A truncated response is tolerated; the assertion below only checks
                // that sendError was not invoked.
            }
            Assert.assertEquals("sendError called due to shuffle error", 0L, failures.size());
        } finally {
            shuffleHandler.stop();
            FileUtil.fullyDelete(localDir);
        }
    }

    /**
     * Sends a {@code dagAction=delete} request to a running handler and asserts that the
     * {@code dag_1} directory created for the test no longer exists afterwards and that
     * {@code sendError} was never invoked.
     *
     * @throws Exception if handler setup or the HTTP request fails
     */
    @Test(timeout = 5000)
    public void testDagDelete() throws Exception {
        // Collects the first error reported through sendError, if any.
        final List<Throwable> failures = new ArrayList<>(1);
        final String user = "randomUser";
        Configuration conf = new Configuration();
        conf.setInt("tez.shuffle.max.connections", 3);
        conf.setInt("tez.shuffle.port", 0);
        conf.set("hadoop.security.authentication", "simple");
        UserGroupInformation.setConfiguration(conf);
        final File localDir = new File("target", TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
        conf.set("yarn.nodemanager.local-dirs", localDir.getAbsolutePath());
        ApplicationId appId = ApplicationId.newInstance(12345L, 1);
        createShuffleHandlerFiles(localDir, user, appId.toString(), "attempt_12345_1_m_1_0", conf, new ArrayList<File>());
        ShuffleHandler shuffleHandler = new ShuffleHandler() { // from class: org.apache.tez.auxservices.TestShuffleHandler.7
            protected ShuffleHandler.Shuffle getShuffle(Configuration configuration2) {
                return new ShuffleHandler.Shuffle(configuration2) { // from class: org.apache.tez.auxservices.TestShuffleHandler.7.1
                    protected void sendError(ChannelHandlerContext channelHandlerContext, String str, HttpResponseStatus httpResponseStatus) {
                        // Only the first error is recorded; the channel is closed at that point.
                        if (failures.isEmpty()) {
                            failures.add(new Error(str));
                            channelHandlerContext.getChannel().close();
                        }
                    }
                };
            }
        };
        shuffleHandler.init(conf);
        try {
            shuffleHandler.start();
            DataOutputBuffer outputBuffer = new DataOutputBuffer();
            outputBuffer.reset();
            Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>("identifier".getBytes(), "password".getBytes(), new Text(user), new Text("shuffleService"));
            jobToken.write(outputBuffer);
            shuffleHandler.initializeApplication(new ApplicationInitializationContext(user, appId, ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength())));
            HttpURLConnection connection = (HttpURLConnection) new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get("tez.shuffle.port") + "/mapOutput?dagAction=delete&job=job_12345_0001&dag=1").openConnection();
            connection.setRequestProperty("name", "mapreduce");
            connection.setRequestProperty("version", "1.0.0");
            File dagDir = new File(StringUtils.join("/", new String[]{localDir.getAbsolutePath(), "usercache", user, "appcache", appId.toString(), "dag_1/"}));
            // Precondition: the directory created above must exist before the request.
            Assert.assertTrue("Dag Directory does not exist!", dagDir.exists());
            connection.connect();
            try {
                // The response body is not inspected; the stream is opened and closed only.
                new DataInputStream(connection.getInputStream()).close();
                Assert.assertFalse("Dag Directory was not deleted!", dagDir.exists());
            } catch (EOFException ignored) {
                // A truncated response is tolerated; in that case the deletion check
                // above is skipped and only the sendError assertion below applies.
            }
            Assert.assertEquals("sendError called due to shuffle error", 0L, failures.size());
        } finally {
            shuffleHandler.stop();
            FileUtil.fullyDelete(localDir);
        }
    }

    /**
     * Drives {@code Shuffle.messageReceived} with a mocked channel and a request that
     * names many map attempts, and asserts that the number of pending
     * {@code ReduceMapFileCount} listeners never exceeds the value configured under
     * {@code tez.shuffle.max.session-open-files} (default 3 in this test).
     *
     * @throws Exception if handler setup fails or a listener callback throws
     */
    @Test(timeout = 4000)
    public void testSendMapCount() throws Exception {
        // Listeners captured by the mocked ChannelFuture returned from channel writes.
        final List<ShuffleHandler.ReduceMapFileCount> listenerList = new ArrayList<>();
        final String limitMessage = "Number of Open files should not exceed the configured value!-Not Expected";

        ChannelHandlerContext mockCtx = Mockito.mock(ChannelHandlerContext.class);
        MessageEvent mockEvt = Mockito.mock(MessageEvent.class);
        Channel mockCh = Mockito.mock(AbstractChannel.class);
        ChannelPipeline mockPipeline = Mockito.mock(ChannelPipeline.class);
        HttpRequest mockHttpRequest = createMockHttpRequest();
        ChannelFuture mockFuture = createMockChannelFuture(mockCh, listenerList);
        ShuffleHandler.TimeoutHandler timeoutHandler = new ShuffleHandler.TimeoutHandler();

        // Wire the mocks together: context/event -> channel -> pipeline/future.
        Mockito.doReturn(mockCh).when(mockCtx).getChannel();
        Mockito.when(mockCh.getPipeline()).thenReturn(mockPipeline);
        Mockito.when(mockPipeline.get(Mockito.any(String.class))).thenReturn(timeoutHandler);
        Mockito.when(mockCtx.getChannel()).thenReturn(mockCh);
        Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class));
        Mockito.when(mockCh.write(Object.class)).thenReturn(mockFuture);
        Mockito.doReturn(mockCh).when(mockEvt).getChannel();
        Mockito.when(mockEvt.getChannel()).thenReturn(mockCh);
        Mockito.doReturn(mockHttpRequest).when(mockEvt).getMessage();

        MockShuffleHandler handler = new MockShuffleHandler();
        Configuration conf = new Configuration();
        conf.setInt("tez.shuffle.port", 0);
        handler.init(conf);
        try {
            handler.start();
            int maxOpenFiles = conf.getInt("tez.shuffle.max.session-open-files", 3);
            handler.getShuffle(conf).messageReceived(mockCtx, mockEvt);
            Assert.assertTrue(limitMessage, listenerList.size() <= maxOpenFiles);
            // Completing a listener may register further ones; re-check the bound each time.
            while (!listenerList.isEmpty()) {
                listenerList.remove(0).operationComplete(mockFuture);
                Assert.assertTrue(limitMessage, listenerList.size() <= maxOpenFiles);
            }
        } finally {
            // Close the handler even when an assertion above fails.
            handler.close();
        }
    }

    /**
     * Builds a mocked {@code ChannelFuture} that reports success, returns the given
     * channel, and records listeners passed to {@code addListener}.
     *
     * <p>A listener is appended to {@code list} only when its runtime class is exactly
     * {@code ShuffleHandler.ReduceMapFileCount}; other arguments are ignored.
     *
     * @param channel channel returned by the mock's {@code getChannel()}
     * @param list    receives each recorded {@code ReduceMapFileCount} listener
     * @return the configured mock
     */
    public ChannelFuture createMockChannelFuture(Channel channel, final List<ShuffleHandler.ReduceMapFileCount> list) {
        final ChannelFuture future = Mockito.mock(ChannelFuture.class);
        Mockito.when(future.getChannel()).thenReturn(channel);
        Mockito.doReturn(true).when(future).isSuccess();

        Answer<Object> recordListener = new Answer<Object>() { // from class: org.apache.tez.auxservices.TestShuffleHandler.8
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                Object listener = invocationOnMock.getArguments()[0];
                if (listener.getClass() == ShuffleHandler.ReduceMapFileCount.class) {
                    list.add((ShuffleHandler.ReduceMapFileCount) listener);
                }
                return null;
            }
        };
        Mockito.doAnswer(recordListener).when(future).addListener((ChannelFutureListener) Mockito.any(ShuffleHandler.ReduceMapFileCount.class));
        return future;
    }

    /**
     * Builds a mocked GET {@code HttpRequest} whose URI is a {@code /mapOutput} query
     * for job {@code job_12345_1}, dag 1, reduce 1, followed by 100 {@code map}
     * parameters ({@code attempt_12345_1_m_0_0} through {@code attempt_12345_1_m_99_0}).
     *
     * @return the configured mock
     */
    public HttpRequest createMockHttpRequest() {
        final int mapCount = 100;
        HttpRequest request = Mockito.mock(HttpRequest.class);
        Mockito.doReturn(HttpMethod.GET).when(request).getMethod();
        Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.tez.auxservices.TestShuffleHandler.9
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                // The URI is rebuilt on every getUri() call.
                StringBuilder uri = new StringBuilder("/mapOutput?job=job_12345_1&dag=1&reduce=1");
                for (int mapIndex = 0; mapIndex < mapCount; mapIndex++) {
                    uri.append("&map=attempt_12345_1_m_").append(mapIndex).append("_0");
                }
                return uri.toString();
            }
        }).when(request).getUri();
        return request;
    }
}
