package org.apache.hadoop.hdfs.server.datanode;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.BlockMissingException;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.hamcrest.CoreMatchers;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.class */
public class TestDataNodeHotSwapVolumes {
    // Logger shared by every test (and anonymous helper class) in this file.
    private static final Log LOG = LogFactory.getLog(TestDataNodeHotSwapVolumes.class);
    // Bytes per block; file lengths in these tests are expressed as multiples of this value.
    private static final int BLOCK_SIZE = 512;
    // Presumably the storage count given to each DataNode when a test does not ask
    // for a specific number -- TODO confirm against the cluster start-up helpers.
    private static final int DEFAULT_STORAGES_PER_DATANODE = 2;
    // Mini cluster currently under test; null whenever no cluster is running.
    private MiniDFSCluster cluster;
    // Configuration the current mini cluster was built from.
    private Configuration conf;

    /**
     * JUnit teardown hook: stops the mini cluster, if any, after every test.
     */
    @After
    public void tearDown() {
        this.shutdown();
    }

    /**
     * Starts a fresh mini cluster with the default number of storages (2) per DataNode.
     *
     * @param i  forwarded as the federated-topology size (number of NameNodes)
     * @param i2 number of DataNodes to start
     * @throws IOException if the three-argument overload fails to start the cluster
     */
    private void startDFSCluster(int i, int i2) throws IOException {
        this.startDFSCluster(i, i2, DEFAULT_STORAGES_PER_DATANODE);
    }

    /**
     * Shuts down any running cluster, builds a new configuration tuned for fast
     * tests and starts a new mini cluster, blocking until it is active.
     *
     * @param i  value passed to {@code MiniDFSNNTopology.simpleFederatedTopology}
     * @param i2 number of DataNodes to start
     * @param i3 number of storages per DataNode
     * @throws IOException if building the cluster or waiting for it to become active fails
     */
    private void startDFSCluster(int i, int i2, int i3) throws IOException {
        shutdown();

        final Configuration freshConf = new Configuration();
        this.conf = freshConf;
        freshConf.setLong("dfs.blocksize", BLOCK_SIZE);
        freshConf.setInt("dfs.heartbeat.interval", 1);
        freshConf.setInt("dfs.df.interval", 1000);
        freshConf.setInt("dfs.namenode.heartbeat.recheck-interval", 1000);
        freshConf.setInt("dfs.datanode.failed.volumes.tolerated", 1);
        freshConf.setTimeDuration("dfs.datanode.disk.check.min.gap", 0L, TimeUnit.MILLISECONDS);

        MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(freshConf);
        builder = builder.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(i));
        builder = builder.numDataNodes(i2);
        builder = builder.storagesPerDatanode(i3);
        this.cluster = builder.build();
        this.cluster.waitActive();
    }

    /**
     * Stops the mini cluster if one is running and clears the reference so that
     * a repeated call is a no-op.
     */
    private void shutdown() {
        if (this.cluster == null) {
            return;
        }
        this.cluster.shutdown();
        this.cluster = null;
    }

    /**
     * Creates a singly-replicated file through file system index 0.
     *
     * @param path file to create
     * @param i    file length expressed in blocks
     */
    private void createFile(Path path, int i) throws IOException, InterruptedException, TimeoutException {
        final short singleReplica = 1;
        createFile(0, path, i, singleReplica);
    }

    /**
     * Creates a file through file system index 0 with the requested replication.
     *
     * @param path file to create
     * @param i    file length expressed in blocks
     * @param s    replication factor
     */
    private void createFile(Path path, int i, short s) throws IOException, InterruptedException, TimeoutException {
        final int firstFileSystem = 0;
        this.createFile(firstFileSystem, path, i, s);
    }

    /**
     * Creates a singly-replicated file through the file system with the given index.
     *
     * @param i    index handed to {@code cluster.getFileSystem(int)}
     * @param path file to create
     * @param i2   file length expressed in blocks
     */
    private void createFile(int i, Path path, int i2) throws IOException, InterruptedException, TimeoutException {
        final short singleReplica = 1;
        this.createFile(i, path, i2, singleReplica);
    }

    /**
     * Creates a file of {@code i2} blocks and waits until it reaches the
     * requested replication.
     *
     * @param i    index handed to {@code cluster.getFileSystem(int)}
     * @param path file to create
     * @param i2   file length expressed in blocks of {@code BLOCK_SIZE} bytes
     * @param s    replication factor to create with and to wait for
     */
    private void createFile(int i, Path path, int i2, short s) throws IOException, TimeoutException, InterruptedException {
        final DistributedFileSystem dfs = this.cluster.getFileSystem(i);
        final long lengthInBytes = BLOCK_SIZE * i2;
        DFSTestUtil.createFile(dfs, path, lengthInBytes, s, 0L);
        DFSTestUtil.waitReplication((FileSystem) dfs, path, s);
    }

    /**
     * Asserts that the file's reported length equals {@code i} blocks.
     *
     * @param fileSystem file system to query
     * @param path       file whose status is read
     * @param i          expected length in blocks of {@code BLOCK_SIZE} bytes
     * @throws IOException if the file status cannot be fetched
     */
    private static void verifyFileLength(FileSystem fileSystem, Path path, int i) throws IOException {
        final long expectedBytes = i * BLOCK_SIZE;
        final long actualBytes = fileSystem.getFileStatus(path).getLen();
        Assert.assertEquals(expectedBytes, actualBytes);
    }

    /**
     * Counts the locations reported for one block of a file.
     *
     * @param fileSystem file system to query
     * @param path       file to inspect
     * @param i          zero-based block index
     * @return number of location names for that block, or 0 when the file has
     *         fewer than {@code i + 1} block locations
     * @throws IOException if the block locations cannot be fetched
     */
    private static int getNumReplicas(FileSystem fileSystem, Path path, int i) throws IOException {
        final BlockLocation[] locations = fileSystem.getFileBlockLocations(path, 0L, Long.MAX_VALUE);
        final boolean blockPresent = locations.length >= i + 1;
        return blockPresent ? locations[i].getNames().length : 0;
    }

    /**
     * Polls up to 50 times, 100 ms apart, until the given block has exactly the
     * desired number of replicas.
     *
     * @param fileSystem file system to query
     * @param path       file to inspect
     * @param i          zero-based block index
     * @param i2         desired replica count
     * @throws TimeoutException if the desired count is not observed within the attempts
     */
    private static void waitReplication(FileSystem fileSystem, Path path, int i, int i2) throws IOException, TimeoutException, InterruptedException {
        int attemptsLeft = 50;
        while (attemptsLeft > 0) {
            final int observed = getNumReplicas(fileSystem, path, i);
            if (observed == i2) {
                return;
            }
            System.out.printf("Block %d of file %s has %d replicas (desired %d).\n", i, path.toString(), observed, i2);
            Thread.sleep(100L);
            attemptsLeft--;
        }
        throw new TimeoutException("Timed out waiting the " + i + "-th block of " + path + " to have " + i2 + " replicas.");
    }

    /**
     * Returns a mutable copy of the DataNode's configured data directories.
     *
     * @param dataNode DataNode whose configuration is read
     * @return new list holding the trimmed entries of {@code dfs.datanode.data.dir}
     */
    private static List<String> getDataDirs(DataNode dataNode) {
        final Collection<String> configuredDirs = dataNode.getConf().getTrimmedStringCollection("dfs.datanode.data.dir");
        return new ArrayList<String>(configuredDirs);
    }

    /**
     * Schedules a block report for every block pool and then triggers a
     * deletion report on the given DataNode.
     *
     * @param dataNode DataNode to poke
     * @throws IOException if triggering the deletion report fails
     */
    private static void triggerDeleteReport(DataNode dataNode) throws IOException {
        // The argument 0 presumably means "no delay" -- confirm against DataNode.scheduleAllBlockReport.
        dataNode.scheduleAllBlockReport(0L);
        // Must follow the scheduling call above; the original order is preserved.
        DataNodeTestUtils.triggerDeletionReport(dataNode);
    }

    /**
     * Checks that {@code DataNode.parseChangedVolumes} classifies directories as
     * new, deactivated or unchanged when given the first configured directory
     * plus two previously unknown paths.
     */
    @Test
    public void testParseChangedVolumes() throws IOException {
        startDFSCluster(1, 1);
        final DataNode dataNode = this.cluster.getDataNodes().get(0);

        // Parse the currently configured directories so they can be compared below.
        final String[] configuredDirs = dataNode.getConf().get("dfs.datanode.data.dir").split(",");
        final List<StorageLocation> oldLocations = new ArrayList<StorageLocation>();
        for (int idx = 0; idx < configuredDirs.length; idx++) {
            oldLocations.add(StorageLocation.parse(configuredDirs[idx]));
        }
        Assert.assertFalse(oldLocations.isEmpty());

        // Keep the first directory, drop the rest, and add two new paths.
        final String keptDir = oldLocations.get(0).getFile().getAbsolutePath();
        final DataNode.ChangedVolumes changed = dataNode.parseChangedVolumes(keptDir + ",/foo/path1,/foo/path2");

        final List<StorageLocation> added = changed.newLocations;
        Assert.assertEquals(2L, added.size());
        Assert.assertEquals(new File("/foo/path1").getAbsolutePath(), added.get(0).getFile().getAbsolutePath());
        Assert.assertEquals(new File("/foo/path2").getAbsolutePath(), added.get(1).getFile().getAbsolutePath());

        final List<StorageLocation> deactivated = changed.deactivateLocations;
        Assert.assertEquals(1L, deactivated.size());
        Assert.assertEquals(oldLocations.get(1).getFile(), deactivated.get(0).getFile());

        final List<StorageLocation> unchanged = changed.unchangedLocations;
        Assert.assertEquals(1L, unchanged.size());
        Assert.assertEquals(oldLocations.get(0).getFile(), unchanged.get(0).getFile());
    }

    /**
     * Checks that parsing an empty directory list is rejected with an
     * {@link IOException} mentioning that no directory was specified.
     */
    @Test
    public void testParseChangedVolumesFailures() throws IOException {
        startDFSCluster(1, 1);
        final DataNode dataNode = this.cluster.getDataNodes().get(0);

        IOException rejection = null;
        try {
            dataNode.parseChangedVolumes("");
        } catch (IOException e) {
            rejection = e;
        }
        if (rejection == null) {
            Assert.fail("Should throw IOException: empty inputs.");
        }
        GenericTestUtils.assertExceptionContains("No directory is specified.", rejection);
    }

    /**
     * Checks that re-declaring an existing directory with a different storage
     * type (SSD) is rejected by {@code DataNode.parseChangedVolumes}.
     */
    @Test
    public void testParseStorageTypeChanges() throws IOException {
        startDFSCluster(1, 1);
        final DataNode dataNode = this.cluster.getDataNodes().get(0);
        final List<StorageLocation> configured = DataNode.getStorageLocations(dataNode.getConf());

        // First directory unchanged, second directory re-tagged as SSD.
        final String unchangedDir = configured.get(0).toString();
        final String retaggedDir = String.format("[%s]%s", StorageType.SSD, configured.get(1).getUri());

        IOException rejection = null;
        try {
            dataNode.parseChangedVolumes(unchangedDir + "," + retaggedDir);
        } catch (IOException e) {
            rejection = e;
        }
        if (rejection == null) {
            Assert.fail("should throw IOE because storage type changes.");
        }
        GenericTestUtils.assertExceptionContains("Changing storage type is not allowed", rejection);
    }

    /**
     * Adds {@code i} new volumes to the first DataNode without waiting on any
     * external completion signal (an already-open latch is supplied).
     *
     * @param i number of volumes to add
     */
    private void addVolumes(int i) throws InterruptedException, IOException, ReconfigurationException {
        final CountDownLatch alreadyOpen = new CountDownLatch(0);
        addVolumes(i, alreadyOpen);
    }

    /**
     * Adds {@code i} new data directories to the first DataNode through
     * reconfiguration and verifies the outcome.
     *
     * <p>New directories are named {@code data<N>} under the cluster data
     * directory, starting at the first index that is not yet on disk. After the
     * reconfiguration the method waits on the latch, then checks that the
     * effective configuration matches the requested one (ignoring order) and
     * that each new directory contains a {@code current} sub-directory.
     *
     * @param i              number of volumes to add
     * @param countDownLatch awaited after reconfiguration, before verification
     */
    private void addVolumes(int i, CountDownLatch countDownLatch) throws ReconfigurationException, IOException, InterruptedException {
        final String dataDirKey = "dfs.datanode.data.dir";
        final File dataRoot = new File(this.cluster.getDataDirectory());
        final DataNode dn = this.cluster.getDataNodes().get(0);
        final Configuration dnConf = dn.getConf();
        final String currentDirs = dnConf.get(dataDirKey);

        final List<File> createdDirs = new ArrayList<File>();
        final StringBuilder requested = new StringBuilder(currentDirs);

        // Find the first unused "data<N>" index past the configured directories.
        int firstIndex = currentDirs.split(",").length + 1;
        while (new File(dataRoot, "data" + firstIndex).exists()) {
            firstIndex++;
        }

        for (int idx = firstIndex; idx < firstIndex + i; idx++) {
            final File volumeDir = new File(dataRoot, "data" + idx);
            createdDirs.add(volumeDir);
            volumeDir.mkdirs();
            requested.append(",").append(StorageLocation.parse(volumeDir.toString()).toString());
        }

        final String requestedDirs = requested.toString();
        // Reconfigure first, then read the config back: the returned value must match it.
        final String reconfigureResult = dn.reconfigurePropertyImpl(dataDirKey, requestedDirs);
        Assert.assertThat("DN did not update its own config", reconfigureResult, Is.is(dnConf.get(dataDirKey)));

        countDownLatch.await();

        final String[] effectiveDirs = dnConf.get(dataDirKey).split(",");
        final String[] expectedDirs = requestedDirs.split(",");
        Assert.assertEquals(expectedDirs.length, effectiveDirs.length);

        final List<StorageLocation> expectedLocations = new ArrayList<StorageLocation>();
        final List<StorageLocation> effectiveLocations = new ArrayList<StorageLocation>();
        for (int idx = 0; idx < expectedDirs.length; idx++) {
            final StorageLocation expectedLocation = StorageLocation.parse(expectedDirs[idx]);
            final StorageLocation effectiveLocation = StorageLocation.parse(effectiveDirs[idx]);
            expectedLocations.add(expectedLocation);
            effectiveLocations.add(effectiveLocation);
        }

        // Order is not significant, so compare after sorting by string form.
        final Comparator<StorageLocation> byStringForm = new Comparator<StorageLocation>() {
            @Override
            public int compare(StorageLocation left, StorageLocation right) {
                return left.toString().compareTo(right.toString());
            }
        };
        Collections.sort(expectedLocations, byStringForm);
        Collections.sort(effectiveLocations, byStringForm);
        Assert.assertEquals("Effective volumes doesnt match expected", expectedLocations, effectiveLocations);

        for (File createdDir : createdDirs) {
            final File currentDir = new File(createdDir, "current");
            Assert.assertTrue(currentDir.exists());
            Assert.assertTrue(currentDir.isDirectory());
        }
    }

    /**
     * Collects, for each block report map returned by the cluster, the number of
     * blocks in each storage's report.
     *
     * @param i namesystem index whose block pool is inspected
     * @return one inner list per block report map; each inner list holds the
     *         block count of every storage in that map
     */
    private List<List<Integer>> getNumBlocksReport(int i) {
        final String blockPoolId = this.cluster.getNamesystem(i).getBlockPoolId();
        final List<List<Integer>> countsPerReport = new ArrayList<List<Integer>>();
        for (Map<DatanodeStorage, BlockListAsLongs> report : this.cluster.getAllBlockReports(blockPoolId)) {
            final List<Integer> countsPerStorage = new ArrayList<Integer>();
            for (BlockListAsLongs blocks : report.values()) {
                countsPerStorage.add(blocks.getNumberOfBlocks());
            }
            countsPerReport.add(countsPerStorage);
        }
        return countsPerReport;
    }

    /**
     * Adds one volume to a single DataNode, writes a 10-block file and checks
     * that three storages are reported and that blocks are spread evenly
     * (per-storage block counts differ by at most one).
     */
    @Test(timeout = 60000)
    public void testAddOneNewVolume() throws IOException, ReconfigurationException, InterruptedException, TimeoutException {
        startDFSCluster(1, 1);
        final String blockPoolId = this.cluster.getNamesystem().getBlockPoolId();
        final int numBlocks = 10;

        addVolumes(1);

        final Path testFile = new Path("/test");
        createFile(testFile, numBlocks);

        final List<Map<DatanodeStorage, BlockListAsLongs>> blockReports = this.cluster.getAllBlockReports(blockPoolId);
        Assert.assertEquals(1L, blockReports.size());
        // Two initial storages plus the one just added.
        Assert.assertEquals(3L, blockReports.get(0).size());

        int minNumBlocks = Integer.MAX_VALUE;
        int maxNumBlocks = Integer.MIN_VALUE;
        for (BlockListAsLongs blockList : blockReports.get(0).values()) {
            minNumBlocks = Math.min(minNumBlocks, blockList.getNumberOfBlocks());
            maxNumBlocks = Math.max(maxNumBlocks, blockList.getNumberOfBlocks());
        }
        // Compare max against min; the previous check subtracted max from itself
        // and therefore could never fail.
        Assert.assertTrue(
            "Blocks are not evenly spread across volumes: min=" + minNumBlocks + ", max=" + maxNumBlocks,
            maxNumBlocks - minNumBlocks <= 1);

        verifyFileLength(this.cluster.getFileSystem(), testFile, numBlocks);
    }

    /**
     * Writes a file, adds five volumes, appends nine more blocks and checks the
     * resulting per-storage block counts.
     */
    @Test(timeout = 60000)
    public void testAddVolumesDuringWrite() throws IOException, InterruptedException, TimeoutException, ReconfigurationException {
        startDFSCluster(1, 1);
        final int initialVolumes = this.cluster.getStoragesPerDatanode();
        final String blockPoolId = this.cluster.getNamesystem().getBlockPoolId();
        final Path testFile = new Path("/test");

        // Two blocks per initial volume.
        final int initialBlocks = initialVolumes * 2;
        createFile(testFile, initialBlocks);

        final int addedVolumes = 5;
        addVolumes(addedVolumes);
        final int totalVolumes = initialVolumes + addedVolumes;

        final int appendedBlocks = 9;
        DFSTestUtil.appendFile((FileSystem) this.cluster.getFileSystem(), testFile, BLOCK_SIZE * appendedBlocks);
        verifyFileLength(this.cluster.getFileSystem(), testFile, initialBlocks + appendedBlocks);

        // Expected block count per storage, in ascending order.
        final List<Integer> expectedCounts = Arrays.asList(1, 1, 1, 1, 1, 4, 4);

        final List<Map<DatanodeStorage, BlockListAsLongs>> blockReports = this.cluster.getAllBlockReports(blockPoolId);
        Assert.assertEquals(1L, blockReports.size());
        final Map<DatanodeStorage, BlockListAsLongs> dnReport = blockReports.get(0);
        Assert.assertEquals(totalVolumes, dnReport.size());

        final List<Integer> actualCounts = new ArrayList<Integer>();
        for (BlockListAsLongs blockList : dnReport.values()) {
            actualCounts.add(blockList.getNumberOfBlocks());
        }
        Collections.sort(actualCounts);
        Assert.assertEquals(expectedCounts, actualCounts);
    }

    /**
     * Adds 40 volumes to a DataNode whose dataset is wrapped in a Mockito spy so
     * that every {@code addVolume} call runs on its own thread, roughly half of
     * them after a random delay, while another thread lists the storage
     * directories. The test fails if any add or any listing threw, and finally
     * checks that an append still works and that all storages are reported.
     */
    @Test(timeout = 180000)
    public void testAddVolumesConcurrently() throws IOException, InterruptedException, TimeoutException, ReconfigurationException {
        startDFSCluster(1, 1, 10);
        final int initialVolumeCount = this.cluster.getStoragesPerDatanode();
        final String blockPoolId = this.cluster.getNamesystem().getBlockPoolId();
        final Path testFile = new Path("/test");
        final int initialBlocks = initialVolumeCount * 2;
        createFile(testFile, initialBlocks);

        final DataNode dataNode = this.cluster.getDataNodes().get(0);
        dataNode.data = (FsDatasetSpi) Mockito.spy(dataNode.data);

        final int newVolumeCount = 40;
        // Filled from inside the mock answer, which may run on several threads at
        // once (presumably the DataNode adds volumes in parallel), so synchronize it.
        final List<Thread> addVolumeThreads = Collections.synchronizedList(new ArrayList<Thread>());
        final AtomicBoolean addVolumeError = new AtomicBoolean(false);
        final AtomicBoolean listStorageError = new AtomicBoolean(false);
        final CountDownLatch addVolumeCompletionLatch = new CountDownLatch(newVolumeCount);

        Thread listStorageThread = new Thread(new Runnable() {
            @Override
            public void run() {
                // NOTE(review): this condition is only true once at least one add has
                // completed and never turns false again afterwards, so the thread either
                // exits at once or keeps listing until the test times out. Confirm the
                // intended condition (possibly "getCount() != 0") before changing it.
                while (addVolumeCompletionLatch.getCount() != newVolumeCount) {
                    // Bounded batch of listings; the previous form spun forever here
                    // without ever re-checking the outer condition.
                    for (int attempt = 0; attempt < 1000; attempt++) {
                        try {
                            dataNode.getStorage().listStorageDirectories();
                        } catch (Exception e) {
                            listStorageError.set(true);
                            TestDataNodeHotSwapVolumes.LOG.error("Error listing storage: " + e);
                        }
                    }
                }
            }
        });
        listStorageThread.start();

        // Run every addVolume call on its own thread, about half of them delayed.
        ((FsDatasetSpi) Mockito.doAnswer(new Answer<Object>() {
            @Override
            public Object answer(final InvocationOnMock invocationOnMock) throws Throwable {
                final Random random = new Random();
                Thread addVolumeThread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            random.setSeed(Time.now());
                            // Let roughly half of the operations start after 100-1000 ms.
                            if (random.nextInt(10) > 4) {
                                Thread.sleep((random.nextInt(10) + 1) * 100);
                            }
                            invocationOnMock.callRealMethod();
                        } catch (Throwable th) {
                            addVolumeError.set(true);
                            TestDataNodeHotSwapVolumes.LOG.error("Error adding volume: " + th);
                        } finally {
                            // Always count down exactly once so waiters cannot hang.
                            addVolumeCompletionLatch.countDown();
                        }
                    }
                });
                addVolumeThreads.add(addVolumeThread);
                addVolumeThread.start();
                return null;
            }
        }).when(dataNode.data)).addVolume((StorageLocation) Matchers.any(StorageLocation.class), (List) Matchers.any(List.class));

        addVolumes(newVolumeCount, addVolumeCompletionLatch);
        final int totalVolumeCount = initialVolumeCount + newVolumeCount;

        // All adds have counted down by now; no new threads are being registered.
        for (Thread addVolumeThread : addVolumeThreads) {
            addVolumeThread.join();
        }
        listStorageThread.join();

        Assert.assertEquals("Error adding volumes!", false, Boolean.valueOf(addVolumeError.get()));
        Assert.assertEquals("Error listing storage!", false, Boolean.valueOf(listStorageError.get()));

        final int appendedBlocks = 9;
        DFSTestUtil.appendFile((FileSystem) this.cluster.getFileSystem(), testFile, BLOCK_SIZE * appendedBlocks);
        verifyFileLength(this.cluster.getFileSystem(), testFile, initialBlocks + appendedBlocks);

        final List<Map<DatanodeStorage, BlockListAsLongs>> blockReports = this.cluster.getAllBlockReports(blockPoolId);
        Assert.assertEquals(1L, blockReports.size());
        Assert.assertEquals(totalVolumeCount, blockReports.get(0).size());
    }

    /**
     * With two federated NameNodes, adds two volumes and appends only to the
     * file in the first namespace; the second namespace's report must show four
     * storages of which two hold no blocks.
     */
    @Test(timeout = 60000)
    public void testAddVolumesToFederationNN() throws IOException, TimeoutException, InterruptedException, ReconfigurationException {
        startDFSCluster(2, 1);
        final Path testFile = new Path("/test");
        createFile(0, testFile, 4);
        createFile(1, testFile, 4);

        addVolumes(2);

        // Append eight more blocks (4096 bytes) through the first namespace only.
        DFSTestUtil.appendFile((FileSystem) this.cluster.getFileSystem(0), testFile, BLOCK_SIZE * 8);

        final List<List<Integer>> firstNamespaceCounts = getNumBlocksReport(0);
        Assert.assertEquals(this.cluster.getDataNodes().size(), firstNamespaceCounts.size());
        final List<Integer> firstDnCounts = firstNamespaceCounts.get(0);
        Collections.sort(firstDnCounts);
        Assert.assertEquals(Arrays.asList(2, 2, 4, 4), firstDnCounts);

        final List<List<Integer>> secondNamespaceCounts = getNumBlocksReport(1);
        final List<Integer> secondDnCounts = secondNamespaceCounts.get(0);
        Assert.assertEquals(4L, secondDnCounts.size());
        // The two new volumes received nothing in the second namespace.
        Assert.assertEquals(2L, Collections.frequency(secondNamespaceCounts.get(0), 0));
    }

    /**
     * Reconfigures the DataNode down to its first data directory, expects the
     * previously written file to become unreadable, then writes a new file and
     * checks the block count reported by the remaining storage.
     */
    @Test(timeout = 60000)
    public void testRemoveOneVolume() throws ReconfigurationException, InterruptedException, TimeoutException, IOException {
        final String dataDirKey = "dfs.datanode.data.dir";
        startDFSCluster(1, 1);
        final Path testFile = new Path("/test");
        createFile(testFile, 10, (short) 1);

        final DataNode dataNode = this.cluster.getDataNodes().get(0);
        final List<String> oldDirs = getDataDirs(dataNode);

        // Keep only the first directory; the return value must equal the new config.
        final String keptDir = oldDirs.iterator().next();
        final String reconfigureResult = dataNode.reconfigurePropertyImpl(dataDirKey, keptDir);
        Assert.assertThat("DN did not update its own config", reconfigureResult, Is.is(dataNode.getConf().get(dataDirKey)));

        final List<String> removedDirs = new ArrayList<String>(oldDirs).subList(1, oldDirs.size());
        assertFileLocksReleased(removedDirs);
        dataNode.scheduleAllBlockReport(0L);

        BlockMissingException missing = null;
        try {
            DFSTestUtil.readFile(this.cluster.getFileSystem(), testFile);
        } catch (BlockMissingException e) {
            missing = e;
        }
        if (missing == null) {
            Assert.fail("Expect to throw BlockMissingException.");
        }
        GenericTestUtils.assertExceptionContains("Could not obtain block", missing);

        createFile(new Path("/newFile"), 6);

        final String blockPoolId = this.cluster.getNamesystem().getBlockPoolId();
        final List<Map<DatanodeStorage, BlockListAsLongs>> blockReports = this.cluster.getAllBlockReports(blockPoolId);
        Assert.assertEquals(1L, blockReports.size());
        final BlockListAsLongs firstStorageReport = blockReports.get(0).values().iterator().next();
        Assert.assertEquals(11L, firstStorageReport.getNumberOfBlocks());
    }

    /**
     * Removes the volume holding the second block of a doubly-replicated file
     * from one DataNode and checks that the block first drops to one replica
     * and is then re-replicated back to two.
     */
    @Test(timeout = 60000)
    public void testReplicatingAfterRemoveVolume() throws InterruptedException, TimeoutException, IOException, ReconfigurationException {
        final String dataDirKey = "dfs.datanode.data.dir";
        startDFSCluster(1, 2);
        final DistributedFileSystem fs = this.cluster.getFileSystem();
        final Path testFile = new Path("/test");
        createFile(testFile, 4, (short) 2);

        final DataNode dataNode = this.cluster.getDataNodes().get(0);
        final List<String> oldDirs = getDataDirs(dataNode);

        // Locate the volume that stores block index 1 and render it as "[TYPE]uri".
        final FsVolumeSpi volumeToRemove = dataNode.getFSDataset().getVolume(DFSTestUtil.getAllBlocks(fs, testFile).get(1).getBlock());
        final String volumeSpec = "[" + volumeToRemove.getStorageType() + "]" + new File(volumeToRemove.getBasePath()).toURI();

        // Keep the first configured directory that is not a prefix of that volume's spec.
        String dirToKeep = volumeSpec;
        for (String candidate : oldDirs) {
            if (!volumeSpec.startsWith(candidate)) {
                dirToKeep = candidate;
                break;
            }
        }

        final String reconfigureResult = dataNode.reconfigurePropertyImpl(dataDirKey, dirToKeep);
        Assert.assertThat("DN did not update its own config", reconfigureResult, Is.is(dataNode.getConf().get(dataDirKey)));

        // Whatever is left in the list has been removed from the DataNode.
        oldDirs.remove(dirToKeep);
        assertFileLocksReleased(oldDirs);

        triggerDeleteReport(dataNode);
        waitReplication(fs, testFile, 1, 1);
        DFSTestUtil.waitReplication((FileSystem) fs, testFile, (short) 2);
    }

    /**
     * Tries to add four new volume paths, two of which ({@code new_vol0} and
     * {@code new_vol2}) already exist as regular files. The reconfiguration must
     * fail with exactly two error lines naming those two paths, and neither path
     * may show up in the dataset volumes, the storage directories or the
     * effective {@code dfs.datanode.data.dir} setting afterwards.
     */
    @Test
    public void testAddVolumeFailures() throws IOException {
        final String dataDirKey = "dfs.datanode.data.dir";
        startDFSCluster(1, 1);
        final String dataDirectory = this.cluster.getDataDirectory();
        final DataNode dataNode = this.cluster.getDataNodes().get(0);

        final List<String> newDirs = Lists.newArrayList();
        for (int i = 0; i < 4; i++) {
            final File newVolume = new File(dataDirectory, "new_vol" + i);
            newDirs.add(newVolume.toString());
            if (i % 2 == 0) {
                // A regular file at the volume path; these two are expected to fail.
                newVolume.createNewFile();
            }
        }

        final String requestedDirs = dataNode.getConf().get(dataDirKey) + "," + Joiner.on(",").join(newDirs);
        try {
            dataNode.reconfigurePropertyImpl(dataDirKey, requestedDirs);
            Assert.fail("Expect to throw ReconfigurationException.");
        } catch (ReconfigurationException e) {
            final String[] errorLines = e.getCause().getMessage().split("\\r?\\n");
            Assert.assertEquals(2L, errorLines.length);
            Assert.assertThat(errorLines[0], CoreMatchers.containsString("new_vol0"));
            Assert.assertThat(errorLines[1], CoreMatchers.containsString("new_vol2"));
        }

        // The failed volumes must not be in the dataset. The references are closed
        // exactly once, as soon as the iteration is done.
        try (FsDatasetSpi.FsVolumeReferences volumes = dataNode.getFSDataset().getFsVolumeReferences()) {
            for (FsVolumeSpi volume : volumes) {
                Assert.assertThat(volume.getBasePath(), Is.is(CoreMatchers.not(CoreMatchers.anyOf(Is.is(newDirs.get(0)), Is.is(newDirs.get(2))))));
            }
        }

        // ... nor among the DataStorage directories ...
        final DataStorage storage = dataNode.getStorage();
        for (int i = 0; i < storage.getNumStorageDirs(); i++) {
            Assert.assertThat(storage.getStorageDir(i).getRoot().toString(), Is.is(CoreMatchers.not(CoreMatchers.anyOf(Is.is(newDirs.get(0)), Is.is(newDirs.get(2))))));
        }

        // ... nor in the effective configuration: 2 original + 2 successfully added.
        final String[] effectiveDirs = dataNode.getConf().get(dataDirKey).split(",");
        Assert.assertEquals(4L, effectiveDirs.length);
        for (String dir : effectiveDirs) {
            Assert.assertThat(StorageLocation.parse(dir).getFile().getCanonicalPath(), Is.is(CoreMatchers.not(CoreMatchers.anyOf(Is.is(newDirs.get(0)), Is.is(newDirs.get(2))))));
        }
    }

    /**
     * Checks each directory through {@code FsDatasetTestUtil.assertFileLockReleased}.
     * An {@link IOException} from an individual check is logged as a warning and
     * does not stop the remaining checks.
     *
     * @param collection directories to check
     */
    private static void assertFileLocksReleased(Collection<String> collection) throws IOException {
        for (String dir : collection) {
            try {
                FsDatasetTestUtil.assertFileLockReleased(dir);
            } catch (IOException e) {
                // Best effort: record the problem and move on to the next directory.
                LOG.warn(e);
            }
        }
    }

    /**
     * Runs the remove-volume-while-writing scenario three times, once for each
     * replica position 0, 1 and 2 of the block being written.
     */
    @Test(timeout = 600000)
    public void testRemoveVolumeBeingWritten() throws InterruptedException, TimeoutException, ReconfigurationException, IOException, BrokenBarrierException {
        final int replicaPositions = 3;
        int position = 0;
        while (position < replicaPositions) {
            testRemoveVolumeBeingWrittenForDatanode(position);
            position++;
        }
    }

    private void testRemoveVolumeBeingWrittenForDatanode(int i) throws IOException, ReconfigurationException, TimeoutException, InterruptedException, BrokenBarrierException {
        startDFSCluster(1, 4);
        DistributedFileSystem fileSystem = this.cluster.getFileSystem();
        DFSClient client = fileSystem.getClient();
        Path path = new Path("/test");
        FSDataOutputStream create = fileSystem.create(path, (short) 3);
        Random random = new Random(0L);
        byte[] bArr = new byte[256];
        random.nextBytes(bArr);
        create.write(bArr);
        create.hflush();
        int parseInt = Integer.parseInt(fileSystem.getFileBlockLocations(path, 0L, 512L)[0].getNames()[i].split(":")[1]);
        DataNode dataNode = null;
        Iterator<DataNode> it = this.cluster.getDataNodes().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            DataNode next = it.next();
            if (next.getXferPort() == parseInt) {
                dataNode = next;
                break;
            }
        }
        Assert.assertNotNull(dataNode);
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        DataNodeFaultInjector dataNodeFaultInjector = new DataNodeFaultInjector() { // from class: org.apache.hadoop.hdfs.server.datanode.TestDataNodeHotSwapVolumes.4
            public void logDelaySendingAckToUpstream(String str, long j) throws IOException {
                try {
                    if (!atomicBoolean.get()) {
                        cyclicBarrier.await();
                        Thread.sleep(1000L);
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IOException(e);
                }
            }
        };
        DataNodeFaultInjector dataNodeFaultInjector2 = DataNodeFaultInjector.get();
        try {
            DataNodeFaultInjector.set(dataNodeFaultInjector);
            List<String> dataDirs = getDataDirs(dataNode);
            FsVolumeImpl volume = dataNode.getFSDataset().getVolume(client.getLocatedBlocks("/test", 0L).get(0).getBlock());
            StringBuffer stringBuffer = new StringBuffer();
            String str = "";
            for (String str2 : dataDirs) {
                if (!str2.contains(volume.getBasePath())) {
                    stringBuffer.append(str).append(str2);
                    str = ",";
                }
            }
            final String stringBuffer2 = stringBuffer.toString();
            final ArrayList arrayList = new ArrayList();
            final DataNode dataNode2 = dataNode;
            final CyclicBarrier cyclicBarrier2 = new CyclicBarrier(2);
            Thread thread = new Thread(new Runnable() { // from class: org.apache.hadoop.hdfs.server.datanode.TestDataNodeHotSwapVolumes.5
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        cyclicBarrier2.await();
                        cyclicBarrier.await();
                        Assert.assertThat("DN did not update its own config", dataNode2.reconfigurePropertyImpl("dfs.datanode.data.dir", stringBuffer2), Is.is(dataNode2.getConf().get("dfs.datanode.data.dir")));
                        atomicBoolean.set(true);
                    } catch (ReconfigurationException | InterruptedException | BrokenBarrierException e) {
                        arrayList.add(new IOException((Throwable) e));
                    }
                }
            });
            thread.start();
            random.nextBytes(bArr);
            create.write(bArr);
            cyclicBarrier2.await();
            create.hflush();
            create.close();
            thread.join();
            if (!arrayList.isEmpty()) {
                throw MultipleIOException.createIOException(arrayList);
            }
            FsDatasetSpi fSDataset = dataNode.getFSDataset();
            FsDatasetSpi.FsVolumeReferences fsVolumeReferences = fSDataset.getFsVolumeReferences();
            Throwable th = null;
            for (int i2 = 0; i2 < fsVolumeReferences.size(); i2++) {
                try {
                    try {
                        System.out.println("Vol: " + fsVolumeReferences.get(i2).getBasePath());
                    } catch (Throwable th2) {
                        th = th2;
                        throw th2;
                    }
                } finally {
                }
            }
            Assert.assertEquals("Volume remove wasn't successful.", 1L, fsVolumeReferences.size());
            if (fsVolumeReferences != null) {
                if (0 != 0) {
                    try {
                        fsVolumeReferences.close();
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                    }
                } else {
                    fsVolumeReferences.close();
                }
            }
            DFSTestUtil.waitReplication((FileSystem) fileSystem, path, (short) 3);
            Assert.assertEquals(512L, DFSTestUtil.readFileBuffer(fileSystem, path).length);
            for (int i3 = 0; i3 < 10; i3++) {
                FSDataOutputStream create2 = fileSystem.create(new Path("/after-" + i3), (short) 3);
                Throwable th4 = null;
                try {
                    try {
                        random.nextBytes(bArr);
                        create2.write(bArr);
                        if (create2 != null) {
                            if (0 != 0) {
                                try {
                                    create2.close();
                                } catch (Throwable th5) {
                                    th4.addSuppressed(th5);
                                }
                            } else {
                                create2.close();
                            }
                        }
                    } catch (Throwable th6) {
                        th4 = th6;
                        throw th6;
                    }
                } catch (Throwable th7) {
                    if (create2 != null) {
                        if (th4 != null) {
                            try {
                                create2.close();
                            } catch (Throwable th8) {
                                th4.addSuppressed(th8);
                            }
                        } else {
                            create2.close();
                        }
                    }
                    throw th7;
                }
            }
            fsVolumeReferences = fSDataset.getFsVolumeReferences();
            Throwable th9 = null;
            try {
                try {
                    Assert.assertEquals("Volume remove wasn't successful.", 1L, fsVolumeReferences.size());
                    FsVolumeSpi.BlockIterator newBlockIterator = fsVolumeReferences.get(0).newBlockIterator(this.cluster.getNamesystem().getBlockPoolId(), "test");
                    int i4 = 0;
                    while (!newBlockIterator.atEnd()) {
                        newBlockIterator.nextBlock();
                        i4++;
                    }
                    Assert.assertTrue(String.format("DataNode(%d) should have more than 1 blocks", Integer.valueOf(i)), i4 > 1);
                    if (fsVolumeReferences != null) {
                        if (0 == 0) {
                            fsVolumeReferences.close();
                            return;
                        }
                        try {
                            fsVolumeReferences.close();
                        } catch (Throwable th10) {
                            th9.addSuppressed(th10);
                        }
                    }
                } catch (Throwable th11) {
                    th9 = th11;
                    throw th11;
                }
            } finally {
            }
        } finally {
            DataNodeFaultInjector.set(dataNodeFaultInjector2);
        }
    }

    /**
     * Reconfigures the first DataNode to keep only the first of its configured
     * data directories, checks that no block-pool storage directory is left
     * under the dropped directory, and then reconfigures the DataNode back to
     * the original directory list.
     *
     * @throws IOException declared by the cluster/file helpers
     * @throws TimeoutException declared by the cluster/file helpers
     * @throws InterruptedException declared by the cluster/file helpers
     * @throws ReconfigurationException if the DataNode rejects a reconfiguration
     */
    @Test(timeout = 60000)
    public void testAddBackRemovedVolume() throws IOException, TimeoutException, InterruptedException, ReconfigurationException {
        startDFSCluster(1, 2);
        // Write a small file before any directory is reconfigured away.
        createFile(new Path("/test"), 32);

        DataNode dataNode = this.cluster.getDataNodes().get(0);
        String originalDataDirs = dataNode.getConf().get("dfs.datanode.data.dir");
        String[] dataDirs = originalDataDirs.split(",");
        String keptDataDir = dataDirs[0];
        String removedDataDir = dataDirs[1];
        // Loop-invariant: resolve the dropped directory's absolute path once.
        String removedDataDirPath = new File(removedDataDir).getAbsolutePath();

        // Drop the second directory; the returned effective value must match the DN's config.
        Assert.assertThat("DN did not update its own config", dataNode.reconfigurePropertyImpl("dfs.datanode.data.dir", keptDataDir), Is.is(dataNode.getConf().get("dfs.datanode.data.dir")));

        for (int nnIndex = 0; nnIndex < this.cluster.getNumNameNodes(); nnIndex++) {
            String blockPoolId = this.cluster.getNamesystem(nnIndex).getBlockPoolId();
            BlockPoolSliceStorage bpStorage = dataNode.getStorage().getBPStorage(blockPoolId);
            // No block-pool level storage directory may remain under the dropped data dir.
            for (int dirIndex = 0; dirIndex < bpStorage.getNumStorageDirs(); dirIndex++) {
                Assert.assertFalse(bpStorage.getStorageDir(dirIndex).getRoot().getAbsolutePath().startsWith(removedDataDirPath));
            }
            // Exactly one storage directory is expected for this block pool (expected value first).
            Assert.assertEquals(1L, dataNode.getStorage().getBPStorage(blockPoolId).getNumStorageDirs());
        }

        // Restore the original directory list, which adds the dropped directory back.
        Assert.assertThat("DN did not update its own config", dataNode.reconfigurePropertyImpl("dfs.datanode.data.dir", originalDataDirs), Is.is(dataNode.getConf().get("dfs.datanode.data.dir")));
    }

    /**
     * Injects a failure into the "data1" directory of the first DataNode, waits
     * for the DataNode to report the disk error, restores the directory, and
     * then re-applies the original data-dir configuration. Afterwards a new
     * volume object must exist for the directory and its DFS usage must exceed
     * the usage recorded before the failure.
     *
     * The test is skipped when {@code Path.WINDOWS} is true.
     *
     * @throws Exception on any cluster, I/O or reconfiguration failure
     */
    @Test(timeout = 60000)
    public void testDirectlyReloadAfterCheckDiskError() throws Exception {
        Assume.assumeTrue(!Path.WINDOWS);
        startDFSCluster(1, 2);
        createFile(new Path("/test"), 32, (short) 2);

        final String dataDirKey = "dfs.datanode.data.dir";
        DataNode dn = this.cluster.getDataNodes().get(0);
        String configuredDirs = dn.getConf().get(dataDirKey);

        File failedDir = new File(this.cluster.getDataDirectory(), "data1");
        FsVolumeImpl failedVolume = DataNodeTestUtils.getVolume(dn, failedDir);
        Assert.assertNotNull("No FsVolume was found for " + failedDir, failedVolume);
        long usedBeforeFailure = failedVolume.getDfsUsed();

        // Break the directory and wait until the DataNode has noticed.
        DataNodeTestUtils.injectDataDirFailure(failedDir);
        DataNodeTestUtils.waitForDiskError(dn, failedVolume);

        // A write made while the directory is failed must not change the old volume's usage.
        createFile(new Path("/test1"), 32, (short) 2);
        Assert.assertEquals(usedBeforeFailure, failedVolume.getDfsUsed());

        // Repair the directory and re-apply the unchanged directory list.
        DataNodeTestUtils.restoreDataDirFromFailure(failedDir);
        String effectiveDirs = dn.reconfigurePropertyImpl(dataDirKey, configuredDirs);
        Assert.assertThat("DN did not update its own config", effectiveDirs, Is.is(dn.getConf().get(dataDirKey)));

        createFile(new Path("/test2"), 32, (short) 2);
        FsVolumeImpl reloadedVolume = DataNodeTestUtils.getVolume(dn, failedDir);
        Assert.assertNotNull(reloadedVolume);
        // The directory must be served by a fresh volume object, not the failed one.
        boolean volumeWasRecreated = reloadedVolume != failedVolume;
        Assert.assertTrue(volumeWasRecreated);
        boolean usageGrew = usedBeforeFailure < reloadedVolume.getDfsUsed();
        Assert.assertTrue(usageGrew);
    }

    /**
     * Starts a two-DataNode cluster, reconfigures the first DataNode to use only
     * the "data1" directory, and verifies that {@code blockReport} is invoked
     * exactly once (within 60 seconds) on the spy returned by
     * {@code InternalDataNodeTestUtils.spyOnBposToNN}.
     *
     * @throws IOException if the cluster cannot be built or the report call fails
     * @throws ReconfigurationException if the DataNode rejects the reconfiguration
     */
    @Test(timeout = 100000)
    public void testFullBlockReportAfterRemovingVolumes() throws IOException, ReconfigurationException {
        final String dataDirKey = "dfs.datanode.data.dir";

        Configuration conf = new Configuration();
        conf.setLong("dfs.blocksize", 512L);
        // NOTE(review): the large interval values presumably keep timer-driven
        // reports and heartbeats from firing during the test - confirm.
        conf.setLong("dfs.blockreport.intervalMsec", 10800000L);
        conf.setLong("dfs.heartbeat.interval", 1080L);

        this.cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
        this.cluster.waitActive();

        DataNode dn = this.cluster.getDataNodes().get(0);
        DatanodeProtocolClientSideTranslatorPB nameNodeSpy =
                InternalDataNodeTestUtils.spyOnBposToNN(dn, this.cluster.getNameNode());

        // Keep only "data1"; the effective value returned must match the DN's config.
        File dirToKeep = new File(this.cluster.getDataDirectory(), "data1");
        String effectiveDirs = dn.reconfigurePropertyImpl(dataDirKey, dirToKeep.toString());
        Assert.assertThat("DN did not update its own config", effectiveDirs, Is.is(dn.getConf().get(dataDirKey)));

        // Exactly one block report must reach the spy within the timeout.
        Mockito.verify(nameNodeSpy, Mockito.timeout(60000).times(1)).blockReport(
                Matchers.any(DatanodeRegistration.class),
                Matchers.anyString(),
                Matchers.any(StorageBlockReport[].class),
                Matchers.any(BlockReportContext.class));
    }
}
