/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.google.common.collect.Sets;
import java.util.Optional;
import java.util.Set;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.PublishRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Checks that the broker-level publish rate limiter is already active on a topic when the
 * broker starts with {@code brokerPublisherThrottlingMaxMessageRate} configured, and that it
 * is switched off again once that setting is dynamically updated to {@code 0}.
 */
@Test(groups = {"broker-impl"})
public class TopicPublishThrottlingInitTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(TopicPublishThrottlingInitTest.class);

    /** Payload size, in bytes, of every message published by the test. */
    private static final int MESSAGE_SIZE_BYTES = 80;

    /**
     * Configures the broker with a 1 ms throttling tick (topic and broker level) and a
     * broker-wide max message rate of 10 before starting it, so the limiter is expected to be
     * in place from the very first topic that gets loaded.
     *
     * @throws Exception if the broker or the base test fixtures fail to start
     */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        conf.setClusterName("test");
        conf.setTopicPublisherThrottlingTickTimeMillis(1);
        conf.setBrokerPublisherThrottlingTickTimeMillis(1);
        conf.setBrokerPublisherThrottlingMaxMessageRate(10);
        super.internalSetup();
        super.producerBaseSetup();
    }

    /**
     * Tears the broker down and restores the shared configuration object.
     *
     * @throws Exception if the teardown fails
     */
    @Override
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
        super.resetConfig();
    }

    /**
     * Publishes a burst of messages while the broker limiter is enabled and asserts the observed
     * inbound rate stays below the burst size; then disables the limiter through the dynamic
     * configuration, publishes the same burst again and asserts the observed rate exceeds it.
     *
     * @throws Exception on any admin, client or broker failure
     */
    @Test
    public void testBrokerPublishMessageThrottlingInit() throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_publish_init";
        final String topicName = "persistent://" + namespace + "/brokerThrottlingMessageBlock";
        final int total = 100;

        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));

        // try-with-resources guarantees the producer is closed even when an assertion fails.
        try (ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .maxPendingMessages(30000)
                .create()) {
            // Creating the producer is what loads the topic on the broker.
            PersistentTopic topic =
                    (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();

            log.info("Get broker configuration: brokerTick {},  MaxMessageRate {}, MaxByteRate {}",
                    pulsar.getConfiguration().getBrokerPublisherThrottlingTickTimeMillis(),
                    pulsar.getConfiguration().getBrokerPublisherThrottlingMaxMessageRate(),
                    pulsar.getConfiguration().getBrokerPublisherThrottlingMaxByteRate());

            // (1) The limiter must be active right after start-up, without any dynamic update.
            Assert.assertNotEquals(topic.getBrokerPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

            // The broker-side counterpart of the single client producer created above.
            Producer prod = topic.getProducers().values().iterator().next();

            double rateIn = publishAndMeasureRate(producer, prod, total);
            log.info("1-st rate in: {}, total: {} ", rateIn, total);
            Assert.assertTrue(rateIn < total,
                    "expected throttled rate below " + total + " but was " + rateIn);

            // (2) Setting the max message rate to 0 is expected to disable the broker limiter.
            admin.brokers().updateDynamicConfiguration("brokerPublisherThrottlingMaxMessageRate", "0");

            // The update is applied asynchronously, so poll before asserting
            // (5 and 200L are presumably retry count and sleep millis -- see retryStrategically).
            retryStrategically(
                    ignored -> topic.getBrokerPublishRateLimiter().equals(PublishRateLimiter.DISABLED_RATE_LIMITER),
                    5, 200L);
            Assert.assertEquals(topic.getBrokerPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

            rateIn = publishAndMeasureRate(producer, prod, total);
            log.info("2-nd rate in: {}, total: {} ", rateIn, total);
            Assert.assertTrue(rateIn > total,
                    "expected unthrottled rate above " + total + " but was " + rateIn);
        }
    }

    /**
     * Synchronously sends {@code count} fixed-size messages and returns the inbound message rate
     * the broker-side producer reports for that burst.
     *
     * @param client         client producer used to publish
     * @param brokerProducer broker-side producer whose stats are sampled
     * @param count          number of messages to send
     * @return the {@code msgRateIn} value read after the burst
     * @throws Exception if a send fails
     */
    private static double publishAndMeasureRate(ProducerImpl<byte[]> client, Producer brokerProducer, int count)
            throws Exception {
        // Refresh the stats before the burst so the sample taken afterwards covers only this burst.
        brokerProducer.updateRates();
        for (int i = 0; i < count; i++) {
            client.send(new byte[MESSAGE_SIZE_BYTES]);
        }
        brokerProducer.updateRates();
        return brokerProducer.getStats().msgRateIn;
    }
}

