package org.apache.pulsar.broker.service;

import java.util.Collections;
import java.util.Optional;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/SubscribeRateTest.class */
public class SubscribeRateTest extends BrokerTestBase {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        super.baseSetup();
        this.conf.setMaxPendingPublishRequestsPerConnection(0);
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testBrokerLevelSubscribeRateDynamicUpdate() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testBrokerLevelSubscribeRateDynamicUpdate").producerName("producer-name").create();
        Topic topic = (Topic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/testBrokerLevelSubscribeRateDynamicUpdate").get();
        Assert.assertNotNull(topic);
        Assert.assertFalse(topic.getSubscribeRateLimiter().isPresent());
        String str = (String) this.admin.brokers().getRuntimeConfigurations().get("subscribeThrottlingRatePerConsumer");
        String str2 = (String) this.admin.brokers().getRuntimeConfigurations().get("subscribeRatePeriodPerConsumerInSecond");
        Assert.assertNotNull(str);
        Assert.assertNotNull(str2);
        Assert.assertNotEquals(10, Integer.valueOf(Integer.parseInt(str)));
        Assert.assertNotEquals(60, Integer.valueOf(Integer.parseInt(str2)));
        this.admin.brokers().updateDynamicConfiguration("subscribeThrottlingRatePerConsumer", "10");
        Awaitility.await().untilAsserted(() -> {
            Assert.assertTrue(topic.getSubscribeRateLimiter().isPresent());
        });
        SubscribeRateLimiter subscribeRateLimiter = (SubscribeRateLimiter) topic.getSubscribeRateLimiter().get();
        Assert.assertEquals(subscribeRateLimiter.getSubscribeRate().subscribeThrottlingRatePerConsumer, 10);
        Assert.assertEquals(subscribeRateLimiter.getSubscribeRate().ratePeriodInSecond, 30);
        this.admin.brokers().updateDynamicConfiguration("subscribeRatePeriodPerConsumerInSecond", "60");
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(subscribeRateLimiter.getSubscribeRate().ratePeriodInSecond, 60);
        });
        Assert.assertEquals(subscribeRateLimiter.getSubscribeRate().subscribeThrottlingRatePerConsumer, 10);
        create.close();
    }

    @Test
    public void testUpdateSubscribeRateLimiter() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testUpdateSubscribeRateLimiter").producerName("producer-name").create();
        try {
            PersistentTopic persistentTopic = (Topic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/testUpdateSubscribeRateLimiter").get();
            Assert.assertNotNull(persistentTopic);
            Assert.assertTrue(persistentTopic instanceof PersistentTopic);
            Assert.assertFalse(persistentTopic.getSubscribeRateLimiter().isPresent());
            PersistentTopic persistentTopic2 = (PersistentTopic) Mockito.spy(persistentTopic);
            Mockito.when(persistentTopic2.getSubscribeRate()).thenReturn(new SubscribeRate(10, 60));
            persistentTopic2.updateSubscribeRateLimiter();
            Optional subscribeRateLimiter = persistentTopic2.getSubscribeRateLimiter();
            Assert.assertTrue(subscribeRateLimiter.isPresent());
            Assert.assertEquals(((SubscribeRateLimiter) subscribeRateLimiter.get()).getSubscribeRate(), new SubscribeRate(10, 60));
            Mockito.when(persistentTopic2.getSubscribeRate()).thenReturn(new SubscribeRate(20, 120));
            persistentTopic2.updateSubscribeRateLimiter();
            Optional subscribeRateLimiter2 = persistentTopic2.getSubscribeRateLimiter();
            Assert.assertTrue(subscribeRateLimiter2.isPresent());
            Assert.assertEquals(((SubscribeRateLimiter) subscribeRateLimiter2.get()).getSubscribeRate(), new SubscribeRate(20, 120));
            Assert.assertSame(subscribeRateLimiter, subscribeRateLimiter2);
            Mockito.when(persistentTopic2.getSubscribeRate()).thenReturn(new SubscribeRate(0, 0));
            persistentTopic2.updateSubscribeRateLimiter();
            Assert.assertFalse(persistentTopic2.getSubscribeRateLimiter().isPresent());
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }
}
