package org.apache.distributedlog.benchmark.stream;

import com.google.common.base.Stopwatch;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.api.namespace.Namespace;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.class */
public class SyncReaderBenchmark extends AbstractReaderBenchmark {
    private static final Logger logger = LoggerFactory.getLogger(SyncReaderBenchmark.class);

    @Override // org.apache.distributedlog.benchmark.stream.StreamBenchmark
    protected void benchmark(Namespace namespace, String str, StatsLogger statsLogger) {
        DistributedLogManager distributedLogManager = null;
        while (null == distributedLogManager) {
            try {
                distributedLogManager = namespace.openLog(str);
            } catch (IOException e) {
                logger.warn("Failed to create dlm for stream {} : ", str, e);
            }
            if (null == distributedLogManager) {
                try {
                    TimeUnit.MILLISECONDS.sleep(this.conf.getZKSessionTimeoutMilliseconds());
                } catch (InterruptedException e2) {
                    logger.warn("Interrupted from sleep while creating dlm for stream {} : ", str, e2);
                }
            }
        }
        OpStatsLogger opStatsLogger = statsLogger.getOpStatsLogger("open_reader");
        OpStatsLogger opStatsLogger2 = statsLogger.getOpStatsLogger("non_blocking_read");
        OpStatsLogger opStatsLogger3 = statsLogger.getOpStatsLogger("blocking_read");
        Counter counter = statsLogger.getCounter("null_read");
        logger.info("Created dlm for stream {}.", str);
        LogReader logReader = null;
        Long l = null;
        while (null == logReader) {
            if (null == l) {
                switch (this.readMode) {
                    case OLDEST:
                        l = 0L;
                        break;
                    case LATEST:
                        try {
                            l = Long.valueOf(distributedLogManager.getLastTxId());
                            break;
                        } catch (IOException e3) {
                            break;
                        }
                    case REWIND:
                        l = Long.valueOf(System.currentTimeMillis() - this.rewindMs);
                        break;
                    case POSITION:
                        l = Long.valueOf(this.fromTxId);
                        break;
                    default:
                        logger.warn("Unsupported mode {}", this.readMode);
                        printUsage();
                        System.exit(0);
                        break;
                }
                logger.info("Reading from transaction id {}", l);
            }
            Stopwatch createStarted = Stopwatch.createStarted();
            try {
                logReader = distributedLogManager.getInputStream(l.longValue());
                opStatsLogger.registerSuccessfulEvent(createStarted.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                logger.info("It took {} ms to position the reader to transaction id {}", l);
            } catch (IOException e4) {
                opStatsLogger.registerFailedEvent(createStarted.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                logger.warn("Failed to create reader for stream {} reading from {}.", str, l);
            }
            if (null == logReader) {
                try {
                    TimeUnit.MILLISECONDS.sleep(this.conf.getZKSessionTimeoutMilliseconds());
                } catch (InterruptedException e5) {
                    logger.warn("Interrupted from sleep after reader was reassigned null for stream {} : ", str, e5);
                }
            } else {
                boolean z = false;
                Stopwatch createUnstarted = Stopwatch.createUnstarted();
                long j = 0;
                long j2 = 0;
                Stopwatch createStarted2 = Stopwatch.createStarted();
                while (true) {
                    try {
                        createUnstarted.start();
                        LogRecordWithDLSN readNext = logReader.readNext(z);
                        if (null != readNext) {
                            long elapsed = createUnstarted.stop().elapsed(TimeUnit.MICROSECONDS);
                            if (z) {
                                opStatsLogger2.registerSuccessfulEvent(elapsed, TimeUnit.MICROSECONDS);
                            } else {
                                j2 += readNext.getPayload().length;
                                j++;
                                opStatsLogger3.registerSuccessfulEvent(elapsed, TimeUnit.MICROSECONDS);
                            }
                            l = Long.valueOf(readNext.getTransactionId());
                        } else {
                            counter.inc();
                        }
                        if (null == readNext && !z) {
                            z = true;
                            createStarted2.stop();
                            logger.info("Catchup {} records (total {} bytes) in {} milliseconds", new Object[]{Long.valueOf(j), Long.valueOf(j2), Long.valueOf(createUnstarted.elapsed(TimeUnit.MILLISECONDS))});
                        }
                        createUnstarted.reset();
                    } catch (IOException e6) {
                        logger.warn("Encountered reading record from stream {} : ", str, e6);
                        logReader = null;
                        try {
                            TimeUnit.MILLISECONDS.sleep(this.conf.getZKSessionTimeoutMilliseconds());
                        } catch (InterruptedException e7) {
                            logger.warn("Interrupted from sleep while creating reader for stream {} : ", str, e7);
                        }
                    }
                }
            }
        }
    }
}
