/*
 * Decompiled with CFR 0.152.
 */
package alluxio.stress.cli.fuse;

import alluxio.ClientContext;
import alluxio.annotation.SuppressFBWarnings;
import alluxio.client.job.JobMasterClient;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.InstancedConfiguration;
import alluxio.stress.cli.Benchmark;
import alluxio.stress.common.SummaryStatistics;
import alluxio.stress.fuse.FuseIOOperation;
import alluxio.stress.fuse.FuseIOParameters;
import alluxio.stress.fuse.FuseIOTaskResult;
import alluxio.util.CommonUtils;
import alluxio.util.ConfigurationUtils;
import alluxio.util.FormatUtils;
import alluxio.util.executor.ExecutorServiceFactories;
import alluxio.worker.job.JobMasterClientContext;
import com.beust.jcommander.ParametersDelegate;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FuseIOBench
extends Benchmark<FuseIOTaskResult> {
    /** Logger used by the bench and by its worker threads. */
    private static final Logger LOG = LoggerFactory.getLogger(FuseIOBench.class);
    /** Path template of a test directory: local path, job worker directory name, directory index. */
    private static final String TEST_DIR_STRING_FORMAT = "%s/%s/dir-%d";
    /** Path template of a test file: local path, job worker directory name, directory index, file index. */
    private static final String TEST_FILE_STRING_FORMAT = "%s/%s/dir-%d/file-%d";
    /** Bench-specific command-line parameters. */
    @ParametersDelegate
    private FuseIOParameters mParameters = new FuseIOParameters();
    /** Names of the job worker directories under the local path; populated by {@link #prepare()}. */
    private List<String> mJobWorkerDirNames;
    /**
     * Index of this bench's id within {@link #mJobWorkerDirNames}; only assigned by
     * {@link #prepare()} in distributed mode, otherwise it keeps its default value of 0.
     */
    private int mJobWorkerZeroBasedId;
    /**
     * Set to true by a bench thread once it has waited for the start time; when true, the next
     * bench run computes a fresh start time instead of reusing the one from the base parameters.
     */
    private volatile boolean mStartBarrierPassed = false;

    /**
     * Command-line entry point: creates a new bench instance and passes it, together with the
     * arguments, to {@code mainInternal}.
     *
     * @param args the command-line arguments
     */
    public static void main(String[] args) {
        FuseIOBench bench = new FuseIOBench();
        mainInternal(args, bench);
    }

    @Override
    public String getBenchDescription() {
        return String.join((CharSequence)"\n", (Iterable<? extends CharSequence>)ImmutableList.of((Object)"A stress bench for testing the reading throughput of Fuse-based POSIX API.", (Object)"To run the test, data must be written first by executing \"Write\" operation, then run \"Read\" operation to test the reading throughput. The three different options of read are: ", (Object)"LocalRead: Each job worker, or client, will read the files it wrote through local Fuse mount point.", (Object)"RemoteRead: Each job worker will evenly read the files written by other job workers through local Fuse mount point.", (Object)"ClusterRead: Read <numAllFiles>/<numJobWorker> number of files evenly from all directories created by all job workers through local Fuse mount point.", (Object)"Optionally one can set alluxio.user.metadata.cache.enabled=true when mounting Alluxio Fuse and run \"ListFile\" before \"Read\" to cache the metadata of the test files and eliminate the effect of metadata operations while getting the reading throughput data.", (Object)"Note that \"--operation\" is required, \"--local-path\" can be a local filesystem path or a mounted Fuse path, and non-cluster mode only supports local read.", (Object)"", (Object)"Example:", (Object)"# The test will be run in cluster mode using job service", (Object)"# The test data will be written to /mnt/alluxio-fuse/FuseIOTest", (Object)"# Files will be evenly distributed into 32 directories, each contains 10 files of size 100 MB. 
32 threads of each worker will be used to generate the files", (Object[])new String[]{"# Metadata of the test files will be cached", "# 16 threads of each worker will be used for testing the reading throughput with ClusterRead.", "# 5 seconds of warmup time and 30 seconds of actual reading test time", "$ bin/alluxio runClass alluxio.stress.cli.fuse.FuseIOBench --operation Write \\", "--local-path /mnt/alluxio-fuse/FuseIOTest --num-dirs 32 --num-files-per-dir 10 \\", "--file-size 100m --threads 32 --cluster", "$ bin/alluxio runClass alluxio.stress.cli.fuse.FuseIOBench --operation ListFile \\", "--local-path /mnt/alluxio-fuse/FuseIOTest", "$ bin/alluxio runClass alluxio.stress.cli.fuse.FuseIOBench --operation ClusterRead \\", "--local-path /mnt/alluxio-fuse/FuseIOTest --num-dirs 32 --num-files-per-dir 10 \\", "--file-size 100m --threads 16 --warmup 5s --duration 30s --cluster", ""}));
    }

    /**
     * Validates the parameters and sets up the local state needed before the bench runs.
     *
     * <ul>
     *   <li>In cluster mode nothing is done.</li>
     *   <li>For {@code WRITE}, warmup is forced to {@code 0s} and the test directories for this
     *       bench id are created.</li>
     *   <li>For every other operation, the entries under {@code --local-path} are taken as job
     *       worker directories. In distributed mode their count must match the number of job
     *       workers reported by the job master, and the directory named after this bench id
     *       must be among them.</li>
     * </ul>
     *
     * @throws IllegalArgumentException if there are more threads than directories (except for
     *         {@code LIST_FILE}), or if RemoteRead/ClusterRead is requested without distributed mode
     * @throws IOException if {@code --local-path} cannot be listed
     * @throws IllegalStateException if the job worker directories do not match the live job workers
     */
    @Override
    public void prepare() throws Exception {
        if (this.mBaseParameters.mCluster) {
            return;
        }
        if (this.mParameters.mThreads > this.mParameters.mNumDirs
                && this.mParameters.mOperation != FuseIOOperation.LIST_FILE) {
            throw new IllegalArgumentException("Some of the threads are not being used. Please set the number of directories to be at least the number of threads, preferably a multiple of it.");
        }
        File localPath = new File(this.mParameters.mLocalPath);
        if (this.mParameters.mOperation == FuseIOOperation.WRITE) {
            LOG.warn("Cannot write repeatedly, so warmup is not possible. Setting warmup to 0s.");
            this.mParameters.mWarmup = "0s";
            for (int i = 0; i < this.mParameters.mNumDirs; ++i) {
                Files.createDirectories(Paths.get(String.format(TEST_DIR_STRING_FORMAT,
                        this.mParameters.mLocalPath, this.mBaseParameters.mId, i)));
            }
            return;
        }
        boolean needsOtherWorkers = this.mParameters.mOperation == FuseIOOperation.REMOTE_READ
                || this.mParameters.mOperation == FuseIOOperation.CLUSTER_READ;
        if (needsOtherWorkers && !this.mBaseParameters.mDistributed) {
            throw new IllegalArgumentException("Single-node Fuse IO stress bench doesn't support RemoteRead or ClusterRead.");
        }
        File[] jobWorkerDirs = localPath.listFiles();
        if (jobWorkerDirs == null) {
            throw new IOException(String.format("--local-path %s is not a valid path for this bench. Make sure using the correct path", this.mParameters.mLocalPath));
        }
        if (!this.mBaseParameters.mDistributed) {
            // Single-node run: only the directory written by this bench id is read.
            this.mJobWorkerDirNames = Arrays.asList(this.mBaseParameters.mId);
            return;
        }
        int numJobWorkers;
        try (JobMasterClient client = JobMasterClient.Factory.create((JobMasterClientContext)JobMasterClientContext.newBuilder((ClientContext)ClientContext.create((AlluxioConfiguration)new InstancedConfiguration(ConfigurationUtils.defaults()))).build())) {
            numJobWorkers = client.getAllWorkerHealth().size();
        }
        if (numJobWorkers != jobWorkerDirs.length) {
            throw new IllegalStateException("Some job worker crashed or joined after data are written. The test is stopped.");
        }
        // File.listFiles() returns entries in no guaranteed order. Sorting the names makes every
        // job worker derive the same ordering, so the zero-based ids used to split directories
        // between workers are consistent across the cluster.
        this.mJobWorkerDirNames = Arrays.stream(jobWorkerDirs)
                .map(File::getName)
                .sorted()
                .collect(Collectors.toList());
        this.mJobWorkerZeroBasedId = this.mJobWorkerDirNames.indexOf(this.mBaseParameters.mId);
        if (this.mJobWorkerZeroBasedId == -1) {
            throw new IllegalStateException(String.format("Directory %s is not found. Please use this bench to generate test files, and make sure no job worker crashes or joins after data is written. The test is stopped.", this.mBaseParameters.mId));
        }
    }

    /**
     * Runs the bench on this node and attaches the base and bench-specific parameters to the
     * result before returning it.
     *
     * @return the result of the bench run
     * @throws Exception if the bench run fails
     */
    @Override
    public FuseIOTaskResult runLocal() throws Exception {
        final FuseIOTaskResult result = runFuseBench();
        result.setBaseParameters(mBaseParameters);
        result.setParameters(mParameters);
        return result;
    }

    /**
     * Runs one bench thread per configured thread on a fixed-size pool and returns the merged
     * result.
     *
     * <p>The start time comes from the base parameters, unless it is {@code -1} or a previous
     * run already passed the start barrier; in those cases the start is set to 10 seconds from
     * now. The end time is start + warmup + duration.
     *
     * @return the result merged from all bench threads
     * @throws IllegalStateException if no bench thread reported a result
     * @throws Exception if parsing the parameters fails or waiting for the threads is interrupted
     */
    private FuseIOTaskResult runFuseBench() throws Exception {
        ExecutorService service = ExecutorServiceFactories.fixedThreadPool((String)"bench-thread", (int)this.mParameters.mThreads).create();
        BenchContext context;
        try {
            long durationMs = FormatUtils.parseTimeSize((String)this.mParameters.mDuration);
            long warmupMs = FormatUtils.parseTimeSize((String)this.mParameters.mWarmup);
            long startMs = this.mBaseParameters.mStartMs;
            if (startMs == -1L || this.mStartBarrierPassed) {
                startMs = CommonUtils.getCurrentMs() + 10000L;
            }
            long endMs = startMs + warmupMs + durationMs;
            context = new BenchContext(startMs, endMs);
            List<BenchThread> callables = new ArrayList<BenchThread>(this.mParameters.mThreads);
            for (int i = 0; i < this.mParameters.mThreads; ++i) {
                callables.add(new BenchThread(context, i));
            }
            service.invokeAll(callables, FormatUtils.parseTimeSize((String)this.mBaseParameters.mBenchTimeout), TimeUnit.MILLISECONDS);
        } finally {
            // Always stop the pool, even when setup fails or invokeAll is interrupted, so the
            // executor is not left running.
            service.shutdownNow();
        }
        service.awaitTermination(30L, TimeUnit.SECONDS);
        FuseIOTaskResult result = context.getResult();
        if (result == null) {
            // Without this check the log statement below would fail with a bare
            // NullPointerException that gives no hint about the cause.
            throw new IllegalStateException("No bench thread reported a result before the executor was shut down.");
        }
        LOG.info(String.format("job worker id: %s, errors: %d, IO throughput (MB/s): %f", this.mBaseParameters.mId, result.getErrors().size(), Float.valueOf(result.getIOMBps())));
        return result;
    }

    /**
     * Builds per-method summary statistics from the method profiles recorded in the given
     * time window.
     *
     * <p>Only profile entries whose {@code getIsttfb()} is true are grouped (by method name);
     * all other entries map to {@code null} in the grouping function.
     *
     * @param startMs start of the time window, in milliseconds
     * @param endMs end of the time window, in milliseconds
     * @return a map from method name to its summary statistics; empty if nothing was profiled
     * @throws IOException if processing the method profiles fails
     */
    @SuppressFBWarnings(value={"DMI_HARDCODED_ABSOLUTE_FILENAME"})
    public synchronized Map<String, SummaryStatistics> addAdditionalResult(long startMs, long endMs) throws IOException {
        Map<String, Benchmark.MethodStatistics> statsByMethod = processMethodProfiles(startMs, endMs,
            profileInput -> profileInput.getIsttfb() ? profileInput.getMethod() : null);
        HashMap<String, SummaryStatistics> summaries = new HashMap<>();
        statsByMethod.forEach((method, stats) -> summaries.put(method, toSummaryStatistics(stats)));
        return summaries;
    }

    /**
     * Converts raw method statistics (in nanoseconds) into summary statistics (in milliseconds).
     *
     * <p>Three arrays are produced: the values at percentiles 0..100, the values at the tail
     * percentiles {@code 100 - 1/10^i} for i = 0..5, and up to 20 maximum response times, where
     * unused slots hold {@code -1}.
     *
     * @param methodStatistics the statistics to convert
     * @return the summary statistics
     */
    private SummaryStatistics toSummaryStatistics(Benchmark.MethodStatistics methodStatistics) {
        final float nsPerMs = 1000000.0f;

        float[] percentilesMs = new float[101];
        for (int pct = 0; pct < percentilesMs.length; pct++) {
            percentilesMs[pct] = (float) methodStatistics.getTimeNs().getValueAtPercentile((double) pct) / nsPerMs;
        }

        float[] tailPercentilesMs = new float[6];
        for (int nines = 0; nines < tailPercentilesMs.length; nines++) {
            double percentile = 100.0 - 1.0 / Math.pow(10.0, nines);
            tailPercentilesMs[nines] = (float) methodStatistics.getTimeNs().getValueAtPercentile(percentile) / nsPerMs;
        }

        float[] maxTimesMs = new float[20];
        Arrays.fill(maxTimesMs, -1.0f);
        int slot = 0;
        while (slot < methodStatistics.getMaxTimeNs().length) {
            maxTimesMs[slot] = (float) methodStatistics.getMaxTimeNs()[slot] / nsPerMs;
            slot++;
        }

        return new SummaryStatistics((long) methodStatistics.getNumSuccess(), percentilesMs, tailPercentilesMs, maxTimesMs);
    }

    private final class BenchThread
    implements Callable<Void> {
        /** Shared context that supplies the start/end times and receives this thread's result. */
        private final BenchContext mContext;
        /** Index of this thread; used to select the directories or files this thread handles. */
        private final int mThreadId;
        /** I/O buffer sized from the buffer-size parameter and filled with the byte value 65. */
        private final byte[] mBuffer;
        /** Size in bytes to write per file, parsed from the file-size parameter. */
        private final long mFileSize;
        /** Input stream of the file currently being read, or null when none is open. */
        private FileInputStream mInStream = null;
        /** Output stream of the file currently being written, or null when none is open. */
        private FileOutputStream mOutStream = null;
        /** Bytes written so far to the current file; reset at the start of each processed file. */
        private long mCurrentOffset;
        /** Time in milliseconds (start time plus warmup) after which I/O bytes are counted. */
        private long mRecordMs;
        /** Result of this thread; merged into the shared context when the thread finishes. */
        private final FuseIOTaskResult mFuseIOTaskResult = new FuseIOTaskResult();

        private BenchThread(BenchContext context, int threadId) {
            this.mContext = context;
            this.mThreadId = threadId;
            this.mBuffer = new byte[(int)FormatUtils.parseSpaceSize((String)((FuseIOBench)FuseIOBench.this).mParameters.mBufferSize)];
            Arrays.fill(this.mBuffer, (byte)65);
            this.mFileSize = FormatUtils.parseSpaceSize((String)((FuseIOBench)FuseIOBench.this).mParameters.mFileSize);
            this.mRecordMs = this.mContext.getStartMs() + FormatUtils.parseTimeSize((String)((FuseIOBench)FuseIOBench.this).mParameters.mWarmup);
        }

        /**
         * Runs the bench operation for this thread and merges its result into the shared context.
         *
         * <p>Any exception is logged and recorded as an error message on the thread result
         * instead of being propagated. Open streams are always closed. The end time is stamped
         * after the run, whether it succeeded or failed.
         *
         * @return always {@code null}
         */
        @Override
        public Void call() {
            try {
                this.runInternal();
            }
            catch (Exception e) {
                LOG.error(Thread.currentThread().getName() + ": failed", (Throwable)e);
                // Some exceptions (e.g. NullPointerException) carry no message; fall back to
                // toString() so the recorded error still identifies the failure.
                String message = e.getMessage();
                this.mFuseIOTaskResult.addErrorMessage(message != null ? message : e.toString());
            }
            finally {
                this.closeInStream();
                this.closeOutStream();
            }
            this.mFuseIOTaskResult.setEndMs(CommonUtils.getCurrentMs());
            this.mContext.mergeThreadResult(this.mFuseIOTaskResult);
            return null;
        }

        /**
         * Waits until the shared start time, marks the start barrier as passed, then runs the
         * configured operation.
         *
         * @throws IllegalStateException if the start time has already passed, or the operation
         *         is unknown
         * @throws Exception if the operation fails
         */
        private void runInternal() throws Exception {
            mFuseIOTaskResult.setRecordStartMs(mRecordMs);

            final long untilStartMs = mContext.getStartMs() - CommonUtils.getCurrentMs();
            if (untilStartMs < 0L) {
                throw new IllegalStateException(String.format("Thread missed barrier. Increase the start delay. start: %d current: %d", mContext.getStartMs(), CommonUtils.getCurrentMs()));
            }
            CommonUtils.sleepMs((long) untilStartMs);
            mStartBarrierPassed = true;

            final FuseIOOperation operation = mParameters.mOperation;
            switch (operation) {
                case LIST_FILE:
                    listFile();
                    return;
                case WRITE:
                case LOCAL_READ:
                    writeOrLocalRead();
                    return;
                case REMOTE_READ:
                case CLUSTER_READ:
                    remoteOrClusterRead();
                    return;
                default:
                    throw new IllegalStateException("Unknown operation: " + operation);
            }
        }

        /**
         * Lists the test directories assigned to this thread inside every job worker directory.
         *
         * <p>Directories are assigned by stride: this thread starts at its own id and advances
         * by the thread count. The listing results are discarded.
         */
        private void listFile() {
            for (String workerDirName : mJobWorkerDirNames) {
                int dirId = mThreadId;
                while (dirId < mParameters.mNumDirs) {
                    String path = String.format(TEST_DIR_STRING_FORMAT, mParameters.mLocalPath, workerDirName, dirId);
                    new File(path).listFiles();
                    dirId += mParameters.mThreads;
                }
            }
        }

        /**
         * Writes or reads every file in the directories assigned to this thread, under the
         * directory named after this bench id.
         *
         * <p>Directories are assigned by stride (thread id, advancing by the thread count).
         * Returns early as soon as processing a file signals that the test should stop;
         * otherwise finishes through {@code finishProcessingFiles()}.
         *
         * @throws Exception if processing a file fails, or a read run exhausts its files
         */
        private void writeOrLocalRead() throws Exception {
            int dirId = mThreadId;
            while (dirId < mParameters.mNumDirs) {
                int fileId = 0;
                while (fileId < mParameters.mNumFilesPerDir) {
                    String path = String.format(TEST_FILE_STRING_FORMAT, mParameters.mLocalPath, mBaseParameters.mId, dirId, fileId);
                    if (processFile(path, FuseIOOperation.isRead((FuseIOOperation) mParameters.mOperation))) {
                        return;
                    }
                    ++fileId;
                }
                dirId += mParameters.mThreads;
            }
            finishProcessingFiles();
        }

        /**
         * Reads this worker's share of files from the job worker directories.
         *
         * <p>Job worker directories are visited starting at this worker's own index and wrapping
         * around; for {@code REMOTE_READ} the worker's own directory is skipped. Within each
         * one, test directories are taken by stride over the job workers (starting at this
         * worker's index) and files by stride over the threads (starting at this thread's id).
         * Returns early as soon as processing a file signals that the test should stop.
         *
         * @throws Exception if processing a file fails, or all files are read before the bench ends
         */
        private void remoteOrClusterRead() throws Exception {
            final int workerCount = mJobWorkerDirNames.size();
            final int ownIndex = mJobWorkerZeroBasedId;
            for (int visited = 0; visited < workerCount; visited++) {
                final int workerIndex = (visited + ownIndex) % workerCount;
                final boolean skipOwnDir = workerIndex == ownIndex
                        && mParameters.mOperation == FuseIOOperation.REMOTE_READ;
                if (skipOwnDir) {
                    continue;
                }
                final String workerDirName = (String) mJobWorkerDirNames.get(workerIndex);
                for (int dirId = ownIndex; dirId < mParameters.mNumDirs; dirId += workerCount) {
                    for (int fileId = mThreadId; fileId < mParameters.mNumFilesPerDir; fileId += mParameters.mThreads) {
                        String path = String.format(TEST_FILE_STRING_FORMAT, mParameters.mLocalPath, workerDirName, dirId, fileId);
                        if (processFile(path, FuseIOOperation.isRead((FuseIOOperation) mParameters.mOperation))) {
                            return;
                        }
                    }
                }
            }
            finishProcessingFiles();
        }

        /**
         * Repeatedly applies the configured operation to one file until the file is done or the
         * test must stop.
         *
         * <p>I/O bytes are only added to the result once the current time is past the record
         * start time.
         *
         * @param filePath the path of the file to read or write
         * @param isRead whether the operation is a read; reads stop once the end time has passed
         * @return {@code true} if the test should stop (thread interrupted, or a read ran past
         *         the end time); {@code false} if this file is finished and the next can start
         * @throws IOException if the underlying read or write fails
         */
        private boolean processFile(String filePath, boolean isRead) throws IOException {
            mCurrentOffset = 0L;
            for (;;) {
                if (Thread.currentThread().isInterrupted()) {
                    return true;
                }
                if (isRead && CommonUtils.getCurrentMs() > mContext.getEndMs()) {
                    closeInStream();
                    return true;
                }
                final long transferred = applyOperation(filePath);
                if (transferred <= 0L) {
                    return false;
                }
                if (CommonUtils.getCurrentMs() > mFuseIOTaskResult.getRecordStartMs()) {
                    mFuseIOTaskResult.incrementIOBytes(transferred);
                }
            }
        }

        /**
         * Performs a single buffer-sized read or write on the given file, opening the stream on
         * first use.
         *
         * <p>For reads, the input stream is closed when end of file is reached. For writes, the
         * output stream is closed and {@code -1} is returned once the target file size has been
         * written.
         *
         * @param filePath the path of the file to read or write
         * @return the number of bytes read or written; a negative value when the file is done
         * @throws IOException if the underlying read or write fails
         * @throws IllegalStateException if the operation is neither a read nor a write
         */
        private long applyOperation(String filePath) throws IOException {
            final FuseIOOperation operation = mParameters.mOperation;
            if (FuseIOOperation.isRead((FuseIOOperation) operation) && mInStream == null) {
                mInStream = new FileInputStream(filePath);
            }
            switch (operation) {
                case WRITE: {
                    if (mOutStream == null) {
                        mOutStream = new FileOutputStream(filePath);
                    }
                    final long remaining = mFileSize - mCurrentOffset;
                    final int chunk = (int) Math.min(remaining, (long) mBuffer.length);
                    if (chunk == 0) {
                        // Nothing left to write for this file.
                        closeOutStream();
                        return -1L;
                    }
                    mOutStream.write(mBuffer, 0, chunk);
                    mCurrentOffset += (long) chunk;
                    return chunk;
                }
                case LOCAL_READ:
                case REMOTE_READ:
                case CLUSTER_READ: {
                    if (mInStream == null) {
                        mInStream = new FileInputStream(filePath);
                    }
                    final int bytesRead = mInStream.read(mBuffer);
                    if (bytesRead < 0) {
                        // End of file reached.
                        closeInStream();
                    }
                    return bytesRead;
                }
                default:
                    throw new IllegalStateException("Unknown operation: " + operation);
            }
        }

        /**
         * Called when a thread has gone through all of its files without being told to stop.
         *
         * <p>This is fine for non-read operations. For read operations it means the files ran
         * out before the bench ended, which is reported as an error.
         *
         * @throws IllegalArgumentException if the current operation is a read
         */
        private void finishProcessingFiles() {
            final boolean readOperation = FuseIOOperation.isRead((FuseIOOperation) mParameters.mOperation);
            if (!readOperation) {
                return;
            }
            throw new IllegalArgumentException(String.format("Thread %d finishes reading all its files before the bench ends. For more accurate result, use more files, or larger files, or a shorter duration", mThreadId));
        }

        /**
         * Closes the current input stream, if any, and clears the reference. A failure to close
         * is recorded as an error message on the thread result rather than thrown.
         */
        private void closeInStream() {
            final FileInputStream stream = mInStream;
            // Clear the field first; it ends up null whether or not close() succeeds.
            mInStream = null;
            if (stream == null) {
                return;
            }
            try {
                stream.close();
            } catch (IOException e) {
                mFuseIOTaskResult.addErrorMessage(e.getMessage());
            }
        }

        /**
         * Closes the current output stream, if any, and clears the reference. A failure to close
         * is recorded as an error message on the thread result rather than thrown.
         */
        private void closeOutStream() {
            final FileOutputStream stream = mOutStream;
            // Clear the field first; it ends up null whether or not close() succeeds.
            mOutStream = null;
            if (stream == null) {
                return;
            }
            try {
                stream.close();
            } catch (IOException e) {
                mFuseIOTaskResult.addErrorMessage(e.getMessage());
            }
        }
    }

    /**
     * State shared by all bench threads of one run: the start and end times, and the result
     * merged from the individual threads.
     */
    private final class BenchContext {
        private final long mStartMs;
        private final long mEndMs;
        /** Merged result; null until the first thread reports. */
        private FuseIOTaskResult mFuseIOTaskResult;

        /**
         * @param startMs the time, in milliseconds, at which the threads start
         * @param endMs the time, in milliseconds, at which the bench ends
         */
        public BenchContext(long startMs, long endMs) {
            mStartMs = startMs;
            mEndMs = endMs;
        }

        /** @return the start time in milliseconds */
        public long getStartMs() {
            return mStartMs;
        }

        /** @return the end time in milliseconds */
        public long getEndMs() {
            return mEndMs;
        }

        /** @return the merged result, or null if no thread has reported yet */
        public synchronized FuseIOTaskResult getResult() {
            return mFuseIOTaskResult;
        }

        /**
         * Folds one thread's result into the merged result. The first reported result becomes
         * the merged result; later ones are merged into it. A merge failure is recorded as an
         * error message on the merged result.
         *
         * @param threadResult the result of a single bench thread
         */
        public synchronized void mergeThreadResult(FuseIOTaskResult threadResult) {
            if (mFuseIOTaskResult == null) {
                mFuseIOTaskResult = threadResult;
                return;
            }
            try {
                mFuseIOTaskResult.merge(threadResult);
            } catch (Exception e) {
                mFuseIOTaskResult.addErrorMessage(e.getMessage());
            }
        }
    }
}

