package org.apache.distributedlog;

import com.google.common.base.Charsets;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.feature.FixedValueFeature;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.distributedlog.LogRecordSet;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.api.AsyncLogReader;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.common.config.ConcurrentConstConfiguration;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.exceptions.BKTransmitException;
import org.apache.distributedlog.exceptions.DLIllegalStateException;
import org.apache.distributedlog.exceptions.EndOfStreamException;
import org.apache.distributedlog.exceptions.IdleReaderException;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.exceptions.LogRecordTooLongException;
import org.apache.distributedlog.exceptions.OverCapacityException;
import org.apache.distributedlog.exceptions.ReadCancelledException;
import org.apache.distributedlog.exceptions.WriteException;
import org.apache.distributedlog.io.CompressionCodec;
import org.apache.distributedlog.util.FailpointUtils;
import org.apache.distributedlog.util.SimplePermitLimiter;
import org.apache.distributedlog.util.Utils;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/TestAsyncReaderWriter.class */
public class TestAsyncReaderWriter extends TestDistributedLogBase {
    static final Logger LOG = LoggerFactory.getLogger(TestAsyncReaderWriter.class);

    @Rule
    public TestName runtime = new TestName();
    protected DistributedLogConfiguration testConf = new DistributedLogConfiguration();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/distributedlog/TestAsyncReaderWriter$WriteFutureEventListener.class */
    public static class WriteFutureEventListener implements FutureEventListener<DLSN> {
        private final LogRecord record;
        private final long currentLogSegmentSeqNo;
        private final long currentEntryId;
        private final CountDownLatch syncLatch;
        private final AtomicBoolean errorsFound;
        private final boolean verifyEntryId;

        WriteFutureEventListener(LogRecord logRecord, long j, long j2, CountDownLatch countDownLatch, AtomicBoolean atomicBoolean, boolean z) {
            this.record = logRecord;
            this.currentLogSegmentSeqNo = j;
            this.currentEntryId = j2;
            this.syncLatch = countDownLatch;
            this.errorsFound = atomicBoolean;
            this.verifyEntryId = z;
        }

        public void onSuccess(DLSN dlsn) {
            if (dlsn.getLogSegmentSequenceNo() != this.currentLogSegmentSeqNo) {
                TestAsyncReaderWriter.LOG.error("Ledger Seq No: {}, Expected: {}", Long.valueOf(dlsn.getLogSegmentSequenceNo()), Long.valueOf(this.currentLogSegmentSeqNo));
                this.errorsFound.set(true);
            }
            if (this.verifyEntryId && dlsn.getEntryId() != this.currentEntryId) {
                TestAsyncReaderWriter.LOG.error("EntryId: {}, Expected: {}", Long.valueOf(dlsn.getEntryId()), Long.valueOf(this.currentEntryId));
                this.errorsFound.set(true);
            }
            this.syncLatch.countDown();
        }

        public void onFailure(Throwable th) {
            TestAsyncReaderWriter.LOG.error("Encountered failures on writing record as (lid = {}, eid = {}) :", new Object[]{Long.valueOf(this.currentLogSegmentSeqNo), Long.valueOf(this.currentEntryId), th});
            this.errorsFound.set(true);
            this.syncLatch.countDown();
        }
    }

    public TestAsyncReaderWriter() {
        this.testConf.loadConf(conf);
        this.testConf.setReaderIdleErrorThresholdMillis(1200000);
        this.testConf.setReadAheadWaitTimeOnEndOfStream(20);
    }

    @Test(timeout = 60000)
    public void testWriteControlRecord() throws Exception {
        long j;
        long j2;
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(1024);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, methodName);
        int i = 1;
        long j3 = 0;
        while (true) {
            long j4 = j3;
            if (j4 >= 3) {
                break;
            }
            BKAsyncLogWriter startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
            int i2 = i;
            i++;
            DLSN dlsn = (DLSN) Utils.ioResult(startAsyncLogSegmentNonPartitioned.writeControlRecord(new LogRecord(i2, "control".getBytes(Charsets.UTF_8))));
            Assert.assertEquals(j4 + 1, dlsn.getLogSegmentSequenceNo());
            Assert.assertEquals(0L, dlsn.getEntryId());
            Assert.assertEquals(0L, dlsn.getSlotId());
            long j5 = 1;
            while (true) {
                long j6 = j5;
                if (j6 < 10) {
                    int i3 = i;
                    i++;
                    Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLargeLogRecordInstance(i3)));
                    j5 = j6 + 1;
                }
            }
            startAsyncLogSegmentNonPartitioned.closeAndComplete();
            j3 = j4 + 1;
        }
        createNewDLM.close();
        BKDistributedLogManager createNewDLM2 = createNewDLM(distributedLogConfiguration, methodName);
        LogReader inputStream = createNewDLM2.getInputStream(1L);
        long j7 = 0;
        long j8 = 2;
        LogRecordWithDLSN readNext = inputStream.readNext(false);
        while (true) {
            LogRecordWithDLSN logRecordWithDLSN = readNext;
            if (null == logRecordWithDLSN) {
                inputStream.close();
                Assert.assertEquals(27L, j7);
                Assert.assertEquals(27L, createNewDLM2.getLogRecordCount());
                createNewDLM2.close();
                return;
            }
            DLMTestUtil.verifyLargeLogRecord((LogRecord) logRecordWithDLSN);
            j7++;
            Assert.assertEquals(j8, logRecordWithDLSN.getTransactionId());
            if (j8 % 10 == 0) {
                j = j8;
                j2 = 2;
            } else {
                j = j8;
                j2 = 1;
            }
            j8 = j + j2;
            readNext = inputStream.readNext(false);
        }
    }

    @Test(timeout = 60000)
    public void testAsyncWritePendingWritesAbortedWhenLedgerRollTriggerFails() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(1024);
        distributedLogConfiguration.setMaxLogSegmentBytes(1024L);
        distributedLogConfiguration.setLogSegmentRollingIntervalMinutes(0);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, methodName);
        BKAsyncLogWriter startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
        int i = 1 + 1;
        Assert.assertEquals(1L, ((DLSN) Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(1, 2048)), 10L, TimeUnit.SECONDS)).getLogSegmentSequenceNo());
        int i2 = i + 1;
        DLMTestUtil.validateFutureFailed(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(i, 1040385)), LogRecordTooLongException.class);
        int i3 = i2 + 1;
        DLMTestUtil.validateFutureFailed(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(i2, 1040385)), WriteException.class);
        int i4 = i3 + 1;
        DLMTestUtil.validateFutureFailed(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(i3, 1040385)), WriteException.class);
        startAsyncLogSegmentNonPartitioned.closeAndComplete();
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testSimpleAsyncWrite() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(1024);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, methodName);
        final CountDownLatch countDownLatch = new CountDownLatch(3 * 10);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final AtomicReference atomicReference = new AtomicReference(DLSN.InvalidDLSN);
        int i = 1;
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 3) {
                countDownLatch.await();
                Assert.assertFalse("Should not encounter any errors for async writes", atomicBoolean.get());
                LogRecordWithDLSN lastLogRecord = createNewDLM.getLastLogRecord();
                Assert.assertEquals("Last DLSN" + lastLogRecord.getDlsn() + " isn't the maximum DLSN " + atomicReference.get(), lastLogRecord.getDlsn(), atomicReference.get());
                Assert.assertEquals(lastLogRecord.getDlsn(), createNewDLM.getLastDLSN());
                Assert.assertEquals(lastLogRecord.getDlsn(), Utils.ioResult(createNewDLM.getLastDLSNAsync()));
                DLMTestUtil.verifyLargeLogRecord((LogRecord) lastLogRecord);
                createNewDLM.close();
                return;
            }
            final long j3 = j2 + 1;
            BKAsyncLogWriter startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
            long j4 = 0;
            while (true) {
                final long j5 = j4;
                if (j5 < 10) {
                    int i2 = i;
                    i++;
                    startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLargeLogRecordInstance(i2)).whenComplete((BiConsumer) new FutureEventListener<DLSN>() { // from class: org.apache.distributedlog.TestAsyncReaderWriter.1
                        public void onSuccess(DLSN dlsn) {
                            if (dlsn.getLogSegmentSequenceNo() != j3) {
                                TestAsyncReaderWriter.LOG.debug("LogSegmentSequenceNumber: {}, Expected {}", Long.valueOf(dlsn.getLogSegmentSequenceNo()), Long.valueOf(j3));
                                atomicBoolean.set(true);
                            }
                            if (dlsn.getEntryId() != j5) {
                                TestAsyncReaderWriter.LOG.debug("EntryId: {}, Expected {}", Long.valueOf(dlsn.getEntryId()), Long.valueOf(j5));
                                atomicBoolean.set(true);
                            }
                            if (dlsn.compareTo((DLSN) atomicReference.get()) > 0) {
                                atomicReference.set(dlsn);
                            }
                            countDownLatch.countDown();
                            TestAsyncReaderWriter.LOG.debug("SyncLatch: {}", Long.valueOf(countDownLatch.getCount()));
                        }

                        public void onFailure(Throwable th) {
                            TestAsyncReaderWriter.LOG.error("Encountered exception on writing record {} in log segment {}", Long.valueOf(j5), Long.valueOf(j3));
                            atomicBoolean.set(true);
                        }
                    });
                    j4 = j5 + 1;
                }
            }
            startAsyncLogSegmentNonPartitioned.closeAndComplete();
            j = j2 + 1;
        }
    }

    /* JADX WARN: Type inference failed for: r0v8, types: [long, org.apache.distributedlog.BKSyncLogWriter] */
    private static long writeRecords(DistributedLogManager distributedLogManager, int i, int i2, long j, boolean z) throws IOException {
        long j2 = j;
        long j3 = 0;
        while (true) {
            long j4 = j3;
            if (j4 >= i) {
                return j2;
            }
            ?? r0 = (BKSyncLogWriter) distributedLogManager.startLogSegmentNonPartitioned();
            long j5 = 1;
            while (true) {
                long j6 = j5;
                if (j6 <= i2) {
                    if (z) {
                        j2++;
                        r0.write(DLMTestUtil.getEmptyLogRecordInstance(r0));
                    } else {
                        long j7 = j2;
                        j2 = j7 + 1;
                        r0.write(DLMTestUtil.getLargeLogRecordInstance(j7));
                    }
                    j5 = j6 + 1;
                }
            }
            r0.closeAndComplete();
            j3 = j4 + 1;
        }
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [long, org.apache.distributedlog.api.LogWriter] */
    private static long writeLogSegment(DistributedLogManager distributedLogManager, int i, long j, int i2, boolean z) throws IOException {
        long j2 = j;
        ?? startLogSegmentNonPartitioned = distributedLogManager.startLogSegmentNonPartitioned();
        long j3 = 1;
        while (true) {
            long j4 = j3;
            if (j4 > i) {
                startLogSegmentNonPartitioned.flush();
                startLogSegmentNonPartitioned.commit();
                startLogSegmentNonPartitioned.close();
                return j2;
            }
            if (z) {
                j2++;
                startLogSegmentNonPartitioned.write(DLMTestUtil.getEmptyLogRecordInstance(startLogSegmentNonPartitioned));
            } else {
                long j5 = j2;
                j2 = j5 + 1;
                startLogSegmentNonPartitioned.write(DLMTestUtil.getLargeLogRecordInstance(j5));
            }
            if (j4 % i2 == 0) {
                startLogSegmentNonPartitioned.flush();
                startLogSegmentNonPartitioned.commit();
            }
            j3 = j4 + 1;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void readNext(final AsyncLogReader asyncLogReader, final DLSN dlsn, final long j, final boolean z, final CountDownLatch countDownLatch, final CountDownLatch countDownLatch2, final AtomicBoolean atomicBoolean) {
        asyncLogReader.readNext().whenComplete((BiConsumer) new FutureEventListener<LogRecordWithDLSN>() { // from class: org.apache.distributedlog.TestAsyncReaderWriter.2
            public void onSuccess(LogRecordWithDLSN logRecordWithDLSN) {
                try {
                    if (z) {
                        Assert.assertEquals(j, logRecordWithDLSN.getSequenceId());
                    } else {
                        Assert.assertTrue(logRecordWithDLSN.getSequenceId() < 0);
                        Assert.assertTrue(logRecordWithDLSN.getSequenceId() > j);
                    }
                    TestAsyncReaderWriter.LOG.info("Received record {} from {}", logRecordWithDLSN, asyncLogReader.getStreamName());
                    Assert.assertTrue(!logRecordWithDLSN.isControl());
                    Assert.assertTrue(logRecordWithDLSN.getDlsn().getSlotId() == 0);
                    Assert.assertTrue(logRecordWithDLSN.getDlsn().compareTo(dlsn) >= 0);
                    DLMTestUtil.verifyLargeLogRecord((LogRecord) logRecordWithDLSN);
                    countDownLatch.countDown();
                    if (countDownLatch.getCount() <= 0) {
                        countDownLatch2.countDown();
                    } else {
                        TestAsyncReaderWriter.readNext(asyncLogReader, logRecordWithDLSN.getDlsn().getNextDLSN(), z ? logRecordWithDLSN.getSequenceId() + 1 : logRecordWithDLSN.getSequenceId(), z, countDownLatch, countDownLatch2, atomicBoolean);
                    }
                } catch (Exception e) {
                    TestAsyncReaderWriter.LOG.debug("Exception Encountered when verifying log record {} : ", logRecordWithDLSN.getDlsn(), e);
                    atomicBoolean.set(true);
                    countDownLatch2.countDown();
                }
            }

            public void onFailure(Throwable th) {
                TestAsyncReaderWriter.LOG.error("Encountered Exception on reading {}", asyncLogReader.getStreamName(), th);
                atomicBoolean.set(true);
                countDownLatch2.countDown();
            }
        });
    }

    void simpleAsyncReadTest(String str, DistributedLogConfiguration distributedLogConfiguration) throws Exception {
        distributedLogConfiguration.setOutputBufferSize(1024);
        distributedLogConfiguration.setReadAheadWaitTime(10);
        distributedLogConfiguration.setReadAheadBatchSize(10);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, str);
        long writeLogSegment = writeLogSegment(createNewDLM, 5, writeRecords(createNewDLM, 3, 10, 1L, false), 2, false);
        AsyncLogReader asyncLogReader = createNewDLM.getAsyncLogReader(DLSN.InvalidDLSN);
        CountDownLatch countDownLatch = new CountDownLatch((int) (writeLogSegment - 1));
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        boolean supportsSequenceId = LogSegmentMetadata.supportsSequenceId(distributedLogConfiguration.getDLLedgerMetadataLayoutVersion());
        readNext(asyncLogReader, DLSN.InvalidDLSN, supportsSequenceId ? 0L : Long.MIN_VALUE, supportsSequenceId, countDownLatch, countDownLatch2, atomicBoolean);
        countDownLatch2.await();
        Assert.assertFalse("Errors encountered on reading records", atomicBoolean.get());
        countDownLatch.await();
        Utils.close(asyncLogReader);
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testSimpleAsyncRead() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(this.testConf);
        simpleAsyncReadTest(methodName, distributedLogConfiguration);
    }

    @Test(timeout = 60000)
    public void testSimpleAsyncReadWriteWithMonitoredFuturePool() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(this.testConf);
        distributedLogConfiguration.setTaskExecutionWarnTimeMicros(1000L);
        distributedLogConfiguration.setEnableTaskExecutionStats(true);
        simpleAsyncReadTest(methodName, distributedLogConfiguration);
    }

    @Test(timeout = 60000)
    public void testBulkAsyncRead() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(conf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setImmediateFlushEnabled(true);
        distributedLogConfiguration.setReadAheadWaitTime(10);
        distributedLogConfiguration.setReadAheadMaxRecords(10000);
        distributedLogConfiguration.setReadAheadBatchSize(10);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, "distrlog-bulkasyncread");
        writeRecords(createNewDLM, 3, 20, 1L, false);
        AsyncLogReader asyncLogReader = createNewDLM.getAsyncLogReader(DLSN.InitialDLSN);
        int i = 1;
        int i2 = 0;
        while (i <= 3 * 20 && i != 3 * 20) {
            List list = (List) Utils.ioResult(asyncLogReader.readBulk(20));
            LOG.info("Bulk read {} entries.", Integer.valueOf(list.size()));
            Assert.assertTrue(list.size() >= 1);
            Iterator it = list.iterator();
            while (it.hasNext()) {
                Assert.assertEquals(i, ((LogRecordWithDLSN) it.next()).getTransactionId());
                i++;
            }
            i2++;
        }
        Assert.assertTrue(i2 < 60);
        Utils.close(asyncLogReader);
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testBulkAsyncReadWithWriteBatch() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(conf);
        distributedLogConfiguration.setOutputBufferSize(1024000);
        distributedLogConfiguration.setReadAheadWaitTime(10);
        distributedLogConfiguration.setReadAheadMaxRecords(10000);
        distributedLogConfiguration.setReadAheadBatchSize(10);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, "distrlog-bulkasyncread-with-writebatch");
        writeRecords(createNewDLM, 3, 20, 1L, false);
        AsyncLogReader asyncLogReader = createNewDLM.getAsyncLogReader(DLSN.InitialDLSN);
        int i = 1;
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 3) {
                Utils.close(asyncLogReader);
                createNewDLM.close();
                return;
            }
            List list = (List) Utils.ioResult(asyncLogReader.readBulk(20));
            Assert.assertEquals(20L, list.size());
            Iterator it = list.iterator();
            while (it.hasNext()) {
                Assert.assertEquals(i, ((LogRecordWithDLSN) it.next()).getTransactionId());
                i++;
            }
            j = j2 + 1;
        }
    }

    @Test(timeout = 60000)
    public void testAsyncReadEmptyRecords() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setReadAheadWaitTime(10);
        distributedLogConfiguration.setReadAheadBatchSize(10);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, "distrlog-simpleasyncreadempty");
        long writeLogSegment = writeLogSegment(createNewDLM, 5, writeRecords(createNewDLM, 3, 10, 1L, true), 2, true);
        AsyncLogReader asyncLogReader = createNewDLM.getAsyncLogReader(DLSN.InvalidDLSN);
        Assert.assertEquals("Expected stream name = distrlog-simpleasyncreadempty but " + asyncLogReader.getStreamName() + " found", "distrlog-simpleasyncreadempty", asyncLogReader.getStreamName());
        long j = 0;
        DLSN dlsn = DLSN.InvalidDLSN;
        Object ioResult = Utils.ioResult(asyncLogReader.readNext());
        while (true) {
            LogRecordWithDLSN logRecordWithDLSN = (LogRecordWithDLSN) ioResult;
            if (null == logRecordWithDLSN) {
                break;
            }
            DLMTestUtil.verifyEmptyLogRecord(logRecordWithDLSN);
            Assert.assertEquals(0L, logRecordWithDLSN.getDlsn().getSlotId());
            Assert.assertTrue(logRecordWithDLSN.getDlsn().compareTo(dlsn) > 0);
            dlsn = logRecordWithDLSN.getDlsn();
            j++;
            if (j >= writeLogSegment - 1) {
                break;
            } else {
                ioResult = Utils.ioResult(asyncLogReader.readNext());
            }
        }
        Assert.assertEquals(writeLogSegment - 1, j);
        Utils.close(asyncLogReader);
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testSimpleAsyncReadPosition() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(1024);
        distributedLogConfiguration.setReadAheadWaitTime(10);
        distributedLogConfiguration.setReadAheadBatchSize(10);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, methodName);
        CountDownLatch countDownLatch = new CountDownLatch((int) (writeLogSegment(createNewDLM, 5, writeRecords(createNewDLM, 3, 10, 1L, false), Integer.MAX_VALUE, false) - 14));
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        AsyncLogReader asyncLogReader = createNewDLM.getAsyncLogReader(new DLSN(2L, 2L, 4L));
        Assert.assertEquals(methodName, asyncLogReader.getStreamName());
        boolean supportsSequenceId = LogSegmentMetadata.supportsSequenceId(distributedLogConfiguration.getDLLedgerMetadataLayoutVersion());
        readNext(asyncLogReader, new DLSN(2L, 3L, 0L), supportsSequenceId ? 13L : Long.MIN_VALUE, supportsSequenceId, countDownLatch, countDownLatch2, atomicBoolean);
        countDownLatch2.await();
        Assert.assertFalse("Errors found on reading records", atomicBoolean.get());
        countDownLatch.await();
        Utils.close(asyncLogReader);
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testSimpleAsyncReadWrite() throws Exception {
        testSimpleAsyncReadWriteInternal(this.runtime.getMethodName(), false);
    }

    @Test(timeout = 60000)
    public void testSimpleAsyncReadWriteImmediateFlush() throws Exception {
        testSimpleAsyncReadWriteInternal(this.runtime.getMethodName(), true);
    }

    @Test(timeout = 60000)
    public void testNoEnvelopeWriterEnvelopeReader() throws Exception {
        testSimpleAsyncReadWriteInternal(this.runtime.getMethodName(), true, LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V4_ENVELOPED_ENTRIES.value - 1);
    }

    void testSimpleAsyncReadWriteInternal(String str, boolean z) throws Exception {
        testSimpleAsyncReadWriteInternal(str, z, LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION);
    }

    void testSimpleAsyncReadWriteInternal(String str, boolean z, int i) throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(this.testConf);
        distributedLogConfiguration.setReadAheadWaitTime(10);
        distributedLogConfiguration.setReadAheadBatchSize(10);
        distributedLogConfiguration.setOutputBufferSize(1024);
        distributedLogConfiguration.setDLLedgerMetadataLayoutVersion(i);
        distributedLogConfiguration.setImmediateFlushEnabled(z);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, str);
        CountDownLatch countDownLatch = new CountDownLatch(3 * 10);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        CountDownLatch countDownLatch3 = new CountDownLatch(3 * 10);
        AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        AsyncLogReader asyncLogReader = createNewDLM.getAsyncLogReader(DLSN.InvalidDLSN);
        Assert.assertEquals(str, asyncLogReader.getStreamName());
        int i2 = 1;
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 3) {
                countDownLatch3.await();
                Assert.assertFalse("All writes should succeed", atomicBoolean2.get());
                countDownLatch2.await();
                Assert.assertFalse("All reads should succeed", atomicBoolean.get());
                countDownLatch.await();
                Utils.close(asyncLogReader);
                createNewDLM.close();
                return;
            }
            long j3 = j2 + 1;
            BKAsyncLogWriter startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
            long j4 = 0;
            while (true) {
                long j5 = j4;
                if (j5 < 10) {
                    int i3 = i2;
                    i2++;
                    LogRecord largeLogRecordInstance = DLMTestUtil.getLargeLogRecordInstance(i3);
                    startAsyncLogSegmentNonPartitioned.write(largeLogRecordInstance).whenComplete((BiConsumer) new WriteFutureEventListener(largeLogRecordInstance, j3, j5, countDownLatch3, atomicBoolean2, true));
                    if (j2 == 0 && j5 == 0) {
                        boolean supportsSequenceId = LogSegmentMetadata.supportsSequenceId(i);
                        readNext(asyncLogReader, DLSN.InvalidDLSN, supportsSequenceId ? 0L : Long.MIN_VALUE, supportsSequenceId, countDownLatch, countDownLatch2, atomicBoolean);
                    }
                    j4 = j5 + 1;
                }
            }
            startAsyncLogSegmentNonPartitioned.closeAndComplete();
            j = j2 + 1;
        }
    }

    @Test(timeout = 60000)
    public void testSimpleAsyncReadWriteStartEmpty() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(this.testConf);
        distributedLogConfiguration.setReadAheadWaitTime(10);
        distributedLogConfiguration.setReadAheadBatchSize(10);
        distributedLogConfiguration.setOutputBufferSize(1024);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, methodName);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        CountDownLatch countDownLatch3 = new CountDownLatch(3 * 10);
        TestReader testReader = new TestReader("test-reader", createNewDLM, DLSN.InitialDLSN, false, 0, countDownLatch, countDownLatch3, countDownLatch2);
        testReader.start();
        Thread.sleep(500L);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        CountDownLatch countDownLatch4 = new CountDownLatch(30);
        int i = 1;
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 3) {
                break;
            }
            long j3 = j2 + 1;
            BKAsyncLogWriter startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
            long j4 = 0;
            while (true) {
                long j5 = j4;
                if (j5 < 10) {
                    int i2 = i;
                    i++;
                    LogRecord largeLogRecordInstance = DLMTestUtil.getLargeLogRecordInstance(i2);
                    startAsyncLogSegmentNonPartitioned.write(largeLogRecordInstance).whenComplete((BiConsumer) new WriteFutureEventListener(largeLogRecordInstance, j3, j5, countDownLatch4, atomicBoolean, true));
                    j4 = j5 + 1;
                }
            }
            startAsyncLogSegmentNonPartitioned.closeAndComplete();
            j = j2 + 1;
        }
        countDownLatch4.await();
        Assert.assertFalse("All writes should succeed", atomicBoolean.get());
        countDownLatch2.await();
        Assert.assertFalse("Should not encounter errors during reading", testReader.areErrorsFound());
        countDownLatch3.await();
        Assert.assertTrue("Should position reader at least once", testReader.getNumReaderPositions().get() > 1);
        testReader.stop();
        createNewDLM.close();
    }

    @Test(timeout = 120000)
    @Ignore
    public void testSimpleAsyncReadWriteStartEmptyFactory() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(this.testConf);
        distributedLogConfiguration.setReadAheadWaitTime(10);
        distributedLogConfiguration.setReadAheadBatchSize(10);
        distributedLogConfiguration.setOutputBufferSize(1024);
        URI createDLMURI = createDLMURI("/" + methodName);
        ensureURICreated(createDLMURI);
        Namespace build = NamespaceBuilder.newBuilder().conf(distributedLogConfiguration).uri(createDLMURI).build();
        DistributedLogManager[] distributedLogManagerArr = new DistributedLogManager[1];
        TestReader[] testReaderArr = new TestReader[1];
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch[] countDownLatchArr = new CountDownLatch[1];
        CountDownLatch[] countDownLatchArr2 = new CountDownLatch[1];
        for (int i = 0; i < 1; i++) {
            distributedLogManagerArr[i] = build.openLog(methodName + String.format("%d", Integer.valueOf(i)));
            countDownLatchArr2[i] = new CountDownLatch(1);
            countDownLatchArr[i] = new CountDownLatch(3 * 1);
            testReaderArr[i] = new TestReader("reader-" + i, distributedLogManagerArr[i], DLSN.InitialDLSN, false, 0, countDownLatch, countDownLatchArr[i], countDownLatchArr2[i]);
            testReaderArr[i].start();
        }
        countDownLatch.await();
        CountDownLatch countDownLatch2 = new CountDownLatch(3 * 1);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        int i2 = 1;
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 3) {
                break;
            }
            long j3 = j2 + 1;
            BKAsyncLogWriter[] bKAsyncLogWriterArr = new BKAsyncLogWriter[1];
            for (int i3 = 0; i3 < 1; i3++) {
                bKAsyncLogWriterArr[i3] = (BKAsyncLogWriter) distributedLogManagerArr[i3].startAsyncLogSegmentNonPartitioned();
            }
            long j4 = 0;
            while (true) {
                long j5 = j4;
                if (j5 >= 1) {
                    break;
                }
                int i4 = i2;
                i2++;
                LogRecord largeLogRecordInstance = DLMTestUtil.getLargeLogRecordInstance(i4);
                for (int i5 = 0; i5 < 1; i5++) {
                    bKAsyncLogWriterArr[i5].write(largeLogRecordInstance).whenComplete((BiConsumer) new WriteFutureEventListener(largeLogRecordInstance, j3, j5, countDownLatch2, atomicBoolean, true));
                }
                j4 = j5 + 1;
            }
            for (int i6 = 0; i6 < 1; i6++) {
                bKAsyncLogWriterArr[i6].closeAndComplete();
            }
            j = j2 + 1;
        }
        countDownLatch2.await();
        Assert.assertFalse("All writes should succeed", atomicBoolean.get());
        for (int i7 = 0; i7 < 1; i7++) {
            countDownLatchArr2[i7].await();
            Assert.assertFalse("Reader " + i7 + " should not encounter errors", testReaderArr[i7].areErrorsFound());
            countDownLatchArr[i7].await();
            Assert.assertEquals(3 * 1, testReaderArr[i7].getNumReads().get());
            Assert.assertTrue("Reader " + i7 + " should position at least once", testReaderArr[i7].getNumReaderPositions().get() > 0);
        }
        for (int i8 = 0; i8 < 1; i8++) {
            testReaderArr[i8].stop();
            distributedLogManagerArr[i8].close();
        }
    }

    @Test(timeout = 300000)
    public void testSimpleAsyncReadWriteSimulateErrors() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(this.testConf);
        distributedLogConfiguration.setReadAheadWaitTime(10);
        distributedLogConfiguration.setReadAheadBatchSize(10);
        distributedLogConfiguration.setOutputBufferSize(1024);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, methodName);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(5 * 10);
        TestReader testReader = new TestReader("test-reader", createNewDLM, DLSN.InitialDLSN, true, 0, new CountDownLatch(1), countDownLatch2, countDownLatch);
        testReader.start();
        CountDownLatch countDownLatch3 = new CountDownLatch(5 * 10);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        int i = 1;
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 5) {
                break;
            }
            long j3 = j2 + 1;
            BKAsyncLogWriter startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
            long j4 = 0;
            while (true) {
                long j5 = j4;
                if (j5 < 10) {
                    int i2 = i;
                    i++;
                    LogRecord largeLogRecordInstance = DLMTestUtil.getLargeLogRecordInstance(i2);
                    startAsyncLogSegmentNonPartitioned.write(largeLogRecordInstance).whenComplete((BiConsumer) new WriteFutureEventListener(largeLogRecordInstance, j3, j5, countDownLatch3, atomicBoolean, true));
                    j4 = j5 + 1;
                }
            }
            startAsyncLogSegmentNonPartitioned.closeAndComplete();
            j = j2 + 1;
        }
        countDownLatch3.await();
        Assert.assertFalse("All writes should succeed", atomicBoolean.get());
        countDownLatch.await();
        Assert.assertFalse("Should not encounter errors during reading", testReader.areErrorsFound());
        countDownLatch2.await();
        Assert.assertTrue("Should position reader at least once", testReader.getNumReaderPositions().get() > 1);
        testReader.stop();
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testSimpleAsyncReadWritePiggyBack() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(this.testConf);
        distributedLogConfiguration.setEnableReadAhead(true);
        distributedLogConfiguration.setReadAheadWaitTime(500);
        distributedLogConfiguration.setReadAheadBatchSize(10);
        distributedLogConfiguration.setReadAheadMaxRecords(100);
        distributedLogConfiguration.setOutputBufferSize(1024);
        distributedLogConfiguration.setPeriodicFlushFrequencyMilliSeconds(100);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, methodName);
        AsyncLogReader asyncLogReader = createNewDLM.getAsyncLogReader(DLSN.InvalidDLSN);
        CountDownLatch countDownLatch = new CountDownLatch(30);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        CountDownLatch countDownLatch3 = new CountDownLatch(30);
        AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        int i = 1;
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 3) {
                countDownLatch3.await();
                Assert.assertFalse("All writes should succeed", atomicBoolean2.get());
                countDownLatch2.await();
                Assert.assertFalse("All reads should succeed", atomicBoolean.get());
                countDownLatch.await();
                Utils.close(asyncLogReader);
                createNewDLM.close();
                return;
            }
            long j3 = j2 + 1;
            BKAsyncLogWriter startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
            long j4 = 0;
            while (true) {
                long j5 = j4;
                if (j5 < 10) {
                    Thread.sleep(50L);
                    int i2 = i;
                    i++;
                    LogRecord largeLogRecordInstance = DLMTestUtil.getLargeLogRecordInstance(i2);
                    startAsyncLogSegmentNonPartitioned.write(largeLogRecordInstance).whenComplete((BiConsumer) new WriteFutureEventListener(largeLogRecordInstance, j3, j5, countDownLatch3, atomicBoolean2, false));
                    if (j2 == 0 && j5 == 0) {
                        boolean supportsSequenceId = LogSegmentMetadata.supportsSequenceId(distributedLogConfiguration.getDLLedgerMetadataLayoutVersion());
                        readNext(asyncLogReader, DLSN.InvalidDLSN, supportsSequenceId ? 0L : Long.MIN_VALUE, supportsSequenceId, countDownLatch, countDownLatch2, atomicBoolean);
                    }
                    j4 = j5 + 1;
                }
            }
            startAsyncLogSegmentNonPartitioned.closeAndComplete();
            j = j2 + 1;
        }
    }

    @Test(timeout = 60000)
    public void testCancelReadRequestOnReaderClosed() throws Exception {
        BKDistributedLogManager createNewDLM = createNewDLM(this.testConf, "distrlog-cancel-read-requests-on-reader-closed");
        BKAsyncLogWriter startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
        startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(1L));
        startAsyncLogSegmentNonPartitioned.closeAndComplete();
        final AsyncLogReader asyncLogReader = createNewDLM.getAsyncLogReader(DLSN.InitialDLSN);
        LogRecordWithDLSN logRecordWithDLSN = (LogRecordWithDLSN) Utils.ioResult(asyncLogReader.readNext());
        Assert.assertEquals(1L, logRecordWithDLSN.getTransactionId());
        DLMTestUtil.verifyLogRecord(logRecordWithDLSN);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Thread thread = new Thread(new Runnable() { // from class: org.apache.distributedlog.TestAsyncReaderWriter.3
            @Override // java.lang.Runnable
            public void run() {
                try {
                    Utils.ioResult(asyncLogReader.readNext());
                } catch (ReadCancelledException e) {
                    atomicBoolean.set(true);
                } catch (Throwable th) {
                    TestAsyncReaderWriter.LOG.error("Receive unexpected exception on reading stream {} : ", "distrlog-cancel-read-requests-on-reader-closed", th);
                }
                countDownLatch.countDown();
            }
        }, "read-thread");
        thread.start();
        Thread.sleep(1000L);
        Utils.close(asyncLogReader);
        countDownLatch.await();
        thread.join();
        Assert.assertTrue("Read request should be cancelled.", atomicBoolean.get());
        try {
            Utils.ioResult(asyncLogReader.readNext());
            Assert.fail("Reader should reject readNext if it is closed.");
        } catch (ReadCancelledException e) {
        }
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testAsyncWriteWithMinDelayBetweenFlushes() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setImmediateFlushEnabled(true);
        distributedLogConfiguration.setMinDelayBetweenImmediateFlushMs(100);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, "distrlog-asyncwrite-mindelay");
        final Thread currentThread = Thread.currentThread();
        final CountDownLatch countDownLatch = new CountDownLatch(5000);
        int i = 1;
        BKAsyncLogWriter startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
        Stopwatch createStarted = Stopwatch.createStarted();
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 5000) {
                break;
            }
            Thread.sleep(1L);
            int i2 = i;
            i++;
            startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(i2)).whenComplete((BiConsumer) new FutureEventListener<DLSN>() { // from class: org.apache.distributedlog.TestAsyncReaderWriter.4
                public void onSuccess(DLSN dlsn) {
                    countDownLatch.countDown();
                    TestAsyncReaderWriter.LOG.debug("SyncLatch: {} ; DLSN: {} ", Long.valueOf(countDownLatch.getCount()), dlsn);
                }

                public void onFailure(Throwable th) {
                    currentThread.interrupt();
                }
            });
            j = j2 + 1;
        }
        boolean z = false;
        if (!Thread.interrupted()) {
            try {
                z = countDownLatch.await(10L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        startAsyncLogSegmentNonPartitioned.abort();
        createStarted.stop();
        Assert.assertTrue(!Thread.interrupted());
        Assert.assertTrue(z);
        LogRecordWithDLSN lastLogRecord = createNewDLM.getLastLogRecord();
        LOG.info("Last Entry {}; elapsed time {}", Long.valueOf(lastLogRecord.getDlsn().getEntryId()), Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS)));
        Assert.assertTrue(lastLogRecord.getDlsn().getEntryId() <= ((createStarted.elapsed(TimeUnit.MILLISECONDS) / ((long) distributedLogConfiguration.getMinDelayBetweenImmediateFlushMs())) + 1) * 2);
        DLMTestUtil.verifyLogRecord(lastLogRecord);
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testAsyncWriteWithMinDelayBetweenFlushesFlushFailure() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setImmediateFlushEnabled(true);
        distributedLogConfiguration.setMinDelayBetweenImmediateFlushMs(1);
        URI createDLMURI = createDLMURI("/" + methodName);
        ensureURICreated(createDLMURI);
        DistributedLogManager openLog = NamespaceBuilder.newBuilder().conf(distributedLogConfiguration).uri(createDLMURI).clientId("gabbagoo").build().openLog(methodName);
        DistributedLogManager openLog2 = NamespaceBuilder.newBuilder().conf(distributedLogConfiguration).uri(createDLMURI).clientId("tortellini").build().openLog(methodName);
        BKAsyncLogWriter startAsyncLogSegmentNonPartitioned = openLog.startAsyncLogSegmentNonPartitioned();
        int i = 1 + 1;
        Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(1)));
        startAsyncLogSegmentNonPartitioned.flushAndCommit();
        Utils.ioResult(startAsyncLogSegmentNonPartitioned.getCachedLogWriter().getLock().asyncClose());
        openLog2.startAsyncLogSegmentNonPartitioned();
        try {
            int i2 = i + 1;
            startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(i));
            Thread.sleep(100L);
            int i3 = i2 + 1;
            Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(i2)));
            Assert.fail("should have thrown");
        } catch (LockingException e) {
            LOG.debug("caught exception ", e);
        }
        startAsyncLogSegmentNonPartitioned.close();
        openLog.close();
    }

    public void writeRecordsWithOutstandingWriteLimit(int i, int i2, boolean z) throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setImmediateFlushEnabled(true);
        distributedLogConfiguration.setPerWriterOutstandingWriteLimit(i);
        distributedLogConfiguration.setOutstandingWriteLimitDarkmode(false);
        BKDistributedLogManager createNewDLM = i2 > -1 ? createNewDLM(distributedLogConfiguration, this.runtime.getMethodName(), new SimplePermitLimiter(false, i2, new NullStatsLogger(), true, new FixedValueFeature("", 0))) : createNewDLM(distributedLogConfiguration, this.runtime.getMethodName());
        BKAsyncLogWriter startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
        ArrayList arrayList = new ArrayList(1000);
        for (int i3 = 0; i3 < 1000; i3++) {
            arrayList.add(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(1L)));
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            try {
                Utils.ioResult((CompletableFuture) it.next());
                if (z) {
                    Assert.fail("should fail due to no outstanding writes permitted");
                }
            } catch (OverCapacityException e) {
                Assert.assertTrue(z);
            }
        }
        startAsyncLogSegmentNonPartitioned.closeAndComplete();
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testOutstandingWriteLimitNoLimit() throws Exception {
        writeRecordsWithOutstandingWriteLimit(-1, -1, false);
    }

    @Test(timeout = 60000)
    public void testOutstandingWriteLimitVeryHighLimit() throws Exception {
        writeRecordsWithOutstandingWriteLimit(Integer.MAX_VALUE, Integer.MAX_VALUE, false);
    }

    @Test(timeout = 60000)
    public void testOutstandingWriteLimitBlockAllStreamLimit() throws Exception {
        writeRecordsWithOutstandingWriteLimit(0, Integer.MAX_VALUE, true);
    }

    @Test(timeout = 60000)
    public void testOutstandingWriteLimitBlockAllGlobalLimit() throws Exception {
        writeRecordsWithOutstandingWriteLimit(Integer.MAX_VALUE, 0, true);
    }

    @Test(timeout = 60000)
    public void testOutstandingWriteLimitBlockAllLimitWithDarkmode() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setImmediateFlushEnabled(true);
        distributedLogConfiguration.setPerWriterOutstandingWriteLimit(0);
        distributedLogConfiguration.setOutstandingWriteLimitDarkmode(true);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, this.runtime.getMethodName());
        BKAsyncLogWriter startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
        ArrayList arrayList = new ArrayList(1000);
        for (int i = 0; i < 1000; i++) {
            arrayList.add(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(1L)));
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            Utils.ioResult((CompletableFuture) it.next());
        }
        startAsyncLogSegmentNonPartitioned.closeAndComplete();
        createNewDLM.close();
    }

    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.distributedlog.BKAsyncLogWriter, long] */
    /* JADX WARN: Type inference failed for: r1v13, types: [org.apache.distributedlog.BKAsyncLogWriter, long] */
    @Test(timeout = 60000)
    public void testCloseAndCompleteLogSegmentWhenStreamIsInError() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setImmediateFlushEnabled(true);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, "distrlog-close-and-complete-logsegment-when-stream-is-in-error");
        ?? startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
        long j = 1;
        for (int i = 0; i < 5; i++) {
            long j2 = j;
            j = startAsyncLogSegmentNonPartitioned + 1;
            Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(j2)));
        }
        createNewDLM.getNamespaceDriver().getReaderBKC().get().openLedger(startAsyncLogSegmentNonPartitioned.getCachedLogWriter().getLogSegmentId(), BookKeeper.DigestType.CRC32, distributedLogConfiguration.getBKDigestPW().getBytes(Charsets.UTF_8));
        try {
            ?? r1 = j;
            long j3 = r1 + 1;
            Utils.ioResult(r1.write(DLMTestUtil.getLogRecordInstance(r1)));
            Assert.fail("Should fail write to a fenced ledger with BKTransmitException");
        } catch (BKTransmitException e) {
        }
        try {
            startAsyncLogSegmentNonPartitioned.closeAndComplete();
            Assert.fail("Should fail to complete a log segment when its ledger is fenced");
        } catch (BKTransmitException e2) {
        }
        List logSegments = createNewDLM.getLogSegments();
        Assert.assertEquals(1L, logSegments.size());
        Assert.assertTrue(((LogSegmentMetadata) logSegments.get(0)).isInProgress());
        createNewDLM.close();
    }

    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.distributedlog.BKAsyncLogWriter, long] */
    /* JADX WARN: Type inference failed for: r1v14, types: [org.apache.distributedlog.BKAsyncLogWriter, long] */
    @Test(timeout = 60000)
    public void testCloseAndCompleteLogSegmentWhenCloseFailed() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setImmediateFlushEnabled(true);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, "distrlog-close-and-complete-logsegment-when-close-failed");
        ?? startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
        long j = 1;
        for (int i = 0; i < 5; i++) {
            long j2 = j;
            j = startAsyncLogSegmentNonPartitioned + 1;
            Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(j2)));
        }
        BKLogSegmentWriter cachedLogWriter = startAsyncLogSegmentNonPartitioned.getCachedLogWriter();
        createNewDLM.getNamespaceDriver().getReaderBKC().get().openLedger(cachedLogWriter.getLogSegmentId(), BookKeeper.DigestType.CRC32, distributedLogConfiguration.getBKDigestPW().getBytes(Charsets.UTF_8));
        try {
            ?? r1 = j;
            long j3 = r1 + 1;
            r1.write(DLMTestUtil.getLogRecordInstance(r1));
            startAsyncLogSegmentNonPartitioned.closeAndComplete();
            Assert.fail("Should fail to complete a log segment when its ledger is fenced");
        } catch (IOException e) {
            LOG.error("Failed to close and complete log segment {} : ", cachedLogWriter.getFullyQualifiedLogSegment(), e);
        }
        List logSegments = createNewDLM.getLogSegments();
        Assert.assertEquals(1L, logSegments.size());
        Assert.assertTrue(((LogSegmentMetadata) logSegments.get(0)).isInProgress());
        createNewDLM.close();
    }

    private void testAsyncReadIdleErrorInternal(String str, final int i, final boolean z, boolean z2) throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setImmediateFlushEnabled(true);
        distributedLogConfiguration.setReadAheadBatchSize(1);
        distributedLogConfiguration.setReadAheadMaxRecords(1);
        distributedLogConfiguration.setReaderIdleWarnThresholdMillis(0);
        distributedLogConfiguration.setReaderIdleErrorThresholdMillis(i);
        final BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, str);
        final Thread currentThread = Thread.currentThread();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
        scheduledThreadPoolExecutor.schedule(new Runnable() { // from class: org.apache.distributedlog.TestAsyncReaderWriter.5
            @Override // java.lang.Runnable
            public void run() {
                int i2 = 1;
                for (long j = 0; j < 3; j++) {
                    try {
                        long j2 = i2;
                        BKSyncLogWriter startLogSegmentNonPartitioned = createNewDLM.startLogSegmentNonPartitioned();
                        for (long j3 = 1; j3 <= 3; j3++) {
                            int i3 = i2;
                            i2++;
                            startLogSegmentNonPartitioned.write(DLMTestUtil.getLargeLogRecordInstance(i3));
                            if (j == 0 && j3 == 1) {
                                countDownLatch.countDown();
                            }
                        }
                        if (z) {
                            int i4 = (i - 200) - 100;
                            for (int i5 = 1; i5 <= (2 * i) / i4; i5++) {
                                Thread.sleep(i4);
                                startLogSegmentNonPartitioned.write(DLMTestUtil.getLargeLogRecordInstance(i2, true));
                                startLogSegmentNonPartitioned.flush();
                            }
                            Thread.sleep(i4);
                        }
                        startLogSegmentNonPartitioned.closeAndComplete();
                        if (!z) {
                            Thread.sleep(2 * i);
                        }
                    } catch (Exception e) {
                        if (scheduledThreadPoolExecutor.isShutdown()) {
                            return;
                        }
                        currentThread.interrupt();
                        return;
                    }
                }
            }
        }, 0L, TimeUnit.MILLISECONDS);
        countDownLatch.await();
        BKAsyncLogReader asyncLogReader = createNewDLM.getAsyncLogReader(DLSN.InitialDLSN);
        if (z2) {
            asyncLogReader.disableProcessingReadRequests();
        }
        boolean z3 = false;
        int i2 = 0;
        do {
            try {
                Utils.ioResult(asyncLogReader.readNext());
                i2++;
            } catch (IdleReaderException e) {
                z3 = true;
            }
        } while (i2 < 9);
        if (z2) {
            Assert.assertTrue(z3);
        } else if (z) {
            Assert.assertFalse(z3);
            Assert.assertEquals(9L, i2);
        } else {
            Assert.assertTrue(z3);
            Assert.assertEquals(3L, i2);
        }
        Assert.assertFalse(currentThread.isInterrupted());
        Utils.close(asyncLogReader);
        scheduledThreadPoolExecutor.shutdown();
    }

    @Test(timeout = 10000)
    public void testAsyncReadIdleControlRecord() throws Exception {
        testAsyncReadIdleErrorInternal("distrlog-async-reader-idle-error-control", 500, true, false);
    }

    @Test(timeout = 10000)
    public void testAsyncReadIdleError() throws Exception {
        testAsyncReadIdleErrorInternal("distrlog-async-reader-idle-error", 1000, false, false);
    }

    @Test(timeout = 10000)
    public void testAsyncReadIdleError2() throws Exception {
        testAsyncReadIdleErrorInternal("distrlog-async-reader-idle-error-2", 1000, true, true);
    }

    @Test(timeout = 60000)
    public void testReleaseLockAfterFailedToRecover() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(this.testConf);
        distributedLogConfiguration.setLockTimeout(0L);
        distributedLogConfiguration.setImmediateFlushEnabled(true);
        distributedLogConfiguration.setOutputBufferSize(0);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, "release-lock-after-failed-to-recover");
        BKAsyncLogWriter startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
        Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(1L)));
        startAsyncLogSegmentNonPartitioned.abort();
        for (int i = 0; i < 2; i++) {
            FailpointUtils.setFailpoint(FailpointUtils.FailPointName.FP_RecoverIncompleteLogSegments, FailpointUtils.FailPointActions.FailPointAction_Throw);
            try {
                try {
                    createNewDLM.startAsyncLogSegmentNonPartitioned();
                    Assert.fail("Should fail during recovering incomplete log segments");
                    FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_RecoverIncompleteLogSegments);
                } catch (IOException e) {
                    FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_RecoverIncompleteLogSegments);
                }
            } catch (Throwable th) {
                FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_RecoverIncompleteLogSegments);
                throw th;
            }
        }
        BKAsyncLogWriter startAsyncLogSegmentNonPartitioned2 = createNewDLM.startAsyncLogSegmentNonPartitioned();
        List logSegments = createNewDLM.getLogSegments();
        Assert.assertEquals(1L, logSegments.size());
        Assert.assertFalse(((LogSegmentMetadata) logSegments.get(0)).isInProgress());
        startAsyncLogSegmentNonPartitioned2.close();
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testAsyncReadMissingLogSegmentsNotification() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setImmediateFlushEnabled(true);
        distributedLogConfiguration.setReadAheadBatchSize(1);
        distributedLogConfiguration.setReadAheadMaxRecords(1);
        distributedLogConfiguration.setReadLACLongPollTimeout(49);
        distributedLogConfiguration.setReaderIdleWarnThresholdMillis(100);
        distributedLogConfiguration.setReaderIdleErrorThresholdMillis(20000);
        final BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, "distrlog-async-reader-missing-zk-notification");
        final Thread currentThread = Thread.currentThread();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
        scheduledThreadPoolExecutor.schedule(new Runnable() { // from class: org.apache.distributedlog.TestAsyncReaderWriter.6
            @Override // java.lang.Runnable
            public void run() {
                int i = 1;
                for (long j = 0; j < 3; j++) {
                    try {
                        BKSyncLogWriter startLogSegmentNonPartitioned = createNewDLM.startLogSegmentNonPartitioned();
                        for (long j2 = 1; j2 <= 10; j2++) {
                            int i2 = i;
                            i++;
                            startLogSegmentNonPartitioned.write(DLMTestUtil.getLargeLogRecordInstance(i2));
                            if (j == 0 && j2 == 1) {
                                countDownLatch.countDown();
                            } else {
                                countDownLatch2.await();
                            }
                        }
                        startLogSegmentNonPartitioned.closeAndComplete();
                        Thread.sleep(100L);
                    } catch (Exception e) {
                        if (scheduledThreadPoolExecutor.isShutdown()) {
                            return;
                        }
                        currentThread.interrupt();
                        return;
                    }
                }
            }
        }, 0L, TimeUnit.MILLISECONDS);
        countDownLatch.await();
        BKAsyncLogReader asyncLogReader = createNewDLM.getAsyncLogReader(DLSN.InitialDLSN);
        asyncLogReader.disableReadAheadLogSegmentsNotification();
        boolean z = false;
        int i = 0;
        do {
            try {
                Utils.ioResult(asyncLogReader.readNext());
                if (i == 0) {
                    countDownLatch2.countDown();
                }
                i++;
            } catch (IdleReaderException e) {
                z = true;
            }
        } while (i < 30);
        Assert.assertTrue(!z);
        Assert.assertEquals(i, 30L);
        Assert.assertTrue(!currentThread.isInterrupted());
        Utils.close(asyncLogReader);
        scheduledThreadPoolExecutor.shutdown();
    }

    @Test(timeout = 60000)
    public void testGetLastTxId() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setImmediateFlushEnabled(true);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, methodName);
        AsyncLogWriter startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
        for (int i = 0; i < 10; i++) {
            Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(i)));
            Assert.assertEquals("last tx id should become " + i, i, startAsyncLogSegmentNonPartitioned.getLastTxId());
        }
        Assert.assertEquals("recovered last tx id should be " + (10 - 1), 10 - 1, createNewDLM.startAsyncLogSegmentNonPartitioned().getLastTxId());
    }

    @Test(timeout = 60000)
    public void testMaxReadAheadRecords() throws Exception {
        int i = (1 + 8) - 1;
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setImmediateFlushEnabled(false);
        distributedLogConfiguration.setPeriodicFlushFrequencyMilliSeconds(Integer.MAX_VALUE);
        distributedLogConfiguration.setReadAheadMaxRecords(1);
        distributedLogConfiguration.setReadAheadBatchSize(8);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, methodName);
        AsyncLogWriter startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
        for (int i2 = 1; i2 <= 40; i2++) {
            Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(i2)));
            Assert.assertEquals("last tx id should become " + i2, i2, startAsyncLogSegmentNonPartitioned.getLastTxId());
        }
        LogRecord logRecordInstance = DLMTestUtil.getLogRecordInstance(40);
        logRecordInstance.setControl();
        Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(logRecordInstance));
        BKAsyncLogReader asyncLogReader = createNewDLM.getAsyncLogReader(DLSN.InitialDLSN);
        LogRecord logRecord = (LogRecord) Utils.ioResult(asyncLogReader.readNext());
        LOG.info("Read record {}", logRecord);
        Assert.assertEquals(1L, logRecord.getTransactionId());
        Assert.assertNotNull(asyncLogReader.getReadAheadReader());
        Assert.assertTrue(asyncLogReader.getReadAheadReader().getNumCachedEntries() <= i);
        for (int i3 = 2; i3 <= 40; i3++) {
            LogRecord logRecord2 = (LogRecord) Utils.ioResult(asyncLogReader.readNext());
            LOG.info("Read record {}", logRecord2);
            Assert.assertEquals(i3, logRecord2.getTransactionId());
            TimeUnit.MILLISECONDS.sleep(20L);
            int numCachedEntries = asyncLogReader.getReadAheadReader().getNumCachedEntries();
            Assert.assertTrue("Should cache less than 8 records but already found " + numCachedEntries + " records when reading " + i3 + "th record", numCachedEntries <= i);
        }
        Utils.close(asyncLogReader);
    }

    @Test(timeout = 60000)
    public void testMarkEndOfStream() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setImmediateFlushEnabled(true);
        distributedLogConfiguration.setPeriodicFlushFrequencyMilliSeconds(0);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, methodName);
        BKAsyncLogWriter startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
        int i = 1;
        while (i <= 10) {
            Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(i)));
            Assert.assertEquals("last tx id should become " + i, i, startAsyncLogSegmentNonPartitioned.getLastTxId());
            i++;
        }
        Utils.ioResult(startAsyncLogSegmentNonPartitioned.markEndOfStream());
        Utils.ioResult(startAsyncLogSegmentNonPartitioned.markEndOfStream());
        try {
            Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(i)));
            Assert.fail("Should have thrown");
        } catch (EndOfStreamException e) {
        }
        BKAsyncLogReader asyncLogReader = createNewDLM.getAsyncLogReader(DLSN.InitialDLSN);
        for (int i2 = 0; i2 < 10; i2++) {
            Assert.assertEquals(i2 + 1, ((LogRecord) Utils.ioResult(asyncLogReader.readNext())).getTransactionId());
        }
        try {
            Assert.fail("Should have thrown");
        } catch (EndOfStreamException e2) {
        }
        Utils.close(asyncLogReader);
    }

    @Test(timeout = 60000)
    public void testMarkEndOfStreamAtBeginningOfSegment() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setImmediateFlushEnabled(true);
        distributedLogConfiguration.setPeriodicFlushFrequencyMilliSeconds(0);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, methodName);
        BKAsyncLogWriter startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
        Utils.ioResult(startAsyncLogSegmentNonPartitioned.markEndOfStream());
        try {
            Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(1L)));
            Assert.fail("Should have thrown");
        } catch (EndOfStreamException e) {
        }
        startAsyncLogSegmentNonPartitioned.close();
        BKAsyncLogReader asyncLogReader = createNewDLM.getAsyncLogReader(DLSN.InitialDLSN);
        try {
            Assert.fail("Should have thrown");
        } catch (EndOfStreamException e2) {
        }
        Utils.close(asyncLogReader);
    }

    @Test(timeout = 60000)
    public void testBulkReadWaitingMoreRecords() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setImmediateFlushEnabled(false);
        distributedLogConfiguration.setPeriodicFlushFrequencyMilliSeconds(0);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, methodName);
        BKAsyncLogWriter startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
        Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(1L)));
        LogRecord logRecordInstance = DLMTestUtil.getLogRecordInstance(1L);
        logRecordInstance.setControl();
        Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(logRecordInstance));
        BKAsyncLogReader asyncLogReader = createNewDLM.getAsyncLogReader(DLSN.InitialDLSN);
        CompletableFuture readBulk = asyncLogReader.readBulk(2, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        CompletableFuture readNext = asyncLogReader.readNext();
        for (int i = 0; i < 5; i++) {
            long j = 2 + i;
            Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(j)));
            LogRecord logRecordInstance2 = DLMTestUtil.getLogRecordInstance(j);
            logRecordInstance2.setControl();
            Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(logRecordInstance2));
        }
        List list = (List) Utils.ioResult(readBulk);
        Assert.assertEquals(2L, list.size());
        Assert.assertEquals(1L, ((LogRecordWithDLSN) list.get(0)).getTransactionId());
        Assert.assertEquals(2L, ((LogRecordWithDLSN) list.get(1)).getTransactionId());
        Iterator it = list.iterator();
        while (it.hasNext()) {
            DLMTestUtil.verifyLogRecord((LogRecordWithDLSN) it.next());
        }
        LogRecordWithDLSN logRecordWithDLSN = (LogRecordWithDLSN) Utils.ioResult(readNext);
        Assert.assertEquals(3L, logRecordWithDLSN.getTransactionId());
        DLMTestUtil.verifyLogRecord(logRecordWithDLSN);
        Utils.close(asyncLogReader);
        startAsyncLogSegmentNonPartitioned.close();
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testBulkReadNotWaitingMoreRecords() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setImmediateFlushEnabled(false);
        distributedLogConfiguration.setPeriodicFlushFrequencyMilliSeconds(0);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, methodName);
        BKAsyncLogWriter startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
        Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(1L)));
        LogRecord logRecordInstance = DLMTestUtil.getLogRecordInstance(1L);
        logRecordInstance.setControl();
        Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(logRecordInstance));
        BKAsyncLogReader asyncLogReader = createNewDLM.getAsyncLogReader(DLSN.InitialDLSN);
        CompletableFuture readBulk = asyncLogReader.readBulk(2, 0L, TimeUnit.MILLISECONDS);
        CompletableFuture readNext = asyncLogReader.readNext();
        List list = (List) Utils.ioResult(readBulk);
        Assert.assertEquals(1L, list.size());
        Assert.assertEquals(1L, ((LogRecordWithDLSN) list.get(0)).getTransactionId());
        Iterator it = list.iterator();
        while (it.hasNext()) {
            DLMTestUtil.verifyLogRecord((LogRecordWithDLSN) it.next());
        }
        for (int i = 0; i < 5; i++) {
            long j = 2 + i;
            Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(j)));
            LogRecord logRecordInstance2 = DLMTestUtil.getLogRecordInstance(j);
            logRecordInstance2.setControl();
            Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(logRecordInstance2));
        }
        LogRecordWithDLSN logRecordWithDLSN = (LogRecordWithDLSN) Utils.ioResult(readNext);
        Assert.assertEquals(2L, logRecordWithDLSN.getTransactionId());
        DLMTestUtil.verifyLogRecord(logRecordWithDLSN);
        Utils.close(asyncLogReader);
        startAsyncLogSegmentNonPartitioned.close();
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testReadBrokenEntries() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setPeriodicFlushFrequencyMilliSeconds(0);
        distributedLogConfiguration.setImmediateFlushEnabled(true);
        distributedLogConfiguration.setReadAheadWaitTime(10);
        distributedLogConfiguration.setReadAheadBatchSize(1);
        distributedLogConfiguration.setPositionGapDetectionEnabled(false);
        distributedLogConfiguration.setReadAheadSkipBrokenEntries(true);
        distributedLogConfiguration.setEIInjectReadAheadBrokenEntries(true);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, methodName);
        writeRecords(createNewDLM, 3, 10, 1L, false);
        AsyncLogReader asyncLogReader = createNewDLM.getAsyncLogReader(DLSN.InvalidDLSN);
        for (int i = 0; i < 27; i++) {
            Assert.assertFalse(((LogRecordWithDLSN) Utils.ioResult(asyncLogReader.readNext())).getDlsn().getEntryId() % 10 == 0);
        }
        Utils.close(asyncLogReader);
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testReadBrokenEntriesWithGapDetection() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setPeriodicFlushFrequencyMilliSeconds(0);
        distributedLogConfiguration.setImmediateFlushEnabled(true);
        distributedLogConfiguration.setReadAheadWaitTime(10);
        distributedLogConfiguration.setReadAheadBatchSize(1);
        distributedLogConfiguration.setPositionGapDetectionEnabled(true);
        distributedLogConfiguration.setReadAheadSkipBrokenEntries(true);
        distributedLogConfiguration.setEIInjectReadAheadBrokenEntries(true);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, methodName);
        writeRecords(createNewDLM, 1, 100, 1L, false);
        AsyncLogReader asyncLogReader = createNewDLM.getAsyncLogReader(DLSN.InvalidDLSN);
        for (int i = 0; i < 30; i++) {
            try {
                Assert.assertFalse(((LogRecordWithDLSN) Utils.ioResult(asyncLogReader.readNext())).getDlsn().getEntryId() % 10 == 0);
            } catch (DLIllegalStateException e) {
            }
        }
        Assert.fail("should have thrown");
        Utils.close(asyncLogReader);
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testReadBrokenEntriesAndLargeBatchSize() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setPeriodicFlushFrequencyMilliSeconds(0);
        distributedLogConfiguration.setImmediateFlushEnabled(true);
        distributedLogConfiguration.setReadAheadWaitTime(10);
        distributedLogConfiguration.setReadAheadBatchSize(5);
        distributedLogConfiguration.setPositionGapDetectionEnabled(false);
        distributedLogConfiguration.setReadAheadSkipBrokenEntries(true);
        distributedLogConfiguration.setEIInjectReadAheadBrokenEntries(true);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, methodName);
        writeRecords(createNewDLM, 1, 100, 1L, false);
        AsyncLogReader asyncLogReader = createNewDLM.getAsyncLogReader(DLSN.InvalidDLSN);
        for (int i = 0; i < 50; i++) {
            Assert.assertFalse(((LogRecordWithDLSN) Utils.ioResult(asyncLogReader.readNext())).getDlsn().getEntryId() % 10 == 0);
        }
        Utils.close(asyncLogReader);
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testReadBrokenEntriesAndLargeBatchSizeCrossSegment() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setPeriodicFlushFrequencyMilliSeconds(0);
        distributedLogConfiguration.setImmediateFlushEnabled(true);
        distributedLogConfiguration.setReadAheadWaitTime(10);
        distributedLogConfiguration.setReadAheadBatchSize(8);
        distributedLogConfiguration.setPositionGapDetectionEnabled(false);
        distributedLogConfiguration.setReadAheadSkipBrokenEntries(true);
        distributedLogConfiguration.setEIInjectReadAheadBrokenEntries(true);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, methodName);
        writeRecords(createNewDLM, 3, 5, 1L, false);
        AsyncLogReader asyncLogReader = createNewDLM.getAsyncLogReader(DLSN.InvalidDLSN);
        for (int i = 0; i < 12; i++) {
            Assert.assertFalse(((LogRecordWithDLSN) Utils.ioResult(asyncLogReader.readNext())).getDlsn().getEntryId() % 10 == 0);
        }
        Utils.close(asyncLogReader);
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testCreateLogStreamWithDifferentReplicationFactor() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setImmediateFlushEnabled(false);
        distributedLogConfiguration.setPeriodicFlushFrequencyMilliSeconds(0);
        DynamicDistributedLogConfiguration dynamicDistributedLogConfiguration = new DynamicDistributedLogConfiguration(new ConcurrentConstConfiguration(distributedLogConfiguration));
        dynamicDistributedLogConfiguration.setProperty("bkcEnsembleSize", 2);
        URI createDLMURI = createDLMURI("/" + methodName);
        ensureURICreated(createDLMURI);
        Namespace build = NamespaceBuilder.newBuilder().conf(distributedLogConfiguration).uri(createDLMURI).build();
        DistributedLogManager openLog = build.openLog(methodName + "-pool");
        AsyncLogWriter startAsyncLogSegmentNonPartitioned = openLog.startAsyncLogSegmentNonPartitioned();
        Utils.ioResult(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(1L)));
        List logSegments = openLog.getLogSegments();
        Assert.assertEquals(1L, logSegments.size());
        LedgerHandle openLedgerNoRecovery = build.getNamespaceDriver().getReaderBKC().get().openLedgerNoRecovery(((LogSegmentMetadata) logSegments.get(0)).getLogSegmentId(), BookKeeper.DigestType.CRC32, distributedLogConfiguration.getBKDigestPW().getBytes(Charsets.UTF_8));
        Assert.assertEquals(3L, openLedgerNoRecovery.getLedgerMetadata().getEnsembleSize());
        openLedgerNoRecovery.close();
        Utils.close(startAsyncLogSegmentNonPartitioned);
        openLog.close();
        DistributedLogManager openLog2 = build.openLog(methodName + "-custom", Optional.empty(), Optional.of(dynamicDistributedLogConfiguration), Optional.empty());
        AsyncLogWriter startAsyncLogSegmentNonPartitioned2 = openLog2.startAsyncLogSegmentNonPartitioned();
        Utils.ioResult(startAsyncLogSegmentNonPartitioned2.write(DLMTestUtil.getLogRecordInstance(1L)));
        List logSegments2 = openLog2.getLogSegments();
        Assert.assertEquals(1L, logSegments2.size());
        LedgerHandle openLedgerNoRecovery2 = build.getNamespaceDriver().getReaderBKC().get().openLedgerNoRecovery(((LogSegmentMetadata) logSegments2.get(0)).getLogSegmentId(), BookKeeper.DigestType.CRC32, distributedLogConfiguration.getBKDigestPW().getBytes(Charsets.UTF_8));
        Assert.assertEquals(2L, openLedgerNoRecovery2.getLedgerMetadata().getEnsembleSize());
        openLedgerNoRecovery2.close();
        Utils.close(startAsyncLogSegmentNonPartitioned2);
        openLog2.close();
        build.close();
    }

    @Test(timeout = 60000)
    public void testWriteRecordSet() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setImmediateFlushEnabled(false);
        distributedLogConfiguration.setPeriodicFlushFrequencyMilliSeconds(0);
        ensureURICreated(createDLMURI("/" + methodName));
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, methodName);
        BKAsyncLogWriter startAsyncLogSegmentNonPartitioned = createNewDLM.startAsyncLogSegmentNonPartitioned();
        ArrayList newArrayList = Lists.newArrayList();
        for (int i = 0; i < 5; i++) {
            newArrayList.add(startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(1 + i)));
        }
        ArrayList newArrayList2 = Lists.newArrayList();
        final LogRecordSet.Writer newWriter = LogRecordSet.newWriter(4096, CompressionCodec.Type.LZ4);
        for (int i2 = 0; i2 < 5; i2++) {
            LogRecord logRecordInstance = DLMTestUtil.getLogRecordInstance(6 + i2);
            CompletableFuture completableFuture = new CompletableFuture();
            newWriter.writeRecord(ByteBuffer.wrap(logRecordInstance.getPayload()), completableFuture);
            newArrayList2.add(completableFuture);
        }
        LogRecord logRecord = new LogRecord(6L, newWriter.getBuffer());
        logRecord.setRecordSet();
        CompletableFuture write = startAsyncLogSegmentNonPartitioned.write(logRecord);
        write.whenComplete((BiConsumer) new FutureEventListener<DLSN>() { // from class: org.apache.distributedlog.TestAsyncReaderWriter.7
            public void onSuccess(DLSN dlsn) {
                newWriter.completeTransmit(dlsn.getLogSegmentSequenceNo(), dlsn.getEntryId(), dlsn.getSlotId());
            }

            public void onFailure(Throwable th) {
                newWriter.abortTransmit(th);
            }
        });
        newArrayList.add(write);
        Utils.ioResult(write);
        for (int i3 = 0; i3 < 5; i3++) {
            CompletableFuture write2 = startAsyncLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(11 + i3));
            newArrayList.add(write2);
            if (i3 == 0) {
                Utils.ioResult(write2);
                Assert.assertEquals(10L, createNewDLM.getLogRecordCount());
            }
        }
        List list = (List) Utils.ioResult(FutureUtils.collect(newArrayList));
        for (int i4 = 0; i4 < 5; i4++) {
            Assert.assertEquals(new DLSN(1L, i4, 0L), list.get(i4));
        }
        Assert.assertEquals(new DLSN(1L, 5L, 0L), list.get(5));
        for (int i5 = 0; i5 < 5; i5++) {
            Assert.assertEquals(new DLSN(1L, 6 + i5, 0L), list.get(6 + i5));
        }
        List list2 = (List) Utils.ioResult(FutureUtils.collect(newArrayList2));
        for (int i6 = 0; i6 < 5; i6++) {
            Assert.assertEquals(new DLSN(1L, 5L, i6), list2.get(i6));
        }
        Utils.ioResult(startAsyncLogSegmentNonPartitioned.flushAndCommit());
        DistributedLogConfiguration distributedLogConfiguration2 = new DistributedLogConfiguration();
        distributedLogConfiguration2.addConfiguration(distributedLogConfiguration);
        distributedLogConfiguration2.setDeserializeRecordSetOnReads(true);
        BKDistributedLogManager createNewDLM2 = createNewDLM(distributedLogConfiguration2, methodName);
        AsyncLogReader asyncLogReader = createNewDLM2.getAsyncLogReader(DLSN.InitialDLSN);
        for (int i7 = 0; i7 < 15; i7++) {
            LogRecordWithDLSN logRecordWithDLSN = (LogRecordWithDLSN) Utils.ioResult(asyncLogReader.readNext());
            if (i7 < 5) {
                Assert.assertEquals(new DLSN(1L, i7, 0L), logRecordWithDLSN.getDlsn());
                Assert.assertEquals(1 + i7, logRecordWithDLSN.getTransactionId());
            } else if (i7 >= 10) {
                Assert.assertEquals(new DLSN(1L, (6 + i7) - 10, 0L), logRecordWithDLSN.getDlsn());
                Assert.assertEquals((11 + i7) - 10, logRecordWithDLSN.getTransactionId());
            } else {
                Assert.assertEquals(new DLSN(1L, 5L, i7 - 5), logRecordWithDLSN.getDlsn());
                Assert.assertEquals(6L, logRecordWithDLSN.getTransactionId());
            }
            Assert.assertEquals(i7 + 1, logRecordWithDLSN.getPositionWithinLogSegment());
            Assert.assertArrayEquals(DLMTestUtil.generatePayload(i7 + 1), logRecordWithDLSN.getPayload());
        }
        Utils.close(asyncLogReader);
        createNewDLM2.close();
        DistributedLogConfiguration distributedLogConfiguration3 = new DistributedLogConfiguration();
        distributedLogConfiguration3.addConfiguration(distributedLogConfiguration);
        distributedLogConfiguration3.setDeserializeRecordSetOnReads(false);
        BKDistributedLogManager createNewDLM3 = createNewDLM(distributedLogConfiguration3, methodName);
        AsyncLogReader asyncLogReader2 = createNewDLM3.getAsyncLogReader(DLSN.InitialDLSN);
        for (int i8 = 0; i8 < 11; i8++) {
            LogRecordWithDLSN logRecordWithDLSN2 = (LogRecordWithDLSN) Utils.ioResult(asyncLogReader2.readNext());
            LOG.info("Read record {}", logRecordWithDLSN2);
            if (i8 < 5) {
                Assert.assertEquals(new DLSN(1L, i8, 0L), logRecordWithDLSN2.getDlsn());
                Assert.assertEquals(1 + i8, logRecordWithDLSN2.getTransactionId());
                Assert.assertEquals(i8 + 1, logRecordWithDLSN2.getPositionWithinLogSegment());
                Assert.assertArrayEquals(DLMTestUtil.generatePayload(i8 + 1), logRecordWithDLSN2.getPayload());
            } else if (i8 >= 6) {
                Assert.assertEquals(new DLSN(1L, (6 + i8) - 6, 0L), logRecordWithDLSN2.getDlsn());
                Assert.assertEquals((11 + i8) - 6, logRecordWithDLSN2.getTransactionId());
                Assert.assertEquals((11 + i8) - 6, logRecordWithDLSN2.getPositionWithinLogSegment());
                Assert.assertArrayEquals(DLMTestUtil.generatePayload((11 + i8) - 6), logRecordWithDLSN2.getPayload());
            } else {
                Assert.assertEquals(new DLSN(1L, 5L, 0L), logRecordWithDLSN2.getDlsn());
                Assert.assertEquals(6L, logRecordWithDLSN2.getTransactionId());
                Assert.assertEquals(6L, logRecordWithDLSN2.getPositionWithinLogSegment());
                Assert.assertTrue(logRecordWithDLSN2.isRecordSet());
                Assert.assertEquals(5L, LogRecordSet.numRecords(logRecordWithDLSN2));
            }
        }
        Utils.close(asyncLogReader2);
        createNewDLM3.close();
    }

    @Test(timeout = 60000)
    public void testIdleReaderExceptionWhenKeepAliveIsDisabled() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setImmediateFlushEnabled(false);
        distributedLogConfiguration.setPeriodicFlushFrequencyMilliSeconds(0);
        distributedLogConfiguration.setPeriodicKeepAliveMilliSeconds(0);
        distributedLogConfiguration.setReadLACLongPollTimeout(9);
        distributedLogConfiguration.setReaderIdleWarnThresholdMillis(20);
        distributedLogConfiguration.setReaderIdleErrorThresholdMillis(40);
        ensureURICreated(createDLMURI("/" + methodName));
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, methodName);
        BKAsyncLogWriter bKAsyncLogWriter = (BKAsyncLogWriter) Utils.ioResult(createNewDLM.openAsyncLogWriter());
        bKAsyncLogWriter.write(DLMTestUtil.getLogRecordInstance(1L));
        AsyncLogReader asyncLogReader = (AsyncLogReader) Utils.ioResult(createNewDLM.openAsyncLogReader(DLSN.InitialDLSN));
        try {
            Utils.ioResult(asyncLogReader.readNext());
            Assert.fail("Should fail when stream is idle");
        } catch (IdleReaderException e) {
        }
        Utils.close(asyncLogReader);
        bKAsyncLogWriter.close();
        createNewDLM.close();
    }

    @Test(timeout = 60000)
    public void testIdleReaderExceptionWhenKeepAliveIsEnabled() throws Exception {
        String methodName = this.runtime.getMethodName();
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(this.testConf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setImmediateFlushEnabled(false);
        distributedLogConfiguration.setPeriodicFlushFrequencyMilliSeconds(0);
        distributedLogConfiguration.setPeriodicKeepAliveMilliSeconds(1000);
        distributedLogConfiguration.setReadLACLongPollTimeout(999);
        distributedLogConfiguration.setReaderIdleWarnThresholdMillis(2000);
        distributedLogConfiguration.setReaderIdleErrorThresholdMillis(4000);
        ensureURICreated(createDLMURI("/" + methodName));
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, methodName);
        BKAsyncLogWriter bKAsyncLogWriter = (BKAsyncLogWriter) Utils.ioResult(createNewDLM.openAsyncLogWriter());
        bKAsyncLogWriter.write(DLMTestUtil.getLogRecordInstance(1L));
        AsyncLogReader asyncLogReader = (AsyncLogReader) Utils.ioResult(createNewDLM.openAsyncLogReader(DLSN.InitialDLSN));
        LogRecordWithDLSN logRecordWithDLSN = (LogRecordWithDLSN) Utils.ioResult(asyncLogReader.readNext());
        Assert.assertEquals(1L, logRecordWithDLSN.getTransactionId());
        DLMTestUtil.verifyLogRecord(logRecordWithDLSN);
        Utils.close(asyncLogReader);
        bKAsyncLogWriter.close();
        createNewDLM.close();
    }
}
