package org.apache.distributedlog.benchmark;

import com.google.common.base.Preconditions;
import com.twitter.finagle.stats.OstrichStatsReceiver;
import com.twitter.finagle.stats.StatsReceiver;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.lang.StringUtils;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.benchmark.utils.ShiftableRateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/benchmark/Benchmarker.class */
/**
 * Command-line driver for the DistributedLog benchmark.
 *
 * <p>The constructor registers the supported command-line options. {@link #run()} parses the
 * arguments, validates them, starts the selected stats provider, builds the worker for the
 * requested mode and runs it on a dedicated thread for {@code durationMins} minutes before
 * shutting down and exiting the JVM.
 *
 * <p>The mode is matched by prefix, in this order: {@code read}, {@code write}, {@code dlwrite},
 * {@code dlread}.
 */
public class Benchmarker {
    private static final Logger logger = LoggerFactory.getLogger(Benchmarker.class);
    static final String USAGE = "Benchmarker [-u <uri>] [-c <conf>] [-s serverset] [-m (read|write|dlwrite)]";

    /** Raw command-line arguments, parsed by {@link #run()}. */
    final String[] args;
    /** Value of {@code -rs}; stays {@code null} when the option is absent. */
    String routingServiceFinagleNameString;
    final Options options = new Options();

    // Rate limiting (-r, -mr, -cr, -ci).
    int rate = 100;
    int maxRate = 1000;
    int changeRate = 100;
    int changeRateSeconds = 1800;

    int concurrency = 10;
    String streamPrefix = "dlog-loadtest";
    // Defaults to an invalid value so that run() rejects invocations without -i.
    int shardId = -1;
    int numStreams = 10;
    List<String> serversetPaths = new ArrayList<String>();
    List<String> finagleNames = new ArrayList<String>();
    int msgSize = 256;
    String mode = null;
    int durationMins = 60;
    URI dlUri = null;
    int batchSize = 0;
    int readersPerStream = 1;
    Integer maxStreamId = null;
    int truncationInterval = 3600;
    // Optional explicit stream id range; when null the range is derived from shardId/numStreams.
    Integer startStreamId = null;
    Integer endStreamId = null;
    int hostConnectionCoreSize = 10;
    int hostConnectionLimit = 10;
    boolean thriftmux = false;
    boolean handshakeWithClientInfo = false;
    boolean readFromHead = false;
    int sendBufferSize = 1048576;
    int recvBufferSize = 1048576;
    boolean enableBatching = false;
    int batchBufferSize = 262144;
    int batchFlushIntervalMicros = 2000;
    final DistributedLogConfiguration conf = new DistributedLogConfiguration();
    final StatsReceiver statsReceiver = new OstrichStatsReceiver();
    StatsProvider statsProvider = null;

    /**
     * Creates a benchmarker for the given arguments and registers every supported option.
     *
     * @param strArr the raw command-line arguments; they are not parsed until {@link #run()}
     */
    Benchmarker(String[] strArr) {
        this.args = strArr;
        this.options.addOption("s", "serverset", true, "Proxy Server Set (separated by ',')");
        this.options.addOption("fn", "finagle-name", true, "Write proxy finagle name (separated by ',')");
        this.options.addOption("c", "conf", true, "DistributedLog Configuration File");
        this.options.addOption("u", "uri", true, "DistributedLog URI");
        this.options.addOption("i", "shard", true, "Shard Id");
        this.options.addOption("p", "provider", true, "DistributedLog Stats Provider");
        this.options.addOption("d", "duration", true, "Duration (minutes)");
        this.options.addOption("sp", "streamprefix", true, "Stream Prefix");
        this.options.addOption("sc", "streamcount", true, "Number of Streams");
        this.options.addOption("ms", "messagesize", true, "Message Size (bytes)");
        this.options.addOption("bs", "batchsize", true, "Batch Size");
        this.options.addOption("r", "rate", true, "Rate limit (requests/second)");
        this.options.addOption("mr", "max-rate", true, "Maximum Rate limit (requests/second)");
        this.options.addOption("cr", "change-rate", true, "Rate to increase each change period (requests/second)");
        this.options.addOption("ci", "change-interval", true, "Rate to increase period, seconds");
        this.options.addOption("t", "concurrency", true, "Concurrency (number of threads)");
        this.options.addOption("m", "mode", true, "Benchmark mode (read/write)");
        this.options.addOption("rps", "readers-per-stream", true, "Number readers per stream");
        this.options.addOption("msid", "max-stream-id", true, "Max Stream ID");
        this.options.addOption("ti", "truncation-interval", true, "Truncation interval in seconds");
        this.options.addOption("ssid", "start-stream-id", true, "Start Stream ID");
        this.options.addOption("esid", "end-stream-id", true, "End Stream ID");
        this.options.addOption("hccs", "host-connection-core-size", true, "Finagle hostConnectionCoreSize");
        this.options.addOption("hcl", "host-connection-limit", true, "Finagle hostConnectionLimit");
        this.options.addOption("mx", "thriftmux", false, "Enable thriftmux (write mode only)");
        this.options.addOption("hsci", "handshake-with-client-info", false, "Enable handshaking with client info");
        this.options.addOption("rfh", "read-from-head", false, "Read from head of the stream");
        this.options.addOption("sb", "send-buffer", true, "Channel send buffer size, in bytes");
        this.options.addOption("rb", "recv-buffer", true, "Channel recv buffer size, in bytes");
        this.options.addOption("bt", "enable-batch", false, "Enable batching on writers");
        this.options.addOption("bbs", "batch-buffer-size", true, "The batch buffer size in bytes");
        this.options.addOption("bfi", "batch-flush-interval", true, "The batch buffer flush interval in micros");
        this.options.addOption("rs", "routing-service", true, "The routing service finagle name for server-side routing");
        this.options.addOption("h", "help", false, "Print usage.");
    }

    /** Prints the usage line and the registered options. */
    void printUsage() {
        new HelpFormatter().printHelp(USAGE, this.options);
    }

    /**
     * Parses the arguments, runs the selected benchmark for {@code durationMins} minutes and then
     * exits the JVM with status 0.
     *
     * <p>When {@code -h} is given the usage is printed and the JVM exits immediately.
     *
     * @throws Exception if the arguments cannot be parsed or are invalid, the configuration file
     *     cannot be loaded, the mode is unknown, or the benchmark is interrupted
     */
    void run() throws Exception {
        logger.info("Running benchmark.");
        CommandLine cmdline = new BasicParser().parse(this.options, this.args);
        if (cmdline.hasOption("h")) {
            printUsage();
            System.exit(0);
        }
        applyOptions(cmdline);
        validateCommonArguments();

        if (cmdline.hasOption("p")) {
            this.statsProvider = (StatsProvider) ReflectionUtils.newInstance(cmdline.getOptionValue("p"), StatsProvider.class);
        } else {
            this.statsProvider = new NullStatsProvider();
        }
        logger.info("Starting stats provider : {}.", this.statsProvider.getClass());
        this.statsProvider.start(this.conf);
        try {
            Worker worker = createWorker();
            new Thread(worker, this.mode + "-benchmark-thread").start();
            TimeUnit.MINUTES.sleep(this.durationMins);
            logger.info("{} minutes passed, exiting...", this.durationMins);
            worker.close();
        } finally {
            // Stop the provider on failure paths too (bad mode, invalid worker arguments,
            // interruption), not only after a complete run.
            this.statsProvider.stop();
        }
        Runtime.getRuntime().exit(0);
    }

    /** Returns the integer value of {@code opt}, or {@code defaultValue} when the option is absent. */
    private static int intOption(CommandLine cmdline, String opt, int defaultValue) {
        return cmdline.hasOption(opt) ? Integer.parseInt(cmdline.getOptionValue(opt)) : defaultValue;
    }

    /**
     * Copies every option present on the command line into the corresponding field; fields whose
     * option is absent keep their defaults.
     */
    private void applyOptions(CommandLine cmdline) throws Exception {
        if (cmdline.hasOption("s")) {
            this.serversetPaths = Arrays.asList(StringUtils.split(cmdline.getOptionValue("s"), ','));
        }
        if (cmdline.hasOption("fn")) {
            this.finagleNames = Arrays.asList(StringUtils.split(cmdline.getOptionValue("fn"), ','));
        }
        this.shardId = intOption(cmdline, "i", this.shardId);
        this.durationMins = intOption(cmdline, "d", this.durationMins);
        if (cmdline.hasOption("sp")) {
            this.streamPrefix = cmdline.getOptionValue("sp");
        }
        this.numStreams = intOption(cmdline, "sc", this.numStreams);
        this.msgSize = intOption(cmdline, "ms", this.msgSize);
        this.rate = intOption(cmdline, "r", this.rate);
        this.maxRate = intOption(cmdline, "mr", this.maxRate);
        this.changeRate = intOption(cmdline, "cr", this.changeRate);
        this.changeRateSeconds = intOption(cmdline, "ci", this.changeRateSeconds);
        this.concurrency = intOption(cmdline, "t", this.concurrency);
        if (cmdline.hasOption("m")) {
            this.mode = cmdline.getOptionValue("m");
        }
        if (cmdline.hasOption("u")) {
            this.dlUri = URI.create(cmdline.getOptionValue("u"));
        }
        if (cmdline.hasOption("bs")) {
            this.batchSize = Integer.parseInt(cmdline.getOptionValue("bs"));
            // Compare by content, using the same prefix rule that selects the write worker;
            // a reference comparison against a parsed argument never rejects anything.
            Preconditions.checkArgument(null != this.mode && this.mode.startsWith("write"),
                    "batchSize supported only for mode=write");
        }
        if (cmdline.hasOption("c")) {
            this.conf.loadConf(new File(cmdline.getOptionValue("c")).toURI().toURL());
        }
        this.readersPerStream = intOption(cmdline, "rps", this.readersPerStream);
        if (cmdline.hasOption("msid")) {
            this.maxStreamId = Integer.parseInt(cmdline.getOptionValue("msid"));
        }
        this.truncationInterval = intOption(cmdline, "ti", this.truncationInterval);
        if (cmdline.hasOption("ssid")) {
            this.startStreamId = Integer.parseInt(cmdline.getOptionValue("ssid"));
        }
        if (cmdline.hasOption("esid")) {
            this.endStreamId = Integer.parseInt(cmdline.getOptionValue("esid"));
        }
        this.hostConnectionCoreSize = intOption(cmdline, "hccs", this.hostConnectionCoreSize);
        this.hostConnectionLimit = intOption(cmdline, "hcl", this.hostConnectionLimit);
        this.sendBufferSize = intOption(cmdline, "sb", this.sendBufferSize);
        this.recvBufferSize = intOption(cmdline, "rb", this.recvBufferSize);
        if (cmdline.hasOption("rs")) {
            this.routingServiceFinagleNameString = cmdline.getOptionValue("rs");
        }
        this.thriftmux = cmdline.hasOption("mx");
        this.handshakeWithClientInfo = cmdline.hasOption("hsci");
        this.readFromHead = cmdline.hasOption("rfh");
        this.enableBatching = cmdline.hasOption("bt");
        this.batchBufferSize = intOption(cmdline, "bbs", this.batchBufferSize);
        this.batchFlushIntervalMicros = intOption(cmdline, "bfi", this.batchFlushIntervalMicros);
    }

    /**
     * Checks the arguments shared by every mode.
     *
     * @throws IllegalArgumentException if any of them is missing or out of range
     */
    private void validateCommonArguments() {
        Preconditions.checkArgument(this.shardId >= 0, "shardId must be >= 0");
        Preconditions.checkArgument(this.numStreams > 0, "numStreams must be > 0");
        Preconditions.checkArgument(this.durationMins > 0, "durationMins must be > 0");
        Preconditions.checkArgument(this.streamPrefix != null, "streamPrefix must be defined");
        Preconditions.checkArgument(this.hostConnectionCoreSize > 0, "host connection core size must be > 0");
        Preconditions.checkArgument(this.hostConnectionLimit > 0, "host connection limit must be > 0");
        // Fail with a clear message instead of a NullPointerException when -m is omitted.
        Preconditions.checkArgument(this.mode != null, "mode must be defined");
    }

    /**
     * Builds the worker matching {@code mode}.
     *
     * @throws IOException if the mode matches none of the known prefixes
     */
    private Worker createWorker() throws IOException {
        if (this.mode.startsWith("read")) {
            return runReader();
        }
        if (this.mode.startsWith("write")) {
            return runWriter();
        }
        if (this.mode.startsWith("dlwrite")) {
            return runDLWriter();
        }
        if (this.mode.startsWith("dlread")) {
            return runDLReader();
        }
        throw new IOException("Unknown mode " + this.mode + " to run the benchmark.");
    }

    /**
     * Validates the write-mode arguments and builds the write worker.
     *
     * <p>The stream id range is {@code [startStreamId, endStreamId)} when given explicitly,
     * otherwise it is derived from {@code shardId} and {@code numStreams}.
     *
     * @throws IllegalArgumentException if a write-mode argument is missing or out of range
     */
    Worker runWriter() {
        boolean hasTarget = !this.finagleNames.isEmpty() || !this.serversetPaths.isEmpty() || null != this.dlUri;
        Preconditions.checkArgument(hasTarget, "either serverset paths, finagle-names or uri required");
        Preconditions.checkArgument(this.msgSize > 0, "messagesize must be greater than 0");
        Preconditions.checkArgument(this.rate > 0, "rate must be greater than 0");
        Preconditions.checkArgument(this.maxRate >= this.rate, "max rate must be greater than rate");
        Preconditions.checkArgument(this.changeRate >= 0, "change rate must be positive");
        Preconditions.checkArgument(this.changeRateSeconds >= 0, "change interval must be positive");
        Preconditions.checkArgument(this.concurrency > 0, "concurrency must be greater than 0");

        int firstStreamId = null == this.startStreamId ? this.shardId * this.numStreams : this.startStreamId.intValue();
        int lastStreamId = null == this.endStreamId ? (this.shardId + 1) * this.numStreams : this.endStreamId.intValue();
        ShiftableRateLimiter rateLimiter =
                new ShiftableRateLimiter(this.rate, this.maxRate, this.changeRate, this.changeRateSeconds, TimeUnit.SECONDS);
        return createWriteWorker(
                this.streamPrefix,
                this.dlUri,
                firstStreamId,
                lastStreamId,
                rateLimiter,
                this.concurrency,
                this.msgSize,
                this.batchSize,
                this.hostConnectionCoreSize,
                this.hostConnectionLimit,
                this.serversetPaths,
                this.finagleNames,
                this.statsReceiver.scope("write_client"),
                this.statsProvider.getStatsLogger("write"),
                this.thriftmux,
                this.handshakeWithClientInfo,
                this.sendBufferSize,
                this.recvBufferSize,
                this.enableBatching,
                this.batchBufferSize,
                this.batchFlushIntervalMicros,
                this.routingServiceFinagleNameString);
    }

    /**
     * Factory hook for the write worker; forwards every argument unchanged to the
     * {@link WriterWorker} constructor. Subclasses may override it to supply a different worker.
     */
    protected WriterWorker createWriteWorker(String str, URI uri, int i, int i2, ShiftableRateLimiter shiftableRateLimiter, int i3, int i4, int i5, int i6, int i7, List<String> list, List<String> list2, StatsReceiver statsReceiver, StatsLogger statsLogger, boolean z, boolean z2, int i8, int i9, boolean z3, int i10, int i11, String str2) {
        return new WriterWorker(str, uri, i, i2, shiftableRateLimiter, i3, i4, i5, i6, i7, list, list2, statsReceiver, statsLogger, z, z2, i8, i9, z3, i10, i11, str2);
    }

    /**
     * Validates the dlwrite-mode arguments and builds a {@link DLWriterWorker} over the stream id
     * range derived from {@code shardId} and {@code numStreams}.
     *
     * @throws NullPointerException if no DistributedLog URI was given
     * @throws IllegalArgumentException if a rate or concurrency argument is out of range
     * @throws IOException declared for worker construction
     */
    Worker runDLWriter() throws IOException {
        Preconditions.checkNotNull(this.dlUri, "dlUri must be defined");
        Preconditions.checkArgument(this.rate > 0, "rate must be greater than 0");
        Preconditions.checkArgument(this.maxRate >= this.rate, "max rate must be greater than rate");
        Preconditions.checkArgument(this.changeRate >= 0, "change rate must be positive");
        Preconditions.checkArgument(this.changeRateSeconds >= 0, "change interval must be positive");
        Preconditions.checkArgument(this.concurrency > 0, "concurrency must be greater than 0");

        ShiftableRateLimiter rateLimiter =
                new ShiftableRateLimiter(this.rate, this.maxRate, this.changeRate, this.changeRateSeconds, TimeUnit.SECONDS);
        return new DLWriterWorker(
                this.conf,
                this.dlUri,
                this.streamPrefix,
                this.shardId * this.numStreams,
                (this.shardId + 1) * this.numStreams,
                rateLimiter,
                this.concurrency,
                this.msgSize,
                this.statsProvider.getStatsLogger("dlwrite"));
    }

    /**
     * Validates the read-mode arguments and builds a reader worker that uses the configured
     * serverset paths, finagle names and truncation interval.
     *
     * @throws IllegalArgumentException if a read-mode argument is missing or out of range
     * @throws IOException if the reader worker cannot be created
     */
    Worker runReader() throws IOException {
        boolean hasTarget = !this.finagleNames.isEmpty() || !this.serversetPaths.isEmpty() || null != this.dlUri;
        Preconditions.checkArgument(hasTarget, "either serverset paths, finagle-names or dlUri required");
        Preconditions.checkArgument(this.concurrency > 0, "concurrency must be greater than 0");
        Preconditions.checkArgument(this.truncationInterval > 0, "truncation interval should be greater than 0");
        return runReaderInternal(this.serversetPaths, this.finagleNames, this.truncationInterval);
    }

    /**
     * Builds a reader worker with empty serverset/finagle-name lists and a truncation interval
     * of 0.
     *
     * @throws IOException if the reader worker cannot be created
     */
    Worker runDLReader() throws IOException {
        return runReaderInternal(new ArrayList<String>(), new ArrayList<String>(), 0);
    }

    /**
     * Computes the stream id range and delegates to {@link #createReaderWorker}.
     *
     * <p>The end of the range is capped at {@code maxStreamId} when that option was given.
     *
     * @param list serverset paths handed to the worker
     * @param list2 finagle names handed to the worker
     * @param i truncation interval handed to the worker
     * @throws NullPointerException if no DistributedLog URI was given
     */
    private Worker runReaderInternal(List<String> list, List<String> list2, int i) throws IOException {
        Preconditions.checkNotNull(this.dlUri);
        int firstStreamId = null == this.startStreamId ? this.shardId * this.numStreams : this.startStreamId.intValue();
        int lastStreamId = null == this.endStreamId
                ? (this.shardId + this.readersPerStream) * this.numStreams
                : this.endStreamId.intValue();
        if (null != this.maxStreamId) {
            lastStreamId = Math.min(lastStreamId, this.maxStreamId.intValue());
        }
        return createReaderWorker(
                this.conf,
                this.dlUri,
                this.streamPrefix,
                firstStreamId,
                lastStreamId,
                this.concurrency,
                list,
                list2,
                i,
                this.readFromHead,
                this.statsReceiver,
                this.statsProvider.getStatsLogger("dlreader"));
    }

    /**
     * Factory hook for the reader worker; forwards every argument unchanged to the
     * {@link ReaderWorker} constructor. Subclasses may override it to supply a different worker.
     */
    protected ReaderWorker createReaderWorker(DistributedLogConfiguration distributedLogConfiguration, URI uri, String str, int i, int i2, int i3, List<String> list, List<String> list2, int i4, boolean z, StatsReceiver statsReceiver, StatsLogger statsLogger) throws IOException {
        return new ReaderWorker(distributedLogConfiguration, uri, str, i, i2, i3, list, list2, i4, z, statsReceiver, statsLogger);
    }

    /**
     * Entry point: runs the benchmark and logs any failure that terminates it.
     *
     * @param strArr the command-line arguments
     */
    public static void main(String[] strArr) {
        try {
            new Benchmarker(strArr).run();
        } catch (Exception e) {
            // A failure here ends the benchmark, so report it at error level.
            logger.error("Benchmark quit due to : ", e);
        }
    }
}
