package org.flinkextended.flink.ml.cluster.rpc;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.flinkextended.flink.ml.cluster.node.MLContext;
import org.flinkextended.flink.ml.cluster.node.runner.ExecutionStatus;
import org.flinkextended.flink.ml.cluster.node.runner.MLRunner;
import org.flinkextended.flink.ml.cluster.node.runner.MLRunnerFactory;
import org.flinkextended.flink.ml.util.FileUtil;
import org.flinkextended.flink.ml.util.MLConstants;
import org.flinkextended.flink.ml.util.MLException;
import org.flinkextended.flink.ml.util.PythonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/flinkextended/flink/ml/cluster/rpc/NodeServer.class */
/**
 * Runs one ML node: prepares the local python runtime environment, starts a gRPC
 * {@link Server} exposing {@link NodeServiceImpl}, and supervises an {@link MLRunner}
 * executed on a dedicated single-thread pool.
 *
 * <p>{@link #run()} blocks until the runner reports {@link ExecutionStatus#SUCCEED}, a
 * {@link AMCommand#STOP} command is received, the node stays idle longer than the configured
 * timeout, or the thread is interrupted. Commands are delivered from other threads through
 * {@link #setAmCommand(AMCommand)}.
 */
public class NodeServer implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(NodeServer.class);

    /** Sentinel for {@link #idleStart}: the runner is currently not considered idle. */
    private static final long NOT_IDLE = Long.MAX_VALUE;

    /** Idle timeout in milliseconds used when {@link MLConstants#NODE_IDLE_TIMEOUT} is not set. */
    private static final String DEFAULT_IDLE_TIMEOUT_MS = "600000";

    // Written by run()/cleanup() and read by getPort() callers on other threads.
    private volatile Server server;
    private final String jobName;
    private volatile MLContext mlContext;
    // Also read by the JVM shutdown hook thread registered in run().
    private volatile MLRunner runner;
    protected final ExecutorService runnerService;
    private AMCommand amCommand = AMCommand.NOPE;
    private long idleStart = NOT_IDLE;
    private final long idleTimeout;

    /** Commands that can be posted to the node; polled by the supervision loop in {@link #run()}. */
    public enum AMCommand {
        /** No pending command. */
        NOPE,
        /** Stop the current runner and leave the supervision loop. */
        STOP,
        /** Stop the current runner and start a new one. */
        RESTART
    }

    /**
     * Creates the node server and its single daemon runner thread pool.
     *
     * @param mLContext context of this node; its properties supply the idle timeout
     *     ({@link MLConstants#NODE_IDLE_TIMEOUT}, milliseconds, default 600000)
     * @param str job name, used as prefix of {@link #getDisplayName()}
     * @throws NumberFormatException if the configured idle timeout is not a valid long
     */
    public NodeServer(MLContext mLContext, String str) {
        this.mlContext = mLContext;
        this.jobName = str;
        this.idleTimeout = Long.parseLong(
                mLContext.getProperties().getOrDefault(MLConstants.NODE_IDLE_TIMEOUT, DEFAULT_IDLE_TIMEOUT_MS));
        this.runnerService = Executors.newFixedThreadPool(1, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName("runner_" + mLContext.getIdentity());
            return thread;
        });
    }

    /**
     * Returns the port of the gRPC server.
     *
     * @return the server port, or {@code null} if the server has not been created yet or has
     *     already been cleaned up
     */
    public Integer getPort() {
        // Read the field once: cleanup() may set it to null concurrently.
        Server current = this.server;
        if (current == null) {
            return null;
        }
        return Integer.valueOf(current.getPort());
    }

    /**
     * Copies the startup script from the classpath into the work dir of the given context and
     * records its absolute path under {@link MLConstants#STARTUP_SCRIPT_FILE}.
     *
     * <p>An existing script is deleted first. The new one is written to a temporary file and
     * then renamed into place.
     *
     * @param mLContext context providing the work dir and receiving the script path property
     * @throws NullPointerException if the script resource is not on the classpath
     * @throws RuntimeException wrapping the {@link IOException} if the script cannot be created
     */
    public static synchronized void prepareStartupScript(MLContext mLContext) {
        String workDir = mLContext.getWorkDir().getAbsolutePath();
        LOG.info("work dir:" + workDir);
        File script = new File(workDir + "/" + MLConstants.STARTUP_SCRIPT);
        if (script.exists() && !script.delete()) {
            // Keep going: the stale script is reused below, exactly as before, but say so.
            LOG.warn("Failed to delete existing startup script {}", script.getAbsolutePath());
        }
        if (!script.exists()) {
            LOG.info("create startup.py");
            try {
                URL resource = NodeServer.class.getClassLoader().getResource(MLConstants.STARTUP_SCRIPT);
                Preconditions.checkNotNull(resource, "Cannot find startup.py in classpath");
                File tmpScript = new File(workDir + "/tmp_" + MLConstants.STARTUP_SCRIPT);
                FileUtils.copyURLToFile(resource, tmpScript);
                // A failed rename is only fatal when the target is still missing; if something
                // else created it in the meantime the script is usable.
                if (!tmpScript.renameTo(script) && !script.exists()) {
                    throw new IOException("Failed to rename " + tmpScript.getAbsolutePath()
                            + " to " + script.getAbsolutePath());
                }
            } catch (IOException e) {
                LOG.error("Failed to create startup script {}", script.getAbsolutePath(), e);
                throw new RuntimeException(e);
            }
        }
        mLContext.getProperties().put(MLConstants.STARTUP_SCRIPT_FILE, script.getAbsolutePath());
    }

    /**
     * Prepares everything the python process needs: the startup script, the virtual env and,
     * unless the distributed cache is used, the user code directory.
     *
     * <p>When {@link MLConstants#REMOTE_CODE_ZIP_FILE} is set, the zip is downloaded into the
     * work dir (skipped if the target directory already exists) and the directory is recorded
     * under {@link MLConstants#CODE_DIR}. The python dir and the entry python file are then set
     * on the context.
     *
     * @param mLContext context to read the configuration from and to update
     * @throws RuntimeException wrapping the {@link IOException} if the environment cannot be set up
     */
    public static synchronized void prepareRuntimeEnv(MLContext mLContext) {
        prepareStartupScript(mLContext);
        String workDir = mLContext.getWorkDir().getAbsolutePath();
        try {
            PythonUtil.setupVirtualEnv(mLContext);
            if (mLContext.useDistributeCache()) {
                LOG.info("use distribute cache");
                return;
            }
            LOG.info("use user code.zip");
            String codeFile = mLContext.getProperties().getOrDefault(MLConstants.REMOTE_CODE_ZIP_FILE, "");
            LOG.info("code file:" + codeFile);
            // Default code location; replaced below when a remote code zip is configured.
            File targetDir = new File(workDir + "/code");
            LOG.info("target dir:" + targetDir.getAbsolutePath());
            if (!codeFile.isEmpty()) {
                String codeFileName = FileUtil.parseFileName(codeFile);
                LOG.info("codeFileName:" + codeFileName);
                String codeDirName = FileUtil.getFileNameWithoutExtension(codeFileName);
                if (mLContext.getProperties().containsKey(MLConstants.CODE_DIR_NAME)) {
                    codeDirName = mLContext.getProperties().get(MLConstants.CODE_DIR_NAME);
                }
                LOG.info("codeDirName:" + codeDirName);
                targetDir = new File(workDir + "/" + codeDirName);
                mLContext.getProperties().put(MLConstants.CODE_DIR, targetDir.getAbsolutePath());
                if (targetDir.exists()) {
                    LOG.info("target dir already exists!");
                } else {
                    LOG.info("real targetDir:" + targetDir.getAbsolutePath());
                    // Re-check under the PythonUtil lock so the zip is downloaded only once
                    // by threads that synchronize on the same class.
                    synchronized (PythonUtil.class) {
                        if (targetDir.exists()) {
                            LOG.info("target dir exists!");
                        } else {
                            LOG.info("download file to local:" + codeFile);
                            try {
                                FileUtil.downLoadZipToLocal(workDir, codeFile, codeDirName);
                            } catch (IOException e) {
                                LOG.error("Fail to download zip {} to local {}", codeFile, workDir);
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            }
            if (mLContext.getProperties().containsKey(MLConstants.PYTHON_SCRIPT_DIR)) {
                String scriptDir = mLContext.getProperties().get(MLConstants.PYTHON_SCRIPT_DIR);
                mLContext.setPythonDir(new File(workDir + "/" + scriptDir).toPath());
            } else {
                mLContext.setPythonDir(targetDir.toPath());
            }
            mLContext.setPythonFiles(
                    new String[]{mLContext.getProperties().get(MLConstants.USER_ENTRY_PYTHON_FILE)});
            if (mLContext.startWithStartup()) {
                LOG.info("Running {} via {}", mLContext.getScript().getName(),
                        mLContext.getProperties().get(MLConstants.STARTUP_SCRIPT_FILE));
            } else {
                LOG.info("Running {} ", mLContext.getScript().getAbsolutePath());
            }
        } catch (IOException e) {
            LOG.error("Failed to prepare runtime environment in {}", workDir, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts the gRPC server and the runner, then supervises the runner until it succeeds, a
     * {@link AMCommand#STOP} command arrives, the idle timeout expires or the thread is
     * interrupted. {@link #cleanup(Future)} always runs before this method returns or throws.
     *
     * <p>On interruption the method returns normally with the thread's interrupt status set.
     *
     * @throws RuntimeException wrapping any other failure, including the idle timeout
     *     ({@link MLException})
     */
    @Override
    public void run() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            // runner is null before start and after cleanup; nothing to release then.
            if (this.runner != null) {
                LOG.warn("*** shutting down gRPC server since JVM is shutting down");
                cleanup(null);
            }
        }));
        prepareRuntimeEnv(this.mlContext);
        Future<?> runnerFuture = null;
        boolean interrupted = false;
        try {
            // Port 0: the concrete port is only known after start, see getPort().
            this.server = ServerBuilder.forPort(0).addService(new NodeServiceImpl(this, this.mlContext)).build();
            this.server.start();
            LOG.info("node (" + getDisplayName() + ") server started, listening on " + this.server.getPort());
            runnerFuture = startMLRunner();
            boolean exit = false;
            while (!exit && this.runner.getResultStatus() != ExecutionStatus.SUCCEED) {
                if (runnerFuture.isDone()) {
                    // Runner task ended without SUCCEED: wait for a command, but not forever.
                    if (this.idleStart == NOT_IDLE) {
                        this.idleStart = System.currentTimeMillis();
                    }
                    long idleMillis = System.currentTimeMillis() - this.idleStart;
                    if (idleMillis > this.idleTimeout) {
                        throw new MLException(String.format("%s has been idle for %d seconds",
                                this.mlContext.getIdentity(), Long.valueOf(idleMillis / 1000)));
                    }
                    Thread.sleep(1000L);
                } else {
                    this.idleStart = NOT_IDLE;
                    // The pool is not shut down at this point, so this presumably acts as an
                    // interruptible wait of up to 10s between command polls.
                    this.runnerService.awaitTermination(10L, TimeUnit.SECONDS);
                }
                switch (getAmCommand()) {
                    case STOP:
                        stopMLRunner(runnerFuture);
                        setAmCommand(AMCommand.NOPE);
                        exit = true;
                        break;
                    case RESTART:
                        stopMLRunner(runnerFuture);
                        runnerFuture = startMLRunner();
                        setAmCommand(AMCommand.NOPE);
                        break;
                    default:
                        // NOPE: nothing requested.
                        break;
                }
            }
        } catch (InterruptedException e) {
            // Restore the flag only after cleanup, otherwise its bounded waits return at once.
            interrupted = true;
            LOG.error(this.mlContext.getIdentity() + " node server interrupted");
        } catch (Exception e) {
            LOG.error("Error to run node service {}.", e.getMessage(), e);
            throw new RuntimeException(e);
        } finally {
            try {
                cleanup(runnerFuture);
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Returns a human readable name of this node.
     *
     * @return the job name and the node index joined by {@code ':'}
     */
    public String getDisplayName() {
        return this.jobName + ":" + this.mlContext.getIndex();
    }

    /**
     * Creates a new runner through {@link MLRunnerFactory} and submits it to the runner pool.
     *
     * @return the future of the submitted runner task
     * @throws Exception if the runner cannot be created
     */
    private Future<?> startMLRunner() throws Exception {
        LOG.info("begin start node:" + this.mlContext.getIdentity());
        this.runner = MLRunnerFactory.createMLRunner(this.mlContext, this);
        Future<?> submitted = this.runnerService.submit(this.runner);
        LOG.info("end start node:" + this.mlContext.getIdentity());
        return submitted;
    }

    /**
     * Asks the current runner to stop, cancels its task and waits up to five seconds.
     *
     * <p>Does nothing if the runner pool is already shut down. Failures are logged, not thrown;
     * if the wait is interrupted the thread's interrupt status is restored.
     *
     * @param future task of the runner to cancel, may be {@code null}
     */
    private void stopMLRunner(Future<?> future) {
        if (null == this.runnerService || this.runnerService.isShutdown()) {
            return;
        }
        LOG.info("begin stop node:" + this.mlContext.getIdentity());
        try {
            // No runner exists if startup failed early or cleanup already ran.
            MLRunner current = this.runner;
            if (null != current) {
                current.notifyStop();
            }
            if (null != future) {
                future.cancel(true);
            }
            // The pool is not shut down here, so this is presumably a fixed grace period.
            this.runnerService.awaitTermination(5L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted waiting for scriptRunner thread to finish, dumping its stack trace:" + e.getMessage());
        } catch (Exception e) {
            LOG.warn("Failed to stop runner of {}", this.mlContext.getIdentity(), e);
        }
        LOG.info("end stop node:" + this.mlContext.getIdentity());
    }

    /**
     * Releases everything owned by this node: stops the runner, shuts down the runner pool and
     * the gRPC server, marks the input queue finished and clears the runner reference.
     *
     * <p>An interruption during the waits does not abort the remaining steps; the thread's
     * interrupt status is restored before returning.
     *
     * @param future task of the current runner, may be {@code null}
     */
    private synchronized void cleanup(Future<?> future) {
        LOG.info("{} run cleanup!", this.mlContext.getIdentity());
        stopMLRunner(future);
        // stopMLRunner re-sets the interrupt flag when interrupted. Clear it so the bounded
        // waits below still run, and restore it at the end.
        boolean interrupted = Thread.interrupted();
        try {
            this.runnerService.shutdownNow();
            try {
                this.runnerService.awaitTermination(5L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
                LOG.warn("runner service thread poll shutdown interrupted:" + e.getMessage(), e);
            }
            Server grpcServer = this.server;
            if (grpcServer != null) {
                LOG.info(getDisplayName() + " shut down");
                grpcServer.shutdownNow();
                try {
                    if (!grpcServer.awaitTermination(2L, TimeUnit.MINUTES)) {
                        LOG.warn("{} timed out waiting for GRPC server to terminate", this.mlContext.getIdentity());
                    }
                } catch (InterruptedException e) {
                    interrupted = true;
                    LOG.info("{} interrupted shutting down GRPC server", this.mlContext.getIdentity(), e);
                }
                this.server = null;
            }
            if (this.mlContext != null && this.mlContext.getInputQueue() != null) {
                LOG.info("{} mark input queue finished.", this.mlContext.getIdentity());
                this.mlContext.getInputQueue().markFinished();
            }
            this.runner = null;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Posts a command that the supervision loop in {@link #run()} picks up on its next poll.
     *
     * @param aMCommand the command to execute
     */
    public synchronized void setAmCommand(AMCommand aMCommand) {
        this.amCommand = aMCommand;
    }

    /**
     * Returns the currently pending command.
     *
     * @return the pending command, {@link AMCommand#NOPE} if there is none
     */
    public synchronized AMCommand getAmCommand() {
        return this.amCommand;
    }

    /**
     * Returns the current runner.
     *
     * @return the runner, or {@code null} before it is started and after cleanup
     */
    @VisibleForTesting
    MLRunner getRunner() {
        return this.runner;
    }
}
