/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.admin;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BacklogQuotaManager;
import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class TopicPoliciesTest
extends MockedPulsarServiceBaseTest {
    private static final Logger log = LoggerFactory.getLogger(TopicPoliciesTest.class);
    private final String testTenant = "my-tenant";
    private final String testNamespace = "my-namespace";
    private final String myNamespace = "my-tenant/my-namespace";
    private final String testTopic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
    private final String persistenceTopic = "persistent://my-tenant/my-namespace/test-set-persistence";
    private final String topicPolicyEventsTopic = "persistent://my-tenant/my-namespace/__change_events";

    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        this.conf.setSystemTopicEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(true);
        this.conf.setDefaultNumberOfNamespaceBundles(1);
        this.conf.setMaxMessageSizeCheckIntervalInSeconds(1);
        super.internalSetup();
        this.admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        TenantInfoImpl tenantInfo = new TenantInfoImpl((Set)Sets.newHashSet((Object[])new String[]{"role1", "role2"}), (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        this.admin.tenants().createTenant(this.testTenant, (TenantInfo)tenantInfo);
        this.admin.namespaces().createNamespace("my-tenant/my-namespace", (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        this.admin.topics().createPartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", 2);
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-tenant/my-namespace/test-set-backlog-quota").create();
        producer.close();
        this.waitForZooKeeperWatchers();
    }

    @Override
    @AfterMethod(alwaysRun=true)
    public void cleanup() throws Exception {
        super.internalCleanup();
        this.resetConfig();
    }

    @Test
    public void testSetBacklogQuota() throws Exception {
        BacklogQuota backlogQuota = BacklogQuota.builder().limitSize(1024L).retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction).build();
        log.info("Backlog quota: {} will set to the topic: {}", (Object)backlogQuota, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().setBacklogQuota("persistent://my-tenant/my-namespace/test-set-backlog-quota", backlogQuota);
        log.info("Backlog quota set success on topic: {}", (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(this.admin.topics().getBacklogQuotaMap("persistent://my-tenant/my-namespace/test-set-backlog-quota").get(BacklogQuota.BacklogQuotaType.destination_storage), (Object)backlogQuota));
        BacklogQuotaManager backlogQuotaManager = this.pulsar.getBrokerService().getBacklogQuotaManager();
        BacklogQuotaImpl backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota"));
        log.info("Backlog quota {} in backlog quota manager on topic: {}", (Object)backlogQuotaInManager, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Assert.assertEquals((Object)backlogQuota, (Object)backlogQuotaInManager);
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
    }

    @Test
    public void testRemoveBacklogQuota() throws Exception {
        BacklogQuota backlogQuota = BacklogQuota.builder().limitSize(1024L).retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction).build();
        log.info("Backlog quota: {} will set to the topic: {}", (Object)backlogQuota, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().setBacklogQuota("persistent://my-tenant/my-namespace/test-set-backlog-quota", backlogQuota);
        log.info("Backlog quota set success on topic: {}", (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(this.admin.topics().getBacklogQuotaMap("persistent://my-tenant/my-namespace/test-set-backlog-quota").get(BacklogQuota.BacklogQuotaType.destination_storage), (Object)backlogQuota));
        BacklogQuotaManager backlogQuotaManager = this.pulsar.getBrokerService().getBacklogQuotaManager();
        BacklogQuotaImpl backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota"));
        log.info("Backlog quota {} in backlog quota manager on topic: {}", (Object)backlogQuotaInManager, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Assert.assertEquals((Object)backlogQuota, (Object)backlogQuotaInManager);
        this.admin.topics().removeBacklogQuota("persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().untilAsserted(() -> Assert.assertNull(this.admin.topics().getBacklogQuotaMap("persistent://my-tenant/my-namespace/test-set-backlog-quota").get(BacklogQuota.BacklogQuotaType.destination_storage)));
        backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota"));
        log.info("Backlog quota {} in backlog quota manager on topic: {} after remove", (Object)backlogQuotaInManager, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Assert.assertEquals((Object)backlogQuotaManager.getDefaultQuota(), (Object)backlogQuotaInManager);
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
    }

    @Test
    public void testCheckBacklogQuota() throws Exception {
        RetentionPolicies retentionPolicies = new RetentionPolicies(10, 10);
        String namespace = TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota").getNamespace();
        this.admin.namespaces().setRetention(namespace, retentionPolicies);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.namespaces().getRetention(namespace), (Object)retentionPolicies));
        BacklogQuota backlogQuota = BacklogQuota.builder().limitSize(0xA00000L).retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction).build();
        log.info("Backlog quota: {} will set to the topic: {}", (Object)backlogQuota, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        try {
            this.admin.topics().setBacklogQuota("persistent://my-tenant/my-namespace/test-set-backlog-quota", backlogQuota);
            Assert.fail();
        }
        catch (PulsarAdminException e) {
            Assert.assertEquals((int)e.getStatusCode(), (int)412);
        }
        backlogQuota = BacklogQuota.builder().limitSize(0xA00001L).retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction).build();
        log.info("Backlog quota: {} will set to the topic: {}", (Object)backlogQuota, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        try {
            this.admin.topics().setBacklogQuota("persistent://my-tenant/my-namespace/test-set-backlog-quota", backlogQuota);
            Assert.fail();
        }
        catch (PulsarAdminException e) {
            Assert.assertEquals((int)e.getStatusCode(), (int)412);
        }
        backlogQuota = BacklogQuota.builder().limitSize(0x9FFFFFL).retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction).build();
        log.info("Backlog quota: {} will set to the topic: {}", (Object)backlogQuota, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        this.admin.topics().setBacklogQuota("persistent://my-tenant/my-namespace/test-set-backlog-quota", backlogQuota);
        BacklogQuota finalBacklogQuota = backlogQuota;
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(this.admin.topics().getBacklogQuotaMap("persistent://my-tenant/my-namespace/test-set-backlog-quota").get(BacklogQuota.BacklogQuotaType.destination_storage), (Object)finalBacklogQuota));
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
    }

    @Test(timeOut=20000L)
    public void testGetBacklogQuotaApplied() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        Assert.assertEquals((Map)this.admin.topics().getBacklogQuotaMap(topic), (Map)Maps.newHashMap());
        Assert.assertEquals((Map)this.admin.namespaces().getBacklogQuotaMap("my-tenant/my-namespace"), (Map)Maps.newHashMap());
        Map<BacklogQuota.BacklogQuotaType, BacklogQuota> brokerQuotaMap = ConfigHelper.backlogQuotaMap(this.conf);
        Assert.assertEquals((Map)this.admin.topics().getBacklogQuotaMap(topic, true), brokerQuotaMap);
        BacklogQuota namespaceQuota = BacklogQuota.builder().limitSize(30L).retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception).build();
        this.admin.namespaces().setBacklogQuota("my-tenant/my-namespace", namespaceQuota);
        Awaitility.await().untilAsserted(() -> Assert.assertFalse((boolean)this.admin.namespaces().getBacklogQuotaMap("my-tenant/my-namespace").isEmpty()));
        HashMap namespaceQuotaMap = Maps.newHashMap();
        namespaceQuotaMap.put(BacklogQuota.BacklogQuotaType.destination_storage, namespaceQuota);
        Assert.assertEquals((Map)this.admin.topics().getBacklogQuotaMap(topic, true), (Map)namespaceQuotaMap);
        BacklogQuota topicQuota = BacklogQuota.builder().limitSize(40L).retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction).build();
        this.admin.topics().setBacklogQuota(topic, topicQuota);
        Awaitility.await().untilAsserted(() -> Assert.assertFalse((boolean)this.admin.topics().getBacklogQuotaMap(topic).isEmpty()));
        HashMap topicQuotaMap = Maps.newHashMap();
        topicQuotaMap.put(BacklogQuota.BacklogQuotaType.destination_storage, topicQuota);
        Assert.assertEquals((Map)this.admin.topics().getBacklogQuotaMap(topic, true), (Map)topicQuotaMap);
        this.admin.namespaces().removeBacklogQuota("my-tenant/my-namespace");
        this.admin.topics().removeBacklogQuota(topic);
        Awaitility.await().untilAsserted(() -> Assert.assertTrue((boolean)this.admin.namespaces().getBacklogQuotaMap("my-tenant/my-namespace").isEmpty()));
        Awaitility.await().untilAsserted(() -> Assert.assertTrue((boolean)this.admin.topics().getBacklogQuotaMap(topic).isEmpty()));
        Assert.assertEquals((Map)this.admin.topics().getBacklogQuotaMap(topic, true), brokerQuotaMap);
    }

    @Test
    public void testCheckBacklogQuotaFailed() throws Exception {
        RetentionPolicies retentionPolicies = new RetentionPolicies(10, 10);
        String namespace = TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota").getNamespace();
        this.admin.namespaces().setRetention(namespace, retentionPolicies);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.namespaces().getRetention(namespace), (Object)retentionPolicies));
        BacklogQuota backlogQuota = BacklogQuota.builder().limitSize(0xA00000L).retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction).build();
        try {
            this.admin.topics().setBacklogQuota("persistent://my-tenant/my-namespace/test-set-backlog-quota", backlogQuota);
            Assert.fail();
        }
        catch (PulsarAdminException e) {
            Assert.assertEquals((int)e.getStatusCode(), (int)412);
        }
        Awaitility.await().atLeast(1L, TimeUnit.SECONDS);
        Assert.assertNull(this.admin.topics().getBacklogQuotaMap("persistent://my-tenant/my-namespace/test-set-backlog-quota").get(BacklogQuota.BacklogQuotaType.destination_storage));
    }

    @Test
    public void testCheckRetention() throws Exception {
        BacklogQuota backlogQuota = BacklogQuota.builder().limitSize(0xA00000L).retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction).build();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().setBacklogQuota("persistent://my-tenant/my-namespace/test-set-backlog-quota", backlogQuota);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(this.admin.topics().getBacklogQuotaMap("persistent://my-tenant/my-namespace/test-set-backlog-quota").get(BacklogQuota.BacklogQuotaType.destination_storage), (Object)backlogQuota));
        RetentionPolicies retention = new RetentionPolicies(10, 10);
        log.info("Retention: {} will set to the topic: {}", (Object)retention, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        try {
            this.admin.topics().setRetention("persistent://my-tenant/my-namespace/test-set-backlog-quota", retention);
            Assert.fail();
        }
        catch (PulsarAdminException e) {
            Assert.assertEquals((int)e.getStatusCode(), (int)412);
        }
        retention = new RetentionPolicies(10, 9);
        log.info("Retention: {} will set to the topic: {}", (Object)retention, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        try {
            this.admin.topics().setRetention("persistent://my-tenant/my-namespace/test-set-backlog-quota", retention);
            Assert.fail();
        }
        catch (PulsarAdminException e) {
            Assert.assertEquals((int)e.getStatusCode(), (int)412);
        }
        retention = new RetentionPolicies(10, 12);
        log.info("Backlog quota: {} will set to the topic: {}", (Object)backlogQuota, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        this.admin.topics().setRetention("persistent://my-tenant/my-namespace/test-set-backlog-quota", retention);
        RetentionPolicies finalRetention = retention;
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getRetention("persistent://my-tenant/my-namespace/test-set-backlog-quota"), (Object)finalRetention));
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
    }

    @Test
    public void testSetRetention() throws Exception {
        RetentionPolicies retention = new RetentionPolicies(60, 1024);
        log.info("Retention: {} will set to the topic: {}", (Object)retention, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().setRetention("persistent://my-tenant/my-namespace/test-set-backlog-quota", retention);
        log.info("Retention set success on topic: {}", (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getRetention("persistent://my-tenant/my-namespace/test-set-backlog-quota"), (Object)retention));
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
    }

    @Test
    public void testRemoveRetention() throws Exception {
        RetentionPolicies retention = new RetentionPolicies(60, 1024);
        log.info("Retention: {} will set to the topic: {}", (Object)retention, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().setRetention("persistent://my-tenant/my-namespace/test-set-backlog-quota", retention);
        log.info("Retention set success on topic: {}", (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getRetention("persistent://my-tenant/my-namespace/test-set-backlog-quota"), (Object)retention));
        this.admin.topics().removeRetention("persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getRetention("persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
    }

    @Test(timeOut=10000L)
    public void testRetentionAppliedApi() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        RetentionPolicies brokerPolicies = new RetentionPolicies(this.conf.getDefaultRetentionTimeInMinutes(), this.conf.getDefaultRetentionSizeInMB());
        Assert.assertEquals((Object)this.admin.topics().getRetention(topic, true), (Object)brokerPolicies);
        RetentionPolicies namespacePolicies = new RetentionPolicies(10, 20);
        this.admin.namespaces().setRetention("my-tenant/my-namespace", namespacePolicies);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getRetention(topic, true), (Object)namespacePolicies));
        RetentionPolicies topicPolicies = new RetentionPolicies(20, 30);
        this.admin.topics().setRetention(topic, topicPolicies);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getRetention(topic, true), (Object)topicPolicies));
        this.admin.topics().removeRetention(topic);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getRetention(topic, true), (Object)namespacePolicies));
        this.admin.namespaces().removeRetention("my-tenant/my-namespace");
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getRetention(topic, true), (Object)brokerPolicies));
    }

    @Test(timeOut=20000L)
    public void testGetSubDispatchRateApplied() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        Assert.assertNull((Object)this.admin.topics().getSubscriptionDispatchRate(topic));
        Assert.assertNull((Object)this.admin.namespaces().getSubscriptionDispatchRate("my-tenant/my-namespace"));
        DispatchRate brokerDispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(this.conf.getDispatchThrottlingRatePerSubscriptionInMsg()).dispatchThrottlingRateInByte(this.conf.getDispatchThrottlingRatePerSubscriptionInByte()).ratePeriodInSecond(1).build();
        Assert.assertEquals((Object)this.admin.topics().getSubscriptionDispatchRate(topic, true), (Object)brokerDispatchRate);
        DispatchRate namespaceDispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(10).dispatchThrottlingRateInByte(11L).ratePeriodInSecond(12).build();
        this.admin.namespaces().setSubscriptionDispatchRate("my-tenant/my-namespace", namespaceDispatchRate);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.namespaces().getSubscriptionDispatchRate("my-tenant/my-namespace")));
        Assert.assertEquals((Object)this.admin.topics().getSubscriptionDispatchRate(topic, true), (Object)namespaceDispatchRate);
        DispatchRate topicDispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(20).dispatchThrottlingRateInByte(21L).ratePeriodInSecond(12).build();
        this.admin.topics().setSubscriptionDispatchRate(topic, topicDispatchRate);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.topics().getSubscriptionDispatchRate(topic)));
        Assert.assertEquals((Object)this.admin.topics().getSubscriptionDispatchRate(topic, true), (Object)topicDispatchRate);
        this.admin.namespaces().removeSubscriptionDispatchRate("my-tenant/my-namespace");
        this.admin.topics().removeSubscriptionDispatchRate(topic);
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.namespaces().getSubscriptionDispatchRate("my-tenant/my-namespace")));
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getSubscriptionDispatchRate(topic)));
        Assert.assertEquals((Object)this.admin.topics().getSubscriptionDispatchRate(topic, true), (Object)brokerDispatchRate);
    }

    @Test(timeOut=20000L)
    public void testRetentionPriority() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        Assert.assertNull((Object)this.admin.topics().getRetention(topic));
        Assert.assertNull((Object)this.admin.namespaces().getRetention("my-tenant/my-namespace"));
        PersistentTopic persistentTopic = (PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopicIfExists(topic).get()).get();
        Method shouldTopicBeRetained = PersistentTopic.class.getDeclaredMethod("shouldTopicBeRetained", new Class[0]);
        shouldTopicBeRetained.setAccessible(true);
        Field lastActive = PersistentTopic.class.getSuperclass().getDeclaredField("lastActive");
        lastActive.setAccessible(true);
        lastActive.setLong(persistentTopic, System.nanoTime() - TimeUnit.MINUTES.toNanos(2L));
        Assert.assertFalse((boolean)((Boolean)shouldTopicBeRetained.invoke((Object)persistentTopic, new Object[0])));
        RetentionPolicies retentionPolicies = new RetentionPolicies(1, 1);
        this.admin.namespaces().setRetention("my-tenant/my-namespace", retentionPolicies);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.namespaces().getRetention("my-tenant/my-namespace")));
        Assert.assertFalse((boolean)((Boolean)shouldTopicBeRetained.invoke((Object)persistentTopic, new Object[0])));
        this.admin.topics().setRetention(topic, new RetentionPolicies(3, 1));
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.topics().getRetention(topic)));
        Assert.assertTrue((boolean)((Boolean)shouldTopicBeRetained.invoke((Object)persistentTopic, new Object[0])));
        this.admin.topics().setRetention(topic, new RetentionPolicies(0, 0));
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((long)this.admin.topics().getRetention(topic).getRetentionSizeInMB(), (long)0L));
        Assert.assertFalse((boolean)((Boolean)shouldTopicBeRetained.invoke((Object)persistentTopic, new Object[0])));
        this.admin.topics().removeRetention(topic);
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getRetention(topic)));
        Assert.assertFalse((boolean)((Boolean)shouldTopicBeRetained.invoke((Object)persistentTopic, new Object[0])));
        this.admin.namespaces().setRetention("my-tenant/my-namespace", new RetentionPolicies(0, 0));
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.namespaces().getRetention("my-tenant/my-namespace")));
        Assert.assertFalse((boolean)((Boolean)shouldTopicBeRetained.invoke((Object)persistentTopic, new Object[0])));
        this.admin.namespaces().setRetention("my-tenant/my-namespace", new RetentionPolicies(1, 1));
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.namespaces().getRetention("my-tenant/my-namespace")));
        Assert.assertFalse((boolean)((Boolean)shouldTopicBeRetained.invoke((Object)persistentTopic, new Object[0])));
        this.admin.namespaces().removeRetention("my-tenant/my-namespace");
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.namespaces().getRetention("my-tenant/my-namespace")));
        Assert.assertFalse((boolean)((Boolean)shouldTopicBeRetained.invoke((Object)persistentTopic, new Object[0])));
    }

    @Test(timeOut=20000L)
    public void testGetPersistenceApplied() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        Assert.assertNull((Object)this.admin.topics().getPersistence(topic));
        Assert.assertNull((Object)this.admin.namespaces().getPersistence("my-tenant/my-namespace"));
        PersistencePolicies brokerPolicy = new PersistencePolicies(this.pulsar.getConfiguration().getManagedLedgerDefaultEnsembleSize(), this.pulsar.getConfiguration().getManagedLedgerDefaultWriteQuorum(), this.pulsar.getConfiguration().getManagedLedgerDefaultAckQuorum(), this.pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
        Assert.assertEquals((Object)this.admin.topics().getPersistence(topic, true), (Object)brokerPolicy);
        PersistencePolicies namespacePolicy = new PersistencePolicies(5, 4, 3, 2.0);
        this.admin.namespaces().setPersistence("my-tenant/my-namespace", namespacePolicy);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.namespaces().getPersistence("my-tenant/my-namespace")));
        Assert.assertEquals((Object)this.admin.topics().getPersistence(topic, true), (Object)namespacePolicy);
        PersistencePolicies topicPolicy = new PersistencePolicies(4, 3, 2, 1.0);
        this.admin.topics().setPersistence(topic, topicPolicy);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.topics().getPersistence(topic)));
        Assert.assertEquals((Object)this.admin.topics().getPersistence(topic, true), (Object)topicPolicy);
        this.admin.namespaces().removePersistence("my-tenant/my-namespace");
        this.admin.topics().removePersistence(topic);
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.namespaces().getPersistence("my-tenant/my-namespace")));
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getPersistence(topic)));
        Assert.assertEquals((Object)this.admin.topics().getPersistence(topic, true), (Object)brokerPolicy);
    }

    @Test
    public void testCheckPersistence() throws Exception {
        PersistencePolicies persistencePolicies = new PersistencePolicies(6, 2, 2, 0.0);
        log.info("PersistencePolicies: {} will set to the topic: {}", (Object)persistencePolicies, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        try {
            this.admin.topics().setPersistence("persistent://my-tenant/my-namespace/test-set-backlog-quota", persistencePolicies);
            Assert.fail();
        }
        catch (PulsarAdminException e) {
            Assert.assertEquals((int)e.getStatusCode(), (int)400);
        }
        persistencePolicies = new PersistencePolicies(2, 6, 2, 0.0);
        log.info("PersistencePolicies: {} will set to the topic: {}", (Object)persistencePolicies, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        try {
            this.admin.topics().setPersistence("persistent://my-tenant/my-namespace/test-set-backlog-quota", persistencePolicies);
            Assert.fail();
        }
        catch (PulsarAdminException e) {
            Assert.assertEquals((int)e.getStatusCode(), (int)400);
        }
        persistencePolicies = new PersistencePolicies(2, 2, 6, 0.0);
        log.info("PersistencePolicies: {} will set to the topic: {}", (Object)persistencePolicies, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        try {
            this.admin.topics().setPersistence("persistent://my-tenant/my-namespace/test-set-backlog-quota", persistencePolicies);
            Assert.fail();
        }
        catch (PulsarAdminException e) {
            Assert.assertEquals((int)e.getStatusCode(), (int)400);
        }
        persistencePolicies = new PersistencePolicies(1, 2, 2, 0.0);
        log.info("PersistencePolicies: {} will set to the topic: {}", (Object)persistencePolicies, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        try {
            this.admin.topics().setPersistence("persistent://my-tenant/my-namespace/test-set-backlog-quota", persistencePolicies);
            Assert.fail();
        }
        catch (PulsarAdminException e) {
            Assert.assertEquals((int)e.getStatusCode(), (int)400);
        }
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
    }

    @Test
    public void testSetPersistence() throws Exception {
        PersistencePolicies persistencePoliciesForNamespace = new PersistencePolicies(2, 2, 2, 0.3);
        this.admin.namespaces().setPersistence("my-tenant/my-namespace", persistencePoliciesForNamespace);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.namespaces().getPersistence("my-tenant/my-namespace"), (Object)persistencePoliciesForNamespace));
        PersistencePolicies persistencePolicies = new PersistencePolicies(3, 3, 3, 0.1);
        log.info("PersistencePolicies: {} will set to the topic: {}", (Object)persistencePolicies, (Object)"persistent://my-tenant/my-namespace/test-set-persistence");
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().createNonPartitionedTopic("persistent://my-tenant/my-namespace/test-set-persistence");
        this.admin.topics().setPersistence("persistent://my-tenant/my-namespace/test-set-persistence", persistencePolicies);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getPersistence("persistent://my-tenant/my-namespace/test-set-persistence"), (Object)persistencePolicies));
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-tenant/my-namespace/test-set-persistence"}).subscriptionName("test").subscribe();
        this.admin.topics().unload("persistent://my-tenant/my-namespace/test-set-persistence");
        Topic t = (Topic)this.pulsar.getBrokerService().getOrCreateTopic("persistent://my-tenant/my-namespace/test-set-persistence").get();
        PersistentTopic persistentTopic = (PersistentTopic)t;
        Awaitility.await().untilAsserted(() -> {
            ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
            Assert.assertEquals((int)managedLedgerConfig.getEnsembleSize(), (int)3);
            Assert.assertEquals((int)managedLedgerConfig.getWriteQuorumSize(), (int)3);
            Assert.assertEquals((int)managedLedgerConfig.getAckQuorumSize(), (int)3);
            Assert.assertEquals((double)managedLedgerConfig.getThrottleMarkDelete(), (double)0.1);
        });
        PersistencePolicies getPersistencePolicies = this.admin.topics().getPersistence("persistent://my-tenant/my-namespace/test-set-persistence");
        log.info("PersistencePolicies: {} will set to the topic: {}", (Object)persistencePolicies, (Object)"persistent://my-tenant/my-namespace/test-set-persistence");
        Assert.assertEquals((Object)getPersistencePolicies, (Object)persistencePolicies);
        consumer.close();
    }

    @Test
    public void testGetDispatchRateApplied() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        Assert.assertNull((Object)this.admin.topics().getDispatchRate(topic));
        Assert.assertNull((Object)this.admin.namespaces().getDispatchRate("my-tenant/my-namespace"));
        DispatchRate brokerDispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(this.conf.getDispatchThrottlingRatePerTopicInMsg()).dispatchThrottlingRateInByte(this.conf.getDispatchThrottlingRatePerTopicInByte()).ratePeriodInSecond(1).build();
        Assert.assertEquals((Object)this.admin.topics().getDispatchRate(topic, true), (Object)brokerDispatchRate);
        DispatchRate namespaceDispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(10).dispatchThrottlingRateInByte(11L).ratePeriodInSecond(12).build();
        this.admin.namespaces().setDispatchRate("my-tenant/my-namespace", namespaceDispatchRate);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.namespaces().getDispatchRate("my-tenant/my-namespace")));
        Assert.assertEquals((Object)this.admin.topics().getDispatchRate(topic, true), (Object)namespaceDispatchRate);
        DispatchRate topicDispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(20).dispatchThrottlingRateInByte(21L).ratePeriodInSecond(22).build();
        this.admin.topics().setDispatchRate(topic, topicDispatchRate);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.topics().getDispatchRate(topic)));
        Assert.assertEquals((Object)this.admin.topics().getDispatchRate(topic, true), (Object)topicDispatchRate);
        this.admin.namespaces().removeDispatchRate("my-tenant/my-namespace");
        this.admin.topics().removeDispatchRate(topic);
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.namespaces().getDispatchRate("my-tenant/my-namespace")));
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getDispatchRate(topic)));
        Assert.assertEquals((Object)this.admin.topics().getDispatchRate(topic, true), (Object)brokerDispatchRate);
    }

    @Test
    public void testRemovePersistence() throws Exception {
        PersistencePolicies persistencePoliciesForNamespace = new PersistencePolicies(2, 2, 2, 0.3);
        this.admin.namespaces().setPersistence("my-tenant/my-namespace", persistencePoliciesForNamespace);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.namespaces().getPersistence("my-tenant/my-namespace"), (Object)persistencePoliciesForNamespace));
        PersistencePolicies persistencePolicies = new PersistencePolicies(3, 3, 3, 0.1);
        log.info("PersistencePolicies: {} will set to the topic: {}", (Object)persistencePolicies, (Object)"persistent://my-tenant/my-namespace/test-set-persistence");
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().createNonPartitionedTopic("persistent://my-tenant/my-namespace/test-set-persistence");
        this.admin.topics().setPersistence("persistent://my-tenant/my-namespace/test-set-persistence", persistencePolicies);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getPersistence("persistent://my-tenant/my-namespace/test-set-persistence"), (Object)persistencePolicies));
        this.admin.topics().removePersistence("persistent://my-tenant/my-namespace/test-set-persistence");
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getPersistence("persistent://my-tenant/my-namespace/test-set-persistence")));
        this.admin.lookups().lookupTopic("persistent://my-tenant/my-namespace/test-set-persistence");
        Topic t = (Topic)this.pulsar.getBrokerService().getOrCreateTopic("persistent://my-tenant/my-namespace/test-set-persistence").get();
        PersistentTopic persistentTopic = (PersistentTopic)t;
        ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
        Assert.assertEquals((int)managedLedgerConfig.getEnsembleSize(), (int)2);
        Assert.assertEquals((int)managedLedgerConfig.getWriteQuorumSize(), (int)2);
        Assert.assertEquals((int)managedLedgerConfig.getAckQuorumSize(), (int)2);
        Assert.assertEquals((double)managedLedgerConfig.getThrottleMarkDelete(), (double)0.3);
    }

    @Test
    public void testCheckMaxProducers() throws Exception {
        int maxProducers = -1;
        log.info("MaxProducers: {} will set to the topic: {}", (Object)maxProducers, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        try {
            this.admin.topics().setMaxProducers("persistent://my-tenant/my-namespace/test-set-backlog-quota", maxProducers);
            Assert.fail();
        }
        catch (PulsarAdminException e) {
            Assert.assertEquals((int)e.getStatusCode(), (int)412);
        }
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
    }

    @Test
    public void testGetMaxProducerApplied() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        Assert.assertNull((Object)this.admin.topics().getMaxProducers(topic));
        Assert.assertNull((Object)this.admin.namespaces().getMaxProducersPerTopic("my-tenant/my-namespace"));
        Assert.assertEquals((int)this.admin.topics().getMaxProducers(topic, true), (int)this.conf.getMaxProducersPerTopic());
        this.admin.namespaces().setMaxProducersPerTopic("my-tenant/my-namespace", 7);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.namespaces().getMaxProducersPerTopic("my-tenant/my-namespace")));
        Assert.assertEquals((int)this.admin.topics().getMaxProducers(topic, true), (int)7);
        this.admin.topics().setMaxProducers(topic, 1000);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.topics().getMaxProducers(topic)));
        Assert.assertEquals((int)this.admin.topics().getMaxProducers(topic, true), (int)1000);
        this.admin.namespaces().removeMaxProducersPerTopic("my-tenant/my-namespace");
        this.admin.topics().removeMaxProducers(topic);
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.namespaces().getMaxProducersPerTopic("my-tenant/my-namespace")));
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getMaxProducers(topic)));
        Assert.assertEquals((int)this.admin.topics().getMaxProducers(topic, true), (int)this.conf.getMaxProducersPerTopic());
    }

    @Test
    public void testSetMaxProducers() throws Exception {
        Integer maxProducers = 2;
        log.info("MaxProducers: {} will set to the topic: {}", (Object)maxProducers, (Object)"persistent://my-tenant/my-namespace/test-set-persistence");
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().createPartitionedTopic("persistent://my-tenant/my-namespace/test-set-persistence", 2);
        this.admin.topics().setMaxProducers("persistent://my-tenant/my-namespace/test-set-persistence", maxProducers.intValue());
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getMaxProducers("persistent://my-tenant/my-namespace/test-set-persistence"), (Object)maxProducers));
        Producer producer1 = this.pulsarClient.newProducer().topic("persistent://my-tenant/my-namespace/test-set-persistence").create();
        Producer producer2 = this.pulsarClient.newProducer().topic("persistent://my-tenant/my-namespace/test-set-persistence").create();
        Producer producer3 = null;
        try {
            producer3 = this.pulsarClient.newProducer().topic("persistent://my-tenant/my-namespace/test-set-persistence").create();
            Assert.fail();
        }
        catch (PulsarClientException e) {
            log.info("Topic reached max producers limit on topic level.");
        }
        Assert.assertNotNull((Object)producer1);
        Assert.assertNotNull((Object)producer2);
        Assert.assertNull((Object)producer3);
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-persistence", true);
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
    }

    @Test
    public void testRemoveMaxProducers() throws Exception {
        Integer maxProducers = 2;
        log.info("MaxProducers: {} will set to the topic: {}", (Object)maxProducers, (Object)"persistent://my-tenant/my-namespace/test-set-persistence");
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().createPartitionedTopic("persistent://my-tenant/my-namespace/test-set-persistence", 2);
        this.admin.topics().setMaxProducers("persistent://my-tenant/my-namespace/test-set-persistence", maxProducers.intValue());
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getMaxProducers("persistent://my-tenant/my-namespace/test-set-persistence"), (Object)maxProducers));
        Producer producer1 = this.pulsarClient.newProducer().topic("persistent://my-tenant/my-namespace/test-set-persistence").create();
        Producer producer2 = this.pulsarClient.newProducer().topic("persistent://my-tenant/my-namespace/test-set-persistence").create();
        Producer producer3 = null;
        Producer producer4 = null;
        try {
            producer3 = this.pulsarClient.newProducer().topic("persistent://my-tenant/my-namespace/test-set-persistence").create();
            Assert.fail();
        }
        catch (PulsarClientException e) {
            log.info("Topic reached max producers limit on topic level.");
        }
        Assert.assertNotNull((Object)producer1);
        Assert.assertNotNull((Object)producer2);
        Assert.assertNull((Object)producer3);
        this.admin.topics().removeMaxProducers("persistent://my-tenant/my-namespace/test-set-persistence");
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getMaxProducers("persistent://my-tenant/my-namespace/test-set-persistence")));
        producer3 = this.pulsarClient.newProducer().topic("persistent://my-tenant/my-namespace/test-set-persistence").create();
        Assert.assertNotNull((Object)producer3);
        this.admin.namespaces().setMaxProducersPerTopic("my-tenant/my-namespace", 3);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((int)this.admin.namespaces().getMaxProducersPerTopic("my-tenant/my-namespace"), (int)3));
        log.info("MaxProducers: {} will set to the namespace: {}", (Object)3, (Object)"my-tenant/my-namespace");
        try {
            producer4 = this.pulsarClient.newProducer().topic("persistent://my-tenant/my-namespace/test-set-persistence").create();
            Assert.fail();
        }
        catch (PulsarClientException e) {
            log.info("Topic reached max producers limit on namespace level.");
        }
        Assert.assertNull((Object)producer4);
        producer1.close();
        producer2.close();
        producer3.close();
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-persistence", true);
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
    }

    @Test
    public void testGetSetDispatchRate() throws Exception {
        DispatchRate dispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(100).dispatchThrottlingRateInByte(1000L).ratePeriodInSecond(1).relativeToPublishRate(true).build();
        log.info("Dispatch Rate: {} will set to the topic: {}", (Object)dispatchRate, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().setDispatchRate("persistent://my-tenant/my-namespace/test-set-backlog-quota", dispatchRate);
        log.info("Dispatch Rate set success on topic: {}", (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getDispatchRate("persistent://my-tenant/my-namespace/test-set-backlog-quota"), (Object)dispatchRate));
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
    }

    @Test
    public void testRemoveDispatchRate() throws Exception {
        DispatchRate dispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(100).dispatchThrottlingRateInByte(1000L).ratePeriodInSecond(1).relativeToPublishRate(true).build();
        log.info("Dispatch Rate: {} will set to the topic: {}", (Object)dispatchRate, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().setDispatchRate("persistent://my-tenant/my-namespace/test-set-backlog-quota", dispatchRate);
        log.info("Dispatch Rate set success on topic: {}", (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getDispatchRate("persistent://my-tenant/my-namespace/test-set-backlog-quota"), (Object)dispatchRate));
        this.admin.topics().removeDispatchRate("persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getDispatchRate("persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
    }

    @Test(timeOut=20000L)
    public void testPolicyOverwrittenByNamespaceLevel() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.admin.topics().createNonPartitionedTopic(topic);
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        DispatchRate dispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(200).dispatchThrottlingRateInByte(20000L).ratePeriodInSecond(1).relativeToPublishRate(true).build();
        this.admin.namespaces().setDispatchRate("my-tenant/my-namespace", dispatchRate);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((int)this.admin.namespaces().getDispatchRate("my-tenant/my-namespace").getDispatchThrottlingRateInMsg(), (int)200));
        dispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(100).dispatchThrottlingRateInByte(10000L).ratePeriodInSecond(1).relativeToPublishRate(true).build();
        this.admin.topics().setDispatchRate(topic, dispatchRate);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.topics().getDispatchRate(topic)));
        dispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(300).dispatchThrottlingRateInByte(30000L).ratePeriodInSecond(2).relativeToPublishRate(true).build();
        this.admin.namespaces().setDispatchRate("my-tenant/my-namespace", dispatchRate);
        Awaitility.await().untilAsserted(() -> {
            DispatchRateLimiter limiter = (DispatchRateLimiter)((Topic)((Optional)this.pulsar.getBrokerService().getTopicIfExists(topic).get()).get()).getDispatchRateLimiter().get();
            Assert.assertEquals((long)limiter.getDispatchRateOnByte(), (long)10000L);
            Assert.assertEquals((long)limiter.getDispatchRateOnMsg(), (long)100L);
        });
        this.admin.topics().removeDispatchRate(topic);
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getDispatchRate(topic)));
        Awaitility.await().untilAsserted(() -> {
            DispatchRateLimiter limiter = (DispatchRateLimiter)((Topic)((Optional)this.pulsar.getBrokerService().getTopicIfExists(topic).get()).get()).getDispatchRateLimiter().get();
            Assert.assertEquals((long)limiter.getDispatchRateOnByte(), (long)30000L);
            Assert.assertEquals((long)limiter.getDispatchRateOnMsg(), (long)300L);
        });
    }

    @Test(timeOut=20000L)
    public void testRestart() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.admin.topics().createNonPartitionedTopic(topic);
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, 100, true);
        this.admin.namespaces().setInactiveTopicPolicies("my-tenant/my-namespace", inactiveTopicPolicies);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.namespaces().getInactiveTopicPolicies("my-tenant/my-namespace").getInactiveTopicDeleteMode(), (Object)InactiveTopicDeleteMode.delete_when_subscriptions_caught_up));
        RetentionPolicies retentionPolicies = new RetentionPolicies(10, -1);
        this.admin.namespaces().setRetention("my-tenant/my-namespace", retentionPolicies);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.namespaces().getRetention("my-tenant/my-namespace"), (Object)retentionPolicies));
        inactiveTopicPolicies = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 200, false);
        this.admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies);
        InactiveTopicPolicies finalInactiveTopicPolicies = inactiveTopicPolicies;
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getInactiveTopicPolicies(topic), (Object)finalInactiveTopicPolicies));
        RetentionPolicies finalRetentionPolicies = new RetentionPolicies(20, -1);
        this.admin.topics().setRetention(topic, finalRetentionPolicies);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getRetention(topic), (Object)finalRetentionPolicies));
        this.restartBroker();
        Producer producer = this.pulsarClient.newProducer().topic(topic).create();
        Awaitility.await().untilAsserted(() -> {
            PersistentTopic persistentTopic = (PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopicIfExists(topic).get()).get();
            ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
            Assert.assertEquals((Object)persistentTopic.getInactiveTopicPolicies(), (Object)finalInactiveTopicPolicies);
            Assert.assertEquals((long)managedLedgerConfig.getRetentionSizeInMB(), (long)finalRetentionPolicies.getRetentionSizeInMB());
            Assert.assertEquals((long)managedLedgerConfig.getRetentionTimeMillis(), (long)TimeUnit.MINUTES.toMillis(finalRetentionPolicies.getRetentionTimeInMinutes()));
        });
        producer.close();
    }

    @Test
    public void testGetSetSubscriptionDispatchRate() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.admin.topics().createNonPartitionedTopic(topic);
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        DispatchRate dispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(1000).dispatchThrottlingRateInByte(0x100000L).ratePeriodInSecond(1).build();
        log.info("Subscription Dispatch Rate: {} will set to the topic: {}", (Object)dispatchRate, (Object)topic);
        this.admin.topics().setSubscriptionDispatchRate(topic, dispatchRate);
        log.info("Subscription dispatch rate set success on topic: {}", (Object)topic);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getSubscriptionDispatchRate(topic), (Object)dispatchRate));
        String subscriptionName = "test_subscription_rate";
        Consumer consumer = this.pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(new String[]{topic}).subscribe();
        DispatchRateLimiter dispatchRateLimiter = (DispatchRateLimiter)((Topic)((Optional)this.pulsar.getBrokerService().getTopicIfExists(topic).get()).get()).getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
        Assert.assertNotNull((Object)dispatchRateLimiter);
        Assert.assertEquals((long)dispatchRateLimiter.getDispatchRateOnByte(), (long)dispatchRate.getDispatchThrottlingRateInByte());
        Assert.assertEquals((long)dispatchRateLimiter.getDispatchRateOnMsg(), (long)dispatchRate.getDispatchThrottlingRateInMsg());
        consumer.close();
        this.admin.topics().delete(topic, true);
    }

    @Test
    public void testGetSetSubscriptionDispatchRateAfterTopicLoaded() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.admin.topics().createNonPartitionedTopic(topic);
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        DispatchRate dispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(1000).dispatchThrottlingRateInByte(0x100000L).ratePeriodInSecond(1).build();
        log.info("Subscription Dispatch Rate: {} will set to the topic: {}", (Object)dispatchRate, (Object)topic);
        String subscriptionName = "test_subscription_rate";
        Consumer consumer = this.pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(new String[]{topic}).subscribe();
        this.admin.topics().setSubscriptionDispatchRate(topic, dispatchRate);
        log.info("Subscription dispatch rate set success on topic: {}", (Object)topic);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getSubscriptionDispatchRate(topic), (Object)dispatchRate));
        DispatchRateLimiter dispatchRateLimiter = (DispatchRateLimiter)((Topic)((Optional)this.pulsar.getBrokerService().getTopicIfExists(topic).get()).get()).getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
        Assert.assertNotNull((Object)dispatchRateLimiter);
        Assert.assertEquals((long)dispatchRateLimiter.getDispatchRateOnByte(), (long)dispatchRate.getDispatchThrottlingRateInByte());
        Assert.assertEquals((long)dispatchRateLimiter.getDispatchRateOnMsg(), (long)dispatchRate.getDispatchThrottlingRateInMsg());
        consumer.close();
        this.admin.topics().delete(topic, true);
    }

    @Test
    public void testRemoveSubscriptionDispatchRate() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.admin.topics().createNonPartitionedTopic(topic);
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        DispatchRate dispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(1000).dispatchThrottlingRateInByte(0x100000L).ratePeriodInSecond(1).build();
        log.info("Subscription Dispatch Rate: {} will set to the topic: {}", (Object)dispatchRate, (Object)topic);
        this.admin.topics().setSubscriptionDispatchRate(topic, dispatchRate);
        log.info("Subscription dispatch rate set success on topic: {}", (Object)topic);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getSubscriptionDispatchRate(topic), (Object)dispatchRate));
        String subscriptionName = "test_subscription_rate";
        Consumer consumer = this.pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(new String[]{topic}).subscribe();
        DispatchRateLimiter dispatchRateLimiter = (DispatchRateLimiter)((Topic)((Optional)this.pulsar.getBrokerService().getTopicIfExists(topic).get()).get()).getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
        Assert.assertNotNull((Object)dispatchRateLimiter);
        Assert.assertEquals((long)dispatchRateLimiter.getDispatchRateOnByte(), (long)dispatchRate.getDispatchThrottlingRateInByte());
        Assert.assertEquals((long)dispatchRateLimiter.getDispatchRateOnMsg(), (long)dispatchRate.getDispatchThrottlingRateInMsg());
        this.admin.topics().removeSubscriptionDispatchRate(topic);
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getSubscriptionDispatchRate(topic)));
        dispatchRateLimiter = (DispatchRateLimiter)((Topic)((Optional)this.pulsar.getBrokerService().getTopicIfExists(topic).get()).get()).getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
        Assert.assertNotEquals((Object)dispatchRateLimiter.getDispatchRateOnMsg(), (Object)dispatchRate.getDispatchThrottlingRateInByte());
        Assert.assertNotEquals((Object)dispatchRateLimiter.getDispatchRateOnByte(), (Object)dispatchRate.getDispatchThrottlingRateInByte());
        consumer.close();
        this.admin.topics().delete(topic, true);
    }

    @Test
    public void testSubscriptionDispatchRatePolicyOverwrittenNamespaceLevel() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.admin.topics().createNonPartitionedTopic(topic);
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        DispatchRate namespaceDispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(100).dispatchThrottlingRateInByte(0x100000L).ratePeriodInSecond(1).build();
        this.admin.namespaces().setSubscriptionDispatchRate("my-tenant/my-namespace", namespaceDispatchRate);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.namespaces().getSubscriptionDispatchRate("my-tenant/my-namespace"), (Object)namespaceDispatchRate));
        String subscriptionName = "test_subscription_rate";
        Consumer consumer = this.pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(new String[]{topic}).subscribe();
        DispatchRateLimiter dispatchRateLimiter = (DispatchRateLimiter)((Topic)((Optional)this.pulsar.getBrokerService().getTopicIfExists(topic).get()).get()).getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
        Assert.assertEquals((long)dispatchRateLimiter.getDispatchRateOnMsg(), (long)namespaceDispatchRate.getDispatchThrottlingRateInMsg());
        Assert.assertEquals((long)dispatchRateLimiter.getDispatchRateOnByte(), (long)namespaceDispatchRate.getDispatchThrottlingRateInByte());
        DispatchRate topicDispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(200).dispatchThrottlingRateInByte(0x200000L).ratePeriodInSecond(1).build();
        this.admin.topics().setSubscriptionDispatchRate(topic, topicDispatchRate);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getSubscriptionDispatchRate(topic), (Object)topicDispatchRate));
        dispatchRateLimiter = (DispatchRateLimiter)((Topic)((Optional)this.pulsar.getBrokerService().getTopicIfExists(topic).get()).get()).getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
        Assert.assertEquals((long)dispatchRateLimiter.getDispatchRateOnByte(), (long)topicDispatchRate.getDispatchThrottlingRateInByte());
        Assert.assertEquals((long)dispatchRateLimiter.getDispatchRateOnMsg(), (long)topicDispatchRate.getDispatchThrottlingRateInMsg());
        this.admin.topics().removeSubscriptionDispatchRate(topic);
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getSubscriptionDispatchRate(topic)));
        dispatchRateLimiter = (DispatchRateLimiter)((Topic)((Optional)this.pulsar.getBrokerService().getTopicIfExists(topic).get()).get()).getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
        Assert.assertEquals((long)dispatchRateLimiter.getDispatchRateOnByte(), (long)namespaceDispatchRate.getDispatchThrottlingRateInByte());
        Assert.assertEquals((long)dispatchRateLimiter.getDispatchRateOnMsg(), (long)namespaceDispatchRate.getDispatchThrottlingRateInMsg());
        consumer.close();
        this.admin.topics().delete(topic, true);
    }

    @Test
    public void testGetSetCompactionThreshold() throws Exception {
        Long compactionThreshold = 100000L;
        log.info("Compaction threshold: {} will set to the topic: {}", (Object)compactionThreshold, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().setCompactionThreshold("persistent://my-tenant/my-namespace/test-set-backlog-quota", compactionThreshold.longValue());
        log.info("Compaction threshold set success on topic: {}", (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getCompactionThreshold("persistent://my-tenant/my-namespace/test-set-backlog-quota"), (Object)compactionThreshold));
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
    }

    @Test
    public void testRemoveCompactionThreshold() throws Exception {
        Long compactionThreshold = 100000L;
        log.info("Compaction threshold: {} will set to the topic: {}", (Object)compactionThreshold, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().setCompactionThreshold("persistent://my-tenant/my-namespace/test-set-backlog-quota", compactionThreshold.longValue());
        log.info("Compaction threshold set success on topic: {}", (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getCompactionThreshold("persistent://my-tenant/my-namespace/test-set-backlog-quota"), (Object)compactionThreshold));
        this.admin.topics().removeCompactionThreshold("persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getCompactionThreshold("persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
    }

    @Test
    public void testGetSetMaxConsumersPerSubscription() throws Exception {
        Integer maxConsumersPerSubscription = 10;
        log.info("MaxConsumersPerSubscription: {} will set to the topic: {}", (Object)maxConsumersPerSubscription, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().setMaxConsumersPerSubscription("persistent://my-tenant/my-namespace/test-set-backlog-quota", maxConsumersPerSubscription.intValue());
        log.info("MaxConsumersPerSubscription set success on topic: {}", (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getMaxConsumersPerSubscription("persistent://my-tenant/my-namespace/test-set-backlog-quota"), (Object)maxConsumersPerSubscription));
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
    }

    @Test
    public void testRemoveMaxConsumersPerSubscription() throws Exception {
        Integer maxConsumersPerSubscription = 10;
        log.info("MaxConsumersPerSubscription: {} will set to the topic: {}", (Object)maxConsumersPerSubscription, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().setMaxConsumersPerSubscription("persistent://my-tenant/my-namespace/test-set-backlog-quota", maxConsumersPerSubscription.intValue());
        log.info("MaxConsumersPerSubscription set success on topic: {}", (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getMaxConsumersPerSubscription("persistent://my-tenant/my-namespace/test-set-backlog-quota"), (Object)maxConsumersPerSubscription));
        this.admin.topics().removeMaxConsumersPerSubscription("persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getMaxConsumersPerSubscription("persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
    }

    @Test
    public void testGetSetPublishRate() throws Exception {
        PublishRate publishRate = new PublishRate(10000, 0x500000L);
        log.info("Publish Rate: {} will set to the topic: {}", (Object)publishRate, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().setPublishRate("persistent://my-tenant/my-namespace/test-set-backlog-quota", publishRate);
        log.info("Publish Rate set success on topic: {}", (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getPublishRate("persistent://my-tenant/my-namespace/test-set-backlog-quota"), (Object)publishRate));
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
    }

    @Test
    public void testRemovePublishRate() throws Exception {
        PublishRate publishRate = new PublishRate(10000, 0x500000L);
        log.info("Publish Rate: {} will set to the topic: {}", (Object)publishRate, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().setPublishRate("persistent://my-tenant/my-namespace/test-set-backlog-quota", publishRate);
        log.info("Publish Rate set success on topic: {}", (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getPublishRate("persistent://my-tenant/my-namespace/test-set-backlog-quota"), (Object)publishRate));
        this.admin.topics().removePublishRate("persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getPublishRate("persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
    }

    @Test
    public void testCheckMaxConsumers() throws Exception {
        Integer maxProducers = new Integer(-1);
        log.info("MaxConsumers: {} will set to the topic: {}", (Object)maxProducers, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        try {
            this.admin.topics().setMaxConsumers("persistent://my-tenant/my-namespace/test-set-backlog-quota", maxProducers.intValue());
            Assert.fail();
        }
        catch (PulsarAdminException e) {
            Assert.assertEquals((int)e.getStatusCode(), (int)412);
        }
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
    }

    @Test
    public void testGetMaxConsumersApplied() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        Assert.assertNull((Object)this.admin.topics().getMaxConsumers(topic));
        Assert.assertNull((Object)this.admin.namespaces().getMaxConsumersPerTopic("my-tenant/my-namespace"));
        Assert.assertEquals((int)this.admin.topics().getMaxConsumers(topic, true), (int)this.conf.getMaxConsumersPerTopic());
        this.admin.namespaces().setMaxConsumersPerTopic("my-tenant/my-namespace", 7);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.namespaces().getMaxConsumersPerTopic("my-tenant/my-namespace")));
        Assert.assertEquals((int)this.admin.topics().getMaxConsumers(topic, true), (int)7);
        this.admin.topics().setMaxConsumers(topic, 1000);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.topics().getMaxConsumers(topic)));
        Assert.assertEquals((int)this.admin.topics().getMaxConsumers(topic, true), (int)1000);
        this.admin.namespaces().removeMaxConsumersPerTopic("my-tenant/my-namespace");
        this.admin.topics().removeMaxConsumers(topic);
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.namespaces().getMaxConsumersPerTopic("my-tenant/my-namespace")));
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getMaxConsumers(topic)));
        Assert.assertEquals((int)this.admin.topics().getMaxConsumers(topic, true), (int)this.conf.getMaxConsumersPerTopic());
    }

    @Test
    public void testSetMaxConsumers() throws Exception {
        this.admin.namespaces().setMaxConsumersPerTopic("my-tenant/my-namespace", 1);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((int)this.admin.namespaces().getMaxConsumersPerTopic("my-tenant/my-namespace"), (int)1));
        log.info("MaxConsumers: {} will set to the namespace: {}", (Object)1, (Object)"my-tenant/my-namespace");
        Integer maxConsumers = 2;
        log.info("MaxConsumers: {} will set to the topic: {}", (Object)maxConsumers, (Object)"persistent://my-tenant/my-namespace/test-set-persistence");
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().createPartitionedTopic("persistent://my-tenant/my-namespace/test-set-persistence", 2);
        this.admin.topics().setMaxConsumers("persistent://my-tenant/my-namespace/test-set-persistence", maxConsumers.intValue());
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getMaxConsumers("persistent://my-tenant/my-namespace/test-set-persistence"), (Object)maxConsumers));
        Consumer consumer1 = this.pulsarClient.newConsumer().subscriptionName("sub1").topic(new String[]{"persistent://my-tenant/my-namespace/test-set-persistence"}).subscribe();
        Consumer consumer2 = this.pulsarClient.newConsumer().subscriptionName("sub2").topic(new String[]{"persistent://my-tenant/my-namespace/test-set-persistence"}).subscribe();
        Consumer consumer3 = null;
        try {
            consumer3 = this.pulsarClient.newConsumer().subscriptionName("sub3").topic(new String[]{"persistent://my-tenant/my-namespace/test-set-persistence"}).subscribe();
            Assert.fail();
        }
        catch (PulsarClientException e) {
            log.info("Topic reached max consumers limit");
        }
        Assert.assertNotNull((Object)consumer1);
        Assert.assertNotNull((Object)consumer2);
        Assert.assertNull(consumer3);
        consumer1.close();
        consumer2.close();
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-persistence", true);
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
    }

    @Test
    public void testRemoveMaxConsumers() throws Exception {
        Integer maxConsumers = 2;
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().createPartitionedTopic("persistent://my-tenant/my-namespace/test-set-persistence", 2);
        this.admin.topics().setMaxConsumers("persistent://my-tenant/my-namespace/test-set-persistence", maxConsumers.intValue());
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getMaxConsumers("persistent://my-tenant/my-namespace/test-set-persistence"), (Object)maxConsumers));
        Consumer consumer1 = this.pulsarClient.newConsumer().subscriptionName("sub1").topic(new String[]{"persistent://my-tenant/my-namespace/test-set-persistence"}).subscribe();
        Consumer consumer2 = this.pulsarClient.newConsumer().subscriptionName("sub2").topic(new String[]{"persistent://my-tenant/my-namespace/test-set-persistence"}).subscribe();
        Consumer consumer3 = null;
        try {
            consumer3 = this.pulsarClient.newConsumer().subscriptionName("sub3").topic(new String[]{"persistent://my-tenant/my-namespace/test-set-persistence"}).subscribe();
            Assert.fail();
        }
        catch (PulsarClientException e) {
            log.info("Topic reached max consumers limit");
        }
        Assert.assertNotNull((Object)consumer1);
        Assert.assertNotNull((Object)consumer2);
        Assert.assertNull(consumer3);
        this.admin.topics().removeMaxConsumers("persistent://my-tenant/my-namespace/test-set-persistence");
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getMaxConsumers("persistent://my-tenant/my-namespace/test-set-persistence")));
        consumer3 = this.pulsarClient.newConsumer().subscriptionName("sub3").topic(new String[]{"persistent://my-tenant/my-namespace/test-set-persistence"}).subscribe();
        Assert.assertNotNull((Object)consumer3);
        this.admin.namespaces().setMaxConsumersPerTopic("my-tenant/my-namespace", 3);
        log.info("MaxConsumers: {} will set to the namespace: {}", (Object)3, (Object)"my-tenant/my-namespace");
        Consumer consumer4 = null;
        try {
            consumer4 = this.pulsarClient.newConsumer().subscriptionName("sub4").topic(new String[]{"persistent://my-tenant/my-namespace/test-set-persistence"}).subscribe();
            Assert.fail();
        }
        catch (PulsarClientException e) {
            log.info("Topic reached max consumers limit on namespace level.");
        }
        Assert.assertNull(consumer4);
        consumer1.close();
        consumer2.close();
        consumer3.close();
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-persistence", true);
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
    }

    @Test
    public void testGetSetSubscribeRate() throws Exception {
        this.admin.topics().createPartitionedTopic("persistent://my-tenant/my-namespace/test-set-persistence", 2);
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        SubscribeRate subscribeRate1 = new SubscribeRate(1, 30);
        log.info("Subscribe Rate: {} will be set to the namespace: {}", (Object)subscribeRate1, (Object)"my-tenant/my-namespace");
        this.admin.namespaces().setSubscribeRate("my-tenant/my-namespace", subscribeRate1);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.namespaces().getSubscribeRate("my-tenant/my-namespace"), (Object)subscribeRate1));
        SubscribeRate subscribeRate2 = new SubscribeRate(2, 30);
        log.info("Subscribe Rate: {} will set to the topic: {}", (Object)subscribeRate2, (Object)"persistent://my-tenant/my-namespace/test-set-persistence");
        this.admin.topics().setSubscribeRate("persistent://my-tenant/my-namespace/test-set-persistence", subscribeRate2);
        log.info("Subscribe Rate set success on topic: {}", (Object)"persistent://my-tenant/my-namespace/test-set-persistence");
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getSubscribeRate("persistent://my-tenant/my-namespace/test-set-persistence"), (Object)subscribeRate2));
        PulsarClient pulsarClient1 = this.newPulsarClient(this.lookupUrl.toString(), 0);
        PulsarClient pulsarClient2 = this.newPulsarClient(this.lookupUrl.toString(), 0);
        PulsarClient pulsarClient3 = this.newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer consumer1 = pulsarClient1.newConsumer().subscriptionName("sub1").topic(new String[]{"persistent://my-tenant/my-namespace/test-set-persistence"}).consumerName("test").subscribe();
        Assert.assertNotNull((Object)consumer1);
        consumer1.close();
        pulsarClient1.shutdown();
        Consumer consumer2 = pulsarClient2.newConsumer().subscriptionName("sub1").topic(new String[]{"persistent://my-tenant/my-namespace/test-set-persistence"}).consumerName("test").subscribe();
        Assert.assertNotNull((Object)consumer2);
        consumer2.close();
        pulsarClient2.shutdown();
        Consumer consumer3 = null;
        try {
            consumer3 = pulsarClient3.newConsumer().subscriptionName("sub1").topic(new String[]{"persistent://my-tenant/my-namespace/test-set-persistence"}).consumerName("test").subscribe();
            Assert.fail();
        }
        catch (PulsarClientException e) {
            log.info("subscribe rate reached max subscribe rate limit");
        }
        Assert.assertNull(consumer3);
        pulsarClient3.shutdown();
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-persistence", true);
    }

    @Test(timeOut=20000L)
    public void testGetSubscribeRateApplied() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        Assert.assertNull((Object)this.admin.topics().getSubscribeRate(topic));
        Assert.assertNull((Object)this.admin.namespaces().getSubscribeRate("my-tenant/my-namespace"));
        SubscribeRate brokerPolicy = new SubscribeRate(this.pulsar.getConfiguration().getSubscribeThrottlingRatePerConsumer(), this.pulsar.getConfiguration().getSubscribeRatePeriodPerConsumerInSecond());
        Assert.assertEquals((Object)this.admin.topics().getSubscribeRate(topic, true), (Object)brokerPolicy);
        SubscribeRate namespacePolicy = new SubscribeRate(10, 11);
        this.admin.namespaces().setSubscribeRate("my-tenant/my-namespace", namespacePolicy);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.namespaces().getSubscribeRate("my-tenant/my-namespace")));
        Assert.assertEquals((Object)this.admin.topics().getSubscribeRate(topic, true), (Object)namespacePolicy);
        SubscribeRate topicPolicy = new SubscribeRate(20, 21);
        this.admin.topics().setSubscribeRate(topic, topicPolicy);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.topics().getSubscribeRate(topic)));
        Assert.assertEquals((Object)this.admin.topics().getSubscribeRate(topic, true), (Object)topicPolicy);
        this.admin.namespaces().removeSubscribeRate("my-tenant/my-namespace");
        this.admin.topics().removeSubscribeRate(topic);
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.namespaces().getSubscribeRate("my-tenant/my-namespace")));
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getSubscribeRate(topic)));
        Assert.assertEquals((Object)this.admin.topics().getSubscribeRate(topic, true), (Object)brokerPolicy);
    }

    @Test(timeOut=30000L)
    public void testPriorityAndDisableMaxConsumersOnSub() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        int maxConsumerInBroker = 1;
        int maxConsumerInNs = 2;
        int maxConsumerInTopic = 4;
        String mySub = "my-sub";
        this.conf.setMaxConsumersPerSubscription(maxConsumerInBroker);
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        ArrayList<Consumer> consumerList = new ArrayList<Consumer>();
        ConsumerBuilder builder = this.pulsarClient.newConsumer(Schema.STRING).subscriptionType(SubscriptionType.Shared).topic(new String[]{topic}).subscriptionName(mySub);
        consumerList.add(builder.subscribe());
        try {
            builder.subscribe();
            Assert.fail((String)"should fail");
        }
        catch (PulsarClientException pulsarClientException) {
            // empty catch block
        }
        this.admin.namespaces().setMaxConsumersPerSubscription("my-tenant/my-namespace", maxConsumerInNs);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.namespaces().getMaxConsumersPerSubscription("my-tenant/my-namespace")));
        consumerList.add(builder.subscribe());
        try {
            builder.subscribe();
            Assert.fail((String)"should fail");
        }
        catch (PulsarClientException pulsarClientException) {
            // empty catch block
        }
        this.admin.namespaces().setMaxConsumersPerSubscription("my-tenant/my-namespace", 0);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((int)this.admin.namespaces().getMaxConsumersPerSubscription("my-tenant/my-namespace"), (int)0));
        consumerList.add(builder.subscribe());
        this.admin.topics().setMaxConsumersPerSubscription(topic, maxConsumerInTopic);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.topics().getMaxConsumersPerSubscription(topic)));
        consumerList.add(builder.subscribe());
        try {
            builder.subscribe();
            Assert.fail((String)"should fail");
        }
        catch (PulsarClientException pulsarClientException) {
            // empty catch block
        }
        this.admin.topics().removeMaxConsumersPerSubscription(topic);
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getMaxConsumersPerSubscription(topic)));
        consumerList.add(builder.subscribe());
        this.admin.namespaces().removeMaxConsumersPerSubscription("my-tenant/my-namespace");
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.namespaces().getMaxConsumersPerSubscription("my-tenant/my-namespace")));
        try {
            builder.subscribe();
            Assert.fail((String)"should fail");
        }
        catch (PulsarClientException pulsarClientException) {
            // empty catch block
        }
        for (Consumer consumer : consumerList) {
            consumer.close();
        }
    }

    @Test
    public void testRemoveSubscribeRate() throws Exception {
        this.admin.topics().createPartitionedTopic("persistent://my-tenant/my-namespace/test-set-persistence", 2);
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-persistence")));
        SubscribeRate subscribeRate = new SubscribeRate(2, 30);
        log.info("Subscribe Rate: {} will set to the topic: {}", (Object)subscribeRate, (Object)"persistent://my-tenant/my-namespace/test-set-persistence");
        this.admin.topics().setSubscribeRate("persistent://my-tenant/my-namespace/test-set-persistence", subscribeRate);
        log.info("Subscribe Rate set success on topic: {}", (Object)"persistent://my-tenant/my-namespace/test-set-persistence");
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getSubscribeRate("persistent://my-tenant/my-namespace/test-set-persistence"), (Object)subscribeRate));
        PulsarClient pulsarClient1 = this.newPulsarClient(this.lookupUrl.toString(), 0);
        PulsarClient pulsarClient2 = this.newPulsarClient(this.lookupUrl.toString(), 0);
        PulsarClient pulsarClient3 = this.newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer consumer1 = pulsarClient1.newConsumer().subscriptionName("sub1").topic(new String[]{"persistent://my-tenant/my-namespace/test-set-persistence"}).consumerName("test").subscribe();
        Assert.assertNotNull((Object)consumer1);
        consumer1.close();
        pulsarClient1.shutdown();
        Consumer consumer2 = pulsarClient2.newConsumer().subscriptionName("sub1").topic(new String[]{"persistent://my-tenant/my-namespace/test-set-persistence"}).consumerName("test").subscribe();
        Assert.assertNotNull((Object)consumer2);
        consumer2.close();
        pulsarClient2.shutdown();
        Consumer consumer3 = null;
        try {
            consumer3 = pulsarClient3.newConsumer().subscriptionName("sub1").topic(new String[]{"persistent://my-tenant/my-namespace/test-set-persistence"}).consumerName("test").subscribe();
            Assert.fail();
        }
        catch (PulsarClientException e) {
            log.info("subscribe rate reached max subscribe rate limit");
        }
        Assert.assertNull(consumer3);
        this.admin.topics().removeSubscribeRate("persistent://my-tenant/my-namespace/test-set-persistence");
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getSubscribeRate("persistent://my-tenant/my-namespace/test-set-persistence")));
        this.admin.topics().unload("persistent://my-tenant/my-namespace/test-set-persistence");
        PulsarClient pulsarClient4 = this.newPulsarClient(this.lookupUrl.toString(), 0);
        PulsarClient pulsarClient5 = this.newPulsarClient(this.lookupUrl.toString(), 0);
        PulsarClient pulsarClient6 = this.newPulsarClient(this.lookupUrl.toString(), 0);
        consumer3 = pulsarClient3.newConsumer().subscriptionName("sub2").topic(new String[]{"persistent://my-tenant/my-namespace/test-set-persistence"}).consumerName("test").subscribe();
        Assert.assertNotNull((Object)consumer3);
        consumer3.close();
        pulsarClient3.shutdown();
        Consumer consumer4 = pulsarClient4.newConsumer().subscriptionName("sub2").topic(new String[]{"persistent://my-tenant/my-namespace/test-set-persistence"}).consumerName("test").subscribe();
        Assert.assertNotNull((Object)consumer4);
        consumer4.close();
        pulsarClient4.shutdown();
        Consumer consumer5 = pulsarClient5.newConsumer().subscriptionName("sub2").topic(new String[]{"persistent://my-tenant/my-namespace/test-set-persistence"}).consumerName("test").subscribe();
        Assert.assertNotNull((Object)consumer5);
        consumer5.close();
        pulsarClient5.shutdown();
        Consumer consumer6 = pulsarClient6.newConsumer().subscriptionName("sub2").topic(new String[]{"persistent://my-tenant/my-namespace/test-set-persistence"}).consumerName("test").subscribe();
        Assert.assertNotNull((Object)consumer6);
        consumer6.close();
        pulsarClient6.shutdown();
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-persistence", true);
        this.admin.topics().deletePartitionedTopic("persistent://my-tenant/my-namespace/test-set-backlog-quota", true);
    }

    @Test
    public void testPublishRateInDifferentLevelPolicy() throws Exception {
        this.cleanup();
        this.conf.setMaxPublishRatePerTopicInMessages(5);
        this.conf.setMaxPublishRatePerTopicInBytes(50L);
        this.setup();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        String topicName = "persistent://my-tenant/my-namespace/test-" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topicName).create().close();
        Field publishMaxMessageRate = PublishRateLimiterImpl.class.getDeclaredField("publishMaxMessageRate");
        publishMaxMessageRate.setAccessible(true);
        Field publishMaxByteRate = PublishRateLimiterImpl.class.getDeclaredField("publishMaxByteRate");
        publishMaxByteRate.setAccessible(true);
        PersistentTopic topic = (PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopicIfExists(topicName).get()).get();
        PublishRateLimiterImpl publishRateLimiter = (PublishRateLimiterImpl)topic.getTopicPublishRateLimiter();
        Assert.assertEquals((Object)publishMaxMessageRate.get(publishRateLimiter), (Object)5);
        Assert.assertEquals((Object)publishMaxByteRate.get(publishRateLimiter), (Object)50L);
        PublishRate publishMsgRate = new PublishRate(10, 100L);
        this.admin.namespaces().setPublishRate("my-tenant/my-namespace", publishMsgRate);
        Awaitility.await().until(() -> {
            PublishRateLimiterImpl limiter = (PublishRateLimiterImpl)topic.getTopicPublishRateLimiter();
            return (Integer)publishMaxMessageRate.get(limiter) == 10;
        });
        publishRateLimiter = (PublishRateLimiterImpl)topic.getTopicPublishRateLimiter();
        Assert.assertEquals((Object)publishMaxMessageRate.get(publishRateLimiter), (Object)10);
        Assert.assertEquals((Object)publishMaxByteRate.get(publishRateLimiter), (Object)100L);
        PublishRate publishMsgRate2 = new PublishRate(11, 101L);
        this.admin.topics().setPublishRate(topicName, publishMsgRate2);
        Awaitility.await().until(() -> this.admin.topics().getPublishRate(topicName) != null);
        publishRateLimiter = (PublishRateLimiterImpl)topic.getTopicPublishRateLimiter();
        Assert.assertEquals((Object)publishMaxMessageRate.get(publishRateLimiter), (Object)11);
        Assert.assertEquals((Object)publishMaxByteRate.get(publishRateLimiter), (Object)101L);
        this.admin.topics().removePublishRate(topicName);
        Awaitility.await().until(() -> this.admin.topics().getPublishRate(topicName) == null);
        publishRateLimiter = (PublishRateLimiterImpl)topic.getTopicPublishRateLimiter();
        Assert.assertEquals((Object)publishMaxMessageRate.get(publishRateLimiter), (Object)10);
        Assert.assertEquals((Object)publishMaxByteRate.get(publishRateLimiter), (Object)100L);
        this.admin.namespaces().removePublishRate("my-tenant/my-namespace");
        Awaitility.await().until(() -> {
            PublishRateLimiterImpl limiter = (PublishRateLimiterImpl)topic.getTopicPublishRateLimiter();
            return (Integer)publishMaxMessageRate.get(limiter) == 5;
        });
        publishRateLimiter = (PublishRateLimiterImpl)topic.getTopicPublishRateLimiter();
        Assert.assertEquals((Object)publishMaxMessageRate.get(publishRateLimiter), (Object)5);
        Assert.assertEquals((Object)publishMaxByteRate.get(publishRateLimiter), (Object)50L);
    }

    @Test(timeOut=20000L)
    public void testTopicMaxMessageSizeApi() throws Exception {
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-persistence")));
        this.admin.topics().createNonPartitionedTopic("persistent://my-tenant/my-namespace/test-set-persistence");
        Assert.assertNull((Object)this.admin.topics().getMaxMessageSize("persistent://my-tenant/my-namespace/test-set-persistence"));
        this.admin.topics().setMaxMessageSize("persistent://my-tenant/my-namespace/test-set-persistence", 10);
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-persistence")) != null);
        Assert.assertEquals((int)this.admin.topics().getMaxMessageSize("persistent://my-tenant/my-namespace/test-set-persistence"), (int)10);
        this.admin.topics().removeMaxMessageSize("persistent://my-tenant/my-namespace/test-set-persistence");
        Assert.assertNull((Object)this.admin.topics().getMaxMessageSize("persistent://my-tenant/my-namespace/test-set-persistence"));
        try {
            this.admin.topics().setMaxMessageSize("persistent://my-tenant/my-namespace/test-set-persistence", Integer.MAX_VALUE);
            Assert.fail((String)"should fail");
        }
        catch (PulsarAdminException e) {
            Assert.assertEquals((int)e.getStatusCode(), (int)412);
        }
        try {
            this.admin.topics().setMaxMessageSize("persistent://my-tenant/my-namespace/test-set-persistence", -1);
            Assert.fail((String)"should fail");
        }
        catch (PulsarAdminException e) {
            Assert.assertEquals((int)e.getStatusCode(), (int)412);
        }
    }

    @Test(timeOut=20000L)
    public void testTopicMaxMessageSize() throws Exception {
        this.doTestTopicMaxMessageSize(true);
        this.doTestTopicMaxMessageSize(false);
    }

    private void doTestTopicMaxMessageSize(boolean isPartitioned) throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-" + UUID.randomUUID();
        if (isPartitioned) {
            this.admin.topics().createPartitionedTopic(topic, 3);
        }
        Producer producer = this.pulsarClient.newProducer().topic(topic).create();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        Assert.assertNull((Object)this.admin.topics().getMaxMessageSize(topic));
        this.admin.topics().setMaxMessageSize(topic, 10);
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get((String)topic)) != null);
        Assert.assertEquals((int)this.admin.topics().getMaxMessageSize(topic), (int)10);
        try {
            producer.send((Object)new byte[1024]);
        }
        catch (PulsarClientException e) {
            Assert.assertTrue((boolean)(e instanceof PulsarClientException.NotAllowedException));
        }
        this.admin.topics().removeMaxMessageSize(topic);
        Assert.assertNull((Object)this.admin.topics().getMaxMessageSize(topic));
        try {
            this.admin.topics().setMaxMessageSize(topic, Integer.MAX_VALUE);
            Assert.fail((String)"should fail");
        }
        catch (PulsarAdminException e) {
            Assert.assertEquals((int)e.getStatusCode(), (int)412);
        }
        try {
            this.admin.topics().setMaxMessageSize(topic, -1);
            Assert.fail((String)"should fail");
        }
        catch (PulsarAdminException e) {
            Assert.assertEquals((int)e.getStatusCode(), (int)412);
        }
        Awaitility.await().untilAsserted(() -> {
            try {
                MessageId messageId = producer.send((Object)new byte[1024]);
                Assert.assertNotNull((Object)messageId);
            }
            catch (PulsarClientException e) {
                Assert.fail((String)"failed to send message");
            }
        });
        producer.close();
    }

    @Test(timeOut=20000L)
    public void testMaxSubscriptionsFailFast() throws Exception {
        this.doTestMaxSubscriptionsFailFast(SubscriptionMode.Durable);
        this.doTestMaxSubscriptionsFailFast(SubscriptionMode.NonDurable);
    }

    private void doTestMaxSubscriptionsFailFast(SubscriptionMode subMode) throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        int maxSubInNamespace = 2;
        ArrayList<Consumer> consumers = new ArrayList<Consumer>();
        ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer().subscriptionMode(subMode).subscriptionType(SubscriptionType.Shared).topic(new String[]{topic});
        this.admin.namespaces().setMaxSubscriptionsPerTopic("my-tenant/my-namespace", maxSubInNamespace);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.namespaces().getMaxSubscriptionsPerTopic("my-tenant/my-namespace")));
        for (int i = 0; i < maxSubInNamespace; ++i) {
            consumers.add(consumerBuilder.subscriptionName("sub" + i).subscribe());
        }
        long start = System.currentTimeMillis();
        try {
            consumerBuilder.subscriptionName("sub").subscribe();
            Assert.fail((String)"should fail");
        }
        catch (PulsarClientException e) {
            Assert.assertTrue((boolean)(e instanceof PulsarClientException.NotAllowedException));
        }
        Assert.assertTrue((System.currentTimeMillis() - start < 3000L ? 1 : 0) != 0);
        for (Consumer consumer : consumers) {
            consumer.close();
        }
    }

    @Test(timeOut=20000L)
    public void testMaxSubscriptionsPerTopicApi() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        Assert.assertNull((Object)this.admin.topics().getMaxSubscriptionsPerTopic(topic));
        this.admin.topics().setMaxSubscriptionsPerTopic(topic, 10);
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get((String)topic)) != null);
        Assert.assertEquals((int)this.admin.topics().getMaxSubscriptionsPerTopic(topic), (int)10);
        this.admin.topics().removeMaxSubscriptionsPerTopic(topic);
        Assert.assertNull((Object)this.admin.topics().getMaxSubscriptionsPerTopic(topic));
        try {
            this.admin.topics().setMaxMessageSize(topic, -1);
            Assert.fail((String)"should fail");
        }
        catch (PulsarAdminException e) {
            Assert.assertEquals((int)e.getStatusCode(), (int)412);
        }
    }

    @Test(timeOut=20000L)
    public void testMaxSubscriptionsPerTopicWithExistingSubs() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        int topicLevelMaxSubNum = 2;
        this.admin.topics().setMaxSubscriptionsPerTopic(topic, 2);
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get((String)topic)) != null);
        ArrayList<Consumer> consumerList = new ArrayList<Consumer>();
        String subName = "my-sub-";
        for (int i = 0; i < 2; ++i) {
            Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).subscriptionType(SubscriptionType.Shared).subscriptionName(subName + i).topic(new String[]{topic}).subscribe();
            consumerList.add(consumer);
        }
        try {
            Throwable throwable = null;
            try (PulsarClient client = PulsarClient.builder().operationTimeout(2, TimeUnit.SECONDS).serviceUrl(this.brokerUrl.toString()).build();){
                consumerList.add(client.newConsumer(Schema.STRING).subscriptionName(UUID.randomUUID().toString()).topic(new String[]{topic}).subscribe());
                Assert.fail((String)"should fail");
            }
            catch (Throwable throwable2) {
                Throwable throwable3 = throwable2;
                throw throwable2;
            }
        }
        catch (PulsarClientException ignore) {
            Assert.assertEquals((int)consumerList.size(), (int)2);
        }
        this.pulsarClient.newConsumer(Schema.STRING).subscriptionType(SubscriptionType.Shared).subscriptionName(subName + "0").topic(new String[]{topic}).subscribe().close();
        for (Consumer consumer : consumerList) {
            consumer.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMaxUnackedMessagesOnSubscriptionPriority() throws Exception {
        this.cleanup();
        this.conf.setMaxUnackedMessagesPerSubscription(30);
        this.setup();
        String topic = "persistent://my-tenant/my-namespace/test-" + UUID.randomUUID();
        Producer producer = this.pulsarClient.newProducer().topic(topic).create();
        try {
            Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
            Assert.assertNull((Object)this.admin.namespaces().getMaxUnackedMessagesPerSubscription("my-tenant/my-namespace"));
            int msgNum = 100;
            int maxUnackedMsgOnTopic = 10;
            int maxUnackedMsgNumOnNamespace = 5;
            int defaultMaxUnackedMsgOnBroker = this.pulsar.getConfiguration().getMaxUnackedMessagesPerSubscription();
            this.produceMsg(producer, msgNum);
            this.admin.namespaces().setMaxUnackedMessagesPerSubscription("my-tenant/my-namespace", maxUnackedMsgNumOnNamespace);
            Awaitility.await().untilAsserted(() -> Assert.assertEquals((int)this.admin.namespaces().getMaxUnackedMessagesPerSubscription("my-tenant/my-namespace"), (int)maxUnackedMsgNumOnNamespace));
            String subName = "sub-" + UUID.randomUUID();
            ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName(subName).receiverQueueSize(1).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscriptionType(SubscriptionType.Shared);
            Consumer consumer1 = consumerBuilder.subscribe();
            try {
                List<Message<?>> messages = this.getMsgReceived((Consumer<byte[]>)consumer1, msgNum);
                Assert.assertEquals((int)messages.size(), (int)maxUnackedMsgNumOnNamespace);
                this.ackMessages(consumer1, messages);
                this.admin.namespaces().setMaxUnackedMessagesPerSubscription("my-tenant/my-namespace", 0);
                Awaitility.await().untilAsserted(() -> Assert.assertEquals((int)this.admin.namespaces().getMaxUnackedMessagesPerSubscription("my-tenant/my-namespace"), (int)0));
                messages = this.getMsgReceived((Consumer<byte[]>)consumer1, 40);
                Assert.assertEquals((int)messages.size(), (int)40);
                this.ackMessages(consumer1, messages);
                this.admin.namespaces().setMaxUnackedMessagesPerSubscription("my-tenant/my-namespace", maxUnackedMsgNumOnNamespace);
                Awaitility.await().untilAsserted(() -> Assert.assertEquals((int)this.admin.namespaces().getMaxUnackedMessagesPerSubscription("my-tenant/my-namespace"), (int)maxUnackedMsgNumOnNamespace));
                this.admin.topics().setMaxUnackedMessagesOnSubscription(topic, maxUnackedMsgOnTopic);
                Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.topics().getMaxUnackedMessagesOnSubscription(topic)));
                PersistentTopic persistentTopic = (PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopicIfExists(topic).get()).get();
                Assert.assertEquals((int)persistentTopic.getMaxUnackedMessagesOnSubscription(), (int)maxUnackedMsgOnTopic);
                messages = this.getMsgReceived((Consumer<byte[]>)consumer1, Integer.MAX_VALUE);
                Assert.assertEquals((int)messages.size(), (int)maxUnackedMsgOnTopic);
                this.ackMessages(consumer1, messages);
                this.admin.namespaces().removeMaxUnackedMessagesPerSubscription("my-tenant/my-namespace");
                this.admin.topics().removeMaxUnackedMessagesOnSubscription(topic);
                Awaitility.await().until(() -> this.admin.namespaces().getMaxUnackedMessagesPerSubscription("my-tenant/my-namespace") == null && this.admin.topics().getMaxUnackedMessagesOnSubscription(topic) == null);
                messages = this.getMsgReceived((Consumer<byte[]>)consumer1, Integer.MAX_VALUE);
                Assert.assertEquals((int)messages.size(), (int)defaultMaxUnackedMsgOnBroker);
            }
            finally {
                if (Collections.singletonList(consumer1).get(0) != null) {
                    consumer1.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    private void produceMsg(Producer producer, int msgNum) throws Exception {
        for (int i = 0; i < msgNum; ++i) {
            producer.send((Object)"msg".getBytes());
        }
    }

    private List<Message<?>> getMsgReceived(Consumer<byte[]> consumer1, int msgNum) throws PulsarClientException {
        Message message;
        ArrayList messages = new ArrayList();
        for (int i = 0; i < msgNum && (message = consumer1.receive(1000, TimeUnit.MILLISECONDS)) != null; ++i) {
            messages.add(message);
        }
        return messages;
    }

    private void ackMessages(Consumer<?> consumer, List<Message<?>> messages) throws Exception {
        for (Message<?> message : messages) {
            consumer.acknowledge(message);
        }
    }

    @Test(timeOut=20000L)
    public void testMaxSubscriptionsPerTopic() throws Exception {
        Throwable throwable;
        PulsarClient client;
        Object consumer;
        int brokerLevelMaxSub = 4;
        this.conf.setMaxSubscriptionsPerTopic(4);
        this.restartBroker();
        String topic = "persistent://my-tenant/my-namespace/test-" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        int topicLevelMaxSubNum = 2;
        this.admin.topics().setMaxSubscriptionsPerTopic(topic, 2);
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get((String)topic)) != null);
        ArrayList<Consumer> consumerList = new ArrayList<Consumer>();
        for (int i = 0; i < 2; ++i) {
            consumer = this.pulsarClient.newConsumer(Schema.STRING).subscriptionName(UUID.randomUUID().toString()).topic(new String[]{topic}).subscribe();
            consumerList.add((Consumer)consumer);
        }
        try {
            PulsarClient client2 = PulsarClient.builder().operationTimeout(2, TimeUnit.SECONDS).serviceUrl(this.brokerUrl.toString()).build();
            consumer = null;
            try {
                consumerList.add(client2.newConsumer(Schema.STRING).subscriptionName(UUID.randomUUID().toString()).topic(new String[]{topic}).subscribe());
                Assert.fail((String)"should fail");
            }
            catch (Throwable throwable2) {
                consumer = throwable2;
                throw throwable2;
            }
            finally {
                if (client2 != null) {
                    if (consumer != null) {
                        try {
                            client2.close();
                        }
                        catch (Throwable throwable3) {
                            ((Throwable)consumer).addSuppressed(throwable3);
                        }
                    } else {
                        client2.close();
                    }
                }
            }
        }
        catch (PulsarClientException ignore) {
            Assert.assertEquals((int)consumerList.size(), (int)2);
        }
        int namespaceLevelMaxSub = 3;
        this.admin.namespaces().setMaxSubscriptionsPerTopic("my-tenant/my-namespace", 3);
        PersistentTopic persistentTopic = (PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopicIfExists(topic).get()).get();
        Field field = PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic");
        field.setAccessible(true);
        Awaitility.await().until(() -> (Integer)field.get(persistentTopic) == 3);
        try {
            client = PulsarClient.builder().operationTimeout(1000, TimeUnit.MILLISECONDS).serviceUrl(this.brokerUrl.toString()).build();
            throwable = null;
            try {
                client.newConsumer(Schema.STRING).subscriptionName(UUID.randomUUID().toString()).topic(new String[]{topic}).subscribe();
                Assert.fail((String)"should fail");
            }
            catch (Throwable throwable4) {
                throwable = throwable4;
                throw throwable4;
            }
            finally {
                if (client != null) {
                    if (throwable != null) {
                        try {
                            client.close();
                        }
                        catch (Throwable throwable5) {
                            throwable.addSuppressed(throwable5);
                        }
                    } else {
                        client.close();
                    }
                }
            }
        }
        catch (PulsarClientException ignore) {
            Assert.assertEquals((int)consumerList.size(), (int)2);
        }
        this.admin.topics().removeMaxSubscriptionsPerTopic(topic);
        consumerList.add(this.pulsarClient.newConsumer(Schema.STRING).subscriptionName(UUID.randomUUID().toString()).topic(new String[]{topic}).subscribe());
        Assert.assertEquals((int)consumerList.size(), (int)3);
        try {
            client = PulsarClient.builder().operationTimeout(1000, TimeUnit.MILLISECONDS).serviceUrl(this.brokerUrl.toString()).build();
            throwable = null;
            try {
                consumerList.add(client.newConsumer(Schema.STRING).subscriptionName(UUID.randomUUID().toString()).topic(new String[]{topic}).subscribe());
                Assert.fail((String)"should fail");
            }
            catch (Throwable throwable6) {
                throwable = throwable6;
                throw throwable6;
            }
            finally {
                if (client != null) {
                    if (throwable != null) {
                        try {
                            client.close();
                        }
                        catch (Throwable throwable7) {
                            throwable.addSuppressed(throwable7);
                        }
                    } else {
                        client.close();
                    }
                }
            }
        }
        catch (PulsarClientException ignore) {
            Assert.assertEquals((int)consumerList.size(), (int)3);
        }
        this.admin.namespaces().removeMaxSubscriptionsPerTopic("my-tenant/my-namespace");
        Awaitility.await().until(() -> field.get(persistentTopic) == null);
        consumerList.add(this.pulsarClient.newConsumer(Schema.STRING).subscriptionName(UUID.randomUUID().toString()).topic(new String[]{topic}).subscribe());
        Assert.assertEquals((int)consumerList.size(), (int)brokerLevelMaxSub);
        try {
            client = PulsarClient.builder().operationTimeout(1000, TimeUnit.MILLISECONDS).serviceUrl(this.brokerUrl.toString()).build();
            throwable = null;
            try {
                consumerList.add(client.newConsumer(Schema.STRING).subscriptionName(UUID.randomUUID().toString()).topic(new String[]{topic}).subscribe());
                Assert.fail((String)"should fail");
            }
            catch (Throwable throwable8) {
                throwable = throwable8;
                throw throwable8;
            }
            finally {
                if (client != null) {
                    if (throwable != null) {
                        try {
                            client.close();
                        }
                        catch (Throwable throwable9) {
                            throwable.addSuppressed(throwable9);
                        }
                    } else {
                        client.close();
                    }
                }
            }
        }
        catch (PulsarClientException ignore) {
            Assert.assertEquals((int)consumerList.size(), (int)brokerLevelMaxSub);
        }
        for (Consumer c : consumerList) {
            c.close();
        }
    }

    @Test(timeOut=30000L)
    public void testReplicatorRateApi() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        Assert.assertNull((Object)this.admin.topics().getReplicatorDispatchRate(topic));
        DispatchRate dispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(100).dispatchThrottlingRateInByte(200L).ratePeriodInSecond(10).build();
        this.admin.topics().setReplicatorDispatchRate(topic, dispatchRate);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getReplicatorDispatchRate(topic), (Object)dispatchRate));
        this.admin.topics().removeReplicatorDispatchRate(topic);
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getReplicatorDispatchRate(topic)));
    }

    @Test(timeOut=20000L)
    public void testGetReplicatorRateApplied() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        Assert.assertNull((Object)this.admin.topics().getReplicatorDispatchRate(topic));
        Assert.assertNull((Object)this.admin.namespaces().getReplicatorDispatchRate("my-tenant/my-namespace"));
        DispatchRate brokerDispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(this.pulsar.getConfiguration().getDispatchThrottlingRatePerReplicatorInMsg()).dispatchThrottlingRateInByte(this.pulsar.getConfiguration().getDispatchThrottlingRatePerReplicatorInByte()).ratePeriodInSecond(1).build();
        Assert.assertEquals((Object)this.admin.topics().getReplicatorDispatchRate(topic, true), (Object)brokerDispatchRate);
        DispatchRate namespaceDispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(10).dispatchThrottlingRateInByte(11L).ratePeriodInSecond(12).build();
        this.admin.namespaces().setReplicatorDispatchRate("my-tenant/my-namespace", namespaceDispatchRate);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.namespaces().getReplicatorDispatchRate("my-tenant/my-namespace")));
        Assert.assertEquals((Object)this.admin.topics().getReplicatorDispatchRate(topic, true), (Object)namespaceDispatchRate);
        DispatchRate topicDispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(20).dispatchThrottlingRateInByte(21L).ratePeriodInSecond(22).build();
        this.admin.topics().setReplicatorDispatchRate(topic, topicDispatchRate);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.topics().getReplicatorDispatchRate(topic)));
        Assert.assertEquals((Object)this.admin.topics().getReplicatorDispatchRate(topic, true), (Object)topicDispatchRate);
        this.admin.namespaces().removeReplicatorDispatchRate("my-tenant/my-namespace");
        this.admin.topics().removeReplicatorDispatchRate(topic);
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.namespaces().getReplicatorDispatchRate("my-tenant/my-namespace")));
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getReplicatorDispatchRate(topic)));
        Assert.assertEquals((Object)this.admin.topics().getReplicatorDispatchRate(topic, true), (Object)brokerDispatchRate);
    }

    @Test(timeOut=30000L)
    public void testAutoCreationDisabled() throws Exception {
        this.cleanup();
        this.conf.setAllowAutoTopicCreation(false);
        this.setup();
        String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(topic, 3);
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        Assert.assertNull((Object)this.admin.topics().getMessageTTL(topic));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSubscriptionTypesWithPartitionedTopic() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(topic, 1);
        this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("test").subscribe().close();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        HashSet<SubscriptionType> subscriptionTypeSet = new HashSet<SubscriptionType>();
        subscriptionTypeSet.add(SubscriptionType.Key_Shared);
        this.admin.topics().setSubscriptionTypesEnabled(topic, subscriptionTypeSet);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.topics().getSubscriptionTypesEnabled(topic)));
        PersistentTopic persistentTopic = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference(TopicName.get((String)topic).getPartition(0).toString()).get();
        HashSet old = new HashSet(this.pulsar.getConfiguration().getSubscriptionTypesEnabled());
        try {
            this.pulsar.getConfiguration().getSubscriptionTypesEnabled().clear();
            Assert.assertTrue((boolean)persistentTopic.checkSubscriptionTypesEnable(CommandSubscribe.SubType.Key_Shared));
        }
        finally {
            this.pulsar.getConfiguration().getSubscriptionTypesEnabled().addAll(old);
        }
    }

    @Test(timeOut=30000L)
    public void testSubscriptionTypesEnabled() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-" + UUID.randomUUID();
        this.admin.topics().createNonPartitionedTopic(topic);
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("test").subscribe().close();
        Assert.assertNull((Object)this.admin.topics().getSubscriptionTypesEnabled(topic));
        Set<SubscriptionType> subscriptionTypeSet = new HashSet<SubscriptionType>();
        subscriptionTypeSet.add(SubscriptionType.Failover);
        this.admin.topics().setSubscriptionTypesEnabled(topic, subscriptionTypeSet);
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get((String)topic)) != null);
        subscriptionTypeSet = this.admin.topics().getSubscriptionTypesEnabled(topic);
        Assert.assertTrue((boolean)subscriptionTypeSet.contains(SubscriptionType.Failover));
        Assert.assertFalse((boolean)subscriptionTypeSet.contains(SubscriptionType.Shared));
        Assert.assertEquals((int)subscriptionTypeSet.size(), (int)1);
        try {
            this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionType(SubscriptionType.Shared).subscriptionName("test").subscribe();
            Assert.fail();
        }
        catch (PulsarClientException pulsarClientException) {
            Assert.assertTrue((boolean)(pulsarClientException instanceof PulsarClientException.NotAllowedException));
        }
        subscriptionTypeSet.add(SubscriptionType.Shared);
        this.admin.topics().setSubscriptionTypesEnabled(topic, subscriptionTypeSet);
        this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionType(SubscriptionType.Shared).subscriptionName("test").subscribe().close();
        subscriptionTypeSet.add(SubscriptionType.Shared);
        this.admin.namespaces().setSubscriptionTypesEnabled("my-tenant/my-namespace", subscriptionTypeSet);
        subscriptionTypeSet.clear();
        subscriptionTypeSet.add(SubscriptionType.Failover);
        this.admin.topics().setSubscriptionTypesEnabled(topic, subscriptionTypeSet);
        try {
            this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionType(SubscriptionType.Shared).subscriptionName("test").subscribe();
            Assert.fail();
        }
        catch (PulsarClientException pulsarClientException) {
            Assert.assertTrue((boolean)(pulsarClientException instanceof PulsarClientException.NotAllowedException));
        }
    }

    @Test(timeOut=20000L)
    public void testNonPersistentMaxConsumerOnSub() throws Exception {
        int maxConsumerPerSubInBroker = 1;
        int maxConsumerPerSubInNs = 2;
        int maxConsumerPerSubInTopic = 3;
        this.conf.setMaxConsumersPerSubscription(maxConsumerPerSubInBroker);
        String topic = "non-persistent://my-tenant/my-namespace/test-" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(topic, 3);
        Producer producer = this.pulsarClient.newProducer().topic(topic).create();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        String subName = "my-sub";
        ConsumerBuilder builder = this.pulsarClient.newConsumer().subscriptionType(SubscriptionType.Shared).subscriptionName("my-sub").topic(new String[]{topic});
        Consumer consumer = builder.subscribe();
        try {
            builder.subscribe();
            Assert.fail((String)"should fail");
        }
        catch (PulsarClientException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("reached max consumers limit"));
        }
        this.admin.namespaces().setMaxConsumersPerSubscription("my-tenant/my-namespace", maxConsumerPerSubInNs);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNotNull((Object)this.admin.namespaces().getMaxConsumersPerSubscription("my-tenant/my-namespace"));
            Assert.assertEquals((int)this.admin.namespaces().getMaxConsumersPerSubscription("my-tenant/my-namespace"), (int)maxConsumerPerSubInNs);
        });
        Consumer consumer2 = builder.subscribe();
        try {
            builder.subscribe();
            Assert.fail((String)"should fail");
        }
        catch (PulsarClientException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("reached max consumers limit"));
        }
        this.admin.topics().setMaxConsumersPerSubscription(topic, maxConsumerPerSubInTopic);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNotNull((Object)this.admin.topics().getMaxConsumersPerSubscription(topic));
            Assert.assertEquals((int)this.admin.topics().getMaxConsumersPerSubscription(topic), (int)maxConsumerPerSubInTopic);
        });
        Consumer consumer3 = builder.subscribe();
        try {
            builder.subscribe();
            Assert.fail((String)"should fail");
        }
        catch (PulsarClientException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("reached max consumers limit"));
        }
        consumer.close();
        consumer2.close();
        consumer3.close();
        producer.close();
    }

    @Test(timeOut=20000L)
    public void testGetCompactionThresholdApplied() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)topic)));
        Assert.assertNull((Object)this.admin.topics().getCompactionThreshold(topic));
        Assert.assertNull((Object)this.admin.namespaces().getCompactionThreshold("my-tenant/my-namespace"));
        long brokerPolicy = this.pulsar.getConfiguration().getBrokerServiceCompactionThresholdInBytes();
        Assert.assertEquals((long)this.admin.topics().getCompactionThreshold(topic, true), (long)brokerPolicy);
        long namespacePolicy = 10L;
        this.admin.namespaces().setCompactionThreshold("my-tenant/my-namespace", namespacePolicy);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.namespaces().getCompactionThreshold("my-tenant/my-namespace")));
        Assert.assertEquals((long)this.admin.topics().getCompactionThreshold(topic, true), (long)namespacePolicy);
        long topicPolicy = 20L;
        this.admin.topics().setCompactionThreshold(topic, topicPolicy);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.topics().getCompactionThreshold(topic)));
        Assert.assertEquals((long)this.admin.topics().getCompactionThreshold(topic, true), (long)topicPolicy);
        this.admin.namespaces().removeCompactionThreshold("my-tenant/my-namespace");
        this.admin.topics().removeCompactionThreshold(topic);
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.namespaces().getCompactionThreshold("my-tenant/my-namespace")));
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getCompactionThreshold(topic)));
        Assert.assertEquals((long)this.admin.topics().getCompactionThreshold(topic, true), (long)brokerPolicy);
    }

    @Test(timeOut=30000L)
    public void testProduceConsumeOnTopicPolicy() {
        String msg = "send message ";
        int numMsg = 10;
        try {
            int i;
            Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-tenant/my-namespace/test-set-persistence"}).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscriptionName("test").subscribe();
            Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-tenant/my-namespace/test-set-persistence").create();
            for (i = 0; i < numMsg; ++i) {
                producer.newMessage().value((Object)("send message " + i)).send();
            }
            for (i = 0; i < numMsg; ++i) {
                Message message = consumer.receive(100, TimeUnit.MILLISECONDS);
                Assert.assertEquals((String)((String)message.getValue()), (String)("send message " + i));
            }
        }
        catch (PulsarClientException e) {
            log.error("Failed to send/produce message, ", (Throwable)e);
            Assert.fail();
        }
    }

    @Test(timeOut=30000L)
    public void testSystemTopicShouldBeCompacted() throws Exception {
        BacklogQuota backlogQuota = BacklogQuota.builder().limitSize(1024L).retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction).build();
        log.info("Backlog quota: {} will set to the topic: {}", (Object)backlogQuota, (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota")));
        this.admin.topics().setBacklogQuota("persistent://my-tenant/my-namespace/test-set-backlog-quota", backlogQuota);
        log.info("Backlog quota set success on topic: {}", (Object)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        Awaitility.await().untilAsserted(() -> {
            TopicStats stats = this.admin.topics().getStats("persistent://my-tenant/my-namespace/__change_events");
            Assert.assertTrue((boolean)stats.getSubscriptions().containsKey("__compaction"));
        });
        PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats("persistent://my-tenant/my-namespace/__change_events");
        long previousCompactedLedgerId = internalStats.compactedLedger.ledgerId;
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(this.admin.topics().getBacklogQuotaMap("persistent://my-tenant/my-namespace/test-set-backlog-quota").get(BacklogQuota.BacklogQuotaType.destination_storage), (Object)backlogQuota));
        this.pulsar.getBrokerService().checkCompaction();
        Awaitility.await().untilAsserted(() -> {
            PersistentTopicInternalStats iStats = this.admin.topics().getInternalStats("persistent://my-tenant/my-namespace/__change_events");
            Assert.assertTrue((iStats.compactedLedger.ledgerId != previousCompactedLedgerId ? 1 : 0) != 0);
        });
    }

    @Test(timeOut=30000L)
    public void testTopicRetentionPolicySetInManagedLedgerConfig() throws Exception {
        RetentionPolicies nsRetentionPolicies = new RetentionPolicies(1, -1);
        TopicName topicName = TopicName.get((String)"persistent://my-tenant/my-namespace/test-set-backlog-quota");
        this.admin.namespaces().setRetention("my-tenant/my-namespace", nsRetentionPolicies);
        ManagedLedgerConfig managedLedgerConfig = (ManagedLedgerConfig)this.pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
        Assert.assertEquals((long)managedLedgerConfig.getRetentionTimeMillis(), (long)TimeUnit.MINUTES.toMillis(nsRetentionPolicies.getRetentionTimeInMinutes()));
        Assert.assertEquals((long)managedLedgerConfig.getRetentionSizeInMB(), (long)nsRetentionPolicies.getRetentionSizeInMB());
        RetentionPolicies topicRetentionPolicies = new RetentionPolicies(2, -1);
        this.admin.topics().setRetention("persistent://my-tenant/my-namespace/test-set-backlog-quota", topicRetentionPolicies);
        Awaitility.await().untilAsserted(() -> {
            ManagedLedgerConfig config = (ManagedLedgerConfig)this.pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
            Assert.assertEquals((long)config.getRetentionTimeMillis(), (long)TimeUnit.MINUTES.toMillis(topicRetentionPolicies.getRetentionTimeInMinutes()));
            Assert.assertEquals((long)config.getRetentionSizeInMB(), (long)topicRetentionPolicies.getRetentionSizeInMB());
        });
    }

    @Test
    public void testPolicyIsDeleteTogetherManually() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().untilAsserted(() -> Assertions.assertThat((Object)this.pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get((String)topic))).isNull());
        int maxConsumersPerSubscription = 10;
        this.admin.topics().setMaxConsumersPerSubscription(topic, maxConsumersPerSubscription);
        Awaitility.await().untilAsserted(() -> Assertions.assertThat((boolean)((Optional)this.pulsar.getBrokerService().getTopic(topic, false).get()).isPresent()).isTrue());
        Awaitility.await().untilAsserted(() -> {
            ObjectAssert cfr_ignored_0 = (ObjectAssert)Assertions.assertThat((Object)this.pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get((String)topic))).isNotNull();
        });
        this.admin.topics().delete(topic);
        Awaitility.await().untilAsserted(() -> Assertions.assertThat((boolean)((Optional)this.pulsar.getBrokerService().getTopic(topic, false).get()).isPresent()).isFalse());
        Awaitility.await().untilAsserted(() -> Assertions.assertThat((Object)this.pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get((String)topic))).isNull());
    }

    @Test
    public void testPolicyIsDeleteTogetherAutomatically() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().untilAsserted(() -> Assertions.assertThat((Object)this.pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get((String)topic))).isNull());
        int maxConsumersPerSubscription = 10;
        this.admin.topics().setMaxConsumersPerSubscription(topic, maxConsumersPerSubscription);
        Awaitility.await().untilAsserted(() -> Assertions.assertThat((boolean)((Optional)this.pulsar.getBrokerService().getTopic(topic, false).get()).isPresent()).isTrue());
        Awaitility.await().untilAsserted(() -> {
            ObjectAssert cfr_ignored_0 = (ObjectAssert)Assertions.assertThat((Object)this.pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get((String)topic))).isNotNull();
        });
        InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 3, true);
        this.admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies);
        Thread.sleep(4000L);
        this.pulsar.getBrokerService().checkGC();
        Awaitility.await().untilAsserted(() -> Assertions.assertThat((boolean)((Optional)this.pulsar.getBrokerService().getTopic(topic, false).get()).isPresent()).isFalse());
        Awaitility.await().untilAsserted(() -> Assertions.assertThat((Object)this.pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get((String)topic))).isNull());
    }
}

