package org.apache.hadoop.mapred;

import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
import org.apache.hadoop.mapreduce.test.system.JTProtocol;
import org.apache.hadoop.mapreduce.test.system.JobInfo;
import org.apache.hadoop.mapreduce.test.system.MRCluster;
import org.apache.hadoop.mapreduce.test.system.TTClient;
import org.apache.hadoop.mapreduce.test.system.TaskInfo;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/hadoop/mapred/TestDistributedCacheModifiedFile.class */
/**
 * System test that submits sleep jobs using a distributed cache file which is
 * (re)created through {@code UtilsForTests.createTmpFileDFS} before every
 * submission, and then inspects the public distributed cache directory of the
 * task trackers that ran the job's tasks.
 *
 * <p>Expectations checked per task tracker:
 * <ul>
 *   <li>a tracker not yet recorded by this test must hold exactly one file
 *       whose path ends with the cache file name;</li>
 *   <li>from the second job onwards, a tracker that was already recorded must
 *       hold exactly two such files;</li>
 *   <li>every such file must have permission 777.</li>
 * </ul>
 */
public class TestDistributedCacheModifiedFile {
    /** Path suffix used to recognise the cache file among the localized files. */
    private static final String DISTRIBUTED_FILE_NAME = "test.txt";
    /** Location of the cache file, as handed to the distributed cache. */
    private static final String URI_PATH = "hdfs:///tmp/test.txt";
    private static final Path URIPATH = new Path(URI_PATH);
    /** rwxrwxrwx, written in octal so the intent is visible. */
    private static final FsPermission PERMISSION = new FsPermission((short) 0777);
    private static final String CACHE_FILE_CONTENT = "This will be the content of\ndistributed cache\n";
    static final Log LOG = LogFactory.getLog(TestDistributedCacheModifiedFile.class);

    private static MRCluster cluster = null;
    private static FileSystem dfs = null;
    private static JobClient client = null;

    /**
     * Creates and sets up the cluster, removes any cache file left at
     * {@link #URIPATH}, and restarts all task trackers.
     *
     * @throws Exception if any cluster or file system call fails
     */
    @BeforeClass
    public static void setUp() throws Exception {
        cluster = MRCluster.createCluster(new Configuration());
        cluster.setUp();
        client = cluster.getJTClient().getClient();
        dfs = client.getFs();
        dfs.delete(URIPATH, true);
        restartTaskTrackers();
        // Presumably gives the restarted trackers time to come back up.
        Thread.sleep(5000L);
    }

    /**
     * Tears the cluster down, deletes the cache file and restarts all task
     * trackers. The order of these steps is kept exactly as it was.
     *
     * @throws Exception if any cluster or file system call fails
     */
    @AfterClass
    public static void tearDown() throws Exception {
        cluster.tearDown();
        dfs.delete(URIPATH, true);
        restartTaskTrackers();
    }

    /**
     * Runs sleep jobs in a loop, verifying the localized cache files on the
     * task trackers of each job while the tasks are held by the finish-task
     * control action, then releasing and killing the job.
     *
     * <p>The loop always runs at least two jobs: {@code continueLoop} is only
     * cleared once a task tracker has been examined for a job after the first.
     *
     * @throws Exception if any cluster call fails or is interrupted
     */
    @Test
    public void testDistributedCache() throws Exception {
        Configuration conf = new Configuration(cluster.getConf());
        JTProtocol jobTracker = cluster.getJTClient().getProxy();
        List<String> seenTrackers = new ArrayList<String>();
        boolean continueLoop = true;
        int jobCount = 0;

        do {
            SleepJob sleepJob = new SleepJob();
            sleepJob.setConf(conf);
            // NOTE(review): the job is created before the cache file and the
            // control action are registered on conf; whether this job sees
            // them depends on how createJob uses the configuration - confirm.
            Job submittable = sleepJob.createJob(5, 1, 1000L, 1000, 100L, 100);

            UtilsForTests.createTmpFileDFS(dfs, URIPATH, PERMISSION, CACHE_FILE_CONTENT);
            DistributedCache.createSymlink(conf);
            DistributedCache.addCacheFile(URI.create(URI_PATH), conf);
            FinishTaskControlAction.configureControlActionForJob(conf);

            submittable.submit();
            RunningJob job = cluster.getJTClient().getClient().getJob(JobID.downgrade(submittable.getJobID()));
            jobCount++;

            JobInfo jobInfo = jobTracker.getJobInfo(job.getID());
            LOG.info("jInfo is :" + jobInfo);
            Assert.assertNotNull("jobInfo is null", jobInfo);
            jobInfo = waitUntilRunning(jobTracker, job, jobInfo);
            LOG.info("job id is :" + job.getID().toString());

            TaskInfo[] tasks = cluster.getJTClient().getProxy().getTaskInfo(job.getID());
            for (TaskInfo task : tasks) {
                for (String trackerName : task.getTaskTrackers()) {
                    String tracker = UtilsForTests.getFQDNofTT(trackerName);
                    LOG.info("taskTracker is :" + tracker);

                    // NOTE(review): seenTrackers also contains trackers added
                    // earlier during this same job, so "seen before" is not
                    // strictly "used by a previous job" - confirm intent.
                    boolean seenBefore = false;
                    if (jobCount > 1) {
                        if (tracker != null) {
                            continueLoop = seenTrackers.contains(tracker);
                        }
                        seenBefore = continueLoop;
                    }
                    if (tracker != null) {
                        seenTrackers.add(tracker);
                    }
                    // From the second job on, stop after this job regardless
                    // of whether a previously seen tracker was hit.
                    if (jobCount > 1) {
                        continueLoop = false;
                    }

                    TTClient ttClient = cluster.getTTClient(tracker);
                    if (ttClient != null) {
                        verifyCacheFileCount(countLocalizedCacheFiles(ttClient), seenBefore);
                    }
                }
            }

            releaseTasks(tasks);
            job.killJob();
            Thread.sleep(3000L);
            recordSetupAndCleanupTrackers(jobTracker.getTaskInfo(job.getID()), seenTrackers);

            while (jobInfo != null && !jobInfo.getStatus().isJobComplete()) {
                Thread.sleep(10000L);
                jobInfo = jobTracker.getJobInfo(job.getID());
            }
        } while (continueLoop);
    }

    /** Kills every task tracker of the cluster and then starts them all again. */
    private static void restartTaskTrackers() throws Exception {
        List<TTClient> trackers = cluster.getTTClients();
        for (TTClient tracker : trackers) {
            tracker.kill();
        }
        for (TTClient tracker : trackers) {
            tracker.start();
        }
    }

    /**
     * Polls the job tracker every 10 seconds until the job reports the RUNNING
     * state, failing the test after the eleventh unsuccessful poll.
     *
     * @return the most recently fetched job info
     */
    private static JobInfo waitUntilRunning(JTProtocol jobTracker, RunningJob job, JobInfo jobInfo)
            throws Exception {
        JobInfo current = jobInfo;
        int polls = 0;
        while (current.getStatus().getRunState() != JobStatus.RUNNING) {
            UtilsForTests.waitFor(10000L);
            polls++;
            current = jobTracker.getJobInfo(job.getID());
            if (polls > 10) {
                Assert.fail("job has not reached running state for more than100 seconds. Failing at this point");
            }
        }
        return current;
    }

    /**
     * Counts, across all mapred local dirs of the given tracker, the files in
     * the public distributed cache directory whose path ends with the cache
     * file name, asserting permission 777 on each of them.
     */
    private static int countLocalizedCacheFiles(TTClient ttClient) throws Exception {
        int found = 0;
        for (String localDir : ttClient.getMapredLocalDirs()) {
            String cacheDir = localDir + "/" + TaskTracker.getPublicDistributedCacheDir();
            LOG.info("localDir is : " + cacheDir);
            for (FileStatus status : ttClient.listStatus(cacheDir, true, true)) {
                Path path = status.getPath();
                LOG.info("path is :" + path.toString());
                if (path.toString().endsWith(DISTRIBUTED_FILE_NAME)) {
                    LOG.info("PATH found is :" + path.toString());
                    found++;
                    Assert.assertTrue("File Permission is not 777", status.getPermission().equals(new FsPermission("777")));
                }
            }
        }
        return found;
    }

    /**
     * Fails unless the tracker holds exactly two cache files when it was seen
     * before, or exactly one when it was not. A count below one always fails,
     * which is the check that the cache file exists at all.
     */
    private static void verifyCacheFileCount(int fileCount, boolean seenBefore) {
        LOG.debug("The distributed FileCount is :" + fileCount);
        LOG.debug("The taskTrackerFound is :" + seenBefore);
        if (fileCount != 2 && seenBefore) {
            Assert.fail("The distributed cache file has to be two. But found was " + fileCount);
        } else if (fileCount > 1 && !seenBefore) {
            Assert.fail("The distributed cache file cannot more than one. But found was " + fileCount);
        } else if (fileCount < 1) {
            Assert.fail("The distributed cache file is less than one. But found was " + fileCount);
        }
    }

    /** Sends a finish-task control action for every task to every task tracker. */
    private static void releaseTasks(TaskInfo[] tasks) throws Exception {
        for (TaskInfo task : tasks) {
            FinishTaskControlAction action = new FinishTaskControlAction(TaskID.downgrade(task.getTaskID()));
            for (TTClient ttClient : cluster.getTTClients()) {
                ttClient.getProxy().sendAction(action);
            }
        }
    }

    /**
     * Adds the trackers of all setup/cleanup tasks to {@code seenTrackers}.
     * A {@code null} task array is ignored.
     */
    private static void recordSetupAndCleanupTrackers(TaskInfo[] tasks, List<String> seenTrackers) {
        if (tasks == null) {
            return;
        }
        for (TaskInfo task : tasks) {
            if (!task.isSetupOrCleanup()) {
                continue;
            }
            for (String trackerName : task.getTaskTrackers()) {
                String tracker = UtilsForTests.getFQDNofTT(trackerName);
                LOG.info("taskTracker is :" + tracker);
                if (tracker != null) {
                    seenTrackers.add(tracker);
                }
            }
        }
    }
}
