package org.flinkextended.flink.ml.cluster.node.runner.python;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.commons.lang.StringUtils;
import org.flinkextended.flink.ml.cluster.node.MLContext;
import org.flinkextended.flink.ml.cluster.node.runner.AbstractScriptRunner;
import org.flinkextended.flink.ml.cluster.node.runner.python.log.ProcessLogger;
import org.flinkextended.flink.ml.cluster.node.runner.python.log.ProcessOutputConsumerFactory;
import org.flinkextended.flink.ml.util.MLConstants;
import org.flinkextended.flink.ml.util.MLException;
import org.flinkextended.flink.ml.util.ShellExec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/flinkextended/flink/ml/cluster/node/runner/python/ProcessPythonRunner.class */
/**
 * Script runner that launches the user's Python script as a separate OS process and supervises it
 * until it exits, is asked to stop via {@link #notifyKillSignal()}, or the supervising thread is
 * interrupted.
 *
 * <p>The child's environment is assembled in {@link #buildProcessBuilder(ProcessBuilder)} from the
 * ML context properties and from the environment of the current JVM.
 */
public class ProcessPythonRunner extends AbstractScriptRunner {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessPythonRunner.class);

    /** Currently running Python process, or {@code null} when none is running. */
    private volatile Process child;

    /** Set by {@link #notifyKillSignal()}; polled by {@link #runProcess(ProcessBuilder)}. */
    protected AtomicBoolean toKill;

    /**
     * Creates a runner bound to the given ML context.
     *
     * @param mLContext context handed to the parent runner
     */
    public ProcessPythonRunner(MLContext mLContext) {
        super(mLContext);
        this.child = null;
        this.toKill = new AtomicBoolean(false);
    }

    /**
     * Runs the given command line and reports its exit code, waiting at most five seconds.
     *
     * @param str command line to execute (e.g. {@code "which python3"})
     * @return the command's exit code, or {@code 1} if it could not be started, did not finish
     *     within five seconds, or the wait was interrupted
     */
    public static int checkPythonEnvironment(String str) {
        Process probe = null;
        try {
            probe = Runtime.getRuntime().exec(str);
            Thread stdoutPump = new Thread(new ShellExec.ProcessLogger(probe.getInputStream(), new ShellExec.StdOutConsumer()));
            Thread stderrPump = new Thread(new ShellExec.ProcessLogger(probe.getErrorStream(), new ShellExec.StdOutConsumer()));
            stdoutPump.start();
            stderrPump.start();
            if (probe.waitFor(5L, TimeUnit.SECONDS)) {
                return probe.exitValue();
            }
            LOG.warn("Command '{}' did not finish within 5 seconds", str);
            return 1;
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while waiting for command '{}'", str);
            return 1;
        } catch (Exception e) {
            LOG.warn("Failed to run command '{}'", str, e);
            return 1;
        } finally {
            // Do not leave a timed-out or abandoned probe process behind.
            if (probe != null && probe.isAlive()) {
                probe.destroyForcibly();
            }
        }
    }

    /**
     * Builds the Python command line and environment, then runs the process to completion.
     *
     * <p>The interpreter is {@code python<PYTHON_VERSION>} unless a virtual env directory is
     * configured, in which case {@code <virtualenv>/bin/python} is used. The last argument passed
     * to the script is {@code <nodeServerIP>:<nodeServerPort>}.
     *
     * @throws IOException if the process cannot be started or fails
     * @throws RuntimeException if {@code which python<PYTHON_VERSION>} does not succeed
     */
    @Override
    public void runScript() throws IOException {
        String startupScript = this.mlContext.getProperties().get(MLConstants.STARTUP_SCRIPT_FILE);
        ArrayList<String> command = new ArrayList<>();
        String pythonExec = "python" + this.mlContext.getProperties().getOrDefault(MLConstants.PYTHON_VERSION, "");
        if (checkPythonEnvironment("which " + pythonExec) != 0) {
            throw new RuntimeException("Python executable: " + pythonExec + " not found");
        }
        String virtualEnvDir = this.mlContext.getProperties().getOrDefault(MLConstants.VIRTUAL_ENV_DIR, "");
        if (!virtualEnvDir.isEmpty()) {
            pythonExec = virtualEnvDir + "/bin/python";
        }
        command.add(pythonExec);
        if (this.mlContext.startWithStartup()) {
            command.add(startupScript);
            LOG.info("Running {} via {}", this.mlContext.getScript().getName(), startupScript);
        } else {
            command.add(this.mlContext.getScript().getAbsolutePath());
        }
        command.add(String.format("%s:%d", this.mlContext.getNodeServerIP(), Integer.valueOf(this.mlContext.getNodeServerPort())));
        ProcessBuilder processBuilder = new ProcessBuilder(command);
        String classPath = getClassPath();
        if (classPath == null) {
            LOG.warn("Cannot find proper classpath for the Python process.");
        } else {
            this.mlContext.putEnvProperty(MLConstants.CLASSPATH, classPath);
        }
        buildProcessBuilder(processBuilder);
        LOG.info("{} Python cmd: {}", this.mlContext.getIdentity(), Joiner.on(" ").join(command));
        runProcess(processBuilder);
    }

    /**
     * Starts the process and blocks until it exits, a kill signal is observed, or the calling
     * thread is interrupted. The process is force-killed on every exit path if still alive.
     *
     * @param processBuilder fully configured builder for the Python process
     * @throws IOException if the process cannot be started, or (as {@code MLException}) if it
     *     exits with a non-zero code
     */
    protected void runProcess(ProcessBuilder processBuilder) throws IOException {
        // Wait on a local reference: close() on another thread may null the field at any time.
        Process process = processBuilder.start();
        this.child = process;
        new ProcessLogger(this.mlContext.getIdentity(), process, ProcessOutputConsumerFactory.createMLRunner(this.mlContext)).start_logging();
        try {
            int exitCode = 0;
            while (true) {
                try {
                    if (process.waitFor(5L, TimeUnit.SECONDS)) {
                        exitCode = process.exitValue();
                        break;
                    }
                    // Still running: poll the kill flag between five-second waits.
                    if (this.toKill.get()) {
                        break;
                    }
                } catch (InterruptedException e) {
                    // Restore the flag so callers can see the interruption.
                    Thread.currentThread().interrupt();
                    LOG.warn("{} interrupted, killing the process", this.mlContext.getIdentity());
                    return;
                }
            }
            if (exitCode != 0) {
                throw new MLException(String.format("%s python process exited with code %d", this.mlContext.getIdentity(), exitCode));
            }
        } finally {
            killProcess();
        }
    }

    /**
     * Populates the environment of the Python process.
     *
     * <ul>
     *   <li>Publishes {@code PYTHONPATH} as an env property of the ML context.
     *   <li>Copies every property whose key starts with {@code MLConstants.ENV_PROPERTY_PREFIX}
     *       into the process environment (prefix stripped), except {@code LD_LIBRARY_PATH},
     *       which is merged into the assembled library path.
     *   <li>Assembles {@code LD_LIBRARY_PATH} and {@code LD_PRELOAD} from the JVM's environment,
     *       the work dir's {@code tfenv/lib} directory and the corresponding properties.
     *   <li>Drops empty entries and entries containing {@code log4j.properties} from
     *       {@code CLASSPATH}.
     * </ul>
     *
     * @param processBuilder builder whose environment is modified in place
     */
    protected void buildProcessBuilder(ProcessBuilder processBuilder) {
        StringBuilder libraryPath = new StringBuilder();
        String inheritedLibraryPath = System.getenv(MLConstants.LD_LIBRARY_PATH);
        String javaHome = System.getenv(MLConstants.JAVA_HOME);
        String hadoopHdfsHome = System.getenv(MLConstants.HADOOP_HDFS_HOME);
        // Skip unset variables; concatenating them would add literal "null" path entries.
        if (javaHome != null) {
            libraryPath.append(javaHome).append("/jre/lib/amd64/server/:");
        }
        if (inheritedLibraryPath != null) {
            libraryPath.append(inheritedLibraryPath).append(":");
        }
        if (hadoopHdfsHome != null) {
            libraryPath.append(hadoopHdfsHome).append("/lib/native/:");
        }
        StringBuilder preload = new StringBuilder();
        String workDir = this.mlContext.getProperties().get(MLConstants.WORK_DIR);
        this.mlContext.putEnvProperty(MLConstants.PYTHONPATH_ENV, this.mlContext.getProperties().getOrDefault("ENV:PYTHONPATH", "") + ":" + this.mlContext.getProperties().getOrDefault(MLConstants.CODE_DIR, workDir));
        libraryPath.append(workDir).append("/tfenv/lib/:");
        for (Map.Entry<String, String> entry : this.mlContext.getProperties().entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();
            if (key.startsWith(MLConstants.ENV_PROPERTY_PREFIX)) {
                String envName = key.substring(MLConstants.ENV_PROPERTY_PREFIX.length());
                if (envName.equals(MLConstants.LD_LIBRARY_PATH)) {
                    libraryPath.append(value).append(":");
                } else {
                    LOG.info("set ENV:{} {}", envName, value);
                    processBuilder.environment().put(envName, value);
                }
            } else if (key.equals("SYS:LD_LIBRARY_PATH")) {
                // Entries are resolved against the work dir's tfenv/lib directory.
                for (String relativeDir : value.split(":")) {
                    libraryPath.append(workDir).append("/tfenv/lib/").append(relativeDir).append(":");
                }
            } else if (key.equals(MLConstants.SYS_PROPERTY_PREFIX + MLConstants.LD_PRELOAD)) {
                for (String relativeLib : value.split(":")) {
                    preload.append(workDir).append("/tfenv/lib/").append(relativeLib).append(":");
                }
            }
        }
        if (libraryPath.length() > 0) {
            LOG.info("set ENV:LD_LIBRARY_PATH {}", libraryPath);
            processBuilder.environment().put(MLConstants.LD_LIBRARY_PATH, libraryPath.toString());
        }
        if (preload.length() > 0) {
            LOG.info("set ENV:{} {}", MLConstants.LD_PRELOAD, preload);
            processBuilder.environment().put(MLConstants.LD_PRELOAD, preload.toString());
        }
        String classPath = processBuilder.environment().get(MLConstants.CLASSPATH);
        if (classPath != null) {
            ArrayList<String> keptEntries = new ArrayList<>();
            for (String classPathEntry : Splitter.on(File.pathSeparator).split(classPath)) {
                if (!StringUtils.isEmpty(classPathEntry) && !classPathEntry.contains("log4j.properties")) {
                    keptEntries.add(classPathEntry);
                }
            }
            processBuilder.environment().put(MLConstants.CLASSPATH, Joiner.on(File.pathSeparator).join(keptEntries));
        }
    }

    /** Forcibly destroys the child process if it is still alive, then forgets it. */
    private synchronized void killProcess() {
        Process process = this.child;
        if (process == null || !process.isAlive()) {
            return;
        }
        LOG.info("Force kill {} process", this.mlContext.getIdentity());
        process.destroyForcibly();
        this.child = null;
    }

    /**
     * Closes the parent runner and force-kills the Python process if it is still running.
     *
     * @throws IOException if the parent's {@code close()} fails; the process is killed regardless
     */
    @Override
    public void close() throws IOException {
        try {
            super.close();
        } finally {
            // Kill the child even when the parent's close() throws.
            killProcess();
        }
        LOG.info("Python scriptRunner for {} closed", this.mlContext.getIdentity());
    }

    /**
     * Returns the classpath to expose to the Python process.
     *
     * @return the result of {@link #getHadoopClassPath()}, possibly {@code null}
     * @throws IOException declared for callers; not thrown by the current implementation
     */
    public static String getClassPath() throws IOException {
        return getHadoopClassPath();
    }

    /**
     * Finds the jar file that the given class is loaded from.
     *
     * @param cls class to locate; must not be {@code null}
     * @return decoded file-system path of the containing jar, or {@code null} if the class has no
     *     class loader or is not found inside a jar
     * @throws IOException if the class loader's resources cannot be enumerated or decoded
     */
    private String findContainingJar(Class<?> cls) throws IOException {
        Preconditions.checkNotNull(cls);
        ClassLoader classLoader = cls.getClassLoader();
        if (classLoader == null) {
            return null;
        }
        Enumeration<URL> resources = classLoader.getResources(cls.getName().replace('.', '/') + ".class");
        while (resources.hasMoreElements()) {
            URL resource = resources.nextElement();
            if (!"jar".equals(resource.getProtocol())) {
                continue;
            }
            String path = resource.getPath();
            if (path.startsWith("file:")) {
                path = path.substring("file:".length());
            }
            // URLDecoder turns '+' into a space, so protect literal plus signs first. Decode
            // exactly once: a second pass would corrupt paths containing a literal '%'.
            String decoded = URLDecoder.decode(path.replace("+", "%2B"), "UTF-8");
            return decoded.replaceAll("!.*$", "");
        }
        return null;
    }

    /**
     * Obtains the Hadoop classpath by running {@code <hadoop> classpath --glob}.
     *
     * @return the command's collected output, or {@code null} if no hadoop binary was found
     * @throws IllegalStateException if the command fails
     */
    protected static String getHadoopClassPath() {
        String hadoopBin = findHadoopBin();
        LOG.info("HADOOP BIN:{}", hadoopBin);
        if (hadoopBin == null) {
            return null;
        }
        StringBuffer output = new StringBuffer();
        Preconditions.checkState(ShellExec.run(hadoopBin + " classpath --glob", (Consumer<String>) output::append), "Failed to get hadoop class path");
        return output.toString();
    }

    /**
     * Locates the {@code hadoop} launcher: first via {@code type -p hadoop}, then via
     * {@code $HADOOP_HOME/bin/hadoop}.
     *
     * @return path of the hadoop launcher, or {@code null} if neither lookup yields a candidate
     * @throws IllegalStateException if a candidate path was found but the file does not exist
     */
    protected static String findHadoopBin() {
        String hadoopBin = null;
        StringBuffer output = new StringBuffer();
        if (ShellExec.run("type -p hadoop", (Consumer<String>) output::append, true)) {
            hadoopBin = output.toString();
        } else {
            String hadoopHome = System.getenv("HADOOP_HOME");
            if (!StringUtils.isEmpty(hadoopHome)) {
                hadoopBin = Joiner.on(File.separator).join(new String[]{hadoopHome, "bin", "hadoop"});
            }
        }
        if (hadoopBin != null) {
            Preconditions.checkState(new File(hadoopBin).exists(), hadoopBin + " doesn't exist");
        }
        return hadoopBin;
    }

    /** Requests that the supervising loop in {@link #runProcess(ProcessBuilder)} stop waiting. */
    @Override
    public void notifyKillSignal() {
        this.toKill.set(true);
    }
}
