package org.apache.pulsar.broker.service;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Broker tests for the precise topic publish rate limiter.
 *
 * <p>Every test adjusts {@code conf} first, then starts the broker through
 * {@code baseSetup()} and shuts it down through {@code internalCleanup()}. The
 * inherited {@link #setup()} / {@link #cleanup()} hooks are therefore overridden
 * as no-ops (presumably because the limiter flag has to be set before the broker
 * starts - confirm against the base class).
 *
 * <p>The helpers below are {@code private} on purpose: the class-level
 * {@code @Test} annotation only turns public methods into test methods.
 */
@Test(groups = {"broker"})
public class PrecisTopicPublishRateThrottleTest extends BrokerTestBase {
    /** Namespace whose publish rate policy is changed by the tests. */
    private static final String NAMESPACE = "prop/ns-abc";
    /** Topic shared by the three namespace-policy based tests. */
    private static final String RATE_LIMITED_TOPIC =
            "persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting";
    /** Topic used by the broker-level dynamic configuration test. */
    private static final String DYNAMIC_UPDATE_TOPIC =
            "persistent://prop/ns-abc/testMultiLevelPublishRate";
    /** Size in bytes of every message payload sent by these tests. */
    private static final int PAYLOAD_SIZE = 10;

    /** No-op: each test starts the broker itself after adjusting {@code conf}. */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    protected void setup() throws Exception {
    }

    /** No-op: each test shuts the broker down itself in a {@code finally} block. */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    protected void cleanup() throws Exception {
    }

    /**
     * With the precise limiter disabled, two back-to-back sends are expected to
     * complete even though the namespace publish rate is {@code PublishRate(1, 10L)}.
     *
     * @throws Exception on any broker, admin or client failure
     */
    @Test
    public void testPrecisTopicPublishRateLimitingDisabled() throws Exception {
        startBroker(false);
        try {
            this.admin.namespaces().setPublishRate(NAMESPACE, new PublishRate(1, 10L));
            Producer<byte[]> producer = createProducer(RATE_LIMITED_TOPIC);
            requireTopic(RATE_LIMITED_TOPIC);

            MessageId messageId = null;
            try {
                Assert.assertNotNull(producer.sendAsync(new byte[PAYLOAD_SIZE]).get(500L, TimeUnit.MILLISECONDS));
                messageId = producer.sendAsync(new byte[PAYLOAD_SIZE]).get(500L, TimeUnit.MILLISECONDS);
                Assert.assertNotNull(messageId);
            } catch (TimeoutException ignored) {
                // Tolerated: a slow send here is not treated as a failure; the send below decides.
            }
            Thread.sleep(1000L);
            try {
                messageId = producer.sendAsync(new byte[PAYLOAD_SIZE]).get(1L, TimeUnit.SECONDS);
            } catch (TimeoutException ignored) {
                // Tolerated: messageId keeps whatever the earlier sends produced.
            }
            // NOTE(review): this also passes when only the second send above succeeded,
            // so it does not strictly prove the final send completed.
            Assert.assertNotNull(messageId);
        } finally {
            super.internalCleanup();
        }
    }

    /**
     * With the precise limiter enabled, the first send must complete and the second
     * send must not complete within 500 ms.
     *
     * @throws Exception on any broker, admin or client failure
     */
    @Test
    public void testProducerBlockedByPrecisTopicPublishRateLimiting() throws Exception {
        startBroker(true);
        try {
            this.admin.namespaces().setPublishRate(NAMESPACE, new PublishRate(1, 10L));
            Producer<byte[]> producer = createProducer(RATE_LIMITED_TOPIC);
            requireTopic(RATE_LIMITED_TOPIC);

            try {
                Assert.assertNotNull(producer.sendAsync(new byte[PAYLOAD_SIZE]).get(500L, TimeUnit.MILLISECONDS));
                producer.sendAsync(new byte[PAYLOAD_SIZE]).get(500L, TimeUnit.MILLISECONDS);
                Assert.fail("should failed, because producer blocked by topic publish rate limiting");
            } catch (TimeoutException ignored) {
                // Expected outcome for the second send. A timeout of the first send lands
                // here as well, exactly as in the original test.
            }
        } finally {
            super.internalCleanup();
        }
    }

    /**
     * With the precise limiter enabled, the second send must time out; after waiting
     * one second another send is attempted.
     *
     * @throws Exception on any broker, admin or client failure
     */
    @Test
    public void testPrecisTopicPublishRateLimitingProduceRefresh() throws Exception {
        startBroker(true);
        try {
            this.admin.namespaces().setPublishRate(NAMESPACE, new PublishRate(1, 10L));
            Producer<byte[]> producer = createProducer(RATE_LIMITED_TOPIC);
            requireTopic(RATE_LIMITED_TOPIC);

            MessageId messageId = null;
            try {
                messageId = producer.sendAsync(new byte[PAYLOAD_SIZE]).get(500L, TimeUnit.MILLISECONDS);
                Assert.assertNotNull(messageId);
                producer.sendAsync(new byte[PAYLOAD_SIZE]).get(500L, TimeUnit.MILLISECONDS);
                Assert.fail("should failed, because producer blocked by topic publish rate limiting");
            } catch (TimeoutException ignored) {
                // Expected: the second send does not complete within the timeout.
            }
            Thread.sleep(1000L);
            try {
                messageId = producer.sendAsync(new byte[PAYLOAD_SIZE]).get(1L, TimeUnit.SECONDS);
            } catch (TimeoutException ignored) {
                // Tolerated: messageId keeps the id of the first send.
            }
            // NOTE(review): messageId is already non-null from the first send, so this
            // assertion does not prove that the send after the pause succeeded.
            Assert.assertNotNull(messageId);
        } finally {
            super.internalCleanup();
        }
    }

    /**
     * Updates the broker-level per-topic publish limits through dynamic configuration
     * and verifies that the topic's {@link PrecisPublishLimiter} picks up each value
     * while the other one stays untouched.
     *
     * @throws Exception on any broker, admin or client failure
     */
    @Test
    public void testBrokerLevelPublishRateDynamicUpdate() throws Exception {
        startBroker(true);
        try {
            Producer<byte[]> producer = createProducer(DYNAMIC_UPDATE_TOPIC);

            updateDynamicConfigAndAwait("maxPublishRatePerTopicInMessages", "10");
            Topic topic = requireTopic(DYNAMIC_UPDATE_TOPIC);
            // Explicit downcasts: the limiter field is only reachable through AbstractTopic,
            // and the rate fields read below are read through PrecisPublishLimiter.
            PrecisPublishLimiter limiter =
                    (PrecisPublishLimiter) ((AbstractTopic) topic).topicPublishRateLimiter;
            Awaitility.await().untilAsserted(() -> Assert.assertEquals(limiter.publishMaxMessageRate, 10));
            Assert.assertEquals(limiter.publishMaxByteRate, 0L);

            updateDynamicConfigAndAwait("maxPublishRatePerTopicInBytes", "20");
            Awaitility.await().untilAsserted(() -> Assert.assertEquals(limiter.publishMaxByteRate, 20L));
            Assert.assertEquals(limiter.publishMaxMessageRate, 10);

            producer.close();
        } finally {
            super.internalCleanup();
        }
    }

    /**
     * Applies the configuration shared by all tests and starts the broker.
     * Called outside the tests' {@code try} blocks so that cleanup only runs
     * after a successful start, as in the original flow.
     *
     * @param preciseLimiterEnabled value passed to
     *        {@code setPreciseTopicPublishRateLimiterEnable}
     * @throws Exception if the broker fails to start
     */
    private void startBroker(boolean preciseLimiterEnabled) throws Exception {
        this.conf.setPreciseTopicPublishRateLimiterEnable(preciseLimiterEnabled);
        this.conf.setMaxPendingPublishRequestsPerConnection(0);
        super.baseSetup();
    }

    /**
     * Creates the producer named {@code "producer-name"} on the given topic.
     *
     * @param topicName fully qualified topic name
     * @return the connected producer
     * @throws Exception if the producer cannot be created
     */
    private Producer<byte[]> createProducer(String topicName) throws Exception {
        return this.pulsarClient.newProducer()
                .topic(topicName)
                .producerName("producer-name")
                .create();
    }

    /**
     * Looks up the broker-side topic reference and fails the test when it is absent.
     *
     * @param topicName fully qualified topic name
     * @return the non-null topic reference
     */
    private Topic requireTopic(String topicName) {
        Topic topic = this.pulsar.getBrokerService().getTopicReference(topicName).orElse(null);
        Assert.assertNotNull(topic, "no topic reference on the broker for " + topicName);
        return topic;
    }

    /**
     * Sets one dynamic broker configuration value and waits until the admin API
     * reports it back.
     *
     * @param key   dynamic configuration name
     * @param value value to set and to wait for
     * @throws Exception if the admin call fails
     */
    private void updateDynamicConfigAndAwait(String key, String value) throws Exception {
        this.admin.brokers().updateDynamicConfiguration(key, value);
        Awaitility.await().untilAsserted(() ->
                Assert.assertEquals(this.admin.brokers().getAllDynamicConfigurations().get(key), value));
    }
}
