/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.beust.jcommander.internal.Maps;
import com.google.common.collect.Sets;
import java.net.URL;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class BacklogQuotaManagerTest {
    PulsarService pulsar;
    ServiceConfiguration config;
    URL adminUrl;
    PulsarAdmin admin;
    LocalBookkeeperEnsemble bkEnsemble;
    private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 3;
    private static final int MAX_ENTRIES_PER_LEDGER = 5;
    private static final Logger LOG = LoggerFactory.getLogger(BacklogQuotaManagerTest.class);

    @BeforeMethod
    void setup() throws Exception {
        try {
            this.bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
            this.bkEnsemble.start();
            this.config = new ServiceConfiguration();
            this.config.setZookeeperServers("127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
            this.config.setAdvertisedAddress("localhost");
            this.config.setWebServicePort(Optional.of(0));
            this.config.setClusterName("usc");
            this.config.setBrokerShutdownTimeoutMs(0L);
            this.config.setBrokerServicePort(Optional.of(0));
            this.config.setAuthorizationEnabled(false);
            this.config.setAuthenticationEnabled(false);
            this.config.setBacklogQuotaCheckIntervalInSeconds(3);
            this.config.setManagedLedgerMaxEntriesPerLedger(5);
            this.config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
            this.config.setAllowAutoTopicCreationType("non-partitioned");
            this.pulsar = new PulsarService(this.config);
            this.pulsar.start();
            this.adminUrl = new URL("http://127.0.0.1:" + this.pulsar.getListenPortHTTP().get());
            this.admin = PulsarAdmin.builder().serviceHttpUrl(this.adminUrl.toString()).build();
            this.admin.clusters().createCluster("usc", ClusterData.builder().serviceUrl(this.adminUrl.toString()).build());
            this.admin.tenants().createTenant("prop", (TenantInfo)new TenantInfoImpl((Set)Sets.newHashSet((Object[])new String[]{"appid1"}), (Set)Sets.newHashSet((Object[])new String[]{"usc"})));
            this.admin.namespaces().createNamespace("prop/ns-quota");
            this.admin.namespaces().setNamespaceReplicationClusters("prop/ns-quota", (Set)Sets.newHashSet((Object[])new String[]{"usc"}));
            this.admin.namespaces().createNamespace("prop/quotahold");
            this.admin.namespaces().setNamespaceReplicationClusters("prop/quotahold", (Set)Sets.newHashSet((Object[])new String[]{"usc"}));
            this.admin.namespaces().createNamespace("prop/quotaholdasync");
            this.admin.namespaces().setNamespaceReplicationClusters("prop/quotaholdasync", (Set)Sets.newHashSet((Object[])new String[]{"usc"}));
        }
        catch (Throwable t) {
            LOG.error("Error setting up broker test", t);
            Assert.fail((String)"Broker test setup failed");
        }
    }

    @AfterMethod(alwaysRun=true)
    void shutdown() throws Exception {
        try {
            if (this.admin != null) {
                this.admin.close();
                this.admin = null;
            }
            if (this.pulsar != null) {
                this.pulsar.close();
                this.pulsar = null;
            }
            if (this.bkEnsemble != null) {
                this.bkEnsemble.stop();
                this.bkEnsemble = null;
            }
        }
        catch (Throwable t) {
            LOG.error("Error cleaning up broker test setup state", t);
            Assert.fail((String)"Broker test cleanup failed");
        }
    }

    private void rolloverStats() {
        this.pulsar.getBrokerService().updateRates();
    }

    @Test
    public void testBacklogQuotaWithReader() throws Exception {
        Assert.assertEquals((Map)this.admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), (Map)Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/ns-quota", BacklogQuota.builder().limitSize(10240L).limitTime(3).retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception).build());
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();){
            Message msg;
            String topic1 = "persistent://prop/ns-quota/topic1";
            int numMsgs = 20;
            Reader reader = client.newReader().topic("persistent://prop/ns-quota/topic1").receiverQueueSize(1).startMessageId(MessageId.latest).create();
            Producer<byte[]> producer = this.createProducer(client, "persistent://prop/ns-quota/topic1");
            byte[] content = new byte[1024];
            for (int i = 0; i < 20; ++i) {
                content[0] = (byte)(content[0] + 1);
                MessageId messageId = producer.send((Object)content);
            }
            Thread.sleep(4000L);
            this.rolloverStats();
            TopicStats stats = this.admin.topics().getStats("persistent://prop/ns-quota/topic1");
            Assert.assertEquals((long)stats.getBacklogSize(), (long)0L, (String)("backlog size is [" + stats.getBacklogSize() + "]"));
            Assert.assertEquals((int)stats.getSubscriptions().size(), (int)1);
            long nonDurableSubscriptionBacklog = ((SubscriptionStats)stats.getSubscriptions().values().iterator().next()).getMsgBacklog();
            Assert.assertEquals((long)nonDurableSubscriptionBacklog, (long)5L, (String)("non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]"));
            try {
                for (int i = 0; i < 20; ++i) {
                    content[0] = (byte)(content[0] + 1);
                    MessageId messageId = producer.send((Object)content);
                }
            }
            catch (PulsarClientException ce) {
                Assert.fail((String)("Should not have gotten exception: " + ce.getMessage()));
            }
            Awaitility.await().untilAsserted(() -> {
                PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats("persistent://prop/ns-quota/topic1", false);
                Assert.assertEquals((int)internalStats.ledgers.size(), (int)1);
                Assert.assertEquals((long)((ManagedLedgerInternalStats.LedgerInfo)internalStats.ledgers.get((int)0)).ledgerId, (long)7L);
            });
            while ((msg = reader.readNext(5, TimeUnit.SECONDS)) != null) {
                LOG.info("msg read: {} - {}", (Object)msg.getMessageId(), (Object)msg.getData()[0]);
            }
        }
    }

    @Test
    public void testTriggerBacklogQuotaSizeWithReader() throws Exception {
        Assert.assertEquals((Map)this.admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), (Map)Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/ns-quota", BacklogQuota.builder().limitSize(10240L).limitTime(3).retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception).build());
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();){
            Message msg;
            String topic1 = "persistent://prop/ns-quota/topic1" + UUID.randomUUID();
            int numMsgs = 20;
            Reader reader = client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create();
            Producer<byte[]> producer = this.createProducer(client, topic1);
            byte[] content = new byte[1024];
            for (int i = 0; i < 20; ++i) {
                content[0] = (byte)(content[0] + 1);
                producer.send((Object)content);
            }
            Thread.sleep(3000L);
            this.admin.brokers().backlogQuotaCheck();
            this.rolloverStats();
            TopicStats stats = this.admin.topics().getStats(topic1);
            Assert.assertEquals((long)stats.getBacklogSize(), (long)0L, (String)("backlog size is [" + stats.getBacklogSize() + "]"));
            Assert.assertEquals((int)stats.getSubscriptions().size(), (int)1);
            long nonDurableSubscriptionBacklog = ((SubscriptionStats)stats.getSubscriptions().values().iterator().next()).getMsgBacklog();
            Assert.assertEquals((long)nonDurableSubscriptionBacklog, (long)5L, (String)("non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]"));
            try {
                for (int i = 0; i < 20; ++i) {
                    content[0] = (byte)(content[0] + 1);
                    producer.send((Object)content);
                }
            }
            catch (PulsarClientException ce) {
                Assert.fail((String)("Should not have gotten exception: " + ce.getMessage()));
            }
            Awaitility.await().untilAsserted(() -> {
                PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats(topic1, false);
                Assert.assertEquals((int)internalStats.ledgers.size(), (int)1);
                Assert.assertEquals((long)((ManagedLedgerInternalStats.LedgerInfo)internalStats.ledgers.get((int)0)).ledgerId, (long)7L);
            });
            while ((msg = reader.readNext(5, TimeUnit.SECONDS)) != null) {
                LOG.info("msg read: {} - {}", (Object)msg.getMessageId(), (Object)msg.getData()[0]);
            }
            producer.close();
            reader.close();
        }
    }

    @Test
    public void testTriggerBacklogTimeQuotaWithReader() throws Exception {
        Assert.assertEquals((Map)this.admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), (Map)Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/ns-quota", BacklogQuota.builder().limitSize(10240L).limitTime(3).retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception).build());
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();){
            Message msg;
            String topic1 = "persistent://prop/ns-quota/topic2" + UUID.randomUUID();
            int numMsgs = 9;
            Reader reader = client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create();
            Producer<byte[]> producer = this.createProducer(client, topic1);
            byte[] content = new byte[1024];
            for (int i = 0; i < 9; ++i) {
                content[0] = (byte)(content[0] + 1);
                producer.send((Object)content);
            }
            Thread.sleep(3000L);
            this.admin.brokers().backlogQuotaCheck();
            this.rolloverStats();
            TopicStats stats = this.admin.topics().getStats(topic1);
            Assert.assertEquals((long)stats.getBacklogSize(), (long)0L, (String)("backlog size is [" + stats.getBacklogSize() + "]"));
            Assert.assertEquals((int)stats.getSubscriptions().size(), (int)1);
            long nonDurableSubscriptionBacklog = ((SubscriptionStats)stats.getSubscriptions().values().iterator().next()).getMsgBacklog();
            Assert.assertEquals((long)nonDurableSubscriptionBacklog, (long)9L, (String)("non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]"));
            Awaitility.await().pollDelay(Duration.ofSeconds(3L)).pollInterval(Duration.ofSeconds(1L)).untilAsserted(() -> {
                PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats(topic1, false);
                Assert.assertEquals((int)internalStats.ledgers.size(), (int)2);
            });
            try {
                for (int i = 0; i < 9; ++i) {
                    content[0] = (byte)(content[0] + 1);
                    producer.send((Object)content);
                }
            }
            catch (PulsarClientException ce) {
                Assert.fail((String)("Should not have gotten exception: " + ce.getMessage()));
            }
            while ((msg = reader.readNext(5, TimeUnit.SECONDS)) != null) {
                LOG.info("msg read: {} - {}", (Object)msg.getMessageId(), (Object)msg.getData()[0]);
            }
            producer.close();
            reader.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConsumerBacklogEvictionSizeQuota() throws Exception {
        Assert.assertEquals((Map)this.admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), (Map)Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/ns-quota", BacklogQuota.builder().limitSize(10240L).retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction).build());
        PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            String topic1 = "persistent://prop/ns-quota/topic2";
            String subName1 = "c1";
            String subName2 = "c2";
            int numMsgs = 20;
            Consumer consumer1 = client.newConsumer().topic(new String[]{"persistent://prop/ns-quota/topic2"}).subscriptionName("c1").subscribe();
            Consumer consumer2 = client.newConsumer().topic(new String[]{"persistent://prop/ns-quota/topic2"}).subscriptionName("c2").subscribe();
            Producer<byte[]> producer = this.createProducer(client, "persistent://prop/ns-quota/topic2");
            byte[] content = new byte[1024];
            for (int i = 0; i < 20; ++i) {
                producer.send((Object)content);
                consumer1.receive();
                consumer2.receive();
            }
            Thread.sleep(4000L);
            this.rolloverStats();
            TopicStats stats = this.admin.topics().getStats("persistent://prop/ns-quota/topic2");
            Assert.assertTrue((stats.getBacklogSize() < 10240L ? 1 : 0) != 0, (String)("Storage size is [" + stats.getStorageSize() + "]"));
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    @Test
    public void testConsumerBacklogEvictionTimeQuotaPrecise() throws Exception {
        Assert.assertEquals((Map)this.admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), (Map)Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/ns-quota", BacklogQuota.builder().limitSize(20480L).limitTime(3).retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction).build());
        this.config.setPreciseTimeBasedBacklogQuotaCheck(true);
        PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        String topic1 = "persistent://prop/ns-quota/topic3";
        String subName1 = "c1";
        String subName2 = "c2";
        int numMsgs = 9;
        Consumer consumer1 = client.newConsumer().topic(new String[]{"persistent://prop/ns-quota/topic3"}).subscriptionName("c1").subscribe();
        Consumer consumer2 = client.newConsumer().topic(new String[]{"persistent://prop/ns-quota/topic3"}).subscriptionName("c2").subscribe();
        Producer<byte[]> producer = this.createProducer(client, "persistent://prop/ns-quota/topic3");
        byte[] content = new byte[1024];
        for (int i = 0; i < 9; ++i) {
            producer.send((Object)content);
            consumer1.receive();
            consumer2.receive();
        }
        TopicStats stats = this.admin.topics().getStats("persistent://prop/ns-quota/topic3");
        Assert.assertEquals((long)((SubscriptionStats)stats.getSubscriptions().get("c1")).getMsgBacklog(), (long)9L);
        Assert.assertEquals((long)((SubscriptionStats)stats.getSubscriptions().get("c2")).getMsgBacklog(), (long)9L);
        Thread.sleep(6000L);
        this.rolloverStats();
        stats = this.admin.topics().getStats("persistent://prop/ns-quota/topic3");
        Assert.assertEquals((long)((SubscriptionStats)stats.getSubscriptions().get("c1")).getMsgBacklog(), (long)0L);
        Assert.assertEquals((long)((SubscriptionStats)stats.getSubscriptions().get("c2")).getMsgBacklog(), (long)0L);
        client.close();
    }

    @Test
    public void testConsumerBacklogEvictionTimeQuota() throws Exception {
        Assert.assertEquals((Map)this.admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), (Map)Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/ns-quota", BacklogQuota.builder().limitSize(20480L).limitTime(3).retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction).build());
        PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        String topic1 = "persistent://prop/ns-quota/topic3";
        String subName1 = "c1";
        String subName2 = "c2";
        int numMsgs = 14;
        Consumer consumer1 = client.newConsumer().topic(new String[]{"persistent://prop/ns-quota/topic3"}).subscriptionName("c1").subscribe();
        Consumer consumer2 = client.newConsumer().topic(new String[]{"persistent://prop/ns-quota/topic3"}).subscriptionName("c2").subscribe();
        Producer<byte[]> producer = this.createProducer(client, "persistent://prop/ns-quota/topic3");
        byte[] content = new byte[1024];
        for (int i = 0; i < 14; ++i) {
            producer.send((Object)content);
            consumer1.receive();
            consumer2.receive();
        }
        TopicStats stats = this.admin.topics().getStats("persistent://prop/ns-quota/topic3");
        Assert.assertEquals((long)((SubscriptionStats)stats.getSubscriptions().get("c1")).getMsgBacklog(), (long)14L);
        Assert.assertEquals((long)((SubscriptionStats)stats.getSubscriptions().get("c2")).getMsgBacklog(), (long)14L);
        Thread.sleep(6000L);
        this.rolloverStats();
        stats = this.admin.topics().getStats("persistent://prop/ns-quota/topic3");
        Assert.assertEquals((long)((SubscriptionStats)stats.getSubscriptions().get("c1")).getMsgBacklog(), (long)4L);
        Assert.assertEquals((long)((SubscriptionStats)stats.getSubscriptions().get("c2")).getMsgBacklog(), (long)4L);
        client.close();
    }

    @Test
    public void testConsumerBacklogEvictionWithAckSizeQuota() throws Exception {
        Assert.assertEquals((Map)this.admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), (Map)Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/ns-quota", BacklogQuota.builder().limitSize(10240L).limitTime(3).retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction).build());
        PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).build();
        String topic1 = "persistent://prop/ns-quota/topic11";
        String subName1 = "c11";
        String subName2 = "c21";
        int numMsgs = 20;
        Consumer consumer1 = client.newConsumer().topic(new String[]{"persistent://prop/ns-quota/topic11"}).subscriptionName("c11").subscribe();
        Consumer consumer2 = client.newConsumer().topic(new String[]{"persistent://prop/ns-quota/topic11"}).subscriptionName("c21").subscribe();
        Producer<byte[]> producer = this.createProducer(client, "persistent://prop/ns-quota/topic11");
        byte[] content = new byte[1024];
        for (int i = 0; i < 20; ++i) {
            producer.send((Object)content);
            consumer1.acknowledge(consumer1.receive());
            consumer2.receive();
        }
        Thread.sleep(4000L);
        this.rolloverStats();
        TopicStats stats = this.admin.topics().getStats("persistent://prop/ns-quota/topic11");
        Assert.assertTrue((stats.getBacklogSize() <= 10240L ? 1 : 0) != 0, (String)("Storage size is [" + stats.getStorageSize() + "]"));
    }

    @Test
    public void testConsumerBacklogEvictionWithAckTimeQuotaPrecise() throws Exception {
        Assert.assertEquals((Map)this.admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), (Map)Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/ns-quota", BacklogQuota.builder().limitSize(10240L).limitTime(3).retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction).build());
        this.config.setPreciseTimeBasedBacklogQuotaCheck(true);
        PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).build();
        String topic1 = "persistent://prop/ns-quota/topic12";
        String subName1 = "c11";
        String subName2 = "c21";
        int numMsgs = 9;
        Consumer consumer1 = client.newConsumer().topic(new String[]{"persistent://prop/ns-quota/topic12"}).subscriptionName("c11").subscribe();
        Consumer consumer2 = client.newConsumer().topic(new String[]{"persistent://prop/ns-quota/topic12"}).subscriptionName("c21").subscribe();
        Producer<byte[]> producer = this.createProducer(client, "persistent://prop/ns-quota/topic12");
        byte[] content = new byte[1024];
        for (int i = 0; i < 9; ++i) {
            producer.send((Object)content);
            consumer1.receive();
            consumer2.receive();
        }
        TopicStats stats = this.admin.topics().getStats("persistent://prop/ns-quota/topic12");
        Assert.assertEquals((long)((SubscriptionStats)stats.getSubscriptions().get("c11")).getMsgBacklog(), (long)9L);
        Assert.assertEquals((long)((SubscriptionStats)stats.getSubscriptions().get("c21")).getMsgBacklog(), (long)9L);
        consumer1.redeliverUnacknowledgedMessages();
        for (int i = 0; i < 9; ++i) {
            consumer1.acknowledge(consumer1.receive());
        }
        Thread.sleep(1000L);
        this.rolloverStats();
        stats = this.admin.topics().getStats("persistent://prop/ns-quota/topic12");
        Assert.assertEquals((long)((SubscriptionStats)stats.getSubscriptions().get("c11")).getMsgBacklog(), (long)0L);
        Assert.assertEquals((long)((SubscriptionStats)stats.getSubscriptions().get("c21")).getMsgBacklog(), (long)9L);
        Thread.sleep(6000L);
        this.rolloverStats();
        stats = this.admin.topics().getStats("persistent://prop/ns-quota/topic12");
        Assert.assertEquals((long)((SubscriptionStats)stats.getSubscriptions().get("c21")).getMsgBacklog(), (long)0L);
        client.close();
    }

    private Producer<byte[]> createProducer(PulsarClient client, String topic) throws PulsarClientException {
        return client.newProducer().enableBatching(false).sendTimeout(2, TimeUnit.SECONDS).topic(topic).create();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConsumerBacklogEvictionWithAckTimeQuota() throws Exception {
        Assert.assertEquals((Map)this.admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), (Map)Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/ns-quota", BacklogQuota.builder().limitSize(20480L).limitTime(6).retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction).build());
        PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).build();
        try {
            String topic1 = "persistent://prop/ns-quota/topic12";
            String subName1 = "c11";
            String subName2 = "c21";
            int numMsgs = 14;
            Consumer consumer1 = client.newConsumer().topic(new String[]{"persistent://prop/ns-quota/topic12"}).subscriptionName("c11").subscribe();
            Consumer consumer2 = client.newConsumer().topic(new String[]{"persistent://prop/ns-quota/topic12"}).subscriptionName("c21").subscribe();
            Producer<byte[]> producer = this.createProducer(client, "persistent://prop/ns-quota/topic12");
            byte[] content = new byte[1024];
            ArrayList<Message> messagesToAcknowledge = new ArrayList<Message>();
            for (int i = 0; i < 14; ++i) {
                producer.send((Object)content);
                messagesToAcknowledge.add(consumer1.receive());
                consumer2.receive();
            }
            TopicStats stats = this.admin.topics().getStats("persistent://prop/ns-quota/topic12");
            Assert.assertEquals((long)((SubscriptionStats)stats.getSubscriptions().get("c11")).getMsgBacklog(), (long)14L);
            Assert.assertEquals((long)((SubscriptionStats)stats.getSubscriptions().get("c21")).getMsgBacklog(), (long)14L);
            for (int i = 0; i < 14; ++i) {
                if (i == 10) {
                    Thread.sleep(3000L);
                }
                consumer1.acknowledge((Message)messagesToAcknowledge.get(i));
            }
            Awaitility.await().pollInterval(Duration.ofSeconds(1L)).untilAsserted(() -> {
                this.rolloverStats();
                TopicStats stats = this.admin.topics().getStats("persistent://prop/ns-quota/topic12");
                Assert.assertEquals((long)((SubscriptionStats)stats.getSubscriptions().get("c11")).getMsgBacklog(), (long)0L);
                Assert.assertEquals((long)((SubscriptionStats)stats.getSubscriptions().get("c21")).getMsgBacklog(), (long)14L);
            });
            Awaitility.await().pollInterval(Duration.ofSeconds(1L)).atMost(Duration.ofSeconds(12L)).untilAsserted(() -> {
                long msgBacklog = ((SubscriptionStats)this.admin.topics().getStats("persistent://prop/ns-quota/topic12").getSubscriptions().get("c21")).getMsgBacklog();
                Assert.assertEquals((float)msgBacklog, (float)4.0f, (float)1.0f);
            });
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConcurrentAckAndEviction() throws Exception {
        Assert.assertEquals((Map)this.admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), (Map)Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/ns-quota", BacklogQuota.builder().limitSize(10240L).retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction).build());
        String topic1 = "persistent://prop/ns-quota/topic12";
        String subName1 = "c12";
        String subName2 = "c22";
        int numMsgs = 20;
        final CyclicBarrier barrier = new CyclicBarrier(2);
        final CountDownLatch counter = new CountDownLatch(2);
        final AtomicBoolean gotException = new AtomicBoolean(false);
        final PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            PulsarClient client2 = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            try {
                final Consumer consumer1 = client2.newConsumer().topic(new String[]{"persistent://prop/ns-quota/topic12"}).subscriptionName("c12").subscribe();
                final Consumer consumer2 = client2.newConsumer().topic(new String[]{"persistent://prop/ns-quota/topic12"}).subscriptionName("c22").subscribe();
                Thread producerThread = new Thread(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public void run() {
                        try {
                            barrier.await();
                            Producer producer = BacklogQuotaManagerTest.this.createProducer(client, "persistent://prop/ns-quota/topic12");
                            byte[] content = new byte[1024];
                            for (int i = 0; i < 20; ++i) {
                                producer.send((Object)content);
                            }
                            producer.close();
                        }
                        catch (Exception e) {
                            gotException.set(true);
                        }
                        finally {
                            counter.countDown();
                        }
                    }
                };
                Thread consumerThread = new Thread(){

                    @Override
                    public void run() {
                        try {
                            barrier.await();
                            for (int i = 0; i < 20; ++i) {
                                consumer1.acknowledge(consumer1.receive());
                                consumer2.receive();
                            }
                        }
                        catch (Exception e) {
                            gotException.set(true);
                        }
                        finally {
                            counter.countDown();
                        }
                    }
                };
                producerThread.start();
                consumerThread.start();
                counter.await(20L, TimeUnit.SECONDS);
                Assert.assertFalse((boolean)gotException.get());
                Thread.sleep(4000L);
                this.rolloverStats();
                TopicStats stats = this.admin.topics().getStats("persistent://prop/ns-quota/topic12");
                Assert.assertTrue((stats.getBacklogSize() <= 10240L ? 1 : 0) != 0, (String)("Storage size is [" + stats.getStorageSize() + "]"));
            }
            finally {
                if (Collections.singletonList(client2).get(0) != null) {
                    client2.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testNoEviction() throws Exception {
        Assert.assertEquals((Map)this.admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), (Map)Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/ns-quota", BacklogQuota.builder().limitSize(10240L).retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction).build());
        String topic1 = "persistent://prop/ns-quota/topic13";
        String subName1 = "c13";
        String subName2 = "c23";
        int numMsgs = 10;
        final CyclicBarrier barrier = new CyclicBarrier(2);
        final CountDownLatch counter = new CountDownLatch(2);
        final AtomicBoolean gotException = new AtomicBoolean(false);
        PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            final Consumer consumer1 = client.newConsumer().topic(new String[]{"persistent://prop/ns-quota/topic13"}).subscriptionName("c13").subscribe();
            final Consumer consumer2 = client.newConsumer().topic(new String[]{"persistent://prop/ns-quota/topic13"}).subscriptionName("c23").subscribe();
            final PulsarClient client2 = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            try {
                Thread producerThread = new Thread(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public void run() {
                        try {
                            barrier.await();
                            Producer producer = BacklogQuotaManagerTest.this.createProducer(client2, "persistent://prop/ns-quota/topic13");
                            byte[] content = new byte[1024];
                            for (int i = 0; i < 10; ++i) {
                                producer.send((Object)content);
                            }
                            producer.close();
                        }
                        catch (Exception e) {
                            gotException.set(true);
                        }
                        finally {
                            counter.countDown();
                        }
                    }
                };
                Thread consumerThread = new Thread(){

                    @Override
                    public void run() {
                        try {
                            barrier.await();
                            for (int i = 0; i < 10; ++i) {
                                consumer1.acknowledge(consumer1.receive());
                                consumer2.acknowledge(consumer2.receive());
                            }
                        }
                        catch (Exception e) {
                            gotException.set(true);
                        }
                        finally {
                            counter.countDown();
                        }
                    }
                };
                producerThread.start();
                consumerThread.start();
                counter.await();
                Assert.assertFalse((boolean)gotException.get());
            }
            finally {
                if (Collections.singletonList(client2).get(0) != null) {
                    client2.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testEvictionMulti() throws Exception {
        Assert.assertEquals((Map)this.admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), (Map)Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/ns-quota", BacklogQuota.builder().limitSize(15360L).retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction).build());
        String topic1 = "persistent://prop/ns-quota/topic14";
        String subName1 = "c14";
        String subName2 = "c24";
        int numMsgs = 10;
        final CyclicBarrier barrier = new CyclicBarrier(4);
        final CountDownLatch counter = new CountDownLatch(4);
        final AtomicBoolean gotException = new AtomicBoolean(false);
        PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            final Consumer consumer1 = client.newConsumer().topic(new String[]{"persistent://prop/ns-quota/topic14"}).subscriptionName("c14").subscribe();
            final Consumer consumer2 = client.newConsumer().topic(new String[]{"persistent://prop/ns-quota/topic14"}).subscriptionName("c24").subscribe();
            final PulsarClient client3 = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            try {
                final PulsarClient client2 = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
                try {
                    Thread producerThread1 = new Thread(){

                        /*
                         * WARNING - Removed try catching itself - possible behaviour change.
                         */
                        @Override
                        public void run() {
                            try {
                                barrier.await();
                                Producer producer = BacklogQuotaManagerTest.this.createProducer(client2, "persistent://prop/ns-quota/topic14");
                                byte[] content = new byte[1024];
                                for (int i = 0; i < 10; ++i) {
                                    producer.send((Object)content);
                                }
                                producer.close();
                            }
                            catch (Exception e) {
                                gotException.set(true);
                            }
                            finally {
                                counter.countDown();
                            }
                        }
                    };
                    Thread producerThread2 = new Thread(){

                        /*
                         * WARNING - Removed try catching itself - possible behaviour change.
                         */
                        @Override
                        public void run() {
                            try {
                                barrier.await();
                                Producer producer = BacklogQuotaManagerTest.this.createProducer(client3, "persistent://prop/ns-quota/topic14");
                                byte[] content = new byte[1024];
                                for (int i = 0; i < 10; ++i) {
                                    producer.send((Object)content);
                                }
                                producer.close();
                            }
                            catch (Exception e) {
                                gotException.set(true);
                            }
                            finally {
                                counter.countDown();
                            }
                        }
                    };
                    Thread consumerThread1 = new Thread(){

                        @Override
                        public void run() {
                            try {
                                barrier.await();
                                for (int i = 0; i < 20; ++i) {
                                    consumer1.acknowledge(consumer1.receive());
                                }
                            }
                            catch (Exception e) {
                                gotException.set(true);
                            }
                            finally {
                                counter.countDown();
                            }
                        }
                    };
                    Thread consumerThread2 = new Thread(){

                        @Override
                        public void run() {
                            try {
                                barrier.await();
                                for (int i = 0; i < 20; ++i) {
                                    consumer2.acknowledge(consumer2.receive());
                                }
                            }
                            catch (Exception e) {
                                gotException.set(true);
                            }
                            finally {
                                counter.countDown();
                            }
                        }
                    };
                    producerThread1.start();
                    producerThread2.start();
                    consumerThread1.start();
                    consumerThread2.start();
                    counter.await(20L, TimeUnit.SECONDS);
                    Assert.assertFalse((boolean)gotException.get());
                    Thread.sleep(4000L);
                    this.rolloverStats();
                    TopicStats stats = this.admin.topics().getStats("persistent://prop/ns-quota/topic14");
                    Assert.assertTrue((stats.getBacklogSize() <= 15360L ? 1 : 0) != 0, (String)("Storage size is [" + stats.getStorageSize() + "]"));
                }
                finally {
                    if (Collections.singletonList(client2).get(0) != null) {
                        client2.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(client3).get(0) != null) {
                    client3.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAheadProducerOnHold() throws Exception {
        Assert.assertEquals((Map)this.admin.namespaces().getBacklogQuotaMap("prop/quotahold"), (Map)Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/quotahold", BacklogQuota.builder().limitSize(10240L).retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold).build());
        PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            int i;
            String topic1 = "persistent://prop/quotahold/hold";
            String subName1 = "c1hold";
            int numMsgs = 10;
            Consumer consumer = client.newConsumer().topic(new String[]{"persistent://prop/quotahold/hold"}).subscriptionName("c1hold").subscribe();
            byte[] content = new byte[1024];
            Producer<byte[]> producer = this.createProducer(client, "persistent://prop/quotahold/hold");
            for (i = 0; i <= 10; ++i) {
                try {
                    producer.send((Object)content);
                    LOG.info("sent [{}]", (Object)i);
                    continue;
                }
                catch (PulsarClientException.TimeoutException cte) {
                    LOG.info("timeout on [{}]", (Object)i);
                }
            }
            for (i = 0; i < 10; ++i) {
                consumer.receive();
                LOG.info("received [{}]", (Object)i);
            }
            Thread.sleep(4000L);
            this.rolloverStats();
            TopicStats stats = this.admin.topics().getStats("persistent://prop/quotahold/hold");
            Assert.assertEquals((int)stats.getPublishers().size(), (int)0, (String)("Number of producers on topic persistent://prop/quotahold/hold are [" + stats.getPublishers().size() + "]"));
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAheadProducerOnHoldTimeout() throws Exception {
        Assert.assertEquals((Map)this.admin.namespaces().getBacklogQuotaMap("prop/quotahold"), (Map)Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/quotahold", BacklogQuota.builder().limitSize(10240L).retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold).build());
        PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            String topic1 = "persistent://prop/quotahold/holdtimeout";
            String subName1 = "c1holdtimeout";
            boolean gotException = false;
            client.newConsumer().topic(new String[]{"persistent://prop/quotahold/holdtimeout"}).subscriptionName("c1holdtimeout").subscribe();
            byte[] content = new byte[1024];
            Producer<byte[]> producer = this.createProducer(client, "persistent://prop/quotahold/holdtimeout");
            for (int i = 0; i < 10; ++i) {
                producer.send((Object)content);
            }
            Thread.sleep(4000L);
            try {
                producer.send((Object)content);
                producer.send((Object)content);
                Assert.fail((String)"backlog quota did not exceed");
            }
            catch (PulsarClientException.TimeoutException te) {
                gotException = true;
            }
            Assert.assertTrue((boolean)gotException, (String)"timeout did not occur");
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testProducerException() throws Exception {
        Assert.assertEquals((Map)this.admin.namespaces().getBacklogQuotaMap("prop/quotahold"), (Map)Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/quotahold", BacklogQuota.builder().limitSize(10240L).retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception).build());
        PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            String topic1 = "persistent://prop/quotahold/except";
            String subName1 = "c1except";
            boolean gotException = false;
            client.newConsumer().topic(new String[]{"persistent://prop/quotahold/except"}).subscriptionName("c1except").subscribe();
            byte[] content = new byte[1024];
            Producer<byte[]> producer = this.createProducer(client, "persistent://prop/quotahold/except");
            for (int i = 0; i < 10; ++i) {
                producer.send((Object)content);
            }
            Thread.sleep(4000L);
            try {
                producer.send((Object)content);
                producer.send((Object)content);
                Assert.fail((String)"backlog quota did not exceed");
            }
            catch (PulsarClientException ce) {
                Assert.assertTrue((ce instanceof PulsarClientException.ProducerBlockedQuotaExceededException || ce instanceof PulsarClientException.TimeoutException ? 1 : 0) != 0, (String)ce.getMessage());
                gotException = true;
            }
            Assert.assertTrue((boolean)gotException, (String)"backlog exceeded exception did not occur");
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testProducerExceptionAndThenUnblockSizeQuota() throws Exception {
        Assert.assertEquals((Map)this.admin.namespaces().getBacklogQuotaMap("prop/quotahold"), (Map)Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/quotahold", BacklogQuota.builder().limitSize(10240L).retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception).build());
        PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            String topic1 = "persistent://prop/quotahold/exceptandunblock";
            String subName1 = "c1except";
            boolean gotException = false;
            Consumer consumer = client.newConsumer().topic(new String[]{"persistent://prop/quotahold/exceptandunblock"}).subscriptionName("c1except").subscribe();
            byte[] content = new byte[1024];
            Producer<byte[]> producer = this.createProducer(client, "persistent://prop/quotahold/exceptandunblock");
            for (int i = 0; i < 10; ++i) {
                producer.send((Object)content);
            }
            Thread.sleep(4000L);
            try {
                producer.send((Object)content);
                producer.send((Object)content);
                Assert.fail((String)"backlog quota did not exceed");
            }
            catch (PulsarClientException ce) {
                Assert.assertTrue((ce instanceof PulsarClientException.ProducerBlockedQuotaExceededException || ce instanceof PulsarClientException.TimeoutException ? 1 : 0) != 0, (String)ce.getMessage());
                gotException = true;
            }
            Assert.assertTrue((boolean)gotException, (String)"backlog exceeded exception did not occur");
            TopicStats stats = this.admin.topics().getStats("persistent://prop/quotahold/exceptandunblock");
            int backlog = (int)((SubscriptionStats)stats.getSubscriptions().get("c1except")).getMsgBacklog();
            for (int i = 0; i < backlog; ++i) {
                Message msg = consumer.receive();
                consumer.acknowledge(msg);
            }
            Thread.sleep(4000L);
            Exception sendException = null;
            gotException = false;
            try {
                for (int i = 0; i < 5; ++i) {
                    producer.send((Object)content);
                }
            }
            catch (Exception e) {
                gotException = true;
                sendException = e;
            }
            Assert.assertFalse((boolean)gotException, (String)("unable to publish due to " + sendException));
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    @Test
    public void testProducerExceptionAndThenUnblockTimeQuotaPrecise() throws Exception {
        Assert.assertEquals((Map)this.admin.namespaces().getBacklogQuotaMap("prop/quotahold"), (Map)Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/quotahold", BacklogQuota.builder().limitSize(10240L).limitTime(3).retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception).build());
        this.config.setPreciseTimeBasedBacklogQuotaCheck(true);
        PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        String topic1 = "persistent://prop/quotahold/exceptandunblock2";
        String subName1 = "c1except";
        boolean gotException = false;
        int numMsgs = 9;
        Consumer consumer = client.newConsumer().topic(new String[]{"persistent://prop/quotahold/exceptandunblock2"}).subscriptionName("c1except").subscribe();
        byte[] content = new byte[1024];
        Producer<byte[]> producer = this.createProducer(client, "persistent://prop/quotahold/exceptandunblock2");
        for (int i = 0; i < numMsgs; ++i) {
            producer.send((Object)content);
        }
        Thread.sleep(6000L);
        try {
            producer.send((Object)content);
            Assert.fail((String)"backlog quota did not exceed");
        }
        catch (PulsarClientException ce) {
            Assert.assertTrue((ce instanceof PulsarClientException.ProducerBlockedQuotaExceededException || ce instanceof PulsarClientException.TimeoutException ? 1 : 0) != 0, (String)ce.getMessage());
            gotException = true;
        }
        Assert.assertTrue((boolean)gotException, (String)"backlog exceeded exception did not occur");
        TopicStats stats = this.admin.topics().getStats("persistent://prop/quotahold/exceptandunblock2");
        Assert.assertEquals((long)((SubscriptionStats)stats.getSubscriptions().get("c1except")).getMsgBacklog(), (long)numMsgs);
        for (int i = 0; i < numMsgs; ++i) {
            consumer.acknowledge(consumer.receive());
        }
        Thread.sleep(6000L);
        this.rolloverStats();
        stats = this.admin.topics().getStats("persistent://prop/quotahold/exceptandunblock2");
        Assert.assertEquals((long)((SubscriptionStats)stats.getSubscriptions().get("c1except")).getMsgBacklog(), (long)0L);
        Exception sendException = null;
        gotException = false;
        try {
            for (int i = 0; i < 5; ++i) {
                producer.send((Object)content);
            }
        }
        catch (Exception e) {
            gotException = true;
            sendException = e;
        }
        Assert.assertFalse((boolean)gotException, (String)("unable to publish due to " + sendException));
        client.close();
    }

    @Test
    public void testProducerExceptionAndThenUnblockTimeQuota() throws Exception {
        Assert.assertEquals((Map)this.admin.namespaces().getBacklogQuotaMap("prop/quotahold"), (Map)Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/quotahold", BacklogQuota.builder().limitSize(15360L).limitTime(3).retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception).build());
        PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        String topic1 = "persistent://prop/quotahold/exceptandunblock2";
        String subName1 = "c1except";
        boolean gotException = false;
        int numMsgs = 14;
        Consumer consumer = client.newConsumer().topic(new String[]{"persistent://prop/quotahold/exceptandunblock2"}).subscriptionName("c1except").subscribe();
        byte[] content = new byte[1024];
        Producer<byte[]> producer = this.createProducer(client, "persistent://prop/quotahold/exceptandunblock2");
        for (int i = 0; i < numMsgs; ++i) {
            producer.send((Object)content);
        }
        Thread.sleep(6000L);
        try {
            producer.send((Object)content);
            Assert.fail((String)"backlog quota did not exceed");
        }
        catch (PulsarClientException ce) {
            Assert.assertTrue((ce instanceof PulsarClientException.ProducerBlockedQuotaExceededException || ce instanceof PulsarClientException.TimeoutException ? 1 : 0) != 0, (String)ce.getMessage());
            gotException = true;
        }
        Assert.assertTrue((boolean)gotException, (String)"backlog exceeded exception did not occur");
        TopicStats stats = this.admin.topics().getStats("persistent://prop/quotahold/exceptandunblock2");
        Assert.assertEquals((long)((SubscriptionStats)stats.getSubscriptions().get("c1except")).getMsgBacklog(), (long)numMsgs);
        for (int i = 0; i < numMsgs; ++i) {
            consumer.acknowledge(consumer.receive());
        }
        Thread.sleep(6000L);
        this.rolloverStats();
        stats = this.admin.topics().getStats("persistent://prop/quotahold/exceptandunblock2");
        Assert.assertEquals((long)((SubscriptionStats)stats.getSubscriptions().get("c1except")).getMsgBacklog(), (long)0L);
        Exception sendException = null;
        gotException = false;
        try {
            for (int i = 0; i < 5; ++i) {
                producer.send((Object)content);
            }
        }
        catch (Exception e) {
            gotException = true;
            sendException = e;
        }
        Assert.assertFalse((boolean)gotException, (String)("unable to publish due to " + sendException));
        client.close();
    }
}

