package org.flinkextended.flink.ml.util;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import java.io.File;
import java.util.Collections;
import org.flinkextended.flink.ml.util.Docker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/flinkextended/flink/ml/util/MiniYarnCluster.class */
public class MiniYarnCluster {
    private static Logger LOG = LoggerFactory.getLogger(MiniYarnCluster.class);
    public static final String ZK_IMAGE = "zookeeper";
    public static final String ZK_SERVER_NAME = "minizk";
    public static final String HDFS_HOME = "/usr/local/hadoop";
    public static final int HDFS_PORT = 9000;
    private static final String CONTAINER_WORK_HOME = "/opt/work_home/";
    public static final String VENV_PACK = "tfenv.zip";
    public static final String VENV_LOCAL_PATH = "/opt/work_home//temp/test/tfenv.zip";
    public static final String VENV_HDFS_PATH = "/user/root/";
    public static final String YARN_NAME = "hadoop-master";
    public static final String YARN_IMAGE = "flink-yarn:v1";
    public static final String YARN_CMD = "sh -x /etc/bootstrap.sh -d";
    public static final String FLINK_HOME = "/opt/flink";
    public static final String HADOOP_HOME = "/usr/local/hadoop";
    public static final String FLINK_CMD = "/opt/flink/bin/flink";
    public static final String HADOOP_CMD = "/usr/local/hadoop/bin/hadoop";
    private static final int YARN_WEBUI_HOST_PORT = 58088;
    private String execJarPath = "";

    public void setExecJar(String str) {
        this.execJarPath = str;
    }

    private MiniYarnCluster() {
    }

    private static void waitHDFSReady() throws InterruptedException {
        Thread.sleep(10000L);
        Preconditions.checkState(MiniCluster.mayNeedToFixAuthorizedKeys(getYarnContainer()), "Failed to start HDFS");
    }

    private static void waitClusterReady() {
        try {
            waitHDFSReady();
            while (!ShellExec.run(String.format("curl http://localhost:%d", Integer.valueOf(YARN_WEBUI_HOST_PORT)), true)) {
                Thread.sleep(5000L);
            }
        } catch (InterruptedException e) {
            LOG.warn("Interrupted waiting for cluster to get ready", e);
        }
    }

    public static MiniYarnCluster start(boolean z) {
        if (!Docker.imageExist(YARN_IMAGE)) {
            String format = String.format("cd %s && docker build -t %s .", TestUtil.getProjectRootPath() + "/docker/yarn", YARN_IMAGE);
            Logger logger = LOG;
            logger.getClass();
            Preconditions.checkState(ShellExec.run(format, logger::info), "Failed to build image: flink-yarn:v1");
        }
        MiniYarnCluster miniYarnCluster = new MiniYarnCluster();
        try {
            Preconditions.checkState(startZookeeper(), "Failed to start Zookeeper");
            Preconditions.checkState(startYarn(), "Failed to start Yarn cluster");
            waitClusterReady();
            if (z) {
                Preconditions.checkState(miniYarnCluster.buildVirtualEnv(), "Failed to build virtual env");
                Preconditions.checkState(miniYarnCluster.uploadVirtualEnv(), "Failed to upload virtual env");
            }
            return miniYarnCluster;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private boolean copyFromContainerToHDFS(String str, String str2) {
        return Docker.execSilently(getYarnContainer(), "/usr/local/hadoop/bin/hadoop fs -put -f " + str + " " + str2);
    }

    private boolean uploadVirtualEnv() {
        return copyFromContainerToHDFS(VENV_LOCAL_PATH, VENV_HDFS_PATH);
    }

    private boolean buildVirtualEnv() {
        if (new File(TestUtil.getProjectRootPath() + "/temp/test/tfenv.zip").exists()) {
            return true;
        }
        return Docker.exec(getYarnContainer(), String.format("sh %s", "/opt/work_home/docker/flink/create_venv.sh"));
    }

    public void stop() {
        Docker.killAndRemoveContainer(getZKContainer());
        Docker.killAndRemoveContainer(getYarnContainer());
    }

    public String flinkStreamRun(String str, String... strArr) {
        StringBuffer stringBuffer = new StringBuffer();
        Docker.exec(getYarnContainer(), String.format("/opt/flink/bin/flink run -m yarn-cluster -yst -yn 2 -yjm 2048 -ytm 2048 -yD taskmanager.network.memory.max=268435456 -c %s %s %s", str, uberJar(), Joiner.on(" ").join(strArr)), stringBuffer);
        return stringBuffer.toString();
    }

    public static void dumpFlinkLogs(String str, String str2) {
        if (Docker.copyFromContainer(getYarnContainer(), "/usr/local/hadoop/logs/userlogs/" + str, str2)) {
            return;
        }
        LOG.warn("Failed to dump logs for " + getYarnContainer());
    }

    public static String parseApplicationId(String str) {
        for (String str2 : str.split("\n")) {
            int indexOf = str2.indexOf("Submitting application master");
            if (indexOf > 0) {
                return str2.substring(indexOf + "Submitting application master".length() + 1, str2.length());
            }
        }
        return "";
    }

    private static boolean startZookeeper() {
        Docker.ContainerBuilder containerBuilder = new Docker.ContainerBuilder();
        containerBuilder.image(ZK_IMAGE).cmd("").name(getZKContainer()).opts("-d");
        return containerBuilder.build();
    }

    public static boolean startYarn() {
        LOG.info("Starting Yarn...");
        Docker.ContainerBuilder containerBuilder = new Docker.ContainerBuilder();
        containerBuilder.name(getYarnContainer()).cmd(YARN_CMD).image(YARN_IMAGE);
        containerBuilder.linkHosts(getZKContainer());
        containerBuilder.opts(Collections.singletonList("-d"));
        containerBuilder.mapPorts(YARN_WEBUI_HOST_PORT, 8088);
        containerBuilder.volumes(TestUtil.getProjectRootPath(), "/opt/work_home/");
        LOG.info("Starting YARN WebUI at localhost:{}", Integer.valueOf(YARN_WEBUI_HOST_PORT));
        return containerBuilder.build();
    }

    public static boolean flinkRun(String... strArr) {
        return Docker.exec(getYarnContainer(), "/opt/flink/bin/flink run " + Joiner.on(" ").join(strArr));
    }

    public static String getYarnContainer() {
        return toContainerName(YARN_NAME);
    }

    public static String getZKContainer() {
        return toContainerName(ZK_SERVER_NAME);
    }

    public static int getZKPort() {
        return 2181;
    }

    private static String toContainerName(String str) {
        return str;
    }

    private String uberJar() {
        return "/opt/work_home/" + this.execJarPath;
    }

    public static String getVenvHdfsPath() {
        return String.format("hdfs://%s:%d%s", getYarnContainer(), Integer.valueOf(HDFS_PORT), "/user/root/tfenv.zip");
    }
}
