/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TimeAverageMessageData;
import org.apache.pulsar.broker.loadbalance.LoadBalancerTestingUtils;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class ModularLoadManagerImplTest {
    private static final Logger log = LoggerFactory.getLogger(ModularLoadManagerImplTest.class);
    private LocalBookkeeperEnsemble bkEnsemble;
    private URL url1;
    private PulsarService pulsar1;
    private PulsarAdmin admin1;
    private URL url2;
    private PulsarService pulsar2;
    private PulsarAdmin admin2;
    private String primaryHost;
    private String secondaryHost;
    private NamespaceBundleFactory nsFactory;
    private ModularLoadManagerImpl primaryLoadManager;
    private ModularLoadManagerImpl secondaryLoadManager;
    private ExecutorService executor;

    private Object invokeSimpleMethod(Object instance, String methodName, Object ... args) throws Exception {
        for (Method method : instance.getClass().getDeclaredMethods()) {
            if (!method.getName().equals(methodName)) continue;
            method.setAccessible(true);
            return method.invoke(instance, args);
        }
        throw new IllegalArgumentException("Method not found: " + methodName);
    }

    private static Object getField(Object instance, String fieldName) throws Exception {
        Field field = instance.getClass().getDeclaredField(fieldName);
        field.setAccessible(true);
        return field.get(instance);
    }

    private static void setField(Object instance, String fieldName, Object value) throws Exception {
        Field field = instance.getClass().getDeclaredField(fieldName);
        field.setAccessible(true);
        field.set(instance, value);
    }

    @BeforeMethod
    void setup() throws Exception {
        this.executor = new ThreadPoolExecutor(1, 20, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        this.bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
        this.bkEnsemble.start();
        ServiceConfiguration config1 = new ServiceConfiguration();
        config1.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
        config1.setClusterName("use");
        config1.setWebServicePort(Optional.of(0));
        config1.setZookeeperServers("127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
        config1.setAdvertisedAddress("localhost");
        config1.setBrokerShutdownTimeoutMs(0L);
        config1.setBrokerServicePort(Optional.of(0));
        config1.setBrokerServicePortTls(Optional.of(0));
        config1.setWebServicePortTls(Optional.of(0));
        this.pulsar1 = new PulsarService(config1);
        this.pulsar1.start();
        this.primaryHost = String.format("%s:%d", "localhost", this.pulsar1.getListenPortHTTP().get());
        this.url1 = new URL(this.pulsar1.getWebServiceAddress());
        this.admin1 = PulsarAdmin.builder().serviceHttpUrl(this.url1.toString()).build();
        ServiceConfiguration config2 = new ServiceConfiguration();
        config2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
        config2.setClusterName("use");
        config2.setWebServicePort(Optional.of(0));
        config2.setZookeeperServers("127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
        config2.setAdvertisedAddress("localhost");
        config2.setBrokerShutdownTimeoutMs(0L);
        config2.setBrokerServicePort(Optional.of(0));
        config2.setBrokerServicePortTls(Optional.of(0));
        config2.setWebServicePortTls(Optional.of(0));
        this.pulsar2 = new PulsarService(config2);
        this.pulsar2.start();
        this.secondaryHost = String.format("%s:%d", "localhost", this.pulsar2.getListenPortHTTP().get());
        this.url2 = new URL(this.pulsar2.getWebServiceAddress());
        this.admin2 = PulsarAdmin.builder().serviceHttpUrl(this.url2.toString()).build();
        this.primaryLoadManager = (ModularLoadManagerImpl)ModularLoadManagerImplTest.getField(this.pulsar1.getLoadManager().get(), "loadManager");
        this.secondaryLoadManager = (ModularLoadManagerImpl)ModularLoadManagerImplTest.getField(this.pulsar2.getLoadManager().get(), "loadManager");
        this.nsFactory = new NamespaceBundleFactory(this.pulsar1, Hashing.crc32());
        Thread.sleep(100L);
    }

    @AfterMethod(alwaysRun=true)
    void shutdown() throws Exception {
        log.info("--- Shutting down ---");
        this.executor.shutdownNow();
        this.admin1.close();
        this.admin2.close();
        this.pulsar2.close();
        this.pulsar1.close();
        this.bkEnsemble.stop();
    }

    private NamespaceBundle makeBundle(String property, String cluster, String namespace) {
        return this.nsFactory.getBundle(NamespaceName.get((String)property, (String)cluster, (String)namespace), Range.range((Comparable)NamespaceBundles.FULL_LOWER_BOUND, (BoundType)BoundType.CLOSED, (Comparable)NamespaceBundles.FULL_UPPER_BOUND, (BoundType)BoundType.CLOSED));
    }

    private NamespaceBundle makeBundle(String all) {
        return this.makeBundle(all, all, all);
    }

    private String mockBundleName(int i) {
        return String.format("%d/%d/%d/0x00000000_0xffffffff", i, i, i);
    }

    @Test(enabled=false)
    public void testCandidateConsistency() throws Exception {
        boolean foundFirst = false;
        boolean foundSecond = false;
        for (int i = 0; i < 2; ++i) {
            NamespaceBundle serviceUnit = this.makeBundle(Integer.toString(i));
            String broker = (String)this.primaryLoadManager.selectBrokerForAssignment((ServiceUnitId)serviceUnit).get();
            if (broker.equals(this.primaryHost)) {
                foundFirst = true;
                continue;
            }
            foundSecond = true;
        }
        Assert.assertTrue((boolean)foundFirst);
        Assert.assertTrue((boolean)foundSecond);
        this.secondaryLoadManager.disableBroker();
        LoadData loadData = (LoadData)ModularLoadManagerImplTest.getField(this.primaryLoadManager, "loadData");
        Thread.sleep(500L);
        Assert.assertFalse((boolean)loadData.getBrokerData().containsKey(this.secondaryHost));
        for (int i = 2; i < 7; ++i) {
            NamespaceBundle serviceUnit = this.makeBundle(Integer.toString(i));
            Assert.assertEquals((Object)this.primaryLoadManager.selectBrokerForAssignment((ServiceUnitId)serviceUnit), (Object)this.primaryHost);
        }
    }

    @Test(enabled=false)
    public void testEvenBundleDistribution() throws Exception {
        NamespaceBundle[] bundles = LoadBalancerTestingUtils.makeBundles(this.nsFactory, "test", "test", "test", 16);
        int numAssignedToPrimary = 0;
        int numAssignedToSecondary = 0;
        BundleData bundleData = new BundleData(10, 1000);
        TimeAverageMessageData longTermMessageData = new TimeAverageMessageData(1000);
        longTermMessageData.setMsgRateIn(1000.0);
        bundleData.setLongTermData(longTermMessageData);
        String firstBundleDataPath = String.format("%s/%s", "/loadbalance/bundle-data", bundles[0]);
        this.pulsar1.getLocalMetadataStore().getMetadataCache(BundleData.class).create(firstBundleDataPath, (Object)bundleData).join();
        for (NamespaceBundle bundle : bundles) {
            if (this.primaryLoadManager.selectBrokerForAssignment((ServiceUnitId)bundle).equals(this.primaryHost)) {
                ++numAssignedToPrimary;
            } else {
                ++numAssignedToSecondary;
            }
            if ((numAssignedToPrimary + numAssignedToSecondary) % 2 != 0) continue;
            Assert.assertEquals((int)numAssignedToPrimary, (int)numAssignedToSecondary);
        }
    }

    @Test
    public void testMaxTopicDistributionToBroker() throws Exception {
        int totalBundles = 50;
        NamespaceBundle[] bundles = LoadBalancerTestingUtils.makeBundles(this.nsFactory, "test", "test", "test", 50);
        BundleData bundleData = new BundleData(10, 1000);
        bundleData.setTopics(this.pulsar1.getConfiguration().getLoadBalancerBrokerMaxTopics() + 10);
        TimeAverageMessageData longTermMessageData = new TimeAverageMessageData(1000);
        longTermMessageData.setMsgRateIn(1000.0);
        bundleData.setLongTermData(longTermMessageData);
        String firstBundleDataPath = String.format("%s/%s", "/loadbalance/bundle-data", bundles[0]);
        this.pulsar1.getLocalMetadataStore().getMetadataCache(BundleData.class).create(firstBundleDataPath, (Object)bundleData).join();
        String maxTopicOwnedBroker = (String)this.primaryLoadManager.selectBrokerForAssignment((ServiceUnitId)bundles[0]).get();
        for (int i = 1; i < 50; ++i) {
            Assert.assertNotEquals((Object)this.primaryLoadManager.selectBrokerForAssignment((ServiceUnitId)bundles[i]), (Object)maxTopicOwnedBroker);
        }
    }

    @Test
    public void testLoadShedding() throws Exception {
        NamespaceBundleStats stats1 = new NamespaceBundleStats();
        NamespaceBundleStats stats2 = new NamespaceBundleStats();
        stats1.msgRateIn = 100.0;
        stats2.msgRateIn = 200.0;
        ConcurrentHashMap<String, NamespaceBundleStats> statsMap = new ConcurrentHashMap<String, NamespaceBundleStats>();
        statsMap.put(this.mockBundleName(1), stats1);
        statsMap.put(this.mockBundleName(2), stats2);
        LocalBrokerData localBrokerData = new LocalBrokerData();
        localBrokerData.update(new SystemResourceUsage(), statsMap);
        Namespaces namespacesSpy1 = (Namespaces)Mockito.spy((Object)this.pulsar1.getAdminClient().namespaces());
        AtomicReference bundleReference = new AtomicReference();
        ((Namespaces)Mockito.doAnswer(invocation -> {
            bundleReference.set(invocation.getArguments()[0].toString() + '/' + invocation.getArguments()[1]);
            return null;
        }).when((Object)namespacesSpy1)).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());
        ModularLoadManagerImplTest.setField(this.pulsar1.getAdminClient(), "namespaces", namespacesSpy1);
        this.pulsar1.getConfiguration().setLoadBalancerEnabled(true);
        LoadData loadData = (LoadData)ModularLoadManagerImplTest.getField(this.primaryLoadManager, "loadData");
        Map brokerDataMap = loadData.getBrokerData();
        BrokerData brokerDataSpy1 = (BrokerData)Mockito.spy(brokerDataMap.get(this.primaryHost));
        Mockito.when((Object)brokerDataSpy1.getLocalData()).thenReturn((Object)localBrokerData);
        brokerDataMap.put(this.primaryHost, brokerDataSpy1);
        this.primaryLoadManager.accept(new Notification(NotificationType.Created, "/loadbalance/brokers/broker:8080"));
        Thread.sleep(100L);
        localBrokerData.setCpu(new ResourceUsage(80.0, 100.0));
        this.primaryLoadManager.doLoadShedding();
        ((Namespaces)Mockito.verify((Object)namespacesSpy1, (VerificationMode)Mockito.times((int)0))).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());
        localBrokerData.getCpu().usage = 90.0;
        this.primaryLoadManager.doLoadShedding();
        ((Namespaces)Mockito.verify((Object)namespacesSpy1, (VerificationMode)Mockito.times((int)1))).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());
        Assert.assertEquals((String)((String)bundleReference.get()), (String)this.mockBundleName(2));
        this.primaryLoadManager.doLoadShedding();
        ((Namespaces)Mockito.verify((Object)namespacesSpy1, (VerificationMode)Mockito.times((int)2))).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());
        Assert.assertEquals((String)((String)bundleReference.get()), (String)this.mockBundleName(1));
        this.primaryLoadManager.doLoadShedding();
        ((Namespaces)Mockito.verify((Object)namespacesSpy1, (VerificationMode)Mockito.times((int)2))).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());
    }

    @Test
    public void testNeedBrokerDataUpdate() throws Exception {
        LocalBrokerData lastData = new LocalBrokerData();
        LocalBrokerData currentData = new LocalBrokerData();
        ServiceConfiguration conf = this.pulsar1.getConfiguration();
        conf.setLoadBalancerReportUpdateThresholdPercentage(5);
        ModularLoadManagerImpl loadManager = new ModularLoadManagerImpl();
        ModularLoadManagerImplTest.setField(loadManager, "lastData", lastData);
        ModularLoadManagerImplTest.setField(loadManager, "localData", currentData);
        ModularLoadManagerImplTest.setField(loadManager, "conf", conf);
        Supplier<Boolean> needUpdate = () -> {
            try {
                return (Boolean)this.invokeSimpleMethod(loadManager, "needBrokerDataUpdate", new Object[0]);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
        lastData.setMsgRateIn(100.0);
        currentData.setMsgRateIn(104.0);
        assert (!needUpdate.get().booleanValue());
        currentData.setMsgRateIn(105.1);
        assert (needUpdate.get().booleanValue());
        currentData.setMsgRateIn(94.0);
        assert (needUpdate.get().booleanValue());
        currentData.setMsgRateIn(95.1);
        assert (!needUpdate.get().booleanValue());
        lastData.setMsgRateIn(0.0);
        currentData.setMsgRateIn(1.0E-8);
        assert (needUpdate.get().booleanValue());
        lastData.setMsgRateIn(1.0E-8);
        currentData.setMsgRateIn(0.0);
        assert (needUpdate.get().booleanValue());
        currentData.setMsgRateIn(0.0);
        lastData.setMsgRateIn(0.0);
        assert (!needUpdate.get().booleanValue());
        lastData.getCpu().usage = 100.0;
        lastData.getCpu().limit = 1000.0;
        currentData.getCpu().usage = 106.0;
        currentData.getCpu().limit = 1000.0;
        assert (!needUpdate.get().booleanValue());
        lastData.getCpu().usage = 100.0;
        lastData.getCpu().limit = 1000.0;
        currentData.getCpu().usage = 206.0;
        currentData.getCpu().limit = 1000.0;
        assert (needUpdate.get().booleanValue());
        lastData.setCpu(new ResourceUsage());
        currentData.setCpu(new ResourceUsage());
        lastData.setMsgThroughputIn(100.0);
        currentData.setMsgThroughputIn(106.0);
        assert (needUpdate.get().booleanValue());
        currentData.setMsgThroughputIn(100.0);
        lastData.setNumBundles(100);
        currentData.setNumBundles(106);
        assert (needUpdate.get().booleanValue());
        currentData.setNumBundles(100);
        assert (!needUpdate.get().booleanValue());
    }

    @Test
    public void testBrokerStopCacheUpdate() throws Exception {
        ModularLoadManagerWrapper loadManagerWrapper = (ModularLoadManagerWrapper)this.pulsar1.getLoadManager().get();
        ModularLoadManagerImpl lm = (ModularLoadManagerImpl)Whitebox.getInternalState((Object)loadManagerWrapper, (String)"loadManager");
        Assert.assertEquals((int)lm.getAvailableBrokers().size(), (int)2);
        this.pulsar2.close();
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((int)lm.getAvailableBrokers().size(), (int)1));
    }

    @Test
    public void testNamespaceIsolationPoliciesForPrimaryAndSecondaryBrokers() throws Exception {
        String tenant = "my-property";
        String cluster = "use";
        String namespace = "my-ns";
        String broker1Address = this.pulsar1.getAdvertisedAddress() + "0";
        String broker2Address = this.pulsar2.getAdvertisedAddress() + "1";
        String sharedBroker = "broker3";
        this.admin1.clusters().createCluster("use", new ClusterData("http://" + this.pulsar1.getAdvertisedAddress()));
        this.admin1.tenants().createTenant("my-property", new TenantInfo((Set)Sets.newHashSet((Object[])new String[]{"appid1", "appid2"}), (Set)Sets.newHashSet((Object[])new String[]{"use"})));
        this.admin1.namespaces().createNamespace("my-property/use/my-ns");
        String newPolicyJsonTemplate = "{\"namespaces\":[\"%s/%s/%s.*\"],\"primary\":[\"%s\"],\"secondary\":[\"%s\"],\"auto_failover_policy\":{\"policy_type\":\"min_available\",\"parameters\":{\"min_limit\":%s,\"usage_threshold\":80}}}";
        String newPolicyJson = String.format(newPolicyJsonTemplate, "my-property", "use", "my-ns", broker1Address, broker2Address, 1);
        String newPolicyName = "my-ns-isolation-policies";
        ObjectMapper jsonMapper = ObjectMapperFactory.create();
        NamespaceIsolationData nsPolicyData = (NamespaceIsolationData)jsonMapper.readValue(newPolicyJson.getBytes(), NamespaceIsolationData.class);
        this.admin1.clusters().createNamespaceIsolationPolicy("use", newPolicyName, nsPolicyData);
        SimpleResourceAllocationPolicies simpleResourceAllocationPolicies = new SimpleResourceAllocationPolicies(this.pulsar1);
        NamespaceBundle serviceUnit = LoadBalancerTestingUtils.makeBundles(this.nsFactory, "my-property", "use", "my-ns", 1)[0];
        LoadManagerShared.BrokerTopicLoadingPredicate brokerTopicLoadingPredicate = new LoadManagerShared.BrokerTopicLoadingPredicate(){

            public boolean isEnablePersistentTopics(String brokerUrl) {
                return true;
            }

            public boolean isEnableNonPersistentTopics(String brokerUrl) {
                return true;
            }
        };
        HashSet brokerCandidateCache = Sets.newHashSet();
        HashSet availableBrokers = Sets.newHashSet((Object[])new String[]{"broker3", broker1Address, broker2Address});
        LoadManagerShared.applyNamespacePolicies((ServiceUnitId)serviceUnit, (SimpleResourceAllocationPolicies)simpleResourceAllocationPolicies, (Set)brokerCandidateCache, (Set)availableBrokers, (LoadManagerShared.BrokerTopicLoadingPredicate)brokerTopicLoadingPredicate);
        Assert.assertEquals((int)brokerCandidateCache.size(), (int)1);
        Assert.assertTrue((boolean)brokerCandidateCache.contains(broker1Address));
        brokerCandidateCache = Sets.newHashSet();
        availableBrokers = Sets.newHashSet((Object[])new String[]{"broker3", broker2Address});
        LoadManagerShared.applyNamespacePolicies((ServiceUnitId)serviceUnit, (SimpleResourceAllocationPolicies)simpleResourceAllocationPolicies, (Set)brokerCandidateCache, (Set)availableBrokers, (LoadManagerShared.BrokerTopicLoadingPredicate)brokerTopicLoadingPredicate);
        Assert.assertEquals((int)brokerCandidateCache.size(), (int)1);
        Assert.assertTrue((boolean)brokerCandidateCache.contains(broker2Address));
        brokerCandidateCache = Sets.newHashSet();
        availableBrokers = Sets.newHashSet((Object[])new String[]{"broker3"});
        LoadManagerShared.applyNamespacePolicies((ServiceUnitId)serviceUnit, (SimpleResourceAllocationPolicies)simpleResourceAllocationPolicies, (Set)brokerCandidateCache, (Set)availableBrokers, (LoadManagerShared.BrokerTopicLoadingPredicate)brokerTopicLoadingPredicate);
        Assert.assertEquals((int)brokerCandidateCache.size(), (int)0);
        newPolicyJson = String.format(newPolicyJsonTemplate, "my-property", "use", "my-ns", broker1Address, broker2Address, 2);
        nsPolicyData = (NamespaceIsolationData)jsonMapper.readValue(newPolicyJson.getBytes(), NamespaceIsolationData.class);
        this.admin1.clusters().createNamespaceIsolationPolicy("use", newPolicyName, nsPolicyData);
        brokerCandidateCache = Sets.newHashSet();
        availableBrokers = Sets.newHashSet((Object[])new String[]{"broker3", broker1Address, broker2Address});
        LoadManagerShared.applyNamespacePolicies((ServiceUnitId)serviceUnit, (SimpleResourceAllocationPolicies)simpleResourceAllocationPolicies, (Set)brokerCandidateCache, (Set)availableBrokers, (LoadManagerShared.BrokerTopicLoadingPredicate)brokerTopicLoadingPredicate);
        Assert.assertEquals((int)brokerCandidateCache.size(), (int)2);
        Assert.assertTrue((boolean)brokerCandidateCache.contains(broker1Address));
        Assert.assertTrue((boolean)brokerCandidateCache.contains(broker2Address));
        brokerCandidateCache = Sets.newHashSet();
        availableBrokers = Sets.newHashSet((Object[])new String[]{"broker3", broker2Address});
        LoadManagerShared.applyNamespacePolicies((ServiceUnitId)serviceUnit, (SimpleResourceAllocationPolicies)simpleResourceAllocationPolicies, (Set)brokerCandidateCache, (Set)availableBrokers, (LoadManagerShared.BrokerTopicLoadingPredicate)brokerTopicLoadingPredicate);
        Assert.assertEquals((int)brokerCandidateCache.size(), (int)1);
        Assert.assertTrue((boolean)brokerCandidateCache.contains(broker2Address));
        brokerCandidateCache = Sets.newHashSet();
        availableBrokers = Sets.newHashSet((Object[])new String[]{"broker3"});
        LoadManagerShared.applyNamespacePolicies((ServiceUnitId)serviceUnit, (SimpleResourceAllocationPolicies)simpleResourceAllocationPolicies, (Set)brokerCandidateCache, (Set)availableBrokers, (LoadManagerShared.BrokerTopicLoadingPredicate)brokerTopicLoadingPredicate);
        Assert.assertEquals((int)brokerCandidateCache.size(), (int)0);
    }

    @Test
    public void testOwnBrokerZnodeByMultipleBroker() throws Exception {
        ServiceConfiguration config = new ServiceConfiguration();
        config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
        config.setClusterName("use");
        config.setWebServicePort(Optional.of(0));
        config.setZookeeperServers("127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
        config.setBrokerShutdownTimeoutMs(0L);
        config.setBrokerServicePort(Optional.of(0));
        PulsarService pulsar = new PulsarService(config);
        String brokerZnode = "/loadbalance/brokers/" + pulsar.getAdvertisedAddress() + ":" + config.getWebServicePort();
        this.pulsar1.getLocalMetadataStore().put(brokerZnode, new byte[0], Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join();
        try {
            pulsar.start();
        }
        catch (PulsarServerException pulsarServerException) {
            // empty catch block
        }
        pulsar.close();
    }
}

