package io.mantisrx.server.worker.jobmaster.clutch;

import com.yahoo.labs.samoa.instances.Attribute;
import io.mantisrx.runtime.descriptor.StageScalingPolicy;
import io.mantisrx.runtime.descriptor.StageSchedulingInfo;
import io.mantisrx.server.worker.jobmaster.JobAutoScaler;
import io.mantisrx.server.worker.jobmaster.Util;
import io.mantisrx.server.worker.jobmaster.control.actuators.ClutchMantisStageActuator;
import io.mantisrx.server.worker.jobmaster.control.controllers.PIDController;
import io.mantisrx.server.worker.jobmaster.control.utils.ErrorComputer;
import io.mantisrx.server.worker.jobmaster.control.utils.Integrator;
import io.mantisrx.shaded.com.google.common.cache.Cache;
import io.mantisrx.shaded.com.google.common.cache.CacheBuilder;
import io.mantisrx.shaded.com.google.common.util.concurrent.AtomicDouble;
import io.vavr.Tuple;
import io.vavr.Tuple3;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import moa.core.FastVector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

/* loaded from: input_file:io/mantisrx/server/worker/jobmaster/clutch/ClutchAutoScaler.class */
public class ClutchAutoScaler implements Observable.Transformer<JobAutoScaler.Event, Object> {
    private static final String autoscaleLogMessageFormat = "Autoscaling stage %d to %d instances on controller output: cpu/mem/network %f/%f/%f (dampening: %f) and predicted error: %f with dominant resource: %s";
    private final JobAutoScaler.StageScaler scaler;
    private final StageSchedulingInfo stageSchedulingInfo;
    private final long initialSize;
    private final ClutchConfiguration config;
    private final AtomicLong cooldownTimestamp;
    private static final Logger log = LoggerFactory.getLogger(ClutchAutoScaler.class);
    private static final FastVector attributes = new FastVector();
    private final AtomicLong targetScale = new AtomicLong(0);
    private final AtomicDouble gainDampeningFactor = new AtomicDouble(1.0d);
    private final AtomicLong rps = new AtomicLong(0);
    private final ClutchPIDConfig defaultConfig = new ClutchPIDConfig(60.0d, Tuple.of(Double.valueOf(0.0d), Double.valueOf(25.0d)), 0.01d, 0.01d);
    Cache<Long, Long> actionCache = CacheBuilder.newBuilder().maximumSize(12).expireAfterWrite(60, TimeUnit.MINUTES).build();
    AtomicDouble correction = new AtomicDouble(0.0d);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/mantisrx/server/worker/jobmaster/clutch/ClutchAutoScaler$ClutchController.class */
    public class ClutchController implements Observable.Transformer<JobAutoScaler.Event, ClutchControllerOutput> {
        private final ClutchPIDConfig config;
        private final StageScalingPolicy.ScalingReason metric;
        private final StageSchedulingInfo stageSchedulingInfo;
        private final AtomicDouble gainFactor;
        private final long initialSize;
        private final long min;
        private final long max;
        private final Integrator integrator;

        public ClutchController(StageScalingPolicy.ScalingReason scalingReason, StageSchedulingInfo stageSchedulingInfo, ClutchPIDConfig clutchPIDConfig, AtomicDouble atomicDouble, long j, long j2, long j3) {
            this.metric = scalingReason;
            this.config = clutchPIDConfig;
            this.gainFactor = atomicDouble;
            this.initialSize = j;
            this.stageSchedulingInfo = stageSchedulingInfo;
            this.min = j2;
            this.max = j3;
            this.integrator = new Integrator(this.initialSize, this.min - 1, this.max + 1);
        }

        public Observable<ClutchControllerOutput> call(Observable<JobAutoScaler.Event> observable) {
            return observable.map(event -> {
                return Double.valueOf(Util.getEffectiveValue(this.stageSchedulingInfo, event.getType(), event.getValue()));
            }).lift(new ErrorComputer(this.config.setPoint, true, ((Double) this.config.rope._1).doubleValue(), ((Double) this.config.rope._2).doubleValue())).lift(PIDController.of(Double.valueOf(this.config.kp), Double.valueOf(0.0d), Double.valueOf(this.config.kd), Double.valueOf(1.0d), this.gainFactor)).lift(this.integrator).map(d -> {
                return new ClutchControllerOutput(this.metric, d);
            });
        }
    }

    public ClutchAutoScaler(StageSchedulingInfo stageSchedulingInfo, JobAutoScaler.StageScaler stageScaler, ClutchConfiguration clutchConfiguration, int i) {
        this.stageSchedulingInfo = stageSchedulingInfo;
        this.scaler = stageScaler;
        this.initialSize = i;
        this.targetScale.set(i);
        this.config = clutchConfiguration;
        this.rps.set(Math.round(clutchConfiguration.rps));
        this.cooldownTimestamp = new AtomicLong(System.currentTimeMillis() + (((Long) clutchConfiguration.cooldownSeconds.getOrElse(0L)).longValue() * 1000));
        Observable.interval(60L, TimeUnit.SECONDS).forEach(l -> {
            double computeGainFactor = computeGainFactor(this.actionCache);
            log.debug("Setting gain dampening factor to: {}.", Double.valueOf(computeGainFactor));
            this.gainDampeningFactor.set(computeGainFactor);
        });
    }

    private static double enforceMinMax(double d, double d2, double d3) {
        if (Double.isNaN(d)) {
            d = d2;
        }
        return d < d2 ? d2 : d > d3 ? d3 : d;
    }

    private double computeGainFactor(Cache<Long, Long> cache) {
        long count = cache.asMap().values().stream().filter(l -> {
            return ((double) l.longValue()) > 0.0d;
        }).count();
        long count2 = cache.asMap().values().stream().filter(l2 -> {
            return ((double) l2.longValue()) < 0.0d;
        }).count();
        long j = count + count2;
        return Math.pow(j == 0 ? 1.0d : count > count2 ? (1.0d * count) / j : (1.0d * count2) / j, 3.0d);
    }

    private ClutchControllerOutput findDominatingResource(Tuple3<ClutchControllerOutput, ClutchControllerOutput, ClutchControllerOutput> tuple3) {
        return (((ClutchControllerOutput) tuple3._1).scale.doubleValue() < ((ClutchControllerOutput) tuple3._2).scale.doubleValue() || ((ClutchControllerOutput) tuple3._1).scale.doubleValue() < ((ClutchControllerOutput) tuple3._3).scale.doubleValue()) ? (((ClutchControllerOutput) tuple3._2).scale.doubleValue() < ((ClutchControllerOutput) tuple3._1).scale.doubleValue() || ((ClutchControllerOutput) tuple3._2).scale.doubleValue() < ((ClutchControllerOutput) tuple3._3).scale.doubleValue()) ? (ClutchControllerOutput) tuple3._3 : (ClutchControllerOutput) tuple3._2 : (ClutchControllerOutput) tuple3._1;
    }

    public Observable<Object> call(Observable<JobAutoScaler.Event> observable) {
        Observable share = observable.share();
        ClutchController clutchController = new ClutchController(StageScalingPolicy.ScalingReason.CPU, this.stageSchedulingInfo, (ClutchPIDConfig) this.config.cpu.getOrElse(this.defaultConfig), this.gainDampeningFactor, this.initialSize, this.config.minSize, this.config.maxSize);
        ClutchController clutchController2 = new ClutchController(StageScalingPolicy.ScalingReason.JVMMemory, this.stageSchedulingInfo, (ClutchPIDConfig) this.config.memory.getOrElse(this.defaultConfig), this.gainDampeningFactor, this.initialSize, this.config.minSize, this.config.maxSize);
        ClutchController clutchController3 = new ClutchController(StageScalingPolicy.ScalingReason.Network, this.stageSchedulingInfo, (ClutchPIDConfig) this.config.network.getOrElse(this.defaultConfig), this.gainDampeningFactor, this.initialSize, this.config.minSize, this.config.maxSize);
        Observable compose = share.filter(event -> {
            return Boolean.valueOf(event.getType().equals(StageScalingPolicy.ScalingReason.CPU));
        }).compose(clutchController);
        Observable compose2 = share.filter(event2 -> {
            return Boolean.valueOf(event2.getType().equals(StageScalingPolicy.ScalingReason.JVMMemory));
        }).compose(clutchController2);
        Observable compose3 = share.filter(event3 -> {
            return Boolean.valueOf(event3.getType().equals(StageScalingPolicy.ScalingReason.Network));
        }).compose(clutchController3);
        Observable doOnNext = Observable.zip(Observable.zip(share.filter(event4 -> {
            return Boolean.valueOf(event4.getType().equals(StageScalingPolicy.ScalingReason.CPU));
        }).map((v0) -> {
            return v0.getValue();
        }), share.filter(event5 -> {
            return Boolean.valueOf(event5.getType().equals(StageScalingPolicy.ScalingReason.JVMMemory));
        }).map((v0) -> {
            return v0.getValue();
        }), share.filter(event6 -> {
            return Boolean.valueOf(event6.getType().equals(StageScalingPolicy.ScalingReason.Network));
        }).map((v0) -> {
            return v0.getValue();
        }), (v0, v1, v2) -> {
            return Tuple.of(v0, v1, v2);
        }), Observable.zip(compose, compose2, compose3, (v0, v1, v2) -> {
            return Tuple.of(v0, v1, v2);
        }), (v0, v1) -> {
            return Tuple.of(v0, v1);
        }).withLatestFrom(share.map((v0) -> {
            return v0.getNumWorkers();
        }), (tuple2, num) -> {
            return Tuple.of(tuple2._1, tuple2._2, num);
        }).withLatestFrom(Observable.merge(Observable.just(Double.valueOf(0.0d)), share.filter(event7 -> {
            return Boolean.valueOf(event7.getType().equals(StageScalingPolicy.ScalingReason.KafkaLag));
        }).map((v0) -> {
            return v0.getValue();
        }).map(d -> {
            return Double.valueOf(d.doubleValue() / this.config.rps);
        }), share.filter(event8 -> {
            return Boolean.valueOf(event8.getType().equals(StageScalingPolicy.ScalingReason.DataDrop));
        }).map(event9 -> {
            return Double.valueOf((event9.getValue() / 100.0d) * event9.getNumWorkers());
        })), (tuple3, d2) -> {
            return Tuple.of(tuple3._1, tuple3._2, tuple3._3, d2);
        }).map(tuple4 -> {
            int intValue = ((Integer) tuple4._3).intValue();
            ClutchControllerOutput findDominatingResource = findDominatingResource((Tuple3) tuple4._2);
            String name = findDominatingResource.reason.name();
            double min = Math.min(((Double) tuple4._4).doubleValue(), ((Double) this.config.maxAdjustment.getOrElse(Double.valueOf(this.config.maxSize * 1.0d))).doubleValue());
            double d3 = min < 1.0d ? 0.0d : min;
            if (System.currentTimeMillis() > this.cooldownTimestamp.get()) {
                this.correction.set(Math.min(this.correction.addAndGet(d3), ((Double) this.config.maxAdjustment.getOrElse(Double.valueOf(this.config.maxSize * 1.0d))).doubleValue()));
            }
            this.correction.set(this.correction.get() * 0.99d);
            this.correction.set(Double.isNaN(this.correction.get()) ? 0.0d : this.correction.get());
            Double valueOf = Double.valueOf(enforceMinMax(Math.ceil(findDominatingResource.scale.doubleValue()) + Math.ceil(this.correction.get()), this.config.minSize, this.config.maxSize));
            return Tuple.of(String.format(autoscaleLogMessageFormat, Integer.valueOf(this.scaler.getStage()), Integer.valueOf(valueOf.intValue()), ((ClutchControllerOutput) ((Tuple3) tuple4._2)._1).scale, ((ClutchControllerOutput) ((Tuple3) tuple4._2)._2).scale, ((ClutchControllerOutput) ((Tuple3) tuple4._2)._3).scale, Double.valueOf(this.gainDampeningFactor.get()), Double.valueOf(this.correction.get()), name), valueOf, Integer.valueOf(intValue));
        }).filter(tuple32 -> {
            return Boolean.valueOf(System.currentTimeMillis() > this.cooldownTimestamp.get());
        }).filter(tuple33 -> {
            return Boolean.valueOf(((double) Math.abs(Math.round(((Double) tuple33._2).doubleValue()) - ((long) ((Integer) tuple33._3).intValue()))) > 0.99d);
        }).doOnNext(tuple34 -> {
            log.info((String) tuple34._1);
        }).compose(new ClutchMantisStageActuator(this.scaler)).map((v0) -> {
            return Math.round(v0);
        }).doOnNext(l -> {
            this.actionCache.put(Long.valueOf(System.currentTimeMillis()), Long.valueOf(l.longValue() - this.targetScale.get()));
        });
        AtomicLong atomicLong = this.targetScale;
        atomicLong.getClass();
        return doOnNext.doOnNext((v1) -> {
            r1.set(v1);
        }).doOnNext(l2 -> {
            this.cooldownTimestamp.set(System.currentTimeMillis() + (((Long) this.config.cooldownSeconds.getOrElse(0L)).longValue() * 1000));
        }).map(l3 -> {
            return l3;
        });
    }

    static {
        attributes.add(new Attribute("cpu"));
        attributes.add(new Attribute("memory"));
        attributes.add(new Attribute("network"));
        attributes.add(new Attribute("minuteofday"));
        attributes.add(new Attribute("scale"));
        attributes.add(new Attribute("error"));
    }
}
