package org.apache.pulsar.broker.loadbalance.impl;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableDouble;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TimeAverageMessageData;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.class */
/**
 * Load shedding strategy that selects bundles to unload from brokers whose smoothed resource
 * usage exceeds the average usage of all brokers by more than a configured threshold.
 *
 * <p>For every broker a moving average of its weighted max resource usage is kept in
 * {@link #brokerAvgResourceUsage}. On each call to
 * {@link #findBundlesForUnloading(LoadData, ServiceConfiguration)} the averages are refreshed,
 * the mean over all brokers is computed, and brokers at or above {@code mean + threshold} have
 * their highest-throughput bundles selected until the planned amount of throughput is covered.
 *
 * <p>The instance keeps mutable state (the result multimap, the usage history and the log
 * sampling timestamp) in plain, unsynchronized fields.
 */
public class ThresholdShedder implements LoadSheddingStrategy {
    private static final Logger log = LoggerFactory.getLogger(ThresholdShedder.class);

    /** Extra fraction added on top of the excess usage when sizing the throughput to offload. */
    private static final double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05d;

    /** Number of bytes in one mebibyte; used to convert between MByte/s and byte/s. */
    private static final double MB = 1048576.0d;

    /** Minimum number of seconds between two rounds of sampled (verbose) load logging. */
    private static final long LOAD_LOG_SAMPLE_DELAY_IN_SEC = 300;

    /** Result container, cleared and refilled on every invocation: broker -> selected bundles. */
    private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();

    /**
     * Moving average of the weighted max resource usage per broker, as a fraction.
     * NOTE(review): entries are never removed here, so brokers that disappear from the load data
     * stay in this map -- confirm whether that is intended.
     */
    private final Map<String, Double> brokerAvgResourceUsage = new HashMap<>();

    /** Epoch seconds of the last invocation for which sampled logging was enabled. */
    private long lastSampledLoadLogTS = 0;

    /**
     * Converts a fraction to a whole percentage, truncating toward zero.
     *
     * @param usage the fraction to convert, e.g. {@code 0.42}
     * @return the percentage as an integer, e.g. {@code 42}
     */
    private static int toPercentage(double usage) {
        return (int) (usage * 100.0d);
    }

    /**
     * Decides whether the sampled log lines may be emitted for the current invocation, and records
     * the current time as the last sample time when they may.
     *
     * @return {@code true} if at least {@link #LOAD_LOG_SAMPLE_DELAY_IN_SEC} seconds elapsed since
     *         the last sampled invocation
     */
    private boolean canSampleLog() {
        long nowInSeconds = System.currentTimeMillis() / 1000;
        if (nowInSeconds - lastSampledLoadLogTS < LOAD_LOG_SAMPLE_DELAY_IN_SEC) {
            return false;
        }
        lastSampledLoadLogTS = nowInSeconds;
        return true;
    }

    /**
     * Refreshes the per-broker usage averages and selects bundles to unload from overloaded brokers.
     *
     * @param loadData             the load data to inspect: broker data, bundle data and the
     *                             recently unloaded bundles
     * @param serviceConfiguration source of the threshold, minimum throughput, history weight and
     *                             resource weights
     * @return the multimap of broker to selected bundles; this is the instance's internal cache,
     *         which is cleared at the start of every call
     */
    @Override // org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy
    public Multimap<String, String> findBundlesForUnloading(LoadData loadData, ServiceConfiguration serviceConfiguration) {
        selectedBundlesCache.clear();
        final boolean sampleLog = canSampleLog();
        final double threshold = serviceConfiguration.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0d;
        final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
        final double minThroughputThreshold =
                serviceConfiguration.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
        final double avgUsage = getBrokerAvgUsage(loadData,
                serviceConfiguration.getLoadBalancerHistoryResourcePercentage(), serviceConfiguration, sampleLog);

        if (sampleLog) {
            log.info("brokers' resource avgUsage:{}%", toPercentage(avgUsage));
        }
        if (avgUsage == 0.0d) {
            log.warn("average max resource usage is 0");
            return selectedBundlesCache;
        }

        for (Map.Entry<String, BrokerData> brokerEntry : loadData.getBrokerData().entrySet()) {
            final String broker = brokerEntry.getKey();
            final LocalBrokerData localData = brokerEntry.getValue().getLocalData();
            final double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0d);

            if (currentUsage < avgUsage + threshold) {
                if (sampleLog) {
                    log.info("[{}] broker is not overloaded, ignoring at this point, currentUsage:{}%",
                            broker, toPercentage(currentUsage));
                }
                continue;
            }

            // Fraction of the broker's traffic to move away: the excess over (average + threshold)
            // plus a fixed safety margin.
            final double percentOfTrafficToOffload =
                    ((currentUsage - avgUsage) - threshold) + ADDITIONAL_THRESHOLD_PERCENT_MARGIN;
            final double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
            final double minimumThroughputToOffload = brokerCurrentThroughput * percentOfTrafficToOffload;

            if (minimumThroughputToOffload < minThroughputThreshold) {
                if (sampleLog) {
                    log.info("[{}] broker is planning to shed throughput {} MByte/s less than minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
                            broker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
                }
                continue;
            }

            // The message appends '%' to the three usage values, so they are converted from
            // fractions to percentages like in every other log line of this class.
            log.info("Attempting to shed load on {}, which has max resource usage above avgUsage  and threshold {}% > {}% + {}% -- Offloading at least {} MByte/s of traffic, left throughput {} MByte/s",
                    broker, toPercentage(currentUsage), toPercentage(avgUsage), toPercentage(threshold),
                    minimumThroughputToOffload / MB, (brokerCurrentThroughput - minimumThroughputToOffload) / MB);

            final int bundleCount = localData.getBundles().size();
            if (bundleCount > 1) {
                selectBundlesToUnload(broker, localData, loadData, recentlyUnloadedBundles,
                        minimumThroughputToOffload);
            } else if (bundleCount == 1) {
                log.warn("HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. No Load Shedding will be done on this broker",
                        localData.getBundles().iterator().next(), broker);
            } else {
                log.warn("Broker {} is overloaded despite having no bundles", broker);
            }
        }
        return selectedBundlesCache;
    }

    /**
     * Adds to {@link #selectedBundlesCache} the bundles of one broker, highest short-term
     * throughput first, until the accumulated throughput reaches the requested amount. The first
     * candidate bundle is always selected, even when the requested amount is not positive.
     *
     * @param broker                     the broker whose bundles are being selected
     * @param localData                  local data of that broker; supplies the set of its bundles
     * @param loadData                   load data supplying the per-bundle short-term throughput
     * @param recentlyUnloadedBundles    bundles to skip because they were unloaded recently
     * @param minimumThroughputToOffload the throughput, in byte/s, that the selection should cover
     */
    private void selectBundlesToUnload(String broker, LocalBrokerData localData, LoadData loadData,
                                       Map<String, Long> recentlyUnloadedBundles,
                                       double minimumThroughputToOffload) {
        final MutableDouble trafficMarkedToOffload = new MutableDouble(0.0d);
        final MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);

        loadData.getBundleDataForLoadShedding().entrySet().stream()
                .map(bundleEntry -> {
                    String bundle = bundleEntry.getKey();
                    TimeAverageMessageData shortTermData = bundleEntry.getValue().getShortTermData();
                    double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
                    return Pair.of(bundle, Double.valueOf(throughput));
                })
                .filter(candidate -> !recentlyUnloadedBundles.containsKey(candidate.getLeft()))
                .filter(candidate -> localData.getBundles().contains(candidate.getLeft()))
                // Descending by throughput, so the heaviest bundles are considered first.
                .sorted((left, right) -> Double.compare(right.getRight(), left.getRight()))
                .forEach(candidate -> {
                    if (trafficMarkedToOffload.doubleValue() < minimumThroughputToOffload
                            || atLeastOneBundleSelected.isFalse()) {
                        selectedBundlesCache.put(broker, candidate.getLeft());
                        trafficMarkedToOffload.add(candidate.getRight());
                        atLeastOneBundleSelected.setTrue();
                    }
                });
    }

    /**
     * Updates the usage moving average of every broker in the load data and returns their mean.
     *
     * @param loadData             the load data whose brokers are evaluated
     * @param historyPercentage    weight given to the previously stored average of each broker
     * @param serviceConfiguration source of the per-resource weights
     * @param sampleLog            whether the per-broker load line should be logged
     * @return the arithmetic mean of the updated per-broker averages, or {@code 0.0} when there is
     *         no broker
     */
    private double getBrokerAvgUsage(LoadData loadData, double historyPercentage,
                                     ServiceConfiguration serviceConfiguration, boolean sampleLog) {
        double totalUsage = 0.0d;
        int totalBrokers = 0;
        for (Map.Entry<String, BrokerData> brokerEntry : loadData.getBrokerData().entrySet()) {
            totalUsage += updateAvgResourceUsage(brokerEntry.getKey(), brokerEntry.getValue().getLocalData(),
                    historyPercentage, serviceConfiguration, sampleLog);
            totalBrokers++;
        }
        return totalBrokers > 0 ? totalUsage / totalBrokers : 0.0d;
    }

    /**
     * Folds the broker's current weighted max resource usage into its stored moving average.
     *
     * <p>When no average is stored yet, the current usage becomes the average. Otherwise the new
     * value is {@code stored * historyPercentage + (1 - historyPercentage) * current}. If the
     * current usage is above 1.0 an error is logged and the usage is recomputed with
     * {@code getMaxResourceUsageWithWeightWithinLimit} before being folded in.
     *
     * @param broker               the broker whose average is updated
     * @param localBrokerData      the broker's local data, source of the resource usage
     * @param historyPercentage    weight given to the previously stored average
     * @param serviceConfiguration source of the per-resource weights
     * @param sampleLog            whether the per-broker load line should be logged
     * @return the updated average, which is also stored in {@link #brokerAvgResourceUsage}
     */
    private double updateAvgResourceUsage(String broker, LocalBrokerData localBrokerData, double historyPercentage,
                                          ServiceConfiguration serviceConfiguration, boolean sampleLog) {
        final Double historyUsage = brokerAvgResourceUsage.get(broker);
        double resourceUsage = localBrokerData.getMaxResourceUsageWithWeight(
                serviceConfiguration.getLoadBalancerCPUResourceWeight(),
                serviceConfiguration.getLoadBalancerMemoryResourceWeight(),
                serviceConfiguration.getLoadBalancerDirectMemoryResourceWeight(),
                serviceConfiguration.getLoadBalancerBandwithInResourceWeight(),
                serviceConfiguration.getLoadBalancerBandwithOutResourceWeight());

        if (sampleLog) {
            int historyPercent = historyUsage == null ? 0 : toPercentage(historyUsage);
            log.info("{} broker load: historyUsage={}%, resourceUsage={}%",
                    broker, historyPercent, toPercentage(resourceUsage));
        }

        if (resourceUsage > 1.0d) {
            log.error("{} broker resourceUsage is bigger than 100%. Some of the resource limits are mis-configured. Try to disable the error resource signals by setting their weights to zero or fix the resource limit configurations. Ref:https://pulsar.apache.org/docs/administration-load-balance/#thresholdshedder ResourceUsage:[{}], CPUResourceWeight:{}, MemoryResourceWeight:{}, DirectMemoryResourceWeight:{}, BandwithInResourceWeight:{}, BandwithOutResourceWeight:{}",
                    broker, localBrokerData.printResourceUsage(),
                    serviceConfiguration.getLoadBalancerCPUResourceWeight(),
                    serviceConfiguration.getLoadBalancerMemoryResourceWeight(),
                    serviceConfiguration.getLoadBalancerDirectMemoryResourceWeight(),
                    serviceConfiguration.getLoadBalancerBandwithInResourceWeight(),
                    serviceConfiguration.getLoadBalancerBandwithOutResourceWeight());
            resourceUsage = localBrokerData.getMaxResourceUsageWithWeightWithinLimit(
                    serviceConfiguration.getLoadBalancerCPUResourceWeight(),
                    serviceConfiguration.getLoadBalancerMemoryResourceWeight(),
                    serviceConfiguration.getLoadBalancerDirectMemoryResourceWeight(),
                    serviceConfiguration.getLoadBalancerBandwithInResourceWeight(),
                    serviceConfiguration.getLoadBalancerBandwithOutResourceWeight());
            log.warn("{} broker recomputed max resourceUsage={}%. Skipped usage signals bigger than 100%",
                    broker, toPercentage(resourceUsage));
        }

        final double updatedUsage;
        if (historyUsage == null) {
            updatedUsage = resourceUsage;
        } else {
            updatedUsage = (historyUsage * historyPercentage) + ((1.0d - historyPercentage) * resourceUsage);
        }
        brokerAvgResourceUsage.put(broker, updatedUsage);
        return updatedUsage;
    }
}
