/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance;

import com.google.common.collect.Lists;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.impl.PulsarResourceDescription;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.internal.NamespacesImpl;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class LoadBalancerTest {
    /** Number of brokers (and length of every per-broker array below). */
    private static final int BROKER_COUNT = 5;
    /** Retry budget constant; its value is 15. */
    private static final int MAX_RETRIES = 15;
    private static final Logger log = LoggerFactory.getLogger(LoadBalancerTest.class);

    /** Local ensemble that setup() creates and starts and shutdown() stops. */
    LocalBookkeeperEnsemble bkEnsemble;
    /** Thread pool terminated with shutdownNow() by shutdown(). */
    ExecutorService executor = new ThreadPoolExecutor(5, 20, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

    // Per-broker state; slot i of each array is filled in by setup() for broker i.
    private int[] brokerWebServicePorts = new int[BROKER_COUNT];
    private int[] brokerNativeBrokerPorts = new int[BROKER_COUNT];
    private URL[] brokerUrls = new URL[BROKER_COUNT];
    private String[] lookupAddresses = new String[BROKER_COUNT];
    private PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];
    private PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];

    /**
     * Starts a local bookkeeper ensemble and {@code BROKER_COUNT} brokers before each test method,
     * records each broker's ports/URL/lookup address and admin client, then installs the
     * namespace isolation policies through the first broker.
     *
     * @throws Exception if the ensemble, any broker, or the policy creation fails
     */
    @BeforeMethod
    void setup() throws Exception {
        // NOTE(review): (3, 0, () -> 0) presumably means three bookies on ephemeral ports — confirm
        // against LocalBookkeeperEnsemble.
        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
        bkEnsemble.start();
        // Pre-create the strategy node so the brokers pick "leastLoadedServer".
        ZkUtils.createFullPathOptimistic(bkEnsemble.getZkClient(),
                "/loadbalance/settings/strategy",
                "{\"loadBalancerStrategy\":\"leastLoadedServer\"}".getBytes(java.nio.charset.StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT);

        for (int i = 0; i < BROKER_COUNT; i++) {
            ServiceConfiguration config = new ServiceConfiguration();
            config.setClusterName("use");
            // Every port is configured as 0; the ports actually bound are read back after start().
            config.setBrokerServicePort(Optional.of(0));
            config.setBrokerServicePortTls(Optional.of(0));
            config.setWebServicePort(Optional.of(0));
            config.setWebServicePortTls(Optional.of(0));
            config.setZookeeperServers("127.0.0.1:" + bkEnsemble.getZookeeperPort());
            config.setBrokerShutdownTimeoutMs(0L);
            config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
            // Each broker advertises a distinct host name ("localhost0" .. "localhost4").
            config.setAdvertisedAddress("localhost" + i);
            config.setLoadBalancerEnabled(false);

            pulsarServices[i] = new PulsarService(config);
            pulsarServices[i].start();

            brokerWebServicePorts[i] = (Integer) pulsarServices[i].getListenPortHTTP().get();
            brokerNativeBrokerPorts[i] = (Integer) pulsarServices[i].getBrokerListenPort().get();
            brokerUrls[i] = new URL("http://127.0.0.1:" + brokerWebServicePorts[i]);
            lookupAddresses[i] = pulsarServices[i].getAdvertisedAddress() + ":" + brokerWebServicePorts[i];
            pulsarAdmins[i] = PulsarAdmin.builder().serviceHttpUrl(brokerUrls[i].toString()).build();
        }

        createNamespacePolicies(pulsarServices[0]);
        Thread.sleep(100L);
    }

    /**
     * Tears down everything {@link #setup()} created. Runs even when setup or the test failed
     * ({@code alwaysRun=true}), so every resource is null-checked: a partially completed setup
     * leaves trailing array slots (and possibly the ensemble) unset.
     *
     * @throws Exception if closing an admin client, a broker, or the ensemble fails
     */
    @AfterMethod(alwaysRun=true)
    void shutdown() throws Exception {
        log.info("--- Shutting down ---");
        executor.shutdownNow();
        try {
            for (int i = 0; i < BROKER_COUNT; i++) {
                if (pulsarAdmins[i] != null) {
                    pulsarAdmins[i].close();
                }
                if (pulsarServices[i] != null) {
                    pulsarServices[i].close();
                }
            }
        } finally {
            // Stop the ensemble even if closing a broker or admin client threw.
            if (bkEnsemble != null) {
                bkEnsemble.stop();
            }
        }
    }

    /**
     * Blocks until every broker in {@code activePulsars} reports a current leader that is present
     * and different from {@code oldLeader}, polling once per second for at most
     * {@code MAX_RETRIES} seconds. Awaitility throws on timeout; the alias below is included in
     * that failure so the reason is visible in the test report.
     *
     * @param activePulsars brokers whose view of the leader is polled
     * @param oldLeader     the leader that is expected to be replaced
     */
    private void loopUntilLeaderChangesForAllBroker(List<PulsarService> activePulsars, LeaderBroker oldLeader) {
        Awaitility.await("Leader is not changed even after maximum retries.")
                .pollInterval(1L, TimeUnit.SECONDS)
                .atMost(MAX_RETRIES, TimeUnit.SECONDS)
                .until(() -> {
                    for (PulsarService pulsar : activePulsars) {
                        Optional<?> leader = (Optional<?>) pulsar.getLeaderElectionService().readCurrentLeader().join();
                        // Not settled while any broker sees no leader or still sees the old one.
                        if (!leader.isPresent() || leader.get().equals(oldLeader)) {
                            return false;
                        }
                    }
                    return true;
                });
    }

    /**
     * For every broker: reads its load report from the local metadata store, checks that it is
     * non-empty and carries the broker's lookup address as name, checks that the broker's sorted
     * ranking contains all brokers, and checks that a least-loaded resource unit can be resolved
     * for a topic in {@code pulsar/use/primary-ns}.
     */
    @Test
    public void testLoadReportsWrittenOnMetadataStore() throws Exception {
        for (int brokerIdx = 0; brokerIdx < BROKER_COUNT; brokerIdx++) {
            PulsarService broker = pulsarServices[brokerIdx];
            String lookupAddress = lookupAddresses[brokerIdx];

            // The report is stored under /loadbalance/brokers/<lookup address>.
            String reportPath = String.format("%s/%s", "/loadbalance/brokers", lookupAddress);
            Optional<?> stored = (Optional<?>) broker.getLocalMetadataStore().get(reportPath).join();
            byte[] rawReport = ((GetResult) stored.get()).getValue();
            Assert.assertTrue(rawReport.length > 0);
            log.info("LoadReport {}, {}", lookupAddress, new String(rawReport));

            LoadReport report = ObjectMapperFactory.getThreadLocal().readValue(rawReport, LoadReport.class);
            Assert.assertEquals(report.getName(), lookupAddress);

            // Sum the sizes of all ranking buckets; together they must hold every broker.
            AtomicReference<Map<Long, Set<ResourceUnit>>> ranking = getSortedRanking(broker);
            printSortedRanking(ranking);
            int rankedBrokers = 0;
            for (Set<ResourceUnit> bucket : ranking.get().values()) {
                rankedBrokers += bucket.size();
            }
            Assert.assertEquals(rankedBrokers, BROKER_COUNT);

            TopicName topic = TopicName.get("persistent://pulsar/use/primary-ns/test-topic");
            LoadManager loadManager = broker.getLoadManager().get();
            ResourceUnit found = loadManager.getLeastLoaded(broker.getNamespaceService().getBundle(topic)).get();
            Assert.assertNotNull(found);
        }
    }

    /**
     * Publishes an identical load report for every broker, refreshes broker 0's ranking, then
     * resolves the least-loaded broker for 200 namespaces and asserts that each broker that
     * received assignments got within 10% of the average share.
     */
    @Test
    public void testUpdateLoadReportAndCheckUpdatedRanking() throws Exception {
        for (int brokerIdx = 0; brokerIdx < BROKER_COUNT; brokerIdx++) {
            SystemResourceUsage usage = new SystemResourceUsage();
            usage.setBandwidthIn(new ResourceUsage(256.0, 1024000.0));
            usage.setBandwidthOut(new ResourceUsage(250.0, 1024000.0));
            usage.setMemory(new ResourceUsage(1024.0, 8192.0));
            usage.setCpu(new ResourceUsage(5.0, 400.0));

            LoadReport report = new LoadReport();
            report.setName(lookupAddresses[brokerIdx]);
            report.setSystemResourceUsage(usage);

            // NOTE(review): lastLoadReport is always injected into broker 0's load manager, while the
            // lock being updated belongs to broker brokerIdx — confirm this asymmetry is intended.
            Whitebox.setInternalState(pulsarServices[0].getLoadManager().get(), "lastLoadReport", report);
            ResourceLock<LoadReport> brokerLock =
                    Whitebox.getInternalState(pulsarServices[brokerIdx].getLoadManager().get(), "brokerLock");
            brokerLock.updateValue(report).join();
        }

        // updateRanking is not public, so it is invoked reflectively (once per broker, on broker 0).
        Method updateRanking = Whitebox.getMethod(SimpleLoadManagerImpl.class, "updateRanking");
        for (int round = 0; round < BROKER_COUNT; round++) {
            updateRanking.invoke(pulsarServices[0].getLoadManager().get());
        }

        final int totalNamespaces = 200;
        Map<String, Integer> namespaceOwner = new HashMap<>();
        for (int ns = 0; ns < totalNamespaces; ns++) {
            TopicName topic = TopicName.get("persistent://pulsar/use/primary-ns-" + ns + "/test-topic");
            LoadManager loadManager = pulsarServices[0].getLoadManager().get();
            ResourceUnit found = loadManager.getLeastLoaded(pulsarServices[0].getNamespaceService().getBundle(topic)).get();
            namespaceOwner.merge(found.getResourceId(), 1, Integer::sum);
        }

        int averageNamespaces = totalNamespaces / BROKER_COUNT;
        int tolerance = averageNamespaces / 10;
        int lowerBound = averageNamespaces - tolerance;
        int upperBound = averageNamespaces + tolerance;
        for (Map.Entry<String, Integer> owner : namespaceOwner.entrySet()) {
            int assigned = owner.getValue();
            log.info("Count of bundles assigned: {}, {} -- lower-bound: {} - upper-bound: {} ",
                    owner.getKey(), owner.getValue(), lowerBound, upperBound);
            Assert.assertTrue(assigned >= lowerBound && assigned <= upperBound);
        }
    }

    /**
     * Reads the {@code sortedRankings} field of the broker's load manager via reflection.
     *
     * @param pulsar broker whose load manager is inspected; it must be a {@link SimpleLoadManagerImpl}
     * @return the field's value, an atomic reference to a map from score to resource units
     * @throws NoSuchFieldException   if the load manager's class does not declare {@code sortedRankings}
     * @throws IllegalAccessException if the field cannot be read
     */
    @SuppressWarnings("unchecked")
    private AtomicReference<Map<Long, Set<ResourceUnit>>> getSortedRanking(PulsarService pulsar) throws NoSuchFieldException, IllegalAccessException {
        SimpleLoadManagerImpl loadManager = (SimpleLoadManagerImpl) pulsar.getLoadManager().get();
        Field rankingField = loadManager.getClass().getDeclaredField("sortedRankings");
        rankingField.setAccessible(true);
        return (AtomicReference<Map<Long, Set<ResourceUnit>>>) rankingField.get(loadManager);
    }

    /**
     * Logs every resource unit in the ranking together with the score it is filed under.
     *
     * @param sortedRanking reference to the score-to-resource-units map to print
     */
    private void printSortedRanking(AtomicReference<Map<Long, Set<ResourceUnit>>> sortedRanking) {
        log.info("Sorted Ranking Result:");
        for (Map.Entry<Long, Set<ResourceUnit>> bucket : sortedRanking.get().entrySet()) {
            Long score = bucket.getKey();
            for (ResourceUnit unit : bucket.getValue()) {
                log.info("  - {}, {}", unit.getResourceId(), score);
            }
        }
    }

    /**
     * Publishes load reports whose memory limit grows with the broker index (usage fixed at 1024),
     * refreshes every broker's ranking, and asserts that each broker's ranking holds exactly one
     * resource unit under each of the keys 50, 25 and 16.
     */
    @Test(timeOut=30000L)
    public void testBrokerRanking() throws Exception {
        for (int brokerIdx = 0; brokerIdx < BROKER_COUNT; brokerIdx++) {
            SystemResourceUsage usage = new SystemResourceUsage();
            usage.setBandwidthIn(new ResourceUsage(0.0, 1024000.0));
            usage.setBandwidthOut(new ResourceUsage(0.0, 1024000.0));
            // Memory limit is 2048, 4096, ... so the usage ratio differs per broker.
            usage.setMemory(new ResourceUsage(1024.0, (double) (2048 * (brokerIdx + 1))));
            usage.setCpu(new ResourceUsage(60.0, 400.0));

            LoadReport report = new LoadReport();
            report.setName(lookupAddresses[brokerIdx]);
            report.setSystemResourceUsage(usage);

            ResourceLock<LoadReport> brokerLock =
                    Whitebox.getInternalState(pulsarServices[brokerIdx].getLoadManager().get(), "brokerLock");
            brokerLock.updateValue(report).join();
        }

        Method updateRanking = Whitebox.getMethod(SimpleLoadManagerImpl.class, "updateRanking");
        for (PulsarService broker : pulsarServices) {
            updateRanking.invoke(broker.getLoadManager().get());
        }

        for (PulsarService broker : pulsarServices) {
            AtomicReference<Map<Long, Set<ResourceUnit>>> ranking = getSortedRanking(broker);
            printSortedRanking(ranking);
            Map<Long, Set<ResourceUnit>> byScore = ranking.get();
            Assert.assertEquals(byScore.get(50L).size(), 1);
            Assert.assertEquals(byScore.get(25L).size(), 1);
            Assert.assertEquals(byScore.get(16L).size(), 1);
        }
    }

    /**
     * Gives broker i a memory limit of {@code 2048 * (i + 1)} and {@code (i + 1) * 5} pre-existing
     * bundles, refreshes broker 0's ranking, then assigns 250 namespaces and asserts that each
     * broker's share deviates by less than 10% from the expected baseline {17, 34, 51, 68, 85}.
     */
    @Test
    public void testTopicAssignmentWithExistingBundles() throws Exception {
        for (int brokerIdx = 0; brokerIdx < BROKER_COUNT; brokerIdx++) {
            ResourceQuota defaultQuota = new ResourceQuota();
            defaultQuota.setMsgRateIn(20.0);
            defaultQuota.setMsgRateOut(60.0);
            defaultQuota.setBandwidthIn(20000.0);
            defaultQuota.setBandwidthOut(60000.0);
            defaultQuota.setMemory(87.0);
            pulsarServices[brokerIdx].getLocalZkCacheService().getResourceQuotaCache().setDefaultQuota(defaultQuota);

            SystemResourceUsage usage = new SystemResourceUsage();
            usage.setBandwidthIn(new ResourceUsage(0.0, 1024000.0));
            usage.setBandwidthOut(new ResourceUsage(0.0, 1024000.0));
            usage.setMemory(new ResourceUsage(0.0, (double) (2048 * (brokerIdx + 1))));
            usage.setCpu(new ResourceUsage(60.0, 400.0));

            // Broker brokerIdx already owns (brokerIdx + 1) * 5 bundles with empty stats.
            Map<String, NamespaceBundleStats> bundleStats = new HashMap<>();
            int existingBundles = (brokerIdx + 1) * 5;
            for (int bundleIdx = 0; bundleIdx < existingBundles; bundleIdx++) {
                String bundleName = String.format("pulsar/use/primary-ns-%d-%d/0x00000000_0xffffffff", brokerIdx, bundleIdx);
                bundleStats.put(bundleName, new NamespaceBundleStats());
            }

            LoadReport report = new LoadReport();
            report.setName(lookupAddresses[brokerIdx]);
            report.setSystemResourceUsage(usage);
            report.setBundleStats(bundleStats);

            // NOTE(review): lastLoadReport is always injected into broker 0's load manager — confirm intended.
            Whitebox.setInternalState(pulsarServices[0].getLoadManager().get(), "lastLoadReport", report);
            ResourceLock<LoadReport> brokerLock =
                    Whitebox.getInternalState(pulsarServices[brokerIdx].getLoadManager().get(), "brokerLock");
            brokerLock.updateValue(report).join();
        }

        Method updateRanking = Whitebox.getMethod(SimpleLoadManagerImpl.class, "updateRanking");
        for (int round = 0; round < BROKER_COUNT; round++) {
            updateRanking.invoke(pulsarServices[0].getLoadManager().get());
        }

        for (PulsarService broker : pulsarServices) {
            printSortedRanking(getSortedRanking(broker));
        }

        final int totalNamespaces = 250;
        final int[] expectedAssignments = {17, 34, 51, 68, 85};
        Map<String, Integer> namespaceOwner = new HashMap<>();
        for (int ns = 0; ns < totalNamespaces; ns++) {
            TopicName topic = TopicName.get("persistent://pulsar/use/primary-ns-" + ns + "/test-topic");
            LoadManager loadManager = pulsarServices[0].getLoadManager().get();
            ResourceUnit found = loadManager.getLeastLoaded(pulsarServices[0].getNamespaceService().getBundle(topic)).get();
            namespaceOwner.merge(found.getResourceId(), 1, Integer::sum);
        }

        final double expectedMaxVariation = 10.0;
        for (int brokerIdx = 0; brokerIdx < BROKER_COUNT; brokerIdx++) {
            // Resource ids are compared as "http://" followed by the lookup address.
            String resourceId = "http://" + lookupAddresses[brokerIdx];
            long actualValue = namespaceOwner.getOrDefault(resourceId, 0);
            long expectedValue = expectedAssignments[brokerIdx];
            double variation = (double) Math.abs(actualValue - expectedValue) * 100.0 / (double) expectedValue;
            log.info("Topic assignment - {}, actual: {}, expected baseline: {}, variation: {}/%",
                    lookupAddresses[brokerIdx], actualValue, expectedValue, String.format("%.2f", variation));
            Assert.assertTrue(variation < expectedMaxVariation);
        }
    }

    /**
     * Reads the {@code realtimeResourceQuotas} field of the broker's load manager via reflection.
     *
     * @param pulsar broker whose load manager is inspected; it must be a {@link SimpleLoadManagerImpl}
     * @return the field's value, an atomic reference to a map from bundle name to quota
     * @throws NoSuchFieldException   if the load manager's class does not declare the field
     * @throws IllegalAccessException if the field cannot be read
     */
    @SuppressWarnings("unchecked")
    private AtomicReference<Map<String, ResourceQuota>> getRealtimeResourceQuota(PulsarService pulsar) throws NoSuchFieldException, IllegalAccessException {
        SimpleLoadManagerImpl loadManager = (SimpleLoadManagerImpl) pulsar.getLoadManager().get();
        Field quotaField = loadManager.getClass().getDeclaredField("realtimeResourceQuotas");
        quotaField.setAccessible(true);
        return (AtomicReference<Map<String, ResourceQuota>>) quotaField.get(loadManager);
    }

    /**
     * Logs each bundle name with its quota serialized to JSON.
     *
     * @param resourceQuotas map from bundle name to quota
     * @throws Exception if a quota cannot be serialized
     */
    private void printResourceQuotas(Map<String, ResourceQuota> resourceQuotas) throws Exception {
        log.info("Realtime Resource Quota:");
        for (Map.Entry<String, ResourceQuota> quotaByBundle : resourceQuotas.entrySet()) {
            log.info(" {}, {}", quotaByBundle.getKey(),
                    ObjectMapperFactory.getThreadLocal().writeValueAsString(quotaByBundle.getValue()));
        }
    }

    /**
     * Publishes, for every broker, a load report stamped with {@code timestamp} that contains
     * five bundles ({@code primary-ns-<broker>-<0..4>}); system usage scales with the broker
     * index and each bundle's stats scale with {@code broker index + bundle index}.
     *
     * @param timestamp value stored in each report's timestamp field
     * @throws Exception if reading the broker lock via reflection fails
     */
    private void writeLoadReportsForDynamicQuota(long timestamp) throws Exception {
        for (int brokerIdx = 0; brokerIdx < BROKER_COUNT; brokerIdx++) {
            // System usage factor: 10, 15, 20, 25, 30 for brokers 0..4.
            int usageFactor = 10 + brokerIdx * 5;
            SystemResourceUsage usage = new SystemResourceUsage();
            usage.setBandwidthIn(new ResourceUsage((double) (5000 * usageFactor), 1024000.0));
            usage.setBandwidthOut(new ResourceUsage((double) (15000 * usageFactor), 1024000.0));
            usage.setMemory(new ResourceUsage((double) (25 * usageFactor), (double) (2048 * (brokerIdx + 1))));
            usage.setCpu(new ResourceUsage(200.0, 400.0));

            Map<String, NamespaceBundleStats> bundleStats = new HashMap<>();
            for (int bundleIdx = 0; bundleIdx < 5; bundleIdx++) {
                int load = brokerIdx + bundleIdx;
                String bundleName = String.format("pulsar/use/primary-ns-%d-%d/0x00000000_0xffffffff", brokerIdx, bundleIdx);
                bundleStats.put(bundleName,
                        newBundleStats(25L * (long) load, 50 * load, 50 * load, 5 * load, 15 * load, 5000 * load, 15000 * load));
            }

            LoadReport report = new LoadReport();
            report.setName(lookupAddresses[brokerIdx]);
            report.setTimestamp(timestamp);
            report.setSystemResourceUsage(usage);
            report.setBundleStats(bundleStats);

            ResourceLock<LoadReport> brokerLock =
                    Whitebox.getInternalState(pulsarServices[brokerIdx].getLoadManager().get(), "brokerLock");
            brokerLock.updateValue(report).join();
        }
    }

    /**
     * Asserts that each of the five quota dimensions differs from its expected value by less
     * than 1.0 (strictly), checked in the order message rate in/out, bandwidth in/out, memory.
     *
     * @param quota           quota under test
     * @param expMsgRateIn    expected inbound message rate
     * @param expMsgRateOut   expected outbound message rate
     * @param expBandwidthIn  expected inbound bandwidth
     * @param expBandwidthOut expected outbound bandwidth
     * @param expMemory       expected memory
     */
    private void verifyBundleResourceQuota(ResourceQuota quota, double expMsgRateIn, double expMsgRateOut, double expBandwidthIn, double expBandwidthOut, double expMemory) {
        double[] actual = {
            quota.getMsgRateIn(), quota.getMsgRateOut(), quota.getBandwidthIn(), quota.getBandwidthOut(), quota.getMemory()
        };
        double[] expected = {expMsgRateIn, expMsgRateOut, expBandwidthIn, expBandwidthOut, expMemory};
        for (int dim = 0; dim < actual.length; dim++) {
            Assert.assertTrue(Math.abs(actual[dim] - expected[dim]) < 1.0);
        }
    }

    /**
     * Installs a default quota on every broker, publishes load reports at three timestamps
     * (start, start + go-up window, start + go-down window) with five-second pauses in between,
     * and after the second and third round verifies the realtime quotas of three sample bundles
     * on every broker.
     */
    @Test
    public void testDynamicNamespaceBundleQuota() throws Exception {
        final String lowBundle = "pulsar/use/primary-ns-0-0/0x00000000_0xffffffff";
        final String midBundle = "pulsar/use/primary-ns-2-2/0x00000000_0xffffffff";
        final String highBundle = "pulsar/use/primary-ns-4-4/0x00000000_0xffffffff";

        long startTime = System.currentTimeMillis();
        for (PulsarService broker : pulsarServices) {
            ResourceQuota defaultQuota = new ResourceQuota();
            defaultQuota.setMsgRateIn(20.0);
            defaultQuota.setMsgRateOut(60.0);
            defaultQuota.setBandwidthIn(20000.0);
            defaultQuota.setBandwidthOut(60000.0);
            defaultQuota.setMemory(75.0);
            broker.getLocalZkCacheService().getResourceQuotaCache().setDefaultQuota(defaultQuota);
        }

        // Rounds one and two: initial report, then one stamped a go-up window later.
        writeLoadReportsForDynamicQuota(startTime);
        Thread.sleep(5000L);
        writeLoadReportsForDynamicQuota(startTime + SimpleLoadManagerImpl.RESOURCE_QUOTA_GO_UP_TIMEWINDOW);
        Thread.sleep(5000L);
        for (PulsarService broker : pulsarServices) {
            Map<String, ResourceQuota> quotas = getRealtimeResourceQuota(broker).get();
            printResourceQuotas(quotas);
            verifyBundleResourceQuota(quotas.get(lowBundle), 19.0, 58.0, 19791.0, 58958.0, 74.0);
            verifyBundleResourceQuota(quotas.get(midBundle), 20.0, 60.0, 20000.0, 60000.0, 100.0);
            verifyBundleResourceQuota(quotas.get(highBundle), 40.0, 120.0, 40000.0, 120000.0, 150.0);
        }

        // Round three: a report stamped a go-down window after the start; only the low bundle's
        // expected quota changes.
        writeLoadReportsForDynamicQuota(startTime + SimpleLoadManagerImpl.RESOURCE_QUOTA_GO_DOWN_TIMEWINDOW);
        Thread.sleep(5000L);
        for (PulsarService broker : pulsarServices) {
            Map<String, ResourceQuota> quotas = getRealtimeResourceQuota(broker).get();
            printResourceQuotas(quotas);
            verifyBundleResourceQuota(quotas.get(lowBundle), 5.0, 6.0, 10203.0, 11019.0, 50.0);
            verifyBundleResourceQuota(quotas.get(midBundle), 20.0, 60.0, 20000.0, 60000.0, 100.0);
            verifyBundleResourceQuota(quotas.get(highBundle), 40.0, 120.0, 40000.0, 120000.0, 150.0);
        }
    }

    /**
     * Builds a {@link NamespaceBundleStats} populated with the given values.
     *
     * @param topics        value for {@code topics}
     * @param producers     value for {@code producerCount}
     * @param consumers     value for {@code consumerCount}
     * @param msgRateIn     value for {@code msgRateIn}
     * @param msgRateOut    value for {@code msgRateOut}
     * @param throughputIn  value for {@code msgThroughputIn}
     * @param throughputOut value for {@code msgThroughputOut}
     * @return the populated stats object
     */
    private NamespaceBundleStats newBundleStats(long topics, int producers, int consumers, double msgRateIn, double msgRateOut, double throughputIn, double throughputOut) {
        NamespaceBundleStats bundle = new NamespaceBundleStats();
        // Rates and throughput.
        bundle.msgRateIn = msgRateIn;
        bundle.msgRateOut = msgRateOut;
        bundle.msgThroughputIn = throughputIn;
        bundle.msgThroughputOut = throughputOut;
        // Counts.
        bundle.topics = topics;
        bundle.producerCount = producers;
        bundle.consumerCount = consumers;
        return bundle;
    }

    /**
     * Builds bundle boundaries that split the range {@code 0x00000000..0xffffffff} into
     * {@code numBundles} segments of equal size (integer division of 2^32); the final boundary
     * is always {@code 0xffffffff}.
     *
     * @param numBundles number of bundles; zero raises {@link ArithmeticException} from the division
     * @return bundle data holding the boundary list and a bundle count of {@code boundaries - 1}
     */
    private BundlesData getBundles(int numBundles) {
        final long hashRangeSize = 0x100000000L;
        final long segmentSize = hashRangeSize / (long) numBundles;
        List<String> boundaries = Lists.newArrayList();
        boundaries.add(String.format("0x%08x", 0L));
        for (int segment = 1; segment <= numBundles; segment++) {
            // Interior boundaries are multiples of the segment size; the last one is pinned to the max hash.
            long upperBound = (segment == numBundles) ? hashRangeSize - 1L : segmentSize * segment;
            boundaries.add(String.format("0x%08x", upperBound));
        }
        return BundlesData.builder().boundaries(boundaries).numBundles(boundaries.size() - 1).build();
    }

    /**
     * Creates the policies entry for {@code namespace} with {@code numBundles} evenly split bundles.
     *
     * @param pulsar     broker whose namespace resources are used for the write
     * @param namespace  namespace name appended to the {@code policies} path
     * @param numBundles number of bundles passed to {@link #getBundles(int)}
     * @throws Exception if the create call fails
     */
    private void createNamespace(PulsarService pulsar, String namespace, int numBundles) throws Exception {
        Policies namespacePolicies = new Policies();
        namespacePolicies.bundles = getBundles(numBundles);
        String policiesPath = AdminResource.path("policies", namespace);
        pulsar.getPulsarResources().getNamespaceResources().create(policiesPath, namespacePolicies);
    }

    /**
     * Enables automatic bundle splitting on broker 0, publishes a load report in which namespaces
     * 01-07 each exceed exactly one limit (topics, producers, consumers, message rate in/out,
     * bandwidth in/out), and verifies against a mocked namespaces admin that exactly those
     * bundles are split. Namespaces 08 (all below limits), 09 (over bandwidth but a single topic)
     * and 10 (created with the maximum number of bundles) must not be split.
     */
    @Test
    public void testNamespaceBundleAutoSplit() throws Exception {
        final String halfRange = "0x00000000_0x80000000";
        final String narrowRange = "0x00000000_0x02000000";
        PulsarService broker = pulsarServices[0];

        int maxBundles = broker.getConfiguration().getLoadBalancerNamespaceMaximumBundles();
        long maxTopics = broker.getConfiguration().getLoadBalancerNamespaceBundleMaxTopics();
        int maxSessions = broker.getConfiguration().getLoadBalancerNamespaceBundleMaxSessions();
        long maxMsgRate = broker.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate();
        long maxBandwidth = (long) broker.getConfiguration().getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * 0x100000L;
        broker.getConfiguration().setLoadBalancerAutoBundleSplitEnabled(true);

        // Namespaces 01-09 get two bundles; namespace 10 is created with the maximum number of bundles.
        for (int ns = 1; ns <= 10; ns++) {
            createNamespace(broker, String.format("pulsar/use/primary-ns-%02d", ns), ns == 10 ? maxBundles : 2);
        }

        // Swap the broker's namespaces admin for a mock so split requests can be observed.
        NamespacesImpl namespaceAdmin = Mockito.mock(NamespacesImpl.class);
        Whitebox.setInternalState(broker.getAdminClient(), "namespaces", namespaceAdmin);

        Map<String, NamespaceBundleStats> bundleStats = new HashMap<>();
        bundleStats.put("pulsar/use/primary-ns-01/" + halfRange, newBundleStats(maxTopics + 1L, 0, 0, 0.0, 0.0, 0.0, 0.0));
        bundleStats.put("pulsar/use/primary-ns-02/" + halfRange, newBundleStats(2L, maxSessions + 1, 0, 0.0, 0.0, 0.0, 0.0));
        bundleStats.put("pulsar/use/primary-ns-03/" + halfRange, newBundleStats(2L, 0, maxSessions + 1, 0.0, 0.0, 0.0, 0.0));
        bundleStats.put("pulsar/use/primary-ns-04/" + halfRange, newBundleStats(2L, 0, 0, maxMsgRate + 1L, 0.0, 0.0, 0.0));
        bundleStats.put("pulsar/use/primary-ns-05/" + halfRange, newBundleStats(2L, 0, 0, 0.0, maxMsgRate + 1L, 0.0, 0.0));
        bundleStats.put("pulsar/use/primary-ns-06/" + halfRange, newBundleStats(2L, 0, 0, 0.0, 0.0, maxBandwidth + 1L, 0.0));
        bundleStats.put("pulsar/use/primary-ns-07/" + halfRange, newBundleStats(2L, 0, 0, 0.0, 0.0, 0.0, maxBandwidth + 1L));
        bundleStats.put("pulsar/use/primary-ns-08/" + halfRange,
                newBundleStats(maxTopics - 1L, maxSessions - 1, 1, maxMsgRate - 1L, 1.0, maxBandwidth - 1L, 1.0));
        bundleStats.put("pulsar/use/primary-ns-09/" + halfRange, newBundleStats(1L, 0, 0, 0.0, 0.0, 0.0, maxBandwidth + 1L));
        bundleStats.put("pulsar/use/primary-ns-10/" + narrowRange, newBundleStats(maxTopics + 1L, 0, 0, 0.0, 0.0, 0.0, 0.0));

        LoadReport report = new LoadReport();
        report.setName(lookupAddresses[0]);
        report.setSystemResourceUsage(new SystemResourceUsage());
        report.setBundleStats(bundleStats);
        Whitebox.setInternalState(broker.getLoadManager().get(), "lastLoadReport", report);
        ResourceLock<LoadReport> brokerLock = Whitebox.getInternalState(broker.getLoadManager().get(), "brokerLock");
        brokerLock.updateValue(report).join();

        Thread.sleep(5000L);
        broker.getLoadManager().get().doNamespaceBundleSplit();

        boolean unloadSplitBundles = broker.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled();
        for (int ns = 1; ns <= 7; ns++) {
            Mockito.verify(namespaceAdmin, Mockito.times(1))
                    .splitNamespaceBundle(String.format("pulsar/use/primary-ns-%02d", ns), halfRange, unloadSplitBundles, null);
        }
        for (int ns = 8; ns <= 9; ns++) {
            Mockito.verify(namespaceAdmin, Mockito.never())
                    .splitNamespaceBundle(String.format("pulsar/use/primary-ns-%02d", ns), halfRange, unloadSplitBundles, null);
        }
        Mockito.verify(namespaceAdmin, Mockito.never())
                .splitNamespaceBundle("pulsar/use/primary-ns-10", narrowRange, unloadSplitBundles, null);
    }

    /**
     * Repeatedly closes the current leader broker ({@code BROKER_COUNT - 1} times) and checks
     * that all remaining brokers agree on a new, different leader after each closure.
     */
    @Test
    public void testLeaderElection() throws Exception {
        // Iterate over a private copy: entries of this.pulsarServices are nulled below so that
        // shutdown() does not close the already-closed leaders a second time.
        PulsarService[] allServices = pulsarServices.clone();
        for (int round = 0; round < BROKER_COUNT - 1; round++) {
            List<PulsarService> activePulsar = new ArrayList<>();
            List<PulsarService> followerPulsar = new ArrayList<>();
            LeaderBroker oldLeader = null;
            PulsarService leaderPulsar = null;
            for (int j = 0; j < BROKER_COUNT; j++) {
                if (allServices[j].getState() == PulsarService.State.Closed) {
                    continue;
                }
                activePulsar.add(allServices[j]);
                LeaderElectionService les = allServices[j].getLeaderElectionService();
                if (les.isLeader()) {
                    oldLeader = (LeaderBroker) les.getCurrentLeader().get();
                    leaderPulsar = allServices[j];
                    // Clear the leader's own slot (index j, not the round counter): this broker is
                    // closed below, and every other slot must stay set so shutdown() closes it.
                    pulsarServices[j] = null;
                } else {
                    followerPulsar.add(allServices[j]);
                }
            }
            // Fail with a readable message instead of a NullPointerException when no leader was found.
            Assert.assertNotNull(oldLeader, "No active broker reports itself as leader");
            Assert.assertNotNull(leaderPulsar, "No active broker reports itself as leader");

            log.info("Old leader is : {}", oldLeader.getServiceUrl());
            for (PulsarService pulsar : activePulsar) {
                log.info("Current leader for {} is : {}", pulsar.getWebServiceAddress(),
                        pulsar.getLeaderElectionService().getCurrentLeader());
                Assert.assertEquals(pulsar.getLeaderElectionService().readCurrentLeader().join(), Optional.of(oldLeader));
            }

            leaderPulsar.close();
            loopUntilLeaderChangesForAllBroker(followerPulsar, oldLeader);

            Optional<?> elected = (Optional<?>) followerPulsar.get(0).getLeaderElectionService().readCurrentLeader().join();
            LeaderBroker newLeader = (LeaderBroker) elected.get();
            log.info("New leader is : {}", newLeader.getServiceUrl());
            Assert.assertNotEquals(newLeader, oldLeader);
        }
    }

    /**
     * Writes three namespace isolation policies for cluster {@code use}:
     * <ul>
     *   <li>{@code primaryBrokerPolicy}: {@code pulsar/use/primary-ns.*}, all brokers primary, no secondary;</li>
     *   <li>{@code secondaryBrokerPolicy}: {@code pulsar/use/secondary-ns.*}, broker 0 primary, the rest secondary;</li>
     *   <li>{@code otherBrokerPolicy}: {@code pulsar/use/shared-ns.*}, broker 0 primary, the rest secondary.</li>
     * </ul>
     * All three use a {@code min_available} auto-failover policy with {@code min_limit=1} and
     * {@code usage_threshold=100}.
     *
     * @param pulsar broker whose resources are used to store the policies
     * @throws Exception if storing the policies fails
     */
    private void createNamespacePolicies(PulsarService pulsar) throws Exception {
        Map<String, String> failoverParams = new HashMap<>();
        failoverParams.put("min_limit", "1");
        failoverParams.put("usage_threshold", "100");

        // Collect the advertised addresses once: every broker, and every broker but the first.
        List<String> allBrokers = new ArrayList<>();
        List<String> allExceptFirstBroker = new ArrayList<>();
        for (int i = 0; i < BROKER_COUNT; i++) {
            String advertised = pulsarServices[i].getAdvertisedAddress();
            allBrokers.add(advertised);
            if (i > 0) {
                allExceptFirstBroker.add(advertised);
            }
        }

        NamespaceIsolationPolicies policies = new NamespaceIsolationPolicies();

        policies.setPolicy("primaryBrokerPolicy", NamespaceIsolationData.builder()
                .namespaces(Collections.singletonList("pulsar/use/primary-ns.*"))
                .primary(allBrokers)
                .secondary(Collections.emptyList())
                .autoFailoverPolicy(AutoFailoverPolicyData.builder()
                        .policyType(AutoFailoverPolicyType.min_available)
                        .parameters(failoverParams)
                        .build())
                .build());

        policies.setPolicy("secondaryBrokerPolicy", NamespaceIsolationData.builder()
                .namespaces(Collections.singletonList("pulsar/use/secondary-ns.*"))
                .primary(Collections.singletonList(pulsarServices[0].getWebServiceAddress()))
                .secondary(allExceptFirstBroker)
                .autoFailoverPolicy(AutoFailoverPolicyData.builder()
                        .policyType(AutoFailoverPolicyType.min_available)
                        .parameters(failoverParams)
                        .build())
                .build());

        policies.setPolicy("otherBrokerPolicy", NamespaceIsolationData.builder()
                .namespaces(Collections.singletonList("pulsar/use/shared-ns.*"))
                .primary(Collections.singletonList(pulsarServices[0].getWebServiceAddress()))
                .secondary(allExceptFirstBroker)
                .autoFailoverPolicy(AutoFailoverPolicyData.builder()
                        .policyType(AutoFailoverPolicyType.min_available)
                        .parameters(failoverParams)
                        .build())
                .build());

        String policiesPath = AdminResource.path("clusters", "use", "namespaceIsolationPolicies");
        try {
            pulsar.getPulsarResources().getNamespaceResources().getIsolationPolicies().create(policiesPath, policies.getPolicies());
        }
        catch (MetadataStoreException.BadVersionException e) {
            // Fall back to overwriting the stored value.
            // NOTE(review): presumably this handles "node already exists" — confirm create() reports
            // that case as BadVersionException.
            pulsar.getPulsarResources().getNamespaceResources().getIsolationPolicies().set(policiesPath, existing -> policies.getPolicies());
        }
    }

    /**
     * Builds a resource description with {@code memory}, {@code cpu}, {@code bandwidthIn} and
     * {@code bandwidthOut} entries. Memory is recorded as given against a limit of 4 GiB, CPU
     * against a limit of 100, and both bandwidth values are multiplied by 2^20 against a limit
     * of 2^30.
     *
     * @param memoryInMB         memory usage value, stored unscaled
     * @param cpuPercentage      CPU usage value
     * @param bandwidthInMbps    inbound bandwidth, multiplied by 2^20 before being stored
     * @param bandwidthOutInMbps outbound bandwidth, multiplied by 2^20 before being stored
     * @param threads            not used by this method
     * @return the populated resource description
     */
    private PulsarResourceDescription createResourceDescription(long memoryInMB, long cpuPercentage, long bandwidthInMbps, long bandwidthOutInMbps, long threads) {
        final long mebi = 1024L * 1024L;
        final long gibi = 1024L * mebi;

        ResourceUsage memory = new ResourceUsage((double) memoryInMB, (double) (4L * gibi));
        ResourceUsage cpu = new ResourceUsage((double) cpuPercentage, 100.0);
        ResourceUsage bandwidthIn = new ResourceUsage((double) (bandwidthInMbps * mebi), (double) gibi);
        ResourceUsage bandwidthOut = new ResourceUsage((double) (bandwidthOutInMbps * mebi), (double) gibi);

        PulsarResourceDescription description = new PulsarResourceDescription();
        description.put("memory", memory);
        description.put("cpu", cpu);
        description.put("bandwidthIn", bandwidthIn);
        description.put("bandwidthOut", bandwidthOut);
        return description;
    }
}

