package io.mantisrx.server.master.scheduler;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.actor.Props;
import akka.actor.Status;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import com.netflix.spectator.api.Tag;
import io.mantisrx.common.Ack;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.Timer;
import io.mantisrx.server.core.domain.WorkerId;
import io.mantisrx.server.master.ExecuteStageRequestFactory;
import io.mantisrx.server.master.resourcecluster.ResourceCluster;
import io.mantisrx.server.master.resourcecluster.TaskExecutorAllocationRequest;
import io.mantisrx.server.master.resourcecluster.TaskExecutorID;
import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration;
import io.mantisrx.server.worker.TaskExecutorGateway;
import io.mantisrx.shaded.com.google.common.base.Throwables;
import java.beans.ConstructorProperties;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.class */
class ResourceClusterAwareSchedulerActor extends AbstractActorWithTimers {
    private static final Logger log = LoggerFactory.getLogger(ResourceClusterAwareSchedulerActor.class);
    private final ResourceCluster resourceCluster;
    private final ExecuteStageRequestFactory executeStageRequestFactory;
    private final JobMessageRouter jobMessageRouter;
    private final int maxScheduleRetries;
    private final int maxCancelRetries;
    private final Duration intervalBetweenRetries;
    private final Timer schedulingLatency;
    private final Counter schedulingFailures;
    private final Counter connectionFailures;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor$AssignedScheduleRequestEvent.class */
    public static final class AssignedScheduleRequestEvent {
        private final ScheduleRequestEvent scheduleRequestEvent;
        private final TaskExecutorID taskExecutorID;

        @ConstructorProperties({"scheduleRequestEvent", "taskExecutorID"})
        public AssignedScheduleRequestEvent(ScheduleRequestEvent scheduleRequestEvent, TaskExecutorID taskExecutorID) {
            this.scheduleRequestEvent = scheduleRequestEvent;
            this.taskExecutorID = taskExecutorID;
        }

        public ScheduleRequestEvent getScheduleRequestEvent() {
            return this.scheduleRequestEvent;
        }

        public TaskExecutorID getTaskExecutorID() {
            return this.taskExecutorID;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof AssignedScheduleRequestEvent)) {
                return false;
            }
            AssignedScheduleRequestEvent assignedScheduleRequestEvent = (AssignedScheduleRequestEvent) obj;
            ScheduleRequestEvent scheduleRequestEvent = getScheduleRequestEvent();
            ScheduleRequestEvent scheduleRequestEvent2 = assignedScheduleRequestEvent.getScheduleRequestEvent();
            if (scheduleRequestEvent == null) {
                if (scheduleRequestEvent2 != null) {
                    return false;
                }
            } else if (!scheduleRequestEvent.equals(scheduleRequestEvent2)) {
                return false;
            }
            TaskExecutorID taskExecutorID = getTaskExecutorID();
            TaskExecutorID taskExecutorID2 = assignedScheduleRequestEvent.getTaskExecutorID();
            return taskExecutorID == null ? taskExecutorID2 == null : taskExecutorID.equals(taskExecutorID2);
        }

        public int hashCode() {
            ScheduleRequestEvent scheduleRequestEvent = getScheduleRequestEvent();
            int hashCode = (1 * 59) + (scheduleRequestEvent == null ? 43 : scheduleRequestEvent.hashCode());
            TaskExecutorID taskExecutorID = getTaskExecutorID();
            return (hashCode * 59) + (taskExecutorID == null ? 43 : taskExecutorID.hashCode());
        }

        public String toString() {
            return "ResourceClusterAwareSchedulerActor.AssignedScheduleRequestEvent(scheduleRequestEvent=" + getScheduleRequestEvent() + ", taskExecutorID=" + getTaskExecutorID() + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor$CancelRequestEvent.class */
    public static final class CancelRequestEvent {
        private final WorkerId workerId;
        private final int attempt;
        private final Throwable previousFailure;

        /* JADX INFO: Access modifiers changed from: package-private */
        public static CancelRequestEvent of(WorkerId workerId) {
            return new CancelRequestEvent(workerId, 1, null);
        }

        RetryCancelRequestEvent onFailure(Throwable th) {
            return new RetryCancelRequestEvent(this, th);
        }

        @ConstructorProperties({"workerId", "attempt", "previousFailure"})
        public CancelRequestEvent(WorkerId workerId, int i, Throwable th) {
            this.workerId = workerId;
            this.attempt = i;
            this.previousFailure = th;
        }

        public WorkerId getWorkerId() {
            return this.workerId;
        }

        public int getAttempt() {
            return this.attempt;
        }

        public Throwable getPreviousFailure() {
            return this.previousFailure;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof CancelRequestEvent)) {
                return false;
            }
            CancelRequestEvent cancelRequestEvent = (CancelRequestEvent) obj;
            if (getAttempt() != cancelRequestEvent.getAttempt()) {
                return false;
            }
            WorkerId workerId = getWorkerId();
            WorkerId workerId2 = cancelRequestEvent.getWorkerId();
            if (workerId == null) {
                if (workerId2 != null) {
                    return false;
                }
            } else if (!workerId.equals(workerId2)) {
                return false;
            }
            Throwable previousFailure = getPreviousFailure();
            Throwable previousFailure2 = cancelRequestEvent.getPreviousFailure();
            return previousFailure == null ? previousFailure2 == null : previousFailure.equals(previousFailure2);
        }

        public int hashCode() {
            int attempt = (1 * 59) + getAttempt();
            WorkerId workerId = getWorkerId();
            int hashCode = (attempt * 59) + (workerId == null ? 43 : workerId.hashCode());
            Throwable previousFailure = getPreviousFailure();
            return (hashCode * 59) + (previousFailure == null ? 43 : previousFailure.hashCode());
        }

        public String toString() {
            return "ResourceClusterAwareSchedulerActor.CancelRequestEvent(workerId=" + getWorkerId() + ", attempt=" + getAttempt() + ", previousFailure=" + getPreviousFailure() + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor$FailedToScheduleRequestEvent.class */
    public static final class FailedToScheduleRequestEvent {
        private final ScheduleRequestEvent scheduleRequestEvent;
        private final int attempt;
        private final Throwable throwable;

        /* JADX INFO: Access modifiers changed from: private */
        public ScheduleRequestEvent onRetry() {
            return new ScheduleRequestEvent(this.scheduleRequestEvent.getRequest(), this.attempt + 1, this.throwable, this.scheduleRequestEvent.getEventTime());
        }

        @ConstructorProperties({"scheduleRequestEvent", "attempt", "throwable"})
        public FailedToScheduleRequestEvent(ScheduleRequestEvent scheduleRequestEvent, int i, Throwable th) {
            this.scheduleRequestEvent = scheduleRequestEvent;
            this.attempt = i;
            this.throwable = th;
        }

        public ScheduleRequestEvent getScheduleRequestEvent() {
            return this.scheduleRequestEvent;
        }

        public int getAttempt() {
            return this.attempt;
        }

        public Throwable getThrowable() {
            return this.throwable;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof FailedToScheduleRequestEvent)) {
                return false;
            }
            FailedToScheduleRequestEvent failedToScheduleRequestEvent = (FailedToScheduleRequestEvent) obj;
            if (getAttempt() != failedToScheduleRequestEvent.getAttempt()) {
                return false;
            }
            ScheduleRequestEvent scheduleRequestEvent = getScheduleRequestEvent();
            ScheduleRequestEvent scheduleRequestEvent2 = failedToScheduleRequestEvent.getScheduleRequestEvent();
            if (scheduleRequestEvent == null) {
                if (scheduleRequestEvent2 != null) {
                    return false;
                }
            } else if (!scheduleRequestEvent.equals(scheduleRequestEvent2)) {
                return false;
            }
            Throwable throwable = getThrowable();
            Throwable throwable2 = failedToScheduleRequestEvent.getThrowable();
            return throwable == null ? throwable2 == null : throwable.equals(throwable2);
        }

        public int hashCode() {
            int attempt = (1 * 59) + getAttempt();
            ScheduleRequestEvent scheduleRequestEvent = getScheduleRequestEvent();
            int hashCode = (attempt * 59) + (scheduleRequestEvent == null ? 43 : scheduleRequestEvent.hashCode());
            Throwable throwable = getThrowable();
            return (hashCode * 59) + (throwable == null ? 43 : throwable.hashCode());
        }

        public String toString() {
            return "ResourceClusterAwareSchedulerActor.FailedToScheduleRequestEvent(scheduleRequestEvent=" + getScheduleRequestEvent() + ", attempt=" + getAttempt() + ", throwable=" + getThrowable() + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor$FailedToSubmitScheduleRequestEvent.class */
    public static final class FailedToSubmitScheduleRequestEvent {
        private final ScheduleRequestEvent scheduleRequestEvent;
        private final TaskExecutorID taskExecutorID;
        private final Throwable throwable;

        @ConstructorProperties({"scheduleRequestEvent", "taskExecutorID", "throwable"})
        public FailedToSubmitScheduleRequestEvent(ScheduleRequestEvent scheduleRequestEvent, TaskExecutorID taskExecutorID, Throwable th) {
            this.scheduleRequestEvent = scheduleRequestEvent;
            this.taskExecutorID = taskExecutorID;
            this.throwable = th;
        }

        public ScheduleRequestEvent getScheduleRequestEvent() {
            return this.scheduleRequestEvent;
        }

        public TaskExecutorID getTaskExecutorID() {
            return this.taskExecutorID;
        }

        public Throwable getThrowable() {
            return this.throwable;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof FailedToSubmitScheduleRequestEvent)) {
                return false;
            }
            FailedToSubmitScheduleRequestEvent failedToSubmitScheduleRequestEvent = (FailedToSubmitScheduleRequestEvent) obj;
            ScheduleRequestEvent scheduleRequestEvent = getScheduleRequestEvent();
            ScheduleRequestEvent scheduleRequestEvent2 = failedToSubmitScheduleRequestEvent.getScheduleRequestEvent();
            if (scheduleRequestEvent == null) {
                if (scheduleRequestEvent2 != null) {
                    return false;
                }
            } else if (!scheduleRequestEvent.equals(scheduleRequestEvent2)) {
                return false;
            }
            TaskExecutorID taskExecutorID = getTaskExecutorID();
            TaskExecutorID taskExecutorID2 = failedToSubmitScheduleRequestEvent.getTaskExecutorID();
            if (taskExecutorID == null) {
                if (taskExecutorID2 != null) {
                    return false;
                }
            } else if (!taskExecutorID.equals(taskExecutorID2)) {
                return false;
            }
            Throwable throwable = getThrowable();
            Throwable throwable2 = failedToSubmitScheduleRequestEvent.getThrowable();
            return throwable == null ? throwable2 == null : throwable.equals(throwable2);
        }

        public int hashCode() {
            ScheduleRequestEvent scheduleRequestEvent = getScheduleRequestEvent();
            int hashCode = (1 * 59) + (scheduleRequestEvent == null ? 43 : scheduleRequestEvent.hashCode());
            TaskExecutorID taskExecutorID = getTaskExecutorID();
            int hashCode2 = (hashCode * 59) + (taskExecutorID == null ? 43 : taskExecutorID.hashCode());
            Throwable throwable = getThrowable();
            return (hashCode2 * 59) + (throwable == null ? 43 : throwable.hashCode());
        }

        public String toString() {
            return "ResourceClusterAwareSchedulerActor.FailedToSubmitScheduleRequestEvent(scheduleRequestEvent=" + getScheduleRequestEvent() + ", taskExecutorID=" + getTaskExecutorID() + ", throwable=" + getThrowable() + ")";
        }
    }

    /* loaded from: input_file:io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor$InitializeRunningWorkerRequestEvent.class */
    static final class InitializeRunningWorkerRequestEvent {
        private final ScheduleRequest scheduleRequest;
        private final TaskExecutorID taskExecutorID;

        @ConstructorProperties({"scheduleRequest", "taskExecutorID"})
        public InitializeRunningWorkerRequestEvent(ScheduleRequest scheduleRequest, TaskExecutorID taskExecutorID) {
            this.scheduleRequest = scheduleRequest;
            this.taskExecutorID = taskExecutorID;
        }

        public ScheduleRequest getScheduleRequest() {
            return this.scheduleRequest;
        }

        public TaskExecutorID getTaskExecutorID() {
            return this.taskExecutorID;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof InitializeRunningWorkerRequestEvent)) {
                return false;
            }
            InitializeRunningWorkerRequestEvent initializeRunningWorkerRequestEvent = (InitializeRunningWorkerRequestEvent) obj;
            ScheduleRequest scheduleRequest = getScheduleRequest();
            ScheduleRequest scheduleRequest2 = initializeRunningWorkerRequestEvent.getScheduleRequest();
            if (scheduleRequest == null) {
                if (scheduleRequest2 != null) {
                    return false;
                }
            } else if (!scheduleRequest.equals(scheduleRequest2)) {
                return false;
            }
            TaskExecutorID taskExecutorID = getTaskExecutorID();
            TaskExecutorID taskExecutorID2 = initializeRunningWorkerRequestEvent.getTaskExecutorID();
            return taskExecutorID == null ? taskExecutorID2 == null : taskExecutorID.equals(taskExecutorID2);
        }

        public int hashCode() {
            ScheduleRequest scheduleRequest = getScheduleRequest();
            int hashCode = (1 * 59) + (scheduleRequest == null ? 43 : scheduleRequest.hashCode());
            TaskExecutorID taskExecutorID = getTaskExecutorID();
            return (hashCode * 59) + (taskExecutorID == null ? 43 : taskExecutorID.hashCode());
        }

        public String toString() {
            return "ResourceClusterAwareSchedulerActor.InitializeRunningWorkerRequestEvent(scheduleRequest=" + getScheduleRequest() + ", taskExecutorID=" + getTaskExecutorID() + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor$Noop.class */
    public static final class Noop {
        private Noop() {
        }

        public static Noop getInstance() {
            return new Noop();
        }

        public boolean equals(Object obj) {
            return obj == this || (obj instanceof Noop);
        }

        public int hashCode() {
            return 1;
        }

        public String toString() {
            return "ResourceClusterAwareSchedulerActor.Noop()";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor$RetryCancelRequestEvent.class */
    public static final class RetryCancelRequestEvent {
        private final CancelRequestEvent actualEvent;
        private final Throwable currentFailure;

        CancelRequestEvent onRetry() {
            return new CancelRequestEvent(this.actualEvent.getWorkerId(), this.actualEvent.getAttempt() + 1, this.currentFailure);
        }

        @ConstructorProperties({"actualEvent", "currentFailure"})
        public RetryCancelRequestEvent(CancelRequestEvent cancelRequestEvent, Throwable th) {
            this.actualEvent = cancelRequestEvent;
            this.currentFailure = th;
        }

        public CancelRequestEvent getActualEvent() {
            return this.actualEvent;
        }

        public Throwable getCurrentFailure() {
            return this.currentFailure;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof RetryCancelRequestEvent)) {
                return false;
            }
            RetryCancelRequestEvent retryCancelRequestEvent = (RetryCancelRequestEvent) obj;
            CancelRequestEvent actualEvent = getActualEvent();
            CancelRequestEvent actualEvent2 = retryCancelRequestEvent.getActualEvent();
            if (actualEvent == null) {
                if (actualEvent2 != null) {
                    return false;
                }
            } else if (!actualEvent.equals(actualEvent2)) {
                return false;
            }
            Throwable currentFailure = getCurrentFailure();
            Throwable currentFailure2 = retryCancelRequestEvent.getCurrentFailure();
            return currentFailure == null ? currentFailure2 == null : currentFailure.equals(currentFailure2);
        }

        public int hashCode() {
            CancelRequestEvent actualEvent = getActualEvent();
            int hashCode = (1 * 59) + (actualEvent == null ? 43 : actualEvent.hashCode());
            Throwable currentFailure = getCurrentFailure();
            return (hashCode * 59) + (currentFailure == null ? 43 : currentFailure.hashCode());
        }

        public String toString() {
            return "ResourceClusterAwareSchedulerActor.RetryCancelRequestEvent(actualEvent=" + getActualEvent() + ", currentFailure=" + getCurrentFailure() + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor$ScheduleRequestEvent.class */
    public static final class ScheduleRequestEvent {
        private final ScheduleRequest request;
        private final int attempt;

        @Nullable
        private final Throwable previousFailure;
        private final Instant eventTime;

        boolean isRetry() {
            return this.attempt > 1;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public static ScheduleRequestEvent of(ScheduleRequest scheduleRequest) {
            return new ScheduleRequestEvent(scheduleRequest, 1, null, Clock.systemDefaultZone().instant());
        }

        FailedToScheduleRequestEvent onFailure(Throwable th) {
            return new FailedToScheduleRequestEvent(this, this.attempt, th);
        }

        AssignedScheduleRequestEvent onAssignment(TaskExecutorID taskExecutorID) {
            return new AssignedScheduleRequestEvent(this, taskExecutorID);
        }

        @ConstructorProperties({"request", "attempt", "previousFailure", "eventTime"})
        public ScheduleRequestEvent(ScheduleRequest scheduleRequest, int i, @Nullable Throwable th, Instant instant) {
            this.request = scheduleRequest;
            this.attempt = i;
            this.previousFailure = th;
            this.eventTime = instant;
        }

        public ScheduleRequest getRequest() {
            return this.request;
        }

        public int getAttempt() {
            return this.attempt;
        }

        @Nullable
        public Throwable getPreviousFailure() {
            return this.previousFailure;
        }

        public Instant getEventTime() {
            return this.eventTime;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof ScheduleRequestEvent)) {
                return false;
            }
            ScheduleRequestEvent scheduleRequestEvent = (ScheduleRequestEvent) obj;
            if (getAttempt() != scheduleRequestEvent.getAttempt()) {
                return false;
            }
            ScheduleRequest request = getRequest();
            ScheduleRequest request2 = scheduleRequestEvent.getRequest();
            if (request == null) {
                if (request2 != null) {
                    return false;
                }
            } else if (!request.equals(request2)) {
                return false;
            }
            Throwable previousFailure = getPreviousFailure();
            Throwable previousFailure2 = scheduleRequestEvent.getPreviousFailure();
            if (previousFailure == null) {
                if (previousFailure2 != null) {
                    return false;
                }
            } else if (!previousFailure.equals(previousFailure2)) {
                return false;
            }
            Instant eventTime = getEventTime();
            Instant eventTime2 = scheduleRequestEvent.getEventTime();
            return eventTime == null ? eventTime2 == null : eventTime.equals(eventTime2);
        }

        public int hashCode() {
            int attempt = (1 * 59) + getAttempt();
            ScheduleRequest request = getRequest();
            int hashCode = (attempt * 59) + (request == null ? 43 : request.hashCode());
            Throwable previousFailure = getPreviousFailure();
            int hashCode2 = (hashCode * 59) + (previousFailure == null ? 43 : previousFailure.hashCode());
            Instant eventTime = getEventTime();
            return (hashCode2 * 59) + (eventTime == null ? 43 : eventTime.hashCode());
        }

        public String toString() {
            return "ResourceClusterAwareSchedulerActor.ScheduleRequestEvent(request=" + getRequest() + ", attempt=" + getAttempt() + ", previousFailure=" + getPreviousFailure() + ", eventTime=" + getEventTime() + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor$SubmittedScheduleRequestEvent.class */
    public static final class SubmittedScheduleRequestEvent {
        private final ScheduleRequestEvent event;
        private final TaskExecutorID taskExecutorID;

        @ConstructorProperties({"event", "taskExecutorID"})
        public SubmittedScheduleRequestEvent(ScheduleRequestEvent scheduleRequestEvent, TaskExecutorID taskExecutorID) {
            this.event = scheduleRequestEvent;
            this.taskExecutorID = taskExecutorID;
        }

        public ScheduleRequestEvent getEvent() {
            return this.event;
        }

        public TaskExecutorID getTaskExecutorID() {
            return this.taskExecutorID;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof SubmittedScheduleRequestEvent)) {
                return false;
            }
            SubmittedScheduleRequestEvent submittedScheduleRequestEvent = (SubmittedScheduleRequestEvent) obj;
            ScheduleRequestEvent event = getEvent();
            ScheduleRequestEvent event2 = submittedScheduleRequestEvent.getEvent();
            if (event == null) {
                if (event2 != null) {
                    return false;
                }
            } else if (!event.equals(event2)) {
                return false;
            }
            TaskExecutorID taskExecutorID = getTaskExecutorID();
            TaskExecutorID taskExecutorID2 = submittedScheduleRequestEvent.getTaskExecutorID();
            return taskExecutorID == null ? taskExecutorID2 == null : taskExecutorID.equals(taskExecutorID2);
        }

        public int hashCode() {
            ScheduleRequestEvent event = getEvent();
            int hashCode = (1 * 59) + (event == null ? 43 : event.hashCode());
            TaskExecutorID taskExecutorID = getTaskExecutorID();
            return (hashCode * 59) + (taskExecutorID == null ? 43 : taskExecutorID.hashCode());
        }

        public String toString() {
            return "ResourceClusterAwareSchedulerActor.SubmittedScheduleRequestEvent(event=" + getEvent() + ", taskExecutorID=" + getTaskExecutorID() + ")";
        }
    }

    public static Props props(int i, int i2, Duration duration, ResourceCluster resourceCluster, ExecuteStageRequestFactory executeStageRequestFactory, JobMessageRouter jobMessageRouter, MetricsRegistry metricsRegistry) {
        return Props.create(ResourceClusterAwareSchedulerActor.class, new Object[]{Integer.valueOf(i), Integer.valueOf(i2), duration, resourceCluster, executeStageRequestFactory, jobMessageRouter, metricsRegistry});
    }

    public ResourceClusterAwareSchedulerActor(int i, int i2, Duration duration, ResourceCluster resourceCluster, ExecuteStageRequestFactory executeStageRequestFactory, JobMessageRouter jobMessageRouter, MetricsRegistry metricsRegistry) {
        this.resourceCluster = resourceCluster;
        this.executeStageRequestFactory = executeStageRequestFactory;
        this.jobMessageRouter = jobMessageRouter;
        this.maxScheduleRetries = i;
        this.intervalBetweenRetries = duration;
        this.maxCancelRetries = i2;
        Metrics build = new Metrics.Builder().id("ResourceClusterAwareSchedulerActor", new Tag[]{Tag.of("resourceCluster", resourceCluster.getName())}).addTimer("schedulingLatency").addCounter("schedulingFailures").addCounter("connectionFailures").build();
        metricsRegistry.registerAndGet(build);
        this.schedulingLatency = build.getTimer("schedulingLatency");
        this.schedulingFailures = build.getCounter("schedulingFailures");
        this.connectionFailures = build.getCounter("connectionFailures");
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(ScheduleRequestEvent.class, this::onScheduleRequestEvent).match(InitializeRunningWorkerRequestEvent.class, this::onInitializeRunningWorkerRequest).match(CancelRequestEvent.class, this::onCancelRequestEvent).match(AssignedScheduleRequestEvent.class, this::onAssignedScheduleRequestEvent).match(FailedToScheduleRequestEvent.class, this::onFailedScheduleRequestEvent).match(SubmittedScheduleRequestEvent.class, this::onSubmittedScheduleRequestEvent).match(FailedToSubmitScheduleRequestEvent.class, this::onFailedToSubmitScheduleRequestEvent).match(RetryCancelRequestEvent.class, this::onRetryCancelRequestEvent).match(Noop.class, this::onNoop).match(Ack.class, ack -> {
            log.debug("Received ack from {}", sender());
        }).match(Status.Failure.class, failure -> {
            log.error("Received failure from {}: {}", sender(), failure);
        }).build();
    }

    private void onScheduleRequestEvent(ScheduleRequestEvent scheduleRequestEvent) {
        if (scheduleRequestEvent.isRetry()) {
            log.info("Retrying Schedule Request {}, attempt {}", scheduleRequestEvent.getRequest(), Integer.valueOf(scheduleRequestEvent.getAttempt()));
        }
        CompletableFuture taskExecutorFor = this.resourceCluster.getTaskExecutorFor(TaskExecutorAllocationRequest.of(scheduleRequestEvent.getRequest().getWorkerId(), scheduleRequestEvent.getRequest().getMachineDefinition(), scheduleRequestEvent.getRequest().getJobMetadata(), scheduleRequestEvent.getRequest().getStageNum()));
        scheduleRequestEvent.getClass();
        CompletableFuture thenApply = taskExecutorFor.thenApply(scheduleRequestEvent::onAssignment);
        scheduleRequestEvent.getClass();
        Patterns.pipe(thenApply.exceptionally(scheduleRequestEvent::onFailure), getContext().getDispatcher()).to(self());
    }

    private void onInitializeRunningWorkerRequest(InitializeRunningWorkerRequestEvent initializeRunningWorkerRequestEvent) {
        this.resourceCluster.initializeTaskExecutor(initializeRunningWorkerRequestEvent.getTaskExecutorID(), initializeRunningWorkerRequestEvent.getScheduleRequest().getWorkerId());
    }

    private void onAssignedScheduleRequestEvent(AssignedScheduleRequestEvent assignedScheduleRequestEvent) {
        try {
            CompletableFuture taskExecutorGateway = this.resourceCluster.getTaskExecutorGateway(assignedScheduleRequestEvent.getTaskExecutorID());
            TaskExecutorRegistration taskExecutorRegistration = (TaskExecutorRegistration) this.resourceCluster.getTaskExecutorInfo(assignedScheduleRequestEvent.getTaskExecutorID()).join();
            if (taskExecutorGateway != null && taskExecutorRegistration != null) {
                Patterns.pipe(taskExecutorGateway.thenComposeAsync(taskExecutorGateway2 -> {
                    return taskExecutorGateway2.submitTask(this.executeStageRequestFactory.of(assignedScheduleRequestEvent.getScheduleRequestEvent().getRequest(), taskExecutorRegistration)).thenApply(ack -> {
                        return new SubmittedScheduleRequestEvent(assignedScheduleRequestEvent.getScheduleRequestEvent(), assignedScheduleRequestEvent.getTaskExecutorID());
                    }).exceptionally(th -> {
                        return new FailedToSubmitScheduleRequestEvent(assignedScheduleRequestEvent.getScheduleRequestEvent(), assignedScheduleRequestEvent.getTaskExecutorID(), th);
                    }).whenCompleteAsync((obj, th2) -> {
                        if (th2 == null) {
                            log.debug("[Submit Task] finish with {}", obj);
                        } else {
                            log.error("[Submit Task] fail: {}", assignedScheduleRequestEvent.getTaskExecutorID(), th2);
                        }
                    });
                }).exceptionally(th -> {
                    return new FailedToSubmitScheduleRequestEvent(assignedScheduleRequestEvent.getScheduleRequestEvent(), assignedScheduleRequestEvent.getTaskExecutorID(), th);
                }), getContext().getDispatcher()).to(self());
            }
        } catch (Exception e) {
            log.warn("Failed to submit task with the task executor {}; Resubmitting the request", assignedScheduleRequestEvent.getTaskExecutorID(), e);
            self().tell(assignedScheduleRequestEvent.getScheduleRequestEvent().onFailure(e), self());
        }
    }

    private void onFailedScheduleRequestEvent(FailedToScheduleRequestEvent failedToScheduleRequestEvent) {
        this.schedulingFailures.increment();
        if (failedToScheduleRequestEvent.getAttempt() >= this.maxScheduleRetries) {
            log.error("Failed to submit the request {} because of ", failedToScheduleRequestEvent.getScheduleRequestEvent(), failedToScheduleRequestEvent.getThrowable());
        } else {
            log.error("Failed to submit the request {}; Retrying in {} because of ", new Object[]{failedToScheduleRequestEvent.getScheduleRequestEvent(), this.intervalBetweenRetries, failedToScheduleRequestEvent.getThrowable()});
            getTimers().startSingleTimer(getSchedulingQueueKeyFor(failedToScheduleRequestEvent.getScheduleRequestEvent().getRequest().getWorkerId()), failedToScheduleRequestEvent.onRetry(), this.intervalBetweenRetries);
        }
    }

    private void onSubmittedScheduleRequestEvent(SubmittedScheduleRequestEvent submittedScheduleRequestEvent) {
        log.debug("[Submit Task]: receive SubmittedScheduleRequestEvent: {}", submittedScheduleRequestEvent);
        TaskExecutorID taskExecutorID = submittedScheduleRequestEvent.getTaskExecutorID();
        try {
            TaskExecutorRegistration taskExecutorRegistration = (TaskExecutorRegistration) this.resourceCluster.getTaskExecutorInfo(taskExecutorID).join();
            boolean routeWorkerEvent = this.jobMessageRouter.routeWorkerEvent(new WorkerLaunched(submittedScheduleRequestEvent.getEvent().getRequest().getWorkerId(), submittedScheduleRequestEvent.getEvent().getRequest().getStageNum(), taskExecutorRegistration.getHostname(), taskExecutorID.getResourceId(), Optional.ofNullable(taskExecutorRegistration.getClusterID().getResourceID()), Optional.of(taskExecutorRegistration.getClusterID()), taskExecutorRegistration.getWorkerPorts()));
            this.schedulingLatency.record(Duration.between(submittedScheduleRequestEvent.getEvent().getEventTime(), Clock.systemDefaultZone().instant()).toNanos(), TimeUnit.NANOSECONDS);
            if (!routeWorkerEvent) {
                log.error("Routing message to jobMessageRouter was never expected to fail but it has failed to event {}", submittedScheduleRequestEvent);
            }
        } catch (Exception e) {
            log.warn("Failed to route message due to error in getting TaskExecutor info: {}", taskExecutorID, e);
        }
    }

    private void onFailedToSubmitScheduleRequestEvent(FailedToSubmitScheduleRequestEvent failedToSubmitScheduleRequestEvent) {
        log.error("Failed to submit schedule request event {}", failedToSubmitScheduleRequestEvent, failedToSubmitScheduleRequestEvent.getThrowable());
        this.jobMessageRouter.routeWorkerEvent(new WorkerLaunchFailed(failedToSubmitScheduleRequestEvent.getScheduleRequestEvent().getRequest().getWorkerId(), failedToSubmitScheduleRequestEvent.getScheduleRequestEvent().getRequest().getStageNum(), Throwables.getStackTraceAsString(failedToSubmitScheduleRequestEvent.throwable)));
        try {
            this.resourceCluster.reconnectGateway(failedToSubmitScheduleRequestEvent.getTaskExecutorID()).whenComplete((ack, th) -> {
                if (th != null) {
                    log.error("Failed to request reconnect to gateway for {}", failedToSubmitScheduleRequestEvent.getTaskExecutorID(), th);
                } else {
                    log.debug("Acked from reconnection request for {}", failedToSubmitScheduleRequestEvent.getTaskExecutorID());
                }
            });
        } catch (Exception e) {
            log.warn("Failed to establish re-connection with the task executor {} on failed schedule request", failedToSubmitScheduleRequestEvent.getTaskExecutorID(), e);
            this.connectionFailures.increment();
        }
    }

    private void onCancelRequestEvent(CancelRequestEvent cancelRequestEvent) {
        try {
            log.info("onCancelRequestEvent {}", cancelRequestEvent);
            getTimers().cancel(getSchedulingQueueKeyFor(cancelRequestEvent.getWorkerId()));
            Patterns.pipe(this.resourceCluster.getTaskExecutorGateway((TaskExecutorID) this.resourceCluster.getTaskExecutorAssignedFor(cancelRequestEvent.getWorkerId()).join()).thenComposeAsync(taskExecutorGateway -> {
                return taskExecutorGateway.cancelTask(cancelRequestEvent.getWorkerId()).thenApply(ack -> {
                    return Noop.getInstance();
                }).exceptionally(th -> {
                    Throwable stripCompletionException = ExceptionUtils.stripCompletionException(ExceptionUtils.stripExecutionException(th));
                    return stripCompletionException instanceof TaskExecutorGateway.TaskNotFoundException ? Noop.getInstance() : cancelRequestEvent.onFailure(stripCompletionException);
                });
            }), context().dispatcher()).to(self());
        } catch (Exception e) {
            Throwable stripCompletionException = ExceptionUtils.stripCompletionException(ExceptionUtils.stripExecutionException(e));
            if (stripCompletionException instanceof TaskExecutorGateway.TaskNotFoundException) {
                log.info("Failed to cancel task {} as no matching executor could be found", cancelRequestEvent.getWorkerId());
            } else {
                self().tell(cancelRequestEvent.onFailure(stripCompletionException), self());
            }
        }
    }

    private void onRetryCancelRequestEvent(RetryCancelRequestEvent retryCancelRequestEvent) {
        if (retryCancelRequestEvent.getActualEvent().getAttempt() < this.maxCancelRetries) {
            context().system().scheduler().scheduleOnce(Duration.ofMinutes(1L), self(), retryCancelRequestEvent.onRetry(), getContext().getDispatcher(), self());
        } else {
            log.error("Exhausted number of retries for cancel request {}", retryCancelRequestEvent.getActualEvent(), retryCancelRequestEvent.getCurrentFailure());
        }
    }

    private void onNoop(Noop noop) {
    }

    private String getSchedulingQueueKeyFor(WorkerId workerId) {
        return "Retry-Schedule-Request-For" + workerId.toString();
    }
}
