/*
 * Decompiled with CFR 0.152.
 */
package org.yamcs.cli;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import java.util.Arrays;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.yamcs.archive.XtceTmRecorder;
import org.yamcs.cli.Command;
import org.yamcs.cli.RocksDbCli;
import org.yamcs.utils.TimeEncoding;
import org.yamcs.yarch.AbstractStream;
import org.yamcs.yarch.DataType;
import org.yamcs.yarch.PartitioningSpec;
import org.yamcs.yarch.Stream;
import org.yamcs.yarch.StreamSubscriber;
import org.yamcs.yarch.TableDefinition;
import org.yamcs.yarch.TableWriter;
import org.yamcs.yarch.Tuple;
import org.yamcs.yarch.TupleDefinition;
import org.yamcs.yarch.YarchDatabase;
import org.yamcs.yarch.YarchDatabaseInstance;
import org.yamcs.yarch.rocksdb.RdbStorageEngine;

@Parameters(commandDescription="Benchmark rocksdb storage engine. The benchmark consists of a table load and a few selects.\nThe table is loaded with telemetry packets received at frequencies of [10/sec, 1/sec, 1/10sec, 1/60sec and 1/hour].\nThe table will be identical to the tm table and will contain a histogram on pname (=packet name).\nIt is possible to specify how many partitions (i.e. how many different pnames) to be loaded for each frequency\nand the time duration of the data.")
class RocksDbBenchmark
extends Command {
    @Parameter(names={"--dbDir"}, description="the directory where the database will be created.\nA \"rocksbench\" archive instance will be created in this directory", required=true)
    String dbDir;
    @Parameter(names={"--count"}, description="The partition counts for the 5 frequencies: [10/sec, 1/sec, 1/10sec, 1/60sec and 1/hour].\nIt has to be specified as a string (use quotes).\nBy default, it is \"5 5 5 5 5\"", required=false)
    String counts = "5 5 5 5 5";
    @Parameter(names={"--duration"}, description="The duration in hours of the simulated data. By default it's 24 hours", required=false)
    int durationHours = 24;
    @Parameter(names={"--baseTime"}, description="Start inserting data with this time. By default it's 2017-01-01T00:00:00", required=false)
    String baseTime = "2017-01-01T00:00:00";
    private long[] freq = new long[]{1L, 10L, 100L, 600L, 36000L};
    private int[] count;
    private String tableName = "tm";
    private YarchDatabaseInstance ydb;

    public RocksDbBenchmark(RocksDbCli rocksDbCli) {
        super("bench", rocksDbCli);
    }

    @Override
    void validate() {
        String[] a = this.counts.split("\\s+");
        if (a.length != this.freq.length) {
            throw new ParameterException("Invalid count specified; please provide " + this.freq.length + " numbers (e.g. \"1 2 3 4 5\"");
        }
        this.count = new int[a.length];
        for (int i = 0; i < a.length; ++i) {
            try {
                this.count[i] = Integer.valueOf(a[i]);
                continue;
            }
            catch (NumberFormatException e) {
                throw new ParameterException("Cannot parse '" + a[i] + "' to integer.");
            }
        }
    }

    @Override
    public void execute() throws Exception {
        YarchDatabase.setHome((String)this.dbDir);
        this.ydb = YarchDatabase.getInstance((String)"rocksbench");
        TableDefinition tblDef = this.ydb.getTable(this.tableName);
        if (tblDef == null) {
            TupleDefinition tdef = XtceTmRecorder.RECORDED_TM_TUPLE_DEFINITION;
            tblDef = new TableDefinition(this.tableName, tdef, Arrays.asList("gentime", "seqNum"));
            tblDef.setHistogramColumns(Arrays.asList("pname"));
            PartitioningSpec pspec = PartitioningSpec.valueSpec((String)"pname");
            pspec.setValueColumnType(DataType.ENUM);
            tblDef.setPartitioningSpec(pspec);
            tblDef.setStorageEngineName("rocksdb2");
            this.ydb.createTable(tblDef);
        } else {
            console.println("Table " + this.tableName + " already exists!. Old data will not be overwritten.");
        }
        this.populate(tblDef, this.durationHours * 36000);
        console.println("*********************** reading data ********************");
        this.read(this.tableName, null, -1L);
        for (int j = 0; j < this.freq.length; ++j) {
            if (this.count[j] == 0) continue;
            this.read(this.tableName, "/rocksbench/packet_" + j + "_0", this.freq[j]);
        }
        this.read(this.tableName, null, -1L);
    }

    void populate(TableDefinition tblDef, long duration100ms) throws Exception {
        RdbStorageEngine rse = (RdbStorageEngine)this.ydb.getStorageEngine(tblDef);
        TableWriter tw = rse.newTableWriter(this.ydb, tblDef, TableWriter.InsertMode.INSERT);
        long baseTime = TimeEncoding.parse((String)"2017-01-01T00:00:00");
        console.println("writing " + this.durationHours + " hours of data starting with " + TimeEncoding.toString((long)baseTime));
        ThreadLocalRandom r = ThreadLocalRandom.current();
        byte[] b = new byte[256];
        int numPackets = 0;
        TupleDefinition tdef = tblDef.getTupleDefinition();
        long t0 = System.currentTimeMillis();
        long genTime = baseTime;
        for (long i = 0L; i < duration100ms; ++i) {
            for (int j = 0; j < this.freq.length; ++j) {
                if (i % this.freq[j] != 0L) continue;
                for (int k = 0; k < this.count[j]; ++k) {
                    r.nextBytes(b);
                    ++numPackets;
                    int seqNum = (int)i;
                    genTime = baseTime + i * 100L + (long)j;
                    long recTime = TimeEncoding.getWallclockTime();
                    Tuple t = new Tuple(tdef, new Object[]{genTime, seqNum, recTime, b, "/rocksbench/packet_" + j + "_" + k});
                    tw.onTuple(null, t);
                    if (numPackets % 1000000 != 0) continue;
                    console.println(String.format("%3dM packets written; %d%% completed", numPackets / 1000000, i * 100L / duration100ms));
                }
            }
        }
        console.println("write finished; last packet time: " + TimeEncoding.toString((long)genTime) + "; total numPackets: " + numPackets);
        long t1 = System.currentTimeMillis();
        long d = t1 - t0;
        console.println("time to populate " + (double)d / 1000.0 + " seconds; speed: " + (long)numPackets * 1000L / d + " packets/sec");
    }

    void read(String tblName, final String packetName, long rate100ms) throws Exception {
        long t0 = System.currentTimeMillis();
        String q = "create stream s as select * from " + tblName;
        if (packetName != null) {
            q = q + " where pname='" + packetName + "'";
        }
        this.ydb.execute(q, new Object[0]);
        AbstractStream s = this.ydb.getStream("s");
        final Semaphore semaphore = new Semaphore(0);
        final AtomicInteger count = new AtomicInteger();
        s.addSubscriber(new StreamSubscriber(){
            int c = 0;

            public void onTuple(Stream stream, Tuple tuple) {
                if (packetName != null && !packetName.equals(tuple.getColumn("pname"))) {
                    throw new RuntimeException("invalid tuple received");
                }
                ++this.c;
            }

            public void streamClosed(Stream stream) {
                count.set(this.c);
                semaphore.release();
            }
        });
        s.start();
        semaphore.acquire();
        long t1 = System.currentTimeMillis();
        long d = t1 - t0;
        long speed = (long)(1000 * count.get()) / d;
        if (packetName == null) {
            console.println(String.format("time to read all %d packets: %.3f seconds, speed: %d packets/second", count.get(), (double)d / 1000.0, speed));
        } else {
            console.println(String.format("time to read %8d %s (pkt rate: %.2f sec) packets: %.3f seconds, speed: %d packets/second", count.get(), packetName, (double)rate100ms / 10.0, (double)d / 1000.0, speed));
        }
    }
}

