package org.apache.distributedlog;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.distributedlog.api.AsyncLogReader;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.common.concurrent.FutureEventListener;
import org.apache.distributedlog.util.Utils;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/TestReader.class */
public class TestReader implements FutureEventListener<LogRecordWithDLSN> {
    static final Logger LOG = LoggerFactory.getLogger(TestReader.class);
    final String readerName;
    final DistributedLogManager dlm;
    AsyncLogReader reader;
    final DLSN startDLSN;
    DLSN nextDLSN;
    final boolean simulateErrors;
    int delayMs;
    final CountDownLatch readyLatch;
    final CountDownLatch completionLatch;
    final CountDownLatch countLatch;
    final AtomicBoolean errorsFound = new AtomicBoolean(false);
    final AtomicInteger readCount = new AtomicInteger(0);
    final AtomicInteger positionReaderCount = new AtomicInteger(0);
    final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

    public TestReader(String str, DistributedLogManager distributedLogManager, DLSN dlsn, boolean z, int i, CountDownLatch countDownLatch, CountDownLatch countDownLatch2, CountDownLatch countDownLatch3) {
        this.readerName = str;
        this.dlm = distributedLogManager;
        this.startDLSN = dlsn;
        this.simulateErrors = z;
        this.delayMs = i;
        this.readyLatch = countDownLatch;
        this.countLatch = countDownLatch2;
        this.completionLatch = countDownLatch3;
    }

    public AtomicInteger getNumReaderPositions() {
        return this.positionReaderCount;
    }

    public AtomicInteger getNumReads() {
        return this.readCount;
    }

    public boolean areErrorsFound() {
        return this.errorsFound.get();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int nextDelayMs() {
        int min = Math.min(this.delayMs * 2, 500);
        if (0 == this.delayMs) {
            min = 10;
        }
        this.delayMs = min;
        return this.delayMs;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void positionReader(final DLSN dlsn) {
        this.positionReaderCount.incrementAndGet();
        this.executorService.schedule(new Runnable() { // from class: org.apache.distributedlog.TestReader.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    BKAsyncLogReader asyncLogReader = TestReader.this.dlm.getAsyncLogReader(dlsn);
                    if (TestReader.this.simulateErrors) {
                        asyncLogReader.simulateErrors();
                    }
                    TestReader.this.nextDLSN = dlsn;
                    TestReader.LOG.info("Positioned reader {} at {}", TestReader.this.readerName, dlsn);
                    if (null != TestReader.this.reader) {
                        Utils.close(TestReader.this.reader);
                    }
                    TestReader.this.reader = asyncLogReader;
                    TestReader.this.readNext();
                    TestReader.this.readyLatch.countDown();
                } catch (IOException e) {
                    TestReader.LOG.info("Encountered exception {} on opening reader {} at {}, retrying in {} ms", new Object[]{e, TestReader.this.readerName, dlsn, Integer.valueOf(TestReader.this.nextDelayMs())});
                    TestReader.this.positionReader(dlsn);
                }
            }
        }, this.delayMs, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void readNext() {
        this.reader.readNext().whenComplete((BiConsumer) this);
    }

    public void onSuccess(LogRecordWithDLSN logRecordWithDLSN) {
        try {
            Assert.assertTrue(logRecordWithDLSN.getDlsn().compareTo(this.nextDLSN) >= 0);
            LOG.info("Received record {} from log {} for reader {}", new Object[]{logRecordWithDLSN.getDlsn(), this.dlm.getStreamName(), this.readerName});
            Assert.assertFalse(logRecordWithDLSN.isControl());
            Assert.assertEquals(0L, logRecordWithDLSN.getDlsn().getSlotId());
            DLMTestUtil.verifyLargeLogRecord((LogRecord) logRecordWithDLSN);
            this.readCount.incrementAndGet();
            this.countLatch.countDown();
            if (this.countLatch.getCount() <= 0) {
                LOG.info("Reader {} is completed", this.readerName);
                closeReader();
                this.completionLatch.countDown();
            } else {
                LOG.info("Reader {} : read count becomes {}, latch = {}", new Object[]{this.readerName, Integer.valueOf(this.readCount.get()), Long.valueOf(this.countLatch.getCount())});
                this.nextDLSN = logRecordWithDLSN.getDlsn().getNextDLSN();
                readNext();
            }
        } catch (Exception e) {
            LOG.error("Exception encountered when verifying received log record {} for reader {} :", new Object[]{logRecordWithDLSN.getDlsn(), e, this.readerName});
            this.errorsFound.set(true);
            this.completionLatch.countDown();
        }
    }

    public void onFailure(Throwable th) {
        LOG.error("{} encountered exception on reading next record : ", this.readerName, th);
        closeReader();
        nextDelayMs();
        positionReader(this.nextDLSN);
    }

    private void closeReader() {
        if (null != this.reader) {
            this.reader.asyncClose().whenComplete((r6, th) -> {
                LOG.warn("Exception on closing reader {} : ", this.readerName, th);
            });
        }
    }

    public void start() {
        positionReader(this.startDLSN);
    }

    public void stop() {
        closeReader();
        this.executorService.shutdown();
    }
}
