package io.mantisrx.master.resourcecluster;

import akka.actor.ActorRef;
import akka.pattern.Patterns;
import com.spotify.futures.CompletableFutures;
import io.mantisrx.common.Ack;
import io.mantisrx.server.master.resourcecluster.RequestThrottledException;
import io.mantisrx.server.master.resourcecluster.ResourceClusterGateway;
import io.mantisrx.server.master.resourcecluster.ResourceClusterTaskExecutorMapper;
import io.mantisrx.server.master.resourcecluster.TaskExecutorDisconnection;
import io.mantisrx.server.master.resourcecluster.TaskExecutorHeartbeat;
import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration;
import io.mantisrx.server.master.resourcecluster.TaskExecutorStatusChange;
import io.mantisrx.shaded.com.google.common.util.concurrent.RateLimiter;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/* loaded from: input_file:io/mantisrx/master/resourcecluster/ResourceClusterGatewayAkkaImpl.class */
/**
 * {@link ResourceClusterGateway} implementation that forwards every request to the
 * resource cluster manager actor using the Akka ask pattern and exposes the reply as a
 * {@link CompletableFuture} of {@link Ack}.
 *
 * <p>Registration, heartbeat and disconnection requests pass through a shared
 * {@link RateLimiter}; a request that cannot obtain a permit in time completes
 * exceptionally with a {@link RequestThrottledException}. Status-change notifications are
 * sent without throttling.
 */
class ResourceClusterGatewayAkkaImpl implements ResourceClusterGateway {
    /** Longest time, in milliseconds, a throttled request waits for a rate-limiter permit. */
    private static final long PERMIT_WAIT_MILLIS = 200L;

    protected final ActorRef resourceClusterManagerActor;
    protected final Duration askTimeout;
    private final ResourceClusterTaskExecutorMapper mapper;
    protected final RateLimiter rateLimiter;

    /**
     * Creates the gateway.
     *
     * <p>Decompiler note: the access modifier of this constructor was widened from
     * package-private.
     *
     * @param actorRef actor that receives every request sent through this gateway
     * @param duration timeout applied to each ask sent to {@code actorRef}
     * @param resourceClusterTaskExecutorMapper mapper notified after a registration attempt completes
     * @param i rate handed to {@link RateLimiter#create(double)} (permits per second)
     */
    public ResourceClusterGatewayAkkaImpl(ActorRef actorRef, Duration duration, ResourceClusterTaskExecutorMapper resourceClusterTaskExecutorMapper, int i) {
        this.resourceClusterManagerActor = actorRef;
        this.askTimeout = duration;
        this.mapper = resourceClusterTaskExecutorMapper;
        this.rateLimiter = RateLimiter.create(i);
    }

    /**
     * Wraps {@code function} so that each invocation must first obtain a rate-limiter permit.
     *
     * @param function the operation to guard
     * @param <In> request type
     * @param <Out> type of the value the returned future completes with
     * @return a function that delegates to {@code function} when a permit is obtained within
     *     {@link #PERMIT_WAIT_MILLIS} milliseconds, and otherwise returns a future completed
     *     exceptionally with a {@link RequestThrottledException}
     */
    private <In, Out> Function<In, CompletableFuture<Out>> withThrottle(Function<In, CompletableFuture<Out>> function) {
        return request -> {
            if (this.rateLimiter.tryAcquire(PERMIT_WAIT_MILLIS, TimeUnit.MILLISECONDS)) {
                return function.apply(request);
            }
            return CompletableFutures.exceptionallyCompletedFuture(
                new RequestThrottledException("Throttled req: " + request.getClass().getSimpleName()));
        };
    }

    /**
     * Sends {@code message} to the resource cluster manager actor and casts the reply to {@link Ack}.
     *
     * @param message the request to send
     * @return a future holding the actor's reply; it fails if the ask fails or if the reply is
     *     not an {@link Ack}
     */
    private CompletableFuture<Ack> askForAck(Object message) {
        CompletionStage<Object> reply = Patterns.ask(this.resourceClusterManagerActor, message, this.askTimeout);
        return reply.thenApply(Ack.class::cast).toCompletableFuture();
    }

    /**
     * Registers a task executor, subject to throttling.
     *
     * @param taskExecutorRegistration the registration request
     * @return a future with the actor's {@link Ack}, or one failed with
     *     {@link RequestThrottledException} when the request was throttled
     */
    public CompletableFuture<Ack> registerTaskExecutor(TaskExecutorRegistration taskExecutorRegistration) {
        return this.<TaskExecutorRegistration, Ack>withThrottle(this::registerTaskExecutorImpl)
            .apply(taskExecutorRegistration);
    }

    /**
     * Sends the registration to the actor and then notifies the mapper.
     *
     * @param taskExecutorRegistration the registration request
     * @return a future with the actor's {@link Ack}
     */
    private CompletableFuture<Ack> registerTaskExecutorImpl(TaskExecutorRegistration taskExecutorRegistration) {
        // whenComplete fires on success and on failure alike, so the mapper is told about the
        // task executor regardless of how the ask ended.
        return askForAck(taskExecutorRegistration).whenComplete((ack, failure) ->
            this.mapper.onTaskExecutorDiscovered(
                taskExecutorRegistration.getClusterID(),
                taskExecutorRegistration.getTaskExecutorID()));
    }

    /**
     * Forwards a task executor heartbeat, subject to throttling.
     *
     * @param taskExecutorHeartbeat the heartbeat request
     * @return a future with the actor's {@link Ack}, or one failed with
     *     {@link RequestThrottledException} when the request was throttled
     */
    public CompletableFuture<Ack> heartBeatFromTaskExecutor(TaskExecutorHeartbeat taskExecutorHeartbeat) {
        return this.<TaskExecutorHeartbeat, Ack>withThrottle(this::heartBeatFromTaskExecutorImpl)
            .apply(taskExecutorHeartbeat);
    }

    /**
     * Sends the heartbeat to the actor.
     *
     * @param taskExecutorHeartbeat the heartbeat request
     * @return a future with the actor's {@link Ack}
     */
    private CompletableFuture<Ack> heartBeatFromTaskExecutorImpl(TaskExecutorHeartbeat taskExecutorHeartbeat) {
        return askForAck(taskExecutorHeartbeat);
    }

    /**
     * Forwards a task executor status change to the actor. This path is not throttled.
     *
     * @param taskExecutorStatusChange the status change notification
     * @return a future with the actor's {@link Ack}
     */
    public CompletableFuture<Ack> notifyTaskExecutorStatusChange(TaskExecutorStatusChange taskExecutorStatusChange) {
        return askForAck(taskExecutorStatusChange);
    }

    /**
     * Disconnects a task executor, subject to throttling.
     *
     * @param taskExecutorDisconnection the disconnection request
     * @return a future with the actor's {@link Ack}, or one failed with
     *     {@link RequestThrottledException} when the request was throttled
     */
    public CompletableFuture<Ack> disconnectTaskExecutor(TaskExecutorDisconnection taskExecutorDisconnection) {
        return this.<TaskExecutorDisconnection, Ack>withThrottle(this::disconnectTaskExecutorImpl)
            .apply(taskExecutorDisconnection);
    }

    /**
     * Sends the disconnection to the actor, bypassing the rate limiter.
     *
     * @param taskExecutorDisconnection the disconnection request
     * @return a future with the actor's {@link Ack}
     */
    CompletableFuture<Ack> disconnectTaskExecutorImpl(TaskExecutorDisconnection taskExecutorDisconnection) {
        return askForAck(taskExecutorDisconnection);
    }
}
