package org.apache.distributedlog;

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/TestBKSyncLogReader.class */
/**
 * Test cases for the synchronous log reader obtained from
 * {@code BKDistributedLogManager#getInputStream(long)}.
 *
 * <p>Each test creates its own log, named after the running test method, through
 * {@code createNewDLM} and relies on {@code waitForNextRecord}; neither is declared in
 * this class, so both are presumably inherited from {@link TestDistributedLogBase}.
 */
public class TestBKSyncLogReader extends TestDistributedLogBase {
    static final Logger logger = LoggerFactory.getLogger(TestBKSyncLogReader.class);

    /** Upper bound, in seconds, to wait for the background writer task to stop. */
    private static final long EXECUTOR_STOP_TIMEOUT_SECONDS = 5L;

    @Rule
    public TestName testName = new TestName();

    /**
     * Writes one record per transaction id in {@code [fromTxId, toTxIdExclusive)}, in
     * ascending order.
     *
     * @param writer the writer to append to
     * @param fromTxId first transaction id to write (inclusive)
     * @param toTxIdExclusive transaction id at which to stop (exclusive)
     * @throws IOException if a write fails
     */
    private static void writeSequentialRecords(BKSyncLogWriter writer, long fromTxId, long toTxIdExclusive)
            throws IOException {
        for (long txId = fromTxId; txId < toTxIdExclusive; txId++) {
            writer.write(DLMTestUtil.getLogRecordInstance(txId));
        }
    }

    /**
     * Reads records with non-blocking {@code readNext(false)} until it returns
     * {@code null}, asserting that transaction ids are consecutive starting at
     * {@code firstTxId} and that each record passes {@code DLMTestUtil.verifyLogRecord}.
     *
     * @param reader the reader to drain
     * @param firstTxId transaction id expected on the first record
     * @return the number of records read
     * @throws Exception if reading or verification fails
     */
    private static int readAndVerifyAvailableRecords(BKSyncLogReader reader, long firstTxId) throws Exception {
        int numRead = 0;
        long expectedTxId = firstTxId;
        LogRecordWithDLSN record = reader.readNext(false);
        while (null != record) {
            Assert.assertEquals(expectedTxId, record.getTransactionId());
            DLMTestUtil.verifyLogRecord(record);
            numRead++;
            expectedTxId++;
            record = reader.readNext(false);
        }
        return numRead;
    }

    /**
     * Stops the executor and waits for an in-flight task to finish, so that no
     * background work is still running when the caller releases shared resources.
     *
     * @param executor the executor to stop
     * @throws InterruptedException if interrupted while waiting
     */
    private static void stopAndAwait(ScheduledExecutorService executor) throws InterruptedException {
        executor.shutdown();
        if (!executor.awaitTermination(EXECUTOR_STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            // The task did not finish in time; interrupt it rather than leak the thread.
            executor.shutdownNow();
        }
    }

    /**
     * Opens a reader at transaction id 20 while the log only holds ids 1..9, checks that
     * nothing is readable, then writes ids 10..29 and checks that the reader yields
     * exactly ids 20..29.
     */
    @Test(timeout = 60000)
    public void testCreateReaderBeyondLastTransactionId() throws Exception {
        BKDistributedLogManager dlm = createNewDLM(conf, this.testName.getMethodName());
        BKSyncLogWriter firstWriter = dlm.startLogSegmentNonPartitioned();
        writeSequentialRecords(firstWriter, 1L, 10L);
        firstWriter.closeAndComplete();

        LogReader reader = dlm.getInputStream(20L);
        Assert.assertNull(reader.readNext(false));

        // Write another 20 records so that the requested start position now exists.
        BKSyncLogWriter secondWriter = dlm.startLogSegmentNonPartitioned();
        writeSequentialRecords(secondWriter, 10L, 30L);
        secondWriter.closeAndComplete();

        for (int i = 0; i < 10; i++) {
            Assert.assertEquals(20L + i, waitForNextRecord(reader).getTransactionId());
        }
        Assert.assertNull(reader.readNext(false));

        // Release the reader and manager, as the other tests in this class do.
        reader.close();
        dlm.close();
    }

    /**
     * Reads ids 1..9, deletes the log through a second manager on the same name, and
     * expects subsequent reads to eventually fail with {@link LogNotFoundException}.
     */
    @Test(timeout = 60000)
    public void testDeletingLogWhileReading() throws Exception {
        String name = this.testName.getMethodName();
        BKDistributedLogManager dlm = createNewDLM(conf, name);
        BKSyncLogWriter writer = dlm.startLogSegmentNonPartitioned();
        writeSequentialRecords(writer, 1L, 10L);
        writer.closeAndComplete();

        LogReader reader = dlm.getInputStream(1L);
        for (int i = 1; i < 10; i++) {
            Assert.assertEquals((long) i, waitForNextRecord(reader).getTransactionId());
        }

        createNewDLM(conf, name).delete();

        try {
            // Poll until the reader either returns a record or notices the deletion;
            // the test timeout bounds this loop.
            LogRecordWithDLSN record = reader.readNext(false);
            while (null == record) {
                record = reader.readNext(false);
            }
            Assert.fail("Should fail reading next with LogNotFound");
        } catch (LogNotFoundException expected) {
            // expected: the log was deleted underneath the reader
        }
        // NOTE(review): the reader and both managers are never closed here. Whether
        // close() is safe after the log has been deleted is not visible from this file;
        // confirm before adding cleanup.
    }

    /**
     * With immediate flushing configured (output buffer size 0, periodic flush
     * effectively disabled), checks that a single written record is not readable until
     * a second record has been written, and that only the first record is then returned.
     */
    @Test(timeout = 60000)
    public void testReadingFromEmptyLog() throws Exception {
        String name = this.testName.getMethodName();
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(conf);
        confLocal.setOutputBufferSize(0);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(Integer.MAX_VALUE);
        BKDistributedLogManager dlm = createNewDLM(confLocal, name);
        BKSyncLogWriter writer = dlm.startLogSegmentNonPartitioned();

        // First record is written but nothing is visible to the reader yet.
        writer.write(DLMTestUtil.getLogRecordInstance(1L));
        LogReader reader = dlm.getInputStream(1L);
        Assert.assertNull(reader.readNext(true));
        Assert.assertNull(reader.readNext(false));

        // Writing a second record makes the first one readable.
        writer.write(DLMTestUtil.getLogRecordInstance(2L));
        LogRecord record = waitForNextRecord(reader);
        Assert.assertNotNull(record);
        Assert.assertEquals(1L, record.getTransactionId());
        DLMTestUtil.verifyLogRecord(record);
        Assert.assertNull(reader.readNext(true));

        writer.close();
        reader.close();
        dlm.close();
    }

    /**
     * Writes ids 1..10, waits until read-ahead reports being caught up, writes ids
     * 11..20, waits for read-ahead to advance past them, and then expects all 20
     * records to be readable in order.
     */
    @Test(timeout = 60000)
    public void testReadRecordsAfterReadAheadCaughtUp() throws Exception {
        String name = this.testName.getMethodName();
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(conf);
        confLocal.setOutputBufferSize(0);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(Integer.MAX_VALUE);
        BKDistributedLogManager dlm = createNewDLM(confLocal, name);
        BKSyncLogWriter writer = dlm.startLogSegmentNonPartitioned();

        writeSequentialRecords(writer, 1L, 11L);
        writer.flush();
        writer.commit();
        logger.info("Write first 10 records");

        BKSyncLogReader reader = dlm.getInputStream(1L);
        // Poll until read-ahead has caught up; bounded by the test timeout.
        while (!reader.getReadAheadReader().isReadAheadCaughtUp()) {
            TimeUnit.MILLISECONDS.sleep(20L);
        }
        logger.info("ReadAhead is caught up with first 10 records");

        writeSequentialRecords(writer, 11L, 21L);
        writer.flush();
        writer.commit();
        logger.info("Write another 10 records");

        // Poll until read-ahead's next entry position reaches 21; bounded by the test timeout.
        while (reader.getReadAheadReader().getNextEntryPosition().getEntryId() < 21) {
            TimeUnit.MILLISECONDS.sleep(20L);
        }
        logger.info("ReadAhead is caught up with another 10 records");

        int numReads = readAndVerifyAvailableRecords(reader, 1L);
        Assert.assertEquals(20L, numReads);

        writer.close();
        reader.close();
        dlm.close();
    }

    /**
     * With read-ahead limited to one record and a batch size of one, writes ids 1..10
     * and expects a reader opened afterwards to return all 10 in order.
     */
    @Test(timeout = 60000)
    public void testReadRecordsWhenReadAheadCatchingUp() throws Exception {
        String name = this.testName.getMethodName();
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(conf);
        confLocal.setOutputBufferSize(0);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(Integer.MAX_VALUE);
        confLocal.setReadAheadMaxRecords(1);
        confLocal.setReadAheadBatchSize(1);
        BKDistributedLogManager dlm = createNewDLM(confLocal, name);
        BKSyncLogWriter writer = dlm.startLogSegmentNonPartitioned();

        writeSequentialRecords(writer, 1L, 11L);
        writer.flush();
        writer.commit();
        logger.info("Write first 10 records");

        BKSyncLogReader reader = dlm.getInputStream(1L);
        int numReads = readAndVerifyAvailableRecords(reader, 1L);
        Assert.assertEquals(10L, numReads);

        writer.close();
        reader.close();
        dlm.close();
    }

    /**
     * Writes ids 1..10, then keeps appending one record every 400 ms from a background
     * thread while a reader drains the log, and expects at least the first 10 records
     * to be read in order.
     */
    @Test(timeout = 60000)
    public void testReadRecordsWhenReadAheadCatchingUp2() throws Exception {
        String name = this.testName.getMethodName();
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(conf);
        confLocal.setOutputBufferSize(0);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(Integer.MAX_VALUE);
        BKDistributedLogManager dlm = createNewDLM(confLocal, name);
        final BKSyncLogWriter writer = dlm.startLogSegmentNonPartitioned();

        writeSequentialRecords(writer, 1L, 11L);
        writer.flush();
        writer.commit();
        final AtomicLong nextTxId = new AtomicLong(11L);
        logger.info("Write first 10 records");

        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        final BKSyncLogReader reader;
        try {
            executor.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    long txId = nextTxId.getAndIncrement();
                    try {
                        writer.write(DLMTestUtil.getLogRecordInstance(txId));
                    } catch (IOException e) {
                        // Best effort: a failed background write must not kill the
                        // periodic task, but log it so a later txid mismatch is explainable.
                        logger.warn("Failed to write record {} in background", txId, e);
                    }
                }
            }, 0L, 400L, TimeUnit.MILLISECONDS);

            reader = dlm.getInputStream(1L);
            int numReads = readAndVerifyAvailableRecords(reader, 1L);
            Assert.assertTrue(numReads >= 10);
        } finally {
            // Always stop the background writer, even when an assertion fails, and wait
            // for an in-flight write so it cannot race with writer.close() below.
            stopAndAwait(executor);
        }

        writer.close();
        reader.close();
        dlm.close();
    }
}
