package org.apache.hadoop.hdfs;

import com.google.common.base.Supplier;
import java.io.InputStream;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/hadoop/hdfs/TestDataTransferKeepalive.class */
public class TestDataTransferKeepalive {
    final Configuration conf = new HdfsConfiguration();
    private MiniDFSCluster cluster;
    private DataNode dn;
    private static final Path TEST_FILE = new Path("/test");
    private static final int KEEPALIVE_TIMEOUT = 1000;
    private static final int WRITE_TIMEOUT = 3000;

    @Before
    public void setup() throws Exception {
        this.conf.setInt("dfs.datanode.socket.reuse.keepalive", KEEPALIVE_TIMEOUT);
        this.conf.setInt("dfs.client.max.block.acquire.failures", 0);
        this.cluster = new MiniDFSCluster.Builder(this.conf).numDataNodes(1).build();
        this.dn = this.cluster.getDataNodes().get(0);
    }

    @After
    public void teardown() {
        this.cluster.shutdown();
    }

    @Test(timeout = 30000)
    public void testDatanodeRespectsKeepAliveTimeout() throws Exception {
        Configuration configuration = new Configuration(this.conf);
        configuration.setLong("dfs.client.socketcache.expiryMsec", 60000L);
        configuration.set("dfs.client.context", "testDatanodeRespectsKeepAliveTimeout");
        DistributedFileSystem distributedFileSystem = FileSystem.get(this.cluster.getURI(), configuration);
        PeerCache peerCache = ClientContext.getFromConf(configuration).getPeerCache();
        DFSTestUtil.createFile(distributedFileSystem, TEST_FILE, 1L, (short) 1, 0L);
        Assert.assertEquals(0L, peerCache.size());
        assertXceiverCount(0);
        DFSTestUtil.readFile(distributedFileSystem, TEST_FILE);
        Assert.assertEquals(1L, peerCache.size());
        assertXceiverCount(1);
        Thread.sleep(4050L);
        assertXceiverCount(0);
        Assert.assertEquals(1L, peerCache.size());
        Assert.assertNotNull(peerCache.get(this.dn.getDatanodeId(), false));
        Assert.assertEquals(-1L, r0.getInputStream().read());
    }

    @Test(timeout = 30000)
    public void testClientResponsesKeepAliveTimeout() throws Exception {
        Configuration configuration = new Configuration(this.conf);
        configuration.setLong("dfs.client.socketcache.expiryMsec", 10L);
        configuration.set("dfs.client.context", "testClientResponsesKeepAliveTimeout");
        DistributedFileSystem distributedFileSystem = FileSystem.get(this.cluster.getURI(), configuration);
        PeerCache peerCache = ClientContext.getFromConf(configuration).getPeerCache();
        DFSTestUtil.createFile(distributedFileSystem, TEST_FILE, 1L, (short) 1, 0L);
        Assert.assertEquals(0L, peerCache.size());
        assertXceiverCount(0);
        DFSTestUtil.readFile(distributedFileSystem, TEST_FILE);
        Assert.assertEquals(1L, peerCache.size());
        assertXceiverCount(1);
        Thread.sleep(60L);
        Assert.assertTrue(peerCache.get(this.dn.getDatanodeId(), false) == null);
        Assert.assertEquals(0L, peerCache.size());
    }

    @Test(timeout = 300000)
    public void testSlowReader() throws Exception {
        Configuration configuration = new Configuration(this.conf);
        configuration.setLong("dfs.client.socketcache.expiryMsec", 600000L);
        configuration.set("dfs.client.context", "testSlowReader");
        DistributedFileSystem distributedFileSystem = FileSystem.get(this.cluster.getURI(), configuration);
        MiniDFSCluster.DataNodeProperties stopDataNode = this.cluster.stopDataNode(0);
        stopDataNode.conf.setInt("dfs.datanode.socket.write.timeout", WRITE_TIMEOUT);
        stopDataNode.conf.setInt("dfs.datanode.socket.reuse.keepalive", 120000);
        Assert.assertTrue(this.cluster.restartDataNode(stopDataNode, true));
        this.dn = this.cluster.getDataNodes().get(0);
        this.cluster.triggerHeartbeats();
        DFSTestUtil.createFile(distributedFileSystem, TEST_FILE, 8388608L, (short) 1, 0L);
        FSDataInputStream open = distributedFileSystem.open(TEST_FILE);
        open.read();
        assertXceiverCount(1);
        GenericTestUtils.waitFor(new Supplier<Boolean>() { // from class: org.apache.hadoop.hdfs.TestDataTransferKeepalive.1
            /* renamed from: get, reason: merged with bridge method [inline-methods] */
            public Boolean m73get() {
                return Boolean.valueOf(TestDataTransferKeepalive.this.getXceiverCountWithoutServer() == 0);
            }
        }, 500, 50000);
        IOUtils.closeStream(open);
    }

    /* JADX WARN: Finally extract failed */
    @Test(timeout = 30000)
    public void testManyClosedSocketsInCache() throws Exception {
        Configuration configuration = new Configuration(this.conf);
        configuration.set("dfs.client.context", "testManyClosedSocketsInCache");
        DistributedFileSystem distributedFileSystem = FileSystem.get(this.cluster.getURI(), configuration);
        PeerCache peerCache = ClientContext.getFromConf(configuration).getPeerCache();
        DFSTestUtil.createFile(distributedFileSystem, TEST_FILE, 1L, (short) 1, 0L);
        InputStream[] inputStreamArr = new InputStream[5];
        for (int i = 0; i < inputStreamArr.length; i++) {
            try {
                inputStreamArr[i] = distributedFileSystem.open(TEST_FILE);
            } catch (Throwable th) {
                IOUtils.cleanup((Log) null, inputStreamArr);
                throw th;
            }
        }
        for (InputStream inputStream : inputStreamArr) {
            IOUtils.copyBytes(inputStream, new IOUtils.NullOutputStream(), 1024);
        }
        IOUtils.cleanup((Log) null, inputStreamArr);
        Assert.assertEquals(5L, peerCache.size());
        Thread.sleep(1500L);
        assertXceiverCount(0);
        Assert.assertEquals(5L, peerCache.size());
        DFSTestUtil.readFile(distributedFileSystem, TEST_FILE);
    }

    private void assertXceiverCount(int i) {
        int xceiverCountWithoutServer = getXceiverCountWithoutServer();
        if (xceiverCountWithoutServer != i) {
            ReflectionUtils.printThreadInfo(System.err, "Thread dumps");
            Assert.fail("Expected " + i + " xceivers, found " + xceiverCountWithoutServer);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int getXceiverCountWithoutServer() {
        return this.dn.getXceiverCount() - 1;
    }
}
