package org.yamcs.server.cli;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import java.util.Arrays;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.yamcs.archive.XtceTmRecorder;
import org.yamcs.utils.TimeEncoding;
import org.yamcs.yarch.DataType;
import org.yamcs.yarch.PartitioningSpec;
import org.yamcs.yarch.Stream;
import org.yamcs.yarch.StreamSubscriber;
import org.yamcs.yarch.TableDefinition;
import org.yamcs.yarch.TableWriter;
import org.yamcs.yarch.Tuple;
import org.yamcs.yarch.TupleDefinition;
import org.yamcs.yarch.YarchDatabase;
import org.yamcs.yarch.YarchDatabaseInstance;

/**
 * CLI command ("bench") that benchmarks the rocksdb storage engine.
 *
 * <p>The command creates (or reuses) a table in the "rocksbench" archive instance, fills it with
 * simulated packets carrying random 256-byte payloads, and then times a series of selects: one
 * over the whole table, one per frequency class (first packet name of each class), and a final
 * one over the whole table again.
 *
 * <p>The simulation advances in ticks; each tick adds 100 to the generation time, and the
 * {@code freq} array holds, for every frequency class, the number of ticks between two packets.
 */
@Parameters(commandDescription = "Benchmark rocksdb storage engine. The benchmark consists of a table load and a few selects.\nThe table is loaded with telemetry packets received at frequencies of [10/sec, 1/sec, 1/10sec, 1/60sec and 1/hour].\nThe table will be identical to the tm table and will contain a histogram on pname (=packet name).\nIt is possible to specify how many partitions (i.e. how many different pnames) to be loaded for each frequency\nand the time duration of the data.")
class RocksDbBenchmark extends Command {

    /** Number of simulation ticks in one hour (10 ticks per second). */
    private static final long TICKS_PER_HOUR = 36000L;

    /** Amount added to the generation time for every simulation tick. */
    private static final long TIME_UNITS_PER_TICK = 100L;

    /** A progress line is printed every time this many packets have been written. */
    private static final long PROGRESS_INTERVAL = 1000000L;

    /** Size in bytes of the random payload stored in each packet. */
    private static final int PAYLOAD_SIZE = 256;

    @Parameter(names = {"--dbDir"}, description = "the directory where the database will be created.\nA \"rocksbench\" archive instance will be created in this directory", required = true)
    String dbDir;

    @Parameter(names = {"--count"}, description = "The partition counts for the 5 frequencies: [10/sec, 1/sec, 1/10sec, 1/60sec and 1/hour].\nIt has to be specified as a string (use quotes).\nBy default, it is \"5 5 5 5 5\"", required = false)
    String counts;

    @Parameter(names = {"--duration"}, description = "The duration in hours of the simulated data. By default it's 24 hours", required = false)
    int durationHours;

    @Parameter(names = {"--baseTime"}, description = "Start inserting data with this time. By default it's 2017-01-01T00:00:00", required = false)
    String baseTime;

    /** Packet period of each frequency class, expressed in simulation ticks. */
    private final long[] freq;

    /** Number of packet names (partitions) per frequency class; filled in by {@link #validate()}. */
    private int[] count;

    private final String tableName;
    private YarchDatabaseInstance ydb;

    public RocksDbBenchmark(RocksDbCli rocksDbCli) {
        super("bench", rocksDbCli);
        this.counts = "5 5 5 5 5";
        this.durationHours = 24;
        this.baseTime = "2017-01-01T00:00:00";
        this.freq = new long[]{1, 10, 100, 600, 36000};
        this.tableName = "tm";
    }

    /**
     * Parses the {@code --count} option into one partition count per frequency class.
     *
     * @throws ParameterException if the option does not contain exactly one non-negative integer
     *         per frequency class
     */
    @Override // org.yamcs.server.cli.Command
    public void validate() {
        // trim() first: a leading blank would otherwise produce an empty first token.
        String[] tokens = this.counts.trim().split("\\s+");
        if (tokens.length != this.freq.length) {
            throw new ParameterException("Invalid count specified; please provide " + this.freq.length + " numbers (e.g. \"1 2 3 4 5\")");
        }
        int[] parsed = new int[tokens.length];
        for (int i = 0; i < tokens.length; i++) {
            try {
                parsed[i] = Integer.parseInt(tokens[i]);
            } catch (NumberFormatException e) {
                throw new ParameterException("Cannot parse '" + tokens[i] + "' to integer.");
            }
            // A negative count writes nothing but would still trigger a read of a
            // partition that was never created, so reject it up front.
            if (parsed[i] < 0) {
                throw new ParameterException("Invalid count '" + tokens[i] + "'; counts cannot be negative.");
            }
        }
        this.count = parsed;
    }

    /**
     * Runs the benchmark: prepares the table, loads the simulated data and times the selects.
     *
     * @throws Exception if the database cannot be opened, written or read
     */
    @Override // org.yamcs.server.cli.Command
    public void execute() throws Exception {
        YarchDatabase.setHome(this.dbDir);
        this.ydb = YarchDatabase.getInstance("rocksbench");
        TableDefinition table = this.ydb.getTable(this.tableName);
        if (table == null) {
            table = createBenchmarkTable();
        } else {
            console.println("Table " + this.tableName + " already exists!. Old data will not be overwritten.");
        }
        // Multiply as long so that very large durations do not overflow int.
        populate(table, this.durationHours * TICKS_PER_HOUR);
        console.println("*********************** reading data ********************");
        read(this.tableName, null, -1L);
        for (int i = 0; i < this.freq.length; i++) {
            if (this.count[i] != 0) {
                read(this.tableName, "/rocksbench/packet_" + i + "_0", this.freq[i]);
            }
        }
        read(this.tableName, null, -1L);
    }

    /** Creates the benchmark table, partitioned by pname and with a histogram on pname. */
    private TableDefinition createBenchmarkTable() throws Exception {
        TableDefinition table = new TableDefinition(this.tableName, XtceTmRecorder.RECORDED_TM_TUPLE_DEFINITION, Arrays.asList("gentime", "seqNum"));
        table.setHistogramColumns(Arrays.asList("pname"));
        PartitioningSpec partitioning = PartitioningSpec.valueSpec("pname");
        partitioning.setValueColumnType(DataType.ENUM);
        table.setPartitioningSpec(partitioning);
        table.setStorageEngineName("rocksdb2");
        this.ydb.createTable(table);
        return table;
    }

    /**
     * Writes the simulated packets into the given table and prints the achieved write speed.
     *
     * <p>For every tick, each frequency class whose period divides the tick number emits one
     * packet per configured packet name. Generation times start at the {@code --baseTime} option.
     *
     * @param tableDefinition the table to write into
     * @param numTicks number of simulation ticks to generate
     * @throws Exception if the table writer cannot be created or a write fails
     */
    void populate(TableDefinition tableDefinition, long numTicks) throws Exception {
        TableWriter writer = this.ydb.getStorageEngine(tableDefinition).newTableWriter(this.ydb, tableDefinition, TableWriter.InsertMode.INSERT);
        // Honour the --baseTime option (its default equals the previously hard-coded value).
        long startTime = TimeEncoding.parse(this.baseTime);
        console.println("writing " + this.durationHours + " hours of data starting with " + TimeEncoding.toString(startTime));
        ThreadLocalRandom random = ThreadLocalRandom.current();
        byte[] payload = new byte[PAYLOAD_SIZE];
        TupleDefinition tupleDefinition = tableDefinition.getTupleDefinition();

        long numPackets = 0;
        long lastPacketTime = startTime;
        long t0 = System.currentTimeMillis();
        for (long tick = 0; tick < numTicks; tick++) {
            for (int f = 0; f < this.freq.length; f++) {
                if (tick % this.freq[f] != 0) {
                    continue;
                }
                for (int p = 0; p < this.count[f]; p++) {
                    random.nextBytes(payload);
                    numPackets++;
                    // The frequency index is added to the generation time; presumably so that
                    // packets of different classes emitted on the same tick differ in gentime.
                    lastPacketTime = startTime + (tick * TIME_UNITS_PER_TICK) + f;
                    Tuple tuple = new Tuple(tupleDefinition, new Object[]{Long.valueOf(lastPacketTime), Integer.valueOf((int) tick), Long.valueOf(TimeEncoding.getWallclockTime()), payload, "/rocksbench/packet_" + f + "_" + p});
                    writer.onTuple((Stream) null, tuple);
                    if (numPackets % PROGRESS_INTERVAL == 0) {
                        console.println(String.format("%3dM packets written; %d%% completed", Long.valueOf(numPackets / PROGRESS_INTERVAL), Long.valueOf((tick * 100) / numTicks)));
                    }
                }
            }
        }
        long elapsedMillis = System.currentTimeMillis() - t0;
        console.println("write finished; last packet time: " + TimeEncoding.toString(lastPacketTime) + "; total numPackets: " + numPackets);
        console.println("time to populate " + (elapsedMillis / 1000.0d) + " seconds; speed: " + packetsPerSecond(numPackets, elapsedMillis) + " packets/sec");
    }

    /**
     * Reads back the table (or a single packet name) through a stream and prints the read speed.
     *
     * @param table name of the table to select from
     * @param pname packet name to filter on, or {@code null} to read all packets
     * @param period packet period in ticks, only used for the report when {@code pname} is set
     * @throws Exception if the stream cannot be created or the wait for its end is interrupted
     */
    void read(String table, final String pname, long period) throws Exception {
        long t0 = System.currentTimeMillis();
        String query = "create stream s as select * from " + table;
        if (pname != null) {
            query = query + " where pname='" + pname + "'";
        }
        this.ydb.execute(query, new Object[0]);
        Stream stream = this.ydb.getStream("s");
        final Semaphore finished = new Semaphore(0);
        final AtomicInteger received = new AtomicInteger();
        stream.addSubscriber(new StreamSubscriber() {
            // Plain counter on the hot path; published once through the atomic when the stream ends.
            int c = 0;

            @Override
            public void onTuple(Stream s, Tuple tuple) {
                if (pname != null && !pname.equals(tuple.getColumn("pname"))) {
                    throw new RuntimeException("invalid tuple received");
                }
                this.c++;
            }

            @Override
            public void streamClosed(Stream s) {
                received.set(this.c);
                finished.release();
            }
        });
        stream.start();
        // Block until streamClosed() has been called.
        finished.acquire();

        long elapsedMillis = System.currentTimeMillis() - t0;
        int numPackets = received.get();
        long speed = packetsPerSecond(numPackets, elapsedMillis);
        double elapsedSeconds = elapsedMillis / 1000.0d;
        if (pname == null) {
            console.println(String.format("time to read all %d packets: %.3f seconds, speed: %d packets/second", Integer.valueOf(numPackets), Double.valueOf(elapsedSeconds), Long.valueOf(speed)));
        } else {
            console.println(String.format("time to read %8d %s (pkt rate: %.2f sec) packets: %.3f seconds, speed: %d packets/second", Integer.valueOf(numPackets), pname, Double.valueOf(period / 10.0d), Double.valueOf(elapsedSeconds), Long.valueOf(speed)));
        }
    }

    /**
     * Computes a packets-per-second rate in long arithmetic.
     *
     * <p>An elapsed time below one millisecond is treated as one millisecond so that very short
     * runs do not divide by zero.
     */
    private static long packetsPerSecond(long numPackets, long elapsedMillis) {
        return (numPackets * 1000L) / Math.max(1L, elapsedMillis);
    }
}
