package org.apache.hadoop.mapred;

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.WordCount;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
import org.apache.hadoop.mapreduce.test.system.JTProtocol;
import org.apache.hadoop.mapreduce.test.system.JobInfo;
import org.apache.hadoop.mapreduce.test.system.MRCluster;
import org.apache.hadoop.mapreduce.test.system.TTClient;
import org.apache.hadoop.mapreduce.test.system.TaskInfo;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/hadoop/mapred/TestTaskKilling.class */
/**
 * System tests that kill or fail task attempts of jobs running on an
 * {@link MRCluster} and then check the resulting job status, task status and
 * the clean-up of the attempt's local directory on the task tracker.
 *
 * <p>All tests share one cluster handle that is created in {@link #before()}
 * and released in {@link #after()}.
 */
public class TestTaskKilling {
    private static final Log LOG = LogFactory.getLog(TestTaskKilling.class);
    private static MRCluster cluster;
    private static JobClient jobClient = null;
    private static JTProtocol remoteJTClient = null;

    /**
     * Mapper that waits 240 seconds (in one-second steps) and then always
     * throws an {@link IOException}, so that its task attempt fails.
     */
    public static class FailedMapperClass implements Mapper<NullWritable, NullWritable, NullWritable, NullWritable> {
        @Override
        public void configure(JobConf jobConf) {
        }

        /**
         * Waits 240 seconds and then fails.
         *
         * @throws IOException always, once the wait is over
         */
        @Override
        public void map(NullWritable key, NullWritable value, OutputCollector<NullWritable, NullWritable> output, Reporter reporter) throws IOException {
            for (int second = 0; second < 240; second++) {
                UtilsForTests.waitFor(1000L);
            }
            throw new IOException("Intentional failure raised by FailedMapperClass");
        }

        @Override
        public void close() {
        }
    }

    /** Creates and sets up the cluster handle and caches the job tracker clients. */
    @BeforeClass
    public static void before() throws Exception {
        cluster = MRCluster.createCluster(new Configuration());
        cluster.setUp();
        jobClient = cluster.getJTClient().getClient();
        remoteJTClient = cluster.getJTClient().getProxy();
    }

    /** Tears the cluster handle down; does nothing if it was never created. */
    @AfterClass
    public static void after() throws Exception {
        if (cluster != null) {
            cluster.tearDown();
        }
    }

    /**
     * Kills (without failing) the first attempt of a running task of a sleep
     * job and verifies that the job still ends in state SUCCEEDED.
     */
    @Test
    public void testFailedTaskJobStatus() throws IOException, InterruptedException, ClassNotFoundException {
        // The attempt limits must be in the configuration the job is created
        // from; setting them on a copy made afterwards has no effect.
        JobConf jobConf = new JobConf(new Configuration(cluster.getConf()));
        jobConf.setMaxMapAttempts(20);
        jobConf.setMaxReduceAttempts(20);
        SleepJob sleepJob = new SleepJob();
        sleepJob.setConf(jobConf);
        Job job = sleepJob.createJob(3, 1, 4000L, 4000, 100L, 100);
        job.submit();

        JobID jobId = jobClient.getJob(JobID.downgrade(job.getJobID())).getID();
        JobInfo jobInfo = waitForJobToRun(jobId);

        // No break on purpose: the last non setup/cleanup task is selected.
        TaskInfo selected = null;
        for (TaskInfo candidate : remoteJTClient.getTaskInfo(jobId)) {
            if (!candidate.isSetupOrCleanup()) {
                selected = candidate;
            }
        }
        Assert.assertNotNull("No map or reduce task found for job " + jobId, selected);

        TaskInfo running = waitForTaskToRun(remoteJTClient.getTaskInfo(selected.getTaskID()), 60,
                "Task has not been started for 1 min.");
        killTaskAttempt(jobInfo, new TaskAttemptID(TaskID.downgrade(running.getTaskID()), 0), false);

        LOG.info("Waiting till the job is completed...");
        while (!jobInfo.getStatus().isJobComplete()) {
            UtilsForTests.waitFor(100L);
            jobInfo = remoteJTClient.getJobInfo(jobId);
        }
        Assert.assertEquals("JobStatus", JobStatus.SUCCEEDED, jobInfo.getStatus().getRunState());
    }

    /**
     * Kills a running word-count task attempt and verifies that its local
     * directory on the task tracker is removed and that the attempt ends in
     * state KILLED.
     */
    @Test
    public void testDirCleanupAfterTaskKilled() throws IOException, InterruptedException {
        Path inputDir = new Path("input");
        Path outputDir = new Path("output");
        Configuration conf = new Configuration(cluster.getConf());
        JobConf jobConf = new JobConf(conf);
        jobConf.setJobName("Word Count");
        jobConf.setJarByClass(WordCount.class);
        jobConf.setMapperClass(WordCount.MapClass.class);
        jobConf.setCombinerClass(WordCount.Reduce.class);
        jobConf.setReducerClass(WordCount.Reduce.class);
        jobConf.setNumMapTasks(1);
        jobConf.setNumReduceTasks(1);
        jobConf.setMaxMapAttempts(20);
        jobConf.setMaxReduceAttempts(20);
        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(IntWritable.class);
        cleanup(inputDir, conf);
        cleanup(outputDir, conf);
        createInput(inputDir, conf);
        FileInputFormat.setInputPaths(jobConf, new Path[]{inputDir});
        FileOutputFormat.setOutputPath(jobConf, outputDir);

        JobID jobId = jobClient.submitJob(jobConf).getID();
        JobInfo jobInfo = waitForJobToRun(jobId);
        String user = jobClient.getAllJobs()[0].getUsername();

        TaskInfo taskInfo = waitForTaskToRun(findFirstUserTask(jobId), 30,
                "Task has not been started for 30 sec.");
        TaskID taskId = TaskID.downgrade(taskInfo.getTaskID());
        FinishTaskControlAction action = new FinishTaskControlAction(taskId);
        taskInfo = waitForTaskTracker(taskInfo);
        TTClient ttClient = taskTrackerClient(taskInfo.getTaskTrackers()[0]);
        ttClient.getProxy().sendAction(action);

        TaskAttemptID attemptId = new TaskAttemptID(taskId, 0);
        String attemptDir = findLocalAttemptDir(ttClient, user, jobId, attemptId);
        Assert.assertTrue("Task Attempt directory " + attemptId + " has not been found while task was running.",
                attemptDir != null);
        killTaskAttempt(jobInfo, attemptId, false);

        // Give the task tracker up to one minute to remove the directory.
        FileStatus[] remaining;
        int waited = 0;
        do {
            UtilsForTests.waitFor(1000L);
            remaining = ttClient.listStatus(attemptDir, true);
            waited++;
        } while (remaining.length != 0 && waited < 60);
        Assert.assertTrue("Task attempt temporary folder has not been cleaned.", remaining.length == 0);

        TaskInfo finalInfo = pollTaskInfo(taskId, 30);
        Assert.assertEquals("Task status has not been changed to KILLED.", TaskStatus.State.KILLED,
                finalInfo.getTaskStatus()[0].getRunState());
    }

    /** Recursively deletes {@code path} on the file system that owns it. */
    private void cleanup(Path path, Configuration configuration) throws IOException {
        path.getFileSystem(configuration).delete(path, true);
    }

    /**
     * Creates the world-accessible directory {@code path} and writes a large
     * text file {@code data.txt} into it.
     *
     * @throws IOException if the directory cannot be created or writing fails
     */
    private void createInput(Path path, Configuration configuration) throws IOException {
        FileSystem fileSystem = path.getFileSystem(configuration);
        if (!fileSystem.mkdirs(path)) {
            throw new IOException("Failed to create the input directory:" + path.toString());
        }
        fileSystem.setPermission(path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
        FSDataOutputStream out = fileSystem.create(new Path(path, "data.txt"));
        try {
            for (int i = 0; i < 3000000; i++) {
                out.writeBytes("Hadoop is framework for data intensive distributed applications.\nHadoop enables applications to work with thousands of nodes.");
            }
        } finally {
            // Close even when a write fails so the stream is not leaked.
            out.close();
        }
    }

    /**
     * Runs a job whose only map task fails and verifies that the attempt ends
     * in state FAILED and that its local directory on the task tracker is
     * removed.
     */
    @Test
    public void testDirCleanupAfterTaskFailed() throws IOException, InterruptedException {
        Path inputDir = new Path("input");
        Path outputDir = new Path("output");
        Configuration conf = new Configuration(cluster.getConf());
        JobConf jobConf = new JobConf(conf);
        jobConf.setJobName("Task Failed job");
        jobConf.setJarByClass(UtilsForTests.class);
        jobConf.setMapperClass(FailedMapperClass.class);
        jobConf.setNumMapTasks(1);
        jobConf.setNumReduceTasks(0);
        jobConf.setMaxMapAttempts(1);
        cleanup(inputDir, conf);
        cleanup(outputDir, conf);
        createInput(inputDir, conf);
        FileInputFormat.setInputPaths(jobConf, new Path[]{inputDir});
        FileOutputFormat.setOutputPath(jobConf, outputDir);

        JobID jobId = jobClient.submitJob(jobConf).getID();
        waitForJobToRun(jobId);
        String user = jobClient.getAllJobs()[0].getUsername();

        TaskInfo taskInfo = findFirstUserTask(jobId);
        TaskID taskId = TaskID.downgrade(taskInfo.getTaskID());
        FinishTaskControlAction action = new FinishTaskControlAction(taskId);
        taskInfo = waitForTaskTracker(taskInfo);
        TTClient ttClient = taskTrackerClient(taskInfo.getTaskTrackers()[0]);
        ttClient.getProxy().sendAction(action);
        waitForTaskToRun(taskInfo, 60, "Task has not been started for 1 min.");

        TaskAttemptID attemptId = new TaskAttemptID(taskId, 0);
        String attemptDir = findLocalAttemptDir(ttClient, user, jobId, attemptId);
        Assert.assertTrue("Task Attempt directory " + attemptId + " has not been found while task was running.",
                attemptDir != null);

        // The fixed wait is kept: the folder check below relies on the task
        // tracker having had this time to clean up.
        TaskInfo finalInfo = pollTaskInfo(taskId, 30);
        Assert.assertEquals("Task status has not been changed to FAILED.", TaskStatus.State.FAILED,
                finalInfo.getTaskStatus()[0].getRunState());
        Assert.assertTrue("Temporary folder has not been cleanup.", ttClient.listStatus(attemptDir, true).length == 0);
    }

    /**
     * Kills (as failed) one attempt after another of a single task, up to the
     * configured maximum number of map attempts, and verifies that the job
     * ends in state FAILED.
     */
    @Test
    public void testAllTaskAttemptKill() throws Exception {
        Configuration conf = new Configuration(cluster.getConf());
        SleepJob sleepJob = new SleepJob();
        sleepJob.setConf(conf);
        Job job = sleepJob.createJob(3, 1, 40000L, 1000, 100L, 100);
        JobConf jobConf = new JobConf(conf);
        job.submit();
        RunningJob runningJob = cluster.getJTClient().getClient().getJob(JobID.downgrade(job.getJobID()));

        int maxMapAttempts = Integer.parseInt(jobConf.get("mapreduce.map.maxattempts"));
        LOG.info("MAX_MAP_TASK_ATTEMPTS is : " + maxMapAttempts);
        Assert.assertTrue(maxMapAttempts > 0);

        JobInfo jobInfo = waitForJobToRun(runningJob.getID());
        JobID jobId = JobID.downgrade(runningJob.getID());
        LOG.info("job id is :" + jobId.toString());

        // Wait until at least one task reports a RUNNING first status.
        int scans = 0;
        boolean anyRunning = false;
        while (!anyRunning) {
            for (TaskInfo info : remoteJTClient.getTaskInfo(runningJob.getID())) {
                TaskStatus[] statuses = info.getTaskStatus();
                if (statuses.length > 0) {
                    LOG.info("taskStatuses[0].getRunState() is :" + statuses[0].getRunState());
                    if (statuses[0].getRunState() == TaskStatus.State.RUNNING) {
                        anyRunning = true;
                        break;
                    }
                }
            }
            if (!anyRunning) {
                scans++;
                if (scans > 10) {
                    Assert.fail("Since the sleep count has reached beyond a pointfailing at this point");
                }
                LOG.info("Sleeping 5 seconds");
                Thread.sleep(5000L);
            }
        }

        // Attempt 0 fixes the task; later rounds kill the next attempt of it.
        String targetTaskId = null;
        for (int attempt = 0; attempt < maxMapAttempts; attempt++) {
            for (TaskInfo info : remoteJTClient.getTaskInfo(runningJob.getID())) {
                if (info.isSetupOrCleanup()) {
                    continue;
                }
                TaskID taskId = TaskID.downgrade(info.getTaskID());
                LOG.info("taskid is :" + taskId);
                if (attempt == 0) {
                    targetTaskId = taskId.toString();
                } else if (!taskId.toString().equals(targetTaskId)) {
                    continue;
                }
                TaskAttemptID attemptId = new TaskAttemptID(taskId, attempt);
                LOG.info("taskAttemptid going to be killed is : " + attemptId);
                killTaskAttempt(jobInfo, attemptId, true);
                checkTaskCompletionEvent(attemptId, jobInfo);
                break;
            }
            Assert.assertNotNull("No map or reduce task found for job " + jobId, targetTaskId);
        }

        while (jobInfo != null && !jobInfo.getStatus().isJobComplete()) {
            Thread.sleep(10000L);
            jobInfo = remoteJTClient.getJobInfo(runningJob.getID());
        }

        JobStatus finalStatus = null;
        for (JobStatus status : jobClient.getAllJobs()) {
            if (JobID.downgrade(status.getJobID()).equals(jobId)) {
                finalStatus = status;
                LOG.info("jobStatus found is :" + finalStatus.getJobId().toString());
            }
        }
        Assert.assertNotNull("Job " + jobId + " is not listed by the job client", finalStatus);
        Assert.assertEquals("The job should have failed at this stage", JobStatus.FAILED, finalStatus.getRunState());
    }

    /**
     * Waits until a task completion event for {@code taskAttemptID} is
     * reported for the job, polling every 10 seconds; fails the test after
     * more than 10 unsuccessful polls. Sleeps a further 10 seconds once the
     * event has been seen.
     *
     * @param taskAttemptID attempt whose completion event is awaited
     * @param jobInfo       job info whose status identifies the job to query
     */
    public void checkTaskCompletionEvent(TaskAttemptID taskAttemptID, JobInfo jobInfo) throws Exception {
        String wanted = taskAttemptID.toString();
        int misses = 0;
        while (true) {
            RunningJob networkedJob = jobClient.new NetworkedJob(jobInfo.getStatus());
            for (TaskCompletionEvent event : networkedJob.getTaskCompletionEvents(0)) {
                if (event.getTaskAttemptId().toString().equals(wanted)) {
                    Thread.sleep(10000L);
                    return;
                }
            }
            LOG.info("Thread is sleeping for 10 seconds");
            Thread.sleep(10000L);
            misses++;
            if (misses > 10) {
                Assert.fail("Since the task attemptid is not appearing in theTaskCompletionEvent, it seems this task attempt was not killed");
            }
        }
    }

    /**
     * Polls the job tracker once per second, for at most one minute, until the
     * job is in run state RUNNING; fails the test on timeout.
     *
     * @return the most recently fetched job info
     */
    private static JobInfo waitForJobToRun(JobID jobId) throws IOException {
        JobInfo jobInfo = remoteJTClient.getJobInfo(jobId);
        int waited = 0;
        while (waited < 60 && jobInfo.getStatus().getRunState() != JobStatus.RUNNING) {
            UtilsForTests.waitFor(1000L);
            jobInfo = remoteJTClient.getJobInfo(jobId);
            waited++;
        }
        Assert.assertTrue("Job has not been started for 1 min.", waited != 60);
        return jobInfo;
    }

    /** Returns the first task of the job that is neither setup nor cleanup; fails the test if there is none. */
    private static TaskInfo findFirstUserTask(JobID jobId) throws IOException {
        for (TaskInfo candidate : remoteJTClient.getTaskInfo(jobId)) {
            if (!candidate.isSetupOrCleanup()) {
                return candidate;
            }
        }
        Assert.fail("No map or reduce task found for job " + jobId);
        return null;
    }

    /**
     * Polls once per second, for at most {@code maxSeconds}, until the first
     * status entry of the task is RUNNING; fails with {@code timeoutMessage}
     * on timeout.
     *
     * @return the most recently fetched task info
     */
    private static TaskInfo waitForTaskToRun(TaskInfo taskInfo, int maxSeconds, String timeoutMessage) throws IOException {
        TaskInfo current = taskInfo;
        int waited = 0;
        while (waited < maxSeconds && !isFirstStatusRunning(current)) {
            UtilsForTests.waitFor(1000L);
            current = remoteJTClient.getTaskInfo(current.getTaskID());
            waited++;
        }
        Assert.assertTrue(timeoutMessage, waited != maxSeconds);
        return current;
    }

    /** Tells whether the task has at least one status entry and the first one is RUNNING. */
    private static boolean isFirstStatusRunning(TaskInfo taskInfo) {
        TaskStatus[] statuses = taskInfo.getTaskStatus();
        return statuses.length > 0 && statuses[0].getRunState() == TaskStatus.State.RUNNING;
    }

    /**
     * Re-fetches the task info once per second, for at most 30 seconds, until
     * it lists at least one task tracker; fails the test otherwise.
     *
     * @return task info with a non-empty task tracker list
     */
    private static TaskInfo waitForTaskTracker(TaskInfo taskInfo) throws IOException {
        TaskInfo current = taskInfo;
        for (int waited = 0; waited < 30 && current.getTaskTrackers().length == 0; waited++) {
            UtilsForTests.waitFor(1000L);
            current = remoteJTClient.getTaskInfo(current.getTaskID());
        }
        Assert.assertTrue("Task tracker not found.", current.getTaskTrackers().length != 0);
        return current;
    }

    /** Looks up the task tracker client using the part of the tracker name between the first '_' and the following ':'. */
    private static TTClient taskTrackerClient(String trackerName) throws IOException {
        return cluster.getTTClient(trackerName.split("_")[1].split(":")[0]);
    }

    /**
     * Searches the task tracker's mapred local directories for a non-empty
     * local directory of the given attempt.
     *
     * @return the directory path, or {@code null} if none was found
     */
    private static String findLocalAttemptDir(TTClient ttClient, String user, JobID jobId, TaskAttemptID attemptId) throws IOException {
        for (String localDir : ttClient.getMapredLocalDirs()) {
            String candidate = localDir + "/" + TaskTracker.getLocalTaskDir(user, jobId.toString(), attemptId.toString());
            if (ttClient.listStatus(candidate, true).length > 0) {
                return candidate;
            }
        }
        return null;
    }

    /** Re-fetches the task info once per second for {@code seconds} seconds and returns the last result. */
    private static TaskInfo pollTaskInfo(TaskID taskId, int seconds) throws IOException {
        TaskInfo latest = remoteJTClient.getTaskInfo(taskId);
        for (int waited = 0; waited < seconds; waited++) {
            UtilsForTests.waitFor(1000L);
            latest = remoteJTClient.getTaskInfo(taskId);
        }
        return latest;
    }

    /** Kills the given attempt through a networked job handle built from the job's current status. */
    private static void killTaskAttempt(JobInfo jobInfo, TaskAttemptID attemptId, boolean shouldFail) throws IOException {
        RunningJob networkedJob = jobClient.new NetworkedJob(jobInfo.getStatus());
        networkedJob.killTask(attemptId, shouldFail);
    }
}
