package org.apache.hadoop.hdfs.server.namenode.ha;

import com.google.common.base.Supplier;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MultithreadedTestUtil;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.class */
public class TestPipelinesFailover {
    /** Logger shared by the tests in this class; assigned in the static initializer. */
    protected static final Log LOG;
    /** File written by the single-writer tests; assigned in the static initializer. */
    private static final Path TEST_PATH;
    /** Value the tests set for the "dfs.blocksize" configuration key. */
    private static final int BLOCK_SIZE = 4096;
    /** One and a half times {@link #BLOCK_SIZE}; the amount written per step by the write tests. */
    private static final int BLOCK_AND_A_HALF = 6144;
    /** Number of PipelineTestThread workers created by testPipelineRecoveryStress. */
    private static final int STRESS_NUM_THREADS = 25;
    /**
     * Same value (40000) that testPipelineRecoveryStress waits on its worker threads;
     * presumably milliseconds -- confirm against TestContext.waitFor.
     */
    private static final int STRESS_RUNTIME = 40000;

    /* loaded from: input_file:org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover$PipelineTestThread.class */
    private static class PipelineTestThread extends MultithreadedTestUtil.RepeatingTestThread {
        private final FileSystem fs;
        private final FileSystem fsOtherUser;
        private final Path path;

        public PipelineTestThread(MultithreadedTestUtil.TestContext testContext, FileSystem fileSystem, FileSystem fileSystem2, Path path) {
            super(testContext);
            this.fs = fileSystem;
            this.fsOtherUser = fileSystem2;
            this.path = path;
        }

        public void doAnAction() throws Exception {
            FSDataOutputStream create = this.fs.create(this.path, true);
            try {
                AppendTestUtil.write(create, 0, 100);
                create.hflush();
                TestPipelinesFailover.loopRecoverLease(this.fsOtherUser, this.path);
                AppendTestUtil.check(this.fs, this.path, 100L);
            } finally {
                try {
                    create.close();
                } catch (IOException e) {
                }
            }
        }

        public String toString() {
            return "Pipeline test thread for " + this.path;
        }
    }

    /**
     * Writes a block and a half through a failover-aware client, fails over from
     * NN 0 to NN 1 while the stream is still open, checks that NN 1 reports no
     * pending-replication, corrupt or missing blocks, then writes another block and
     * a half on the same stream and verifies the full file length.
     */
    @Test(timeout = 30000)
    public void testWriteOverFailover() throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("dfs.blocksize", BLOCK_SIZE);
        conf.setInt("dfs.namenode.replication.interval", 1000);

        // Tracked so that the finally block can close it if the test fails midway.
        FSDataOutputStream stm = null;
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                .nnTopology(MiniDFSNNTopology.simpleHATopology())
                .numDataNodes(3)
                .build();
        try {
            cluster.waitActive();
            cluster.transitionToActive(0);
            // Short pause after activation; the reason is not evident from this file.
            Thread.sleep(500L);

            LOG.info("Starting with NN 0 active");
            FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
            stm = fs.create(TEST_PATH);

            AppendTestUtil.write(stm, 0, BLOCK_AND_A_HALF);
            // Flush before the failover so the data written so far has left the client.
            stm.hflush();

            LOG.info("Failing over to NN 1");
            cluster.transitionToStandby(0);
            cluster.transitionToActive(1);

            Assert.assertTrue(fs.exists(TEST_PATH));

            // The newly active NN must not consider any block unhealthy.
            FSNamesystem ns1 = cluster.getNameNode(1).getNamesystem();
            BlockManagerTestUtil.updateState(ns1.getBlockManager());
            Assert.assertEquals(0L, ns1.getPendingReplicationBlocks());
            Assert.assertEquals(0L, ns1.getCorruptReplicaBlocks());
            Assert.assertEquals(0L, ns1.getMissingBlocksCount());

            // Continue on the same stream after the failover.
            AppendTestUtil.write(stm, BLOCK_AND_A_HALF, BLOCK_AND_A_HALF);
            stm.close();
            // Closed successfully; nothing left for the finally block to close.
            stm = null;

            AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF * 2);
        } finally {
            IOUtils.closeStream(stm);
            cluster.shutdown();
        }
    }

    /**
     * Like the plain write-over-failover test, but additionally stops a datanode
     * after each failover: writes a block and a half, fails over to NN 1, stops a
     * datanode, writes again, fails back to NN 0, stops another datanode, writes a
     * third time, and finally verifies the full file length.
     */
    @Test(timeout = 30000)
    public void testWriteOverFailoverWithDnFail() throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("dfs.blocksize", BLOCK_SIZE);

        // Tracked so that the finally block can close it if the test fails midway.
        FSDataOutputStream stm = null;
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                .nnTopology(MiniDFSNNTopology.simpleHATopology())
                .numDataNodes(5)
                .build();
        try {
            cluster.waitActive();
            cluster.transitionToActive(0);
            // Short pause after activation; the reason is not evident from this file.
            Thread.sleep(500L);

            LOG.info("Starting with NN 0 active");
            FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
            stm = fs.create(TEST_PATH);

            AppendTestUtil.write(stm, 0, BLOCK_AND_A_HALF);
            // Flush before the failover so the data written so far has left the client.
            stm.hflush();

            LOG.info("Failing over to NN 1");
            cluster.transitionToStandby(0);
            cluster.transitionToActive(1);

            Assert.assertTrue(fs.exists(TEST_PATH));

            cluster.stopDataNode(0);

            AppendTestUtil.write(stm, BLOCK_AND_A_HALF, BLOCK_AND_A_HALF);
            stm.hflush();

            // Actually fail back: NN 1 is the active node at this point, so it is the
            // one that has to become standby before NN 0 is activated again.
            LOG.info("Failing back to NN 0");
            cluster.transitionToStandby(1);
            cluster.transitionToActive(0);

            cluster.stopDataNode(1);

            AppendTestUtil.write(stm, BLOCK_AND_A_HALF * 2, BLOCK_AND_A_HALF);
            stm.hflush();
            stm.close();
            // Closed successfully; nothing left for the finally block to close.
            stm = null;

            AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF * 3);
        } finally {
            IOUtils.closeStream(stm);
            cluster.shutdown();
        }
    }

    /**
     * Writes a block and a half without closing the file, fails over to NN 1, has a
     * different user recover the lease there, and verifies the file length. Then
     * fails back to NN 0 and verifies the file length again.
     */
    @Test(timeout = 30000)
    public void testLeaseRecoveryAfterFailover() throws Exception {
        Configuration conf = new Configuration();
        // Permissions are switched off; the lease is recovered below by another user.
        conf.setBoolean("dfs.permissions.enabled", false);
        conf.setInt("dfs.blocksize", BLOCK_SIZE);

        // Must be an FSDataOutputStream: hflush() is not available on java.io.OutputStream.
        FSDataOutputStream stm = null;
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                .nnTopology(MiniDFSNNTopology.simpleHATopology())
                .numDataNodes(3)
                .build();
        try {
            cluster.waitActive();
            cluster.transitionToActive(0);
            // Short pause after activation; the reason is not evident from this file.
            Thread.sleep(500L);

            LOG.info("Starting with NN 0 active");
            FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
            stm = fs.create(TEST_PATH);

            AppendTestUtil.write(stm, 0, BLOCK_AND_A_HALF);
            stm.hflush();

            LOG.info("Failing over to NN 1");
            cluster.transitionToStandby(0);
            cluster.transitionToActive(1);

            Assert.assertTrue(fs.exists(TEST_PATH));

            FileSystem fsOtherUser = createFsAsOtherUser(cluster, conf);
            loopRecoverLease(fsOtherUser, TEST_PATH);
            AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF);

            // Fail back and make sure the file is still readable in full via NN 0.
            cluster.transitionToStandby(1);
            cluster.transitionToActive(0);
            AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF);
        } finally {
            // The writer is intentionally never closed in the test body; clean it up here.
            IOUtils.closeStream(stm);
            cluster.shutdown();
        }
    }

    /**
     * Fails over while a commitBlockSynchronization call is held back.
     *
     * <p>The test writes half a block, starts lease recovery as another user, and
     * uses a DelayAnswer on a spy of the expected primary datanode's NN 0 proxy to
     * hold the datanode's commitBlockSynchronization call. While the call is held it
     * fails over to NN 1, then releases the call and asserts that it failed with an
     * "Operation category WRITE is not supported" error. Finally it recovers the
     * lease again and verifies the file length.
     */
    @Test(timeout = 30000)
    public void testFailoverRightBeforeCommitSynchronization() throws Exception {
        Configuration conf = new Configuration();
        // Permissions are switched off; the lease is recovered below by another user.
        conf.setBoolean("dfs.permissions.enabled", false);
        conf.setInt("dfs.blocksize", BLOCK_SIZE);

        // Must be an FSDataOutputStream: hflush() is not available on java.io.OutputStream.
        FSDataOutputStream stm = null;
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                .nnTopology(MiniDFSNNTopology.simpleHATopology())
                .numDataNodes(3)
                .build();
        try {
            cluster.waitActive();
            cluster.transitionToActive(0);
            // Short pause after activation; the reason is not evident from this file.
            Thread.sleep(500L);

            LOG.info("Starting with NN 0 active");
            FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
            stm = fs.create(TEST_PATH);

            // Half a block, so the file's only block is still being written.
            AppendTestUtil.write(stm, 0, BLOCK_SIZE / 2);
            stm.hflush();

            // Find the datanode expected to drive recovery and intercept its calls to NN 0.
            NameNode nn0 = cluster.getNameNode(0);
            ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
            DatanodeDescriptor expectedPrimary = getExpectedPrimaryNode(nn0, blk);
            LOG.info("Expecting block recovery to be triggered on DN " + expectedPrimary);

            DatanodeProtocolClientSideTranslatorPB nnSpy = DataNodeTestUtils.spyOnBposToNN(
                    cluster.getDataNode(expectedPrimary.getIpcPort()), nn0);
            GenericTestUtils.DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
            Mockito.doAnswer(delayer).when(nnSpy).commitBlockSynchronization(
                    Mockito.eq(blk),
                    Mockito.anyInt(),
                    Mockito.anyLong(),
                    Mockito.eq(true),
                    Mockito.eq(false),
                    (DatanodeID[]) Mockito.anyObject(),
                    (String[]) Mockito.anyObject());

            // The first recoverLease call is expected to return false (recovery not finished yet).
            DistributedFileSystem fsOtherUser = createFsAsOtherUser(cluster, conf);
            Assert.assertFalse(fsOtherUser.recoverLease(TEST_PATH));

            LOG.info("Waiting for commitBlockSynchronization call from primary");
            delayer.waitForCall();

            LOG.info("Failing over to NN 1");
            cluster.transitionToStandby(0);
            cluster.transitionToActive(1);

            // Release the held call; it now reaches NN 0 after NN 0 has become standby.
            delayer.proceed();
            delayer.waitForResult();
            Throwable thrown = delayer.getThrown();
            Assert.assertNotNull("commitBlockSynchronization call did not fail on standby", thrown);
            GenericTestUtils.assertExceptionContains("Operation category WRITE is not supported", thrown);

            // Recovery has to be completed again after the failover.
            loopRecoverLease(fsOtherUser, TEST_PATH);
            AppendTestUtil.check(fs, TEST_PATH, BLOCK_SIZE / 2);
        } finally {
            // The writer is intentionally never closed in the test body; clean it up here.
            IOUtils.closeStream(stm);
            cluster.shutdown();
        }
    }

    /**
     * Stress test: runs STRESS_NUM_THREADS PipelineTestThread workers, each on its
     * own "/test-N" file, for STRESS_RUNTIME while the harness's replication-trigger
     * and failover threads run in the background.
     */
    @Test(timeout = 120000)
    public void testPipelineRecoveryStress() throws Exception {
        HAStressTestHarness harness = new HAStressTestHarness();
        // Permissions are switched off; the workers recover leases through another user's handle.
        harness.conf.setBoolean("dfs.permissions.enabled", false);

        MiniDFSCluster cluster = harness.startCluster();
        try {
            cluster.waitActive();
            cluster.transitionToActive(0);

            FileSystem fs = harness.getFailoverFs();
            DistributedFileSystem fsOtherUser = createFsAsOtherUser(cluster, harness.conf);

            MultithreadedTestUtil.TestContext workers = new MultithreadedTestUtil.TestContext();
            for (int i = 0; i < STRESS_NUM_THREADS; i++) {
                Path workerPath = new Path("/test-" + i);
                workers.addThread(new PipelineTestThread(workers, fs, fsOtherUser, workerPath));
            }

            harness.addReplicationTriggerThread(500);
            harness.addFailoverThread(5000);
            harness.startThreads();
            workers.startThreads();

            workers.waitFor(STRESS_RUNTIME);
            workers.stop();
            harness.stopThreads();
        } finally {
            // Runs exactly once on both the success and the failure path. The separator
            // is printed to stderr before the harness is shut down.
            System.err.println("===========================\n\n\n\n");
            harness.shutdown();
        }
    }

    /**
     * Returns the first expected location of the given block as recorded by the
     * given NameNode's block manager, asserting first that the stored block is
     * under construction.
     *
     * @param nameNode NameNode whose block manager is consulted
     * @param extendedBlock block to look up
     * @return the datanode at index 0 of the stored block's expected locations;
     *         presumably the replica chosen as primary for recovery -- confirm
     */
    private DatanodeDescriptor getExpectedPrimaryNode(NameNode nameNode, ExtendedBlock extendedBlock) {
        BlockManager blockManager = nameNode.getNamesystem().getBlockManager();
        // Held as a plain Object so that the instanceof assertion below is a real
        // check that runs before the value is narrowed, and a wrong type is reported
        // with the descriptive message rather than as a cast failure.
        Object storedBlock = blockManager.getStoredBlock(extendedBlock.getLocalBlock());
        Assert.assertTrue(
                "Block " + extendedBlock + " should be under construction, got: " + storedBlock,
                storedBlock instanceof BlockInfoUnderConstruction);
        BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction) storedBlock;
        return ucBlock.getExpectedLocations()[0];
    }

    /**
     * Opens a failover-configured file system for the cluster while running as the
     * test user "otheruser" (group "othergroup").
     *
     * @param miniDFSCluster cluster to connect to
     * @param configuration configuration handed to HATestUtil.configureFailoverFs
     * @return the file system created under the other user's identity
     */
    private DistributedFileSystem createFsAsOtherUser(final MiniDFSCluster miniDFSCluster, final Configuration configuration) throws IOException, InterruptedException {
        String[] groups = {"othergroup"};
        UserGroupInformation otherUser = UserGroupInformation.createUserForTesting("otheruser", groups);

        PrivilegedExceptionAction<FileSystem> openFailoverFs = new PrivilegedExceptionAction<FileSystem>() {
            @Override
            public FileSystem run() throws Exception {
                return HATestUtil.configureFailoverFs(miniDFSCluster, configuration);
            }
        };

        FileSystem asOtherUser = otherUser.doAs(openFailoverFs);
        return (DistributedFileSystem) asOtherUser;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Calls recoverLease on the given path repeatedly, via GenericTestUtils.waitFor,
     * until it returns true.
     *
     * @param fileSystem file system to recover the lease through; must be a
     *        DistributedFileSystem, since that is the type recoverLease is invoked on
     * @param path file whose lease should be recovered
     * @throws TimeoutException if recovery did not succeed before waitFor gave up;
     *         the original timeout is attached as the cause
     * @throws InterruptedException if the wait is interrupted
     * @throws RuntimeException wrapping any IOException thrown by recoverLease
     */
    public static void loopRecoverLease(final FileSystem fileSystem, final Path path) throws TimeoutException, InterruptedException {
        // Every caller in this class passes a handle obtained from createFsAsOtherUser,
        // which returns a DistributedFileSystem.
        final DistributedFileSystem dfs = (DistributedFileSystem) fileSystem;
        try {
            // 1000 / 30000: presumably the poll interval and overall limit in
            // milliseconds -- confirm against GenericTestUtils.waitFor.
            GenericTestUtils.waitFor(new Supplier<Boolean>() {
                @Override
                public Boolean get() {
                    boolean recovered;
                    try {
                        recovered = dfs.recoverLease(path);
                    } catch (IOException e) {
                        // Supplier.get() cannot throw checked exceptions.
                        throw new RuntimeException(e);
                    }
                    if (!recovered) {
                        TestPipelinesFailover.LOG.info("Waiting to recover lease successfully");
                    }
                    return Boolean.valueOf(recovered);
                }
            }, 1000, 30000);
        } catch (TimeoutException e) {
            // TimeoutException has no (message, cause) constructor; attach the cause explicitly.
            TimeoutException timeout = new TimeoutException("Timed out recovering lease for " + path);
            timeout.initCause(e);
            throw timeout;
        }
    }

    // Class initialization: sets several loggers to Level.ALL, then assigns the
    // LOG and TEST_PATH constants declared at the top of the class.
    // NOTE(review): getLogger() is not declared by the commons-logging Log interface;
    // the decompiler may have dropped a cast to a Log4J-backed implementation -- confirm.
    static {
        // Full logging for the namesystem, the block manager and the retry handler.
        LogFactory.getLog(FSNamesystem.class).getLogger().setLevel(Level.ALL);
        LogFactory.getLog(BlockManager.class).getLogger().setLevel(Level.ALL);
        LogFactory.getLog("org.apache.hadoop.io.retry.RetryInvocationHandler").getLogger().setLevel(Level.ALL);
        // Full logging for the NameNode state-change log as well.
        NameNode.stateChangeLog.getLogger().setLevel(Level.ALL);
        // Blank finals declared above are assigned here.
        LOG = LogFactory.getLog(TestPipelinesFailover.class);
        TEST_PATH = new Path("/test-file");
    }
}
