/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import org.testng.collections.Lists;

@Test(groups={"flaky"})
public class DispatcherBlockConsumerTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(DispatcherBlockConsumerTest.class);

    @Override
    @BeforeMethod(alwaysRun=true)
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @DataProvider(name="gracefulUnload")
    public Object[][] bundleUnloading() {
        return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}};
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(enabled=false)
    public void testConsumerBlockingWithUnAckedMessagesAtDispatcher() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerSubscription();
        try {
            this.stopBroker();
            this.startBroker();
            int unackMsgAllowed = 100;
            int receiverQueueSize = 10;
            int totalProducedMsgs = 200;
            String topicName = "persistent://my-property/my-ns/unacked-topic";
            String subscriberName = "subscriber-1";
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerSubscription(100);
            ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared);
            Consumer consumer1 = consumerBuilder.subscribe();
            Consumer consumer2 = consumerBuilder.subscribe();
            Consumer consumer3 = consumerBuilder.subscribe();
            List consumers = Lists.newArrayList((Object[])new Consumer[]{consumer1, consumer2, consumer3});
            Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic").create();
            for (int i = 0; i < 200; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
            }
            Message msg = null;
            HashMap messages = Maps.newHashMap();
            for (int i = 0; i < 3; ++i) {
                for (int j = 0; j < 200 && (msg = ((Consumer)consumers.get(i)).receive(500, TimeUnit.MILLISECONDS)) != null; ++j) {
                    messages.put(msg, consumers.get(i));
                }
            }
            Assert.assertEquals((float)messages.size(), (float)100.0f, (float)30.0f);
            messages.forEach((m, c) -> {
                try {
                    c.acknowledge(m);
                }
                catch (PulsarClientException e) {
                    Assert.fail((String)"ack failed", (Throwable)e);
                }
            });
            ConcurrentHashMap.KeySetView result = ConcurrentHashMap.newKeySet();
            int expectedRemainingMessages = 200 - messages.size();
            CountDownLatch latch = new CountDownLatch(expectedRemainingMessages);
            for (int i = 0; i < consumers.size(); ++i) {
                int consumerCount = i;
                for (int j = 0; j < 200; ++j) {
                    ((Consumer)consumers.get(i)).receiveAsync().thenAccept(m -> {
                        result.add(m.getMessageId());
                        try {
                            ((Consumer)consumers.get(consumerCount)).acknowledge(m);
                        }
                        catch (PulsarClientException e) {
                            Assert.fail((String)"failed to ack msg", (Throwable)e);
                        }
                        latch.countDown();
                    });
                }
            }
            latch.await(10L, TimeUnit.SECONDS);
            Assert.assertEquals((int)result.size(), (int)expectedRemainingMessages);
            producer.close();
            consumers.forEach(c -> {
                try {
                    c.close();
                }
                catch (PulsarClientException pulsarClientException) {
                    // empty catch block
                }
            });
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(enabled=false)
    public void testConsumerBlockingWithUnAckedMessagesAndRedelivery() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerSubscription();
        try {
            int unackMsgAllowed = 100;
            int totalProducedMsgs = 150;
            int receiverQueueSize = 10;
            String topicName = "persistent://my-property/my-ns/unacked-topic-" + UUID.randomUUID().toString();
            String subscriberName = "subscriber-1";
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerSubscription(100);
            ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("subscriber-1").receiverQueueSize(10).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Shared);
            ConsumerImpl consumer1 = (ConsumerImpl)consumerBuilder.subscribe();
            ConsumerImpl consumer2 = (ConsumerImpl)consumerBuilder.subscribe();
            ConsumerImpl consumer3 = (ConsumerImpl)consumerBuilder.subscribe();
            List consumers = Lists.newArrayList((Object[])new ConsumerImpl[]{consumer1, consumer2, consumer3});
            Producer producer = this.pulsarClient.newProducer().topic(topicName).create();
            for (int i = 0; i < 150; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
            }
            Message msg = null;
            ArrayListMultimap messages = ArrayListMultimap.create();
            for (int i = 0; i < 3; ++i) {
                for (int j = 0; j < 150 && (msg = ((ConsumerImpl)consumers.get(i)).receive(500, TimeUnit.MILLISECONDS)) != null; ++j) {
                    messages.put(consumers.get(i), (Object)msg.getMessageId());
                    log.info("Received message: " + new String(msg.getData()));
                }
            }
            Assert.assertNotEquals((Object)messages.size(), (Object)150);
            messages.asMap().forEach((c, msgs) -> c.redeliverUnacknowledgedMessages(msgs.stream().map(m -> (MessageIdImpl)m).collect(Collectors.toSet())));
            ConcurrentHashMap.KeySetView result = ConcurrentHashMap.newKeySet();
            for (int i = 0; i < consumers.size(); ++i) {
                int consumerCount = i;
                for (int j = 0; j < 150; ++j) {
                    ((ConsumerImpl)consumers.get(i)).receiveAsync().thenAccept(m -> {
                        log.info("Received: {}", (Object)new String(m.getData()));
                        result.add(m.getMessageId());
                        try {
                            ((ConsumerImpl)consumers.get(consumerCount)).acknowledge(m);
                        }
                        catch (PulsarClientException e) {
                            Assert.fail((String)"failed to ack msg", (Throwable)e);
                        }
                    });
                }
            }
            while (result.size() < 150) {
                Thread.sleep(100L);
                log.info("Result Size: " + result.size());
            }
            Assert.assertEquals((int)result.size(), (int)150);
            Assert.assertTrue((result.size() >= 150 ? 1 : 0) != 0);
            producer.close();
            consumers.forEach(c -> {
                try {
                    c.close();
                }
                catch (PulsarClientException pulsarClientException) {
                    // empty catch block
                }
            });
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(enabled=false)
    public void testCloseConsumerBlockedDispatcher() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        String topicName = "persistent://my-property/my-ns/unacked-topic-" + UUID.randomUUID().toString();
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerSubscription();
        try {
            int unackMsgAllowed = 100;
            int receiverQueueSize = 10;
            int totalProducedMsgs = 200;
            String subscriberName = "subscriber-1";
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerSubscription(100);
            Consumer consumer1 = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
            Producer producer = this.pulsarClient.newProducer().topic(topicName).create();
            for (int i = 0; i < 200; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
            }
            Message msg = null;
            HashMap messages = Maps.newHashMap();
            for (int i = 0; i < 200 && (msg = consumer1.receive(500, TimeUnit.MILLISECONDS)) != null; ++i) {
                messages.put(msg, consumer1);
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((float)messages.size(), (float)100.0f, (float)20.0f);
            consumer1.close();
            Consumer consumer2 = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
            HashMap messages2 = Maps.newHashMap();
            for (int i = 0; i < 200 && (msg = consumer2.receive(5, TimeUnit.SECONDS)) != null; ++i) {
                messages2.put(msg, consumer2);
                consumer2.acknowledge(msg);
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)messages2.size(), (int)200);
            log.info("-- Exiting {} test --", (Object)this.methodName);
            producer.close();
            consumer2.close();
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(enabled=false)
    public void testRedeliveryOnBlockedDispatcher() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerSubscription();
        try {
            int unackMsgAllowed = 100;
            int receiverQueueSize = 10;
            int totalProducedMsgs = 150;
            String topicName = "persistent://my-property/my-ns/unacked-topic-" + UUID.randomUUID().toString();
            String subscriberName = "subscriber-1";
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerSubscription(100);
            ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared);
            ConsumerImpl consumer1 = (ConsumerImpl)consumerBuilder.subscribe();
            ConsumerImpl consumer2 = (ConsumerImpl)consumerBuilder.subscribe();
            ConsumerImpl consumer3 = (ConsumerImpl)consumerBuilder.subscribe();
            List consumers = Lists.newArrayList((Object[])new ConsumerImpl[]{consumer1, consumer2, consumer3});
            Producer producer = this.pulsarClient.newProducer().topic(topicName).create();
            for (int i = 0; i < 150; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
            }
            Message msg = null;
            HashSet messages = Sets.newHashSet();
            for (int i = 0; i < 3; ++i) {
                for (int j = 0; j < 150 && (msg = ((ConsumerImpl)consumers.get(i)).receive(500, TimeUnit.MILLISECONDS)) != null; ++j) {
                    messages.add(msg.getMessageId());
                    log.info("Received message: " + new String(msg.getData()));
                }
            }
            int totalConsumedMsgs = messages.size();
            Assert.assertEquals((float)totalConsumedMsgs, (float)100.0f, (float)30.0f);
            consumers.forEach(c -> c.redeliverUnacknowledgedMessages());
            Thread.sleep(1000L);
            HashMap messages1 = Maps.newHashMap();
            for (int i = 0; i < 3; ++i) {
                for (int j = 0; j < 150 && (msg = ((ConsumerImpl)consumers.get(i)).receive(500, TimeUnit.MILLISECONDS)) != null; ++j) {
                    messages1.putIfAbsent(consumers.get(i), Sets.newHashSet());
                    ((Set)messages1.get(consumers.get(i))).add(msg.getMessageId());
                    log.info("Received message: " + new String(msg.getData()));
                }
            }
            HashSet result = Sets.newHashSet();
            messages1.values().forEach(result::addAll);
            Assert.assertEquals((float)totalConsumedMsgs, (float)result.size(), (float)30.0f);
            messages1.forEach((c, msgs) -> msgs.forEach(m -> {
                try {
                    c.acknowledge(m);
                }
                catch (PulsarClientException e) {
                    Assert.fail((String)"ack failed", (Throwable)e);
                }
            }));
            messages1.values().forEach(result::addAll);
            int remainingMessages = 150 - result.size();
            CountDownLatch latch = new CountDownLatch(remainingMessages);
            ConcurrentLinkedQueue consumedMessages = Queues.newConcurrentLinkedQueue();
            for (int i = 0; i < consumers.size(); ++i) {
                int counsumerIndex = i;
                for (int j = 0; j < remainingMessages; ++j) {
                    ((ConsumerImpl)consumers.get(i)).receiveAsync().thenAccept(m -> {
                        consumedMessages.add(m.getMessageId());
                        try {
                            ((ConsumerImpl)consumers.get(counsumerIndex)).acknowledge(m);
                        }
                        catch (PulsarClientException e) {
                            Assert.fail((String)"failed to ack", (Throwable)e);
                        }
                        latch.countDown();
                    });
                }
            }
            latch.await();
            Assert.assertTrue((consumedMessages.size() >= remainingMessages ? 1 : 0) != 0);
            producer.close();
            consumers.forEach(c -> {
                try {
                    c.close();
                }
                catch (PulsarClientException pulsarClientException) {
                    // empty catch block
                }
            });
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBlockDispatcherStats() throws Exception {
        int orginalDispatcherLimit = this.conf.getMaxUnackedMessagesPerSubscription();
        try {
            String topicName = "persistent://prop/use/ns-abc/blockDispatch";
            String subName = "blockDispatch";
            int timeWaitToSync = 100;
            this.conf.setMaxUnackedMessagesPerSubscription(10);
            this.stopBroker();
            this.startBroker();
            Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/use/ns-abc/blockDispatch"}).subscriptionName("blockDispatch").subscriptionType(SubscriptionType.Shared).subscribe();
            Thread.sleep(100L);
            PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference("persistent://prop/use/ns-abc/blockDispatch").get();
            Assert.assertNotNull((Object)topicRef);
            this.rolloverPerIntervalStats();
            TopicStats stats = topicRef.getStats(false, false);
            SubscriptionStats subStats = (SubscriptionStats)stats.subscriptions.values().iterator().next();
            Assert.assertEquals((int)stats.subscriptions.keySet().size(), (int)1);
            Assert.assertEquals((long)subStats.msgBacklog, (long)0L);
            Assert.assertEquals((int)subStats.consumers.size(), (int)1);
            Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/blockDispatch").create();
            Thread.sleep(100L);
            for (int i = 0; i < 100; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
            }
            Thread.sleep(100L);
            this.rolloverPerIntervalStats();
            stats = topicRef.getStats(false, false);
            subStats = (SubscriptionStats)stats.subscriptions.values().iterator().next();
            Assert.assertTrue((subStats.msgBacklog > 0L ? 1 : 0) != 0);
            Assert.assertTrue((subStats.unackedMessages > 0L ? 1 : 0) != 0);
            Assert.assertTrue((boolean)subStats.blockedSubscriptionOnUnackedMsgs);
            Assert.assertEquals((long)((ConsumerStats)subStats.consumers.get((int)0)).unackedMessages, (long)subStats.unackedMessages);
            Assert.assertTrue((((ConsumerStats)subStats.consumers.get((int)0)).msgRateOut > 0.0 ? 1 : 0) != 0);
            Assert.assertTrue((((ConsumerStats)subStats.consumers.get((int)0)).msgThroughputOut > 0.0 ? 1 : 0) != 0);
            Assert.assertEquals((double)subStats.msgRateRedeliver, (double)0.0);
            producer.close();
            consumer.close();
        }
        finally {
            this.conf.setMaxUnackedMessagesPerSubscription(orginalDispatcherLimit);
        }
    }

    @Test(dataProvider="gracefulUnload")
    public void testBrokerSubscriptionRecovery(boolean unloadBundleGracefully) throws Exception {
        Message msg2;
        log.info("-- Starting {} test --", (Object)this.methodName);
        String topicName = "persistent://my-property/my-ns/unacked-topic";
        String subscriberName = "subscriber-1";
        int totalProducedMsgs = 500;
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        CountDownLatch latch = new CountDownLatch(500);
        for (int i2 = 0; i2 < 500; ++i2) {
            String message = "my-message-" + i2;
            producer.sendAsync((Object)message.getBytes()).thenAccept(msg -> latch.countDown());
        }
        latch.await();
        HashSet unackMessages = Sets.newHashSet((Object[])new Integer[]{5, 10, 20, 21, 22, 23, 25, 26, 30, 32, 40, 80, 160, 320});
        int receivedMsgCount = 0;
        for (int i3 = 0; i3 < 500; ++i3) {
            Message msg3 = consumer.receive(500, TimeUnit.MILLISECONDS);
            Assert.assertNotNull((Object)msg3);
            if (!unackMessages.contains(i3)) {
                consumer.acknowledge(msg3);
            }
            ++receivedMsgCount;
        }
        Assert.assertEquals((int)500, (int)receivedMsgCount);
        consumer.close();
        if (unloadBundleGracefully) {
            Supplier<NamespaceService> namespaceServiceSupplier = () -> (NamespaceService)Mockito.spy((Object)new NamespaceService(this.pulsar));
            ((PulsarService)Mockito.doReturn(namespaceServiceSupplier).when((Object)this.pulsar)).getNamespaceServiceProvider();
        }
        this.stopBroker();
        this.startBroker();
        consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").subscriptionType(SubscriptionType.Shared).subscribe();
        Set<String> unackMsgs = unackMessages.stream().map(i -> "my-message-" + i).collect(Collectors.toSet());
        HashSet receivedMsgs = Sets.newHashSet();
        for (int i4 = 0; i4 < 500 && (msg2 = consumer.receive(500, TimeUnit.MILLISECONDS)) != null; ++i4) {
            receivedMsgs.add(new String(msg2.getData()));
        }
        unackMsgs.forEach(msg -> Assert.assertTrue((boolean)receivedMsgs.contains(msg)));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=10000L)
    public void testBlockBrokerDispatching() {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerBroker();
        double unAckedMessagePercentage = this.pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked();
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        try {
            try {
                int maxUnAckPerBroker = 200;
                double unAckMsgPercentagePerDispatcher = 10.0;
                int maxUnAckPerDispatcher = 20;
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerBroker(200);
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerSubscriptionOnBrokerBlocked(10.0);
                this.stopBroker();
                this.startBroker();
                Field field = BrokerService.class.getDeclaredField("blockedDispatchers");
                field.setAccessible(true);
                ConcurrentOpenHashSet blockedDispatchers = (ConcurrentOpenHashSet)field.get(this.pulsar.getBrokerService());
                int receiverQueueSize = 10;
                int totalProducedMsgs = 600;
                String topicName = "persistent://my-property/my-ns/unacked-topic";
                String subscriberName1 = "subscriber-1";
                String subscriberName2 = "subscriber-2";
                String subscriberName3 = "subscriber-3";
                ConsumerImpl consumer1Sub1 = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
                ConsumerImpl consumer1Sub2 = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-2").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
                consumer1Sub2.close();
                ConsumerImpl consumer1Sub3 = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-3").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
                consumer1Sub3.close();
                Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic").create();
                executor.scheduleAtFixedRate(() -> this.pulsar.getBrokerService().checkUnAckMessageDispatching(), 10L, 10L, TimeUnit.MILLISECONDS);
                for (int i = 0; i < 600; ++i) {
                    String message = "my-message-" + i;
                    producer.send((Object)message.getBytes());
                }
                Message msg = null;
                HashSet messages1 = Sets.newHashSet();
                for (int j = 0; j < 600 && (msg = consumer1Sub1.receive(100, TimeUnit.MILLISECONDS)) != null; ++j) {
                    messages1.add(msg.getMessageId());
                    if (j != 200) continue;
                    Thread.sleep(200L);
                }
                Assert.assertNotEquals((Object)messages1.size(), (Object)600);
                PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
                try {
                    int j;
                    ConsumerImpl consumer2Sub1 = (ConsumerImpl)newPulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
                    int consumer2Msgs = 0;
                    for (int j2 = 0; j2 < 600 && (msg = consumer2Sub1.receive(100, TimeUnit.MILLISECONDS)) != null; ++j2) {
                        ++consumer2Msgs;
                    }
                    Assert.assertEquals((int)consumer2Msgs, (int)0);
                    consumer2Sub1.close();
                    Assert.assertEquals((long)blockedDispatchers.size(), (long)1L);
                    String dispatcherName = ((PersistentDispatcherMultipleConsumers)blockedDispatchers.values().get(0)).getName();
                    String subName = dispatcherName.substring(dispatcherName.lastIndexOf("/") + 2, dispatcherName.length());
                    Assert.assertEquals((String)subName, (String)"subscriber-1");
                    ConsumerImpl consumerSub2 = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-2").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
                    HashSet messages2 = Sets.newHashSet();
                    for (int j3 = 0; j3 < 600 && (msg = consumerSub2.receive(100, TimeUnit.MILLISECONDS)) != null; ++j3) {
                        messages2.add(msg.getMessageId());
                    }
                    Assert.assertEquals((float)messages2.size(), (float)maxUnAckPerDispatcher, (float)10.0f);
                    Assert.assertEquals((long)blockedDispatchers.size(), (long)2L);
                    consumer1Sub3 = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-3").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
                    int consumedMsgsSub3 = 0;
                    for (j = 0; j < 600 && (msg = consumer1Sub3.receive(100, TimeUnit.MILLISECONDS)) != null; ++j) {
                        ++consumedMsgsSub3;
                        consumer1Sub3.acknowledge(msg);
                    }
                    Assert.assertEquals((int)consumedMsgsSub3, (int)600);
                    Assert.assertEquals((long)blockedDispatchers.size(), (long)2L);
                    messages1.forEach(arg_0 -> ((ConsumerImpl)consumer1Sub1).acknowledgeAsync(arg_0));
                    Thread.sleep(1000L);
                    for (j = 0; j < 600 && (msg = consumer1Sub1.receive(1, TimeUnit.SECONDS)) != null; ++j) {
                        messages1.add(msg.getMessageId());
                        consumer1Sub1.acknowledge(msg);
                    }
                    Assert.assertEquals((int)messages1.size(), (int)600);
                    Assert.assertEquals((long)blockedDispatchers.size(), (long)0L);
                    consumerSub2.redeliverUnacknowledgedMessages();
                    AtomicInteger msgReceivedCount = new AtomicInteger(0);
                    CountDownLatch latch = new CountDownLatch(600);
                    for (int j4 = 0; j4 < 600; ++j4) {
                        consumerSub2.receiveAsync().thenAccept(m -> {
                            msgReceivedCount.incrementAndGet();
                            latch.countDown();
                            try {
                                consumerSub2.acknowledge(m);
                            }
                            catch (PulsarClientException e) {
                                Assert.fail((String)"failed to ack msg", (Throwable)e);
                            }
                        });
                    }
                    latch.await();
                    Assert.assertEquals((int)msgReceivedCount.get(), (int)600);
                    consumer1Sub1.close();
                    consumerSub2.close();
                    consumer1Sub3.close();
                    log.info("-- Exiting {} test --", (Object)this.methodName);
                }
                finally {
                    if (Collections.singletonList(newPulsarClient).get(0) != null) {
                        newPulsarClient.close();
                    }
                }
            }
            catch (Exception e) {
                Assert.fail();
            }
            finally {
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerBroker(unAckedMessages);
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerSubscriptionOnBrokerBlocked(unAckedMessagePercentage);
            }
        }
        finally {
            if (Collections.singletonList(executor).get(0) != null) {
                executor.shutdownNow();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBrokerDispatchBlockAndSubAckBackRequiredMsgs() {
        log.info("-- Starting {} test --", (Object)this.methodName);
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        try {
            int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerBroker();
            double unAckedMessagePercentage = this.pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked();
            try {
                int maxUnAckPerBroker = 200;
                double unAckMsgPercentagePerDispatcher = 10.0;
                int maxUnAckPerDispatcher = 20;
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerBroker(200);
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerSubscriptionOnBrokerBlocked(10.0);
                this.stopBroker();
                this.startBroker();
                Field field = BrokerService.class.getDeclaredField("blockedDispatchers");
                field.setAccessible(true);
                ConcurrentOpenHashSet blockedDispatchers = (ConcurrentOpenHashSet)field.get(this.pulsar.getBrokerService());
                int receiverQueueSize = 10;
                int totalProducedMsgs = 600;
                String topicName = "persistent://my-property/my-ns/unacked-topic";
                String subscriberName1 = "subscriber-1";
                String subscriberName2 = "subscriber-2";
                ConsumerImpl consumer1Sub1 = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
                ConsumerImpl consumer1Sub2 = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-2").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
                consumer1Sub2.close();
                executor.scheduleAtFixedRate(() -> this.pulsar.getBrokerService().checkUnAckMessageDispatching(), 10L, 10L, TimeUnit.MILLISECONDS);
                Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic").create();
                for (int i = 0; i < 600; ++i) {
                    String message = "my-message-" + i;
                    producer.send((Object)message.getBytes());
                }
                Message msg = null;
                HashSet messages1 = Sets.newHashSet();
                for (int j = 0; j < 600 && (msg = consumer1Sub1.receive(100, TimeUnit.MILLISECONDS)) != null; ++j) {
                    messages1.add(msg.getMessageId());
                    if (j != 200) continue;
                    Thread.sleep(200L);
                }
                Assert.assertNotEquals((Object)messages1.size(), (Object)600);
                PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
                try {
                    ConsumerImpl consumer2Sub1 = (ConsumerImpl)newPulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
                    int consumer2Msgs = 0;
                    for (int j = 0; j < 600 && (msg = consumer2Sub1.receive(100, TimeUnit.MILLISECONDS)) != null; ++j) {
                        ++consumer2Msgs;
                    }
                    Assert.assertEquals((int)consumer2Msgs, (int)0);
                    consumer2Sub1.close();
                    Assert.assertEquals((long)blockedDispatchers.size(), (long)1L);
                    String dispatcherName = ((PersistentDispatcherMultipleConsumers)blockedDispatchers.values().get(0)).getName();
                    String subName = dispatcherName.substring(dispatcherName.lastIndexOf("/") + 2, dispatcherName.length());
                    Assert.assertEquals((String)subName, (String)"subscriber-1");
                    consumer1Sub2 = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-2").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
                    HashSet messages2 = Sets.newHashSet();
                    for (int j = 0; j < 600 && (msg = consumer1Sub2.receive(100, TimeUnit.MILLISECONDS)) != null; ++j) {
                        messages2.add(msg.getMessageId());
                    }
                    Assert.assertEquals((float)messages2.size(), (float)maxUnAckPerDispatcher, (float)10.0f);
                    Assert.assertEquals((long)blockedDispatchers.size(), (long)2L);
                    Iterator itrMsgs = messages2.iterator();
                    int additionalMsgConsumedAfterBlocked = messages2.size() - maxUnAckPerDispatcher + 1;
                    for (int i = 0; i < additionalMsgConsumedAfterBlocked + maxUnAckPerDispatcher / 2; ++i) {
                        consumer1Sub2.acknowledge((MessageId)itrMsgs.next());
                    }
                    Thread.sleep(1000L);
                    Assert.assertEquals((long)blockedDispatchers.size(), (long)1L);
                    for (int j = 0; j < 600 && (msg = consumer1Sub2.receive(200, TimeUnit.MILLISECONDS)) != null; ++j) {
                        messages2.add(msg.getMessageId());
                        consumer1Sub2.acknowledge(msg);
                    }
                    Assert.assertEquals((int)messages2.size(), (int)600);
                    consumer1Sub1.close();
                    consumer1Sub2.close();
                    log.info("-- Exiting {} test --", (Object)this.methodName);
                }
                finally {
                    if (Collections.singletonList(newPulsarClient).get(0) != null) {
                        newPulsarClient.close();
                    }
                }
            }
            catch (Exception e) {
                Assert.fail();
            }
            finally {
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerBroker(unAckedMessages);
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerSubscriptionOnBrokerBlocked(unAckedMessagePercentage);
            }
        }
        finally {
            if (Collections.singletonList(executor).get(0) != null) {
                executor.shutdownNow();
            }
        }
    }

    private void rolloverPerIntervalStats() {
        try {
            this.pulsar.getExecutor().submit(() -> this.pulsar.getBrokerService().updateRates()).get();
        }
        catch (Exception e) {
            log.error("Stats executor error", (Throwable)e);
        }
    }
}

