package org.apache.hadoop.mapreduce.lib.output.committer.manifest;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.LoadedManifestData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.OutputValidationException;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbortTaskStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CleanupJobStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CommitJobStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CommitTaskStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupJobStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupTaskStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.ValidateRenamedFilesStage;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.LambdaTestUtils;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
/* loaded from: input_file:org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestJobThroughManifestCommitter.class */
public class TestJobThroughManifestCommitter extends AbstractManifestCommitterTest {
    private Path destDir;
    private ManifestCommitterSupport.AttemptDirectories dirs;
    private static Path sharedTestRoot = null;
    private String jobId;
    private String taskAttempt00;
    private String taskAttempt01;
    private String taskAttempt10;
    private String taskAttempt11;
    private StageConfig ta00Config;
    private StageConfig ta01Config;
    private StageConfig ta10Config;
    private StageConfig ta11Config;
    private static LoadedManifestData loadedManifestData;

    @Override // org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest
    public void setup() throws Exception {
        super.setup();
        this.taskAttempt00 = TASK_IDS.getTaskAttempt(0, 0);
        this.taskAttempt01 = TASK_IDS.getTaskAttempt(0, 1);
        this.taskAttempt10 = TASK_IDS.getTaskAttempt(1, 0);
        this.taskAttempt11 = TASK_IDS.getTaskAttempt(1, 1);
        setSharedPath(path("TestJobThroughManifestCommitter"));
        this.destDir = new Path(sharedTestRoot, "out put");
        this.jobId = TASK_IDS.getJobId();
        this.dirs = new ManifestCommitterSupport.AttemptDirectories(this.destDir, this.jobId, 1);
        setJobStageConfig(createStageConfigForJob(1, this.destDir).build());
        this.ta00Config = createStageConfig(1, 0, 0, this.destDir).build();
        this.ta01Config = createStageConfig(1, 0, 1, this.destDir).build();
        this.ta10Config = createStageConfig(1, 1, 0, this.destDir).build();
        this.ta11Config = createStageConfig(1, 1, 1, this.destDir).build();
    }

    protected void deleteTestDirInTeardown() throws IOException {
    }

    protected boolean shouldDeleteTestRootAtEndOfTestRun() {
        return false;
    }

    private void deleteSharedTestRoot() throws IOException {
        describe("Deleting shared test root %s", sharedTestRoot);
        ContractTestUtils.rm(getFileSystem(), sharedTestRoot, true, false);
    }

    private static synchronized boolean setSharedPath(Path path) {
        if (sharedTestRoot != null) {
            return false;
        }
        LOG.info("Set shared path to {}", path);
        sharedTestRoot = path;
        return true;
    }

    @Test
    public void test_0000_setupTestDir() throws Throwable {
        describe("always ensure directory setup is empty");
        deleteSharedTestRoot();
    }

    @Test
    public void test_0100_setupJobStage() throws Throwable {
        describe("Set up a job");
        verifyPath("Job attempt dir", this.dirs.getJobAttemptDir(), (Path) new SetupJobStage(getJobStageConfig()).apply(true));
    }

    private void verifyJobSetupCompleted() throws IOException {
        assertPathExists("Job attempt dir from test_0100", this.dirs.getJobAttemptDir());
    }

    @Test
    public void test_0110_setupJobOnlyAllowedOnce() throws Throwable {
        describe("a second creation of a job attempt must fail");
        verifyJobSetupCompleted();
        LambdaTestUtils.intercept(FileAlreadyExistsException.class, "", () -> {
            return (Path) new SetupJobStage(getJobStageConfig()).apply(true);
        });
        assertPathExists("Job attempt dir", this.dirs.getJobAttemptDir());
    }

    @Test
    public void test_0120_setupJobNewAttemptNumber() throws Throwable {
        describe("Creating a new job attempt is supported");
        verifyJobSetupCompleted();
        Assertions.assertThat(pathMustExist("Job attempt 2 dir", (Path) new SetupJobStage(createStageConfig(2, -1, 0, this.destDir)).apply(false))).describedAs("Stage created path", new Object[0]).isNotEqualTo(this.dirs.getJobAttemptDir());
    }

    @Test
    public void test_0200_setupTask00() throws Throwable {
        describe("Set up a task; job must have been set up first");
        verifyJobSetupCompleted();
        verifyPath("Task attempt 00", this.dirs.getTaskAttemptPath(this.taskAttempt00), (Path) new SetupTaskStage(this.ta00Config).apply("first"));
    }

    private void verifyTaskAttempt00SetUp() throws IOException {
        pathMustExist("Dir from taskAttempt00 setup", this.dirs.getTaskAttemptPath(this.taskAttempt00));
    }

    @Test
    public void test_0210_setupTask00OnlyAllowedOnce() throws Throwable {
        describe("Second attempt to set up task00 must fail.");
        verifyTaskAttempt00SetUp();
        LambdaTestUtils.intercept(FileAlreadyExistsException.class, "second", () -> {
            return (Path) new SetupTaskStage(this.ta00Config).apply("second");
        });
    }

    @Test
    public void test_0220_setupTask01() throws Throwable {
        describe("Setup task attempt 01");
        verifyTaskAttempt00SetUp();
        verifyPath("Task attempt 01", this.dirs.getTaskAttemptPath(this.taskAttempt01), (Path) new SetupTaskStage(this.ta01Config).apply("01"));
    }

    @Test
    public void test_0230_setupTask10() throws Throwable {
        describe("Setup task attempt 10");
        verifyJobSetupCompleted();
        verifyPath("Task attempt 10", this.dirs.getTaskAttemptPath(this.taskAttempt10), (Path) new SetupTaskStage(this.ta10Config).apply("10"));
    }

    @Test
    public void test_0240_setupThenAbortTask11() throws Throwable {
        describe("Setup then abort task attempt 11");
        verifyJobSetupCompleted();
        Path path = (Path) new SetupTaskStage(this.ta11Config).apply("11");
        Assertions.assertThat(path).isEqualTo((Path) new AbortTaskStage(this.ta11Config).apply(false));
        assertPathDoesNotExist("aborted directory", path);
        LambdaTestUtils.intercept(FileNotFoundException.class, () -> {
            return (CommitTaskStage.Result) new CommitTaskStage(this.ta11Config).apply((Object) null);
        });
        assertPathDoesNotExist("task manifest", ManifestCommitterSupport.manifestPathForTask(this.dirs.getTaskManifestDir(), TASK_IDS.getTaskId(1)));
    }

    @Test
    public void test_0300_executeTask00() throws Throwable {
        describe("Create the files for Task 00, then commit the task");
        List<Path> createFilesOrDirs = createFilesOrDirs(this.dirs.getTaskAttemptPath(this.taskAttempt00), "part-00", getExecutorService(), 3, 2, 4, false);
        CommitTaskStage.Result result = (CommitTaskStage.Result) new CommitTaskStage(this.ta00Config).apply((Object) null);
        ContractTestUtils.verifyPathExists(getFileSystem(), "manifest", result.getPath());
        TaskManifest taskManifest = result.getTaskManifest();
        taskManifest.validate();
        taskManifest.setIOStatistics((IOStatisticsSnapshot) null);
        LOG.info("Task Manifest {}", taskManifest.toJson());
        validateTaskAttemptManifest(this.taskAttempt00, createFilesOrDirs, taskManifest);
    }

    protected void validateTaskAttemptManifest(String str, List<Path> list, TaskManifest taskManifest) throws IOException {
        verifyManifestTaskAttemptID(taskManifest, str);
        verifyManifestFilesMatch(taskManifest, list);
    }

    @Test
    public void test_0310_executeTask01() throws Throwable {
        describe("Create the files for Task 01, then commit the task");
        List<Path> createFilesOrDirs = createFilesOrDirs(this.dirs.getTaskAttemptPath(this.taskAttempt01), "part-00", getExecutorService(), 3, 2, 4, false);
        TaskManifest load = TaskManifest.load(getFileSystem(), ContractTestUtils.verifyPathExists(getFileSystem(), "manifest", ((CommitTaskStage.Result) new CommitTaskStage(this.ta01Config).apply((Object) null)).getPath()).getPath());
        load.validate();
        load.setIOStatistics((IOStatisticsSnapshot) null);
        LOG.info("Task Manifest {}", load.toJson());
        validateTaskAttemptManifest(this.taskAttempt01, createFilesOrDirs, load);
    }

    @Test
    public void test_0320_executeTask10() throws Throwable {
        describe("Create the files for Task 10, then commit the task");
        validateTaskAttemptManifest(this.taskAttempt10, createFilesOrDirs(this.dirs.getTaskAttemptPath(this.ta10Config.getTaskAttemptId()), "part-01", getExecutorService(), 3, 3, 3, false), ((CommitTaskStage.Result) new CommitTaskStage(this.ta10Config).apply((Object) null)).getTaskManifest());
    }

    @Test
    public void test_0340_setupThenAbortTask11() throws Throwable {
        describe("Setup then abort task attempt 11");
        Path path = (Path) new SetupTaskStage(this.ta11Config).apply("11");
        createFilesOrDirs(path, "part-01", getExecutorService(), 2, 1, 1, false);
        new AbortTaskStage(this.ta11Config).apply(false);
        assertPathDoesNotExist("aborted directory", path);
        LambdaTestUtils.intercept(FileNotFoundException.class, () -> {
            return (CommitTaskStage.Result) new CommitTaskStage(this.ta11Config).apply((Object) null);
        });
        verifyManifestTaskAttemptID(TaskManifest.load(getFileSystem(), ManifestCommitterSupport.manifestPathForTask(this.dirs.getTaskManifestDir(), TASK_IDS.getTaskId(1))), this.taskAttempt10);
    }

    @Test
    public void test_0400_loadManifests() throws Throwable {
        describe("Load all manifests; committed must be TA01 and TA10");
        LoadManifestsStage.Result result = (LoadManifestsStage.Result) new LoadManifestsStage(getJobStageConfig()).apply(new LoadManifestsStage.Arguments(File.createTempFile("entry", ".seq"), 32));
        loadedManifestData = result.getLoadedManifestData();
        Assertions.assertThat(loadedManifestData).describedAs("manifest data from %s", new Object[]{result}).isNotNull();
        LoadManifestsStage.SummaryInfo summary = result.getSummary();
        String summaryInfo = summary.toString();
        LOG.info("Manifest summary {}", summaryInfo);
        Assertions.assertThat(summary.getTaskAttemptIDs()).describedAs("Task attempts in %s", new Object[]{summaryInfo}).hasSize(2).contains(new String[]{this.taskAttempt01, this.taskAttempt10});
    }

    @Test
    public void test_0410_commitJob() throws Throwable {
        describe("Commit the job");
        new CommitJobStage(getJobStageConfig()).apply(new CommitJobStage.Arguments(true, false, (String) null, CleanupJobStage.DISABLED));
    }

    @Test
    public void test_0420_validateJob() throws Throwable {
        describe("Validate the output of the job through the validation stage");
        Assumptions.assumeThat(loadedManifestData).describedAs("Loaded Manifest Data from earlier stage", new Object[0]).isNotNull();
        Assertions.assertThat((List) ((List) new ValidateRenamedFilesStage(getJobStageConfig()).apply(loadedManifestData.getEntrySequenceData())).stream().map((v0) -> {
            return v0.getDest();
        }).collect(Collectors.toList())).containsAll(ManifestCommitterTestSupport.loadAndPrintSuccessData(getFileSystem(), getJobStageConfig().getJobSuccessMarkerPath()).getFilenames());
    }

    @Test
    public void test_0430_validateStatistics() throws Throwable {
        ManifestSuccessData load = ManifestSuccessData.load(getFileSystem(), getJobStageConfig().getJobSuccessMarkerPath());
        String json = load.toJson();
        LOG.info("Success data is {}", json);
        Assertions.assertThat(load).describedAs("Manifest " + json, new Object[0]).returns(NetUtils.getLocalHostname(), (v0) -> {
            return v0.getHostname();
        }).returns(ManifestCommitterConstants.MANIFEST_COMMITTER_CLASSNAME, (v0) -> {
            return v0.getCommitter();
        }).returns(this.jobId, (v0) -> {
            return v0.getJobId();
        }).returns(true, (v0) -> {
            return v0.getSuccess();
        }).returns("JobID", (v0) -> {
            return v0.getJobIdSource();
        });
        Assertions.assertThat(load.getDiagnostics()).containsEntry("principal", UserGroupInformation.getCurrentUser().getShortUserName()).containsEntry("stage", "committer_commit_job");
        IOStatisticsSnapshot iOStatistics = load.getIOStatistics();
        int size = load.getFilenames().size();
        IOStatisticAssertions.verifyStatisticCounterValue(iOStatistics, "committer_commit_job", 1L);
        IOStatisticAssertions.assertThatStatisticCounter(iOStatistics, "committer_files_committed").isGreaterThanOrEqualTo(size);
        IOStatisticAssertions.verifyStatisticCounterValue(iOStatistics, "committer_bytes_committed", ((Long) iOStatistics.counters().get("committer_files_committed")).longValue() * 2);
    }

    @Test
    public void test_0440_validateSuccessFiles() throws Throwable {
        FileSystem fileSystem = getFileSystem();
        ManifestCommitterTestSupport.validateGeneratedFiles(fileSystem, getJobStageConfig().getDestinationDir(), ManifestCommitterTestSupport.loadAndPrintSuccessData(fileSystem, getJobStageConfig().getJobSuccessMarkerPath()), false);
    }

    @Test
    public void test_0450_validationDetectsFailures() throws Throwable {
        Path destPath = ((FileEntry) ((List) new ValidateRenamedFilesStage(getJobStageConfig()).apply(loadedManifestData.getEntrySequenceData())).get(0)).getDestPath();
        Path path = new Path(destPath.getParent(), destPath.getName() + "-renamed");
        FileSystem fileSystem = getFileSystem();
        fileSystem.rename(destPath, path);
        try {
            LambdaTestUtils.intercept(OutputValidationException.class, () -> {
                return (List) new ValidateRenamedFilesStage(getJobStageConfig()).apply(loadedManifestData.getEntrySequenceData());
            });
            fileSystem.rename(path, destPath);
        } catch (Throwable th) {
            fileSystem.rename(path, destPath);
            throw th;
        }
    }

    @Test
    public void test_0900_cleanupJob() throws Throwable {
        describe("Cleanup job");
        CleanupJobStage.Arguments arguments = new CleanupJobStage.Arguments("job_stage_cleanup", true, true, false, false, 0L);
        CleanupJobStage.Result result = (CleanupJobStage.Result) new CleanupJobStage(getJobStageConfig()).apply(arguments);
        assertCleanupResult(result, CleanupJobStage.Outcome.PARALLEL_DELETE, 4);
        assertPathDoesNotExist("Job attempt dir", result.getDirectory());
        assertCleanupResult((CleanupJobStage.Result) new CleanupJobStage(getJobStageConfig()).apply(arguments), CleanupJobStage.Outcome.NOTHING_TO_CLEAN_UP, 0);
    }

    @Test
    public void test_9999_cleanupTestDir() throws Throwable {
        if (shouldDeleteTestRootAtEndOfTestRun()) {
            deleteSharedTestRoot();
        }
    }
}
