package org.apache.pulsar.broker.loadbalance.extensions.scheduler;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper;
import org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.class */
public class TransferShedder implements NamespaceUnloadStrategy {
    private static final Logger log = LoggerFactory.getLogger(TransferShedder.class);
    private static final double KB = 1024.0d;
    private static final String CANNOT_CONTINUE_UNLOAD_MSG = "Can't continue the unload cycle.";
    private static final String CANNOT_UNLOAD_BROKER_MSG = "Can't unload broker:%s.";
    private static final String CANNOT_UNLOAD_BUNDLE_MSG = "Can't unload bundle:%s.";
    private final LoadStats stats;
    private PulsarService pulsar;
    private IsolationPoliciesHelper isolationPoliciesHelper;
    private AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper;
    private List<BrokerFilter> brokerFilterPipeline;
    private Set<UnloadDecision> decisionCache;
    private UnloadCounter counter;
    private ServiceUnitStateChannel channel;
    private int unloadConditionHitCount;

    /* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder$LoadStats.class */
    static class LoadStats {
        private double sum;
        private double sqSum;
        private int totalBrokers;
        private double avg;
        private double std;
        private LoadDataStore<BrokerLoadData> loadDataStore;
        private List<Map.Entry<String, BrokerLoadData>> brokersSortedByLoad = new ArrayList();
        int maxBrokerIndex;
        int minBrokerIndex;
        int numberOfBrokerSheddingPerCycle;
        int maxNumberOfBrokerSheddingPerCycle;

        LoadStats() {
        }

        private void update(double d, double d2, int i) {
            this.sum = d;
            this.sqSum = d2;
            this.totalBrokers = i;
            if (i == 0) {
                this.avg = 0.0d;
                this.std = 0.0d;
            } else {
                this.avg = d / i;
                this.std = Math.sqrt((d2 / i) - (this.avg * this.avg));
            }
        }

        void offload(double d, double d2, double d3) {
            this.sqSum -= (d * d) + (d2 * d2);
            double max = Math.max(0.0d, d - d3);
            double d4 = d2 + d3;
            this.sqSum += (max * max) + (d4 * d4);
            this.std = Math.sqrt(Math.abs((this.sqSum / this.totalBrokers) - (this.avg * this.avg)));
            this.numberOfBrokerSheddingPerCycle++;
            this.minBrokerIndex++;
        }

        void clear() {
            this.sum = 0.0d;
            this.sqSum = 0.0d;
            this.totalBrokers = 0;
            this.avg = 0.0d;
            this.std = 0.0d;
            this.maxBrokerIndex = 0;
            this.minBrokerIndex = 0;
            this.numberOfBrokerSheddingPerCycle = 0;
            this.maxNumberOfBrokerSheddingPerCycle = 0;
            this.brokersSortedByLoad.clear();
            this.loadDataStore = null;
        }

        Optional<UnloadDecision.Reason> update(LoadDataStore<BrokerLoadData> loadDataStore, Map<String, BrokerLookupData> map, Map<String, Long> map2, ServiceConfiguration serviceConfiguration) {
            this.maxNumberOfBrokerSheddingPerCycle = serviceConfiguration.getLoadBalancerMaxNumberOfBrokerSheddingPerCycle();
            boolean debug = ExtensibleLoadManagerImpl.debug(serviceConfiguration, TransferShedder.log);
            UnloadDecision.Reason reason = null;
            double d = 0.0d;
            double d2 = 0.0d;
            int i = 0;
            long currentTimeMillis = System.currentTimeMillis();
            HashSet hashSet = new HashSet(map.keySet());
            for (Map.Entry<String, BrokerLoadData> entry : loadDataStore.entrySet()) {
                BrokerLoadData value = entry.getValue();
                String key = entry.getKey();
                hashSet.remove(key);
                if (currentTimeMillis - value.getUpdatedAt() > serviceConfiguration.getLoadBalancerBrokerLoadDataTTLInSeconds() * 1000) {
                    TransferShedder.log.warn("Ignoring broker:{} load update because the load data timestamp:{} is too old.", key, Long.valueOf(value.getUpdatedAt()));
                    reason = UnloadDecision.Reason.OutDatedData;
                } else {
                    if (map2.containsKey(key)) {
                        long updatedAt = value.getUpdatedAt() - map2.get(key).longValue();
                        if (updatedAt < serviceConfiguration.getLoadBalanceSheddingDelayInSeconds() * 1000) {
                            if (debug) {
                                TransferShedder.log.warn("Broker:{} load data is too early since the last transfer. elapsed {} secs < threshold {} secs", new Object[]{key, Long.valueOf(TimeUnit.MILLISECONDS.toSeconds(updatedAt)), Long.valueOf(serviceConfiguration.getLoadBalanceSheddingDelayInSeconds())});
                            }
                            update(0.0d, 0.0d, 0);
                            return Optional.of(UnloadDecision.Reason.CoolDown);
                        }
                        map2.remove(key);
                    }
                    double weightedMaxEMA = value.getWeightedMaxEMA();
                    d += weightedMaxEMA;
                    d2 += weightedMaxEMA * weightedMaxEMA;
                    i++;
                }
            }
            if (i == 0) {
                if (reason == null) {
                    reason = UnloadDecision.Reason.NoBrokers;
                }
                update(0.0d, 0.0d, 0);
                if (debug) {
                    TransferShedder.log.info("There is no broker load data.");
                }
                return Optional.of(reason);
            }
            if (hashSet.isEmpty()) {
                update(d, d2, i);
                return Optional.empty();
            }
            UnloadDecision.Reason reason2 = UnloadDecision.Reason.NoLoadData;
            update(0.0d, 0.0d, 0);
            if (debug) {
                TransferShedder.log.info("There is missing load data from brokers:{}", hashSet);
            }
            return Optional.of(reason2);
        }

        void setLoadDataStore(LoadDataStore<BrokerLoadData> loadDataStore) {
            this.loadDataStore = loadDataStore;
            this.brokersSortedByLoad.addAll(loadDataStore.entrySet());
            this.brokersSortedByLoad.sort(Comparator.comparingDouble(entry -> {
                return ((BrokerLoadData) entry.getValue()).getWeightedMaxEMA();
            }));
            this.maxBrokerIndex = this.brokersSortedByLoad.size() - 1;
            this.minBrokerIndex = 0;
        }

        String peekMinBroker() {
            return this.brokersSortedByLoad.get(this.minBrokerIndex).getKey();
        }

        String peekMaxBroker() {
            return this.brokersSortedByLoad.get(this.maxBrokerIndex).getKey();
        }

        String pollMaxBroker() {
            List<Map.Entry<String, BrokerLoadData>> list = this.brokersSortedByLoad;
            int i = this.maxBrokerIndex;
            this.maxBrokerIndex = i - 1;
            return list.get(i).getKey();
        }

        public String toString() {
            return String.format("sum:%.2f, sqSum:%.2f, avg:%.2f, std:%.2f, totalBrokers:%d, brokersSortedByLoad:%s", Double.valueOf(this.sum), Double.valueOf(this.sqSum), Double.valueOf(this.avg), Double.valueOf(this.std), Integer.valueOf(this.totalBrokers), this.brokersSortedByLoad.stream().map(entry -> {
                return (String) entry.getKey();
            }).collect(Collectors.toList()));
        }

        boolean hasTransferableBrokers() {
            return this.numberOfBrokerSheddingPerCycle < this.maxNumberOfBrokerSheddingPerCycle && this.minBrokerIndex < this.maxBrokerIndex;
        }

        public double sum() {
            return this.sum;
        }

        public double sqSum() {
            return this.sqSum;
        }

        public int totalBrokers() {
            return this.totalBrokers;
        }

        public double avg() {
            return this.avg;
        }

        public double std() {
            return this.std;
        }

        public LoadDataStore<BrokerLoadData> loadDataStore() {
            return this.loadDataStore;
        }

        public List<Map.Entry<String, BrokerLoadData>> brokersSortedByLoad() {
            return this.brokersSortedByLoad;
        }

        public int maxBrokerIndex() {
            return this.maxBrokerIndex;
        }

        public int minBrokerIndex() {
            return this.minBrokerIndex;
        }

        public int numberOfBrokerSheddingPerCycle() {
            return this.numberOfBrokerSheddingPerCycle;
        }

        public int maxNumberOfBrokerSheddingPerCycle() {
            return this.maxNumberOfBrokerSheddingPerCycle;
        }
    }

    @VisibleForTesting
    public TransferShedder(UnloadCounter unloadCounter) {
        this.stats = new LoadStats();
        this.unloadConditionHitCount = 0;
        this.pulsar = null;
        this.decisionCache = new HashSet();
        this.counter = unloadCounter;
        this.isolationPoliciesHelper = null;
        this.antiAffinityGroupPolicyHelper = null;
    }

    public TransferShedder(PulsarService pulsarService, UnloadCounter unloadCounter, List<BrokerFilter> list, IsolationPoliciesHelper isolationPoliciesHelper, AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper) {
        this.stats = new LoadStats();
        this.unloadConditionHitCount = 0;
        this.pulsar = pulsarService;
        this.decisionCache = new HashSet();
        this.counter = unloadCounter;
        this.isolationPoliciesHelper = isolationPoliciesHelper;
        this.antiAffinityGroupPolicyHelper = antiAffinityGroupPolicyHelper;
        this.channel = ServiceUnitStateChannelImpl.get(pulsarService);
        this.brokerFilterPipeline = list;
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.scheduler.NamespaceUnloadStrategy
    public void initialize(PulsarService pulsarService) {
        this.pulsar = pulsarService;
        this.decisionCache = new HashSet();
        ExtensibleLoadManagerImpl extensibleLoadManagerImpl = ExtensibleLoadManagerImpl.get(pulsarService.getLoadManager().get());
        this.counter = extensibleLoadManagerImpl.getUnloadCounter();
        this.isolationPoliciesHelper = extensibleLoadManagerImpl.getIsolationPoliciesHelper();
        this.antiAffinityGroupPolicyHelper = extensibleLoadManagerImpl.getAntiAffinityGroupPolicyHelper();
        this.channel = ServiceUnitStateChannelImpl.get(pulsarService);
        this.brokerFilterPipeline = extensibleLoadManagerImpl.getBrokerFilterPipeline();
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.scheduler.NamespaceUnloadStrategy
    public Set<UnloadDecision> findBundlesForUnloading(LoadManagerContext loadManagerContext, Map<String, Long> map, Map<String, Long> map2) {
        boolean debug;
        Optional<UnloadDecision.Reason> update;
        UnloadDecision.Reason reason;
        Unload unload;
        ServiceConfiguration brokerConfiguration = loadManagerContext.brokerConfiguration();
        this.decisionCache.clear();
        this.stats.clear();
        try {
            Map<String, BrokerLookupData> map3 = loadManagerContext.brokerRegistry().getAvailableBrokerLookupDataAsync().get(loadManagerContext.brokerConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
            try {
                this.stats.setLoadDataStore(loadManagerContext.brokerLoadDataStore());
                debug = ExtensibleLoadManagerImpl.debug(brokerConfiguration, log);
                update = this.stats.update(loadManagerContext.brokerLoadDataStore(), map3, map2, brokerConfiguration);
            } catch (Throwable th) {
                log.error("Failed to process unloading. ", th);
                this.counter.update(UnloadDecision.Label.Failure, UnloadDecision.Reason.Unknown);
            }
            if (update.isPresent()) {
                if (debug) {
                    log.warn("Can't continue the unload cycle. Skipped the load stat update. Reason:{}.", update.get());
                }
                this.counter.update(UnloadDecision.Label.Skip, update.get());
                return this.decisionCache;
            }
            this.counter.updateLoadData(this.stats.avg, this.stats.std);
            if (debug) {
                log.info("brokers' load stats:{}", this.stats);
            }
            int i = 0;
            int i2 = 0;
            double loadBalancerBrokerLoadTargetStd = brokerConfiguration.getLoadBalancerBrokerLoadTargetStd();
            boolean isLoadBalancerTransferEnabled = brokerConfiguration.isLoadBalancerTransferEnabled();
            if (this.stats.std() > loadBalancerBrokerLoadTargetStd || isUnderLoaded(loadManagerContext, this.stats.peekMinBroker(), this.stats.avg) || isOverLoaded(loadManagerContext, this.stats.peekMaxBroker(), this.stats.avg)) {
                this.unloadConditionHitCount++;
            } else {
                this.unloadConditionHitCount = 0;
            }
            if (this.unloadConditionHitCount <= brokerConfiguration.getLoadBalancerSheddingConditionHitCountThreshold()) {
                if (debug) {
                    log.info("Can't continue the unload cycle. Shedding condition hit count:{} is less than or equal to the threshold:{}.", Integer.valueOf(this.unloadConditionHitCount), Integer.valueOf(brokerConfiguration.getLoadBalancerSheddingConditionHitCountThreshold()));
                }
                this.counter.update(UnloadDecision.Label.Skip, UnloadDecision.Reason.HitCount);
                return this.decisionCache;
            }
            while (true) {
                if (this.stats.hasTransferableBrokers()) {
                    if (this.stats.std() > loadBalancerBrokerLoadTargetStd) {
                        reason = UnloadDecision.Reason.Overloaded;
                    } else if (isUnderLoaded(loadManagerContext, this.stats.peekMinBroker(), this.stats.avg)) {
                        reason = UnloadDecision.Reason.Underloaded;
                        if (debug) {
                            log.info(String.format("broker:%s is underloaded:%s although load std:%.2f <= targetStd:%.2f. Continuing unload for this underloaded broker.", this.stats.peekMinBroker(), loadManagerContext.brokerLoadDataStore().get(this.stats.peekMinBroker()).get(), Double.valueOf(this.stats.std()), Double.valueOf(loadBalancerBrokerLoadTargetStd)));
                        }
                    } else if (isOverLoaded(loadManagerContext, this.stats.peekMaxBroker(), this.stats.avg)) {
                        reason = UnloadDecision.Reason.Overloaded;
                        if (debug) {
                            log.info(String.format("broker:%s is overloaded:%s although load std:%.2f <= targetStd:%.2f. Continuing unload for this overloaded broker.", this.stats.peekMaxBroker(), loadManagerContext.brokerLoadDataStore().get(this.stats.peekMaxBroker()).get(), Double.valueOf(this.stats.std()), Double.valueOf(loadBalancerBrokerLoadTargetStd)));
                        }
                    } else if (debug) {
                        log.info("Can't continue the unload cycle.The overall cluster load meets the target, std:{} <= targetStd:{}.minBroker:{} is not underloaded. maxBroker:{} is not overloaded.", new Object[]{Double.valueOf(this.stats.std()), Double.valueOf(loadBalancerBrokerLoadTargetStd), this.stats.peekMinBroker(), this.stats.peekMaxBroker()});
                    }
                    String pollMaxBroker = this.stats.pollMaxBroker();
                    String peekMinBroker = this.stats.peekMinBroker();
                    Optional<BrokerLoadData> optional = loadManagerContext.brokerLoadDataStore().get(pollMaxBroker);
                    Optional<BrokerLoadData> optional2 = loadManagerContext.brokerLoadDataStore().get(peekMinBroker);
                    if (optional.isEmpty()) {
                        log.error(String.format("Can't unload broker:%s. MaxBrokerLoadData is empty.", pollMaxBroker));
                        i++;
                    } else if (optional2.isEmpty()) {
                        log.error("Can't transfer load to broker:{}. MinBrokerLoadData is empty.", peekMinBroker);
                        i++;
                    } else {
                        double weightedMaxEMA = optional.get().getWeightedMaxEMA();
                        double weightedMaxEMA2 = optional2.get().getWeightedMaxEMA();
                        double d = (weightedMaxEMA - weightedMaxEMA2) / 2.0d;
                        BrokerLoadData brokerLoadData = optional.get();
                        double msgThroughputIn = brokerLoadData.getMsgThroughputIn() + brokerLoadData.getMsgThroughputOut();
                        double msgThroughputIn2 = optional2.get().getMsgThroughputIn() + optional2.get().getMsgThroughputOut();
                        double d2 = (msgThroughputIn * d) / weightedMaxEMA;
                        if (debug) {
                            Logger logger = log;
                            Object[] objArr = new Object[6];
                            objArr[0] = pollMaxBroker;
                            objArr[1] = isLoadBalancerTransferEnabled ? " to broker:" + peekMinBroker : "";
                            objArr[2] = Double.valueOf(weightedMaxEMA * 100.0d);
                            objArr[3] = Double.valueOf(loadBalancerBrokerLoadTargetStd);
                            objArr[4] = Double.valueOf(d * 100.0d);
                            objArr[5] = Double.valueOf(d2 / KB);
                            logger.info(String.format("Attempting to shed load from broker:%s%s, which has the max resource usage:%.2f%%, targetStd:%.2f, -- Trying to offload %.2f%%, %.2f KByte/s of traffic.", objArr));
                        }
                        double d3 = 0.0d;
                        double d4 = 0.0d;
                        Optional<TopBundlesLoadData> optional3 = loadManagerContext.topBundleLoadDataStore().get(pollMaxBroker);
                        if (optional3.isEmpty() || optional3.get().getTopBundlesLoadData().isEmpty()) {
                            log.error(String.format("Can't unload broker:%s. TopBundlesLoadData is empty.", pollMaxBroker));
                            i++;
                        } else {
                            List<TopBundlesLoadData.BundleLoadData> topBundlesLoadData = optional3.get().getTopBundlesLoadData();
                            if (topBundlesLoadData.size() == 1) {
                                i2++;
                                log.warn(String.format("Can't unload broker:%s. Sole namespace bundle:%s is overloading the broker. ", pollMaxBroker, topBundlesLoadData.iterator().next()));
                            } else {
                                Optional<TopBundlesLoadData> optional4 = loadManagerContext.topBundleLoadDataStore().get(peekMinBroker);
                                Iterator<TopBundlesLoadData.BundleLoadData> it = optional4.isPresent() ? optional4.get().getTopBundlesLoadData().iterator() : null;
                                if (topBundlesLoadData.isEmpty()) {
                                    i2++;
                                    log.warn(String.format("Can't unload broker:%s. Broker overloaded despite having no bundles", pollMaxBroker));
                                } else {
                                    int size = topBundlesLoadData.size();
                                    Iterator<TopBundlesLoadData.BundleLoadData> it2 = topBundlesLoadData.iterator();
                                    while (true) {
                                        if (!it2.hasNext()) {
                                            break;
                                        }
                                        TopBundlesLoadData.BundleLoadData next = it2.next();
                                        String bundleName = next.bundleName();
                                        if (this.channel == null || this.channel.isOwner(bundleName, pollMaxBroker)) {
                                            if (map.containsKey(bundleName)) {
                                                if (debug) {
                                                    log.info(String.format("Can't unload bundle:%s. Bundle has been recently unloaded at ts:%d.", bundleName, map.get(bundleName)));
                                                }
                                            } else if (isTransferable(loadManagerContext, map3, bundleName, pollMaxBroker, Optional.of(peekMinBroker))) {
                                                if (size > 1) {
                                                    NamespaceBundleStats stats = next.stats();
                                                    double d5 = stats.msgThroughputIn + stats.msgThroughputOut;
                                                    boolean z = false;
                                                    ArrayList arrayList = new ArrayList();
                                                    double d6 = 0.0d;
                                                    if ((d3 - d4) + d5 > d2) {
                                                        if (isLoadBalancerTransferEnabled && it != null) {
                                                            double d7 = ((msgThroughputIn - d3) + d4) - d5;
                                                            double d8 = ((msgThroughputIn2 + d3) - d4) + d5;
                                                            while (true) {
                                                                if (!it.hasNext()) {
                                                                    break;
                                                                }
                                                                TopBundlesLoadData.BundleLoadData next2 = it.next();
                                                                if (isTransferable(loadManagerContext, map3, next2.bundleName(), peekMinBroker, Optional.of(pollMaxBroker))) {
                                                                    double d9 = next2.stats().msgThroughputIn + next2.stats().msgThroughputOut;
                                                                    double d10 = d7 + d9;
                                                                    double d11 = d8 - d9;
                                                                    if (d10 < msgThroughputIn && d11 < msgThroughputIn) {
                                                                        arrayList.add(new Unload(peekMinBroker, next2.bundleName(), Optional.of(pollMaxBroker)));
                                                                        d7 = d10;
                                                                        d8 = d11;
                                                                        d6 += d9;
                                                                        if (d8 <= d7 && d7 < msgThroughputIn * 0.75d) {
                                                                            z = true;
                                                                            break;
                                                                        }
                                                                    }
                                                                }
                                                            }
                                                        }
                                                        if (!z) {
                                                            if (debug) {
                                                                log.info(String.format("Can't unload bundle:%s. The traffic to unload:%.2f - gain:%.2f = %.2f KByte/s is greater than the target :%.2f KByte/s.", bundleName, Double.valueOf((d3 + d5) / KB), Double.valueOf(d4 / KB), Double.valueOf(((d3 - d4) + d5) / KB), Double.valueOf(d2 / KB)));
                                                            }
                                                        }
                                                    }
                                                    if (isLoadBalancerTransferEnabled) {
                                                        if (z) {
                                                            UnloadDecision.Reason reason2 = reason;
                                                            arrayList.forEach(unload2 -> {
                                                                if (debug) {
                                                                    log.info("Decided to gain bundle:{} from min broker:{}", unload2.serviceUnit(), unload2.sourceBroker());
                                                                }
                                                                UnloadDecision unloadDecision = new UnloadDecision();
                                                                unloadDecision.setUnload(unload2);
                                                                unloadDecision.succeed(reason2);
                                                                this.decisionCache.add(unloadDecision);
                                                            });
                                                            if (debug) {
                                                                log.info(String.format("Total traffic %.2f KByte/s to transfer from min broker:%s to max broker:%s.", Double.valueOf(d6 / KB), peekMinBroker, pollMaxBroker));
                                                                d4 += d6;
                                                            }
                                                        }
                                                        unload = new Unload(pollMaxBroker, bundleName, Optional.of(peekMinBroker));
                                                    } else {
                                                        unload = new Unload(pollMaxBroker, bundleName);
                                                    }
                                                    UnloadDecision unloadDecision = new UnloadDecision();
                                                    unloadDecision.setUnload(unload);
                                                    unloadDecision.succeed(reason);
                                                    this.decisionCache.add(unloadDecision);
                                                    d3 += d5;
                                                    size--;
                                                    if (debug) {
                                                        log.info(String.format("Decided to unload bundle:%s, throughput:%.2f KByte/s. The traffic marked to unload:%.2f - gain:%.2f = %.2f KByte/s. Target:%.2f KByte/s.", bundleName, Double.valueOf(d5 / KB), Double.valueOf(d3 / KB), Double.valueOf(d4 / KB), Double.valueOf((d3 - d4) / KB), Double.valueOf(d2 / KB)));
                                                    }
                                                } else if (debug) {
                                                    log.info(String.format("Can't unload bundle:%s. The remaining bundles in TopBundlesLoadData from the maxBroker:%s is less than or equal to 1.", bundleName, pollMaxBroker));
                                                }
                                            } else if (debug) {
                                                log.info(String.format("Can't unload bundle:%s. This unload can't meet affinity(isolation) or anti-affinity group policies.", bundleName));
                                            }
                                        } else if (debug) {
                                            log.warn(String.format("Can't unload bundle:%s. MaxBroker:%s is not the owner.", bundleName, pollMaxBroker));
                                        }
                                    }
                                    if (d3 > 0.0d) {
                                        double d12 = ((d3 - d4) * weightedMaxEMA) / msgThroughputIn;
                                        this.stats.offload(weightedMaxEMA, weightedMaxEMA2, d12);
                                        if (debug) {
                                            log.info(String.format("brokers' load stats:%s, after offload{max:%.2f, min:%.2f, offload:%.2f}", this.stats, Double.valueOf(weightedMaxEMA), Double.valueOf(weightedMaxEMA2), Double.valueOf(d12)));
                                        }
                                    } else {
                                        i2++;
                                        log.warn(String.format("Can't unload broker:%s. There is no bundle that can be unloaded in top bundles load data. Consider splitting bundles owned by the broker to make each bundle serve less traffic or increasing loadBalancerMaxNumberOfBundlesInBundleLoadReport to report more bundles in the top bundles load data.", pollMaxBroker));
                                    }
                                }
                            }
                        }
                    }
                } else if (debug) {
                    log.info("Can't continue the unload cycle. Exhausted target transfer brokers.");
                }
            }
            if (debug) {
                log.info("decisionCache:{}", this.decisionCache);
            }
            if (this.decisionCache.isEmpty()) {
                this.counter.update(UnloadDecision.Label.Skip, i > 0 ? UnloadDecision.Reason.NoLoadData : i2 > 0 ? UnloadDecision.Reason.NoBundles : UnloadDecision.Reason.HitCount);
            } else {
                this.unloadConditionHitCount = 0;
            }
            return this.decisionCache;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            this.counter.update(UnloadDecision.Label.Failure, UnloadDecision.Reason.Unknown);
            log.warn("Failed to fetch available brokers. Stop unloading.", e);
            return this.decisionCache;
        }
    }

    private boolean isUnderLoaded(LoadManagerContext loadManagerContext, String str, double d) {
        Optional<BrokerLoadData> optional = loadManagerContext.brokerLoadDataStore().get(str);
        if (optional.isEmpty()) {
            return false;
        }
        BrokerLoadData brokerLoadData = optional.get();
        return brokerLoadData.getMsgThroughputEMA() < 1.0d || brokerLoadData.getWeightedMaxEMA() < d * Math.min(0.5d, Math.max(0.0d, loadManagerContext.brokerConfiguration().getLoadBalancerBrokerLoadTargetStd() / 2.0d));
    }

    private boolean isOverLoaded(LoadManagerContext loadManagerContext, String str, double d) {
        Optional<BrokerLoadData> optional = loadManagerContext.brokerLoadDataStore().get(str);
        if (optional.isEmpty()) {
            return false;
        }
        double loadBalancerBrokerOverloadedThresholdPercentage = r0.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0d;
        double loadBalancerBrokerLoadTargetStd = loadManagerContext.brokerConfiguration().getLoadBalancerBrokerLoadTargetStd();
        double weightedMaxEMA = optional.get().getWeightedMaxEMA();
        return weightedMaxEMA > loadBalancerBrokerOverloadedThresholdPercentage && weightedMaxEMA > d + loadBalancerBrokerLoadTargetStd;
    }

    private boolean isTransferable(LoadManagerContext loadManagerContext, Map<String, BrokerLookupData> map, String str, String str2, Optional<String> optional) {
        if (this.pulsar == null) {
            return true;
        }
        NamespaceBundle bundle = this.pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(LoadManagerShared.getNamespaceNameFromBundleName(str), LoadManagerShared.getBundleRangeFromBundleName(str));
        if (!isLoadBalancerSheddingBundlesWithPoliciesEnabled(loadManagerContext, bundle)) {
            return false;
        }
        HashMap hashMap = new HashMap(map);
        for (BrokerFilter brokerFilter : this.brokerFilterPipeline) {
            try {
                brokerFilter.filter(hashMap, bundle, loadManagerContext);
            } catch (BrokerFilterException e) {
                log.error("Failed to filter brokers with filter: {}", brokerFilter.getClass().getName(), e);
                return false;
            }
        }
        if (optional.isPresent() && !hashMap.containsKey(optional.get())) {
            return false;
        }
        hashMap.remove(str2);
        return (optional.isEmpty() || !loadManagerContext.brokerConfiguration().isLoadBalancerTransferEnabled()) ? !hashMap.isEmpty() : hashMap.containsKey(optional.get());
    }

    protected boolean isLoadBalancerSheddingBundlesWithPoliciesEnabled(LoadManagerContext loadManagerContext, NamespaceBundle namespaceBundle) {
        if (this.isolationPoliciesHelper != null && this.isolationPoliciesHelper.hasIsolationPolicy(namespaceBundle.getNamespaceObject())) {
            return loadManagerContext.brokerConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled();
        }
        if (this.antiAffinityGroupPolicyHelper == null || !this.antiAffinityGroupPolicyHelper.hasAntiAffinityGroupPolicy(namespaceBundle.toString())) {
            return true;
        }
        return loadManagerContext.brokerConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled();
    }

    public TransferShedder() {
        this.stats = new LoadStats();
        this.unloadConditionHitCount = 0;
    }

    public UnloadCounter getCounter() {
        return this.counter;
    }
}
