package org.apache.twill.yarn;

import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.Uninterruptibles;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.RunId;
import org.apache.twill.api.ServiceController;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.logging.LogHandler;
import org.apache.twill.internal.AbstractTwillController;
import org.apache.twill.internal.ProcessController;
import org.apache.twill.internal.appmaster.ApplicationMasterLiveNodeData;
import org.apache.twill.internal.appmaster.TrackerService;
import org.apache.twill.internal.state.SystemMessages;
import org.apache.twill.internal.yarn.YarnAppClient;
import org.apache.twill.internal.yarn.YarnApplicationReport;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/twill/yarn/YarnTwillController.class */
public final class YarnTwillController extends AbstractTwillController implements TwillController {
    private static final Logger LOG = LoggerFactory.getLogger(YarnTwillController.class);
    private final String appName;
    private final Callable<ProcessController<YarnApplicationReport>> startUp;
    private final long startTimeout;
    private final TimeUnit startTimeoutUnit;
    private volatile ApplicationMasterLiveNodeData amLiveNodeData;
    private ProcessController<YarnApplicationReport> processController;
    private ApplicationAttemptId currentAttemptId;
    private Thread statusPollingThread;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.twill.yarn.YarnTwillController$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/twill/yarn/YarnTwillController$2.class */
    /**
     * Ordinal-indexed lookup table over {@link YarnApplicationState}, in the form a compiler
     * emits for a {@code switch} on an enum.
     *
     * <p>The entries for RUNNING, FINISHED, FAILED and KILLED hold the case labels 1 to 4; every
     * other entry keeps the array default of 0.
     */
    public static /* synthetic */ class AnonymousClass2 {
        /** Case label per {@code YarnApplicationState.ordinal()}; 0 means "no label". */
        static final /* synthetic */ int[] $SwitchMap$org$apache$hadoop$yarn$api$records$YarnApplicationState;

        static {
            int[] caseLabels = new int[YarnApplicationState.values().length];
            // Each constant is resolved in its own guarded step so that a failure to resolve one
            // constant leaves that entry at 0 without affecting the others.
            try {
                caseLabels[YarnApplicationState.RUNNING.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // entry stays 0
            }
            try {
                caseLabels[YarnApplicationState.FINISHED.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // entry stays 0
            }
            try {
                caseLabels[YarnApplicationState.FAILED.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // entry stays 0
            }
            try {
                caseLabels[YarnApplicationState.KILLED.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // entry stays 0
            }
            $SwitchMap$org$apache$hadoop$yarn$api$records$YarnApplicationState = caseLabels;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a controller from the live node data of an application master.
     *
     * <p>Delegates to the full constructor with an empty list of log handlers, a 60 second start
     * timeout, and a start-up callable that asks {@code yarnAppClient} for a process controller
     * of the application identified by the live node data.
     *
     * @param str application name
     * @param runId run id handed to the superclass
     * @param zKClient ZooKeeper client handed to the superclass
     * @param applicationMasterLiveNodeData source of the application id, its cluster timestamp
     *     and the Kafka ZK connect string (a non-null connect string turns the boolean flag of
     *     the full constructor on)
     * @param yarnAppClient client used to create the process controller when the service starts
     */
    public YarnTwillController(String str, RunId runId, ZKClient zKClient, ApplicationMasterLiveNodeData applicationMasterLiveNodeData, YarnAppClient yarnAppClient) {
        this(
            str,
            runId,
            zKClient,
            applicationMasterLiveNodeData.getKafkaZKConnect() != null,
            Collections.<LogHandler>emptyList(),
            () -> yarnAppClient.createProcessController(
                ApplicationId.newInstance(
                    applicationMasterLiveNodeData.getAppIdClusterTime(),
                    applicationMasterLiveNodeData.getAppId())),
            60L,
            TimeUnit.SECONDS);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a controller whose process controller is obtained lazily from {@code callable}.
     *
     * @param str application name; also passed to the superclass
     * @param runId run id passed to the superclass
     * @param zKClient ZooKeeper client passed to the superclass
     * @param z flag passed through to the superclass unchanged (presumably whether log
     *     collection is enabled; confirm against {@code AbstractTwillController})
     * @param iterable log handlers passed to the superclass
     * @param callable invoked during start-up to produce the process controller
     * @param j maximum time to wait for the application to leave its pre-run states
     * @param timeUnit unit of {@code j}
     */
    public YarnTwillController(String str, RunId runId, ZKClient zKClient, boolean z, Iterable<LogHandler> iterable, Callable<ProcessController<YarnApplicationReport>> callable, long j, TimeUnit timeUnit) {
        super(str, runId, zKClient, z, iterable);

        // Start-up configuration.
        this.startUp = callable;
        this.startTimeout = j;
        this.startTimeoutUnit = timeUnit;

        // Kept locally for log messages and thread names.
        this.appName = str;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends the {@link SystemMessages#SECURE_STORE_UPDATED} message to the application.
     *
     * @return the future returned by {@code sendMessage}; its result value is {@code null}
     */
    public ListenableFuture<Void> secureStoreUpdated() {
        ListenableFuture<Void> completion = sendMessage(SystemMessages.SECURE_STORE_UPDATED, null);
        return completion;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the most recently decoded application master live node data.
     *
     * @return the last value stored by {@code instanceNodeUpdated}, or {@code null} if none has
     *     been stored yet
     */
    @Nullable
    public ApplicationMasterLiveNodeData getApplicationMasterLiveNodeData() {
        return amLiveNodeData;
    }

    /**
     * Starts the controller: obtains the process controller from the start-up callable and waits
     * for the YARN application to reach a state accepted by {@code hasRun}.
     *
     * <p>The application report is polled once per second until the state is RUNNING, FINISHED,
     * FAILED or KILLED, or until the configured start timeout has elapsed. If the last observed
     * state is not RUNNING the controller is shut down through {@code forceShutDown()}. The
     * application attempt id of the last report is remembered in either case.
     *
     * @throws RuntimeException wrapping any exception raised while starting or polling; when the
     *     cause is an {@link InterruptedException} the thread's interrupt status is restored
     *     before the exception is propagated
     */
    @Override
    protected void doStartUp() {
        super.doStartUp();
        try {
            this.processController = this.startUp.call();
            YarnApplicationReport yarnApplicationReport = (YarnApplicationReport) this.processController.getReport();
            ApplicationId applicationId = yarnApplicationReport.getApplicationId();
            LOG.info("Application {} with id {} submitted", this.appName, applicationId);
            YarnApplicationState yarnApplicationState = yarnApplicationReport.getYarnApplicationState();
            Stopwatch start = new Stopwatch().start();
            LOG.debug("Checking yarn application status for {} {}", this.appName, applicationId);
            while (!hasRun(yarnApplicationState) && start.elapsedTime(this.startTimeoutUnit) < this.startTimeout) {
                // Sleep before fetching: the state was read just before the loop, and this way
                // the loop exits as soon as a started state is observed instead of one sleep later.
                TimeUnit.SECONDS.sleep(1L);
                yarnApplicationReport = (YarnApplicationReport) this.processController.getReport();
                yarnApplicationState = yarnApplicationReport.getYarnApplicationState();
                LOG.debug("Yarn application status for {} {}: {}", new Object[]{this.appName, applicationId, yarnApplicationState});
            }
            LOG.info("Yarn application {} {} is in state {}", new Object[]{this.appName, applicationId, yarnApplicationState});
            if (yarnApplicationState != YarnApplicationState.RUNNING) {
                LOG.info("Yarn application {} {} is not in running state. Shutting down controller.", this.appName, applicationId);
                forceShutDown();
            }
            this.currentAttemptId = yarnApplicationReport.getCurrentApplicationAttemptId();
        } catch (InterruptedException e) {
            // Wrapping the exception would otherwise hide the interruption from callers.
            Thread.currentThread().interrupt();
            throw Throwables.propagate(e);
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Shuts the controller down and records how the YARN application terminated.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Returns immediately (after a warning) if no process controller was ever created.</li>
     *   <li>Stops the status polling thread.</li>
     *   <li>Waits up to the termination timeout for the stop message future; on any failure the
     *       application is killed.</li>
     *   <li>Polls the application report once per second until the final status is no longer
     *       UNDEFINED or the termination timeout elapses; an application still UNDEFINED after
     *       that is killed. The process controller is closed when polling ends. Any exception
     *       during polling or closing results in a kill and a KILLED status.</li>
     *   <li>Runs the superclass shutdown and sets the termination status.</li>
     * </ol>
     *
     * <p>If polling was cut short by an {@link InterruptedException}, the thread's interrupt
     * status is restored once the remaining shutdown work has been attempted.
     *
     * @throws RuntimeException if the application's final status is FAILED
     */
    @Override
    protected synchronized void doShutDown() {
        if (this.processController == null) {
            LOG.warn("No process controller for application that is not submitted.");
            return;
        }
        stopPollStatus();
        long terminationTimeoutMillis = getTerminationTimeoutMillis(60L, TimeUnit.SECONDS);
        try {
            Uninterruptibles.getUninterruptibly(getStopMessageFuture(), terminationTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            LOG.error("Failed to wait for stop message being processed.", e);
            kill();
        }

        boolean interrupted = false;
        FinalApplicationStatus finalApplicationStatus;
        // try-with-resources closes the controller on both the normal and the exceptional path.
        try (ProcessController<YarnApplicationReport> controller = this.processController) {
            Stopwatch stopwatch = new Stopwatch().start();
            YarnApplicationReport yarnApplicationReport = (YarnApplicationReport) controller.getReport();
            finalApplicationStatus = yarnApplicationReport.getFinalApplicationStatus();
            ApplicationId applicationId = yarnApplicationReport.getApplicationId();
            while (finalApplicationStatus == FinalApplicationStatus.UNDEFINED && stopwatch.elapsedTime(TimeUnit.MILLISECONDS) < terminationTimeoutMillis) {
                LOG.debug("Yarn application final status for {} {}: {}", new Object[]{this.appName, applicationId, finalApplicationStatus});
                TimeUnit.SECONDS.sleep(1L);
                finalApplicationStatus = ((YarnApplicationReport) controller.getReport()).getFinalApplicationStatus();
            }
            if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
                // Still not finished after the timeout: terminate it forcibly.
                kill();
                finalApplicationStatus = FinalApplicationStatus.KILLED;
            }
        } catch (Exception e) {
            // Remember the interruption, but defer restoring the flag so that the kill and the
            // superclass shutdown below are not disturbed by a pending interrupt.
            interrupted = e instanceof InterruptedException;
            LOG.warn("Exception while waiting for application report: {}", e.getMessage(), e);
            kill();
            finalApplicationStatus = FinalApplicationStatus.KILLED;
        }

        try {
            super.doShutDown();
            if (finalApplicationStatus == FinalApplicationStatus.FAILED) {
                setTerminationStatus(ServiceController.TerminationStatus.FAILED);
                throw new RuntimeException(String.format("Yarn application completed with failure %s, %s.", this.appName, getRunId()));
            }
            setTerminationStatus(finalApplicationStatus == FinalApplicationStatus.SUCCEEDED ? ServiceController.TerminationStatus.SUCCEEDED : ServiceController.TerminationStatus.KILLED);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Cancels the application through its process controller.
     *
     * <p>Logs a warning and does nothing when no process controller exists yet.
     */
    public void kill() {
        ProcessController<YarnApplicationReport> controller = this.processController;
        if (controller != null) {
            ApplicationId applicationId = ((YarnApplicationReport) controller.getReport()).getApplicationId();
            LOG.info("Killing application {} {}", this.appName, applicationId);
            controller.cancel();
        } else {
            LOG.warn("No process controller for application that is not submitted.");
        }
    }

    /**
     * Decodes the given node data and, when decoding yields a value, stores it as the current
     * application master live node data.
     *
     * @param nodeData node data handed to {@code ApplicationMasterLiveNodeDecoder.decode}
     */
    protected void instanceNodeUpdated(NodeData nodeData) {
        ApplicationMasterLiveNodeData decoded = ApplicationMasterLiveNodeDecoder.decode(nodeData);
        if (decoded == null) {
            // Keep the previously stored value.
            return;
        }
        this.amLiveNodeData = decoded;
    }

    /**
     * Handles a failure to access the instance node by falling back to polling YARN for the
     * application status.
     *
     * <p>Logs a warning and does nothing when no process controller exists yet.
     *
     * @param th cause of the failure; may be {@code null}, in which case the reason is logged as
     *     "Unknown"
     */
    protected void instanceNodeFailed(Throwable th) {
        if (this.processController == null) {
            LOG.warn("No process controller for application that is not submitted.");
            return;
        }
        YarnApplicationReport report = (YarnApplicationReport) this.processController.getReport();
        ApplicationId applicationId = report.getApplicationId();
        String reason;
        if (th == null) {
            reason = "Unknown";
        } else {
            reason = th.getMessage();
        }
        LOG.info("Failed to access application {} {} live node in ZK, resort to polling. Failure reason: {}", new Object[]{this.appName, applicationId, reason});
        startPollStatus(applicationId);
    }

    /**
     * Starts the daemon thread that polls YARN for the application status, unless one is already
     * registered.
     *
     * @param applicationId application id, used only in the thread name
     */
    private synchronized void startPollStatus(ApplicationId applicationId) {
        if (this.statusPollingThread != null) {
            return;
        }
        String threadName = String.format("%s-%s-yarn-poller", this.appName, applicationId);
        Thread poller = new Thread(createStatusPollingRunnable(), threadName);
        this.statusPollingThread = poller;
        poller.setDaemon(true);
        poller.start();
    }

    /**
     * Interrupts the status polling thread, if one is registered, and clears the reference to it.
     */
    private synchronized void stopPollStatus() {
        Thread poller = this.statusPollingThread;
        if (poller == null) {
            return;
        }
        poller.interrupt();
        this.statusPollingThread = null;
    }

    /**
     * Creates the task run by the status polling thread.
     *
     * <p>The task loops until its thread is interrupted or one of two things happens:
     * <ul>
     *   <li>the application report shows a final status other than UNDEFINED, in which case the
     *       controller is shut down through {@code forceShutDown()};</li>
     *   <li>the instance node is found to exist in ZooKeeper, in which case the polling thread
     *       reference is cleared and the instance node is watched again.</li>
     * </ul>
     * On every iteration a change of the application attempt id is logged, recorded, and followed
     * by {@code resetLogHandler()}. Between iterations the task sleeps for one second and fetches
     * a fresh report.
     *
     * @return the polling task
     */
    private Runnable createStatusPollingRunnable() {
        return new Runnable() {
            @Override
            public void run() {
                YarnApplicationReport report = (YarnApplicationReport) processController.getReport();
                ApplicationId applicationId = report.getApplicationId();
                boolean shutdown = false;
                boolean rewatchInstanceNode = false;
                try {
                    LOG.debug("Polling status from Yarn for {} {}.", appName, applicationId);
                    while (!Thread.currentThread().isInterrupted()) {
                        if (report.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
                            shutdown = true;
                            break;
                        }
                        ApplicationAttemptId attemptId = report.getCurrentApplicationAttemptId();
                        if (currentAttemptId.compareTo(attemptId) != 0) {
                            LOG.info("Application attempt ID change from {} to {}", currentAttemptId, attemptId);
                            currentAttemptId = attemptId;
                            resetLogHandler();
                        }
                        try {
                            // Bounded wait so that an unresponsive ZooKeeper cannot block polling.
                            Stat stat = (Stat) zkClient.exists(getInstancePath()).get(5L, TimeUnit.SECONDS);
                            if (stat != null) {
                                rewatchInstanceNode = true;
                                break;
                            }
                        } catch (ExecutionException e) {
                            // A failed exists call must not stop the polling; try again next round.
                            LOG.debug("Failed in exists call on ZK path {}.", getInstancePath(), e);
                        } catch (TimeoutException e) {
                            LOG.debug("Timeout in exists call on ZK path {}.", getInstancePath(), e);
                        }
                        TimeUnit.SECONDS.sleep(1L);
                        report = (YarnApplicationReport) processController.getReport();
                    }
                } catch (InterruptedException e) {
                    LOG.debug("Status polling thread interrupted for application {} {}", appName, applicationId);
                }
                LOG.debug("Stop polling status from Yarn for {} {}.", appName, applicationId);
                if (shutdown) {
                    LOG.info("Yarn application {} {} completed. Shutting down controller.", appName, applicationId);
                    forceShutDown();
                } else if (rewatchInstanceNode) {
                    LOG.info("Rewatch instance node for {} {} at {}", new Object[]{appName, applicationId, getInstancePath()});
                    synchronized (YarnTwillController.this) {
                        // Clear the reference so that a later failure can start a new poller.
                        YarnTwillController.this.statusPollingThread = null;
                        watchInstanceNode();
                    }
                }
            }
        };
    }

    /**
     * Tells whether the application has started running at some point.
     *
     * @param yarnApplicationState state taken from a YARN application report; must not be
     *     {@code null}
     * @return {@code true} for RUNNING, FINISHED, FAILED and KILLED; {@code false} for every
     *     other state
     */
    private boolean hasRun(YarnApplicationState yarnApplicationState) {
        // Switch on the enum itself rather than on ordinals through a hand-maintained table.
        switch (yarnApplicationState) {
            case RUNNING:
            case FINISHED:
            case FAILED:
            case KILLED:
                return true;
            default:
                return false;
        }
    }

    /**
     * Fetches the resource report of the running application.
     *
     * @return the report obtained from the resources client, or {@code null} when this service is
     *     not in the RUNNING state or no resources client could be created
     */
    public ResourceReport getResourceReport() {
        if (state() != Service.State.RUNNING) {
            return null;
        }
        ResourceReportClient resourcesClient = getResourcesClient();
        if (resourcesClient == null) {
            return null;
        }
        return resourcesClient.get();
    }

    /**
     * Builds a client for the resource report endpoint from the tracking URLs in the current
     * YARN application report.
     *
     * <p>Both the tracking URL and the original tracking URL are considered, in that order. A
     * URL that is {@code null}, equal to {@code "N/A"}, or malformed is skipped. For each
     * remaining URL a trailing {@code "/"} is removed from the path and
     * {@link TrackerService#PATH} is appended; query and fragment are not carried over.
     *
     * @return a client over the derived URLs, or {@code null} when no usable URL is available
     */
    @Nullable
    private ResourceReportClient getResourcesClient() {
        YarnApplicationReport yarnApplicationReport = (YarnApplicationReport) this.processController.getReport();
        ArrayList<URL> resourceUrls = new ArrayList<>(2);
        for (String trackingUrl : Arrays.asList(yarnApplicationReport.getTrackingUrl(), yarnApplicationReport.getOriginalTrackingUrl())) {
            if (trackingUrl == null || "N/A".equals(trackingUrl)) {
                continue;
            }
            try {
                URL url = new URL(trackingUrl);
                String path = url.getPath();
                if (path.endsWith("/")) {
                    path = path.substring(0, path.length() - 1);
                }
                resourceUrls.add(new URL(url.getProtocol(), url.getHost(), url.getPort(), path + TrackerService.PATH));
            } catch (MalformedURLException e) {
                // The trailing exception has no placeholder, so SLF4J logs it as the cause.
                LOG.debug("Invalid tracking URL {} from YARN application report for {}:{}", new Object[]{trackingUrl, this.appName, getRunId(), e});
            }
        }
        if (resourceUrls.isEmpty()) {
            return null;
        }
        return new ResourceReportClient(resourceUrls);
    }
}
