package org.apache.distributedlog.benchmark.stream;

import com.google.common.base.Charsets;
import com.google.common.base.Stopwatch;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.distributedlog.BookKeeperClientBuilder;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.ZooKeeperClientBuilder;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/benchmark/stream/LedgerReadBenchmark.class */
public class LedgerReadBenchmark extends AbstractReaderBenchmark {
    private static final Logger logger = LoggerFactory.getLogger(AsyncReaderBenchmark.class);

    @Override // org.apache.distributedlog.benchmark.stream.StreamBenchmark
    protected void benchmark(Namespace namespace, String str, StatsLogger statsLogger) {
        DistributedLogManager distributedLogManager = null;
        while (null == distributedLogManager) {
            try {
                distributedLogManager = namespace.openLog(this.streamName);
            } catch (IOException e) {
                logger.warn("Failed to create dlm for stream {} : ", this.streamName, e);
            }
            if (null == distributedLogManager) {
                try {
                    TimeUnit.MILLISECONDS.sleep(this.conf.getZKSessionTimeoutMilliseconds());
                } catch (InterruptedException e2) {
                    logger.warn("Interrupted from sleep while creating dlm for stream {} : ", this.streamName, e2);
                }
            }
        }
        logger.info("Created dlm for stream {}.", this.streamName);
        List<LogSegmentMetadata> list = null;
        while (null == list) {
            try {
                list = distributedLogManager.getLogSegments();
            } catch (IOException e3) {
                logger.warn("Failed to get log segments for stream {} : ", this.streamName, e3);
            }
            if (null == list) {
                try {
                    TimeUnit.MILLISECONDS.sleep(this.conf.getZKSessionTimeoutMilliseconds());
                } catch (InterruptedException e4) {
                    logger.warn("Interrupted from sleep while geting log segments for stream {} : ", this.streamName, e4);
                }
            }
        }
        final Counter counter = statsLogger.getCounter("reads");
        logger.info("Reading from log segments : {}", list);
        try {
            BKDLConfig resolveDLConfig = BKDLConfig.resolveDLConfig(ZooKeeperClientBuilder.newBuilder().uri(this.uri).name("benchmark-zkc").sessionTimeoutMs(this.conf.getZKSessionTimeoutMilliseconds()).zkAclId((String) null).build(), this.uri);
            try {
                BookKeeper bookKeeper = BookKeeperClientBuilder.newBuilder().name("benchmark-bkc").dlConfig(this.conf).zkServers(resolveDLConfig.getBkZkServersForReader()).ledgersPath(resolveDLConfig.getBkLedgersPath()).build().get();
                int i = this.conf.getInt("ledger_read_concurrency", 1000);
                boolean z = this.conf.getBoolean("ledger_stream_read", true);
                try {
                    for (LogSegmentMetadata logSegmentMetadata : list) {
                        Stopwatch createStarted = Stopwatch.createStarted();
                        LedgerHandle openLedgerNoRecovery = bookKeeper.openLedgerNoRecovery(logSegmentMetadata.getLogSegmentId(), BookKeeper.DigestType.CRC32, this.conf.getBKDigestPW().getBytes(Charsets.UTF_8));
                        logger.info("It took {} ms to open log segment {}", new Object[]{Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS)), Long.valueOf(openLedgerNoRecovery.getLastAddConfirmed() + 1), logSegmentMetadata});
                        createStarted.reset().start();
                        (z ? new LedgerStreamReader(openLedgerNoRecovery, new BookkeeperInternalCallbacks.ReadEntryListener() { // from class: org.apache.distributedlog.benchmark.stream.LedgerReadBenchmark.1
                            public void onEntryComplete(int i2, LedgerHandle ledgerHandle, LedgerEntry ledgerEntry, Object obj) {
                                counter.inc();
                            }
                        }, i) : new LedgerStreamReader(openLedgerNoRecovery, new BookkeeperInternalCallbacks.ReadEntryListener() { // from class: org.apache.distributedlog.benchmark.stream.LedgerReadBenchmark.2
                            public void onEntryComplete(int i2, LedgerHandle ledgerHandle, LedgerEntry ledgerEntry, Object obj) {
                                counter.inc();
                            }
                        }, i)).run();
                        logger.info("It took {} ms to complete reading {} entries from log segment {}", new Object[]{Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS)), Long.valueOf(openLedgerNoRecovery.getLastAddConfirmed() + 1), logSegmentMetadata});
                    }
                } catch (Exception e5) {
                    logger.error("Error on reading bk ", e5);
                }
            } catch (IOException e6) {
            }
        } catch (IOException e7) {
        }
    }
}
