package org.apache.tez.dag.api.client.rpc;

import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.client.FrameworkClient;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGClientImpl;
import org.apache.tez.dag.api.client.DAGClientTimelineImpl;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.DagStatusSource;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.TimelineReaderFactory;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC;
import org.apache.tez.dag.api.records.DAGProtos;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.mockito.internal.util.collections.Sets;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/tez/dag/api/client/rpc/TestDAGClient.class */
public class TestDAGClient {
    private DAGClient dagClient;
    private ApplicationId mockAppId;
    private ApplicationReport mockAppReport;
    private String dagIdStr;
    private DAGClientAMProtocolBlockingPB mockProxy;
    private DAGProtos.VertexStatusProto vertexStatusProtoWithoutCounters;
    private DAGProtos.VertexStatusProto vertexStatusProtoWithCounters;
    private DAGProtos.DAGStatusProto dagStatusProtoWithoutCounters;
    private DAGProtos.DAGStatusProto dagStatusProtoWithCounters;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/api/client/rpc/TestDAGClient$DAGClientImplForTest.class */
    public static class DAGClientImplForTest extends DAGClientImpl {
        private DAGStatus rmDagStatus;
        int numGetStatusViaRmInvocations;
        private volatile boolean faultInjected;

        public DAGClientImplForTest(ApplicationId applicationId, String str, TezConfiguration tezConfiguration, @Nullable FrameworkClient frameworkClient) throws IOException {
            super(applicationId, str, tezConfiguration, frameworkClient, UserGroupInformation.getCurrentUser());
            this.numGetStatusViaRmInvocations = 0;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setRealClient(DAGClientRPCImplForTest dAGClientRPCImplForTest) {
            this.realClient = dAGClientRPCImplForTest;
        }

        void setRmDagStatus(DAGStatus dAGStatus) {
            this.rmDagStatus = dAGStatus;
        }

        void resetCounters() {
            this.numGetStatusViaRmInvocations = 0;
        }

        protected DAGStatus getDAGStatusViaRM() throws TezException, IOException {
            this.numGetStatusViaRmInvocations++;
            if (this.faultInjected) {
                throw new IOException("Fault Injected for RM");
            }
            return this.rmDagStatus;
        }

        public boolean getIsATSEnabled() {
            return this.isATSEnabled;
        }

        void injectFault() {
            this.faultInjected = true;
        }

        DAGStatus getCachedDAGStatus() {
            return (DAGStatus) getCachedDAGStatusRef().getValue();
        }

        void enforceExpirationCachedDAGStatus() {
            getCachedDAGStatusRef().enforceExpiration();
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/api/client/rpc/TestDAGClient$DAGClientRPCImplForTest.class */
    private static class DAGClientRPCImplForTest extends DAGClientRPCImpl {
        private AtomicReference<IOException> faultAMInjectedRef;
        int numGetStatusViaAmInvocations;

        public DAGClientRPCImplForTest(ApplicationId applicationId, String str, TezConfiguration tezConfiguration, @Nullable FrameworkClient frameworkClient) throws IOException {
            super(applicationId, str, tezConfiguration, frameworkClient, UserGroupInformation.getCurrentUser());
            this.numGetStatusViaAmInvocations = 0;
            this.faultAMInjectedRef = new AtomicReference<>(null);
        }

        void setAMProxy(DAGClientAMProtocolBlockingPB dAGClientAMProtocolBlockingPB) {
            this.proxy = dAGClientAMProtocolBlockingPB;
        }

        void resetCounters() {
            this.numGetStatusViaAmInvocations = 0;
        }

        boolean createAMProxyIfNeeded() throws IOException, TezException {
            return this.proxy != null;
        }

        DAGStatus getDAGStatusViaAM(Set<StatusGetOpts> set, long j) throws IOException, TezException {
            this.numGetStatusViaAmInvocations++;
            if (this.faultAMInjectedRef.get() != null) {
                throw this.faultAMInjectedRef.get();
            }
            return super.getDAGStatusViaAM(set, j);
        }

        void injectAMFault(IOException iOException) {
            this.faultAMInjectedRef.set(iOException);
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/api/client/rpc/TestDAGClient$DAGCounterRequestMatcher.class */
    private static class DAGCounterRequestMatcher implements ArgumentMatcher<DAGClientAMProtocolRPC.GetDAGStatusRequestProto> {
        private DAGCounterRequestMatcher() {
        }

        public boolean matches(DAGClientAMProtocolRPC.GetDAGStatusRequestProto getDAGStatusRequestProto) {
            return (getDAGStatusRequestProto == null || getDAGStatusRequestProto.getStatusOptionsCount() == 0 || getDAGStatusRequestProto.getStatusOptionsList().get(0) != DAGProtos.StatusGetOptsProto.GET_COUNTERS) ? false : true;
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/api/client/rpc/TestDAGClient$VertexCounterRequestMatcher.class */
    private static class VertexCounterRequestMatcher implements ArgumentMatcher<DAGClientAMProtocolRPC.GetVertexStatusRequestProto> {
        private VertexCounterRequestMatcher() {
        }

        public boolean matches(DAGClientAMProtocolRPC.GetVertexStatusRequestProto getVertexStatusRequestProto) {
            return (getVertexStatusRequestProto == null || getVertexStatusRequestProto.getStatusOptionsCount() == 0 || getVertexStatusRequestProto.getStatusOptionsList().get(0) != DAGProtos.StatusGetOptsProto.GET_COUNTERS) ? false : true;
        }
    }

    private void setUpData() {
        DAGProtos.ProgressProto build = DAGProtos.ProgressProto.newBuilder().setFailedTaskCount(1).setKilledTaskCount(1).setRunningTaskCount(2).setSucceededTaskCount(2).setTotalTaskCount(6).build();
        DAGProtos.TezCountersProto build2 = DAGProtos.TezCountersProto.newBuilder().addCounterGroups(DAGProtos.TezCounterGroupProto.newBuilder().setName("DAGGroup").addCounters(DAGProtos.TezCounterProto.newBuilder().setDisplayName("dag_counter_1").setValue(99L))).build();
        this.dagStatusProtoWithoutCounters = DAGProtos.DAGStatusProto.newBuilder().addDiagnostics("Diagnostics_0").setState(DAGProtos.DAGStatusStateProto.DAG_RUNNING).setDAGProgress(build).addVertexProgress(DAGProtos.StringProgressPairProto.newBuilder().setKey("v1").setProgress(DAGProtos.ProgressProto.newBuilder().setFailedTaskCount(0).setSucceededTaskCount(0).setKilledTaskCount(0))).addVertexProgress(DAGProtos.StringProgressPairProto.newBuilder().setKey("v2").setProgress(DAGProtos.ProgressProto.newBuilder().setFailedTaskCount(1).setSucceededTaskCount(1).setKilledTaskCount(1))).build();
        this.dagStatusProtoWithCounters = DAGProtos.DAGStatusProto.newBuilder(this.dagStatusProtoWithoutCounters).setDagCounters(build2).build();
        DAGProtos.ProgressProto build3 = DAGProtos.ProgressProto.newBuilder().setFailedTaskCount(1).setKilledTaskCount(0).setRunningTaskCount(0).setSucceededTaskCount(1).build();
        DAGProtos.TezCountersProto build4 = DAGProtos.TezCountersProto.newBuilder().addCounterGroups(DAGProtos.TezCounterGroupProto.newBuilder().addCounters(DAGProtos.TezCounterProto.newBuilder().setDisplayName("vertex_counter_1").setValue(99L))).build();
        this.vertexStatusProtoWithoutCounters = DAGProtos.VertexStatusProto.newBuilder().setId("vertex_1").addDiagnostics("V_Diagnostics_0").setProgress(build3).setState(DAGProtos.VertexStatusStateProto.VERTEX_SUCCEEDED).build();
        this.vertexStatusProtoWithCounters = DAGProtos.VertexStatusProto.newBuilder(this.vertexStatusProtoWithoutCounters).setVertexCounters(build4).build();
    }

    @Before
    public void setUp() throws YarnException, IOException, TezException, ServiceException {
        setUpData();
        this.mockAppId = (ApplicationId) Mockito.mock(ApplicationId.class);
        this.mockAppReport = (ApplicationReport) Mockito.mock(ApplicationReport.class);
        this.dagIdStr = "dag_9999_0001_1";
        this.mockProxy = (DAGClientAMProtocolBlockingPB) Mockito.mock(DAGClientAMProtocolBlockingPB.class);
        Mockito.when(this.mockProxy.getDAGStatus((RpcController) Mockito.isNull(), (DAGClientAMProtocolRPC.GetDAGStatusRequestProto) Mockito.any())).thenReturn(DAGClientAMProtocolRPC.GetDAGStatusResponseProto.newBuilder().setDagStatus(this.dagStatusProtoWithoutCounters).build());
        Mockito.when(this.mockProxy.getDAGStatus((RpcController) Mockito.isNull(), (DAGClientAMProtocolRPC.GetDAGStatusRequestProto) Mockito.argThat(new DAGCounterRequestMatcher()))).thenReturn(DAGClientAMProtocolRPC.GetDAGStatusResponseProto.newBuilder().setDagStatus(this.dagStatusProtoWithCounters).build());
        Mockito.when(this.mockProxy.getVertexStatus((RpcController) Mockito.isNull(), (DAGClientAMProtocolRPC.GetVertexStatusRequestProto) Mockito.any())).thenReturn(DAGClientAMProtocolRPC.GetVertexStatusResponseProto.newBuilder().setVertexStatus(this.vertexStatusProtoWithoutCounters).build());
        Mockito.when(this.mockProxy.getVertexStatus((RpcController) Mockito.isNull(), (DAGClientAMProtocolRPC.GetVertexStatusRequestProto) Mockito.argThat(new VertexCounterRequestMatcher()))).thenReturn(DAGClientAMProtocolRPC.GetVertexStatusResponseProto.newBuilder().setVertexStatus(this.vertexStatusProtoWithCounters).build());
        this.dagClient = new DAGClientImpl(this.mockAppId, this.dagIdStr, new TezConfiguration(), (FrameworkClient) null, UserGroupInformation.getCurrentUser());
        DAGClientRPCImpl realClient = this.dagClient.getRealClient();
        realClient.appReport = this.mockAppReport;
        realClient.proxy = this.mockProxy;
    }

    @Test(timeout = 5000)
    public void testApp() throws IOException, TezException, ServiceException {
        Assert.assertTrue(this.dagClient.getExecutionContext().contains(this.mockAppId.toString()));
        Assert.assertEquals(this.mockAppId.toString(), this.dagClient.getSessionIdentifierString());
        Assert.assertEquals(this.dagIdStr, this.dagClient.getDagIdentifierString());
        Assert.assertEquals(this.mockAppReport, this.dagClient.getRealClient().getApplicationReportInternal());
    }

    @Test(timeout = 5000)
    public void testDAGStatus() throws Exception {
        DAGStatus dAGStatus = this.dagClient.getDAGStatus((Set) null);
        ((DAGClientAMProtocolBlockingPB) Mockito.verify(this.mockProxy, Mockito.times(1))).getDAGStatus((RpcController) null, DAGClientAMProtocolRPC.GetDAGStatusRequestProto.newBuilder().setDagId(this.dagIdStr).setTimeout(0L).build());
        Assert.assertEquals(new DAGStatus(this.dagStatusProtoWithoutCounters, DagStatusSource.AM), dAGStatus);
        System.out.println("DAGStatusWithoutCounter:" + dAGStatus);
        DAGStatus dAGStatus2 = this.dagClient.getDAGStatus(Sets.newSet(new StatusGetOpts[]{StatusGetOpts.GET_COUNTERS}));
        ((DAGClientAMProtocolBlockingPB) Mockito.verify(this.mockProxy, Mockito.times(1))).getDAGStatus((RpcController) null, DAGClientAMProtocolRPC.GetDAGStatusRequestProto.newBuilder().setDagId(this.dagIdStr).setTimeout(0L).addStatusOptions(DAGProtos.StatusGetOptsProto.GET_COUNTERS).build());
        Assert.assertEquals(new DAGStatus(this.dagStatusProtoWithCounters, DagStatusSource.AM), dAGStatus2);
        System.out.println("DAGStatusWithCounter:" + dAGStatus2);
    }

    @Test(timeout = 5000)
    public void testVertexStatus() throws Exception {
        VertexStatus vertexStatus = this.dagClient.getVertexStatus("v1", (Set) null);
        ((DAGClientAMProtocolBlockingPB) Mockito.verify(this.mockProxy)).getVertexStatus((RpcController) null, DAGClientAMProtocolRPC.GetVertexStatusRequestProto.newBuilder().setDagId(this.dagIdStr).setVertexName("v1").build());
        Assert.assertEquals(new VertexStatus(this.vertexStatusProtoWithoutCounters), vertexStatus);
        System.out.println("VertexWithoutCounter:" + vertexStatus);
        VertexStatus vertexStatus2 = this.dagClient.getVertexStatus("v1", Sets.newSet(new StatusGetOpts[]{StatusGetOpts.GET_COUNTERS}));
        ((DAGClientAMProtocolBlockingPB) Mockito.verify(this.mockProxy)).getVertexStatus((RpcController) null, DAGClientAMProtocolRPC.GetVertexStatusRequestProto.newBuilder().setDagId(this.dagIdStr).setVertexName("v1").addStatusOptions(DAGProtos.StatusGetOptsProto.GET_COUNTERS).build());
        Assert.assertEquals(new VertexStatus(this.vertexStatusProtoWithCounters), vertexStatus2);
        System.out.println("VertexWithCounter:" + vertexStatus2);
    }

    @Test(timeout = 5000)
    public void testTryKillDAG() throws Exception {
        this.dagClient.tryKillDAG();
        ((DAGClientAMProtocolBlockingPB) Mockito.verify(this.mockProxy, Mockito.times(1))).tryKillDAG((RpcController) null, DAGClientAMProtocolRPC.TryKillDAGRequestProto.newBuilder().setDagId(this.dagIdStr).build());
    }

    @Test(timeout = 5000)
    public void testWaitForCompletion() throws Exception {
        Mockito.when(this.mockProxy.getDAGStatus((RpcController) Mockito.isNull(), (DAGClientAMProtocolRPC.GetDAGStatusRequestProto) Mockito.any())).thenReturn(DAGClientAMProtocolRPC.GetDAGStatusResponseProto.newBuilder().setDagStatus(this.dagStatusProtoWithoutCounters).build()).thenReturn(DAGClientAMProtocolRPC.GetDAGStatusResponseProto.newBuilder().setDagStatus(DAGProtos.DAGStatusProto.newBuilder(this.dagStatusProtoWithoutCounters).setState(DAGProtos.DAGStatusStateProto.DAG_SUCCEEDED).build()).build());
        this.dagClient.waitForCompletion();
        ((DAGClientAMProtocolBlockingPB) Mockito.verify(this.mockProxy, Mockito.times(2))).getDAGStatus((RpcController) ArgumentCaptor.forClass(RpcController.class).capture(), (DAGClientAMProtocolRPC.GetDAGStatusRequestProto) ArgumentCaptor.forClass(DAGClientAMProtocolRPC.GetDAGStatusRequestProto.class).capture());
    }

    @Test(timeout = 5000)
    public void testWaitForCompletionWithStatusUpdates() throws Exception {
        Mockito.when(this.mockProxy.getDAGStatus((RpcController) Mockito.isNull(), (DAGClientAMProtocolRPC.GetDAGStatusRequestProto) Mockito.any())).thenReturn(DAGClientAMProtocolRPC.GetDAGStatusResponseProto.newBuilder().setDagStatus(DAGProtos.DAGStatusProto.newBuilder(this.dagStatusProtoWithCounters).setState(DAGProtos.DAGStatusStateProto.DAG_RUNNING).build()).build()).thenReturn(DAGClientAMProtocolRPC.GetDAGStatusResponseProto.newBuilder().setDagStatus(DAGProtos.DAGStatusProto.newBuilder(this.dagStatusProtoWithCounters).setState(DAGProtos.DAGStatusStateProto.DAG_RUNNING).build()).build()).thenReturn(DAGClientAMProtocolRPC.GetDAGStatusResponseProto.newBuilder().setDagStatus(DAGProtos.DAGStatusProto.newBuilder(this.dagStatusProtoWithCounters).setState(DAGProtos.DAGStatusStateProto.DAG_RUNNING).build()).build()).thenReturn(DAGClientAMProtocolRPC.GetDAGStatusResponseProto.newBuilder().setDagStatus(DAGProtos.DAGStatusProto.newBuilder(this.dagStatusProtoWithoutCounters).setState(DAGProtos.DAGStatusStateProto.DAG_SUCCEEDED).build()).build());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(RpcController.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(DAGClientAMProtocolRPC.GetDAGStatusRequestProto.class);
        this.dagClient.waitForCompletionWithStatusUpdates((Set) null);
        ((DAGClientAMProtocolBlockingPB) Mockito.verify(this.mockProxy, Mockito.times(4))).getDAGStatus((RpcController) forClass.capture(), (DAGClientAMProtocolRPC.GetDAGStatusRequestProto) forClass2.capture());
        Mockito.when(this.mockProxy.getDAGStatus((RpcController) Mockito.isNull(), (DAGClientAMProtocolRPC.GetDAGStatusRequestProto) Mockito.any())).thenReturn(DAGClientAMProtocolRPC.GetDAGStatusResponseProto.newBuilder().setDagStatus(DAGProtos.DAGStatusProto.newBuilder(this.dagStatusProtoWithCounters).setState(DAGProtos.DAGStatusStateProto.DAG_RUNNING).build()).build()).thenReturn(DAGClientAMProtocolRPC.GetDAGStatusResponseProto.newBuilder().setDagStatus(DAGProtos.DAGStatusProto.newBuilder(this.dagStatusProtoWithCounters).setState(DAGProtos.DAGStatusStateProto.DAG_RUNNING).build()).build()).thenReturn(DAGClientAMProtocolRPC.GetDAGStatusResponseProto.newBuilder().setDagStatus(DAGProtos.DAGStatusProto.newBuilder(this.dagStatusProtoWithCounters).setState(DAGProtos.DAGStatusStateProto.DAG_RUNNING).build()).build()).thenReturn(DAGClientAMProtocolRPC.GetDAGStatusResponseProto.newBuilder().setDagStatus(DAGProtos.DAGStatusProto.newBuilder(this.dagStatusProtoWithCounters).setState(DAGProtos.DAGStatusStateProto.DAG_SUCCEEDED).build()).build());
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(RpcController.class);
        ArgumentCaptor forClass4 = ArgumentCaptor.forClass(DAGClientAMProtocolRPC.GetDAGStatusRequestProto.class);
        this.dagClient.waitForCompletionWithStatusUpdates(Sets.newSet(new StatusGetOpts[]{StatusGetOpts.GET_COUNTERS}));
        ((DAGClientAMProtocolBlockingPB) Mockito.verify(this.mockProxy, Mockito.times(8))).getDAGStatus((RpcController) forClass3.capture(), (DAGClientAMProtocolRPC.GetDAGStatusRequestProto) forClass4.capture());
    }

    @Test(timeout = 50000)
    public void testGetDagStatusWithTimeout() throws Exception {
        TezConfiguration tezConfiguration = new TezConfiguration();
        tezConfiguration.setLong("tez.dag.status.pollinterval-ms", 800L);
        DAGClientImplForTest dAGClientImplForTest = new DAGClientImplForTest(this.mockAppId, this.dagIdStr, tezConfiguration, null);
        DAGClientRPCImplForTest dAGClientRPCImplForTest = new DAGClientRPCImplForTest(this.mockAppId, this.dagIdStr, tezConfiguration, null);
        dAGClientImplForTest.setRealClient(dAGClientRPCImplForTest);
        dAGClientRPCImplForTest.setAMProxy(null);
        dAGClientImplForTest.setRmDagStatus(new DAGStatus(constructDagStatusProto(DAGProtos.DAGStatusStateProto.DAG_SUBMITTED), DagStatusSource.RM));
        long currentTimeMillis = System.currentTimeMillis();
        DAGStatus dAGStatus = dAGClientImplForTest.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 2000L);
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        Assert.assertTrue(currentTimeMillis2 > 1500 && currentTimeMillis2 < 2500);
        Assert.assertEquals(0L, dAGClientRPCImplForTest.numGetStatusViaAmInvocations);
        Assert.assertEquals(4L, dAGClientImplForTest.numGetStatusViaRmInvocations);
        Assert.assertEquals(DAGStatus.State.SUBMITTED, dAGStatus.getState());
        dAGClientImplForTest.resetCounters();
        dAGClientRPCImplForTest.resetCounters();
        dAGClientImplForTest.setRmDagStatus(new DAGStatus(constructDagStatusProto(DAGProtos.DAGStatusStateProto.DAG_RUNNING), DagStatusSource.RM));
        dAGClientRPCImplForTest.setAMProxy(createMockProxy(DAGProtos.DAGStatusStateProto.DAG_RUNNING, -1L));
        long currentTimeMillis3 = System.currentTimeMillis();
        DAGStatus dAGStatus2 = dAGClientImplForTest.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 2000L);
        long currentTimeMillis4 = System.currentTimeMillis() - currentTimeMillis3;
        Assert.assertTrue(currentTimeMillis4 > 1500 && currentTimeMillis4 < 2500);
        Assert.assertEquals(0L, dAGClientImplForTest.numGetStatusViaRmInvocations);
        Assert.assertEquals(2L, dAGClientRPCImplForTest.numGetStatusViaAmInvocations);
        Assert.assertEquals(DAGStatus.State.RUNNING, dAGStatus2.getState());
        dAGClientImplForTest.resetCounters();
        dAGClientRPCImplForTest.resetCounters();
        dAGClientImplForTest.setRmDagStatus(new DAGStatus(constructDagStatusProto(DAGProtos.DAGStatusStateProto.DAG_RUNNING), DagStatusSource.RM));
        dAGClientRPCImplForTest.setAMProxy(createMockProxy(DAGProtos.DAGStatusStateProto.DAG_SUCCEEDED, 1000L));
        long currentTimeMillis5 = System.currentTimeMillis();
        DAGStatus dAGStatus3 = dAGClientImplForTest.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 2000L);
        long currentTimeMillis6 = System.currentTimeMillis() - currentTimeMillis5;
        Assert.assertTrue(currentTimeMillis6 > 500 && currentTimeMillis6 < 1500);
        Assert.assertEquals(0L, dAGClientImplForTest.numGetStatusViaRmInvocations);
        Assert.assertEquals(1L, dAGClientRPCImplForTest.numGetStatusViaAmInvocations);
        Assert.assertEquals(DAGStatus.State.SUCCEEDED, dAGStatus3.getState());
    }

    @Test(timeout = 5000)
    public void testDagClientTimelineEnabledCondition() throws IOException {
        testAtsEnabled(this.mockAppId, this.dagIdStr, false, "", true, true);
        testAtsEnabled(this.mockAppId, this.dagIdStr, false, "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService", false, true);
        testAtsEnabled(this.mockAppId, this.dagIdStr, false, "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService", true, false);
        testAtsEnabled(this.mockAppId, this.dagIdStr, DAGClientTimelineImpl.isSupported(), "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService", true, true);
    }

    private static void testAtsEnabled(ApplicationId applicationId, String str, boolean z, String str2, boolean z2, boolean z3) throws IOException {
        TezConfiguration tezConfiguration = new TezConfiguration();
        tezConfiguration.set("tez.history.logging.service.class", str2);
        tezConfiguration.setBoolean("tez.am.history.logging.enabled", z2);
        tezConfiguration.setBoolean("tez.dag.history.logging.enabled", z3);
        Assert.assertEquals(Boolean.valueOf(z), Boolean.valueOf(new DAGClientImplForTest(applicationId, str, tezConfiguration, null).getIsATSEnabled()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public DAGProtos.DAGStatusProto.Builder constructDagStatusProto(DAGProtos.DAGStatusStateProto dAGStatusStateProto) {
        DAGProtos.DAGStatusProto.Builder newBuilder = DAGProtos.DAGStatusProto.newBuilder();
        newBuilder.setState(dAGStatusStateProto);
        return newBuilder;
    }

    private DAGClientAMProtocolBlockingPB createMockProxy(final DAGProtos.DAGStatusStateProto dAGStatusStateProto, final long j) throws ServiceException {
        DAGClientAMProtocolBlockingPB dAGClientAMProtocolBlockingPB = (DAGClientAMProtocolBlockingPB) Mockito.mock(DAGClientAMProtocolBlockingPB.class);
        ((DAGClientAMProtocolBlockingPB) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.tez.dag.api.client.rpc.TestDAGClient.1
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                long timeout = ((DAGClientAMProtocolRPC.GetDAGStatusRequestProto) invocationOnMock.getArguments()[1]).getTimeout();
                if (j != -1) {
                    timeout = j;
                }
                Thread.sleep(timeout);
                return DAGClientAMProtocolRPC.GetDAGStatusResponseProto.newBuilder().setDagStatus(TestDAGClient.this.constructDagStatusProto(dAGStatusStateProto)).build();
            }
        }).when(dAGClientAMProtocolBlockingPB)).getDAGStatus((RpcController) Mockito.isNull(), (DAGClientAMProtocolRPC.GetDAGStatusRequestProto) Mockito.any());
        return dAGClientAMProtocolBlockingPB;
    }

    @Test
    public void testTimelineClientCleanup() throws Exception {
        ThreadGroup threadGroup;
        TezConfiguration tezConfiguration = new TezConfiguration();
        tezConfiguration.set("yarn.http.policy", "HTTPS_ONLY");
        KeyStoreTestUtil.setupSSLConfig(new File(System.getProperty("java.io.tmpdir")).getAbsolutePath(), KeyStoreTestUtil.getClasspathDir(TestDAGClient.class), tezConfiguration, false);
        DAGClientTimelineImpl dAGClientTimelineImpl = new DAGClientTimelineImpl(this.mockAppId, this.dagIdStr, tezConfiguration, (FrameworkClient) Mockito.mock(FrameworkClient.class), 10000);
        Field declaredField = DAGClientTimelineImpl.class.getDeclaredField("timelineReaderStrategy");
        declaredField.setAccessible(true);
        ((TimelineReaderFactory.TimelineReaderStrategy) declaredField.get(dAGClientTimelineImpl)).getHttpClient();
        ThreadGroup threadGroup2 = Thread.currentThread().getThreadGroup();
        while (true) {
            threadGroup = threadGroup2;
            if (threadGroup.getParent() == null) {
                break;
            } else {
                threadGroup2 = threadGroup.getParent();
            }
        }
        Thread[] threadArr = new Thread[threadGroup.activeCount()];
        threadGroup.enumerate(threadArr);
        Thread thread = null;
        for (Thread thread2 : threadArr) {
            if ((thread2.getName() != null && thread2.getName().contains("Truststore reloader thread")) || thread2.getName().contains("SSL Certificates Store Monitor")) {
                thread = thread2;
            }
        }
        Assert.assertTrue("Reloader is not alive", thread.isAlive());
        dAGClientTimelineImpl.close();
        boolean z = true;
        for (int i = 0; i < 10; i++) {
            z = thread.isAlive();
            if (!z) {
                break;
            }
            Thread.sleep(1000L);
        }
        Assert.assertFalse("Reloader is still alive", z);
    }

    @Test(timeout = 50000)
    public void testGetDagStatusWithCachedStatusExpiration() throws Exception {
        TezConfiguration tezConfiguration = new TezConfiguration();
        tezConfiguration.setLong("tez.dag.status.pollinterval-ms", 800L);
        tezConfiguration.setLong("tez.client.dag.status.cache.timeout-secs", 100000L);
        DAGClientImplForTest dAGClientImplForTest = new DAGClientImplForTest(this.mockAppId, this.dagIdStr, tezConfiguration, null);
        Throwable th = null;
        try {
            try {
                DAGClientRPCImplForTest dAGClientRPCImplForTest = new DAGClientRPCImplForTest(this.mockAppId, this.dagIdStr, tezConfiguration, null);
                dAGClientImplForTest.setRealClient(dAGClientRPCImplForTest);
                dAGClientImplForTest.setRmDagStatus(new DAGStatus(constructDagStatusProto(DAGProtos.DAGStatusStateProto.DAG_RUNNING), DagStatusSource.RM));
                dAGClientRPCImplForTest.setAMProxy(createMockProxy(DAGProtos.DAGStatusStateProto.DAG_RUNNING, -1L));
                long currentTimeMillis = System.currentTimeMillis();
                DAGStatus dAGStatus = dAGClientImplForTest.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 2000L);
                long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                Assert.assertTrue(currentTimeMillis2 > 1500 && currentTimeMillis2 < 2500);
                Assert.assertEquals(0L, dAGClientImplForTest.numGetStatusViaRmInvocations);
                Assert.assertEquals(2L, dAGClientRPCImplForTest.numGetStatusViaAmInvocations);
                Assert.assertEquals(DAGStatus.State.RUNNING, dAGStatus.getState());
                dAGClientImplForTest.resetCounters();
                dAGClientRPCImplForTest.resetCounters();
                dAGClientImplForTest.setRmDagStatus(new DAGStatus(constructDagStatusProto(DAGProtos.DAGStatusStateProto.DAG_RUNNING), DagStatusSource.RM));
                dAGClientRPCImplForTest.setAMProxy(createMockProxy(DAGProtos.DAGStatusStateProto.DAG_SUCCEEDED, 1000L));
                long currentTimeMillis3 = System.currentTimeMillis();
                DAGStatus dAGStatus2 = dAGClientImplForTest.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 2000L);
                long currentTimeMillis4 = System.currentTimeMillis() - currentTimeMillis3;
                Assert.assertTrue("diff is " + currentTimeMillis4, currentTimeMillis4 > 500 && currentTimeMillis4 < 1500);
                Assert.assertEquals(0L, dAGClientImplForTest.numGetStatusViaRmInvocations);
                Assert.assertEquals(1L, dAGClientRPCImplForTest.numGetStatusViaAmInvocations);
                Assert.assertEquals(DAGStatus.State.SUCCEEDED, dAGStatus2.getState());
                DAGStatus cachedDAGStatus = dAGClientImplForTest.getCachedDAGStatus();
                Assert.assertNotNull(cachedDAGStatus);
                Assert.assertSame(dAGStatus2, cachedDAGStatus);
                dAGClientImplForTest.resetCounters();
                dAGClientRPCImplForTest.resetCounters();
                dAGClientRPCImplForTest.injectAMFault(new IOException("injected Fault"));
                DAGStatus dAGStatus3 = dAGClientImplForTest.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class));
                Assert.assertEquals(0L, dAGClientImplForTest.numGetStatusViaRmInvocations);
                Assert.assertEquals(1L, dAGClientRPCImplForTest.numGetStatusViaAmInvocations);
                Assert.assertEquals(DAGStatus.State.SUCCEEDED, dAGStatus3.getState());
                Assert.assertSame(dAGStatus3, cachedDAGStatus);
                dAGClientRPCImplForTest.setAMProxy(createMockProxy(DAGProtos.DAGStatusStateProto.DAG_SUCCEEDED, 1000L));
                dAGClientRPCImplForTest.injectAMFault(new IOException("injected AM Fault"));
                dAGClientImplForTest.resetCounters();
                dAGClientRPCImplForTest.resetCounters();
                dAGClientImplForTest.enforceExpirationCachedDAGStatus();
                DAGStatus dAGStatus4 = dAGClientImplForTest.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class));
                Assert.assertEquals(1L, dAGClientImplForTest.numGetStatusViaRmInvocations);
                Assert.assertEquals(1L, dAGClientRPCImplForTest.numGetStatusViaAmInvocations);
                Assert.assertEquals(DAGStatus.State.RUNNING, dAGStatus4.getState());
                Assert.assertNotSame(dAGStatus4, cachedDAGStatus);
                Assert.assertNull(dAGClientImplForTest.getCachedDAGStatus());
                Assert.assertNotNull(dAGStatus4);
                dAGClientImplForTest.resetCounters();
                dAGClientRPCImplForTest.resetCounters();
                dAGClientRPCImplForTest.setAMProxy(createMockProxy(DAGProtos.DAGStatusStateProto.DAG_SUCCEEDED, 1000L));
                dAGClientImplForTest.injectFault();
                try {
                    dAGClientImplForTest.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class));
                    Assert.fail("The RM should throw IOException");
                } catch (IOException e) {
                    Assert.assertEquals(e.getMessage(), "Fault Injected for RM");
                    Assert.assertEquals(1L, dAGClientImplForTest.numGetStatusViaRmInvocations);
                    Assert.assertEquals(1L, dAGClientRPCImplForTest.numGetStatusViaAmInvocations);
                }
                if (dAGClientImplForTest != null) {
                    if (0 == 0) {
                        dAGClientImplForTest.close();
                        return;
                    }
                    try {
                        dAGClientImplForTest.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (dAGClientImplForTest != null) {
                if (th != null) {
                    try {
                        dAGClientImplForTest.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    dAGClientImplForTest.close();
                }
            }
            throw th4;
        }
    }
}
