package org.apache.hadoop.hdfs;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Random;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.junit.Test;

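/**
 * Tests that talk to a datanode of a {@link MiniDFSCluster} over raw sockets: each case stages
 * request bytes and the expected reply bytes, then compares what the datanode actually returns.
 *
 * <p>Decompiled from org/apache/hadoop/hdfs/TestDataTransferProtocol.class.
 */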
public class TestDataTransferProtocol extends TestCase {
    /** Logger for this test; sendRecvData uses it to announce each case and expected EOFs. */
    private static final Log LOG = LogFactory.getLog("org.apache.hadoop.hdfs.TestDataTransferProtocol");
    /** Datanode under test; assigned by the test methods once the mini cluster is active. */
    DatanodeID datanode;
    /** Socket address built from the datanode's name; sendRecvData connects to it. */
    InetSocketAddress dnAddr;
    /** Holds the raw request bytes that sendRecvData writes to the datanode. */
    ByteArrayOutputStream sendBuf = new ByteArrayOutputStream(128);
    /** Typed writer over {@code sendBuf}. */
    DataOutputStream sendOut = new DataOutputStream(this.sendBuf);
    /** Holds the bytes the datanode is expected to reply with; its size is how much sendRecvData reads. */
    ByteArrayOutputStream recvBuf = new ByteArrayOutputStream(128);
    /** Typed writer over {@code recvBuf}. */
    DataOutputStream recvOut = new DataOutputStream(this.recvBuf);

    /**
     * Opens a socket to {@code dnAddr}, sends the bytes staged in {@code sendBuf}, then reads
     * exactly {@code recvBuf.size()} bytes back and checks them against {@code recvBuf}.
     *
     * <p>The socket is closed on every exit path, including failures.
     *
     * @param str description of the case, logged before the exchange; {@code null} logs nothing
     * @param z   {@code true} if the read is expected to hit end-of-stream before the full
     *            reply arrives
     * @throws IOException if end-of-stream occurs when {@code z} is {@code false}, if a full
     *                     reply arrives although {@code z} is {@code true}, or on any socket error
     */
    private void sendRecvData(String str, boolean z) throws IOException {
        Socket socket = null;
        try {
            if (str != null) {
                LOG.info("Testing : " + str);
            }
            socket = new Socket();
            // 60 s for both connecting and each blocking read.
            socket.connect(this.dnAddr, 60000);
            socket.setSoTimeout(60000);

            OutputStream toDatanode = socket.getOutputStream();
            // The expected reply determines how many bytes are read back.
            byte[] reply = new byte[this.recvBuf.size()];
            DataInputStream fromDatanode = new DataInputStream(socket.getInputStream());
            toDatanode.write(this.sendBuf.toByteArray());
            toDatanode.flush();

            try {
                fromDatanode.readFully(reply);
            } catch (EOFException e) {
                if (!z) {
                    throw e;
                }
                LOG.info("Got EOF as expected.");
                return;
            }

            for (byte b : reply) {
                System.out.print((int) b);
            }
            System.out.println(":");
            if (z) {
                throw new IOException("Did not recieve IOException when an exception is expected while reading from " + this.datanode.getName());
            }

            byte[] expected = this.recvBuf.toByteArray();
            for (int i = 0; i < reply.length; i++) {
                System.out.print((int) reply[i]);
                assertEquals("checking byte[" + i + "]", expected[i], reply[i]);
            }
        } finally {
            // closeSocket is also invoked with null by the original code, so a failed
            // construction/connect is covered here as well.
            IOUtils.closeSocket(socket);
        }
    }

    /**
     * Creates {@code path} on {@code fileSystem} and fills it with {@code i} zero bytes.
     *
     * @param fileSystem file system to create the file on
     * @param path       file to create
     * @param i          number of zero bytes to write
     * @throws IOException if creating, writing or closing the file fails
     */
    void createFile(FileSystem fileSystem, Path path, int i) throws IOException {
        // try-with-resources closes the stream even when write() fails.
        try (FSDataOutputStream out = fileSystem.create(path)) {
            out.write(new byte[i]);
        }
    }

    /**
     * Opens {@code path} on {@code fileSystem} and reads exactly {@code i} bytes from it,
     * discarding the data.
     *
     * @param fileSystem file system to read from
     * @param path       file to read
     * @param i          number of bytes that must be readable
     * @throws IOException if the file cannot be opened or holds fewer than {@code i} bytes
     */
    void readFile(FileSystem fileSystem, Path path, int i) throws IOException {
        // The stream was previously left open; close it once the read is done.
        try (FSDataInputStream in = fileSystem.open(path)) {
            in.readFully(new byte[i]);
        }
    }

    /**
     * Appends a final, data-less packet to the staged request, stages the matching successful
     * reply, and runs the exchange expecting a complete answer.
     *
     * @param block block whose current length is used as the packet's second header argument
     * @param str   description of the case, passed through to {@code sendRecvData}
     * @throws IOException if staging the bytes or the exchange itself fails
     */
    private void writeZeroLengthPacket(Block block, String str) throws IOException {
        final DataOutputStream request = this.sendOut;
        final DataOutputStream expectedReply = this.recvOut;

        // Request side: one byte (presumably the checksum type -- confirm), the
        // bytes-per-checksum value, the packet header, and a trailing zero int.
        request.writeByte(1);
        request.writeInt(512);
        DataTransferProtocol.PacketHeader header =
                new DataTransferProtocol.PacketHeader(8, block.getNumBytes(), 100L, true, 0);
        header.write(request);
        request.writeInt(0);

        // Reply side: SUCCESS status, an empty string, then an ack carrying the same 100L value
        // as the header with a single SUCCESS entry.
        DataTransferProtocol.Status.SUCCESS.write(expectedReply);
        Text.writeString(expectedReply, "");
        DataTransferProtocol.Status[] ackStatuses = {DataTransferProtocol.Status.SUCCESS};
        DataTransferProtocol.PipelineAck ack = new DataTransferProtocol.PipelineAck(100L, ackStatuses);
        ack.write(expectedReply);

        sendRecvData(str, false);
    }

    /**
     * Stages a write-block request for {@code block} at the given construction stage and
     * verifies the datanode's reaction.
     *
     * <p>Both buffers are reset first. When an error is expected, only an ERROR status is staged
     * as the reply and the exchange must end in EOF. Otherwise a SUCCESS reply is staged; for
     * every stage except {@code PIPELINE_CLOSE_RECOVERY} a zero-length packet is sent as well.
     *
     * @param block                  block named in the request
     * @param blockConstructionStage pipeline stage to request
     * @param j                      value passed to {@code opWriteBlock} right after the stage;
     *                               callers pass a generation stamp or {@code 0L}
     * @param str                    description of the case, used for logging
     * @param bool                   whether the datanode is expected to reject the request
     * @throws IOException if staging the bytes or the exchange itself fails
     */
    private void testWrite(Block block, DataTransferProtocol.BlockConstructionStage blockConstructionStage, long j, String str, Boolean bool) throws IOException {
        this.sendBuf.reset();
        this.recvBuf.reset();
        DataTransferProtocol.Sender.opWriteBlock(
                this.sendOut,
                block,
                0,
                blockConstructionStage,
                j,
                block.getNumBytes(),
                block.getNumBytes(),
                "cl",
                (DatanodeInfo) null,
                new DatanodeInfo[1],
                BlockTokenSecretManager.DUMMY_TOKEN);

        if (bool) {
            // Rejection path: the connection is expected to end before a full reply arrives.
            DataTransferProtocol.Status.ERROR.write(this.recvOut);
            sendRecvData(str, true);
            return;
        }

        if (blockConstructionStage == DataTransferProtocol.BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
            // No packet is sent for this stage; only the setup reply is checked.
            DataTransferProtocol.Status.SUCCESS.write(this.recvOut);
            Text.writeString(this.recvOut, "");
            sendRecvData(str, false);
            return;
        }

        writeZeroLengthPacket(block, str);
    }

    /**
     * Sends write-block requests for the various pipeline construction stages against a
     * single-datanode mini cluster and checks, per case, whether the datanode accepts or rejects
     * them. The description string of each {@code testWrite} call names the scenario.
     *
     * @throws IOException if cluster setup, file creation or any exchange fails
     */
    @Test
    public void testOpWrite() throws IOException {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setBoolean("dfs.support.append", true);
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
        try {
            cluster.waitActive();
            this.datanode = cluster.getDataNodes().get(0).dnRegistration;
            this.dnAddr = NetUtils.createSocketAddr(this.datanode.getName());
            FileSystem fs = cluster.getFileSystem();

            // ---- Requests against the block of an already written file ----
            Path file = new Path("dataprotocol.dat");
            DFSTestUtil.createFile(fs, file, 1L, (short) 1, 0L);
            Block firstBlock = DFSTestUtil.getFirstBlock(fs, file);
            testWrite(firstBlock, DataTransferProtocol.BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, "Cannot create an existing block", true);
            testWrite(firstBlock, DataTransferProtocol.BlockConstructionStage.DATA_STREAMING, 0L, "Unexpected stage", true);
            testWrite(firstBlock, DataTransferProtocol.BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY, firstBlock.getGenerationStamp() + 1, "Cannot recover data streaming to a finalized replica", true);
            long appendStamp = firstBlock.getGenerationStamp() + 1;
            testWrite(firstBlock, DataTransferProtocol.BlockConstructionStage.PIPELINE_SETUP_APPEND, appendStamp, "Append to a finalized replica", false);
            firstBlock.setGenerationStamp(appendStamp);

            Path file1 = new Path("dataprotocol1.dat");
            DFSTestUtil.createFile(fs, file1, 1L, (short) 1, 0L);
            Block file1Block = DFSTestUtil.getFirstBlock(fs, file1);
            testWrite(file1Block, DataTransferProtocol.BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY, file1Block.getGenerationStamp() + 1, "Recover appending to a finalized replica", false);

            Path file2 = new Path("dataprotocol2.dat");
            DFSTestUtil.createFile(fs, file2, 1L, (short) 1, 0L);
            Block file2Block = DFSTestUtil.getFirstBlock(fs, file2);
            long closeRecoveryStamp = file2Block.getGenerationStamp() + 1;
            testWrite(file2Block, DataTransferProtocol.BlockConstructionStage.PIPELINE_CLOSE_RECOVERY, closeRecoveryStamp, "Recover failed close to a finalized replica", false);
            file2Block.setGenerationStamp(closeRecoveryStamp);

            // ---- Requests against block ids derived by incrementing an existing id ----
            Block newBlock = new Block(file2Block.getBlockId() + 1, 0L, file2Block.getGenerationStamp());
            testWrite(newBlock, DataTransferProtocol.BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, "Create a new block", false);
            long newBlockRecoveryStamp = newBlock.getGenerationStamp() + 1;
            newBlock.setBlockId(newBlock.getBlockId() + 1);
            testWrite(newBlock, DataTransferProtocol.BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY, newBlockRecoveryStamp, "Recover a new block", true);
            testWrite(newBlock, DataTransferProtocol.BlockConstructionStage.PIPELINE_SETUP_APPEND, newBlock.getGenerationStamp() + 1, "Cannot append to a new block", true);
            newBlock.setBlockId(newBlock.getBlockId() + 1);
            testWrite(newBlock, DataTransferProtocol.BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY, newBlock.getGenerationStamp() + 1, "Cannot append to a new block", true);

            // ---- Requests against a block that is still open for append ----
            DFSOutputStream appendOut = null;
            FSDataInputStream in = null;
            try {
                Path rbwFile1 = new Path("dataprotocol1.dat");
                DFSTestUtil.createFile(fs, rbwFile1, 1L, (short) 1, 0L);
                // getWrappedStream() is declared to return a plain OutputStream; hflush() needs
                // the DFS-specific type, hence the cast.
                appendOut = (DFSOutputStream) fs.append(rbwFile1).getWrappedStream();
                appendOut.write(1);
                appendOut.hflush();
                in = fs.open(rbwFile1);
                Block rbwBlock = DFSTestUtil.getAllBlocks(in).get(0).getBlock();
                rbwBlock.setNumBytes(2L);

                testWrite(rbwBlock, DataTransferProtocol.BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, "Cannot create a RBW block", true);
                // NOTE(review): the stamp is derived from newBlock, not from rbwBlock -- confirm
                // this is intended before changing it; the cases below depend on its value.
                long rbwStamp = newBlock.getGenerationStamp() + 1;
                testWrite(rbwBlock, DataTransferProtocol.BlockConstructionStage.PIPELINE_SETUP_APPEND, rbwStamp, "Cannot append to a RBW replica", true);
                testWrite(rbwBlock, DataTransferProtocol.BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY, rbwStamp, "Recover append to a RBW replica", false);
                rbwBlock.setGenerationStamp(rbwStamp);

                Path rbwFile2 = new Path("dataprotocol2.dat");
                DFSTestUtil.createFile(fs, rbwFile2, 1L, (short) 1, 0L);
                // NOTE(review): the streams opened for dataprotocol1.dat are replaced here
                // without being closed; left as is because closing them mid-test may alter
                // the datanode state the remaining case relies on.
                appendOut = (DFSOutputStream) fs.append(rbwFile2).getWrappedStream();
                appendOut.write(1);
                appendOut.hflush();
                in = fs.open(rbwFile2);
                Block rbwBlock2 = DFSTestUtil.getAllBlocks(in).get(0).getBlock();
                rbwBlock2.setNumBytes(2L);
                testWrite(rbwBlock2, DataTransferProtocol.BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY, rbwBlock2.getGenerationStamp() + 1, "Recover a RBW replica", false);
            } finally {
                // Best-effort close of whichever streams are current; same order as before.
                IOUtils.closeStream(in);
                IOUtils.closeStream(appendOut);
            }
        } finally {
            cluster.shutdown();
        }
    }

    /**
     * Sends a series of malformed and well-formed raw requests to the datanode of a
     * single-datanode mini cluster and checks the reply bytes (or the expected EOF) for each,
     * then reads the test file through the regular client API.
     *
     * @throws IOException if cluster setup, file creation or any exchange fails
     */
    @Test
    public void testDataTransferProtocol() throws IOException {
        Random random = new Random();
        Path file = new Path("dataprotocol.dat");
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setInt("dfs.replication", 1);
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
        try {
            cluster.waitActive();
            // The client is only needed to look up the live datanode; release it afterwards.
            DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
            try {
                this.datanode = dfsClient.datanodeReport(FSConstants.DatanodeReportType.LIVE)[0];
            } finally {
                dfsClient.close();
            }
            this.dnAddr = NetUtils.createSocketAddr(this.datanode.getName());
            FileSystem fs = cluster.getFileSystem();
            int fileLen = Math.min(conf.getInt("dfs.blocksize", 4096), 4096);
            createFile(fs, file, fileLen);
            Block firstBlock = DFSTestUtil.getFirstBlock(fs, file);
            long newBlockId = firstBlock.getBlockId() + 1;

            // ---- Leading short 18: expected to be rejected as a wrong version ----
            this.recvBuf.reset();
            this.sendBuf.reset();
            this.recvOut.writeShort(18);
            this.sendOut.writeShort(18);
            sendRecvData("Wrong Version", true);

            // ---- Leading short 19 followed by an op code one below WRITE_BLOCK ----
            // recvBuf is intentionally not reset here; it still holds the two bytes from above.
            this.sendBuf.reset();
            this.sendOut.writeShort(19);
            this.sendOut.writeByte(DataTransferProtocol.Op.WRITE_BLOCK.code - 1);
            sendRecvData("Wrong Op Code", true);

            // ---- Write request with a negative bytes-per-checksum value ----
            this.sendBuf.reset();
            DataTransferProtocol.Sender.opWriteBlock(this.sendOut, new Block(newBlockId), 0, DataTransferProtocol.BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", (DatanodeInfo) null, new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
            this.sendOut.writeByte(1);
            this.sendOut.writeInt((-1) - random.nextInt(1048576));
            this.recvBuf.reset();
            DataTransferProtocol.Status.ERROR.write(this.recvOut);
            sendRecvData("wrong bytesPerChecksum while writing", true);

            // ---- Write request whose packet header carries a negative last argument ----
            this.sendBuf.reset();
            this.recvBuf.reset();
            newBlockId++;
            DataTransferProtocol.Sender.opWriteBlock(this.sendOut, new Block(newBlockId), 0, DataTransferProtocol.BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", (DatanodeInfo) null, new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
            this.sendOut.writeByte(1);
            this.sendOut.writeInt(512);
            new DataTransferProtocol.PacketHeader(4, 0L, 100L, false, (-1) - random.nextInt(1048576)).write(this.sendOut);
            DataTransferProtocol.Status.SUCCESS.write(this.recvOut);
            Text.writeString(this.recvOut, "");
            new DataTransferProtocol.PipelineAck(100L, new DataTransferProtocol.Status[]{DataTransferProtocol.Status.ERROR}).write(this.recvOut);
            sendRecvData("negative DATA_CHUNK len while writing block " + newBlockId, true);

            // ---- Well-formed write of a zero-length block: full SUCCESS reply expected ----
            this.sendBuf.reset();
            this.recvBuf.reset();
            newBlockId++;
            DataTransferProtocol.Sender.opWriteBlock(this.sendOut, new Block(newBlockId), 0, DataTransferProtocol.BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", (DatanodeInfo) null, new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
            this.sendOut.writeByte(1);
            this.sendOut.writeInt(512);
            new DataTransferProtocol.PacketHeader(8, 0L, 100L, true, 0).write(this.sendOut);
            this.sendOut.writeInt(0);
            this.sendOut.flush();
            DataTransferProtocol.Status.SUCCESS.write(this.recvOut);
            Text.writeString(this.recvOut, "");
            new DataTransferProtocol.PipelineAck(100L, new DataTransferProtocol.Status[]{DataTransferProtocol.Status.SUCCESS}).write(this.recvOut);
            sendRecvData("Writing a zero len block blockid " + newBlockId, false);

            // ---- Read requests against a copy of the file's first block ----
            Block readBlock = new Block(firstBlock);
            long realBlockId = readBlock.getBlockId();

            // recvBuf stays empty for the next three cases, so no reply bytes are compared.
            this.sendBuf.reset();
            this.recvBuf.reset();
            readBlock.setBlockId(realBlockId - 1);
            DataTransferProtocol.Sender.opReadBlock(this.sendOut, readBlock, 0L, fileLen, "cl", BlockTokenSecretManager.DUMMY_TOKEN);
            // Log the id that is actually requested (previously an unrelated block id was printed).
            sendRecvData("Wrong block ID " + readBlock.getBlockId() + " for read", false);

            this.sendBuf.reset();
            readBlock.setBlockId(realBlockId);
            DataTransferProtocol.Sender.opReadBlock(this.sendOut, readBlock, -1L, fileLen, "cl", BlockTokenSecretManager.DUMMY_TOKEN);
            sendRecvData("Negative start-offset for read for block " + firstBlock.getBlockId(), false);

            this.sendBuf.reset();
            DataTransferProtocol.Sender.opReadBlock(this.sendOut, readBlock, fileLen, fileLen, "cl", BlockTokenSecretManager.DUMMY_TOKEN);
            sendRecvData("Wrong start-offset for reading block " + firstBlock.getBlockId(), false);

            this.recvBuf.reset();
            DataTransferProtocol.Status.SUCCESS.write(this.recvOut);
            this.sendBuf.reset();
            DataTransferProtocol.Sender.opReadBlock(this.sendOut, readBlock, 0L, (-1) - random.nextInt(1048576), "cl", BlockTokenSecretManager.DUMMY_TOKEN);
            sendRecvData("Negative length for reading block " + firstBlock.getBlockId(), false);

            this.recvBuf.reset();
            DataTransferProtocol.Status.ERROR.write(this.recvOut);
            this.sendBuf.reset();
            DataTransferProtocol.Sender.opReadBlock(this.sendOut, readBlock, 0L, fileLen + 1, "cl", BlockTokenSecretManager.DUMMY_TOKEN);
            sendRecvData("Wrong length for reading block " + firstBlock.getBlockId(), false);

            // NOTE(review): this request is staged but never sent; the final check goes
            // through the regular file system API instead.
            this.sendBuf.reset();
            DataTransferProtocol.Sender.opReadBlock(this.sendOut, readBlock, 0L, fileLen, "cl", BlockTokenSecretManager.DUMMY_TOKEN);
            readFile(fs, file, fileLen);
        } finally {
            cluster.shutdown();
        }
    }

    /**
     * Round-trips a {@code PacketHeader} through its stream-based and buffer-based
     * {@code readFields} variants and checks {@code sanityCheck} on the original header.
     *
     * @throws IOException if serializing or deserializing the header fails
     */
    @Test
    public void testPacketHeader() throws IOException {
        DataTransferProtocol.PacketHeader header = new DataTransferProtocol.PacketHeader(4, 1024L, 100L, false, 4096);

        // Serialize once; both readers below consume the same bytes without modifying them.
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        header.write(new DataOutputStream(sink));
        byte[] serialized = sink.toByteArray();

        // Stream-based deserialization.
        DataTransferProtocol.PacketHeader fromStream = new DataTransferProtocol.PacketHeader();
        fromStream.readFields(new DataInputStream(new ByteArrayInputStream(serialized)));
        assertEquals(header, fromStream);

        // Buffer-based deserialization.
        DataTransferProtocol.PacketHeader fromBuffer = new DataTransferProtocol.PacketHeader();
        fromBuffer.readFields(ByteBuffer.wrap(serialized));
        assertEquals(header, fromBuffer);

        // The header was built with 100L as its third argument: 99 passes the check, 100 does not.
        assertTrue(header.sanityCheck(99L));
        assertFalse(header.sanityCheck(100L));
    }
}
