package org.apache.pulsar.broker.loadbalance.extensions;

import com.google.common.collect.Sets;
import java.net.URL;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
import org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.namespace.NamespaceBundleSplitListener;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.TableViewImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.class */
public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
    private static final Logger log = LoggerFactory.getLogger(ExtensibleLoadManagerImplTest.class);
    private PulsarService pulsar1;
    private PulsarService pulsar2;
    private PulsarTestContext additionalPulsarTestContext;
    private ExtensibleLoadManagerImpl primaryLoadManager;
    private ExtensibleLoadManagerImpl secondaryLoadManager;
    private ServiceUnitStateChannelImpl channel1;
    private ServiceUnitStateChannelImpl channel2;
    private final String defaultTestNamespace = "public/test";

    /* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest$MockBrokerFilter.class */
    private static abstract class MockBrokerFilter implements BrokerFilter {
        private MockBrokerFilter() {
        }

        public String name() {
            return "Mock-broker-filter";
        }
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    public void setup() throws Exception {
        this.conf.setForceDeleteNamespaceAllowed(true);
        this.conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
        this.conf.setAllowAutoTopicCreation(true);
        this.conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
        this.conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
        this.conf.setLoadBalancerSheddingEnabled(false);
        this.conf.setLoadBalancerDebugModeEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(false);
        super.internalSetup(this.conf);
        this.pulsar1 = this.pulsar;
        ServiceConfiguration defaultConf = getDefaultConf();
        defaultConf.setAllowAutoTopicCreation(true);
        defaultConf.setForceDeleteNamespaceAllowed(true);
        defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
        defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
        defaultConf.setLoadBalancerSheddingEnabled(false);
        defaultConf.setTopicLevelPoliciesEnabled(false);
        this.additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf);
        this.pulsar2 = this.additionalPulsarTestContext.getPulsarService();
        setPrimaryLoadManager();
        setSecondaryLoadManager();
        this.admin.clusters().createCluster(this.conf.getClusterName(), ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        this.admin.tenants().createTenant("public", new TenantInfoImpl(Sets.newHashSet(new String[]{"appid1", "appid2"}), Sets.newHashSet(new String[]{this.conf.getClusterName()})));
        this.admin.namespaces().createNamespace("public/default");
        this.admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet(new String[]{this.conf.getClusterName()}));
        this.admin.namespaces().createNamespace("public/test");
        this.admin.namespaces().setNamespaceReplicationClusters("public/test", Sets.newHashSet(new String[]{this.conf.getClusterName()}));
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass
    protected void cleanup() throws Exception {
        this.pulsar1 = null;
        this.pulsar2.close();
        super.internalCleanup();
        this.additionalPulsarTestContext.close();
    }

    @BeforeMethod(alwaysRun = true)
    protected void initializeState() throws PulsarAdminException {
        this.admin.namespaces().unload("public/test");
        Mockito.reset(new ExtensibleLoadManagerImpl[]{this.primaryLoadManager, this.secondaryLoadManager});
    }

    @Test
    public void testAssignInternalTopic() throws Exception {
        Optional optional = (Optional) this.primaryLoadManager.assign(Optional.of(TopicName.get(ServiceUnitStateChannelImpl.TOPIC)), getBundleAsync(this.pulsar1, TopicName.get(ServiceUnitStateChannelImpl.TOPIC)).get()).get();
        Assert.assertEquals(optional, (Optional) this.secondaryLoadManager.assign(Optional.of(TopicName.get(ServiceUnitStateChannelImpl.TOPIC)), getBundleAsync(this.pulsar1, TopicName.get(ServiceUnitStateChannelImpl.TOPIC)).get()).get());
        Assert.assertTrue(optional.isPresent());
        Optional currentLeader = ((LeaderElectionService) FieldUtils.readField(this.channel1, "leaderElectionService", true)).getCurrentLeader();
        Assert.assertTrue(currentLeader.isPresent());
        Assert.assertEquals(((BrokerLookupData) optional.get()).getWebServiceUrl(), ((LeaderBroker) currentLeader.get()).getServiceUrl());
    }

    @Test
    public void testAssign() throws Exception {
        TopicName topicName = TopicName.get("public/test/test-assign");
        NamespaceBundle namespaceBundle = getBundleAsync(this.pulsar1, topicName).get();
        Optional optional = (Optional) this.primaryLoadManager.assign(Optional.empty(), namespaceBundle).get();
        Assert.assertTrue(optional.isPresent());
        log.info("Assign the bundle {} to {}", namespaceBundle, optional);
        Assert.assertEquals(optional, (Optional) this.secondaryLoadManager.assign(Optional.empty(), namespaceBundle).get());
        ((ExtensibleLoadManagerImpl) Mockito.verify(this.primaryLoadManager, Mockito.times(1))).getBrokerSelectionStrategy();
        ((ExtensibleLoadManagerImpl) Mockito.verify(this.secondaryLoadManager, Mockito.times(0))).getBrokerSelectionStrategy();
        Optional optional2 = (Optional) this.pulsar2.getNamespaceService().getBrokerServiceUrlAsync(topicName, (LookupOptions) null).get();
        Assert.assertTrue(optional2.isPresent());
        Assert.assertEquals(((LookupResult) optional2.get()).getLookupData().getHttpUrl(), ((BrokerLookupData) optional.get()).getWebServiceUrl());
        Optional webServiceUrl = this.pulsar2.getNamespaceService().getWebServiceUrl(namespaceBundle, LookupOptions.builder().requestHttps(false).build());
        Assert.assertTrue(webServiceUrl.isPresent());
        Assert.assertEquals(((URL) webServiceUrl.get()).toString(), ((BrokerLookupData) optional.get()).getWebServiceUrl());
    }

    @Test
    public void testCheckOwnershipAsync() throws Exception {
        NamespaceBundle namespaceBundle = getBundleAsync(this.pulsar1, TopicName.get("public/test/test-check-ownership")).get();
        Assert.assertFalse(((Boolean) this.primaryLoadManager.checkOwnershipAsync(Optional.empty(), namespaceBundle).get()).booleanValue());
        Assert.assertFalse(((Boolean) this.secondaryLoadManager.checkOwnershipAsync(Optional.empty(), namespaceBundle).get()).booleanValue());
        Optional optional = (Optional) this.primaryLoadManager.assign(Optional.empty(), namespaceBundle).get();
        Assert.assertTrue(optional.isPresent());
        if (((BrokerLookupData) optional.get()).getPulsarServiceUrl().equals(this.pulsar1.getBrokerServiceUrl())) {
            Assert.assertTrue(((Boolean) this.primaryLoadManager.checkOwnershipAsync(Optional.empty(), namespaceBundle).get()).booleanValue());
            Assert.assertFalse(((Boolean) this.secondaryLoadManager.checkOwnershipAsync(Optional.empty(), namespaceBundle).get()).booleanValue());
        } else {
            Assert.assertFalse(((Boolean) this.primaryLoadManager.checkOwnershipAsync(Optional.empty(), namespaceBundle).get()).booleanValue());
            Assert.assertTrue(((Boolean) this.secondaryLoadManager.checkOwnershipAsync(Optional.empty(), namespaceBundle).get()).booleanValue());
        }
    }

    @Test
    public void testFilter() throws Exception {
        NamespaceBundle namespaceBundle = getBundleAsync(this.pulsar1, TopicName.get("public/test/test-filter")).get();
        ((ExtensibleLoadManagerImpl) Mockito.doReturn(List.of(new BrokerFilter() { // from class: org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImplTest.1
            public String name() {
                return "Mock broker filter";
            }

            public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String, BrokerLookupData> map, ServiceUnitId serviceUnitId, LoadManagerContext loadManagerContext) {
                map.remove(ExtensibleLoadManagerImplTest.this.pulsar1.getLookupServiceAddress());
                return CompletableFuture.completedFuture(map);
            }
        })).when(this.primaryLoadManager)).getBrokerFilterPipeline();
        Optional optional = (Optional) this.primaryLoadManager.assign(Optional.empty(), namespaceBundle).get();
        Assert.assertTrue(optional.isPresent());
        Assert.assertEquals(((BrokerLookupData) optional.get()).getWebServiceUrl(), this.pulsar2.getWebServiceAddress());
    }

    @Test
    public void testFilterHasException() throws Exception {
        NamespaceBundle namespaceBundle = getBundleAsync(this.pulsar1, TopicName.get("public/test/test-filter-has-exception")).get();
        ((ExtensibleLoadManagerImpl) Mockito.doReturn(List.of(new MockBrokerFilter() { // from class: org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImplTest.2
            public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String, BrokerLookupData> map, ServiceUnitId serviceUnitId, LoadManagerContext loadManagerContext) {
                map.remove(map.keySet().iterator().next());
                return FutureUtil.failedFuture(new BrokerFilterException("Test"));
            }
        })).when(this.primaryLoadManager)).getBrokerFilterPipeline();
        Assert.assertTrue(((Optional) this.primaryLoadManager.assign(Optional.empty(), namespaceBundle).get()).isPresent());
    }

    @Test(timeOut = 30000)
    public void testUnloadAdminAPI() throws Exception {
        String brokerServiceUrl;
        TopicName topicName = TopicName.get("public/test/test-unload");
        final NamespaceBundle namespaceBundle = getBundleAsync(this.pulsar1, topicName).get();
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final AtomicInteger atomicInteger2 = new AtomicInteger(0);
        NamespaceBundleOwnershipListener namespaceBundleOwnershipListener = new NamespaceBundleOwnershipListener() { // from class: org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImplTest.3
            public void onLoad(NamespaceBundle namespaceBundle2) {
                atomicInteger.incrementAndGet();
            }

            public void unLoad(NamespaceBundle namespaceBundle2) {
                atomicInteger2.incrementAndGet();
            }

            public boolean test(NamespaceBundle namespaceBundle2) {
                return namespaceBundle2.equals(namespaceBundle);
            }
        };
        this.pulsar1.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener[]{namespaceBundleOwnershipListener});
        this.pulsar2.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener[]{namespaceBundleOwnershipListener});
        String lookupTopic = this.admin.lookups().lookupTopic(topicName.toString());
        log.info("Assign the bundle {} to {}", namespaceBundle, lookupTopic);
        checkOwnershipState(lookupTopic, namespaceBundle);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(atomicInteger.get(), 1);
            Assert.assertEquals(atomicInteger2.get(), 0);
        });
        this.admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), namespaceBundle.getBundleRange());
        Assert.assertFalse(((Boolean) this.primaryLoadManager.checkOwnershipAsync(Optional.empty(), namespaceBundle).get()).booleanValue());
        Assert.assertFalse(((Boolean) this.secondaryLoadManager.checkOwnershipAsync(Optional.empty(), namespaceBundle).get()).booleanValue());
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(atomicInteger.get(), 1);
            Assert.assertEquals(atomicInteger2.get(), 1);
        });
        String lookupTopic2 = this.admin.lookups().lookupTopic(topicName.toString());
        log.info("Assign the bundle {} to {}", namespaceBundle, lookupTopic2);
        Awaitility.await().untilAsserted(() -> {
            checkOwnershipState(lookupTopic2, namespaceBundle);
            Assert.assertEquals(atomicInteger.get(), 2);
            Assert.assertEquals(atomicInteger2.get(), 1);
        });
        String lookupServiceAddress = this.pulsar1.getLookupServiceAddress();
        if (lookupTopic2.equals(this.pulsar1.getBrokerServiceUrl())) {
            lookupServiceAddress = this.pulsar2.getLookupServiceAddress();
            brokerServiceUrl = this.pulsar2.getBrokerServiceUrl();
        } else {
            brokerServiceUrl = this.pulsar1.getBrokerServiceUrl();
        }
        checkOwnershipState(lookupTopic2, namespaceBundle);
        this.admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), namespaceBundle.getBundleRange(), lookupServiceAddress);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(atomicInteger.get(), 3);
            Assert.assertEquals(atomicInteger2.get(), 2);
        });
        Assert.assertEquals(this.admin.lookups().lookupTopic(topicName.toString()), brokerServiceUrl);
        try {
            this.admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), namespaceBundle.getBundleRange(), lookupServiceAddress);
            Assert.fail();
        } catch (PulsarAdminException e) {
            Assert.assertTrue(e.getMessage().contains("cannot be transfer to same broker"));
        }
    }

    private void checkOwnershipState(String str, NamespaceBundle namespaceBundle) throws ExecutionException, InterruptedException {
        ExtensibleLoadManagerImpl extensibleLoadManagerImpl = this.secondaryLoadManager;
        ExtensibleLoadManagerImpl extensibleLoadManagerImpl2 = this.primaryLoadManager;
        if (str.equals(this.pulsar1.getBrokerServiceUrl())) {
            extensibleLoadManagerImpl = this.primaryLoadManager;
            extensibleLoadManagerImpl2 = this.secondaryLoadManager;
        }
        Assert.assertTrue(((Boolean) extensibleLoadManagerImpl.checkOwnershipAsync(Optional.empty(), namespaceBundle).get()).booleanValue());
        Assert.assertFalse(((Boolean) extensibleLoadManagerImpl2.checkOwnershipAsync(Optional.empty(), namespaceBundle).get()).booleanValue());
    }

    @Test(timeOut = 30000)
    public void testSplitBundleAdminAPI() throws Exception {
        final String str = "public/test";
        this.admin.topics().createPartitionedTopic("persistent://" + "public/test" + "/test-split", 10);
        BundlesData bundles = this.admin.namespaces().getBundles("public/test");
        int numBundles = bundles.getNumBundles();
        final List list = bundles.getBoundaries().stream().map(Long::decode).sorted().toList();
        String str2 = list.get(0) + "_" + list.get(1);
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        NamespaceBundleSplitListener namespaceBundleSplitListener = new NamespaceBundleSplitListener() { // from class: org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImplTest.4
            public void onSplit(NamespaceBundle namespaceBundle) {
                atomicInteger.incrementAndGet();
            }

            public boolean test(NamespaceBundle namespaceBundle) {
                return namespaceBundle.toString().equals(String.format(str + "/0x%08x_0x%08x", list.get(0), list.get(1)));
            }
        };
        this.pulsar1.getNamespaceService().addNamespaceBundleSplitListener(new NamespaceBundleSplitListener[]{namespaceBundleSplitListener});
        this.pulsar2.getNamespaceService().addNamespaceBundleSplitListener(new NamespaceBundleSplitListener[]{namespaceBundleSplitListener});
        long longValue = ((Long) list.get(0)).longValue() + ((((Long) list.get(1)).longValue() - ((Long) list.get(0)).longValue()) / 2);
        this.admin.namespaces().splitNamespaceBundle("public/test", str2, true, (String) null);
        BundlesData bundles2 = this.admin.namespaces().getBundles("public/test");
        Assert.assertEquals(bundles2.getNumBundles(), numBundles + 1);
        String format = String.format("0x%08x", list.get(0));
        String format2 = String.format("0x%08x", Long.valueOf(longValue));
        String format3 = String.format("0x%08x", list.get(1));
        Assert.assertTrue(bundles2.getBoundaries().contains(format));
        Assert.assertTrue(bundles2.getBoundaries().contains(format2));
        Assert.assertTrue(bundles2.getBoundaries().contains(format3));
        Assert.assertEquals(atomicInteger.get(), 1);
        try {
            this.admin.namespaces().splitNamespaceBundle("public/test", "invalid", true, (String) null);
            Assert.fail();
        } catch (PulsarAdminException e) {
            Assert.assertTrue(e.getMessage().contains("Invalid bundle range"));
        }
    }

    @Test(timeOut = 30000)
    public void testSplitBundleWithSpecificPositionAdminAPI() throws Exception {
        this.admin.topics().createPartitionedTopic("persistent://" + "public/test" + "/test-split-with-specific-position", 10);
        BundlesData bundles = this.admin.namespaces().getBundles("public/test");
        int numBundles = bundles.getNumBundles();
        List list = bundles.getBoundaries().stream().map(Long::decode).sorted().toList();
        String str = list.get(0) + "_" + list.get(1);
        long longValue = ((Long) list.get(0)).longValue() + ((((Long) list.get(1)).longValue() - ((Long) list.get(0)).longValue()) / 2) + 100;
        this.admin.namespaces().splitNamespaceBundle("public/test", str, true, "specified_positions_divide", List.of((Long) list.get(0), (Long) list.get(1), Long.valueOf(longValue)));
        BundlesData bundles2 = this.admin.namespaces().getBundles("public/test");
        Assert.assertEquals(bundles2.getNumBundles(), numBundles + 1);
        String format = String.format("0x%08x", list.get(0));
        String format2 = String.format("0x%08x", Long.valueOf(longValue));
        String format3 = String.format("0x%08x", list.get(1));
        Assert.assertTrue(bundles2.getBoundaries().contains(format));
        Assert.assertTrue(bundles2.getBoundaries().contains(format2));
        Assert.assertTrue(bundles2.getBoundaries().contains(format3));
    }

    @Test(timeOut = 30000)
    public void testDeleteNamespaceBundle() throws Exception {
        this.admin.namespaces().createNamespace("public/testDeleteNamespaceBundle", 3);
        TopicName topicName = TopicName.get("public/testDeleteNamespaceBundle/test-delete-namespace-bundle");
        NamespaceBundle namespaceBundle = getBundleAsync(this.pulsar1, topicName).get();
        String lookupTopic = this.admin.lookups().lookupTopic(topicName.toString());
        log.info("Assign the bundle {} to {}", namespaceBundle, lookupTopic);
        checkOwnershipState(lookupTopic, namespaceBundle);
        this.admin.namespaces().deleteNamespaceBundle(topicName.getNamespace(), namespaceBundle.getBundleRange());
        Assert.assertFalse(((Boolean) this.primaryLoadManager.checkOwnershipAsync(Optional.empty(), namespaceBundle).get()).booleanValue());
    }

    @Test(timeOut = 30000)
    public void testDeleteNamespace() throws Exception {
        TopicName topicName = TopicName.get("public/test-delete-namespace" + "/test-delete-namespace-topic");
        this.admin.namespaces().createNamespace("public/test-delete-namespace");
        this.admin.namespaces().setNamespaceReplicationClusters("public/test-delete-namespace", Sets.newHashSet(new String[]{this.conf.getClusterName()}));
        Assert.assertTrue(this.admin.namespaces().getNamespaces("public").contains("public/test-delete-namespace"));
        this.admin.topics().createPartitionedTopic(topicName.toString(), 2);
        this.admin.lookups().lookupTopic(topicName.toString());
        NamespaceBundle namespaceBundle = getBundleAsync(this.pulsar1, topicName).get();
        try {
            this.admin.namespaces().deleteNamespaceBundle("public/test-delete-namespace", namespaceBundle.getBundleRange());
            Assert.fail();
        } catch (Exception e) {
            Assert.assertTrue(e.getMessage().contains("Cannot delete non empty bundle"));
        }
        this.admin.namespaces().deleteNamespaceBundle("public/test-delete-namespace", namespaceBundle.getBundleRange(), true);
        this.admin.lookups().lookupTopic(topicName.toString());
        this.admin.namespaces().deleteNamespace("public/test-delete-namespace", true);
        Assert.assertFalse(this.admin.namespaces().getNamespaces("public").contains("public/test-delete-namespace"));
    }

    @Test(timeOut = 30000)
    public void testCheckOwnershipPresentWithSystemNamespace() throws Exception {
        try {
            this.pulsar1.getNamespaceService().checkOwnershipPresent(getBundleAsync(this.pulsar1, TopicName.get(NamespaceName.SYSTEM_NAMESPACE + "/test")).get());
        } catch (Exception e) {
            log.info("Got exception", e);
            Assert.assertTrue(e.getCause() instanceof UnsupportedOperationException);
        }
    }

    @Test
    public void testMoreThenOneFilter() throws Exception {
        NamespaceBundle namespaceBundle = getBundleAsync(this.pulsar1, TopicName.get("public/test/test-filter-has-exception")).get();
        final String lookupServiceAddress = this.pulsar1.getLookupServiceAddress();
        ((ExtensibleLoadManagerImpl) Mockito.doReturn(List.of(new MockBrokerFilter() { // from class: org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImplTest.5
            public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String, BrokerLookupData> map, ServiceUnitId serviceUnitId, LoadManagerContext loadManagerContext) {
                map.remove(lookupServiceAddress);
                return CompletableFuture.completedFuture(map);
            }
        }, new MockBrokerFilter() { // from class: org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImplTest.6
            public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String, BrokerLookupData> map, ServiceUnitId serviceUnitId, LoadManagerContext loadManagerContext) {
                return FutureUtil.failedFuture(new BrokerFilterException("Test"));
            }
        })).when(this.primaryLoadManager)).getBrokerFilterPipeline();
        Optional optional = (Optional) this.primaryLoadManager.assign(Optional.empty(), namespaceBundle).get();
        Assert.assertTrue(optional.isPresent());
        Assert.assertEquals(((BrokerLookupData) optional.get()).getWebServiceUrl(), this.pulsar2.getWebServiceAddress());
    }

    @Test
    public void testDeployAndRollbackLoadManager() throws Exception {
        ServiceConfiguration defaultConf = getDefaultConf();
        defaultConf.setAllowAutoTopicCreation(true);
        defaultConf.setForceDeleteNamespaceAllowed(true);
        defaultConf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
        defaultConf.setLoadBalancerSheddingEnabled(false);
        PulsarTestContext createAdditionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf);
        try {
            PulsarService pulsarService = createAdditionalPulsarTestContext.getPulsarService();
            String lookupTopic = pulsarService.getAdminClient().lookups().lookupTopic("persistent://public/test/test");
            Assert.assertEquals(lookupTopic, pulsarService.getBrokerServiceUrl());
            String lookupTopic2 = this.pulsar1.getAdminClient().lookups().lookupTopic("persistent://public/test/test");
            String lookupTopic3 = this.pulsar2.getAdminClient().lookups().lookupTopic("persistent://public/test/test");
            Assert.assertEquals(lookupTopic, lookupTopic2);
            Assert.assertEquals(lookupTopic, lookupTopic3);
            NamespaceBundle namespaceBundle = getBundleAsync(this.pulsar1, TopicName.get("persistent://public/test/test")).get();
            LookupOptions build = LookupOptions.builder().authoritative(false).requestHttps(false).readOnly(false).loadTopicsInBundle(false).build();
            Optional webServiceUrl = this.pulsar1.getNamespaceService().getWebServiceUrl(namespaceBundle, build);
            Assert.assertTrue(webServiceUrl.isPresent());
            Assert.assertEquals(((URL) webServiceUrl.get()).toString(), pulsarService.getWebServiceAddress());
            Optional webServiceUrl2 = this.pulsar2.getNamespaceService().getWebServiceUrl(namespaceBundle, build);
            Assert.assertTrue(webServiceUrl2.isPresent());
            Assert.assertEquals(((URL) webServiceUrl2.get()).toString(), ((URL) webServiceUrl.get()).toString());
            Optional webServiceUrl3 = pulsarService.getNamespaceService().getWebServiceUrl(namespaceBundle, build);
            Assert.assertTrue(webServiceUrl3.isPresent());
            Assert.assertEquals(((URL) webServiceUrl3.get()).toString(), ((URL) webServiceUrl.get()).toString());
            ServiceConfiguration defaultConf2 = getDefaultConf();
            defaultConf2.setAllowAutoTopicCreation(true);
            defaultConf2.setForceDeleteNamespaceAllowed(true);
            defaultConf2.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
            defaultConf2.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
            PulsarTestContext createAdditionalPulsarTestContext2 = createAdditionalPulsarTestContext(defaultConf2);
            try {
                PulsarService pulsarService2 = createAdditionalPulsarTestContext2.getPulsarService();
                HashSet newHashSet = Sets.newHashSet(new String[]{this.pulsar1.getBrokerServiceUrl(), this.pulsar2.getBrokerServiceUrl(), pulsarService2.getBrokerServiceUrl()});
                String lookupTopic4 = pulsarService2.getAdminClient().lookups().lookupTopic("persistent://public/test/test");
                Assert.assertTrue(newHashSet.contains(lookupTopic4));
                String lookupTopic5 = this.pulsar1.getAdminClient().lookups().lookupTopic("persistent://public/test/test");
                String lookupTopic6 = this.pulsar2.getAdminClient().lookups().lookupTopic("persistent://public/test/test");
                String lookupTopic7 = pulsarService.getAdminClient().lookups().lookupTopic("persistent://public/test/test");
                Assert.assertEquals(lookupTopic4, lookupTopic5);
                Assert.assertEquals(lookupTopic4, lookupTopic6);
                Assert.assertEquals(lookupTopic4, lookupTopic7);
                HashSet newHashSet2 = Sets.newHashSet(new String[]{this.pulsar1.getWebServiceAddress(), this.pulsar2.getWebServiceAddress(), pulsarService2.getWebServiceAddress()});
                Optional webServiceUrl4 = this.pulsar1.getNamespaceService().getWebServiceUrl(namespaceBundle, build);
                Assert.assertTrue(webServiceUrl4.isPresent());
                Assert.assertTrue(newHashSet2.contains(((URL) webServiceUrl4.get()).toString()));
                Optional webServiceUrl5 = this.pulsar2.getNamespaceService().getWebServiceUrl(namespaceBundle, build);
                Assert.assertTrue(webServiceUrl5.isPresent());
                Assert.assertEquals(((URL) webServiceUrl5.get()).toString(), ((URL) webServiceUrl4.get()).toString());
                Optional webServiceUrl6 = pulsarService.getNamespaceService().getWebServiceUrl(namespaceBundle, build);
                Assert.assertTrue(webServiceUrl6.isPresent());
                Assert.assertTrue(newHashSet2.contains(((URL) webServiceUrl6.get()).toString()));
                Optional webServiceUrl7 = pulsarService2.getNamespaceService().getWebServiceUrl(namespaceBundle, build);
                Assert.assertTrue(webServiceUrl7.isPresent());
                Assert.assertEquals(((URL) webServiceUrl7.get()).toString(), ((URL) webServiceUrl4.get()).toString());
                if (createAdditionalPulsarTestContext2 != null) {
                    createAdditionalPulsarTestContext2.close();
                }
                if (createAdditionalPulsarTestContext != null) {
                    createAdditionalPulsarTestContext.close();
                }
            } catch (Throwable th) {
                if (createAdditionalPulsarTestContext2 != null) {
                    try {
                        createAdditionalPulsarTestContext2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (createAdditionalPulsarTestContext != null) {
                try {
                    createAdditionalPulsarTestContext.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    public void testTopBundlesLoadDataStoreTableViewFromChannelOwner() throws Exception {
        LoadDataStore loadDataStore = (LoadDataStore) FieldUtils.readDeclaredField(this.primaryLoadManager, "topBundlesLoadDataStore", true);
        ServiceUnitStateChannelImpl serviceUnitStateChannelImpl = (ServiceUnitStateChannelImpl) FieldUtils.readDeclaredField(this.primaryLoadManager, "serviceUnitStateChannel", true);
        TableViewImpl tableViewImpl = (TableViewImpl) FieldUtils.readDeclaredField(loadDataStore, "tableView", true);
        LoadDataStore loadDataStore2 = (LoadDataStore) FieldUtils.readDeclaredField(this.secondaryLoadManager, "topBundlesLoadDataStore", true);
        TableViewImpl tableViewImpl2 = (TableViewImpl) FieldUtils.readDeclaredField(loadDataStore2, "tableView", true);
        if (((Boolean) serviceUnitStateChannelImpl.isChannelOwnerAsync().get(5L, TimeUnit.SECONDS)).booleanValue()) {
            Assert.assertNotNull(tableViewImpl);
            Assert.assertNull(tableViewImpl2);
        } else {
            Assert.assertNull(tableViewImpl);
            Assert.assertNotNull(tableViewImpl2);
        }
        restartBroker();
        this.pulsar1 = this.pulsar;
        setPrimaryLoadManager();
        this.admin.namespaces().setNamespaceReplicationClusters("public/test", Sets.newHashSet(new String[]{this.conf.getClusterName()}));
        ServiceUnitStateChannelImpl serviceUnitStateChannelImpl2 = (ServiceUnitStateChannelImpl) FieldUtils.readDeclaredField(this.primaryLoadManager, "serviceUnitStateChannel", true);
        LoadDataStore loadDataStore3 = (LoadDataStore) FieldUtils.readDeclaredField(this.primaryLoadManager, "topBundlesLoadDataStore", true);
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assert.assertFalse(((Boolean) serviceUnitStateChannelImpl2.isChannelOwnerAsync().get(5L, TimeUnit.SECONDS)).booleanValue());
            Assert.assertNotNull(FieldUtils.readDeclaredField(loadDataStore2, "tableView", true));
            Assert.assertNull(FieldUtils.readDeclaredField(loadDataStore3, "tableView", true));
        });
    }

    @Test
    public void testRoleChange() throws Exception {
        LoadDataStore loadDataStore = (LoadDataStore) FieldUtils.readDeclaredField(this.primaryLoadManager, "topBundlesLoadDataStore", true);
        LoadDataStore loadDataStore2 = (LoadDataStore) Mockito.spy(loadDataStore);
        AtomicInteger atomicInteger = new AtomicInteger(3);
        AtomicInteger atomicInteger2 = new AtomicInteger(3);
        ((LoadDataStore) Mockito.doAnswer(invocationOnMock -> {
            if (atomicInteger.decrementAndGet() > 0) {
                throw new RuntimeException();
            }
            Mockito.reset(new Object[0]);
            return null;
        }).when(loadDataStore2)).startTableView();
        ((LoadDataStore) Mockito.doAnswer(invocationOnMock2 -> {
            if (atomicInteger2.decrementAndGet() > 0) {
                throw new RuntimeException();
            }
            Mockito.reset(new Object[0]);
            return null;
        }).when(loadDataStore2)).closeTableView();
        FieldUtils.writeDeclaredField(this.primaryLoadManager, "topBundlesLoadDataStore", loadDataStore2, true);
        LoadDataStore loadDataStore3 = (LoadDataStore) FieldUtils.readDeclaredField(this.secondaryLoadManager, "topBundlesLoadDataStore", true);
        LoadDataStore loadDataStore4 = (LoadDataStore) Mockito.spy(loadDataStore3);
        AtomicInteger atomicInteger3 = new AtomicInteger(3);
        AtomicInteger atomicInteger4 = new AtomicInteger(3);
        ((LoadDataStore) Mockito.doAnswer(invocationOnMock3 -> {
            if (atomicInteger3.decrementAndGet() > 0) {
                throw new RuntimeException();
            }
            Mockito.reset(new Object[0]);
            return null;
        }).when(loadDataStore4)).startTableView();
        ((LoadDataStore) Mockito.doAnswer(invocationOnMock4 -> {
            if (atomicInteger4.decrementAndGet() > 0) {
                throw new RuntimeException();
            }
            Mockito.reset(new Object[0]);
            return null;
        }).when(loadDataStore4)).closeTableView();
        FieldUtils.writeDeclaredField(this.secondaryLoadManager, "topBundlesLoadDataStore", loadDataStore4, true);
        if (((Boolean) this.channel1.isChannelOwnerAsync().get(5L, TimeUnit.SECONDS)).booleanValue()) {
            this.primaryLoadManager.playFollower();
            this.primaryLoadManager.playFollower();
            this.secondaryLoadManager.playLeader();
            this.secondaryLoadManager.playLeader();
            this.primaryLoadManager.playLeader();
            this.primaryLoadManager.playLeader();
            this.secondaryLoadManager.playFollower();
            this.secondaryLoadManager.playFollower();
        } else {
            this.primaryLoadManager.playLeader();
            this.primaryLoadManager.playLeader();
            this.secondaryLoadManager.playFollower();
            this.secondaryLoadManager.playFollower();
            this.primaryLoadManager.playFollower();
            this.primaryLoadManager.playFollower();
            this.secondaryLoadManager.playLeader();
            this.secondaryLoadManager.playLeader();
        }
        ((LoadDataStore) Mockito.verify(loadDataStore2, Mockito.times(3))).startTableView();
        ((LoadDataStore) Mockito.verify(loadDataStore2, Mockito.times(3))).closeTableView();
        ((LoadDataStore) Mockito.verify(loadDataStore4, Mockito.times(3))).startTableView();
        ((LoadDataStore) Mockito.verify(loadDataStore4, Mockito.times(3))).closeTableView();
        FieldUtils.writeDeclaredField(this.primaryLoadManager, "topBundlesLoadDataStore", loadDataStore, true);
        FieldUtils.writeDeclaredField(this.secondaryLoadManager, "topBundlesLoadDataStore", loadDataStore3, true);
    }

    @Test
    public void testGetMetrics() throws Exception {
        BrokerLoadDataReporter brokerLoadDataReporter = (BrokerLoadDataReporter) Mockito.mock(BrokerLoadDataReporter.class);
        FieldUtils.writeDeclaredField(this.primaryLoadManager, "brokerLoadDataReporter", brokerLoadDataReporter, true);
        BrokerLoadData brokerLoadData = new BrokerLoadData();
        SystemResourceUsage systemResourceUsage = new SystemResourceUsage();
        ResourceUsage resourceUsage = new ResourceUsage(1.0d, 100.0d);
        ResourceUsage resourceUsage2 = new ResourceUsage(800.0d, 200.0d);
        ResourceUsage resourceUsage3 = new ResourceUsage(2.0d, 100.0d);
        ResourceUsage resourceUsage4 = new ResourceUsage(3.0d, 100.0d);
        ResourceUsage resourceUsage5 = new ResourceUsage(4.0d, 100.0d);
        systemResourceUsage.setCpu(resourceUsage);
        systemResourceUsage.setMemory(resourceUsage2);
        systemResourceUsage.setDirectMemory(resourceUsage3);
        systemResourceUsage.setBandwidthIn(resourceUsage4);
        systemResourceUsage.setBandwidthOut(resourceUsage5);
        brokerLoadData.update(systemResourceUsage, 1.0d, 2.0d, 3.0d, 4.0d, 5, 6, this.conf);
        ((BrokerLoadDataReporter) Mockito.doReturn(brokerLoadData).when(brokerLoadDataReporter)).generateLoadData();
        AtomicReference atomicReference = (AtomicReference) FieldUtils.readDeclaredField(this.primaryLoadManager, "unloadMetrics", true);
        UnloadCounter unloadCounter = new UnloadCounter();
        FieldUtils.writeDeclaredField(unloadCounter, "unloadBrokerCount", 2L, true);
        FieldUtils.writeDeclaredField(unloadCounter, "unloadBundleCount", 3L, true);
        FieldUtils.writeDeclaredField(unloadCounter, "loadAvg", Double.valueOf(1.5d), true);
        FieldUtils.writeDeclaredField(unloadCounter, "loadStd", Double.valueOf(0.3d), true);
        FieldUtils.writeDeclaredField(unloadCounter, "breakdownCounters", Map.of(UnloadDecision.Label.Success, new LinkedHashMap<Object, Object>() { // from class: org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImplTest.7
            {
                put(UnloadDecision.Reason.Overloaded, new AtomicLong(1L));
                put(UnloadDecision.Reason.Underloaded, new AtomicLong(2L));
            }
        }, UnloadDecision.Label.Skip, new LinkedHashMap<Object, Object>() { // from class: org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImplTest.8
            {
                put(UnloadDecision.Reason.HitCount, new AtomicLong(3L));
                put(UnloadDecision.Reason.NoBundles, new AtomicLong(4L));
                put(UnloadDecision.Reason.CoolDown, new AtomicLong(5L));
                put(UnloadDecision.Reason.OutDatedData, new AtomicLong(6L));
                put(UnloadDecision.Reason.NoLoadData, new AtomicLong(7L));
                put(UnloadDecision.Reason.NoBrokers, new AtomicLong(8L));
                put(UnloadDecision.Reason.Unknown, new AtomicLong(9L));
            }
        }, UnloadDecision.Label.Failure, Map.of(UnloadDecision.Reason.Unknown, new AtomicLong(10L))), true);
        atomicReference.set(unloadCounter.toMetrics(this.pulsar.getAdvertisedAddress()));
        AtomicReference atomicReference2 = (AtomicReference) FieldUtils.readDeclaredField(this.primaryLoadManager, "splitMetrics", true);
        SplitCounter splitCounter = new SplitCounter();
        FieldUtils.writeDeclaredField(splitCounter, "splitCount", 35L, true);
        FieldUtils.writeDeclaredField(splitCounter, "breakdownCounters", Map.of(SplitDecision.Label.Success, Map.of(SplitDecision.Reason.Topics, new AtomicLong(1L), SplitDecision.Reason.Sessions, new AtomicLong(2L), SplitDecision.Reason.MsgRate, new AtomicLong(3L), SplitDecision.Reason.Bandwidth, new AtomicLong(4L), SplitDecision.Reason.Admin, new AtomicLong(5L)), SplitDecision.Label.Failure, Map.of(SplitDecision.Reason.Unknown, new AtomicLong(6L))), true);
        atomicReference2.set(splitCounter.toMetrics(this.pulsar.getAdvertisedAddress()));
        AssignCounter assignCounter = new AssignCounter();
        assignCounter.incrementSuccess();
        assignCounter.incrementFailure();
        assignCounter.incrementFailure();
        assignCounter.incrementSkip();
        assignCounter.incrementSkip();
        assignCounter.incrementSkip();
        FieldUtils.writeDeclaredField(this.primaryLoadManager, "assignCounter", assignCounter, true);
        FieldUtils.writeDeclaredField(this.channel1, "lastOwnedServiceUnitCountAt", Long.valueOf(System.currentTimeMillis()), true);
        FieldUtils.writeDeclaredField(this.channel1, "totalOwnedServiceUnitCnt", 10, true);
        FieldUtils.writeDeclaredField(this.channel1, "totalInactiveBrokerCleanupCnt", 1, true);
        FieldUtils.writeDeclaredField(this.channel1, "totalServiceUnitTombstoneCleanupCnt", 2, true);
        FieldUtils.writeDeclaredField(this.channel1, "totalOrphanServiceUnitCleanupCnt", 3, true);
        FieldUtils.writeDeclaredField(this.channel1, "totalCleanupErrorCnt", new AtomicLong(4L), true);
        FieldUtils.writeDeclaredField(this.channel1, "totalInactiveBrokerCleanupScheduledCnt", 5, true);
        FieldUtils.writeDeclaredField(this.channel1, "totalInactiveBrokerCleanupIgnoredCnt", 6, true);
        FieldUtils.writeDeclaredField(this.channel1, "totalInactiveBrokerCleanupCancelledCnt", 7, true);
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        LinkedHashMap linkedHashMap3 = new LinkedHashMap();
        int i = 0;
        for (ServiceUnitState serviceUnitState : ServiceUnitState.values()) {
            linkedHashMap.put(serviceUnitState, new ServiceUnitStateChannelImpl.Counters(new AtomicLong(i + 1), new AtomicLong(i + 2)));
            linkedHashMap2.put(serviceUnitState, new ServiceUnitStateChannelImpl.Counters(new AtomicLong(i + 1), new AtomicLong(i + 2)));
            i += 2;
        }
        int i2 = 0;
        for (ServiceUnitStateChannelImpl.EventType eventType : ServiceUnitStateChannelImpl.EventType.values()) {
            linkedHashMap3.put(eventType, new ServiceUnitStateChannelImpl.Counters(new AtomicLong(i2 + 1), new AtomicLong(i2 + 2)));
            i2 += 2;
        }
        FieldUtils.writeDeclaredField(this.channel1, "ownerLookUpCounters", linkedHashMap, true);
        FieldUtils.writeDeclaredField(this.channel1, "eventCounters", linkedHashMap3, true);
        FieldUtils.writeDeclaredField(this.channel1, "handlerCounters", linkedHashMap2, true);
        Assert.assertEquals((Set) this.primaryLoadManager.getMetrics().stream().map(metrics -> {
            return metrics.toString();
        }).collect(Collectors.toSet()), Set.of((Object[]) "dimensions=[{broker=localhost, metric=loadBalancing}], metrics=[{brk_lb_bandwidth_in_usage=3.0, brk_lb_bandwidth_out_usage=4.0, brk_lb_cpu_usage=1.0, brk_lb_directMemory_usage=2.0, brk_lb_memory_usage=400.0}]\ndimensions=[{broker=localhost, feature=max_ema, metric=loadBalancing}], metrics=[{brk_lb_resource_usage=4.0}]\ndimensions=[{broker=localhost, feature=max, metric=loadBalancing}], metrics=[{brk_lb_resource_usage=0.04}]\ndimensions=[{broker=localhost, metric=bundleUnloading}], metrics=[{brk_lb_unload_broker_total=2, brk_lb_unload_bundle_total=3}]\ndimensions=[{broker=localhost, metric=bundleUnloading, reason=Unknown, result=Failure}], metrics=[{brk_lb_unload_broker_breakdown_total=10}]\ndimensions=[{broker=localhost, metric=bundleUnloading, reason=HitCount, result=Skip}], metrics=[{brk_lb_unload_broker_breakdown_total=3}]\ndimensions=[{broker=localhost, metric=bundleUnloading, reason=NoBundles, result=Skip}], metrics=[{brk_lb_unload_broker_breakdown_total=4}]\ndimensions=[{broker=localhost, metric=bundleUnloading, reason=CoolDown, result=Skip}], metrics=[{brk_lb_unload_broker_breakdown_total=5}]\ndimensions=[{broker=localhost, metric=bundleUnloading, reason=OutDatedData, result=Skip}], metrics=[{brk_lb_unload_broker_breakdown_total=6}]\ndimensions=[{broker=localhost, metric=bundleUnloading, reason=NoLoadData, result=Skip}], metrics=[{brk_lb_unload_broker_breakdown_total=7}]\ndimensions=[{broker=localhost, metric=bundleUnloading, reason=NoBrokers, result=Skip}], metrics=[{brk_lb_unload_broker_breakdown_total=8}]\ndimensions=[{broker=localhost, metric=bundleUnloading, reason=Unknown, result=Skip}], metrics=[{brk_lb_unload_broker_breakdown_total=9}]\ndimensions=[{broker=localhost, metric=bundleUnloading, reason=Overloaded, result=Success}], metrics=[{brk_lb_unload_broker_breakdown_total=1}]\ndimensions=[{broker=localhost, metric=bundleUnloading, reason=Underloaded, result=Success}], metrics=[{brk_lb_unload_broker_breakdown_total=2}]\ndimensions=[{broker=localhost, feature=max_ema, metric=bundleUnloading, stat=avg}], metrics=[{brk_lb_resource_usage_stats=1.5}]\ndimensions=[{broker=localhost, feature=max_ema, metric=bundleUnloading, stat=std}], metrics=[{brk_lb_resource_usage_stats=0.3}]\ndimensions=[{broker=localhost, metric=bundlesSplit}], metrics=[{brk_lb_bundles_split_total=35}]\ndimensions=[{broker=localhost, metric=bundlesSplit, reason=Topics, result=Success}], metrics=[{brk_lb_bundles_split_breakdown_total=1}]\ndimensions=[{broker=localhost, metric=bundlesSplit, reason=Sessions, result=Success}], metrics=[{brk_lb_bundles_split_breakdown_total=2}]\ndimensions=[{broker=localhost, metric=bundlesSplit, reason=MsgRate, result=Success}], metrics=[{brk_lb_bundles_split_breakdown_total=3}]\ndimensions=[{broker=localhost, metric=bundlesSplit, reason=Bandwidth, result=Success}], metrics=[{brk_lb_bundles_split_breakdown_total=4}]\ndimensions=[{broker=localhost, metric=bundlesSplit, reason=Admin, result=Success}], metrics=[{brk_lb_bundles_split_breakdown_total=5}]\ndimensions=[{broker=localhost, metric=bundlesSplit, reason=Unknown, result=Failure}], metrics=[{brk_lb_bundles_split_breakdown_total=6}]\ndimensions=[{broker=localhost, metric=assign, result=Failure}], metrics=[{brk_lb_assign_broker_breakdown_total=2}]\ndimensions=[{broker=localhost, metric=assign, result=Skip}], metrics=[{brk_lb_assign_broker_breakdown_total=3}]\ndimensions=[{broker=localhost, metric=assign, result=Success}], metrics=[{brk_lb_assign_broker_breakdown_total=1}]\ndimensions=[{broker=localhost, metric=sunitStateChn, result=Total, state=Init}], metrics=[{brk_sunit_state_chn_owner_lookup_total=1}]\ndimensions=[{broker=localhost, metric=sunitStateChn, result=Failure, state=Init}], metrics=[{brk_sunit_state_chn_owner_lookup_total=2}]\ndimensions=[{broker=localhost, metric=sunitStateChn, result=Total, state=Free}], metrics=[{brk_sunit_state_chn_owner_lookup_total=3}]\ndimensions=[{broker=localhost, metric=sunitStateChn, result=Failure, state=Free}], metrics=[{brk_sunit_state_chn_owner_lookup_total=4}]\ndimensions=[{broker=localhost, metric=sunitStateChn, result=Total, state=Owned}], metrics=[{brk_sunit_state_chn_owner_lookup_total=5}]\ndimensions=[{broker=localhost, metric=sunitStateChn, result=Failure, state=Owned}], metrics=[{brk_sunit_state_chn_owner_lookup_total=6}]\ndimensions=[{broker=localhost, metric=sunitStateChn, result=Total, state=Assigning}], metrics=[{brk_sunit_state_chn_owner_lookup_total=7}]\ndimensions=[{broker=localhost, metric=sunitStateChn, result=Failure, state=Assigning}], metrics=[{brk_sunit_state_chn_owner_lookup_total=8}]\ndimensions=[{broker=localhost, metric=sunitStateChn, result=Total, state=Releasing}], metrics=[{brk_sunit_state_chn_owner_lookup_total=9}]\ndimensions=[{broker=localhost, metric=sunitStateChn, result=Failure, state=Releasing}], metrics=[{brk_sunit_state_chn_owner_lookup_total=10}]\ndimensions=[{broker=localhost, metric=sunitStateChn, result=Total, state=Splitting}], metrics=[{brk_sunit_state_chn_owner_lookup_total=11}]\ndimensions=[{broker=localhost, metric=sunitStateChn, result=Failure, state=Splitting}], metrics=[{brk_sunit_state_chn_owner_lookup_total=12}]\ndimensions=[{broker=localhost, metric=sunitStateChn, result=Total, state=Deleted}], metrics=[{brk_sunit_state_chn_owner_lookup_total=13}]\ndimensions=[{broker=localhost, metric=sunitStateChn, result=Failure, state=Deleted}], metrics=[{brk_sunit_state_chn_owner_lookup_total=14}]\ndimensions=[{broker=localhost, event=Assign, metric=sunitStateChn, result=Total}], metrics=[{brk_sunit_state_chn_event_publish_ops_total=1}]\ndimensions=[{broker=localhost, event=Assign, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_event_publish_ops_total=2}]\ndimensions=[{broker=localhost, event=Split, metric=sunitStateChn, result=Total}], metrics=[{brk_sunit_state_chn_event_publish_ops_total=3}]\ndimensions=[{broker=localhost, event=Split, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_event_publish_ops_total=4}]\ndimensions=[{broker=localhost, event=Unload, metric=sunitStateChn, result=Total}], metrics=[{brk_sunit_state_chn_event_publish_ops_total=5}]\ndimensions=[{broker=localhost, event=Unload, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_event_publish_ops_total=6}]\ndimensions=[{broker=localhost, event=Override, metric=sunitStateChn, result=Total}], metrics=[{brk_sunit_state_chn_event_publish_ops_total=7}]\ndimensions=[{broker=localhost, event=Override, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_event_publish_ops_total=8}]\ndimensions=[{broker=localhost, event=Init, metric=sunitStateChn, result=Total}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=1}]\ndimensions=[{broker=localhost, event=Init, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=2}]\ndimensions=[{broker=localhost, event=Free, metric=sunitStateChn, result=Total}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=3}]\ndimensions=[{broker=localhost, event=Free, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=4}]\ndimensions=[{broker=localhost, event=Owned, metric=sunitStateChn, result=Total}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=5}]\ndimensions=[{broker=localhost, event=Owned, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=6}]\ndimensions=[{broker=localhost, event=Assigning, metric=sunitStateChn, result=Total}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=7}]\ndimensions=[{broker=localhost, event=Assigning, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=8}]\ndimensions=[{broker=localhost, event=Releasing, metric=sunitStateChn, result=Total}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=9}]\ndimensions=[{broker=localhost, event=Releasing, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=10}]\ndimensions=[{broker=localhost, event=Splitting, metric=sunitStateChn, result=Total}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=11}]\ndimensions=[{broker=localhost, event=Splitting, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=12}]\ndimensions=[{broker=localhost, event=Deleted, metric=sunitStateChn, result=Total}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=13}]\ndimensions=[{broker=localhost, event=Deleted, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=14}]\ndimensions=[{broker=localhost, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_cleanup_ops_total=4}]\ndimensions=[{broker=localhost, metric=sunitStateChn, result=Skip}], metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=6}]\ndimensions=[{broker=localhost, metric=sunitStateChn, result=Cancel}], metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=7}]\ndimensions=[{broker=localhost, metric=sunitStateChn, result=Schedule}], metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=5}]\ndimensions=[{broker=localhost, metric=sunitStateChn, result=Success}], metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=1}]\ndimensions=[{broker=localhost, metric=sunitStateChn}], metrics=[{brk_sunit_state_chn_orphan_su_cleanup_ops_total=3, brk_sunit_state_chn_owned_su_total=10, brk_sunit_state_chn_su_tombstone_cleanup_ops_total=2}]\n".split("\n")));
    }

    @Test
    public void testDisableBroker() throws Exception {
        ServiceConfiguration defaultConf = getDefaultConf();
        defaultConf.setAllowAutoTopicCreation(true);
        defaultConf.setForceDeleteNamespaceAllowed(true);
        defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
        defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
        defaultConf.setLoadBalancerSheddingEnabled(false);
        defaultConf.setLoadBalancerDebugModeEnabled(true);
        defaultConf.setTopicLevelPoliciesEnabled(false);
        PulsarTestContext createAdditionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf);
        try {
            PulsarService pulsarService = createAdditionalPulsarTestContext.getPulsarService();
            ExtensibleLoadManagerImpl extensibleLoadManagerImpl = (ExtensibleLoadManagerImpl) Mockito.spy((ExtensibleLoadManagerImpl) FieldUtils.readField(pulsarService.getLoadManager().get(), "loadManager", true));
            String lookupTopic = pulsarService.getAdminClient().lookups().lookupTopic("persistent://public/test/test");
            TopicName topicName = TopicName.get("persistent://public/test/test");
            NamespaceBundle namespaceBundle = getBundleAsync(this.pulsar1, topicName).get();
            if (!pulsarService.getBrokerServiceUrl().equals(lookupTopic)) {
                this.admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), namespaceBundle.getBundleRange(), pulsarService.getLookupServiceAddress());
                lookupTopic = this.pulsar2.getAdminClient().lookups().lookupTopic("persistent://public/test/test");
            }
            String lookupTopic2 = this.pulsar1.getAdminClient().lookups().lookupTopic("persistent://public/test/test");
            String lookupTopic3 = this.pulsar2.getAdminClient().lookups().lookupTopic("persistent://public/test/test");
            Assert.assertEquals(lookupTopic, pulsarService.getBrokerServiceUrl());
            Assert.assertEquals(lookupTopic, lookupTopic2);
            Assert.assertEquals(lookupTopic, lookupTopic3);
            Assert.assertFalse(((Boolean) this.primaryLoadManager.checkOwnershipAsync(Optional.empty(), namespaceBundle).get()).booleanValue());
            Assert.assertFalse(((Boolean) this.secondaryLoadManager.checkOwnershipAsync(Optional.empty(), namespaceBundle).get()).booleanValue());
            Assert.assertTrue(((Boolean) extensibleLoadManagerImpl.checkOwnershipAsync(Optional.empty(), namespaceBundle).get()).booleanValue());
            extensibleLoadManagerImpl.disableBroker();
            Assert.assertFalse(((Boolean) extensibleLoadManagerImpl.checkOwnershipAsync(Optional.empty(), namespaceBundle).get()).booleanValue());
            if (((Boolean) this.primaryLoadManager.checkOwnershipAsync(Optional.empty(), namespaceBundle).get()).booleanValue()) {
                Assert.assertFalse(((Boolean) this.secondaryLoadManager.checkOwnershipAsync(Optional.empty(), namespaceBundle).get()).booleanValue());
            } else {
                Assert.assertTrue(((Boolean) this.secondaryLoadManager.checkOwnershipAsync(Optional.empty(), namespaceBundle).get()).booleanValue());
            }
            if (createAdditionalPulsarTestContext != null) {
                createAdditionalPulsarTestContext.close();
            }
        } catch (Throwable th) {
            if (createAdditionalPulsarTestContext != null) {
                try {
                    createAdditionalPulsarTestContext.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = 30000)
    public void testListTopic() throws Exception {
        this.admin.namespaces().createNamespace("public/testListTopic", 9);
        String topicName = TopicName.get("persistent", NamespaceName.get("public/testListTopic"), "get_topics_mode_" + UUID.randomUUID()).toString();
        String topicName2 = TopicName.get("non-persistent", NamespaceName.get("public/testListTopic"), "get_topics_mode_" + UUID.randomUUID()).toString();
        this.admin.topics().createPartitionedTopic(topicName, 9);
        this.admin.topics().createPartitionedTopic(topicName2, 9);
        this.pulsarClient.newProducer().topic(topicName).create().close();
        this.pulsarClient.newProducer().topic(topicName2).create().close();
        List boundaries = this.admin.namespaces().getBundles("public/testListTopic").getBoundaries();
        int i = 0;
        for (int i2 = 0; i2 < boundaries.size() - 1; i2++) {
            List listInBundle = this.admin.topics().getListInBundle("public/testListTopic", String.format("%s_%s", boundaries.get(i2), boundaries.get(i2 + 1)));
            if (listInBundle != null) {
                i += listInBundle.size();
                Iterator it = listInBundle.iterator();
                while (it.hasNext()) {
                    Assert.assertFalse(TopicName.get((String) it.next()).isPersistent());
                }
            }
        }
        Assert.assertEquals(i, 9);
        Assert.assertEquals(this.admin.topics().getList("public/testListTopic").size(), 18);
        this.admin.namespaces().deleteNamespace("public/testListTopic", true);
    }

    @Test(timeOut = 30000)
    public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws Exception {
        NamespaceName heartbeatNamespace = NamespaceService.getHeartbeatNamespace(this.pulsar1.getLookupServiceAddress(), this.pulsar1.getConfiguration());
        NamespaceName heartbeatNamespaceV2 = NamespaceService.getHeartbeatNamespaceV2(this.pulsar1.getLookupServiceAddress(), this.pulsar1.getConfiguration());
        NamespaceName heartbeatNamespace2 = NamespaceService.getHeartbeatNamespace(this.pulsar2.getLookupServiceAddress(), this.pulsar2.getConfiguration());
        NamespaceName heartbeatNamespaceV22 = NamespaceService.getHeartbeatNamespaceV2(this.pulsar2.getLookupServiceAddress(), this.pulsar2.getConfiguration());
        NamespaceBundle fullBundle = this.pulsar1.getNamespaceService().getNamespaceBundleFactory().getFullBundle(heartbeatNamespace);
        NamespaceBundle fullBundle2 = this.pulsar1.getNamespaceService().getNamespaceBundleFactory().getFullBundle(heartbeatNamespaceV2);
        NamespaceBundle fullBundle3 = this.pulsar2.getNamespaceService().getNamespaceBundleFactory().getFullBundle(heartbeatNamespace2);
        NamespaceBundle fullBundle4 = this.pulsar2.getNamespaceService().getNamespaceBundleFactory().getFullBundle(heartbeatNamespaceV22);
        Set ownedServiceUnits = this.primaryLoadManager.getOwnedServiceUnits();
        log.info("Owned service units: {}", ownedServiceUnits);
        Assert.assertEquals(ownedServiceUnits.size(), 2);
        Assert.assertTrue(ownedServiceUnits.contains(fullBundle));
        Assert.assertTrue(ownedServiceUnits.contains(fullBundle2));
        Set ownedServiceUnits2 = this.secondaryLoadManager.getOwnedServiceUnits();
        log.info("Owned service units: {}", ownedServiceUnits2);
        Assert.assertEquals(ownedServiceUnits2.size(), 2);
        Assert.assertTrue(ownedServiceUnits2.contains(fullBundle3));
        Assert.assertTrue(ownedServiceUnits2.contains(fullBundle4));
        Map ownedNamespaces = this.admin.brokers().getOwnedNamespaces(this.conf.getClusterName(), this.pulsar1.getLookupServiceAddress());
        Map ownedNamespaces2 = this.admin.brokers().getOwnedNamespaces(this.conf.getClusterName(), this.pulsar2.getLookupServiceAddress());
        Assert.assertEquals(ownedNamespaces.size(), 2);
        Assert.assertTrue(ownedNamespaces.containsKey(fullBundle.toString()));
        Assert.assertTrue(ownedNamespaces.containsKey(fullBundle2.toString()));
        Assert.assertEquals(ownedNamespaces2.size(), 2);
        Assert.assertTrue(ownedNamespaces2.containsKey(fullBundle3.toString()));
        Assert.assertTrue(ownedNamespaces2.containsKey(fullBundle4.toString()));
        this.admin.topics().createPartitionedTopic("persistent://public/test/test-get-owned-service-units", 1);
        NamespaceBundle join = getBundleAsync(this.pulsar1, TopicName.get("persistent://public/test/test-get-owned-service-units")).join();
        CompletableFuture assign = this.primaryLoadManager.assign(Optional.empty(), join);
        Assert.assertFalse(((Optional) assign.join()).isEmpty());
        if (((BrokerLookupData) ((Optional) assign.join()).get()).getWebServiceUrl().equals(this.pulsar1.getWebServiceAddress())) {
            assertOwnedServiceUnits(this.pulsar1, this.primaryLoadManager, join);
        } else {
            assertOwnedServiceUnits(this.pulsar2, this.secondaryLoadManager, join);
        }
    }

    private void assertOwnedServiceUnits(PulsarService pulsarService, ExtensibleLoadManagerImpl extensibleLoadManagerImpl, NamespaceBundle namespaceBundle) throws PulsarAdminException {
        Awaitility.await().untilAsserted(() -> {
            Assert.assertTrue(extensibleLoadManagerImpl.getOwnedServiceUnits().contains(namespaceBundle));
        });
        Map ownedNamespaces = this.admin.brokers().getOwnedNamespaces(this.conf.getClusterName(), pulsarService.getLookupServiceAddress());
        Assert.assertTrue(ownedNamespaces.containsKey(namespaceBundle.toString()));
        NamespaceOwnershipStatus namespaceOwnershipStatus = (NamespaceOwnershipStatus) ownedNamespaces.get(namespaceBundle.toString());
        Assert.assertTrue(namespaceOwnershipStatus.is_active);
        Assert.assertFalse(namespaceOwnershipStatus.is_controlled);
        Assert.assertEquals(namespaceOwnershipStatus.broker_assignment, BrokerAssignment.shared);
    }

    @Test(timeOut = 30000)
    public void testGetOwnedServiceUnitsWhenLoadManagerNotStart() {
        Set ownedServiceUnits = new ExtensibleLoadManagerImpl().getOwnedServiceUnits();
        Assert.assertNotNull(ownedServiceUnits);
        Assert.assertTrue(ownedServiceUnits.isEmpty());
    }

    @Test(timeOut = 30000)
    public void testTryAcquiringOwnership() throws PulsarAdminException, ExecutionException, InterruptedException {
        this.admin.namespaces().createNamespace("public/testTryAcquiringOwnership", 1);
        Assert.assertEquals(((NamespaceEphemeralData) this.primaryLoadManager.tryAcquiringOwnership(getBundleAsync(this.pulsar1, TopicName.get("persistent://public/testTryAcquiringOwnership/test")).get()).get()).getNativeUrl(), this.pulsar1.getBrokerServiceUrl());
        this.admin.namespaces().deleteNamespace("public/testTryAcquiringOwnership", true);
    }

    @Test(timeOut = 30000)
    public void testHealthcheck() throws PulsarAdminException {
        this.admin.brokers().healthcheck(TopicVersion.V2);
    }

    private void setPrimaryLoadManager() throws IllegalAccessException {
        ExtensibleLoadManagerWrapper extensibleLoadManagerWrapper = (ExtensibleLoadManagerWrapper) this.pulsar1.getLoadManager().get();
        this.primaryLoadManager = (ExtensibleLoadManagerImpl) Mockito.spy((ExtensibleLoadManagerImpl) FieldUtils.readField(extensibleLoadManagerWrapper, "loadManager", true));
        FieldUtils.writeField(extensibleLoadManagerWrapper, "loadManager", this.primaryLoadManager, true);
        this.channel1 = (ServiceUnitStateChannelImpl) FieldUtils.readField(this.primaryLoadManager, "serviceUnitStateChannel", true);
    }

    private void setSecondaryLoadManager() throws IllegalAccessException {
        ExtensibleLoadManagerWrapper extensibleLoadManagerWrapper = (ExtensibleLoadManagerWrapper) this.pulsar2.getLoadManager().get();
        this.secondaryLoadManager = (ExtensibleLoadManagerImpl) Mockito.spy((ExtensibleLoadManagerImpl) FieldUtils.readField(extensibleLoadManagerWrapper, "loadManager", true));
        FieldUtils.writeField(extensibleLoadManagerWrapper, "loadManager", this.secondaryLoadManager, true);
        this.channel2 = (ServiceUnitStateChannelImpl) FieldUtils.readField(this.secondaryLoadManager, "serviceUnitStateChannel", true);
    }

    private CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService pulsarService, TopicName topicName) {
        return pulsarService.getNamespaceService().getBundleAsync(topicName);
    }
}
