package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ReplicationTests.class, MediumTests.class})
/* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.class */
public class TestReplicationSource {
    private static FileSystem FS;
    private static Path oldLogDir;
    private static Path logDir;

    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicationSource.class);
    private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSource.class);
    private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    private static final HBaseTestingUtil TEST_UTIL_PEER = new HBaseTestingUtil();
    private static Configuration conf = TEST_UTIL.getConfiguration();

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource$BadReplicationEndpoint.class */
    public static class BadReplicationEndpoint extends DoNothingReplicationEndpoint {
        static boolean failing = true;

        @Override // org.apache.hadoop.hbase.replication.regionserver.TestReplicationSource.DoNothingReplicationEndpoint
        public synchronized UUID getPeerUUID() {
            if (failing) {
                return null;
            }
            return super.getPeerUUID();
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource$DoNothingReplicationEndpoint.class */
    public static class DoNothingReplicationEndpoint extends HBaseInterClusterReplicationEndpoint {
        private final UUID uuid = UUID.randomUUID();

        public void init(ReplicationEndpoint.Context context) throws IOException {
            this.ctx = context;
        }

        public WALEntryFilter getWALEntryfilter() {
            return null;
        }

        public synchronized UUID getPeerUUID() {
            return this.uuid;
        }

        protected void doStart() {
            notifyStarted();
        }

        protected void doStop() {
            notifyStopped();
        }

        public boolean canReplicateToSameCluster() {
            return true;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource$FaultyReplicationEndpoint.class */
    public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint {
        static int count = 0;

        @Override // org.apache.hadoop.hbase.replication.regionserver.TestReplicationSource.DoNothingReplicationEndpoint
        public synchronized UUID getPeerUUID() {
            throw new RuntimeException();
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource$FlakyReplicationEndpoint.class */
    public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoint {
        static int count = 0;

        @Override // org.apache.hadoop.hbase.replication.regionserver.TestReplicationSource.DoNothingReplicationEndpoint
        public synchronized UUID getPeerUUID() {
            if (count != 0) {
                return super.getPeerUUID();
            }
            count++;
            throw new RuntimeException();
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource$ShutdownDelayRegionServer.class */
    public static class ShutdownDelayRegionServer extends HRegionServer {
        public ShutdownDelayRegionServer(Configuration configuration) throws IOException {
            super(configuration);
        }

        protected void stopServiceThreads() {
            TestReplicationSource.LOG.info("Adding a delay to the regionserver shutdown");
            try {
                Thread.sleep(2000L);
            } catch (InterruptedException e) {
                TestReplicationSource.LOG.error("Interrupted while sleeping");
            }
            super.stopServiceThreads();
        }
    }

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        TEST_UTIL.startMiniDFSCluster(1);
        FS = TEST_UTIL.getDFSCluster().getFileSystem();
        Path createRootDir = TEST_UTIL.createRootDir();
        oldLogDir = new Path(createRootDir, "oldWALs");
        if (FS.exists(oldLogDir)) {
            FS.delete(oldLogDir, true);
        }
        logDir = new Path(createRootDir, "WALs");
        if (FS.exists(logDir)) {
            FS.delete(logDir, true);
        }
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        TEST_UTIL_PEER.shutdownMiniHBaseCluster();
        TEST_UTIL.shutdownMiniHBaseCluster();
        TEST_UTIL.shutdownMiniDFSCluster();
    }

    @Test
    public void testDefaultSkipsMetaWAL() throws IOException {
        ReplicationSource replicationSource = new ReplicationSource();
        Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
        configuration.setInt("replication.source.maxretriesmultiplier", 1);
        ReplicationPeer replicationPeer = (ReplicationPeer) Mockito.mock(ReplicationPeer.class);
        Mockito.when(replicationPeer.getConfiguration()).thenReturn(configuration);
        Mockito.when(Long.valueOf(replicationPeer.getPeerBandwidth())).thenReturn(0L);
        ReplicationPeerConfig replicationPeerConfig = (ReplicationPeerConfig) Mockito.mock(ReplicationPeerConfig.class);
        Mockito.when(replicationPeerConfig.getReplicationEndpointImpl()).thenReturn(DoNothingReplicationEndpoint.class.getName());
        Mockito.when(replicationPeer.getPeerConfig()).thenReturn(replicationPeerConfig);
        ReplicationSourceManager replicationSourceManager = (ReplicationSourceManager) Mockito.mock(ReplicationSourceManager.class);
        Mockito.when(replicationSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong());
        Mockito.when(replicationSourceManager.getGlobalMetrics()).thenReturn(Mockito.mock(MetricsReplicationGlobalSourceSource.class));
        RegionServerServices createMockRegionServerService = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
        replicationSource.init(configuration, (FileSystem) null, replicationSourceManager, (ReplicationQueueStorage) null, replicationPeer, createMockRegionServerService, "qid", (UUID) null, path -> {
            return OptionalLong.empty();
        }, new MetricsSource("qid"));
        try {
            replicationSource.startup();
            Assert.assertTrue(replicationSource.isSourceActive());
            Assert.assertEquals(0L, replicationSource.getSourceMetrics().getSizeOfLogQueue());
            replicationSource.enqueueLog(new Path("a.1.meta"));
            Assert.assertEquals(0L, replicationSource.getSourceMetrics().getSizeOfLogQueue());
            replicationSource.enqueueLog(new Path("a.1"));
            Assert.assertEquals(1L, replicationSource.getSourceMetrics().getSizeOfLogQueue());
            replicationSource.terminate("Done");
            createMockRegionServerService.stop("Done");
        } catch (Throwable th) {
            replicationSource.terminate("Done");
            createMockRegionServerService.stop("Done");
            throw th;
        }
    }

    @Test
    public void testWALEntryFilter() throws IOException {
        ReplicationSource replicationSource = new ReplicationSource();
        UUID randomUUID = UUID.randomUUID();
        Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
        ReplicationPeer replicationPeer = (ReplicationPeer) Mockito.mock(ReplicationPeer.class);
        Mockito.when(replicationPeer.getConfiguration()).thenReturn(configuration);
        Mockito.when(Long.valueOf(replicationPeer.getPeerBandwidth())).thenReturn(0L);
        ReplicationPeerConfig replicationPeerConfig = (ReplicationPeerConfig) Mockito.mock(ReplicationPeerConfig.class);
        Mockito.when(replicationPeerConfig.getReplicationEndpointImpl()).thenReturn(DoNothingReplicationEndpoint.class.getName());
        Mockito.when(replicationPeer.getPeerConfig()).thenReturn(replicationPeerConfig);
        ReplicationSourceManager replicationSourceManager = (ReplicationSourceManager) Mockito.mock(ReplicationSourceManager.class);
        Mockito.when(replicationSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong());
        RegionServerServices createMockRegionServerService = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
        replicationSource.init(configuration, (FileSystem) null, replicationSourceManager, (ReplicationQueueStorage) null, replicationPeer, createMockRegionServerService, "qid", randomUUID, path -> {
            return OptionalLong.empty();
        }, new MetricsSource("qid"));
        try {
            replicationSource.startup();
            TEST_UTIL.waitFor(30000L, () -> {
                return replicationSource.getWalEntryFilter() != null;
            });
            WALEntryFilter walEntryFilter = replicationSource.getWalEntryFilter();
            WALEdit add = new WALEdit().add(CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(HConstants.EMPTY_START_ROW).setFamily(HConstants.CATALOG_FAMILY).setType(Cell.Type.Put).build());
            WAL.Entry entry = new WAL.Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.valueOf("test"), -1L, -1L, randomUUID), add);
            Assert.assertTrue(walEntryFilter.filter(entry) == entry);
            Assert.assertNull(walEntryFilter.filter(new WAL.Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.META_TABLE_NAME, -1L, -1L, randomUUID), add)));
            replicationSource.terminate("Done");
            createMockRegionServerService.stop("Done");
        } catch (Throwable th) {
            replicationSource.terminate("Done");
            createMockRegionServerService.stop("Done");
            throw th;
        }
    }

    @Test
    public void testLogMoving() throws Exception {
        Path path = new Path(logDir, "log");
        if (!FS.exists(logDir)) {
            FS.mkdirs(logDir);
        }
        if (!FS.exists(oldLogDir)) {
            FS.mkdirs(oldLogDir);
        }
        WALProvider.Writer createWALWriter = WALFactory.createWALWriter(FS, path, TEST_UTIL.getConfiguration());
        for (int i = 0; i < 3; i++) {
            byte[] bytes = Bytes.toBytes(Integer.toString(i));
            KeyValue keyValue = new KeyValue(bytes, bytes, bytes);
            WALEdit wALEdit = new WALEdit();
            wALEdit.add(keyValue);
            createWALWriter.append(new WAL.Entry(new WALKeyImpl(bytes, TableName.valueOf(bytes), 0L, 0L, HConstants.DEFAULT_CLUSTER_ID), wALEdit));
            createWALWriter.sync(false);
        }
        createWALWriter.close();
        WAL.Reader createReader = WALFactory.createReader(FS, path, TEST_UTIL.getConfiguration());
        Assert.assertNotNull(createReader.next());
        FS.rename(path, new Path(oldLogDir, "log"));
        Assert.assertNotNull(createReader.next());
        createReader.next();
        Assert.assertNull(createReader.next());
        createReader.close();
    }

    @Test
    public void testTerminateTimeout() throws Exception {
        ReplicationSource replicationSource = new ReplicationSource();
        DoNothingReplicationEndpoint doNothingReplicationEndpoint = new DoNothingReplicationEndpoint();
        try {
            doNothingReplicationEndpoint.start();
            ReplicationPeer replicationPeer = (ReplicationPeer) Mockito.mock(ReplicationPeer.class);
            Mockito.when(Long.valueOf(replicationPeer.getPeerBandwidth())).thenReturn(0L);
            Configuration create = HBaseConfiguration.create();
            create.setInt("replication.source.maxretriesmultiplier", 1);
            ReplicationSourceManager replicationSourceManager = (ReplicationSourceManager) Mockito.mock(ReplicationSourceManager.class);
            Mockito.when(replicationSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong());
            replicationSource.init(create, (FileSystem) null, replicationSourceManager, (ReplicationQueueStorage) null, replicationPeer, (Server) null, "testPeer", (UUID) null, path -> {
                return OptionalLong.empty();
            }, (MetricsSource) null);
            Future<?> submit = Executors.newSingleThreadExecutor().submit(() -> {
                replicationSource.terminate("testing source termination");
            });
            long j = create.getLong("replication.source.sleepforretries", 1000L) * 2;
            submit.getClass();
            Waiter.waitFor(create, j, submit::isDone);
            doNothingReplicationEndpoint.stop();
        } catch (Throwable th) {
            doNothingReplicationEndpoint.stop();
            throw th;
        }
    }

    @Test
    public void testTerminateClearsBuffer() throws Exception {
        ReplicationSource replicationSource = new ReplicationSource();
        ReplicationSourceManager replicationSourceManager = (ReplicationSourceManager) Mockito.mock(ReplicationSourceManager.class);
        MetricsReplicationGlobalSourceSource metricsReplicationGlobalSourceSource = (MetricsReplicationGlobalSourceSource) Mockito.mock(MetricsReplicationGlobalSourceSource.class);
        Mockito.when(replicationSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong());
        Mockito.when(replicationSourceManager.getGlobalMetrics()).thenReturn(metricsReplicationGlobalSourceSource);
        ReplicationPeer replicationPeer = (ReplicationPeer) Mockito.mock(ReplicationPeer.class);
        Mockito.when(Long.valueOf(replicationPeer.getPeerBandwidth())).thenReturn(0L);
        replicationSource.init(HBaseConfiguration.create(), (FileSystem) null, replicationSourceManager, (ReplicationQueueStorage) null, replicationPeer, (Server) Mockito.mock(Server.class), "testPeer", (UUID) null, path -> {
            return OptionalLong.empty();
        }, (MetricsSource) Mockito.mock(MetricsSource.class));
        ReplicationSourceWALReader replicationSourceWALReader = new ReplicationSourceWALReader((FileSystem) null, conf, (ReplicationSourceLogQueue) null, 0L, (WALEntryFilter) null, replicationSource, (String) null);
        ReplicationSourceShipper replicationSourceShipper = new ReplicationSourceShipper(conf, (String) null, (ReplicationSourceLogQueue) null, replicationSource);
        replicationSourceShipper.entryReader = replicationSourceWALReader;
        replicationSource.workerThreads.put("testPeer", replicationSourceShipper);
        WALEntryBatch wALEntryBatch = new WALEntryBatch(10, logDir);
        WAL.Entry entry = (WAL.Entry) Mockito.mock(WAL.Entry.class);
        WALEdit wALEdit = (WALEdit) Mockito.mock(WALEdit.class);
        WALKeyImpl wALKeyImpl = (WALKeyImpl) Mockito.mock(WALKeyImpl.class);
        Mockito.when(entry.getEdit()).thenReturn(wALEdit);
        Mockito.when(Boolean.valueOf(wALEdit.isEmpty())).thenReturn(false);
        Mockito.when(entry.getKey()).thenReturn(wALKeyImpl);
        Mockito.when(Long.valueOf(wALKeyImpl.estimatedSerializedSizeOf())).thenReturn(1000L);
        Mockito.when(Long.valueOf(wALEdit.heapSize())).thenReturn(10000L);
        Mockito.when(Integer.valueOf(wALEdit.size())).thenReturn(0);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"), Bytes.toBytes("1"), Bytes.toBytes("v1")));
        Mockito.when(wALEdit.getCells()).thenReturn(arrayList);
        replicationSourceWALReader.addEntryToBatch(wALEntryBatch, entry);
        replicationSourceWALReader.entryBatchQueue.put(wALEntryBatch);
        replicationSource.terminate("test");
        Assert.assertEquals(0L, replicationSource.getSourceManager().getTotalBufferUsed().get());
    }

    @Test
    public void testServerShutdownRecoveredQueue() throws Exception {
        try {
            conf.set("hbase.wal.provider", "defaultProvider");
            conf.setInt("replication.sleep.before.failover", 2000);
            conf.set("hbase.regionserver.impl", ShutdownDelayRegionServer.class.getName());
            SingleProcessHBaseCluster startMiniCluster = TEST_UTIL.startMiniCluster(2);
            TEST_UTIL_PEER.startMiniCluster(1);
            HRegionServer regionServer = startMiniCluster.getRegionServer(0);
            final ReplicationSourceManager replicationManager = regionServer.getReplicationSourceService().getReplicationManager();
            HRegionServer regionServer2 = startMiniCluster.getRegionServer(1);
            final ReplicationSourceManager replicationManager2 = regionServer2.getReplicationSourceService().getReplicationManager();
            Admin admin = TEST_UTIL.getAdmin();
            admin.addReplicationPeer("TestPeer", ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build());
            Waiter.waitFor(conf, 20000L, new Waiter.Predicate<Exception>() { // from class: org.apache.hadoop.hbase.replication.regionserver.TestReplicationSource.1
                public boolean evaluate() {
                    return (replicationManager.getSources().isEmpty() || replicationManager2.getSources().isEmpty()) ? false : true;
                }
            });
            admin.disableReplicationPeer("TestPeer");
            startMiniCluster.stopRegionServer(regionServer.getServerName());
            Waiter.waitFor(conf, 20000L, new Waiter.Predicate<Exception>() { // from class: org.apache.hadoop.hbase.replication.regionserver.TestReplicationSource.2
                public boolean evaluate() throws Exception {
                    return replicationManager2.getOldSources().size() == 1;
                }
            });
            final HRegionServer regionServer3 = startMiniCluster.startRegionServer().getRegionServer();
            regionServer3.waitForServerOnline();
            Waiter.waitFor(conf, 20000L, new Waiter.Predicate<Exception>() { // from class: org.apache.hadoop.hbase.replication.regionserver.TestReplicationSource.3
                public boolean evaluate() throws Exception {
                    return regionServer3.getReplicationSourceService() != null;
                }
            });
            ReplicationSourceManager replicationManager3 = regionServer3.getReplicationSourceService().getReplicationManager();
            Assert.assertEquals(0L, replicationManager3.getOldSources().size());
            startMiniCluster.stopRegionServer(regionServer2.getServerName());
            Waiter.waitFor(conf, 20000L, () -> {
                return replicationManager3.getOldSources().size() == 2;
            });
            admin.enableReplicationPeer("TestPeer");
            Waiter.waitFor(conf, 20000L, () -> {
                return replicationManager3.getOldSources().size() == 0;
            });
            conf.set("hbase.regionserver.impl", HRegionServer.class.getName());
        } catch (Throwable th) {
            conf.set("hbase.regionserver.impl", HRegionServer.class.getName());
            throw th;
        }
    }

    @Test
    public void testRecoveredReplicationSourceShipperGetPosition() throws Exception {
        ServerName valueOf = ServerName.valueOf("www.example.com", 12006, 1524679704418L);
        ServerName valueOf2 = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
        RecoveredReplicationSource recoveredReplicationSource = (RecoveredReplicationSource) Mockito.mock(RecoveredReplicationSource.class);
        Server server = (Server) Mockito.mock(Server.class);
        Mockito.when(server.getServerName()).thenReturn(valueOf);
        Mockito.when(recoveredReplicationSource.getServer()).thenReturn(server);
        Mockito.when(recoveredReplicationSource.getServerWALsBelongTo()).thenReturn(valueOf2);
        ReplicationQueueStorage replicationQueueStorage = (ReplicationQueueStorage) Mockito.mock(ReplicationQueueStorage.class);
        Mockito.when(Long.valueOf(replicationQueueStorage.getWALPosition((ServerName) Mockito.eq(valueOf), (String) Mockito.any(), (String) Mockito.any()))).thenReturn(1001L);
        Mockito.when(Long.valueOf(replicationQueueStorage.getWALPosition((ServerName) Mockito.eq(valueOf2), (String) Mockito.any(), (String) Mockito.any()))).thenReturn(-1L);
        Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
        configuration.setInt("replication.source.maxretriesmultiplier", -1);
        MetricsSource metricsSource = (MetricsSource) Mockito.mock(MetricsSource.class);
        ((MetricsSource) Mockito.doNothing().when(metricsSource)).incrSizeOfLogQueue();
        ReplicationSourceLogQueue replicationSourceLogQueue = new ReplicationSourceLogQueue(configuration, metricsSource, recoveredReplicationSource);
        replicationSourceLogQueue.enqueueLog(new Path("/www/html/test"), "fake-wal-group-id");
        Assert.assertEquals(1001L, new RecoveredReplicationSourceShipper(configuration, "fake-wal-group-id", replicationSourceLogQueue, recoveredReplicationSource, replicationQueueStorage).getStartPosition());
    }

    private RegionServerServices setupForAbortTests(ReplicationSource replicationSource, Configuration configuration, String str) throws IOException {
        configuration.setInt("replication.source.maxretriesmultiplier", 1);
        ReplicationPeer replicationPeer = (ReplicationPeer) Mockito.mock(ReplicationPeer.class);
        Mockito.when(replicationPeer.getConfiguration()).thenReturn(configuration);
        Mockito.when(Long.valueOf(replicationPeer.getPeerBandwidth())).thenReturn(0L);
        ReplicationPeerConfig replicationPeerConfig = (ReplicationPeerConfig) Mockito.mock(ReplicationPeerConfig.class);
        FaultyReplicationEndpoint.count = 0;
        Mockito.when(replicationPeerConfig.getReplicationEndpointImpl()).thenReturn(str);
        Mockito.when(replicationPeer.getPeerConfig()).thenReturn(replicationPeerConfig);
        ReplicationSourceManager replicationSourceManager = (ReplicationSourceManager) Mockito.mock(ReplicationSourceManager.class);
        Mockito.when(replicationSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong());
        Mockito.when(replicationSourceManager.getGlobalMetrics()).thenReturn(Mockito.mock(MetricsReplicationGlobalSourceSource.class));
        RegionServerServices createMockRegionServerService = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
        replicationSource.init(configuration, (FileSystem) null, replicationSourceManager, (ReplicationQueueStorage) null, replicationPeer, createMockRegionServerService, "qid", (UUID) null, path -> {
            return OptionalLong.empty();
        }, new MetricsSource("qid"));
        return createMockRegionServerService;
    }

    @Test
    public void testAbortFalseOnError() throws IOException {
        Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
        configuration.setBoolean("replication.source.regionserver.abort", false);
        ReplicationSource replicationSource = new ReplicationSource();
        RegionServerServices regionServerServices = setupForAbortTests(replicationSource, configuration, FlakyReplicationEndpoint.class.getName());
        try {
            replicationSource.startup();
            Assert.assertTrue(replicationSource.isSourceActive());
            Assert.assertEquals(0L, replicationSource.getSourceMetrics().getSizeOfLogQueue());
            replicationSource.enqueueLog(new Path("a.1.meta"));
            Assert.assertEquals(0L, replicationSource.getSourceMetrics().getSizeOfLogQueue());
            replicationSource.enqueueLog(new Path("a.1"));
            Assert.assertEquals(1L, replicationSource.getSourceMetrics().getSizeOfLogQueue());
            replicationSource.terminate("Done");
            regionServerServices.stop("Done");
        } catch (Throwable th) {
            replicationSource.terminate("Done");
            regionServerServices.stop("Done");
            throw th;
        }
    }

    @Test
    public void testReplicationSourceInitializingMetric() throws IOException {
        Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
        configuration.setBoolean("replication.source.regionserver.abort", false);
        ReplicationSource replicationSource = new ReplicationSource();
        RegionServerServices regionServerServices = setupForAbortTests(replicationSource, configuration, BadReplicationEndpoint.class.getName());
        try {
            replicationSource.startup();
            Assert.assertTrue(replicationSource.isSourceActive());
            Waiter.waitFor(configuration, 10000L, () -> {
                return replicationSource.getSourceMetrics().getSourceInitializing() == 1;
            });
            BadReplicationEndpoint.failing = false;
            Waiter.waitFor(configuration, 10000L, () -> {
                return replicationSource.getSourceMetrics().getSourceInitializing() == 0;
            });
            replicationSource.terminate("Done");
            regionServerServices.stop("Done");
        } catch (Throwable th) {
            replicationSource.terminate("Done");
            regionServerServices.stop("Done");
            throw th;
        }
    }

    @Test
    public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException {
        Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
        ReplicationSource replicationSource = new ReplicationSource();
        RegionServerServices regionServerServices = setupForAbortTests(replicationSource, configuration, FaultyReplicationEndpoint.class.getName());
        try {
            replicationSource.startup();
            Assert.assertTrue(true);
            replicationSource.terminate("Done");
            regionServerServices.stop("Done");
        } catch (Throwable th) {
            replicationSource.terminate("Done");
            regionServerServices.stop("Done");
            throw th;
        }
    }

    @Test
    public void testAbortTrueOnError() throws IOException {
        Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
        ReplicationSource replicationSource = new ReplicationSource();
        RegionServerServices regionServerServices = setupForAbortTests(replicationSource, configuration, FlakyReplicationEndpoint.class.getName());
        try {
            replicationSource.startup();
            Assert.assertTrue(replicationSource.isSourceActive());
            Waiter.waitFor(configuration, 1000L, () -> {
                return regionServerServices.isAborted();
            });
            Assert.assertTrue(regionServerServices.isAborted());
            Waiter.waitFor(configuration, 1000L, () -> {
                return !replicationSource.isSourceActive();
            });
            Assert.assertFalse(replicationSource.isSourceActive());
            replicationSource.terminate("Done");
            regionServerServices.stop("Done");
        } catch (Throwable th) {
            replicationSource.terminate("Done");
            regionServerServices.stop("Done");
            throw th;
        }
    }

    @Test
    public void testAgeOfOldestWal() throws Exception {
        try {
            ManualEnvironmentEdge manualEnvironmentEdge = new ManualEnvironmentEdge();
            EnvironmentEdgeManager.injectEdge(manualEnvironmentEdge);
            MetricsSource metricsSource = new MetricsSource("1");
            Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
            configuration.setInt("replication.source.maxretriesmultiplier", 1);
            ReplicationPeer replicationPeer = (ReplicationPeer) Mockito.mock(ReplicationPeer.class);
            Mockito.when(replicationPeer.getConfiguration()).thenReturn(configuration);
            Mockito.when(Long.valueOf(replicationPeer.getPeerBandwidth())).thenReturn(0L);
            ReplicationPeerConfig replicationPeerConfig = (ReplicationPeerConfig) Mockito.mock(ReplicationPeerConfig.class);
            Mockito.when(replicationPeerConfig.getReplicationEndpointImpl()).thenReturn(DoNothingReplicationEndpoint.class.getName());
            Mockito.when(replicationPeer.getPeerConfig()).thenReturn(replicationPeerConfig);
            ReplicationSourceManager replicationSourceManager = (ReplicationSourceManager) Mockito.mock(ReplicationSourceManager.class);
            Mockito.when(replicationSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong());
            Mockito.when(replicationSourceManager.getGlobalMetrics()).thenReturn(Mockito.mock(MetricsReplicationGlobalSourceSource.class));
            RegionServerServices createMockRegionServerService = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
            ReplicationSource replicationSource = new ReplicationSource();
            replicationSource.init(configuration, (FileSystem) null, replicationSourceManager, (ReplicationQueueStorage) null, replicationPeer, createMockRegionServerService, "1", (UUID) null, path -> {
                return OptionalLong.empty();
            }, metricsSource);
            Path path2 = new Path(logDir, "log-walgroup-a.8");
            manualEnvironmentEdge.setValue(10L);
            replicationSource.enqueueLog(path2);
            MetricsReplicationSourceSource sourceMetrics = getSourceMetrics("1");
            Assert.assertEquals(2L, sourceMetrics.getOldestWalAge());
            replicationSource.enqueueLog(new Path(logDir, "log-walgroup-b.4"));
            Assert.assertEquals(6L, sourceMetrics.getOldestWalAge());
            metricsSource.clear();
        } finally {
            EnvironmentEdgeManager.reset();
        }
    }

    private MetricsReplicationSourceSource getSourceMetrics(String str) {
        return ((MetricsReplicationSourceFactoryImpl) CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)).getSource(str);
    }
}
