package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.TableCfWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
@Category({ReplicationTests.class, LargeTests.class})
/* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.class */
public class TestWALEntryStream {
    // Shared fixtures; all four are assigned once in setUpBeforeClass().
    private static HBaseTestingUtility TEST_UTIL;
    // Configuration obtained from TEST_UTIL.
    private static Configuration conf;
    // File system of the mini DFS cluster.
    private static FileSystem fs;
    private static MiniDFSCluster cluster;
    // Table, family and qualifier used by the append helpers of this class.
    private static final TableName tableName = TableName.valueOf("tablename");
    private static final byte[] family = Bytes.toBytes("column");
    private static final byte[] qualifier = Bytes.toBytes("qualifier");
    // Region descriptor built from EMPTY_START_ROW to LAST_ROW; its encoded name identifies the WAL.
    private static final HRegionInfo info = new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
    // Descriptor for tableName; no column family is added to it anywhere in this file.
    private static final HTableDescriptor htd = new HTableDescriptor(tableName);
    // Family name -> integer value, ordered by Bytes.BYTES_COMPARATOR; filled from htd in setUpBeforeClass().
    private static NavigableMap<byte[], Integer> scopes;
    // WAL under test; re-created by setUp() and closed by tearDown().
    private WAL log;
    // Paths of the WAL files announced to PathWatcher.preLogRoll(); this is the queue the streams read from.
    PriorityBlockingQueue<Path> walQueue;
    // Listener registered with the WAL factory in setUp().
    private PathWatcher pathWatcher;

    // Supplies the current test method name, which setUp() passes to the WALFactory constructor.
    @Rule
    public TestName tn = new TestName();
    // Passed to every WALKey created by this test.
    private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream$PathWatcher.class */
    public class PathWatcher extends WALActionsListener.Base {
        Path currentPath;

        PathWatcher() {
        }

        public void preLogRoll(Path path, Path path2) throws IOException {
            TestWALEntryStream.this.walQueue.add(path2);
            this.currentPath = path2;
        }
    }

    /**
     * Starts the mini DFS cluster shared by all tests and initialises the static fixtures
     * ({@code TEST_UTIL}, {@code conf}, {@code cluster}, {@code fs}, {@code scopes}).
     *
     * @throws Exception if the mini DFS cluster cannot be started
     */
    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        TEST_UTIL = new HBaseTestingUtility();
        conf = TEST_UTIL.getConfiguration();
        // 3 is the server count handed to the testing utility.
        TEST_UTIL.startMiniDFSCluster(3);
        cluster = TEST_UTIL.getDFSCluster();
        fs = cluster.getFileSystem();
        // byte[] keys have no natural ordering, so the map needs the explicit comparator.
        scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
        for (byte[] familyName : htd.getFamiliesKeys()) {
            scopes.put(familyName, 0);
        }
    }

    /**
     * Shuts the shared testing utility down after the last test has run.
     *
     * NOTE(review): setUpBeforeClass() only starts the DFS cluster; presumably
     * shutdownMiniCluster() tears that down as well - confirm.
     *
     * @throws Exception if the shutdown fails
     */
    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        TEST_UTIL.shutdownMiniCluster();
    }

    /**
     * Creates a fresh WAL path queue, a new {@link PathWatcher} and a new WAL for the current
     * test method. Also called directly by {@code testDifferentCounts} to reset between rounds.
     *
     * @throws Exception if the WAL factory or the WAL cannot be created
     */
    @Before
    public void setUp() throws Exception {
        this.walQueue = new PriorityBlockingQueue<>();
        this.pathWatcher = new PathWatcher();
        List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
        listeners.add(this.pathWatcher);
        // The test method name is used as the factory id.
        WALFactory wals = new WALFactory(conf, listeners, this.tn.getMethodName());
        this.log = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
    }

    /**
     * Closes the WAL that setUp() created for the test that just ran.
     *
     * @throws Exception if closing the WAL fails
     */
    @After
    public void tearDown() throws Exception {
        this.log.close();
    }

    /**
     * Streams a rolled WAL back for every combination of entry count (1500, 60000), cells per
     * edit (1, 100) and WAL compression (off, on), and checks that exactly the number of
     * appended entries is read.
     *
     * @throws Exception on any WAL or stream failure
     */
    @Test
    public void testDifferentCounts() throws Exception {
        int[] entryCounts = {1500, 60000};
        int[] cellsPerEdit = {1, 100};
        boolean[] compressionSettings = {false, true};
        for (int nbRows : entryCounts) {
            for (int walEditKVs : cellsPerEdit) {
                for (boolean compressionEnabled : compressionSettings) {
                    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.wal.enablecompression", compressionEnabled);
                    this.mvcc.advanceTo(1L);
                    for (int row = 0; row < nbRows; row++) {
                        appendToLogPlus(walEditKVs);
                    }
                    this.log.rollWriter();

                    try (WALEntryStream entryStream =
                            new WALEntryStream(this.walQueue, fs, conf, new MetricsSource("1"))) {
                        int entriesRead = 0;
                        for (WAL.Entry entry : entryStream) {
                            Assert.assertNotNull(entry);
                            entriesRead++;
                        }
                        Assert.assertEquals(nbRows, entriesRead);
                        // Everything that was appended must have been consumed.
                        Assert.assertFalse(entryStream.hasNext());
                    }

                    // Start the next combination from a brand-new WAL and queue.
                    this.log.close();
                    setUp();
                }
            }
        }
    }

    /**
     * Reads the WAL in three phases, each with a new stream resumed from the position reached by
     * the previous one: a single entry, a second appended entry, and finally two entries
     * separated by a log roll.
     *
     * @throws Exception on any WAL or stream failure
     */
    @Test
    public void testAppendsWithRolls() throws Exception {
        appendToLog();

        long oldPos;
        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, fs, conf, new MetricsSource("1"))) {
            // Exactly one edit is in the log; reading past it must throw.
            Assert.assertTrue(entryStream.hasNext());
            Assert.assertNotNull(entryStream.next());
            Assert.assertFalse(entryStream.hasNext());
            try {
                entryStream.next();
                Assert.fail("next() on an exhausted stream should throw NoSuchElementException");
            } catch (NoSuchElementException expected) {
                // expected: nothing left to read
            }
            oldPos = entryStream.getPosition();
        }

        appendToLog();

        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
            // Reading the newly appended entry must move the position forward.
            WAL.Entry entry = entryStream.next();
            Assert.assertNotEquals(oldPos, entryStream.getPosition());
            Assert.assertNotNull(entry);
            oldPos = entryStream.getPosition();
        }

        // One entry before the roll, one after it.
        appendToLog();
        this.log.rollWriter();
        appendToLog();

        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
            WAL.Entry entry = entryStream.next();
            Assert.assertNotEquals(oldPos, entryStream.getPosition());
            Assert.assertNotNull(entry);

            entry = entryStream.next();
            Assert.assertNotEquals(oldPos, entryStream.getPosition());
            Assert.assertNotNull(entry);

            Assert.assertFalse(entryStream.hasNext());
            // Call retained from the original test; its result is not checked.
            entryStream.getPosition();
        }
    }

    /**
     * Rolls the WAL while a stream is part-way through the first file and checks that the
     * stream still returns rows "1" to "4" in order, with the path queue shrinking from two
     * files to one once row "4" has been read.
     *
     * @throws Exception on any WAL or stream failure
     */
    @Test
    public void testLogrollWhileStreaming() throws Exception {
        appendToLog("1");
        appendToLog("2");
        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, fs, conf, new MetricsSource("1"))) {
            Assert.assertEquals("1", getRow(entryStream.next()));

            // "3" lands in the first file, "4" in the file created by the roll.
            appendToLog("3");
            this.log.rollWriter();
            appendToLog("4");

            Assert.assertEquals("2", getRow(entryStream.next()));
            Assert.assertEquals(2L, this.walQueue.size());
            Assert.assertEquals("3", getRow(entryStream.next()));
            Assert.assertEquals("4", getRow(entryStream.next()));
            Assert.assertEquals(1L, this.walQueue.size());
            Assert.assertFalse(entryStream.hasNext());
        }
    }

    /**
     * Appends entries after the stream has consumed the first one and checks that they are
     * not visible through {@code hasNext()} until {@code reset()} has been called.
     *
     * @throws Exception on any WAL or stream failure
     */
    @Test
    public void testNewEntriesWhileStreaming() throws Exception {
        appendToLog("1");
        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, fs, conf, 0L, new MetricsSource("1"))) {
            entryStream.next();

            appendToLog("2");
            appendToLog("3");

            // The test expects the fresh appends to be invisible until reset().
            Assert.assertFalse(entryStream.hasNext());
            entryStream.reset();

            Assert.assertEquals("2", getRow(entryStream.next()));
            Assert.assertEquals("3", getRow(entryStream.next()));
            Assert.assertFalse(entryStream.hasNext());
        }
    }

    /**
     * Reads one entry, records the stream position, then checks that a second stream started
     * at that position returns the entries appended afterwards ("2" and "3").
     *
     * @throws Exception on any WAL or stream failure
     */
    @Test
    public void testResumeStreamingFromPosition() throws Exception {
        appendToLog("1");

        long lastPosition;
        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, fs, conf, 0L, new MetricsSource("1"))) {
            entryStream.next();
            appendToLog("2");
            appendToLog("3");
            lastPosition = entryStream.getPosition();
        }

        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
            Assert.assertEquals("2", getRow(entryStream.next()));
            Assert.assertEquals("3", getRow(entryStream.next()));
            Assert.assertFalse(entryStream.hasNext());
            // No roll happened, so the queue still holds a single WAL path.
            Assert.assertEquals(1L, this.walQueue.size());
        }
    }

    /**
     * Writes three entries, reads the first with one stream, and checks that a second stream
     * opened at the recorded position yields exactly the remaining two.
     *
     * @throws Exception on any WAL or stream failure
     */
    @Test
    public void testPosition() throws Exception {
        appendEntriesToLog(3);

        long lastPosition;
        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, fs, conf, 0L, new MetricsSource("1"))) {
            entryStream.next();
            lastPosition = entryStream.getPosition();
        }

        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
            Assert.assertNotNull(entryStream.next());
            Assert.assertNotNull(entryStream.next());
            Assert.assertFalse(entryStream.hasNext());
        }
    }

    /**
     * Checks that a stream opened before any entry has been appended reports nothing to read.
     *
     * @throws Exception on any WAL or stream failure
     */
    @Test
    public void testEmptyStream() throws Exception {
        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, fs, conf, 0L, new MetricsSource("1"))) {
            Assert.assertFalse(entryStream.hasNext());
        }
    }

    /**
     * Runs a {@link ReplicationSourceWALReaderThread} over a WAL holding three entries and
     * checks the first batch (entry count, last path, last position, row-key count), then
     * appends one more entry and checks that it arrives as a second batch.
     *
     * @throws Exception on any WAL, stream or reader failure
     */
    @Test
    public void testReplicationSourceWALReaderThread() throws Exception {
        appendEntriesToLog(3);

        // Use a plain stream to learn the position reached after the three entries.
        long position;
        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, fs, conf, new MetricsSource("1"))) {
            entryStream.next();
            entryStream.next();
            entryStream.next();
            position = entryStream.getPosition();
        }

        ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
        Mockito.when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0L));
        ReplicationSourceWALReaderThread batcher = new ReplicationSourceWALReaderThread(
                mockSourceManager, getQueueInfo(), this.walQueue, 0L, fs, conf, getDummyFilter(),
                new MetricsSource("1"));
        Path walPath = this.walQueue.peek();
        // NOTE(review): the reader thread is never stopped by this test - confirm that is intended.
        batcher.start();

        ReplicationSourceWALReaderThread.WALEntryBatch entryBatch = batcher.take();
        Assert.assertNotNull(entryBatch);
        Assert.assertEquals(3L, entryBatch.getWalEntries().size());
        Assert.assertEquals(position, entryBatch.getLastWalPosition());
        Assert.assertEquals(walPath, entryBatch.getLastWalPath());
        Assert.assertEquals(3L, entryBatch.getNbRowKeys());

        appendToLog("foo");
        entryBatch = batcher.take();
        Assert.assertEquals(1L, entryBatch.getNbEntries());
        // Expected value first, so a failure message reads the right way round.
        Assert.assertEquals("foo", getRow(entryBatch.getWalEntries().get(0)));
    }

    /**
     * Runs a {@link ReplicationSourceWALReaderThread} configured with the recovered-queue id
     * ("1-1") over two WAL files (three entries, a roll, two entries) and checks that a single
     * batch carries all five entries, ends on the second file and reports no more entries.
     *
     * @throws Exception on any WAL, stream or reader failure
     */
    @Test
    public void testReplicationSourceWALReaderThreadRecoveredQueue() throws Exception {
        appendEntriesToLog(3);
        this.log.rollWriter();
        appendEntriesToLog(2);

        // Read through a copy of the queue so the reader thread below still sees both files.
        long position;
        try (WALEntryStream entryStream = new WALEntryStream(
                new PriorityBlockingQueue<Path>(this.walQueue), fs, conf, new MetricsSource("1"))) {
            entryStream.next();
            entryStream.next();
            entryStream.next();
            entryStream.next();
            entryStream.next();
            position = entryStream.getPosition();
        }

        ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
        Mockito.when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0L));
        ReplicationSourceWALReaderThread reader = new ReplicationSourceWALReaderThread(
                mockSourceManager, getRecoveredQueueInfo(), this.walQueue, 0L, fs, conf,
                getDummyFilter(), new MetricsSource("1"));
        // Second element of the queue snapshot; the test treats it as the last WAL.
        Path walPath = this.walQueue.toArray(new Path[2])[1];
        reader.start();

        ReplicationSourceWALReaderThread.WALEntryBatch entryBatch = reader.take();
        Assert.assertNotNull(entryBatch);
        Assert.assertEquals(5L, entryBatch.getWalEntries().size());
        Assert.assertEquals(position, entryBatch.getLastWalPosition());
        Assert.assertEquals(walPath, entryBatch.getLastWalPath());
        Assert.assertFalse(entryBatch.hasMoreEntries());
    }

    /**
     * Round-trips a {@link WALKey} carrying two extended attributes through its protobuf form
     * and checks that the key and every attribute value survive.
     *
     * @throws Exception if building or reading the protobuf message fails
     */
    @Test
    public void testWALKeySerialization() throws Exception {
        Map<String, byte[]> attributes = new HashMap<String, byte[]>();
        attributes.put("foo", Bytes.toBytes("foo-value"));
        attributes.put("bar", Bytes.toBytes("bar-value"));
        WALKey key = new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), 0L, 0L, this.mvcc, attributes);
        Assert.assertEquals(attributes, key.getExtendedAttributes());

        // No compression context is supplied on either side of the round trip.
        WALProtos.WALKey serializedKey = key.getBuilder((WALCellCodec.ByteStringCompressor) null).build();
        WALKey deserializedKey = new WALKey();
        deserializedKey.readFieldsFromPb(serializedKey, (WALCellCodec.ByteStringUncompressor) null);

        Assert.assertEquals(key, deserializedKey);
        Assert.assertEquals(key.getExtendedAttributes().keySet(), deserializedKey.getExtendedAttributes().keySet());
        // Values are byte arrays, so they are compared by content one key at a time.
        for (Map.Entry<String, byte[]> entry : deserializedKey.getExtendedAttributes().entrySet()) {
            Assert.assertArrayEquals(key.getExtendedAttribute(entry.getKey()), entry.getValue());
        }
    }

    /**
     * Appends three edits on a family the peer does not replicate followed by two edits on
     * {@code family}, then checks that the reader thread's batch holds only the two
     * replicated entries, each with a single cell of {@code family}.
     *
     * @throws Exception on any WAL or reader failure
     */
    @Test
    public void testReplicationSourceWALReaderThreadWithFilter() throws Exception {
        byte[] notReplicatedCf = Bytes.toBytes("notReplicated");

        // The mocked peer replicates only `family` of `tableName`.
        Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
        tableCfs.put(tableName, Collections.singletonList(Bytes.toString(family)));
        ReplicationPeer peer = Mockito.mock(ReplicationPeer.class);
        Mockito.when(peer.getTableCFs()).thenReturn(tableCfs);
        ChainWALEntryFilter filter = new ChainWALEntryFilter(new WALEntryFilter[]{new TableCfWALEntryFilter(peer)});

        appendToLogPlus(3, notReplicatedCf);
        appendToLogPlus(3, notReplicatedCf);
        appendToLogPlus(3, notReplicatedCf);
        appendEntriesToLog(2);

        ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
        Mockito.when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0L));
        ReplicationSourceWALReaderThread reader = new ReplicationSourceWALReaderThread(
                mockSourceManager, getQueueInfo(), this.walQueue, 0L, fs, conf, filter,
                new MetricsSource("1"));
        reader.start();

        ReplicationSourceWALReaderThread.WALEntryBatch entryBatch = reader.take();
        Assert.assertNotNull(entryBatch);
        Assert.assertFalse(entryBatch.isEmpty());
        List<WAL.Entry> walEntries = entryBatch.getWalEntries();
        Assert.assertEquals(2L, walEntries.size());
        for (WAL.Entry entry : walEntries) {
            List<Cell> cells = entry.getEdit().getCells();
            Assert.assertEquals(1L, cells.size());
            Assert.assertTrue(CellUtil.matchingFamily(cells.get(0), family));
        }
    }

    /**
     * Appends a single edit that the peer filter rejects, waits until the reader thread has
     * read up to the position of that WAL, and checks that no batch is available. After a log
     * roll the reader is expected to hand out an empty batch that points at the new WAL file.
     *
     * @throws Exception on any WAL or reader failure, or if the wait times out
     */
    @Test
    public void testReplicationSourceWALReaderThreadWithFilterWhenLogRolled() throws Exception {
        byte[] notReplicatedCf = Bytes.toBytes("notReplicated");

        // The mocked peer replicates only `family` of `tableName`.
        Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
        tableCfs.put(tableName, Collections.singletonList(Bytes.toString(family)));
        ReplicationPeer peer = Mockito.mock(ReplicationPeer.class);
        Mockito.when(peer.getTableCFs()).thenReturn(tableCfs);
        ChainWALEntryFilter filter = new ChainWALEntryFilter(new WALEntryFilter[]{new TableCfWALEntryFilter(peer)});

        appendToLogPlus(3, notReplicatedCf);

        Path firstWAL = this.walQueue.peek();
        final long eof = getPosition(firstWAL);

        ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
        Mockito.when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0L));
        final ReplicationSourceWALReaderThread reader = new ReplicationSourceWALReaderThread(
                mockSourceManager, getQueueInfo(), this.walQueue, 0L, fs, conf, filter,
                new MetricsSource("1"));
        reader.start();

        // Wait (up to 20 s) for the reader to get through the first WAL.
        Waiter.waitFor(conf, 20000L, new Waiter.Predicate<Exception>() {
            @Override
            public boolean evaluate() {
                return reader.getLastReadPosition() >= eof;
            }
        });
        // The only entry was filtered out, so no batch should be queued yet.
        Assert.assertNull(reader.poll(0L));

        this.log.rollWriter();

        ReplicationSourceWALReaderThread.WALEntryBatch entryBatch = reader.take();
        Path lastWAL = this.walQueue.peek();
        long positionToBeLogged = getPosition(lastWAL);

        Assert.assertNotNull(entryBatch);
        Assert.assertTrue(entryBatch.isEmpty());
        Assert.assertEquals(1L, this.walQueue.size());
        Assert.assertNotEquals(firstWAL, entryBatch.getLastWalPath());
        Assert.assertEquals(lastWAL, entryBatch.getLastWalPath());
        Assert.assertEquals(positionToBeLogged, entryBatch.getLastWalPosition());
    }

    /**
     * Opens a temporary stream over just {@code path}, calls {@code hasNext()} once and returns
     * the position the stream then reports. The stream is closed before returning.
     *
     * @param path the WAL file to inspect
     * @return the stream position after the single {@code hasNext()} call
     * @throws IOException if the stream cannot be opened, read or closed
     */
    private long getPosition(Path path) throws IOException {
        PriorityBlockingQueue<Path> singleFileQueue =
                new PriorityBlockingQueue<Path>(Collections.singletonList(path));
        try (WALEntryStream entryStream =
                new WALEntryStream(singleFileQueue, fs, conf, new MetricsSource("1"))) {
            // The result is irrelevant; the call is made before sampling the position.
            entryStream.hasNext();
            return entryStream.getPosition();
        }
    }

    /**
     * Returns the row key of the first cell in the entry's edit, decoded as a string.
     *
     * @param entry a WAL entry whose edit holds at least one cell
     * @return the row of the first cell
     */
    private String getRow(WAL.Entry entry) {
        Cell firstCell = entry.getEdit().getCells().get(0);
        byte[] backingArray = firstCell.getRowArray();
        int rowOffset = firstCell.getRowOffset();
        int rowLength = firstCell.getRowLength();
        return Bytes.toString(backingArray, rowOffset, rowLength);
    }

    /**
     * Appends one edit whose only cell has row {@code str}, then syncs the WAL up to the
     * transaction id returned by the append.
     *
     * @param str the row key for the appended cell
     * @throws IOException if the append or the sync fails
     */
    private void appendToLog(String str) throws IOException {
        WALKey key = new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), this.mvcc);
        WALEdit edit = getWALEdit(str);
        long txid = this.log.append(htd, info, key, edit, true);
        this.log.sync(txid);
    }

    /**
     * Appends {@code i} single-cell entries to the WAL, one {@code appendToLog()} call each.
     *
     * @param i number of entries to append; nothing is appended when it is zero or negative
     * @throws IOException if any append fails
     */
    private void appendEntriesToLog(int i) throws IOException {
        int remaining = i;
        while (remaining > 0) {
            appendToLog();
            remaining--;
        }
    }

    /**
     * Appends one entry containing a single cell, using the default family and qualifier.
     *
     * @throws IOException if the append fails
     */
    private void appendToLog() throws IOException {
        appendToLogPlus(1);
    }

    /**
     * Appends one entry containing {@code i} cells, using the class-level family and qualifier.
     *
     * @param i number of cells to put into the edit
     * @throws IOException if the append fails
     */
    private void appendToLogPlus(int i) throws IOException {
        appendToLogPlus(i, family, qualifier);
    }

    /**
     * Appends one entry containing {@code i} cells on the given family, using the class-level
     * qualifier.
     *
     * @param i number of cells to put into the edit
     * @param bArr column family for every cell
     * @throws IOException if the append fails
     */
    private void appendToLogPlus(int i, byte[] bArr) throws IOException {
        appendToLogPlus(i, bArr, qualifier);
    }

    /**
     * Appends one entry containing {@code i} cells on the given family and qualifier, then
     * syncs the WAL up to the transaction id returned by the append.
     *
     * @param i number of cells to put into the edit
     * @param bArr column family for every cell
     * @param bArr2 qualifier for every cell
     * @throws IOException if the append or the sync fails
     */
    private void appendToLogPlus(int i, byte[] bArr, byte[] bArr2) throws IOException {
        WALKey key = new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), this.mvcc);
        WALEdit edits = getWALEdits(i, bArr, bArr2);
        long txid = this.log.append(htd, info, key, edits, true);
        this.log.sync(txid);
    }

    /**
     * Builds an edit holding {@code i} cells. Each cell uses the current time in milliseconds
     * both (as bytes) for its row and as its timestamp, and reuses the qualifier bytes as its
     * value.
     *
     * @param i number of cells to add
     * @param bArr column family for every cell
     * @param bArr2 qualifier, also used as the cell value
     * @return the populated edit
     */
    private WALEdit getWALEdits(int i, byte[] bArr, byte[] bArr2) {
        WALEdit edit = new WALEdit();
        int added = 0;
        while (added < i) {
            byte[] row = Bytes.toBytes(System.currentTimeMillis());
            long timestamp = System.currentTimeMillis();
            edit.add(new KeyValue(row, bArr, bArr2, timestamp, bArr2));
            added++;
        }
        return edit;
    }

    /**
     * Builds an edit holding a single cell with row {@code str} on the class-level family and
     * qualifier; the qualifier bytes double as the cell value.
     *
     * @param str the row key
     * @return an edit containing exactly one cell
     */
    private WALEdit getWALEdit(String str) {
        byte[] row = Bytes.toBytes(str);
        KeyValue cell = new KeyValue(row, family, qualifier, System.currentTimeMillis(), qualifier);
        WALEdit edit = new WALEdit();
        edit.add(cell);
        return edit;
    }

    /**
     * Creates a pass-through filter that returns every entry unchanged.
     *
     * @return a filter that never drops or modifies an entry
     */
    private WALEntryFilter getDummyFilter() {
        return new WALEntryFilter() {
            @Override
            public WAL.Entry filter(WAL.Entry entry) {
                return entry;
            }
        };
    }

    /**
     * Returns queue info for the id "1-1", which the recovered-queue test uses.
     *
     * @return queue info built from "1-1"
     */
    private ReplicationQueueInfo getRecoveredQueueInfo() {
        return getQueueInfo("1-1");
    }

    /**
     * Returns queue info for the id "1".
     *
     * @return queue info built from "1"
     */
    private ReplicationQueueInfo getQueueInfo() {
        return getQueueInfo("1");
    }

    /**
     * Wraps the given queue id string in a new {@link ReplicationQueueInfo}.
     *
     * @param str the queue id string handed to the ReplicationQueueInfo constructor
     * @return a new queue info instance
     */
    private ReplicationQueueInfo getQueueInfo(String str) {
        return new ReplicationQueueInfo(str);
    }
}
