package org.apache.distributedlog;

import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.apache.distributedlog.api.AsyncLogReader;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.common.concurrent.FutureEventListener;
import org.apache.distributedlog.exceptions.LockCancelledException;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
import org.apache.distributedlog.io.AsyncCloseable;
import org.apache.distributedlog.lock.LockClosedException;
import org.apache.distributedlog.util.Utils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/TestAsyncReaderLock.class */
public class TestAsyncReaderLock extends TestDistributedLogBase {
    // Logger shared by the tests in this class and by the nested ReadRecordsListener.
    static final Logger LOG = LoggerFactory.getLogger(TestAsyncReaderLock.class);

    // JUnit rule exposing the running test's method name; most tests below use it as the log/stream name.
    @Rule
    public TestName runtime = new TestName();

    /**
     * Completion listener attached to a pending "open reader with lock" future.
     *
     * <p>When the reader is handed over, the listener schedules a task on the supplied executor
     * that reads 300 records and publishes each record's DLSN into the shared reference. The
     * latch is released once reading finishes, reading fails, or the open itself fails.
     */
    static class ReadRecordsListener implements FutureEventListener<AsyncLogReader> {
        /** Receives the DLSN of every record read by this listener. */
        final AtomicReference<DLSN> currentDLSN;
        /** Label used in log messages. */
        final String name;
        /** Executor on which the blocking read loop runs. */
        final ExecutorService executorService;
        /** Released exactly when this listener has nothing more to do. */
        final CountDownLatch latch = new CountDownLatch(1);
        /** Set when opening the reader or reading a record fails. */
        boolean failed = false;

        public ReadRecordsListener(AtomicReference<DLSN> atomicReference, String str, ExecutorService executorService) {
            this.currentDLSN = atomicReference;
            this.name = str;
            this.executorService = executorService;
        }

        /** Returns the latch that is released when this listener is finished. */
        public CountDownLatch getLatch() {
            return this.latch;
        }

        /** Returns whether the open or any read failed. */
        public boolean failed() {
            return this.failed;
        }

        /** Returns whether the latch has already been released. */
        public boolean done() {
            return 0 == this.latch.getCount();
        }

        /** Hands the freshly opened reader to the executor so the read loop runs off the callback thread. */
        public void onSuccess(AsyncLogReader asyncLogReader) {
            TestAsyncReaderLock.LOG.info("Reader {} is ready to read entries", this.name);
            this.executorService.submit(() -> readEntries(asyncLogReader));
        }

        /**
         * Reads 300 records, recording each DLSN. Any exception marks the listener as failed;
         * the latch is released on every exit path.
         */
        public void readEntries(AsyncLogReader asyncLogReader) {
            try {
                int remaining = 300;
                while (remaining-- > 0) {
                    LogRecordWithDLSN record = (LogRecordWithDLSN) Utils.ioResult(asyncLogReader.readNext());
                    this.currentDLSN.set(record.getDlsn());
                }
            } catch (Exception e) {
                this.failed = true;
            } finally {
                this.latch.countDown();
            }
        }

        /** Marks the listener as failed and releases the latch when the reader could not be opened. */
        public void onFailure(Throwable th) {
            TestAsyncReaderLock.LOG.error("{} failed to open reader", this.name, th);
            this.failed = true;
            this.latch.countDown();
        }
    }

    /**
     * Asserts that the first {@code i} flags are set and that every flag from index {@code i}
     * onward is clear.
     *
     * @param zArr per-reader "acquired" flags
     * @param i number of leading readers expected to hold the lock
     */
    void assertAcquiredFlagsSet(boolean[] zArr, int i) {
        int idx = 0;
        while (idx < i) {
            String message = "reader " + idx + " should have acquired lock";
            Assert.assertTrue(message, zArr[idx]);
            idx++;
        }
        // Restart from i (not from wherever the first loop stopped) to mirror the two independent ranges.
        idx = i;
        while (idx < zArr.length) {
            String message = "reader " + idx + " should not have acquired lock";
            Assert.assertFalse(message, zArr[idx]);
            idx++;
        }
    }

    /**
     * Opens a locked reader, deletes the read-lock path it reported, and verifies that a second
     * locked reader can still be opened and reads the same first record.
     *
     * <p>Both readers and the log manager are closed on every exit path so a failing assertion
     * does not leak handles into later tests.
     */
    @Test(timeout = 60000)
    public void testReaderLockIfLockPathDoesntExist() throws Exception {
        BKDistributedLogManager dlm = createNewDLM(conf, this.runtime.getMethodName());
        try {
            BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
            writer.write(DLMTestUtil.getLogRecordInstance(1L));
            writer.closeAndComplete();

            BKAsyncLogReader firstReader = (BKAsyncLogReader) Utils.ioResult(dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN));
            String readLockPath;
            try {
                LogRecordWithDLSN firstRecord = (LogRecordWithDLSN) Utils.ioResult(firstReader.readNext());
                Assert.assertEquals(1L, firstRecord.getTransactionId());
                Assert.assertEquals(0L, firstRecord.getSequenceId());
                DLMTestUtil.verifyLogRecord(firstRecord);
                readLockPath = firstReader.readHandler.getReadLockPath();
            } finally {
                Utils.close(firstReader);
            }

            // Remove the read-lock path so the next open has to cope with it being absent.
            dlm.getNamespaceDriver().getWriterZKC().get().delete(readLockPath, -1);

            AsyncLogReader secondReader = (AsyncLogReader) Utils.ioResult(dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN));
            try {
                LogRecordWithDLSN rereadRecord = (LogRecordWithDLSN) Utils.ioResult(secondReader.readNext());
                Assert.assertEquals(1L, rereadRecord.getTransactionId());
                Assert.assertEquals(0L, rereadRecord.getSequenceId());
                DLMTestUtil.verifyLogRecord(rereadRecord);
            } finally {
                // Previously never closed: release the reader (and its lock) explicitly.
                Utils.close(secondReader);
            }
        } finally {
            // Previously never closed.
            dlm.close();
        }
    }

    /**
     * Closes a locked reader from inside the callback that delivers it, and waits until that
     * close has completed before closing the log manager.
     */
    @Test(timeout = 60000)
    public void testReaderLockCloseInAcquireCallback() throws Exception {
        String streamName = this.runtime.getMethodName();
        BKDistributedLogManager dlm = createNewDLM(conf, streamName);

        BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
        writer.write(DLMTestUtil.getLogRecordInstance(1L));
        writer.closeAndComplete();

        CountDownLatch closedLatch = new CountDownLatch(1);
        // The close is chained inside the acquire callback; the latch fires once it finishes.
        dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN).thenCompose(reader ->
            reader.asyncClose().thenApply(ignored -> {
                closedLatch.countDown();
                return null;
            }));

        closedLatch.await();
        dlm.close();
    }

    /**
     * Verifies that a second reader requesting the lock from a background thread stays blocked
     * while the first reader holds it, and acquires it once the first reader is closed.
     *
     * <p>The background thread must actually request the reader lock; otherwise the
     * "not yet acquired" / "acquired" assertions below say nothing about locking. Failures on
     * the background thread are captured and rethrown on the test thread, because an
     * {@code Assert.fail} raised on another thread cannot fail the test by itself.
     */
    @Test(timeout = 60000)
    public void testReaderLockBackgroundReaderLockAcquire() throws Exception {
        final String streamName = this.runtime.getMethodName();
        BKDistributedLogManager dlm = createNewDLM(conf, streamName);
        BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
        writer.write(DLMTestUtil.getLogRecordInstance(1L));
        writer.closeAndComplete();

        AsyncLogReader lockHolder = (AsyncLogReader) Utils.ioResult(dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN));
        lockHolder.readNext();

        final CountDownLatch backgroundDone = new CountDownLatch(1);
        final AtomicBoolean acquired = new AtomicBoolean(false);
        final AtomicReference<Throwable> backgroundFailure = new AtomicReference<>();

        new Thread(() -> {
            DistributedLogManager backgroundDlm = null;
            try {
                backgroundDlm = createNewDLM(conf, streamName);
                // Blocks until the lock held by lockHolder is released.
                AsyncLogReader backgroundReader =
                    (AsyncLogReader) Utils.ioResult(backgroundDlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN));
                acquired.set(true);
                Utils.close(backgroundReader);
            } catch (Throwable t) {
                // Thread boundary: record everything so the test thread can report it.
                backgroundFailure.compareAndSet(null, t);
            } finally {
                if (backgroundDlm != null) {
                    try {
                        backgroundDlm.close();
                    } catch (Exception e) {
                        backgroundFailure.compareAndSet(null, e);
                    }
                }
                // Always release the test thread, even on failure, so it never hangs until timeout.
                backgroundDone.countDown();
            }
        }, "acquire-thread").start();

        // Give the background thread time to issue its request; it must still be waiting.
        Thread.sleep(1000L);
        Assert.assertFalse(acquired.get());

        Utils.close(lockHolder);
        backgroundDone.await();

        Throwable failure = backgroundFailure.get();
        if (failure != null) {
            throw new AssertionError("background reader failed to acquire the lock", failure);
        }
        Assert.assertTrue(acquired.get());
        dlm.close();
    }

    /**
     * Counts how many of the given futures have already completed (normally, exceptionally or
     * by cancellation).
     *
     * @param arrayList futures to inspect
     * @return number of futures for which {@code isDone()} is true
     */
    int countDefined(ArrayList<CompletableFuture<AsyncLogReader>> arrayList) {
        return (int) arrayList.stream()
            .filter(CompletableFuture::isDone)
            .count();
    }

    /**
     * Requests the reader lock from five separate log managers at once and waits until every
     * request has been served; each reader is closed as soon as it is delivered.
     *
     * <p>A failed acquire is recorded and still counts the latch down. Throwing from the
     * completion callback would be swallowed by the future and leave the test waiting until
     * its timeout with no indication of the cause.
     */
    @Test(timeout = 60000)
    public void testReaderLockManyLocks() throws Exception {
        String streamName = this.runtime.getMethodName();
        BKDistributedLogManager writerDlm = createNewDLM(conf, streamName);
        BKAsyncLogWriter writer = writerDlm.startAsyncLogSegmentNonPartitioned();
        writer.write(DLMTestUtil.getLogRecordInstance(1L));
        writer.write(DLMTestUtil.getLogRecordInstance(2L));
        writer.closeAndComplete();

        final int readerCount = 5;
        final CountDownLatch completedLatch = new CountDownLatch(readerCount);
        final AtomicReference<Throwable> acquireFailure = new AtomicReference<>();
        DistributedLogManager[] readerDlms = new DistributedLogManager[readerCount];
        for (int idx = 0; idx < readerCount; idx++) {
            readerDlms[idx] = createNewDLM(conf, streamName);
            readerDlms[idx].getAsyncLogReaderWithLock(DLSN.InitialDLSN).whenComplete(new FutureEventListener<AsyncLogReader>() {
                public void onSuccess(AsyncLogReader asyncLogReader) {
                    completedLatch.countDown();
                    // Closing the reader lets the next request in line proceed.
                    asyncLogReader.asyncClose();
                }

                public void onFailure(Throwable th) {
                    acquireFailure.compareAndSet(null, th);
                    completedLatch.countDown();
                }
            });
        }

        completedLatch.await();
        for (DistributedLogManager readerDlm : readerDlms) {
            readerDlm.close();
        }
        writerDlm.close();

        // Report the failure only after cleanup so handles are released either way.
        Throwable failure = acquireFailure.get();
        if (failure != null) {
            throw new AssertionError("acquire shouldnt have failed", failure);
        }
    }

    /**
     * Closes a log manager while its reader-lock request is still pending behind another
     * reader, and expects the pending request to fail with one of the lock/cancellation errors.
     */
    @Test(timeout = 60000)
    public void testReaderLockDlmClosed() throws Exception {
        String streamName = this.runtime.getMethodName();

        BKDistributedLogManager writerDlm = createNewDLM(conf, streamName);
        BKAsyncLogWriter writer = writerDlm.startAsyncLogSegmentNonPartitioned();
        writer.write(DLMTestUtil.getLogRecordInstance(1L));
        writer.write(DLMTestUtil.getLogRecordInstance(2L));
        writer.closeAndComplete();

        // First reader takes the lock and keeps it.
        BKDistributedLogManager holderDlm = createNewDLM(conf, streamName);
        AsyncLogReader lockHolder = Utils.ioResult(holderDlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN));

        // Second request is issued and its manager is closed right away.
        BKDistributedLogManager waiterDlm = createNewDLM(conf, streamName);
        CompletableFuture<AsyncLogReader> pendingReader = waiterDlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
        waiterDlm.close();

        try {
            Utils.ioResult(pendingReader);
            Assert.fail("should have thrown exception!");
        } catch (LockCancelledException | CancellationException | LockClosedException expected) {
            // Any of these is an accepted outcome for the abandoned request.
        }

        Utils.close(lockHolder);
        writerDlm.close();
        holderDlm.close();
    }

    /**
     * Expires the ZooKeeper session of a namespace whose reader holds the lock, then reads.
     *
     * <p>Both outcomes of the first read are accepted: it may fail with a
     * {@link LockingException}, or it may succeed, in which case a second read must succeed too.
     */
    @Test(timeout = 60000)
    public void testReaderLockSessionExpires() throws Exception {
        String streamName = this.runtime.getMethodName();
        URI uri = createDLMURI("/" + streamName);
        ensureURICreated(uri);

        Namespace writerNamespace = NamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
        DistributedLogManager writerDlm = writerNamespace.openLog(streamName);
        BKAsyncLogWriter writer = (BKAsyncLogWriter) writerDlm.startAsyncLogSegmentNonPartitioned();
        writer.write(DLMTestUtil.getLogRecordInstance(1L));
        writer.write(DLMTestUtil.getLogRecordInstance(2L));
        writer.closeAndComplete();

        Namespace readerNamespace = NamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
        DistributedLogManager readerDlm = readerNamespace.openLog(streamName);
        AsyncLogReader reader = (AsyncLogReader) Utils.ioResult(readerDlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN));

        ZooKeeperClientUtils.expireSession(readerNamespace.getNamespaceDriver().getWriterZKC(), zkServers, 1000);

        boolean firstReadSucceeded;
        try {
            Utils.ioResult(reader.readNext());
            firstReadSucceeded = true;
        } catch (LockingException expected) {
            // Losing the lock after session expiry is an accepted outcome.
            firstReadSucceeded = false;
        }
        if (firstReadSucceeded) {
            // Deliberately outside the try: a failure here is a real test failure.
            Utils.ioResult(reader.readNext());
        }

        Utils.close(reader);
        writerDlm.close();
        writerNamespace.close();
        readerDlm.close();
        readerNamespace.close();
    }

    /**
     * Cancels a reader-lock request while another reader holds the lock, expects the cancelled
     * request to fail, and then checks that a fresh request from the same log manager succeeds
     * once the holder is closed.
     */
    @Test(timeout = 60000)
    public void testReaderLockFutureCancelledWhileWaiting() throws Exception {
        String streamName = this.runtime.getMethodName();

        BKDistributedLogManager writerDlm = createNewDLM(conf, streamName);
        BKAsyncLogWriter writer = writerDlm.startAsyncLogSegmentNonPartitioned();
        writer.write(DLMTestUtil.getLogRecordInstance(1L));
        writer.write(DLMTestUtil.getLogRecordInstance(2L));
        writer.closeAndComplete();

        BKDistributedLogManager holderDlm = createNewDLM(conf, streamName);
        AsyncLogReader lockHolder = Utils.ioResult(holderDlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN));

        BKDistributedLogManager waiterDlm = createNewDLM(conf, streamName);
        CompletableFuture<AsyncLogReader> cancelledRequest = waiterDlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
        try {
            cancelledRequest.cancel(true);
            Utils.ioResult(cancelledRequest);
            Assert.fail("Should fail getting log reader as it is cancelled");
        } catch (LockCancelledException
                 | CancellationException
                 | OwnershipAcquireFailedException
                 | LockClosedException expected) {
            // Any of these is an accepted way for the cancelled request to surface.
        }

        // Issue the retry before releasing the holder, then wait for it to be served.
        CompletableFuture<AsyncLogReader> retryRequest = waiterDlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
        Utils.close(lockHolder);
        Utils.ioResult(retryRequest);

        writerDlm.close();
        holderDlm.close();
        waiterDlm.close();
    }

    /**
     * Cancels the reader future after it has already completed and checks that the reader it
     * delivered is still obtainable from the future and can read a record.
     */
    @Test(timeout = 60000)
    public void testReaderLockFutureCancelledWhileLocked() throws Exception {
        String streamName = this.runtime.getMethodName();

        BKDistributedLogManager writerDlm = createNewDLM(conf, streamName);
        BKAsyncLogWriter writer = writerDlm.startAsyncLogSegmentNonPartitioned();
        writer.write(DLMTestUtil.getLogRecordInstance(1L));
        writer.write(DLMTestUtil.getLogRecordInstance(2L));
        writer.closeAndComplete();

        BKDistributedLogManager readerDlm = createNewDLM(conf, streamName);
        CompletableFuture<AsyncLogReader> readerFuture = readerDlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN);

        // Wait for the lock first, so the cancel below hits an already-completed future.
        Utils.ioResult(readerFuture);
        readerFuture.cancel(true);

        AsyncLogReader reader = Utils.ioResult(readerFuture);
        Utils.ioResult(reader.readNext());

        writerDlm.close();
        readerDlm.close();
    }

    /**
     * Requests two locked readers from the same log manager and expects both requests to
     * complete without either reader being closed first.
     */
    @Test(timeout = 60000)
    public void testReaderLockSharedDlmDoesNotConflict() throws Exception {
        String streamName = this.runtime.getMethodName();

        BKDistributedLogManager writerDlm = createNewDLM(conf, streamName);
        BKAsyncLogWriter writer = writerDlm.startAsyncLogSegmentNonPartitioned();
        writer.write(DLMTestUtil.getLogRecordInstance(1L));
        writer.write(DLMTestUtil.getLogRecordInstance(2L));
        writer.closeAndComplete();

        BKDistributedLogManager sharedDlm = createNewDLM(conf, streamName);
        CompletableFuture<AsyncLogReader> firstRequest = sharedDlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
        CompletableFuture<AsyncLogReader> secondRequest = sharedDlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN);

        // Both requests were issued up front; each must resolve.
        Utils.ioResult(firstRequest);
        Utils.ioResult(secondRequest);

        writerDlm.close();
        sharedDlm.close();
    }

    /**
     * Three-reader scenario over a single stream, each reader opened through its own namespace
     * (client ids "reader1", "reader2", "reader3").
     *
     * <p>reader1 obtains the lock and reads; reader2's pending request is cancelled and its
     * listener must report failure; reader3's pending request must complete only after reader1
     * is closed, after which its listener reads 300 records and the last DLSN seen is checked.
     * The statement order is significant: the counter {@code i} is shared by both read loops.
     */
    @Test(timeout = 60000)
    public void testReaderLockMultiReadersScenario() throws Exception {
        String methodName = this.runtime.getMethodName();
        URI createDLMURI = createDLMURI("/" + methodName);
        ensureURICreated(createDLMURI);
        // Local configuration layered on top of the shared test configuration.
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(conf);
        distributedLogConfiguration.setImmediateFlushEnabled(false);
        distributedLogConfiguration.setPeriodicFlushFrequencyMilliSeconds(60000);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setNumWorkerThreads(2);
        distributedLogConfiguration.setLockTimeout(Long.MAX_VALUE);
        // Populate the stream through a separate "main" client, then close that log handle.
        // NOTE(review): the namespace built on this line is not kept in a variable and is never closed here.
        DistributedLogManager openLog = NamespaceBuilder.newBuilder().conf(distributedLogConfiguration).uri(createDLMURI).clientId("main").build().openLog(methodName);
        DLMTestUtil.generateCompletedLogSegments(openLog, distributedLogConfiguration, 9L, 100L);
        openLog.close();
        // i counts records read by reader1 across BOTH read loops below.
        int i = 0;
        // Shared "last DLSN seen" slot, written by reader1 here and by the listeners' read loops.
        AtomicReference atomicReference = new AtomicReference(DLSN.InitialDLSN);
        Namespace build = NamespaceBuilder.newBuilder().conf(distributedLogConfiguration).uri(createDLMURI).clientId("reader1").build();
        DistributedLogManager openLog2 = build.openLog(methodName);
        Namespace build2 = NamespaceBuilder.newBuilder().conf(distributedLogConfiguration).uri(createDLMURI).clientId("reader2").build();
        DistributedLogManager openLog3 = build2.openLog(methodName);
        Namespace build3 = NamespaceBuilder.newBuilder().conf(distributedLogConfiguration).uri(createDLMURI).clientId("reader3").build();
        DistributedLogManager openLog4 = build3.openLog(methodName);
        LOG.info("{} is opening reader on stream {}", "reader1", methodName);
        // reader1 is opened synchronously, so it is in place before the other two requests are issued.
        AsyncLogReader asyncLogReader = (AsyncLogReader) Utils.ioResult(openLog2.getAsyncLogReaderWithLock(DLSN.InitialDLSN));
        LOG.info("{} opened reader on stream {}", "reader1", methodName);
        LOG.info("{} is opening reader on stream {}", "reader2", methodName);
        // reader2 and reader3 requests are issued but not awaited.
        CompletableFuture asyncLogReaderWithLock = openLog3.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
        LOG.info("{} is opening reader on stream {}", "reader3", methodName);
        CompletableFuture asyncLogReaderWithLock2 = openLog4.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        ReadRecordsListener readRecordsListener = new ReadRecordsListener(atomicReference, "reader2", newCachedThreadPool);
        ReadRecordsListener readRecordsListener2 = new ReadRecordsListener(atomicReference, "reader3", newCachedThreadPool);
        asyncLogReaderWithLock.whenComplete((BiConsumer) readRecordsListener);
        asyncLogReaderWithLock2.whenComplete((BiConsumer) readRecordsListener2);
        // reader1 reads its first 200 records.
        while (i < 200) {
            atomicReference.set(((LogRecordWithDLSN) Utils.ioResult(asyncLogReader.readNext())).getDlsn());
            i++;
        }
        Thread.sleep(1000L);
        // reader2's listener must not have finished while reader1 is still open.
        Assert.assertFalse(readRecordsListener.done());
        // Cancelling reader2's request must finish its listener with failed() == true.
        asyncLogReaderWithLock.cancel(true);
        readRecordsListener.getLatch().await();
        Assert.assertTrue(readRecordsListener.done());
        Assert.assertTrue(readRecordsListener.failed());
        // reader1 continues from 200 up to 300 records (i is not reset).
        while (i < 300) {
            atomicReference.set(((LogRecordWithDLSN) Utils.ioResult(asyncLogReader.readNext())).getDlsn());
            i++;
        }
        // reader3's listener must still be pending until reader1 is closed.
        Assert.assertFalse(readRecordsListener2.done());
        Utils.close(asyncLogReader);
        readRecordsListener2.getLatch().await();
        Assert.assertTrue(readRecordsListener2.done());
        Assert.assertFalse(readRecordsListener2.failed());
        // Last DLSN published once reader3's listener has finished its 300 reads.
        Assert.assertEquals(new DLSN(3L, 99L, 0L), atomicReference.get());
        // Draining the cancelled reader2 future; whatever it throws is intentionally ignored.
        try {
            Utils.ioResult(asyncLogReaderWithLock);
        } catch (Exception e) {
        }
        Utils.close((AsyncCloseable) Utils.ioResult(asyncLogReaderWithLock2));
        openLog2.close();
        build.close();
        openLog3.close();
        build2.close();
        openLog4.close();
        build3.close();
        newCachedThreadPool.shutdown();
    }

    /**
     * Writes three log segments of ten empty records each, then reads them through a locked
     * reader opened by subscriber id.
     *
     * <p>The first reader is expected to start at {@code DLSN.NonInclusiveLowerBound} and see
     * all 30 records. After the subscriber's commit position is advanced to the first record
     * of the second segment, a new reader for the same subscriber must start at that DLSN and
     * see the remaining 20 records (transaction ids 11..30).
     *
     * <p>The write loops are bounded {@code for} loops: the earlier inner {@code while (true)}
     * had no exit once its counter passed 10, so the writer never reached
     * {@code closeAndComplete()}.
     */
    @Test(timeout = 60000)
    public void testAsyncReadWithSubscriberId() throws Exception {
        final String subscriberId = "asyncreader";
        final int segmentCount = 3;
        final int recordsPerSegment = 10;

        DistributedLogConfiguration localConf = new DistributedLogConfiguration();
        localConf.addConfiguration(conf);
        localConf.setOutputBufferSize(0);
        localConf.setImmediateFlushEnabled(true);
        BKDistributedLogManager dlm = createNewDLM(localConf, "distrlog-asyncread-with-sbuscriber-id");

        DLSN commitDlsn = DLSN.InitialDLSN;
        long nextTxId = 1L;
        for (long segment = 0; segment < segmentCount; segment++) {
            BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
            for (long entry = 1; entry <= recordsPerSegment; entry++) {
                DLSN written = (DLSN) Utils.ioResult(writer.write(DLMTestUtil.getEmptyLogRecordInstance(nextTxId++)));
                // Remember the first record of the second segment as the future commit position.
                if (segment == 1 && entry == 1) {
                    commitDlsn = written;
                }
            }
            writer.closeAndComplete();
        }
        final long lastTxId = nextTxId - 1;

        // Pass 1: no commit position yet, so every record is expected.
        BKAsyncLogReader fullReader = (BKAsyncLogReader) Utils.ioResult(dlm.getAsyncLogReaderWithLock(subscriberId));
        Assert.assertEquals(DLSN.NonInclusiveLowerBound, fullReader.getStartDLSN());
        long readCount = 0;
        LogRecordWithDLSN record = (LogRecordWithDLSN) Utils.ioResult(fullReader.readNext());
        while (record != null) {
            DLMTestUtil.verifyEmptyLogRecord(record);
            readCount++;
            Assert.assertEquals(readCount, record.getTransactionId());
            Assert.assertEquals(record.getTransactionId() - 1, record.getSequenceId());
            // Stop before issuing a read past the last written record.
            if (readCount == lastTxId) {
                break;
            }
            record = (LogRecordWithDLSN) Utils.ioResult(fullReader.readNext());
        }
        Assert.assertEquals(lastTxId, readCount);
        Utils.close(fullReader);

        Utils.ioResult(dlm.getSubscriptionsStore().advanceCommitPosition(subscriberId, commitDlsn));

        // Pass 2: the reader must resume at the committed position.
        BKAsyncLogReader resumedReader = (BKAsyncLogReader) Utils.ioResult(dlm.getAsyncLogReaderWithLock(subscriberId));
        Assert.assertEquals(commitDlsn, resumedReader.getStartDLSN());
        long resumedCount = 0;
        long expectedTxId = recordsPerSegment;
        record = (LogRecordWithDLSN) Utils.ioResult(resumedReader.readNext());
        while (record != null) {
            DLMTestUtil.verifyEmptyLogRecord(record);
            resumedCount++;
            expectedTxId++;
            Assert.assertEquals(expectedTxId, record.getTransactionId());
            Assert.assertEquals(record.getTransactionId() - 1, record.getSequenceId());
            if (expectedTxId == lastTxId) {
                break;
            }
            record = (LogRecordWithDLSN) Utils.ioResult(resumedReader.readNext());
        }
        Assert.assertEquals(lastTxId, expectedTxId);
        Assert.assertEquals(20L, resumedCount);
        Utils.close(resumedReader);
        dlm.close();
    }
}
