/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.master.jobcluster;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.actor.Terminated;
import akka.pattern.PatternsCS;
import com.netflix.fenzo.triggers.CronTrigger;
import com.netflix.fenzo.triggers.TriggerOperator;
import com.netflix.fenzo.triggers.exceptions.SchedulerException;
import com.netflix.fenzo.triggers.exceptions.TriggerNotFoundException;
import com.netflix.spectator.api.BasicTag;
import com.netflix.spectator.api.Tag;
import com.netflix.spectator.impl.Preconditions;
import io.mantisrx.common.Label;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.GaugeCallback;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.mantisrx.master.JobClustersManagerActor;
import io.mantisrx.master.akka.MantisActorSupervisorStrategy;
import io.mantisrx.master.api.akka.route.proto.JobClusterProtoAdapter;
import io.mantisrx.master.events.LifecycleEventPublisher;
import io.mantisrx.master.events.LifecycleEventsProto;
import io.mantisrx.master.jobcluster.CompletedJobStore;
import io.mantisrx.master.jobcluster.IJobClusterManager;
import io.mantisrx.master.jobcluster.IJobClusterMetadata;
import io.mantisrx.master.jobcluster.JobClusterMetadataImpl;
import io.mantisrx.master.jobcluster.JobDefinitionResolver;
import io.mantisrx.master.jobcluster.JobListHelper;
import io.mantisrx.master.jobcluster.LabelManager;
import io.mantisrx.master.jobcluster.MantisJobClusterMetadataView;
import io.mantisrx.master.jobcluster.PersistException;
import io.mantisrx.master.jobcluster.SLAEnforcer;
import io.mantisrx.master.jobcluster.job.CostsCalculator;
import io.mantisrx.master.jobcluster.job.IMantisJobMetadata;
import io.mantisrx.master.jobcluster.job.JobActor;
import io.mantisrx.master.jobcluster.job.JobHelper;
import io.mantisrx.master.jobcluster.job.JobState;
import io.mantisrx.master.jobcluster.job.MantisJobMetadataImpl;
import io.mantisrx.master.jobcluster.job.MantisJobMetadataView;
import io.mantisrx.master.jobcluster.job.worker.IMantisWorkerMetadata;
import io.mantisrx.master.jobcluster.proto.BaseResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto;
import io.mantisrx.master.jobcluster.proto.JobClusterProto;
import io.mantisrx.master.jobcluster.proto.JobProto;
import io.mantisrx.runtime.JobConstraints;
import io.mantisrx.runtime.JobSla;
import io.mantisrx.runtime.command.InvalidJobException;
import io.mantisrx.runtime.descriptor.StageSchedulingInfo;
import io.mantisrx.server.core.JobCompletedReason;
import io.mantisrx.server.master.ConstraintsEvaluators;
import io.mantisrx.server.master.InvalidJobRequest;
import io.mantisrx.server.master.config.ConfigurationProvider;
import io.mantisrx.server.master.domain.IJobClusterDefinition;
import io.mantisrx.server.master.domain.JobClusterConfig;
import io.mantisrx.server.master.domain.JobClusterDefinitionImpl;
import io.mantisrx.server.master.domain.JobDefinition;
import io.mantisrx.server.master.domain.JobId;
import io.mantisrx.server.master.domain.SLA;
import io.mantisrx.server.master.persistence.MantisJobStore;
import io.mantisrx.server.master.persistence.exceptions.JobClusterAlreadyExistsException;
import io.mantisrx.server.master.scheduler.MantisScheduler;
import io.mantisrx.server.master.scheduler.MantisSchedulerFactory;
import io.mantisrx.server.master.scheduler.WorkerEvent;
import io.mantisrx.shaded.com.google.common.base.Throwables;
import io.mantisrx.shaded.com.google.common.collect.Lists;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.subjects.BehaviorSubject;

public class JobClusterActor
extends AbstractActorWithTimers
implements IJobClusterManager {
    // NOTE(review): presumably the period, in seconds, handed to setBookkeepingTimer — the call site is not visible here.
    private static final int BOOKKEEPING_INTERVAL_SECS = 5;
    // Key under which setBookkeepingTimer registers the periodic bookkeeping timer.
    private static final String BOOKKEEPING_TIMER_KEY = "JOB_CLUSTER_BOOKKEEPING";
    // NOTE(review): looks like default caps for list-style queries; usage is outside this section — confirm.
    private static final Integer DEFAULT_LIMIT = 100;
    private static final Integer DEFAULT_ACTIVE_JOB_LIMIT = 5000;
    private final Logger logger = LoggerFactory.getLogger(JobClusterActor.class);
    // NOTE(review): timer key and interval for an expired-jobs check; the timer setup is not visible here — confirm.
    private static final String CHECK_EXPIRED_TIMER_KEY = "EXPIRE_OLD_JOBS";
    private static final long EXPIRED_JOBS_CHECK_INTERVAL_SECS = 3600L;
    // Counters below are looked up by the same name from the registered Metrics instance in the constructor.
    private final Counter numJobSubmissions;
    private final Counter numJobShutdowns;
    private final Counter numJobActorCreationCounter;
    private final Counter numJobClustersInitialized;
    private final Counter numJobClusterInitializeFailures;
    private final Counter numJobsInitialized;
    private final Counter numJobSubmissionFailures;
    private final Counter numJobClusterEnable;
    private final Counter numJobClusterEnableErrors;
    private final Counter numJobClusterDisable;
    private final Counter numJobClusterDisableErrors;
    private final Counter numJobClusterDelete;
    private final Counter numJobClusterDeleteErrors;
    private final Counter numJobClusterUpdate;
    private final Counter numJobClusterUpdateErrors;
    private final Counter numSLAEnforcementExecutions;
    // Behaviors are built once in the constructor; disabledBehavior is installed via getContext().become(...)
    // when the cluster initializes in the disabled state.
    private final AbstractActor.Receive initializedBehavior;
    private final AbstractActor.Receive disabledBehavior;
    // Job cluster name; used for the "jobCluster" metric tag and in log/response messages.
    private final String name;
    private final MantisJobStore jobStore;
    // Assigned in onJobClusterInitialize from the initialize request (null until then).
    private IJobClusterMetadata jobClusterMetadata;
    private CronManager cronManager;
    // Created in onJobClusterInitialize from the cluster definition's SLA.
    private SLAEnforcer slaEnforcer;
    // Created in the constructor; its job counts and maps back the gauges registered there.
    private final JobManager jobManager;
    private final MantisSchedulerFactory mantisSchedulerFactory;
    private final LifecycleEventPublisher eventPublisher;
    // Created empty in the constructor via BehaviorSubject.create().
    private final BehaviorSubject<JobId> jobIdSubmissionSubject;
    private final JobDefinitionResolver jobDefinitionResolver = new JobDefinitionResolver();
    // The Metrics instance returned by MetricsRegistry.registerAndGet in the constructor.
    private final Metrics metrics;

    /**
     * Creates the {@link Props} used to spawn a {@code JobClusterActor}.
     *
     * <p>The arguments are forwarded, in order, to the actor's five-argument constructor.
     *
     * @param name                   job cluster name
     * @param jobStore               store handed to the actor
     * @param mantisSchedulerFactory scheduler factory handed to the actor
     * @param eventPublisher         lifecycle event publisher handed to the actor
     * @param costsCalculator        costs calculator handed to the actor
     * @return props describing how to construct the actor
     */
    public static Props props(String name, MantisJobStore jobStore, MantisSchedulerFactory mantisSchedulerFactory, LifecycleEventPublisher eventPublisher, CostsCalculator costsCalculator) {
        Object[] constructorArgs = {name, jobStore, mantisSchedulerFactory, eventPublisher, costsCalculator};
        return Props.create(JobClusterActor.class, constructorArgs);
    }

    /**
     * Constructs the actor for one job cluster.
     *
     * <p>Stores the collaborators, creates the {@code JobManager} and the job-id submission subject,
     * pre-builds the initialized and disabled behaviors, and registers this cluster's counters and
     * gauges with the {@link MetricsRegistry} under the group id from {@link #getMetricGroupId(String)}.
     *
     * @param name             job cluster name
     * @param jobStore         job store used by this actor and its job manager
     * @param schedulerFactory factory passed to the job manager
     * @param eventPublisher   lifecycle event publisher
     * @param costsCalculator  costs calculator passed to the job manager
     */
    public JobClusterActor(String name, MantisJobStore jobStore, MantisSchedulerFactory schedulerFactory, LifecycleEventPublisher eventPublisher, CostsCalculator costsCalculator) {
        this.name = name;
        this.jobStore = jobStore;
        this.mantisSchedulerFactory = schedulerFactory;
        this.eventPublisher = eventPublisher;
        this.jobManager = new JobManager(name, this.getContext(), this.mantisSchedulerFactory, eventPublisher, jobStore, costsCalculator);
        this.jobIdSubmissionSubject = BehaviorSubject.create();
        this.initializedBehavior = this.buildInitializedBehavior();
        this.disabledBehavior = this.buildDisabledBehavior();

        MetricGroupId groupId = this.getMetricGroupId(name);
        Metrics unregistered = new Metrics.Builder()
            .id(groupId)
            .addCounter("numJobSubmissions")
            .addCounter("numJobSubmissionFailures")
            .addCounter("numJobShutdowns")
            .addCounter("numJobActorCreationCounter")
            .addCounter("numJobsInitialized")
            .addCounter("numJobClustersInitialized")
            .addCounter("numJobClusterInitializeFailures")
            .addCounter("numJobClusterEnable")
            .addCounter("numJobClusterEnableErrors")
            .addCounter("numJobClusterDisable")
            .addCounter("numJobClusterDisableErrors")
            .addCounter("numJobClusterDelete")
            .addCounter("numJobClusterDeleteErrors")
            .addCounter("numJobClusterUpdate")
            .addCounter("numJobClusterUpdateErrors")
            .addCounter("numSLAEnforcementExecutions")
            // Gauges are evaluated lazily through callbacks that read the job manager's current state.
            .addGauge((Gauge)new GaugeCallback(groupId, "acceptedJobsGauge", () -> 1.0 * (double)this.jobManager.acceptedJobsCount()))
            .addGauge((Gauge)new GaugeCallback(groupId, "activeJobsGauge", () -> 1.0 * (double)this.jobManager.activeJobsCount()))
            .addGauge((Gauge)new GaugeCallback(groupId, "terminatingJobsGauge", () -> 1.0 * (double)this.jobManager.terminatingJobsMap.size()))
            .addGauge((Gauge)new GaugeCallback(groupId, "actorToJobIdMappingsGauge", () -> 1.0 * (double)this.jobManager.actorToJobIdMap.size()))
            .addGauge((Gauge)new GaugeCallback(groupId, "numJobsStuckInAccepted", () -> 1.0 * (double)this.jobManager.getJobsStuckInAccepted(Instant.now().toEpochMilli(), this.getExpireAcceptedDelayMs()).size()))
            .build();

        // Counters are read back from whatever instance the registry returns, not from the locally built one.
        Metrics registered = MetricsRegistry.getInstance().registerAndGet(unregistered);
        this.metrics = registered;
        this.numJobSubmissions = registered.getCounter("numJobSubmissions");
        this.numJobActorCreationCounter = registered.getCounter("numJobActorCreationCounter");
        this.numJobSubmissionFailures = registered.getCounter("numJobSubmissionFailures");
        this.numJobShutdowns = registered.getCounter("numJobShutdowns");
        this.numJobsInitialized = registered.getCounter("numJobsInitialized");
        this.numJobClustersInitialized = registered.getCounter("numJobClustersInitialized");
        this.numJobClusterInitializeFailures = registered.getCounter("numJobClusterInitializeFailures");
        this.numJobClusterEnable = registered.getCounter("numJobClusterEnable");
        this.numJobClusterDisable = registered.getCounter("numJobClusterDisable");
        this.numJobClusterDelete = registered.getCounter("numJobClusterDelete");
        this.numJobClusterUpdate = registered.getCounter("numJobClusterUpdate");
        this.numJobClusterEnableErrors = registered.getCounter("numJobClusterEnableErrors");
        this.numJobClusterDisableErrors = registered.getCounter("numJobClusterDisableErrors");
        this.numJobClusterDeleteErrors = registered.getCounter("numJobClusterDeleteErrors");
        this.numJobClusterUpdateErrors = registered.getCounter("numJobClusterUpdateErrors");
        this.numSLAEnforcementExecutions = registered.getCounter("numSLAEnforcementExecutions");
    }

    /**
     * Supplies the behavior the actor starts with, i.e. the one built by
     * {@code buildInitialBehavior()} for the not-yet-initialized state.
     *
     * @return the initial receive behavior
     */
    public AbstractActor.Receive createReceive() {
        AbstractActor.Receive startingBehavior = this.buildInitialBehavior();
        return startingBehavior;
    }

    private AbstractActor.Receive buildDisabledBehavior() {
        String state = "disabled";
        return this.receiveBuilder().match(JobClusterManagerProto.UpdateJobClusterRequest.class, this::onJobClusterUpdate).match(JobClusterManagerProto.UpdateJobClusterLabelsRequest.class, this::onJobClusterUpdateLabels).match(JobClusterManagerProto.UpdateJobClusterSLARequest.class, this::onJobClusterUpdateSLA).match(JobClusterManagerProto.UpdateJobClusterArtifactRequest.class, this::onJobClusterUpdateArtifact).match(JobClustersManagerActor.UpdateSchedulingInfo.class, this::onJobClusterUpdateSchedulingInfo).match(JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyRequest.class, this::onJobClusterUpdateWorkerMigrationConfig).match(JobClusterManagerProto.GetJobClusterRequest.class, this::onJobClusterGet).match(JobClusterProto.DeleteJobClusterRequest.class, this::onJobClusterDelete).match(JobClusterManagerProto.ListArchivedWorkersRequest.class, this::onListArchivedWorkers).match(JobClusterManagerProto.ListCompletedJobsInClusterRequest.class, this::onJobListCompleted).match(JobClusterProto.KillJobResponse.class, this::onKillJobResponse).match(JobClusterManagerProto.GetJobDetailsRequest.class, this::onGetJobDetailsRequest).match(WorkerEvent.class, this::onWorkerEvent).match(JobClusterManagerProto.EnableJobClusterRequest.class, this::onJobClusterEnable).match(Terminated.class, this::onTerminated).match(JobClusterManagerProto.SubmitJobRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.SubmitJobResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), Optional.empty()), this.getSelf())).match(JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorResponse.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.SubmitJobResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), Optional.empty()), this.getSelf())).match(JobClusterManagerProto.ResubmitWorkerRequest.class, x -> 
this.getSender().tell((Object)new JobClusterManagerProto.ResubmitWorkerResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state)), this.getSelf())).match(JobProto.JobInitialized.class, x -> this.logger.warn(this.genUnexpectedMsg(x.toString(), this.name, state))).match(JobClusterProto.JobStartedEvent.class, x -> this.logger.warn(this.genUnexpectedMsg(x.toString(), this.name, state))).match(JobClusterManagerProto.ScaleStageRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.ScaleStageResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), 0), this.getSelf())).match(JobClusterProto.KillJobRequest.class, x -> x.requestor.tell((Object)new JobClusterManagerProto.KillJobResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, JobState.Noop, this.genUnexpectedMsg(x.toString(), this.name, state), x.jobId, x.user), this.getSelf())).match(JobClusterManagerProto.GetJobDetailsRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.GetJobDetailsResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), Optional.empty()), this.getSelf())).match(JobClusterManagerProto.GetJobSchedInfoRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.GetJobSchedInfoResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), Optional.empty()), this.getSelf())).match(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), Optional.empty()), this.getSelf())).match(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest.class, x -> this.getSender().tell((Object)new 
JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), Optional.empty()), this.getSelf())).match(JobClusterManagerProto.ListJobIdsRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.ListJobIdsResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), new ArrayList<JobClusterProtoAdapter.JobIdInfo>()), this.getSelf())).match(JobClusterManagerProto.ListJobsRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.ListJobsResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), new ArrayList<MantisJobMetadataView>()), this.getSelf())).match(JobClusterManagerProto.ListWorkersRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.ListWorkersResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), new ArrayList<IMantisWorkerMetadata>()), this.getSelf())).match(JobClusterProto.EnforceSLARequest.class, x -> this.logger.warn(this.genUnexpectedMsg(x.toString(), this.name, state))).match(JobClusterProto.TriggerCronRequest.class, x -> this.logger.warn(this.genUnexpectedMsg(x.toString(), this.name, state))).match(JobClusterManagerProto.DisableJobClusterRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.DisableJobClusterResponse(x.requestId, BaseResponse.ResponseCode.SUCCESS, "Cluster is already disabled"), this.getSelf())).match(Terminated.class, this::onTerminated).match(JobClusterProto.InitializeJobClusterRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.JobClustersManagerInitializeResponse(x.requestId, BaseResponse.ResponseCode.SUCCESS, "Cluster is already initialized"), this.getSelf())).matchAny(x -> this.logger.warn("unexpected message '{}' received by JobCluster actor {} 
in Disabled State", x, (Object)this.name)).build();
    }

    /**
     * Formats the standard "unexpected message" text used in rejection responses and log lines.
     *
     * @param event   textual form of the message that was received
     * @param cluster name of the job cluster actor that received it
     * @param state   label of the actor state the message arrived in
     * @return the formatted description
     */
    private String genUnexpectedMsg(String event, String cluster, String state) {
        final String template = "Unexpected message %s received by JobCluster actor %s in %s State";
        Object[] substitutions = {event, cluster, state};
        return String.format(template, substitutions);
    }

    private AbstractActor.Receive buildInitialBehavior() {
        String state = "Uninited";
        return this.receiveBuilder().match(JobClusterProto.InitializeJobClusterRequest.class, this::onJobClusterInitialize).match(JobClusterManagerProto.UpdateJobClusterRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.UpdateJobClusterResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state)), this.getSelf())).match(JobClusterManagerProto.UpdateJobClusterLabelsRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.UpdateJobClusterLabelsResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state)), this.getSelf())).match(JobClusterManagerProto.UpdateJobClusterSLARequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.UpdateJobClusterSLAResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state)), this.getSelf())).match(JobClusterManagerProto.UpdateJobClusterArtifactRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.UpdateJobClusterArtifactResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state)), this.getSelf())).match(JobClustersManagerActor.UpdateSchedulingInfo.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.UpdateSchedulingInfoResponse(x.getRequestId(), BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state)), this.getSelf())).match(JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state)), this.getSelf())).match(JobClusterManagerProto.GetJobClusterRequest.class, x -> this.getSender().tell((Object)new 
JobClusterManagerProto.GetJobClusterResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), Optional.empty()), this.getSelf())).match(JobClusterProto.DeleteJobClusterRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.DeleteJobClusterResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state)), this.getSelf())).match(JobClusterManagerProto.ListArchivedWorkersRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.ListArchivedWorkersResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), Lists.newArrayList()), this.getSelf())).match(JobClusterManagerProto.ListCompletedJobsInClusterRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.ListCompletedJobsInClusterResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), Lists.newArrayList()), this.getSelf())).match(JobClusterProto.KillJobResponse.class, x -> this.logger.warn(this.genUnexpectedMsg(x.toString(), this.name, state))).match(JobClusterManagerProto.GetJobDetailsRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.GetJobDetailsResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), Optional.empty()), this.getSelf())).match(WorkerEvent.class, x -> this.logger.warn(this.genUnexpectedMsg(x.toString(), this.name, state))).match(JobClusterManagerProto.EnableJobClusterRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.EnableJobClusterResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state)), this.getSelf())).match(JobClusterManagerProto.SubmitJobRequest.class, x -> this.getSender().tell((Object)new 
JobClusterManagerProto.SubmitJobResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), Optional.empty()), this.getSelf())).match(JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorResponse.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.SubmitJobResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), Optional.empty()), this.getSelf())).match(JobClusterManagerProto.ResubmitWorkerRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.ResubmitWorkerResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state)), this.getSelf())).match(JobProto.JobInitialized.class, x -> this.logger.warn(this.genUnexpectedMsg(x.toString(), this.name, state))).match(JobClusterProto.JobStartedEvent.class, x -> this.logger.warn(this.genUnexpectedMsg(x.toString(), this.name, state))).match(JobClusterManagerProto.ScaleStageRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.ScaleStageResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), 0), this.getSelf())).match(JobClusterProto.KillJobRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.KillJobResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, JobState.Noop, this.genUnexpectedMsg(x.toString(), this.name, state), x.jobId, x.user), this.getSelf())).match(JobClusterManagerProto.GetJobSchedInfoRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.GetJobSchedInfoResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), Optional.empty()), this.getSelf())).match(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest.class, x -> this.getSender().tell((Object)new 
JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), Optional.empty()), this.getSelf())).match(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), Optional.empty()), this.getSelf())).match(JobClusterManagerProto.ListJobIdsRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.ListJobIdsResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), Lists.newArrayList()), this.getSelf())).match(JobClusterManagerProto.ListJobsRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.ListJobsResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), Lists.newArrayList()), this.getSelf())).match(JobClusterManagerProto.ListWorkersRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.ListWorkersResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state), Lists.newArrayList()), this.getSelf())).match(JobClusterProto.EnforceSLARequest.class, x -> this.logger.warn(this.genUnexpectedMsg(x.toString(), this.name, state))).match(JobClusterProto.TriggerCronRequest.class, x -> this.logger.warn(this.genUnexpectedMsg(x.toString(), this.name, state))).match(JobClusterManagerProto.DisableJobClusterRequest.class, x -> this.getSender().tell((Object)new JobClusterManagerProto.DisableJobClusterResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.name, state)), this.getSelf())).match(Terminated.class, this::onTerminated).matchAny(x -> this.logger.warn("unexpected message '{}' received by 
JobCluster actor {} in Uninited State", x, (Object)this.name)).build();
    }

    /**
     * Builds the message handling used once the cluster is initialized and enabled.
     *
     * <p>Every supported request is delegated to its {@code on...} handler. An
     * {@code EnableJobClusterRequest} or {@code InitializeJobClusterRequest} is acknowledged with
     * {@code SUCCESS} without further work, and anything unrecognized is logged at info level.
     *
     * @return the receive behavior for the initialized state
     */
    private AbstractActor.Receive buildInitializedBehavior() {
        String state = "Initialized";
        return this.receiveBuilder()
            .match(JobClusterManagerProto.UpdateJobClusterRequest.class, this::onJobClusterUpdate)
            .match(JobClusterManagerProto.UpdateJobClusterLabelsRequest.class, this::onJobClusterUpdateLabels)
            .match(JobClusterManagerProto.UpdateJobClusterSLARequest.class, this::onJobClusterUpdateSLA)
            .match(JobClusterManagerProto.UpdateJobClusterArtifactRequest.class, this::onJobClusterUpdateArtifact)
            .match(JobClustersManagerActor.UpdateSchedulingInfo.class, this::onJobClusterUpdateSchedulingInfo)
            .match(JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyRequest.class, this::onJobClusterUpdateWorkerMigrationConfig)
            // NOTE(review): replies SUCCESS but carries the "Unexpected message ..." text — confirm this wording is intended.
            .match(JobClusterManagerProto.EnableJobClusterRequest.class, req -> this.getSender().tell(
                new JobClusterManagerProto.EnableJobClusterResponse(req.requestId, BaseResponse.ResponseCode.SUCCESS, this.genUnexpectedMsg(req.toString(), this.name, state)),
                this.getSelf()))
            .match(JobClusterManagerProto.GetJobClusterRequest.class, this::onJobClusterGet)
            .match(JobClusterProto.DeleteJobClusterRequest.class, this::onJobClusterDelete)
            .match(JobClusterManagerProto.ListArchivedWorkersRequest.class, this::onListArchivedWorkers)
            .match(JobClusterManagerProto.ListCompletedJobsInClusterRequest.class, this::onJobListCompleted)
            .match(JobClusterProto.KillJobResponse.class, this::onKillJobResponse)
            .match(WorkerEvent.class, this::onWorkerEvent)
            .match(JobClusterManagerProto.DisableJobClusterRequest.class, this::onJobClusterDisable)
            .match(JobClusterProto.EnforceSLARequest.class, this::onEnforceSLARequest)
            .match(JobClusterProto.BookkeepingRequest.class, this::onBookkeepingRequest)
            .match(JobClusterProto.TriggerCronRequest.class, this::onTriggerCron)
            .match(JobClusterManagerProto.ListJobIdsRequest.class, this::onJobIdList)
            .match(JobClusterManagerProto.ListJobsRequest.class, this::onJobList)
            .match(JobClusterManagerProto.ListWorkersRequest.class, this::onListActiveWorkers)
            .match(JobClusterManagerProto.SubmitJobRequest.class, this::onJobSubmit)
            .match(JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorResponse.class, this::onGetJobDefinitionUpdatedFromJobActorResponse)
            .match(JobClusterManagerProto.GetJobDetailsRequest.class, this::onGetJobDetailsRequest)
            .match(JobClusterManagerProto.GetJobSchedInfoRequest.class, this::onGetJobStatusSubject)
            .match(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest.class, this::onGetLatestJobDiscoveryInfo)
            .match(JobClusterProto.KillJobRequest.class, this::onJobKillRequest)
            .match(JobClusterManagerProto.ResubmitWorkerRequest.class, this::onResubmitWorkerRequest)
            .match(JobProto.JobInitialized.class, this::onJobInitialized)
            .match(JobClusterProto.JobStartedEvent.class, this::onJobStarted)
            .match(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest.class, this::onGetLastSubmittedJobIdSubject)
            .match(JobClusterManagerProto.ScaleStageRequest.class, this::onScaleStage)
            .match(JobClusterProto.InitializeJobClusterRequest.class, req -> this.getSender().tell(
                new JobClusterManagerProto.JobClustersManagerInitializeResponse(req.requestId, BaseResponse.ResponseCode.SUCCESS, "Cluster is already initialized"),
                this.getSelf()))
            .match(Terminated.class, this::onTerminated)
            .matchAny(other -> this.logger.info("unexpected message '{}' received by JobCluster actor {} in Initialized State.from class {}", other, this.name, other.getClass().getCanonicalName()))
            .build();
    }

    /**
     * Builds the metric group id for a job cluster: group name {@code "JobClusterActor"} carrying a
     * single {@code jobCluster=<name>} tag.
     *
     * @param name job cluster name used as the tag value
     * @return the metric group id
     */
    MetricGroupId getMetricGroupId(String name) {
        Tag clusterTag = new BasicTag("jobCluster", name);
        Tag[] tags = new Tag[]{clusterTag};
        return new MetricGroupId("JobClusterActor", tags);
    }

    /**
     * Lifecycle hook: logs that the actor has started, then runs the superclass hook.
     *
     * @throws Exception if the superclass hook fails
     */
    public void preStart() throws Exception {
        String clusterName = this.name;
        this.logger.info("JobClusterActor {} started", clusterName);
        super.preStart();
    }

    /**
     * Lifecycle hook: logs that the actor has stopped, runs the superclass hook and then removes
     * this cluster's metric group from the {@link MetricsRegistry}.
     *
     * @throws Exception if the superclass hook fails
     */
    public void postStop() throws Exception {
        String clusterName = this.name;
        this.logger.info("JobClusterActor {} stopped", clusterName);
        super.postStop();
        if (clusterName == null) {
            // No name means there is no metric group id to derive, so nothing to remove.
            return;
        }
        MetricGroupId groupId = this.getMetricGroupId(clusterName);
        MetricsRegistry.getInstance().remove(groupId);
    }

    /**
     * Lifecycle hook invoked before a restart; only logs the cluster name, the message being
     * processed and the failure's message.
     *
     * <p>NOTE(review): {@code super.preRestart} is not invoked here, so whatever the base class does
     * before a restart is skipped — confirm this is intentional.
     *
     * @param t the failure that triggered the restart
     * @param m the message being processed when the failure occurred, if any
     * @throws Exception declared by the overridden hook
     */
    public void preRestart(Throwable t, Optional<Object> m) throws Exception {
        String failureText = t.getMessage();
        this.logger.info("{} preRestart {} (exc: {})", this.name, m, failureText);
    }

    /**
     * Lifecycle hook invoked after a restart: logs the cluster name and the failure's message,
     * then runs the superclass hook.
     *
     * @param reason the failure that caused the restart
     * @throws Exception if the superclass hook fails
     */
    public void postRestart(Throwable reason) throws Exception {
        String failureText = reason.getMessage();
        this.logger.info("{} postRestart (exc={})", this.name, failureText);
        super.postRestart(reason);
    }

    /**
     * Returns the supervision strategy for this actor's children, as created by the shared
     * {@link MantisActorSupervisorStrategy} instance.
     *
     * @return the supervisor strategy
     */
    public SupervisorStrategy supervisorStrategy() {
        MantisActorSupervisorStrategy strategyFactory = MantisActorSupervisorStrategy.getInstance();
        return strategyFactory.create();
    }

    /**
     * Starts a periodic timer, keyed by {@code BOOKKEEPING_TIMER_KEY}, that delivers a
     * {@code BookkeepingRequest} at the given interval.
     *
     * @param checkAgainInSecs timer period in seconds
     */
    private void setBookkeepingTimer(long checkAgainInSecs) {
        JobClusterProto.BookkeepingRequest tick = new JobClusterProto.BookkeepingRequest();
        Duration period = Duration.ofSeconds(checkAgainInSecs);
        this.getTimers().startPeriodicTimer(BOOKKEEPING_TIMER_KEY, tick, period);
    }

    @Override
    public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest initReq) {
        ActorRef sender = this.getSender();
        this.logger.info("In onJobClusterInitialize {}", (Object)this.name);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Init Request {}", (Object)initReq);
        }
        this.jobClusterMetadata = new JobClusterMetadataImpl.Builder().withLastJobCount(initReq.lastJobNumber).withIsDisabled(initReq.isDisabled).withJobClusterDefinition(initReq.jobClusterDefinition).build();
        this.slaEnforcer = new SLAEnforcer(this.jobClusterMetadata.getJobClusterDefinition().getSLA());
        long expireFrequency = ConfigurationProvider.getConfig().getCompletedJobPurgeFrequencySeqs();
        if (this.jobClusterMetadata.isDisabled()) {
            this.logger.info("Cluster {} initialized but is Disabled", (Object)this.jobClusterMetadata.getJobClusterDefinition().getName());
            try {
                this.jobManager.initialize();
            }
            catch (Exception e) {
                sender.tell((Object)new JobClusterProto.InitializeJobClusterResponse(initReq.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, String.format("JobCluster %s initialization failed", initReq.jobClusterDefinition.getName()), initReq.jobClusterDefinition.getName(), initReq.requestor), this.getSelf());
            }
            int count = 50;
            if (!initReq.jobList.isEmpty()) {
                this.logger.info("Cluster {} is disabled however it has {} active/accepted jobs", (Object)this.jobClusterMetadata.getJobClusterDefinition().getName(), (Object)initReq.jobList.size());
                for (IMantisJobMetadata jobMeta : initReq.jobList) {
                    try {
                        if (count == 0) {
                            this.logger.info("Max cleanup limit of 50 reached abort");
                            break;
                        }
                        if (JobState.isTerminalState(jobMeta.getState())) continue;
                        this.logger.info("Job {} is in non terminal state {} for disabled cluster {}.Marking it complete", new Object[]{jobMeta.getJobId(), jobMeta.getState(), this.jobClusterMetadata.getJobClusterDefinition().getName()});
                        --count;
                        this.jobManager.markCompleted(jobMeta);
                        this.jobStore.archiveJob(jobMeta);
                    }
                    catch (Exception e) {
                        this.logger.error("Exception {} archiving job {} during init ", new Object[]{e.getMessage(), jobMeta.getJobId(), e});
                    }
                }
            }
            sender.tell((Object)new JobClusterProto.InitializeJobClusterResponse(initReq.requestId, BaseResponse.ResponseCode.SUCCESS, String.format("JobCluster %s initialized successfully. But is currently disabled", initReq.jobClusterDefinition.getName()), initReq.jobClusterDefinition.getName(), initReq.requestor), this.getSelf());
            this.logger.info("Job expiry check frequency set to {}", (Object)expireFrequency);
            this.getContext().become(this.disabledBehavior);
            return;
        }
        if (initReq.createInStore) {
            try {
                this.jobStore.createJobCluster(this.jobClusterMetadata);
                this.eventPublisher.publishAuditEvent(new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_CREATE, this.jobClusterMetadata.getJobClusterDefinition().getName(), "saved job cluster " + this.name));
                this.logger.info("successfully saved job cluster {}", (Object)this.name);
                this.numJobClustersInitialized.increment();
            }
            catch (JobClusterAlreadyExistsException exists) {
                this.numJobClusterInitializeFailures.increment();
                this.logger.error("job cluster not created");
                sender.tell((Object)new JobClusterProto.InitializeJobClusterResponse(initReq.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, String.format("JobCluster %s already exists", initReq.jobClusterDefinition.getName()), initReq.jobClusterDefinition.getName(), initReq.requestor), this.getSelf());
                return;
            }
            catch (Exception e) {
                this.numJobClusterInitializeFailures.increment();
                this.logger.error("job cluster not created due to {}", (Object)e.getMessage(), (Object)e);
                sender.tell((Object)new JobClusterProto.InitializeJobClusterResponse(initReq.requestId, BaseResponse.ResponseCode.SERVER_ERROR, String.format("JobCluster %s not created due to %s", initReq.jobClusterDefinition.getName(), Throwables.getStackTraceAsString((Throwable)e)), initReq.jobClusterDefinition.getName(), initReq.requestor), this.getSelf());
                return;
            }
        }
        try {
            this.cronManager = new CronManager(this.name, this.getSelf(), this.jobClusterMetadata.getJobClusterDefinition().getSLA());
        }
        catch (Exception e) {
            this.logger.warn("Exception initializing cron", (Throwable)e);
        }
        this.initRunningJobs(initReq, sender);
        this.logger.info("Job expiry check frequency set to {}", (Object)expireFrequency);
        try {
            this.jobManager.initialize();
        }
        catch (Exception e) {
            sender.tell((Object)new JobClusterProto.InitializeJobClusterResponse(initReq.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, String.format("JobCluster %s initialization failed", initReq.jobClusterDefinition.getName()), initReq.jobClusterDefinition.getName(), initReq.requestor), this.getSelf());
        }
    }

    /**
     * Replays the jobs supplied in the initialization request and then completes cluster
     * initialization.
     *
     * <p>Jobs that are already in a terminal state are archived and marked completed; jobs without
     * scheduling info are skipped with an error log; every remaining job is passed to
     * {@code jobManager.bootstrapJob}. When the stream completes, the last job id is published (only
     * if the request carried at least one job), the bookkeeping timer is set, the actor switches to
     * {@code initializedBehavior}, and a SUCCESS response is sent to {@code sender}.
     *
     * <p>NOTE(review): when the stream terminates with an error only a warning is logged; no response
     * is sent and the behavior is not switched. Confirm this is intended.
     *
     * @param initReq the initialization request carrying the job list and last job number
     * @param sender  the actor that receives the {@code InitializeJobClusterResponse}
     */
    private void initRunningJobs(JobClusterProto.InitializeJobClusterRequest initReq, ActorRef sender) {
        List<IMantisJobMetadata> jobList = initReq.jobList;
        this.logger.info("In _initJobs for cluster {}: {} activeJobs", (Object)this.name, (Object)jobList.size());
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("In _initJobs for cluster {} activeJobs -> {}", (Object)this.name, jobList);
        }
        Observable.from(jobList).flatMap(jobMeta -> {
            if (JobState.isTerminalState(jobMeta.getState())) {
                // Terminal jobs are not bootstrapped; they are archived and recorded as completed.
                try {
                    this.jobStore.archiveJob((IMantisJobMetadata)jobMeta);
                    this.jobManager.markCompleted((IMantisJobMetadata)jobMeta);
                }
                catch (IOException e) {
                    this.logger.error("Exception archiving job {} during init ", (Object)jobMeta.getJobId(), (Object)e);
                }
                return Observable.empty();
            }
            if (jobMeta.getSchedulingInfo() == null) {
                this.logger.error("Scheduling info is null for active job {} in cluster {}.Skipping bootstrap ", (Object)jobMeta.getJobId(), (Object)this.name);
                return Observable.empty();
            }
            return Observable.just((Object)jobMeta);
        }).flatMap(jobMeta -> this.jobManager.bootstrapJob((MantisJobMetadataImpl)jobMeta, this.jobClusterMetadata)).subscribe(
            jobInited -> this.logger.info("Job Id {} initialized with code {}", (Object)jobInited.jobId, (Object)jobInited.responseCode),
            // Pass the throwable as the trailing argument so the stack trace is logged, not just the message.
            error -> this.logger.warn("Exception initializing jobs {}", (Object)error.getMessage(), (Object)error),
            () -> {
                if (initReq.jobList.size() > 0) {
                    JobId lastJobId = new JobId(this.name, initReq.lastJobNumber);
                    this.jobIdSubmissionSubject.onNext((Object)lastJobId);
                }
                this.setBookkeepingTimer(5L);
                this.getContext().become(this.initializedBehavior);
                this.logger.info("Job Cluster {} initialized", (Object)this.name);
                sender.tell((Object)new JobClusterProto.InitializeJobClusterResponse(initReq.requestId, BaseResponse.ResponseCode.SUCCESS, String.format("JobCluster %s initialized successfully", initReq.jobClusterDefinition.getName()), initReq.jobClusterDefinition.getName(), initReq.requestor), this.getSelf());
            });
    }

    /**
     * Handles a request to update the job cluster definition.
     *
     * <p>The request is rejected with CLIENT_ERROR when the requested artifact version is not unique
     * among the cluster's existing configs. Otherwise the requested definition is merged with the
     * current one, saved via {@code updateAndSaveJobCluster}, and a SUCCESS response is sent. Any
     * exception during the save yields a SERVER_ERROR response.
     *
     * @param request the update request carrying the new job cluster definition
     */
    @Override
    public void onJobClusterUpdate(JobClusterManagerProto.UpdateJobClusterRequest request) {
        String name = request.getJobClusterDefinition().getName();
        ActorRef sender = this.getSender();
        String givenArtifactVersion = request.getJobClusterDefinition().getJobClusterConfig().getVersion();
        if (!this.isVersionUnique(givenArtifactVersion, this.jobClusterMetadata.getJobClusterDefinition().getJobClusterConfigs())) {
            String msg = String.format("Job cluster %s not updated as the version %s is not unique", name, givenArtifactVersion);
            this.logger.error(msg);
            sender.tell((Object)new JobClusterManagerProto.UpdateJobClusterResponse(request.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, msg), this.getSelf());
            return;
        }
        IJobClusterDefinition currentJobClusterDefinition = this.jobClusterMetadata.getJobClusterDefinition();
        JobClusterDefinitionImpl mergedJobClusterDefinition = new JobClusterDefinitionImpl.Builder().mergeConfigsAndOverrideRest(currentJobClusterDefinition, request.getJobClusterDefinition()).build();
        // Disabled flag and last job count are carried over unchanged; only the definition is replaced.
        IJobClusterMetadata jobCluster = new JobClusterMetadataImpl.Builder().withIsDisabled(this.jobClusterMetadata.isDisabled()).withLastJobCount(this.jobClusterMetadata.getLastJobCount()).withJobClusterDefinition(mergedJobClusterDefinition).build();
        try {
            this.updateAndSaveJobCluster(jobCluster);
            sender.tell((Object)new JobClusterManagerProto.UpdateJobClusterResponse(request.requestId, BaseResponse.ResponseCode.SUCCESS, name + " Job cluster updated"), this.getSelf());
            this.numJobClusterUpdate.increment();
        }
        catch (Exception e) {
            // Log the actual operation (update, not create) together with the cause.
            this.logger.error("job cluster {} not updated", (Object)name, (Object)e);
            sender.tell((Object)new JobClusterManagerProto.UpdateJobClusterResponse(request.requestId, BaseResponse.ResponseCode.SERVER_ERROR, name + " Job cluster updation failed " + e.getMessage()), this.getSelf());
            this.numJobClusterUpdateErrors.increment();
        }
    }

    /**
     * Handles a request to delete this job cluster.
     *
     * <p>Deletion only proceeds when the job manager reports an empty job list; otherwise a
     * CLIENT_ERROR response is returned. Any exception raised along the way results in a
     * SERVER_ERROR response and increments the delete-error counter.
     *
     * @param request the delete request; its {@code requestingActor} is echoed back in the response
     */
    @Override
    public void onJobClusterDelete(JobClusterProto.DeleteJobClusterRequest request) {
        final ActorRef replyTo = this.getSender();
        try {
            if (!this.jobManager.isJobListEmpty()) {
                this.logger.warn("job cluster {} cannot be deleted as it has active jobs", (Object)this.name);
                String reason = this.name + " Job cluster deletion failed as there are active jobs";
                replyTo.tell((Object)new JobClusterProto.DeleteJobClusterResponse(request.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, reason, request.requestingActor, this.name), this.getSelf());
                return;
            }
            this.jobManager.onJobClusterDeletion();
            this.jobStore.deleteJobCluster(this.name);
            this.logger.info("successfully deleted job cluster {}", (Object)this.name);
            String deletedMsg = this.name + " deleted";
            this.eventPublisher.publishAuditEvent(new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_DELETE, this.name, deletedMsg));
            replyTo.tell((Object)new JobClusterProto.DeleteJobClusterResponse(request.requestId, BaseResponse.ResponseCode.SUCCESS, this.name + " deleted", request.requestingActor, this.name), this.getSelf());
            this.numJobClusterDelete.increment();
        }
        catch (Exception e) {
            this.logger.error("job cluster {} not deleted", (Object)this.name, (Object)e);
            String failure = this.name + " Job cluster deletion failed " + e.getMessage();
            replyTo.tell((Object)new JobClusterProto.DeleteJobClusterResponse(request.requestId, BaseResponse.ResponseCode.SERVER_ERROR, failure, request.requestingActor, this.name), this.getSelf());
            this.numJobClusterDeleteErrors.increment();
        }
    }

    /**
     * Handles a request to list job ids of this cluster.
     *
     * <p>When label criteria are given and no job matches them, an empty SUCCESS response is sent
     * immediately. Otherwise the non-terminal job ids are collected and, unless the criteria ask for
     * active jobs only (the default when unspecified), the terminal job ids are appended.
     *
     * @param request the list request carrying the filter criteria
     */
    @Override
    public void onJobIdList(JobClusterManagerProto.ListJobIdsRequest request) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Entering JCA:onJobIdList");
        }
        ActorRef sender = this.getSender();
        Set<JobId> jobIdsFilteredByLabelsSet = new HashSet<JobId>();
        if (!request.getCriteria().getMatchingLabels().isEmpty()) {
            jobIdsFilteredByLabelsSet = this.jobManager.getJobsMatchingLabels(request.getCriteria().getMatchingLabels(), request.getCriteria().getLabelsOperand());
            if (jobIdsFilteredByLabelsSet.isEmpty()) {
                sender.tell((Object)new JobClusterManagerProto.ListJobIdsResponse(request.requestId, BaseResponse.ResponseCode.SUCCESS, "No JobIds match given Label criterion", new ArrayList<JobClusterProtoAdapter.JobIdInfo>()), sender);
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Exit JCA:onJobIdList");
                }
                return;
            }
        }
        // Copy into a mutable list: the helper may return Collections.emptyList(), whose addAll
        // throws UnsupportedOperationException as soon as there is a terminal job to append.
        List<JobClusterProtoAdapter.JobIdInfo> jobIdList = new ArrayList<JobClusterProtoAdapter.JobIdInfo>(this.getFilteredNonTerminalJobIdList(request.filters, jobIdsFilteredByLabelsSet));
        if (!request.getCriteria().getActiveOnly().orElse(true).booleanValue()) {
            jobIdList.addAll(this.getFilteredTerminalJobIdList(request.filters, jobIdsFilteredByLabelsSet));
        }
        sender.tell((Object)new JobClusterManagerProto.ListJobIdsResponse(request.requestId, BaseResponse.ResponseCode.SUCCESS, "", jobIdList), sender);
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit JCA:onJobIdList");
        }
    }

    /**
     * Handles a request to list job metadata views for this cluster.
     *
     * <p>When label criteria are given and no job matches them, an empty SUCCESS response is sent
     * immediately. Otherwise the non-terminal and terminal job streams are merged, collected into a
     * single list, and sent back as a SUCCESS response. If the stream fails, the error is logged and
     * a SERVER_ERROR response with an empty list is sent so the requester is not left waiting.
     *
     * <p>The sender and self references are captured up front because the response is sent from
     * inside the stream callbacks.
     *
     * @param request the list request carrying the filter criteria
     */
    @Override
    public void onJobList(JobClusterManagerProto.ListJobsRequest request) {
        // Entry and exit are both logged at trace level.
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Entering JCA:onJobList");
        }
        ActorRef sender = this.getSender();
        ActorRef self = this.getSelf();
        Set<JobId> jobIdsFilteredByLabelsSet = new HashSet<JobId>();
        if (!request.getCriteria().getMatchingLabels().isEmpty() && (jobIdsFilteredByLabelsSet = this.jobManager.getJobsMatchingLabels(request.getCriteria().getMatchingLabels(), request.getCriteria().getLabelsOperand())).isEmpty()) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit JCA:onJobList {}", (Object)jobIdsFilteredByLabelsSet.size());
            }
            sender.tell((Object)new JobClusterManagerProto.ListJobsResponse(request.requestId, BaseResponse.ResponseCode.SUCCESS, "", new ArrayList<MantisJobMetadataView>()), self);
            return;
        }
        this.getFilteredNonTerminalJobList(request.getCriteria(), jobIdsFilteredByLabelsSet).mergeWith(this.getFilteredTerminalJobList(request.getCriteria(), jobIdsFilteredByLabelsSet)).collect(() -> Lists.newArrayList(), List::add).doOnNext(resultList -> {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit JCA:onJobList {}", (Object)resultList.size());
            }
            sender.tell((Object)new JobClusterManagerProto.ListJobsResponse(request.requestId, BaseResponse.ResponseCode.SUCCESS, "", (List<MantisJobMetadataView>)resultList), self);
        }).subscribe(ignored -> {
            // The response has already been sent in doOnNext.
        }, error -> {
            // Without an error callback the failure would surface as OnErrorNotImplementedException
            // and the requester would never get a reply.
            this.logger.error("Exception listing jobs for cluster {}", (Object)this.name, (Object)error);
            sender.tell((Object)new JobClusterManagerProto.ListJobsResponse(request.requestId, BaseResponse.ResponseCode.SERVER_ERROR, "Exception listing jobs for cluster " + this.name + " -> " + error.getMessage(), new ArrayList<MantisJobMetadataView>()), self);
        });
    }

    /**
     * Handles a request to list the archived workers of a job.
     *
     * <p>The workers are read from the job store and truncated to the requested limit. A failure
     * while reading or truncating is logged and reported back as SERVER_ERROR with an empty list.
     *
     * @param request the request carrying the job id and the maximum number of workers to return
     */
    @Override
    public void onListArchivedWorkers(JobClusterManagerProto.ListArchivedWorkersRequest request) {
        final boolean traceOn = this.logger.isTraceEnabled();
        if (traceOn) {
            this.logger.trace("In onListArchiveWorkers {}", (Object)request);
        }
        try {
            List<IMantisWorkerMetadata> archived = this.jobStore.getArchivedWorkers(request.getJobId().getId());
            final int maxWorkers = request.getLimit();
            if (archived.size() > maxWorkers) {
                archived = archived.subList(0, maxWorkers);
            }
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Returning {} archived Workers", (Object)archived.size());
            }
            JobClusterManagerProto.ListArchivedWorkersResponse ok = new JobClusterManagerProto.ListArchivedWorkersResponse(request.requestId, BaseResponse.ResponseCode.SUCCESS, "", archived);
            this.getSender().tell((Object)ok, this.getSelf());
        }
        catch (Exception e) {
            this.logger.error("Exception listing archived workers", (Throwable)e);
            String failure = "Exception getting archived workers for job " + request.getJobId() + " -> " + e.getMessage();
            this.getSender().tell((Object)new JobClusterManagerProto.ListArchivedWorkersResponse(request.requestId, BaseResponse.ResponseCode.SERVER_ERROR, failure, Lists.newArrayList()), this.getSelf());
        }
    }

    /**
     * Handles a request to list the workers of a non-terminal job.
     *
     * <p>If the job manager knows the job as non-terminal, the request is forwarded to that job's
     * actor; otherwise a CLIENT_ERROR response with an empty list is sent to the requester.
     *
     * @param r the list-workers request carrying the job id
     */
    @Override
    public void onListActiveWorkers(JobClusterManagerProto.ListWorkersRequest r) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter JobClusterActor:onListActiveWorkers {}", (Object)r);
        }
        final Optional<JobInfo> target = this.jobManager.getJobInfoForNonTerminalJob(r.getJobId());
        if (!target.isPresent()) {
            this.logger.warn("No such active job {} ", (Object)r.getJobId());
            JobClusterManagerProto.ListWorkersResponse notFound = new JobClusterManagerProto.ListWorkersResponse(r.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, "No such active job " + r.getJobId(), Lists.newArrayList());
            this.getSender().tell((Object)notFound, this.getSelf());
        } else {
            target.get().jobActor.forward((Object)r, (ActorContext)this.getContext());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit JobClusterActor:onListActiveWorkers {}", (Object)r);
        }
    }

    /**
     * Builds the id summaries of the non-terminal jobs that match the given criteria.
     *
     * <p>Returns an empty list when the criteria explicitly ask for the Terminal meta state. The
     * candidate jobs come from the prefiltered id set when it is non-empty, otherwise from all
     * non-terminal jobs, and are truncated to the requested limit (falling back to
     * {@code DEFAULT_LIMIT}).
     *
     * @param request             the listing criteria
     * @param prefilteredJobIdSet job ids already selected by label matching; empty means no prefilter
     * @return the id summaries of the selected non-terminal jobs
     */
    private List<JobClusterProtoAdapter.JobIdInfo> getFilteredNonTerminalJobIdList(JobClusterManagerProto.ListJobCriteria request, Set<JobId> prefilteredJobIdSet) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter JobClusterActor:getFilteredNonTerminalJobIdList {}", (Object)request);
        }
        boolean terminalRequested = request.getJobState().isPresent() && request.getJobState().get().equals((Object)JobState.MetaState.Terminal);
        if (terminalRequested) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit JobClusterActor:getFilteredNonTerminalJobIdList with empty");
            }
            return Collections.emptyList();
        }
        List<JobInfo> candidates;
        if (prefilteredJobIdSet.isEmpty()) {
            candidates = this.jobManager.getAllNonTerminalJobsList();
        } else {
            candidates = new ArrayList<JobInfo>();
            for (JobId id : prefilteredJobIdSet) {
                Optional<JobInfo> found = this.jobManager.getJobInfoForNonTerminalJob(id);
                if (found.isPresent()) {
                    candidates.add(found.get());
                }
            }
        }
        List<JobInfo> limited = candidates.subList(0, Math.min(candidates.size(), request.getLimit().orElse(DEFAULT_LIMIT)));
        List<JobClusterProtoAdapter.JobIdInfo> summaries = new ArrayList<JobClusterProtoAdapter.JobIdInfo>();
        for (JobInfo job : limited) {
            JobClusterProtoAdapter.JobIdInfo summary = new JobClusterProtoAdapter.JobIdInfo.Builder()
                .withJobId(job.jobId)
                .withJobState(job.state)
                .withSubmittedAt(job.submittedAt)
                .withTerminatedAt(job.terminatedAt)
                .withUser(job.user)
                .withVersion(job.jobDefinition.getVersion())
                .build();
            summaries.add(summary);
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit JobClusterActor:getFilteredNonTerminalJobIdList {}", (Object)summaries.size());
        }
        return summaries;
    }

    /**
     * Builds the id summaries of the completed jobs that match the given criteria.
     *
     * <p>Returns an empty list when the criteria name a meta state other than Terminal, or when no
     * state is given and active-only is explicitly requested. The candidates come from the
     * prefiltered id set when it is non-empty, otherwise from the job manager's completed-jobs list,
     * and are truncated to the requested limit (falling back to {@code DEFAULT_LIMIT}).
     *
     * @param request             the listing criteria
     * @param prefilteredJobIdSet job ids already selected by label matching; empty means no prefilter
     * @return the id summaries of the selected completed jobs
     */
    private List<JobClusterProtoAdapter.JobIdInfo> getFilteredTerminalJobIdList(JobClusterManagerProto.ListJobCriteria request, Set<JobId> prefilteredJobIdSet) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter JobClusterActor:getFilteredTerminalJobIdList {}", (Object)request);
        }
        if (request.getJobState().isPresent() && !request.getJobState().get().equals((Object)JobState.MetaState.Terminal)) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit JobClusterActor:getFilteredTerminalJobIdList with empty");
            }
            return Collections.emptyList();
        }
        if (!request.getJobState().isPresent() && request.getActiveOnly().isPresent() && request.getActiveOnly().get().booleanValue()) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit JobClusterActor:getFilteredTerminalJobIdList with empty");
            }
            return Collections.emptyList();
        }
        // Keep the element type explicit; raw lists would leave the lambdas below operating on Object.
        List<JobClusterDefinitionImpl.CompletedJob> completedJobsList;
        if (!prefilteredJobIdSet.isEmpty()) {
            completedJobsList = prefilteredJobIdSet.stream()
                .map(jId -> this.jobManager.getCompletedJob(jId))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
        } else {
            completedJobsList = this.jobManager.getCompletedJobsList(request.getLimit().orElse(DEFAULT_LIMIT), request.getStartJobIdExclusive().orElse(null));
        }
        List<JobClusterDefinitionImpl.CompletedJob> subsetCompletedJobs = completedJobsList.subList(0, Math.min(completedJobsList.size(), request.getLimit().orElse(DEFAULT_LIMIT)));
        List<JobClusterProtoAdapter.JobIdInfo> completedJobIdList = subsetCompletedJobs.stream()
            .map(cJob -> new JobClusterProtoAdapter.JobIdInfo.Builder().withJobIdStr(cJob.getJobId()).withVersion(cJob.getVersion()).withUser(cJob.getUser()).withSubmittedAt(cJob.getSubmittedAt()).withTerminatedAt(cJob.getTerminatedAt()).withJobState(cJob.getState()).build())
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit JobClusterActor:getFilteredTerminalJobIdList {}", (Object)completedJobIdList.size());
        }
        return completedJobIdList;
    }

    /**
     * Produces a stream of metadata views for the non-terminal jobs that match the given criteria.
     *
     * <p>Returns an empty stream when the criteria explicitly ask for the Terminal meta state. The
     * candidates come from the prefiltered id set when it is non-empty, otherwise from all
     * non-terminal jobs, truncated to the requested limit (falling back to
     * {@code DEFAULT_ACTIVE_JOB_LIMIT}). Each selected job's actor is asked for its details with a
     * 500 ms timeout; a failed ask is logged and contributes nothing to the stream.
     *
     * @param request             the listing criteria, also used for the per-view worker filters
     * @param prefilteredJobIdSet job ids already selected by label matching; empty means no prefilter
     * @return a stream of metadata views, one per job whose details response carried metadata
     */
    private Observable<MantisJobMetadataView> getFilteredNonTerminalJobList(JobClusterManagerProto.ListJobCriteria request, Set<JobId> prefilteredJobIdSet) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Entering JobClusterActor:getFilteredNonTerminalJobList");
        }
        boolean terminalRequested = request.getJobState().isPresent() && request.getJobState().get().equals((Object)JobState.MetaState.Terminal);
        if (terminalRequested) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit JobClusterActor:getFilteredNonTerminalJobList with empty");
            }
            return Observable.empty();
        }
        final Duration askTimeout = Duration.ofMillis(500L);
        List<JobInfo> candidates;
        if (prefilteredJobIdSet.isEmpty()) {
            candidates = this.jobManager.getAllNonTerminalJobsList();
        } else {
            candidates = prefilteredJobIdSet.stream()
                .map(id -> this.jobManager.getJobInfoForNonTerminalJob((JobId)id))
                .filter(maybeInfo -> maybeInfo.isPresent())
                .map(maybeInfo -> (JobInfo)maybeInfo.get())
                .collect(Collectors.toList());
        }
        List<JobInfo> limited = candidates.subList(0, Math.min(candidates.size(), request.getLimit().orElse(DEFAULT_ACTIVE_JOB_LIMIT)));
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("List of non terminal jobs {}", candidates);
        }
        return Observable.from(limited).flatMap(info -> {
            JobClusterManagerProto.GetJobDetailsRequest detailsRequest = new JobClusterManagerProto.GetJobDetailsRequest("system", info.jobId);
            CompletionStage<JobClusterManagerProto.GetJobDetailsResponse> pending = PatternsCS.ask((ActorRef)info.jobActor, (Object)detailsRequest, (Duration)askTimeout).thenApply(JobClusterManagerProto.GetJobDetailsResponse.class::cast);
            return Observable.from(pending.toCompletableFuture(), (Scheduler)Schedulers.io()).onErrorResumeNext(failure -> {
                this.logger.warn("caught exception {}", (Object)failure.getMessage(), failure);
                return Observable.empty();
            });
        }).filter(details -> details != null && details.getJobMetadata().isPresent())
            .map(details -> details.getJobMetadata().get())
            .map(jobMetadata -> new MantisJobMetadataView((IMantisJobMetadata)jobMetadata, request.getStageNumberList(), request.getWorkerIndexList(), request.getWorkerNumberList(), request.getWorkerStateList(), false));
    }

    /**
     * Produces a stream of metadata views for the completed jobs that match the given criteria.
     *
     * <p>Returns an empty stream when the criteria name a meta state other than Terminal, or when no
     * state is given and active-only is explicitly requested. The candidates come from the given id
     * set when it is non-empty, otherwise from the job manager's completed-jobs list, truncated to the
     * requested limit (falling back to {@code DEFAULT_LIMIT}). Jobs whose data cannot be loaded, or
     * whose lookup throws, are skipped.
     *
     * @param request  the listing criteria, also used for the per-view worker filters
     * @param jobIdSet job ids already selected by label matching; empty means no prefilter
     * @return a stream of metadata views, one per completed job whose data was found
     */
    private Observable<MantisJobMetadataView> getFilteredTerminalJobList(JobClusterManagerProto.ListJobCriteria request, Set<JobId> jobIdSet) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("JobClusterActor:getFilteredTerminalJobList");
        }
        if (request.getJobState().isPresent() && !request.getJobState().get().equals((Object)JobState.MetaState.Terminal)) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit JobClusterActor:getFilteredTerminalJobList with empty");
            }
            return Observable.empty();
        }
        if (!request.getJobState().isPresent() && request.getActiveOnly().isPresent() && request.getActiveOnly().get().booleanValue()) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit JobClusterActor:getFilteredTerminalJobList with empty");
            }
            return Observable.empty();
        }
        // Keep the element type explicit; raw lists would leave the lambda below operating on Object.
        List<JobClusterDefinitionImpl.CompletedJob> jobInfoList;
        if (!jobIdSet.isEmpty()) {
            jobInfoList = jobIdSet.stream()
                .map(jId -> this.jobManager.getCompletedJob(jId))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
        } else {
            jobInfoList = this.jobManager.getCompletedJobsList(request.getLimit().orElse(DEFAULT_LIMIT), request.getStartJobIdExclusive().orElse(null));
        }
        List<JobClusterDefinitionImpl.CompletedJob> shortenedList = jobInfoList.subList(0, Math.min(jobInfoList.size(), request.getLimit().orElse(DEFAULT_LIMIT)));
        return Observable.from(shortenedList).flatMap(cJob -> {
            try {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Fetching details for completed job {}", (Object)cJob);
                }
                Optional<IMantisJobMetadata> metaOp = this.jobManager.getJobDataForCompletedJob(cJob.getJobId());
                if (metaOp.isPresent()) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Fetched details for completed job {} -> {}", (Object)cJob, (Object)metaOp.get());
                    }
                    return Observable.just(new MantisJobMetadataView(metaOp.get(), cJob.getTerminatedAt(), request.getStageNumberList(), request.getWorkerIndexList(), request.getWorkerNumberList(), request.getWorkerStateList(), false));
                }
            }
            catch (Exception e) {
                // Best effort: a job whose data cannot be loaded is left out of the result.
                this.logger.error("caught exception", (Throwable)e);
            }
            return Observable.empty();
        });
    }

    /**
     * Handles a request to list the completed jobs of this cluster.
     *
     * <p>Fetches up to {@code request.getLimit()} completed jobs from the job manager (no start job
     * id is supplied) and sends them back in a SUCCESS response.
     *
     * @param request the request carrying the maximum number of completed jobs to return
     */
    @Override
    public void onJobListCompleted(JobClusterManagerProto.ListCompletedJobsInClusterRequest request) {
        final boolean traceOnEntry = this.logger.isTraceEnabled();
        if (traceOnEntry) {
            this.logger.trace("Enter onJobListCompleted {}", (Object)request);
        }
        final ActorRef requester = this.getSender();
        final List<JobClusterDefinitionImpl.CompletedJob> completed = this.jobManager.getCompletedJobsList(request.getLimit(), null);
        JobClusterManagerProto.ListCompletedJobsInClusterResponse response = new JobClusterManagerProto.ListCompletedJobsInClusterResponse(request.requestId, BaseResponse.ResponseCode.SUCCESS, "", completed);
        // The requester itself is passed as the message sender, as in the job id listing handler.
        requester.tell((Object)response, requester);
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onJobListCompleted {}", (Object)completed.size());
        }
    }

    /**
     * Handles a request to disable this job cluster.
     *
     * <p>Persists the metadata with the disabled flag set, tears down the cron (when one exists),
     * switches to {@code disabledBehavior}, sends a kill request to every accepted and active job,
     * cancels the bookkeeping timer, publishes an audit event, and replies with SUCCESS. Any exception
     * is logged and reported as SERVER_ERROR.
     *
     * @param req the disable request; its user is recorded on the kill requests
     */
    @Override
    public void onJobClusterDisable(JobClusterManagerProto.DisableJobClusterRequest req) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onJobClusterDisable {}", (Object)req);
        }
        ActorRef sender = this.getSender();
        try {
            IJobClusterMetadata jobClusterMetadata = new JobClusterMetadataImpl.Builder().withIsDisabled(true).withLastJobCount(this.jobClusterMetadata.getLastJobCount()).withJobClusterDefinition((JobClusterDefinitionImpl)this.jobClusterMetadata.getJobClusterDefinition()).build();
            this.jobStore.updateJobCluster(jobClusterMetadata);
            this.jobClusterMetadata = jobClusterMetadata;
            // The cron manager is null when its creation failed; without this guard the NPE would
            // abort the disable after the disabled flag has already been persisted.
            if (this.cronManager != null) {
                this.cronManager.destroyCron();
            }
            this.getContext().become(this.disabledBehavior);
            ArrayList<JobInfo> jobsToKill = new ArrayList<JobInfo>();
            jobsToKill.addAll(this.jobManager.getAcceptedJobsList());
            jobsToKill.addAll(this.jobManager.getActiveJobsList());
            for (JobInfo jobInfo : jobsToKill) {
                jobInfo.jobActor.tell((Object)new JobClusterProto.KillJobRequest(jobInfo.jobId, "Job cluster disabled", JobCompletedReason.Killed, req.getUser(), ActorRef.noSender()), this.getSelf());
            }
            this.getTimers().cancel((Object)BOOKKEEPING_TIMER_KEY);
            this.eventPublisher.publishAuditEvent(new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_DISABLED, jobClusterMetadata.getJobClusterDefinition().getName(), this.name + " disabled"));
            sender.tell((Object)new JobClusterManagerProto.DisableJobClusterResponse(req.requestId, BaseResponse.ResponseCode.SUCCESS, String.format("%s disabled", this.name)), this.getSelf());
            this.numJobClusterDisable.increment();
            this.logger.info("Job Cluster {} is disabbled", (Object)this.name);
        }
        catch (Exception e) {
            String errorMsg = "Exception disabling cluster " + this.name + " due to " + e.getMessage();
            this.logger.error(errorMsg, (Throwable)e);
            sender.tell((Object)new JobClusterManagerProto.DisableJobClusterResponse(req.requestId, BaseResponse.ResponseCode.SERVER_ERROR, errorMsg), this.getSelf());
            this.numJobClusterDisableErrors.increment();
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onJobClusterDisable");
        }
    }

    /**
     * Handles a request to enable this job cluster.
     *
     * <p>Persists the metadata with the disabled flag cleared, creates the cron manager if none
     * exists, initializes the cron, switches to {@code initializedBehavior}, sets the bookkeeping
     * timer, publishes an audit event, and replies with SUCCESS. Any exception is logged and reported
     * as SERVER_ERROR.
     *
     * @param req the enable request
     */
    @Override
    public void onJobClusterEnable(JobClusterManagerProto.EnableJobClusterRequest req) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onJobClusterEnable");
        }
        ActorRef sender = this.getSender();
        try {
            IJobClusterMetadata jobClusterMetadata = new JobClusterMetadataImpl.Builder().withIsDisabled(false).withLastJobCount(this.jobClusterMetadata.getLastJobCount()).withJobClusterDefinition((JobClusterDefinitionImpl)this.jobClusterMetadata.getJobClusterDefinition()).build();
            this.jobStore.updateJobCluster(jobClusterMetadata);
            this.jobClusterMetadata = jobClusterMetadata;
            if (this.cronManager == null) {
                this.cronManager = new CronManager(this.name, this.getSelf(), jobClusterMetadata.getJobClusterDefinition().getSLA());
            }
            this.cronManager.initCron();
            this.getContext().become(this.initializedBehavior);
            this.setBookkeepingTimer(5L);
            this.eventPublisher.publishAuditEvent(new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_ENABLED, this.jobClusterMetadata.getJobClusterDefinition().getName(), this.name + " enabled"));
            sender.tell((Object)new JobClusterManagerProto.EnableJobClusterResponse(req.requestId, BaseResponse.ResponseCode.SUCCESS, String.format("%s enabled", this.name)), this.getSelf());
            this.numJobClusterEnable.increment();
            this.logger.info("Job Cluster {} is Enabled", (Object)this.name);
        }
        catch (Exception e) {
            String errorMsg = String.format("Exception enabling cluster %s due to %s", this.name, e.getMessage());
            this.logger.error(errorMsg, (Throwable)e);
            sender.tell((Object)new JobClusterManagerProto.EnableJobClusterResponse(req.requestId, BaseResponse.ResponseCode.SERVER_ERROR, errorMsg), this.getSelf());
            this.numJobClusterEnableErrors.increment();
        }
        if (this.logger.isTraceEnabled()) {
            // This is the exit point of the handler, so the trace line must say "Exit".
            this.logger.trace("Exit onJobClusterEnable");
        }
    }

    /**
     * Handles a request for this job cluster's metadata view.
     *
     * <p>Replies with CLIENT_ERROR and an empty optional when the requested cluster name differs
     * from this actor's name; otherwise replies with SUCCESS and a view built from the current
     * metadata, the disabled flag, and the cron-active flag (false when no cron manager exists).
     *
     * @param request the get request carrying the cluster name
     */
    @Override
    public void onJobClusterGet(JobClusterManagerProto.GetJobClusterRequest request) {
        final ActorRef requester = this.getSender();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("In JobCluster Get " + this.jobClusterMetadata);
        }
        if (!this.name.equals(request.getJobClusterName())) {
            String mismatch = "Cluster Name " + request.getJobClusterName() + " in request Does not match cluster Name " + this.name + " of Job Cluster Actor";
            requester.tell((Object)new JobClusterManagerProto.GetJobClusterResponse(request.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, mismatch, Optional.empty()), this.getSelf());
        } else {
            boolean cronActive = Optional.ofNullable(this.cronManager).map(cron -> ((CronManager)cron).isCronActive).orElse(false);
            MantisJobClusterMetadataView view = this.generateJobClusterMetadataView(this.jobClusterMetadata, this.jobClusterMetadata.isDisabled(), cronActive);
            requester.tell((Object)new JobClusterManagerProto.GetJobClusterResponse(request.requestId, BaseResponse.ResponseCode.SUCCESS, "", Optional.of(view)), this.getSelf());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onJobClusterGet");
        }
    }

    /**
     * Builds the externally visible view of the given job cluster metadata.
     *
     * @param jobClusterMetadata the metadata whose definition and last job count are copied
     * @param isDisabled         value for the view's disabled flag
     * @param cronActive         value for the view's cron-active flag
     * @return the populated metadata view
     */
    private MantisJobClusterMetadataView generateJobClusterMetadataView(IJobClusterMetadata jobClusterMetadata, boolean isDisabled, boolean cronActive) {
        final IJobClusterDefinition definition = jobClusterMetadata.getJobClusterDefinition();
        return new MantisJobClusterMetadataView.Builder()
            .withName(definition.getName())
            .withDisabled(isDisabled)
            .withIsReadyForJobMaster(definition.getIsReadyForJobMaster())
            .withJars(definition.getJobClusterConfigs())
            .withJobOwner(definition.getOwner())
            .withLabels(definition.getLabels())
            .withLastJobCount(jobClusterMetadata.getLastJobCount())
            .withSla(definition.getSLA())
            .withMigrationConfig(definition.getWorkerMigrationConfig())
            .withParameters(definition.getParameters())
            .isCronActive(cronActive)
            .withLatestVersion(definition.getJobClusterConfig().getVersion())
            .build();
    }

    /**
     * Handles a job submission request.
     *
     * <p>If the request carries a job definition with a non-empty user-provided unique type and a
     * job with that unique id already exists, the existing job id is returned with SUCCESS and
     * nothing is submitted. If {@code requireJobActorProcess} returns true the method returns without
     * replying here. Otherwise the job definition is resolved (from the cluster definition when the
     * latest is requested, else from the request), system labels are inserted, and the job is
     * submitted. A {@code PersistException} is reported as SERVER_ERROR; any other exception as
     * CLIENT_ERROR.
     *
     * @param request the submit request
     */
    @Override
    public void onJobSubmit(JobClusterManagerProto.SubmitJobRequest request) {
        final ActorRef replyTo = this.getSender();
        if (request.getJobDefinition().isPresent()) {
            String uniqueId = request.getJobDefinition().get().getJobSla().getUserProvidedType();
            if (uniqueId != null && !uniqueId.isEmpty()) {
                Optional<JobInfo> duplicate = this.jobManager.getJobInfoByUniqueId(uniqueId);
                if (duplicate.isPresent()) {
                    this.logger.info("Job with unique {} already exists, returning its job Id {}", (Object)uniqueId, (Object)duplicate.get().jobId);
                    replyTo.tell((Object)new JobClusterManagerProto.SubmitJobResponse(request.requestId, BaseResponse.ResponseCode.SUCCESS, duplicate.get().jobId.getId(), Optional.of(duplicate.get().jobId)), this.getSelf());
                    return;
                }
            }
        }
        this.logger.info("Submitting job {}", (Object)request);
        try {
            if (this.requireJobActorProcess(request)) {
                this.logger.info("Sending job submit request to job actor for inheritance: {}", (Object)request.requestId);
                return;
            }
            JobDefinition jobDefn;
            if (request.isSubmitLatest()) {
                jobDefn = this.fromJobClusterDefinition(request.getSubmitter(), this.jobClusterMetadata.getJobClusterDefinition());
            } else {
                jobDefn = this.getResolvedJobDefinition(request.getSubmitter(), request.getJobDefinition());
            }
            this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.JobClusterStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.INFO, "Job submit request received", this.jobClusterMetadata.getJobClusterDefinition().getName()));
            jobDefn = LabelManager.insertSystemLabels(jobDefn, request.isAutoResubmit());
            this.submitJob(jobDefn, replyTo, request.getSubmitter());
            this.numJobSubmissions.increment();
        }
        catch (PersistException persistFailure) {
            this.logger.error("Exception submitting job {} from {}", new Object[]{request.getClusterName(), request.getSubmitter(), persistFailure});
            this.numJobSubmissionFailures.increment();
            replyTo.tell((Object)new JobClusterManagerProto.SubmitJobResponse(request.requestId, BaseResponse.ResponseCode.SERVER_ERROR, persistFailure.getMessage(), Optional.empty()), this.getSelf());
        }
        catch (Exception failure) {
            this.logger.error("Exception submitting job {} from {}", new Object[]{request.getClusterName(), request.getSubmitter(), failure});
            this.numJobSubmissionFailures.increment();
            replyTo.tell((Object)new JobClusterManagerProto.SubmitJobResponse(request.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, failure.getMessage(), Optional.empty()), this.getSelf());
        }
    }

    /**
     * Resumes a job submission once a job actor has returned the job definition it was asked for.
     *
     * <p>A SERVER_ERROR response code or a {@code null} job definition fails the submission and the
     * original sender receives a SERVER_ERROR reply. Otherwise the definition (in quick-submit mode,
     * its clone without artifact name and version when one is produced) is resolved, given system
     * labels and submitted. A {@link PersistException} is reported as SERVER_ERROR, any other exception
     * as CLIENT_ERROR.
     *
     * @param request the job actor's response, carrying the original sender of the submit request
     */
    public void onGetJobDefinitionUpdatedFromJobActorResponse(JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorResponse request) {
        this.logger.info("Resuming job submission from job actor");
        ActorRef originalSender = request.getOriginalSender();
        if (request.responseCode == BaseResponse.ResponseCode.SERVER_ERROR || request.getJobDefinition() == null) {
            this.logger.error("Failed to retrieve job definition from job actor");
            this.numJobSubmissionFailures.increment();
            originalSender.tell(new JobClusterManagerProto.SubmitJobResponse(request.requestId, BaseResponse.ResponseCode.SERVER_ERROR, request.message, Optional.empty()), this.getSelf());
            return;
        }
        try {
            JobDefinition resolvedJobDefn = request.getJobDefinition();
            if (request.isQuickSubmitMode()) {
                Optional<JobDefinition> jobDefinitionCloneO = this.cloneToNewJobDefinitionWithoutArtifactNameAndVersion(request.getJobDefinition());
                if (jobDefinitionCloneO.isPresent()) {
                    resolvedJobDefn = jobDefinitionCloneO.get();
                }
            }
            resolvedJobDefn = this.jobDefinitionResolver.getResolvedJobDefinition(request.getUser(), resolvedJobDefn, this.jobClusterMetadata);
            this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.JobClusterStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.INFO, "Job submit request received", this.jobClusterMetadata.getJobClusterDefinition().getName()));
            resolvedJobDefn = LabelManager.insertSystemLabels(resolvedJobDefn, request.isAutoResubmit());
            // submitJob increments numJobSubmissions itself on success; incrementing it again here
            // would count every successful submission twice.
            this.submitJob(resolvedJobDefn, originalSender, request.getUser());
        }
        catch (PersistException pe) {
            // NOTE(review): failures raised inside submitJob appear to have already incremented
            // numJobSubmissionFailures there, so they may be counted twice - confirm before changing.
            this.logger.error("Exception submitting job {} from {}", this.name, request.getUser(), pe);
            this.numJobSubmissionFailures.increment();
            originalSender.tell(new JobClusterManagerProto.SubmitJobResponse(request.requestId, BaseResponse.ResponseCode.SERVER_ERROR, pe.getMessage(), Optional.empty()), this.getSelf());
        }
        catch (Exception e) {
            this.logger.error("Exception submitting job {} from {}", this.name, request.getUser(), e);
            this.numJobSubmissionFailures.increment();
            originalSender.tell(new JobClusterManagerProto.SubmitJobResponse(request.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, e.getMessage(), Optional.empty()), this.getSelf());
        }
    }

    /**
     * Decides whether a submit request must first go through the job actor of the last submitted
     * non-terminal job, and if so sends that actor a
     * {@code GetJobDefinitionUpdatedFromJobActorRequest}.
     *
     * <p>The request is handed to the job actor when a last non-terminal job exists, the request is not
     * a "submit latest" request, and either no job definition was supplied (quick submit) or the
     * supplied definition reports {@code requireInheritInstanceCheck()}.
     *
     * @param request the submit request being processed
     * @return {@code true} if a message was sent to a job actor, {@code false} if the caller should
     *         continue with the submission itself
     */
    private boolean requireJobActorProcess(JobClusterManagerProto.SubmitJobRequest request) {
        String submitter = request.getSubmitter();
        Optional<JobDefinition> requestedDefn = request.getJobDefinition();
        List<JobInfo> nonTerminalJobs = this.jobManager.getAllNonTerminalJobsList();
        Optional<JobId> lastJobIdOp = JobListHelper.getLastSubmittedJobId(nonTerminalJobs, Collections.emptyList());
        if (!lastJobIdOp.isPresent()) {
            this.logger.info("No valid last job id found for inheritance. Skip job actor process step.");
            return false;
        }

        Optional<JobInfo> lastJobInfoOp = this.jobManager.getJobInfoForNonTerminalJob(lastJobIdOp.get());
        if (!lastJobInfoOp.isPresent()) {
            this.logger.info("Last job id doesn't map to job info instance, skip job actor process. {}", lastJobIdOp.get());
            return false;
        }

        if (request.isSubmitLatest()) {
            this.logger.info("Submit latest job request, skip job actor process. {}", request);
            return false;
        }

        if (requestedDefn.isPresent()) {
            if (!requestedDefn.get().requireInheritInstanceCheck()) {
                this.logger.info("request doesn't require job actor process, skip job actor and continue.");
                return false;
            }
            // A definition was supplied and asks for the inherit-instance check.
            this.logger.info("[Inherit request] pass to job actor to process job definition: {}", lastJobIdOp.get());
            lastJobInfoOp.get().jobActor.tell(
                    new JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorRequest(submitter, lastJobIdOp.get(), requestedDefn.get(), false, request.isAutoResubmit(), this.getSender()),
                    this.getSelf());
            return true;
        }

        // No definition supplied: quick submit based on the last job's definition.
        this.logger.info("[QuickSubmit] pass to job actor to process job definition: {}", lastJobIdOp.get());
        lastJobInfoOp.get().jobActor.tell(
                new JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorRequest(submitter, lastJobIdOp.get(), lastJobInfoOp.get().jobDefinition, true, request.isAutoResubmit(), this.getSender()),
                this.getSelf());
        return true;
    }

    /**
     * Chooses the base job definition for a submission and passes it through the job definition
     * resolver.
     *
     * <p>Selection order:
     * <ol>
     *   <li>the definition supplied with the request, if present;</li>
     *   <li>otherwise a clone built from the most recent completed job, if one can be produced;</li>
     *   <li>otherwise a definition built from the cluster definition, provided the cluster metadata,
     *       definition and config are all non-null.</li>
     * </ol>
     *
     * @param user the submitting user
     * @param givenJobDefnOp the job definition supplied with the request, if any
     * @return the definition returned by the job definition resolver
     * @throws Exception if no base definition can be determined, or if resolution fails
     */
    private JobDefinition getResolvedJobDefinition(String user, Optional<JobDefinition> givenJobDefnOp) throws Exception {
        JobDefinition resolvedJobDefn;
        if (givenJobDefnOp.isPresent()) {
            if (givenJobDefnOp.get().getSchedulingInfo() != null && givenJobDefnOp.get().requireInheritInstanceCheck()) {
                this.logger.warn("Job requires inheriting instance count but has no active non-terminal job.");
            }
            resolvedJobDefn = givenJobDefnOp.get();
        } else {
            Optional<JobDefinition> jobDefnOp = this.cloneJobDefinitionForQuickSubmitFromArchivedJobs(this.jobManager.getCompletedJobsList(1, null), Optional.empty(), this.jobStore);
            if (jobDefnOp.isPresent()) {
                this.logger.info("Inherited scheduling Info and parameters from previous job");
                resolvedJobDefn = jobDefnOp.get();
            } else if (this.jobClusterMetadata != null && this.jobClusterMetadata.getJobClusterDefinition() != null && this.jobClusterMetadata.getJobClusterDefinition().getJobClusterConfig() != null) {
                this.logger.info("No previous job definition found. Fall back to cluster definition: {}", this.name);
                // fromJobClusterDefinition reads the cluster config itself, so no local copy is needed.
                resolvedJobDefn = this.fromJobClusterDefinition(user, this.jobClusterMetadata.getJobClusterDefinition());
                this.logger.info("Built job definition from cluster definition: {}", resolvedJobDefn);
            } else {
                throw new Exception("Job Definition could not retrieved from a previous submission (There may not be a previous submission)");
            }
        }
        this.logger.info("Resolved JobDefn {}", resolvedJobDefn);
        return this.jobDefinitionResolver.getResolvedJobDefinition(user, resolvedJobDefn, this.jobClusterMetadata);
    }

    /**
     * Builds a job definition from the cluster definition: artifact name, version and scheduling info
     * come from the cluster config, labels and parameters from the cluster definition, the name from
     * this actor's cluster name, and the SLA from a default-built {@code JobSla}.
     *
     * @param user the user recorded on the resulting job definition
     * @param clusterDefinition the cluster definition to copy from
     * @return the built job definition
     * @throws InvalidJobException if the job definition builder rejects the values
     */
    private JobDefinition fromJobClusterDefinition(String user, IJobClusterDefinition clusterDefinition) throws InvalidJobException {
        JobClusterConfig config = clusterDefinition.getJobClusterConfig();
        return new JobDefinition.Builder()
                .withJobSla(new JobSla.Builder().build())
                .withArtifactName(config.getArtifactName())
                .withVersion(config.getVersion())
                .withLabels(clusterDefinition.getLabels())
                .withName(this.name)
                .withParameters(clusterDefinition.getParameters())
                .withSchedulingInfo(config.getSchedulingInfo())
                .withUser(user)
                .build();
    }

    /**
     * Validates a job definition, allocates the next job id for this cluster, initializes the job via
     * the job manager and persists the updated cluster metadata (last job count).
     *
     * <p>On success the new job id is published on {@code jobIdSubmissionSubject} and
     * {@code numJobSubmissions} is incremented. If persisting the cluster fails, the job is cleaned up
     * via {@code cleanUpOnJobSubmitFailure} before the failure is rethrown.
     *
     * @param jobDefinition the definition to submit
     * @param sender the actor passed to the job manager when initializing the job
     * @param user the submitter, recorded in the audit event
     * @throws PersistException if updating the job cluster in the store fails
     * @throws IllegalArgumentException if the job definition fails validation
     * @throws IllegalStateException on any other failure
     */
    private void submitJob(JobDefinition jobDefinition, ActorRef sender, String user) throws PersistException {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter submitJobb");
        }
        try {
            this.validateJobDefinition(jobDefinition);

            final long nextJobNumber = this.jobClusterMetadata.getLastJobCount() + 1L;
            final JobId jId = new JobId(this.name, nextJobNumber);
            final int heartbeatIntervalSecs = jobDefinition.getIntSystemParameter("mantis.job.worker.heartbeat.interval.secs", 0);
            final int workerTimeoutSecs = jobDefinition.getIntSystemParameter("mantis.job.worker.timeout.secs", 0);
            this.logger.info("Creating new job id: {} with job defn {}, with heartbeat {} and workertimeout {}", jId, jobDefinition, heartbeatIntervalSecs, workerTimeoutSecs);

            MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder()
                    .withJobId(jId)
                    .withSubmittedAt(Instant.now())
                    .withJobState(JobState.Accepted)
                    .withNextWorkerNumToUse(1)
                    .withJobDefinition(jobDefinition)
                    .withHeartbeatIntervalSecs(heartbeatIntervalSecs)
                    .withWorkerTimeoutSecs(workerTimeoutSecs)
                    .build();
            this.eventPublisher.publishAuditEvent(new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_SUBMIT, jId.getId(), jId + " submitter: " + user));
            this.jobManager.initJob(mantisJobMetaData, this.jobClusterMetadata, sender);
            this.numJobActorCreationCounter.increment();

            // Record the consumed job number in the in-memory metadata, then persist it.
            this.jobClusterMetadata = new JobClusterMetadataImpl.Builder()
                    .withJobClusterDefinition((JobClusterDefinitionImpl) this.jobClusterMetadata.getJobClusterDefinition())
                    .withLastJobCount(nextJobNumber)
                    .withIsDisabled(this.jobClusterMetadata.isDisabled())
                    .build();
            try {
                this.jobStore.updateJobCluster(this.jobClusterMetadata);
            }
            catch (Exception storeFailure) {
                this.logger.error("Failed to persist job cluster {} error {}", this.jobClusterMetadata, storeFailure.getMessage(), storeFailure);
                this.numJobSubmissionFailures.increment();
                this.cleanUpOnJobSubmitFailure(jId);
                throw new PersistException(storeFailure);
            }

            this.jobIdSubmissionSubject.onNext(jId);
            this.numJobSubmissions.increment();
        }
        catch (PersistException persistFailure) {
            // Rethrown untouched so the generic handler below does not wrap it.
            throw persistFailure;
        }
        catch (InvalidJobRequest invalid) {
            this.logger.error("Invalid jobcluster : {} error {}", this.jobClusterMetadata, invalid.getMessage(), invalid);
            this.numJobSubmissionFailures.increment();
            throw new IllegalArgumentException(invalid);
        }
        catch (Exception unexpected) {
            this.logger.error("Exception persisting job in store", (Throwable) unexpected);
            this.numJobSubmissionFailures.increment();
            throw new IllegalStateException(unexpected);
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit submitJob");
        }
    }

    /**
     * Handles the result of a job actor's initialization.
     *
     * <p>The job is always marked initialized in the job manager. On SUCCESS the requestor (if any)
     * receives a successful {@code SubmitJobResponse}. On any other response code the job, if still
     * known as non-terminal, is cleaned up and the requestor (if any) is told the submission failed.
     *
     * @param jobInited the initialization result reported for the job
     */
    @Override
    public void onJobInitialized(JobProto.JobInitialized jobInited) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onJobInitialized");
        }
        this.jobManager.markJobInitialized(jobInited.jobId, System.currentTimeMillis());
        if (jobInited.responseCode == BaseResponse.ResponseCode.SUCCESS) {
            // Guard against a missing requestor, matching the failure path below; an unguarded
            // tell would throw a NullPointerException inside the actor.
            if (jobInited.requestor != null) {
                jobInited.requestor.tell(new JobClusterManagerProto.SubmitJobResponse(jobInited.requestId, BaseResponse.ResponseCode.SUCCESS, jobInited.jobId.getId(), Optional.of(jobInited.jobId)), this.getSelf());
            }
            this.numJobsInitialized.increment();
        } else {
            this.logger.warn("Job was not initialized {}", jobInited);
            Optional<JobInfo> jobInfo = this.jobManager.getJobInfoForNonTerminalJob(jobInited.jobId);
            if (jobInfo.isPresent()) {
                this.cleanUpOnJobSubmitFailure(jobInfo.get().jobId);
                if (jobInited.requestor != null) {
                    jobInited.requestor.tell(new JobClusterManagerProto.SubmitJobResponse(jobInited.requestId, jobInited.responseCode, "Job " + jobInited.jobId + " submission failed", Optional.ofNullable(jobInited.jobId)), this.getSelf());
                }
            } else {
                this.logger.warn("No such job found {}", jobInited.jobId);
            }
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onJobInitialized");
        }
    }

    /**
     * Handles a job-started event: if the job is known as non-terminal, marks it started and sends
     * this actor an {@code EnforceSLARequest} carrying the job's definition. Events for unknown jobs
     * are only logged.
     *
     * @param startedEvent the event identifying the started job
     */
    @Override
    public void onJobStarted(JobClusterProto.JobStartedEvent startedEvent) {
        this.logger.info("job {} started event", startedEvent.jobid);
        Optional<JobInfo> lookup = this.jobManager.getJobInfoForNonTerminalJob(startedEvent.jobid);
        if (!lookup.isPresent()) {
            return;
        }
        this.jobManager.markJobStarted(lookup.get());
        this.getSelf().tell(
                new JobClusterProto.EnforceSLARequest(Instant.now(), Optional.of(lookup.get().jobDefinition)),
                this.getSelf());
    }

    /**
     * Tears down a job whose submission failed.
     *
     * <p>If the job is known as non-terminal and can be marked terminating with state
     * {@code Failed}, its actor is unwatched and stopped, the job is marked completed as
     * {@code Failed}, and finally marked initialized. An {@link IOException} while marking the job
     * completed is logged and does not interrupt the remaining steps.
     *
     * @param jId id of the job to clean up; a {@code null} id only produces a warning
     */
    private void cleanUpOnJobSubmitFailure(JobId jId) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter cleanUpOnJobSubmitFailure {}", jId);
        }
        if (jId == null) {
            this.logger.warn("cleanup on Job Submit failure failed as there was no JobId");
        } else {
            Optional<JobInfo> lookup = this.jobManager.getJobInfoForNonTerminalJob(jId);
            if (lookup.isPresent()) {
                JobInfo failedJob = lookup.get();
                if (!this.jobManager.markJobTerminating(failedJob, JobState.Failed)) {
                    this.logger.warn("cleanup on Job Submit failure failed for job {}", jId);
                } else {
                    this.getContext().unwatch(failedJob.jobActor);
                    this.getContext().stop(failedJob.jobActor);
                    try {
                        this.jobManager.markCompleted(jId, System.currentTimeMillis(), JobState.Failed);
                    }
                    catch (IOException storeFailure) {
                        this.logger.error("Failed to store the completed job {}", jId, storeFailure);
                    }
                    this.jobManager.markJobInitialized(jId, System.currentTimeMillis());
                }
            }
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit cleanUpOnJobSubmitFailure {}", jId);
        }
    }

    /**
     * Checks that a job definition is non-null, has an artifact name, a name and scheduling info, and
     * that every stage's soft and hard constraints are recognized.
     *
     * @param definition the job definition to validate
     * @throws InvalidJobRequest if a required attribute is missing or a constraint is unknown
     */
    private void validateJobDefinition(JobDefinition definition) throws InvalidJobRequest {
        // Find the first missing required attribute, in the order the checks are reported.
        String problem = null;
        if (definition == null) {
            problem = "MantisJobDefinition cannot be null";
        } else if (definition.getArtifactName() == null) {
            problem = "MantisJobDefinition job artifactName attribute cannot be null";
        } else if (definition.getName() == null) {
            problem = "MantisJobDefinition name attribute cannot be null";
        } else if (definition.getSchedulingInfo() == null) {
            problem = "MantisJobDefinition schedulingInfo cannot be null";
        }
        if (problem != null) {
            throw new InvalidJobRequest(null, problem);
        }

        for (StageSchedulingInfo stage : definition.getSchedulingInfo().getStages().values()) {
            this.validateConstraints(stage.getSoftConstraints(), stage.getHardConstraints());
        }
    }

    /**
     * Verifies that every given constraint maps to a known evaluator. A constraint is rejected when
     * {@code ConstraintsEvaluators} returns {@code null} for it. Soft constraints are checked before
     * hard ones; either list may be {@code null}.
     *
     * @param softConstraints soft constraints to check, may be {@code null}
     * @param hardConstraints hard constraints to check, may be {@code null}
     * @throws InvalidJobRequest on the first constraint without an evaluator
     */
    private void validateConstraints(List<JobConstraints> softConstraints, List<JobConstraints> hardConstraints) throws InvalidJobRequest {
        if (softConstraints != null) {
            for (JobConstraints soft : softConstraints) {
                if (ConstraintsEvaluators.softConstraint(soft, new HashSet<String>()) == null) {
                    this.logger.error("Invalid Soft Job Constraint {}", soft);
                    throw new InvalidJobRequest(null, "Unknown constraint " + soft);
                }
            }
        }
        if (hardConstraints != null) {
            for (JobConstraints hard : hardConstraints) {
                if (ConstraintsEvaluators.hardConstraint(hard, new HashSet<String>()) == null) {
                    this.logger.error("Invalid Hard Job Constraint {}", hard);
                    throw new InvalidJobRequest(null, "Unknown constraint " + hard);
                }
            }
        }
    }

    /**
     * Routes a worker event.
     *
     * <p>Events for a non-terminal job are forwarded to that job's actor. Otherwise a terminal event
     * is logged and ignored, while a non-terminal event causes the worker to be unscheduled and
     * terminated: through the scheduler for the completed job's definition when the completed job is
     * found, else through the scheduler returned by {@code forClusterID(null)}.
     *
     * @param r the worker event
     */
    @Override
    public void onWorkerEvent(WorkerEvent r) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onWorkerEvent {}", r);
        }
        Optional<JobInfo> runningJob = this.jobManager.getJobInfoForNonTerminalJob(r.getWorkerId().getJobId());
        if (runningJob.isPresent()) {
            runningJob.get().jobActor.forward(r, (ActorContext) this.getContext());
        } else if (JobHelper.isTerminalWorkerEvent(r)) {
            this.logger.warn("Terminal Event from worker {} has no valid running job. Ignoring event ", r.getWorkerId());
        } else {
            this.logger.warn("Event from worker {} has no valid running job. Terminating worker ", r.getWorkerId());
            Optional<String> workerHost = JobHelper.getWorkerHostFromWorkerEvent(r);
            Optional<IMantisJobMetadata> completedJob = this.jobManager.getJobDataForCompletedJob(r.getWorkerId().getJobId());
            if (!completedJob.isPresent()) {
                this.logger.warn("Non-terminal Event from worker {} has no completed job. Sending event to default cluster", r.getWorkerId());
                this.mantisSchedulerFactory.forClusterID(null).unscheduleAndTerminateWorker(r.getWorkerId(), workerHost);
            } else {
                JobDefinition completedDefinition = completedJob.get().getJobDefinition();
                this.mantisSchedulerFactory.forJob(completedDefinition).unscheduleAndTerminateWorker(r.getWorkerId(), workerHost);
            }
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onWorkerEvent {}", r);
        }
    }

    /**
     * Handles a worker resubmit request by delegating to {@code onResubmitWorker}, with trace logging
     * on entry and exit.
     *
     * @param req the resubmit request
     */
    @Override
    public void onResubmitWorkerRequest(JobClusterManagerProto.ResubmitWorkerRequest req) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onResubmitWorkerRequest {}", req);
        }

        this.onResubmitWorker(req);

        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onResubmitWorkerRequest {}", req);
        }
    }

    /**
     * Handles a request to kill a job.
     *
     * <p>If the job is known as non-terminal and can be marked terminating (with state
     * {@code Failed}), the request is passed on to the job's actor. Otherwise the requestor named in
     * the request, if any, receives a CLIENT_ERROR_NOT_FOUND response.
     *
     * @param req the kill request
     */
    @Override
    public void onJobKillRequest(JobClusterProto.KillJobRequest req) {
        this.logger.info("JobClusterActor.onKillJobRequest {}", req);
        Optional<JobInfo> jobInfo = this.jobManager.getJobInfoForNonTerminalJob(req.jobId);
        if (jobInfo.isPresent() && this.jobManager.markJobTerminating(jobInfo.get(), JobState.Failed)) {
            jobInfo.get().jobActor.tell(req, this.getSelf());
        } else {
            // Also reached when the job exists but could not be marked terminating.
            this.logger.info("Job {} not found", req.jobId.getId());
            // Replies go to the requestor carried in the request, not to the message sender.
            if (req.requestor != null) {
                req.requestor.tell(new JobClusterManagerProto.KillJobResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, JobState.Noop, "Job " + req.jobId + " not found", req.jobId, req.user), this.getSelf());
            }
        }
    }

    /**
     * Handles a job actor's response to a kill request.
     *
     * <p>On SUCCESS for a job still known as non-terminal: the job actor is unwatched, the response is
     * relayed to the requestor (unless the requestor is absent or is this actor), and the job is
     * marked completed. If the cluster is enabled and has a non-zero SLA, an
     * {@code EnforceSLARequest} carrying the terminated job's definition is sent to this actor; the
     * definition comes from the response's job metadata, falling back to the archived job when the
     * response carries none.
     *
     * <p>On SUCCESS for an unknown job the requestor receives a CLIENT_ERROR "Job not found"; on any
     * other response code the response is relayed to the requestor unchanged.
     *
     * @param resp the kill response from the job actor
     */
    @Override
    public void onKillJobResponse(JobClusterProto.KillJobResponse resp) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onKillJobResponse {}", resp);
        }
        // Never reply to ourselves: kills initiated by this actor carry it as the requestor.
        final boolean replyToRequestor = resp.requestor != null && !this.getSelf().equals(resp.requestor);

        if (resp.responseCode != BaseResponse.ResponseCode.SUCCESS) {
            if (replyToRequestor) {
                resp.requestor.tell(new JobClusterManagerProto.KillJobResponse(resp.requestId, resp.responseCode, resp.state, resp.message, resp.jobId, resp.user), this.getSelf());
            }
        } else {
            Optional<JobInfo> jInfo = this.jobManager.getJobInfoForNonTerminalJob(resp.jobId);
            if (!jInfo.isPresent()) {
                if (replyToRequestor) {
                    resp.requestor.tell(new JobClusterManagerProto.KillJobResponse(resp.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, JobState.Noop, "Job not found", resp.jobId, resp.user), this.getSelf());
                }
            } else {
                this.getContext().unwatch(jInfo.get().jobActor);
                this.numJobShutdowns.increment();
                this.logger.info("Marking job {} as terminated", jInfo.get().jobId);
                if (replyToRequestor) {
                    resp.requestor.tell(new JobClusterManagerProto.KillJobResponse(resp.requestId, resp.responseCode, resp.state, resp.message, resp.jobId, resp.user), this.getSelf());
                }
                try {
                    Optional<JobClusterDefinitionImpl.CompletedJob> completedJob = this.jobManager.markCompleted(resp.jobMetadata);
                    if (!completedJob.isPresent()) {
                        this.logger.warn("Unable to mark job {} completed. ", resp.jobId);
                    } else {
                        this.logger.info("In cleanupAfterJobKill for Job {} in state {} and metadata {} ", resp.jobId, resp.state, resp.jobMetadata);
                        if (!this.jobClusterMetadata.isDisabled()) {
                            SLA sla = this.jobClusterMetadata.getJobClusterDefinition().getSLA();
                            if (sla.getMin() == 0 && sla.getMax() == 0) {
                                this.logger.info("{} No SLA specified nothing to enforce {}", completedJob.get().getJobId(), sla);
                            } else {
                                try {
                                    // ofNullable (not of): a null jobMetadata must reach the archive
                                    // fallback below instead of throwing a NullPointerException.
                                    Optional<IMantisJobMetadata> cJob = Optional.ofNullable(resp.jobMetadata);
                                    if (!cJob.isPresent()) {
                                        cJob = this.jobStore.getArchivedJob(completedJob.get().getJobId());
                                    }
                                    if (cJob != null && cJob.isPresent()) {
                                        this.getSelf().tell(new JobClusterProto.EnforceSLARequest(Instant.now(), Optional.of(cJob.get().getJobDefinition())), ActorRef.noSender());
                                    } else {
                                        this.logger.warn("Could not load last terminated job to use for triggering enforce SLA");
                                    }
                                }
                                catch (Exception e) {
                                    this.logger.warn("Exception {} loading completed Job {} to enforce SLA due", e.getMessage(), completedJob.get().getJobId(), e);
                                }
                            }
                        }
                    }
                }
                catch (IOException e) {
                    this.logger.error("Unable to mark job {} completed. ", resp.jobId, e);
                }
            }
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onKillJobResponse {}", resp);
        }
    }

    /**
     * Answers a job details request.
     *
     * <p>For a non-terminal job the request is forwarded to the job's actor and this method returns
     * without replying itself. For a completed job the archived metadata is read from the job store:
     * SUCCESS when found, CLIENT_ERROR_NOT_FOUND when absent, CLIENT_ERROR when reading throws. For an
     * unknown job the reply is CLIENT_ERROR_NOT_FOUND.
     *
     * @param req the job details request
     */
    @Override
    public void onGetJobDetailsRequest(JobClusterManagerProto.GetJobDetailsRequest req) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter GetJobDetails {}", req);
        }
        // Default answer, replaced below when a completed job yields a different outcome.
        JobClusterManagerProto.GetJobDetailsResponse response = new JobClusterManagerProto.GetJobDetailsResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, "Job " + req.getJobId() + "  not found", Optional.empty());

        Optional<JobInfo> activeJob = this.jobManager.getJobInfoForNonTerminalJob(req.getJobId());
        if (activeJob.isPresent()) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Forwarding getJobDetails to job actor for {}", req.getJobId());
            }
            activeJob.get().jobActor.forward(req, (ActorContext) this.getContext());
            return;
        }

        Optional<JobClusterDefinitionImpl.CompletedJob> completedJob = this.jobManager.getCompletedJob(req.getJobId());
        if (!completedJob.isPresent()) {
            this.logger.debug("No such job {} ", req.getJobId());
        } else {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Found Job {} in completed state ", req.getJobId());
            }
            try {
                Optional<IMantisJobMetadata> archived = this.jobStore.getArchivedJob(req.getJobId().getId());
                if (archived.isPresent()) {
                    response = new JobClusterManagerProto.GetJobDetailsResponse(req.requestId, BaseResponse.ResponseCode.SUCCESS, "", archived);
                } else {
                    response = new JobClusterManagerProto.GetJobDetailsResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, "Job " + req.getJobId() + "  not found", Optional.empty());
                }
            }
            catch (Exception readFailure) {
                this.logger.warn("Exception {} reading Job {} from Storage ", readFailure.getMessage(), req.getJobId(), readFailure);
                response = new JobClusterManagerProto.GetJobDetailsResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, "Exception reading Job " + req.getJobId() + "  " + readFailure.getMessage(), Optional.empty());
            }
        }

        this.getSender().tell(response, this.getSelf());
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit GetJobDetails {}", req);
        }
    }

    /**
     * Answers a request for the discovery info of this cluster's latest job.
     *
     * <p>The latest job id is the current value of {@code jobIdSubmissionSubject}. When that job is
     * still non-terminal the request is forwarded to its actor. Otherwise the sender receives
     * SERVER_ERROR if the job info is missing, CLIENT_ERROR_NOT_FOUND if there is no latest job id,
     * or SERVER_ERROR if the request names a different cluster.
     *
     * @param request the discovery info request
     */
    @Override
    public void onGetLatestJobDiscoveryInfo(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest request) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onGetLatestJobDiscoveryInfo {}", request);
        }
        ActorRef sender = this.getSender();
        if (!this.name.equals(request.getJobCluster())) {
            String msg = "Job Cluster " + request.getJobCluster() + " In request does not match the name of this actor " + this.name;
            this.logger.warn(msg);
            sender.tell(new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(request.requestId, BaseResponse.ResponseCode.SERVER_ERROR, msg, Optional.empty()), this.getSelf());
        } else {
            JobId latestJobId = (JobId) this.jobIdSubmissionSubject.getValue();
            this.logger.debug("[{}] latest job Id for cluster: {}", this.name, latestJobId);
            if (latestJobId == null) {
                this.logger.debug("no latest Job ID found for job cluster {}", this.name);
                sender.tell(new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(request.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, "No latest jobId found for job cluster " + this.name, Optional.empty()), this.getSelf());
            } else {
                Optional<JobInfo> latestJob = this.jobManager.getJobInfoForNonTerminalJob(latestJobId);
                if (!latestJob.isPresent()) {
                    this.logger.info("job info not found for job ID when looking up discovery info: {}", latestJobId);
                    sender.tell(new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(request.requestId, BaseResponse.ResponseCode.SERVER_ERROR, "JobInfo not found when looking up discovery info for " + latestJobId, Optional.empty()), this.getSelf());
                } else {
                    latestJob.get().jobActor.forward(request, (ActorContext) this.getContext());
                }
            }
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onGetLatestJobDiscoveryInfo {}", request);
        }
    }

    /**
     * Answers a job scheduling-info request: forwards it to the job's actor when the job is
     * non-terminal, otherwise replies to the sender with a CLIENT_ERROR.
     *
     * @param request the scheduling-info request
     */
    @Override
    public void onGetJobStatusSubject(JobClusterManagerProto.GetJobSchedInfoRequest request) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onGetJobStatusSubject {}", request);
        }
        Optional<JobInfo> jInfo = this.jobManager.getJobInfoForNonTerminalJob(request.getJobId());
        if (jInfo.isPresent()) {
            if (this.logger.isDebugEnabled()) {
                // Message previously said "getJobDetails" (copied from the job details handler).
                this.logger.debug("Forwarding getJobSchedInfo to job actor for {}", request.getJobId());
            }
            jInfo.get().jobActor.forward(request, (ActorContext) this.getContext());
        } else {
            JobClusterManagerProto.GetJobSchedInfoResponse response = new JobClusterManagerProto.GetJobSchedInfoResponse(request.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, "Job " + request.getJobId() + "  not found or not active", Optional.empty());
            this.getSender().tell(response, this.getSelf());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onGetJobStatusSubject ");
        }
    }

    /**
     * Answers a request for the stream of last-submitted job ids: replies with
     * {@code jobIdSubmissionSubject} when the request names this cluster, otherwise with a
     * CLIENT_ERROR describing the name mismatch.
     *
     * @param request the stream request
     */
    @Override
    public void onGetLastSubmittedJobIdSubject(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest request) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onGetLastSubmittedJobIdSubject {}", request);
        }
        ActorRef sender = this.getSender();
        if (!this.name.equals(request.getClusterName())) {
            String msg = "Job Cluster " + request.getClusterName() + " In request does not match the name of this actor " + this.name;
            this.logger.warn(msg);
            sender.tell(new JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse(request.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, msg, Optional.empty()), this.getSelf());
        } else {
            sender.tell(new JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse(request.requestId, BaseResponse.ResponseCode.SUCCESS, "", Optional.of(this.jobIdSubmissionSubject)), this.getSelf());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onGetLastSubmittedJobIdSubject {}", request);
        }
    }

    /**
     * Runs periodic bookkeeping for this cluster: invokes SLA enforcement directly with a
     * default-constructed {@code EnforceSLARequest}, then sends every job actor tracked in
     * {@code actorToJobIdMap} a {@code MigrateDisabledVmWorkersRequest} stamped with the request time.
     *
     * @param request the bookkeeping request
     */
    @Override
    public void onBookkeepingRequest(JobClusterProto.BookkeepingRequest request) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onBookkeepingRequest for JobCluster {}", this.name);
        }
        this.onEnforceSLARequest(new JobClusterProto.EnforceSLARequest());
        for (ActorRef jobActor : this.jobManager.actorToJobIdMap.keySet()) {
            // A fresh message per actor, sent without a sender.
            jobActor.tell(new JobProto.MigrateDisabledVmWorkersRequest(request.time), ActorRef.noSender());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onBookkeepingRequest for JobCluster {}", this.name);
        }
    }

    /**
     * Enforces the min/max job-count SLA of this cluster.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Increment the SLA-enforcement execution counter.</li>
     *   <li>Ask the job manager for jobs stuck in init / accepted / terminating relative to
     *       {@code request.timeOfEnforcement}.</li>
     *   <li>If no SLA is configured, stop.</li>
     *   <li>If fewer jobs than the SLA minimum are running, send one {@code SubmitJobRequest} to
     *       self per missing job, using the user of {@code request.jobDefinitionOp} when present
     *       and {@code "MantisMaster"} otherwise.</li>
     *   <li>Otherwise ask the enforcer which jobs exceed the maximum and send a
     *       {@code KillJobRequest} to self for each of them.</li>
     * </ol>
     *
     * @param request the enforcement request; supplies the enforcement time and an optional job
     *                definition to submit
     */
    @Override
    public void onEnforceSLARequest(JobClusterProto.EnforceSLARequest request) {
        if (this.logger.isTraceEnabled()) {
            // The original message had no placeholder for the request, so it was never logged.
            this.logger.trace("Enter onEnforceSLA for JobCluster {} with request {}", this.name, request);
        }
        this.numSLAEnforcementExecutions.increment();
        final long now = request.timeOfEnforcement.toEpochMilli();
        // NOTE(review): the returned lists are not consumed in this method. The calls are kept
        // because they may have side effects inside JobManager (e.g. logging) - confirm there.
        this.jobManager.getJobActorsStuckInInit(now, this.getExpirePendingInitializeDelayMs());
        this.jobManager.getJobsStuckInAccepted(now, this.getExpireAcceptedDelayMs());
        this.jobManager.getJobsStuckInTerminating(now, this.getExpireAcceptedDelayMs());
        if (!this.slaEnforcer.hasSLA()) {
            return;
        }
        final int activeJobsCount = this.jobManager.activeJobsCount();
        final int acceptedJobsCount = this.jobManager.acceptedJobsCount();
        final int noOfJobsToLaunch = this.slaEnforcer.enforceSLAMin(activeJobsCount, acceptedJobsCount);
        if (noOfJobsToLaunch > 0) {
            this.logger.info("Submitting {} jobs for job name {} as active count is {} and accepted count is {}", noOfJobsToLaunch, this.name, activeJobsCount, acceptedJobsCount);
            String user = "MantisMaster";
            if (request.jobDefinitionOp.isPresent()) {
                user = request.jobDefinitionOp.get().getUser();
            }
            for (int i = 0; i < noOfJobsToLaunch; ++i) {
                this.getSelf().tell(new JobClusterManagerProto.SubmitJobRequest(this.name, user, true, request.jobDefinitionOp), this.getSelf());
            }
        } else {
            // Below-minimum is not the case, so check whether the maximum is exceeded.
            final List<JobInfo> listOfJobs = new ArrayList<JobInfo>(activeJobsCount + acceptedJobsCount);
            listOfJobs.addAll(this.jobManager.getActiveJobsList());
            listOfJobs.addAll(this.jobManager.getAcceptedJobsList());
            final List<JobId> jobsToKill = this.slaEnforcer.enforceSLAMax(Collections.unmodifiableList(listOfJobs));
            for (JobId jobId : jobsToKill) {
                this.logger.info("Request termination for job {}", jobId);
                this.getSelf().tell(new JobClusterProto.KillJobRequest(jobId, "SLA enforcement", JobCompletedReason.Killed, "MantisMaster", ActorRef.noSender()), this.getSelf());
            }
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onEnforceSLA for JobCluster {}", this.name);
        }
    }

    /**
     * Delay passed to the job manager's stuck-in-accepted and stuck-in-terminating checks.
     *
     * @return ten minutes, in milliseconds
     */
    private long getExpireAcceptedDelayMs() {
        // 10 min * 60 s * 1000 ms
        return 10L * 60L * 1000L;
    }

    /**
     * Builds a copy of the given job definition that carries over its SLA, labels, name,
     * parameters, scheduling info, number of stages, subscription timeout and user, but
     * deliberately does not copy the artifact name or version.
     *
     * @param jobDefinition the definition to copy from
     * @return the copy, or {@link Optional#empty()} if building it failed (the failure is logged
     *         at WARN together with the exception)
     */
    private Optional<JobDefinition> cloneToNewJobDefinitionWithoutArtifactNameAndVersion(JobDefinition jobDefinition) {
        try {
            final JobDefinition clonedJobDefn = new JobDefinition.Builder()
                    .withJobSla(jobDefinition.getJobSla())
                    .withLabels(jobDefinition.getLabels())
                    .withName(jobDefinition.getName())
                    .withParameters(jobDefinition.getParameters())
                    .withSchedulingInfo(jobDefinition.getSchedulingInfo())
                    .withNumberOfStages(jobDefinition.getNumberOfStages())
                    .withSubscriptionTimeoutSecs(jobDefinition.getSubscriptionTimeoutSecs())
                    .withUser(jobDefinition.getUser())
                    .build();
            return Optional.of(clonedJobDefn);
        }
        catch (Exception e) {
            // The trailing throwable gives the logger the full stack trace, so no
            // printStackTrace() to stderr is needed.
            this.logger.warn("Could not clone JobDefinition {} due to {}", jobDefinition, e.getMessage(), e);
            return Optional.empty();
        }
    }

    /**
     * Resolves the last submitted job definition (via {@code getLastSubmittedJobDefinition}) and,
     * when one is found, returns a clone of it without artifact name and version.
     *
     * @param completedJobs   completed jobs of this cluster to search
     * @param jobDefinitionOp an explicitly supplied definition, if any
     * @param store           job store used to load archived jobs
     * @return the cloned definition, or empty when no previous definition could be resolved or
     *         cloning failed
     */
    private Optional<JobDefinition> cloneJobDefinitionForQuickSubmitFromArchivedJobs(List<JobClusterDefinitionImpl.CompletedJob> completedJobs, Optional<JobDefinition> jobDefinitionOp, MantisJobStore store) {
        final boolean tracing = this.logger.isTraceEnabled();
        if (tracing) {
            this.logger.trace("Enter createNewJobDefinitionFromLastSubmittedInheritSchedInfoAndParameters");
        }
        final Optional<JobDefinition> previousDefn = this.getLastSubmittedJobDefinition(completedJobs, jobDefinitionOp, store);
        if (!previousDefn.isPresent()) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit createNewJobDefinitionFromLastSubmittedInheritSchedInfoAndParameters empty");
            }
            return Optional.empty();
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit createNewJobDefinitionFromLastSubmittedInheritSchedInfoAndParameters");
        }
        return this.cloneToNewJobDefinitionWithoutArtifactNameAndVersion(previousDefn.get());
    }

    /**
     * Delay passed to the job manager's stuck-in-initialization check.
     *
     * @return one minute, in milliseconds
     */
    private long getExpirePendingInitializeDelayMs() {
        // 60 s * 1000 ms
        return 60L * 1000L;
    }

    /**
     * Handles a cron tick for this cluster.
     *
     * <p>Nothing is submitted when the SLA has no cron policy. With a policy, a
     * {@code SubmitJobRequest} is sent to self if the policy is {@code KEEP_NEW} or if there are
     * no non-terminal jobs; otherwise the tick is skipped and an info message is logged.
     *
     * @param request the cron trigger message (its contents are not read)
     */
    @Override
    public void onTriggerCron(JobClusterProto.TriggerCronRequest request) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onTriggerCron for Job Cluster {}", this.name);
        }
        final IJobClusterDefinition.CronPolicy cronPolicy = this.jobClusterMetadata.getJobClusterDefinition().getSLA().getCronPolicy();
        if (cronPolicy != null) {
            final boolean shouldSubmit = cronPolicy == IJobClusterDefinition.CronPolicy.KEEP_NEW
                    || this.jobManager.getAllNonTerminalJobsList().isEmpty();
            if (!shouldSubmit) {
                this.logger.info(this.name + ": Skipping submitting new job upon cron trigger, one exists already");
            } else {
                this.getSelf().tell(new JobClusterManagerProto.SubmitJobRequest(this.name, "MantisMaster", Optional.empty(), false), this.getSelf());
            }
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onTriggerCron Triggered for Job Cluster {}", this.name);
        }
    }

    /**
     * Reads the terminated-job deletion delay from the master configuration.
     *
     * @return the value of {@code getTerminatedJobToDeleteDelayHours()} on the current config
     */
    private long getTerminatedJobToDeleteDelayHours() {
        final long delayHours = ConfigurationProvider.getConfig().getTerminatedJobToDeleteDelayHours();
        return delayHours;
    }

    /**
     * Replaces the SLA of this job cluster.
     *
     * <p>Builds a new {@code SLA} from the request, persists the updated cluster definition,
     * tears down the existing cron (if any) and creates a new {@code CronManager} for the new
     * SLA. A disabled cluster stays disabled unless the request asks for force-enable. The sender
     * receives {@code SUCCESS}, {@code CLIENT_ERROR} (on {@link IllegalArgumentException}) or
     * {@code SERVER_ERROR} (on any other exception); an audit event is published on success.
     *
     * @param slaRequest the requested min/max/cron settings
     */
    @Override
    public void onJobClusterUpdateSLA(JobClusterManagerProto.UpdateJobClusterSLARequest slaRequest) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onJobClusterUpdateSLA {}", slaRequest);
        }
        final ActorRef replyTo = this.getSender();
        try {
            final SLA updatedSla = new SLA(slaRequest.getMin(), slaRequest.getMax(), slaRequest.getCronSpec(), slaRequest.getCronPolicy());
            final JobClusterDefinitionImpl updatedDefn = new JobClusterDefinitionImpl.Builder()
                    .from(this.jobClusterMetadata.getJobClusterDefinition())
                    .withSla(updatedSla)
                    .build();
            // Remains disabled only if it is disabled now and the caller did not force-enable it.
            final boolean staysDisabled = this.jobClusterMetadata.isDisabled() && !slaRequest.isForceEnable();
            final IJobClusterMetadata updatedMetadata = new JobClusterMetadataImpl.Builder()
                    .withIsDisabled(staysDisabled)
                    .withLastJobCount(this.jobClusterMetadata.getLastJobCount())
                    .withJobClusterDefinition(updatedDefn)
                    .build();
            this.updateAndSaveJobCluster(updatedMetadata);
            if (this.cronManager != null) {
                this.cronManager.destroyCron();
            }
            this.cronManager = new CronManager(this.name, this.getSelf(), updatedSla);
            replyTo.tell(new JobClusterManagerProto.UpdateJobClusterSLAResponse(slaRequest.requestId, BaseResponse.ResponseCode.SUCCESS, this.name + " SLA updated"), this.getSelf());
            this.eventPublisher.publishAuditEvent(new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_UPDATE, this.jobClusterMetadata.getJobClusterDefinition().getName(), this.name + " SLA update"));
        }
        catch (IllegalArgumentException badArgument) {
            this.logger.error("Invalid arguement job cluster not updated ", badArgument);
            replyTo.tell(new JobClusterManagerProto.UpdateJobClusterSLAResponse(slaRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.name + " Job cluster SLA updation failed " + badArgument.getMessage()), this.getSelf());
        }
        catch (Exception failure) {
            this.logger.error("job cluster not updated ", failure);
            replyTo.tell(new JobClusterManagerProto.UpdateJobClusterSLAResponse(slaRequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, this.name + " Job cluster SLA updation failed " + failure.getMessage()), this.getSelf());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onJobClusterUpdateSLA {}", slaRequest);
        }
    }

    /**
     * Replaces the labels of this job cluster.
     *
     * <p>Rebuilds the cluster definition with the labels from the request (the current job
     * cluster config is copied through a builder), persists it, replies {@code SUCCESS} and
     * publishes an audit event. Any exception results in a {@code SERVER_ERROR} reply.
     *
     * @param labelRequest the request carrying the new label list
     */
    @Override
    public void onJobClusterUpdateLabels(JobClusterManagerProto.UpdateJobClusterLabelsRequest labelRequest) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onJobClusterUpdateLabels {}", labelRequest);
        }
        final ActorRef replyTo = this.getSender();
        try {
            final JobClusterConfig copiedConfig = new JobClusterConfig.Builder()
                    .from(this.jobClusterMetadata.getJobClusterDefinition().getJobClusterConfig())
                    .build();
            final JobClusterDefinitionImpl updatedDefn = new JobClusterDefinitionImpl.Builder()
                    .from(this.jobClusterMetadata.getJobClusterDefinition())
                    .withJobClusterConfig(copiedConfig)
                    .withLabels(labelRequest.getLabels())
                    .build();
            final IJobClusterMetadata updatedMetadata = new JobClusterMetadataImpl.Builder()
                    .withIsDisabled(this.jobClusterMetadata.isDisabled())
                    .withLastJobCount(this.jobClusterMetadata.getLastJobCount())
                    .withJobClusterDefinition(updatedDefn)
                    .build();
            this.updateAndSaveJobCluster(updatedMetadata);
            replyTo.tell(new JobClusterManagerProto.UpdateJobClusterLabelsResponse(labelRequest.requestId, BaseResponse.ResponseCode.SUCCESS, this.name + " labels updated"), this.getSelf());
            this.eventPublisher.publishAuditEvent(new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_UPDATE, this.jobClusterMetadata.getJobClusterDefinition().getName(), this.name + " update labels"));
        }
        catch (Exception failure) {
            this.logger.error("job cluster labels not updated ", failure);
            replyTo.tell(new JobClusterManagerProto.UpdateJobClusterLabelsResponse(labelRequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, this.name + " labels updation failed " + failure.getMessage()), this.getSelf());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onJobClusterUpdateLabels {}", labelRequest);
        }
    }

    /**
     * Updates the artifact name and version of this job cluster.
     *
     * <p>The requested version must not match the version of any existing job cluster config;
     * otherwise a {@code CLIENT_ERROR} is returned and nothing changes. On success the new config
     * is persisted, a {@code SubmitJobRequest} is sent to self unless the request asks to skip
     * submission, and the sender receives {@code SUCCESS}. Any exception yields
     * {@code SERVER_ERROR}.
     *
     * @param artifactReq the request carrying artifact name, version, user and skip-submit flag
     */
    @Override
    public void onJobClusterUpdateArtifact(JobClusterManagerProto.UpdateJobClusterArtifactRequest artifactReq) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Entering JobClusterActor:onJobClusterUpdateArtifact");
        }
        final ActorRef replyTo = this.getSender();
        try {
            final boolean versionIsNew = this.isVersionUnique(artifactReq.getVersion(), this.jobClusterMetadata.getJobClusterDefinition().getJobClusterConfigs());
            if (!versionIsNew) {
                final String msg = String.format("job cluster %s not updated as the version %s is not unique", this.name, artifactReq.getVersion());
                this.logger.error(msg);
                replyTo.tell(new JobClusterManagerProto.UpdateJobClusterArtifactResponse(artifactReq.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, msg), this.getSelf());
                return;
            }
            final JobClusterConfig updatedConfig = new JobClusterConfig.Builder()
                    .from(this.jobClusterMetadata.getJobClusterDefinition().getJobClusterConfig())
                    .withArtifactName(artifactReq.getArtifactName())
                    .withVersion(artifactReq.getVersion())
                    .withUploadedAt(System.currentTimeMillis())
                    .build();
            this.updateJobClusterConfig(updatedConfig);
            if (!artifactReq.isSkipSubmit()) {
                this.getSelf().tell(new JobClusterManagerProto.SubmitJobRequest(this.name, artifactReq.getUser(), Optional.empty(), false), this.getSelf());
            }
            replyTo.tell(new JobClusterManagerProto.UpdateJobClusterArtifactResponse(artifactReq.requestId, BaseResponse.ResponseCode.SUCCESS, this.name + " artifact updated"), this.getSelf());
        }
        catch (Exception failure) {
            this.logger.error("job cluster not updated ", failure);
            replyTo.tell(new JobClusterManagerProto.UpdateJobClusterArtifactResponse(artifactReq.requestId, BaseResponse.ResponseCode.SERVER_ERROR, this.name + " Job cluster artifact updation failed " + failure.getMessage()), this.getSelf());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit JobClusterActor:onJobClusterUpdateArtifact");
        }
    }

    /**
     * Persists a new job cluster config for this cluster and publishes an audit event.
     *
     * <p>The disabled flag and last job count are carried over from the current metadata.
     *
     * @param newConfig the config to install on the cluster definition
     * @throws Exception if saving the updated cluster fails
     */
    private void updateJobClusterConfig(JobClusterConfig newConfig) throws Exception {
        final JobClusterDefinitionImpl updatedDefn = new JobClusterDefinitionImpl.Builder()
                .from(this.jobClusterMetadata.getJobClusterDefinition())
                .withJobClusterConfig(newConfig)
                .build();
        final IJobClusterMetadata updatedMetadata = new JobClusterMetadataImpl.Builder()
                .withIsDisabled(this.jobClusterMetadata.isDisabled())
                .withLastJobCount(this.jobClusterMetadata.getLastJobCount())
                .withJobClusterDefinition(updatedDefn)
                .build();
        this.updateAndSaveJobCluster(updatedMetadata);
        final String clusterName = this.jobClusterMetadata.getJobClusterDefinition().getName();
        this.eventPublisher.publishAuditEvent(new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_UPDATE, clusterName, this.name + " artifact update"));
    }

    /**
     * Updates the scheduling info (and version) of this job cluster.
     *
     * <p>The requested version must not match the version of any existing job cluster config;
     * otherwise a {@code CLIENT_ERROR} is returned. On success the new config is persisted and
     * {@code SUCCESS} is returned; any exception yields {@code SERVER_ERROR}.
     *
     * @param request the request carrying the new version and scheduling info
     */
    @Override
    public void onJobClusterUpdateSchedulingInfo(JobClustersManagerActor.UpdateSchedulingInfo request) {
        final ActorRef replyTo = this.getSender();
        try {
            final boolean versionIsNew = this.isVersionUnique(request.getVersion(), this.jobClusterMetadata.getJobClusterDefinition().getJobClusterConfigs());
            if (!versionIsNew) {
                final String msg = String.format("job cluster %s not updated as the version %s is not unique", this.name, request.getVersion());
                this.logger.error(msg);
                replyTo.tell(new JobClusterManagerProto.UpdateSchedulingInfoResponse(request.getRequestId(), BaseResponse.ResponseCode.CLIENT_ERROR, msg), this.getSelf());
                return;
            }
            final JobClusterConfig updatedConfig = new JobClusterConfig.Builder()
                    .from(this.jobClusterMetadata.getJobClusterDefinition().getJobClusterConfig())
                    .withVersion(request.getVersion())
                    .withSchedulingInfo(request.getSchedulingInfo())
                    .withUploadedAt(System.currentTimeMillis())
                    .build();
            this.updateJobClusterConfig(updatedConfig);
            replyTo.tell(new JobClusterManagerProto.UpdateSchedulingInfoResponse(request.getRequestId(), BaseResponse.ResponseCode.SUCCESS, this.name + " schedulingInfo updated"), this.getSelf());
        }
        catch (Exception failure) {
            this.logger.error("job cluster not updated ", failure);
            replyTo.tell(new JobClusterManagerProto.UpdateSchedulingInfoResponse(request.getRequestId(), BaseResponse.ResponseCode.SERVER_ERROR, this.name + " Job cluster schedulingInfo update failed " + failure.getMessage()), this.getSelf());
        }
    }

    /**
     * Checks that no existing config already uses the given version string.
     *
     * @param artifactVersion the candidate version
     * @param existingConfigs the configs whose versions are compared against the candidate
     * @return {@code true} if no config's version equals {@code artifactVersion}; {@code false}
     *         (after logging at INFO) as soon as a match is found
     */
    boolean isVersionUnique(String artifactVersion, List<JobClusterConfig> existingConfigs) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter JobClusterActor {} isVersionnique {} existing versions {}", this.name, artifactVersion, existingConfigs);
        }
        // anyMatch short-circuits on the first config with the same version.
        final boolean alreadyUsed = existingConfigs.stream()
                .anyMatch(config -> config.getVersion().equals(artifactVersion));
        if (alreadyUsed) {
            this.logger.info("Given Version {} is not unique during UpdateJobCluster {}", artifactVersion, this.name);
            return false;
        }
        return true;
    }

    /**
     * Replaces the worker migration config of this job cluster.
     *
     * <p>Persists the updated definition, replies {@code SUCCESS} and publishes an audit event;
     * any exception results in a {@code SERVER_ERROR} reply.
     *
     * @param req the request carrying the new migration config
     */
    @Override
    public void onJobClusterUpdateWorkerMigrationConfig(JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyRequest req) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Entering JobClusterActor:onJobClusterUpdateWorkerMigrationConfig {}", req);
        }
        final ActorRef replyTo = this.getSender();
        try {
            final JobClusterDefinitionImpl updatedDefn = new JobClusterDefinitionImpl.Builder()
                    .from(this.jobClusterMetadata.getJobClusterDefinition())
                    .withMigrationConfig(req.getMigrationConfig())
                    .build();
            final IJobClusterMetadata updatedMetadata = new JobClusterMetadataImpl.Builder()
                    .withIsDisabled(this.jobClusterMetadata.isDisabled())
                    .withLastJobCount(this.jobClusterMetadata.getLastJobCount())
                    .withJobClusterDefinition(updatedDefn)
                    .build();
            this.updateAndSaveJobCluster(updatedMetadata);
            replyTo.tell(new JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyResponse(req.requestId, BaseResponse.ResponseCode.SUCCESS, this.name + " worker migration config updated"), this.getSelf());
            this.eventPublisher.publishAuditEvent(new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_UPDATE, this.jobClusterMetadata.getJobClusterDefinition().getName(), this.name + " worker migration config update"));
        }
        catch (Exception failure) {
            this.logger.error("job cluster migration config not updated ", failure);
            replyTo.tell(new JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyResponse(req.requestId, BaseResponse.ResponseCode.SERVER_ERROR, this.name + " Job cluster worker migration config updation failed " + failure.getMessage()), this.getSelf());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit JobClusterActor:onJobClusterUpdateWorkerMigrationConfig {}", req);
        }
    }

    /**
     * Persists the given cluster metadata and then makes it the actor's current state.
     *
     * <p>The store is written first; only if that succeeds are {@code jobClusterMetadata} and
     * {@code slaEnforcer} replaced. When the new metadata is not disabled the actor switches to
     * {@code initializedBehavior}.
     *
     * @param jobCluster the metadata to save and adopt
     * @throws Exception if the job store rejects the update
     */
    private void updateAndSaveJobCluster(IJobClusterMetadata jobCluster) throws Exception {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Entering JobClusterActor:updateAndSaveJobCluster {}", jobCluster.getJobClusterDefinition().getName());
        }
        this.jobStore.updateJobCluster(jobCluster);
        // From here on the in-memory state mirrors what was just persisted.
        this.jobClusterMetadata = jobCluster;
        final boolean enabled = !jobCluster.isDisabled();
        if (enabled) {
            this.getContext().become(this.initializedBehavior);
        }
        this.slaEnforcer = new SLAEnforcer(jobCluster.getJobClusterDefinition().getSLA());
        this.logger.info("successfully saved job cluster");
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit JobClusterActor:updateAndSaveJobCluster {}", jobCluster.getJobClusterDefinition().getName());
        }
    }

    /**
     * Resolves the job definition to base a new submission on.
     *
     * <p>If the caller supplied a definition it is returned as is. Otherwise the last submitted
     * job id is looked up among {@code completedJobs}, the matching completed job is fetched from
     * the job manager, and its archived metadata is loaded from {@code store}. Every failure on
     * that path is logged at WARN and leads to an empty result.
     *
     * @param completedJobs   completed jobs of this cluster
     * @param jobDefinitionOp an explicitly supplied definition, if any
     * @param store           job store used to load the archived job
     * @return the supplied or archived job definition, or empty if none could be found
     */
    private Optional<JobDefinition> getLastSubmittedJobDefinition(List<JobClusterDefinitionImpl.CompletedJob> completedJobs, Optional<JobDefinition> jobDefinitionOp, MantisJobStore store) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Entering getLastSubmittedJobDefinition");
        }
        if (jobDefinitionOp.isPresent()) {
            return jobDefinitionOp;
        }
        final Optional<JobId> lastJobId = JobListHelper.getLastSubmittedJobId(Collections.emptyList(), completedJobs);
        if (!lastJobId.isPresent()) {
            this.logger.warn("Could not find any previous submitted Job for cluster {}", this.name);
        } else {
            final Optional<JobClusterDefinitionImpl.CompletedJob> completedJob = this.jobManager.getCompletedJob(lastJobId.get());
            if (!completedJob.isPresent()) {
                this.logger.warn("Could not find any previous submitted/completed Job for cluster {}", this.name);
            } else {
                try {
                    final Optional<IMantisJobMetadata> archivedJob = store.getArchivedJob(completedJob.get().getJobId());
                    if (!archivedJob.isPresent()) {
                        this.logger.warn("Could not find load archived Job {} for cluster {}", completedJob.get().getJobId(), this.name);
                    } else {
                        if (this.logger.isTraceEnabled()) {
                            this.logger.trace("Exit getLastSubmittedJobDefinition returning job {} with defn {}", archivedJob.get().getJobId(), archivedJob.get().getJobDefinition());
                        }
                        return Optional.of(archivedJob.get().getJobDefinition());
                    }
                }
                catch (Exception loadFailure) {
                    this.logger.warn("Archived Job {} could not be loaded from the store due to {} ", completedJob.get().getJobId(), loadFailure.getMessage());
                }
            }
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit getLastSubmittedJobDefinition empty");
        }
        return Optional.empty();
    }

    /**
     * Handles an Akka {@code Terminated} notification; the event is only logged at DEBUG.
     *
     * @param terminatedEvent the termination notification
     */
    private void onTerminated(Terminated terminatedEvent) {
        if (!this.logger.isDebugEnabled()) {
            return;
        }
        this.logger.debug("onTerminatedEvent {} ", terminatedEvent);
    }

    /**
     * Routes a scale-stage request to the actor of the targeted job.
     *
     * <p>If the job is known and non-terminal the request is forwarded to its job actor (so the
     * job actor replies to the original sender). Otherwise the sender receives a
     * {@code CLIENT_ERROR} response with an actual worker count of {@code 0}.
     *
     * @param req the scale request naming the job and the desired number of workers
     */
    @Override
    public void onScaleStage(JobClusterManagerProto.ScaleStageRequest req) {
        if (this.logger.isTraceEnabled()) {
            // Entry trace: the original logged "Exit" here, making enter/exit indistinguishable.
            this.logger.trace("Enter onScaleStage {}", req);
        }
        Optional<JobInfo> jobInfo = this.jobManager.getJobInfoForNonTerminalJob(req.getJobId());
        ActorRef sender = this.getSender();
        if (jobInfo.isPresent()) {
            jobInfo.get().jobActor.forward(req, this.getContext());
        } else {
            sender.tell(new JobClusterManagerProto.ScaleStageResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, "Job " + req.getJobId() + " not found. Could not scale stage to " + req.getNumWorkers(), 0), this.getSelf());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onScaleStage {}", req);
        }
    }

    /**
     * Routes a resubmit-worker request to the actor of the targeted job.
     *
     * <p>If the job is known and non-terminal the request is forwarded to its job actor (so the
     * job actor replies to the original sender). Otherwise the sender receives a
     * {@code CLIENT_ERROR} response.
     *
     * @param req the resubmit request naming the job
     */
    @Override
    public void onResubmitWorker(JobClusterManagerProto.ResubmitWorkerRequest req) {
        if (this.logger.isTraceEnabled()) {
            // Entry trace: the original logged "Exit" here, making enter/exit indistinguishable.
            this.logger.trace("Enter JCA:onResubmitWorker {}", req);
        }
        Optional<JobInfo> jobInfo = this.jobManager.getJobInfoForNonTerminalJob(req.getJobId());
        ActorRef sender = this.getSender();
        if (jobInfo.isPresent()) {
            jobInfo.get().jobActor.forward(req, this.getContext());
        } else {
            sender.tell(new JobClusterManagerProto.ResubmitWorkerResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, "Job " + req.getJobId() + " not found. Could not resubmit worker"), this.getSelf());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit JCA:onResubmitWorker {}", req);
        }
    }

    /**
     * Action run when a cluster's cron trigger fires: it sends a {@code TriggerCronRequest} to
     * the job cluster actor supplied as trigger data, with no sender.
     */
    public static class CronTriggerAction
    implements Action1<ActorRef> {
        /**
         * @param jobClusterActor the actor to notify of the cron tick
         */
        public void call(ActorRef jobClusterActor) {
            final JobClusterProto.TriggerCronRequest cronTick = new JobClusterProto.TriggerCronRequest();
            jobClusterActor.tell(cronTick, ActorRef.noSender());
        }
    }

    /**
     * Owns the cron trigger of a single job cluster.
     *
     * <p>All instances share one static {@code TriggerOperator}, which is initialized when this
     * class is loaded. An instance registers a trigger on construction when the SLA has a cron
     * spec, and {@link #destroyCron()} removes it again.
     */
    static class CronManager {
        private static final Logger logger = LoggerFactory.getLogger(CronManager.class);
        private static final TriggerOperator triggerOperator = new TriggerOperator(1);
        private final String cronSpec;
        private final IJobClusterDefinition.CronPolicy policy;
        private final ActorRef clusterActor;
        /** Id returned by the trigger operator on registration; {@code null} while no trigger is registered. */
        private String triggerId;
        private final String jobClusterName;
        private String triggerGroup = null;
        private CronTrigger<ActorRef> scheduledTrigger;
        private boolean isCronActive = false;

        /**
         * Creates the manager and, if the SLA carries a cron spec, registers the cron trigger.
         *
         * @param jobClusterName name of the owning job cluster
         * @param clusterActor   actor that receives the cron ticks
         * @param sla            SLA supplying the cron spec and policy
         * @throws Exception if the trigger cannot be registered
         */
        CronManager(String jobClusterName, ActorRef clusterActor, SLA sla) throws Exception {
            this.jobClusterName = jobClusterName;
            this.cronSpec = sla.getCronSpec();
            this.policy = sla.getCronPolicy();
            this.clusterActor = clusterActor;
            if (this.cronSpec != null) {
                this.initCron();
            }
        }

        /**
         * Registers the cron trigger; a no-op when there is no cron spec or a trigger is already
         * registered.
         *
         * @throws Exception a {@code SchedulerException} wrapping an
         *                   {@link IllegalArgumentException} raised while building or registering
         *                   the trigger, or whatever the trigger operator itself throws
         */
        private void initCron() throws Exception {
            if (this.cronSpec == null || this.triggerId != null) {
                return;
            }
            logger.info("Init'ing cron for " + this.jobClusterName);
            this.triggerGroup = this.jobClusterName + "-" + this;
            try {
                this.scheduledTrigger = new CronTrigger(this.cronSpec, this.jobClusterName, (Object)this.clusterActor, ActorRef.class, CronTriggerAction.class);
                this.triggerId = triggerOperator.registerTrigger(this.triggerGroup, this.scheduledTrigger);
                this.isCronActive = true;
            }
            catch (IllegalArgumentException e) {
                this.destroyCron();
                logger.error("Failed to start cron for {}: {}", (Object)this.jobClusterName, (Object)e);
                throw new SchedulerException(e.getMessage(), (Throwable)e);
            }
        }

        /**
         * Deletes the registered trigger, if any, and marks the cron inactive.
         *
         * <p>The trigger id must still be set when {@code deleteTrigger} is called, so the state
         * is cleared only after the delete succeeded. If the delete fails the failure is logged
         * and the state is left untouched.
         */
        private void destroyCron() {
            if (this.triggerId == null) {
                return;
            }
            try {
                logger.info("Destroying cron " + this.triggerId);
                triggerOperator.deleteTrigger(this.triggerGroup, this.triggerId);
                this.triggerId = null;
                this.isCronActive = false;
            }
            catch (SchedulerException | TriggerNotFoundException e) {
                logger.warn("Couldn't delete trigger group " + this.triggerGroup + ", id " + this.triggerId, e);
            }
        }

        /**
         * @return {@code true} while a trigger registered by this manager has not been destroyed
         */
        boolean isCronActive() {
            return this.isCronActive;
        }

        static {
            try {
                triggerOperator.initialize();
            }
            catch (SchedulerException e) {
                logger.error("Unexpected: {}", (Object)e.getMessage(), (Object)e);
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * In-memory two-way index between labels and job ids.
     *
     * <p>{@code labelJobIdMap} maps a label to the ids of the jobs carrying it, and
     * {@code jobIdToLabelMap} remembers the label list each job was added with so the job can be
     * removed again. Both maps are plain {@link HashMap}s with no synchronization.
     */
    static final class LabelCache {
        final Map<Label, Set<JobId>> labelJobIdMap = new HashMap<Label, Set<JobId>>();
        final Map<JobId, List<Label>> jobIdToLabelMap = new HashMap<JobId, List<Label>>();
        private final Logger logger = LoggerFactory.getLogger(LabelCache.class);

        LabelCache() {
        }

        /**
         * Indexes a job under each of its labels.
         *
         * @param jobId     the job to index
         * @param labelList the job's labels; {@code null} leaves the cache unchanged
         */
        void addJobIdToLabelCache(JobId jobId, List<Label> labelList) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("addJobIdToLabelCache " + jobId + " labelList " + labelList + " current map " + this.labelJobIdMap);
            }
            if (labelList == null) {
                return;
            }
            for (Label label : labelList) {
                this.labelJobIdMap.computeIfAbsent(label, key -> new HashSet<JobId>()).add(jobId);
            }
            this.jobIdToLabelMap.put(jobId, labelList);
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit addJobIdToLabelCache " + jobId + " labelList " + labelList + " new map " + this.labelJobIdMap);
            }
        }

        /**
         * Removes a job from the index; label entries left without any job are dropped.
         *
         * @param jobId the job to remove; unknown ids are ignored
         */
        void removeJobIdFromLabelCache(JobId jobId) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("removeJobIdFromLabelCache " + jobId + " current map " + this.labelJobIdMap);
            }
            List<Label> labels = this.jobIdToLabelMap.remove(jobId);
            if (labels != null) {
                for (Label label : labels) {
                    Set<JobId> jobIds = this.labelJobIdMap.get(label);
                    if (jobIds == null) {
                        // The label's entry is already gone, e.g. the same label appeared twice
                        // in this job's list and an earlier iteration emptied and removed it.
                        continue;
                    }
                    jobIds.remove(jobId);
                    if (jobIds.isEmpty()) {
                        this.labelJobIdMap.remove(label);
                    }
                }
            }
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit removeJobIdFromLabelCache " + jobId + " current map " + this.labelJobIdMap);
            }
        }

        /**
         * Finds the jobs matching the given labels.
         *
         * @param labelList labels to match; {@code null} yields an empty result
         * @param isAnd     {@code true} to require every label (intersection), {@code false} to
         *                  accept any label (union)
         * @return a new, caller-owned set of matching job ids
         */
        Set<JobId> getJobIdsMatchingLabels(List<Label> labelList, boolean isAnd) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Entering getJobidsMatchingLabels " + labelList + " is and ? " + isAnd + " with map " + this.labelJobIdMap);
            }
            if (labelList == null) {
                return new HashSet<JobId>();
            }
            List<Set<JobId>> matchingSubsets = new ArrayList<Set<JobId>>(labelList.size());
            for (Label label : labelList) {
                Set<JobId> jobIds = this.labelJobIdMap.get(label);
                // An unknown label contributes an empty set, which empties an AND result.
                matchingSubsets.add(jobIds != null ? new HashSet<JobId>(jobIds) : new HashSet<JobId>());
            }
            Set<JobId> result = isAnd ? this.getSetIntersection(matchingSubsets) : this.getSetUnion(matchingSubsets);
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exiting getJobidsMatchingLabels " + result);
            }
            return result;
        }

        /**
         * @param listOfSets sets to combine; {@code null} or empty yields an empty set
         * @return a new set holding every id present in at least one input set
         */
        private Set<JobId> getSetUnion(List<Set<JobId>> listOfSets) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("In getSetUnion " + listOfSets);
            }
            Set<JobId> unionSet = new HashSet<JobId>();
            if (listOfSets == null || listOfSets.isEmpty()) {
                return unionSet;
            }
            for (Set<JobId> jobIds : listOfSets) {
                unionSet.addAll(jobIds);
            }
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit  getSetUnion " + unionSet);
            }
            return unionSet;
        }

        /**
         * @param listOfSets sets to combine; {@code null} or empty yields an empty set
         * @return a new set holding only the ids present in every input set
         */
        private Set<JobId> getSetIntersection(List<Set<JobId>> listOfSets) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("In getSetIntersection " + listOfSets);
            }
            if (listOfSets == null || listOfSets.isEmpty()) {
                return new HashSet<JobId>();
            }
            // Start from a copy of the first set so the inputs are never modified.
            Set<JobId> intersectionSet = new HashSet<JobId>(listOfSets.get(0));
            for (int i = 1; i < listOfSets.size(); ++i) {
                intersectionSet.retainAll(listOfSets.get(i));
            }
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Return getSetIntersection " + intersectionSet);
            }
            return intersectionSet;
        }
    }

    static final class JobManager {
        private final Logger logger = LoggerFactory.getLogger(JobManager.class);
        // Name of the owning job cluster (set from the constructor's clusterName argument).
        private final String name;
        // Job actor -> job id. Filled in bootstrapJob; the enclosing actor iterates the keys to
        // message every job actor during bookkeeping.
        private final Map<ActorRef, JobId> actorToJobIdMap = new HashMap<ActorRef, JobId>();
        // NOTE(review): presumably one map per job lifecycle phase (pending init / active /
        // accepted), keyed by job id - confirm against the methods that mutate them.
        private final ConcurrentMap<JobId, JobInfo> pendingInitializationJobsMap = new ConcurrentHashMap<JobId, JobInfo>();
        private final ConcurrentMap<JobId, JobInfo> activeJobsMap = new ConcurrentHashMap<JobId, JobInfo>();
        private final ConcurrentMap<JobId, JobInfo> acceptedJobsMap = new ConcurrentHashMap<JobId, JobInfo>();
        // Sorted by submittedAt, newest first (a smaller submittedAt compares as "greater").
        // Caveat: the comparator returns 0 for equal submittedAt values, and a TreeSet treats
        // such elements as duplicates, so two jobs with the same submittedAt cannot both be
        // held in this set.
        private final Set<JobInfo> nonTerminalSortedJobSet = new TreeSet<JobInfo>((o1, o2) -> {
            if (o1.submittedAt < o2.submittedAt) {
                return 1;
            }
            if (o1.submittedAt > o2.submittedAt) {
                return -1;
            }
            return 0;
        });
        // Created in the constructor from the cluster name, labelCache and jobStore.
        private final CompletedJobStore completedJobStore;
        private final Map<JobId, JobInfo> terminatingJobsMap = new HashMap<JobId, JobInfo>();
        private final AbstractActor.ActorContext context;
        private final MantisSchedulerFactory scheduler;
        private final LifecycleEventPublisher publisher;
        private final MantisJobStore jobStore;
        private final CostsCalculator costsCalculator;
        // Also handed to completedJobStore in the constructor, so both share one instance.
        private final LabelCache labelCache = new LabelCache();

        JobManager(String clusterName, AbstractActor.ActorContext context, MantisSchedulerFactory schedulerFactory, LifecycleEventPublisher publisher, MantisJobStore jobStore, CostsCalculator costsCalculator) {
            this.name = clusterName;
            this.jobStore = jobStore;
            this.context = context;
            this.scheduler = schedulerFactory;
            this.publisher = publisher;
            this.completedJobStore = new CompletedJobStore(this.name, this.labelCache, jobStore);
            this.costsCalculator = costsCalculator;
        }

        void initialize() throws IOException {
            this.logger.debug("Loading completed jobs for cluster {}", (Object)this.name);
            this.completedJobStore.initialize();
            this.logger.debug("Initialized completed job store for cluster {}", (Object)this.name);
        }

        public void onJobClusterDeletion() throws IOException {
            this.completedJobStore.onJobClusterDeletion();
        }

        Observable<JobProto.JobInitialized> bootstrapJob(MantisJobMetadataImpl jobMeta, IJobClusterMetadata jobClusterMetadata) {
            JobInfo jobInfo = this.createJobInfoAndActorAndWatchActor(jobMeta, jobClusterMetadata);
            this.actorToJobIdMap.put(jobInfo.jobActor, jobInfo.jobId);
            if (jobInfo.state.equals((Object)JobState.Accepted)) {
                this.acceptedJobsMap.put(jobInfo.jobId, jobInfo);
                this.nonTerminalSortedJobSet.add(jobInfo);
            } else if (jobInfo.state.equals((Object)JobState.Launched)) {
                this.activeJobsMap.put(jobInfo.jobId, jobInfo);
                this.nonTerminalSortedJobSet.add(jobInfo);
            } else if (jobInfo.state.equals((Object)JobState.Terminating_abnormal) || jobInfo.state.equals((Object)JobState.Terminating_normal)) {
                this.terminatingJobsMap.put(jobInfo.jobId, jobInfo);
                this.nonTerminalSortedJobSet.add(jobInfo);
            } else {
                this.logger.warn("Unexpected job state {}", (Object)jobInfo.state);
            }
            long masterInitTimeoutSecs = ConfigurationProvider.getConfig().getMasterInitTimeoutSecs();
            long timeout = masterInitTimeoutSecs - 60L > 0L ? masterInitTimeoutSecs - 60L : masterInitTimeoutSecs;
            Duration t = Duration.ofSeconds(timeout);
            this.markJobInitializeInitiated(jobInfo, System.currentTimeMillis());
            CompletionStage<JobProto.JobInitialized> respCS = PatternsCS.ask((ActorRef)jobInfo.jobActor, (Object)new JobProto.InitJob(ActorRef.noSender(), false), (Duration)t).thenApply(JobProto.JobInitialized.class::cast);
            return Observable.from(respCS.toCompletableFuture(), (Scheduler)Schedulers.io()).onErrorResumeNext(ex -> {
                this.logger.warn("caught exception {}", (Object)ex.getMessage(), ex);
                return Observable.just((Object)new JobProto.JobInitialized(1L, BaseResponse.ResponseCode.SERVER_ERROR, "Timeout initializing Job " + jobInfo.jobId + " exception -> " + ex.getMessage(), jobInfo.jobId, ActorRef.noSender()));
            }).map(jobInited -> {
                this.markJobInitialized(jobInited.jobId, System.currentTimeMillis());
                return jobInited;
            });
        }

        JobInfo initJob(MantisJobMetadataImpl jobMeta, IJobClusterMetadata jobClusterMetadata, ActorRef sender) {
            JobInfo jobInfo = this.createJobInfoAndActorAndWatchActor(jobMeta, jobClusterMetadata);
            this.markJobAccepted(jobInfo);
            jobInfo.jobActor.tell((Object)new JobProto.InitJob(sender, true), this.context.self());
            this.markJobInitializeInitiated(jobInfo, System.currentTimeMillis());
            return jobInfo;
        }

        JobInfo createJobInfoAndActorAndWatchActor(MantisJobMetadataImpl jobMeta, IJobClusterMetadata jobClusterMetadata) {
            MantisScheduler scheduler1 = this.scheduler.forJob(jobMeta.getJobDefinition());
            ActorRef jobActor = this.context.actorOf(JobActor.props(jobClusterMetadata.getJobClusterDefinition(), jobMeta, this.jobStore, scheduler1, this.publisher, this.costsCalculator), "JobActor-" + jobMeta.getJobId().getId());
            this.context.watch(jobActor);
            this.labelCache.addJobIdToLabelCache(jobMeta.getJobId(), jobMeta.getLabels());
            return new JobInfo.Builder().usingJobMetadata(jobMeta, jobActor).build();
        }

        void markJobInitialized(JobId jobId, long ts) {
            JobInfo removed = (JobInfo)this.pendingInitializationJobsMap.remove(jobId);
            if (removed != null) {
                removed.setInitializedAt(ts);
            }
        }

        void markJobInitializeInitiated(JobInfo jobInfo, long ts) {
            jobInfo.setInitializeInitiatedAt(ts);
            this.pendingInitializationJobsMap.put(jobInfo.jobId, jobInfo);
        }

        boolean markJobAccepted(JobInfo jobInfo) {
            boolean isSuccess = false;
            if (!jobInfo.state.isValidStateChgTo(JobState.Accepted) || this.activeJobsMap.containsKey(jobInfo.jobId) || this.terminatingJobsMap.containsKey(jobInfo.jobId)) {
                String warn = String.format("Job %s already exists", jobInfo.jobId);
                this.logger.warn(warn);
            } else {
                this.acceptedJobsMap.put(jobInfo.jobId, jobInfo);
                this.actorToJobIdMap.put(jobInfo.jobActor, jobInfo.jobId);
                this.nonTerminalSortedJobSet.add(jobInfo);
                isSuccess = true;
            }
            return isSuccess;
        }

        List<JobInfo> getPendingInitializationJobsPriorToCutoff(long ts) {
            return this.pendingInitializationJobsMap.values().stream().filter(jInfo -> jInfo.initializedAt == -1L && jInfo.initializeInitiatedAt < ts).collect(Collectors.toList());
        }

        boolean markJobTerminating(JobInfo jobInfo, JobState newState) {
            boolean isSuccess = false;
            if (JobState.isTerminalState(newState) && jobInfo.state.isValidStateChgTo(newState)) {
                this.activeJobsMap.remove(jobInfo.jobId);
                this.acceptedJobsMap.remove(jobInfo.jobId);
                this.nonTerminalSortedJobSet.add(jobInfo);
                jobInfo.setState(newState);
                this.terminatingJobsMap.put(jobInfo.jobId, jobInfo);
                jobInfo.setTerminationInitiatedAt(System.currentTimeMillis());
                isSuccess = true;
            } else {
                String warn = "Unexpected job terminating event " + jobInfo.jobId + " Invalid transition from state " + (Object)((Object)jobInfo.state) + " to state " + (Object)((Object)newState) + " ";
                this.logger.warn(warn);
            }
            return isSuccess;
        }

        boolean markJobStarted(JobInfo jobInfo) {
            boolean success = false;
            if (jobInfo.state.isValidStateChgTo(JobState.Launched)) {
                jobInfo.setState(JobState.Launched);
                this.acceptedJobsMap.remove(jobInfo.jobId);
                this.activeJobsMap.put(jobInfo.jobId, jobInfo);
                this.nonTerminalSortedJobSet.add(jobInfo);
                success = true;
            } else {
                String warn = String.format("Unexpected job started event %s Invalid transition from state %s to state %s", new Object[]{jobInfo.jobId, jobInfo.state, JobState.Launched});
                this.logger.warn(warn);
            }
            return success;
        }

        Optional<JobClusterDefinitionImpl.CompletedJob> markCompleted(IMantisJobMetadata jobMetadata) throws IOException {
            Optional<JobInfo> jobInfoOp;
            JobId jId = jobMetadata.getJobId();
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Enter markCompleted job {}", (Object)jId);
            }
            if ((jobInfoOp = this.getJobInfoForNonTerminalJob(jId)).isPresent()) {
                JobInfo jInfo = jobInfoOp.get();
                jInfo.state = jobMetadata.getState();
                jInfo.setTerminatedAt(jobMetadata.getEndedAtInstant().get().toEpochMilli());
                this.acceptedJobsMap.remove(jId);
                this.terminatingJobsMap.remove(jId);
                this.activeJobsMap.remove(jId);
                this.actorToJobIdMap.remove(jobInfoOp.get().jobActor);
                this.nonTerminalSortedJobSet.remove(jInfo);
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Exit markCompleted job {}", (Object)jId);
                }
                return Optional.of(this.completedJobStore.onJobCompletion(jobMetadata));
            }
            this.logger.warn("No such job {}", (Object)jId);
            return Optional.empty();
        }

        Optional<JobClusterDefinitionImpl.CompletedJob> markCompleted(JobId jId, long completionTime, JobState state) throws IOException {
            Optional<JobInfo> jobInfoOp;
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Enter markCompleted job {}", (Object)jId);
            }
            if ((jobInfoOp = this.getJobInfoForNonTerminalJob(jId)).isPresent()) {
                JobInfo jInfo = jobInfoOp.get();
                jInfo.state = state;
                jInfo.setTerminatedAt(completionTime);
                this.acceptedJobsMap.remove(jId);
                this.terminatingJobsMap.remove(jId);
                this.activeJobsMap.remove(jId);
                this.actorToJobIdMap.remove(jobInfoOp.get().jobActor);
                this.nonTerminalSortedJobSet.remove(jInfo);
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Exit markCompleted job {}", (Object)jId);
                }
                Object version = null;
                return Optional.ofNullable(this.completedJobStore.onJobCompletion(jId, jInfo.submittedAt, completionTime, jInfo.user, null, state, jInfo.jobDefinition.getLabels()));
            }
            this.logger.warn("No such job {}", (Object)jId);
            return Optional.empty();
        }

        List<JobInfo> getAllNonTerminalJobsList() {
            ArrayList<JobInfo> allJobsList = new ArrayList<JobInfo>(this.nonTerminalSortedJobSet);
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exiting JobClusterActor:getAllNonTerminatlJobsList {}", allJobsList);
            }
            return allJobsList;
        }

        List<JobInfo> getAcceptedJobsList() {
            ArrayList acceptedJobsList = Lists.newArrayListWithExpectedSize((int)this.acceptedJobsCount());
            acceptedJobsList.addAll(this.acceptedJobsMap.values());
            return Collections.unmodifiableList(acceptedJobsList);
        }

        List<JobInfo> getActiveJobsList() {
            ArrayList activeJobList = Lists.newArrayListWithExpectedSize((int)this.activeJobsMap.size());
            activeJobList.addAll(this.activeJobsMap.values());
            return Collections.unmodifiableList(activeJobList);
        }

        List<JobClusterDefinitionImpl.CompletedJob> getCompletedJobsList(int limit, @Nullable JobId from) {
            try {
                if (from != null) {
                    return this.completedJobStore.getCompletedJobs(limit, from);
                }
                return this.completedJobStore.getCompletedJobs(limit);
            }
            catch (IOException e) {
                return Collections.emptyList();
            }
        }

        List<JobInfo> getTerminatingJobsList() {
            ArrayList terminatingJobsList = Lists.newArrayListWithExpectedSize((int)this.terminatingJobsMap.size());
            terminatingJobsList.addAll(this.terminatingJobsMap.values());
            return Collections.unmodifiableList(terminatingJobsList);
        }

        int acceptedJobsCount() {
            return this.acceptedJobsMap.size();
        }

        int activeJobsCount() {
            return this.activeJobsMap.size();
        }

        Optional<JobClusterDefinitionImpl.CompletedJob> getCompletedJob(JobId jId) {
            try {
                return this.completedJobStore.getCompletedJob(jId);
            }
            catch (IOException e) {
                this.logger.warn("Failed to get completed job {}", (Object)jId, (Object)e);
                return Optional.empty();
            }
        }

        Optional<IMantisJobMetadata> getJobDataForCompletedJob(String jId) {
            Optional<JobId> jobId = JobId.fromId(jId);
            if (jobId.isPresent()) {
                try {
                    return this.completedJobStore.getJobMetadata(jobId.get());
                }
                catch (IOException e) {
                    this.logger.warn("Failed to get completed job {}", (Object)jId, (Object)e);
                    return Optional.empty();
                }
            }
            this.logger.warn("Invalid Job Id {} in getJobDataForCompletedJob", (Object)jId);
            return Optional.empty();
        }

        Optional<JobInfo> getJobInfoForNonTerminalJob(JobId jId) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("In getJobInfo {}", (Object)jId);
            }
            if (this.acceptedJobsMap.containsKey(jId)) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Found {} in accepted state", (Object)jId);
                }
                return Optional.of(this.acceptedJobsMap.get(jId));
            }
            if (this.activeJobsMap.containsKey(jId)) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Found {} in active state", (Object)jId);
                }
                return Optional.of(this.activeJobsMap.get(jId));
            }
            if (this.terminatingJobsMap.containsKey(jId)) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Found {} in terminating state", (Object)jId);
                }
                return Optional.of(this.terminatingJobsMap.get(jId));
            }
            return Optional.empty();
        }

        Optional<JobInfo> getJobInfoForNonTerminalJob(String jobId) {
            Optional<JobId> jId = JobId.fromId(jobId);
            if (jId.isPresent()) {
                return this.getJobInfoForNonTerminalJob(jId.get());
            }
            return Optional.empty();
        }

        Optional<JobInfo> getJobInfoByUniqueId(String uniqueId) {
            return this.getAllNonTerminalJobsList().stream().filter(jobInfo -> {
                String unq = jobInfo.jobDefinition.getJobSla().getUserProvidedType();
                return unq != null && !unq.isEmpty() && unq.equals(uniqueId);
            }).findFirst();
        }

        private List<JobInfo> getJobActorsStuckInInit(long now, long allowedDelay) {
            return this.getPendingInitializationJobsPriorToCutoff(now - allowedDelay).stream().peek(jobInfo -> this.logger.warn("Job {} waiting for initialization since {}", (Object)jobInfo.jobId, (Object)jobInfo.initializeInitiatedAt)).collect(Collectors.toList());
        }

        private List<JobInfo> getJobsStuckInAccepted(long now, long allowedDelay) {
            return this.getAcceptedJobsList().stream().filter(jobInfo -> jobInfo.submittedAt < now - allowedDelay).peek(jobInfo -> this.logger.warn("Job {} stuck in accepted since {}", (Object)jobInfo.jobId, (Object)Instant.ofEpochMilli(jobInfo.submittedAt))).collect(Collectors.toList());
        }

        private List<JobInfo> getJobsStuckInTerminating(long now, long allowedDelay) {
            return this.getTerminatingJobsList().stream().filter(jobInfo -> jobInfo.terminationInitiatedAt < now - allowedDelay).peek(jobInfo -> this.logger.warn("Job {} stuck in terminating since {}", (Object)jobInfo.jobId, (Object)Instant.ofEpochMilli(jobInfo.terminationInitiatedAt))).collect(Collectors.toList());
        }

        boolean isJobListEmpty() {
            return this.activeJobsMap.isEmpty() && this.acceptedJobsMap.isEmpty();
        }

        public Set<JobId> getJobsMatchingLabels(List<Label> labels, Optional<String> labelsOp) {
            boolean isAnd = false;
            if (labelsOp.isPresent() && labelsOp.get().equalsIgnoreCase("and")) {
                isAnd = true;
            }
            return this.labelCache.getJobIdsMatchingLabels(labels, isAnd);
        }
    }

    /**
     * Tracking record for a single job: its identity, definition, actor, owner,
     * current state and a handful of lifecycle timestamps.
     *
     * <p>Timestamps hold {@code -1} until they are set.
     */
    static final class JobInfo {
        final JobId jobId;
        final JobDefinition jobDefinition;
        final ActorRef jobActor;
        final String user;
        final long submittedAt;
        public String version;
        volatile JobState state;
        volatile long initializeInitiatedAt = -1L;
        volatile long initializedAt = -1L;
        volatile long terminationInitiatedAt = -1L;
        volatile long terminatedAt = -1L;

        /**
         * Creates a record whose initialization timestamps are both unset ({@code -1}).
         */
        JobInfo(JobId jobId, JobDefinition jobDefinition, long submittedAt, ActorRef jobActor, JobState state, String user) {
            this(jobId, jobDefinition, submittedAt, jobActor, state, user, -1L, -1L);
        }

        /**
         * Creates a record with explicit initialization timestamps; the termination
         * timestamps start out unset ({@code -1}).
         */
        JobInfo(JobId jobId, JobDefinition jobDefinition, long submittedAt, ActorRef jobActor, JobState state, String user, long initializeInitiatedAt, long initedAt) {
            this.jobId = jobId;
            this.jobDefinition = jobDefinition;
            this.submittedAt = submittedAt;
            this.jobActor = jobActor;
            this.state = state;
            this.user = user;
            this.initializeInitiatedAt = initializeInitiatedAt;
            this.initializedAt = initedAt;
        }

        /**
         * Renders all fields except {@code version}.
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("JobInfo{");
            text.append("submittedAt=").append(this.submittedAt);
            text.append(", initializeInitiatedAt=").append(this.initializeInitiatedAt);
            text.append(", initializedAt=").append(this.initializedAt);
            text.append(", terminationInitiatedAt=").append(this.terminationInitiatedAt);
            text.append(", terminatedAt=").append(this.terminatedAt);
            text.append(", jobId=").append(this.jobId);
            text.append(", jobActor=").append(this.jobActor);
            text.append(", state=").append(this.state);
            text.append(", user='").append(this.user).append('\'');
            text.append(", jobDefinition=").append(this.jobDefinition);
            return text.append('}').toString();
        }

        void setInitializeInitiatedAt(long t) {
            this.initializeInitiatedAt = t;
        }

        void setInitializedAt(long t) {
            this.initializedAt = t;
        }

        void setState(JobState state) {
            this.state = state;
        }

        void setTerminationInitiatedAt(long terminationInitiatedAt) {
            this.terminationInitiatedAt = terminationInitiatedAt;
        }

        public void setTerminatedAt(long terminatedAt) {
            this.terminatedAt = terminatedAt;
        }

        /**
         * Fluent builder for {@link JobInfo}. Job id, job definition, state and job
         * actor are mandatory; timestamps default to {@code -1} and the user to the
         * empty string.
         */
        static class Builder {
            JobId jobId;
            JobDefinition jobDefinition;
            ActorRef jobActor;
            JobState state;
            String user = "";
            long submittedAt = -1L;
            long initializeInitiatedAt = -1L;
            long initializedAt = -1L;

            Builder withSubmittedAt(long submittedAt) {
                this.submittedAt = submittedAt;
                return this;
            }

            Builder withInitializeInitiatedAt(long t) {
                this.initializeInitiatedAt = t;
                return this;
            }

            Builder withInitializedAt(long t) {
                this.initializedAt = t;
                return this;
            }

            Builder withJobId(JobId jId) {
                this.jobId = jId;
                return this;
            }

            Builder withJobActor(ActorRef actor) {
                this.jobActor = actor;
                return this;
            }

            Builder withJobDefinition(JobDefinition jd) {
                this.jobDefinition = jd;
                return this;
            }

            Builder withUser(String user) {
                this.user = user;
                return this;
            }

            Builder withState(JobState state) {
                this.state = state;
                return this;
            }

            /**
             * Copies job id, definition, submission time, state and user from the
             * metadata and sets the job actor.
             */
            Builder usingJobMetadata(MantisJobMetadataImpl jobMeta, ActorRef actor) {
                this.jobActor = actor;
                this.jobId = jobMeta.getJobId();
                this.jobDefinition = jobMeta.getJobDefinition();
                this.submittedAt = jobMeta.getSubmittedAtInstant().toEpochMilli();
                this.state = jobMeta.getState();
                this.user = jobMeta.getUser();
                return this;
            }

            /**
             * Checks the mandatory fields via {@code Preconditions.checkNotNull} and
             * creates the {@link JobInfo}.
             */
            JobInfo build() {
                Preconditions.checkNotNull(this.jobId, "JobId cannot be null");
                Preconditions.checkNotNull(this.jobDefinition, "JobDefinition cannot be null");
                Preconditions.checkNotNull(this.state, "state cannot be null");
                Preconditions.checkNotNull(this.jobActor, "Job Actor cannot be null");
                return new JobInfo(
                        this.jobId,
                        this.jobDefinition,
                        this.submittedAt,
                        this.jobActor,
                        this.state,
                        this.user,
                        this.initializeInitiatedAt,
                        this.initializedAt);
            }
        }
    }
}

