package io.mantisrx.master;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorPaths;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.actor.Terminated;
import akka.pattern.PatternsCS;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.GaugeCallback;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.mantisrx.master.JobListHelperActor;
import io.mantisrx.master.akka.MantisActorSupervisorStrategy;
import io.mantisrx.master.events.LifecycleEventPublisher;
import io.mantisrx.master.jobcluster.IJobClusterMetadata;
import io.mantisrx.master.jobcluster.JobClusterActor;
import io.mantisrx.master.jobcluster.job.CostsCalculator;
import io.mantisrx.master.jobcluster.job.IMantisJobMetadata;
import io.mantisrx.master.jobcluster.job.JobHelper;
import io.mantisrx.master.jobcluster.job.JobState;
import io.mantisrx.master.jobcluster.proto.BaseResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto;
import io.mantisrx.master.jobcluster.proto.JobClusterProto;
import io.mantisrx.runtime.descriptor.SchedulingInfo;
import io.mantisrx.server.core.JobCompletedReason;
import io.mantisrx.server.master.config.ConfigurationProvider;
import io.mantisrx.server.master.domain.IJobClusterDefinition;
import io.mantisrx.server.master.domain.JobClusterDefinitionImpl;
import io.mantisrx.server.master.domain.JobDefinition;
import io.mantisrx.server.master.domain.JobId;
import io.mantisrx.server.master.persistence.MantisJobStore;
import io.mantisrx.server.master.scheduler.MantisSchedulerFactory;
import io.mantisrx.server.master.scheduler.WorkerEvent;
import io.mantisrx.shaded.com.google.common.collect.Lists;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.schedulers.Schedulers;

/* loaded from: input_file:io/mantisrx/master/JobClustersManagerActor.class */
public class JobClustersManagerActor extends AbstractActorWithTimers implements IJobClustersManager {
    // Timer key string. NOTE(review): the constant name is plural but the value is
    // "CHECK_CLUSTER_TIMER" (singular); presumably used with the actor's timers — confirm at the use site.
    private static final String CHECK_CLUSTERS_TIMER_KEY = "CHECK_CLUSTER_TIMER";
    // Timeout in milliseconds (5 seconds). Presumably bounds cluster state transitions — TODO confirm at callers.
    public static final int STATE_TRANSITION_TIMEOUT_MSECS = 5000;
    // Counter registered in the constructor under the name "numJobClusterInitFailures".
    private final Counter numJobClusterInitFailures;
    // Counter registered in the constructor under the name "numJobClusterInitSuccesses".
    private final Counter numJobClusterInitSuccesses;
    // Receive behavior built once in the constructor by getInitializedBehavior().
    private AbstractActor.Receive initializedBehavior;
    // Collaborators supplied through the constructor.
    private final MantisJobStore jobStore;
    private final LifecycleEventPublisher eventPublisher;
    private final CostsCalculator costsCalculator;
    // Registry of job clusters known to this actor; not assigned in the constructor
    // (presumably created during initialize — TODO confirm).
    JobClusterInfoManager jobClusterInfoManager;
    // Passed through to every JobClusterActor created by JobClusterInfoManager.
    private final int slaHeadroomForAcceptedJobs;
    // Child actor created in initialize() under the name "JobListHelperActor".
    private ActorRef jobListHelperActor;
    private final Logger logger = LoggerFactory.getLogger(JobClustersManagerActor.class);
    // Interval in seconds; presumably the period of the cluster re-check timer — TODO confirm.
    private final long checkAgainInSecs = 30;
    // Starts out null; presumably provided with the initialize message — TODO confirm.
    private MantisSchedulerFactory mantisSchedulerFactory = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/mantisrx/master/JobClustersManagerActor$JobClusterInfo.class */
    /**
     * Book-keeping record for one job cluster: its name, definition, the actor that
     * serves it, and a small lifecycle state machine with the time of the last
     * state change.
     */
    public static class JobClusterInfo {
        private static final Logger logger = LoggerFactory.getLogger(JobClusterInfo.class);
        private JobClusterProto.InitializeJobClusterRequest initRequest;
        final String clusterName;
        final ActorRef jobClusterActor;
        private volatile JobClusterState currentState = JobClusterState.UNINITIALIZED;
        volatile long stateUpdateTime = System.currentTimeMillis();
        final IJobClusterDefinition jobClusterDefinition;

        /** Lifecycle states a job cluster record can be in. */
        public enum JobClusterState {
            UNINITIALIZED,
            INITIALIZING,
            INITIALIZED,
            DELETING,
            DELETED
        }

        JobClusterInfo(String str, IJobClusterDefinition iJobClusterDefinition, ActorRef actorRef) {
            this.clusterName = str;
            this.jobClusterActor = actorRef;
            this.jobClusterDefinition = iJobClusterDefinition;
        }

        /** @return the name of the cluster this record describes */
        public String getClusterName() {
            return this.clusterName;
        }

        /** @return the definition this record was created with */
        public IJobClusterDefinition getJobClusterDefinition() {
            return this.jobClusterDefinition;
        }

        /**
         * Moves UNINITIALIZED -> INITIALIZING and remembers the request that triggered it.
         * From any other state the call only logs a warning and changes nothing.
         *
         * @param initializeJobClusterRequest the request being sent to the cluster actor
         * @param j                           timestamp (epoch millis) to record for the transition
         */
        void markInitializing(JobClusterProto.InitializeJobClusterRequest initializeJobClusterRequest, long j) {
            if (this.currentState == JobClusterState.UNINITIALIZED) {
                this.stateUpdateTime = j;
                this.currentState = JobClusterState.INITIALIZING;
                this.initRequest = initializeJobClusterRequest;
            } else {
                warnInvalidTransition(JobClusterState.INITIALIZING);
            }
        }

        /**
         * Moves INITIALIZING -> INITIALIZED. From any other state the call only logs a
         * warning and changes nothing.
         *
         * @param j timestamp (epoch millis) to record for the transition
         */
        public void markInitialized(long j) {
            if (this.currentState != JobClusterState.INITIALIZING) {
                warnInvalidTransition(JobClusterState.INITIALIZED);
                return;
            }
            this.stateUpdateTime = j;
            this.currentState = JobClusterState.INITIALIZED;
        }

        /** Unconditionally switches to DELETING and records {@code j} as the change time. */
        void markDeleting(long j) {
            this.currentState = JobClusterState.DELETING;
            this.stateUpdateTime = j;
        }

        /** Unconditionally switches to DELETED and records {@code j} as the change time. */
        void markDeleted(long j) {
            this.currentState = JobClusterState.DELETED;
            this.stateUpdateTime = j;
        }

        /** Logs a rejected transition from the current state to {@code target}. */
        private void warnInvalidTransition(JobClusterState target) {
            logger.warn("Invalid state transition from {} to {} for job cluster {}", this.currentState, target, this.clusterName);
        }

        public String toString() {
            StringBuilder text = new StringBuilder("JobClusterInfo{clusterName='");
            text.append(this.clusterName);
            text.append("', jobClusterActor=").append(this.jobClusterActor);
            text.append(", currentState=").append(this.currentState);
            text.append(", stateUpdateTime=").append(this.stateUpdateTime);
            text.append(", jobClusterDefinition=").append(this.jobClusterDefinition);
            text.append('}');
            return text.toString();
        }
    }

    /* loaded from: input_file:io/mantisrx/master/JobClustersManagerActor$JobClusterInfoManager.class */
    /**
     * Registry of the job clusters known to the enclosing actor. It creates and watches
     * one {@link JobClusterActor} per cluster, tracks each cluster's lifecycle state in a
     * {@link JobClusterInfo}, and relays initialize / delete responses back to the
     * original requestors.
     *
     * <p>The backing map is a plain {@link HashMap}.
     * NOTE(review): the Rx callbacks in {@link #initializeCluster} read and mutate this map and
     * call {@code getContext()}; confirm they only ever run while the actor is blocked on the
     * result, otherwise that state is touched off the actor thread.
     */
    class JobClusterInfoManager {
        private final Map<String, JobClusterInfo> jobClusterNameToInfoMap = new HashMap<>();
        private final LifecycleEventPublisher eventPublisher;
        private MantisSchedulerFactory mantisSchedulerFactory;
        private final MantisJobStore jobStore;
        // Kept so the registered gauge stays reachable; not read elsewhere in this class.
        private final Metrics metrics;
        private final CostsCalculator costsCalculator;

        /**
         * Stores the collaborators handed to every cluster actor and registers a
         * "jobClustersGauge" gauge reporting the number of registered clusters.
         */
        JobClusterInfoManager(MantisJobStore mantisJobStore, MantisSchedulerFactory mantisSchedulerFactory, LifecycleEventPublisher lifecycleEventPublisher, CostsCalculator costsCalculator) {
            this.eventPublisher = lifecycleEventPublisher;
            this.mantisSchedulerFactory = mantisSchedulerFactory;
            this.jobStore = mantisJobStore;
            this.costsCalculator = costsCalculator;
            MetricGroupId metricGroupId = new MetricGroupId("JobClusterInfoManager");
            this.metrics = MetricsRegistry.getInstance().registerAndGet(new Metrics.Builder().id(metricGroupId).addGauge(new GaugeCallback(metricGroupId, "jobClustersGauge", () -> {
                return Double.valueOf(1.0d * this.jobClusterNameToInfoMap.size());
            })).build());
        }

        /**
         * Returns the existing record for the cluster, or creates a child actor named
         * {@code "JobClusterActor-" + name}, watches it and registers a new record.
         *
         * @param iJobClusterDefinition definition of the cluster to register
         * @return the (existing or new) record, or empty when the cluster name is not a
         *         valid actor path element
         */
        Optional<JobClusterInfo> createClusterActorAndRegister(IJobClusterDefinition iJobClusterDefinition) {
            String name = iJobClusterDefinition.getName();
            if (isClusterExists(name)) {
                return Optional.ofNullable(this.jobClusterNameToInfoMap.get(name));
            }
            if (!ActorPaths.isValidPathElement(name)) {
                JobClustersManagerActor.this.logger.error("Cannot create actor for cluster with invalid name {}", name);
                return Optional.empty();
            }
            ActorRef actorOf = JobClustersManagerActor.this.getContext().actorOf(JobClusterActor.props(name, this.jobStore, this.mantisSchedulerFactory, this.eventPublisher, this.costsCalculator, JobClustersManagerActor.this.slaHeadroomForAcceptedJobs), "JobClusterActor-" + name);
            JobClustersManagerActor.this.getContext().watch(actorOf);
            JobClusterInfo jobClusterInfo = new JobClusterInfo(name, iJobClusterDefinition, actorOf);
            this.jobClusterNameToInfoMap.put(name, jobClusterInfo);
            return Optional.of(jobClusterInfo);
        }

        /**
         * Marks the cluster deleted, unwatches and stops its actor, and removes the record.
         * Logs a warning and does nothing when the cluster is unknown.
         *
         * @param str name of the cluster to remove
         */
        void deregisterJobCluster(String str) {
            Optional<JobClusterInfo> jobClusterInfo = getJobClusterInfo(str);
            if (!jobClusterInfo.isPresent()) {
                // Log the requested name; the Optional is empty here and carries no information.
                JobClustersManagerActor.this.logger.warn("Job Cluster does not exist {}", str);
                return;
            }
            JobClusterInfo info = jobClusterInfo.get();
            info.markDeleted(System.currentTimeMillis());
            ActorRef actorRef = info.jobClusterActor;
            // Unwatch before stopping so the deliberate stop does not produce a Terminated message.
            JobClustersManagerActor.this.getContext().unwatch(actorRef);
            JobClustersManagerActor.this.getContext().stop(actorRef);
            this.jobClusterNameToInfoMap.remove(str);
        }

        /**
         * Asks the cluster actor to initialize and exposes the reply as an Observable.
         * On a SUCCESS reply the record is marked initialized. If the ask fails (for
         * example on timeout) the failure counter is incremented, the cluster is
         * deregistered and a SERVER_ERROR response is emitted instead of the error.
         *
         * @param jobClusterInfo              record of the cluster to initialize
         * @param initializeJobClusterRequest request forwarded to the cluster actor
         * @param duration                    ask timeout
         * @return an Observable emitting exactly one initialize response
         */
        Observable<JobClusterProto.InitializeJobClusterResponse> initializeCluster(JobClusterInfo jobClusterInfo, JobClusterProto.InitializeJobClusterRequest initializeJobClusterRequest, Duration duration) {
            jobClusterInfo.markInitializing(initializeJobClusterRequest, System.currentTimeMillis());
            CompletionStage<JobClusterProto.InitializeJobClusterResponse> ask = PatternsCS.ask(jobClusterInfo.jobClusterActor, initializeJobClusterRequest, duration)
                .thenApply(JobClusterProto.InitializeJobClusterResponse.class::cast);
            return Observable.from(ask.toCompletableFuture(), Schedulers.io()).map(initializeJobClusterResponse -> {
                JobClustersManagerActor.this.logger.info("JobCluster {} inited with code {}", initializeJobClusterResponse.jobClusterName, initializeJobClusterResponse.responseCode);
                Optional<JobClusterInfo> jobClusterInfo2 = JobClustersManagerActor.this.jobClusterInfoManager.getJobClusterInfo(initializeJobClusterResponse.jobClusterName);
                if (initializeJobClusterResponse.responseCode == BaseResponse.ResponseCode.SUCCESS) {
                    jobClusterInfo2.ifPresent(jobClusterInfo3 -> {
                        jobClusterInfo3.markInitialized(System.currentTimeMillis());
                    });
                }
                return initializeJobClusterResponse;
            }).onErrorResumeNext(th -> {
                JobClustersManagerActor.this.logger.warn("caught exception {}", th.getMessage(), th);
                JobClustersManagerActor.this.numJobClusterInitFailures.increment();
                deregisterJobCluster(jobClusterInfo.clusterName);
                return Observable.just(new JobClusterProto.InitializeJobClusterResponse(initializeJobClusterRequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, th.getMessage(), jobClusterInfo.clusterName, ActorRef.noSender()));
            });
        }

        /**
         * Fire-and-forget variant: marks the record initializing and sends the request
         * to the cluster actor with this actor as sender.
         */
        void initializeClusterAsync(JobClusterInfo jobClusterInfo, JobClusterProto.InitializeJobClusterRequest initializeJobClusterRequest) {
            jobClusterInfo.markInitializing(initializeJobClusterRequest, System.currentTimeMillis());
            jobClusterInfo.jobClusterActor.tell(initializeJobClusterRequest, JobClustersManagerActor.this.getSelf());
        }

        /** @return the record registered under {@code str}, or empty if none */
        Optional<JobClusterInfo> getJobClusterInfo(String str) {
            return Optional.ofNullable(this.jobClusterNameToInfoMap.get(str));
        }

        /** @return the job definition of the archived job {@code str} as reported by the job store, if any */
        Optional<JobDefinition> getArchivedJobDefinition(String str) {
            return this.jobStore.getArchivedJob(str).map((v0) -> {
                return v0.getJobDefinition();
            });
        }

        /** @return a read-only view (not a copy) of all registered cluster records */
        Map<String, JobClusterInfo> getAllJobClusterInfo() {
            return Collections.unmodifiableMap(this.jobClusterNameToInfoMap);
        }

        /** @return true when a record is registered under {@code str} */
        boolean isClusterExists(String str) {
            return this.jobClusterNameToInfoMap.containsKey(str);
        }

        /**
         * Handles the reply to an asynchronous initialize request. SUCCESS marks the
         * record initialized and answers the requestor with SUCCESS_CREATED; SERVER_ERROR
         * deregisters the cluster and forwards the error. Any other response code is
         * ignored without a reply. Responses for unknown clusters are logged and dropped.
         */
        void processInitializeResponse(JobClusterProto.InitializeJobClusterResponse initializeJobClusterResponse) {
            Optional<JobClusterInfo> jobClusterInfo = getJobClusterInfo(initializeJobClusterResponse.jobClusterName);
            if (!jobClusterInfo.isPresent()) {
                JobClustersManagerActor.this.logger.warn("Received JobClusterInitializeResponse {} for unknown Job Cluster {}", initializeJobClusterResponse, initializeJobClusterResponse.jobClusterName);
                return;
            }
            JobClusterInfo jobClusterInfo2 = jobClusterInfo.get();
            if (initializeJobClusterResponse.responseCode == BaseResponse.ResponseCode.SUCCESS) {
                jobClusterInfo2.markInitialized(System.currentTimeMillis());
                initializeJobClusterResponse.requestor.tell(new JobClusterManagerProto.CreateJobClusterResponse(initializeJobClusterResponse.requestId, BaseResponse.ResponseCode.SUCCESS_CREATED, initializeJobClusterResponse.jobClusterName + " created", initializeJobClusterResponse.jobClusterName), JobClustersManagerActor.this.getSelf());
            } else if (initializeJobClusterResponse.responseCode == BaseResponse.ResponseCode.SERVER_ERROR) {
                deregisterJobCluster(initializeJobClusterResponse.jobClusterName);
                initializeJobClusterResponse.requestor.tell(new JobClusterManagerProto.CreateJobClusterResponse(initializeJobClusterResponse.requestId, initializeJobClusterResponse.responseCode, initializeJobClusterResponse.message, initializeJobClusterResponse.jobClusterName), JobClustersManagerActor.this.getSelf());
            }
        }

        /**
         * Forwards a delete request to the cluster actor and marks the record as deleting.
         * Replies CLIENT_ERROR_NOT_FOUND to the sender when the cluster is unknown.
         */
        void processDeleteRequest(JobClusterManagerProto.DeleteJobClusterRequest deleteJobClusterRequest) {
            Optional<JobClusterInfo> jobClusterInfo = getJobClusterInfo(deleteJobClusterRequest.getName());
            // Capture the sender now; it is carried in the forwarded request so the reply can be routed back.
            ActorRef sender = JobClustersManagerActor.this.getSender();
            if (!jobClusterInfo.isPresent()) {
                sender.tell(new JobClusterManagerProto.DeleteJobClusterResponse(deleteJobClusterRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, "JobCluster " + deleteJobClusterRequest.getName() + " doesn't exist"), JobClustersManagerActor.this.getSelf());
                return;
            }
            JobClusterInfo jobClusterInfo2 = jobClusterInfo.get();
            jobClusterInfo2.jobClusterActor.tell(new JobClusterProto.DeleteJobClusterRequest(deleteJobClusterRequest.getUser(), deleteJobClusterRequest.getName(), sender), JobClustersManagerActor.this.getSelf());
            jobClusterInfo2.markDeleting(System.currentTimeMillis());
        }

        /**
         * Handles the cluster actor's reply to a delete request: deregisters the cluster
         * on SUCCESS and always relays the outcome to the original requesting actor,
         * including when the cluster is no longer known.
         */
        void processDeleteResponse(JobClusterProto.DeleteJobClusterResponse deleteJobClusterResponse) {
            if (!getJobClusterInfo(deleteJobClusterResponse.clusterName).isPresent()) {
                JobClustersManagerActor.this.logger.warn("Received delete job cluster response {} for unknown job cluster {}", deleteJobClusterResponse, deleteJobClusterResponse.clusterName);
            } else if (deleteJobClusterResponse.responseCode == BaseResponse.ResponseCode.SUCCESS) {
                deregisterJobCluster(deleteJobClusterResponse.clusterName);
            }
            deleteJobClusterResponse.requestingActor.tell(new JobClusterManagerProto.DeleteJobClusterResponse(deleteJobClusterResponse.requestId, deleteJobClusterResponse.responseCode, deleteJobClusterResponse.message), JobClustersManagerActor.this.getSelf());
        }
    }

    /* loaded from: input_file:io/mantisrx/master/JobClustersManagerActor$UpdateSchedulingInfo.class */
    /**
     * Immutable message carrying a new {@link SchedulingInfo} and version for a named
     * job cluster, tagged with a request id. Value semantics: equality, hash code and
     * string form are derived from all four fields.
     */
    public static final class UpdateSchedulingInfo {
        private final long requestId;
        private final String clusterName;
        private final SchedulingInfo schedulingInfo;
        private final String version;

        @ConstructorProperties({"requestId", "clusterName", "schedulingInfo", "version"})
        public UpdateSchedulingInfo(long j, String str, SchedulingInfo schedulingInfo, String str2) {
            this.requestId = j;
            this.clusterName = str;
            this.schedulingInfo = schedulingInfo;
            this.version = str2;
        }

        public long getRequestId() {
            return this.requestId;
        }

        public String getClusterName() {
            return this.clusterName;
        }

        public SchedulingInfo getSchedulingInfo() {
            return this.schedulingInfo;
        }

        public String getVersion() {
            return this.version;
        }

        /** Two messages are equal when all four fields are equal (null-safe). */
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof UpdateSchedulingInfo)) {
                return false;
            }
            UpdateSchedulingInfo that = (UpdateSchedulingInfo) obj;
            return this.requestId == that.requestId
                && Objects.equals(this.clusterName, that.clusterName)
                && Objects.equals(this.schedulingInfo, that.schedulingInfo)
                && Objects.equals(this.version, that.version);
        }

        /** Prime-59 rolling hash over the four fields; a null field contributes 43. */
        public int hashCode() {
            int result = 59 + Long.hashCode(this.requestId);
            result = result * 59 + (this.clusterName == null ? 43 : this.clusterName.hashCode());
            result = result * 59 + (this.schedulingInfo == null ? 43 : this.schedulingInfo.hashCode());
            result = result * 59 + (this.version == null ? 43 : this.version.hashCode());
            return result;
        }

        public String toString() {
            StringBuilder text = new StringBuilder("JobClustersManagerActor.UpdateSchedulingInfo(requestId=");
            text.append(this.requestId);
            text.append(", clusterName=").append(this.clusterName);
            text.append(", schedulingInfo=").append(this.schedulingInfo);
            text.append(", version=").append(this.version);
            text.append(")");
            return text.toString();
        }
    }

    /**
     * Builds the {@link Props} used to create this actor: the four constructor arguments
     * are passed through and the "akka.actor.metered-mailbox" mailbox is selected.
     *
     * @param mantisJobStore          job store handed to the actor
     * @param lifecycleEventPublisher event publisher handed to the actor
     * @param costsCalculator         costs calculator handed to the actor
     * @param i                       SLA headroom for accepted jobs
     * @return Props for a {@code JobClustersManagerActor}
     */
    public static Props props(MantisJobStore mantisJobStore, LifecycleEventPublisher lifecycleEventPublisher, CostsCalculator costsCalculator, int i) {
        final Props actorProps = Props.create(
            JobClustersManagerActor.class,
            mantisJobStore,
            lifecycleEventPublisher,
            costsCalculator,
            Integer.valueOf(i));
        return actorProps.withMailbox("akka.actor.metered-mailbox");
    }

    /**
     * Stores the collaborators, registers the two init counters under this actor's
     * metric group and pre-builds the behavior used once the actor is initialized.
     *
     * @param mantisJobStore          job store used by this actor and its cluster actors
     * @param lifecycleEventPublisher publisher for lifecycle events
     * @param costsCalculator         costs calculator passed on to cluster actors
     * @param i                       SLA headroom for accepted jobs
     */
    public JobClustersManagerActor(MantisJobStore mantisJobStore, LifecycleEventPublisher lifecycleEventPublisher, CostsCalculator costsCalculator, int i) {
        this.jobStore = mantisJobStore;
        this.eventPublisher = lifecycleEventPublisher;
        this.costsCalculator = costsCalculator;
        this.slaHeadroomForAcceptedJobs = i;

        final String failuresName = "numJobClusterInitFailures";
        final String successesName = "numJobClusterInitSuccesses";
        final MetricsRegistry registry = MetricsRegistry.getInstance();
        final Metrics registered = registry.registerAndGet(
            new Metrics.Builder()
                .id(getMetricGroupId())
                .addCounter(failuresName)
                .addCounter(successesName)
                .build());
        this.numJobClusterInitFailures = registered.getCounter(failuresName);
        this.numJobClusterInitSuccesses = registered.getCounter(successesName);

        this.initializedBehavior = getInitializedBehavior();
    }

    /** @return a new metric group id named "JobClustersManagerActor" for this actor's counters */
    MetricGroupId getMetricGroupId() {
        final MetricGroupId groupId = new MetricGroupId("JobClustersManagerActor");
        return groupId;
    }

    /**
     * Builds the message dispatch table used after initialization. Each request type is
     * routed to its handler method; a repeated initialize request is answered with
     * CLIENT_ERROR, and anything unrecognized is logged.
     *
     * @return the receive behavior for the initialized state
     */
    private AbstractActor.Receive getInitializedBehavior() {
        final String state = "initialized";
        return receiveBuilder()
            // job cluster lifecycle
            .match(JobClusterManagerProto.ReconcileJobCluster.class, this::onReconcileJobClusters)
            .match(JobClusterManagerProto.CreateJobClusterRequest.class, this::onJobClusterCreate)
            .match(JobClusterProto.InitializeJobClusterResponse.class, this::onJobClusterInitializeResponse)
            .match(JobClusterManagerProto.DeleteJobClusterRequest.class, this::onJobClusterDelete)
            .match(JobClusterProto.DeleteJobClusterResponse.class, this::onJobClusterDeleteResponse)
            // job cluster updates
            .match(JobClusterManagerProto.UpdateJobClusterRequest.class, this::onJobClusterUpdate)
            .match(JobClusterManagerProto.UpdateJobClusterSLARequest.class, this::onJobClusterUpdateSLA)
            .match(JobClusterManagerProto.UpdateJobClusterArtifactRequest.class, this::onJobClusterUpdateArtifact)
            .match(UpdateSchedulingInfo.class, this::onJobClusterUpdateSchedulingInfo)
            .match(JobClusterManagerProto.UpdateJobClusterLabelsRequest.class, this::onJobClusterUpdateLabels)
            .match(JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyRequest.class, this::onJobClusterUpdateWorkerMigrationConfig)
            .match(JobClusterManagerProto.EnableJobClusterRequest.class, this::onJobClusterEnable)
            .match(JobClusterManagerProto.DisableJobClusterRequest.class, this::onJobClusterDisable)
            // queries
            .match(JobClusterManagerProto.GetJobClusterRequest.class, this::onJobClusterGet)
            .match(JobClusterManagerProto.ListCompletedJobsInClusterRequest.class, this::onJobListCompleted)
            .match(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest.class, this::onGetLastSubmittedJobIdSubject)
            .match(JobClusterManagerProto.ListArchivedWorkersRequest.class, this::onListArchivedWorkers)
            .match(JobClusterManagerProto.ListJobClustersRequest.class, this::onJobClustersList)
            .match(JobClusterManagerProto.ListJobsRequest.class, this::onJobList)
            .match(JobClusterManagerProto.ListJobIdsRequest.class, this::onJobIdList)
            .match(JobClusterManagerProto.ListWorkersRequest.class, this::onListActiveWorkers)
            // job operations
            .match(JobClusterManagerProto.SubmitJobRequest.class, this::onJobSubmit)
            .match(JobClusterManagerProto.KillJobRequest.class, this::onJobKillRequest)
            .match(JobClusterManagerProto.GetJobDetailsRequest.class, this::onGetJobDetailsRequest)
            .match(JobClusterManagerProto.GetJobSchedInfoRequest.class, this::onGetJobStatusSubject)
            .match(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest.class, this::onGetLatestJobDiscoveryInfo)
            .match(JobClusterManagerProto.ScaleStageRequest.class, this::onScaleStage)
            .match(JobClusterManagerProto.ResubmitWorkerRequest.class, this::onResubmitWorker)
            .match(WorkerEvent.class, this::onWorkerEvent)
            .match(Terminated.class, this::onTerminated)
            // already initialized: reject a second initialize request
            .match(JobClusterManagerProto.JobClustersManagerInitialize.class, initRequest -> {
                getSender().tell(
                    new JobClusterManagerProto.JobClustersManagerInitializeResponse(
                        initRequest.requestId,
                        BaseResponse.ResponseCode.CLIENT_ERROR,
                        genUnexpectedMsg(initRequest.toString(), state)),
                    getSelf());
            })
            .matchAny(unknown -> {
                this.logger.warn("unexpected message {} received by Job Cluster Manager actor. In initialized state ", unknown);
            })
            .build();
    }

    /**
     * Formats the standard "unexpected message" text.
     *
     * @param str  string form of the message that was received
     * @param str2 name of the state the actor was in
     * @return the formatted description
     */
    private String genUnexpectedMsg(String str, String str2) {
        return "Unexpected message " + str + " received by JobClustersManager actor in " + str2 + " State";
    }

    /**
     * Builds the message dispatch table used before initialization has completed. Only
     * the initialize request is acted upon. Most other requests are answered with a
     * CLIENT_ERROR response of the matching type; responses, worker events and a few
     * requests are only logged.
     *
     * @return the receive behavior for the initializing state
     */
    private AbstractActor.Receive getInitializingBehavior() {
        final String state = "initializing";
        return receiveBuilder()
            .match(JobClusterManagerProto.JobClustersManagerInitialize.class, this::initialize)
            .match(JobClusterManagerProto.ReconcileJobCluster.class, msg ->
                this.logger.warn(genUnexpectedMsg(msg.toString(), state)))
            .match(JobClusterManagerProto.CreateJobClusterRequest.class, req ->
                getSender().tell(new JobClusterManagerProto.CreateJobClusterResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state), req.getJobClusterDefinition().getName()), getSelf()))
            .match(JobClusterProto.InitializeJobClusterResponse.class, msg ->
                this.logger.warn(genUnexpectedMsg(msg.toString(), state)))
            .match(JobClusterManagerProto.DeleteJobClusterRequest.class, req ->
                getSender().tell(new JobClusterManagerProto.DeleteJobClusterResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state)), getSelf()))
            .match(JobClusterProto.DeleteJobClusterResponse.class, msg ->
                this.logger.warn(genUnexpectedMsg(msg.toString(), state)))
            .match(JobClusterManagerProto.UpdateJobClusterRequest.class, req ->
                getSender().tell(new JobClusterManagerProto.UpdateJobClusterResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state)), getSelf()))
            .match(JobClusterManagerProto.UpdateJobClusterSLARequest.class, req ->
                getSender().tell(new JobClusterManagerProto.UpdateJobClusterSLAResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state)), getSelf()))
            .match(JobClusterManagerProto.UpdateJobClusterArtifactRequest.class, req ->
                getSender().tell(new JobClusterManagerProto.UpdateJobClusterArtifactResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state)), getSelf()))
            .match(UpdateSchedulingInfo.class, req ->
                getSender().tell(new JobClusterManagerProto.UpdateSchedulingInfoResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state)), getSelf()))
            .match(JobClusterManagerProto.UpdateJobClusterLabelsRequest.class, req ->
                getSender().tell(new JobClusterManagerProto.UpdateJobClusterLabelsResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state)), getSelf()))
            .match(JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyRequest.class, req ->
                getSender().tell(new JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state)), getSelf()))
            .match(JobClusterManagerProto.EnableJobClusterRequest.class, req ->
                getSender().tell(new JobClusterManagerProto.EnableJobClusterResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state)), getSelf()))
            .match(JobClusterManagerProto.DisableJobClusterRequest.class, req ->
                getSender().tell(new JobClusterManagerProto.DisableJobClusterResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state)), getSelf()))
            .match(JobClusterManagerProto.GetJobClusterRequest.class, req ->
                getSender().tell(new JobClusterManagerProto.GetJobClusterResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state), Optional.empty()), getSelf()))
            // This request is only logged here; no reply is sent to the sender in this state.
            .match(JobClusterManagerProto.ListCompletedJobsInClusterRequest.class, msg ->
                this.logger.warn(genUnexpectedMsg(msg.toString(), state)))
            .match(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest.class, req ->
                getSender().tell(new JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state), Optional.empty()), getSelf()))
            .match(JobClusterManagerProto.ListArchivedWorkersRequest.class, req ->
                getSender().tell(new JobClusterManagerProto.ListArchivedWorkersResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state), Lists.newArrayList()), getSelf()))
            .match(JobClusterManagerProto.ListJobClustersRequest.class, req ->
                getSender().tell(new JobClusterManagerProto.ListJobClustersResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state), Lists.newArrayList()), getSelf()))
            .match(JobClusterManagerProto.ListJobsRequest.class, req ->
                getSender().tell(new JobClusterManagerProto.ListJobsResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state), Lists.newArrayList()), getSelf()))
            .match(JobClusterManagerProto.ListJobIdsRequest.class, req ->
                getSender().tell(new JobClusterManagerProto.ListJobIdsResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state), Lists.newArrayList()), getSelf()))
            .match(JobClusterManagerProto.ListWorkersRequest.class, req ->
                getSender().tell(new JobClusterManagerProto.ListWorkersResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state), Lists.newArrayList()), getSelf()))
            .match(JobClusterManagerProto.SubmitJobRequest.class, req ->
                getSender().tell(new JobClusterManagerProto.SubmitJobResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state), Optional.empty()), getSelf()))
            .match(JobClusterManagerProto.KillJobRequest.class, req ->
                getSender().tell(new JobClusterManagerProto.KillJobResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, JobState.Noop, genUnexpectedMsg(req.toString(), state), req.getJobId(), req.getUser()), getSelf()))
            .match(JobClusterProto.KillJobResponse.class, msg ->
                this.logger.warn(genUnexpectedMsg(msg.toString(), state)))
            .match(JobClusterManagerProto.GetJobDetailsRequest.class, req ->
                getSender().tell(new JobClusterManagerProto.GetJobDetailsResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state), Optional.empty()), getSelf()))
            .match(JobClusterManagerProto.GetJobSchedInfoRequest.class, req ->
                getSender().tell(new JobClusterManagerProto.GetJobSchedInfoResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state), Optional.empty()), getSelf()))
            .match(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest.class, req ->
                getSender().tell(new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state), Optional.empty()), getSelf()))
            .match(JobClusterManagerProto.ScaleStageRequest.class, req ->
                getSender().tell(new JobClusterManagerProto.ScaleStageResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state), 0), getSelf()))
            .match(JobClusterManagerProto.ResubmitWorkerRequest.class, req ->
                getSender().tell(new JobClusterManagerProto.ResubmitWorkerResponse(req.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(req.toString(), state)), getSelf()))
            .match(WorkerEvent.class, msg ->
                this.logger.warn(genUnexpectedMsg(msg.toString(), state)))
            .matchAny(unknown ->
                this.logger.warn("unexpected message {} received by Job Cluster Manager actor. It needs to be initialized first ", unknown))
            .build();
    }

    /**
     * Handles the {@code JobClustersManagerInitialize} message: creates the job-list helper actor
     * and the {@code JobClusterInfoManager}, optionally restores job clusters and their active
     * jobs from the job store, and replies to the sender with a
     * {@code JobClustersManagerInitializeResponse}.
     *
     * <p>The actor switches to {@code initializedBehavior} only when loading from the store is
     * disabled, or when the restore pipeline completes without an error.
     *
     * @param jobClustersManagerInitialize init message supplying the scheduler factory, the
     *                                     load-from-store flag and the request id echoed in the reply
     */
    private void initialize(JobClusterManagerProto.JobClustersManagerInitialize jobClustersManagerInitialize) {
        // Capture the sender once; the Rx callbacks further down reply through this reference.
        ActorRef sender = getSender();
        try {
            this.logger.info("In JobClustersManagerActor:initialize");
            // Create the helper actor as a child and watch it for termination.
            this.jobListHelperActor = getContext().actorOf(JobListHelperActor.props(), "JobListHelperActor");
            getContext().watch(this.jobListHelperActor);
            this.mantisSchedulerFactory = jobClustersManagerInitialize.getScheduler();
            // Cluster name -> cluster metadata, filled from the store below.
            HashMap hashMap = new HashMap();
            this.jobClusterInfoManager = new JobClusterInfoManager(this.jobStore, this.mantisSchedulerFactory, this.eventPublisher, this.costsCalculator);
            if (jobClustersManagerInitialize.isLoadJobsFromStore()) {
                List<IJobClusterMetadata> loadAllJobClusters = this.jobStore.loadAllJobClusters();
                this.logger.info("Read {} job clusters from storage", Integer.valueOf(loadAllJobClusters.size()));
                List<IMantisJobMetadata> loadAllActiveJobs = this.jobStore.loadAllActiveJobs();
                this.logger.info("Read {} jobs from storage", Integer.valueOf(loadAllActiveJobs.size()));
                for (IJobClusterMetadata iJobClusterMetadata : loadAllJobClusters) {
                    hashMap.put(iJobClusterMetadata.getJobClusterDefinition().getName(), iJobClusterMetadata);
                }
                // Cluster name -> list of active jobs belonging to that cluster.
                HashMap hashMap2 = new HashMap();
                for (IMantisJobMetadata iMantisJobMetadata : loadAllActiveJobs) {
                    ((List) hashMap2.computeIfAbsent(iMantisJobMetadata.getClusterName(), str -> {
                        return new ArrayList();
                    })).add(iMantisJobMetadata);
                }
                // Per-cluster init timeout: the configured master init timeout minus 60 seconds,
                // unless that would not be positive, in which case the full timeout is used.
                long masterInitTimeoutSecs = ConfigurationProvider.getConfig().getMasterInitTimeoutSecs();
                long j = masterInitTimeoutSecs - 60 > 0 ? masterInitTimeoutSecs - 60 : masterInitTimeoutSecs;
                // Skip entries that are null or have no cluster definition.
                Observable.from(hashMap.values()).filter(iJobClusterMetadata2 -> {
                    return Boolean.valueOf((iJobClusterMetadata2 == null || iJobClusterMetadata2.getJobClusterDefinition() == null) ? false : true);
                }).flatMap(iJobClusterMetadata3 -> {
                    Duration ofSeconds = Duration.ofSeconds(j);
                    Optional<JobClusterInfo> createClusterActorAndRegister = this.jobClusterInfoManager.createClusterActorAndRegister(iJobClusterMetadata3.getJobClusterDefinition());
                    if (!createClusterActorAndRegister.isPresent()) {
                        // No actor could be created for this cluster: it contributes nothing to the stream.
                        this.logger.info("skipping job cluster {} on bootstrap as actor creating failed", iJobClusterMetadata3.getJobClusterDefinition().getName());
                        return Observable.empty();
                    }
                    JobClusterInfo jobClusterInfo = createClusterActorAndRegister.get();
                    // Copy the cluster's active jobs (if any) into a fresh list for the init request.
                    ArrayList newArrayList = Lists.newArrayList();
                    List list = (List) hashMap2.get(iJobClusterMetadata3.getJobClusterDefinition().getName());
                    if (list != null) {
                        newArrayList.addAll(list);
                    }
                    // NOTE(review): the result of this call is discarded; it looks like dead code.
                    Lists.newArrayList();
                    return this.jobClusterInfoManager.initializeCluster(jobClusterInfo, new JobClusterProto.InitializeJobClusterRequest((JobClusterDefinitionImpl) iJobClusterMetadata3.getJobClusterDefinition(), iJobClusterMetadata3.isDisabled(), iJobClusterMetadata3.getLastJobCount(), newArrayList, "system", getSelf(), false), ofSeconds);
                }).filter((v0) -> {
                    return Objects.nonNull(v0);
                }).toBlocking().subscribe(initializeJobClusterResponse -> {
                    // One response per initialized cluster.
                    this.logger.info("JobCluster {} inited with code {}", initializeJobClusterResponse.jobClusterName, initializeJobClusterResponse.responseCode);
                    this.numJobClusterInitSuccesses.increment();
                }, th -> {
                    // Error path: stay in the current behavior and report SERVER_ERROR to the requester.
                    this.logger.warn("Exception initializing clusters {}", th.getMessage(), th);
                    this.logger.error("JobClusterManagerActor had errors during initialization NOT transitioning to initialized behavior");
                    sender.tell(new JobClusterManagerProto.JobClustersManagerInitializeResponse(jobClustersManagerInitialize.requestId, BaseResponse.ResponseCode.SERVER_ERROR, "JobClustersManager  inited with errors"), getSelf());
                }, () -> {
                    // Completion path: switch to the initialized behavior and report SUCCESS.
                    this.logger.info("JobClusterManagerActor transitioning to initialized behavior");
                    getContext().become(this.initializedBehavior);
                    sender.tell(new JobClusterManagerProto.JobClustersManagerInitializeResponse(jobClustersManagerInitialize.requestId, BaseResponse.ResponseCode.SUCCESS, "JobClustersManager successfully inited"), getSelf());
                });
                // NOTE(review): the two statements below sit after the subscribe call and outside
                // its callbacks, so they appear to run even when the error callback fired - confirm
                // that this is intended.
                // Schedule a ReconcileJobCluster message to this actor every 30 seconds.
                getTimers().startPeriodicTimer(CHECK_CLUSTERS_TIMER_KEY, new JobClusterManagerProto.ReconcileJobCluster(), Duration.ofSeconds(30L));
                this.logger.info("Kicking off archived job load asynchronously");
                this.jobStore.loadAllArchivedJobsAsync();
            } else {
                // Nothing to restore: become initialized immediately and report SUCCESS.
                getContext().become(this.initializedBehavior);
                sender.tell(new JobClusterManagerProto.JobClustersManagerInitializeResponse(jobClustersManagerInitialize.requestId, BaseResponse.ResponseCode.SUCCESS, "JobClustersManager successfully inited"), getSelf());
            }
        } catch (Exception e) {
            // Any exception thrown above is logged and reported to the requester as SERVER_ERROR.
            this.logger.error("caught exception", e);
            sender.tell(new JobClusterManagerProto.JobClustersManagerInitializeResponse(jobClustersManagerInitialize.requestId, BaseResponse.ResponseCode.SERVER_ERROR, e.getMessage()), getSelf());
        }
        this.logger.info("JobClustersManagerActor:initialize ends");
    }

    /**
     * Finds job clusters that are still in the INITIALIZING or DELETING state more than 5000 ms
     * after their last state update (measured against the reconcile message's enforcement time)
     * and acts on each of them: an INITIALIZING cluster gets its stored init request re-sent and
     * its state timestamp refreshed, any other one is deregistered.
     *
     * @param reconcileJobCluster the reconcile tick carrying the enforcement time
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onReconcileJobClusters(JobClusterManagerProto.ReconcileJobCluster reconcileJobCluster) {
        final Set<JobClusterInfo> stuckClusters = jobClusterInfoManager.getAllJobClusterInfo().values().stream()
            .filter(info -> {
                final boolean inTransientState = info.currentState == JobClusterInfo.JobClusterState.INITIALIZING
                    || info.currentState == JobClusterInfo.JobClusterState.DELETING;
                return inTransientState
                    && reconcileJobCluster.timeOfEnforcement.toEpochMilli() - info.stateUpdateTime > 5000;
            })
            .collect(Collectors.toSet());
        if (stuckClusters.isEmpty()) {
            return;
        }
        logger.warn("{} JobClusters stuck in initializing/deleting state ", stuckClusters.size());
        for (JobClusterInfo stuck : stuckClusters) {
            if (stuck.currentState.equals(JobClusterInfo.JobClusterState.INITIALIZING)) {
                // Give the cluster actor another chance: restart the clock and re-send the init request.
                logger.warn("Retrying init on JobCluster {} stuck in {} state since {}", stuck.clusterName, stuck.currentState, stuck.stateUpdateTime);
                stuck.stateUpdateTime = reconcileJobCluster.timeOfEnforcement.toEpochMilli();
                stuck.jobClusterActor.tell(stuck.initRequest, getSelf());
            } else {
                logger.warn("Deregistering JobCluster {} stuck in {} state since {}", stuck.clusterName, stuck.currentState, stuck.stateUpdateTime);
                jobClusterInfoManager.deregisterJobCluster(stuck.clusterName);
            }
        }
    }

    /**
     * Creates a new job cluster from the definition in the request.
     *
     * <p>Replies {@code CLIENT_ERROR_CONFLICT} when a cluster with that name already exists,
     * {@code CLIENT_ERROR} when no cluster actor could be created and registered, and
     * {@code SERVER_ERROR} when creation throws. On success the cluster is initialized
     * asynchronously and the original sender is handed to the init request.
     *
     * @param createJobClusterRequest the create request carrying the cluster definition and user
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onJobClusterCreate(JobClusterManagerProto.CreateJobClusterRequest createJobClusterRequest) {
        final String clusterName = createJobClusterRequest.getJobClusterDefinition().getName();
        if (this.jobClusterInfoManager.isClusterExists(clusterName)) {
            getSender().tell(new JobClusterManagerProto.CreateJobClusterResponse(createJobClusterRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_CONFLICT, "Job Cluster " + clusterName + " already exists", clusterName), getSelf());
            return;
        }
        try {
            Optional<JobClusterInfo> createdCluster = this.jobClusterInfoManager.createClusterActorAndRegister(createJobClusterRequest.getJobClusterDefinition());
            if (createdCluster.isPresent()) {
                this.jobClusterInfoManager.initializeClusterAsync(createdCluster.get(), new JobClusterProto.InitializeJobClusterRequest(createJobClusterRequest.getJobClusterDefinition(), createJobClusterRequest.getUser(), getSender()));
            } else {
                getSender().tell(new JobClusterManagerProto.CreateJobClusterResponse(createJobClusterRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, "Job Cluster " + clusterName + " could not be created due to invalid name", clusterName), getSelf());
            }
        } catch (Exception e) {
            // The reply only carries e.getMessage(); log the full exception so the stack trace is not lost.
            this.logger.error("Job Cluster {} could not be created", clusterName, e);
            getSender().tell(new JobClusterManagerProto.CreateJobClusterResponse(createJobClusterRequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, "Job Cluster " + clusterName + " could not be created due to " + e.getMessage(), clusterName), getSelf());
        }
    }

    /**
     * Logs an incoming cluster-initialize response and hands it to the
     * {@code JobClusterInfoManager} for processing.
     *
     * @param initializeJobClusterResponse the response received from a job cluster actor
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onJobClusterInitializeResponse(JobClusterProto.InitializeJobClusterResponse initializeJobClusterResponse) {
        logger.info("Got JobClusterInitializeResponse {}", initializeJobClusterResponse);
        jobClusterInfoManager.processInitializeResponse(initializeJobClusterResponse);
    }

    /**
     * Delegates a delete-job-cluster request to the {@code JobClusterInfoManager}.
     *
     * @param deleteJobClusterRequest the delete request to process
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onJobClusterDelete(JobClusterManagerProto.DeleteJobClusterRequest deleteJobClusterRequest) {
        jobClusterInfoManager.processDeleteRequest(deleteJobClusterRequest);
    }

    /**
     * Delegates a delete-job-cluster response to the {@code JobClusterInfoManager}.
     *
     * @param deleteJobClusterResponse the delete response to process
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onJobClusterDeleteResponse(JobClusterProto.DeleteJobClusterResponse deleteJobClusterResponse) {
        jobClusterInfoManager.processDeleteResponse(deleteJobClusterResponse);
    }

    /**
     * Routes an update-job-cluster request to the actor of the cluster named in the request's
     * definition.
     *
     * <p>When the job cluster info manager has no entry for that name, the sender is answered
     * directly with a {@code CLIENT_ERROR_NOT_FOUND} response.
     *
     * @param updateJobClusterRequest the update request to route
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onJobClusterUpdate(JobClusterManagerProto.UpdateJobClusterRequest updateJobClusterRequest) {
        final String clusterName = updateJobClusterRequest.getJobClusterDefinition().getName();
        final Optional<JobClusterInfo> cluster = jobClusterInfoManager.getJobClusterInfo(clusterName);
        final ActorRef replyTo = getSender();
        if (!cluster.isPresent()) {
            replyTo.tell(new JobClusterManagerProto.UpdateJobClusterResponse(updateJobClusterRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, "JobCluster " + clusterName + " doesn't exist"), getSelf());
            return;
        }
        cluster.get().jobClusterActor.forward(updateJobClusterRequest, getContext());
    }

    /**
     * Hands a list-job-clusters request to the job-list helper actor, wrapped together with the
     * original sender and the current map of job cluster infos.
     *
     * @param listJobClustersRequest the list request to delegate
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onJobClustersList(JobClusterManagerProto.ListJobClustersRequest listJobClustersRequest) {
        // The guard checks the debug level, so log at debug as well (this was emitted at info).
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("In onJobClustersListRequest {}", listJobClustersRequest);
        }
        this.jobListHelperActor.tell(new JobListHelperActor.ListJobClusterRequestWrapper(listJobClustersRequest, getSender(), this.jobClusterInfoManager.getAllJobClusterInfo()), getSelf());
    }

    /**
     * Routes a get-job-cluster request to the actor of the named cluster.
     *
     * <p>When the job cluster info manager has no entry for that name, the sender is answered
     * directly with a {@code CLIENT_ERROR_NOT_FOUND} response carrying an empty optional.
     *
     * @param getJobClusterRequest the get request to route
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onJobClusterGet(JobClusterManagerProto.GetJobClusterRequest getJobClusterRequest) {
        final String clusterName = getJobClusterRequest.getJobClusterName();
        final Optional<JobClusterInfo> cluster = jobClusterInfoManager.getJobClusterInfo(clusterName);
        final ActorRef replyTo = getSender();
        if (!cluster.isPresent()) {
            replyTo.tell(new JobClusterManagerProto.GetJobClusterResponse(getJobClusterRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, "No such Job cluster " + clusterName, Optional.empty()), getSelf());
            return;
        }
        cluster.get().jobClusterActor.forward(getJobClusterRequest, getContext());
    }

    /**
     * Routes a last-submitted-job-id stream request to the actor of the named cluster.
     *
     * <p>When the job cluster info manager has no entry for that name, the sender is answered
     * directly with a {@code CLIENT_ERROR_NOT_FOUND} response carrying an empty optional.
     *
     * @param getLastSubmittedJobIdStreamRequest the stream request to route
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onGetLastSubmittedJobIdSubject(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest getLastSubmittedJobIdStreamRequest) {
        final String clusterName = getLastSubmittedJobIdStreamRequest.getClusterName();
        final Optional<JobClusterInfo> cluster = jobClusterInfoManager.getJobClusterInfo(clusterName);
        final ActorRef replyTo = getSender();
        if (!cluster.isPresent()) {
            replyTo.tell(new JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse(getLastSubmittedJobIdStreamRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, "No such Job cluster " + clusterName, Optional.empty()), getSelf());
            return;
        }
        cluster.get().jobClusterActor.forward(getLastSubmittedJobIdStreamRequest, getContext());
    }

    /**
     * Dispatches a worker event to the actor of the job cluster the worker belongs to.
     *
     * <p>When no such cluster is known: a terminal event is only logged; for any other event the
     * archived job definition is looked up and, if found, the worker is unscheduled and terminated
     * through the scheduler for that job. If no archived definition is found an error is logged.
     *
     * @param workerEvent the event reported for a worker
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onWorkerEvent(WorkerEvent workerEvent) {
        if (logger.isDebugEnabled()) {
            logger.debug("Entering JobClusterManagerActor:onWorkerEvent {}", workerEvent);
        }
        final Optional<JobClusterInfo> owningCluster = jobClusterInfoManager.getJobClusterInfo(workerEvent.getWorkerId().getJobCluster());
        if (owningCluster.isPresent()) {
            owningCluster.get().jobClusterActor.forward(workerEvent, getContext());
        } else if (JobHelper.isTerminalWorkerEvent(workerEvent)) {
            logger.warn("Terminal Event from Worker {} for a cluster {} that no longer exists. Ignore worker", workerEvent, workerEvent.getWorkerId().getJobCluster());
        } else {
            logger.warn("Event from Worker {} for a cluster {} that no longer exists. Terminate worker", workerEvent, workerEvent.getWorkerId().getJobCluster());
            final Optional<String> workerHost = JobHelper.getWorkerHostFromWorkerEvent(workerEvent);
            final Optional<JobDefinition> archivedDefinition = jobClusterInfoManager.getArchivedJobDefinition(workerEvent.getWorkerId().getJobId());
            if (!archivedDefinition.isPresent()) {
                logger.error("Non-Terminal Event {} from worker {} for a cluster {} that no longer exists and the job definition not yet archived", workerEvent, workerEvent.getWorkerId(), workerEvent.getWorkerId().getJobCluster());
            } else {
                mantisSchedulerFactory.forJob(archivedDefinition.get()).unscheduleAndTerminateWorker(workerEvent.getWorkerId(), workerHost);
            }
        }
    }

    /**
     * Logs a warning naming the actor reported in a {@code Terminated} message.
     *
     * @param terminated the termination notification
     */
    private void onTerminated(Terminated terminated) {
        logger.warn("onTerminated {}", terminated.actor());
    }

    /**
     * Routes a submit-job request to the actor of the cluster named in the request.
     *
     * <p>When the job cluster info manager has no entry for that name, the sender is answered
     * directly with a {@code CLIENT_ERROR_NOT_FOUND} response carrying an empty optional.
     *
     * @param submitJobRequest the submit request to route
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onJobSubmit(JobClusterManagerProto.SubmitJobRequest submitJobRequest) {
        // Parameterized logging: the request is only stringified when info logging is enabled.
        this.logger.info("Submitting job {}", submitJobRequest);
        final String clusterName = submitJobRequest.getClusterName();
        Optional<JobClusterInfo> jobClusterInfo = this.jobClusterInfoManager.getJobClusterInfo(clusterName);
        ActorRef sender = getSender();
        if (jobClusterInfo.isPresent()) {
            jobClusterInfo.get().jobClusterActor.forward(submitJobRequest, getContext());
        } else {
            sender.tell(new JobClusterManagerProto.SubmitJobResponse(submitJobRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, "Job Cluster " + clusterName + " doesn't exist", Optional.empty()), getSelf());
        }
    }

    /**
     * Translates an external kill-job request into a {@code JobClusterProto.KillJobRequest}
     * (reason {@code JobCompletedReason.Killed}, carrying the original sender) and sends it to
     * the actor of the cluster that owns the job.
     *
     * <p>When the job cluster info manager has no entry for that cluster, the sender is answered
     * directly with a {@code CLIENT_ERROR_NOT_FOUND} response and job state {@code Noop}.
     *
     * @param killJobRequest the kill request naming the job, the reason and the user
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onJobKillRequest(JobClusterManagerProto.KillJobRequest killJobRequest) {
        // Parameterized logging: the request is only stringified when info logging is enabled.
        this.logger.info("Killing job {}", killJobRequest);
        ActorRef sender = getSender();
        JobId jobId = killJobRequest.getJobId();
        Optional<JobClusterInfo> jobClusterInfo = this.jobClusterInfoManager.getJobClusterInfo(jobId.getCluster());
        if (jobClusterInfo.isPresent()) {
            jobClusterInfo.get().jobClusterActor.tell(new JobClusterProto.KillJobRequest(jobId, killJobRequest.getReason(), JobCompletedReason.Killed, killJobRequest.getUser(), sender), getSelf());
        } else {
            this.logger.info("Job cluster {} not found", jobId.getCluster());
            sender.tell(new JobClusterManagerProto.KillJobResponse(killJobRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, JobState.Noop, "Job cluster " + jobId.getCluster() + " doesn't exist", jobId, killJobRequest.getUser()), getSelf());
        }
    }

    /**
     * Lifecycle hook: logs that the actor started, then delegates to the superclass hook.
     *
     * @throws Exception if the superclass hook throws
     */
    public void preStart() throws Exception {
        logger.info("JobClusterManager Actor started");
        super.preStart();
    }

    /**
     * Lifecycle hook: logs that the actor stopped, then delegates to the superclass hook.
     *
     * @throws Exception if the superclass hook throws
     */
    public void postStop() throws Exception {
        logger.info("JobClusterManager Actor stopped");
        super.postStop();
    }

    /**
     * Lifecycle hook: logs the message being processed and the failure message before a restart.
     *
     * @param th       the failure that triggers the restart
     * @param optional the message that was being processed, if any
     * @throws Exception declared by the hook signature; nothing here throws explicitly
     */
    public void preRestart(Throwable th, Optional<Object> optional) throws Exception {
        // NOTE(review): unlike the other lifecycle hooks in this class, this one does not call
        // super.preRestart - presumably to skip the default pre-restart handling; confirm intent.
        logger.info("preRestart {} (exc: {})", optional, th.getMessage());
    }

    /**
     * Lifecycle hook: logs the failure message after a restart, then delegates to the superclass
     * hook.
     *
     * @param th the failure that caused the restart
     * @throws Exception if the superclass hook throws
     */
    public void postRestart(Throwable th) throws Exception {
        logger.info("postRestart (exc={})", th.getMessage());
        super.postRestart(th);
    }

    /**
     * Returns the supervisor strategy created by
     * {@code MantisActorSupervisorStrategy.getInstance().create()}.
     *
     * @return the supervisor strategy used by this actor
     */
    public SupervisorStrategy supervisorStrategy() {
        return MantisActorSupervisorStrategy.getInstance().create();
    }

    /**
     * Returns the behavior this actor is created with: the one built by
     * {@code getInitializingBehavior()}.
     *
     * @return the initializing receive behavior
     */
    public AbstractActor.Receive createReceive() {
        return getInitializingBehavior();
    }

    /**
     * Logs a failure that occurred while retrieving the job cluster list.
     *
     * @param th the failure to log; its message fills the placeholder and the throwable itself
     *           is passed along so the stack trace is recorded too
     */
    private void logError(Throwable th) {
        // Pass the throwable as the trailing argument so the logger records the stack trace,
        // not just the message text.
        this.logger.error("Exception occurred retrieving job cluster list {}", th.getMessage(), th);
    }

    /**
     * Routes an update-SLA request to the actor of the named cluster.
     *
     * <p>When the job cluster info manager has no entry for that name, the sender is answered
     * directly with a {@code CLIENT_ERROR_NOT_FOUND} response.
     *
     * @param updateJobClusterSLARequest the SLA update request to route
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onJobClusterUpdateSLA(JobClusterManagerProto.UpdateJobClusterSLARequest updateJobClusterSLARequest) {
        final String clusterName = updateJobClusterSLARequest.getClusterName();
        final Optional<JobClusterInfo> cluster = jobClusterInfoManager.getJobClusterInfo(clusterName);
        final ActorRef replyTo = getSender();
        if (!cluster.isPresent()) {
            replyTo.tell(new JobClusterManagerProto.UpdateJobClusterSLAResponse(updateJobClusterSLARequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, "JobCluster " + clusterName + " doesn't exist"), getSelf());
            return;
        }
        cluster.get().jobClusterActor.forward(updateJobClusterSLARequest, getContext());
    }

    /**
     * Routes an update-artifact request to the actor of the named cluster.
     *
     * <p>When the job cluster info manager has no entry for that name, the sender is answered
     * directly with a {@code CLIENT_ERROR_NOT_FOUND} response.
     *
     * @param updateJobClusterArtifactRequest the artifact update request to route
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onJobClusterUpdateArtifact(JobClusterManagerProto.UpdateJobClusterArtifactRequest updateJobClusterArtifactRequest) {
        final String clusterName = updateJobClusterArtifactRequest.getClusterName();
        final Optional<JobClusterInfo> cluster = jobClusterInfoManager.getJobClusterInfo(clusterName);
        final ActorRef replyTo = getSender();
        if (!cluster.isPresent()) {
            replyTo.tell(new JobClusterManagerProto.UpdateJobClusterArtifactResponse(updateJobClusterArtifactRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, "JobCluster " + clusterName + " doesn't exist"), getSelf());
            return;
        }
        cluster.get().jobClusterActor.forward(updateJobClusterArtifactRequest, getContext());
    }

    /**
     * Routes an update-scheduling-info request to the actor of the named cluster.
     *
     * <p>When the job cluster info manager has no entry for that name, the sender is answered
     * directly with a {@code CLIENT_ERROR_NOT_FOUND} response.
     *
     * @param updateSchedulingInfo the scheduling-info update request to route
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onJobClusterUpdateSchedulingInfo(UpdateSchedulingInfo updateSchedulingInfo) {
        final ActorRef replyTo = getSender();
        final String clusterName = updateSchedulingInfo.getClusterName();
        final Optional<JobClusterInfo> cluster = jobClusterInfoManager.getJobClusterInfo(clusterName);
        if (!cluster.isPresent()) {
            // NOTE(review): the not-found reply is an UpdateJobClusterArtifactResponse, not a
            // scheduling-info specific response - looks like a copy/paste; confirm which type
            // callers of this request expect.
            replyTo.tell(new JobClusterManagerProto.UpdateJobClusterArtifactResponse(updateSchedulingInfo.getRequestId(), BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, "JobCluster " + clusterName + " doesn't exist"), getSelf());
            return;
        }
        cluster.get().jobClusterActor.forward(updateSchedulingInfo, getContext());
    }

    /**
     * Routes an update-labels request to the actor of the named cluster.
     *
     * <p>When the job cluster info manager has no entry for that name, the sender is answered
     * directly with a {@code CLIENT_ERROR_NOT_FOUND} response.
     *
     * @param updateJobClusterLabelsRequest the labels update request to route
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onJobClusterUpdateLabels(JobClusterManagerProto.UpdateJobClusterLabelsRequest updateJobClusterLabelsRequest) {
        final String clusterName = updateJobClusterLabelsRequest.getClusterName();
        final Optional<JobClusterInfo> cluster = jobClusterInfoManager.getJobClusterInfo(clusterName);
        final ActorRef replyTo = getSender();
        if (!cluster.isPresent()) {
            replyTo.tell(new JobClusterManagerProto.UpdateJobClusterLabelsResponse(updateJobClusterLabelsRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, "JobCluster " + clusterName + " doesn't exist"), getSelf());
            return;
        }
        cluster.get().jobClusterActor.forward(updateJobClusterLabelsRequest, getContext());
    }

    /**
     * Routes an update-worker-migration-strategy request to the actor of the named cluster.
     *
     * <p>When the job cluster info manager has no entry for that name, the sender is answered
     * directly with a {@code CLIENT_ERROR_NOT_FOUND} response.
     *
     * @param updateJobClusterWorkerMigrationStrategyRequest the migration-strategy update to route
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onJobClusterUpdateWorkerMigrationConfig(JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyRequest updateJobClusterWorkerMigrationStrategyRequest) {
        final String clusterName = updateJobClusterWorkerMigrationStrategyRequest.getClusterName();
        final Optional<JobClusterInfo> cluster = jobClusterInfoManager.getJobClusterInfo(clusterName);
        final ActorRef replyTo = getSender();
        if (!cluster.isPresent()) {
            replyTo.tell(new JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyResponse(updateJobClusterWorkerMigrationStrategyRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, "JobCluster " + clusterName + " doesn't exist"), getSelf());
            return;
        }
        cluster.get().jobClusterActor.forward(updateJobClusterWorkerMigrationStrategyRequest, getContext());
    }

    /**
     * Routes an enable-job-cluster request to the actor of the named cluster.
     *
     * <p>When the job cluster info manager has no entry for that name, the sender is answered
     * directly with a {@code CLIENT_ERROR_NOT_FOUND} response.
     *
     * @param enableJobClusterRequest the enable request to route
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onJobClusterEnable(JobClusterManagerProto.EnableJobClusterRequest enableJobClusterRequest) {
        final String clusterName = enableJobClusterRequest.getClusterName();
        final Optional<JobClusterInfo> cluster = jobClusterInfoManager.getJobClusterInfo(clusterName);
        final ActorRef replyTo = getSender();
        if (!cluster.isPresent()) {
            replyTo.tell(new JobClusterManagerProto.EnableJobClusterResponse(enableJobClusterRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, "JobCluster " + clusterName + " doesn't exist"), getSelf());
            return;
        }
        cluster.get().jobClusterActor.forward(enableJobClusterRequest, getContext());
    }

    /**
     * Routes a disable-job-cluster request to the actor of the named cluster.
     *
     * <p>When the job cluster info manager has no entry for that name, the sender is answered
     * directly with a {@code CLIENT_ERROR_NOT_FOUND} response.
     *
     * @param disableJobClusterRequest the disable request to route
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onJobClusterDisable(JobClusterManagerProto.DisableJobClusterRequest disableJobClusterRequest) {
        final String clusterName = disableJobClusterRequest.getClusterName();
        final Optional<JobClusterInfo> cluster = jobClusterInfoManager.getJobClusterInfo(clusterName);
        final ActorRef replyTo = getSender();
        if (!cluster.isPresent()) {
            replyTo.tell(new JobClusterManagerProto.DisableJobClusterResponse(disableJobClusterRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, "JobCluster " + clusterName + " doesn't exist"), getSelf());
            return;
        }
        cluster.get().jobClusterActor.forward(disableJobClusterRequest, getContext());
    }

    /**
     * Routes a get-job-details request to the actor of the cluster taken from the request's job id.
     *
     * <p>When the job cluster info manager has no entry for that cluster, the sender is answered
     * directly with a {@code CLIENT_ERROR_NOT_FOUND} response carrying an empty optional.
     *
     * @param getJobDetailsRequest the details request to route
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onGetJobDetailsRequest(JobClusterManagerProto.GetJobDetailsRequest getJobDetailsRequest) {
        final Optional<JobClusterInfo> cluster = jobClusterInfoManager.getJobClusterInfo(getJobDetailsRequest.getJobId().getCluster());
        final ActorRef replyTo = getSender();
        if (!cluster.isPresent()) {
            final String notFoundMessage = "Job " + getJobDetailsRequest.getJobId().getId() + " doesn't exist";
            replyTo.tell(new JobClusterManagerProto.GetJobDetailsResponse(getJobDetailsRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, notFoundMessage, Optional.empty()), getSelf());
            return;
        }
        cluster.get().jobClusterActor.forward(getJobDetailsRequest, getContext());
    }

    /**
     * Routes a get-job-sched-info request to the actor of the cluster taken from the request's
     * job id.
     *
     * <p>When the job cluster info manager has no entry for that cluster, the sender is answered
     * directly with a {@code CLIENT_ERROR_NOT_FOUND} response carrying an empty optional.
     *
     * @param getJobSchedInfoRequest the scheduling-info request to route
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onGetJobStatusSubject(JobClusterManagerProto.GetJobSchedInfoRequest getJobSchedInfoRequest) {
        final Optional<JobClusterInfo> cluster = jobClusterInfoManager.getJobClusterInfo(getJobSchedInfoRequest.getJobId().getCluster());
        final ActorRef replyTo = getSender();
        if (!cluster.isPresent()) {
            final String notFoundMessage = "JobCluster " + getJobSchedInfoRequest.getJobId().getCluster() + " doesn't exist";
            replyTo.tell(new JobClusterManagerProto.GetJobSchedInfoResponse(getJobSchedInfoRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, notFoundMessage, Optional.empty()), getSelf());
            return;
        }
        cluster.get().jobClusterActor.forward(getJobSchedInfoRequest, getContext());
    }

    /**
     * Routes a latest-job-discovery-info request to the actor of the named cluster.
     *
     * <p>When the job cluster info manager has no entry for that name, the sender is answered
     * directly with a {@code CLIENT_ERROR_NOT_FOUND} response carrying an empty optional.
     *
     * @param getLatestJobDiscoveryInfoRequest the discovery-info request to route
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onGetLatestJobDiscoveryInfo(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest getLatestJobDiscoveryInfoRequest) {
        final String clusterName = getLatestJobDiscoveryInfoRequest.getJobCluster();
        final Optional<JobClusterInfo> cluster = jobClusterInfoManager.getJobClusterInfo(clusterName);
        final ActorRef replyTo = getSender();
        if (!cluster.isPresent()) {
            replyTo.tell(new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(getLatestJobDiscoveryInfoRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, "JobCluster " + clusterName + " doesn't exist", Optional.empty()), getSelf());
            return;
        }
        cluster.get().jobClusterActor.forward(getLatestJobDiscoveryInfoRequest, getContext());
    }

    /**
     * Routes a list-completed-jobs request to the actor of the named cluster.
     *
     * <p>When the job cluster info manager has no entry for that name, the sender is answered
     * directly with a {@code CLIENT_ERROR_NOT_FOUND} response carrying an empty list.
     *
     * @param listCompletedJobsInClusterRequest the list request to route
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onJobListCompleted(JobClusterManagerProto.ListCompletedJobsInClusterRequest listCompletedJobsInClusterRequest) {
        final String clusterName = listCompletedJobsInClusterRequest.getClusterName();
        final Optional<JobClusterInfo> cluster = jobClusterInfoManager.getJobClusterInfo(clusterName);
        final ActorRef replyTo = getSender();
        if (!cluster.isPresent()) {
            replyTo.tell(new JobClusterManagerProto.ListCompletedJobsInClusterResponse(listCompletedJobsInClusterRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, "JobCluster " + clusterName + " doesn't exist", Lists.newArrayList()), getSelf());
            return;
        }
        cluster.get().jobClusterActor.forward(listCompletedJobsInClusterRequest, getContext());
    }

    /**
     * Hands a list-job-ids request to the job-list helper actor, wrapped together with the
     * original sender and the current map of job cluster infos. Entry and exit are traced when
     * trace logging is enabled.
     *
     * @param listJobIdsRequest the list request to delegate
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onJobIdList(JobClusterManagerProto.ListJobIdsRequest listJobIdsRequest) {
        if (logger.isTraceEnabled()) {
            logger.trace("Enter onJobIdList");
        }
        final JobListHelperActor.ListJobIdRequestWrapper wrappedRequest =
            new JobListHelperActor.ListJobIdRequestWrapper(listJobIdsRequest, getSender(), jobClusterInfoManager.getAllJobClusterInfo());
        jobListHelperActor.tell(wrappedRequest, getSelf());
        if (logger.isTraceEnabled()) {
            logger.trace("Exit onJobIdList");
        }
    }

    /**
     * Routes a list-archived-workers request to the actor of the cluster taken from the request's
     * job id.
     *
     * <p>When the job cluster info manager has no entry for that cluster, the sender is answered
     * directly with a {@code CLIENT_ERROR} response carrying an empty list.
     *
     * @param listArchivedWorkersRequest the list request to route
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onListArchivedWorkers(JobClusterManagerProto.ListArchivedWorkersRequest listArchivedWorkersRequest) {
        Optional<JobClusterInfo> jobClusterInfo = this.jobClusterInfoManager.getJobClusterInfo(listArchivedWorkersRequest.getJobId().getCluster());
        // Capture the sender once and reply through it; the original fetched it and threw it away.
        ActorRef sender = getSender();
        if (jobClusterInfo.isPresent()) {
            jobClusterInfo.get().jobClusterActor.forward(listArchivedWorkersRequest, getContext());
        } else {
            sender.tell(new JobClusterManagerProto.ListArchivedWorkersResponse(listArchivedWorkersRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, "Job Cluster " + listArchivedWorkersRequest.getJobId().getCluster() + " Not found", Lists.newArrayList()), getSelf());
        }
    }

    /**
     * Routes a list-workers request to the actor of the cluster taken from the request's job id.
     *
     * <p>When the job cluster info manager has no entry for that cluster, the sender is answered
     * directly with a {@code CLIENT_ERROR} response carrying an empty list.
     *
     * @param listWorkersRequest the list request to route
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onListActiveWorkers(JobClusterManagerProto.ListWorkersRequest listWorkersRequest) {
        final Optional<JobClusterInfo> cluster = jobClusterInfoManager.getJobClusterInfo(listWorkersRequest.getJobId().getCluster());
        if (!cluster.isPresent()) {
            final String notFoundMessage = "Job Cluster " + listWorkersRequest.getJobId().getCluster() + " Not found";
            getSender().tell(new JobClusterManagerProto.ListWorkersResponse(listWorkersRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, notFoundMessage, Lists.newArrayList()), getSelf());
            return;
        }
        cluster.get().jobClusterActor.forward(listWorkersRequest, getContext());
    }

    /**
     * Hands a list-jobs request to the job-list helper actor, wrapped together with the original
     * sender and the current map of job cluster infos.
     *
     * @param listJobsRequest the list request to delegate
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onJobList(JobClusterManagerProto.ListJobsRequest listJobsRequest) {
        final JobListHelperActor.ListJobRequestWrapper wrappedRequest =
            new JobListHelperActor.ListJobRequestWrapper(listJobsRequest, getSender(), jobClusterInfoManager.getAllJobClusterInfo());
        jobListHelperActor.tell(wrappedRequest, getSelf());
    }

    /**
     * Routes a scale-stage request to the actor of the cluster taken from the request's job id.
     *
     * <p>When the job cluster info manager has no entry for that cluster, the sender is answered
     * directly with a {@code CLIENT_ERROR_NOT_FOUND} response whose trailing numeric argument is 0.
     *
     * @param scaleStageRequest the scale request to route
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onScaleStage(JobClusterManagerProto.ScaleStageRequest scaleStageRequest) {
        final Optional<JobClusterInfo> cluster = jobClusterInfoManager.getJobClusterInfo(scaleStageRequest.getJobId().getCluster());
        final ActorRef replyTo = getSender();
        if (!cluster.isPresent()) {
            final String notFoundMessage = "JobCluster " + scaleStageRequest.getJobId().getCluster() + " doesn't exist";
            replyTo.tell(new JobClusterManagerProto.ScaleStageResponse(scaleStageRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, notFoundMessage, 0), getSelf());
            return;
        }
        cluster.get().jobClusterActor.forward(scaleStageRequest, getContext());
    }

    /**
     * Routes a resubmit-worker request to the actor of the cluster taken from the request's job id.
     *
     * <p>When the job cluster info manager has no entry for that cluster, the sender is answered
     * directly with a {@code CLIENT_ERROR_NOT_FOUND} response.
     *
     * @param resubmitWorkerRequest the resubmit request to route
     */
    @Override // io.mantisrx.master.IJobClustersManager
    public void onResubmitWorker(JobClusterManagerProto.ResubmitWorkerRequest resubmitWorkerRequest) {
        final Optional<JobClusterInfo> cluster = jobClusterInfoManager.getJobClusterInfo(resubmitWorkerRequest.getJobId().getCluster());
        final ActorRef replyTo = getSender();
        if (!cluster.isPresent()) {
            final String notFoundMessage = "JobCluster " + resubmitWorkerRequest.getJobId().getCluster() + " doesn't exist";
            replyTo.tell(new JobClusterManagerProto.ResubmitWorkerResponse(resubmitWorkerRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, notFoundMessage), getSelf());
            return;
        }
        cluster.get().jobClusterActor.forward(resubmitWorkerRequest, getContext());
    }
}
