/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Tests topic-level publish rate throttling with the precise topic publish rate limiter
 * switched on and off.
 *
 * <p>Each test applies its own broker configuration and only then starts the broker through
 * {@code baseSetup()}, which is why the inherited {@link #setup()} and {@link #cleanup()} hooks
 * are left empty. Every test tears the broker down again in a {@code finally} block so that a
 * failing assertion cannot leak a running broker into the next test.
 */
@Test(groups={"broker"})
public class PrecisTopicPublishRateThrottleTest
extends BrokerTestBase {
    /** Topic shared by all tests; each test runs against a freshly started broker. */
    private static final String TOPIC_NAME = "persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting";

    /** Size in bytes of every message payload published by these tests. */
    private static final int PAYLOAD_SIZE = 10;

    @Override
    protected void setup() throws Exception {
        // Intentionally empty: the broker is started per test by startBroker(), after the
        // test-specific configuration has been applied.
    }

    @Override
    protected void cleanup() throws Exception {
        // Intentionally empty: each test calls internalCleanup() itself in a finally block.
    }

    /**
     * With the precise limiter disabled, consecutive sends must all complete within their
     * timeouts, both before and after a one second pause.
     *
     * @throws Exception if broker setup fails or any send does not complete in time
     */
    @Test
    public void testPrecisTopicPublishRateLimitingDisabled() throws Exception {
        this.startBroker(false);
        try {
            Producer<byte[]> producer = this.createRateLimitedProducer();
            // A timeout here propagates and fails the test instead of being swallowed.
            Assert.assertNotNull(PrecisTopicPublishRateThrottleTest.sendWithin(producer, 500L, TimeUnit.MILLISECONDS));
            Assert.assertNotNull(PrecisTopicPublishRateThrottleTest.sendWithin(producer, 500L, TimeUnit.MILLISECONDS));
            Thread.sleep(1000L);
            // Checked on its own result so an id from an earlier send cannot satisfy it.
            Assert.assertNotNull(PrecisTopicPublishRateThrottleTest.sendWithin(producer, 1L, TimeUnit.SECONDS));
        }
        finally {
            super.internalCleanup();
        }
    }

    /**
     * With the precise limiter enabled, the first send must succeed and the immediately
     * following send must not complete within 500 ms.
     *
     * @throws Exception if broker setup fails, the first send does not complete in time, or
     *     the second send unexpectedly completes
     */
    @Test
    public void testProducerBlockedByPrecisTopicPublishRateLimiting() throws Exception {
        this.startBroker(true);
        try {
            Producer<byte[]> producer = this.createRateLimitedProducer();
            // Kept outside the expected-timeout check: a timeout on the first send is a failure.
            Assert.assertNotNull(PrecisTopicPublishRateThrottleTest.sendWithin(producer, 500L, TimeUnit.MILLISECONDS));
            PrecisTopicPublishRateThrottleTest.assertSendBlocked(producer);
        }
        finally {
            super.internalCleanup();
        }
    }

    /**
     * With the precise limiter enabled, a blocked producer must be able to publish again
     * after waiting one second.
     *
     * @throws Exception if broker setup fails, a send that must succeed does not complete in
     *     time, or the send that must be blocked unexpectedly completes
     */
    @Test
    public void testPrecisTopicPublishRateLimitingProduceRefresh() throws Exception {
        this.startBroker(true);
        try {
            Producer<byte[]> producer = this.createRateLimitedProducer();
            Assert.assertNotNull(PrecisTopicPublishRateThrottleTest.sendWithin(producer, 500L, TimeUnit.MILLISECONDS));
            PrecisTopicPublishRateThrottleTest.assertSendBlocked(producer);
            // NOTE(review): presumably one second is the limiter's refresh period — confirm.
            Thread.sleep(1000L);
            // Asserted on the result of this send only; previously a stale id from the first
            // send let the test pass even when this send timed out.
            Assert.assertNotNull(PrecisTopicPublishRateThrottleTest.sendWithin(producer, 1L, TimeUnit.SECONDS));
        }
        finally {
            super.internalCleanup();
        }
    }

    /**
     * Applies the test-specific broker configuration and starts the broker.
     *
     * @param preciseLimiterEnabled value passed to {@code setPreciseTopicPublishRateLimiterEnable}
     * @throws Exception if the broker fails to start
     */
    private void startBroker(boolean preciseLimiterEnabled) throws Exception {
        this.conf.setPreciseTopicPublishRateLimiterEnable(preciseLimiterEnabled);
        this.conf.setMaxPendingPublishRequestsPerConnection(0);
        super.baseSetup();
    }

    /**
     * Creates a producer on {@link #TOPIC_NAME} and applies a publish rate policy to the topic.
     *
     * @return the newly created producer
     * @throws Exception if the producer cannot be created or the topic reference is missing
     */
    private Producer<byte[]> createRateLimitedProducer() throws Exception {
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(TOPIC_NAME).producerName("producer-name").create();
        Policies policies = new Policies();
        policies.publishMaxMessageRate = new HashMap<>();
        // NOTE(review): "test" is presumably the cluster name, and (1, 10L) presumably means
        // 1 message / 10 bytes per second — confirm against PublishRate and the base config.
        policies.publishMaxMessageRate.put("test", new PublishRate(1, 10L));
        Topic topicRef = this.pulsar.getBrokerService().getTopicReference(TOPIC_NAME).get();
        Assert.assertNotNull(topicRef);
        ((AbstractTopic)topicRef).updateMaxPublishRate(policies);
        return producer;
    }

    /**
     * Publishes one {@link #PAYLOAD_SIZE}-byte message and waits for its acknowledgement.
     *
     * @param producer producer to publish with
     * @param timeout maximum time to wait
     * @param unit unit of {@code timeout}
     * @return the id of the published message
     * @throws Exception if the send fails; a {@link TimeoutException} if it does not complete
     *     within the timeout
     */
    private static MessageId sendWithin(Producer<byte[]> producer, long timeout, TimeUnit unit) throws Exception {
        return producer.sendAsync(new byte[PAYLOAD_SIZE]).get(timeout, unit);
    }

    /**
     * Asserts that one more send does not complete within 500 ms.
     *
     * @param producer producer expected to be blocked
     * @throws Exception if the send fails with anything other than a timeout
     */
    private static void assertSendBlocked(Producer<byte[]> producer) throws Exception {
        try {
            PrecisTopicPublishRateThrottleTest.sendWithin(producer, 500L, TimeUnit.MILLISECONDS);
            Assert.fail("should failed, because producer blocked by topic publish rate limiting");
        }
        catch (TimeoutException expected) {
            // Expected: the send is held back by the topic publish rate limit.
        }
    }
}

