package org.apache.hadoop.hdfs.server.namenode;

import com.google.common.base.Supplier;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.TestDataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.JournalSet;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.class */
public class TestEditLogRace {
    // Whether async edit logging is enabled for the current parameterized run.
    // Written by the constructor; read by getConf() and by the flush stub in
    // testSaveImageWhileSyncInProgress().
    private static boolean useAsyncEditLog;
    // Directory configured as both the name dir and the edits dir in getConf();
    // assigned in the static initializer.
    private static final String NAME_DIR;
    // Logger for this test class; assigned in the static initializer.
    private static final Log LOG;
    // Number of Transactions worker threads started by startTransactionWorkers().
    static final int NUM_THREADS = 16;
    // NOTE(review): not referenced by name in this file; presumably the iteration
    // count of the roll loop in testEditLogRolling(), which uses a literal 30 - confirm.
    static final int NUM_ROLLS = 30;
    // NOTE(review): not referenced by name in this file; presumably the iteration
    // count of the save loop in testSaveNamespace(), which uses a literal 30 - confirm.
    static final int NUM_SAVE_IMAGE = 30;
    // Workers started by startTransactionWorkers(); stopped and joined by
    // stopTransactionWorkers().
    private final List<Transactions> workers = new ArrayList();
    // Number of datanodes requested from the MiniDFSCluster builder.
    private static final int NUM_DATA_NODES = 1;
    // NOTE(review): not referenced by name in this file; presumably the number of
    // seconds the edit thread sleeps (the tests sleep for a literal 4000 ms) - confirm.
    private static final int BLOCK_TIME = 4;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hdfs/server/namenode/TestEditLogRace$Transactions.class */
    /**
     * Background worker that keeps issuing mkdirs/delete pairs against the
     * namenode until {@link #stop()} is called or an unexpected error occurs.
     *
     * <p>Even iterations go through the {@link FileSystem} client, odd
     * iterations go directly through the namenode RPC interface. The first
     * unexpected error seen by any worker is published through the shared
     * {@code caught} reference.
     */
    public static class Transactions implements Runnable {
        final NamenodeProtocols nn;
        final MiniDFSCluster cluster;
        FileSystem fs;
        short replication = 3;
        long blockSize = 64;
        /** Set by {@link #stop()}; polled at the top of every loop iteration. */
        volatile boolean stopped = false;
        /** Thread executing {@link #run()}; {@code null} until {@code run()} starts. */
        volatile Thread thr;
        final AtomicReference<Throwable> caught;

        /**
         * @param miniDFSCluster cluster supplying the namenode RPC handle and the file system
         * @param atomicReference shared holder for the first error seen by any worker;
         *        also receives the exception if the file system cannot be obtained
         */
        Transactions(MiniDFSCluster miniDFSCluster, AtomicReference<Throwable> atomicReference) {
            this.cluster = miniDFSCluster;
            this.nn = miniDFSCluster.getNameNodeRpc();
            try {
                this.fs = miniDFSCluster.getFileSystem();
            } catch (IOException e) {
                atomicReference.set(e);
            }
            this.caught = atomicReference;
        }

        /**
         * Creates and deletes a uniquely named directory per iteration until stopped.
         */
        @Override // java.lang.Runnable
        public void run() {
            this.thr = Thread.currentThread();
            // 0777: full permissions for the directories created over RPC.
            FsPermission fsPermission = new FsPermission((short) 0777);
            int i = 0;
            while (!this.stopped) {
                try {
                    String str = "/thr-" + this.thr.getId() + "-dir-" + i;
                    if (i % 2 == 0) {
                        Path path = new Path(str);
                        this.fs.mkdirs(path);
                        this.fs.delete(path, true);
                    } else {
                        this.nn.mkdirs(str, fsPermission, true);
                        this.nn.delete(str, true);
                    }
                } catch (SafeModeException ignored) {
                    // Tolerated: testSaveNamespace() moves the namenode in and out of
                    // safe mode while the workers are running, so just try again.
                } catch (Throwable th) {
                    // getClassName() is declared on RemoteException, not Throwable,
                    // so the cast is required after the instanceof check.
                    if ((th instanceof RemoteException)
                            && ((RemoteException) th).getClassName().contains("SafeModeException")) {
                        // A safe mode rejection wrapped by the RPC layer ends this
                        // worker quietly without recording an error.
                        return;
                    }
                    TestEditLogRace.LOG.warn("Got error in transaction thread", th);
                    // Only the first error is kept.
                    this.caught.compareAndSet(null, th);
                    return;
                }
                i++;
            }
        }

        /** Asks the worker loop to exit after the current iteration. */
        public void stop() {
            this.stopped = true;
        }

        /** @return the thread running this worker, or {@code null} if it has not started yet */
        public Thread getThread() {
            return this.thr;
        }
    }

    @Parameterized.Parameters
    /**
     * Supplies the parameter sets for the parameterized runner: one run with
     * the single boolean constructor argument {@code false}, then one with
     * {@code true}.
     *
     * @return two single-element argument arrays, {@code false} first
     */
    public static Collection<Object[]> data() {
        final Collection<Object[]> parameterSets = new ArrayList<>();
        for (boolean asyncEnabled : new boolean[] {false, true}) {
            parameterSets.add(new Object[] {asyncEnabled});
        }
        return parameterSets;
    }

    /**
     * Receives the parameter for the current parameterized run and stores it
     * in the class-wide {@code useAsyncEditLog} flag.
     *
     * @param z value to store in {@code useAsyncEditLog}, which getConf() writes to
     *          the "dfs.namenode.edits.asynclogging" setting
     */
    public TestEditLogRace(boolean z) {
        // The flag is static, so it is assigned through the class name.
        TestEditLogRace.useAsyncEditLog = z;
    }

    /**
     * Starts {@code NUM_THREADS} {@link Transactions} workers, each on its own
     * thread named {@code TransactionThread-<index>}, and records them in
     * {@code workers} so they can be stopped later.
     *
     * @param miniDFSCluster cluster the workers issue edits against
     * @param atomicReference shared holder handed to every worker for error reporting
     */
    private void startTransactionWorkers(MiniDFSCluster miniDFSCluster, AtomicReference<Throwable> atomicReference) {
        int threadIndex = 0;
        while (threadIndex < NUM_THREADS) {
            Transactions worker = new Transactions(miniDFSCluster, atomicReference);
            Thread runner = new Thread(worker, "TransactionThread-" + threadIndex);
            runner.start();
            this.workers.add(worker);
            threadIndex++;
        }
    }

    /**
     * Signals every worker to stop, then waits for each worker thread to finish.
     *
     * <p>All workers are signalled before any join so they wind down in
     * parallel. If the calling thread is interrupted while joining, the
     * remaining workers are still joined and the interrupt status is restored
     * afterwards instead of being silently dropped.
     */
    private void stopTransactionWorkers() {
        for (Transactions worker : this.workers) {
            worker.stop();
        }
        boolean interrupted = false;
        for (Transactions worker : this.workers) {
            Thread workerThread = worker.getThread();
            if (workerThread == null) {
                // run() has not published its thread yet; there is nothing to join.
                continue;
            }
            try {
                workerThread.join();
            } catch (InterruptedException e) {
                // Remember the interrupt but keep joining the remaining workers.
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Rolls the edit log {@code NUM_ROLLS} times while worker threads keep
     * generating edits, and after every roll verifies the finalized segment
     * and the existence of the new in-progress segment.
     *
     * <p>The cluster and file system are always released, including when a
     * worker thread reported an error.
     *
     * @throws Exception on any test failure; a worker error is rethrown
     *         wrapped in a {@link RuntimeException}
     */
    @Test
    public void testEditLogRolling() throws Exception {
        final MiniDFSCluster cluster =
                new MiniDFSCluster.Builder(getConf()).numDataNodes(NUM_DATA_NODES).build();
        FileSystem fileSys = null;
        final AtomicReference<Throwable> caughtErr = new AtomicReference<>();
        try {
            cluster.waitActive();
            fileSys = cluster.getFileSystem();
            final NamenodeProtocols nn = cluster.getNameNode().getRpcServer();
            final FSImage fsImage = cluster.getNamesystem().getFSImage();
            final Storage.StorageDirectory storageDir = fsImage.getStorage().getStorageDir(0);
            startTransactionWorkers(cluster, caughtErr);

            // First transaction id of the segment that the next roll will finalize.
            long previousLogTxId = 1;
            for (int roll = 0; roll < NUM_ROLLS && caughtErr.get() == null; roll++) {
                try {
                    // Give the workers a moment to log some edits before rolling.
                    Thread.sleep(20L);
                } catch (InterruptedException ignored) {
                    // Best effort pause only; carry on with the roll.
                }
                LOG.info("Starting roll " + roll + ".");
                final long nextLogTxId = nn.rollEditLog().curSegmentTxId;
                final String finalizedName =
                        NNStorage.getFinalizedEditsFileName(previousLogTxId, nextLogTxId - 1);
                previousLogTxId +=
                        verifyEditLogs(cluster.getNamesystem(), fsImage, finalizedName, previousLogTxId);
                // The finalized segment must end right before the new segment starts.
                Assert.assertEquals(previousLogTxId, nextLogTxId);
                final File expectedLog = NNStorage.getInProgressEditsFile(storageDir, previousLogTxId);
                Assert.assertTrue("Expect " + expectedLog + " to exist", expectedLog.exists());
            }
        } finally {
            stopTransactionWorkers();
            try {
                // A worker failure takes precedence over any failure of the test body.
                if (caughtErr.get() != null) {
                    throw new RuntimeException(caughtErr.get());
                }
            } finally {
                // Release resources even when the worker error is thrown above.
                try {
                    if (fileSys != null) {
                        fileSys.close();
                    }
                } finally {
                    cluster.shutdown();
                }
            }
        }
    }

    /**
     * Loads the named edit log file from every EDITS storage directory and
     * checks that each copy yields the same number of edits.
     *
     * @param fSNamesystem namesystem handed to the edit log loader
     * @param fSImage image whose storage directories are scanned
     * @param str file name to look up in each directory's current dir
     * @param j transaction id passed to the loader and to {@code loadFSEdits}
     * @return the number of edits reported for the file
     * @throws IOException if a copy of the file cannot be loaded
     */
    private long verifyEditLogs(FSNamesystem fSNamesystem, FSImage fSImage, String str, long j) throws IOException {
        final long notYetSeen = -1L;
        long expectedEdits = notYetSeen;
        for (Storage.StorageDirectory editsDir
                : fSImage.getStorage().dirIterable(NNStorage.NameNodeDirType.EDITS)) {
            final File editFile = new File(editsDir.getCurrentDir(), str);
            System.out.println("Verifying file: " + editFile);
            final FSEditLogLoader loader = new FSEditLogLoader(fSNamesystem, j);
            final long editsInThisCopy = loader.loadFSEdits(new EditLogFileInputStream(editFile), j);
            System.out.println("Number of edits: " + editsInThisCopy);
            // Every copy after the first must agree with the previous one.
            if (expectedEdits != notYetSeen) {
                Assert.assertTrue(editsInThisCopy == expectedEdits);
            }
            expectedEdits = editsInThisCopy;
        }
        // Fails when no directory was visited (or the last count was -1).
        Assert.assertTrue(expectedEdits != notYetSeen);
        return expectedEdits;
    }

    /**
     * Repeatedly enters safe mode, saves the namespace and leaves safe mode
     * while worker threads keep generating edits, verifying the edit log
     * files before and after each save.
     *
     * <p>The cluster and file system are always released, including when a
     * worker thread reported an error.
     *
     * @throws Exception on any test failure; a worker error is rethrown
     *         wrapped in a {@link RuntimeException}
     */
    @Test
    public void testSaveNamespace() throws Exception {
        final Configuration conf = getConf();
        MiniDFSCluster cluster = null;
        FileSystem fileSys = null;
        final AtomicReference<Throwable> caughtErr = new AtomicReference<>();
        try {
            cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
            cluster.waitActive();
            fileSys = cluster.getFileSystem();
            final FSNamesystem namesystem = cluster.getNamesystem();
            final FSImage fsImage = namesystem.getFSImage();
            final FSEditLog editLog = fsImage.getEditLog();
            startTransactionWorkers(cluster, caughtErr);

            for (int save = 0; save < NUM_SAVE_IMAGE && caughtErr.get() == null; save++) {
                try {
                    // Give the workers a moment to log some edits before saving.
                    Thread.sleep(20L);
                } catch (InterruptedException ignored) {
                    // Best effort pause only; carry on with the save.
                }
                LOG.info("Save " + save + ": entering safe mode");
                namesystem.enterSafeMode(false);

                // First transaction id after the most recent checkpoint.
                final long logStartTxId = fsImage.getStorage().getMostRecentCheckpointTxId() + 1;
                verifyEditLogs(namesystem, fsImage,
                        NNStorage.getInProgressEditsFileName(logStartTxId), logStartTxId);

                LOG.info("Save " + save + ": saving namespace");
                namesystem.saveNamespace();
                LOG.info("Save " + save + ": leaving safemode");

                // After the save, the checkpoint tx id names the end of the finalized segment.
                final long checkpointTxId = fsImage.getStorage().getMostRecentCheckpointTxId();
                verifyEditLogs(namesystem, fsImage,
                        NNStorage.getFinalizedEditsFileName(logStartTxId, checkpointTxId), logStartTxId);
                Assert.assertEquals(fsImage.getStorage().getMostRecentCheckpointTxId(),
                        editLog.getLastWrittenTxId() - 1);

                namesystem.leaveSafeMode(false);
                LOG.info("Save " + save + ": complete");
            }
        } finally {
            stopTransactionWorkers();
            try {
                // A worker failure takes precedence over any failure of the test body.
                if (caughtErr.get() != null) {
                    throw new RuntimeException(caughtErr.get());
                }
            } finally {
                // Release resources even when the worker error is thrown above.
                try {
                    if (fileSys != null) {
                        fileSys.close();
                    }
                } finally {
                    if (cluster != null) {
                        cluster.shutdown();
                    }
                }
            }
        }
    }

    /**
     * Builds a fresh configuration for one test: async edit logging follows
     * the current test parameter, the default file system and the namenode
     * HTTP address use port 0, the name and edits directories both point at
     * {@code NAME_DIR}, and permission checking is disabled.
     *
     * @return a new configuration instance
     */
    private Configuration getConf() {
        final Configuration conf = new HdfsConfiguration();
        conf.setBoolean("dfs.namenode.edits.asynclogging", useAsyncEditLog);
        FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
        conf.set("dfs.namenode.http-address", "0.0.0.0:0");
        // Image and edits share a single directory.
        for (String dirKey : new String[] {"dfs.namenode.name.dir", "dfs.namenode.edits.dir"}) {
            conf.set(dirKey, NAME_DIR);
        }
        conf.setBoolean("dfs.permissions.enabled", false);
        return conf;
    }

    /**
     * Checks that entering safe mode waits for an edit whose flush is still in
     * progress, and that the edit ends up in the finalized segment written by
     * the subsequent {@code saveNamespace()}.
     *
     * <p>The flush of the first journal's stream is stubbed to announce itself
     * and then sleep {@code BLOCK_TIME} seconds before delegating to the real
     * method. The main thread asks for safe mode in the meantime and asserts
     * that the request took more than {@code BLOCK_TIME - 1} seconds.
     *
     * @throws Exception on any test failure
     */
    @Test
    public void testSaveImageWhileSyncInProgress() throws Exception {
        final Configuration conf = getConf();
        NameNode.initMetrics(conf, HdfsServerConstants.NamenodeRole.NAMENODE);
        DFSTestUtil.formatNameNode(conf);
        final FSNamesystem namesystem = FSNamesystem.loadFromDisk(conf);
        try {
            final FSImage fsImage = namesystem.getFSImage();
            final JournalSet.JournalAndStream jas =
                    (JournalSet.JournalAndStream) fsImage.getEditLog().getJournals().get(0);
            // Swap in a spy so that flush() can be intercepted.
            final EditLogFileOutputStream spyStream =
                    (EditLogFileOutputStream) Mockito.spy(jas.getCurrentStream());
            jas.setCurrentStreamForTests(spyStream);

            final AtomicReference<Throwable> deferredException = new AtomicReference<>();
            final CountDownLatch waitToEnterFlush = new CountDownLatch(1);

            final Thread doAnEditThread = new Thread() {
                @Override
                public void run() {
                    try {
                        LOG.info("Starting mkdirs");
                        namesystem.mkdirs("/test",
                                new PermissionStatus("test", "test", new FsPermission((short) 0755)), true);
                        LOG.info("mkdirs complete");
                    } catch (Throwable t) {
                        LOG.fatal("Got exception", t);
                        deferredException.set(t);
                        // Release the main thread so it can report the failure.
                        waitToEnterFlush.countDown();
                    }
                }
            };

            Mockito.doAnswer(new Answer<Void>() {
                @Override
                public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
                    LOG.info("Flush called");
                    // With async edit logging the flush does not run on the edit thread
                    // itself, so every flush is delayed in that mode.
                    if (useAsyncEditLog || Thread.currentThread() == doAnEditThread) {
                        LOG.info("edit thread: Telling main thread we made it to flush section...");
                        waitToEnterFlush.countDown();
                        LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs");
                        Thread.sleep(BLOCK_TIME * 1000);
                        LOG.info("Going through to flush. This will allow the main thread to continue.");
                    }
                    invocationOnMock.callRealMethod();
                    LOG.info("Flush complete");
                    return null;
                }
            }).when(spyStream).flush();

            doAnEditThread.start();
            LOG.info("Main thread: waiting to enter flush...");
            waitToEnterFlush.await();
            Assert.assertNull(deferredException.get());
            LOG.info("Main thread: detected that logSync is in unsynchronized section.");
            LOG.info("Trying to enter safe mode.");
            LOG.info("This should block for " + BLOCK_TIME + "sec, since flush will sleep that long");

            final long startMs = Time.now();
            namesystem.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
            final long endMs = Time.now();
            LOG.info("Entered safe mode");
            // Allow one second of slack below the full sleep time.
            Assert.assertTrue(endMs - startMs > (BLOCK_TIME - 1) * 1000);

            namesystem.saveNamespace();
            LOG.info("Joining on edit thread...");
            doAnEditThread.join();
            Assert.assertNull(deferredException.get());

            // Transactions 1-3 are expected in the finalized segment and one more in
            // the new in-progress segment starting at transaction 4.
            Assert.assertEquals(3L,
                    verifyEditLogs(namesystem, fsImage, NNStorage.getFinalizedEditsFileName(1L, 3L), 1L));
            Assert.assertEquals(1L,
                    verifyEditLogs(namesystem, fsImage, NNStorage.getInProgressEditsFileName(4L), 4L));
        } finally {
            LOG.info("Closing nn");
            if (namesystem != null) {
                namesystem.close();
            }
        }
    }

    /**
     * Checks that entering safe mode does not wait for an edit that has been
     * logged but not yet synced: the edit thread logs a setOwner under the
     * write lock, releases the lock, and only calls {@code logSync()} after
     * sleeping {@code BLOCK_TIME} seconds. The main thread must get into safe
     * mode in less than half that time.
     *
     * @throws Exception on any test failure
     */
    @Test
    public void testSaveRightBeforeSync() throws Exception {
        final Configuration conf = getConf();
        NameNode.initMetrics(conf, HdfsServerConstants.NamenodeRole.NAMENODE);
        DFSTestUtil.formatNameNode(conf);
        final FSNamesystem namesystem = FSNamesystem.loadFromDisk(conf);
        try {
            final FSImage fsImage = namesystem.getFSImage();
            final FSEditLog editLog = fsImage.getEditLog();
            final AtomicReference<Throwable> deferredException = new AtomicReference<>();
            final CountDownLatch sleepingBeforeSync = new CountDownLatch(1);

            final Thread doAnEditThread = new Thread() {
                @Override
                public void run() {
                    try {
                        LOG.info("Starting setOwner");
                        namesystem.writeLock();
                        try {
                            editLog.logSetOwner("/", "test", "test");
                        } finally {
                            // Unlock exactly once, and only around the logged edit, so a
                            // later failure in sleep()/logSync() cannot unlock again.
                            namesystem.writeUnlock();
                        }
                        sleepingBeforeSync.countDown();
                        LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs");
                        Thread.sleep(BLOCK_TIME * 1000);
                        editLog.logSync();
                        LOG.info("edit thread: logSync complete");
                    } catch (Throwable t) {
                        LOG.fatal("Got exception", t);
                        deferredException.set(t);
                        // Release the main thread so it can report the failure.
                        sleepingBeforeSync.countDown();
                    }
                }
            };
            doAnEditThread.setDaemon(true);
            doAnEditThread.start();

            LOG.info("Main thread: waiting to just before logSync...");
            // Bounded wait; the result is deliberately not asserted, matching the
            // existing behavior of this test.
            sleepingBeforeSync.await(200L, TimeUnit.MILLISECONDS);
            Assert.assertNull(deferredException.get());
            LOG.info("Main thread: detected that logSync about to be called.");
            LOG.info("Trying to enter safe mode.");

            final long startMs = Time.now();
            namesystem.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
            final long endMs = Time.now();
            LOG.info("Entered safe mode after " + (endMs - startMs) + "ms");
            // Must take less than half of the edit thread's sleep time.
            Assert.assertTrue(endMs - startMs < (BLOCK_TIME * 1000) / 2);

            namesystem.saveNamespace();
            LOG.info("Joining on edit thread...");
            doAnEditThread.join();
            Assert.assertNull(deferredException.get());

            // Transactions 1-3 are expected in the finalized segment and one more in
            // the new in-progress segment starting at transaction 4.
            Assert.assertEquals(3L,
                    verifyEditLogs(namesystem, fsImage, NNStorage.getFinalizedEditsFileName(1L, 3L), 1L));
            Assert.assertEquals(1L,
                    verifyEditLogs(namesystem, fsImage, NNStorage.getInProgressEditsFileName(4L), 4L));
        } finally {
            LOG.info("Closing nn");
            if (namesystem != null) {
                namesystem.close();
            }
        }
    }

    /**
     * Stress test that must finish within the JUnit timeout: spammer threads
     * log edits without syncing while, eight times over, a "blocker" edit is
     * held inside {@code setTransactionId} and a third thread logs and syncs
     * while synchronized on the edit log. Every submitted task must complete.
     *
     * @throws Throwable on any test failure, including failures raised inside
     *         the submitted tasks
     */
    @Test(timeout = 180000)
    public void testDeadlock() throws Throwable {
        GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.INFO);
        final Configuration conf = getConf();
        NameNode.initMetrics(conf, HdfsServerConstants.NamenodeRole.NAMENODE);
        DFSTestUtil.formatNameNode(conf);
        final FSNamesystem namesystem = FSNamesystem.loadFromDisk(conf);
        final AtomicBoolean done = new AtomicBoolean(false);
        final Semaphore blockerSemaphore = new Semaphore(0);
        final CountDownLatch startSpamLatch = new CountDownLatch(1);
        final ExecutorService executor = Executors.newCachedThreadPool();
        try {
            final FSEditLog editLog = namesystem.getEditLog();
            final FSEditLogOp op = FSEditLogOp.SetOwnerOp.getInstance(editLog.cache.get())
                    .setSource("/").setUser("u").setGroup("g");

            // reset() is stubbed out so the same op instance can be logged repeatedly.
            final FSEditLogOp reuseOp = Mockito.spy(op);
            Mockito.doNothing().when(reuseOp).reset();

            // Spammers only log edits (no sync) once the start latch is released.
            final Future<?>[] logSpammers = new Future<?>[NUM_THREADS];
            for (int i = 0; i < logSpammers.length; i++) {
                final int spammerId = i;
                logSpammers[i] = executor.submit(new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        Thread.currentThread().setName("Log spammer " + spammerId);
                        startSpamLatch.await();
                        for (int edits = 0; !done.get() && edits < 1000000; edits++) {
                            editLog.logEdit(reuseOp);
                            if (edits % 2048 == 0) {
                                LOG.info("thread[" + spammerId + "] edits=" + edits);
                            }
                        }
                        // Leaving the loop without 'done' means the edit cap was hit.
                        Assert.assertTrue("too many edits", done.get());
                        return null;
                    }
                });
            }

            // Setting the transaction id on this op releases the spammers and then
            // waits for a permit from the main loop before doing the real work.
            final FSEditLogOp blockingOp = Mockito.spy(op);
            Mockito.doAnswer(new Answer<Void>() {
                @Override
                public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
                    startSpamLatch.countDown();
                    blockerSemaphore.acquire();
                    invocationOnMock.callRealMethod();
                    return null;
                }
            }).when(blockingOp).setTransactionId(Mockito.anyLong());
            Mockito.doNothing().when(blockingOp).reset();

            for (int round = 0; round < 8; round++) {
                final Future<Void> blockingEdit = executor.submit(new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        Thread.currentThread().setName("Log blocker");
                        editLog.logEdit(blockingOp);
                        editLog.logSync();
                        return null;
                    }
                });

                // Wait until the last written tx id has advanced past the starting
                // value and then stayed unchanged for three consecutive polls.
                final long startTxId = editLog.getLastWrittenTxIdWithoutLock();
                final long[] txIds = {startTxId, startTxId, startTxId};
                GenericTestUtils.waitFor(new Supplier<Boolean>() {
                    @Override
                    public Boolean get() {
                        txIds[0] = txIds[1];
                        txIds[1] = txIds[2];
                        txIds[2] = editLog.getLastWrittenTxIdWithoutLock();
                        return Boolean.valueOf(
                                txIds[0] == txIds[1] && txIds[1] == txIds[2] && txIds[2] > startTxId);
                    }
                }, 100, 10000);

                // Start a task that logs and syncs while holding the edit log's
                // monitor, then unblock the blocker; both must complete.
                final CountDownLatch readyLatch = new CountDownLatch(1);
                final Future<Void> synchedEdits = executor.submit(new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        Thread.currentThread().setName("Log synchronizer");
                        readyLatch.countDown();
                        synchronized (editLog) {
                            editLog.logEdit(reuseOp);
                            editLog.logSync();
                        }
                        return null;
                    }
                });
                readyLatch.await();
                blockerSemaphore.release();
                blockingEdit.get();
                synchedEdits.get();
            }

            // Let the spammers finish and surface any failure they hit.
            done.set(true);
            for (Future<?> logSpammer : logSpammers) {
                logSpammer.get();
            }
            editLog.logSyncAll();
        } finally {
            LOG.info("Closing nn");
            executor.shutdownNow();
            if (namesystem != null) {
                namesystem.getFSImage().getStorage().close();
                namesystem.close();
            }
        }
    }

    // Class initialization: sets the FSEditLog logger to ALL, then assigns the
    // two static finals declared at the top of the class.
    static {
        GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
        // The storage directory is "name1" appended to the MiniDFSCluster base
        // directory; getConf() uses it for both the name dir and the edits dir.
        NAME_DIR = MiniDFSCluster.getBaseDirectory() + "name1";
        LOG = LogFactory.getLog(TestEditLogRace.class);
    }
}
