package org.apache.hadoop.mapred;

import java.util.Iterator;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
import org.apache.hadoop.mapreduce.test.system.JTProtocol;
import org.apache.hadoop.mapreduce.test.system.JobInfo;
import org.apache.hadoop.mapreduce.test.system.MRCluster;
import org.apache.hadoop.mapreduce.test.system.TTClient;
import org.apache.hadoop.mapreduce.test.system.TaskInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/hadoop/mapred/TestControlledJob.class */
/**
 * System test that submits a sleep job configured for controlled task
 * completion, verifies that its single map keeps running until told to
 * finish, then signals every task through the task tracker clients and
 * waits for the job to complete.
 */
public class TestControlledJob {
    private static final Log LOG = LogFactory.getLog(TestControlledJob.class);

    /** Delay between two successive polls of the job tracker, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 1000L;

    /**
     * How long to wait for the job to complete after signalling. The poll
     * interval is one second, so this is also the maximum number of polls.
     */
    private static final int COMPLETION_TIMEOUT_SECONDS = 40;

    // NOTE(review): if MRCluster.createCluster declares a checked exception,
    // this initializer has to move into a constructor declared
    // "throws Exception" - confirm against MRCluster.
    private final MRCluster cluster = MRCluster.createCluster(new Configuration());

    /** Brings the cluster up before each test. */
    @Before
    public void before() throws Exception {
        this.cluster.setUp();
    }

    /** Tears the cluster down after each test. */
    @After
    public void after() throws Exception {
        this.cluster.tearDown();
    }

    /**
     * Runs a controlled sleep job end to end: submit, wait until it is
     * running, check the map does not finish on its own, signal the tasks,
     * and wait for completion.
     *
     * @throws Exception if any cluster call fails or the thread is interrupted
     */
    @Test
    public void testControlledJob() throws Exception {
        Configuration conf = new Configuration(this.cluster.getConf());
        JTProtocol jobTracker = this.cluster.getJTClient().getProxy();
        FinishTaskControlAction.configureControlActionForJob(conf);

        SleepJob sleepJob = new SleepJob();
        sleepJob.setConf(conf);
        // Presumably one map task and no reduce tasks - confirm against
        // SleepJob.createJob; the assertions below expect exactly one running map.
        Job job = sleepJob.createJob(1, 0, 100L, 100, 100L, 100);
        job.submit();
        JobID id = this.cluster.getJTClient().getClient()
                .getJob(JobID.downgrade(job.getJobID())).getID();

        waitUntilRunning(jobTracker, id);

        LOG.info("Waiting till job starts running one map");
        Assert.assertEquals(1, jobTracker.getJobInfo(id).runningMaps());

        LOG.info("waiting for another cycle to check if the maps dont finish off");
        Thread.sleep(POLL_INTERVAL_MS);
        Assert.assertEquals(1, jobTracker.getJobInfo(id).runningMaps());

        signalTasksToFinish(jobTracker, id);
        waitForCompletion(jobTracker, id);
        LOG.info("Job sucessfully completed after signalling!!!!");
    }

    /**
     * Polls the job tracker until the job reports the RUNNING state.
     * Fails the test if the job information is unavailable or the job
     * reaches a completed state first; either would otherwise surface as a
     * NullPointerException or an endless loop.
     */
    private static void waitUntilRunning(JTProtocol jobTracker, JobID id) throws Exception {
        while (true) {
            JobInfo info = jobTracker.getJobInfo(id);
            if (info == null) {
                Assert.fail("No job info available for job " + id
                        + " while waiting for it to start running.");
            }
            if (info.getStatus().getRunState() == JobStatus.RUNNING) {
                return;
            }
            if (info.getStatus().isJobComplete()) {
                Assert.fail("Job " + id + " completed before it was observed running.");
            }
            Thread.sleep(POLL_INTERVAL_MS);
        }
    }

    /** Sends a finish control action for every task of the job to every task tracker client. */
    private void signalTasksToFinish(JTProtocol jobTracker, JobID id) throws Exception {
        for (TaskInfo task : jobTracker.getTaskInfo(id)) {
            LOG.info("constructing control action to signal task to finish");
            FinishTaskControlAction action =
                    new FinishTaskControlAction(TaskID.downgrade(task.getTaskID()));
            for (TTClient taskTracker : this.cluster.getTTClients()) {
                taskTracker.getProxy().sendAction(action);
            }
        }
    }

    /**
     * Polls once per second until the job is complete or its info is no
     * longer returned, failing the test when the job is still incomplete
     * after {@link #COMPLETION_TIMEOUT_SECONDS} polls.
     */
    private static void waitForCompletion(JTProtocol jobTracker, JobID id) throws Exception {
        JobInfo info = jobTracker.getJobInfo(id);
        // A null info is treated as "nothing left to wait for", as before.
        for (int waitedSeconds = 0;
                info != null && !info.getStatus().isJobComplete();
                waitedSeconds++) {
            // Completion is checked by the loop condition first, so the test
            // only fails when the most recent poll still shows the job incomplete.
            if (waitedSeconds >= COMPLETION_TIMEOUT_SECONDS) {
                Assert.fail("Controlled Job with ID : " + info.getID()
                        + " has not completed in " + COMPLETION_TIMEOUT_SECONDS
                        + " seconds after signalling.");
            }
            Thread.sleep(POLL_INTERVAL_MS);
            info = jobTracker.getJobInfo(id);
        }
    }
}
