package org.apache.tez.tests;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.launcher.TezTestServiceContainerLauncherWithErrors;
import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService;
import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerServiceWithErrors;
import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorWithErrors;
import org.apache.tez.examples.JoinValidateConfigured;
import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/tests/TestExternalTezServicesErrors.class */
/**
 * Verifies that a fatal error raised by an externally plugged-in Tez service
 * (task scheduler, container launcher or task communicator) is surfaced both
 * in the DAG diagnostics and in the diagnostics of the YARN application attempt.
 *
 * <p>Two plugin entities are registered for every service type: a "push" entity
 * backed by the regular test-service implementations and a "fail" entity backed
 * by the {@code *WithErrors} implementations. Each test routes exactly one
 * service of one vertex through the "fail" entity and checks the reported error.
 */
public class TestExternalTezServicesErrors {
    private static final Logger LOG = LoggerFactory.getLogger(TestExternalTezServicesErrors.class);

    private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush";
    private static final String EXT_FAIL_ENTITY_NAME = "ExtServiceTestFail";

    /** Substrings that every reported service-error diagnostic is expected to contain. */
    private static final String SERVICE_ERROR_MARKER = "Service Error";
    private static final String SIMULATED_ERROR_MARKER = "Simulated Error";

    /** Pause between two consecutive YARN application-state polls, in milliseconds. */
    private static final long APP_STATE_POLL_INTERVAL_MS = 200L;

    /** YARN application states after which polling stops. */
    private static final EnumSet<YarnApplicationState> TERMINAL_APP_STATES =
        EnumSet.of(YarnApplicationState.FINISHED, YarnApplicationState.FAILED, YarnApplicationState.KILLED);

    private static final Path SRC_DATA_DIR = new Path("/tmp/" + TestExternalTezServicesErrors.class.getSimpleName());
    private static final Path HASH_JOIN_EXPECTED_RESULT_PATH = new Path(SRC_DATA_DIR, "expectedOutputPath");
    private static final Path HASH_JOIN_OUTPUT_PATH = new Path(SRC_DATA_DIR, "outPath");

    // NOTE(review): judging by the constant names, the three arguments of
    // VertexExecutionContext.create are (scheduler, launcher, communicator) - confirm against the Tez API.
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_EXT_SERVICE_PUSH =
        Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_FAIL =
        Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_FAIL_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_FAIL =
        Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_FAIL_ENTITY_NAME);
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_FAIL =
        Vertex.VertexExecutionContext.create(EXT_FAIL_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_DEFAULT = EXECUTION_CONTEXT_EXT_SERVICE_PUSH;

    private static final String TEST_ROOT_DIR =
        "target/" + TestExternalTezServicesErrors.class.getName() + "-tmpDir";

    private static ExternalTezServiceTestHelper extServiceTestHelper;
    private static ServicePluginsDescriptor servicePluginsDescriptor;

    /**
     * Creates the test helper, registers the "push" and "fail" plugin entities for
     * all three service types and generates the hash-join data set used by the tests.
     *
     * @throws Exception if the helper, the shared client or the data set cannot be set up
     */
    @BeforeClass
    public static void setup() throws Exception {
        extServiceTestHelper = new ExternalTezServiceTestHelper(TEST_ROOT_DIR);
        UserPayload userPayload = TezUtils.createUserPayloadFromConf(extServiceTestHelper.getConfForJobs());

        TaskSchedulerDescriptor[] taskSchedulers = new TaskSchedulerDescriptor[]{
            (TaskSchedulerDescriptor) TaskSchedulerDescriptor
                .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName())
                .setUserPayload(userPayload),
            (TaskSchedulerDescriptor) TaskSchedulerDescriptor
                .create(EXT_FAIL_ENTITY_NAME, TezTestServiceTaskSchedulerServiceWithErrors.class.getName())
                .setUserPayload(userPayload)
        };
        ContainerLauncherDescriptor[] containerLaunchers = new ContainerLauncherDescriptor[]{
            (ContainerLauncherDescriptor) ContainerLauncherDescriptor
                .create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName())
                .setUserPayload(userPayload),
            (ContainerLauncherDescriptor) ContainerLauncherDescriptor
                .create(EXT_FAIL_ENTITY_NAME, TezTestServiceContainerLauncherWithErrors.class.getName())
                .setUserPayload(userPayload)
        };
        TaskCommunicatorDescriptor[] taskCommunicators = new TaskCommunicatorDescriptor[]{
            (TaskCommunicatorDescriptor) TaskCommunicatorDescriptor
                .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName())
                .setUserPayload(userPayload),
            (TaskCommunicatorDescriptor) TaskCommunicatorDescriptor
                .create(EXT_FAIL_ENTITY_NAME, TezTestServiceTaskCommunicatorWithErrors.class.getName())
                .setUserPayload(userPayload)
        };
        servicePluginsDescriptor =
            ServicePluginsDescriptor.create(true, true, taskSchedulers, containerLaunchers, taskCommunicators);

        // The shared client is only needed while the join data set is generated;
        // every test case creates its own session afterwards.
        extServiceTestHelper.setupSharedTezClient(servicePluginsDescriptor);
        Path dataPath1 = new Path(SRC_DATA_DIR, "inPath1");
        Path dataPath2 = new Path(SRC_DATA_DIR, "inPath2");
        extServiceTestHelper.setupHashJoinData(
            SRC_DATA_DIR, dataPath1, dataPath2, HASH_JOIN_EXPECTED_RESULT_PATH, HASH_JOIN_OUTPUT_PATH);
        extServiceTestHelper.shutdownSharedTezClient();
    }

    /**
     * Tears down everything the helper created.
     *
     * @throws IOException  propagated from the helper
     * @throws TezException propagated from the helper
     */
    @AfterClass
    public static void tearDown() throws IOException, TezException {
        // JUnit runs @AfterClass even when @BeforeClass failed, in which case the
        // helper may never have been created.
        if (extServiceTestHelper != null) {
            extServiceTestHelper.tearDownAll();
        }
    }

    /** A vertex using the failing container launcher must produce a launcher fatal error. */
    @Test(timeout = 90000)
    public void testContainerLauncherError() throws Exception {
        testServiceError("_testContainerLauncherError_", EXECUTION_CONTEXT_LAUNCHER_FAIL,
            DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR);
    }

    /** A vertex using the failing task communicator must produce a communicator fatal error. */
    @Test(timeout = 90000)
    public void testTaskCommunicatorError() throws Exception {
        testServiceError("_testTaskCommunicatorError_", EXECUTION_CONTEXT_TASKCOMM_FAIL,
            DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR);
    }

    /** A vertex using the failing task scheduler must produce a scheduler fatal error. */
    @Test(timeout = 90000)
    public void testTaskSchedulerError() throws Exception {
        testServiceError("_testTaskSchedulerError_", EXECUTION_CONTEXT_SCHEDULER_FAIL,
            DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR);
    }

    /**
     * Runs the join-validate DAG in a dedicated session with one vertex bound to
     * {@code failingContext} and asserts that the DAG ends in {@code ERROR} with a
     * diagnostic naming {@code expectedEventType}. Afterwards, if an application id
     * was obtained, the YARN application is checked as well.
     *
     * @param methodName        suffix used in the session name and in log messages
     * @param failingContext    execution context passed as the second argument of
     *                          {@link JoinValidateConfigured}
     * @param expectedEventType event type whose name must appear in the diagnostics
     */
    private void testServiceError(String methodName, Vertex.VertexExecutionContext failingContext,
            DAGAppMasterEventType expectedEventType)
            throws IOException, TezException, InterruptedException, YarnException {
        TezConfiguration tezClientConf = new TezConfiguration(extServiceTestHelper.getConfForJobs());
        String sessionName = TestExternalTezServicesErrors.class.getSimpleName() + methodName + "_session";
        TezClient tezClient = TezClient.newBuilder(sessionName, tezClientConf)
            .setIsSession(true)
            .setServicePluginDescriptor(servicePluginsDescriptor)
            .build();

        ApplicationId appId = null;
        try {
            tezClient.start();
            LOG.info("TezSessionStarted for {}", methodName);
            tezClient.waitTillReady();
            LOG.info("TezSession ready for submission for {}", methodName);

            JoinValidateConfigured joinValidate = new JoinValidateConfigured(
                EXECUTION_CONTEXT_DEFAULT, failingContext,
                EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, "LauncherFailTest");

            DAGStatus dagStatus = tezClient
                .submitDAG(joinValidate.createDag(
                    new TezConfiguration(extServiceTestHelper.getConfForJobs()),
                    HASH_JOIN_EXPECTED_RESULT_PATH, HASH_JOIN_OUTPUT_PATH, 3))
                .waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));

            Assert.assertEquals("Unexpected final DAG state", DAGStatus.State.ERROR, dagStatus.getState());

            boolean foundDiagnostic = false;
            for (String diagnostic : dagStatus.getDiagnostics()) {
                if (isExpectedServiceError(diagnostic, expectedEventType)) {
                    foundDiagnostic = true;
                    break;
                }
            }
            // Capture the id before asserting, mirroring the original statement order.
            appId = tezClient.getAppMasterApplicationId();
            Assert.assertTrue("No DAG diagnostic reports " + expectedEventType + ": "
                + dagStatus.getDiagnostics(), foundDiagnostic);
        } finally {
            // Stop exactly once, whether the DAG checks passed or not.
            tezClient.stop();
        }

        if (appId != null) {
            verifyApplicationFailed(tezClientConf, appId, expectedEventType);
        }
    }

    /**
     * Polls YARN until the application reaches a terminal state, then asserts that it
     * finished with final status {@code FAILED} and that the diagnostics of its current
     * attempt report the expected service error. The polling loop itself is unbounded;
     * the callers rely on the JUnit test timeout.
     */
    private static void verifyApplicationFailed(TezConfiguration conf, ApplicationId appId,
            DAGAppMasterEventType expectedEventType)
            throws IOException, YarnException, InterruptedException {
        YarnClient yarnClient = YarnClient.createYarnClient();
        try {
            yarnClient.init(conf);
            yarnClient.start();

            ApplicationReport appReport = yarnClient.getApplicationReport(appId);
            while (!TERMINAL_APP_STATES.contains(appReport.getYarnApplicationState())) {
                Thread.sleep(APP_STATE_POLL_INTERVAL_MS);
                appReport = yarnClient.getApplicationReport(appId);
            }

            String attemptDiagnostics = yarnClient
                .getApplicationAttemptReport(appReport.getCurrentApplicationAttemptId())
                .getDiagnostics();
            Assert.assertEquals("Unexpected final application status",
                FinalApplicationStatus.FAILED, appReport.getFinalApplicationStatus());
            Assert.assertEquals("Unexpected YARN application state",
                YarnApplicationState.FINISHED, appReport.getYarnApplicationState());
            Assert.assertTrue("Attempt diagnostics do not report " + expectedEventType + ": "
                + attemptDiagnostics, isExpectedServiceError(attemptDiagnostics, expectedEventType));
        } finally {
            yarnClient.stop();
        }
    }

    /**
     * Returns {@code true} when {@code diagnostics} is non-null and contains the
     * service-error marker, the name of {@code expectedEventType} and the
     * simulated-error marker.
     */
    private static boolean isExpectedServiceError(String diagnostics, DAGAppMasterEventType expectedEventType) {
        return diagnostics != null
            && diagnostics.contains(SERVICE_ERROR_MARKER)
            && diagnostics.contains(expectedEventType.toString())
            && diagnostics.contains(SIMULATED_ERROR_MARKER);
    }
}
