/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.master.resourcecluster;

import akka.actor.ActorRef;
import akka.pattern.Patterns;
import com.netflix.spectator.api.Tag;
import com.spotify.futures.CompletableFutures;
import io.mantisrx.common.Ack;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.config.dynamic.LongDynamicProperty;
import io.mantisrx.server.master.resourcecluster.RequestThrottledException;
import io.mantisrx.server.master.resourcecluster.ResourceClusterGateway;
import io.mantisrx.server.master.resourcecluster.TaskExecutorDisconnection;
import io.mantisrx.server.master.resourcecluster.TaskExecutorHeartbeat;
import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration;
import io.mantisrx.server.master.resourcecluster.TaskExecutorStatusChange;
import io.mantisrx.shaded.com.google.common.util.concurrent.RateLimiter;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ResourceClusterGatewayAkkaImpl
implements ResourceClusterGateway {
    private static final Logger log = LoggerFactory.getLogger(ResourceClusterGatewayAkkaImpl.class);
    protected final ActorRef resourceClusterManagerActor;
    protected final Duration askTimeout;
    private final Counter registrationCounter;
    private final Counter heartbeatCounter;
    private final Counter disconnectionCounter;
    private final Counter throttledCounter;
    private final RateLimiter rateLimiter;
    private final ScheduledExecutorService semaphoreResetScheduler = Executors.newScheduledThreadPool(1);

    ResourceClusterGatewayAkkaImpl(ActorRef resourceClusterManagerActor, Duration askTimeout, LongDynamicProperty maxConcurrentRequestCountDp) {
        this.resourceClusterManagerActor = resourceClusterManagerActor;
        this.askTimeout = askTimeout;
        log.info("Setting maxConcurrentRequestCountDp for resourceCluster gateway {}", (Object)maxConcurrentRequestCountDp);
        this.rateLimiter = RateLimiter.create((double)((Long)maxConcurrentRequestCountDp.getValue()).longValue());
        this.semaphoreResetScheduler.scheduleAtFixedRate(() -> {
            long newRate = (Long)maxConcurrentRequestCountDp.getValue();
            log.info("Setting the rate limiter rate to {}", (Object)newRate);
            this.rateLimiter.setRate((double)newRate);
        }, 1L, 1L, TimeUnit.MINUTES);
        Metrics m = new Metrics.Builder().id("ResourceClusterGatewayAkkaImpl", new Tag[0]).addCounter("registrationCounter").addCounter("heartbeatCounter").addCounter("disconnectionCounter").addCounter("throttledCounter").build();
        Metrics metrics = MetricsRegistry.getInstance().registerAndGet(m);
        this.registrationCounter = metrics.getCounter("registrationCounter");
        this.heartbeatCounter = metrics.getCounter("heartbeatCounter");
        this.disconnectionCounter = metrics.getCounter("disconnectionCounter");
        this.throttledCounter = metrics.getCounter("throttledCounter");
    }

    private <In, Out> Function<In, CompletableFuture<Out>> withThrottle(Function<In, CompletableFuture<Out>> func) {
        return in -> {
            if (this.rateLimiter.tryAcquire()) {
                return (CompletableFuture)func.apply(in);
            }
            this.throttledCounter.increment();
            return CompletableFutures.exceptionallyCompletedFuture((Throwable)new RequestThrottledException("Throttled req: " + in.getClass().getSimpleName()));
        };
    }

    public CompletableFuture<Ack> registerTaskExecutor(TaskExecutorRegistration registration) {
        return this.withThrottle(this::registerTaskExecutorImpl).apply(registration);
    }

    private CompletableFuture<Ack> registerTaskExecutorImpl(TaskExecutorRegistration registration) {
        this.registrationCounter.increment();
        return Patterns.ask((ActorRef)this.resourceClusterManagerActor, (Object)registration, (Duration)this.askTimeout).thenApply(Ack.class::cast).toCompletableFuture();
    }

    public CompletableFuture<Ack> heartBeatFromTaskExecutor(TaskExecutorHeartbeat heartbeat) {
        return this.withThrottle(this::heartBeatFromTaskExecutorImpl).apply(heartbeat);
    }

    private CompletableFuture<Ack> heartBeatFromTaskExecutorImpl(TaskExecutorHeartbeat heartbeat) {
        this.heartbeatCounter.increment();
        return Patterns.ask((ActorRef)this.resourceClusterManagerActor, (Object)heartbeat, (Duration)this.askTimeout).thenApply(Ack.class::cast).toCompletableFuture();
    }

    public CompletableFuture<Ack> notifyTaskExecutorStatusChange(TaskExecutorStatusChange statusChange) {
        return Patterns.ask((ActorRef)this.resourceClusterManagerActor, (Object)statusChange, (Duration)this.askTimeout).thenApply(Ack.class::cast).toCompletableFuture();
    }

    public CompletableFuture<Ack> disconnectTaskExecutor(TaskExecutorDisconnection taskExecutorDisconnection) {
        this.disconnectionCounter.increment();
        return this.withThrottle(this::disconnectTaskExecutorImpl).apply(taskExecutorDisconnection);
    }

    CompletableFuture<Ack> disconnectTaskExecutorImpl(TaskExecutorDisconnection taskExecutorDisconnection) {
        return Patterns.ask((ActorRef)this.resourceClusterManagerActor, (Object)taskExecutorDisconnection, (Duration)this.askTimeout).thenApply(Ack.class::cast).toCompletableFuture();
    }
}

