package org.apache.hadoop.hbase.procedure2.store.wal;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;

/* loaded from: input_file:org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.class */
public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
    protected static final HBaseCommonTestingUtility UTIL;
    public static int DEFAULT_NUM_THREADS;
    public static Option NUM_THREADS_OPTION;
    public static int DEFAULT_NUM_PROCS;
    public static Option NUM_PROCS_OPTION;
    public static int DEFAULT_NUM_WALS;
    public static Option NUM_WALS_OPTION;
    public static int DEFAULT_STATE_SIZE;
    public static Option STATE_SIZE_OPTION;
    public static Option SYNC_OPTION;
    public static String DEFAULT_SYNC_OPTION;
    public int numThreads;
    public long numProcs;
    public int numWals;
    public String syncType;
    public int stateSize;
    static byte[] serializedState;
    private WALProcedureStore store;
    private static final int WORKER_THREADS_TIMEOUT_SEC = 600;
    static final /* synthetic */ boolean $assertionsDisabled;
    public long numProcsPerWal = Long.MAX_VALUE;
    private AtomicLong procIds = new AtomicLong(0);
    private AtomicBoolean workersFailed = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation$NoSyncWalProcedureStore.class */
    public class NoSyncWalProcedureStore extends WALProcedureStore {
        public NoSyncWalProcedureStore(Configuration configuration, Path path) throws IOException {
            super(configuration, path, (Path) null, new WALProcedureStore.LeaseRecovery() { // from class: org.apache.hadoop.hbase.procedure2.store.wal.ProcedureWALPerformanceEvaluation.NoSyncWalProcedureStore.1
                public void recoverFileLease(FileSystem fileSystem, Path path2) throws IOException {
                }
            });
        }

        protected void syncStream(FSDataOutputStream fSDataOutputStream) {
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation$Worker.class */
    private final class Worker implements Callable<Integer> {
        private final long start;

        public Worker(long j) {
            this.start = j;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Integer call() throws IOException {
            while (!ProcedureWALPerformanceEvaluation.this.workersFailed.get()) {
                long andIncrement = ProcedureWALPerformanceEvaluation.this.procIds.getAndIncrement();
                if (andIncrement >= ProcedureWALPerformanceEvaluation.this.numProcs) {
                    return 0;
                }
                if (andIncrement != 0 && andIncrement % 10000 == 0) {
                    System.out.println("Wrote " + andIncrement + " procedures in " + StringUtils.humanTimeDiff(System.currentTimeMillis() - this.start));
                }
                if (andIncrement > 0) {
                    try {
                        if (andIncrement % ProcedureWALPerformanceEvaluation.this.numProcsPerWal == 0) {
                            ProcedureWALPerformanceEvaluation.this.store.rollWriterForTesting();
                            System.out.println("Starting new log : " + ProcedureWALPerformanceEvaluation.this.store.getActiveLogs().get(ProcedureWALPerformanceEvaluation.this.store.getActiveLogs().size() - 1));
                        }
                    } catch (IOException e) {
                        ProcedureWALPerformanceEvaluation.this.workersFailed.set(true);
                        System.err.println("Exception when rolling log file. Current procId = " + andIncrement);
                        e.printStackTrace();
                        return 1;
                    }
                }
                ProcedureTestingUtility.TestProcedure testProcedure = new ProcedureTestingUtility.TestProcedure(andIncrement);
                testProcedure.setData(ProcedureWALPerformanceEvaluation.serializedState);
                ProcedureWALPerformanceEvaluation.this.store.insert(testProcedure, (Procedure[]) null);
                ProcedureWALPerformanceEvaluation.this.store.update(testProcedure);
            }
            return 1;
        }
    }

    private void setupConf() {
        this.conf.setBoolean("hbase.procedure.store.wal.use.hsync", "hsync".equals(this.syncType));
        if (this.numWals > 0) {
            this.conf.setLong("hbase.procedure.store.wal.roll.threshold", Long.MAX_VALUE);
            this.numProcsPerWal = this.numProcs / this.numWals;
        }
    }

    private void setupProcedureStore() throws IOException {
        Path dataTestDir = UTIL.getDataTestDir();
        FileSystem fileSystem = dataTestDir.getFileSystem(this.conf);
        Path path = new Path(dataTestDir, "proc-logs");
        System.out.println("Logs directory : " + path.toString());
        fileSystem.delete(path, true);
        if ("nosync".equals(this.syncType)) {
            this.store = new NoSyncWalProcedureStore(this.conf, path);
        } else {
            this.store = ProcedureTestingUtility.createWalStore(this.conf, path);
        }
        this.store.start(this.numThreads);
        this.store.recoverLease();
        this.store.load(new ProcedureTestingUtility.LoadCounter());
        System.out.println("Starting new log : " + this.store.getActiveLogs().get(this.store.getActiveLogs().size() - 1));
    }

    private void tearDownProcedureStore() {
        this.store.stop(false);
        try {
            this.store.getFileSystem().delete(this.store.getWALDir(), true);
        } catch (IOException e) {
            System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up disk space. Location: " + this.store.getWALDir().toString());
            e.printStackTrace();
        }
    }

    public void processOptions(CommandLine commandLine) {
        this.numThreads = getOptionAsInt(commandLine, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS);
        this.numProcs = getOptionAsInt(commandLine, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
        this.numWals = getOptionAsInt(commandLine, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS);
        this.syncType = commandLine.getOptionValue(SYNC_OPTION.getOpt(), DEFAULT_SYNC_OPTION);
        if (!$assertionsDisabled && !"hsync".equals(this.syncType) && !"hflush".equals(this.syncType) && !"nosync".equals(this.syncType)) {
            throw new AssertionError("sync argument can only accept one of these three values: hsync, hflush, nosync");
        }
        this.stateSize = getOptionAsInt(commandLine, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
        serializedState = new byte[this.stateSize];
        setupConf();
    }

    public void addOptions() {
        addOption(NUM_THREADS_OPTION);
        addOption(NUM_PROCS_OPTION);
        addOption(NUM_WALS_OPTION);
        addOption(SYNC_OPTION);
        addOption(STATE_SIZE_OPTION);
    }

    public int doWork() {
        try {
            try {
                setupProcedureStore();
                ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(this.numThreads);
                Future[] futureArr = new Future[this.numThreads];
                long currentTimeMillis = System.currentTimeMillis();
                for (int i = 0; i < this.numThreads; i++) {
                    futureArr[i] = newFixedThreadPool.submit(new Worker(currentTimeMillis));
                }
                boolean z = false;
                try {
                    for (Future future : futureArr) {
                        z |= future.get((currentTimeMillis + 600000) - System.currentTimeMillis(), TimeUnit.MILLISECONDS).equals(1);
                    }
                    newFixedThreadPool.shutdown();
                    if (z) {
                        tearDownProcedureStore();
                        return 1;
                    }
                    long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                    System.out.println("******************************************");
                    System.out.println("Num threads    : " + this.numThreads);
                    System.out.println("Num procedures : " + this.numProcs);
                    System.out.println("Sync type      : " + this.syncType);
                    System.out.println("Time taken     : " + (((float) currentTimeMillis2) / 1000.0f) + "sec");
                    System.out.println("******************************************");
                    System.out.println("Raw format for scripts");
                    System.out.println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, total_time_ms=%s]", NUM_PROCS_OPTION.getOpt(), Long.valueOf(this.numProcs), STATE_SIZE_OPTION.getOpt(), Integer.valueOf(this.stateSize), SYNC_OPTION.getOpt(), this.syncType, NUM_THREADS_OPTION.getOpt(), Integer.valueOf(this.numThreads), NUM_WALS_OPTION.getOpt(), Integer.valueOf(this.numWals), Long.valueOf(currentTimeMillis2)));
                    tearDownProcedureStore();
                    return 0;
                } catch (Exception e) {
                    System.err.println("Exception in worker thread.");
                    e.printStackTrace();
                    tearDownProcedureStore();
                    return 1;
                }
            } catch (Throwable th) {
                tearDownProcedureStore();
                throw th;
            }
        } catch (IOException e2) {
            e2.printStackTrace();
            tearDownProcedureStore();
            return 1;
        }
    }

    public static void main(String[] strArr) throws IOException {
        ProcedureWALPerformanceEvaluation procedureWALPerformanceEvaluation = new ProcedureWALPerformanceEvaluation();
        procedureWALPerformanceEvaluation.setConf(UTIL.getConfiguration());
        procedureWALPerformanceEvaluation.run(strArr);
    }

    static {
        $assertionsDisabled = !ProcedureWALPerformanceEvaluation.class.desiredAssertionStatus();
        UTIL = new HBaseCommonTestingUtility();
        DEFAULT_NUM_THREADS = 20;
        NUM_THREADS_OPTION = new Option("threads", true, "Number of parallel threads which will write insert/updates/deletes to WAL. Default: " + DEFAULT_NUM_THREADS);
        DEFAULT_NUM_PROCS = 1000000;
        NUM_PROCS_OPTION = new Option("procs", true, "Total number of procedures. Each procedure writes one insert and one update. Default: " + DEFAULT_NUM_PROCS);
        DEFAULT_NUM_WALS = 0;
        NUM_WALS_OPTION = new Option("wals", true, "Number of WALs to write. If -ve or 0, uses hbase.procedure.store.wal.roll.threshold conf to roll the logs. Default: " + DEFAULT_NUM_WALS);
        DEFAULT_STATE_SIZE = 1024;
        STATE_SIZE_OPTION = new Option("state_size", true, "Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE + "bytes");
        SYNC_OPTION = new Option("sync", true, "Type of sync to use when writing WAL contents to file system. Accepted values: hflush, hsync, nosync. Default: hflush");
        DEFAULT_SYNC_OPTION = "hflush";
    }
}
