/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageDispatchThrottlingTest;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups={"flaky"})
public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchThrottlingTest {
    private static final Logger log = LoggerFactory.getLogger(SubscriptionMessageDispatchThrottlingTest.class);

    @Override
    @Test(dataProvider="subscriptionAndDispatchRateType", timeOut=5000L)
    public void testMessageRateLimitingNotReceiveAllMessages(SubscriptionType subscription, MessageDispatchThrottlingTest.DispatchRateType dispatchRateType) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        String namespace = "my-property/throttling_ns";
        String topicName = "persistent://my-property/throttling_ns/throttlingBlock";
        String subName = "my-subscriber-name";
        int messageRate = 100;
        DispatchRate dispatchRate = null;
        dispatchRate = MessageDispatchThrottlingTest.DispatchRateType.messageRate.equals((Object)dispatchRateType) ? DispatchRate.builder().dispatchThrottlingRateInMsg(100).dispatchThrottlingRateInByte(-1L).ratePeriodInSecond(360).build() : DispatchRate.builder().dispatchThrottlingRateInMsg(-1).dispatchThrottlingRateInByte(100L).ratePeriodInSecond(360).build();
        this.admin.namespaces().createNamespace("my-property/throttling_ns", (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        this.admin.namespaces().setSubscriptionDispatchRate("my-property/throttling_ns", dispatchRate);
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/throttling_ns/throttlingBlock").create();
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getOrCreateTopic("persistent://my-property/throttling_ns/throttlingBlock").get();
        this.conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
        AtomicInteger totalReceived = new AtomicInteger(0);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/throttling_ns/throttlingBlock"}).subscriptionName("my-subscriber-name").subscriptionType(subscription).messageListener((MessageListener & Serializable)(c1, msg) -> {
            Assert.assertNotNull((Object)msg, (String)"Message cannot be null");
            String receivedMessage = new String(msg.getData());
            log.debug("Received message [{}] in the listener", (Object)receivedMessage);
            totalReceived.incrementAndGet();
        }).subscribe();
        DispatchRateLimiter subRateLimiter = null;
        Dispatcher subDispatcher = topic.getSubscription("my-subscriber-name").getDispatcher();
        if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
            subRateLimiter = (DispatchRateLimiter)subDispatcher.getRateLimiter().get();
        } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
            subRateLimiter = (DispatchRateLimiter)subDispatcher.getRateLimiter().get();
        } else {
            Assert.fail((String)"Should only have PersistentDispatcher in this test");
        }
        boolean isMessageRateUpdate = false;
        int retry = 5;
        for (int i = 0; i < retry; ++i) {
            if (subRateLimiter.getDispatchRateOnMsg() > 0L || subRateLimiter.getDispatchRateOnByte() > 0L) {
                isMessageRateUpdate = true;
                break;
            }
            if (i == retry - 1) continue;
            Thread.sleep(100L);
        }
        Assert.assertTrue((boolean)isMessageRateUpdate);
        Assert.assertEquals((Object)this.admin.namespaces().getSubscriptionDispatchRate("my-property/throttling_ns"), (Object)dispatchRate);
        int numMessages = 500;
        for (int i = 0; i < numMessages; ++i) {
            producer.send((Object)new byte[80]);
        }
        Assert.assertTrue((totalReceived.get() < 200 ? 1 : 0) != 0);
        consumer.close();
        producer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Override
    @Test(dataProvider="subscriptions", timeOut=5000L)
    public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType subscription) throws Exception {
        int i;
        log.info("-- Starting {} test --", (Object)this.methodName);
        String namespace = "my-property/throttling_ns";
        String topicName = "persistent://my-property/throttling_ns/throttlingAll";
        String subName = "my-subscriber-name";
        int messageRate = 10;
        DispatchRate dispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(10).dispatchThrottlingRateInByte(-1L).ratePeriodInSecond(1).build();
        this.admin.namespaces().createNamespace("my-property/throttling_ns", (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        this.admin.namespaces().setSubscriptionDispatchRate("my-property/throttling_ns", dispatchRate);
        int numProducedMessages = 30;
        CountDownLatch latch = new CountDownLatch(30);
        AtomicInteger totalReceived = new AtomicInteger(0);
        this.conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/throttling_ns/throttlingAll"}).subscriptionName("my-subscriber-name").subscriptionType(subscription).messageListener((MessageListener & Serializable)(c1, msg) -> {
            Assert.assertNotNull((Object)msg, (String)"Message cannot be null");
            String receivedMessage = new String(msg.getData());
            log.debug("Received message [{}] in the listener", (Object)receivedMessage);
            totalReceived.incrementAndGet();
            latch.countDown();
        }).subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/throttling_ns/throttlingAll").create();
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getOrCreateTopic("persistent://my-property/throttling_ns/throttlingAll").get();
        DispatchRateLimiter subRateLimiter = null;
        Dispatcher subDispatcher = topic.getSubscription("my-subscriber-name").getDispatcher();
        if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
            subRateLimiter = (DispatchRateLimiter)subDispatcher.getRateLimiter().get();
        } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
            subRateLimiter = (DispatchRateLimiter)subDispatcher.getRateLimiter().get();
        } else {
            Assert.fail((String)"Should only have PersistentDispatcher in this test");
        }
        boolean isMessageRateUpdate = false;
        int retry = 5;
        for (i = 0; i < retry; ++i) {
            if (subRateLimiter.getDispatchRateOnMsg() > 0L || subRateLimiter.getDispatchRateOnByte() > 0L) {
                isMessageRateUpdate = true;
                break;
            }
            if (i == retry - 1) continue;
            Thread.sleep(100L);
        }
        Assert.assertTrue((boolean)isMessageRateUpdate);
        Assert.assertEquals((Object)this.admin.namespaces().getSubscriptionDispatchRate("my-property/throttling_ns"), (Object)dispatchRate);
        for (i = 0; i < 30; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Awaitility.await().until(() -> latch.getCount() == 0L);
        Assert.assertEquals((int)totalReceived.get(), (int)30);
        consumer.close();
        producer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Override
    @Test(dataProvider="subscriptions", timeOut=5000L)
    public void testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType subscription) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        String namespace = "my-property/throttling_ns";
        String topicName = BrokerTestUtil.newUniqueName("persistent://my-property/throttling_ns/throttlingAll");
        String subName = "my-subscriber-name-" + subscription;
        int byteRate = 1000;
        DispatchRate dispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(-1).dispatchThrottlingRateInByte(1000L).ratePeriodInSecond(1).build();
        this.admin.namespaces().createNamespace("my-property/throttling_ns", (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        this.admin.namespaces().setSubscriptionDispatchRate("my-property/throttling_ns", dispatchRate);
        int numProducedMessages = 30;
        CountDownLatch latch = new CountDownLatch(30);
        AtomicInteger totalReceived = new AtomicInteger(0);
        this.conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subName).receiverQueueSize(10).subscriptionType(subscription).messageListener((MessageListener & Serializable)(c1, msg) -> {
            Assert.assertNotNull((Object)msg, (String)"Message cannot be null");
            String receivedMessage = new String(msg.getData());
            log.debug("Received message [{}] in the listener", (Object)receivedMessage);
            totalReceived.incrementAndGet();
            latch.countDown();
        }).subscribe();
        Producer producer = this.pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getOrCreateTopic(topicName).get();
        DispatchRateLimiter subRateLimiter = null;
        Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
        if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
            subRateLimiter = (DispatchRateLimiter)subDispatcher.getRateLimiter().get();
        } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
            subRateLimiter = (DispatchRateLimiter)subDispatcher.getRateLimiter().get();
        } else {
            Assert.fail((String)"Should only have PersistentDispatcher in this test");
        }
        boolean isMessageRateUpdate = false;
        int retry = 5;
        for (int i = 0; i < retry; ++i) {
            if (subRateLimiter.getDispatchRateOnMsg() > 0L || subRateLimiter.getDispatchRateOnByte() > 0L) {
                isMessageRateUpdate = true;
                break;
            }
            if (i == retry - 1) continue;
            Thread.sleep(100L);
        }
        Assert.assertTrue((boolean)isMessageRateUpdate);
        Assert.assertEquals((Object)this.admin.namespaces().getSubscriptionDispatchRate("my-property/throttling_ns"), (Object)dispatchRate);
        long start = System.currentTimeMillis();
        for (int i = 0; i < 30; ++i) {
            producer.send((Object)new byte[100]);
        }
        latch.await();
        Assert.assertEquals((float)totalReceived.get(), (float)30.0f, (float)10.0f);
        long end = System.currentTimeMillis();
        log.info("-- end - start: {} ", (Object)(end - start));
        Assert.assertTrue((end - start >= 2000L ? 1 : 0) != 0);
        consumer.close();
        producer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Override
    @Test(timeOut=5000L)
    public void testRateLimitingMultipleConsumers() throws Exception {
        int i;
        log.info("-- Starting {} test --", (Object)this.methodName);
        String namespace = "my-property/throttling_ns";
        String topicName = "persistent://my-property/throttling_ns/throttlingMultipleConsumers";
        String subName = "my-subscriber-name";
        int messageRate = 5;
        DispatchRate dispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(5).dispatchThrottlingRateInByte(-1L).ratePeriodInSecond(360).build();
        this.admin.namespaces().createNamespace("my-property/throttling_ns", (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        this.admin.namespaces().setSubscriptionDispatchRate("my-property/throttling_ns", dispatchRate);
        int numProducedMessages = 500;
        AtomicInteger totalReceived = new AtomicInteger(0);
        this.conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
        ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/throttling_ns/throttlingMultipleConsumers"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).messageListener((MessageListener & Serializable)(c1, msg) -> {
            Assert.assertNotNull((Object)msg, (String)"Message cannot be null");
            String receivedMessage = new String(msg.getData());
            log.debug("Received message [{}] in the listener", (Object)receivedMessage);
            totalReceived.incrementAndGet();
        });
        Consumer consumer1 = consumerBuilder.subscribe();
        Consumer consumer2 = consumerBuilder.subscribe();
        Consumer consumer3 = consumerBuilder.subscribe();
        Consumer consumer4 = consumerBuilder.subscribe();
        Consumer consumer5 = consumerBuilder.subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/throttling_ns/throttlingMultipleConsumers").create();
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getOrCreateTopic("persistent://my-property/throttling_ns/throttlingMultipleConsumers").get();
        DispatchRateLimiter subRateLimiter = null;
        Dispatcher subDispatcher = topic.getSubscription("my-subscriber-name").getDispatcher();
        if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
            subRateLimiter = (DispatchRateLimiter)subDispatcher.getRateLimiter().get();
        } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
            subRateLimiter = (DispatchRateLimiter)subDispatcher.getRateLimiter().get();
        } else {
            Assert.fail((String)"Should only have PersistentDispatcher in this test");
        }
        boolean isMessageRateUpdate = false;
        int retry = 5;
        for (i = 0; i < retry; ++i) {
            if (subRateLimiter.getDispatchRateOnMsg() > 0L || subRateLimiter.getDispatchRateOnByte() > 0L) {
                isMessageRateUpdate = true;
                break;
            }
            if (i == retry - 1) continue;
            Thread.sleep(100L);
        }
        Assert.assertTrue((boolean)isMessageRateUpdate);
        Assert.assertEquals((Object)this.admin.namespaces().getSubscriptionDispatchRate("my-property/throttling_ns"), (Object)dispatchRate);
        for (i = 0; i < 500; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Thread.sleep(500L);
        Assert.assertNotEquals((Object)totalReceived.get(), (Object)500);
        consumer1.close();
        consumer2.close();
        consumer3.close();
        consumer4.close();
        consumer5.close();
        producer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Override
    @Test(dataProvider="subscriptions", timeOut=5000L)
    public void testClusterRateLimitingConfiguration(SubscriptionType subscription) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        String namespace = "my-property/throttling_ns";
        String topicName = "persistent://my-property/throttling_ns/throttlingBlock";
        String subName = "my-subscriber-name";
        int messageRate = 5;
        this.conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
        int initValue = this.pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg();
        this.admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerSubscriptionInMsg", Integer.toString(5));
        for (int i = 0; i < 5; ++i) {
            if (this.pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg() != initValue) continue;
            Thread.sleep(50 + i * 10);
        }
        Assert.assertNotEquals((Object)this.pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg(), (Object)initValue);
        this.admin.namespaces().createNamespace("my-property/throttling_ns", (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/throttling_ns/throttlingBlock").create();
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getOrCreateTopic("persistent://my-property/throttling_ns/throttlingBlock").get();
        int numMessages = 500;
        AtomicInteger totalReceived = new AtomicInteger(0);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/throttling_ns/throttlingBlock"}).subscriptionName("my-subscriber-name").subscriptionType(subscription).messageListener((MessageListener & Serializable)(c1, msg) -> {
            Assert.assertNotNull((Object)msg, (String)"Message cannot be null");
            String receivedMessage = new String(msg.getData());
            log.debug("Received message [{}] in the listener", (Object)receivedMessage);
            totalReceived.incrementAndGet();
        }).subscribe();
        for (int i = 0; i < numMessages; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Thread.sleep(500L);
        Assert.assertNotEquals((Object)totalReceived.get(), (Object)numMessages);
        consumer.close();
        producer.close();
        this.pulsar.getConfiguration().setDispatchThrottlingRatePerSubscriptionInMsg(initValue);
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Override
    @Test
    public void testClusterPolicyOverrideConfiguration() throws Exception {
        int i;
        log.info("-- Starting {} test --", (Object)this.methodName);
        String namespace = "my-property/throttling_ns";
        String topicName1 = "persistent://my-property/throttling_ns/throttlingOverride1";
        String topicName2 = "persistent://my-property/throttling_ns/throttlingOverride2";
        String subName1 = "my-subscriber-name1";
        String subName2 = "my-subscriber-name2";
        int clusterMessageRate = 100;
        this.conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
        int initValue = this.pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg();
        this.admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerSubscriptionInMsg", Integer.toString(100));
        for (int i2 = 0; i2 < 5; ++i2) {
            if (this.pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg() != initValue) continue;
            Thread.sleep(50 + i2 * 10);
        }
        Assert.assertNotEquals((Object)this.pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg(), (Object)initValue);
        this.admin.namespaces().createNamespace("my-property/throttling_ns", (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/throttling_ns/throttlingOverride1").create();
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getOrCreateTopic("persistent://my-property/throttling_ns/throttlingOverride1").get();
        Consumer consumer1 = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/throttling_ns/throttlingOverride1"}).subscriptionName("my-subscriber-name1").subscribe();
        DispatchRateLimiter subRateLimiter = null;
        Dispatcher subDispatcher = topic.getSubscription("my-subscriber-name1").getDispatcher();
        if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
            subRateLimiter = (DispatchRateLimiter)subDispatcher.getRateLimiter().get();
        } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
            subRateLimiter = (DispatchRateLimiter)subDispatcher.getRateLimiter().get();
        } else {
            Assert.fail((String)"Should only have PersistentDispatcher in this test");
        }
        Assert.assertEquals((long)100L, (long)subRateLimiter.getDispatchRateOnMsg());
        int nsMessageRate = 500;
        DispatchRate dispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(nsMessageRate).dispatchThrottlingRateInByte(0L).ratePeriodInSecond(1).build();
        this.admin.namespaces().setSubscriptionDispatchRate("my-property/throttling_ns", dispatchRate);
        subRateLimiter = (DispatchRateLimiter)subDispatcher.getRateLimiter().get();
        for (i = 0; i < 5; ++i) {
            if (subRateLimiter.getDispatchRateOnMsg() == (long)nsMessageRate) continue;
            Thread.sleep(50 + i * 10);
        }
        Assert.assertEquals((long)nsMessageRate, (long)subRateLimiter.getDispatchRateOnMsg());
        dispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(0).dispatchThrottlingRateInByte(0L).ratePeriodInSecond(1).build();
        this.admin.namespaces().setSubscriptionDispatchRate("my-property/throttling_ns", dispatchRate);
        for (i = 0; i < 5; ++i) {
            if (subRateLimiter.getDispatchRateOnMsg() != (long)nsMessageRate) continue;
            Thread.sleep(50 + i * 10);
        }
        Assert.assertEquals((long)100L, (long)subRateLimiter.getDispatchRateOnMsg());
        Producer producer2 = this.pulsarClient.newProducer().topic("persistent://my-property/throttling_ns/throttlingOverride2").create();
        PersistentTopic topic2 = (PersistentTopic)this.pulsar.getBrokerService().getOrCreateTopic("persistent://my-property/throttling_ns/throttlingOverride2").get();
        Consumer consumer2 = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/throttling_ns/throttlingOverride2"}).subscriptionName("my-subscriber-name2").subscribe();
        subDispatcher = topic2.getSubscription("my-subscriber-name2").getDispatcher();
        if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
            subRateLimiter = (DispatchRateLimiter)subDispatcher.getRateLimiter().get();
        } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
            subRateLimiter = (DispatchRateLimiter)subDispatcher.getRateLimiter().get();
        } else {
            Assert.fail((String)"Should only have PersistentDispatcher in this test");
        }
        Assert.assertEquals((long)100L, (long)subRateLimiter.getDispatchRateOnMsg());
        producer.close();
        producer2.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Override
    @Test(dataProvider="subscriptions", timeOut=10000L)
    public void testClosingRateLimiter(SubscriptionType subscription) throws Exception {
        int i;
        log.info("-- Starting {} test --", (Object)this.methodName);
        String namespace = "my-property/throttling_ns";
        String topicName = "persistent://my-property/throttling_ns/closingSubRateLimiter" + subscription.name();
        String subName = "mySubscription" + subscription.name();
        DispatchRate dispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(10).dispatchThrottlingRateInByte(1024L).ratePeriodInSecond(1).build();
        this.admin.namespaces().createNamespace("my-property/throttling_ns", (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        this.admin.namespaces().setSubscriptionDispatchRate("my-property/throttling_ns", dispatchRate);
        Producer producer = this.pulsarClient.newProducer().topic(topicName).create();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subName).subscriptionType(subscription).subscribe();
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getOrCreateTopic(topicName).get();
        PersistentSubscription sub = topic.getSubscription(subName);
        int numProducedMessages = 10;
        for (i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        for (i = 0; i < 10; ++i) {
            Message msg = consumer.receive();
            consumer.acknowledge(msg);
        }
        Dispatcher dispatcher = sub.getDispatcher();
        Assert.assertTrue((boolean)dispatcher.getRateLimiter().isPresent());
        DispatchRateLimiter dispatchRateLimiter = (DispatchRateLimiter)dispatcher.getRateLimiter().get();
        producer.close();
        consumer.close();
        sub.disconnect().get();
        Assert.assertEquals((long)dispatchRateLimiter.getDispatchRateOnMsg(), (long)-1L);
        Assert.assertEquals((long)dispatchRateLimiter.getDispatchRateOnByte(), (long)-1L);
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }
}

