package org.apache.hadoop.mapreduce.lib.output.committer.manifest;

import java.io.FileNotFoundException;
import java.net.SocketTimeoutException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.UnreliableManifestStoreOperations;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CleanupJobStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CommitJobStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CommitTaskStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupJobStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupTaskStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig;
import org.apache.hadoop.test.LambdaTestUtils;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/* loaded from: input_file:org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestCommitTaskStage.class */
public class TestCommitTaskStage extends AbstractManifestCommitterTest {
    public static final String TASK1 = String.format("task_%03d", 1);
    public static final String TASK1_ATTEMPT1 = String.format("%s_%02d", TASK1, 1);

    @Override // org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest
    public void setup() throws Exception {
        super.setup();
        StageConfig createStageConfigForJob = createStageConfigForJob(1, methodPath());
        setJobStageConfig(createStageConfigForJob);
        new SetupJobStage(createStageConfigForJob).apply(true);
    }

    private StageConfig createStageConfig() {
        return createTaskStageConfig(1, TASK1, TASK1_ATTEMPT1);
    }

    @Test
    public void testCommitMissingDirectory() throws Throwable {
        String format = String.format("task_%03d", 1);
        StageConfig createTaskStageConfig = createTaskStageConfig(1, format, String.format("%s_%02d", format, 1));
        assertPathDoesNotExist("task attempt path", createTaskStageConfig.getTaskAttemptDir());
        LambdaTestUtils.intercept(FileNotFoundException.class, () -> {
            return (CommitTaskStage.Result) new CommitTaskStage(createTaskStageConfig).apply((Object) null);
        });
    }

    @Test
    public void testCommitEmptyDirectory() throws Throwable {
        describe("Commit an empty directory as task then job");
        String format = String.format("task_%03d", 2);
        StageConfig createTaskStageConfig = createTaskStageConfig(1, format, String.format("%s_%02d", format, 1));
        new SetupTaskStage(createTaskStageConfig).apply("setup");
        CommitTaskStage.Result result = (CommitTaskStage.Result) new CommitTaskStage(createTaskStageConfig).apply((Object) null);
        TaskManifest taskManifest = result.getTaskManifest();
        Assertions.assertThat(taskManifest.getDestDirectories()).as("directories to create", new Object[0]).isEmpty();
        Assertions.assertThat(taskManifest.getFilesToCommit()).as("files to commit", new Object[0]).isEmpty();
        Path path = result.getPath();
        String readText = readText(path);
        LOG.info("manifest at {} of length {}:\n{}", new Object[]{path, Integer.valueOf(readText.length()), readText});
        CommitJobStage.Result result2 = (CommitJobStage.Result) new CommitJobStage(getJobStageConfig()).apply(new CommitJobStage.Arguments(true, true, (String) null, new CleanupJobStage.Arguments("job_stage_cleanup", true, true, false, false, 0L)));
        Path successPath = result2.getSuccessPath();
        String readText2 = readText(successPath);
        LOG.info("successBody at {} of length {}:\n{}", new Object[]{successPath, Integer.valueOf(readText2.length()), readText2});
        Assertions.assertThat(result2.getJobSuccessData().getFilenames()).as("Filenames in _SUCCESS", new Object[0]).isEmpty();
    }

    @Test
    public void testManifestSaveFailures() throws Throwable {
        describe("Test recovery of manifest save/rename failures");
        UnreliableManifestStoreOperations makeStoreOperationsUnreliable = makeStoreOperationsUnreliable();
        StageConfig createStageConfig = createStageConfig();
        new SetupTaskStage(createStageConfig).apply("setup");
        Path taskManifestDir = createStageConfig.getTaskManifestDir();
        ManifestCommitterSupport.manifestPathForTask(taskManifestDir, createStageConfig.getTaskId());
        Path manifestTempPathForTaskAttempt = ManifestCommitterSupport.manifestTempPathForTaskAttempt(taskManifestDir, createStageConfig.getTaskAttemptId());
        makeStoreOperationsUnreliable.addSaveToFail(manifestTempPathForTaskAttempt);
        makeStoreOperationsUnreliable.setFailureLimit(5);
        LambdaTestUtils.intercept(PathIOException.class, UnreliableManifestStoreOperations.generatedErrorMessage("save"), () -> {
            return (CommitTaskStage.Result) new CommitTaskStage(createStageConfig).apply((Object) null);
        });
        makeStoreOperationsUnreliable.setFailureLimit(3);
        new CommitTaskStage(createStageConfig).apply((Object) null);
        describe("Testing timeouts on rename operations.");
        makeStoreOperationsUnreliable.reset();
        makeStoreOperationsUnreliable.addTimeoutBeforeRename(manifestTempPathForTaskAttempt);
        makeStoreOperationsUnreliable.setFailureLimit(5);
        LambdaTestUtils.intercept(SocketTimeoutException.class, UnreliableManifestStoreOperations.E_TIMEOUT, () -> {
            return (CommitTaskStage.Result) new CommitTaskStage(createStageConfig).apply((Object) null);
        });
        makeStoreOperationsUnreliable.setFailureLimit(3);
        new CommitTaskStage(createStageConfig).apply((Object) null);
    }

    @Test
    public void testManifestRenameEarlyTimeouts() throws Throwable {
        describe("Testing timeouts on rename operations.");
        UnreliableManifestStoreOperations makeStoreOperationsUnreliable = makeStoreOperationsUnreliable();
        StageConfig createStageConfig = createStageConfig();
        new SetupTaskStage(createStageConfig).apply("setup");
        Path taskManifestDir = createStageConfig.getTaskManifestDir();
        ManifestCommitterSupport.manifestPathForTask(taskManifestDir, createStageConfig.getTaskId());
        makeStoreOperationsUnreliable.addTimeoutBeforeRename(ManifestCommitterSupport.manifestTempPathForTaskAttempt(taskManifestDir, createStageConfig.getTaskAttemptId()));
        makeStoreOperationsUnreliable.setFailureLimit(5);
        LambdaTestUtils.intercept(SocketTimeoutException.class, UnreliableManifestStoreOperations.E_TIMEOUT, () -> {
            return (CommitTaskStage.Result) new CommitTaskStage(createStageConfig).apply((Object) null);
        });
        IOStatisticsStore iOStatistics = createStageConfig.getIOStatistics();
        IOStatisticAssertions.assertThatStatisticCounter(iOStatistics, "task_stage_save_task_manifest.failures").isEqualTo(4L);
        iOStatistics.reset();
        makeStoreOperationsUnreliable.setFailureLimit(4);
        IOStatisticsSnapshot iOStatistics2 = TaskManifest.load(getFileSystem(), ((CommitTaskStage.Result) new CommitTaskStage(createStageConfig).apply((Object) null)).getPath()).getIOStatistics();
        LOG.info("Statistics of file successfully saved:\nD {}", IOStatisticsLogging.ioStatisticsToPrettyString(iOStatistics2));
        IOStatisticAssertions.assertThatStatisticCounter(iOStatistics2, "task_stage_save_task_manifest.failures").isEqualTo(3L);
    }

    @Test
    public void testManifestRenameLateTimeoutsFailure() throws Throwable {
        describe("Testing timeouts on rename operations.");
        UnreliableManifestStoreOperations makeStoreOperationsUnreliable = makeStoreOperationsUnreliable();
        StageConfig createStageConfig = createStageConfig();
        new SetupTaskStage(createStageConfig).apply("setup");
        makeStoreOperationsUnreliable.addTimeoutAfterRename(ManifestCommitterSupport.manifestTempPathForTaskAttempt(createStageConfig.getTaskManifestDir(), createStageConfig.getTaskAttemptId()));
        makeStoreOperationsUnreliable.setFailureLimit(5);
        LambdaTestUtils.intercept(SocketTimeoutException.class, UnreliableManifestStoreOperations.E_TIMEOUT, () -> {
            return (CommitTaskStage.Result) new CommitTaskStage(createStageConfig).apply((Object) null);
        });
    }

    @Test
    public void testManifestRenameLateTimeoutsRecovery() throws Throwable {
        describe("Testing recovery from late timeouts on rename operations.");
        UnreliableManifestStoreOperations makeStoreOperationsUnreliable = makeStoreOperationsUnreliable();
        StageConfig createStageConfig = createStageConfig();
        new SetupTaskStage(createStageConfig).apply("setup");
        makeStoreOperationsUnreliable.addTimeoutAfterRename(ManifestCommitterSupport.manifestTempPathForTaskAttempt(createStageConfig.getTaskManifestDir(), createStageConfig.getTaskAttemptId()));
        makeStoreOperationsUnreliable.setFailureLimit(4);
        createStageConfig.getIOStatistics().reset();
        new CommitTaskStage(createStageConfig).apply((Object) null);
        IOStatisticsSnapshot iOStatistics = TaskManifest.load(getFileSystem(), ((CommitTaskStage.Result) new CommitTaskStage(createStageConfig).apply((Object) null)).getPath()).getIOStatistics();
        LOG.info("Statistics of file successfully saved:\n{}", IOStatisticsLogging.ioStatisticsToPrettyString(iOStatistics));
        IOStatisticAssertions.assertThatStatisticCounter(iOStatistics, "task_stage_save_task_manifest.failures").isEqualTo(3L);
    }

    @Test
    public void testFailureToDeleteManifestPath() throws Throwable {
        describe("Testing failure in the delete call made before renaming the manifest");
        UnreliableManifestStoreOperations makeStoreOperationsUnreliable = makeStoreOperationsUnreliable();
        StageConfig createStageConfig = createStageConfig();
        new SetupTaskStage(createStageConfig).apply("setup");
        Path taskManifestDir = createStageConfig.getTaskManifestDir();
        Path manifestPathForTask = ManifestCommitterSupport.manifestPathForTask(taskManifestDir, createStageConfig.getTaskId());
        ContractTestUtils.touch(getFileSystem(), manifestPathForTask);
        makeStoreOperationsUnreliable.addDeletePathToFail(manifestPathForTask);
        ManifestCommitterSupport.manifestTempPathForTaskAttempt(taskManifestDir, createStageConfig.getTaskAttemptId());
        makeStoreOperationsUnreliable.setFailureLimit(5);
        LambdaTestUtils.intercept(PathIOException.class, () -> {
            return (CommitTaskStage.Result) new CommitTaskStage(createStageConfig).apply((Object) null);
        });
        makeStoreOperationsUnreliable.setFailureLimit(3);
        new CommitTaskStage(createStageConfig).apply((Object) null);
    }

    @Test
    public void testFailureOfDeleteBeforeSavingTemporaryFile() throws Throwable {
        describe("Testing failure in the delete call made before rename");
        UnreliableManifestStoreOperations makeStoreOperationsUnreliable = makeStoreOperationsUnreliable();
        StageConfig createStageConfig = createStageConfig();
        new SetupTaskStage(createStageConfig).apply("setup");
        makeStoreOperationsUnreliable.addDeletePathToFail(ManifestCommitterSupport.manifestTempPathForTaskAttempt(createStageConfig.getTaskManifestDir(), createStageConfig.getTaskAttemptId()));
        makeStoreOperationsUnreliable.setFailureLimit(5);
        LambdaTestUtils.intercept(PathIOException.class, () -> {
            return (CommitTaskStage.Result) new CommitTaskStage(createStageConfig).apply((Object) null);
        });
        makeStoreOperationsUnreliable.setFailureLimit(3);
        new CommitTaskStage(createStageConfig).apply((Object) null);
    }

    @Test
    public void testRenameTargetIsDir() throws Throwable {
        describe("Rename target is a directory");
        ManifestStoreOperations storeOperations = getStoreOperations();
        StageConfig createStageConfig = createStageConfig();
        SetupTaskStage setupTaskStage = new SetupTaskStage(createStageConfig);
        setupTaskStage.apply("setup");
        Path taskManifestDir = createStageConfig.getTaskManifestDir();
        Path manifestPathForTask = ManifestCommitterSupport.manifestPathForTask(taskManifestDir, createStageConfig.getTaskId());
        ManifestCommitterSupport.manifestTempPathForTaskAttempt(taskManifestDir, createStageConfig.getTaskAttemptId());
        setupTaskStage.mkdirs(manifestPathForTask, true);
        ContractTestUtils.assertIsDirectory(getFileSystem(), manifestPathForTask);
        new CommitTaskStage(createStageConfig).apply((Object) null);
        FileStatus fileStatus = storeOperations.getFileStatus(manifestPathForTask);
        Assertions.assertThat(fileStatus).describedAs("File status of %s", new Object[]{manifestPathForTask}).matches((v0) -> {
            return v0.isFile();
        }, "is a file");
        Assertions.assertThat(setupTaskStage.loadManifest(fileStatus)).matches(taskManifest -> {
            return taskManifest.getTaskID().equals(TASK1);
        }).matches(taskManifest2 -> {
            return taskManifest2.getTaskAttemptID().equals(TASK1_ATTEMPT1);
        });
    }

    @Test
    public void testManifestTempFileIsDir() throws Throwable {
        describe("Manifest temp file path is a directory");
        ManifestStoreOperations storeOperations = getStoreOperations();
        StageConfig createStageConfig = createStageConfig();
        SetupTaskStage setupTaskStage = new SetupTaskStage(createStageConfig);
        setupTaskStage.apply("setup");
        Path taskManifestDir = createStageConfig.getTaskManifestDir();
        Path manifestPathForTask = ManifestCommitterSupport.manifestPathForTask(taskManifestDir, createStageConfig.getTaskId());
        setupTaskStage.mkdirs(ManifestCommitterSupport.manifestTempPathForTaskAttempt(taskManifestDir, createStageConfig.getTaskAttemptId()), true);
        new CommitTaskStage(createStageConfig).apply((Object) null);
        Assertions.assertThat(setupTaskStage.loadManifest(storeOperations.getFileStatus(manifestPathForTask))).matches(taskManifest -> {
            return taskManifest.getTaskID().equals(TASK1);
        }).matches(taskManifest2 -> {
            return taskManifest2.getTaskAttemptID().equals(TASK1_ATTEMPT1);
        });
    }
}
