/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AtomicDouble;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.PublishRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Tests for publish throttling configured per namespace (topic limiter) and per broker
 * (broker limiter).
 *
 * <p>Every test follows the same shape: enable a limit, publish a fixed number of messages and
 * check that the rate reported by the broker-side producer stats is below the amount published;
 * then disable the limit, publish again and check that the reported rate is above it.
 */
@Test
public class MessagePublishThrottlingTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(MessagePublishThrottlingTest.class);

    /**
     * Sets the cluster name and a 1 ms throttling tick for both the topic and the broker
     * limiter, then runs the base-class setup.
     */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        this.conf.setClusterName("test");
        this.conf.setTopicPublisherThrottlingTickTimeMillis(1);
        this.conf.setBrokerPublisherThrottlingTickTimeMillis(1);
        super.internalSetup();
        super.producerBaseSetup();
    }

    /** Runs the base-class cleanup and resets the configuration, even when the test failed. */
    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
        super.resetConfig();
    }

    /**
     * Returns the first broker-side producer registered on {@code topic}.
     *
     * @param topic topic that is expected to have at least one producer attached
     * @return the first producer returned by iterating the topic's producers
     */
    private static Producer firstProducerOf(PersistentTopic topic) {
        return topic.getProducers().values().iterator().next();
    }

    /**
     * Publishes {@code messageCount} messages of {@code payloadSize} bytes, refreshing the
     * broker-side producer rates immediately before and after, so that the stats read afterwards
     * cover exactly this burst.
     *
     * @param clientProducer client producer used to send the messages
     * @param brokerProducer broker-side producer whose rates are refreshed
     * @param messageCount number of messages to send
     * @param payloadSize size in bytes of each message payload
     * @throws Exception if sending a message fails
     */
    private static void publishAndUpdateRates(ProducerImpl<byte[]> clientProducer, Producer brokerProducer,
            int messageCount, int payloadSize) throws Exception {
        brokerProducer.updateRates();
        for (int i = 0; i < messageCount; ++i) {
            clientProducer.send(new byte[payloadSize]);
        }
        brokerProducer.updateRates();
    }

    /**
     * Logs the broker throttling tick time, max message rate and max byte rate.
     *
     * @param format SLF4J format string with three {@code {}} placeholders, in that order
     */
    private void logBrokerThrottlingConfig(String format) {
        log.info(format,
                this.pulsar.getConfiguration().getBrokerPublisherThrottlingTickTimeMillis(),
                this.pulsar.getConfiguration().getBrokerPublisherThrottlingMaxMessageRate(),
                this.pulsar.getConfiguration().getBrokerPublisherThrottlingMaxByteRate());
    }

    /**
     * Verifies the namespace-level message-rate limit: the message rate is below the number of
     * messages sent while the limit is 10 msg/s, and above it once the limit is removed.
     */
    @Test
    public void testSimplePublishMessageThrottling() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String namespace = "my-property/throttling_publish";
        final String topicName = "persistent://" + namespace + "/throttlingMessageBlock";
        final int total = 200;
        final int msgBytes = 80;

        this.admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        PublishRate publishMsgRate = new PublishRate();
        publishMsgRate.publishThrottlingRateInMsg = 10;

        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) this.pulsarClient.newProducer()
                .topic(topicName).maxPendingMessages(30000).create();
        try {
            PersistentTopic topic =
                    (PersistentTopic) this.pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
            Assert.assertEquals(topic.getTopicPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

            // Enable the limit and wait for the topic to pick up the new policy.
            this.admin.namespaces().setPublishRate(namespace, publishMsgRate);
            retryStrategically(
                    test -> !topic.getTopicPublishRateLimiter().equals(PublishRateLimiter.DISABLED_RATE_LIMITER),
                    5, 200L);
            Assert.assertNotEquals(topic.getTopicPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

            Producer prod = firstProducerOf(topic);
            publishAndUpdateRates(producer, prod, total, msgBytes);
            double rateIn = prod.getStats().msgRateIn;
            Assert.assertTrue(rateIn < total);

            // Remove the limit (-1) and wait for the limiter to be disabled again.
            publishMsgRate.publishThrottlingRateInMsg = -1;
            this.admin.namespaces().setPublishRate(namespace, publishMsgRate);
            retryStrategically(
                    test -> topic.getTopicPublishRateLimiter().equals(PublishRateLimiter.DISABLED_RATE_LIMITER),
                    5, 200L);
            Assert.assertEquals(topic.getTopicPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

            publishAndUpdateRates(producer, prod, total, msgBytes);
            rateIn = prod.getStats().msgRateIn;
            Assert.assertTrue(rateIn > total);
        } finally {
            // Close the producer even when an assertion above fails.
            producer.close();
        }
    }

    /**
     * Verifies the namespace-level byte-rate limit: the message rate is below the number of
     * messages sent while the limit is 400 bytes/s, and above it once the limit is removed.
     */
    @Test
    public void testSimplePublishByteThrottling() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String namespace = "my-property/throttling_publish";
        final String topicName = "persistent://" + namespace + "/throttlingRateBlock";
        final int total = 100;
        final int msgBytes = 1;

        this.admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        PublishRate publishMsgRate = new PublishRate();
        publishMsgRate.publishThrottlingRateInByte = 400L;

        ProducerImpl<byte[]> producer =
                (ProducerImpl<byte[]>) this.pulsarClient.newProducer().topic(topicName).create();
        try {
            PersistentTopic topic =
                    (PersistentTopic) this.pulsar.getBrokerService().getOrCreateTopic(topicName).get();
            Assert.assertEquals(topic.getTopicPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

            // Enable the limit and wait for the topic to pick up the new policy.
            this.admin.namespaces().setPublishRate(namespace, publishMsgRate);
            retryStrategically(
                    test -> !topic.getTopicPublishRateLimiter().equals(PublishRateLimiter.DISABLED_RATE_LIMITER),
                    5, 200L);
            Assert.assertNotEquals(topic.getTopicPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

            Producer prod = firstProducerOf(topic);
            publishAndUpdateRates(producer, prod, total, msgBytes);
            double rateIn = prod.getStats().msgRateIn;
            Assert.assertTrue(rateIn < total);

            // Remove the limit (-1) and wait for the limiter to be disabled again.
            publishMsgRate.publishThrottlingRateInByte = -1L;
            this.admin.namespaces().setPublishRate(namespace, publishMsgRate);
            retryStrategically(
                    test -> topic.getTopicPublishRateLimiter().equals(PublishRateLimiter.DISABLED_RATE_LIMITER),
                    5, 200L);
            Assert.assertEquals(topic.getTopicPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

            publishAndUpdateRates(producer, prod, total, msgBytes);
            rateIn = prod.getStats().msgRateIn;
            Assert.assertTrue(rateIn > total);
        } finally {
            // Close the producer even when an assertion above fails.
            producer.close();
        }
    }

    /**
     * Verifies the broker-level message-rate limit set through dynamic configuration: the message
     * rate is below the number of messages sent while the limit is 10 msg/s, and above it once
     * the setting is put back to 0.
     */
    @Test
    public void testBrokerPublishMessageThrottling() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String namespace = "my-property/throttling_publish";
        final String topicName = "persistent://" + namespace + "/brokerThrottlingMessageBlock";
        final String rateConfigKey = "brokerPublisherThrottlingMaxMessageRate";
        final int messageRate = 10;
        final int total = 100;
        final int msgBytes = 80;

        this.admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));

        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) this.pulsarClient.newProducer()
                .topic(topicName).enableBatching(false).maxPendingMessages(30000).create();
        try {
            PersistentTopic topic =
                    (PersistentTopic) this.pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
            Assert.assertEquals(topic.getBrokerPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

            // Enable the broker limit and wait for the topic to see a non-disabled limiter.
            this.admin.brokers().updateDynamicConfiguration(rateConfigKey, Integer.toString(messageRate));
            retryStrategically(
                    test -> topic.getBrokerPublishRateLimiter() != PublishRateLimiter.DISABLED_RATE_LIMITER,
                    5, 200L);
            logBrokerThrottlingConfig(
                    "Get broker configuration: brokerTick {},  MaxMessageRate {}, MaxByteRate {}");
            Assert.assertNotEquals(topic.getBrokerPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

            Producer prod = firstProducerOf(topic);
            publishAndUpdateRates(producer, prod, total, msgBytes);
            double rateIn = prod.getStats().msgRateIn;
            log.info("1-st rate in: {}, total: {} ", rateIn, total);
            Assert.assertTrue(rateIn < total);

            // Setting the rate back to 0 is expected to disable the broker limiter.
            this.admin.brokers().updateDynamicConfiguration(rateConfigKey, Integer.toString(0));
            retryStrategically(
                    test -> topic.getBrokerPublishRateLimiter().equals(PublishRateLimiter.DISABLED_RATE_LIMITER),
                    5, 200L);
            Assert.assertEquals(topic.getBrokerPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

            publishAndUpdateRates(producer, prod, total, msgBytes);
            rateIn = prod.getStats().msgRateIn;
            log.info("2-nd rate in: {}, total: {} ", rateIn, total);
            Assert.assertTrue(rateIn > total);
        } finally {
            // Close the producer even when an assertion above fails.
            producer.close();
        }
    }

    /**
     * Verifies the broker-level byte-rate limit set through dynamic configuration: the byte
     * throughput is below the number of bytes sent while the limit is 400 bytes/s, and above it
     * once the setting is put back to 0.
     */
    @Test
    public void testBrokerPublishByteThrottling() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String namespace = "my-property/throttling_publish";
        final String topicName = "persistent://" + namespace + "/brokerThrottlingByteBlock";
        final String rateConfigKey = "brokerPublisherThrottlingMaxByteRate";
        final long byteRate = 400L;
        final int numMessage = 20;
        final int msgBytes = 80;

        this.admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));

        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) this.pulsarClient.newProducer()
                .topic(topicName).enableBatching(false).maxPendingMessages(30000).create();
        try {
            PersistentTopic topic =
                    (PersistentTopic) this.pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
            Assert.assertEquals(topic.getBrokerPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

            // Enable the broker limit and wait for the topic to see a non-disabled limiter.
            this.admin.brokers().updateDynamicConfiguration(rateConfigKey, Long.toString(byteRate));
            retryStrategically(
                    test -> topic.getBrokerPublishRateLimiter() != PublishRateLimiter.DISABLED_RATE_LIMITER,
                    5, 200L);
            logBrokerThrottlingConfig(
                    "Get broker configuration after enable: brokerTick {},  MaxMessageRate {}, MaxByteRate {}");
            Assert.assertNotEquals(topic.getBrokerPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

            Producer prod = firstProducerOf(topic);
            publishAndUpdateRates(producer, prod, numMessage, msgBytes);
            double rateIn = prod.getStats().msgThroughputIn;
            log.info("1-st byte rate in: {}, total: {} ", rateIn, numMessage * msgBytes);
            Assert.assertTrue(rateIn < numMessage * msgBytes);

            // Setting the rate back to 0 is expected to disable the broker limiter.
            this.admin.brokers().updateDynamicConfiguration(rateConfigKey, Long.toString(0L));
            retryStrategically(
                    test -> topic.getBrokerPublishRateLimiter().equals(PublishRateLimiter.DISABLED_RATE_LIMITER),
                    5, 200L);
            logBrokerThrottlingConfig(
                    "Get broker configuration after disable: brokerTick {},  MaxMessageRate {}, MaxByteRate {}");
            Assert.assertEquals(topic.getBrokerPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

            publishAndUpdateRates(producer, prod, numMessage, msgBytes);
            rateIn = prod.getStats().msgThroughputIn;
            log.info("2-nd byte rate in: {}, total: {} ", rateIn, numMessage * msgBytes);
            Assert.assertTrue(rateIn > numMessage * msgBytes);
        } finally {
            // Close the producer even when an assertion above fails.
            producer.close();
        }
    }

    /**
     * Verifies the interaction of a namespace-level byte limit (400 bytes/s) with a broker-level
     * byte limit (800 bytes/s).
     *
     * <p>Phases:
     * <ol>
     *   <li>both limits enabled, one topic: throughput is below the bytes sent;</li>
     *   <li>both limits enabled, three more topics publishing one after another on a
     *       single-thread executor: their summed throughput exceeds the single-topic one;</li>
     *   <li>namespace limit removed, broker limit still on: throughput rises but stays below the
     *       bytes sent;</li>
     *   <li>broker limit removed as well: throughput exceeds the bytes sent.</li>
     * </ol>
     */
    @Test
    public void testBrokerTopicPublishByteThrottling() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String namespace = "my-property/throttling_publish";
        final String topicName = "persistent://" + namespace + "/brokerTopicThrottlingByteBlock";
        final String rateConfigKey = "brokerPublisherThrottlingMaxByteRate";
        final long topicByteRate = 400L;
        final long brokerByteRate = 800L;
        final int numMessage = 40;
        final int msgBytes = 80;
        final int topicNumber = 3;

        this.admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        PublishRate topicPublishMsgRate = new PublishRate();
        topicPublishMsgRate.publishThrottlingRateInByte = topicByteRate;

        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) this.pulsarClient.newProducer()
                .topic(topicName).enableBatching(false).maxPendingMessages(30000).create();
        try {
            PersistentTopic topic =
                    (PersistentTopic) this.pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
            Assert.assertEquals(topic.getBrokerPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);
            Assert.assertEquals(topic.getTopicPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

            // Phase 1: enable both the namespace limit and the broker limit.
            this.admin.namespaces().setPublishRate(namespace, topicPublishMsgRate);
            Awaitility.await().untilAsserted(() -> Assert.assertNotEquals(
                    topic.getTopicPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER));
            this.admin.brokers().updateDynamicConfiguration(rateConfigKey, Long.toString(brokerByteRate));
            Awaitility.await().untilAsserted(() -> Assert.assertNotSame(
                    topic.getBrokerPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER));
            logBrokerThrottlingConfig(
                    "Get broker configuration after enable: brokerTick {},  MaxMessageRate {}, MaxByteRate {}");

            Producer prod = firstProducerOf(topic);
            publishAndUpdateRates(producer, prod, numMessage, msgBytes);
            double rateIn = prod.getStats().msgThroughputIn;
            log.info("1-st byte rate in 1: {}, total: {} ", rateIn, numMessage * msgBytes);
            Assert.assertTrue(rateIn < numMessage * msgBytes);

            // Phase 2: create additional topics in the same namespace, each with its own producer.
            final String topicNameBase = topicName;
            List<ProducerImpl<byte[]>> producers = Lists.newArrayListWithExpectedSize(topicNumber);
            List<PersistentTopic> topics = Lists.newArrayListWithExpectedSize(topicNumber);
            for (int i = 0; i < topicNumber; ++i) {
                String iTopicName = topicNameBase + i;
                ProducerImpl<byte[]> iProducer = (ProducerImpl<byte[]>) this.pulsarClient.newProducer()
                        .topic(iTopicName).enableBatching(false).maxPendingMessages(30000).create();
                PersistentTopic iTopic =
                        (PersistentTopic) this.pulsar.getBrokerService().getTopicIfExists(iTopicName).get().get();
                producers.add(iProducer);
                topics.add(iTopic);
                Assert.assertNotEquals(iTopic.getBrokerPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);
                this.admin.namespaces().setPublishRate(namespace, topicPublishMsgRate);
                retryStrategically(
                        test -> !iTopic.getTopicPublishRateLimiter().equals(PublishRateLimiter.DISABLED_RATE_LIMITER),
                        5, 200L);
                Assert.assertNotEquals(iTopic.getTopicPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);
            }

            AtomicDouble topicsRateIn = new AtomicDouble(0.0);
            List<Callable<Void>> topicRatesCounter = Lists.newArrayListWithExpectedSize(topicNumber);
            for (int i = 0; i < topicNumber; ++i) {
                // Bind each task to its own producer/topic pair by loop index. The previous
                // shared counter used incrementAndGet(), which produced ids 1..topicNumber:
                // topic 0 was skipped and the last task failed with an out-of-range index.
                ProducerImpl<byte[]> iProducer = producers.get(i);
                PersistentTopic iTopic = topics.get(i);
                topicRatesCounter.add(() -> {
                    Producer iProd = firstProducerOf(iTopic);
                    publishAndUpdateRates(iProducer, iProd, numMessage, msgBytes);
                    topicsRateIn.addAndGet(iProd.getStats().msgThroughputIn);
                    return null;
                });
            }
            ExecutorService executor = Executors.newSingleThreadExecutor();
            try {
                // invokeAll() returns only after every task has completed; get() rethrows any
                // exception a task raised so a failed publisher fails the test instead of
                // being silently dropped.
                for (Future<Void> result : executor.invokeAll(topicRatesCounter)) {
                    result.get();
                }
            } finally {
                executor.shutdownNow();
            }
            log.info("2-nd rate in: {}, total: {} ", topicsRateIn.get(), topicNumber * numMessage * msgBytes);
            Assert.assertTrue(rateIn < topicsRateIn.get());
            // NOTE(review): this bound is checked against the single-topic rate from phase 1,
            // not against topicsRateIn - confirm that is the intended assertion.
            Assert.assertTrue(rateIn < topicNumber * numMessage * msgBytes);

            // Phase 3: remove the namespace limit; the broker limit is still active.
            topicPublishMsgRate.publishThrottlingRateInByte = -1L;
            this.admin.namespaces().setPublishRate(namespace, topicPublishMsgRate);
            Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                    topic.getTopicPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER));
            publishAndUpdateRates(producer, prod, numMessage, msgBytes);
            double rateIn2 = prod.getStats().msgThroughputIn;
            log.info("3-rd byte rate in: {}, rate in 2: {},  total: {} ", rateIn, rateIn2, numMessage * msgBytes);
            Assert.assertTrue(rateIn < rateIn2);
            Assert.assertTrue(rateIn2 < numMessage * msgBytes);

            // Phase 4: remove the broker limit as well.
            this.admin.brokers().updateDynamicConfiguration(rateConfigKey, Long.toString(0L));
            retryStrategically(
                    test -> topic.getBrokerPublishRateLimiter().equals(PublishRateLimiter.DISABLED_RATE_LIMITER),
                    5, 200L);
            logBrokerThrottlingConfig(
                    "Get broker configuration after disable: brokerTick {},  MaxMessageRate {}, MaxByteRate {}");
            Assert.assertEquals(topic.getBrokerPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);
            publishAndUpdateRates(producer, prod, numMessage, msgBytes);
            rateIn = prod.getStats().msgThroughputIn;
            log.info("4-th byte rate in: {}, total: {} ", rateIn, numMessage * msgBytes);
            Assert.assertTrue(rateIn > numMessage * msgBytes);
        } finally {
            // Close the producer even when an assertion above fails.
            producer.close();
        }
    }
}

