package org.apache.linkis.manager.engineplugin.shell.executor;

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.apache.linkis.common.utils.Utils;
import org.apache.linkis.engineconn.computation.executor.execute.ComputationExecutor;
import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
import org.apache.linkis.engineconn.core.EngineConnObject;
import org.apache.linkis.governance.common.utils.GovernanceUtils;
import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
import org.apache.linkis.manager.common.entity.resource.NodeResource;
import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
import org.apache.linkis.manager.engineplugin.shell.common.ShellEngineConnPluginConst;
import org.apache.linkis.manager.engineplugin.shell.conf.ShellEngineConnConf;
import org.apache.linkis.manager.engineplugin.shell.exception.ShellCodeErrorException;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.protocol.engine.JobProgressInfo;
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
import org.apache.linkis.scheduler.executer.ExecuteResponse;
import org.apache.linkis.scheduler.executer.SuccessExecuteResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContextExecutorService;

/* loaded from: input_file:org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.class */
public class ShellEngineConnExecutor extends ComputationExecutor {
    private static final Logger logger = LoggerFactory.getLogger(ShellEngineConnExecutor.class);
    private EngineExecutionContext engineExecutionContext;
    private List<Label<?>> executorLabels;
    Map<String, ShellECTaskInfo> shellECTaskInfoCache;
    private int id;
    private Process process;
    private YarnAppIdExtractor extractor;
    final ExecutionContextExecutorService logAsyncService;

    public ShellEngineConnExecutor(int i) {
        super(ShellEngineConnConf.SHELL_ENGINECONN_OUTPUT_PRINT_LIMIT);
        this.executorLabels = new ArrayList();
        this.shellECTaskInfoCache = new ConcurrentHashMap();
        this.logAsyncService = Utils.newCachedExecutionContext(ShellEngineConnConf.LOG_SERVICE_MAX_THREAD_SIZE, "ShelLogService-Thread-", true);
        this.id = i;
    }

    public void init() {
        logger.info("Ready to change engine state!");
        super.init();
    }

    public ExecuteResponse executeCompletely(EngineExecutionContext engineExecutionContext, String str, String str2) {
        String str3 = str2 + str;
        logger.debug("newcode is " + str3);
        return executeLine(engineExecutionContext, str3);
    }

    public ExecuteResponse executeLine(EngineExecutionContext engineExecutionContext, String str) {
        String[] strArr;
        String str2;
        if (engineExecutionContext != null) {
            this.engineExecutionContext = engineExecutionContext;
            logger.info("Shell executor reset new engineExecutionContext!");
        }
        if (engineExecutionContext.getJobId().isEmpty()) {
            return new ErrorExecuteResponse("taskID is null", (Throwable) null);
        }
        String str3 = (String) engineExecutionContext.getJobId().get();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        ReaderThread readerThread = null;
        ReaderThread readerThread2 = null;
        try {
            try {
                engineExecutionContext.appendStdout(getId() + " >> " + str.trim());
                if (engineExecutionContext.getTotalParagraph() == 1 && engineExecutionContext.getProperties() != null && engineExecutionContext.getProperties().containsKey(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY)) {
                    ArrayList arrayList = (ArrayList) engineExecutionContext.getProperties().get(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY);
                    strArr = (String[]) arrayList.toArray(new String[arrayList.size()]);
                    logger.info("Will execute shell task with user-specified arguments: '{}'", StringUtils.join(strArr, "' '"));
                } else {
                    strArr = null;
                }
                if (engineExecutionContext.getTotalParagraph() == 1 && engineExecutionContext.getProperties() != null && engineExecutionContext.getProperties().containsKey(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY)) {
                    String str4 = (String) engineExecutionContext.getProperties().get(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY);
                    if (isExecutePathExist(str4)) {
                        logger.info("Will execute shell task under user-specified working-directory: '{}'", str4);
                        str2 = str4;
                    } else {
                        logger.warn("User-specified working-directory: '{}' does not exist or user does not have access permission. Will execute shell task under default working-directory. Please contact the administrator!", str4);
                        str2 = null;
                    }
                } else {
                    str2 = null;
                }
                ProcessBuilder processBuilder = new ProcessBuilder((strArr == null || strArr.length == 0) ? generateRunCode(str) : generateRunCodeWithArgs(str, strArr));
                if (StringUtils.isNotBlank(str2)) {
                    processBuilder.directory(new File(str2));
                }
                processBuilder.redirectErrorStream(false);
                this.extractor = new YarnAppIdExtractor();
                this.process = processBuilder.start();
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(this.process.getInputStream()));
                BufferedReader bufferedReader2 = new BufferedReader(new InputStreamReader(this.process.getErrorStream()));
                this.shellECTaskInfoCache.put(str3, new ShellECTaskInfo(str3, this.process, this.extractor));
                CountDownLatch countDownLatch = new CountDownLatch(2);
                ReaderThread readerThread3 = new ReaderThread(engineExecutionContext, bufferedReader, this.extractor, true, countDownLatch);
                ReaderThread readerThread4 = new ReaderThread(engineExecutionContext, bufferedReader2, this.extractor, false, countDownLatch);
                this.logAsyncService.execute(readerThread3);
                this.logAsyncService.execute(readerThread4);
                int waitFor = this.process.waitFor();
                countDownLatch.await();
                atomicBoolean.set(true);
                if (waitFor != 0) {
                    ErrorExecuteResponse errorExecuteResponse = new ErrorExecuteResponse("run shell failed", new ShellCodeErrorException());
                    if (bufferedReader2 != null) {
                        readerThread4.onDestroy();
                    }
                    if (readerThread3 != null) {
                        readerThread3.onDestroy();
                    }
                    this.shellECTaskInfoCache.remove(str3);
                    return errorExecuteResponse;
                }
                SuccessExecuteResponse successExecuteResponse = new SuccessExecuteResponse();
                if (bufferedReader2 != null) {
                    readerThread4.onDestroy();
                }
                if (readerThread3 != null) {
                    readerThread3.onDestroy();
                }
                this.shellECTaskInfoCache.remove(str3);
                return successExecuteResponse;
            } catch (Exception e) {
                logger.error("Execute shell code failed, reason:", e);
                ErrorExecuteResponse errorExecuteResponse2 = new ErrorExecuteResponse("run shell failed", e);
                if (0 != 0) {
                    readerThread.onDestroy();
                }
                if (0 != 0) {
                    readerThread2.onDestroy();
                }
                this.shellECTaskInfoCache.remove(str3);
                return errorExecuteResponse2;
            }
        } catch (Throwable th) {
            if (0 != 0) {
                readerThread.onDestroy();
            }
            if (0 != 0) {
                readerThread2.onDestroy();
            }
            this.shellECTaskInfoCache.remove(str3);
            throw th;
        }
    }

    private boolean isExecutePathExist(String str) {
        File file = new File(str);
        return file.exists() && file.isDirectory();
    }

    private String[] generateRunCode(String str) {
        return new String[]{"sh", "-c", str};
    }

    private String[] generateRunCodeWithArgs(String str, String[] strArr) {
        return new String[]{"sh", "-c", "echo \"dummy " + StringUtils.join(strArr, " ") + "\" | xargs sh -c '" + str + "'"};
    }

    public String getId() {
        return Sender.getThisServiceInstance().getInstance() + "_" + this.id;
    }

    public JobProgressInfo[] getProgressInfo(String str) {
        ArrayList arrayList = new ArrayList();
        if (this.engineExecutionContext == null) {
            return (JobProgressInfo[]) arrayList.toArray(new JobProgressInfo[0]);
        }
        String str2 = this.engineExecutionContext.getJobId().isDefined() ? (String) this.engineExecutionContext.getJobId().get() : "";
        if (progress(str) == 0.0f) {
            arrayList.add(new JobProgressInfo(str2, 1, 1, 0, 0));
        } else {
            arrayList.add(new JobProgressInfo(str2, 1, 0, 0, 1));
        }
        return (JobProgressInfo[]) arrayList.toArray(new JobProgressInfo[0]);
    }

    public float progress(String str) {
        if (this.engineExecutionContext != null) {
            return this.engineExecutionContext.getCurrentParagraph() / this.engineExecutionContext.getTotalParagraph();
        }
        return 0.0f;
    }

    public boolean supportCallBackLogs() {
        return true;
    }

    public NodeResource requestExpectedResource(NodeResource nodeResource) {
        return null;
    }

    public NodeResource getCurrentNodeResource() {
        CommonNodeResource commonNodeResource = new CommonNodeResource();
        commonNodeResource.setUsedResource(NodeResourceUtils.applyAsLoadInstanceResource(EngineConnObject.getEngineCreationContext().getOptions()));
        return commonNodeResource;
    }

    public List<Label<?>> getExecutorLabels() {
        return this.executorLabels;
    }

    public void setExecutorLabels(List<Label<?>> list) {
        if (list != null) {
            this.executorLabels.clear();
            this.executorLabels.addAll(list);
        }
    }

    public void killTask(String str) {
        GovernanceUtils.killProcess(String.valueOf(getPid(this.process)), "kill task " + str + " process", false);
        GovernanceUtils.killYarnJobApp(this.extractor.getExtractedYarnAppIds());
        logger.info("Finished kill yarn app ids in the engine of ({})", getId());
        super.killTask(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int getPid(Process process) {
        try {
            Field declaredField = Class.forName("java.lang.UNIXProcess").getDeclaredField("pid");
            declaredField.setAccessible(true);
            return declaredField.getInt(process);
        } catch (Exception e) {
            logger.warn("Failed to acquire pid for shell process");
            return -1;
        }
    }

    public void close() {
        try {
            this.process.destroy();
        } catch (Exception e) {
            logger.error("kill process " + this.process.toString() + " failed ", e);
        } catch (Throwable th) {
            logger.error("kill process " + this.process.toString() + " failed ", th);
        }
        super.close();
    }
}
