package org.apache.pulsar.broker.loadbalance.impl;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.Map;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableDouble;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.policies.data.loadbalancer.BundleData;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.class */
/**
 * {@link LoadSheddingStrategy} that, for every broker whose weighted maximum resource usage is at
 * or above the configured overload threshold, selects the highest-throughput bundles to unload.
 *
 * <p>The instance keeps a single internal {@link Multimap} (broker to selected bundles) which is
 * cleared at the start of every {@link #findBundlesForUnloading} call and returned to the caller
 * as-is, so the result of one call is overwritten by the next one.
 */
public class OverloadShedder implements LoadSheddingStrategy {
    private static final Logger log = LoggerFactory.getLogger(OverloadShedder.class);

    /**
     * Extra fraction added on top of the measured overshoot when computing how much traffic to
     * offload, so the target is not exactly at the threshold.
     */
    private static final double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05d;

    /** Broker to bundles selected for unloading; reused and cleared on every invocation. */
    private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();

    /**
     * Picks the bundles that should be unloaded from each overloaded broker.
     *
     * <p>For each broker in {@code loadData.getBrokerData()}:
     * <ul>
     *   <li>brokers whose weighted max resource usage is below the threshold are skipped;</li>
     *   <li>brokers owning more than one bundle have their bundles (excluding recently unloaded
     *       ones) ordered by short-term in+out throughput, highest first, and bundles are selected
     *       until the accumulated throughput reaches the minimum amount to offload; at least one
     *       bundle is always selected if any candidate exists;</li>
     *   <li>brokers owning exactly one bundle, or none, only produce a warning.</li>
     * </ul>
     *
     * @param loadData             current load data: per-broker data, per-bundle data and the
     *                             recently unloaded bundles
     * @param serviceConfiguration configuration providing the overload threshold percentage and
     *                             the per-resource weights
     * @return the internal multimap of broker to bundles selected for unloading (never
     *         {@code null}; empty when nothing was selected)
     */
    @Override // org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy
    public Multimap<String, String> findBundlesForUnloading(LoadData loadData, ServiceConfiguration serviceConfiguration) {
        selectedBundlesCache.clear();

        // The configured value is a percentage; resource usage is compared as a fraction.
        final double overloadThreshold =
                serviceConfiguration.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0d;
        final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();

        loadData.getBrokerData().forEach((broker, brokerData) -> {
            final LocalBrokerData localData = brokerData.getLocalData();
            final double currentUsage = localData.getMaxResourceUsageWithWeight(
                    serviceConfiguration.getLoadBalancerCPUResourceWeight(),
                    serviceConfiguration.getLoadBalancerDirectMemoryResourceWeight(),
                    serviceConfiguration.getLoadBalancerBandwidthInResourceWeight(),
                    serviceConfiguration.getLoadBalancerBandwidthOutResourceWeight());

            if (currentUsage < overloadThreshold) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Broker is not overloaded, ignoring at this point ({})", broker, localData.printResourceUsage());
                }
                return;
            }

            // Offload the share of traffic corresponding to the overshoot plus a small margin.
            final double percentOfTrafficToOffload =
                    (currentUsage - overloadThreshold) + ADDITIONAL_THRESHOLD_PERCENT_MARGIN;
            final double brokerCurrentThroughput =
                    localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
            final double minimumThroughputToOffload = brokerCurrentThroughput * percentOfTrafficToOffload;

            // NOTE(review): the message labels the value MByte/s after dividing by 1024 twice,
            // which presumes throughput is tracked in bytes/s - confirm against LocalBrokerData.
            log.info("Attempting to shed load on {}, which has resource usage {}% above threshold {}% -- Offloading at least {} MByte/s of traffic ({})",
                    broker,
                    100.0d * currentUsage,
                    100.0d * overloadThreshold,
                    (minimumThroughputToOffload / 1024.0d) / 1024.0d,
                    localData.printResourceUsage());

            // Mutable holders because the values are updated from inside the stream lambda.
            final MutableDouble trafficMarkedToOffload = new MutableDouble(0.0d);
            final MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);

            final int bundleCount = localData.getBundles().size();
            if (bundleCount > 1) {
                loadData.getBundleDataForLoadShedding().entrySet().stream()
                        // Only bundles currently owned by this broker are candidates.
                        .filter(entry -> localData.getBundles().contains(entry.getKey()))
                        .map(entry -> {
                            final String bundle = entry.getKey();
                            final BundleData bundleData = entry.getValue();
                            final TimeAverageMessageData shortTermData = bundleData.getShortTermData();
                            final double bundleThroughput =
                                    shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
                            return Pair.of(bundle, bundleThroughput);
                        })
                        // Skip bundles listed as recently unloaded.
                        .filter(candidate -> !recentlyUnloadedBundles.containsKey(candidate.getLeft()))
                        // Highest throughput first.
                        .sorted((left, right) -> Double.compare(right.getRight(), left.getRight()))
                        .forEach(candidate -> {
                            // Keep selecting until enough traffic is marked; the flag guarantees
                            // one selection even when the minimum to offload is zero.
                            if (trafficMarkedToOffload.doubleValue() < minimumThroughputToOffload
                                    || atLeastOneBundleSelected.isFalse()) {
                                selectedBundlesCache.put(broker, candidate.getLeft());
                                trafficMarkedToOffload.add(candidate.getRight());
                                atLeastOneBundleSelected.setTrue();
                            }
                        });
            } else if (bundleCount == 1) {
                log.warn("HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. No Load Shedding will be done on this broker", localData.getBundles().iterator().next(), broker);
            } else {
                log.warn("Broker {} is overloaded despite having no bundles", broker);
            }
        });

        return selectedBundlesCache;
    }
}
