package io.rsocket.routing.broker.loadbalance;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.routing.broker.stats.ExponentialWeightedMovingAverage;
import io.rsocket.routing.broker.stats.FrugalQuantile;
import io.rsocket.routing.broker.stats.Median;
import io.rsocket.routing.broker.stats.Quantile;
import io.rsocket.util.Clock;
import io.rsocket.util.RSocketProxy;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/rsocket/routing/broker/loadbalance/WeightedRSocket.class */
public class WeightedRSocket extends RSocketProxy {
    public static final double DEFAULT_LOWER_QUANTILE = 0.5d;
    public static final double DEFAULT_HIGHER_QUANTILE = 0.8d;
    private static final double STARTUP_PENALTY = 2.251799813685247E15d;
    private static final long DEFAULT_INITIAL_INTER_ARRIVAL_TIME = Clock.unit().convert(1, TimeUnit.SECONDS);
    private static final int DEFAULT_INTER_ARRIVAL_FACTOR = 500;
    private static final double EPSILON = 1.0E-4d;
    private final Quantile lowerQuantile;
    private final Quantile higherQuantile;
    private final long inactivityFactor;
    private final long tau;
    private final ExponentialWeightedMovingAverage errorPercentage;
    private volatile int pending;
    private long stamp;
    private long stamp0;
    private long duration;
    private Median median;
    private ExponentialWeightedMovingAverage interArrivalTime;
    private AtomicLong pendingStreams;
    private volatile int availability;

    public WeightedRSocket(RSocket rSocket, Quantile quantile, Quantile quantile2, int i) {
        super(rSocket);
        this.availability = 1;
        this.lowerQuantile = quantile;
        this.higherQuantile = quantile2;
        this.inactivityFactor = i;
        long now = Clock.now();
        this.stamp = now;
        this.stamp0 = now;
        this.duration = 0L;
        this.pending = 0;
        this.median = new Median();
        this.interArrivalTime = new ExponentialWeightedMovingAverage(1L, TimeUnit.MINUTES, DEFAULT_INITIAL_INTER_ARRIVAL_TIME);
        this.pendingStreams = new AtomicLong();
        this.errorPercentage = new ExponentialWeightedMovingAverage(5L, TimeUnit.SECONDS, 1.0d);
        this.tau = Clock.unit().convert((long) (5.0d / Math.log(2.0d)), TimeUnit.SECONDS);
    }

    public WeightedRSocket(RSocket rSocket) {
        this(rSocket, new FrugalQuantile(0.5d), new FrugalQuantile(0.8d));
    }

    public WeightedRSocket(RSocket rSocket, Quantile quantile, Quantile quantile2) {
        this(rSocket, quantile, quantile2, DEFAULT_INTER_ARRIVAL_FACTOR);
    }

    public Mono<Payload> requestResponse(Payload payload) {
        long incr = incr();
        try {
            return this.source.requestResponse(payload).doOnCancel(() -> {
                decr(incr);
            }).doOnSuccess(payload2 -> {
                observe(decr(incr) - incr);
                updateErrorPercentage(1.0d);
            }).doOnError(th -> {
                long decr = decr(incr);
                if (th instanceof TimeoutException) {
                    observe(decr - incr);
                }
                updateErrorPercentage(0.0d);
            });
        } catch (Throwable th2) {
            decr(incr);
            updateErrorPercentage(0.0d);
            return Mono.error(th2);
        }
    }

    public Flux<Payload> requestStream(Payload payload) {
        try {
            incrPendingStreams();
            return this.source.requestStream(payload).doFinally(signalType -> {
                decrPendingStreams();
            }).doOnNext(payload2 -> {
                updateErrorPercentage(1.0d);
            }).doOnError(th -> {
                updateErrorPercentage(0.0d);
            });
        } catch (Throwable th2) {
            decrPendingStreams();
            updateErrorPercentage(0.0d);
            return Flux.error(th2);
        }
    }

    public Mono<Void> fireAndForget(Payload payload) {
        long incr = incr();
        try {
            return this.source.fireAndForget(payload).doOnCancel(() -> {
                decr(incr);
            }).doOnSuccess(r9 -> {
                observe(decr(incr) - incr);
                updateErrorPercentage(1.0d);
            }).doOnError(th -> {
                long decr = decr(incr);
                if (th instanceof TimeoutException) {
                    observe(decr - incr);
                }
                updateErrorPercentage(0.0d);
            });
        } catch (Throwable th2) {
            decr(incr);
            updateErrorPercentage(0.0d);
            return Mono.error(th2);
        }
    }

    public Mono<Void> metadataPush(Payload payload) {
        long incr = incr();
        try {
            return this.source.metadataPush(payload).doOnCancel(() -> {
                decr(incr);
            }).doOnSuccess(r9 -> {
                observe(decr(incr) - incr);
                updateErrorPercentage(1.0d);
            }).doOnError(th -> {
                long decr = decr(incr);
                if (th instanceof TimeoutException) {
                    observe(decr - incr);
                }
                updateErrorPercentage(0.0d);
            });
        } catch (Throwable th2) {
            decr(incr);
            updateErrorPercentage(0.0d);
            return Mono.error(th2);
        }
    }

    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        try {
            incrPendingStreams();
            return this.source.requestChannel(publisher).doFinally(signalType -> {
                decrPendingStreams();
            }).doOnNext(payload -> {
                updateErrorPercentage(1.0d);
            }).doOnError(th -> {
                updateErrorPercentage(0.0d);
            });
        } catch (Throwable th2) {
            decrPendingStreams();
            updateErrorPercentage(0.0d);
            return Flux.error(th2);
        }
    }

    public synchronized double getPredictedLatency() {
        double d;
        long now = Clock.now();
        long max = Math.max(now - this.stamp, 1L);
        double estimation = this.median.estimation();
        if (estimation == 0.0d) {
            d = this.pending == 0 ? 0.0d : STARTUP_PENALTY + this.pending;
        } else if (this.pending != 0 || max <= this.inactivityFactor * this.interArrivalTime.value()) {
            double d2 = estimation * this.pending;
            double instantaneous = instantaneous(now);
            d = d2 < instantaneous ? instantaneous / this.pending : estimation;
        } else {
            this.median.insert(0.0d);
            d = this.median.estimation();
        }
        return d;
    }

    private synchronized long instantaneous(long j) {
        return this.duration + ((j - this.stamp0) * this.pending);
    }

    void incrPendingStreams() {
        this.pendingStreams.incrementAndGet();
    }

    void decrPendingStreams() {
        this.pendingStreams.decrementAndGet();
    }

    synchronized long incr() {
        long now = Clock.now();
        this.interArrivalTime.insert(now - this.stamp);
        this.duration += Math.max(0L, now - this.stamp0) * this.pending;
        this.pending++;
        this.stamp = now;
        this.stamp0 = now;
        return now;
    }

    synchronized long decr(long j) {
        long now = Clock.now();
        this.duration += (Math.max(0L, now - this.stamp0) * this.pending) - (now - j);
        this.pending--;
        this.stamp0 = now;
        return now;
    }

    synchronized void observe(double d) {
        this.median.insert(d);
        this.lowerQuantile.insert(d);
        this.higherQuantile.insert(d);
    }

    public double availability() {
        if (Clock.now() - this.stamp > this.tau) {
            updateErrorPercentage(1.0d);
        }
        return this.source.availability() * this.errorPercentage.value();
    }

    public String toString() {
        return new StringJoiner(", ", WeightedRSocket.class.getSimpleName() + "[", "]").add("median=" + this.median.estimation()).add("lowerQuantile=" + this.lowerQuantile.estimation()).add("higherQuantile=" + this.higherQuantile.estimation()).add("interArrivalTime=" + this.interArrivalTime.value()).add("duration/pending=" + (this.pending == 0 ? 0.0d : this.duration / this.pending)).add("pending=" + this.pending).add("availability=" + availability()).add("source=" + this.source).toString();
    }

    synchronized void updateErrorPercentage(double d) {
        this.errorPercentage.insert(d);
    }

    public int pending() {
        return this.pending;
    }

    public RSocket getSource() {
        return this.source;
    }
}
