/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Topic;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

/**
 * Checks how the broker's paused-connection counter reacts to the
 * {@code maxMessagePublishBufferSizeInMB} setting while bookkeeper writes are artificially delayed.
 *
 * <p>Each test sets the buffer limit on {@code conf} first and only then starts the broker through
 * {@code baseSetup()}, which is why {@link #setup()} is empty.
 */
@Test(groups={"broker"})
public class MessagePublishBufferThrottleTest
extends BrokerTestBase {
    /** Size in bytes of every message payload (1 MiB). */
    private static final int PAYLOAD_SIZE_BYTES = 1024 * 1024;

    /** Number of messages each test sends asynchronously. */
    private static final int MESSAGE_COUNT = 10;

    /** Producer name used by every test; also the key used to look the producer up on the topic. */
    private static final String PRODUCER_NAME = "producer-name";

    /** Intentionally empty: every test configures the buffer limit and calls {@code baseSetup()} itself. */
    @Override
    protected void setup() throws Exception {
    }

    /** Tears the broker down after each test and resets the configuration changed by the test. */
    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
        this.resetConfig();
    }

    /**
     * With the buffer limit set to {@code -1}, no connection is reported as paused before or after
     * publishing {@value #MESSAGE_COUNT} one-MiB messages against a one-second add-entry delay.
     */
    @Test
    public void testMessagePublishBufferThrottleDisabled() throws Exception {
        this.conf.setMaxMessagePublishBufferSizeInMB(-1);
        super.baseSetup();
        String topic = "persistent://prop/ns-abc/testMessagePublishBufferThrottleDisabled";
        org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(topic);
        Assert.assertEquals(pausedConnections(), 0L);

        this.mockBookKeeper.addEntryDelay(1L, TimeUnit.SECONDS);
        sendPayloads(producer);
        producer.flush();

        Assert.assertEquals(pausedConnections(), 0L);
    }

    /**
     * With a 1 MB buffer limit, the paused-connection count reaches one while the delayed writes are
     * outstanding and returns to zero once the producer has been flushed.
     */
    @Test
    public void testMessagePublishBufferThrottleEnable() throws Exception {
        this.conf.setMaxMessagePublishBufferSizeInMB(1);
        super.baseSetup();
        Assert.assertEquals(pausedConnections(), 0L);
        String topic = "persistent://prop/ns-abc/testMessagePublishBufferThrottleEnable";
        org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(topic);
        Assert.assertEquals(pausedConnections(), 0L);

        this.mockBookKeeper.addEntryDelay(1L, TimeUnit.SECONDS);
        sendPayloads(producer);

        // The sends are asynchronous, so poll for the counter instead of reading it once.
        awaitPausedConnections(1L);
        producer.flush();
        awaitPausedConnections(0L);
    }

    /**
     * With a 1 MB buffer limit and a five-second add-entry delay, the connection becomes paused, a
     * flush does not complete within two seconds, and the paused-connection count is back to zero
     * after the flush finishes and after auto-read is re-enabled on the producer's connection.
     */
    @Test
    public void testBlockByPublishRateLimiting() throws Exception {
        this.conf.setMaxMessagePublishBufferSizeInMB(1);
        super.baseSetup();
        Assert.assertEquals(pausedConnections(), 0L);
        String topic = "persistent://prop/ns-abc/testBlockByPublishRateLimiting";
        org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(topic);
        Topic topicRef = (Topic)this.pulsar.getBrokerService().getTopicReference(topic).get();
        Assert.assertNotNull(topicRef);
        Assert.assertEquals(pausedConnections(), 0L);

        this.mockBookKeeper.addEntryDelay(5L, TimeUnit.SECONDS);
        sendPayloads(producer);

        // sendAsync() returns before the broker has processed anything, so wait for the counter
        // to change rather than asserting on it straight away.
        awaitPausedConnections(1L);

        CompletableFuture<Void> flushFuture = producer.flushAsync();
        Assert.assertEquals(pausedConnections(), 1L);
        try {
            flushFuture.get(2L, TimeUnit.SECONDS);
            Assert.fail("Should have timed out");
        }
        catch (TimeoutException expected) {
            // Expected: the flush must still be pending after two seconds.
        }

        flushFuture.join();
        awaitPausedConnections(0L);

        // Explicitly re-enable reads on the producer's connection and confirm nothing stays paused.
        ((Producer)((AbstractTopic)topicRef).producers.get(PRODUCER_NAME)).getCnx().enableCnxAutoRead();
        flushFuture.get();
        awaitPausedConnections(0L);
    }

    /** Creates a producer named {@value #PRODUCER_NAME} on the given topic. */
    private org.apache.pulsar.client.api.Producer<byte[]> createProducer(String topic) throws Exception {
        return this.pulsarClient.newProducer()
                .topic(topic)
                .producerName(PRODUCER_NAME)
                .create();
    }

    /** Sends {@value #MESSAGE_COUNT} one-MiB payloads without waiting for any of them to complete. */
    private static void sendPayloads(org.apache.pulsar.client.api.Producer<byte[]> producer) {
        byte[] payload = new byte[PAYLOAD_SIZE_BYTES];
        for (int i = 0; i < MESSAGE_COUNT; ++i) {
            producer.sendAsync(payload);
        }
    }

    /** Returns the paused-connection count currently reported by the broker service. */
    private long pausedConnections() {
        return this.pulsar.getBrokerService().getPausedConnections();
    }

    /** Polls with Awaitility's default settings until the paused-connection count equals {@code expected}. */
    private void awaitPausedConnections(long expected) {
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(pausedConnections(), expected));
    }
}

