package org.apache.pulsar.broker.service;

import java.util.Collections;
import java.util.Optional;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/service/SubscribeDispatchLimiterTest.class */
public class SubscribeDispatchLimiterTest extends BrokerTestBase {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        this.conf.setDispatchThrottlingRatePerSubscriptionInMsg(0);
        this.conf.setDispatchThrottlingRatePerSubscriptionInByte(0L);
        super.baseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testDispatchRateLimiterPerSubscriptionInMsgOnlyBrokerLevel() throws Exception {
        String str = "persistent://" + newTopicName();
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName("cg_testDispatchRateLimiterPerSubscriptionInMsgOnlyBrokerLevel").subscribe();
        try {
            PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getOrCreateTopic(str).get();
            Assert.assertNotNull(persistentTopic);
            PersistentSubscription persistentSubscription = (PersistentSubscription) persistentTopic.getSubscriptions().get("cg_testDispatchRateLimiterPerSubscriptionInMsgOnlyBrokerLevel");
            Assert.assertNotNull(persistentSubscription);
            Assert.assertFalse(persistentSubscription.getDispatcher().getRateLimiter().isPresent());
            this.admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerSubscriptionInMsg", "100");
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(this.pulsar.getConfig().getDispatchThrottlingRatePerSubscriptionInMsg(), 100);
            });
            Awaitility.await().untilAsserted(() -> {
                Optional rateLimiter = persistentSubscription.getDispatcher().getRateLimiter();
                Assert.assertTrue(rateLimiter.isPresent());
                Assert.assertEquals(((DispatchRateLimiter) rateLimiter.get()).getDispatchRateOnMsg(), 100L);
            });
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
            throw th;
        }
    }

    @Test
    public void testDispatchRateLimiterPerSubscriptionInByteOnlyBrokerLevel() throws Exception {
        String str = "persistent://" + newTopicName();
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName("testDispatchRateLimiterPerSubscriptionInByteOnlyBrokerLevel").subscribe();
        try {
            PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getOrCreateTopic(str).get();
            Assert.assertNotNull(persistentTopic);
            PersistentSubscription persistentSubscription = (PersistentSubscription) persistentTopic.getSubscriptions().get("testDispatchRateLimiterPerSubscriptionInByteOnlyBrokerLevel");
            Assert.assertNotNull(persistentSubscription);
            Assert.assertFalse(persistentSubscription.getDispatcher().getRateLimiter().isPresent());
            this.admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerSubscriptionInByte", "1000");
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(this.pulsar.getConfig().getDispatchThrottlingRatePerSubscriptionInByte(), 1000L);
            });
            Awaitility.await().untilAsserted(() -> {
                Optional rateLimiter = persistentSubscription.getDispatcher().getRateLimiter();
                Assert.assertTrue(rateLimiter.isPresent());
                Assert.assertEquals(((DispatchRateLimiter) rateLimiter.get()).getDispatchRateOnByte(), 1000L);
            });
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
            throw th;
        }
    }
}
