/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance.impl;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.Map;
import org.apache.commons.lang3.mutable.MutableDouble;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TimeAverageMessageData;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link LoadSheddingStrategy} that compares the most loaded and the least loaded broker and,
 * when the gap between them exceeds the configured thresholds, picks bundles of the most
 * loaded broker for unloading.
 *
 * <p>Load is measured as message rate (in + out) and message throughput (in + out) taken from
 * each broker's local data. The instance keeps a single result multimap that is cleared and
 * refilled on every call to {@link #findBundlesForUnloading(LoadData, ServiceConfiguration)}.
 */
public class UniformLoadShedder implements LoadSheddingStrategy {
    private static final Logger log = LoggerFactory.getLogger(UniformLoadShedder.class);

    /** Slack added to the remaining message-rate budget when testing whether a bundle fits. */
    private static final int MSG_RATE_DELTA = 1000;

    private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();

    /**
     * Selects bundles to unload from the most loaded broker.
     *
     * @param loadData source of per-broker data, per-bundle data and recently unloaded bundles
     * @param conf     supplies the message-rate percentage threshold and the throughput
     *                 multiplier threshold; a threshold that is not positive disables its check
     * @return this instance's internal multimap (broker name to bundle names), cleared at the
     *         start of every call; empty when no threshold is exceeded or nothing qualifies
     */
    @Override
    public Multimap<String, String> findBundlesForUnloading(LoadData loadData, ServiceConfiguration conf) {
        selectedBundlesCache.clear();
        Map<String, BrokerData> brokersData = loadData.getBrokerData();
        Map<String, BundleData> loadBundleData = loadData.getBundleDataForLoadShedding();
        Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();

        MutableObject<String> overloadedBroker = new MutableObject<>();
        MutableObject<String> underloadedBroker = new MutableObject<>();
        MutableDouble maxMsgRate = new MutableDouble(-1.0);
        MutableDouble maxThroughputRate = new MutableDouble(-1.0);
        MutableDouble minMsgRate = new MutableDouble(Integer.MAX_VALUE);
        MutableDouble minThroughputRate = new MutableDouble(Integer.MAX_VALUE);

        brokersData.forEach((broker, data) -> {
            LocalBrokerData local = data.getLocalData();
            double msgRate = local.getMsgRateIn() + local.getMsgRateOut();
            double throughputRate = local.getMsgThroughputIn() + local.getMsgThroughputOut();
            // Only brokers owning more than one bundle are candidates for shedding. A broker
            // replaces the current maximum when EITHER metric is higher, and both recorded
            // values are overwritten, so the recorded pair always belongs to one broker but the
            // outcome can depend on the map's iteration order.
            if (local.getBundles().size() > 1
                    && (msgRate > maxMsgRate.getValue() || throughputRate > maxThroughputRate.getValue())) {
                overloadedBroker.setValue(broker);
                maxMsgRate.setValue(msgRate);
                maxThroughputRate.setValue(throughputRate);
            }
            // Same either-metric rule for the minimum, without the bundle-count restriction.
            if (msgRate < minMsgRate.getValue() || throughputRate < minThroughputRate.getValue()) {
                underloadedBroker.setValue(broker);
                minMsgRate.setValue(msgRate);
                minThroughputRate.setValue(throughputRate);
            }
        });

        // Floating-point division: a zero minimum yields +/-Infinity or NaN instead of throwing.
        // +Infinity exceeds any positive threshold; NaN and negative results never do.
        double msgRateDifferencePercentage =
                (maxMsgRate.getValue() - minMsgRate.getValue()) * 100.0 / minMsgRate.getValue();
        double msgThroughputDifferenceRate = maxThroughputRate.getValue() / minThroughputRate.getValue();

        double msgRateThreshold = conf.getLoadBalancerMsgRateDifferenceShedderThreshold();
        double throughputThreshold = conf.getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold();
        boolean isMsgRateThresholdExceeded =
                msgRateThreshold > 0.0 && msgRateDifferencePercentage > msgRateThreshold;
        boolean isMsgThroughputThresholdExceeded =
                throughputThreshold > 0.0 && msgThroughputDifferenceRate > throughputThreshold;
        if (!isMsgRateThresholdExceeded && !isMsgThroughputThresholdExceeded) {
            return selectedBundlesCache;
        }

        if (log.isDebugEnabled()) {
            log.debug("Found bundles for uniform load balancing. overloaded broker {} with (msgRate,throughput)= ({},{}) and underloaded broker {} with (msgRate,throughput)= ({},{})",
                    overloadedBroker.getValue(), maxMsgRate.getValue(), maxThroughputRate.getValue(),
                    underloadedBroker.getValue(), minMsgRate.getValue(), minThroughputRate.getValue());
        }

        // Defensive: the holder stays null when no broker owns more than one bundle.
        String overloadedBrokerName = overloadedBroker.getValue();
        BrokerData overloadedData = overloadedBrokerName == null ? null : brokersData.get(overloadedBrokerName);
        if (overloadedData == null) {
            return selectedBundlesCache;
        }
        LocalBrokerData overloadedBrokerData = overloadedData.getLocalData();
        if (overloadedBrokerData.getBundles().size() <= 1) {
            return selectedBundlesCache;
        }

        // Budget to move off the overloaded broker: half of the max-min gap, truncated to int.
        MutableInt msgRateRequiredFromUnloadedBundles =
                new MutableInt((int) ((maxMsgRate.getValue() - minMsgRate.getValue()) / 2.0));
        MutableInt msgThroughputRequiredFromUnloadedBundles =
                new MutableInt((int) ((maxThroughputRate.getValue() - minThroughputRate.getValue()) / 2.0));

        loadBundleData.entrySet().stream()
                .filter(e -> overloadedBrokerData.getBundles().contains(e.getKey()))
                .filter(e -> !recentlyUnloadedBundles.containsKey(e.getKey()))
                .map(e -> {
                    TimeAverageMessageData shortTermData = e.getValue().getShortTermData();
                    // Sort key follows the metric that drives the selection below.
                    double load = isMsgRateThresholdExceeded
                            ? shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut()
                            : shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
                    return Triple.of(e.getKey(), e.getValue(), load);
                })
                // Heaviest bundles first.
                .sorted((e1, e2) -> Double.compare(e2.getRight(), e1.getRight()))
                .forEach(e -> {
                    String bundle = e.getLeft();
                    TimeAverageMessageData shortTermData = e.getMiddle().getShortTermData();
                    double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
                    double bundleMsgRate = shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut();
                    if (isMsgRateThresholdExceeded) {
                        // Message rate takes precedence when its threshold is exceeded.
                        if (bundleMsgRate <= msgRateRequiredFromUnloadedBundles.getValue() + MSG_RATE_DELTA) {
                            log.info("Found bundle to unload with msgRate {}", bundleMsgRate);
                            // MutableInt.add(Number) uses intValue(), so the fraction is dropped.
                            msgRateRequiredFromUnloadedBundles.add(-bundleMsgRate);
                            selectedBundlesCache.put(overloadedBrokerName, bundle);
                        }
                    } else if (throughput <= msgThroughputRequiredFromUnloadedBundles.getValue()) {
                        log.info("Found bundle to unload with throughput {}", throughput);
                        msgThroughputRequiredFromUnloadedBundles.add(-throughput);
                        selectedBundlesCache.put(overloadedBrokerName, bundle);
                    }
                });
        return selectedBundlesCache;
    }
}

