package org.apache.pulsar.broker.admin;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BacklogQuotaManager;
import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/admin/TopicPoliciesTest.class */
public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
    // SLF4J logger shared by all tests in this class.
    private static final Logger log = LoggerFactory.getLogger(TopicPoliciesTest.class);
    // Name of the tenant that setup() creates.
    private final String testTenant = "my-tenant";
    // Local (unqualified) name of the namespace used by the tests.
    private final String testNamespace = "my-namespace";
    // Fully qualified "<tenant>/<namespace>" name of the namespace that setup() creates.
    private final String myNamespace = "my-tenant/my-namespace";
    // Partitioned topic (2 partitions) that setup() creates and most tests apply policies to.
    private final String testTopic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
    // Topic used by testSetPersistence for topic-level persistence policies.
    private final String persistenceTopic = "persistent://my-tenant/my-namespace/test-set-persistence";
    // Not referenced in the visible part of this class; presumably the system topic that carries
    // topic-policy change events for the namespace — TODO confirm.
    private final String topicPolicyEventsTopic = "persistent://my-tenant/my-namespace/__change_events";

    /**
     * Prepares the fixture for every test method: enables system topics and topic-level policies
     * in the broker configuration, runs the base-class {@code internalSetup()}, and then creates
     * the cluster, tenant, namespace and the 2-partition test topic through the admin client.
     *
     * @throws Exception if the base-class setup or any admin/client call fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        // These settings are applied before internalSetup() is invoked.
        this.conf.setSystemTopicEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(true);
        this.conf.setDefaultNumberOfNamespaceBundles(1);
        super.internalSetup();

        final String cluster = "test";
        final String namespace = "my-tenant/my-namespace";
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";

        this.admin.clusters().createCluster(cluster,
                ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        TenantInfoImpl tenantInfo =
                new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet(cluster));
        this.admin.tenants().createTenant("my-tenant", tenantInfo);
        this.admin.namespaces().createNamespace(namespace, Sets.newHashSet(cluster));
        this.admin.topics().createPartitionedTopic(topic, 2);
        // Open and immediately close a producer on the freshly created topic.
        this.pulsarClient.newProducer().topic(topic).create().close();
        waitForZooKeeperWatchers();
    }

    /**
     * Tears the fixture down after every test method (also after failed ones, because of
     * {@code alwaysRun = true}): delegates to the base-class {@code internalCleanup()} and then
     * calls {@code resetConfig()}.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    public void cleanup() throws Exception {
        super.internalCleanup();
        // Only reached when the base-class cleanup returned normally.
        this.resetConfig();
    }

    /**
     * Sets a 1024-byte {@code consumer_backlog_eviction} backlog quota on the test topic and
     * verifies that both the admin API and the broker's backlog quota manager report it.
     *
     * @throws Exception if any admin call fails
     */
    @Test
    public void testSetBacklogQuota() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        BacklogQuota quota = BacklogQuota.builder()
                .limitSize(1024L)
                .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                .build();
        log.info("Backlog quota: {} will set to the topic: {}", quota, topic);

        // Wait for the topic-policies cache of this topic before writing a policy.
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        this.admin.topics().setBacklogQuota(topic, quota);
        log.info("Backlog quota set success on topic: {}", topic);

        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                this.admin.topics().getBacklogQuotaMap(topic)
                        .get(BacklogQuota.BacklogQuotaType.destination_storage),
                quota));

        BacklogQuotaImpl quotaInManager = this.pulsar.getBrokerService()
                .getBacklogQuotaManager()
                .getBacklogQuota(TopicName.get(topic));
        log.info("Backlog quota {} in backlog quota manager on topic: {}", quotaInManager, topic);
        Assert.assertEquals(quota, quotaInManager);

        this.admin.topics().deletePartitionedTopic(topic, true);
    }

    /**
     * Sets a backlog quota on the test topic, removes it again, and verifies that the backlog
     * quota manager then reports its default quota for the topic.
     *
     * @throws Exception if any admin call fails
     */
    @Test
    public void testRemoveBacklogQuota() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        BacklogQuota quota = BacklogQuota.builder()
                .limitSize(1024L)
                .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                .build();
        log.info("Backlog quota: {} will set to the topic: {}", quota, topic);

        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        this.admin.topics().setBacklogQuota(topic, quota);
        log.info("Backlog quota set success on topic: {}", topic);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                this.admin.topics().getBacklogQuotaMap(topic)
                        .get(BacklogQuota.BacklogQuotaType.destination_storage),
                quota));

        // The manager must see the topic-level quota while it is set ...
        BacklogQuotaManager quotaManager = this.pulsar.getBrokerService().getBacklogQuotaManager();
        BacklogQuotaImpl quotaWhileSet = quotaManager.getBacklogQuota(TopicName.get(topic));
        log.info("Backlog quota {} in backlog quota manager on topic: {}", quotaWhileSet, topic);
        Assert.assertEquals(quota, quotaWhileSet);

        // ... and its default quota once the topic-level quota has been removed.
        this.admin.topics().removeBacklogQuota(topic);
        Awaitility.await().untilAsserted(() -> Assert.assertNull(
                this.admin.topics().getBacklogQuotaMap(topic)
                        .get(BacklogQuota.BacklogQuotaType.destination_storage)));
        BacklogQuotaImpl quotaAfterRemove = quotaManager.getBacklogQuota(TopicName.get(topic));
        log.info("Backlog quota {} in backlog quota manager on topic: {} after remove", quotaAfterRemove, topic);
        Assert.assertEquals(quotaManager.getDefaultQuota(), quotaAfterRemove);

        this.admin.topics().deletePartitionedTopic(topic, true);
    }

    /**
     * With a namespace retention of {@code RetentionPolicies(10, 10)} in place, verifies that
     * topic backlog quotas of 10485760 and 10485761 bytes are rejected with HTTP 412 while a quota
     * of 10485759 bytes is accepted and becomes visible through the admin API.
     *
     * @throws Exception if an unexpected admin failure occurs
     */
    @Test
    public void testCheckBacklogQuota() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        final String namespace = TopicName.get(topic).getNamespace();

        RetentionPolicies namespaceRetention = new RetentionPolicies(10, 10);
        this.admin.namespaces().setRetention(namespace, namespaceRetention);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.namespaces().getRetention(namespace), namespaceRetention));

        // 10485760 bytes = 10 * 1024 * 1024; the test expects this limit and anything above it
        // to be refused with 412.
        for (long rejectedLimit : new long[] {10485760L, 10485761L}) {
            BacklogQuota rejected = BacklogQuota.builder()
                    .limitSize(rejectedLimit)
                    .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                    .build();
            log.info("Backlog quota: {} will set to the topic: {}", rejected, topic);
            try {
                this.admin.topics().setBacklogQuota(topic, rejected);
                Assert.fail();
            } catch (PulsarAdminException e) {
                Assert.assertEquals(e.getStatusCode(), 412);
            }
        }

        // One byte below that limit is expected to be accepted.
        BacklogQuota accepted = BacklogQuota.builder()
                .limitSize(10485759L)
                .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                .build();
        log.info("Backlog quota: {} will set to the topic: {}", accepted, topic);
        this.admin.topics().setBacklogQuota(topic, accepted);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                this.admin.topics().getBacklogQuotaMap(topic)
                        .get(BacklogQuota.BacklogQuotaType.destination_storage),
                accepted));

        this.admin.topics().deletePartitionedTopic(topic, true);
    }

    /**
     * Verifies the "applied" view of the backlog quota map ({@code getBacklogQuotaMap(topic, true)})
     * on a fresh topic: it is expected to equal the broker configuration map when nothing is set,
     * the namespace quota once one is set, the topic quota once one is set, and the broker
     * configuration map again after both have been removed.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test(timeOut = 20000)
    public void testGetBacklogQuotaApplied() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        // Nothing is configured at topic or namespace level yet.
        Assert.assertEquals(this.admin.topics().getBacklogQuotaMap(topic), Maps.newHashMap());
        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap(namespace), Maps.newHashMap());
        Map<BacklogQuota.BacklogQuotaType, BacklogQuota> brokerQuotas = ConfigHelper.backlogQuotaMap(this.conf);
        Assert.assertEquals(this.admin.topics().getBacklogQuotaMap(topic, true), brokerQuotas);

        // Namespace-level quota.
        BacklogQuota namespaceQuota = BacklogQuota.builder()
                .limitSize(30L)
                .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
                .build();
        this.admin.namespaces().setBacklogQuota(namespace, namespaceQuota);
        Awaitility.await().untilAsserted(
                () -> Assert.assertFalse(this.admin.namespaces().getBacklogQuotaMap(namespace).isEmpty()));
        Map<BacklogQuota.BacklogQuotaType, BacklogQuota> expectedFromNamespace = Maps.newHashMap();
        expectedFromNamespace.put(BacklogQuota.BacklogQuotaType.destination_storage, namespaceQuota);
        Assert.assertEquals(this.admin.topics().getBacklogQuotaMap(topic, true), expectedFromNamespace);

        // Topic-level quota.
        BacklogQuota topicQuota = BacklogQuota.builder()
                .limitSize(40L)
                .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                .build();
        this.admin.topics().setBacklogQuota(topic, topicQuota);
        Awaitility.await().untilAsserted(
                () -> Assert.assertFalse(this.admin.topics().getBacklogQuotaMap(topic).isEmpty()));
        Map<BacklogQuota.BacklogQuotaType, BacklogQuota> expectedFromTopic = Maps.newHashMap();
        expectedFromTopic.put(BacklogQuota.BacklogQuotaType.destination_storage, topicQuota);
        Assert.assertEquals(this.admin.topics().getBacklogQuotaMap(topic, true), expectedFromTopic);

        // Remove both levels and expect the broker configuration map again.
        this.admin.namespaces().removeBacklogQuota(namespace);
        this.admin.topics().removeBacklogQuota(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertTrue(this.admin.namespaces().getBacklogQuotaMap(namespace).isEmpty()));
        Awaitility.await().untilAsserted(
                () -> Assert.assertTrue(this.admin.topics().getBacklogQuotaMap(topic).isEmpty()));
        Assert.assertEquals(this.admin.topics().getBacklogQuotaMap(topic, true), brokerQuotas);
    }

    /**
     * Verifies that a topic backlog quota rejected with HTTP 412 (because of the namespace
     * retention {@code RetentionPolicies(10, 10)}) is not stored: one second after the failed
     * call the topic's {@code destination_storage} quota must still be absent.
     *
     * @throws Exception if an unexpected admin failure occurs
     */
    @Test
    public void testCheckBacklogQuotaFailed() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        final String namespace = TopicName.get(topic).getNamespace();

        RetentionPolicies namespaceRetention = new RetentionPolicies(10, 10);
        this.admin.namespaces().setRetention(namespace, namespaceRetention);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.namespaces().getRetention(namespace), namespaceRetention));

        BacklogQuota rejected = BacklogQuota.builder()
                .limitSize(10485760L)
                .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                .build();
        try {
            this.admin.topics().setBacklogQuota(topic, rejected);
            Assert.fail();
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), 412);
        }

        // Actually wait one second before the final check. A bare await().atLeast(...) only
        // configures a ConditionFactory; without a terminal until*() call nothing is awaited.
        Awaitility.await().pollDelay(1L, TimeUnit.SECONDS).until(() -> true);
        Assert.assertNull(this.admin.topics().getBacklogQuotaMap(topic)
                .get(BacklogQuota.BacklogQuotaType.destination_storage));
    }

    /**
     * With a 10485760-byte backlog quota set on the test topic, verifies that topic retention
     * policies {@code (10, 10)} and {@code (10, 9)} are rejected with HTTP 412 while
     * {@code (10, 12)} is accepted and becomes visible through the admin API.
     *
     * @throws Exception if an unexpected admin failure occurs
     */
    @Test
    public void testCheckRetention() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        BacklogQuota quota = BacklogQuota.builder()
                .limitSize(10485760L)
                .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                .build();
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        this.admin.topics().setBacklogQuota(topic, quota);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                this.admin.topics().getBacklogQuotaMap(topic)
                        .get(BacklogQuota.BacklogQuotaType.destination_storage),
                quota));

        // Retention size equal to the quota (10 vs. 10485760 bytes): expected to be refused.
        RetentionPolicies equalToQuota = new RetentionPolicies(10, 10);
        log.info("Retention: {} will set to the topic: {}", equalToQuota, topic);
        try {
            this.admin.topics().setRetention(topic, equalToQuota);
            Assert.fail();
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), 412);
        }

        // Retention size below the quota: expected to be refused as well.
        RetentionPolicies belowQuota = new RetentionPolicies(10, 9);
        log.info("Retention: {} will set to the topic: {}", belowQuota, topic);
        try {
            this.admin.topics().setRetention(topic, belowQuota);
            Assert.fail();
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), 412);
        }

        // Retention size above the quota: expected to be accepted. The log line names the
        // retention policy being applied (it previously printed the backlog quota by mistake).
        RetentionPolicies aboveQuota = new RetentionPolicies(10, 12);
        log.info("Retention: {} will set to the topic: {}", aboveQuota, topic);
        this.admin.topics().setRetention(topic, aboveQuota);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.topics().getRetention(topic), aboveQuota));

        this.admin.topics().deletePartitionedTopic(topic, true);
    }

    /**
     * Sets {@code RetentionPolicies(60, 1024)} on the test topic and verifies that the admin API
     * returns the same policy.
     *
     * @throws Exception if any admin call fails
     */
    @Test
    public void testSetRetention() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        RetentionPolicies expected = new RetentionPolicies(60, 1024);
        log.info("Retention: {} will set to the topic: {}", expected, topic);

        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        this.admin.topics().setRetention(topic, expected);
        log.info("Retention set success on topic: {}", topic);

        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.topics().getRetention(topic), expected));

        this.admin.topics().deletePartitionedTopic(topic, true);
    }

    /**
     * Sets {@code RetentionPolicies(60, 1024)} on the test topic, removes it again, and verifies
     * that the admin API then returns {@code null} for the topic's retention.
     *
     * @throws Exception if any admin call fails
     */
    @Test
    public void testRemoveRetention() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        RetentionPolicies expected = new RetentionPolicies(60, 1024);
        log.info("Retention: {} will set to the topic: {}", expected, topic);

        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        this.admin.topics().setRetention(topic, expected);
        log.info("Retention set success on topic: {}", topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.topics().getRetention(topic), expected));

        this.admin.topics().removeRetention(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.topics().getRetention(topic)));

        this.admin.topics().deletePartitionedTopic(topic, true);
    }

    /**
     * Verifies the "applied" retention ({@code getRetention(topic, true)}) on a fresh topic: the
     * broker defaults when nothing is set, the namespace policy once set, the topic policy once
     * set, and the same values in reverse order as the topic and namespace policies are removed.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test(timeOut = 10000)
    public void testRetentionAppliedApi() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        // Broker-level defaults taken from the service configuration.
        RetentionPolicies brokerDefault = new RetentionPolicies(
                this.conf.getDefaultRetentionTimeInMinutes(),
                this.conf.getDefaultRetentionSizeInMB());
        Assert.assertEquals(this.admin.topics().getRetention(topic, true), brokerDefault);

        RetentionPolicies namespaceLevel = new RetentionPolicies(10, 20);
        this.admin.namespaces().setRetention(namespace, namespaceLevel);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.topics().getRetention(topic, true), namespaceLevel));

        RetentionPolicies topicLevel = new RetentionPolicies(20, 30);
        this.admin.topics().setRetention(topic, topicLevel);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.topics().getRetention(topic, true), topicLevel));

        // Removing the topic policy is expected to expose the namespace policy again ...
        this.admin.topics().removeRetention(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.topics().getRetention(topic, true), namespaceLevel));

        // ... and removing the namespace policy the broker defaults.
        this.admin.namespaces().removeRetention(namespace);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.topics().getRetention(topic, true), brokerDefault));
    }

    /**
     * Verifies the "applied" subscription dispatch rate
     * ({@code getSubscriptionDispatchRate(topic, true)}) on a fresh topic: a rate built from the
     * broker configuration when nothing is set, the namespace rate once set, the topic rate once
     * set, and the broker-configuration rate again after both have been removed.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test(timeOut = 20000)
    public void testGetSubDispatchRateApplied() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        // Nothing configured at topic or namespace level yet.
        Assert.assertNull(this.admin.topics().getSubscriptionDispatchRate(topic));
        Assert.assertNull(this.admin.namespaces().getSubscriptionDispatchRate(namespace));
        DispatchRate brokerRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(this.conf.getDispatchThrottlingRatePerSubscriptionInMsg())
                .dispatchThrottlingRateInByte(this.conf.getDispatchThrottlingRatePerSubscriptionInByte())
                .ratePeriodInSecond(1)
                .build();
        Assert.assertEquals(this.admin.topics().getSubscriptionDispatchRate(topic, true), brokerRate);

        // Namespace-level rate.
        DispatchRate namespaceRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(10)
                .dispatchThrottlingRateInByte(11L)
                .ratePeriodInSecond(12)
                .build();
        this.admin.namespaces().setSubscriptionDispatchRate(namespace, namespaceRate);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(this.admin.namespaces().getSubscriptionDispatchRate(namespace)));
        Assert.assertEquals(this.admin.topics().getSubscriptionDispatchRate(topic, true), namespaceRate);

        // Topic-level rate.
        DispatchRate topicRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(20)
                .dispatchThrottlingRateInByte(21L)
                .ratePeriodInSecond(12)
                .build();
        this.admin.topics().setSubscriptionDispatchRate(topic, topicRate);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(this.admin.topics().getSubscriptionDispatchRate(topic)));
        Assert.assertEquals(this.admin.topics().getSubscriptionDispatchRate(topic, true), topicRate);

        // Remove both levels; the broker-configuration rate is expected again.
        this.admin.namespaces().removeSubscriptionDispatchRate(namespace);
        this.admin.topics().removeSubscriptionDispatchRate(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.namespaces().getSubscriptionDispatchRate(namespace)));
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.topics().getSubscriptionDispatchRate(topic)));
        Assert.assertEquals(this.admin.topics().getSubscriptionDispatchRate(topic, true), brokerRate);
    }

    /**
     * Checks, via reflection on {@code PersistentTopic.shouldTopicBeRetained()}, how topic-level
     * and namespace-level retention policies affect a topic whose {@code lastActive} timestamp was
     * moved two minutes into the past. The only configuration for which the test expects the
     * topic to be retained is the topic-level {@code RetentionPolicies(3, 1)}.
     *
     * @throws Exception if any admin, client or reflection call fails
     */
    @Test(timeOut = 20000)
    public void testRetentionPriority() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        Assert.assertNull(this.admin.topics().getRetention(topic));
        Assert.assertNull(this.admin.namespaces().getRetention(namespace));

        PersistentTopic persistentTopic = (PersistentTopic)
                ((Optional<?>) this.pulsar.getBrokerService().getTopicIfExists(topic).get()).get();
        // shouldTopicBeRetained() is looked up via getDeclaredMethod and made accessible.
        Method shouldTopicBeRetained = PersistentTopic.class.getDeclaredMethod("shouldTopicBeRetained");
        shouldTopicBeRetained.setAccessible(true);
        // lastActive is declared on the superclass; pretend the topic was last active 2 minutes ago.
        Field lastActive = PersistentTopic.class.getSuperclass().getDeclaredField("lastActive");
        lastActive.setAccessible(true);
        lastActive.setLong(persistentTopic, System.nanoTime() - TimeUnit.MINUTES.toNanos(2L));

        // No policy at any level.
        Assert.assertFalse((Boolean) shouldTopicBeRetained.invoke(persistentTopic));

        // Namespace policy (1, 1).
        this.admin.namespaces().setRetention(namespace, new RetentionPolicies(1, 1));
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(this.admin.namespaces().getRetention(namespace)));
        Assert.assertFalse((Boolean) shouldTopicBeRetained.invoke(persistentTopic));

        // Topic policy (3, 1): the only case expected to retain the topic.
        this.admin.topics().setRetention(topic, new RetentionPolicies(3, 1));
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(this.admin.topics().getRetention(topic)));
        Assert.assertTrue((Boolean) shouldTopicBeRetained.invoke(persistentTopic));

        // Topic policy (0, 0).
        this.admin.topics().setRetention(topic, new RetentionPolicies(0, 0));
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.topics().getRetention(topic).getRetentionSizeInMB(), 0L));
        Assert.assertFalse((Boolean) shouldTopicBeRetained.invoke(persistentTopic));

        // Topic policy removed; namespace policy (1, 1) is still set.
        this.admin.topics().removeRetention(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.topics().getRetention(topic)));
        Assert.assertFalse((Boolean) shouldTopicBeRetained.invoke(persistentTopic));

        // Namespace policy (0, 0).
        this.admin.namespaces().setRetention(namespace, new RetentionPolicies(0, 0));
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(this.admin.namespaces().getRetention(namespace)));
        Assert.assertFalse((Boolean) shouldTopicBeRetained.invoke(persistentTopic));

        // Namespace policy (1, 1) again.
        this.admin.namespaces().setRetention(namespace, new RetentionPolicies(1, 1));
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(this.admin.namespaces().getRetention(namespace)));
        Assert.assertFalse((Boolean) shouldTopicBeRetained.invoke(persistentTopic));

        // Namespace policy removed.
        this.admin.namespaces().removeRetention(namespace);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.namespaces().getRetention(namespace)));
        Assert.assertFalse((Boolean) shouldTopicBeRetained.invoke(persistentTopic));
    }

    /**
     * Verifies the "applied" persistence policies ({@code getPersistence(topic, true)}) on a fresh
     * topic: values built from the broker configuration when nothing is set, the namespace
     * policies once set, the topic policies once set, and the broker-configuration values again
     * after both have been removed.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test(timeOut = 20000)
    public void testGetPersistenceApplied() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        // Nothing configured at topic or namespace level yet.
        Assert.assertNull(this.admin.topics().getPersistence(topic));
        Assert.assertNull(this.admin.namespaces().getPersistence(namespace));
        PersistencePolicies brokerLevel = new PersistencePolicies(
                this.pulsar.getConfiguration().getManagedLedgerDefaultEnsembleSize(),
                this.pulsar.getConfiguration().getManagedLedgerDefaultWriteQuorum(),
                this.pulsar.getConfiguration().getManagedLedgerDefaultAckQuorum(),
                this.pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
        Assert.assertEquals(this.admin.topics().getPersistence(topic, true), brokerLevel);

        // Namespace-level policies.
        PersistencePolicies namespaceLevel = new PersistencePolicies(5, 4, 3, 2.0d);
        this.admin.namespaces().setPersistence(namespace, namespaceLevel);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(this.admin.namespaces().getPersistence(namespace)));
        Assert.assertEquals(this.admin.topics().getPersistence(topic, true), namespaceLevel);

        // Topic-level policies.
        PersistencePolicies topicLevel = new PersistencePolicies(4, 3, 2, 1.0d);
        this.admin.topics().setPersistence(topic, topicLevel);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(this.admin.topics().getPersistence(topic)));
        Assert.assertEquals(this.admin.topics().getPersistence(topic, true), topicLevel);

        // Remove both levels; the broker-configuration values are expected again.
        this.admin.namespaces().removePersistence(namespace);
        this.admin.topics().removePersistence(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.namespaces().getPersistence(namespace)));
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.topics().getPersistence(topic)));
        Assert.assertEquals(this.admin.topics().getPersistence(topic, true), brokerLevel);
    }

    /**
     * Verifies that four persistence policies, {@code (6, 2, 2)}, {@code (2, 6, 2)},
     * {@code (2, 2, 6)} and {@code (1, 2, 2)} (each with a rate of 0.0), are all rejected with
     * HTTP 400 when applied to the test topic.
     *
     * @throws Exception if an unexpected admin failure occurs
     */
    @Test
    public void testCheckPersistence() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        // Tried in this order; every one of them is expected to be refused.
        PersistencePolicies[] rejectedPolicies = {
            new PersistencePolicies(6, 2, 2, 0.0d),
            new PersistencePolicies(2, 6, 2, 0.0d),
            new PersistencePolicies(2, 2, 6, 0.0d),
            new PersistencePolicies(1, 2, 2, 0.0d),
        };

        for (PersistencePolicies rejected : rejectedPolicies) {
            log.info("PersistencePolicies: {} will set to the topic: {}", rejected, topic);
            try {
                this.admin.topics().setPersistence(topic, rejected);
                Assert.fail();
            } catch (PulsarAdminException e) {
                Assert.assertEquals(e.getStatusCode(), 400);
            }
        }

        this.admin.topics().deletePartitionedTopic(topic, true);
    }

    /**
     * Sets a namespace-level persistence policy, then a different topic-level policy, and checks that
     * both the admin API and the topic's {@link ManagedLedgerConfig} report the topic-level values.
     */
    @Test
    public void testSetPersistence() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String topic = "persistent://my-tenant/my-namespace/test-set-persistence";

        PersistencePolicies namespaceLevel = new PersistencePolicies(2, 2, 2, 0.3d);
        this.admin.namespaces().setPersistence(namespace, namespaceLevel);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.namespaces().getPersistence(namespace), namespaceLevel));

        PersistencePolicies topicLevel = new PersistencePolicies(3, 3, 3, 0.1d);
        log.info("PersistencePolicies: {} will set to the topic: {}", topicLevel, topic);
        // Block until the topic policies service reports its cache as initialized for the shared test topic.
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService()
                .cacheIsInitialized(TopicName.get("persistent://my-tenant/my-namespace/test-set-backlog-quota")));

        this.admin.topics().setPersistence(topic, topicLevel);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.topics().getPersistence(topic), topicLevel));

        this.admin.lookups().lookupTopic(topic);
        Topic loadedTopic = (Topic) this.pulsar.getBrokerService().getOrCreateTopic(topic).get();
        ManagedLedgerConfig ledgerConfig = loadedTopic.getManagedLedger().getConfig();
        Assert.assertEquals(ledgerConfig.getEnsembleSize(), 3);
        Assert.assertEquals(ledgerConfig.getWriteQuorumSize(), 3);
        Assert.assertEquals(ledgerConfig.getAckQuorumSize(), 3);
        Assert.assertEquals(ledgerConfig.getThrottleMarkDelete(), 0.1d);

        PersistencePolicies fetched = this.admin.topics().getPersistence(topic);
        log.info("PersistencePolicies: {} will set to the topic: {}", topicLevel, topic);
        Assert.assertEquals(fetched, topicLevel);
    }

    /**
     * Checks what {@code getDispatchRate(topic, true)} returns in four situations: nothing configured
     * (expects a rate built from the broker configuration), only the namespace configured, the topic
     * configured as well, and finally both removed again (back to the broker-configuration rate).
     */
    @Test
    public void testGetDispatchRateApplied() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        // Open and immediately close a producer on the fresh topic name (presumably to get the topic created).
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        // Nothing is set on either level yet.
        Assert.assertNull(this.admin.topics().getDispatchRate(topic));
        Assert.assertNull(this.admin.namespaces().getDispatchRate(namespace));
        DispatchRate brokerLevel = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(this.conf.getDispatchThrottlingRatePerTopicInMsg())
                .dispatchThrottlingRateInByte(this.conf.getDispatchThrottlingRatePerTopicInByte())
                .ratePeriodInSecond(1)
                .build();
        Assert.assertEquals(this.admin.topics().getDispatchRate(topic, true), brokerLevel);

        // Namespace level only.
        DispatchRate namespaceLevel = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(10)
                .dispatchThrottlingRateInByte(11L)
                .ratePeriodInSecond(12)
                .build();
        this.admin.namespaces().setDispatchRate(namespace, namespaceLevel);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(this.admin.namespaces().getDispatchRate(namespace)));
        Assert.assertEquals(this.admin.topics().getDispatchRate(topic, true), namespaceLevel);

        // Topic level on top of the namespace level.
        DispatchRate topicLevel = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(20)
                .dispatchThrottlingRateInByte(21L)
                .ratePeriodInSecond(22)
                .build();
        this.admin.topics().setDispatchRate(topic, topicLevel);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(this.admin.topics().getDispatchRate(topic)));
        Assert.assertEquals(this.admin.topics().getDispatchRate(topic, true), topicLevel);

        // Remove both levels again.
        this.admin.namespaces().removeDispatchRate(namespace);
        this.admin.topics().removeDispatchRate(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.namespaces().getDispatchRate(namespace)));
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.topics().getDispatchRate(topic)));
        Assert.assertEquals(this.admin.topics().getDispatchRate(topic, true), brokerLevel);
    }

    /**
     * Sets namespace-level and topic-level persistence policies, removes the topic-level one, and checks
     * that the topic's {@link ManagedLedgerConfig} then reports the namespace-level values.
     */
    @Test
    public void testRemovePersistence() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String topic = "persistent://my-tenant/my-namespace/test-set-persistence";

        PersistencePolicies namespaceLevel = new PersistencePolicies(2, 2, 2, 0.3d);
        this.admin.namespaces().setPersistence(namespace, namespaceLevel);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.namespaces().getPersistence(namespace), namespaceLevel));

        PersistencePolicies topicLevel = new PersistencePolicies(3, 3, 3, 0.1d);
        log.info("PersistencePolicies: {} will set to the topic: {}", topicLevel, topic);
        // Block until the topic policies service reports its cache as initialized for the shared test topic.
        Awaitility.await().until(() -> this.pulsar.getTopicPoliciesService()
                .cacheIsInitialized(TopicName.get("persistent://my-tenant/my-namespace/test-set-backlog-quota")));

        this.admin.topics().setPersistence(topic, topicLevel);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.topics().getPersistence(topic), topicLevel));

        this.admin.topics().removePersistence(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.topics().getPersistence(topic)));

        this.admin.lookups().lookupTopic(topic);
        Topic loadedTopic = (Topic) this.pulsar.getBrokerService().getOrCreateTopic(topic).get();
        ManagedLedgerConfig ledgerConfig = loadedTopic.getManagedLedger().getConfig();
        // With the topic-level policy gone, the values set on the namespace are expected.
        Assert.assertEquals(ledgerConfig.getEnsembleSize(), 2);
        Assert.assertEquals(ledgerConfig.getWriteQuorumSize(), 2);
        Assert.assertEquals(ledgerConfig.getAckQuorumSize(), 2);
        Assert.assertEquals(ledgerConfig.getThrottleMarkDelete(), 0.3d);
    }

    /**
     * Expects {@code setMaxProducers} with a value of -1 to fail with a {@link PulsarAdminException}
     * whose status code is 412, then deletes the partitioned test topic.
     */
    @Test
    public void testCheckMaxProducers() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        final int invalidLimit = -1;
        log.info("MaxProducers: {} will set to the topic: {}", invalidLimit, topic);
        try {
            this.admin.topics().setMaxProducers(topic, invalidLimit);
            // The call above must throw; getting here is a test failure.
            Assert.fail();
        } catch (PulsarAdminException refused) {
            Assert.assertEquals(refused.getStatusCode(), 412);
        }
        this.admin.topics().deletePartitionedTopic(topic, true);
    }

    /**
     * Checks what {@code getMaxProducers(topic, true)} returns when nothing is configured (the broker
     * configuration value), when only the namespace is configured (7), when the topic is configured too
     * (1000), and after both settings are removed (the broker configuration value again).
     */
    @Test
    public void testGetMaxProducerApplied() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        // Open and immediately close a producer on the fresh topic name (presumably to get the topic created).
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        // Nothing is set on either level yet.
        Assert.assertNull(this.admin.topics().getMaxProducers(topic));
        Assert.assertNull(this.admin.namespaces().getMaxProducersPerTopic(namespace));
        Assert.assertEquals(this.admin.topics().getMaxProducers(topic, true).intValue(),
                this.conf.getMaxProducersPerTopic());

        // Namespace level only.
        this.admin.namespaces().setMaxProducersPerTopic(namespace, 7);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(this.admin.namespaces().getMaxProducersPerTopic(namespace)));
        Assert.assertEquals(this.admin.topics().getMaxProducers(topic, true).intValue(), 7);

        // Topic level on top of the namespace level.
        this.admin.topics().setMaxProducers(topic, 1000);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(this.admin.topics().getMaxProducers(topic)));
        Assert.assertEquals(this.admin.topics().getMaxProducers(topic, true).intValue(), 1000);

        // Remove both levels again.
        this.admin.namespaces().removeMaxProducersPerTopic(namespace);
        this.admin.topics().removeMaxProducers(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.namespaces().getMaxProducersPerTopic(namespace)));
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.topics().getMaxProducers(topic)));
        Assert.assertEquals(this.admin.topics().getMaxProducers(topic, true).intValue(),
                this.conf.getMaxProducersPerTopic());
    }

    /**
     * Sets a topic-level max-producers limit of 2, creates the partitioned topic, and checks that two
     * producers can be created while creating a third one fails with a {@link PulsarClientException}.
     *
     * <p>Every producer created here is closed in a {@code finally} block before the topics are deleted,
     * so none of them stays open on the shared client after the test (the sibling
     * {@code testRemoveMaxProducers} closes its producers as well).
     */
    @Test
    public void testSetMaxProducers() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-set-persistence";
        final String sharedTestTopic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        Integer maxProducers = 2;
        log.info("MaxProducers: {} will set to the topic: {}", maxProducers, topic);
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(sharedTestTopic)));

        this.admin.topics().setMaxProducers(topic, maxProducers.intValue());
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.topics().getMaxProducers(topic), maxProducers));

        this.admin.topics().createPartitionedTopic(topic, 2);
        Producer<byte[]> first = null;
        Producer<byte[]> second = null;
        Producer<byte[]> overLimit = null;
        try {
            first = this.pulsarClient.newProducer().topic(topic).create();
            second = this.pulsarClient.newProducer().topic(topic).create();
            try {
                overLimit = this.pulsarClient.newProducer().topic(topic).create();
                // A third producer exceeds the configured limit, so creation must not succeed.
                Assert.fail();
            } catch (PulsarClientException expected) {
                log.info("Topic reached max producers limit on topic level.");
            }
            Assert.assertNotNull(first);
            Assert.assertNotNull(second);
            Assert.assertNull(overLimit);
        } finally {
            // Release whatever was created, including an unexpectedly created third producer.
            if (first != null) {
                first.close();
            }
            if (second != null) {
                second.close();
            }
            if (overLimit != null) {
                overLimit.close();
            }
        }
        this.admin.topics().deletePartitionedTopic(topic, true);
        this.admin.topics().deletePartitionedTopic(sharedTestTopic, true);
    }

    /**
     * Sets a topic-level max-producers limit of 2 and checks a third producer is refused; removes the
     * topic-level limit and checks a third producer can then be created; finally sets a namespace-level
     * limit of 3 and checks a fourth producer is refused.
     */
    @Test
    public void testRemoveMaxProducers() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String topic = "persistent://my-tenant/my-namespace/test-set-persistence";
        final String sharedTestTopic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        Integer topicLimit = 2;
        log.info("MaxProducers: {} will set to the topic: {}", topicLimit, topic);
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(sharedTestTopic)));

        this.admin.topics().setMaxProducers(topic, topicLimit.intValue());
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.topics().getMaxProducers(topic), topicLimit));

        this.admin.topics().createPartitionedTopic(topic, 2);
        Producer first = this.pulsarClient.newProducer().topic(topic).create();
        Producer second = this.pulsarClient.newProducer().topic(topic).create();
        Producer refusedByTopic = null;
        try {
            refusedByTopic = this.pulsarClient.newProducer().topic(topic).create();
            Assert.fail();
        } catch (PulsarClientException expected) {
            log.info("Topic reached max producers limit on topic level.");
        }
        Assert.assertNotNull(first);
        Assert.assertNotNull(second);
        Assert.assertNull(refusedByTopic);

        // Without the topic-level limit a third producer is expected to be accepted.
        this.admin.topics().removeMaxProducers(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.topics().getMaxProducers(topic)));
        Producer third = this.pulsarClient.newProducer().topic(topic).create();
        Assert.assertNotNull(third);

        // Three producers are open now, so a namespace-level limit of 3 must refuse the next one.
        this.admin.namespaces().setMaxProducersPerTopic(namespace, 3);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                this.admin.namespaces().getMaxProducersPerTopic(namespace).intValue(), 3));
        log.info("MaxProducers: {} will set to the namespace: {}", 3, namespace);
        Producer refusedByNamespace = null;
        try {
            refusedByNamespace = this.pulsarClient.newProducer().topic(topic).create();
            Assert.fail();
        } catch (PulsarClientException expected) {
            log.info("Topic reached max producers limit on namespace level.");
        }
        Assert.assertNull(refusedByNamespace);

        first.close();
        second.close();
        third.close();
        this.admin.topics().deletePartitionedTopic(topic, true);
        this.admin.topics().deletePartitionedTopic(sharedTestTopic, true);
    }

    /**
     * Sets a topic-level {@link DispatchRate} and waits until the admin API returns an equal value,
     * then deletes the partitioned test topic.
     */
    @Test
    public void testGetSetDispatchRate() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        DispatchRate topicRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(100)
                .dispatchThrottlingRateInByte(1000L)
                .ratePeriodInSecond(1)
                .relativeToPublishRate(true)
                .build();
        log.info("Dispatch Rate: {} will set to the topic: {}", topicRate, topic);
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        this.admin.topics().setDispatchRate(topic, topicRate);
        log.info("Dispatch Rate set success on topic: {}", topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.topics().getDispatchRate(topic), topicRate));

        this.admin.topics().deletePartitionedTopic(topic, true);
    }

    /**
     * Sets a topic-level {@link DispatchRate}, removes it again, and waits until the admin API returns
     * {@code null} for the topic; then deletes the partitioned test topic.
     */
    @Test
    public void testRemoveDispatchRate() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        DispatchRate topicRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(100)
                .dispatchThrottlingRateInByte(1000L)
                .ratePeriodInSecond(1)
                .relativeToPublishRate(true)
                .build();
        log.info("Dispatch Rate: {} will set to the topic: {}", topicRate, topic);
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        this.admin.topics().setDispatchRate(topic, topicRate);
        log.info("Dispatch Rate set success on topic: {}", topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.topics().getDispatchRate(topic), topicRate));

        this.admin.topics().removeDispatchRate(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.topics().getDispatchRate(topic)));

        this.admin.topics().deletePartitionedTopic(topic, true);
    }

    /**
     * Checks that a later namespace-level dispatch rate does not replace an existing topic-level one:
     * the topic's {@link DispatchRateLimiter} must keep the topic values (100 msg / 10000 bytes) until
     * the topic-level rate is removed, after which it must report the namespace values
     * (300 msg / 30000 bytes).
     */
    @Test(timeOut = 20000)
    public void testPolicyOverwrittenByNamespaceLevel() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.admin.topics().createNonPartitionedTopic(topic);
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        // Initial namespace-level rate.
        DispatchRate firstNamespaceRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(200)
                .dispatchThrottlingRateInByte(20000L)
                .ratePeriodInSecond(1)
                .relativeToPublishRate(true)
                .build();
        this.admin.namespaces().setDispatchRate(namespace, firstNamespaceRate);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                this.admin.namespaces().getDispatchRate(namespace).getDispatchThrottlingRateInMsg(), 200));

        // Topic-level rate.
        DispatchRate topicRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(100)
                .dispatchThrottlingRateInByte(10000L)
                .ratePeriodInSecond(1)
                .relativeToPublishRate(true)
                .build();
        this.admin.topics().setDispatchRate(topic, topicRate);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(this.admin.topics().getDispatchRate(topic)));

        // Changing the namespace-level rate must leave the limiter on the topic-level values.
        DispatchRate secondNamespaceRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(300)
                .dispatchThrottlingRateInByte(30000L)
                .ratePeriodInSecond(2)
                .relativeToPublishRate(true)
                .build();
        this.admin.namespaces().setDispatchRate(namespace, secondNamespaceRate);
        Awaitility.await().untilAsserted(() -> {
            Optional lookup = (Optional) this.pulsar.getBrokerService().getTopicIfExists(topic).get();
            Topic loadedTopic = (Topic) lookup.get();
            DispatchRateLimiter limiter = (DispatchRateLimiter) loadedTopic.getDispatchRateLimiter().get();
            Assert.assertEquals(limiter.getDispatchRateOnByte(), 10000L);
            Assert.assertEquals(limiter.getDispatchRateOnMsg(), 100L);
        });

        // Once the topic-level rate is gone the limiter must report the namespace-level values.
        this.admin.topics().removeDispatchRate(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.topics().getDispatchRate(topic)));
        Awaitility.await().untilAsserted(() -> {
            Optional lookup = (Optional) this.pulsar.getBrokerService().getTopicIfExists(topic).get();
            Topic loadedTopic = (Topic) lookup.get();
            DispatchRateLimiter limiter = (DispatchRateLimiter) loadedTopic.getDispatchRateLimiter().get();
            Assert.assertEquals(limiter.getDispatchRateOnByte(), 30000L);
            Assert.assertEquals(limiter.getDispatchRateOnMsg(), 300L);
        });
    }

    /**
     * Sets namespace-level and topic-level {@link InactiveTopicPolicies}, restarts the broker, and
     * checks that the topic (accessed again through a new producer) reports the topic-level policies.
     */
    @Test(timeOut = 20000)
    public void testRestart() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.admin.topics().createNonPartitionedTopic(topic);
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        InactiveTopicPolicies namespaceLevel = new InactiveTopicPolicies(
                InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, 100, true);
        this.admin.namespaces().setInactiveTopicPolicies(namespace, namespaceLevel);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                this.admin.namespaces().getInactiveTopicPolicies(namespace).getInactiveTopicDeleteMode(),
                InactiveTopicDeleteMode.delete_when_subscriptions_caught_up));

        InactiveTopicPolicies topicLevel = new InactiveTopicPolicies(
                InactiveTopicDeleteMode.delete_when_no_subscriptions, 200, false);
        this.admin.topics().setInactiveTopicPolicies(topic, topicLevel);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(this.admin.topics().getInactiveTopicPolicies(topic)));

        restartBroker();

        // A producer is opened on the topic after the restart and kept open while the policies are checked.
        Producer producer = this.pulsarClient.newProducer().topic(topic).create();
        Awaitility.await().untilAsserted(() -> {
            Optional lookup = (Optional) this.pulsar.getBrokerService().getTopicIfExists(topic).get();
            PersistentTopic reloaded = (PersistentTopic) lookup.get();
            Assert.assertEquals(reloaded.getInactiveTopicPolicies(), topicLevel);
        });
        producer.close();
    }

    /**
     * Sets a topic-level subscription dispatch rate before any consumer exists, then subscribes and
     * checks that the subscription's {@link DispatchRateLimiter} reports the configured byte and
     * message rates.
     */
    @Test
    public void testGetSetSubscriptionDispatchRate() throws Exception {
        final String subscription = "test_subscription_rate";
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.admin.topics().createNonPartitionedTopic(topic);
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        DispatchRate topicRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(1000)
                .dispatchThrottlingRateInByte(1048576L)
                .ratePeriodInSecond(1)
                .build();
        log.info("Subscription Dispatch Rate: {} will set to the topic: {}", topicRate, topic);
        this.admin.topics().setSubscriptionDispatchRate(topic, topicRate);
        log.info("Subscription dispatch rate set success on topic: {}", topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.topics().getSubscriptionDispatchRate(topic), topicRate));

        Consumer consumer = this.pulsarClient.newConsumer()
                .subscriptionName(subscription)
                .topic(new String[]{topic})
                .subscribe();
        Optional lookup = (Optional) this.pulsar.getBrokerService().getTopicIfExists(topic).get();
        Topic loadedTopic = (Topic) lookup.get();
        DispatchRateLimiter limiter = (DispatchRateLimiter) loadedTopic.getSubscription(subscription)
                .getDispatcher().getRateLimiter().get();
        Assert.assertNotNull(limiter);
        Assert.assertEquals(limiter.getDispatchRateOnByte(), topicRate.getDispatchThrottlingRateInByte());
        Assert.assertEquals(limiter.getDispatchRateOnMsg(), topicRate.getDispatchThrottlingRateInMsg());

        consumer.close();
        this.admin.topics().delete(topic, true);
    }

    /**
     * Same check as for setting a subscription dispatch rate, except that the consumer subscribes
     * before the topic-level rate is set; the existing subscription's {@link DispatchRateLimiter} must
     * report the configured byte and message rates.
     */
    @Test
    public void testGetSetSubscriptionDispatchRateAfterTopicLoaded() throws Exception {
        final String subscription = "test_subscription_rate";
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.admin.topics().createNonPartitionedTopic(topic);
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        DispatchRate topicRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(1000)
                .dispatchThrottlingRateInByte(1048576L)
                .ratePeriodInSecond(1)
                .build();
        log.info("Subscription Dispatch Rate: {} will set to the topic: {}", topicRate, topic);

        // Subscribe first, so the rate is applied to a subscription that already exists.
        Consumer consumer = this.pulsarClient.newConsumer()
                .subscriptionName(subscription)
                .topic(new String[]{topic})
                .subscribe();
        this.admin.topics().setSubscriptionDispatchRate(topic, topicRate);
        log.info("Subscription dispatch rate set success on topic: {}", topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.topics().getSubscriptionDispatchRate(topic), topicRate));

        Optional lookup = (Optional) this.pulsar.getBrokerService().getTopicIfExists(topic).get();
        Topic loadedTopic = (Topic) lookup.get();
        DispatchRateLimiter limiter = (DispatchRateLimiter) loadedTopic.getSubscription(subscription)
                .getDispatcher().getRateLimiter().get();
        Assert.assertNotNull(limiter);
        Assert.assertEquals(limiter.getDispatchRateOnByte(), topicRate.getDispatchThrottlingRateInByte());
        Assert.assertEquals(limiter.getDispatchRateOnMsg(), topicRate.getDispatchThrottlingRateInMsg());

        consumer.close();
        this.admin.topics().delete(topic, true);
    }

    /**
     * Sets a topic-level subscription dispatch rate, verifies the subscription's
     * {@link DispatchRateLimiter} reports it, removes the rate again, and verifies the limiter no longer
     * reports the removed message and byte rates.
     *
     * <p>The post-removal message-rate check compares against the policy's message rate. It previously
     * compared the limiter's message rate with the policy's byte rate, which made the check vacuous.
     */
    @Test
    public void testRemoveSubscriptionDispatchRate() throws Exception {
        final String subscription = "test_subscription_rate";
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.admin.topics().createNonPartitionedTopic(topic);
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        DispatchRate topicRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(1000)
                .dispatchThrottlingRateInByte(1048576L)
                .ratePeriodInSecond(1)
                .build();
        log.info("Subscription Dispatch Rate: {} will set to the topic: {}", topicRate, topic);
        this.admin.topics().setSubscriptionDispatchRate(topic, topicRate);
        log.info("Subscription dispatch rate set success on topic: {}", topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.topics().getSubscriptionDispatchRate(topic), topicRate));

        Consumer subscribe = this.pulsarClient.newConsumer()
                .subscriptionName(subscription)
                .topic(new String[]{topic})
                .subscribe();
        DispatchRateLimiter limiterWhileSet = (DispatchRateLimiter) ((Topic) ((Optional) this.pulsar
                .getBrokerService().getTopicIfExists(topic).get()).get())
                .getSubscription(subscription).getDispatcher().getRateLimiter().get();
        Assert.assertNotNull(limiterWhileSet);
        Assert.assertEquals(limiterWhileSet.getDispatchRateOnByte(), topicRate.getDispatchThrottlingRateInByte());
        Assert.assertEquals(limiterWhileSet.getDispatchRateOnMsg(), topicRate.getDispatchThrottlingRateInMsg());

        this.admin.topics().removeSubscriptionDispatchRate(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.topics().getSubscriptionDispatchRate(topic)));

        // Fetch the limiter again: the instance may have been replaced when the policy was removed.
        DispatchRateLimiter limiterAfterRemoval = (DispatchRateLimiter) ((Topic) ((Optional) this.pulsar
                .getBrokerService().getTopicIfExists(topic).get()).get())
                .getSubscription(subscription).getDispatcher().getRateLimiter().get();
        // Compare like with like: message rate against message rate, byte rate against byte rate.
        Assert.assertNotEquals(Long.valueOf(limiterAfterRemoval.getDispatchRateOnMsg()),
                Long.valueOf(topicRate.getDispatchThrottlingRateInMsg()));
        Assert.assertNotEquals(Long.valueOf(limiterAfterRemoval.getDispatchRateOnByte()),
                Long.valueOf(topicRate.getDispatchThrottlingRateInByte()));

        subscribe.close();
        this.admin.topics().delete(topic, true);
    }

    /**
     * Checks which subscription dispatch rate the subscription's {@link DispatchRateLimiter} reports:
     * the namespace-level rate when only that is set, the topic-level rate once one is set, and the
     * namespace-level rate again after the topic-level rate has been removed.
     */
    @Test
    public void testSubscriptionDispatchRatePolicyOverwrittenNamespaceLevel() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String subscription = "test_subscription_rate";
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.admin.topics().createNonPartitionedTopic(topic);
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        // Namespace level only.
        DispatchRate namespaceRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(100)
                .dispatchThrottlingRateInByte(1048576L)
                .ratePeriodInSecond(1)
                .build();
        this.admin.namespaces().setSubscriptionDispatchRate(namespace, namespaceRate);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                this.admin.namespaces().getSubscriptionDispatchRate(namespace), namespaceRate));

        Consumer consumer = this.pulsarClient.newConsumer()
                .subscriptionName(subscription)
                .topic(new String[]{topic})
                .subscribe();
        Optional firstLookup = (Optional) this.pulsar.getBrokerService().getTopicIfExists(topic).get();
        DispatchRateLimiter namespaceLimiter = (DispatchRateLimiter) ((Topic) firstLookup.get())
                .getSubscription(subscription).getDispatcher().getRateLimiter().get();
        Assert.assertEquals(namespaceLimiter.getDispatchRateOnMsg(), namespaceRate.getDispatchThrottlingRateInMsg());
        Assert.assertEquals(namespaceLimiter.getDispatchRateOnByte(), namespaceRate.getDispatchThrottlingRateInByte());

        // Topic level on top of the namespace level.
        DispatchRate topicRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(200)
                .dispatchThrottlingRateInByte(2097152L)
                .ratePeriodInSecond(1)
                .build();
        this.admin.topics().setSubscriptionDispatchRate(topic, topicRate);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.topics().getSubscriptionDispatchRate(topic), topicRate));
        Optional secondLookup = (Optional) this.pulsar.getBrokerService().getTopicIfExists(topic).get();
        DispatchRateLimiter topicLimiter = (DispatchRateLimiter) ((Topic) secondLookup.get())
                .getSubscription(subscription).getDispatcher().getRateLimiter().get();
        Assert.assertEquals(topicLimiter.getDispatchRateOnByte(), topicRate.getDispatchThrottlingRateInByte());
        Assert.assertEquals(topicLimiter.getDispatchRateOnMsg(), topicRate.getDispatchThrottlingRateInMsg());

        // Topic level removed: the namespace-level values are expected again.
        this.admin.topics().removeSubscriptionDispatchRate(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.topics().getSubscriptionDispatchRate(topic)));
        Optional thirdLookup = (Optional) this.pulsar.getBrokerService().getTopicIfExists(topic).get();
        DispatchRateLimiter restoredLimiter = (DispatchRateLimiter) ((Topic) thirdLookup.get())
                .getSubscription(subscription).getDispatcher().getRateLimiter().get();
        Assert.assertEquals(restoredLimiter.getDispatchRateOnByte(), namespaceRate.getDispatchThrottlingRateInByte());
        Assert.assertEquals(restoredLimiter.getDispatchRateOnMsg(), namespaceRate.getDispatchThrottlingRateInMsg());

        consumer.close();
        this.admin.topics().delete(topic, true);
    }

    /**
     * Sets a topic-level compaction threshold of 100000 and waits until the admin API returns the same
     * value, then deletes the partitioned test topic.
     */
    @Test
    public void testGetSetCompactionThreshold() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        Long threshold = 100000L;
        log.info("Compaction threshold: {} will set to the topic: {}", threshold, topic);
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        this.admin.topics().setCompactionThreshold(topic, threshold.longValue());
        log.info("Compaction threshold set success on topic: {}", topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.topics().getCompactionThreshold(topic), threshold));

        this.admin.topics().deletePartitionedTopic(topic, true);
    }

    /**
     * Sets a topic-level compaction threshold of 100000, removes it again, and waits until the admin
     * API returns {@code null} for the topic; then deletes the partitioned test topic.
     */
    @Test
    public void testRemoveCompactionThreshold() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        Long threshold = 100000L;
        log.info("Compaction threshold: {} will set to the topic: {}", threshold, topic);
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        this.admin.topics().setCompactionThreshold(topic, threshold.longValue());
        log.info("Compaction threshold set success on topic: {}", topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.topics().getCompactionThreshold(topic), threshold));

        this.admin.topics().removeCompactionThreshold(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.topics().getCompactionThreshold(topic)));

        this.admin.topics().deletePartitionedTopic(topic, true);
    }

    /**
     * Sets the topic-level max-consumers-per-subscription value and waits until the
     * getter returns it, then force-deletes the partitioned topic.
     */
    @Test
    public void testGetSetMaxConsumersPerSubscription() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        final Integer expectedLimit = 10;
        log.info("MaxConsumersPerSubscription: {} will set to the topic: {}", expectedLimit, topic);

        // Block until the topic-policies cache reports it is initialized for this topic.
        Awaitility.await().until(
                () -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        admin.topics().setMaxConsumersPerSubscription(topic, expectedLimit.intValue());
        log.info("MaxConsumersPerSubscription set success on topic: {}", topic);

        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(admin.topics().getMaxConsumersPerSubscription(topic), expectedLimit));

        admin.topics().deletePartitionedTopic(topic, true);
    }

    /**
     * Sets the topic-level max-consumers-per-subscription value, confirms it is readable,
     * removes it, and waits until the getter returns {@code null}. The partitioned topic
     * is then force-deleted.
     */
    @Test
    public void testRemoveMaxConsumersPerSubscription() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        final Integer limit = 10;
        log.info("MaxConsumersPerSubscription: {} will set to the topic: {}", limit, topic);

        // Block until the topic-policies cache reports it is initialized for this topic.
        Awaitility.await().until(
                () -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        admin.topics().setMaxConsumersPerSubscription(topic, limit.intValue());
        log.info("MaxConsumersPerSubscription set success on topic: {}", topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(admin.topics().getMaxConsumersPerSubscription(topic), limit));

        // After removal the getter is expected to report no topic-level value.
        admin.topics().removeMaxConsumersPerSubscription(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(admin.topics().getMaxConsumersPerSubscription(topic)));

        admin.topics().deletePartitionedTopic(topic, true);
    }

    /**
     * Sets a topic-level {@link PublishRate} and waits until the getter returns an equal
     * value, then force-deletes the partitioned topic.
     */
    @Test
    public void testGetSetPublishRate() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        final PublishRate expectedRate = new PublishRate(10000, 5242880L);
        log.info("Publish Rate: {} will set to the topic: {}", expectedRate, topic);

        // Block until the topic-policies cache reports it is initialized for this topic.
        Awaitility.await().until(
                () -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        admin.topics().setPublishRate(topic, expectedRate);
        log.info("Publish Rate set success on topic: {}", topic);

        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(admin.topics().getPublishRate(topic), expectedRate));

        admin.topics().deletePartitionedTopic(topic, true);
    }

    /**
     * Sets a topic-level {@link PublishRate}, confirms it is readable, removes it, and
     * waits until the getter returns {@code null}. The partitioned topic is then
     * force-deleted.
     */
    @Test
    public void testRemovePublishRate() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        final PublishRate rate = new PublishRate(10000, 5242880L);
        log.info("Publish Rate: {} will set to the topic: {}", rate, topic);

        // Block until the topic-policies cache reports it is initialized for this topic.
        Awaitility.await().until(
                () -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        admin.topics().setPublishRate(topic, rate);
        log.info("Publish Rate set success on topic: {}", topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(admin.topics().getPublishRate(topic), rate));

        // After removal the getter is expected to report no topic-level value.
        admin.topics().removePublishRate(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(admin.topics().getPublishRate(topic)));

        admin.topics().deletePartitionedTopic(topic, true);
    }

    /**
     * Verifies that setting a negative topic-level max-consumers value is rejected:
     * the admin call must throw a {@link PulsarAdminException} whose status code is 412.
     * The partitioned topic is force-deleted afterwards.
     */
    @Test
    public void testCheckMaxConsumers() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        // A plain int replaces the deprecated boxing constructor new Integer(int).
        final int invalidMaxConsumers = -1;
        log.info("MaxConsumers: {} will set to the topic: {}", invalidMaxConsumers, topic);
        try {
            admin.topics().setMaxConsumers(topic, invalidMaxConsumers);
            // Reaching this line means the invalid value was accepted.
            Assert.fail("setMaxConsumers should reject a negative value");
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), 412);
        }
        admin.topics().deletePartitionedTopic(topic, true);
    }

    /**
     * Checks the value returned by {@code getMaxConsumers(topic, true)} as limits are
     * set and removed: with nothing set it equals the broker configuration value, with
     * only the namespace limit set it equals that limit, and with a topic limit set it
     * equals the topic limit. After both are removed it equals the broker value again.
     */
    @Test
    public void testGetMaxConsumersApplied() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();

        // Create the topic by opening and immediately closing a producer on it.
        pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(
                () -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        // Nothing set at topic or namespace level: applied value equals the broker configuration.
        Assert.assertNull(admin.topics().getMaxConsumers(topic));
        Assert.assertNull(admin.namespaces().getMaxConsumersPerTopic(namespace));
        Assert.assertEquals(admin.topics().getMaxConsumers(topic, true).intValue(), conf.getMaxConsumersPerTopic());

        // Namespace-level value set: applied value equals the namespace value.
        admin.namespaces().setMaxConsumersPerTopic(namespace, 7);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(admin.namespaces().getMaxConsumersPerTopic(namespace)));
        Assert.assertEquals(admin.topics().getMaxConsumers(topic, true).intValue(), 7);

        // Topic-level value set as well: applied value equals the topic value.
        admin.topics().setMaxConsumers(topic, 1000);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(admin.topics().getMaxConsumers(topic)));
        Assert.assertEquals(admin.topics().getMaxConsumers(topic, true).intValue(), 1000);

        // Remove both overrides and expect the broker configuration value again.
        admin.namespaces().removeMaxConsumersPerTopic(namespace);
        admin.topics().removeMaxConsumers(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(admin.namespaces().getMaxConsumersPerTopic(namespace)));
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(admin.topics().getMaxConsumers(topic)));
        Assert.assertEquals(admin.topics().getMaxConsumers(topic, true).intValue(), conf.getMaxConsumersPerTopic());
    }

    /**
     * Sets the namespace-level max consumers per topic to 1 and the topic-level max
     * consumers to 2, then expects two subscriptions to succeed and a third to fail
     * with a {@link PulsarClientException}. Both partitioned topics are force-deleted
     * at the end.
     */
    @Test
    public void testSetMaxConsumers() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String backlogTopic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        final String topic = "persistent://my-tenant/my-namespace/test-set-persistence";

        admin.namespaces().setMaxConsumersPerTopic(namespace, 1);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(admin.namespaces().getMaxConsumersPerTopic(namespace).intValue(), 1));
        log.info("MaxConsumers: {} will set to the namespace: {}", 1, namespace);

        final Integer topicLimit = 2;
        log.info("MaxConsumers: {} will set to the topic: {}", topicLimit, topic);
        Awaitility.await().until(
                () -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(backlogTopic)));

        admin.topics().setMaxConsumers(topic, topicLimit.intValue());
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(admin.topics().getMaxConsumers(topic), topicLimit));

        admin.topics().createPartitionedTopic(topic, 2);
        Consumer<byte[]> firstConsumer = pulsarClient.newConsumer().subscriptionName("sub1").topic(topic).subscribe();
        Consumer<byte[]> secondConsumer = pulsarClient.newConsumer().subscriptionName("sub2").topic(topic).subscribe();

        // The third subscription is expected to be refused.
        Consumer<byte[]> rejectedConsumer = null;
        try {
            rejectedConsumer = pulsarClient.newConsumer().subscriptionName("sub3").topic(topic).subscribe();
            Assert.fail();
        } catch (PulsarClientException e) {
            log.info("Topic reached max consumers limit");
        }
        Assert.assertNotNull(firstConsumer);
        Assert.assertNotNull(secondConsumer);
        Assert.assertNull(rejectedConsumer);

        firstConsumer.close();
        secondConsumer.close();
        admin.topics().deletePartitionedTopic(topic, true);
        admin.topics().deletePartitionedTopic(backlogTopic, true);
    }

    /**
     * Sets a topic-level max-consumers limit of 2 and expects a third subscription to
     * fail. The topic-level limit is then removed, after which a third subscription must
     * succeed. Finally a namespace-level limit of 3 is set and a fourth subscription is
     * expected to fail. Both partitioned topics are force-deleted at the end.
     */
    @Test
    public void testRemoveMaxConsumers() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String backlogTopic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        final String topic = "persistent://my-tenant/my-namespace/test-set-persistence";
        final Integer topicLimit = 2;

        Awaitility.await().until(
                () -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(backlogTopic)));
        admin.topics().setMaxConsumers(topic, topicLimit.intValue());
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(admin.topics().getMaxConsumers(topic), topicLimit));

        admin.topics().createPartitionedTopic(topic, 2);
        Consumer<byte[]> firstConsumer = pulsarClient.newConsumer().subscriptionName("sub1").topic(topic).subscribe();
        Consumer<byte[]> secondConsumer = pulsarClient.newConsumer().subscriptionName("sub2").topic(topic).subscribe();

        // With the topic limit at 2, the third subscription is expected to be refused.
        Consumer<byte[]> rejectedByTopicLimit = null;
        try {
            rejectedByTopicLimit = pulsarClient.newConsumer().subscriptionName("sub3").topic(topic).subscribe();
            Assert.fail();
        } catch (PulsarClientException e) {
            log.info("Topic reached max consumers limit");
        }
        Assert.assertNotNull(firstConsumer);
        Assert.assertNotNull(secondConsumer);
        Assert.assertNull(rejectedByTopicLimit);

        // Once the topic-level limit is gone, the third subscription must succeed.
        admin.topics().removeMaxConsumers(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(admin.topics().getMaxConsumers(topic)));
        Consumer<byte[]> thirdConsumer = pulsarClient.newConsumer().subscriptionName("sub3").topic(topic).subscribe();
        Assert.assertNotNull(thirdConsumer);

        admin.namespaces().setMaxConsumersPerTopic(namespace, 3);
        // Wait until the namespace-level limit is readable before relying on it, as
        // testSetMaxConsumers does; otherwise the next subscribe may race the update.
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(admin.namespaces().getMaxConsumersPerTopic(namespace), Integer.valueOf(3)));
        log.info("MaxConsumers: {} will set to the namespace: {}", 3, namespace);

        // Three consumers are already attached, so a fourth is expected to be refused.
        Consumer<byte[]> rejectedByNamespaceLimit = null;
        try {
            rejectedByNamespaceLimit = pulsarClient.newConsumer().subscriptionName("sub4").topic(topic).subscribe();
            Assert.fail();
        } catch (PulsarClientException e2) {
            log.info("Topic reached max consumers limit on namespace level.");
        }
        Assert.assertNull(rejectedByNamespaceLimit);

        firstConsumer.close();
        secondConsumer.close();
        thirdConsumer.close();
        admin.topics().deletePartitionedTopic(topic, true);
        admin.topics().deletePartitionedTopic(backlogTopic, true);
    }

    /**
     * Sets a namespace-level subscribe rate of (1, 30) and a topic-level rate of (2, 30),
     * then subscribes three times with the same consumer name from three separate
     * clients: the first two must succeed and the third is expected to fail with a
     * {@link PulsarClientException}. Both partitioned topics are force-deleted at the end.
     */
    @Test
    public void testGetSetSubscribeRate() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String backlogTopic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        final String topic = "persistent://my-tenant/my-namespace/test-set-persistence";

        admin.topics().createPartitionedTopic(topic, 2);
        Awaitility.await().until(
                () -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(backlogTopic)));

        final SubscribeRate namespaceRate = new SubscribeRate(1, 30);
        log.info("Subscribe Rate: {} will be set to the namespace: {}", namespaceRate, namespace);
        admin.namespaces().setSubscribeRate(namespace, namespaceRate);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(admin.namespaces().getSubscribeRate(namespace), namespaceRate));

        final SubscribeRate topicRate = new SubscribeRate(2, 30);
        log.info("Subscribe Rate: {} will set to the topic: {}", topicRate, topic);
        admin.topics().setSubscribeRate(topic, topicRate);
        log.info("Subscribe Rate set success on topic: {}", topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(admin.topics().getSubscribeRate(topic), topicRate));

        // All three clients are created up front, before any subscribe attempt.
        PulsarClient firstClient = newPulsarClient(lookupUrl.toString(), 0);
        PulsarClient secondClient = newPulsarClient(lookupUrl.toString(), 0);
        PulsarClient thirdClient = newPulsarClient(lookupUrl.toString(), 0);

        Consumer<byte[]> firstConsumer = firstClient.newConsumer()
                .subscriptionName("sub1")
                .topic(topic)
                .consumerName("test")
                .subscribe();
        Assert.assertNotNull(firstConsumer);
        firstConsumer.close();
        firstClient.shutdown();

        Consumer<byte[]> secondConsumer = secondClient.newConsumer()
                .subscriptionName("sub1")
                .topic(topic)
                .consumerName("test")
                .subscribe();
        Assert.assertNotNull(secondConsumer);
        secondConsumer.close();
        secondClient.shutdown();

        // The third subscribe is expected to be refused.
        Consumer<byte[]> rejectedConsumer = null;
        try {
            rejectedConsumer = thirdClient.newConsumer()
                    .subscriptionName("sub1")
                    .topic(topic)
                    .consumerName("test")
                    .subscribe();
            Assert.fail();
        } catch (PulsarClientException e) {
            log.info("subscribe rate reached max subscribe rate limit");
        }
        Assert.assertNull(rejectedConsumer);
        thirdClient.shutdown();

        admin.topics().deletePartitionedTopic(backlogTopic, true);
        admin.topics().deletePartitionedTopic(topic, true);
    }

    /**
     * Checks the value returned by {@code getSubscribeRate(topic, true)} as rates are
     * set and removed: with nothing set it equals a rate built from the broker
     * configuration, with only the namespace rate set it equals that rate, and with a
     * topic rate set it equals the topic rate. After both are removed it equals the
     * broker-derived rate again.
     */
    @Test(timeOut = 20000)
    public void testGetSubscribeRateApplied() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();

        // Create the topic by opening and immediately closing a producer on it.
        pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(
                () -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        Assert.assertNull(admin.topics().getSubscribeRate(topic));
        Assert.assertNull(admin.namespaces().getSubscribeRate(namespace));

        // Expected fallback: the rate built from the broker configuration values.
        final SubscribeRate brokerRate = new SubscribeRate(
                pulsar.getConfiguration().getSubscribeThrottlingRatePerConsumer(),
                pulsar.getConfiguration().getSubscribeRatePeriodPerConsumerInSecond());
        Assert.assertEquals(admin.topics().getSubscribeRate(topic, true), brokerRate);

        final SubscribeRate namespaceRate = new SubscribeRate(10, 11);
        admin.namespaces().setSubscribeRate(namespace, namespaceRate);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(admin.namespaces().getSubscribeRate(namespace)));
        Assert.assertEquals(admin.topics().getSubscribeRate(topic, true), namespaceRate);

        final SubscribeRate topicRate = new SubscribeRate(20, 21);
        admin.topics().setSubscribeRate(topic, topicRate);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(admin.topics().getSubscribeRate(topic)));
        Assert.assertEquals(admin.topics().getSubscribeRate(topic, true), topicRate);

        // Remove both overrides and expect the broker-derived rate again.
        admin.namespaces().removeSubscribeRate(namespace);
        admin.topics().removeSubscribeRate(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(admin.namespaces().getSubscribeRate(namespace)));
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(admin.topics().getSubscribeRate(topic)));
        Assert.assertEquals(admin.topics().getSubscribeRate(topic, true), brokerRate);
    }

    /**
     * Exercises max-consumers-per-subscription at broker, namespace and topic level on
     * one shared subscription. The test starts with a broker configuration value of 1,
     * then sets the namespace value to 2 and later to 0, sets the topic value to 4, and
     * finally removes the topic and namespace values. After each change it subscribes
     * again and asserts whether the attempt succeeds or throws
     * {@link PulsarClientException}. All successfully created consumers are closed at
     * the end.
     */
    @Test(timeOut = 30000)
    public void testPriorityAndDisableMaxConsumersOnSub() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();

        conf.setMaxConsumersPerSubscription(1);
        // Create the topic by opening and immediately closing a producer on it.
        pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(
                () -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        List<Consumer<String>> consumers = new ArrayList<>();
        ConsumerBuilder<String> builder = pulsarClient.newConsumer(Schema.STRING)
                .subscriptionType(SubscriptionType.Shared)
                .topic(topic)
                .subscriptionName("my-sub");

        // Broker configuration value is 1: the second subscribe must be refused.
        consumers.add(builder.subscribe());
        try {
            builder.subscribe();
            Assert.fail("should fail");
        } catch (PulsarClientException expected) {
            // Expected: the subscription already holds the single allowed consumer.
        }

        // Namespace value 2: one more consumer fits, the next is refused.
        admin.namespaces().setMaxConsumersPerSubscription(namespace, 2);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(admin.namespaces().getMaxConsumersPerSubscription(namespace)));
        consumers.add(builder.subscribe());
        try {
            builder.subscribe();
            Assert.fail("should fail");
        } catch (PulsarClientException expected) {
            // Expected: the subscription already holds two consumers.
        }

        // Namespace value 0: a further subscribe is expected to succeed.
        admin.namespaces().setMaxConsumersPerSubscription(namespace, 0);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(admin.namespaces().getMaxConsumersPerSubscription(namespace).intValue(), 0));
        consumers.add(builder.subscribe());

        // Topic value 4: a fourth consumer fits, the fifth is refused.
        admin.topics().setMaxConsumersPerSubscription(topic, 4);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(admin.topics().getMaxConsumersPerSubscription(topic)));
        consumers.add(builder.subscribe());
        try {
            builder.subscribe();
            Assert.fail("should fail");
        } catch (PulsarClientException expected) {
            // Expected: the subscription already holds four consumers.
        }

        // Topic value removed while the namespace value is still 0: subscribe succeeds.
        admin.topics().removeMaxConsumersPerSubscription(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(admin.topics().getMaxConsumersPerSubscription(topic)));
        consumers.add(builder.subscribe());

        // Namespace value removed too: with the broker value of 1 the subscribe is refused.
        admin.namespaces().removeMaxConsumersPerSubscription(namespace);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(admin.namespaces().getMaxConsumersPerSubscription(namespace)));
        try {
            builder.subscribe();
            Assert.fail("should fail");
        } catch (PulsarClientException expected) {
            // Expected: five consumers are attached and the broker value is 1.
        }

        for (Consumer<String> consumer : consumers) {
            consumer.close();
        }
    }

    /**
     * Sets a topic-level subscribe rate of (2, 30) and expects the third subscribe with
     * the same consumer name to fail. The rate is then removed and the topic unloaded,
     * after which four subscribes in a row on a second subscription must all succeed.
     * Both partitioned topics are force-deleted at the end.
     */
    @Test
    public void testRemoveSubscribeRate() throws Exception {
        final String backlogTopic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        final String topic = "persistent://my-tenant/my-namespace/test-set-persistence";

        admin.topics().createPartitionedTopic(topic, 2);
        Awaitility.await().until(
                () -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        final SubscribeRate topicRate = new SubscribeRate(2, 30);
        log.info("Subscribe Rate: {} will set to the topic: {}", topicRate, topic);
        admin.topics().setSubscribeRate(topic, topicRate);
        log.info("Subscribe Rate set success on topic: {}", topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(admin.topics().getSubscribeRate(topic), topicRate));

        // All three clients are created up front, before any subscribe attempt.
        PulsarClient clientA = newPulsarClient(lookupUrl.toString(), 0);
        PulsarClient clientB = newPulsarClient(lookupUrl.toString(), 0);
        PulsarClient clientC = newPulsarClient(lookupUrl.toString(), 0);

        Consumer<byte[]> consumerA = clientA.newConsumer()
                .subscriptionName("sub1")
                .topic(topic)
                .consumerName("test")
                .subscribe();
        Assert.assertNotNull(consumerA);
        consumerA.close();
        clientA.shutdown();

        Consumer<byte[]> consumerB = clientB.newConsumer()
                .subscriptionName("sub1")
                .topic(topic)
                .consumerName("test")
                .subscribe();
        Assert.assertNotNull(consumerB);
        consumerB.close();
        clientB.shutdown();

        // The third subscribe is expected to be refused; clientC stays open for reuse below.
        Consumer<byte[]> rejectedConsumer = null;
        try {
            rejectedConsumer = clientC.newConsumer()
                    .subscriptionName("sub1")
                    .topic(topic)
                    .consumerName("test")
                    .subscribe();
            Assert.fail();
        } catch (PulsarClientException e) {
            log.info("subscribe rate reached max subscribe rate limit");
        }
        Assert.assertNull(rejectedConsumer);

        admin.topics().removeSubscribeRate(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(admin.topics().getSubscribeRate(topic)));
        admin.topics().unload(topic);

        PulsarClient clientD = newPulsarClient(lookupUrl.toString(), 0);
        PulsarClient clientE = newPulsarClient(lookupUrl.toString(), 0);
        PulsarClient clientF = newPulsarClient(lookupUrl.toString(), 0);

        // With the rate removed, four consecutive subscribes must all succeed.
        Consumer<byte[]> consumerC = clientC.newConsumer()
                .subscriptionName("sub2")
                .topic(topic)
                .consumerName("test")
                .subscribe();
        Assert.assertNotNull(consumerC);
        consumerC.close();
        clientC.shutdown();

        Consumer<byte[]> consumerD = clientD.newConsumer()
                .subscriptionName("sub2")
                .topic(topic)
                .consumerName("test")
                .subscribe();
        Assert.assertNotNull(consumerD);
        consumerD.close();
        clientD.shutdown();

        Consumer<byte[]> consumerE = clientE.newConsumer()
                .subscriptionName("sub2")
                .topic(topic)
                .consumerName("test")
                .subscribe();
        Assert.assertNotNull(consumerE);
        consumerE.close();
        clientE.shutdown();

        Consumer<byte[]> consumerF = clientF.newConsumer()
                .subscriptionName("sub2")
                .topic(topic)
                .consumerName("test")
                .subscribe();
        Assert.assertNotNull(consumerF);
        consumerF.close();
        clientF.shutdown();

        admin.topics().deletePartitionedTopic(topic, true);
        admin.topics().deletePartitionedTopic(backlogTopic, true);
    }

    /**
     * Restarts the test environment with broker-level publish limits of 5 msg / 50 bytes
     * and checks, by reading two private fields of {@link PublishRateLimiterImpl} through
     * reflection, which limits the topic's publish rate limiter holds after each step:
     * broker values initially, namespace values (10 / 100) once set, topic values
     * (11 / 101) once set, then back to namespace and broker values as each is removed.
     */
    @Test
    public void testPublishRateInDifferentLevelPolicy() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String backlogTopic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";

        // Tear down, change the broker configuration, and bring the environment up again.
        cleanup();
        conf.setMaxPublishRatePerTopicInMessages(5);
        conf.setMaxPublishRatePerTopicInBytes(50L);
        setup();
        Awaitility.await().until(
                () -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(backlogTopic)));

        final String topic = "persistent://my-tenant/my-namespace/test-" + UUID.randomUUID();
        pulsarClient.newProducer().topic(topic).create().close();

        // Reflective handles on the limiter's private rate fields.
        final Field messageRateField = PublishRateLimiterImpl.class.getDeclaredField("publishMaxMessageRate");
        messageRateField.setAccessible(true);
        final Field byteRateField = PublishRateLimiterImpl.class.getDeclaredField("publishMaxByteRate");
        byteRateField.setAccessible(true);

        final PersistentTopic topicRef =
                (PersistentTopic) ((Optional) pulsar.getBrokerService().getTopicIfExists(topic).get()).get();

        // Broker-level values apply when nothing else is configured.
        PublishRateLimiterImpl brokerLevelLimiter = topicRef.getTopicPublishRateLimiter();
        Assert.assertEquals(messageRateField.get(brokerLevelLimiter), 5);
        Assert.assertEquals(byteRateField.get(brokerLevelLimiter), 50L);

        // Namespace-level values.
        admin.namespaces().setPublishRate(namespace, new PublishRate(10, 100L));
        Awaitility.await().until(
                () -> ((Integer) messageRateField.get(topicRef.getTopicPublishRateLimiter())).intValue() == 10);
        PublishRateLimiterImpl namespaceLevelLimiter = topicRef.getTopicPublishRateLimiter();
        Assert.assertEquals(messageRateField.get(namespaceLevelLimiter), 10);
        Assert.assertEquals(byteRateField.get(namespaceLevelLimiter), 100L);

        // Topic-level values.
        admin.topics().setPublishRate(topic, new PublishRate(11, 101L));
        Awaitility.await().until(() -> admin.topics().getPublishRate(topic) != null);
        PublishRateLimiterImpl topicLevelLimiter = topicRef.getTopicPublishRateLimiter();
        Assert.assertEquals(messageRateField.get(topicLevelLimiter), 11);
        Assert.assertEquals(byteRateField.get(topicLevelLimiter), 101L);

        // Topic-level rate removed: namespace values expected again.
        admin.topics().removePublishRate(topic);
        Awaitility.await().until(() -> admin.topics().getPublishRate(topic) == null);
        PublishRateLimiterImpl afterTopicRemoval = topicRef.getTopicPublishRateLimiter();
        Assert.assertEquals(messageRateField.get(afterTopicRemoval), 10);
        Assert.assertEquals(byteRateField.get(afterTopicRemoval), 100L);

        // Namespace-level rate removed: broker values expected again.
        admin.namespaces().removePublishRate(namespace);
        Awaitility.await().until(
                () -> ((Integer) messageRateField.get(topicRef.getTopicPublishRateLimiter())).intValue() == 5);
        PublishRateLimiterImpl afterNamespaceRemoval = topicRef.getTopicPublishRateLimiter();
        Assert.assertEquals(messageRateField.get(afterNamespaceRemoval), 5);
        Assert.assertEquals(byteRateField.get(afterNamespaceRemoval), 50L);
    }

    /**
     * Exercises the topic-level max-message-size admin calls: the value starts as
     * {@code null}, reads back as 10 after being set, and returns to {@code null} after
     * removal. Setting {@link Integer#MAX_VALUE} or -1 must fail with a
     * {@link PulsarAdminException} whose status code is 412.
     */
    @Test(timeOut = 20000)
    public void testTopicMaxMessageSizeApi() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-set-persistence";

        Awaitility.await().until(
                () -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        Assert.assertNull(admin.topics().getMaxMessageSize(topic));

        admin.topics().setMaxMessageSize(topic, 10);
        // Wait for the topic-policies service to hold a policies object for the topic.
        Awaitility.await().until(
                () -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null);
        Assert.assertEquals(admin.topics().getMaxMessageSize(topic).intValue(), 10);

        admin.topics().removeMaxMessageSize(topic);
        // Poll instead of asserting immediately, matching the other remove tests in
        // this class, which all await the getter after a remove call.
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(admin.topics().getMaxMessageSize(topic)));

        // Values outside the accepted range must be rejected with status code 412.
        try {
            admin.topics().setMaxMessageSize(topic, Integer.MAX_VALUE);
            Assert.fail("should fail");
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), 412);
        }
        try {
            admin.topics().setMaxMessageSize(topic, -1);
            Assert.fail("should fail");
        } catch (PulsarAdminException e2) {
            Assert.assertEquals(e2.getStatusCode(), 412);
        }
    }

    /**
     * Runs the max-message-size scenario twice: first with the flag set to {@code true}
     * (the helper then creates a partitioned topic), then with {@code false}.
     */
    @Test(timeOut = 20000)
    public void testTopicMaxMessageSize() throws Exception {
        for (boolean partitioned : new boolean[] {true, false}) {
            doTestTopicMaxMessageSize(partitioned);
        }
    }

    /**
     * Shared scenario for the topic-level max-message-size policy on a freshly named topic.
     *
     * <p>Sets the limit to 10 and sends a 1024-byte message; if that send throws, the
     * exception must be a {@link PulsarClientException.NotAllowedException}. The limit is
     * then removed, out-of-range values are checked to be rejected with status code 412,
     * and a final 1024-byte send must succeed.
     *
     * @param z when {@code true}, the topic is first created as a partitioned topic
     *          with 3 partitions
     * @throws Exception if any admin or client call fails unexpectedly
     */
    private void doTestTopicMaxMessageSize(boolean z) throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-" + UUID.randomUUID();
        if (z) {
            admin.topics().createPartitionedTopic(topic, 3);
        }
        // try-with-resources closes the producer even when an assertion below fails.
        try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create()) {
            Awaitility.await().until(
                    () -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
            Assert.assertNull(admin.topics().getMaxMessageSize(topic));

            admin.topics().setMaxMessageSize(topic, 10);
            // Wait for the topic-policies service to hold a policies object for the topic.
            Awaitility.await().until(
                    () -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null);
            Assert.assertEquals(admin.topics().getMaxMessageSize(topic).intValue(), 10);

            // NOTE(review): a successful send is tolerated here (there is no Assert.fail);
            // confirm whether an oversized send is always expected to be rejected.
            try {
                producer.send(new byte[1024]);
            } catch (PulsarClientException e) {
                Assert.assertTrue(e instanceof PulsarClientException.NotAllowedException);
            }

            admin.topics().removeMaxMessageSize(topic);
            // Poll instead of asserting immediately, matching the other remove tests in
            // this class, which all await the getter after a remove call.
            Awaitility.await().untilAsserted(
                    () -> Assert.assertNull(admin.topics().getMaxMessageSize(topic)));

            // Values outside the accepted range must be rejected with status code 412.
            try {
                admin.topics().setMaxMessageSize(topic, Integer.MAX_VALUE);
                Assert.fail("should fail");
            } catch (PulsarAdminException e2) {
                Assert.assertEquals(e2.getStatusCode(), 412);
            }
            try {
                admin.topics().setMaxMessageSize(topic, -1);
                Assert.fail("should fail");
            } catch (PulsarAdminException e3) {
                Assert.assertEquals(e3.getStatusCode(), 412);
            }

            // With the limit removed, the 1024-byte message must be accepted.
            Assert.assertNotNull(producer.send(new byte[1024]));
        }
    }

    /**
     * Runs the max-subscriptions fail-fast scenario once per subscription mode:
     * {@code Durable} first, then {@code NonDurable}.
     *
     * @throws Exception if either scenario run fails
     */
    @Test(timeOut = 20000)
    public void testMaxSubscriptionsFailFast() throws Exception {
        SubscriptionMode[] modes = {SubscriptionMode.Durable, SubscriptionMode.NonDurable};
        for (SubscriptionMode mode : modes) {
            doTestMaxSubscriptionsFailFast(mode);
        }
    }

    /**
     * Checks that a subscribe attempt beyond the namespace-level subscription limit is rejected
     * quickly.
     *
     * <p>The namespace limit is set to 2, two shared subscriptions ("sub0", "sub1") are opened, and a
     * third subscribe ("sub") must fail with {@code NotAllowedException} in under 3000 ms.
     *
     * @param subscriptionMode subscription mode applied to every consumer created by this scenario
     * @throws Exception if an admin or client call fails unexpectedly
     */
    private void doTestMaxSubscriptionsFailFast(SubscriptionMode subscriptionMode) throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final int maxSubscriptions = 2;
        final String topic = "persistent://my-tenant/my-namespace/test-" + UUID.randomUUID();

        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer()
                .subscriptionMode(subscriptionMode)
                .subscriptionType(SubscriptionType.Shared)
                .topic(topic);

        this.admin.namespaces().setMaxSubscriptionsPerTopic(namespace, maxSubscriptions);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(this.admin.namespaces().getMaxSubscriptionsPerTopic(namespace)));

        List<Consumer<byte[]>> consumers = new ArrayList<>();
        try {
            for (int i = 0; i < maxSubscriptions; i++) {
                consumers.add(consumerBuilder.subscriptionName("sub" + i).subscribe());
            }

            long startMillis = System.currentTimeMillis();
            try {
                consumerBuilder.subscriptionName("sub").subscribe();
                Assert.fail("should fail");
            } catch (PulsarClientException e) {
                Assert.assertTrue(e instanceof PulsarClientException.NotAllowedException);
            }
            // "Fail fast": the rejection must arrive well before any client-side timeout.
            Assert.assertTrue(System.currentTimeMillis() - startMillis < 3000);
        } finally {
            // Release the consumers even when an assertion above fails.
            for (Consumer<byte[]> consumer : consumers) {
                consumer.close();
            }
        }
    }

    /**
     * Covers the topic-level max-subscriptions admin API: the value is initially unset, can be set
     * to 10 and read back, can be removed again, and a negative value is refused.
     *
     * @throws Exception if an admin or client call fails unexpectedly
     */
    @Test(timeOut = 20000)
    public void testMaxSubscriptionsPerTopicApi() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        Assert.assertNull(this.admin.topics().getMaxSubscriptionsPerTopic(topic));

        this.admin.topics().setMaxSubscriptionsPerTopic(topic, 10);
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null);
        Assert.assertEquals(this.admin.topics().getMaxSubscriptionsPerTopic(topic).intValue(), 10);

        this.admin.topics().removeMaxSubscriptionsPerTopic(topic);
        Assert.assertNull(this.admin.topics().getMaxSubscriptionsPerTopic(topic));

        // The invalid-value check must target the API under test. It previously called
        // setMaxMessageSize, which left setMaxSubscriptionsPerTopic validation unexercised.
        // NOTE(review): assumes the broker answers 412 for a negative limit, as it does for
        // setMaxMessageSize - confirm against the broker's validation.
        try {
            this.admin.topics().setMaxSubscriptionsPerTopic(topic, -1);
            Assert.fail("should fail");
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), 412);
        }
    }

    /**
     * Checks that once a topic has reached its subscription limit, new subscriptions are rejected
     * while an already existing subscription can still be attached to.
     *
     * <p>The topic limit is set to 2 and two shared subscriptions ("my-sub-0", "my-sub-1") are
     * opened. A subscribe with a fresh random name from a separate client (2 s operation timeout)
     * must fail, and a further consumer on "my-sub-0" must succeed.
     *
     * @throws Exception if an admin or client call fails unexpectedly
     */
    @Test(timeOut = 20000)
    public void testMaxSubscriptionsPerTopicWithExistingSubs() throws Exception {
        final int maxSubscriptions = 2;
        final String topic = "persistent://my-tenant/my-namespace/test-" + UUID.randomUUID();

        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        this.admin.topics().setMaxSubscriptionsPerTopic(topic, maxSubscriptions);
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null);

        List<Consumer<String>> consumers = new ArrayList<>();
        try {
            for (int i = 0; i < maxSubscriptions; i++) {
                consumers.add(this.pulsarClient.newConsumer(Schema.STRING)
                        .subscriptionType(SubscriptionType.Shared)
                        .subscriptionName("my-sub-" + i)
                        .topic(topic)
                        .subscribe());
            }

            // A brand-new subscription is over the limit and must be rejected. The dedicated client
            // is closed by try-with-resources whether or not the subscribe call throws, and the
            // expected PulsarClientException is caught here instead of escaping the test.
            try (PulsarClient client = PulsarClient.builder()
                    .operationTimeout(2, TimeUnit.SECONDS)
                    .serviceUrl(this.brokerUrl.toString())
                    .build()) {
                consumers.add(client.newConsumer(Schema.STRING)
                        .subscriptionName(UUID.randomUUID().toString())
                        .topic(topic)
                        .subscribe());
                Assert.fail("should fail");
            } catch (PulsarClientException e) {
                Assert.assertEquals(consumers.size(), maxSubscriptions);
            }

            // Attaching to a subscription that already exists must still be possible.
            this.pulsarClient.newConsumer(Schema.STRING)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscriptionName("my-sub-0")
                    .topic(topic)
                    .subscribe()
                    .close();
        } finally {
            for (Consumer<String> consumer : consumers) {
                consumer.close();
            }
        }
    }

    /**
     * Checks which max-unacked-messages-per-subscription limit is applied when it is configured at
     * broker, namespace and topic level.
     *
     * <p>The environment is re-created with a broker-level limit of 30 and 100 messages are
     * published. The number of messages one shared consumer (receiver queue size 1) can receive
     * without acknowledging is then compared against:
     * <ol>
     *   <li>the namespace limit (5);</li>
     *   <li>a namespace limit of 0, where 40 messages are requested and 40 are received;</li>
     *   <li>the topic limit (10) while the namespace limit is 5;</li>
     *   <li>the broker-level value after both namespace and topic limits are removed.</li>
     * </ol>
     *
     * @throws Exception if an admin or client call fails unexpectedly
     */
    @Test
    public void testMaxUnackedMessagesOnSubscriptionPriority() throws Exception {
        cleanup();
        this.conf.setMaxUnackedMessagesPerSubscription(30);
        setup();

        final String namespace = "my-tenant/my-namespace";
        final int namespaceLimit = 5;
        final int topicLimit = 10;
        final String topic = "persistent://my-tenant/my-namespace/test-" + UUID.randomUUID();

        // try-with-resources replaces the hand-rolled close scaffolding: producer and consumer are
        // always closed, and a failure while closing no longer replaces the original test failure.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create()) {
            Awaitility.await().until(
                    () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
            Assert.assertNull(this.admin.namespaces().getMaxUnackedMessagesPerSubscription(namespace));

            int brokerLimit = this.pulsar.getConfiguration().getMaxUnackedMessagesPerSubscription();
            produceMsg(producer, 100);

            this.admin.namespaces().setMaxUnackedMessagesPerSubscription(namespace, namespaceLimit);
            Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                    this.admin.namespaces().getMaxUnackedMessagesPerSubscription(namespace).intValue(),
                    namespaceLimit));

            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName("sub-" + UUID.randomUUID())
                    .receiverQueueSize(1)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe()) {
                // 1. Namespace limit applies.
                List<Message<?>> underNamespaceLimit = getMsgReceived(consumer, 100);
                Assert.assertEquals(underNamespaceLimit.size(), 5);
                ackMessages(consumer, underNamespaceLimit);

                // 2. Namespace limit of 0: all 40 requested messages arrive.
                this.admin.namespaces().setMaxUnackedMessagesPerSubscription(namespace, 0);
                Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                        this.admin.namespaces().getMaxUnackedMessagesPerSubscription(namespace).intValue(),
                        0));
                List<Message<?>> withZeroLimit = getMsgReceived(consumer, 40);
                Assert.assertEquals(withZeroLimit.size(), 40);
                ackMessages(consumer, withZeroLimit);

                // 3. Topic limit takes effect while the namespace limit is set to 5.
                this.admin.namespaces().setMaxUnackedMessagesPerSubscription(namespace, namespaceLimit);
                Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                        this.admin.namespaces().getMaxUnackedMessagesPerSubscription(namespace).intValue(),
                        namespaceLimit));
                this.admin.topics().setMaxUnackedMessagesOnSubscription(topic, topicLimit);
                Awaitility.await().untilAsserted(() -> Assert.assertNotNull(
                        this.admin.topics().getMaxUnackedMessagesOnSubscription(topic)));
                PersistentTopic persistentTopic =
                        (PersistentTopic) this.pulsar.getBrokerService().getTopicIfExists(topic).get().get();
                Assert.assertEquals(persistentTopic.getMaxUnackedMessagesOnSubscription(), 10);
                List<Message<?>> underTopicLimit = getMsgReceived(consumer, Integer.MAX_VALUE);
                Assert.assertEquals(underTopicLimit.size(), 10);
                ackMessages(consumer, underTopicLimit);

                // 4. With both overrides removed the broker-level value is what remains.
                this.admin.namespaces().removeMaxUnackedMessagesPerSubscription(namespace);
                this.admin.topics().removeMaxUnackedMessagesOnSubscription(topic);
                Awaitility.await().until(
                        () -> this.admin.namespaces().getMaxUnackedMessagesPerSubscription(namespace) == null
                                && this.admin.topics().getMaxUnackedMessagesOnSubscription(topic) == null);
                Assert.assertEquals(getMsgReceived(consumer, Integer.MAX_VALUE).size(), brokerLimit);
            }
        }
    }

    /**
     * Sends the bytes of the string {@code "msg"} through the given producer a fixed number of times.
     *
     * @param producer producer used for every send
     * @param i number of messages to send; nothing is sent when it is zero or negative
     * @throws Exception if a send fails
     */
    private void produceMsg(Producer producer, int i) throws Exception {
        int remaining = i;
        while (remaining > 0) {
            producer.send("msg".getBytes());
            remaining--;
        }
    }

    /**
     * Receives messages from the consumer without acknowledging them, stopping at the first receive
     * that returns {@code null} after waiting up to 1000 ms, or once the requested count is reached.
     *
     * @param consumer consumer to read from
     * @param i maximum number of messages to collect
     * @return the messages received, in arrival order; possibly fewer than {@code i}
     * @throws PulsarClientException if a receive call fails
     */
    private List<Message<?>> getMsgReceived(Consumer<byte[]> consumer, int i) throws PulsarClientException {
        List<Message<?>> received = new ArrayList<>();
        while (received.size() < i) {
            Message<byte[]> next = consumer.receive(1000, TimeUnit.MILLISECONDS);
            if (next == null) {
                // No message arrived within the wait window; treat the stream as drained.
                break;
            }
            received.add(next);
        }
        return received;
    }

    /**
     * Acknowledges every message of the list on the given consumer, in list order.
     *
     * @param consumer consumer on which the acknowledgements are issued
     * @param list messages to acknowledge
     * @throws Exception if an acknowledgement fails
     */
    private void ackMessages(Consumer<?> consumer, List<Message<?>> list) throws Exception {
        for (Message<?> message : list) {
            consumer.acknowledge(message);
        }
    }

    /**
     * Checks how the max-subscriptions-per-topic limits configured at broker (4), namespace (3) and
     * topic (2) level combine.
     *
     * <p>Sequence:
     * <ol>
     *   <li>topic limit 2: two subscriptions succeed, a third is rejected;</li>
     *   <li>namespace limit 3 added while the topic limit is still set: a third is still rejected;</li>
     *   <li>topic limit removed: a third succeeds, a fourth is rejected;</li>
     *   <li>namespace limit removed: a fourth succeeds, a fifth is rejected.</li>
     * </ol>
     *
     * @throws Exception if an admin or client call fails unexpectedly
     */
    @Test(timeOut = 20000)
    public void testMaxSubscriptionsPerTopic() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final int brokerLimit = 4;
        final int namespaceLimit = 3;
        final int topicLimit = 2;

        this.conf.setMaxSubscriptionsPerTopic(brokerLimit);
        restartBroker();

        final String topic = "persistent://my-tenant/my-namespace/test-" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        this.admin.topics().setMaxSubscriptionsPerTopic(topic, topicLimit);
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null);

        List<Consumer<String>> consumers = new ArrayList<>();
        try {
            // 1. Topic limit.
            for (int i = 0; i < topicLimit; i++) {
                consumers.add(this.pulsarClient.newConsumer(Schema.STRING)
                        .subscriptionName(UUID.randomUUID().toString())
                        .topic(topic)
                        .subscribe());
            }
            assertNewSubscriptionRejected(topic, 2, TimeUnit.SECONDS);
            Assert.assertEquals(consumers.size(), topicLimit);

            // 2. Namespace limit added; wait until the topic object has picked it up.
            this.admin.namespaces().setMaxSubscriptionsPerTopic(namespace, namespaceLimit);
            PersistentTopic persistentTopic =
                    (PersistentTopic) this.pulsar.getBrokerService().getTopicIfExists(topic).get().get();
            // The field is declared on PersistentTopic's superclass, hence getSuperclass().
            Field namespaceLimitField =
                    PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic");
            namespaceLimitField.setAccessible(true);
            Awaitility.await().until(
                    () -> ((Integer) namespaceLimitField.get(persistentTopic)).intValue() == namespaceLimit);
            assertNewSubscriptionRejected(topic, 1000, TimeUnit.MILLISECONDS);
            Assert.assertEquals(consumers.size(), topicLimit);

            // 3. Topic limit removed: the namespace limit is what remains.
            this.admin.topics().removeMaxSubscriptionsPerTopic(topic);
            consumers.add(this.pulsarClient.newConsumer(Schema.STRING)
                    .subscriptionName(UUID.randomUUID().toString())
                    .topic(topic)
                    .subscribe());
            Assert.assertEquals(consumers.size(), namespaceLimit);
            assertNewSubscriptionRejected(topic, 1000, TimeUnit.MILLISECONDS);
            Assert.assertEquals(consumers.size(), namespaceLimit);

            // 4. Namespace limit removed: the broker-level limit is what remains.
            this.admin.namespaces().removeMaxSubscriptionsPerTopic(namespace);
            Awaitility.await().until(() -> namespaceLimitField.get(persistentTopic) == null);
            consumers.add(this.pulsarClient.newConsumer(Schema.STRING)
                    .subscriptionName(UUID.randomUUID().toString())
                    .topic(topic)
                    .subscribe());
            Assert.assertEquals(consumers.size(), brokerLimit);
            assertNewSubscriptionRejected(topic, 1000, TimeUnit.MILLISECONDS);
            Assert.assertEquals(consumers.size(), brokerLimit);
        } finally {
            for (Consumer<String> consumer : consumers) {
                consumer.close();
            }
        }
    }

    /**
     * Tries to open a subscription with a random name from a dedicated client and requires the
     * attempt to fail with a {@link PulsarClientException}.
     *
     * <p>The dedicated client is always closed, whether or not the subscribe call throws.
     *
     * @param topic topic to subscribe to
     * @param operationTimeout operation timeout configured on the dedicated client
     * @param unit unit of {@code operationTimeout}
     * @throws Exception if closing the dedicated client fails
     */
    private void assertNewSubscriptionRejected(String topic, int operationTimeout, TimeUnit unit)
            throws Exception {
        try (PulsarClient client = PulsarClient.builder()
                .operationTimeout(operationTimeout, unit)
                .serviceUrl(this.brokerUrl.toString())
                .build()) {
            client.newConsumer(Schema.STRING)
                    .subscriptionName(UUID.randomUUID().toString())
                    .topic(topic)
                    .subscribe();
            Assert.fail("should fail");
        } catch (PulsarClientException expected) {
            // Rejection is the expected outcome; nothing further to check here.
        }
    }

    /**
     * Covers the topic-level replicator dispatch rate admin API: the rate is initially unset, can be
     * set and read back as an equal object, and is unset again after removal.
     *
     * @throws Exception if an admin or client call fails unexpectedly
     */
    @Test(timeOut = 30000)
    public void testReplicatorRateApi() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-" + UUID.randomUUID();
        // Open and immediately close a producer on the topic, then wait for its policy cache.
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        Assert.assertNull(this.admin.topics().getReplicatorDispatchRate(topic));

        final DispatchRate expectedRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(100)
                .dispatchThrottlingRateInByte(200L)
                .ratePeriodInSecond(10)
                .build();
        this.admin.topics().setReplicatorDispatchRate(topic, expectedRate);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(this.admin.topics().getReplicatorDispatchRate(topic), expectedRate));

        this.admin.topics().removeReplicatorDispatchRate(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.topics().getReplicatorDispatchRate(topic)));
    }

    /**
     * Checks the value returned by {@code getReplicatorDispatchRate(topic, true)} as rates are
     * configured at different levels.
     *
     * <p>With nothing set, the applied rate equals one built from the broker configuration with a
     * period of 1 second. After a namespace rate is set it equals the namespace rate; after a topic
     * rate is set it equals the topic rate; after both are removed it equals the broker-derived rate
     * again.
     *
     * @throws Exception if an admin or client call fails unexpectedly
     */
    @Test(timeOut = 20000)
    public void testGetReplicatorRateApplied() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        Assert.assertNull(this.admin.topics().getReplicatorDispatchRate(topic));
        Assert.assertNull(this.admin.namespaces().getReplicatorDispatchRate(namespace));

        // Broker level.
        final DispatchRate brokerRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(
                        this.pulsar.getConfiguration().getDispatchThrottlingRatePerReplicatorInMsg())
                .dispatchThrottlingRateInByte(
                        this.pulsar.getConfiguration().getDispatchThrottlingRatePerReplicatorInByte())
                .ratePeriodInSecond(1)
                .build();
        Assert.assertEquals(this.admin.topics().getReplicatorDispatchRate(topic, true), brokerRate);

        // Namespace level.
        final DispatchRate namespaceRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(10)
                .dispatchThrottlingRateInByte(11L)
                .ratePeriodInSecond(12)
                .build();
        this.admin.namespaces().setReplicatorDispatchRate(namespace, namespaceRate);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(this.admin.namespaces().getReplicatorDispatchRate(namespace)));
        Assert.assertEquals(this.admin.topics().getReplicatorDispatchRate(topic, true), namespaceRate);

        // Topic level.
        final DispatchRate topicRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(20)
                .dispatchThrottlingRateInByte(21L)
                .ratePeriodInSecond(22)
                .build();
        this.admin.topics().setReplicatorDispatchRate(topic, topicRate);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(this.admin.topics().getReplicatorDispatchRate(topic)));
        Assert.assertEquals(this.admin.topics().getReplicatorDispatchRate(topic, true), topicRate);

        // Remove both overrides and expect the broker-derived rate again.
        this.admin.namespaces().removeReplicatorDispatchRate(namespace);
        this.admin.topics().removeReplicatorDispatchRate(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.namespaces().getReplicatorDispatchRate(namespace)));
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.topics().getReplicatorDispatchRate(topic)));
        Assert.assertEquals(this.admin.topics().getReplicatorDispatchRate(topic, true), brokerRate);
    }

    /**
     * Re-creates the test environment with automatic topic creation turned off, then checks that a
     * partitioned topic created through the admin API can be produced to and that reading its
     * message TTL policy returns {@code null}.
     *
     * @throws Exception if an admin or client call fails unexpectedly
     */
    @Test(timeOut = 30000)
    public void testAutoCreationDisabled() throws Exception {
        cleanup();
        this.conf.setAllowAutoTopicCreation(false);
        setup();

        final String partitionedTopic =
                "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(partitionedTopic, 3);
        this.pulsarClient.newProducer().topic(partitionedTopic).create().close();
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(partitionedTopic)));

        Assert.assertNull(this.admin.topics().getMessageTTL(partitionedTopic));
    }

    /**
     * Checks the topic-level "subscription types enabled" policy.
     *
     * <p>After only {@code Failover} is enabled on the topic, a {@code Shared} subscribe must fail
     * with {@code NotAllowedException}. Once {@code Shared} is added at topic level it succeeds.
     * Finally, with both types enabled at namespace level but only {@code Failover} at topic level,
     * a {@code Shared} subscribe must fail again.
     *
     * @throws Exception if an admin or client call fails unexpectedly
     */
    @Test(timeOut = 30000)
    public void testSubscriptionTypesEnabled() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String subscription = "test";
        final String topic = "persistent://my-tenant/my-namespace/test-" + UUID.randomUUID();

        this.admin.topics().createNonPartitionedTopic(topic);
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        this.pulsarClient.newConsumer().topic(topic).subscriptionName(subscription).subscribe().close();
        Assert.assertNull(this.admin.topics().getSubscriptionTypesEnabled(topic));

        // Enable Failover only on the topic.
        Set<SubscriptionType> failoverOnly = new HashSet<>();
        failoverOnly.add(SubscriptionType.Failover);
        this.admin.topics().setSubscriptionTypesEnabled(topic, failoverOnly);
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null);

        Set<SubscriptionType> enabledTypes = this.admin.topics().getSubscriptionTypesEnabled(topic);
        Assert.assertTrue(enabledTypes.contains(SubscriptionType.Failover));
        Assert.assertFalse(enabledTypes.contains(SubscriptionType.Shared));
        Assert.assertEquals(enabledTypes.size(), 1);
        try {
            this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscriptionName(subscription)
                    .subscribe();
            Assert.fail();
        } catch (PulsarClientException e) {
            Assert.assertTrue(e instanceof PulsarClientException.NotAllowedException);
        }

        // Add Shared on the topic: a Shared subscribe is now accepted.
        enabledTypes.add(SubscriptionType.Shared);
        this.admin.topics().setSubscriptionTypesEnabled(topic, enabledTypes);
        this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionName(subscription)
                .subscribe()
                .close();

        // Namespace gets the current set; the topic is narrowed back to Failover.
        enabledTypes.add(SubscriptionType.Shared);
        this.admin.namespaces().setSubscriptionTypesEnabled(namespace, enabledTypes);
        enabledTypes.clear();
        enabledTypes.add(SubscriptionType.Failover);
        this.admin.topics().setSubscriptionTypesEnabled(topic, enabledTypes);
        try {
            this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscriptionName(subscription)
                    .subscribe();
            Assert.fail();
        } catch (PulsarClientException e) {
            Assert.assertTrue(e instanceof PulsarClientException.NotAllowedException);
        }
    }

    /**
     * Checks the max-consumers-per-subscription limit on a partitioned non-persistent topic as the
     * limit is raised from the configured value (1) to a namespace value (2) and then a topic
     * value (3).
     *
     * <p>At each stage one more consumer on subscription "my-sub" is accepted and the next attempt
     * must fail with a message containing "reached max consumers limit".
     *
     * @throws Exception if an admin or client call fails unexpectedly
     */
    @Test(timeOut = 20000)
    public void testNonPersistentMaxConsumerOnSub() throws Exception {
        final int namespaceLimit = 2;
        final int topicLimit = 3;
        final String namespace = "my-tenant/my-namespace";

        this.conf.setMaxConsumersPerSubscription(1);
        final String topic = "non-persistent://my-tenant/my-namespace/test-" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(topic, 3);
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create();
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        ConsumerBuilder<byte[]> sharedSubBuilder = this.pulsarClient.newConsumer()
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionName("my-sub")
                .topic(topic);

        // Configured limit of 1: the first consumer is accepted, the second refused.
        Consumer<byte[]> firstConsumer = sharedSubBuilder.subscribe();
        try {
            sharedSubBuilder.subscribe();
            Assert.fail("should fail");
        } catch (PulsarClientException e) {
            Assert.assertTrue(e.getMessage().contains("reached max consumers limit"));
        }

        // Namespace limit of 2: one more consumer is accepted.
        this.admin.namespaces().setMaxConsumersPerSubscription(namespace, namespaceLimit);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNotNull(this.admin.namespaces().getMaxConsumersPerSubscription(namespace));
            Assert.assertEquals(
                    this.admin.namespaces().getMaxConsumersPerSubscription(namespace).intValue(),
                    namespaceLimit);
        });
        Consumer<byte[]> secondConsumer = sharedSubBuilder.subscribe();
        try {
            sharedSubBuilder.subscribe();
            Assert.fail("should fail");
        } catch (PulsarClientException e) {
            Assert.assertTrue(e.getMessage().contains("reached max consumers limit"));
        }

        // Topic limit of 3: one more consumer is accepted.
        this.admin.topics().setMaxConsumersPerSubscription(topic, topicLimit);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNotNull(this.admin.topics().getMaxConsumersPerSubscription(topic));
            Assert.assertEquals(
                    this.admin.topics().getMaxConsumersPerSubscription(topic).intValue(),
                    topicLimit);
        });
        Consumer<byte[]> thirdConsumer = sharedSubBuilder.subscribe();
        try {
            sharedSubBuilder.subscribe();
            Assert.fail("should fail");
        } catch (PulsarClientException e) {
            Assert.assertTrue(e.getMessage().contains("reached max consumers limit"));
        }

        firstConsumer.close();
        secondConsumer.close();
        thirdConsumer.close();
        producer.close();
    }

    /**
     * Checks the value returned by {@code getCompactionThreshold(topic, true)} as thresholds are
     * configured at different levels.
     *
     * <p>With nothing set it equals the broker configuration value. After a namespace threshold (10)
     * is set it equals that; after a topic threshold (20) is set it equals that; after both are
     * removed it equals the broker configuration value again.
     *
     * @throws Exception if an admin or client call fails unexpectedly
     */
    @Test(timeOut = 20000)
    public void testGetCompactionThresholdApplied() throws Exception {
        final String namespace = "my-tenant/my-namespace";
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        Assert.assertNull(this.admin.topics().getCompactionThreshold(topic));
        Assert.assertNull(this.admin.namespaces().getCompactionThreshold(namespace));

        // Broker level.
        long brokerThreshold = this.pulsar.getConfiguration().getBrokerServiceCompactionThresholdInBytes();
        Assert.assertEquals(this.admin.topics().getCompactionThreshold(topic, true).longValue(), brokerThreshold);

        // Namespace level.
        this.admin.namespaces().setCompactionThreshold(namespace, 10L);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(this.admin.namespaces().getCompactionThreshold(namespace)));
        Assert.assertEquals(this.admin.topics().getCompactionThreshold(topic, true).longValue(), 10L);

        // Topic level.
        this.admin.topics().setCompactionThreshold(topic, 20L);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(this.admin.topics().getCompactionThreshold(topic)));
        Assert.assertEquals(this.admin.topics().getCompactionThreshold(topic, true).longValue(), 20L);

        // Remove both overrides and expect the broker value again.
        this.admin.namespaces().removeCompactionThreshold(namespace);
        this.admin.topics().removeCompactionThreshold(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.namespaces().getCompactionThreshold(namespace)));
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.topics().getCompactionThreshold(topic)));
        Assert.assertEquals(this.admin.topics().getCompactionThreshold(topic, true).longValue(), brokerThreshold);
    }

    /**
     * Publishes ten string messages to the shared test topic and checks that a consumer subscribed
     * from the earliest position receives them in order with matching payloads.
     *
     * <p>Any {@link PulsarClientException} is logged and turned into a test failure.
     */
    @Test(timeOut = 30000)
    public void testProduceConsumeOnTopicPolicy() {
        final String topic = "persistent://my-tenant/my-namespace/test-set-persistence";
        final int messageCount = 10;
        // try-with-resources: consumer and producer were previously never closed.
        try (Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                        .topic(topic)
                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                        .subscriptionName("test")
                        .subscribe();
                Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                        .topic(topic)
                        .create()) {
            for (int i = 0; i < messageCount; i++) {
                producer.newMessage().value("send message " + i).send();
            }
            for (int i = 0; i < messageCount; i++) {
                Message<String> message = consumer.receive(100, TimeUnit.MILLISECONDS);
                // A receive timeout yields null; fail with a clear message instead of an NPE.
                Assert.assertNotNull(message, "no message received within 100 ms for index " + i);
                Assert.assertEquals(message.getValue(), "send message " + i);
            }
        } catch (PulsarClientException e) {
            log.error("Failed to send/produce message, ", e);
            Assert.fail();
        }
    }

    /**
     * Checks that the namespace's {@code __change_events} topic gets compacted.
     *
     * <p>A backlog quota is set on the test topic; the test then waits for a {@code __compaction}
     * subscription to appear on {@code __change_events}, records the current compacted ledger id,
     * triggers the broker's compaction check and waits for the compacted ledger id to change.
     *
     * @throws Exception if an admin call fails unexpectedly
     */
    @Test
    public void testSystemTopicShouldBeCompacted() throws Exception {
        final String topic = "persistent://my-tenant/my-namespace/test-set-backlog-quota";
        final String changeEventsTopic = "persistent://my-tenant/my-namespace/__change_events";

        final BacklogQuota quota = BacklogQuota.builder()
                .limitSize(1024L)
                .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                .build();
        log.info("Backlog quota: {} will set to the topic: {}", quota, topic);

        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        this.admin.topics().setBacklogQuota(topic, quota);
        log.info("Backlog quota set success on topic: {}", topic);

        Awaitility.await().untilAsserted(() -> Assert.assertTrue(
                this.admin.topics().getStats(changeEventsTopic).getSubscriptions().containsKey("__compaction")));

        // Remember the compacted ledger before forcing a compaction check.
        final long ledgerBeforeCompaction =
                this.admin.topics().getInternalStats(changeEventsTopic).compactedLedger.ledgerId;
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                this.admin.topics().getBacklogQuotaMap(topic).get(BacklogQuota.BacklogQuotaType.destination_storage),
                quota));

        this.pulsar.getBrokerService().checkCompaction();
        Awaitility.await().untilAsserted(() -> Assert.assertTrue(
                this.admin.topics().getInternalStats(changeEventsTopic).compactedLedger.ledgerId
                        != ledgerBeforeCompaction));
    }
}
