package org.apache.distributedlog;

import com.google.common.util.concurrent.RateLimiter;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.util.Utils;
import org.junit.Assert;
import org.junit.Test;

/**
 * Test case that reads a log stream on a background reader thread while a writer thread
 * concurrently appends records to the same stream.
 */
public class TestNonBlockingReadsMultiReader extends TestDistributedLogBase {

    /* loaded from: input_file:org/apache/distributedlog/TestNonBlockingReadsMultiReader$ReaderThread.class */
    static class ReaderThread extends Thread {
        final LogReader reader;
        final boolean nonBlockReading;
        volatile boolean running;
        final AtomicInteger readCount;

        ReaderThread(String str, LogReader logReader, boolean z) {
            super(str);
            this.running = true;
            this.readCount = new AtomicInteger(0);
            this.reader = logReader;
            this.nonBlockReading = z;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.running) {
                try {
                    LogRecordWithDLSN readNext = this.reader.readNext(this.nonBlockReading);
                    if (readNext != null) {
                        this.readCount.incrementAndGet();
                        if (this.readCount.get() % 1000 == 0) {
                            TestDistributedLogBase.LOG.info("{} reading {}", getName(), Long.valueOf(readNext.getTransactionId()));
                        }
                    }
                } catch (DLInterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (IOException e2) {
                    return;
                }
            }
        }

        void stopReading() {
            TestDistributedLogBase.LOG.info("Stopping reader.");
            this.running = false;
            interrupt();
            try {
                join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                TestDistributedLogBase.LOG.error("Interrupted on waiting reader thread {} exiting : ", getName(), e);
            }
        }

        int getReadCount() {
            return this.readCount.get();
        }
    }

    /**
     * Verifies that a reader thread eventually observes every record written by a concurrent
     * writer.
     *
     * <p>Two records are written up front. A rate-limited writer thread then appends records
     * for five seconds while a {@link ReaderThread} reads the same stream through a second log
     * manager. Once the writer has stopped, the test polls until the reader's count reaches the
     * writer's count and asserts that both are equal.
     *
     * @throws Exception if setup, the initial writes, or waiting on the threads fails
     */
    @Test(timeout = 60000)
    public void testMultiReaders() throws Exception {
        final String streamName = "distrlog-multireaders";
        final RateLimiter limiter = RateLimiter.create(1000.0d);
        DistributedLogConfiguration writerConf = new DistributedLogConfiguration();
        writerConf.setOutputBufferSize(0);
        writerConf.setImmediateFlushEnabled(true);
        BKDistributedLogManager writeDlm = createNewDLM(writerConf, streamName);
        final AsyncLogWriter writer = writeDlm.startAsyncLogSegmentNonPartitioned();
        Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(0L)));
        Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
        // The two records above are already counted.
        final AtomicInteger writeCount = new AtomicInteger(2);
        BKDistributedLogManager readDlm = createNewDLM(conf, streamName);
        BKSyncLogReader inputStream = readDlm.getInputStream(0L);
        // Declared outside the try so the finally block can always shut the threads down.
        final AtomicBoolean keepWriting = new AtomicBoolean(true);
        ReaderThread[] readers = {new ReaderThread("reader0-non-blocking", inputStream, false)};
        try {
            Thread writerThread = new Thread("WriteThread") {
                @Override
                public void run() {
                    try {
                        long nextTxId = 2;
                        DLSN lastDlsn = DLSN.InvalidDLSN;
                        while (keepWriting.get()) {
                            limiter.acquire();
                            long txId = nextTxId++;
                            lastDlsn = (DLSN) Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId)));
                            writeCount.incrementAndGet();
                            if (txId % 1000 == 0) {
                                TestDistributedLogBase.LOG.info("writer write {}", Long.valueOf(txId));
                            }
                        }
                        TestDistributedLogBase.LOG.info("Completed writing record at {}", lastDlsn);
                        Utils.close(writer);
                    } catch (DLInterruptedException e) {
                        // Must precede the broader Exception clause, otherwise it is unreachable.
                        Thread.currentThread().interrupt();
                    } catch (Exception e) {
                        // Best effort: the writer simply stops, but leave a trace for debugging.
                        TestDistributedLogBase.LOG.error("Writer thread stopped after a failure", e);
                    }
                }
            };
            for (ReaderThread reader : readers) {
                reader.start();
            }
            writerThread.start();
            TimeUnit.SECONDS.sleep(5L);
            LOG.info("Stopping writer");
            keepWriting.set(false);
            writerThread.join();
            LOG.info("Writer stopped after writing {} records, waiting for reader to complete", Integer.valueOf(writeCount.get()));
            while (writeCount.get() > readers[0].getReadCount()) {
                LOG.info("Write Count = {}, Read Count = {}", new Object[]{Integer.valueOf(writeCount.get()), Integer.valueOf(readers[0].getReadCount())});
                TimeUnit.MILLISECONDS.sleep(100L);
            }
            Assert.assertEquals(writeCount.get(), readers[0].getReadCount());
        } finally {
            // Wind the background threads down before closing anything, including when the
            // test body failed part-way, so no thread keeps using a closed reader or writer.
            keepWriting.set(false);
            for (ReaderThread reader : readers) {
                reader.stopReading();
            }
            writeDlm.close();
            inputStream.close();
            readDlm.close();
        }
    }
}
