/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.master.jobcluster.job;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import com.netflix.fenzo.ConstraintEvaluator;
import com.netflix.fenzo.VMTaskFitnessCalculator;
import com.netflix.spectator.api.BasicTag;
import com.netflix.spectator.api.Tag;
import io.mantisrx.common.WorkerPorts;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.mantisrx.master.akka.MantisActorSupervisorStrategy;
import io.mantisrx.master.events.LifecycleEventPublisher;
import io.mantisrx.master.events.LifecycleEventsProto;
import io.mantisrx.master.jobcluster.WorkerInfoListHolder;
import io.mantisrx.master.jobcluster.job.CostsCalculator;
import io.mantisrx.master.jobcluster.job.IMantisJobManager;
import io.mantisrx.master.jobcluster.job.IMantisJobMetadata;
import io.mantisrx.master.jobcluster.job.IMantisStageMetadata;
import io.mantisrx.master.jobcluster.job.IWorkerManager;
import io.mantisrx.master.jobcluster.job.JobHelper;
import io.mantisrx.master.jobcluster.job.JobState;
import io.mantisrx.master.jobcluster.job.MantisJobMetadataImpl;
import io.mantisrx.master.jobcluster.job.MantisStageMetadataImpl;
import io.mantisrx.master.jobcluster.job.WorkerResubmitRateLimiter;
import io.mantisrx.master.jobcluster.job.worker.IMantisWorkerMetadata;
import io.mantisrx.master.jobcluster.job.worker.JobWorker;
import io.mantisrx.master.jobcluster.job.worker.WorkerHeartbeat;
import io.mantisrx.master.jobcluster.job.worker.WorkerState;
import io.mantisrx.master.jobcluster.job.worker.WorkerStatus;
import io.mantisrx.master.jobcluster.job.worker.WorkerTerminate;
import io.mantisrx.master.jobcluster.proto.BaseResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto;
import io.mantisrx.master.jobcluster.proto.JobClusterProto;
import io.mantisrx.master.jobcluster.proto.JobProto;
import io.mantisrx.runtime.JobConstraints;
import io.mantisrx.runtime.JobSla;
import io.mantisrx.runtime.MachineDefinition;
import io.mantisrx.runtime.MantisJobDurationType;
import io.mantisrx.runtime.MantisJobState;
import io.mantisrx.runtime.MigrationStrategy;
import io.mantisrx.runtime.WorkerMigrationConfig;
import io.mantisrx.runtime.descriptor.SchedulingInfo;
import io.mantisrx.runtime.descriptor.StageScalingPolicy;
import io.mantisrx.runtime.descriptor.StageSchedulingInfo;
import io.mantisrx.server.core.JobCompletedReason;
import io.mantisrx.server.core.JobSchedulingInfo;
import io.mantisrx.server.core.Status;
import io.mantisrx.server.core.WorkerAssignments;
import io.mantisrx.server.core.WorkerHost;
import io.mantisrx.server.core.domain.ArtifactID;
import io.mantisrx.server.core.domain.JobArtifact;
import io.mantisrx.server.core.domain.JobMetadata;
import io.mantisrx.server.core.domain.WorkerId;
import io.mantisrx.server.core.scheduler.SchedulingConstraints;
import io.mantisrx.server.master.ConstraintsEvaluators;
import io.mantisrx.server.master.agentdeploy.MigrationStrategyFactory;
import io.mantisrx.server.master.config.ConfigurationProvider;
import io.mantisrx.server.master.config.MasterConfiguration;
import io.mantisrx.server.master.domain.DataFormatAdapter;
import io.mantisrx.server.master.domain.IJobClusterDefinition;
import io.mantisrx.server.master.domain.JobDefinition;
import io.mantisrx.server.master.domain.JobId;
import io.mantisrx.server.master.persistence.MantisJobStore;
import io.mantisrx.server.master.persistence.exceptions.InvalidJobException;
import io.mantisrx.server.master.persistence.exceptions.InvalidWorkerStateChangeException;
import io.mantisrx.server.master.resourcecluster.ClusterID;
import io.mantisrx.server.master.scheduler.BatchScheduleRequest;
import io.mantisrx.server.master.scheduler.MantisScheduler;
import io.mantisrx.server.master.scheduler.ScheduleRequest;
import io.mantisrx.server.master.scheduler.WorkerEvent;
import io.mantisrx.server.master.scheduler.WorkerOnDisabledVM;
import io.mantisrx.server.master.scheduler.WorkerUnscheduleable;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import io.mantisrx.shaded.com.google.common.base.Preconditions;
import io.mantisrx.shaded.com.google.common.cache.Cache;
import io.mantisrx.shaded.com.google.common.cache.CacheBuilder;
import io.mantisrx.shaded.com.google.common.collect.Lists;
import java.io.IOException;
import java.net.URL;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.schedulers.Schedulers;
import rx.subjects.BehaviorSubject;

public class JobActor
extends AbstractActorWithTimers
implements IMantisJobManager {
    // Timer key for the periodic JobProto.CheckHeartBeat message started in initialize().
    private static final String CHECK_HB_TIMER_KEY = "CHECK_HB";
    // Timer key for the periodic JobProto.SendWorkerAssignementsIfChanged message started in
    // initialize(), only when the configured refresh interval is positive. The identifier's
    // misspelling is kept as-is; the timer key string itself is spelled correctly.
    private static final String REFRESH_SEND_STAGE_ASSIGNEMNTS_KEY = "REFRESH_SEND_STAGE_ASSIGNMENTS";
    private static final Logger LOGGER = LoggerFactory.getLogger(JobActor.class);
    // Fallback job-master machine resources used when no MasterConfiguration is available. The
    // values match the literals in getJobMasterMachineDef(). Units presumably mirror the
    // MasterConfiguration getters (cores, memory MB, network Mbps, disk MB) — confirm.
    private static final double DEFAULT_JOB_MASTER_CORES = 1.0;
    private static final double DEFAULT_JOB_MASTER_MEM = 1024.0;
    private static final double DEFAULT_JOB_MASTER_NW = 128.0;
    private static final double DEFAULT_JOB_MASTER_DISK = 1024.0;
    // Per-job metrics registered under metricsGroupId in the constructor; the group is removed
    // from the MetricsRegistry in postStop().
    private final Metrics metrics;
    private final MetricGroupId metricsGroupId;
    private final Counter numWorkerResubmissions;
    private final Counter numWorkerResubmitLimitReached;
    private final Counter numWorkerTerminated;
    private final Counter numScaleStage;
    private final Counter numWorkersCompletedNotTerminal;
    private final Counter numSchedulingChangesRefreshed;
    private final Counter numMissingWorkerPorts;
    // Receive behaviours built once in the constructor and swapped in with getContext().become(...).
    // initializedBehavior/activeBehavior are entered from onJobInitialize(), terminatingBehavior from
    // onJobKill(). Where terminatedBehavior is entered is not visible in this chunk — TODO confirm.
    private AbstractActor.Receive initializedBehavior;
    private AbstractActor.Receive activeBehavior;
    private AbstractActor.Receive terminatingBehavior;
    private AbstractActor.Receive terminatedBehavior;
    private final String clusterName;
    private final JobId jobId;
    private final IJobClusterDefinition jobClusterDefinition;
    // Replaced wholesale (not mutated) by setupJobMasterStage() when a job master stage is added.
    private volatile MantisJobMetadataImpl mantisJobMetaData;
    private final MantisJobStore jobStore;
    // Not referenced in the visible part of this class — TODO confirm where it is used.
    private int workerWritesBatchSize = 10;
    // Created in initialize(); null until the InitJob message has been processed.
    private IWorkerManager workerManager = null;
    private final MantisScheduler mantisScheduler;
    private final LifecycleEventPublisher eventPublisher;
    private final CostsCalculator costsCalculator;
    // Set to true by setupJobMasterStage().
    private boolean hasJobMaster;
    // Not referenced in the visible part of this class — TODO confirm its writers/readers.
    private volatile boolean allWorkersCompleted = false;

    /**
     * Creates the {@link Props} used to spawn a {@code JobActor}.
     *
     * <p>The arguments are forwarded, in the same order, to the public constructor.
     *
     * @return props describing a new {@code JobActor}
     */
    public static Props props(IJobClusterDefinition jobClusterDefinition, MantisJobMetadataImpl jobMetadata, MantisJobStore jobStore, MantisScheduler mantisScheduler, LifecycleEventPublisher eventPublisher, CostsCalculator costsCalculator) {
        Object[] constructorArgs = {jobClusterDefinition, jobMetadata, jobStore, mantisScheduler, eventPublisher, costsCalculator};
        return Props.create(JobActor.class, constructorArgs);
    }

    /**
     * Creates the actor for a single job.
     *
     * <p>Captures collaborators and the initial job metadata, pre-builds the receive behaviours, and
     * registers this job's counters with the {@link MetricsRegistry}.
     */
    public JobActor(IJobClusterDefinition jobClusterDefinition, MantisJobMetadataImpl jobMetadata, MantisJobStore jobStore, MantisScheduler scheduler, LifecycleEventPublisher eventPublisher, CostsCalculator costsCalculator) {
        // Identity and collaborators.
        this.clusterName = jobMetadata.getClusterName();
        this.jobId = jobMetadata.getJobId();
        this.jobStore = jobStore;
        this.jobClusterDefinition = jobClusterDefinition;
        this.mantisScheduler = scheduler;
        this.eventPublisher = eventPublisher;
        this.mantisJobMetaData = jobMetadata;
        this.costsCalculator = costsCalculator;

        // Behaviours are built eagerly so getContext().become(...) can switch between them later.
        this.initializedBehavior = this.getInitializedBehavior();
        this.activeBehavior = this.getActiveBehavior();
        this.terminatingBehavior = this.getTerminatingBehavior();
        this.terminatedBehavior = this.getTerminatedBehavior();

        // Metrics: one group per job, tagged with the job id and resource cluster.
        this.metricsGroupId = this.getMetricGroupId(this.jobId.getId(), this.getResourceCluster());
        Metrics jobActorMetrics = new Metrics.Builder()
                .id(this.metricsGroupId)
                .addCounter("numWorkerResubmissions")
                .addCounter("numWorkerResubmitLimitReached")
                .addCounter("numWorkerTerminated")
                .addCounter("numScaleStage")
                .addCounter("numWorkersCompletedNotTerminal")
                .addCounter("numSchedulingChangesRefreshed")
                .addCounter("numMissingWorkerPorts")
                .build();
        this.metrics = MetricsRegistry.getInstance().registerAndGet(jobActorMetrics);
        this.numWorkerResubmissions = this.metrics.getCounter("numWorkerResubmissions");
        this.numWorkerResubmitLimitReached = this.metrics.getCounter("numWorkerResubmitLimitReached");
        this.numWorkerTerminated = this.metrics.getCounter("numWorkerTerminated");
        this.numScaleStage = this.metrics.getCounter("numScaleStage");
        this.numWorkersCompletedNotTerminal = this.metrics.getCounter("numWorkersCompletedNotTerminal");
        this.numSchedulingChangesRefreshed = this.metrics.getCounter("numSchedulingChangesRefreshed");
        this.numMissingWorkerPorts = this.metrics.getCounter("numMissingWorkerPorts");
    }

    /**
     * Builds the metric group id under which this actor's counters are registered.
     *
     * @param id the job id string, emitted as the {@code jobId} tag
     * @param resourceCluster the resource cluster name, emitted as the {@code resourceCluster} tag
     * @return a {@code JobActor} metric group carrying both tags
     */
    MetricGroupId getMetricGroupId(String id, String resourceCluster) {
        Tag jobIdTag = new BasicTag("jobId", id);
        Tag resourceClusterTag = new BasicTag("resourceCluster", resourceCluster);
        return new MetricGroupId("JobActor", new Tag[]{jobIdTag, resourceClusterTag});
    }

    /**
     * Prepares this actor to manage its job.
     *
     * <p>For a fresh submission ({@code isSubmit == true}) this:
     * <ul>
     *   <li>publishes a "Job request received" status event;</li>
     *   <li>adds a job master stage when the scheduling info is autoscaled;</li>
     *   <li>persists the new job.</li>
     * </ul>
     *
     * <p>In every case it creates the {@link WorkerManager} and starts the periodic heartbeat-check
     * timer. It also starts the stage-assignment refresh timer when that interval is configured
     * positive.
     *
     * @param isSubmit {@code true} for a newly submitted job; {@code false} otherwise (presumably a
     *     job being re-created from the store — TODO confirm)
     * @throws Exception propagated from job-master setup, persistence or worker-manager creation
     */
    void initialize(boolean isSubmit) throws Exception {
        LOGGER.info("Initializing Job {}", (Object)this.jobId);
        if (isSubmit) {
            this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.JobStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.INFO, "Job request received", this.getJobId(), this.getJobState()));
            if (this.isAutoscaled(this.mantisJobMetaData.getSchedulingInfo())) {
                LOGGER.info("Job is autoscaled, setting up Job Master");
                this.setupJobMasterStage(this.mantisJobMetaData.getSchedulingInfo());
            }
            LOGGER.info("Storing job");
            this.jobStore.storeNewJob(this.mantisJobMetaData);
            // Only report "stored" when a store actually happened.
            LOGGER.info("Stored mantis job");
        }
        this.workerManager = new WorkerManager(this, this.jobClusterDefinition.getWorkerMigrationConfig(), this.mantisScheduler, isSubmit, ConfigurationProvider.getConfig().isBatchSchedulingEnabled());
        long checkAgainInSeconds = this.getWorkerTimeoutSecs();
        long refreshStageAssignementsDurationMs = ConfigurationProvider.getConfig().getStageAssignmentRefreshIntervalMs();
        this.getTimers().startPeriodicTimer((Object)CHECK_HB_TIMER_KEY, (Object)new JobProto.CheckHeartBeat(), Duration.ofSeconds(checkAgainInSeconds));
        // A non-positive interval disables the periodic refresh timer.
        if (refreshStageAssignementsDurationMs > 0L) {
            this.getTimers().startPeriodicTimer((Object)REFRESH_SEND_STAGE_ASSIGNEMNTS_KEY, (Object)new JobProto.SendWorkerAssignementsIfChanged(), Duration.ofMillis(refreshStageAssignementsDurationMs));
        }
        LOGGER.info("Job {} initialized", (Object)this.jobId);
    }

    /**
     * Returns the heartbeat timeout, in seconds, for this job's workers.
     *
     * <p>The job's own value wins when it is positive. Otherwise the master's default worker
     * timeout is used.
     */
    private long getWorkerTimeoutSecs() {
        long jobTimeoutSecs = this.mantisJobMetaData.getWorkerTimeoutSecs();
        return jobTimeoutSecs > 0L
                ? jobTimeoutSecs
                : ConfigurationProvider.getConfig().getDefaultWorkerTimeoutSecs();
    }

    /**
     * Ensures the job has a job master stage (stage 0) and records that it has one.
     *
     * <p>If {@code schedulingInfo} has no stage 0, this does two things:
     * <ul>
     *   <li>adds a single-instance stage sized by {@link #getJobMasterMachineDef()};</li>
     *   <li>rebuilds the job metadata with a job definition carrying the updated scheduling info
     *       and stage count.</li>
     * </ul>
     * {@code hasJobMaster} is set to {@code true} in either case.
     *
     * @param schedulingInfo the job's scheduling info; mutated in place when stage 0 is missing
     * @throws io.mantisrx.runtime.command.InvalidJobException presumably when the rebuilt job
     *     definition is rejected by its builder — TODO confirm
     */
    private void setupJobMasterStage(SchedulingInfo schedulingInfo) throws io.mantisrx.runtime.command.InvalidJobException {
        LOGGER.info("Job {} is autoscaled setting up Job Master", (Object)this.jobId);
        boolean missingJobMasterStage = schedulingInfo.forStage(0) == null;
        if (missingJobMasterStage) {
            StageSchedulingInfo jobMasterStage = StageSchedulingInfo.builder()
                    .numberOfInstances(1)
                    .machineDefinition(this.getJobMasterMachineDef())
                    .build();
            schedulingInfo.addJobMasterStage(jobMasterStage);
            JobDefinition updatedDefinition = new JobDefinition.Builder()
                    .from(this.mantisJobMetaData.getJobDefinition())
                    .withSchedulingInfo(schedulingInfo)
                    .withNumberOfStages(schedulingInfo.getStages().size())
                    .build();
            this.mantisJobMetaData = new MantisJobMetadataImpl.Builder(this.mantisJobMetaData)
                    .withJobDefinition(updatedDefinition)
                    .build();
        }
        this.hasJobMaster = true;
    }

    /**
     * Returns the machine definition for the job master worker.
     *
     * <p>Uses the job-master settings from {@link MasterConfiguration} when a configuration is
     * available. Otherwise it falls back to the class's {@code DEFAULT_JOB_MASTER_*} constants
     * instead of repeating their values as magic literals.
     *
     * <p>The trailing {@code 1} is passed unchanged — presumably the number of ports; confirm
     * against {@code MachineDefinition}.
     */
    private MachineDefinition getJobMasterMachineDef() {
        MasterConfiguration config = ConfigurationProvider.getConfig();
        if (config == null) {
            return new MachineDefinition(DEFAULT_JOB_MASTER_CORES, DEFAULT_JOB_MASTER_MEM, DEFAULT_JOB_MASTER_NW, DEFAULT_JOB_MASTER_DISK, 1);
        }
        return new MachineDefinition(config.getJobMasterCores(), config.getJobMasterMemoryMB(), config.getJobMasterNetworkMbps(), config.getJobMasterDiskMB(), 1);
    }

    /**
     * Akka lifecycle hook: logs that the actor for this cluster/job pair has started.
     */
    public void preStart() throws Exception {
        LOGGER.info("Job Actor {}-{} started", this.clusterName, this.jobId);
    }

    /**
     * Akka lifecycle hook: logs the stop and unregisters this job's metric group, if one was created.
     */
    public void postStop() throws Exception {
        LOGGER.info("Job Actor {} stopped invoking cleanup logic", this.jobId);
        MetricGroupId groupToRemove = this.metricsGroupId;
        if (groupToRemove == null) {
            return;
        }
        MetricsRegistry.getInstance().remove(groupToRemove);
    }

    /**
     * Supervises child actors with the shared Mantis supervisor strategy.
     */
    public SupervisorStrategy supervisorStrategy() {
        MantisActorSupervisorStrategy strategyFactory = MantisActorSupervisorStrategy.getInstance();
        return strategyFactory.create();
    }

    /**
     * Initial behaviour of the actor: waits for an InitJob message.
     *
     * <p>Unlike the other behaviours, this one is built on demand rather than cached in a field.
     */
    public AbstractActor.Receive createReceive() {
        AbstractActor.Receive initialBehavior = this.getInitializingBehavior();
        return initialBehavior;
    }

    /**
     * Formats the text used when a message arrives in a state that does not handle it.
     *
     * @param event string form of the offending message
     * @param cluster identifier reported for the actor (callers pass the job id string)
     * @param state name of the current behaviour
     * @return a human-readable description of the unexpected message
     */
    private String genUnexpectedMsg(String event, String cluster, String state) {
        return "Unexpected message " + event + " received by Job actor " + cluster + " in " + state + " State";
    }

    /**
     * Builds the behaviour used after a kill has been accepted and shutdown is in progress.
     *
     * <p>Read-only queries are still answered: job details, updated job definition, and worker list.
     * Worker events, timer ticks and runtime-limit notifications are logged and ignored. Mutating
     * requests (resubmit, scale, scheduling/discovery info) are rejected with CLIENT_ERROR. A repeated
     * kill is acknowledged with SUCCESS and {@code JobState.Noop}.
     *
     * @return the {@code terminating} receive behaviour
     */
    private AbstractActor.Receive getTerminatingBehavior() {
        String state = "terminating";
        return this.receiveBuilder()
                .match(JobClusterManagerProto.GetJobDetailsRequest.class, this::onGetJobDetails)
                .match(JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorRequest.class, this::onGetJobDefinitionUpdatedFromJobActor)
                .match(JobClusterManagerProto.ListWorkersRequest.class, this::onListActiveWorkers)
                .match(WorkerEvent.class, x -> LOGGER.warn("Job {} is Terminating, ignoring worker Events {}", (Object)this.jobId.getId(), x))
                .match(JobProto.InitJob.class, x -> this.getSender().tell(
                        new JobProto.JobInitialized(x.requestId, BaseResponse.ResponseCode.SUCCESS, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state), this.jobId, x.requstor),
                        this.getSelf()))
                .match(JobClusterManagerProto.ResubmitWorkerRequest.class, x -> this.getSender().tell(
                        new JobClusterManagerProto.ResubmitWorkerResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state)),
                        this.getSelf()))
                .match(JobProto.CheckHeartBeat.class, x -> LOGGER.warn(this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state)))
                .match(JobProto.RuntimeLimitReached.class, x -> LOGGER.warn(this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state)))
                .match(JobClusterProto.KillJobRequest.class, x -> this.getSender().tell(
                        new JobClusterManagerProto.KillJobResponse(x.requestId, BaseResponse.ResponseCode.SUCCESS, JobState.Noop, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state), this.jobId, x.user),
                        this.getSelf()))
                .match(JobClusterManagerProto.ScaleStageRequest.class, x -> this.getSender().tell(
                        new JobClusterManagerProto.ScaleStageResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state), 0),
                        this.getSelf()))
                .match(JobClusterManagerProto.GetJobSchedInfoRequest.class, x -> this.getSender().tell(
                        new JobClusterManagerProto.GetJobSchedInfoResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state), Optional.empty()),
                        this.getSelf()))
                .match(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest.class, x -> this.getSender().tell(
                        new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state), Optional.empty()),
                        this.getSelf()))
                .match(JobProto.SendWorkerAssignementsIfChanged.class, x -> LOGGER.warn(this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state)))
                .match(JobClusterManagerProto.KillJobResponse.class, x -> LOGGER.info("Received Kill Job Response in Terminating State Ignoring"))
                .matchAny(x -> LOGGER.warn(this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state)))
                .build();
    }

    /**
     * Builds the behaviour used once the job has fully terminated.
     *
     * <p>Read-only queries are still answered: job details, updated job definition, and worker list.
     * Worker events, timer ticks, migrations and runtime-limit notifications are logged and ignored.
     * Mutating requests are rejected with CLIENT_ERROR. A repeated kill is acknowledged with SUCCESS
     * and {@code JobState.Noop}.
     *
     * <p>The ignore-log messages name the correct ("Terminated") state.
     *
     * @return the {@code terminated} receive behaviour
     */
    private AbstractActor.Receive getTerminatedBehavior() {
        String state = "terminated";
        return this.receiveBuilder()
                .match(JobClusterManagerProto.GetJobDetailsRequest.class, this::onGetJobDetails)
                .match(JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorRequest.class, this::onGetJobDefinitionUpdatedFromJobActor)
                .match(JobClusterManagerProto.ListWorkersRequest.class, this::onListActiveWorkers)
                .match(JobProto.InitJob.class, x -> this.getSender().tell(
                        new JobProto.JobInitialized(x.requestId, BaseResponse.ResponseCode.SUCCESS, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state), this.jobId, x.requstor),
                        this.getSelf()))
                .match(JobClusterManagerProto.ResubmitWorkerRequest.class, x -> this.getSender().tell(
                        new JobClusterManagerProto.ResubmitWorkerResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state)),
                        this.getSelf()))
                .match(JobProto.CheckHeartBeat.class, x -> LOGGER.warn(this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state)))
                .match(JobProto.MigrateDisabledVmWorkersRequest.class, x -> LOGGER.warn(this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state)))
                .match(JobProto.RuntimeLimitReached.class, x -> LOGGER.warn(this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state)))
                .match(JobClusterProto.KillJobRequest.class, x -> this.getSender().tell(
                        new JobClusterManagerProto.KillJobResponse(x.requestId, BaseResponse.ResponseCode.SUCCESS, JobState.Noop, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state), this.jobId, x.user),
                        this.getSelf()))
                .match(JobClusterManagerProto.ScaleStageRequest.class, x -> this.getSender().tell(
                        new JobClusterManagerProto.ScaleStageResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state), 0),
                        this.getSelf()))
                .match(JobClusterManagerProto.GetJobSchedInfoRequest.class, x -> this.getSender().tell(
                        new JobClusterManagerProto.GetJobSchedInfoResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state), Optional.empty()),
                        this.getSelf()))
                .match(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest.class, x -> this.getSender().tell(
                        new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state), Optional.empty()),
                        this.getSelf()))
                .match(JobClusterManagerProto.KillJobResponse.class, x -> LOGGER.info("Received Kill Job Response in Terminated State Ignoring"))
                .match(JobProto.SendWorkerAssignementsIfChanged.class, x -> LOGGER.warn(this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state)))
                .match(WorkerEvent.class, x -> LOGGER.info("Received worker event in Terminated State Ignoring"))
                .matchAny(x -> LOGGER.warn(this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state)))
                .build();
    }

    /**
     * Builds the behaviour used while the job is in a running state.
     *
     * <p>Every job-management message is routed to its handler. A duplicate InitJob is acknowledged
     * with SUCCESS (flagged as unexpected in the message text). Anything else is logged as unexpected.
     *
     * @return the {@code active} receive behaviour
     */
    private AbstractActor.Receive getActiveBehavior() {
        String state = "active";
        return this.receiveBuilder()
                .match(JobClusterManagerProto.GetJobDetailsRequest.class, this::onGetJobDetails)
                .match(JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorRequest.class, this::onGetJobDefinitionUpdatedFromJobActor)
                .match(WorkerEvent.class, this::processWorkerEvent)
                .match(JobClusterManagerProto.ResubmitWorkerRequest.class, this::onResubmitWorker)
                .match(JobProto.CheckHeartBeat.class, this::onCheckHeartBeats)
                .match(JobProto.MigrateDisabledVmWorkersRequest.class, this::onMigrateWorkers)
                .match(JobProto.RuntimeLimitReached.class, this::onRuntimeLimitReached)
                .match(JobClusterProto.KillJobRequest.class, this::onJobKill)
                .match(JobClusterManagerProto.ScaleStageRequest.class, this::onScaleStage)
                .match(JobClusterManagerProto.ListWorkersRequest.class, this::onListActiveWorkers)
                .match(JobClusterManagerProto.GetJobSchedInfoRequest.class, this::onGetJobStatusSubject)
                .match(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest.class, this::onGetLatestJobDiscoveryInfo)
                .match(JobProto.SendWorkerAssignementsIfChanged.class, this::onSendWorkerAssignments)
                .match(JobProto.InitJob.class, x -> this.getSender().tell(
                        new JobProto.JobInitialized(x.requestId, BaseResponse.ResponseCode.SUCCESS, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state), this.jobId, x.requstor),
                        this.getSelf()))
                .matchAny(x -> LOGGER.warn(this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state)))
                .build();
    }

    /**
     * Builds the behaviour used after initialization when the job is not (yet) in a running state.
     *
     * <p>Worker events, heartbeats, migrations, kills and read-only queries are handled normally.
     * Resubmit and scale requests are rejected with CLIENT_ERROR. Runtime-limit notifications are
     * logged as unexpected. A duplicate InitJob is acknowledged with SUCCESS.
     *
     * @return the {@code initialized} receive behaviour
     */
    private AbstractActor.Receive getInitializedBehavior() {
        String state = "initialized";
        return this.receiveBuilder()
                .match(JobClusterManagerProto.GetJobDetailsRequest.class, this::onGetJobDetails)
                .match(JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorRequest.class, this::onGetJobDefinitionUpdatedFromJobActor)
                .match(WorkerEvent.class, this::processWorkerEvent)
                .match(JobProto.CheckHeartBeat.class, this::onCheckHeartBeats)
                .match(JobProto.MigrateDisabledVmWorkersRequest.class, this::onMigrateWorkers)
                .match(JobClusterProto.KillJobRequest.class, this::onJobKill)
                .match(JobClusterManagerProto.ListWorkersRequest.class, this::onListActiveWorkers)
                .match(JobClusterManagerProto.GetJobSchedInfoRequest.class, this::onGetJobStatusSubject)
                .match(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest.class, this::onGetLatestJobDiscoveryInfo)
                .match(JobProto.SendWorkerAssignementsIfChanged.class, this::onSendWorkerAssignments)
                .match(JobClusterManagerProto.ResubmitWorkerRequest.class, x -> this.getSender().tell(
                        new JobClusterManagerProto.ResubmitWorkerResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state)),
                        this.getSelf()))
                .match(JobProto.RuntimeLimitReached.class, x -> LOGGER.warn(this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state)))
                .match(JobClusterManagerProto.ScaleStageRequest.class, x -> this.getSender().tell(
                        new JobClusterManagerProto.ScaleStageResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state), 0),
                        this.getSelf()))
                .match(JobProto.InitJob.class, x -> this.getSender().tell(
                        new JobProto.JobInitialized(x.requestId, BaseResponse.ResponseCode.SUCCESS, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state), this.jobId, x.requstor),
                        this.getSelf()))
                .matchAny(x -> LOGGER.warn(this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state)))
                .build();
    }

    /**
     * Builds the start-up behaviour, in which only InitJob is acted upon.
     *
     * <p>A GetJobDefinitionUpdatedFromJobActorRequest is answered with SUCCESS, echoing the request's
     * own job definition and flags back unchanged. Every other request gets a CLIENT_ERROR reply with
     * an empty payload. Non-request messages are logged as unexpected.
     *
     * @return the {@code initializing} receive behaviour
     */
    private AbstractActor.Receive getInitializingBehavior() {
        String state = "initializing";
        return this.receiveBuilder()
                .match(JobProto.InitJob.class, this::onJobInitialize)
                .match(JobClusterManagerProto.GetJobDetailsRequest.class, x -> this.getSender().tell(
                        new JobClusterManagerProto.GetJobDetailsResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state), Optional.empty()),
                        this.getSelf()))
                .match(JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorRequest.class, r -> this.getSender().tell(
                        new JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorResponse(r.requestId, BaseResponse.ResponseCode.SUCCESS, "", r.getUser(), r.getJobDefinition(), r.isAutoResubmit(), r.isQuickSubmit(), r.getOriginalSender()),
                        this.getSelf()))
                .match(WorkerEvent.class, x -> LOGGER.warn(this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state)))
                .match(JobClusterManagerProto.ResubmitWorkerRequest.class, x -> this.getSender().tell(
                        new JobClusterManagerProto.ResubmitWorkerResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state)),
                        this.getSelf()))
                .match(JobProto.CheckHeartBeat.class, x -> LOGGER.warn(this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state)))
                .match(JobProto.MigrateDisabledVmWorkersRequest.class, x -> LOGGER.warn(this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state)))
                .match(JobProto.RuntimeLimitReached.class, x -> LOGGER.warn(this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state)))
                .match(JobClusterProto.KillJobRequest.class, x -> this.getSender().tell(
                        new JobClusterManagerProto.KillJobResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, JobState.Noop, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state), this.jobId, x.user),
                        this.getSelf()))
                .match(JobClusterManagerProto.ScaleStageRequest.class, x -> this.getSender().tell(
                        new JobClusterManagerProto.ScaleStageResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state), 0),
                        this.getSelf()))
                .match(JobClusterManagerProto.ListWorkersRequest.class, x -> this.getSender().tell(
                        new JobClusterManagerProto.ListWorkersResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state), Lists.newArrayList()),
                        this.getSelf()))
                .match(JobClusterManagerProto.GetJobSchedInfoRequest.class, x -> this.getSender().tell(
                        new JobClusterManagerProto.GetJobSchedInfoResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state), Optional.empty()),
                        this.getSelf()))
                .match(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest.class, x -> this.getSender().tell(
                        new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(x.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state), Optional.empty()),
                        this.getSelf()))
                .matchAny(x -> LOGGER.warn(this.genUnexpectedMsg(x.toString(), this.jobId.getId(), state)))
                .build();
    }

    /**
     * Handles InitJob: initializes the job, then switches to the appropriate behaviour.
     *
     * <p>If the job's state is a running state, the actor becomes {@code active} and runtime-limit
     * timers are armed if required. Otherwise it becomes {@code initialized}. The requester always
     * receives a JobInitialized reply: SUCCESS, or SERVER_ERROR carrying the exception message. On
     * failure the actor stays in its current behaviour.
     *
     * @param i the InitJob request
     */
    @Override
    public void onJobInitialize(JobProto.InitJob i) {
        ActorRef requester = this.getSender();
        JobProto.JobInitialized reply;
        try {
            this.initialize(i.isSubmit);
            boolean running = JobState.isRunningState(this.mantisJobMetaData.getState());
            if (!running) {
                this.getContext().become(this.initializedBehavior);
            } else {
                this.getContext().become(this.activeBehavior);
                this.setRuntimeLimitTimersIfRequired(Instant.now());
            }
            reply = new JobProto.JobInitialized(i.requestId, BaseResponse.ResponseCode.SUCCESS, String.format("Job %s initialized successfully", this.jobId), this.jobId, i.requstor);
        }
        catch (Exception e) {
            LOGGER.error("Exception initializing job ", (Throwable)e);
            reply = new JobProto.JobInitialized(i.requestId, BaseResponse.ResponseCode.SERVER_ERROR, String.valueOf(e.getMessage()), this.jobId, i.requstor);
        }
        requester.tell(reply, this.getSelf());
    }

    /**
     * Replies to the sender with SUCCESS and the current job details.
     *
     * @param r the job-details request
     */
    @Override
    public void onGetJobDetails(JobClusterManagerProto.GetJobDetailsRequest r) {
        ActorRef replyTo = this.getSender();
        JobClusterManagerProto.GetJobDetailsResponse details =
                new JobClusterManagerProto.GetJobDetailsResponse(r.requestId, BaseResponse.ResponseCode.SUCCESS, "", Optional.of(this.getJobDetails()));
        replyTo.tell(details, this.getSelf());
    }

    /**
     * Replies to the sender with the result of {@code getIntermediateJobDefinition(r)}.
     *
     * @param r the request for the job definition as updated by this actor
     */
    public void onGetJobDefinitionUpdatedFromJobActor(JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorRequest r) {
        this.getSender().tell(this.getIntermediateJobDefinition(r), this.getSelf());
    }

    /**
     * Replies with the worker manager's job-status subject when the request targets this job.
     *
     * <p>A request for a different job id is logged and answered with CLIENT_ERROR and an empty
     * payload.
     *
     * @param r the scheduling-info request
     */
    @Override
    public void onGetJobStatusSubject(JobClusterManagerProto.GetJobSchedInfoRequest r) {
        ActorRef replyTo = this.getSender();
        if (!r.getJobId().equals(this.jobId)) {
            String mismatch = "JobId in the request " + r.getJobId() + " does not match Job Actors job Id " + this.jobId;
            LOGGER.warn(mismatch);
            replyTo.tell(new JobClusterManagerProto.GetJobSchedInfoResponse(r.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, mismatch, Optional.empty()), this.getSelf());
            return;
        }
        replyTo.tell(new JobClusterManagerProto.GetJobSchedInfoResponse(r.requestId, BaseResponse.ResponseCode.SUCCESS, "", Optional.of(this.workerManager.getJobStatusSubject())), this.getSelf());
    }

    /**
     * Replies with the latest scheduling info held by the worker manager's status subject.
     *
     * <p>There are three outcomes:
     * <ul>
     *   <li>The request's cluster does not match this job's cluster: the mismatch is logged and the
     *       reply is SERVER_ERROR.</li>
     *   <li>The subject has no value yet: the reply is SERVER_ERROR with an empty payload.</li>
     *   <li>Otherwise the reply is SUCCESS with the current value.</li>
     * </ul>
     *
     * @param r the discovery-info request
     */
    @Override
    public void onGetLatestJobDiscoveryInfo(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest r) {
        ActorRef replyTo = this.getSender();
        if (!r.getJobCluster().equals(this.jobId.getCluster())) {
            String mismatch = "JobCluster in the request " + r.getJobCluster() + " does not match Job Actors job ID " + this.jobId;
            LOGGER.warn(mismatch);
            replyTo.tell(new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(r.requestId, BaseResponse.ResponseCode.SERVER_ERROR, mismatch, Optional.empty()), this.getSelf());
            return;
        }
        JobSchedulingInfo latest = (JobSchedulingInfo)this.workerManager.getJobStatusSubject().getValue();
        if (latest == null) {
            LOGGER.info("discoveryInfo from BehaviorSubject is null {}", (Object)this.jobId);
            replyTo.tell(new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(r.requestId, BaseResponse.ResponseCode.SERVER_ERROR, "discoveryInfo from BehaviorSubject is null " + this.jobId, Optional.empty()), this.getSelf());
            return;
        }
        replyTo.tell(new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(r.requestId, BaseResponse.ResponseCode.SUCCESS, "", Optional.of(latest)), this.getSelf());
    }

    /**
     * Forwards a worker event to the worker manager together with the job's current state.
     *
     * @param e the worker event
     */
    @Override
    public void processWorkerEvent(WorkerEvent e) {
        JobState currentJobState = this.mantisJobMetaData.getState();
        this.workerManager.processEvent(e, currentJobState);
    }

    /**
     * Resubmits one worker of this job on request.
     *
     * <p>Publishes an INFO status event, asks the worker manager to resubmit the worker, bumps the
     * resubmission counter, and replies SUCCESS. If anything throws, the failure is now logged with
     * its stack trace (previously it was only echoed to the requester). The requester still receives
     * SERVER_ERROR with the exception message.
     *
     * @param r the resubmit request
     */
    @Override
    public void onResubmitWorker(JobClusterManagerProto.ResubmitWorkerRequest r) {
        ActorRef sender = this.getSender();
        try {
            this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.JobStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.INFO, r.getWorkerNum() + " workerNum resubmit requested by " + r.getUser() + " , reason: " + r.getReason(), this.getJobId(), this.getJobState()));
            this.workerManager.resubmitWorker(r.getWorkerNum());
            this.numWorkerResubmissions.increment();
            sender.tell((Object)new JobClusterManagerProto.ResubmitWorkerResponse(r.requestId, BaseResponse.ResponseCode.SUCCESS, String.format("Worker %d of job %s resubmitted", r.getWorkerNum(), r.getJobId())), this.getSelf());
        }
        catch (Exception e) {
            // Trailing Throwable argument makes SLF4J log the full stack trace.
            LOGGER.error("Failed to resubmit worker {} of job {}", r.getWorkerNum(), this.jobId, e);
            sender.tell((Object)new JobClusterManagerProto.ResubmitWorkerResponse(r.requestId, BaseResponse.ResponseCode.SERVER_ERROR, e.getMessage()), this.getSelf());
        }
    }

    /**
     * Delegates migration of workers on disabled VMs to the worker manager.
     *
     * @param r the migration request; its {@code time} field is passed through unchanged
     */
    @Override
    public void onMigrateWorkers(JobProto.MigrateDisabledVmWorkersRequest r) {
        IWorkerManager manager = this.workerManager;
        manager.migrateDisabledVmWorkers(r.time);
    }

    /**
     * Delegates the periodic heartbeat check to the worker manager.
     *
     * @param r the timer tick; its time is passed through unchanged
     */
    @Override
    public void onCheckHeartBeats(JobProto.CheckHeartBeat r) {
        IWorkerManager manager = this.workerManager;
        manager.checkHeartBeats(r.getTime());
    }

    /**
     * Handles expiry of the job's runtime limit by asking the parent actor to kill the job.
     *
     * <p>The clock is read once, so both log lines report the same timestamp. The start time falls
     * back to that same instant when the job has no recorded start.
     *
     * @param r the runtime-limit notification
     */
    @Override
    public void onRuntimeLimitReached(JobProto.RuntimeLimitReached r) {
        Instant now = Instant.now();
        Instant startedAt = this.mantisJobMetaData.getStartedAtInstant().orElse(now);
        LOGGER.info("In onRuntimeLimitReached {} for Job {} ", now, this.jobId);
        LOGGER.info("Job {} Started at {} and killed at {} due to Runtime limit reached", this.jobId, startedAt, now);
        this.getContext().getParent().tell(new JobClusterProto.KillJobRequest(this.jobId, "runtime limit reached", JobCompletedReason.Killed, "MantisMaster", ActorRef.noSender()), this.getSelf());
    }

    /**
     * Asks the worker manager to refresh and send the current worker assignments.
     *
     * @param r trigger message (its contents are not read)
     */
    @Override
    public void onSendWorkerAssignments(JobProto.SendWorkerAssignementsIfChanged r) {
        workerManager.refreshAndSendWorkerAssignments();
    }

    /**
     * Handles a kill request for this job.
     *
     * <p>Steps, in order: publish an INFO status event; persist the terminal state
     * ({@code Failed} when the completion reason is {@code Error} or {@code Lost}, otherwise
     * {@code Completed}); reply SUCCESS to the sender; cancel the heartbeat-check timer; switch
     * to the terminating behavior; run {@code shutdown} (worker manager shutdown plus status and
     * audit events); finally archive the job and stop this actor via
     * {@code performFinalShutdown}.
     *
     * <p>Any exception is logged and answered with a SERVER_ERROR response. That includes a
     * {@code NullPointerException} if {@code req.jobCompletedReason} is null, because
     * {@code equals} is called on it.
     *
     * <p>NOTE(review): the SUCCESS reply is sent before the timer cancel and shutdown steps. If
     * one of those later steps throws, the catch block sends a second, SERVER_ERROR reply for the
     * same request. Confirm callers tolerate this.
     *
     * @param req the kill request (reason, completion reason, user and requestor)
     */
    @Override
    public void onJobKill(JobClusterProto.KillJobRequest req) {
        ActorRef sender = this.getSender();
        LOGGER.info("Shutting down job {} on request by {}", (Object)this.jobId, (Object)sender);
        try {
            this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.JobStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.INFO, "Killing job, reason: " + req.reason, this.getJobId(), this.getJobState()));
            // Error/Lost completions are recorded as Failed; every other reason as Completed.
            JobState newState = req.jobCompletedReason.equals((Object)JobCompletedReason.Error) || req.jobCompletedReason.equals((Object)JobCompletedReason.Lost) ? JobState.Failed : JobState.Completed;
            this.updateStateAndPersist(newState);
            sender.tell((Object)new JobClusterProto.KillJobResponse(req.requestId, BaseResponse.ResponseCode.SUCCESS, this.getJobState(), this.getJobId() + " terminated", this.getJobId(), this.mantisJobMetaData, req.user, req.requestor), this.getSelf());
            this.getTimers().cancel((Object)CHECK_HB_TIMER_KEY);
            this.getContext().become(this.terminatingBehavior);
            this.shutdown(newState, req.reason);
            // Archives the job, switches to the terminated behavior and sends a PoisonPill to self.
            this.performFinalShutdown();
        }
        catch (Exception e) {
            LOGGER.error("Failed to kill job {}", (Object)this.jobId, (Object)e);
            sender.tell((Object)new JobClusterProto.KillJobResponse(req.requestId, BaseResponse.ResponseCode.SERVER_ERROR, this.getJobState(), this.getJobId() + " Could not be terminated due to " + e.getMessage(), this.getJobId(), this.mantisJobMetaData, req.user, req.requestor), this.getSelf());
        }
    }

    /**
     * Scales a stage of this job to the requested number of workers.
     *
     * <p>Possible replies to the sender:
     * <ul>
     *   <li>CLIENT_ERROR if the stage does not exist;</li>
     *   <li>CLIENT_ERROR if the stage is not scalable (a WARN status event is also published);</li>
     *   <li>SUCCESS, with the worker count returned by the worker manager;</li>
     *   <li>SERVER_ERROR if scaling throws.</li>
     * </ul>
     *
     * @param scaleStage the scale request (stage number, desired worker count and reason)
     */
    @Override
    public void onScaleStage(JobClusterManagerProto.ScaleStageRequest scaleStage) {
        LOGGER.info("In Scale stage {} for Job {}", scaleStage, jobId);
        final ActorRef requester = getSender();

        final Optional<IMantisStageMetadata> requestedStage = mantisJobMetaData.getStageMetadata(scaleStage.getStageNum());
        if (!requestedStage.isPresent()) {
            LOGGER.warn("Stage {} does not exist in Job {}", scaleStage.getStageNum(), jobId);
            requester.tell(new JobClusterManagerProto.ScaleStageResponse(scaleStage.requestId,
                    BaseResponse.ResponseCode.CLIENT_ERROR, "Non existent stage " + scaleStage.getStageNum(), 0), getSelf());
            return;
        }

        final MantisStageMetadataImpl stage = (MantisStageMetadataImpl) requestedStage.get();
        if (!stage.getScalable()) {
            LOGGER.warn("Stage {} is not scalable in Job {}", scaleStage.getStageNum(), jobId);
            final String warning = "Can't change #workers to " + scaleStage.getNumWorkers()
                    + ", stage " + scaleStage.getStageNum() + " is not scalable";
            eventPublisher.publishStatusEvent(new LifecycleEventsProto.JobStatusEvent(
                    LifecycleEventsProto.StatusEvent.StatusEventType.WARN, warning, getJobId(), getJobState()));
            requester.tell(new JobClusterManagerProto.ScaleStageResponse(scaleStage.requestId,
                    BaseResponse.ResponseCode.CLIENT_ERROR, "Stage " + scaleStage.getStageNum() + " is not scalable", 0), getSelf());
            return;
        }

        try {
            final int scaledTo = workerManager.scaleStage(stage, scaleStage.getNumWorkers(), scaleStage.getReason());
            LOGGER.info("Scaled stage {} to {} workers for Job {}", scaleStage.getStageNum(), scaledTo, jobId);
            numScaleStage.increment();
            final String okText = String.format("Scaled stage %d to %d workers", scaleStage.getStageNum(), scaledTo);
            requester.tell(new JobClusterManagerProto.ScaleStageResponse(scaleStage.requestId,
                    BaseResponse.ResponseCode.SUCCESS, okText, scaledTo), getSelf());
        } catch (Exception e) {
            final String failure = String.format("Stage %d scale failed due to %s", scaleStage.getStageNum(), e.getMessage());
            LOGGER.error(failure, e);
            requester.tell(new JobClusterManagerProto.ScaleStageResponse(scaleStage.requestId,
                    BaseResponse.ResponseCode.SERVER_ERROR, failure, 0), getSelf());
        }
    }

    /**
     * Replies with an unmodifiable view of the job's active workers.
     *
     * @param listWorkersRequest request whose limit is forwarded to the worker manager
     */
    @Override
    public void onListActiveWorkers(JobClusterManagerProto.ListWorkersRequest listWorkersRequest) {
        final ActorRef requester = getSender();
        final List<IMantisWorkerMetadata> workers = workerManager.getActiveWorkers(listWorkersRequest.getLimit());
        final JobClusterManagerProto.ListWorkersResponse response = new JobClusterManagerProto.ListWorkersResponse(
                listWorkersRequest.requestId, BaseResponse.ResponseCode.SUCCESS, "", Collections.unmodifiableList(workers));
        requester.tell(response, getSelf());
    }

    /**
     * Archives the job, switches to the terminated behavior and stops this actor.
     *
     * <p>Archiving is best-effort: an {@link IOException} from the store is only logged, and the
     * actor still sends itself a {@link PoisonPill}.
     */
    private void performFinalShutdown() {
        try {
            LOGGER.info("Archiving Job {}", jobId);
            jobStore.archiveJob(mantisJobMetaData);
        } catch (IOException archiveFailure) {
            LOGGER.warn("Exception archiving job " + mantisJobMetaData.getJobId(), archiveFailure);
        }
        getContext().become(terminatedBehavior);
        getSelf().tell(PoisonPill.getInstance(), ActorRef.noSender());
    }

    /**
     * Requests termination of the job once all of its workers have completed.
     *
     * <p>The kill request is sent to the parent at most once (guarded by
     * {@code allWorkersCompleted}) and only while the job is not already in a terminal state.
     */
    @Override
    public void onAllWorkersCompleted() {
        LOGGER.info("JobActor: onAllWorkersCompleted with current state {}", mantisJobMetaData.getState());
        final boolean nothingToDo = JobState.isTerminalState(mantisJobMetaData.getState()) || allWorkersCompleted;
        if (nothingToDo) {
            LOGGER.debug("Job {} Kill already requested", jobId);
            return;
        }
        LOGGER.info("All workers completed but job {} in {} state. Request termination", jobId, getJobState());
        allWorkersCompleted = true;
        getContext().parent().tell(new JobClusterProto.KillJobRequest(
                jobId, "Job Completed", JobCompletedReason.Normal, "MantisMaster", ActorRef.noSender()), getSelf());
        numWorkersCompletedNotTerminal.increment();
    }

    /**
     * Reacts to the worker manager reporting that every worker has started.
     *
     * <p>Only a job in {@code Accepted} state is transitioned. The steps are: persist the state as
     * {@code Launched}; switch to the active behavior; publish an INFO status event; notify the
     * parent with {@code JobStartedEvent}; persist the start time; arm any runtime-limit timer.
     *
     * @return {@code true} if the transition completed; {@code false} if the job was not in
     *     {@code Accepted} state or any transition step threw
     */
    @Override
    public boolean onAllWorkersStarted() {
        LOGGER.info("In onAllWorkersStarted for Job {}", jobId);
        final JobState currentState = mantisJobMetaData.getState();
        if (currentState == JobState.Launched) {
            LOGGER.info("Job is already in launched state");
            return false;
        }
        if (currentState != JobState.Accepted) {
            LOGGER.warn("Unexpected all Workers Started Event while job in {} state", currentState);
            return false;
        }
        try {
            updateStateAndPersist(JobState.Launched);
            getContext().become(activeBehavior);
            eventPublisher.publishStatusEvent(new LifecycleEventsProto.JobStatusEvent(
                    LifecycleEventsProto.StatusEvent.StatusEventType.INFO,
                    "all workers started, job transitioning to Active", getJobId(), getJobState()));
            getContext().getParent().tell(new JobClusterProto.JobStartedEvent(getJobId()), getSelf());
            final Instant startedAt = Instant.now();
            mantisJobMetaData.setStartedAt(startedAt.toEpochMilli(), jobStore);
            setRuntimeLimitTimersIfRequired(startedAt);
            return true;
        } catch (Exception e) {
            LOGGER.error("Error processing all worker started event ", e);
            return false;
        }
    }

    /**
     * Handles the worker-resubmit limit being reached.
     *
     * <p>Publishes an ERROR status event, increments the limit-reached counter and asks the parent
     * to kill the job with completion reason {@code Error}.
     *
     * @return always {@code true}
     */
    @Override
    public boolean onTooManyWorkerResubmits() {
        LOGGER.warn("Too many worker resubmits detected for Job {}. Requesting job shutdown", jobId);
        eventPublisher.publishStatusEvent(new LifecycleEventsProto.JobStatusEvent(
                LifecycleEventsProto.StatusEvent.StatusEventType.ERROR,
                "Worker Resubmit limit reached, shutting down job", getJobId(), getJobState()));
        numWorkerResubmitLimitReached.increment();
        final JobClusterProto.KillJobRequest killRequest = new JobClusterProto.KillJobRequest(
                jobId, "Too many worker resubmits", JobCompletedReason.Error, "MantisMaster", ActorRef.noSender());
        getContext().parent().tell(killRequest, getSelf());
        return true;
    }

    /** Returns this actor's live job metadata instance (not a copy). */
    @Override
    public IMantisJobMetadata getJobDetails() {
        return mantisJobMetaData;
    }

    /**
     * Rebuilds the request's job definition with per-stage instance counts taken from this job.
     *
     * <p>The given definition is passed to {@code fromWithInstanceCountInheritance} together with a
     * lookup that maps a stage id to this job's current worker count for that stage. Inheritance
     * is forced when the request is a quick submit.
     *
     * @param r request carrying the new job definition and submit flags
     * @return a SUCCESS response with the merged definition, or a SERVER_ERROR response (with a
     *     {@code null} definition) if building the merged definition fails
     */
    public JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorResponse getIntermediateJobDefinition(JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorRequest r) {
        final JobDefinition requestedDefinition = r.getJobDefinition();
        final boolean inheritInstanceCounts = r.isQuickSubmit();
        final MantisJobMetadataImpl currentMeta = mantisJobMetaData;
        final JobDefinition.Builder definitionBuilder = new JobDefinition.Builder()
                .fromWithInstanceCountInheritance(requestedDefinition, inheritInstanceCounts,
                        stageId -> currentMeta.getStageMetadata((int)stageId).map(IMantisStageMetadata::getNumWorkers));
        try {
            final JobDefinition mergedDefinition = definitionBuilder.build();
            return new JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorResponse(r.requestId,
                    BaseResponse.ResponseCode.SUCCESS, "", r.getUser(), mergedDefinition,
                    r.isAutoResubmit(), r.isQuickSubmit(), r.getOriginalSender());
        } catch (io.mantisrx.runtime.command.InvalidJobException buildFailure) {
            LOGGER.error("Failed to build job definition with inheritance:", buildFailure);
            return new JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorResponse(r.requestId,
                    BaseResponse.ResponseCode.SERVER_ERROR, buildFailure.getMessage(), r.getUser(), null,
                    r.isAutoResubmit(), r.isQuickSubmit(), r.getOriginalSender());
        }
    }

    /**
     * Shuts down the job's workers and publishes the shutdown.
     *
     * <p>After the worker manager is shut down, an INFO status event (tagged with {@code state})
     * and a JOB_TERMINATE audit event are published, both carrying the same reason text.
     *
     * @param state the job state to report in the status event
     * @param reason human-readable shutdown reason
     */
    @Override
    public void shutdown(JobState state, String reason) {
        LOGGER.info("Entering JobActor:shutdown {}", jobId);
        workerManager.shutdown();
        final String shutdownMessage = "job shutdown, reason: " + reason;
        eventPublisher.publishStatusEvent(new LifecycleEventsProto.JobStatusEvent(
                LifecycleEventsProto.StatusEvent.StatusEventType.INFO, shutdownMessage, getJobId(), state));
        eventPublisher.publishAuditEvent(new LifecycleEventsProto.AuditEvent(
                LifecycleEventsProto.AuditEvent.AuditEventType.JOB_TERMINATE, jobId.getId(), shutdownMessage));
    }

    /** Returns the id of the job managed by this actor. */
    @Override
    public JobId getJobId() {
        return jobId;
    }

    /**
     * Sets the job state on the metadata, passing the job store so the change can be persisted.
     *
     * @param newState the state to move to
     * @throws Exception propagated unchanged from {@code setJobState}
     */
    private void updateStateAndPersist(JobState newState) throws Exception {
        mantisJobMetaData.setJobState(newState, jobStore);
    }

    /**
     * Arms a single-shot "RUNTIME_LIMIT" timer when the job SLA sets a positive runtime limit.
     *
     * <p>The delay comes from {@code JobHelper.calculateRuntimeDuration}, using the job's recorded
     * start time, or {@code currentTime} when none is recorded. When the timer fires, a
     * {@code RuntimeLimitReached} message is delivered to this actor. A limit of zero or less
     * only logs that the limit is ignored.
     *
     * @param currentTime fallback start time, also used as the base for the logged terminate time
     */
    private void setRuntimeLimitTimersIfRequired(Instant currentTime) {
        final long maxRuntimeSecs = mantisJobMetaData.getJobDefinition().getJobSla().getRuntimeLimitSecs();
        final Instant startedAt = mantisJobMetaData.getStartedAtInstant().orElse(currentTime);
        if (maxRuntimeSecs <= 0L) {
            LOGGER.info("maxRuntime for Job {} is  {} ignore ", jobId, maxRuntimeSecs);
            return;
        }
        final long terminateJobInSecs = JobHelper.calculateRuntimeDuration(maxRuntimeSecs, startedAt);
        LOGGER.info("Will terminate Job {} at {} ", jobId, currentTime.plusSeconds(terminateJobInSecs));
        getTimers().startSingleTimer("RUNTIME_LIMIT", new JobProto.RuntimeLimitReached(), Duration.ofSeconds(terminateJobInSecs));
    }

    /** Returns the job's current state as held in its metadata. */
    @Override
    public JobState getJobState() {
        return mantisJobMetaData.getState();
    }

    /**
     * Reports whether any stage has an enabled scaling policy.
     *
     * @param schedulingInfo the job's scheduling info
     * @return {@code true} as soon as one stage has a non-null, enabled scaling policy
     */
    private boolean isAutoscaled(SchedulingInfo schedulingInfo) {
        for (Object stageInfo : schedulingInfo.getStages().values()) {
            final StageScalingPolicy policy = ((StageSchedulingInfo) stageInfo).getScalingPolicy();
            if (policy != null && policy.isEnabled()) {
                LOGGER.info("Job {} is autoscaleable", jobId);
                return true;
            }
        }
        LOGGER.info("Job {} is NOT scaleable", jobId);
        return false;
    }

    /**
     * Resolves the subscription timeout for a job.
     *
     * @param mjmd job metadata
     * @return {@code 0} for Perpetual jobs; otherwise the job's own timeout, or the configured
     *     ephemeral-job unsubscribed timeout when the job's value is {@code 0}
     */
    static long getSubscriptionTimeoutSecs(IMantisJobMetadata mjmd) {
        final boolean perpetual = mjmd.getJobDefinition().getJobSla().getDurationType() == MantisJobDurationType.Perpetual;
        if (perpetual) {
            return 0L;
        }
        final long jobTimeout = mjmd.getSubscriptionTimeoutSecs();
        if (jobTimeout != 0L) {
            return jobTimeout;
        }
        return ConfigurationProvider.getConfig().getEphemeralJobUnsubscribedTimeoutSecs();
    }

    /**
     * Resolves the worker heartbeat interval for a job.
     *
     * @param mjmd job metadata
     * @return the job's own interval when positive, otherwise the configured default
     */
    static long getHeartbeatIntervalSecs(IMantisJobMetadata mjmd) {
        final long jobInterval = mjmd.getHeartbeatIntervalSecs();
        return jobInterval > 0L
                ? jobInterval
                : ConfigurationProvider.getConfig().getDefaultWorkerHeartbeatIntervalSecs();
    }

    /**
     * Returns the resource id of the job definition's resource cluster.
     *
     * <p>Falls back to the literal "mesos" when no cluster is set, or when its resource id is null.
     */
    private String getResourceCluster() {
        return mantisJobMetaData.getJobDefinition()
                .getResourceCluster()
                .map(clusterId -> clusterId.getResourceID())
                .orElse("mesos");
    }

    class WorkerManager
    implements IWorkerManager {
        // NOTE(review): not referenced in the visible part of this class; presumably caps worker resubmits — confirm.
        private static final int WORKER_RESUBMIT_LIMIT = 100;
        // JSON mapper; used in the constructor to serialize the initial JobSchedulingInfo.
        private ObjectMapper mapper = new ObjectMapper();
        // Supplies worker numbers for newly created workers (see addWorker).
        private final WorkerNumberGenerator workerNumberGenerator;
        private boolean allWorkersStarted = false;
        // The job manager that owns this worker manager.
        private final IMantisJobManager jobMgr;
        private ConcurrentSkipListSet<Integer> workersToMigrate = new ConcurrentSkipListSet();
        // Computed in the constructor from the number of stages.
        private int sinkStageNum;
        private final MigrationStrategy migrationStrategy;
        private final MantisScheduler scheduler;
        private long lastWorkerMigrationTimestamp = Long.MIN_VALUE;
        // Latest WorkerAssignments per stage number; rebuilt by refreshStageAssignmentsAndPush.
        private Map<Integer, WorkerAssignments> stageAssignments = new HashMap<Integer, WorkerAssignments>();
        // Emits JobSchedulingInfo snapshots; seeded in the constructor with an empty one.
        private BehaviorSubject<JobSchedulingInfo> jobSchedulingInfoBehaviorSubject;
        // Serialized form of the scheduling info; the constructor stores the initial (empty) one.
        private String currentJobSchedulingInfoStr = null;
        private final WorkerResubmitRateLimiter resubmitRateLimiter = new WorkerResubmitRateLimiter();
        // Entries expire one hour after being written.
        private Cache<Integer, Boolean> recentErrorWorkersCache = CacheBuilder.newBuilder().expireAfterWrite(1L, TimeUnit.HOURS).build();
        // Dirty flag: set by markStageAssignmentsChanged, cleared after a successful push.
        private volatile boolean stageAssignmentPotentiallyChanged;
        // When true, new workers are queued together as one BatchScheduleRequest instead of one at a time.
        private final boolean batchSchedulingEnabled;

        /**
         * Creates the worker manager for the enclosing job and immediately initializes its workers.
         *
         * <p>Worker numbering starts at 0 for a new submission; otherwise it resumes from the job
         * metadata's next worker number. An empty {@link JobSchedulingInfo} is serialized and used
         * to seed the scheduling-info subject before {@code initialize(isSubmit)} runs.
         *
         * @param jobMgr the owning job manager
         * @param migrationConfig configuration used to pick the worker migration strategy
         * @param scheduler scheduler used to queue and track workers
         * @param isSubmit {@code true} for a fresh submission (new workers are created and queued);
         *     {@code false} to re-initialize workers already present in the job metadata
         * @param batchSchedulingEnabled whether new workers are queued as a single batch request
         * @throws Exception if serializing the initial scheduling info or initialization fails
         */
        WorkerManager(IMantisJobManager jobMgr, WorkerMigrationConfig migrationConfig, MantisScheduler scheduler, boolean isSubmit, boolean batchSchedulingEnabled) throws Exception {
            // NOTE(review): the meaning of the second argument (10) is not visible here; presumably a
            // reservation/increment size for worker numbers — confirm.
            this.workerNumberGenerator = new WorkerNumberGenerator(isSubmit ? 0 : jobMgr.getJobDetails().getNextWorkerNumberToUse(), 10);
            this.scheduler = scheduler;
            this.jobMgr = jobMgr;
            this.batchSchedulingEnabled = batchSchedulingEnabled;
            this.migrationStrategy = MigrationStrategyFactory.getStrategy(JobActor.this.jobId.getId(), migrationConfig);
            int noOfStages = JobActor.this.mantisJobMetaData.getStageMetadata().size();
            // NOTE(review): for more than one stage this assumes stages are numbered from 0 (e.g. a job
            // master at stage 0), so the last stage is size - 1. For multi-stage jobs numbered from 1
            // this would pick the second-to-last stage — confirm.
            this.sinkStageNum = noOfStages == 1 ? 1 : noOfStages - 1;
            JobSchedulingInfo initialJS = new JobSchedulingInfo(jobMgr.getJobId().getId(), new HashMap());
            this.currentJobSchedulingInfoStr = this.mapper.writeValueAsString((Object)initialJS);
            this.jobSchedulingInfoBehaviorSubject = BehaviorSubject.create((Object)initialJS);
            this.initialize(isSubmit);
        }

        /**
         * Creates or re-attaches workers, then recalculates and stores the job's costs.
         *
         * @param isSubmit {@code true} to create and queue brand-new workers; {@code false} to
         *     re-initialize the workers already present in the job metadata
         * @throws Exception propagated from worker submission
         */
        void initialize(boolean isSubmit) throws Exception {
            if (!isSubmit) {
                initializeRunningWorkers();
            } else {
                submitInitialWorkers();
            }
            JobActor.this.mantisJobMetaData.setJobCosts(
                    JobActor.this.costsCalculator.calculateCosts(JobActor.this.mantisJobMetaData));
        }

        /**
         * Re-attaches the workers already recorded in the job metadata.
         *
         * <ol>
         *   <li>Workers in a running state but without recorded ports are first marked failed
         *       (via {@code markCorruptedWorkers}); they are resubmitted at the end.</li>
         *   <li>Each running worker receives a synthetic {@code Started} heartbeat, contributes a
         *       {@code WorkerHost} entry, and is registered with the scheduler through
         *       {@code initializeRunningWorker}.</li>
         *   <li>{@code Accepted} workers are queued. When batch scheduling is enabled and the job
         *       is in an accepted state they are submitted together as one batch; otherwise they
         *       are queued one at a time.</li>
         * </ol>
         *
         * <p>Resubmit failures are logged and do not abort initialization.
         */
        private void initializeRunningWorkers() {
            List<JobWorker> workersToResubmit = this.markCorruptedWorkers();
            ArrayList<IMantisWorkerMetadata> workersToSubmit = new ArrayList<IMantisWorkerMetadata>();
            this.markStageAssignmentsChanged(true);
            for (IMantisStageMetadata iMantisStageMetadata : JobActor.this.mantisJobMetaData.getStageMetadata().values()) {
                HashMap<Integer, WorkerHost> workerHosts = new HashMap<Integer, WorkerHost>();
                for (JobWorker worker : iMantisStageMetadata.getAllWorkers()) {
                    IMantisWorkerMetadata wm = worker.getMetadata();
                    if (WorkerState.isRunningState(wm.getState())) {
                        // Synthetic "Started" heartbeat for a worker found already running; presumably
                        // resets heartbeat tracking after this manager is (re)created — confirm.
                        try {
                            WorkerHeartbeat fakeHB = new WorkerHeartbeat(new Status(JobActor.this.jobId.getId(), iMantisStageMetadata.getStageNum(), wm.getWorkerIndex(), wm.getWorkerNumber(), Status.TYPE.HEARTBEAT, "", MantisJobState.Started, System.currentTimeMillis()));
                            worker.processEvent(fakeHB, JobActor.this.jobStore);
                        }
                        catch (InvalidWorkerStateChangeException | IOException e) {
                            LOGGER.error("problem sending initial heartbeat for Job {} during initialization", (Object)worker.getMetadata().getJobId(), (Object)e);
                        }
                        workerHosts.put(wm.getWorkerNumber(), new WorkerHost(wm.getSlave(), wm.getWorkerIndex(), wm.getWorkerPorts().getPorts(), DataFormatAdapter.convertWorkerStateToMantisJobState(wm.getState()), wm.getWorkerNumber(), wm.getMetricsPort(), wm.getCustomPort()));
                        ScheduleRequest scheduleRequest = this.createSchedulingRequest(wm, Optional.empty());
                        this.scheduler.initializeRunningWorker(scheduleRequest, wm.getSlave(), wm.getSlaveID());
                        continue;
                    }
                    if (!wm.getState().equals((Object)WorkerState.Accepted)) continue;
                    // Accepted workers: collected for one batch submit, or queued individually.
                    if (this.batchSchedulingEnabled && JobState.isAcceptedState(JobActor.this.mantisJobMetaData.getState())) {
                        workersToSubmit.add(wm);
                        continue;
                    }
                    this.queueTask(wm);
                }
                // Stages numbered <= 0 are not put here.
                // NOTE(review): the forced refresh below rebuilds the entry of every stage (including
                // stage 0), so this put appears to be superseded by it — confirm it is still needed.
                if (iMantisStageMetadata.getStageNum() <= 0) continue;
                this.stageAssignments.put(iMantisStageMetadata.getStageNum(), new WorkerAssignments(Integer.valueOf(iMantisStageMetadata.getStageNum()), Integer.valueOf(iMantisStageMetadata.getNumWorkers()), workerHosts));
            }
            if (JobState.isAcceptedState(JobActor.this.mantisJobMetaData.getState()) && !workersToSubmit.isEmpty()) {
                this.queueTasks(workersToSubmit, Optional.empty());
            }
            this.markStageAssignmentsChanged(true);
            // Workers found without ports were marked failed above; resubmit them now (best-effort).
            for (JobWorker jobWorker : workersToResubmit) {
                LOGGER.warn("discovered workers with missing ports during initialization: {}", (Object)jobWorker);
                try {
                    this.resubmitWorker(jobWorker);
                }
                catch (Exception e) {
                    LOGGER.warn("Exception resubmitting worker {} during initializeRunningWorkers due to {}", new Object[]{jobWorker, e.getMessage(), e});
                }
            }
        }

        /**
         * Finds workers that are running but have no recorded ports, and marks each as failed.
         *
         * <p>Each such worker increments {@code numMissingWorkerPorts} and is sent a
         * {@code Failed} status. If applying that status fails, the failure is logged and the
         * worker is still returned so the caller can resubmit it.
         *
         * @return the workers found with missing ports, in stage/worker iteration order
         */
        private List<JobWorker> markCorruptedWorkers() {
            List<JobWorker> corruptedWorkers = new ArrayList<>();
            for (IMantisStageMetadata stageMetadata : JobActor.this.mantisJobMetaData.getStageMetadata().values()) {
                for (JobWorker worker : stageMetadata.getAllWorkers()) {
                    IMantisWorkerMetadata wm = worker.getMetadata();
                    boolean missingPorts = WorkerState.isRunningState(wm.getState()) && !wm.getPorts().isPresent();
                    if (!missingPorts) {
                        continue;
                    }
                    LOGGER.info("marking corrupted worker {} for Job ID {} as {}", new Object[]{wm.getWorkerId(), JobActor.this.jobId, WorkerState.Failed});
                    JobActor.this.numMissingWorkerPorts.increment();
                    corruptedWorkers.add(worker);
                    try {
                        WorkerStatus status = new WorkerStatus(new Status(JobActor.this.jobId.getId(), stageMetadata.getStageNum(), wm.getWorkerIndex(), wm.getWorkerNumber(), Status.TYPE.HEARTBEAT, "", MantisJobState.Failed, System.currentTimeMillis()));
                        worker.processEvent(status, JobActor.this.jobStore);
                    } catch (InvalidWorkerStateChangeException | IOException e) {
                        // This path applies a Failed status, not a heartbeat; say so in the log.
                        LOGGER.error("problem marking worker {} of Job {} as failed during initialization", wm.getWorkerId(), wm.getJobId(), e);
                    }
                }
            }
            return corruptedWorkers;
        }

        /**
         * Flags the stage assignments as possibly changed and pushes them now when required.
         *
         * <p>The push happens immediately when {@code forceRefresh} is set or the configured stage
         * assignment refresh interval is {@code -1}. Otherwise only the flag is set, and a later
         * {@code refreshStageAssignmentsAndPush()} call publishes the change (presumably from a
         * periodic trigger — confirm).
         *
         * @param forceRefresh push immediately regardless of the configured interval
         */
        private void markStageAssignmentsChanged(boolean forceRefresh) {
            stageAssignmentPotentiallyChanged = true;
            final long refreshIntervalMs = ConfigurationProvider.getConfig().getStageAssignmentRefreshIntervalMs();
            final boolean pushNow = forceRefresh || refreshIntervalMs == -1L;
            if (pushNow) {
                refreshStageAssignmentsAndPush();
            }
        }

        /**
         * Rebuilds per-stage worker assignments and publishes them, if they may have changed.
         *
         * <p>Does nothing unless {@code stageAssignmentPotentiallyChanged} is set. Otherwise:
         * <ul>
         *   <li>every running worker contributes a {@code WorkerHost} to its stage's assignment;</li>
         *   <li>running and {@code Accepted} workers are reported in a
         *       {@code WorkerListChangedEvent};</li>
         *   <li>the new {@code JobSchedulingInfo} is emitted on the behavior subject;</li>
         *   <li>the refresh counter is incremented and the flag is cleared.</li>
         * </ul>
         */
        private void refreshStageAssignmentsAndPush() {
            if (!this.stageAssignmentPotentiallyChanged) {
                return;
            }
            // (An "activeWorkers" list used to be built here but was never read; it has been removed.)
            ArrayList<IMantisWorkerMetadata> acceptedAndActiveWorkers = new ArrayList<IMantisWorkerMetadata>();
            for (IMantisStageMetadata stageMetadata : JobActor.this.mantisJobMetaData.getStageMetadata().values()) {
                HashMap<Integer, WorkerHost> workerHosts = new HashMap<Integer, WorkerHost>();
                for (JobWorker worker : stageMetadata.getAllWorkers()) {
                    IMantisWorkerMetadata wm = worker.getMetadata();
                    if (WorkerState.isRunningState(wm.getState())) {
                        workerHosts.put(wm.getWorkerNumber(), new WorkerHost(wm.getSlave(), wm.getWorkerIndex(), wm.getWorkerPorts().getPorts(), DataFormatAdapter.convertWorkerStateToMantisJobState(wm.getState()), wm.getWorkerNumber(), wm.getMetricsPort(), wm.getCustomPort()));
                        acceptedAndActiveWorkers.add(wm);
                    } else if (wm.getState().equals((Object)WorkerState.Accepted)) {
                        acceptedAndActiveWorkers.add(wm);
                    }
                }
                this.stageAssignments.put(stageMetadata.getStageNum(), new WorkerAssignments(Integer.valueOf(stageMetadata.getStageNum()), Integer.valueOf(stageMetadata.getNumWorkers()), workerHosts));
            }
            JobSchedulingInfo jobSchedulingInfo = new JobSchedulingInfo(JobActor.this.jobId.getId(), this.stageAssignments);
            this.jobSchedulingInfoBehaviorSubject.onNext((Object)jobSchedulingInfo);
            JobActor.this.eventPublisher.publishWorkerListChangedEvent(new LifecycleEventsProto.WorkerListChangedEvent(new WorkerInfoListHolder(this.jobMgr.getJobId(), acceptedAndActiveWorkers)));
            JobActor.this.numSchedulingChangesRefreshed.increment();
            this.stageAssignmentPotentiallyChanged = false;
        }

        /**
         * Creates the job's initial workers, stores them, and queues them for scheduling.
         *
         * <p>Stage assignments are force-refreshed after storing. Workers are queued as a single
         * batch when batch scheduling is enabled, otherwise one by one.
         *
         * @throws Exception from worker creation; a failure while storing or queueing is logged and
         *     rethrown as a {@link RuntimeException} wrapping the cause
         */
        private void submitInitialWorkers() throws Exception {
            final List<IMantisWorkerMetadata> initialWorkers = getInitialWorkers(
                    JobActor.this.mantisJobMetaData.getJobDefinition(), System.currentTimeMillis());
            try {
                JobActor.this.jobStore.storeNewWorkers(jobMgr.getJobDetails(), initialWorkers);
                LOGGER.info("Stored workers {} for Job {}", initialWorkers, JobActor.this.jobId);
                markStageAssignmentsChanged(true);
                if (initialWorkers.isEmpty()) {
                    return;
                }
                if (batchSchedulingEnabled) {
                    queueTasks(initialWorkers, Optional.empty());
                } else {
                    for (IMantisWorkerMetadata worker : initialWorkers) {
                        queueTask(worker);
                    }
                }
            } catch (Exception e) {
                LOGGER.error("Error {} storing workers of job {}", new Object[]{e.getMessage(), JobActor.this.jobId.getId(), e});
                throw new RuntimeException("Exception saving worker for Job " + JobActor.this.jobId, e);
            }
        }

        /**
         * Builds one schedule request per worker and hands them to the scheduler as one batch.
         *
         * <p>Failures while building a request propagate; a failure from the scheduler call is
         * logged and swallowed.
         *
         * @param workerRequests workers to schedule
         * @param readyAt optional ready-at time forwarded to every request
         */
        private void queueTasks(List<IMantisWorkerMetadata> workerRequests, Optional<Long> readyAt) {
            final List<ScheduleRequest> scheduleRequests = new ArrayList<>(workerRequests.size());
            for (IMantisWorkerMetadata request : workerRequests) {
                scheduleRequests.add(createSchedulingRequest(request, readyAt));
            }
            LOGGER.info("Queueing up batch schedule request for {} workers", workerRequests.size());
            try {
                scheduler.scheduleWorkers(new BatchScheduleRequest(scheduleRequests));
            } catch (Exception e) {
                LOGGER.error("Exception queueing tasks", e);
            }
        }

        /** Queues one worker through the batch path, with no ready-at time. */
        private void queueTask(IMantisWorkerMetadata workerRequest) {
            final List<IMantisWorkerMetadata> single = Collections.singletonList(workerRequest);
            queueTasks(single, Optional.empty());
        }

        /**
         * Builds the {@link ScheduleRequest} used to place the given worker.
         *
         * <p>The stage's hard and soft {@code JobConstraints} are converted into constraint
         * evaluators. Each evaluator receives the set of co-tasks: the worker ids of every other
         * worker in the same stage. The worker being scheduled is excluded from that set.
         *
         * @param workerRequest metadata of the worker to schedule
         * @param readyAt optional ready-at value for the request; {@code 0L} is used when absent
         * @return the schedule request for the worker
         * @throws RuntimeException if the worker's stage is not in the job metadata. Any exception
         *     raised while building the request is logged and rethrown.
         */
        private ScheduleRequest createSchedulingRequest(IMantisWorkerMetadata workerRequest, Optional<Long> readyAt) {
            try {
                WorkerId workerId = workerRequest.getWorkerId();
                ArrayList<ConstraintEvaluator> hardConstraints = new ArrayList<ConstraintEvaluator>();
                ArrayList<VMTaskFitnessCalculator> softConstraints = new ArrayList<VMTaskFitnessCalculator>();
                Optional<IMantisStageMetadata> stageMetadataOp = JobActor.this.mantisJobMetaData.getStageMetadata(workerRequest.getStageNum());
                if (!stageMetadataOp.isPresent()) {
                    throw new RuntimeException(String.format("No such stage %d", workerRequest.getStageNum()));
                }
                IMantisStageMetadata stageMetadata = stageMetadataOp.get();
                List<JobConstraints> stageHC = stageMetadata.getHardConstraints();
                List<JobConstraints> stageSC = stageMetadata.getSoftConstraints();
                boolean hasHardConstraints = stageHC != null && !stageHC.isEmpty();
                boolean hasSoftConstraints = stageSC != null && !stageSC.isEmpty();
                HashSet<String> coTasks = new HashSet<String>();
                if (hasHardConstraints || hasSoftConstraints) {
                    for (JobWorker jobWorker : stageMetadata.getAllWorkers()) {
                        IMantisWorkerMetadata peer = jobWorker.getMetadata();
                        if (peer.getWorkerNumber() == workerId.getWorkerNum()) {
                            continue; // a worker is not its own co-task
                        }
                        // Record the peer's id, not the scheduled worker's own id. Otherwise coTasks
                        // would only ever contain the worker itself and the constraint evaluators
                        // could not see the other workers of this stage.
                        coTasks.add(peer.getWorkerId().getId());
                    }
                }
                if (hasHardConstraints) {
                    for (JobConstraints c : stageHC) {
                        hardConstraints.add(ConstraintsEvaluators.hardConstraint(c, coTasks));
                    }
                }
                if (hasSoftConstraints) {
                    for (JobConstraints c : stageSC) {
                        softConstraints.add(ConstraintsEvaluators.softConstraint(c, coTasks));
                    }
                }
                JobMetadata jobMetadata = new JobMetadata(JobActor.this.mantisJobMetaData.getJobId().getId(), JobActor.this.mantisJobMetaData.getJobJarUrl(), JobActor.this.mantisJobMetaData.getTotalStages(), JobActor.this.mantisJobMetaData.getUser(), JobActor.this.mantisJobMetaData.getSchedulingInfo(), JobActor.this.mantisJobMetaData.getParameters(), JobActor.getSubscriptionTimeoutSecs(JobActor.this.mantisJobMetaData), JobActor.getHeartbeatIntervalSecs(JobActor.this.mantisJobMetaData), JobActor.this.mantisJobMetaData.getMinRuntimeSecs());
                ScheduleRequest sr = new ScheduleRequest(workerId, workerRequest.getStageNum(), workerRequest.getNumberOfPorts(), jobMetadata, JobActor.this.mantisJobMetaData.getSla().orElse(new JobSla.Builder().build()).getDurationType(), SchedulingConstraints.of((MachineDefinition)stageMetadata.getMachineDefinition(), stageMetadata.getSizeAttribute(), this.mergeJobDefAndArtifactAssigmentAttributes(jobMetadata.getJobJarUrl())), hardConstraints, softConstraints, readyAt.orElse(0L), workerRequest.getPreferredClusterOptional());
                return sr;
            }
            catch (Exception e) {
                LOGGER.error("Exception creating scheduleRequest ", (Throwable)e);
                throw e;
            }
        }

        /**
         * Combines the artifact's tags with the job definition's scheduling constraints.
         *
         * <p>The artifact is looked up by the base name extracted from {@code artifactUrl}. If it
         * has tags, the result is a new map of those tags overlaid with the job definition's
         * scheduling constraints, so on a key clash the job definition wins. If there is no such
         * artifact or it has no tags, or any step throws (the failure is logged), the job
         * definition's constraints map is returned as-is.
         *
         * @param artifactUrl URL of the job artifact
         * @return merged scheduling attributes
         */
        private Map<String, String> mergeJobDefAndArtifactAssigmentAttributes(URL artifactUrl) {
            try {
                final Optional<String> baseName = DataFormatAdapter.extractArtifactBaseName(artifactUrl);
                if (baseName.isPresent()) {
                    final JobArtifact jobArtifact = JobActor.this.jobStore.getJobArtifact(ArtifactID.of((String) baseName.get()));
                    if (jobArtifact != null && jobArtifact.getTags() != null) {
                        final HashMap<String, String> merged = new HashMap<String, String>(jobArtifact.getTags());
                        merged.putAll(JobActor.this.mantisJobMetaData.getJobDefinition().getSchedulingConstraints());
                        return merged;
                    }
                }
            } catch (Exception lookupFailure) {
                LOGGER.warn("Couldn't find job artifact by id: {}", artifactUrl, lookupFailure);
            }
            return JobActor.this.mantisJobMetaData.getJobDefinition().getSchedulingConstraints();
        }

        /**
         * Creates the workers of every stage in the job definition's scheduling info.
         *
         * @param jobDetails job definition providing the scheduling info
         * @param submittedAt submission timestamp forwarded to per-stage setup
         * @return all created workers, in the iteration order of the stage map
         * @throws Exception propagated from stage setup
         */
        private List<IMantisWorkerMetadata> getInitialWorkers(JobDefinition jobDetails, long submittedAt) throws Exception {
            final SchedulingInfo schedulingInfo = jobDetails.getSchedulingInfo();
            final int totalStages = schedulingInfo.getStages().size();
            final List<IMantisWorkerMetadata> allWorkers = new LinkedList<IMantisWorkerMetadata>();
            for (Object stageKey : schedulingInfo.getStages().keySet()) {
                final int stageNum = (Integer) stageKey;
                allWorkers.addAll(setupStageWorkers(schedulingInfo, totalStages, stageNum, submittedAt));
            }
            return allWorkers;
        }

        /**
         * Creates the workers of one stage.
         *
         * <p>If the stage has at least one instance and its metadata does not exist yet, the stage
         * metadata is built from the scheduling info, added to the job and written to the job store
         * before any worker is added.
         *
         * @param schedulingInfo the job's scheduling info
         * @param totalStages total number of stages, recorded in new stage metadata
         * @param stageNum stage to set up
         * @param submittedAt submission timestamp (not read here)
         * @return the created workers, indexed 0..numberOfInstances-1 in order
         * @throws Exception if the stage has no scheduling info, or stage/worker creation fails
         */
        private List<IMantisWorkerMetadata> setupStageWorkers(SchedulingInfo schedulingInfo, int totalStages, int stageNum, long submittedAt) throws Exception {
            final StageSchedulingInfo stage = (StageSchedulingInfo)schedulingInfo.getStages().get(stageNum);
            if (stage == null) {
                LOGGER.error("StageSchedulingInfo cannot be null for Stage {}", stageNum);
                throw new Exception("StageSchedulingInfo cannot be null for Stage " + stageNum);
            }
            final int numInstancesAtStage = stage.getNumberOfInstances();
            final List<IMantisWorkerMetadata> stageWorkers = new LinkedList<IMantisWorkerMetadata>();
            // Stage metadata is created only once the stage is known to have at least one worker.
            if (numInstancesAtStage > 0 && !JobActor.this.mantisJobMetaData.getStageMetadata(stageNum).isPresent()) {
                IMantisStageMetadata msmd = new MantisStageMetadataImpl.Builder().withJobId(JobActor.this.jobId).withStageNum(stageNum).withNumStages(totalStages).withMachineDefinition(stage.getMachineDefinition()).withNumWorkers(numInstancesAtStage).withHardConstraints(stage.getHardConstraints()).withSoftConstraints(stage.getSoftConstraints()).withScalingPolicy(stage.getScalingPolicy()).withSizeAttribute(Optional.ofNullable(stage.getContainerAttributes()).map(attrs -> (String)attrs.get("_mantis.stageContainerSizeName")).orElse(null)).isScalable(stage.getScalable()).build();
                JobActor.this.mantisJobMetaData.addJobStageIfAbsent(msmd);
                JobActor.this.jobStore.updateStage(msmd);
            }
            for (int workerIndex = 0; workerIndex < numInstancesAtStage; workerIndex++) {
                stageWorkers.add(addWorker(schedulingInfo, stageNum, workerIndex));
            }
            return stageWorkers;
        }

        /**
         * Creates a new worker at {@code workerIndex} of stage {@code stageNo} and registers it with
         * the job metadata, then recomputes the job's costs.
         *
         * <p>A fresh worker number is drawn from {@code workerNumberGenerator}. That may persist a new
         * reservation through the job store.
         *
         * @param schedulingInfo the job's scheduling info, used for the stage's machine definition
         * @param stageNo        stage the worker belongs to
         * @param workerIndex    index within the stage for the new worker
         * @return metadata of the newly registered worker
         * @throws InvalidJobException if {@code addWorkerMetadata} rejects the worker, either because
         *         the index already holds another worker or for another reason
         */
        private IMantisWorkerMetadata addWorker(SchedulingInfo schedulingInfo, int stageNo, int workerIndex) throws InvalidJobException {
            StageSchedulingInfo stageSchedInfo = (StageSchedulingInfo) schedulingInfo.getStages().get(stageNo);
            int workerNumber = this.workerNumberGenerator.getNextWorkerNumber(JobActor.this.mantisJobMetaData, JobActor.this.jobStore);
            JobWorker jw = new JobWorker.Builder()
                    .withJobId(JobActor.this.jobId)
                    .withWorkerIndex(workerIndex)
                    .withWorkerNumber(workerNumber)
                    // NOTE(review): 4 ports beyond the stage's configured ports are requested; presumably
                    // reserved for per-worker infrastructure ports (see WorkerPorts) — confirm.
                    .withNumberOfPorts(stageSchedInfo.getMachineDefinition().getNumPorts() + 4)
                    .withStageNum(stageNo)
                    .withLifecycleEventsPublisher(JobActor.this.eventPublisher)
                    .build();
            if (!JobActor.this.mantisJobMetaData.addWorkerMetadata(stageNo, jw)) {
                Optional<JobWorker> existing = JobActor.this.mantisJobMetaData.getWorkerByIndex(stageNo, workerIndex);
                String prefix = "Couldn't add worker " + workerNumber + " as index " + workerIndex;
                String detail = existing.isPresent()
                        ? prefix + ", that index already has worker " + existing.get().getMetadata().getWorkerNumber()
                        : prefix + " doesn't exist";
                Throwable cause = new Exception(detail);
                throw new InvalidJobException(JobActor.this.mantisJobMetaData.getJobId().getId(), stageNo, workerIndex, cause);
            }
            JobActor.this.mantisJobMetaData.setJobCosts(JobActor.this.costsCalculator.calculateCosts(JobActor.this.mantisJobMetaData));
            return jw.getMetadata();
        }

        /**
         * Shuts the worker manager down.
         *
         * <p>The job is unscheduled. If {@code allWorkerCompleted()} reports that workers remain, their
         * termination is started asynchronously. Finally, an empty scheduling info is emitted on the
         * scheduling-info subject and the subject is completed.
         */
        @Override
        public void shutdown() {
            scheduler.unscheduleJob(JobActor.this.jobId.getId());
            boolean workersRemain = !allWorkerCompleted();
            if (workersRemain) {
                terminateAllWorkersAsync();
            }
            JobSchedulingInfo emptyAssignments = new JobSchedulingInfo(jobMgr.getJobId().getId(), new HashMap<>());
            jobSchedulingInfoBehaviorSubject.onNext(emptyAssignments);
            jobSchedulingInfoBehaviorSubject.onCompleted();
        }

        /**
         * Starts termination of every worker, in every stage, that is not in a terminal state.
         *
         * <p>The work is subscribed on {@link Schedulers#io()}, so this method returns before the
         * workers are actually terminated. When the stream completes, stage assignments are
         * force-marked as changed. Any stream error is logged instead of being raised as an
         * unhandled RxJava error.
         */
        private void terminateAllWorkersAsync() {
            LOGGER.info("Terminating all workers of job {}", JobActor.this.jobId);
            // NOTE(review): terminateWorker mutates job/stage metadata from an io() thread rather than
            // the actor's own thread — confirm this cannot race with concurrent actor message handling.
            Observable.from(JobActor.this.mantisJobMetaData.getStageMetadata().values())
                    .flatMap(stage -> Observable.from(stage.getAllWorkers()))
                    .filter(worker -> !WorkerState.isTerminalState(worker.getMetadata().getState()))
                    .map(worker -> {
                        LOGGER.info("Terminating {}", worker);
                        this.terminateWorker(worker.getMetadata(), WorkerState.Completed, JobCompletedReason.Killed);
                        return worker;
                    })
                    .doOnCompleted(() -> this.markStageAssignmentsChanged(true))
                    .subscribeOn(Schedulers.io())
                    .subscribe(
                            worker -> { },
                            throwable -> LOGGER.error("Error terminating workers of job {}", JobActor.this.jobId, throwable));
            // Termination proceeds asynchronously; this only records that it was started.
            LOGGER.info("Submitted asynchronous termination of all workers of job {}", JobActor.this.jobId);
        }

        /**
         * Terminates a single worker and records its final state.
         *
         * <p>The scheduler is always asked to unschedule and terminate the worker first. After that,
         * if the worker's stage can be resolved, a {@link WorkerTerminate} event is applied to the
         * stage metadata. If that yields a worker, the worker is archived in the job store and an INFO
         * status event is published. Failures are logged, never thrown.
         *
         * @param workerMeta       the worker to terminate
         * @param finalWorkerState state to record for the worker
         * @param reason           reason recorded on the terminate event and in the status event
         */
        private void terminateWorker(IMantisWorkerMetadata workerMeta, WorkerState finalWorkerState, JobCompletedReason reason) {
            LOGGER.info("Terminating  worker {} with number {}", workerMeta, workerMeta.getWorkerNumber());
            try {
                WorkerId workerId = workerMeta.getWorkerId();
                this.scheduler.unscheduleAndTerminateWorker(workerId, Optional.ofNullable(workerMeta.getSlave()));
                // Look up explicitly instead of unboxing into an int, which would NPE for an unmapped worker.
                Integer stageNum = JobActor.this.mantisJobMetaData.getWorkerNumberToStageMap().get(workerMeta.getWorkerNumber());
                if (stageNum == null) {
                    LOGGER.error("Worker {} not found in worker-to-stage map while terminating it", workerId);
                    return;
                }
                Optional<IMantisStageMetadata> stageMetaOp = JobActor.this.mantisJobMetaData.getStageMetadata(stageNum);
                if (!stageMetaOp.isPresent()) {
                    LOGGER.error("Stage {} not found while terminating worker {}", stageNum, workerId);
                    return;
                }
                MantisStageMetadataImpl stageMetaData = (MantisStageMetadataImpl) stageMetaOp.get();
                WorkerTerminate terminateEvent = new WorkerTerminate(workerId, finalWorkerState, reason);
                Optional<JobWorker> jobWorkerOp = stageMetaData.processWorkerEvent(terminateEvent, JobActor.this.jobStore);
                if (jobWorkerOp.isPresent()) {
                    JobActor.this.jobStore.archiveWorker(jobWorkerOp.get().getMetadata());
                    JobActor.this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.INFO, "Terminated worker, reason: " + reason.name(), workerMeta.getStageNum(), workerMeta.getWorkerId(), workerMeta.getState()));
                }
            } catch (Exception e) {
                LOGGER.error("Error terminating worker {}", workerMeta.getWorkerId(), e);
            }
        }

        /**
         * Terminates a worker and removes it from its stage and from the job metadata. In this class
         * it is used by {@code scaleStage} when shrinking a stage.
         *
         * <p>Order of operations, once the worker's stage is resolved:
         * <ol>
         *   <li>apply a {@link WorkerTerminate} event to the stage metadata,</li>
         *   <li>publish an INFO status event,</li>
         *   <li>remove the worker from the stage,</li>
         *   <li>ask the scheduler to unschedule and terminate it,</li>
         *   <li>remove it from the job metadata,</li>
         *   <li>recompute costs and force-mark stage assignments as changed.</li>
         * </ol>
         * If the stage cannot be resolved, nothing is done beyond logging an error. Failures are
         * logged, never thrown.
         *
         * @param workerMeta       the worker to remove
         * @param finalWorkerState state recorded on the terminate event
         * @param reason           reason recorded on the terminate event and status event
         */
        private void terminateAndRemoveWorker(IMantisWorkerMetadata workerMeta, WorkerState finalWorkerState, JobCompletedReason reason) {
            LOGGER.info("Terminating and removing worker {}", workerMeta.getWorkerId().getId());
            try {
                WorkerId workerId = workerMeta.getWorkerId();
                // Look up explicitly instead of unboxing into an int, which would NPE for an unmapped worker.
                Integer stageNum = JobActor.this.mantisJobMetaData.getWorkerNumberToStageMap().get(workerMeta.getWorkerNumber());
                if (stageNum == null) {
                    LOGGER.error("Worker {} not found in worker-to-stage map while terminating it", workerId);
                    return;
                }
                Optional<IMantisStageMetadata> stageMetaOp = JobActor.this.mantisJobMetaData.getStageMetadata(stageNum);
                if (!stageMetaOp.isPresent()) {
                    LOGGER.error("Stage {} not found while terminating worker {}", stageNum, workerId);
                    return;
                }
                MantisStageMetadataImpl stageMetaData = (MantisStageMetadataImpl) stageMetaOp.get();
                // The returned worker is not needed here; the call is made for its effect on the stage
                // metadata (presumably recording finalWorkerState) — confirm.
                stageMetaData.processWorkerEvent(new WorkerTerminate(workerId, finalWorkerState, reason), JobActor.this.jobStore);
                JobActor.this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.INFO, "Removing worker, reason: " + reason.name(), workerMeta.getStageNum(), workerMeta.getWorkerId(), workerMeta.getState()));
                stageMetaData.unsafeRemoveWorker(workerId.getWorkerIndex(), workerId.getWorkerNum(), JobActor.this.jobStore);
                this.scheduler.unscheduleAndTerminateWorker(workerMeta.getWorkerId(), Optional.ofNullable(workerMeta.getSlave()));
                JobActor.this.mantisJobMetaData.removeWorkerMetadata(workerMeta.getWorkerNumber());
                JobActor.this.mantisJobMetaData.setJobCosts(JobActor.this.costsCalculator.calculateCosts(JobActor.this.mantisJobMetaData));
                LOGGER.info("Terminated worker {}", workerMeta);
                this.markStageAssignmentsChanged(true);
            } catch (Exception e) {
                LOGGER.error("Error terminating worker {}", workerMeta.getWorkerId(), e);
            }
        }

        /**
         * Delegates to {@code refreshStageAssignmentsAndPush()}.
         */
        @Override
        public void refreshAndSendWorkerAssignments() {
            refreshStageAssignmentsAndPush();
        }

        /**
         * Scans all workers for missing or stale heartbeats and resubmits those past their
         * tolerance, then runs disabled-VM migration.
         *
         * <p>A worker that has never heartbeated is resubmitted once more than
         * {@code 1.5 * workerTimeout + workerInitTimeout} seconds have passed since it was accepted.
         * A worker whose last heartbeat is older than {@code 1.5 * workerTimeout} seconds is
         * resubmitted only when heartbeat-based termination is enabled; otherwise a warning is
         * logged. A failed resubmission is logged and does not stop the remaining ones.
         *
         * @param currentTime the reference "now" for all age computations
         */
        @Override
        public void checkHeartBeats(Instant currentTime) {
            LOGGER.debug("Using worker timeout {} for job {}", JobActor.this.getWorkerTimeoutSecs(), this.jobMgr.getJobId());
            // Allow heartbeats to be late by up to 50% of the configured worker timeout.
            long missedHeartBeatToleranceSecs = (long) (1.5 * (double) JobActor.this.getWorkerTimeoutSecs());
            // Workers that never heartbeated additionally get the worker init timeout.
            long stuckInSubmitToleranceSecs = missedHeartBeatToleranceSecs + ConfigurationProvider.getConfig().getWorkerInitTimeoutSecs();
            List<JobWorker> workersToResubmit = new ArrayList<>();
            this.resubmitRateLimiter.expireResubmitRecords(currentTime.toEpochMilli());
            for (IMantisStageMetadata stageMeta : JobActor.this.mantisJobMetaData.getStageMetadata().values()) {
                for (JobWorker worker : stageMeta.getAllWorkers()) {
                    IMantisWorkerMetadata workerMeta = worker.getMetadata();
                    if (!workerMeta.getLastHeartbeatAt().isPresent()) {
                        long secsSinceAccepted = Duration.between(Instant.ofEpochMilli(workerMeta.getAcceptedAt()), currentTime).getSeconds();
                        if (secsSinceAccepted > stuckInSubmitToleranceSecs) {
                            LOGGER.info("Job {}, Worker {} stuck in accepted state for {}", this.jobMgr.getJobId(), workerMeta.getWorkerId(), secsSinceAccepted);
                            workersToResubmit.add(worker);
                            JobActor.this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.WARN, "worker stuck in Accepted state, resubmitting worker", workerMeta.getStageNum(), workerMeta.getWorkerId(), workerMeta.getState()));
                        }
                        continue;
                    }
                    long secsSinceHeartbeat = Duration.between(workerMeta.getLastHeartbeatAt().get(), currentTime).getSeconds();
                    if (secsSinceHeartbeat <= missedHeartBeatToleranceSecs) {
                        continue;
                    }
                    LOGGER.info("Job {}, Worker {} Duration between last heartbeat and now {} missed heart beat threshold {} exceeded", this.jobMgr.getJobId(), workerMeta.getWorkerId(), secsSinceHeartbeat, missedHeartBeatToleranceSecs);
                    if (ConfigurationProvider.getConfig().isHeartbeatTerminationEnabled()) {
                        JobActor.this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.WARN, "heartbeat too old, resubmitting worker", workerMeta.getStageNum(), workerMeta.getWorkerId(), workerMeta.getState()));
                        workersToResubmit.add(worker);
                    } else {
                        LOGGER.warn("Heart beat based termination is disabled. Skipping termination of worker {} Please see mantis.worker.heartbeat.termination.enabled", workerMeta);
                    }
                }
            }
            // Resubmit after the scan so the stage worker collections are not modified while iterating them.
            for (JobWorker jobWorker : workersToResubmit) {
                try {
                    this.resubmitWorker(jobWorker);
                } catch (Exception e) {
                    LOGGER.warn("Exception {} occurred resubmitting Worker {}", e.getMessage(), jobWorker.getMetadata(), e);
                }
            }
            this.migrateDisabledVmWorkers(currentTime);
        }

        /**
         * Resubmits workers that were flagged as running on disabled VMs, in batches chosen by
         * {@code migrationStrategy}.
         *
         * <p>Does nothing when no worker is pending migration. A worker whose number is not mapped to
         * a stage, or whose stage is missing, is skipped with a warning. A failed resubmission is
         * logged and does not stop the batch. {@code lastWorkerMigrationTimestamp} is updated after
         * every successful resubmission.
         *
         * @param currentTime not read by this method
         */
        @Override
        public void migrateDisabledVmWorkers(Instant currentTime) {
            if (this.workersToMigrate.isEmpty()) {
                return;
            }
            Map<Integer, Integer> workerToStageMap = JobActor.this.mantisJobMetaData.getWorkerNumberToStageMap();
            List<Integer> batch = this.migrationStrategy.execute(this.workersToMigrate, this.getNumberOfWorkersInStartedState(), this.getTotalWorkerCount(), this.lastWorkerMigrationTimestamp);
            if (!batch.isEmpty()) {
                LOGGER.info("Job {} Going to migrate {} workers in this iteration", JobActor.this.jobId, batch.size());
            }
            for (Integer workerNum : batch) {
                if (!workerToStageMap.containsKey(workerNum)) {
                    LOGGER.warn("worker {} not found in workerToStageMap {} for Job {}", workerNum, workerToStageMap, JobActor.this.jobId);
                    continue;
                }
                int stageNo = workerToStageMap.get(workerNum);
                Optional<IMantisStageMetadata> stageMetaOp = JobActor.this.mantisJobMetaData.getStageMetadata(stageNo);
                if (!stageMetaOp.isPresent()) {
                    LOGGER.warn("Stage {} Not Found. Skip move for worker {} in Job {}", stageNo, workerNum, JobActor.this.jobId);
                    continue;
                }
                JobWorker jobWorker = null;
                try {
                    jobWorker = stageMetaOp.get().getWorkerByWorkerNumber(workerNum);
                    IMantisWorkerMetadata wm = jobWorker.getMetadata();
                    LOGGER.info("Moving worker {} of job {} away from disabled VM", wm.getWorkerId(), JobActor.this.jobId);
                    JobActor.this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.INFO, " Moving out of disabled VM " + wm.getSlave(), wm.getStageNum(), wm.getWorkerId(), wm.getState()));
                    this.resubmitWorker(jobWorker);
                    this.lastWorkerMigrationTimestamp = System.currentTimeMillis();
                } catch (Exception e) {
                    LOGGER.warn("Exception resubmitting worker {} during migration due to {}", jobWorker, e.getMessage(), e);
                }
            }
        }

        /**
         * Resolves the stage metadata for the worker that emitted {@code event}.
         *
         * @param event a worker event; its worker number is looked up in the worker-to-stage map
         * @return the stage metadata, or empty (with a warning logged) when the worker number is not
         *         mapped to a stage or the mapped stage does not exist
         */
        private Optional<IMantisStageMetadata> getStageForWorker(WorkerEvent event) {
            int workerNum = event.getWorkerId().getWorkerNum();
            Map<Integer, Integer> workerToStageMap = JobActor.this.mantisJobMetaData.getWorkerNumberToStageMap();
            if (!workerToStageMap.containsKey(workerNum)) {
                // Arguments follow the placeholder order: the event first, then the worker id.
                LOGGER.warn("Event {} from Unknown worker {} ", event, event.getWorkerId());
                return Optional.empty();
            }
            Integer stageNum = workerToStageMap.get(workerNum);
            Optional<IMantisStageMetadata> stageMetaOp = JobActor.this.mantisJobMetaData.getStageMetadata(stageNum);
            if (!stageMetaOp.isPresent()) {
                LOGGER.warn("Stage {} not found in Job {} while processing event {}", stageNum, JobActor.this.jobId, event);
            }
            return stageMetaOp;
        }

        /**
         * Handles an event from a worker this job does not track.
         *
         * <p>Terminal events are ignored with a warning. For any other event, the scheduler is asked to
         * unschedule and terminate the worker, using the host extracted from the event (if any).
         *
         * @param event the event from the unknown worker
         */
        private void terminateUnknownWorkerIfNonTerminal(WorkerEvent event) {
            if (JobHelper.isTerminalWorkerEvent(event)) {
                LOGGER.warn("Job {} Terminal event from Unknown worker {}. Ignoring", JobActor.this.jobId, event.getWorkerId());
                return;
            }
            LOGGER.warn("Non terminal event from Unknown worker {} in Job {}. Request Termination", event.getWorkerId(), this.jobMgr.getJobId());
            this.scheduler.unscheduleAndTerminateWorker(event.getWorkerId(), JobHelper.getWorkerHostFromWorkerEvent(event));
        }

        /**
         * Applies a worker event to the job's worker state and drives job-level lifecycle transitions.
         *
         * <p>Flow:
         * <ul>
         *   <li>Events from workers with no resolvable stage go to {@code terminateUnknownWorkerIfNonTerminal}.</li>
         *   <li>{@code WorkerUnscheduleable}: the worker's scheduling-ready time is pushed out by the
         *       resubmit rate limiter and an ERROR status event is published.</li>
         *   <li>Otherwise the event is applied to the stage metadata. Disabled-VM events queue the worker
         *       for migration. Error states (while the job is not terminal) trigger a resubmission.
         *       Terminal states archive the worker.</li>
         *   <li>Afterwards, "all workers started" / "all workers completed" transitions are reported to
         *       the job manager.</li>
         * </ul>
         * All exceptions are logged and swallowed.
         *
         * @param event    the worker event to process
         * @param jobState current state of the job; terminal job states suppress resubmission and the
         *                 "all started" transition
         */
        @Override
        public void processEvent(WorkerEvent event, JobState jobState) {
            try {
                Optional<IMantisStageMetadata> stageMetaOp = this.getStageForWorker(event);
                if (!stageMetaOp.isPresent()) {
                    this.terminateUnknownWorkerIfNonTerminal(event);
                    return;
                }
                if (event instanceof WorkerUnscheduleable) {
                    // No resources fit the worker: delay its next scheduling attempt per the rate limiter.
                    this.scheduler.updateWorkerSchedulingReadyTime(event.getWorkerId(), this.resubmitRateLimiter.getWorkerResubmitTime(event.getWorkerId(), stageMetaOp.get().getStageNum()));
                    JobActor.this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.ERROR, "rate limiting: no resources to fit worker", ((WorkerUnscheduleable)event).getStageNum(), event.getWorkerId(), WorkerState.Accepted));
                    return;
                }
                MantisStageMetadataImpl stageMeta = (MantisStageMetadataImpl)stageMetaOp.get();
                try {
                    // Sanity check: compare the heartbeat's worker number with the worker currently at that index.
                    if (event instanceof WorkerHeartbeat) {
                        int eventWorkerIndex = event.getWorkerId().getWorkerIndex();
                        int eventWorkerNum = event.getWorkerId().getWorkerNum();
                        int currentWorkerNum = stageMeta.getWorkerByIndex(eventWorkerIndex).getMetadata().getWorkerNumber();
                        if (currentWorkerNum > eventWorkerNum) {
                            // NOTE(review): despite the message, no termination is performed here; only logged.
                            LOGGER.error("[Corrupted state] StaleWorkerEvent: {}, current worker at {}, Terminate stale worker", (Object)event.getWorkerId(), (Object)currentWorkerNum);
                        } else if (currentWorkerNum < eventWorkerNum) {
                            LOGGER.error("[Corrupted state] Newer worker num received: {}, Current stage worker: {}", (Object)event, (Object)currentWorkerNum);
                        }
                    }
                }
                catch (InvalidJobException ije) {
                    LOGGER.error("Invalid job error when checking event: {}", (Object)event, (Object)ije);
                }
                try {
                    // Apply the event to the stage metadata (the job store is passed in for persistence).
                    Optional<JobWorker> workerOp = stageMeta.processWorkerEvent(event, JobActor.this.jobStore);
                    if (!workerOp.isPresent()) {
                        this.terminateUnknownWorkerIfNonTerminal(event);
                        return;
                    }
                    IMantisWorkerMetadata wm = workerOp.get().getMetadata();
                    if (event instanceof WorkerOnDisabledVM) {
                        // Queued here; the actual move happens in migrateDisabledVmWorkers.
                        this.workersToMigrate.add(wm.getWorkerNumber());
                        return;
                    }
                    if (WorkerState.isErrorState(wm.getState()) && !JobState.isTerminalState(jobState)) {
                        JobActor.this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.WARN, "resubmitting lost worker ", wm.getStageNum(), wm.getWorkerId(), wm.getState()));
                        // Recorded before resubmitting; resubmitWorker compares this cache's size to the resubmission limit.
                        this.recentErrorWorkersCache.put((Object)wm.getWorkerNumber(), (Object)true);
                        this.resubmitWorker(workerOp.get());
                        return;
                    }
                    if (WorkerState.isTerminalState(wm.getState())) {
                        JobActor.this.jobStore.archiveWorker(wm);
                        LOGGER.info("Received Worker Complete signal. Wait for all workers to complete before terminating Job {}", (Object)JobActor.this.jobId);
                    }
                    // Heartbeats do not change assignments; every other event may.
                    if (!(event instanceof WorkerHeartbeat)) {
                        this.markStageAssignmentsChanged(false);
                    }
                }
                catch (Exception e) {
                    // Logged and swallowed so the lifecycle checks below still run.
                    LOGGER.warn("Exception saving worker update", (Throwable)e);
                }
                if (!this.allWorkersStarted && !JobState.isTerminalState(jobState)) {
                    if (this.allWorkerStarted()) {
                        // First time every stage reports all workers started: notify, unschedule the job, force-push assignments.
                        this.allWorkersStarted = true;
                        this.jobMgr.onAllWorkersStarted();
                        this.scheduler.unscheduleJob(JobActor.this.jobId.getId());
                        this.markStageAssignmentsChanged(true);
                    } else if (this.allWorkerCompleted()) {
                        LOGGER.info("Job {} All workers completed1", (Object)JobActor.this.jobId);
                        this.allWorkersStarted = false;
                        this.jobMgr.onAllWorkersCompleted();
                    }
                } else if (this.allWorkerCompleted()) {
                    // Workers already started (or job terminal): report completion once every non-stage-0 worker is done.
                    LOGGER.info("Job {} All workers completed", (Object)JobActor.this.jobId);
                    this.allWorkersStarted = false;
                    this.jobMgr.onAllWorkersCompleted();
                }
            }
            catch (Exception e1) {
                LOGGER.error("Job {} Exception occurred in process worker event ", (Object)JobActor.this.jobId, (Object)e1);
            }
        }

        /**
         * Reports whether every stage of the job has all of its workers started.
         *
         * @return {@code true} if each stage's {@code isAllWorkerStarted()} holds (vacuously true with
         *         no stages)
         */
        private boolean allWorkerStarted() {
            return JobActor.this.mantisJobMetaData.getStageMetadata().values().stream()
                    .allMatch(stage -> ((MantisStageMetadataImpl) stage).isAllWorkerStarted());
        }

        /**
         * Sums {@code getNumStartedWorkers()} over all stages of the job.
         *
         * @return total number of started workers
         */
        private int getNumberOfWorkersInStartedState() {
            return JobActor.this.mantisJobMetaData.getStageMetadata().values().stream()
                    .mapToInt(stage -> ((MantisStageMetadataImpl) stage).getNumStartedWorkers())
                    .sum();
        }

        /**
         * Sums the configured worker count ({@code getNumWorkers()}) over all stages of the job.
         *
         * @return total configured workers
         */
        private int getTotalWorkerCount() {
            return JobActor.this.mantisJobMetaData.getStageMetadata().values().stream()
                    .mapToInt(IMantisStageMetadata::getNumWorkers)
                    .sum();
        }

        /**
         * Reports whether every stage other than stage 0 has all of its workers completed.
         *
         * @return {@code true} if each non-zero stage's {@code isAllWorkerCompleted()} holds
         */
        private boolean allWorkerCompleted() {
            return JobActor.this.mantisJobMetaData.getStageMetadata().values().stream()
                    .map(stage -> (MantisStageMetadataImpl) stage)
                    // Stage 0 is excluded from the completion check.
                    .filter(stage -> stage.getStageNum() != 0)
                    .allMatch(MantisStageMetadataImpl::isAllWorkerCompleted);
        }

        /**
         * Resubmits the worker identified by {@code workerNum}.
         *
         * @param workerNum worker number to resubmit
         * @throws Exception if the worker number is unknown, its stage does not exist, or the
         *         resubmission itself fails
         */
        @Override
        public void resubmitWorker(int workerNum) throws Exception {
            Map<Integer, Integer> workerToStageMap = JobActor.this.mantisJobMetaData.getWorkerNumberToStageMap();
            if (!workerToStageMap.containsKey(workerNum)) {
                LOGGER.warn("No such Worker number {} in Job with ID {}", workerNum, JobActor.this.jobId);
                throw new Exception(String.format("No such worker number %d in resubmit Worker request", workerNum));
            }
            int stageNum = workerToStageMap.get(workerNum);
            Optional<IMantisStageMetadata> stageMeta = JobActor.this.mantisJobMetaData.getStageMetadata(stageNum);
            if (!stageMeta.isPresent()) {
                throw new Exception(String.format("Invalid stage %d in resubmit Worker request %d", stageNum, workerNum));
            }
            this.resubmitWorker(stageMeta.get().getWorkerByWorkerNumber(workerNum));
        }

        /**
         * Lists metadata of workers, across all stages, that are not in a terminal state.
         *
         * @param limit maximum number of entries to return
         * @return at most {@code limit} worker metadata entries; when truncated, a {@code subList}
         *         view of the collected list
         */
        @Override
        public List<IMantisWorkerMetadata> getActiveWorkers(int limit) {
            List<IMantisWorkerMetadata> active = JobActor.this.mantisJobMetaData.getStageMetadata().values().stream()
                    .flatMap(stage -> stage.getAllWorkers().stream())
                    .map(JobWorker::getMetadata)
                    .filter(meta -> !WorkerState.isTerminalState(meta.getState()))
                    .collect(Collectors.toList());
            return active.size() > limit ? active.subList(0, limit) : active;
        }

        /**
         * Exposes the subject on which this manager emits {@link JobSchedulingInfo} updates.
         *
         * @return the scheduling-info behavior subject
         */
        @Override
        public BehaviorSubject<JobSchedulingInfo> getJobStatusSubject() {
            return jobSchedulingInfoBehaviorSubject;
        }

        /**
         * Replaces {@code oldWorker} with a newly numbered worker at the same index and queues the new
         * worker for scheduling.
         *
         * <p>If {@code recentErrorWorkersCache} already holds at least
         * {@code getMaximumResubmissionsPerWorker()} entries, nothing is resubmitted. Instead the job
         * manager is notified via {@code onTooManyWorkerResubmits()}.
         *
         * @param oldWorker the worker to replace
         * @throws Exception if the worker's stage cannot be resolved, or replacing/persisting fails
         */
        private void resubmitWorker(JobWorker oldWorker) throws Exception {
            LOGGER.info("Resubmitting worker {}", oldWorker.getMetadata());
            IMantisWorkerMetadata oldWorkerMetadata = oldWorker.getMetadata();
            // NOTE(review): the limit is compared with the size of a cache keyed by worker number across
            // the whole job, not with a per-worker count as the config name suggests — confirm intent.
            if (this.recentErrorWorkersCache.size() >= (long) ConfigurationProvider.getConfig().getMaximumResubmissionsPerWorker()) {
                LOGGER.error("Resubmit count exceeded for job {}; not resubmitting worker {}", JobActor.this.jobId, oldWorkerMetadata.getWorkerId());
                this.jobMgr.onTooManyWorkerResubmits();
                return;
            }
            Integer stageNo = JobActor.this.mantisJobMetaData.getWorkerNumberToStageMap().get(oldWorkerMetadata.getWorkerId().getWorkerNum());
            if (stageNo == null) {
                String errMsg = String.format("Worker %s not found in worker-to-stage map of Job %s while resubmitting", oldWorker, JobActor.this.jobId);
                LOGGER.warn(errMsg);
                throw new Exception(errMsg);
            }
            Optional<IMantisStageMetadata> stageMetaOp = JobActor.this.mantisJobMetaData.getStageMetadata(stageNo);
            if (!stageMetaOp.isPresent()) {
                String errMsg = String.format("Stage %d not found in Job %s while resubmitting worker %s", stageNo, JobActor.this.jobId, oldWorker);
                LOGGER.warn(errMsg);
                throw new Exception(errMsg);
            }
            MantisStageMetadataImpl stageMeta = (MantisStageMetadataImpl) stageMetaOp.get();
            int newWorkerNumber = this.workerNumberGenerator.getNextWorkerNumber(JobActor.this.mantisJobMetaData, JobActor.this.jobStore);
            JobWorker newWorker = new JobWorker.Builder()
                    .withJobId(JobActor.this.jobId)
                    .withWorkerIndex(oldWorkerMetadata.getWorkerIndex())
                    .withWorkerNumber(newWorkerNumber)
                    // Same port count formula as addWorker: stage ports + 4.
                    .withNumberOfPorts(stageMeta.getMachineDefinition().getNumPorts() + 4)
                    .withStageNum(oldWorkerMetadata.getStageNum())
                    .withResubmitCount(oldWorkerMetadata.getTotalResubmitCount() + 1)
                    .withResubmitOf(oldWorkerMetadata.getWorkerNumber())
                    .withLifecycleEventsPublisher(JobActor.this.eventPublisher)
                    .build();
            // Swap the new worker into the job metadata (the job store is passed in, presumably to persist it).
            JobActor.this.mantisJobMetaData.replaceWorkerMetaData(oldWorkerMetadata.getStageNum(), newWorker, oldWorker, JobActor.this.jobStore);
            JobActor.this.mantisJobMetaData.setJobCosts(JobActor.this.costsCalculator.calculateCosts(JobActor.this.mantisJobMetaData));
            this.scheduler.unscheduleAndTerminateWorker(oldWorkerMetadata.getWorkerId(), Optional.ofNullable(oldWorkerMetadata.getSlave()));
            long workerResubmitTime = this.resubmitRateLimiter.getWorkerResubmitTime(newWorker.getMetadata().getWorkerId(), stageMeta.getStageNum());
            Optional<Long> delayDuration = Optional.of(workerResubmitTime);
            this.markStageAssignmentsChanged(true);
            this.queueTasks(Collections.singletonList(newWorker.getMetadata()), delayDuration);
            LOGGER.info("Worker {} successfully queued for scheduling", newWorker);
            JobActor.this.numWorkerResubmissions.increment();
        }

        /**
         * Scales a stage to {@code numWorkers}, clamped to the stage's bounds.
         *
         * <p>The upper bound is the scaling policy's max, or the configured max-workers-per-stage when
         * the stage has no policy. The lower bound is the policy's min, or 0. If min exceeds max, min
         * wins. When the clamped count differs from the current one, the new count is persisted and
         * announced. Then workers are added at the new top indices, or removed starting from the
         * highest index. Individual add/remove failures are logged and skipped.
         *
         * @param stageMetaData stage to scale
         * @param numWorkers    requested worker count
         * @param reason        human-readable reason, included in the status event
         * @return the worker count the stage was set to (after clamping)
         * @throws RuntimeException if persisting the new worker count fails (the cause is attached)
         */
        @Override
        public int scaleStage(MantisStageMetadataImpl stageMetaData, int numWorkers, String reason) {
            LOGGER.info("Scaling stage {} to {} workers", stageMetaData.getStageNum(), numWorkers);
            int oldNumWorkers = stageMetaData.getNumWorkers();
            int max = ConfigurationProvider.getConfig().getMaxWorkersPerStage();
            int min = 0;
            if (stageMetaData.getScalingPolicy() != null) {
                max = stageMetaData.getScalingPolicy().getMax();
                min = stageMetaData.getScalingPolicy().getMin();
            }
            int newNumWorkerCount = Math.max(Math.min(numWorkers, max), min);
            if (newNumWorkerCount != oldNumWorkers) {
                try {
                    stageMetaData.unsafeSetNumWorkers(newNumWorkerCount, JobActor.this.jobStore);
                    JobActor.this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.JobStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.INFO, String.format("Setting #workers to %d for stage %d, reason=%s", newNumWorkerCount, stageMetaData.getStageNum(), reason), JobActor.this.getJobId(), JobActor.this.getJobState()));
                } catch (Exception e) {
                    String error = String.format("Exception updating stage %d worker count for Job %s due to %s", stageMetaData.getStageNum(), JobActor.this.jobId, e.getMessage());
                    LOGGER.warn(error, e);
                    JobActor.this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.JobStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.WARN, String.format("Scaling stage failed for stage %d reason: %s", stageMetaData.getStageNum(), e.getMessage()), JobActor.this.getJobId(), JobActor.this.getJobState()));
                    // Keep the original exception as the cause so the stack trace is not lost.
                    throw new RuntimeException(error, e);
                }
                if (newNumWorkerCount > oldNumWorkers) {
                    // New workers occupy indices [oldNumWorkers, newNumWorkerCount).
                    for (int newWorkerIndex = oldNumWorkers; newWorkerIndex < newNumWorkerCount; newWorkerIndex++) {
                        try {
                            SchedulingInfo schedInfo = JobActor.this.mantisJobMetaData.getJobDefinition().getSchedulingInfo();
                            IMantisWorkerMetadata workerRequest = this.addWorker(schedInfo, stageMetaData.getStageNum(), newWorkerIndex);
                            JobActor.this.jobStore.storeNewWorker(workerRequest);
                            this.markStageAssignmentsChanged(true);
                            this.queueTask(workerRequest);
                        } catch (Exception e) {
                            LOGGER.warn("Exception adding new worker for {}", stageMetaData.getJobId().getId(), e);
                        }
                    }
                } else {
                    // Remove from the highest index down to newNumWorkerCount.
                    for (int workerIndex = oldNumWorkers - 1; workerIndex >= newNumWorkerCount; workerIndex--) {
                        try {
                            JobWorker w = stageMetaData.getWorkerByIndex(workerIndex);
                            this.terminateAndRemoveWorker(w.getMetadata(), WorkerState.Completed, JobCompletedReason.Killed);
                        } catch (InvalidJobException e) {
                            LOGGER.warn("Exception terminating worker for {}", stageMetaData.getJobId().getId(), e);
                        }
                    }
                }
            }
            LOGGER.info("{} Scaled stage to {} workers", stageMetaData.getJobId().getId(), newNumWorkerCount);
            return newNumWorkerCount;
        }
    }

    /**
     * Hands out increasing worker numbers for a job, reserving them in blocks of
     * {@code incrementStep}.
     *
     * <p>When the current block is exhausted, {@link #getNextWorkerNumber} persists a new upper limit
     * via {@code MantisJobMetadataImpl#setNextWorkerNumberToUse}, retrying up to
     * {@value #MAX_ATTEMPTS} times. If that ultimately fails, the generator enters a permanent error
     * state and every later call throws {@link IllegalStateException}.
     *
     * <p>NOTE(review): only {@code hasErrored} is volatile; the counters are unsynchronized, so
     * instances appear to assume single-threaded (actor-confined) use — confirm.
     */
    static class WorkerNumberGenerator {
        private static final Logger LOGGER = LoggerFactory.getLogger(WorkerNumberGenerator.class);
        /** Number of attempts to persist a new limit before giving up. */
        private static final int MAX_ATTEMPTS = 10;
        /** Pause between failed persistence attempts. */
        private static final long SLEEP_DURATION_MS = Duration.ofSeconds(2L).toMillis();
        /** Block size used by the no-arg constructor. */
        private static final int DEFAULT_INCREMENT_STEP = 10;

        private final int incrementStep;
        /** Last worker number handed out. */
        private int lastUsed;
        /** Upper limit most recently persisted via setNextWorkerNumberToUse. */
        private int currLimit;
        private volatile boolean hasErrored = false;

        /**
         * @param lastUsed      last worker number already in use; must be non-negative
         * @param incrementStep size of each reserved block; must be at least 1
         * @throws IllegalArgumentException if either argument is out of range
         */
        WorkerNumberGenerator(int lastUsed, int incrementStep) {
            // Guava Preconditions templates use %s placeholders.
            Preconditions.checkArgument(lastUsed >= 0, "Last Used worker Number cannot be negative %s", lastUsed);
            Preconditions.checkArgument(incrementStep >= 1, "incrementStep cannot be less than 1 %s", incrementStep);
            this.lastUsed = lastUsed;
            this.currLimit = lastUsed;
            this.incrementStep = incrementStep;
        }

        /** Creates a generator starting at 0 with the default block size. */
        WorkerNumberGenerator() {
            this(0, DEFAULT_INCREMENT_STEP);
        }

        /**
         * Persists the next block limit ({@code currLimit + incrementStep}) and adopts it.
         *
         * @throws RuntimeException if persisting fails; the generator is then marked as errored
         */
        private void advance(MantisJobMetadataImpl mantisJobMetaData, MantisJobStore jobStore) {
            int value = this.currLimit + this.incrementStep;
            try {
                this.setNextWorkerNumberWithRetries(mantisJobMetaData, jobStore, value);
            } catch (InterruptedException e) {
                // Restore the interrupt flag cleared by Thread.sleep so callers can observe it.
                Thread.currentThread().interrupt();
                this.hasErrored = true;
                LOGGER.error("Interrupted while retrying setNextWorkerNumberToUse", e);
                throw new RuntimeException("Unexpected error setting next worker number to use", e);
            } catch (Exception e) {
                this.hasErrored = true;
                LOGGER.error("Exception setting nextWorkerNumberToUse after {} consecutive attempts", MAX_ATTEMPTS, e);
                throw new RuntimeException("Unexpected error setting next worker number to use", e);
            }
            this.currLimit = value;
        }

        /**
         * Calls {@code setNextWorkerNumberToUse(value, jobStore)} up to {@link #MAX_ATTEMPTS} times,
         * sleeping {@link #SLEEP_DURATION_MS} between failures (but not after the last one).
         *
         * @throws InterruptedException if interrupted while waiting between attempts
         * @throws Exception the last failure, if every attempt fails
         */
        private void setNextWorkerNumberWithRetries(MantisJobMetadataImpl mantisJobMetaData, MantisJobStore jobStore, int value) throws Exception {
            Exception lastFailure = null;
            for (int attempt = 1; attempt <= MAX_ATTEMPTS; ++attempt) {
                try {
                    mantisJobMetaData.setNextWorkerNumberToUse(value, jobStore);
                    return;
                } catch (Exception e) {
                    LOGGER.warn("Failed to setNextWorkerNumberToUse to {} (attempt {}/{})", value, attempt, MAX_ATTEMPTS, e);
                    lastFailure = e;
                    if (attempt < MAX_ATTEMPTS) {
                        Thread.sleep(SLEEP_DURATION_MS);
                    }
                }
            }
            throw lastFailure;
        }

        /**
         * Returns the next worker number, reserving a new block first when the current one is used up.
         *
         * @throws IllegalStateException if a previous reservation failed
         * @throws RuntimeException      if reserving a new block fails
         */
        int getNextWorkerNumber(MantisJobMetadataImpl mantisJobMetaData, MantisJobStore jobStore) {
            if (this.hasErrored) {
                throw new IllegalStateException("Unexpected: Invalid state likely due to getting/setting next worker number");
            }
            if (this.lastUsed == this.currLimit) {
                this.advance(mantisJobMetaData, jobStore);
            }
            return ++this.lastUsed;
        }
    }
}

