package alluxio.master.scheduler;

import alluxio.client.block.stream.BlockWorkerClient;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.runtime.AlluxioRuntimeException;
import alluxio.exception.runtime.InternalRuntimeException;
import alluxio.exception.runtime.NotFoundRuntimeException;
import alluxio.exception.runtime.ResourceExhaustedRuntimeException;
import alluxio.exception.runtime.UnavailableRuntimeException;
import alluxio.grpc.JobProgressReportFormat;
import alluxio.job.JobDescription;
import alluxio.resource.CloseableResource;
import alluxio.scheduler.job.Job;
import alluxio.scheduler.job.JobMetaStore;
import alluxio.scheduler.job.JobState;
import alluxio.scheduler.job.Task;
import alluxio.scheduler.job.WorkerProvider;
import alluxio.util.ThreadFactoryUtils;
import alluxio.util.ThreadUtils;
import alluxio.wire.WorkerInfo;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:alluxio/master/scheduler/Scheduler.class */
public final class Scheduler {
    private static final int CAPACITY = 100;
    private static final int EXECUTOR_SHUTDOWN_MS = 10000;
    private final JobMetaStore mJobMetaStore;
    private ScheduledExecutorService mSchedulerExecutor;
    private final WorkerProvider mWorkerProvider;
    private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
    private static final long WORKER_UPDATE_INTERVAL = Configuration.getMs(PropertyKey.MASTER_WORKER_INFO_CACHE_REFRESH_TIME);
    private final Map<JobDescription, Job<?>> mExistingJobs = new ConcurrentHashMap();
    private final Map<Job<?>, Set<WorkerInfo>> mRunningTasks = new ConcurrentHashMap();
    private volatile boolean mRunning = false;
    private Map<WorkerInfo, CloseableResource<BlockWorkerClient>> mActiveWorkers = ImmutableMap.of();

    public Scheduler(WorkerProvider workerProvider, JobMetaStore jobMetaStore) {
        this.mWorkerProvider = workerProvider;
        this.mJobMetaStore = jobMetaStore;
    }

    public void start() {
        if (this.mRunning) {
            return;
        }
        retrieveJobs();
        this.mSchedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThreadFactoryUtils.build("scheduler", false));
        this.mSchedulerExecutor.scheduleAtFixedRate(this::updateWorkers, 0L, WORKER_UPDATE_INTERVAL, TimeUnit.MILLISECONDS);
        this.mSchedulerExecutor.scheduleWithFixedDelay(this::processJobs, 0L, 100L, TimeUnit.MILLISECONDS);
        this.mSchedulerExecutor.scheduleWithFixedDelay(this::cleanupStaleJob, 1L, 1L, TimeUnit.HOURS);
        this.mRunning = true;
    }

    private void retrieveJobs() {
        for (Job<?> job : this.mJobMetaStore.getJobs()) {
            this.mExistingJobs.put(job.getDescription(), job);
            if (job.isDone()) {
                this.mRunningTasks.remove(job);
            } else {
                this.mRunningTasks.put(job, new HashSet());
            }
        }
    }

    public void stop() {
        if (this.mRunning) {
            this.mActiveWorkers.values().forEach((v0) -> {
                v0.close();
            });
            this.mActiveWorkers = ImmutableMap.of();
            ThreadUtils.shutdownAndAwaitTermination(this.mSchedulerExecutor, 10000L);
            this.mRunning = false;
        }
    }

    public boolean submitJob(Job<?> job) {
        Job<?> job2 = this.mExistingJobs.get(job.getDescription());
        if (job2 != null && !job2.isDone()) {
            updateExistingJob(job, job2);
            return false;
        }
        if (this.mRunningTasks.size() >= CAPACITY) {
            throw new ResourceExhaustedRuntimeException("Too many jobs running, please submit later.", true);
        }
        this.mJobMetaStore.updateJob(job);
        this.mExistingJobs.put(job.getDescription(), job);
        this.mRunningTasks.put(job, new HashSet());
        LOG.debug(String.format("start job: %s", job));
        return true;
    }

    private void updateExistingJob(Job<?> job, Job<?> job2) {
        job2.updateJob(job);
        this.mJobMetaStore.updateJob(job2);
        LOG.debug(String.format("updated existing job: %s from %s", job2, job));
        if (job2.getJobState() == JobState.STOPPED) {
            job2.setJobState(JobState.RUNNING);
            this.mRunningTasks.put(job2, new HashSet());
            LOG.debug(String.format("restart existing job: %s", job2));
        }
    }

    public boolean stopJob(JobDescription jobDescription) {
        Job<?> job = this.mExistingJobs.get(jobDescription);
        if (job == null || !job.isRunning()) {
            return false;
        }
        job.setJobState(JobState.STOPPED);
        this.mJobMetaStore.updateJob(job);
        return true;
    }

    public String getJobProgress(JobDescription jobDescription, JobProgressReportFormat jobProgressReportFormat, boolean z) {
        Job<?> job = this.mExistingJobs.get(jobDescription);
        if (job == null) {
            throw new NotFoundRuntimeException(String.format("%s cannot be found.", jobDescription));
        }
        return job.getProgress(jobProgressReportFormat, z);
    }

    @VisibleForTesting
    public Map<WorkerInfo, CloseableResource<BlockWorkerClient>> getActiveWorkers() {
        return this.mActiveWorkers;
    }

    @VisibleForTesting
    public void cleanupStaleJob() {
        long currentTimeMillis = System.currentTimeMillis();
        this.mExistingJobs.entrySet().removeIf(entry -> {
            return !((Job) entry.getValue()).isRunning() && ((Job) entry.getValue()).getEndTime().isPresent() && ((Job) entry.getValue()).getEndTime().getAsLong() <= currentTimeMillis - Configuration.getMs(PropertyKey.JOB_RETENTION_TIME);
        });
    }

    @VisibleForTesting
    public void updateWorkers() {
        if (Thread.currentThread().isInterrupted()) {
            return;
        }
        try {
            try {
                ImmutableSet<WorkerInfo> copyOf = ImmutableSet.copyOf(this.mWorkerProvider.getWorkerInfos());
                if (copyOf.size() == this.mActiveWorkers.size() && copyOf.containsAll(this.mActiveWorkers.keySet())) {
                    return;
                }
                ImmutableMap.Builder builder = ImmutableMap.builder();
                for (WorkerInfo workerInfo : copyOf) {
                    if (this.mActiveWorkers.containsKey(workerInfo)) {
                        builder.put(workerInfo, this.mActiveWorkers.get(workerInfo));
                    } else {
                        try {
                            builder.put(workerInfo, this.mWorkerProvider.getWorkerClient(workerInfo.getAddress()));
                        } catch (AlluxioRuntimeException e) {
                        }
                    }
                }
                for (Map.Entry<WorkerInfo, CloseableResource<BlockWorkerClient>> entry : this.mActiveWorkers.entrySet()) {
                    WorkerInfo key = entry.getKey();
                    if (!copyOf.contains(key)) {
                        entry.getValue().close();
                        LOG.debug("Closed BlockWorkerClient to lost worker {}", key);
                    }
                }
                this.mActiveWorkers = builder.build();
            } catch (AlluxioRuntimeException e2) {
                LOG.warn("Failed to get worker info, using existing worker infos of {} workers", Integer.valueOf(this.mActiveWorkers.size()));
            }
        } catch (Exception e3) {
            LOG.error("Unexpected exception thrown in updateWorkers.", e3);
        }
    }

    @VisibleForTesting
    public Map<JobDescription, Job<?>> getJobs() {
        return this.mExistingJobs;
    }

    private void processJobs() {
        if (Thread.currentThread().isInterrupted()) {
            return;
        }
        this.mRunningTasks.forEach(this::processJob);
    }

    private void processJob(Job<?> job, Set<WorkerInfo> set) {
        try {
            if (!job.isRunning()) {
                try {
                    this.mJobMetaStore.updateJob(job);
                } catch (UnavailableRuntimeException e) {
                    LOG.error("error writing to journal when processing job", e);
                }
                this.mRunningTasks.remove(job);
                return;
            }
            if (!job.isHealthy()) {
                job.failJob(new InternalRuntimeException("Job failed because it's not healthy."));
                return;
            }
            this.mActiveWorkers.forEach((workerInfo, closeableResource) -> {
                if (set.contains(workerInfo) || !scheduleTask(job, workerInfo, set, closeableResource)) {
                    return;
                }
                set.add(workerInfo);
            });
            if (set.isEmpty() && job.isCurrentPassDone()) {
                if (job.needVerification()) {
                    job.initiateVerification();
                } else if (job.isHealthy()) {
                    job.setJobSuccess();
                } else {
                    job.failJob(new InternalRuntimeException("Job failed because it's not healthy."));
                }
            }
        } catch (Exception e2) {
            LOG.error("Unexpected exception thrown in processJob.", e2);
            job.failJob(new InternalRuntimeException(e2));
        }
    }

    private boolean scheduleTask(Job job, WorkerInfo workerInfo, Set<WorkerInfo> set, CloseableResource<BlockWorkerClient> closeableResource) {
        if (!job.isRunning()) {
            return false;
        }
        try {
            Optional nextTask = job.getNextTask(workerInfo);
            if (!nextTask.isPresent()) {
                return false;
            }
            Task task = (Task) nextTask.get();
            task.execute((BlockWorkerClient) closeableResource.get());
            task.getResponseFuture().addListener(() -> {
                try {
                    if (!job.processResponse(task)) {
                        set.remove(workerInfo);
                    }
                    if (job.isHealthy()) {
                        if (!this.mActiveWorkers.containsKey(workerInfo)) {
                            set.remove(workerInfo);
                        } else if (!scheduleTask(job, workerInfo, set, this.mActiveWorkers.get(workerInfo))) {
                            set.remove(workerInfo);
                        }
                    }
                } catch (Exception e) {
                    LOG.error("Unexpected exception thrown in response future listener.", e);
                    job.failJob(new InternalRuntimeException(e));
                }
            }, this.mSchedulerExecutor);
            return true;
        } catch (AlluxioRuntimeException e) {
            LOG.warn(String.format("error getting next task for job %s", job), e);
            if (e.isRetryable()) {
                return false;
            }
            job.failJob(e);
            return false;
        }
    }
}
