package org.apache.distributedlog.benchmark.stream;

import com.google.common.base.Stopwatch;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.api.AsyncLogReader;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.class */
public class AsyncReaderBenchmark extends AbstractReaderBenchmark {
    private static final Logger logger = LoggerFactory.getLogger(AsyncReaderBenchmark.class);

    @Override // org.apache.distributedlog.benchmark.stream.StreamBenchmark
    protected void benchmark(Namespace namespace, String str, StatsLogger statsLogger) {
        DistributedLogManager distributedLogManager = null;
        while (null == distributedLogManager) {
            try {
                distributedLogManager = namespace.openLog(this.streamName);
            } catch (IOException e) {
                logger.warn("Failed to create dlm for stream {} : ", this.streamName, e);
            }
            if (null == distributedLogManager) {
                try {
                    TimeUnit.MILLISECONDS.sleep(this.conf.getZKSessionTimeoutMilliseconds());
                } catch (InterruptedException e2) {
                    logger.warn("Interrupted from sleep while creating dlm for stream {} : ", this.streamName, e2);
                }
            }
        }
        logger.info("Created dlm for stream {}.", this.streamName);
        OpStatsLogger opStatsLogger = statsLogger.getOpStatsLogger("open_reader");
        OpStatsLogger opStatsLogger2 = statsLogger.getOpStatsLogger("blocking_read");
        Counter counter = statsLogger.getCounter("reads");
        AsyncLogReader asyncLogReader = null;
        DLSN dlsn = null;
        Long l = null;
        while (null == asyncLogReader) {
            if (null == l) {
                switch (this.readMode) {
                    case OLDEST:
                        l = 0L;
                        dlsn = DLSN.InitialDLSN;
                        break;
                    case LATEST:
                        l = Long.MAX_VALUE;
                        try {
                            dlsn = distributedLogManager.getLastDLSN();
                            break;
                        } catch (IOException e3) {
                            break;
                        }
                    case REWIND:
                        l = Long.valueOf(System.currentTimeMillis() - this.rewindMs);
                        dlsn = null;
                        break;
                    case POSITION:
                        l = Long.valueOf(this.fromTxId);
                        dlsn = null;
                        break;
                    default:
                        logger.warn("Unsupported mode {}", this.readMode);
                        printUsage();
                        System.exit(0);
                        break;
                }
                logger.info("Reading from transaction id = {}, dlsn = {}", l, dlsn);
            }
            Stopwatch createStarted = Stopwatch.createStarted();
            if (null == dlsn) {
                try {
                    asyncLogReader = (AsyncLogReader) FutureUtils.result(distributedLogManager.openAsyncLogReader(l.longValue()));
                } catch (Exception e4) {
                    opStatsLogger.registerFailedEvent(createStarted.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                    logger.warn("Failed to create reader for stream {} reading from tx id = {}, dlsn = {}.", new Object[]{this.streamName, l, dlsn});
                }
            } else {
                asyncLogReader = (AsyncLogReader) FutureUtils.result(distributedLogManager.openAsyncLogReader(dlsn));
            }
            opStatsLogger.registerSuccessfulEvent(createStarted.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
            logger.info("It took {} ms to position the reader to transaction id = {}, dlsn = {}", l, dlsn);
            if (null == asyncLogReader) {
                try {
                    TimeUnit.MILLISECONDS.sleep(this.conf.getZKSessionTimeoutMilliseconds());
                } catch (InterruptedException e5) {
                    logger.warn("Interrupted from sleep after reader was reassigned null for stream {} : ", this.streamName, e5);
                }
            } else {
                Stopwatch createUnstarted = Stopwatch.createUnstarted();
                while (true) {
                    try {
                        createUnstarted.start();
                        List list = (List) FutureUtils.result(asyncLogReader.readBulk(this.batchSize));
                        opStatsLogger2.registerSuccessfulEvent(createUnstarted.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                        if (!list.isEmpty()) {
                            counter.add(list.size());
                            LogRecordWithDLSN logRecordWithDLSN = (LogRecordWithDLSN) list.get(list.size() - 1);
                            l = Long.valueOf(logRecordWithDLSN.getTransactionId());
                            dlsn = logRecordWithDLSN.getDlsn();
                        }
                        createUnstarted.reset();
                    } catch (Exception e6) {
                        logger.warn("Encountered reading record from stream {} : ", this.streamName, e6);
                        asyncLogReader = null;
                        try {
                            TimeUnit.MILLISECONDS.sleep(this.conf.getZKSessionTimeoutMilliseconds());
                        } catch (InterruptedException e7) {
                            logger.warn("Interrupted from sleep while creating reader for stream {} : ", this.streamName, e7);
                        }
                    }
                }
            }
        }
    }
}
