package alluxio.master.file.scheduler;

import alluxio.AlluxioURI;
import alluxio.client.block.stream.BlockWorkerClient;
import alluxio.client.file.FileSystemContext;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.conf.Source;
import alluxio.exception.AccessControlException;
import alluxio.exception.runtime.ResourceExhaustedRuntimeException;
import alluxio.exception.status.UnavailableException;
import alluxio.grpc.BlockStatus;
import alluxio.grpc.JobProgressReportFormat;
import alluxio.grpc.LoadRequest;
import alluxio.grpc.LoadResponse;
import alluxio.grpc.TaskStatus;
import alluxio.job.JobDescription;
import alluxio.master.file.FileSystemMaster;
import alluxio.master.file.contexts.ListStatusContext;
import alluxio.master.job.FileIterable;
import alluxio.master.job.LoadJob;
import alluxio.master.journal.JournalContext;
import alluxio.master.scheduler.DefaultWorkerProvider;
import alluxio.master.scheduler.JournaledJobMetaStore;
import alluxio.master.scheduler.Scheduler;
import alluxio.proto.journal.Job;
import alluxio.proto.journal.Journal;
import alluxio.resource.CloseableResource;
import alluxio.scheduler.job.JobState;
import alluxio.security.authentication.AuthenticatedClientUser;
import alluxio.wire.FileInfo;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerNetAddress;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Status;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

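/**
 * Unit tests for {@link Scheduler}: active-worker tracking, job submission and stopping, the
 * job-capacity limit, scheduling of load jobs against mocked workers, and cleanup of stale jobs.
 */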
public final class SchedulerTest {
    /**
     * Registers {@code "user"} as the authenticated client user once, before any test in this
     * class runs.
     */
    @BeforeClass
    public static void before() {
        AuthenticatedClientUser.set("user");
    }

    /**
     * Removes the authenticated client user once all tests in this class have finished.
     */
    @AfterClass
    public static void after() {
        AuthenticatedClientUser.remove();
    }

    /**
     * Verifies that {@code updateWorkers()} keeps the scheduler's active-worker set in step with
     * the worker list reported by the mocked master.
     *
     * <p>Successive {@code getWorkerInfoList()} calls are stubbed to return two workers, throw an
     * {@link UnavailableException}, return one worker, and finally return two workers again. The
     * asserted active-worker counts are 0 before any refresh and then 2, 2, 1 and 2, i.e. the
     * failed refresh leaves the previous count in place.
     *
     * @throws IOException declared because the stubbed master call can throw it
     */
    @Test
    public void testGetActiveWorkers() throws IOException {
        FileSystemMaster master = Mockito.mock(FileSystemMaster.class);
        FileSystemContext context = Mockito.mock(FileSystemContext.class);
        CloseableResource workerClientResource = Mockito.mock(CloseableResource.class);
        DefaultWorkerProvider workerProvider = new DefaultWorkerProvider(master, context);
        Scheduler scheduler = new Scheduler(workerProvider, new JournaledJobMetaStore(master));

        // Every stubbed answer is built from its own, freshly created WorkerInfo objects.
        List<WorkerInfo> bothWorkers = ImmutableList.of(
                new WorkerInfo().setId(1L)
                        .setAddress(new WorkerNetAddress().setHost("worker1").setRpcPort(1234)),
                new WorkerInfo().setId(2L)
                        .setAddress(new WorkerNetAddress().setHost("worker2").setRpcPort(1234)));
        List<WorkerInfo> onlySecondWorker = ImmutableList.of(
                new WorkerInfo().setId(2L)
                        .setAddress(new WorkerNetAddress().setHost("worker2").setRpcPort(1234)));
        List<WorkerInfo> bothWorkersAgain = ImmutableList.of(
                new WorkerInfo().setId(1L)
                        .setAddress(new WorkerNetAddress().setHost("worker1").setRpcPort(1234)),
                new WorkerInfo().setId(2L)
                        .setAddress(new WorkerNetAddress().setHost("worker2").setRpcPort(1234)));
        Mockito.when(master.getWorkerInfoList())
                .thenReturn(bothWorkers)
                .thenThrow(new UnavailableException("test"))
                .thenReturn(onlySecondWorker)
                .thenReturn(bothWorkersAgain);
        Mockito.when(context.acquireBlockWorkerClient(ArgumentMatchers.<WorkerNetAddress>any()))
                .thenReturn(workerClientResource);

        // Nothing is known before the first refresh.
        Assert.assertEquals(0, scheduler.getActiveWorkers().size());

        // 1st refresh: two workers reported.
        scheduler.updateWorkers();
        Assert.assertEquals(2, scheduler.getActiveWorkers().size());

        // 2nd refresh: the master call throws; the count is expected to stay at two.
        scheduler.updateWorkers();
        Assert.assertEquals(2, scheduler.getActiveWorkers().size());

        // 3rd refresh: only worker2 is reported.
        scheduler.updateWorkers();
        Assert.assertEquals(1, scheduler.getActiveWorkers().size());

        // 4th refresh: both workers are reported again.
        scheduler.updateWorkers();
        Assert.assertEquals(2, scheduler.getActiveWorkers().size());
    }

    /**
     * Verifies job submission and re-submission for the same load path.
     *
     * <p>The first submission is expected to return {@code true}, journal a {@code CREATED} entry
     * without bandwidth and with verification on, and be retrievable from {@code getJobs()}.
     * Submitting a second job for the same path (bandwidth 1000, verification off) is expected to
     * return {@code false}, journal a matching {@code CREATED} entry, keep the job count at one,
     * and leave the stored job carrying the new bandwidth and verification settings.
     *
     * @throws Exception if any scheduler or mocked master call fails
     */
    @Test
    public void testSubmit() throws Exception {
        final String loadPath = "/path/to/load";
        FileSystemMaster master = Mockito.mock(FileSystemMaster.class);
        FileSystemContext context = Mockito.mock(FileSystemContext.class);
        JournalContext journal = Mockito.mock(JournalContext.class);
        Mockito.when(master.createJournalContext()).thenReturn(journal);
        DefaultWorkerProvider workerProvider = new DefaultWorkerProvider(master, context);
        Scheduler scheduler = new Scheduler(workerProvider, new JournaledJobMetaStore(master));
        FileIterable files = new FileIterable(
                master, loadPath, Optional.of("user"), false, LoadJob.QUALIFIED_FILE_FILTER);

        // First submission: no bandwidth limit, verification enabled.
        LoadJob initialJob = new LoadJob(
                loadPath, Optional.of("user"), "1", OptionalLong.empty(), false, true, files);
        Assert.assertTrue(scheduler.submitJob(initialJob));
        Mockito.verify(journal).append(ArgumentMatchers.argThat((Journal.JournalEntry entry) ->
                entry.hasLoadJob()
                        && entry.getLoadJob().getLoadPath().equals(loadPath)
                        && entry.getLoadJob().getState() == Job.PJobState.CREATED
                        && !entry.getLoadJob().hasBandwidth()
                        && entry.getLoadJob().getVerify()));
        Assert.assertEquals(1, scheduler.getJobs().size());
        LoadJob storedInitial = (LoadJob) scheduler.getJobs().get(initialJob.getDescription());
        Assert.assertEquals(OptionalLong.empty(), storedInitial.getBandwidth());
        Assert.assertTrue(storedInitial.isVerificationEnabled());

        // Second submission for the same path: bandwidth 1000, verification disabled.
        LoadJob updatedJob = new LoadJob(
                loadPath, Optional.of("user"), "1", OptionalLong.of(1000L), true, false, files);
        Assert.assertFalse(scheduler.submitJob(updatedJob));
        Mockito.verify(journal).append(ArgumentMatchers.argThat((Journal.JournalEntry entry) ->
                entry.hasLoadJob()
                        && entry.getLoadJob().getLoadPath().equals(loadPath)
                        && entry.getLoadJob().getState() == Job.PJobState.CREATED
                        && entry.getLoadJob().getBandwidth() == 1000
                        && !entry.getLoadJob().getPartialListing()
                        && !entry.getLoadJob().getVerify()));
        Assert.assertEquals(1, scheduler.getJobs().size());
        LoadJob storedUpdated = (LoadJob) scheduler.getJobs().get(updatedJob.getDescription());
        Assert.assertEquals(1000L, storedUpdated.getBandwidth().getAsLong());
        Assert.assertFalse(storedUpdated.isVerificationEnabled());
    }

    /**
     * Verifies {@code stopJob} results and the number of journal entries they produce.
     *
     * <p>Expected sequence, with the cumulative number of {@code append} calls in parentheses:
     * submit returns {@code true} (1, a {@code CREATED} entry); stop returns {@code true} (2, a
     * {@code STOPPED} entry that has an end time); stopping again returns {@code false} (2);
     * stopping an unknown job returns {@code false} (2); re-submitting the same job returns
     * {@code false} (3); stopping it once more returns {@code true} (4).
     *
     * @throws Exception if any scheduler or mocked master call fails
     */
    @Test
    public void testStop() throws Exception {
        final String loadPath = "/path/to/load";
        FileSystemMaster master = Mockito.mock(FileSystemMaster.class);
        FileSystemContext context = Mockito.mock(FileSystemContext.class);
        JournalContext journal = Mockito.mock(JournalContext.class);
        Mockito.when(master.createJournalContext()).thenReturn(journal);
        DefaultWorkerProvider workerProvider = new DefaultWorkerProvider(master, context);
        Scheduler scheduler = new Scheduler(workerProvider, new JournaledJobMetaStore(master));
        FileIterable files = new FileIterable(
                master, loadPath, Optional.of("user"), false, LoadJob.QUALIFIED_FILE_FILTER);
        LoadJob job = new LoadJob(
                loadPath, Optional.of("user"), "1", OptionalLong.of(100L), false, true, files);

        // Submit: one CREATED entry is journaled.
        Assert.assertTrue(scheduler.submitJob(job));
        Mockito.verify(journal, Mockito.times(1))
                .append(ArgumentMatchers.<Journal.JournalEntry>any());
        Mockito.verify(journal).append(ArgumentMatchers.argThat((Journal.JournalEntry entry) ->
                entry.hasLoadJob()
                        && entry.getLoadJob().getLoadPath().equals(loadPath)
                        && entry.getLoadJob().getState() == Job.PJobState.CREATED
                        && entry.getLoadJob().getBandwidth() == 100
                        && entry.getLoadJob().getVerify()));

        // Stop: a STOPPED entry with an end time is journaled.
        Assert.assertTrue(scheduler.stopJob(job.getDescription()));
        Mockito.verify(journal, Mockito.times(2))
                .append(ArgumentMatchers.<Journal.JournalEntry>any());
        Mockito.verify(journal).append(ArgumentMatchers.argThat((Journal.JournalEntry entry) ->
                entry.hasLoadJob()
                        && entry.getLoadJob().getLoadPath().equals(loadPath)
                        && entry.getLoadJob().getState() == Job.PJobState.STOPPED
                        && entry.getLoadJob().getBandwidth() == 100
                        && entry.getLoadJob().getVerify()
                        && entry.getLoadJob().hasEndTime()));

        // Stopping an already stopped job reports false and journals nothing.
        Assert.assertFalse(scheduler.stopJob(job.getDescription()));
        Mockito.verify(journal, Mockito.times(2))
                .append(ArgumentMatchers.<Journal.JournalEntry>any());

        // Stopping a job that was never submitted reports false and journals nothing.
        Assert.assertFalse(
                scheduler.stopJob(JobDescription.newBuilder().setPath("/does/not/exist").build()));
        Mockito.verify(journal, Mockito.times(2))
                .append(ArgumentMatchers.<Journal.JournalEntry>any());

        // Re-submitting the known job reports false but does journal a third entry.
        Assert.assertFalse(scheduler.submitJob(job));
        Mockito.verify(journal, Mockito.times(3))
                .append(ArgumentMatchers.<Journal.JournalEntry>any());

        // After the re-submission the job can be stopped again, giving a fourth entry.
        Assert.assertTrue(scheduler.stopJob(job.getDescription()));
        Mockito.verify(journal, Mockito.times(4))
                .append(ArgumentMatchers.<Journal.JournalEntry>any());
    }

    /**
     * Verifies that the scheduler rejects a job once 100 jobs have been accepted.
     *
     * <p>One hundred jobs with distinct paths are submitted and each must be accepted; the next
     * submission is expected to throw {@link ResourceExhaustedRuntimeException}.
     *
     * @throws Exception if any scheduler or mocked master call fails
     */
    @Test
    public void testSubmitExceedsCapacity() throws Exception {
        FileSystemMaster master = Mockito.mock(FileSystemMaster.class);
        FileSystemContext context = Mockito.mock(FileSystemContext.class);
        JournalContext journal = Mockito.mock(JournalContext.class);
        Mockito.when(master.createJournalContext()).thenReturn(journal);
        DefaultWorkerProvider workerProvider = new DefaultWorkerProvider(master, context);
        Scheduler scheduler = new Scheduler(workerProvider, new JournaledJobMetaStore(master));

        for (int i = 0; i < 100; i++) {
            String path = String.format("/path/to/load/%d", i);
            FileIterable files = new FileIterable(
                    master, path, Optional.of("user"), false, LoadJob.QUALIFIED_FILE_FILTER);
            LoadJob job = new LoadJob(
                    path, Optional.of("user"), "1", OptionalLong.empty(), false, true, files);
            Assert.assertTrue(scheduler.submitJob(job));
        }

        // One job beyond the 100 accepted above must be refused.
        FileIterable overflowFiles = new FileIterable(
                master, "/path/to/load/101", Optional.of("user"), false,
                LoadJob.QUALIFIED_FILE_FILTER);
        Assert.assertThrows(ResourceExhaustedRuntimeException.class, () ->
                scheduler.submitJob(new LoadJob(
                        "/path/to/load/101", Optional.of("user"), "1", OptionalLong.empty(),
                        false, true, overflowFiles)));
    }

    /**
     * Runs one load job end to end against mocked workers and checks that it reaches
     * {@code SUCCEEDED}.
     *
     * <p>The mocked master reports, on successive calls, two workers, then one worker, then ten
     * workers. File listings are stubbed to return the generated files first and then the same
     * files passed through {@code LoadTestUtils.fileWithBlockLocations} with 0.95 and 1.1. Worker
     * {@code load} calls are answered by {@link #buildResponseFuture}. While the job is running,
     * re-submitting an equivalent job must return {@code false}; once it has succeeded a
     * {@code SUCCEEDED} journal entry must exist and a new submission for the same path must
     * return {@code true}.
     *
     * <p>The scheduler is stopped in a {@code finally} block so that a failed assertion or an
     * exception inside the polling loop cannot leave it running for the remaining tests.
     *
     * @throws Exception if any scheduler or mocked master call fails, or the wait is interrupted
     */
    @Test
    public void testScheduling() throws Exception {
        final String loadPath = "test";
        FileSystemMaster master = Mockito.mock(FileSystemMaster.class);
        FileSystemContext context = Mockito.mock(FileSystemContext.class);
        JournalContext journal = Mockito.mock(JournalContext.class);
        Mockito.when(master.createJournalContext()).thenReturn(journal);
        CloseableResource workerClientResource = Mockito.mock(CloseableResource.class);
        BlockWorkerClient workerClient = Mockito.mock(BlockWorkerClient.class);

        // Worker lists for the three stubbed answers: workers 1-2, worker 2 only, workers 1-10.
        List<WorkerInfo> twoWorkers = ImmutableList.of(
                new WorkerInfo().setId(1L)
                        .setAddress(new WorkerNetAddress().setHost("worker1").setRpcPort(1234)),
                new WorkerInfo().setId(2L)
                        .setAddress(new WorkerNetAddress().setHost("worker2").setRpcPort(1234)));
        List<WorkerInfo> oneWorker = ImmutableList.of(
                new WorkerInfo().setId(2L)
                        .setAddress(new WorkerNetAddress().setHost("worker2").setRpcPort(1234)));
        ImmutableList.Builder<WorkerInfo> tenWorkers = ImmutableList.builder();
        for (int id = 1; id <= 10; id++) {
            tenWorkers.add(new WorkerInfo().setId(id)
                    .setAddress(new WorkerNetAddress().setHost("worker" + id).setRpcPort(1234)));
        }
        Mockito.when(master.getWorkerInfoList())
                .thenReturn(twoWorkers)
                .thenReturn(oneWorker)
                .thenReturn(tenWorkers.build());

        // 67108864 = 64 * 1024 * 1024.
        List<FileInfo> fileInfos = LoadTestUtils.generateRandomFileInfo(100, 50, 67108864L);
        Mockito.when(master.listStatus(
                        ArgumentMatchers.<AlluxioURI>any(), ArgumentMatchers.<ListStatusContext>any()))
                .thenReturn(fileInfos)
                .thenReturn(LoadTestUtils.fileWithBlockLocations(fileInfos, 0.95d))
                .thenReturn(LoadTestUtils.fileWithBlockLocations(fileInfos, 1.1d));
        Mockito.when(context.acquireBlockWorkerClient(ArgumentMatchers.<WorkerNetAddress>any()))
                .thenReturn(workerClientResource);
        Mockito.when(workerClientResource.get()).thenReturn(workerClient);
        AtomicInteger loadCalls = new AtomicInteger();
        Mockito.when(workerClient.load(ArgumentMatchers.<LoadRequest>any()))
                .thenAnswer(invocation ->
                        buildResponseFuture(invocation.<LoadRequest>getArgument(0), loadCalls));

        DefaultWorkerProvider workerProvider = new DefaultWorkerProvider(master, context);
        Scheduler scheduler = new Scheduler(workerProvider, new JournaledJobMetaStore(master));
        FileIterable files = new FileIterable(
                master, loadPath, Optional.of("user"), false, LoadJob.QUALIFIED_FILE_FILTER);
        LoadJob job = new LoadJob(
                loadPath, Optional.of("user"), "1", OptionalLong.of(1000L), false, true, files);
        scheduler.submitJob(job);
        Mockito.verify(journal).append(ArgumentMatchers.argThat((Journal.JournalEntry entry) ->
                entry.hasLoadJob()
                        && entry.getLoadJob().getLoadPath().equals(loadPath)
                        && entry.getLoadJob().getState() == Job.PJobState.CREATED
                        && entry.getLoadJob().getBandwidth() == 1000
                        && entry.getLoadJob().getVerify()));

        scheduler.start();
        try {
            // NOTE(review): this wait has no upper bound; a job that never succeeds hangs the test.
            while (!scheduler
                    .getJobProgress(job.getDescription(), JobProgressReportFormat.TEXT, false)
                    .contains("SUCCEEDED")) {
                // A duplicate of the running job must be reported as already existing.
                Assert.assertFalse(scheduler.submitJob(new LoadJob(
                        loadPath, Optional.of("user"), "1", OptionalLong.of(1000L), false, true,
                        files)));
                Thread.sleep(1000L);
            }
            Thread.sleep(1000L);
        } finally {
            // Always stop the scheduler, including when the loop above fails.
            scheduler.stop();
        }

        Assert.assertEquals(JobState.SUCCEEDED, job.getJobState());
        Assert.assertEquals(0L, job.getCurrentBlockCount());
        Mockito.verify(journal).append(ArgumentMatchers.argThat((Journal.JournalEntry entry) ->
                entry.hasLoadJob()
                        && entry.getLoadJob().getLoadPath().equals(loadPath)
                        && entry.getLoadJob().getState() == Job.PJobState.SUCCEEDED
                        && entry.getLoadJob().getBandwidth() == 1000
                        && entry.getLoadJob().getVerify()));
        // Once the previous job has succeeded, the same path can be submitted as a new job.
        Assert.assertTrue(scheduler.submitJob(
                new LoadJob(loadPath, "user", OptionalLong.of(1000L), files)));
    }

    /**
     * Builds the mocked worker's reply to a {@code load} request.
     *
     * <p>Each call advances {@code atomicInteger} by one and uses the resulting call number:
     * <ul>
     *   <li>call 70 returns a future that has failed with a {@link TimeoutException};</li>
     *   <li>call 50 generates block statuses with ratio {@code 1.0}, every other call with
     *       {@code 0.01} (presumably the failure ratio; confirm against {@code LoadTestUtils});</li>
     * </ul>
     * The response status is {@code SUCCESS} when every generated status has the OK code (no block
     * statuses attached), {@code FAILURE} when none has (all statuses attached), and
     * {@code PARTIAL_FAILURE} otherwise (only the non-OK statuses attached).
     *
     * <p>The counter is advanced and read with a single {@code incrementAndGet()} so that
     * concurrent callers each see a distinct call number; incrementing and then calling
     * {@code get()} separately could skip or repeat the 50th and 70th triggers.
     *
     * @param loadRequest the request whose block list drives the generated statuses
     * @param atomicInteger the shared call counter, incremented once per invocation
     * @return a future that is already completed, either with a response or with an exception
     */
    private ListenableFuture<LoadResponse> buildResponseFuture(LoadRequest loadRequest, AtomicInteger atomicInteger) {
        final int callNumber = atomicInteger.incrementAndGet();
        if (callNumber == 70) {
            SettableFuture<LoadResponse> timedOut = SettableFuture.create();
            timedOut.setException(new TimeoutException());
            return timedOut;
        }

        final double ratio = callNumber == 50 ? 1.0d : 0.01d;
        List<BlockStatus> statuses =
                LoadTestUtils.generateRandomBlockStatus(loadRequest.getBlocksList(), ratio);
        final int okCode = Status.OK.getCode().value();
        List<BlockStatus> notOk = statuses.stream()
                .filter(status -> status.getCode() != okCode)
                .collect(ImmutableList.toImmutableList());

        LoadResponse.Builder response = LoadResponse.newBuilder();
        if (notOk.isEmpty()) {
            // Also covers an empty status list, which counts as success.
            response.setStatus(TaskStatus.SUCCESS);
        } else if (notOk.size() == statuses.size()) {
            response.setStatus(TaskStatus.FAILURE).addAllBlockStatus(statuses);
        } else {
            response.setStatus(TaskStatus.PARTIAL_FAILURE).addAllBlockStatus(notOk);
        }

        SettableFuture<LoadResponse> completed = SettableFuture.create();
        completed.set(response.build());
        return completed;
    }

    /**
     * Fills the scheduler with 100 jobs, checks that one more is rejected, and then waits until
     * every accepted job has reached {@code SUCCEEDED}.
     *
     * <p>The mocked master reports 1000 workers and every worker {@code load} call is answered
     * with an already completed {@code SUCCESS} response.
     *
     * <p>The scheduler is stopped in a {@code finally} block so that an exception during the wait
     * cannot leave it running for the remaining tests.
     *
     * @throws Exception if any scheduler or mocked master call fails, or the wait is interrupted
     */
    @Test
    public void testSchedulingFullCapacity() throws Exception {
        FileSystemMaster master = Mockito.mock(FileSystemMaster.class);
        FileSystemContext context = Mockito.mock(FileSystemContext.class);
        JournalContext journal = Mockito.mock(JournalContext.class);
        Mockito.when(master.createJournalContext()).thenReturn(journal);
        CloseableResource workerClientResource = Mockito.mock(CloseableResource.class);
        BlockWorkerClient workerClient = Mockito.mock(BlockWorkerClient.class);

        ImmutableList.Builder<WorkerInfo> workers = ImmutableList.builder();
        for (int id = 0; id < 1000; id++) {
            workers.add(new WorkerInfo().setId(id)
                    .setAddress(new WorkerNetAddress().setHost("worker" + id).setRpcPort(1234)));
        }
        Mockito.when(master.getWorkerInfoList()).thenReturn(workers.build());
        // 67108864 = 64 * 1024 * 1024.
        Mockito.when(master.listStatus(
                        ArgumentMatchers.<AlluxioURI>any(), ArgumentMatchers.<ListStatusContext>any()))
                .thenReturn(LoadTestUtils.generateRandomFileInfo(2000, 50, 67108864L));
        Mockito.when(context.acquireBlockWorkerClient(ArgumentMatchers.<WorkerNetAddress>any()))
                .thenReturn(workerClientResource);
        Mockito.when(workerClientResource.get()).thenReturn(workerClient);
        Mockito.when(workerClient.load(ArgumentMatchers.<LoadRequest>any()))
                .thenAnswer(invocation -> {
                    SettableFuture<LoadResponse> done = SettableFuture.create();
                    done.set(LoadResponse.newBuilder().setStatus(TaskStatus.SUCCESS).build());
                    return done;
                });

        FileIterable files = new FileIterable(
                master, "test", Optional.of("user"), false, LoadJob.QUALIFIED_FILE_FILTER);
        DefaultWorkerProvider workerProvider = new DefaultWorkerProvider(master, context);
        Scheduler scheduler = new Scheduler(workerProvider, new JournaledJobMetaStore(master));
        for (int i = 0; i < 100; i++) {
            scheduler.submitJob(new LoadJob("test" + i, "user", OptionalLong.of(1000L), files));
        }
        // With 100 jobs registered, the next submission must be refused.
        Assert.assertThrows(ResourceExhaustedRuntimeException.class, () ->
                scheduler.submitJob(
                        new LoadJob("/way/too/many", "user", OptionalLong.empty(), files)));

        scheduler.start();
        try {
            // NOTE(review): this wait has no upper bound; a job that never succeeds hangs the test.
            while (scheduler.getJobs().values().stream()
                    .anyMatch(job -> job.getJobState() != JobState.SUCCEEDED)) {
                Thread.sleep(1000L);
            }
        } finally {
            // Always stop the scheduler, including when the wait above is interrupted or fails.
            scheduler.stop();
        }
    }

    /**
     * Checks that a job whose file listing throws is reported as {@code FAILED}, and that a later
     * job for the same path can still reach {@code SUCCEEDED}.
     *
     * <p>{@code listStatus} is stubbed to throw {@link AccessControlException}, then
     * {@link ResourceExhaustedRuntimeException}, and only then return generated files. The worker
     * {@code load} stub that returns {@code SUCCESS} is installed only after the first job has
     * been reported as failed.
     *
     * <p>The scheduler is started before the first job is built, so everything after
     * {@code start()} runs inside try/finally to guarantee that {@code stop()} is called even when
     * a wait or submission fails.
     *
     * @throws Exception if any scheduler or mocked master call fails, or a wait is interrupted
     */
    @Test
    public void testSchedulingWithException() throws Exception {
        FileSystemMaster master = Mockito.mock(FileSystemMaster.class);
        FileSystemContext context = Mockito.mock(FileSystemContext.class);
        JournalContext journal = Mockito.mock(JournalContext.class);
        Mockito.when(master.createJournalContext()).thenReturn(journal);
        CloseableResource workerClientResource = Mockito.mock(CloseableResource.class);
        BlockWorkerClient workerClient = Mockito.mock(BlockWorkerClient.class);
        Mockito.when(master.getWorkerInfoList()).thenReturn(ImmutableList.of(
                new WorkerInfo().setId(1L)
                        .setAddress(new WorkerNetAddress().setHost("worker1").setRpcPort(1234)),
                new WorkerInfo().setId(2L)
                        .setAddress(new WorkerNetAddress().setHost("worker2").setRpcPort(1234))));
        Mockito.when(context.acquireBlockWorkerClient(ArgumentMatchers.<WorkerNetAddress>any()))
                .thenReturn(workerClientResource);
        Mockito.when(workerClientResource.get()).thenReturn(workerClient);
        // 67108864 = 64 * 1024 * 1024.
        Mockito.when(master.listStatus(
                        ArgumentMatchers.<AlluxioURI>any(), ArgumentMatchers.<ListStatusContext>any()))
                .thenThrow(AccessControlException.class)
                .thenThrow(new ResourceExhaustedRuntimeException("test", true))
                .thenReturn(LoadTestUtils.generateRandomFileInfo(100, 10, 67108864L));

        DefaultWorkerProvider workerProvider = new DefaultWorkerProvider(master, context);
        Scheduler scheduler = new Scheduler(workerProvider, new JournaledJobMetaStore(master));
        scheduler.start();
        try {
            FileIterable files = new FileIterable(
                    master, "test", Optional.of("user"), false, LoadJob.QUALIFIED_FILE_FILTER);

            // First job: expected to be reported as FAILED.
            LoadJob failingJob = new LoadJob("test", "user", OptionalLong.of(1000L), files);
            scheduler.submitJob(failingJob);
            // NOTE(review): this wait has no upper bound.
            while (!scheduler
                    .getJobProgress(failingJob.getDescription(), JobProgressReportFormat.TEXT, false)
                    .contains("FAILED")) {
                Thread.sleep(1000L);
            }

            // From here on every worker load call answers SUCCESS.
            Mockito.when(workerClient.load(ArgumentMatchers.<LoadRequest>any()))
                    .thenAnswer(invocation -> {
                        SettableFuture<LoadResponse> done = SettableFuture.create();
                        done.set(LoadResponse.newBuilder().setStatus(TaskStatus.SUCCESS).build());
                        return done;
                    });

            // Second job for the same path: expected to be reported as SUCCEEDED.
            LoadJob retryJob = new LoadJob("test", "user", OptionalLong.of(1000L), files);
            scheduler.submitJob(retryJob);
            // NOTE(review): this wait has no upper bound.
            while (!scheduler
                    .getJobProgress(retryJob.getDescription(), JobProgressReportFormat.TEXT, false)
                    .contains("SUCCEEDED")) {
                Thread.sleep(1000L);
            }
        } finally {
            // Always stop the scheduler, including when a wait or submission above fails.
            scheduler.stop();
        }
    }

    /**
     * Verifies which jobs {@code cleanupStaleJob()} removes when the retention time is
     * {@code 0ms}.
     *
     * <p>Five jobs {@code /load/0} to {@code /load/4} are submitted. Jobs 1 to 4 are then moved to
     * {@code VERIFYING}, {@code FAILED}, {@code SUCCEEDED} and {@code STOPPED} respectively, while
     * job 0 keeps the state it got on submission. After cleanup only jobs 0 and 1 are expected to
     * remain.
     *
     * <p>The retention override is written to the global configuration, so it is unset in a
     * {@code finally} block; otherwise a failed assertion would leave {@code 0ms} in place for
     * later tests. The scheduler started here is likewise stopped in a {@code finally} block.
     *
     * @throws Exception if any scheduler or mocked master call fails
     */
    @Test
    public void testJobRetention() throws Exception {
        Configuration.modifiableGlobal().set(PropertyKey.JOB_RETENTION_TIME, "0ms", Source.RUNTIME);
        try {
            FileSystemMaster master = Mockito.mock(FileSystemMaster.class);
            FileSystemContext context = Mockito.mock(FileSystemContext.class);
            JournalContext journal = Mockito.mock(JournalContext.class);
            Mockito.when(master.createJournalContext()).thenReturn(journal);
            DefaultWorkerProvider workerProvider = new DefaultWorkerProvider(master, context);
            Scheduler scheduler = new Scheduler(workerProvider, new JournaledJobMetaStore(master));
            scheduler.start();
            try {
                for (int i = 0; i < 5; i++) {
                    String path = String.format("/load/%d", i);
                    FileIterable files = new FileIterable(
                            master, path, Optional.of("user"), false,
                            LoadJob.QUALIFIED_FILE_FILTER);
                    Assert.assertTrue(scheduler.submitJob(new LoadJob(
                            path, Optional.of("user"), "1", OptionalLong.empty(), false, true,
                            files)));
                }
                Assert.assertEquals(5, scheduler.getJobs().size());

                // Jobs 1..4 get these states in order; job 0 is left untouched.
                JobState[] forcedStates = {
                    JobState.VERIFYING, JobState.FAILED, JobState.SUCCEEDED, JobState.STOPPED
                };
                for (int i = 1; i <= forcedStates.length; i++) {
                    JobDescription description =
                            JobDescription.newBuilder().setPath("/load/" + i).setType("load").build();
                    // Fully qualified: the simple name Job is imported from alluxio.proto.journal.
                    ((alluxio.scheduler.job.Job) scheduler.getJobs().get(description))
                            .setJobState(forcedStates[i - 1]);
                }

                scheduler.cleanupStaleJob();

                Assert.assertEquals(2, scheduler.getJobs().size());
                for (int i = 0; i < 5; i++) {
                    JobDescription description =
                            JobDescription.newBuilder().setPath("/load/" + i).setType("load").build();
                    if (i < 2) {
                        // Job 0 (untouched) and job 1 (VERIFYING) must survive the cleanup.
                        Assert.assertTrue(scheduler.getJobs().containsKey(description));
                    } else {
                        // FAILED, SUCCEEDED and STOPPED jobs must have been removed.
                        Assert.assertFalse(scheduler.getJobs().containsKey(description));
                    }
                }
            } finally {
                scheduler.stop();
            }
        } finally {
            // Restore the global configuration whether or not the assertions passed.
            Configuration.modifiableGlobal().unset(PropertyKey.JOB_RETENTION_TIME);
        }
    }
}
