package org.apache.hadoop.yarn.submarine.client.cli.runjob;

import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.submarine.client.cli.AbstractCli;
import org.apache.hadoop.yarn.submarine.client.cli.CliConstants;
import org.apache.hadoop.yarn.submarine.client.cli.CliUtils;
import org.apache.hadoop.yarn.submarine.client.cli.Command;
import org.apache.hadoop.yarn.submarine.client.cli.param.ParametersHolder;
import org.apache.hadoop.yarn.submarine.client.cli.param.runjob.RunJobParameters;
import org.apache.hadoop.yarn.submarine.client.cli.param.yaml.YamlConfigFile;
import org.apache.hadoop.yarn.submarine.client.cli.param.yaml.YamlParseException;
import org.apache.hadoop.yarn.submarine.common.ClientContext;
import org.apache.hadoop.yarn.submarine.common.exception.SubmarineException;
import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor;
import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
import org.apache.hadoop.yarn.submarine.runtimes.common.StorageKeyConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.Constructor;

/**
 * Implements the {@code job run} command.
 *
 * <p>Parses the command line, optionally combined with a YAML config file given via
 * {@link CliConstants#YAML_CONFIG}. Submits the job through a {@link JobSubmitter} and records
 * the job's information in the runtime's submarine storage. When requested, it then waits for
 * the job to finish via a {@link JobMonitor}.
 */
public class RunJobCli extends AbstractCli {
    private static final Logger LOG = LoggerFactory.getLogger(RunJobCli.class);
    private static final String CAN_BE_USED_WITH_TF_PYTORCH = "Can be used with TensorFlow or PyTorch frameworks.";
    private static final String CAN_BE_USED_WITH_TF_ONLY = "Can only be used with TensorFlow framework.";
    public static final String YAML_PARSE_FAILED = "Failed to parse YAML config";

    private final Options options;
    private final JobSubmitter jobSubmitter;
    private final JobMonitor jobMonitor;
    // Populated by run(); null until a command line has been parsed successfully.
    private ParametersHolder parametersHolder;

    /**
     * Creates the CLI using the submitter and monitor provided by the context's runtime factory.
     *
     * @param clientContext client context supplying the runtime factory
     */
    public RunJobCli(ClientContext clientContext) {
        this(clientContext,
            clientContext.getRuntimeFactory().getJobSubmitterInstance(),
            clientContext.getRuntimeFactory().getJobMonitorInstance());
    }

    /**
     * Creates the CLI with an explicit submitter and monitor (used by tests).
     *
     * @param clientContext client context
     * @param jobSubmitter  submitter used to launch the job
     * @param jobMonitor    monitor used when the caller asks to wait for job completion
     */
    @VisibleForTesting
    public RunJobCli(ClientContext clientContext, JobSubmitter jobSubmitter, JobMonitor jobMonitor) {
        super(clientContext);
        this.options = generateOptions();
        this.jobSubmitter = jobSubmitter;
        this.jobMonitor = jobMonitor;
    }

    /** Prints the usage/help text of the {@code job run} command to stdout. */
    public void printUsages() {
        new HelpFormatter().printHelp("job run", this.options);
    }

    /** Builds the full set of command-line options accepted by {@code job run}. */
    private Options generateOptions() {
        Options opts = new Options();
        opts.addOption(CliConstants.YAML_CONFIG, true, "Config file (in YAML format)");
        opts.addOption(CliConstants.FRAMEWORK, true, String.format(
            "Framework to use. Valid values are: %s! The default framework is Tensorflow.",
            Framework.getValues()));
        opts.addOption(CliConstants.NAME, true, "Name of the job");
        opts.addOption(CliConstants.INPUT_PATH, true,
            "Input of the job, could be local or other FS directory");
        opts.addOption(CliConstants.CHECKPOINT_PATH, true,
            "Training output directory of the job, could be local or other FS directory. "
                + "This typically includes checkpoint files and exported model");
        opts.addOption(CliConstants.SAVED_MODEL_PATH, true,
            "Model exported path (savedmodel) of the job, which is needed when exported model "
                + "is not placed under ${checkpoint_path}, could be local or other FS directory. "
                + "This will be used to serve.");
        opts.addOption(CliConstants.DOCKER_IMAGE, true, "Docker image name/tag");
        opts.addOption(CliConstants.QUEUE, true,
            "Name of queue to run the job, by default it uses default queue");
        addWorkerOptions(opts);
        addPSOptions(opts);
        addTensorboardOptions(opts);
        opts.addOption(CliConstants.ENV, true, "Common environment variable of worker/ps");
        opts.addOption(CliConstants.VERBOSE, false, "Print verbose log for troubleshooting");
        opts.addOption(CliConstants.WAIT_JOB_FINISH, false,
            "Specified when user wants to wait for the job to finish");
        opts.addOption(CliConstants.QUICKLINK, true,
            "Specify quicklink so YARN web UI shows link to given role instance and port. "
                + "When --tensorboard is specified, quicklink to tensorboard instance will "
                + "be added automatically. The format of quick link is: "
                + "Quick_link_label=http(or https)://role-name:port. For example, if want to "
                + "link to first worker's 7070 port, and text of quicklink is Notebook_UI, "
                + "user need to specify --quicklink Notebook_UI=https://master-0:7070");
        opts.addOption(CliConstants.LOCALIZATION, true,
            "Specify localization to make remote/local file/directory available to all "
                + "container(Docker). Argument format is \"RemoteUri:LocalFilePath[:rw] \" "
                + "(ro permission is not supported yet). The RemoteUri can be a file or "
                + "directory in local or HDFS or s3 or abfs or http .etc. The LocalFilePath "
                + "can be absolute or relative. If it's a relative path, it'll be under "
                + "container's implied working directory but sub directory is not supported "
                + "yet. This option can be set multiple times. Examples are \n"
                + "-localization \"hdfs:///user/yarn/mydir2:/opt/data\"\n"
                + "-localization \"s3a:///a/b/myfile1:./\"\n"
                + "-localization \"https:///a/b/myfile2:./myfile\"\n"
                + "-localization \"/user/yarn/mydir3:/opt/mydir3\"\n"
                + "-localization \"./mydir1:.\"\n");
        opts.addOption(CliConstants.KEYTAB, true,
            "Specify keytab used by the job under security environment");
        opts.addOption(CliConstants.PRINCIPAL, true,
            "Specify principal used by the job under security environment");
        opts.addOption(CliConstants.DISTRIBUTE_KEYTAB, false,
            "Distribute local keytab to cluster machines for service authentication. "
                + "If not specified, pre-distributed keytab of which path specified by "
                + "parameter keytab on cluster machines will be used");
        opts.addOption("h", "help", false, "Print help");
        opts.addOption(CliConstants.INSECURE_CLUSTER, false, "Cluster is not Kerberos enabled.");
        opts.addOption(CliConstants.ARG_CONF, true,
            "User specified configuration, as key=val pairs.");
        return opts;
    }

    /** Adds the worker-related options (shared by TensorFlow and PyTorch). */
    private void addWorkerOptions(Options opts) {
        opts.addOption(CliConstants.N_WORKERS, true,
            "Number of worker tasks of the job, by default it's 1. " + CAN_BE_USED_WITH_TF_PYTORCH);
        opts.addOption(CliConstants.WORKER_DOCKER_IMAGE, true,
            "Specify docker image for WORKER, when this is not specified, WORKER uses "
                + "--docker_image as default. " + CAN_BE_USED_WITH_TF_PYTORCH);
        opts.addOption(CliConstants.WORKER_LAUNCH_CMD, true,
            "Commandline of worker, arguments will be directly used to launch the worker. "
                + CAN_BE_USED_WITH_TF_PYTORCH);
        opts.addOption(CliConstants.WORKER_RES, true,
            "Resource of each worker, for example memory-mb=2048,vcores=2,yarn.io/gpu=2. "
                + CAN_BE_USED_WITH_TF_PYTORCH);
    }

    /** Adds the parameter-server options (TensorFlow only). */
    private void addPSOptions(Options opts) {
        opts.addOption(CliConstants.N_PS, true,
            "Number of PS tasks of the job, by default it's 0. " + CAN_BE_USED_WITH_TF_ONLY);
        opts.addOption(CliConstants.PS_DOCKER_IMAGE, true,
            "Specify docker image for PS, when this is not specified, PS uses --docker_image "
                + "as default. " + CAN_BE_USED_WITH_TF_ONLY);
        opts.addOption(CliConstants.PS_LAUNCH_CMD, true,
            "Commandline of PS, arguments will be directly used to launch the PS. "
                + CAN_BE_USED_WITH_TF_ONLY);
        opts.addOption(CliConstants.PS_RES, true,
            "Resource of each PS, for example memory-mb=2048,vcores=2,yarn.io/gpu=2. "
                + CAN_BE_USED_WITH_TF_ONLY);
    }

    /** Adds the TensorBoard options (TensorFlow only). */
    private void addTensorboardOptions(Options opts) {
        opts.addOption(CliConstants.TENSORBOARD, false,
            "Should we run TensorBoard for this job? By default it's disabled. "
                + CAN_BE_USED_WITH_TF_ONLY);
        opts.addOption(CliConstants.TENSORBOARD_RESOURCES, true,
            "Specify resources of Tensorboard, by default it is memory=4G,vcores=1. "
                + CAN_BE_USED_WITH_TF_ONLY);
        opts.addOption(CliConstants.TENSORBOARD_DOCKER_IMAGE, true,
            "Specify Tensorboard docker image. When this is not specified, Tensorboard uses "
                + "--docker_image as default. " + CAN_BE_USED_WITH_TF_ONLY);
    }

    /**
     * Parses {@code args} into {@link #parametersHolder} and applies client-context updates.
     *
     * @throws ParseException if the command line is invalid (usage is printed first)
     */
    private void parseCommandLineAndGetRunJobParameters(String[] args)
            throws ParseException, IOException, YarnException {
        try {
            CommandLine commandLine = new GnuParser().parse(this.options, args);
            this.parametersHolder = createParametersHolder(commandLine);
            this.parametersHolder.updateParameters(this.clientContext);
        } catch (ParseException e) {
            LOG.error("Exception in parse: {}", e.getMessage());
            printUsages();
            throw e;
        }
    }

    /**
     * Builds the parameters from the command line alone, or from the command line plus the
     * YAML file named by {@link CliConstants#YAML_CONFIG} when that option is present.
     */
    private ParametersHolder createParametersHolder(CommandLine commandLine)
            throws ParseException, YarnException {
        String yamlConfigPath = commandLine.getOptionValue(CliConstants.YAML_CONFIG);
        if (yamlConfigPath == null) {
            LOG.info("Using CLI configuration!");
            return ParametersHolder.createWithCmdLine(commandLine, Command.RUN_JOB);
        }
        YamlConfigFile yamlConfig = readYamlConfigFile(yamlConfigPath);
        checkYamlConfig(yamlConfigPath, yamlConfig);
        LOG.info("Using YAML configuration!");
        return ParametersHolder.createWithCmdLineAndYaml(commandLine, yamlConfig, Command.RUN_JOB);
    }

    /**
     * Validates that the parsed YAML file is non-empty and defines a configs section.
     *
     * @throws YamlParseException if either check fails
     */
    private void checkYamlConfig(String filename, YamlConfigFile yamlConfig) {
        if (yamlConfig == null) {
            throw new YamlParseException(String.format(
                YAML_PARSE_FAILED + ", file is empty: %s", filename));
        }
        if (yamlConfig.getConfigs() == null) {
            throw new YamlParseException(String.format(YAML_PARSE_FAILED
                + ", config section should be defined, but it cannot be found in YAML file '%s'!",
                filename));
        }
    }

    /**
     * Loads {@code filename} as a {@link YamlConfigFile}, mapping underscore-style keys to
     * properties via {@link RunJobParameters.UnderscoreConverterPropertyUtils}.
     *
     * @return the parsed config; may be null for an empty file (checked by the caller)
     * @throws YamlParseException if the file is missing, unreadable, or not valid YAML
     */
    private YamlConfigFile readYamlConfigFile(String filename) {
        Constructor constructor = new Constructor(YamlConfigFile.class);
        constructor.setPropertyUtils(new RunJobParameters.UnderscoreConverterPropertyUtils());
        try {
            LOG.info("Reading YAML configuration from file: {}", filename);
            // try-with-resources so the file handle is released even when parsing fails.
            try (InputStream in = FileUtils.openInputStream(new File(filename))) {
                return new Yaml(constructor).loadAs(in, YamlConfigFile.class);
            }
        } catch (FileNotFoundException e) {
            logExceptionOfYamlParse(filename, e);
            throw new YamlParseException(YAML_PARSE_FAILED + ", file does not exist!");
        } catch (Exception e) {
            logExceptionOfYamlParse(filename, e);
            throw new YamlParseException(String.format(
                YAML_PARSE_FAILED + ", details: %s", e.getMessage()));
        }
    }

    /** Logs a YAML parsing failure together with its stack trace. */
    private void logExceptionOfYamlParse(String filename, Exception exc) {
        // SLF4J treats a trailing Throwable as the exception to log, not as a placeholder arg.
        LOG.error("Exception while parsing YAML file {}", filename, exc);
    }

    /**
     * Records the submitted job in the runtime's submarine storage, keyed by job name.
     * Optional paths are stored only when set.
     */
    private void storeJobInformation(RunJobParameters parameters, ApplicationId applicationId,
            String[] args) throws IOException {
        String jobName = parameters.getName();
        Map<String, String> jobInfo = new HashMap<>();
        jobInfo.put(StorageKeyConstants.JOB_NAME, jobName);
        jobInfo.put(StorageKeyConstants.APPLICATION_ID, applicationId.toString());
        putIfNotNull(jobInfo, StorageKeyConstants.CHECKPOINT_PATH, parameters.getCheckpointPath());
        putIfNotNull(jobInfo, StorageKeyConstants.INPUT_PATH, parameters.getInputPath());
        putIfNotNull(jobInfo, StorageKeyConstants.SAVED_MODEL_PATH, parameters.getSavedModelPath());
        jobInfo.put(StorageKeyConstants.JOB_RUN_ARGS, String.join(" ", args));
        this.clientContext.getRuntimeFactory().getSubmarineStorage().addNewJob(jobName, jobInfo);
    }

    /** Puts {@code key -> value} into {@code map} only when {@code value} is non-null. */
    private static void putIfNotNull(Map<String, String> map, String key, String value) {
        if (value != null) {
            map.put(key, value);
        }
    }

    /**
     * Runs {@code job run}: prints help when requested, otherwise parses arguments, submits
     * the job, stores its information and, if requested, waits for it to finish.
     *
     * @return always 0; failures are reported via exceptions
     */
    @Override
    public int run(String[] args)
            throws ParseException, IOException, YarnException, SubmarineException {
        if (CliUtils.argsForHelp(args)) {
            printUsages();
            return 0;
        }
        parseCommandLineAndGetRunJobParameters(args);
        ApplicationId applicationId = this.jobSubmitter.submitJob(this.parametersHolder);
        RunJobParameters parameters = (RunJobParameters) this.parametersHolder.getParameters();
        storeJobInformation(parameters, applicationId, args);
        if (parameters.isWaitJobFinish()) {
            this.jobMonitor.waitTrainingFinal(parameters.getName());
        }
        return 0;
    }

    /** Returns the submitter used by this CLI. */
    @VisibleForTesting
    public JobSubmitter getJobSubmitter() {
        return this.jobSubmitter;
    }

    /** Returns the parsed run-job parameters; only valid after {@link #run} has parsed args. */
    @VisibleForTesting
    public RunJobParameters getRunJobParameters() {
        return (RunJobParameters) this.parametersHolder.getParameters();
    }
}
