package org.apache.pulsar.broker.service;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Sets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.jute.Record;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.namespace.OwnerShipForCurrentServerTestBase;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.namespace.ServiceUnitZkUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.DeleteRequest;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.mockito.ArgumentMatchers;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

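/**
 * Tests that topic lookups and namespace-bundle ownership stay consistent across a cluster of
 * {@code BROKER_COUNT} brokers, including when a broker's ZooKeeper session is disconnected right
 * before or right after an ownership node is created or deleted.
 *
 * <p>Decompiled from {@code org/apache/pulsar/broker/service/TopicOwnerTest.class}.
 */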
public class TopicOwnerTest {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(TopicOwnerTest.class);

    /** Local ZooKeeper/BookKeeper ensemble; created and started in {@code setup()}. */
    LocalBookkeeperEnsemble bkEnsemble;

    /** Number of brokers started for every test method. */
    protected static final int BROKER_COUNT = 5;

    /** Broker at index 0; {@code setup()} waits until it reports itself as leader. */
    protected PulsarService leaderPulsar;

    /** Admin client bound to {@link #leaderPulsar}. */
    protected PulsarAdmin leaderAdmin;

    /** One admin client per broker, indexed like {@link #pulsarServices}. */
    protected PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];

    /** Configuration used to start the broker at the same index. */
    protected ServiceConfiguration[] configurations = new ServiceConfiguration[BROKER_COUNT];

    /** The brokers under test. */
    protected PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];

    /**
     * Predicate over a ZooKeeper server-side {@link Request}, used by the
     * {@code spyZookeeperToDisconnect*} helpers to pick the request that triggers a disconnect.
     *
     * <p>Decompiler note (JADX): access modifier was changed from package-private. Loaded from
     * {@code org/apache/pulsar/broker/service/TopicOwnerTest$RequestMatcher.class}.
     */
    @FunctionalInterface
    public interface RequestMatcher {
        /**
         * @param request the request submitted to the ZooKeeper server
         * @return {@code true} when the request is the one the caller is waiting for
         * @throws Exception if the request cannot be inspected
         */
        boolean match(Request request) throws Exception;
    }

    /**
     * Starts a local ZooKeeper/BookKeeper ensemble and {@link #BROKER_COUNT} brokers (each with its
     * own admin client) before every test method.
     *
     * <p>The first broker is started alone and polled until it reports itself as leader, so that
     * {@link #leaderPulsar} / {@link #leaderAdmin} always refer to index 0.
     *
     * @throws Exception if the ensemble, a broker or an admin client fails to start
     */
    @BeforeMethod
    void setup() throws Exception {
        log.info("---- Initializing TopicOwnerTest -----");

        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
        bkEnsemble.start();

        for (int idx = 0; idx < BROKER_COUNT; idx++) {
            ServiceConfiguration conf = new ServiceConfiguration();
            // Port 0 for both the broker and the web service.
            conf.setBrokerServicePort(Optional.of(0));
            conf.setClusterName("my-cluster");
            conf.setAdvertisedAddress("localhost");
            conf.setWebServicePort(Optional.of(0));
            conf.setZookeeperServers("127.0.0.1:" + bkEnsemble.getZookeeperPort());
            conf.setDefaultNumberOfNamespaceBundles(1);
            conf.setLoadBalancerEnabled(false);
            configurations[idx] = conf;

            PulsarService broker = new PulsarService(conf);
            pulsarServices[idx] = broker;
            broker.start();

            if (idx == 0) {
                // Do not start any other broker until the first one has become leader.
                while (!broker.getLeaderElectionService().isLeader()) {
                    Thread.sleep(10L);
                }
            }

            pulsarAdmins[idx] = PulsarAdmin.builder()
                    .serviceHttpUrl(broker.getWebServiceAddress())
                    .build();
        }

        leaderPulsar = pulsarServices[0];
        leaderAdmin = pulsarAdmins[0];

        Thread.sleep(1000L);
    }

    /**
     * Shuts down every broker, every admin client and finally the ZooKeeper/BookKeeper ensemble.
     *
     * <p>Cleanup is best-effort: a failure while closing one resource does not prevent the
     * remaining ones from being closed. The first failure is rethrown at the end with any later
     * failures attached as suppressed exceptions. Slots that were never populated (for example
     * because {@code setup()} failed part-way) are skipped, and {@code alwaysRun} makes sure this
     * method still executes in that situation.
     *
     * @throws Exception the first exception raised while releasing a resource
     */
    @AfterMethod(alwaysRun = true)
    void tearDown() throws Exception {
        Exception failure = null;

        for (int i = 0; i < BROKER_COUNT; i++) {
            PulsarService broker = pulsarServices[i];
            // Clear the slot so a stale, already-closed instance is never closed twice.
            pulsarServices[i] = null;
            if (broker != null) {
                try {
                    broker.close();
                } catch (Exception e) {
                    failure = chainFailure(failure, e);
                }
            }

            PulsarAdmin admin = pulsarAdmins[i];
            pulsarAdmins[i] = null;
            if (admin != null) {
                try {
                    admin.close();
                } catch (Exception e) {
                    failure = chainFailure(failure, e);
                }
            }
        }

        LocalBookkeeperEnsemble ensemble = bkEnsemble;
        bkEnsemble = null;
        if (ensemble != null) {
            try {
                ensemble.stop();
            } catch (Exception e) {
                failure = chainFailure(failure, e);
            }
        }

        if (failure != null) {
            throw failure;
        }
    }

    /** Returns {@code first} with {@code next} suppressed on it, or {@code next} if there is no first failure yet. */
    private static Exception chainFailure(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }

    /**
     * Replaces the leader broker's {@link NamespaceService} with a spy whose
     * {@code getBrokerServiceUrlAsync(TopicName, LookupOptions)} can be forced to answer with a
     * specific broker.
     *
     * <p>While the returned holder contains {@code null} the real method is invoked. Once a
     * {@link PulsarService} is stored in it, every call completes immediately with a
     * {@link LookupResult} built from that broker's web and broker service URLs.
     *
     * @return mutable holder selecting the broker the leader should hand out; initially empty
     */
    private MutableObject<PulsarService> spyLeaderNamespaceServiceForAuthorizedBroker() {
        NamespaceService spiedNamespaceService = PowerMockito.spy(leaderPulsar.getNamespaceService());
        MutableObject<PulsarService> authorizedBroker = new MutableObject<>();

        PowerMockito.doAnswer(invocation -> {
            // MutableObject exposes getValue(); the decompiled "getValue2()" does not exist.
            PulsarService broker = authorizedBroker.getValue();
            if (broker == null) {
                return invocation.callRealMethod();
            }
            LookupResult forced = new LookupResult(
                    broker.getWebServiceAddress(),
                    broker.getWebServiceAddressTls(),
                    broker.getBrokerServiceUrl(),
                    broker.getBrokerServiceUrlTls(),
                    true);
            return CompletableFuture.completedFuture(Optional.of(forced));
        }).when(spiedNamespaceService).getBrokerServiceUrlAsync(
                ArgumentMatchers.any(TopicName.class), ArgumentMatchers.any(LookupOptions.class));

        // Swap the spy into the leader so lookups routed to it go through the stub above.
        Whitebox.setInternalState(leaderPulsar, "nsService", spiedNamespaceService);
        return authorizedBroker;
    }

    /**
     * Registers a watcher (through {@code exists("/")}) on the given client and returns a future
     * that completes once that watcher receives an event whose state is {@code SyncConnected}.
     *
     * @param zooKeeper client to observe
     * @return future completed with {@code null} on the first {@code SyncConnected} event
     * @throws Exception if the {@code exists} call fails
     */
    private CompletableFuture<Void> watchZookeeperReconnect(ZooKeeper zooKeeper) throws Exception {
        final CompletableFuture<Void> reconnected = new CompletableFuture<>();
        Watcher onEvent = event -> {
            if (Watcher.Event.KeeperState.SyncConnected == event.getState()) {
                reconnected.complete(null);
            }
        };
        zooKeeper.exists("/", onEvent);
        return reconnected;
    }

    /**
     * Arranges for the given client to be disconnected <em>instead of</em> having a matching
     * request processed.
     *
     * <p>The server connection of {@code zooKeeper} is pointed at a spy of the ZooKeeper server.
     * When a request from that client's session satisfies {@code requestMatcher}, the real server
     * is restored on the connection, the client is disconnected and the request is dropped (the
     * real {@code submitRequest} is not called). Every other request is passed through.
     *
     * @param zooKeeper      client whose session is to be interrupted
     * @param requestMatcher selects the request that triggers the disconnect
     */
    private void spyZookeeperToDisconnectBeforePersist(ZooKeeper zooKeeper, RequestMatcher requestMatcher) {
        final ZooKeeperServer realServer = bkEnsemble.getZkServer();
        final ServerCnxn clientCnxn = bkEnsemble.getZookeeperServerConnection(zooKeeper);
        final ZooKeeperServer spiedServer = PowerMockito.spy(realServer);
        Whitebox.setInternalState(clientCnxn, "zkServer", spiedServer);

        PowerMockito.doAnswer(invocation -> {
            Request incoming = invocation.getArgument(0);
            boolean sameSession = incoming.sessionId == zooKeeper.getSessionId();
            if (!sameSession || !requestMatcher.match(incoming)) {
                return invocation.callRealMethod();
            }
            // One-shot: put the real server back before cutting the connection.
            Whitebox.setInternalState(clientCnxn, "zkServer", realServer);
            bkEnsemble.disconnectZookeeper(zooKeeper);
            return null;
        }).when(spiedServer).submitRequest(ArgumentMatchers.any(Request.class));
    }

    /**
     * Arranges for the given client to be disconnected <em>after</em> a matching request has been
     * processed by the server but before its successful reply reaches the client.
     *
     * <p>The server connection of {@code zooKeeper} is pointed at a spy of the ZooKeeper server.
     * When a request from that client's session satisfies {@code requestMatcher}, the request's
     * connection is replaced by a spy and the request is then processed normally. On that spied
     * connection:
     * <ul>
     *   <li>the reply whose xid equals the request's cxid and whose error code is 0 is dropped,
     *       the real server is restored on the connection and the client is disconnected;</li>
     *   <li>replies sent before that point are delivered normally;</li>
     *   <li>replies sent after that point are dropped.</li>
     * </ul>
     *
     * @param zooKeeper      client whose session is to be interrupted
     * @param requestMatcher selects the request whose successful reply triggers the disconnect
     */
    private void spyZookeeperToDisconnectAfterPersist(ZooKeeper zooKeeper, RequestMatcher requestMatcher) {
        ZooKeeperServer zkServer = bkEnsemble.getZkServer();
        ServerCnxn zookeeperServerConnection = bkEnsemble.getZookeeperServerConnection(zooKeeper);
        ZooKeeperServer spiedZkServer = PowerMockito.spy(zkServer);
        Whitebox.setInternalState(zookeeperServerConnection, "zkServer", spiedZkServer);

        // Set once the disconnect has been triggered; also used as the monitor for reply handling.
        MutableBoolean disconnected = new MutableBoolean();

        PowerMockito.doAnswer(requestInvocation -> {
            Request request = requestInvocation.getArgument(0);
            if (request.sessionId == zooKeeper.getSessionId() && requestMatcher.match(request)) {
                ServerCnxn spiedCnxn = PowerMockito.spy(zookeeperServerConnection);
                // The inner lambda needs its own parameter name: a lambda parameter may not
                // redeclare a variable of the enclosing lambda.
                PowerMockito.doAnswer(responseInvocation -> {
                    synchronized (disconnected) {
                        ReplyHeader replyHeader = responseInvocation.getArgument(0);
                        if (replyHeader.getXid() == request.cxid && replyHeader.getErr() == 0) {
                            Whitebox.setInternalState(zookeeperServerConnection, "zkServer", zkServer);
                            disconnected.setTrue();
                            bkEnsemble.disconnectZookeeper(zooKeeper);
                        } else if (disconnected.isFalse()) {
                            return responseInvocation.callRealMethod();
                        }
                        // Either the triggering reply or a reply after the disconnect: drop it.
                        return null;
                    }
                }).when(spiedCnxn).sendResponse(
                        ArgumentMatchers.any(ReplyHeader.class),
                        ArgumentMatchers.nullable(Record.class),
                        ArgumentMatchers.any(String.class));
                Whitebox.setInternalState(request, "cnxn", spiedCnxn);
            }
            // The request itself is always processed; only its reply may be intercepted.
            return requestInvocation.callRealMethod();
        }).when(spiedZkServer).submitRequest(ArgumentMatchers.any(Request.class));
    }

    /**
     * Broker 1 is forced to be the lookup target, and its ZooKeeper session is disconnected in
     * place of the create request for the bundle's ownership path. After the session reconnects,
     * every broker must resolve the topic to broker 1.
     *
     * @throws Exception on any unexpected admin, ZooKeeper or broker failure
     */
    @Test
    public void testAcquireOwnershipWithZookeeperDisconnectedBeforeOwnershipNodeCreated() throws Exception {
        final String topic = "persistent://my-tenant/my-ns/topic-1";

        pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress()));
        TenantInfo tenantInfo = new TenantInfo();
        tenantInfo.setAllowedClusters(Sets.newHashSet("my-cluster"));
        pulsarAdmins[0].tenants().createTenant("my-tenant", tenantInfo);
        pulsarAdmins[0].namespaces().createNamespace("my-tenant/my-ns", 16);

        NamespaceBundle bundle = leaderPulsar.getNamespaceService().getBundle(TopicName.get(topic));
        MutableObject<PulsarService> authorizedBroker = spyLeaderNamespaceServiceForAuthorizedBroker();
        PulsarService owner = pulsarServices[1];
        ZooKeeper ownerZk = owner.getZkClient();
        CompletableFuture<Void> reconnected = watchZookeeperReconnect(ownerZk);
        String ownershipPath = ServiceUnitZkUtils.path(bundle);

        spyZookeeperToDisconnectBeforePersist(ownerZk, request -> {
            // Request type 1 is decoded below as a CreateRequest (presumably ZooDefs.OpCode.create).
            if (request.type != 1) {
                return false;
            }
            CreateRequest createRequest = new CreateRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request.duplicate(), createRequest);
            return createRequest.getPath().contains(ownershipPath);
        });

        authorizedBroker.setValue(owner);
        try {
            pulsarAdmins[1].lookups().lookupTopic(topic);
        } catch (Exception e) {
            // Best-effort call: the injected ZooKeeper disconnect may make this lookup fail.
            log.info("Ignoring lookup failure on broker 1 while its ZooKeeper session is being disconnected", e);
        }
        reconnected.join();

        for (int adminIdx : new int[] {0, 2, 3, 4}) {
            Assert.assertEquals(pulsarAdmins[adminIdx].lookups().lookupTopic(topic), owner.getBrokerServiceUrl());
        }
        owner.getBrokerService().getTopic(topic, true).join();
        Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic), owner.getBrokerServiceUrl());
    }

    /**
     * Broker 1 is forced to be the lookup target, and its ZooKeeper session is disconnected when
     * the successful reply to the create request for the bundle's ownership path is about to be
     * sent. After the session reconnects, every broker must resolve the topic to broker 1.
     *
     * @throws Exception on any unexpected admin, ZooKeeper or broker failure
     */
    @Test
    public void testAcquireOwnershipWithZookeeperDisconnectedAfterOwnershipNodeCreated() throws Exception {
        final String topic = "persistent://my-tenant/my-ns/topic-1";

        pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress()));
        TenantInfo tenantInfo = new TenantInfo();
        tenantInfo.setAllowedClusters(Sets.newHashSet("my-cluster"));
        pulsarAdmins[0].tenants().createTenant("my-tenant", tenantInfo);
        pulsarAdmins[0].namespaces().createNamespace("my-tenant/my-ns", 16);

        NamespaceBundle bundle = leaderPulsar.getNamespaceService().getBundle(TopicName.get(topic));
        MutableObject<PulsarService> authorizedBroker = spyLeaderNamespaceServiceForAuthorizedBroker();
        PulsarService owner = pulsarServices[1];
        ZooKeeper ownerZk = owner.getZkClient();
        CompletableFuture<Void> reconnected = watchZookeeperReconnect(ownerZk);
        String ownershipPath = ServiceUnitZkUtils.path(bundle);

        spyZookeeperToDisconnectAfterPersist(ownerZk, request -> {
            // Request type 1 is decoded below as a CreateRequest (presumably ZooDefs.OpCode.create).
            if (request.type != 1) {
                return false;
            }
            CreateRequest createRequest = new CreateRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request.duplicate(), createRequest);
            return createRequest.getPath().contains(ownershipPath);
        });

        authorizedBroker.setValue(owner);
        try {
            pulsarAdmins[1].lookups().lookupTopic(topic);
        } catch (Exception e) {
            // Best-effort call: the injected ZooKeeper disconnect may make this lookup fail.
            log.info("Ignoring lookup failure on broker 1 while its ZooKeeper session is being disconnected", e);
        }
        reconnected.join();

        for (int adminIdx : new int[] {0, 2, 3, 4}) {
            Assert.assertEquals(pulsarAdmins[adminIdx].lookups().lookupTopic(topic), owner.getBrokerServiceUrl());
        }
        owner.getBrokerService().getTopic(topic, true).join();
        Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic), owner.getBrokerServiceUrl());
    }

    /**
     * Once broker 1 owns the bundle, its {@code ownedBundlesCache} entry is invalidated several
     * times; the test checks that the entry is re-populated by a lookup served by broker 1, by
     * {@code checkOwnership}, by loading the topic, and by creating the topic through the admin API,
     * while lookups on the other brokers keep pointing at broker 1 without re-populating it.
     *
     * @throws Exception on any unexpected admin or broker failure
     */
    @Test
    public void testReestablishOwnershipAfterInvalidateCache() throws Exception {
        final String topic = "persistent://my-tenant/my-ns/topic-1";

        pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress()));
        TenantInfo tenantInfo = new TenantInfo();
        tenantInfo.setAllowedClusters(Sets.newHashSet("my-cluster"));
        pulsarAdmins[0].tenants().createTenant("my-tenant", tenantInfo);
        pulsarAdmins[0].namespaces().createNamespace("my-tenant/my-ns", 16);

        NamespaceBundle bundle = leaderPulsar.getNamespaceService().getBundle(TopicName.get(topic));
        MutableObject<PulsarService> authorizedBroker = spyLeaderNamespaceServiceForAuthorizedBroker();
        PulsarService owner = pulsarServices[1];
        authorizedBroker.setValue(owner);

        // Initial acquisition: every broker resolves the topic to broker 1.
        for (int adminIdx = 0; adminIdx < 5; adminIdx++) {
            Assert.assertEquals(pulsarAdmins[adminIdx].lookups().lookupTopic(topic), owner.getBrokerServiceUrl());
        }

        OwnershipCache ownershipCache = owner.getNamespaceService().getOwnershipCache();
        AsyncLoadingCache<?, ?> ownedBundles = Whitebox.getInternalState(ownershipCache, "ownedBundlesCache");

        // From here on the leader uses its real lookup logic again.
        authorizedBroker.setValue(null);

        String ownershipPath = ServiceUnitZkUtils.path(bundle);
        ownedBundles.synchronous().invalidate(ownershipPath);
        Assert.assertNull(ownershipCache.getOwnedBundle(bundle));

        // Lookups through the other brokers still answer broker 1 and leave the cache entry empty.
        for (int adminIdx : new int[] {0, 2, 3, 4}) {
            Assert.assertEquals(pulsarAdmins[adminIdx].lookups().lookupTopic(topic), owner.getBrokerServiceUrl());
        }
        Assert.assertNull(ownershipCache.getOwnedBundle(bundle));

        // A lookup through broker 1 itself restores the entry.
        Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic), owner.getBrokerServiceUrl());
        Assert.assertNotNull(ownershipCache.getOwnedBundle(bundle));

        // checkOwnership restores the entry.
        ownedBundles.synchronous().invalidate(ownershipPath);
        ownershipCache.checkOwnership(bundle).join();
        Assert.assertNotNull(ownershipCache.getOwnedBundle(bundle));

        // Loading the topic restores the entry.
        ownedBundles.synchronous().invalidate(ownershipPath);
        owner.getBrokerService().getTopic(topic, true).join();
        Assert.assertNotNull(ownershipCache.getOwnedBundle(bundle));

        // Creating the topic through the admin API restores the entry.
        owner.getBrokerService().deleteTopic(topic, true).join();
        ownedBundles.synchronous().invalidate(ownershipPath);
        pulsarAdmins[0].topics().createNonPartitionedTopic(topic);
        Assert.assertNotNull(ownershipCache.getOwnedBundle(bundle));
    }

    /**
     * Broker 1 owns the bundle; while it unloads the bundle its ZooKeeper session is disconnected
     * in place of the delete request for the ownership path. After the session reconnects, all
     * brokers must agree on a single owner (whatever broker 0 reports), and the topic must be
     * creatable.
     *
     * @throws Exception on any unexpected admin, ZooKeeper or broker failure
     */
    @Test
    public void testReleaseOwnershipWithZookeeperDisconnectedBeforeOwnershipNodeDeleted() throws Exception {
        final String topic = "persistent://my-tenant/my-ns/topic-1";

        pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress()));
        TenantInfo tenantInfo = new TenantInfo();
        tenantInfo.setAllowedClusters(Sets.newHashSet("my-cluster"));
        pulsarAdmins[0].tenants().createTenant("my-tenant", tenantInfo);
        pulsarAdmins[0].namespaces().createNamespace("my-tenant/my-ns", 16);

        NamespaceBundle bundle = leaderPulsar.getNamespaceService().getBundle(TopicName.get(topic));
        MutableObject<PulsarService> authorizedBroker = spyLeaderNamespaceServiceForAuthorizedBroker();
        PulsarService owner = pulsarServices[1];
        PulsarService nextCandidate = pulsarServices[2];

        authorizedBroker.setValue(owner);
        for (int adminIdx = 0; adminIdx < 5; adminIdx++) {
            Assert.assertEquals(pulsarAdmins[adminIdx].lookups().lookupTopic(topic), owner.getBrokerServiceUrl());
        }

        ZooKeeper ownerZk = owner.getZkClient();
        CompletableFuture<Void> reconnected = watchZookeeperReconnect(ownerZk);
        String ownershipPath = ServiceUnitZkUtils.path(bundle);
        spyZookeeperToDisconnectBeforePersist(ownerZk, request -> {
            // Request type 2 is decoded below as a DeleteRequest (presumably ZooDefs.OpCode.delete).
            if (request.type != 2) {
                return false;
            }
            DeleteRequest deleteRequest = new DeleteRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request.duplicate(), deleteRequest);
            return deleteRequest.getPath().contains(ownershipPath);
        });

        try {
            pulsarAdmins[1].namespaces().unloadNamespaceBundle(bundle.getNamespaceObject().toString(), bundle.getBundleRange());
        } catch (Exception e) {
            // Best-effort call: the injected ZooKeeper disconnect may make this unload fail.
            log.info("Ignoring unload failure on broker 1 while its ZooKeeper session is being disconnected", e);
        }
        reconnected.join();

        authorizedBroker.setValue(nextCandidate);
        // The owner is not pinned here; the test only requires that all brokers agree with broker 0.
        String agreedOwnerUrl = pulsarAdmins[0].lookups().lookupTopic(topic);
        for (int adminIdx = 1; adminIdx < 5; adminIdx++) {
            Assert.assertEquals(pulsarAdmins[adminIdx].lookups().lookupTopic(topic), agreedOwnerUrl);
        }
        pulsarAdmins[0].topics().createNonPartitionedTopic(topic);
    }

    /**
     * Broker 1 owns the bundle; while it unloads the bundle its ZooKeeper session is disconnected
     * when the successful reply to the delete request for the ownership path is about to be sent.
     * After the session reconnects and broker 2 is made the lookup target, every broker must
     * resolve the topic to broker 2.
     *
     * @throws Exception on any unexpected admin, ZooKeeper or broker failure
     */
    @Test
    public void testReleaseOwnershipWithZookeeperDisconnectedAfterOwnershipNodeDeleted() throws Exception {
        final String topic = "persistent://my-tenant/my-ns/topic-1";

        pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress()));
        TenantInfo tenantInfo = new TenantInfo();
        tenantInfo.setAllowedClusters(Sets.newHashSet("my-cluster"));
        pulsarAdmins[0].tenants().createTenant("my-tenant", tenantInfo);
        pulsarAdmins[0].namespaces().createNamespace("my-tenant/my-ns", 16);

        NamespaceBundle bundle = leaderPulsar.getNamespaceService().getBundle(TopicName.get(topic));
        MutableObject<PulsarService> authorizedBroker = spyLeaderNamespaceServiceForAuthorizedBroker();
        PulsarService previousOwner = pulsarServices[1];
        PulsarService newOwner = pulsarServices[2];

        authorizedBroker.setValue(previousOwner);
        for (int adminIdx = 0; adminIdx < 5; adminIdx++) {
            Assert.assertEquals(pulsarAdmins[adminIdx].lookups().lookupTopic(topic), previousOwner.getBrokerServiceUrl());
        }

        ZooKeeper previousOwnerZk = previousOwner.getZkClient();
        CompletableFuture<Void> reconnected = watchZookeeperReconnect(previousOwnerZk);
        String ownershipPath = ServiceUnitZkUtils.path(bundle);
        spyZookeeperToDisconnectAfterPersist(previousOwnerZk, request -> {
            // Request type 2 is decoded below as a DeleteRequest (presumably ZooDefs.OpCode.delete).
            if (request.type != 2) {
                return false;
            }
            DeleteRequest deleteRequest = new DeleteRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request.duplicate(), deleteRequest);
            return deleteRequest.getPath().contains(ownershipPath);
        });

        try {
            pulsarAdmins[1].namespaces().unloadNamespaceBundle(bundle.getNamespaceObject().toString(), bundle.getBundleRange());
        } catch (Exception e) {
            // Best-effort call: the injected ZooKeeper disconnect may make this unload fail.
            log.info("Ignoring unload failure on broker 1 while its ZooKeeper session is being disconnected", e);
        }
        reconnected.join();

        authorizedBroker.setValue(newOwner);
        for (int adminIdx : new int[] {0, 3, 4}) {
            Assert.assertEquals(pulsarAdmins[adminIdx].lookups().lookupTopic(topic), newOwner.getBrokerServiceUrl());
        }
        newOwner.getBrokerService().getTopic(topic, true).join();
        for (int adminIdx : new int[] {2, 1}) {
            Assert.assertEquals(pulsarAdmins[adminIdx].lookups().lookupTopic(topic), newOwner.getBrokerServiceUrl());
        }
    }

    /**
     * After a bundle split, a client that connects to the broker serving {@code topic-1} must still
     * be able to subscribe to {@code topic-2}, which was resolved to a different broker.
     *
     * <p>The client is opened in a try-with-resources block so that it is closed even when the
     * assertion fails.
     *
     * @throws Exception on any unexpected admin or client failure
     */
    @Test
    public void testConnectToInvalidateBundleCacheBroker() throws Exception {
        final String namespace = "my-tenant/my-ns";
        final String topic1 = "persistent://my-tenant/my-ns/topic-1";
        final String topic2 = "persistent://my-tenant/my-ns/topic-2";

        pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress()));
        TenantInfo tenantInfo = new TenantInfo();
        tenantInfo.setAllowedClusters(Sets.newHashSet("my-cluster"));
        pulsarAdmins[0].tenants().createTenant("my-tenant", tenantInfo);
        pulsarAdmins[0].namespaces().createNamespace(namespace, 16);
        Assert.assertEquals(pulsarAdmins[0].namespaces().getPolicies(namespace).bundles.getNumBundles(), 16);

        String serviceUrlForTopic1 = pulsarAdmins[0].lookups().lookupTopic(topic1);
        String serviceUrlForTopic2 = pulsarAdmins[0].lookups().lookupTopic(topic2);
        // Keep unloading the namespace until the two topics resolve to different brokers.
        // NOTE(review): this loop has no upper bound.
        while (serviceUrlForTopic1.equals(serviceUrlForTopic2)) {
            pulsarAdmins[0].namespaces().unload(namespace);
            serviceUrlForTopic1 = pulsarAdmins[0].lookups().lookupTopic(topic1);
            serviceUrlForTopic2 = pulsarAdmins[0].lookups().lookupTopic(topic2);
        }

        String topic1BundleRange = pulsarServices[0].getNamespaceService().getBundle(TopicName.get(topic1)).getBundleRange();
        pulsarAdmins[0].namespaces().splitNamespaceBundle(namespace, topic1BundleRange, true, (String) null);

        // NOTE(review): the subscription name is taken from an unrelated test base's CLUSTER_NAME
        // constant; this looks like a decompiler substitution for a literal - confirm.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrlForTopic1).build()) {
            Assert.assertTrue(client.newConsumer()
                    .topic(topic2)
                    .subscriptionName(OwnerShipForCurrentServerTestBase.CLUSTER_NAME)
                    .subscribe()
                    .isConnected());
        }
    }
}
