package org.apache.pulsar.broker.namespace;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/namespace/NamespaceServiceTest.class */
public class NamespaceServiceTest extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(NamespaceServiceTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        super.baseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testSplitAndOwnBundles() throws Exception {
        OwnershipCache ownershipCache = (OwnershipCache) Mockito.spy(this.pulsar.getNamespaceService().getOwnershipCache());
        ((OwnershipCache) Mockito.doReturn(CompletableFuture.completedFuture(null)).when(ownershipCache)).disableOwnership((NamespaceBundle) ArgumentMatchers.any(NamespaceBundle.class));
        Field declaredField = NamespaceService.class.getDeclaredField("ownershipCache");
        declaredField.setAccessible(true);
        declaredField.set(this.pulsar.getNamespaceService(), ownershipCache);
        NamespaceService namespaceService = this.pulsar.getNamespaceService();
        NamespaceName namespaceName = NamespaceName.get("pulsar/global/ns1");
        TopicName topicName = TopicName.get("persistent://pulsar/global/ns1/topic-1");
        NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(namespaceName);
        NamespaceBundle findBundle = bundles.findBundle(topicName);
        try {
            namespaceService.splitAndOwnBundle(findBundle, false, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO).get();
        } catch (Exception e) {
            Assert.fail("split bundle failed", e);
        }
        NamespaceBundleFactory namespaceBundleFactory = this.pulsar.getNamespaceService().getNamespaceBundleFactory();
        NamespaceBundles bundles2 = namespaceBundleFactory.getBundles(namespaceName);
        Assert.assertNotNull(bundles2);
        List bundles3 = bundles2.getBundles();
        Assert.assertNotNull(bundles);
        Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles = splitBundles(NamespaceBundleFactory.createFactory(this.pulsar, Hashing.crc32()), namespaceName, bundles, findBundle);
        Assert.assertNotNull(splitBundles);
        HashSet hashSet = new HashSet((Collection) splitBundles.getRight());
        hashSet.removeAll(bundles3);
        Assert.assertTrue(hashSet.isEmpty());
        Policies policies = (Policies) ObjectMapperFactory.getThreadLocal().readValue(this.pulsar.getLocalZkCache().getZooKeeper().getData(PulsarWebResource.joinPath(new String[]{"/admin/local-policies", namespaceName.toString()}), (Watcher) null, new Stat()), Policies.class);
        Assert.assertEquals(namespaceBundleFactory.getBundles(namespaceName, policies.bundles), bundles2);
        log.info("Policies: {}", policies);
        bundles3.forEach(namespaceBundle -> {
            try {
                Assert.assertEquals(((NamespaceEphemeralData) ObjectMapperFactory.getThreadLocal().readValue(this.pulsar.getLocalZkCache().getZooKeeper().getData(ServiceUnitZkUtils.path(namespaceBundle), (Watcher) null, new Stat()), NamespaceEphemeralData.class)).getNativeUrl(), this.pulsar.getSafeBrokerServiceUrl());
            } catch (Exception e2) {
                Assert.fail("failed to setup ownership", e2);
            }
        });
    }

    @Test
    public void testSplitMapWithRefreshedStatMap() throws Exception {
        OwnershipCache ownershipCache = (OwnershipCache) Mockito.spy(this.pulsar.getNamespaceService().getOwnershipCache());
        ManagedLedger managedLedger = (ManagedLedger) Mockito.mock(ManagedLedger.class);
        Mockito.when(managedLedger.getCursors()).thenReturn(Lists.newArrayList());
        ((OwnershipCache) Mockito.doReturn(CompletableFuture.completedFuture(null)).when(ownershipCache)).disableOwnership((NamespaceBundle) ArgumentMatchers.any(NamespaceBundle.class));
        Field declaredField = NamespaceService.class.getDeclaredField("ownershipCache");
        declaredField.setAccessible(true);
        declaredField.set(this.pulsar.getNamespaceService(), ownershipCache);
        NamespaceService namespaceService = this.pulsar.getNamespaceService();
        NamespaceName namespaceName = NamespaceName.get("pulsar/global/ns1");
        TopicName topicName = TopicName.get("persistent://pulsar/global/ns1/topic-1");
        NamespaceBundle findBundle = namespaceService.getNamespaceBundleFactory().getBundles(namespaceName).findBundle(topicName);
        PersistentTopic persistentTopic = new PersistentTopic(topicName.toString(), managedLedger, this.pulsar.getBrokerService());
        Method declaredMethod = this.pulsar.getBrokerService().getClass().getDeclaredMethod("addTopicToStatsMaps", TopicName.class, Topic.class);
        declaredMethod.setAccessible(true);
        declaredMethod.invoke(this.pulsar.getBrokerService(), topicName, persistentTopic);
        String namespaceName2 = findBundle.getNamespaceObject().toString();
        Assert.assertNotNull(this.pulsar.getBrokerService().getAllTopicsFromNamespaceBundle(namespaceName2, findBundle.toString()));
        try {
            namespaceService.splitAndOwnBundle(findBundle, false, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO).get();
        } catch (Exception e) {
            Assert.fail("split bundle failed", e);
        }
        Assert.assertTrue(this.pulsar.getBrokerService().getAllTopicsFromNamespaceBundle(namespaceName2, findBundle.toString()).isEmpty());
        Assert.assertFalse(CollectionUtils.isEmpty(this.pulsar.getBrokerService().getAllTopicsFromNamespaceBundle(namespaceName2, this.pulsar.getNamespaceService().getBundle(topicName).toString())));
    }

    @Test
    public void testIsServiceUnitDisabled() throws Exception {
        OwnershipCache ownershipCache = (OwnershipCache) Mockito.spy(this.pulsar.getNamespaceService().getOwnershipCache());
        Mockito.when(((ManagedLedger) Mockito.mock(ManagedLedger.class)).getCursors()).thenReturn(Lists.newArrayList());
        ((OwnershipCache) Mockito.doReturn(CompletableFuture.completedFuture(null)).when(ownershipCache)).disableOwnership((NamespaceBundle) ArgumentMatchers.any(NamespaceBundle.class));
        Field declaredField = NamespaceService.class.getDeclaredField("ownershipCache");
        declaredField.setAccessible(true);
        declaredField.set(this.pulsar.getNamespaceService(), ownershipCache);
        NamespaceService namespaceService = this.pulsar.getNamespaceService();
        NamespaceName namespaceName = NamespaceName.get("pulsar/global/ns1");
        Assert.assertFalse(namespaceService.isNamespaceBundleDisabled(namespaceService.getNamespaceBundleFactory().getBundles(namespaceName).findBundle(TopicName.get("persistent://pulsar/global/ns1/topic-1"))));
    }

    @Test
    public void testRemoveOwnershipNamespaceBundle() throws Exception {
        OwnershipCache ownershipCache = (OwnershipCache) Mockito.spy(this.pulsar.getNamespaceService().getOwnershipCache());
        Mockito.when(((ManagedLedger) Mockito.mock(ManagedLedger.class)).getCursors()).thenReturn(Lists.newArrayList());
        ((OwnershipCache) Mockito.doReturn(CompletableFuture.completedFuture(null)).when(ownershipCache)).disableOwnership((NamespaceBundle) ArgumentMatchers.any(NamespaceBundle.class));
        Field declaredField = NamespaceService.class.getDeclaredField("ownershipCache");
        declaredField.setAccessible(true);
        declaredField.set(this.pulsar.getNamespaceService(), ownershipCache);
        NamespaceService namespaceService = this.pulsar.getNamespaceService();
        NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(NamespaceName.get("prop/use/ns1"));
        NamespaceBundle namespaceBundle = (NamespaceBundle) bundles.getBundles().get(0);
        ownershipCache.tryAcquiringOwnership(namespaceBundle).get();
        Assert.assertNotNull(ownershipCache.getOwnedBundle(namespaceBundle));
        ownershipCache.removeOwnership(bundles).get();
        Assert.assertNull(ownershipCache.getOwnedBundle(namespaceBundle));
    }

    @Test
    public void testUnloadNamespaceBundleFailure() throws Exception {
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscribe();
        ConcurrentOpenHashMap topics = this.pulsar.getBrokerService().getTopics();
        Topic topic = (Topic) Mockito.spy(((Optional) ((CompletableFuture) topics.get("persistent://my-property/use/my-ns/my-topic1")).get()).get());
        topics.clear();
        topics.put("persistent://my-property/use/my-ns/my-topic1", CompletableFuture.completedFuture(Optional.of(topic)));
        ((Topic) Mockito.doAnswer(new Answer<CompletableFuture<Void>>() { // from class: org.apache.pulsar.broker.namespace.NamespaceServiceTest.1
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public CompletableFuture<Void> m35answer(InvocationOnMock invocationOnMock) {
                CompletableFuture<Void> completableFuture = new CompletableFuture<>();
                completableFuture.completeExceptionally(new RuntimeException("first time failed"));
                return completableFuture;
            }
        }).when(topic)).close(false);
        NamespaceBundle bundle = this.pulsar.getNamespaceService().getBundle(TopicName.get("persistent://my-property/use/my-ns/my-topic1"));
        this.pulsar.getNamespaceService().unloadNamespaceBundle(bundle).join();
        try {
            this.pulsar.getLocalZkCache().getZooKeeper().getData(ServiceUnitZkUtils.path(bundle), (Watcher) null, (Stat) null);
            Assert.fail("it should fail as node is not present");
        } catch (KeeperException.NoNodeException e) {
        }
    }

    @Test(timeOut = 6000)
    public void testUnloadNamespaceBundleWithStuckTopic() throws Exception {
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscribe();
        ConcurrentOpenHashMap topics = this.pulsar.getBrokerService().getTopics();
        Topic topic = (Topic) Mockito.spy(((Optional) ((CompletableFuture) topics.get("persistent://my-property/use/my-ns/my-topic1")).get()).get());
        topics.clear();
        topics.put("persistent://my-property/use/my-ns/my-topic1", CompletableFuture.completedFuture(Optional.of(topic)));
        ((Topic) Mockito.doAnswer(new Answer<CompletableFuture<Void>>() { // from class: org.apache.pulsar.broker.namespace.NamespaceServiceTest.2
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public CompletableFuture<Void> m36answer(InvocationOnMock invocationOnMock) throws Throwable {
                return new CompletableFuture<>();
            }
        }).when(topic)).close(false);
        NamespaceBundle bundle = this.pulsar.getNamespaceService().getBundle(TopicName.get("persistent://my-property/use/my-ns/my-topic1"));
        this.pulsar.getNamespaceService().unloadNamespaceBundle(bundle, 1L, TimeUnit.SECONDS).join();
        try {
            this.pulsar.getLocalZkCache().getZooKeeper().getData(ServiceUnitZkUtils.path(bundle), (Watcher) null, (Stat) null);
            Assert.fail("it should fail as node is not present");
        } catch (KeeperException.NoNodeException e) {
        }
        subscribe.close();
    }

    @Test
    public void testLoadReportDeserialize() throws Exception {
        LoadReport loadReport = new LoadReport((String) null, (String) null, "http://localhost:8000", (String) null);
        LocalBrokerData localBrokerData = new LocalBrokerData((String) null, (String) null, "http://localhost:3000", (String) null);
        URI uri = new URI("http://localhost:8000");
        URI uri2 = new URI("http://localhost:3000");
        String format = String.format("%s/%s:%s", "/loadbalance/brokers", uri.getHost(), Integer.valueOf(uri.getPort()));
        String format2 = String.format("%s/%s:%s", "/loadbalance/brokers", uri2.getHost(), Integer.valueOf(uri2.getPort()));
        ZkUtils.createFullPathOptimistic(this.pulsar.getZkClient(), format, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(loadReport), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        ZkUtils.createFullPathOptimistic(this.pulsar.getZkClient(), format2, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(localBrokerData), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        LookupResult lookupResult = (LookupResult) this.pulsar.getNamespaceService().createLookupResult("http://localhost:8000", false, (String) null).get();
        ((LoadManager) this.pulsar.getLoadManager().getAndSet(new ModularLoadManagerWrapper(new ModularLoadManagerImpl()))).stop();
        LookupResult lookupResult2 = (LookupResult) this.pulsar.getNamespaceService().createLookupResult("http://localhost:3000", false, (String) null).get();
        Assert.assertEquals(lookupResult.getLookupData().getBrokerUrl(), "http://localhost:8000");
        Assert.assertEquals(lookupResult2.getLookupData().getBrokerUrl(), "http://localhost:3000");
        System.out.println(lookupResult2);
    }

    @Test
    public void testCreateLookupResult() throws Exception {
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("listenerName", AdvertisedListener.builder().brokerServiceUrl(new URI("pulsar://localhost:7000")).brokerServiceUrlTls(new URI("pulsar://localhost:8000")).build());
        LocalBrokerData localBrokerData = new LocalBrokerData((String) null, (String) null, "pulsar://localhost:6650", (String) null, newHashMap);
        URI uri = new URI("pulsar://localhost:6650");
        ZkUtils.createFullPathOptimistic(this.pulsar.getZkClient(), String.format("%s/%s:%s", "/loadbalance/brokers", uri.getHost(), Integer.valueOf(uri.getPort())), ObjectMapperFactory.getThreadLocal().writeValueAsBytes(localBrokerData), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        LookupResult lookupResult = (LookupResult) this.pulsar.getNamespaceService().createLookupResult("pulsar://localhost:6650", false, (String) null).get();
        LookupResult lookupResult2 = (LookupResult) this.pulsar.getNamespaceService().createLookupResult("pulsar://localhost:6650", false, "listenerName").get();
        Assert.assertEquals(lookupResult.getLookupData().getBrokerUrl(), "pulsar://localhost:6650");
        Assert.assertEquals(lookupResult2.getLookupData().getBrokerUrl(), "pulsar://localhost:7000");
        Assert.assertEquals(lookupResult2.getLookupData().getBrokerUrlTls(), "pulsar://localhost:8000");
        System.out.println(lookupResult2);
    }

    @Test
    public void testCreateNamespaceWithDefaultNumberOfBundles() throws Exception {
        OwnershipCache ownershipCache = (OwnershipCache) Mockito.spy(this.pulsar.getNamespaceService().getOwnershipCache());
        ((OwnershipCache) Mockito.doReturn(CompletableFuture.completedFuture(null)).when(ownershipCache)).disableOwnership((NamespaceBundle) ArgumentMatchers.any(NamespaceBundle.class));
        Field declaredField = NamespaceService.class.getDeclaredField("ownershipCache");
        declaredField.setAccessible(true);
        declaredField.set(this.pulsar.getNamespaceService(), ownershipCache);
        NamespaceService namespaceService = this.pulsar.getNamespaceService();
        NamespaceName namespaceName = NamespaceName.get("pulsar/global/ns1");
        TopicName topicName = TopicName.get("persistent://pulsar/global/ns1/topic-1");
        NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(namespaceName);
        NamespaceBundle findBundle = bundles.findBundle(topicName);
        try {
            namespaceService.splitAndOwnBundle(findBundle, false, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO).get();
        } catch (Exception e) {
            Assert.fail("split bundle failed", e);
        }
        NamespaceBundleFactory namespaceBundleFactory = this.pulsar.getNamespaceService().getNamespaceBundleFactory();
        NamespaceBundles bundles2 = namespaceBundleFactory.getBundles(namespaceName);
        Assert.assertNotNull(bundles2);
        List bundles3 = bundles2.getBundles();
        Assert.assertNotNull(bundles);
        Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles = splitBundles(NamespaceBundleFactory.createFactory(this.pulsar, Hashing.crc32()), namespaceName, bundles, findBundle);
        Assert.assertNotNull(splitBundles);
        HashSet hashSet = new HashSet((Collection) splitBundles.getRight());
        hashSet.removeAll(bundles3);
        Assert.assertTrue(hashSet.isEmpty());
        Policies policies = (Policies) ObjectMapperFactory.getThreadLocal().readValue(this.pulsar.getLocalZkCache().getZooKeeper().getData(PulsarWebResource.joinPath(new String[]{"/admin/local-policies", namespaceName.toString()}), (Watcher) null, new Stat()), Policies.class);
        Assert.assertEquals(namespaceBundleFactory.getBundles(namespaceName, policies.bundles), bundles2);
        log.info("Policies: {}", policies);
        bundles3.forEach(namespaceBundle -> {
            try {
                Assert.assertEquals(((NamespaceEphemeralData) ObjectMapperFactory.getThreadLocal().readValue(this.pulsar.getLocalZkCache().getZooKeeper().getData(ServiceUnitZkUtils.path(namespaceBundle), (Watcher) null, new Stat()), NamespaceEphemeralData.class)).getNativeUrl(), this.pulsar.getSafeBrokerServiceUrl());
            } catch (Exception e2) {
                Assert.fail("failed to setup ownership", e2);
            }
        });
    }

    @Test
    public void testRemoveOwnershipAndSplitBundle() throws Exception {
        OwnershipCache ownershipCache = (OwnershipCache) Mockito.spy(this.pulsar.getNamespaceService().getOwnershipCache());
        ((OwnershipCache) Mockito.doReturn(CompletableFuture.completedFuture(null)).when(ownershipCache)).disableOwnership((NamespaceBundle) ArgumentMatchers.any(NamespaceBundle.class));
        Field declaredField = NamespaceService.class.getDeclaredField("ownershipCache");
        declaredField.setAccessible(true);
        declaredField.set(this.pulsar.getNamespaceService(), ownershipCache);
        NamespaceService namespaceService = this.pulsar.getNamespaceService();
        NamespaceName namespaceName = NamespaceName.get("pulsar/global/ns1");
        TopicName topicName = TopicName.get("persistent://pulsar/global/ns1/topic-1");
        try {
            namespaceService.splitAndOwnBundle(namespaceService.getNamespaceBundleFactory().getBundles(namespaceName).findBundle(topicName), false, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO).get();
        } catch (Exception e) {
            Assert.fail("split bundle failed", e);
        }
        NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(namespaceName);
        Assert.assertNotNull(bundles);
        NamespaceBundle findBundle = bundles.findBundle(topicName);
        bundles.getBundles().stream().filter(namespaceBundle -> {
            return !namespaceBundle.equals(findBundle);
        }).forEach(namespaceBundle2 -> {
            try {
                ownershipCache.removeOwnership(namespaceBundle2).get();
            } catch (Exception e2) {
                Assert.fail("failed to remove ownership", e2);
            }
        });
        try {
            namespaceService.splitAndOwnBundle(findBundle, true, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO).get();
        } catch (Exception e2) {
            Assert.fail("split bundle failed", e2);
        }
    }

    private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory namespaceBundleFactory, NamespaceName namespaceName, NamespaceBundles namespaceBundles, NamespaceBundle namespaceBundle) throws Exception {
        Field declaredField = NamespaceBundleFactory.class.getDeclaredField("bundlesCache");
        declaredField.setAccessible(true);
        ((AsyncLoadingCache) declaredField.get(namespaceBundleFactory)).put(namespaceName, CompletableFuture.completedFuture(namespaceBundles));
        return namespaceBundleFactory.splitBundles(namespaceBundle, 2, (Long) null);
    }
}
