package org.apache.distributedlog.impl.logsegment;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.distributedlog.BKDistributedLogManager;
import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.BookKeeperClientBuilder;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.Entry;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClientBuilder;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.bk.LedgerAllocator;
import org.apache.distributedlog.exceptions.EndOfLogSegmentException;
import org.apache.distributedlog.exceptions.ReadCancelledException;
import org.apache.distributedlog.injector.AsyncFailureInjector;
import org.apache.distributedlog.io.AsyncCloseable;
import org.apache.distributedlog.util.ConfUtils;
import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.Utils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

/* loaded from: input_file:org/apache/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.class */
public class TestBKLogSegmentEntryReader extends TestDistributedLogBase {

    @Rule
    public TestName runtime = new TestName();
    private OrderedScheduler scheduler;
    private BookKeeperClient bkc;
    private ZooKeeperClient zkc;

    @Override // org.apache.distributedlog.TestDistributedLogBase
    @Before
    public void setup() throws Exception {
        super.setup();
        this.zkc = ZooKeeperClientBuilder.newBuilder().name("test-zk").zkServers(bkutil.getZkServers()).sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds()).zkAclId(conf.getZkAclId()).build();
        this.bkc = BookKeeperClientBuilder.newBuilder().name("test-bk").dlConfig(conf).ledgersPath("/ledgers").zkServers(bkutil.getZkServers()).build();
        this.scheduler = OrderedScheduler.newBuilder().name("test-bk-logsegment-entry-reader").corePoolSize(1).build();
    }

    @Override // org.apache.distributedlog.TestDistributedLogBase
    @After
    public void teardown() throws Exception {
        if (null != this.bkc) {
            this.bkc.close();
        }
        if (null != this.scheduler) {
            this.scheduler.shutdown();
        }
        if (null != this.zkc) {
            this.zkc.close();
        }
        super.teardown();
    }

    BKLogSegmentEntryReader createEntryReader(LogSegmentMetadata logSegmentMetadata, long j, DistributedLogConfiguration distributedLogConfiguration) throws Exception {
        return (BKLogSegmentEntryReader) Utils.ioResult(new BKLogSegmentEntryStore(distributedLogConfiguration, ConfUtils.getConstDynConf(distributedLogConfiguration), this.zkc, this.bkc, this.scheduler, (LedgerAllocator) null, NullStatsLogger.INSTANCE, AsyncFailureInjector.NULL).openReader(logSegmentMetadata, j));
    }

    /* JADX WARN: Type inference failed for: r0v8, types: [long, org.apache.distributedlog.api.AsyncLogWriter, org.apache.distributedlog.io.AsyncCloseable] */
    void generateCompletedLogSegments(DistributedLogManager distributedLogManager, DistributedLogConfiguration distributedLogConfiguration, long j, long j2) throws Exception {
        long j3 = 1;
        long j4 = 0;
        while (true) {
            long j5 = j4;
            if (j5 >= j) {
                return;
            }
            ?? r0 = (AsyncLogWriter) Utils.ioResult(distributedLogManager.openAsyncLogWriter());
            long j6 = 1;
            while (true) {
                long j7 = j6;
                if (j7 <= j2) {
                    j3++;
                    Utils.ioResult(r0.write(DLMTestUtil.getLogRecordInstance(r0)));
                    LogRecord logRecordInstance = DLMTestUtil.getLogRecordInstance(j3);
                    logRecordInstance.setControl();
                    Utils.ioResult(r0.write(logRecordInstance));
                    j6 = j7 + 1;
                }
            }
            Utils.close((AsyncCloseable) r0);
            j4 = j5 + 1;
        }
    }

    AsyncLogWriter createInprogressLogSegment(DistributedLogManager distributedLogManager, DistributedLogConfiguration distributedLogConfiguration, long j) throws Exception {
        AsyncLogWriter asyncLogWriter = (AsyncLogWriter) Utils.ioResult(distributedLogManager.openAsyncLogWriter());
        long j2 = 1;
        while (true) {
            long j3 = j2;
            if (j3 > j) {
                return asyncLogWriter;
            }
            Utils.ioResult(asyncLogWriter.write(DLMTestUtil.getLogRecordInstance(j3)));
            LogRecord logRecordInstance = DLMTestUtil.getLogRecordInstance(j3);
            logRecordInstance.setControl();
            Utils.ioResult(asyncLogWriter.write(logRecordInstance));
            j2 = j3 + 1;
        }
    }

    @Test(timeout = 60000)
    public void testReadEntriesFromCompleteLogSegment() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(conf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setPeriodicFlushFrequencyMilliSeconds(0);
        distributedLogConfiguration.setImmediateFlushEnabled(false);
        distributedLogConfiguration.setNumPrefetchEntriesPerLogSegment(10);
        distributedLogConfiguration.setMaxPrefetchEntriesPerLogSegment(10);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, this.runtime.getMethodName());
        generateCompletedLogSegments(createNewDLM, distributedLogConfiguration, 1L, 20L);
        List logSegments = createNewDLM.getLogSegments();
        Assert.assertEquals(logSegments.size() + " log segments found, expected to be only one", 1L, logSegments.size());
        BKLogSegmentEntryReader createEntryReader = createEntryReader((LogSegmentMetadata) logSegments.get(0), 0L, distributedLogConfiguration);
        createEntryReader.start();
        boolean z = false;
        long j = 1;
        long j2 = 0;
        while (!z) {
            try {
                Entry.Reader reader = (Entry.Reader) ((List) Utils.ioResult(createEntryReader.readNext(1))).get(0);
                LogRecordWithDLSN nextRecord = reader.nextRecord();
                while (true) {
                    LogRecordWithDLSN logRecordWithDLSN = nextRecord;
                    if (null == logRecordWithDLSN) {
                        break;
                    }
                    if (!logRecordWithDLSN.isControl()) {
                        DLMTestUtil.verifyLogRecord(logRecordWithDLSN);
                        Assert.assertEquals(j, logRecordWithDLSN.getTransactionId());
                        j++;
                    }
                    DLSN dlsn = logRecordWithDLSN.getDlsn();
                    Assert.assertEquals(1L, dlsn.getLogSegmentSequenceNo());
                    Assert.assertEquals(j2, dlsn.getEntryId());
                    nextRecord = reader.nextRecord();
                }
                j2++;
            } catch (EndOfLogSegmentException e) {
                z = true;
            }
        }
        Assert.assertEquals(21L, j);
        Assert.assertFalse(createEntryReader.hasCaughtUpOnInprogress());
        Utils.close(createEntryReader);
    }

    @Test(timeout = 60000)
    public void testCloseReaderToCancelPendingReads() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(conf);
        distributedLogConfiguration.setNumPrefetchEntriesPerLogSegment(10);
        distributedLogConfiguration.setMaxPrefetchEntriesPerLogSegment(10);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, this.runtime.getMethodName());
        DLMTestUtil.generateCompletedLogSegments(createNewDLM, distributedLogConfiguration, 1L, 20L);
        List logSegments = createNewDLM.getLogSegments();
        Assert.assertEquals(logSegments.size() + " log segments found, expected to be only one", 1L, logSegments.size());
        BKLogSegmentEntryReader createEntryReader = createEntryReader((LogSegmentMetadata) logSegments.get(0), 0L, distributedLogConfiguration);
        ArrayList newArrayList = Lists.newArrayList();
        for (int i = 0; i < 5; i++) {
            newArrayList.add(createEntryReader.readNext(1));
        }
        Assert.assertFalse("Reader should not be closed yet", createEntryReader.isClosed());
        Utils.close(createEntryReader);
        Iterator it = newArrayList.iterator();
        while (it.hasNext()) {
            try {
                Utils.ioResult((CompletableFuture) it.next());
                Assert.fail("The read request should be cancelled");
            } catch (ReadCancelledException e) {
            }
        }
        Assert.assertFalse(createEntryReader.hasCaughtUpOnInprogress());
        Assert.assertTrue("Reader should be closed yet", createEntryReader.isClosed());
    }

    @Test(timeout = 60000)
    public void testMaxPrefetchEntriesSmallBatch() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(conf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setPeriodicFlushFrequencyMilliSeconds(0);
        distributedLogConfiguration.setImmediateFlushEnabled(false);
        distributedLogConfiguration.setNumPrefetchEntriesPerLogSegment(2);
        distributedLogConfiguration.setMaxPrefetchEntriesPerLogSegment(10);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, this.runtime.getMethodName());
        generateCompletedLogSegments(createNewDLM, distributedLogConfiguration, 1L, 20L);
        List logSegments = createNewDLM.getLogSegments();
        Assert.assertEquals(logSegments.size() + " log segments found, expected to be only one", 1L, logSegments.size());
        BKLogSegmentEntryReader createEntryReader = createEntryReader((LogSegmentMetadata) logSegments.get(0), 0L, distributedLogConfiguration);
        createEntryReader.start();
        while (createEntryReader.readAheadEntries.size() < 10) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        long j = 1;
        Assert.assertEquals(10L, createEntryReader.readAheadEntries.size());
        Assert.assertEquals(10L, createEntryReader.getNextEntryId());
        Assert.assertFalse(createEntryReader.hasCaughtUpOnInprogress());
        Entry.Reader reader = (Entry.Reader) ((List) Utils.ioResult(createEntryReader.readNext(1))).get(0);
        LogRecordWithDLSN nextRecord = reader.nextRecord();
        while (true) {
            LogRecordWithDLSN logRecordWithDLSN = nextRecord;
            if (null == logRecordWithDLSN) {
                break;
            }
            if (!logRecordWithDLSN.isControl()) {
                DLMTestUtil.verifyLogRecord(logRecordWithDLSN);
                Assert.assertEquals(j, logRecordWithDLSN.getTransactionId());
                j++;
            }
            DLSN dlsn = logRecordWithDLSN.getDlsn();
            Assert.assertEquals(1L, dlsn.getLogSegmentSequenceNo());
            Assert.assertEquals(0L, dlsn.getEntryId());
            nextRecord = reader.nextRecord();
        }
        long j2 = 0 + 1;
        Assert.assertEquals(2L, j);
        while (createEntryReader.readAheadEntries.size() < 10) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertEquals(10L, createEntryReader.readAheadEntries.size());
        Assert.assertEquals(11L, createEntryReader.getNextEntryId());
        Assert.assertFalse(createEntryReader.hasCaughtUpOnInprogress());
        Utils.close(createEntryReader);
    }

    @Test(timeout = 60000)
    public void testMaxPrefetchEntriesLargeBatch() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(conf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setPeriodicFlushFrequencyMilliSeconds(0);
        distributedLogConfiguration.setImmediateFlushEnabled(false);
        distributedLogConfiguration.setNumPrefetchEntriesPerLogSegment(10);
        distributedLogConfiguration.setMaxPrefetchEntriesPerLogSegment(5);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, this.runtime.getMethodName());
        generateCompletedLogSegments(createNewDLM, distributedLogConfiguration, 1L, 20L);
        List logSegments = createNewDLM.getLogSegments();
        Assert.assertEquals(logSegments.size() + " log segments found, expected to be only one", 1L, logSegments.size());
        BKLogSegmentEntryReader createEntryReader = createEntryReader((LogSegmentMetadata) logSegments.get(0), 0L, distributedLogConfiguration);
        createEntryReader.start();
        while (createEntryReader.readAheadEntries.size() < 5) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        long j = 1;
        Assert.assertEquals(5L, createEntryReader.readAheadEntries.size());
        Assert.assertEquals(5L, createEntryReader.getNextEntryId());
        Entry.Reader reader = (Entry.Reader) ((List) Utils.ioResult(createEntryReader.readNext(1))).get(0);
        LogRecordWithDLSN nextRecord = reader.nextRecord();
        while (true) {
            LogRecordWithDLSN logRecordWithDLSN = nextRecord;
            if (null == logRecordWithDLSN) {
                break;
            }
            if (!logRecordWithDLSN.isControl()) {
                DLMTestUtil.verifyLogRecord(logRecordWithDLSN);
                Assert.assertEquals(j, logRecordWithDLSN.getTransactionId());
                j++;
            }
            DLSN dlsn = logRecordWithDLSN.getDlsn();
            Assert.assertEquals(1L, dlsn.getLogSegmentSequenceNo());
            Assert.assertEquals(0L, dlsn.getEntryId());
            nextRecord = reader.nextRecord();
        }
        long j2 = 0 + 1;
        Assert.assertEquals(2L, j);
        while (createEntryReader.readAheadEntries.size() < 5) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertEquals(5L, createEntryReader.readAheadEntries.size());
        Assert.assertEquals(6L, createEntryReader.getNextEntryId());
        Assert.assertFalse(createEntryReader.hasCaughtUpOnInprogress());
        Utils.close(createEntryReader);
    }

    @Test(timeout = 60000)
    public void testMaxPrefetchEntriesSmallSegment() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(conf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setPeriodicFlushFrequencyMilliSeconds(0);
        distributedLogConfiguration.setImmediateFlushEnabled(false);
        distributedLogConfiguration.setNumPrefetchEntriesPerLogSegment(10);
        distributedLogConfiguration.setMaxPrefetchEntriesPerLogSegment(20);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, this.runtime.getMethodName());
        generateCompletedLogSegments(createNewDLM, distributedLogConfiguration, 1L, 5L);
        List logSegments = createNewDLM.getLogSegments();
        Assert.assertEquals(logSegments.size() + " log segments found, expected to be only one", 1L, logSegments.size());
        BKLogSegmentEntryReader createEntryReader = createEntryReader((LogSegmentMetadata) logSegments.get(0), 0L, distributedLogConfiguration);
        createEntryReader.start();
        while (createEntryReader.readAheadEntries.size() < createEntryReader.getLastAddConfirmed() + 1) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        long j = 1;
        Assert.assertEquals(createEntryReader.getLastAddConfirmed() + 1, createEntryReader.readAheadEntries.size());
        Assert.assertEquals(createEntryReader.getLastAddConfirmed() + 1, createEntryReader.getNextEntryId());
        Entry.Reader reader = (Entry.Reader) ((List) Utils.ioResult(createEntryReader.readNext(1))).get(0);
        LogRecordWithDLSN nextRecord = reader.nextRecord();
        while (true) {
            LogRecordWithDLSN logRecordWithDLSN = nextRecord;
            if (null == logRecordWithDLSN) {
                long j2 = 0 + 1;
                Assert.assertEquals(2L, j);
                Assert.assertEquals(createEntryReader.getLastAddConfirmed(), createEntryReader.readAheadEntries.size());
                Assert.assertEquals(createEntryReader.getLastAddConfirmed() + 1, createEntryReader.getNextEntryId());
                Assert.assertFalse(createEntryReader.hasCaughtUpOnInprogress());
                Utils.close(createEntryReader);
                return;
            }
            if (!logRecordWithDLSN.isControl()) {
                DLMTestUtil.verifyLogRecord(logRecordWithDLSN);
                Assert.assertEquals(j, logRecordWithDLSN.getTransactionId());
                j++;
            }
            DLSN dlsn = logRecordWithDLSN.getDlsn();
            Assert.assertEquals(1L, dlsn.getLogSegmentSequenceNo());
            Assert.assertEquals(0L, dlsn.getEntryId());
            nextRecord = reader.nextRecord();
        }
    }

    @Test(timeout = 60000)
    public void testReadEntriesFromInprogressSegment() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(conf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setPeriodicFlushFrequencyMilliSeconds(0);
        distributedLogConfiguration.setImmediateFlushEnabled(false);
        distributedLogConfiguration.setNumPrefetchEntriesPerLogSegment(20);
        distributedLogConfiguration.setMaxPrefetchEntriesPerLogSegment(20);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, this.runtime.getMethodName());
        AsyncLogWriter createInprogressLogSegment = createInprogressLogSegment(createNewDLM, distributedLogConfiguration, 5L);
        List logSegments = createNewDLM.getLogSegments();
        Assert.assertEquals(logSegments.size() + " log segments found, expected to be only one", 1L, logSegments.size());
        BKLogSegmentEntryReader createEntryReader = createEntryReader((LogSegmentMetadata) logSegments.get(0), 0L, distributedLogConfiguration);
        createEntryReader.start();
        while (createEntryReader.readAheadEntries.size() < 8 + 2) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertEquals(8 + 2, createEntryReader.getNextEntryId());
        long j = 1;
        long j2 = 0;
        do {
            Entry.Reader reader = (Entry.Reader) ((List) Utils.ioResult(createEntryReader.readNext(1))).get(0);
            LogRecordWithDLSN nextRecord = reader.nextRecord();
            while (true) {
                LogRecordWithDLSN logRecordWithDLSN = nextRecord;
                if (null == logRecordWithDLSN) {
                    break;
                }
                if (!logRecordWithDLSN.isControl()) {
                    DLMTestUtil.verifyLogRecord(logRecordWithDLSN);
                    Assert.assertEquals(j, logRecordWithDLSN.getTransactionId());
                    j++;
                }
                DLSN dlsn = logRecordWithDLSN.getDlsn();
                Assert.assertEquals(1L, dlsn.getLogSegmentSequenceNo());
                Assert.assertEquals(j2, dlsn.getEntryId());
                nextRecord = reader.nextRecord();
            }
            j2++;
        } while (j2 != 8 + 1);
        Assert.assertEquals(6L, j);
        CompletableFuture readNext = createEntryReader.readNext(1);
        Utils.ioResult(createInprogressLogSegment.write(DLMTestUtil.getLogRecordInstance(j)));
        List list = (List) Utils.ioResult(readNext);
        Assert.assertEquals(1L, list.size());
        Assert.assertTrue(createEntryReader.hasCaughtUpOnInprogress());
        Entry.Reader reader2 = (Entry.Reader) list.get(0);
        LogRecordWithDLSN nextRecord2 = reader2.nextRecord();
        Assert.assertNotNull(nextRecord2);
        Assert.assertTrue(nextRecord2.isControl());
        Assert.assertNull(reader2.nextRecord());
        while (createEntryReader.getNextEntryId() <= j2) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertEquals(j2 + 2, createEntryReader.getNextEntryId());
        Assert.assertEquals(1L, createEntryReader.readAheadEntries.size());
        Utils.close(createEntryReader);
        Utils.close(createInprogressLogSegment);
    }

    @Test(timeout = 60000)
    public void testReadEntriesOnStateChange() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(conf);
        distributedLogConfiguration.setOutputBufferSize(0);
        distributedLogConfiguration.setPeriodicFlushFrequencyMilliSeconds(0);
        distributedLogConfiguration.setImmediateFlushEnabled(false);
        distributedLogConfiguration.setNumPrefetchEntriesPerLogSegment(20);
        distributedLogConfiguration.setMaxPrefetchEntriesPerLogSegment(20);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, this.runtime.getMethodName());
        AsyncLogWriter createInprogressLogSegment = createInprogressLogSegment(createNewDLM, distributedLogConfiguration, 5L);
        List logSegments = createNewDLM.getLogSegments();
        Assert.assertEquals(logSegments.size() + " log segments found, expected to be only one", 1L, logSegments.size());
        BKLogSegmentEntryReader createEntryReader = createEntryReader((LogSegmentMetadata) logSegments.get(0), 0L, distributedLogConfiguration);
        createEntryReader.start();
        while (createEntryReader.readAheadEntries.size() < 8 + 2) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertEquals(8 + 2, createEntryReader.getNextEntryId());
        long j = 1;
        long j2 = 0;
        do {
            Entry.Reader reader = (Entry.Reader) ((List) Utils.ioResult(createEntryReader.readNext(1))).get(0);
            LogRecordWithDLSN nextRecord = reader.nextRecord();
            while (true) {
                LogRecordWithDLSN logRecordWithDLSN = nextRecord;
                if (null == logRecordWithDLSN) {
                    break;
                }
                if (!logRecordWithDLSN.isControl()) {
                    DLMTestUtil.verifyLogRecord(logRecordWithDLSN);
                    Assert.assertEquals(j, logRecordWithDLSN.getTransactionId());
                    j++;
                }
                DLSN dlsn = logRecordWithDLSN.getDlsn();
                Assert.assertEquals(1L, dlsn.getLogSegmentSequenceNo());
                Assert.assertEquals(j2, dlsn.getEntryId());
                nextRecord = reader.nextRecord();
            }
            j2++;
        } while (j2 != 8 + 1);
        Assert.assertEquals(6L, j);
        CompletableFuture readNext = createEntryReader.readNext(1);
        Utils.ioResult(createInprogressLogSegment.write(DLMTestUtil.getLogRecordInstance(j)));
        List list = (List) Utils.ioResult(readNext);
        Assert.assertEquals(1L, list.size());
        Entry.Reader reader2 = (Entry.Reader) list.get(0);
        LogRecordWithDLSN nextRecord2 = reader2.nextRecord();
        Assert.assertNotNull(nextRecord2);
        Assert.assertTrue(nextRecord2.isControl());
        Assert.assertNull(reader2.nextRecord());
        while (createEntryReader.getNextEntryId() <= j2) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertEquals(j2 + 2, createEntryReader.getNextEntryId());
        Assert.assertEquals(1L, createEntryReader.readAheadEntries.size());
        long j3 = j2 + 1;
        Utils.close(createInprogressLogSegment);
        Entry.Reader reader3 = (Entry.Reader) ((List) Utils.ioResult(createEntryReader.readNext(1))).get(0);
        LogRecordWithDLSN nextRecord3 = reader3.nextRecord();
        Assert.assertNotNull(nextRecord3);
        Assert.assertFalse(nextRecord3.isControl());
        Assert.assertNull(reader3.nextRecord());
        while (createEntryReader.getNextEntryId() <= j3 + 1) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertEquals(j3 + 2, createEntryReader.getNextEntryId());
        Assert.assertEquals(1L, createEntryReader.readAheadEntries.size());
        List logSegments2 = createNewDLM.getLogSegments();
        Assert.assertEquals(1L, logSegments2.size());
        Assert.assertFalse(((LogSegmentMetadata) logSegments2.get(0)).isInProgress());
        createEntryReader.onLogSegmentMetadataUpdated((LogSegmentMetadata) logSegments2.get(0));
        try {
            Utils.ioResult(createEntryReader.readNext(1));
            Utils.ioResult(createEntryReader.readNext(1));
            Assert.fail("Should reach end of log segment");
        } catch (EndOfLogSegmentException e) {
        }
        Utils.close(createEntryReader);
    }
}
