package org.apache.pulsar.client.api;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import org.testng.collections.Lists;

@Test(groups = {"flaky"})
/* loaded from: input_file:org/apache/pulsar/client/api/DispatcherBlockConsumerTest.class */
public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(DispatcherBlockConsumerTest.class);

    /**
     * Prepares the fixture before every test method by running the inherited
     * {@code internalSetup()} and then {@code producerBaseSetup()}.
     *
     * <p>NOTE(review): the decompiler reported that the access modifier of this method was
     * changed from {@code protected}; confirm the intended visibility against the original source.
     *
     * @throws Exception if either inherited setup step fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod(alwaysRun = true)
    public void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    /**
     * Tears the fixture down after every test method (also after failed ones, because of
     * {@code alwaysRun = true}) by delegating to the inherited {@code internalCleanup()}.
     *
     * @throws Exception if the inherited cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Supplies the arguments for tests bound to the {@code "gracefulUnload"} data provider:
     * one row carrying {@code true} and one row carrying {@code false}.
     *
     * @return a two-row matrix, each row holding a single {@link Boolean}
     */
    @DataProvider(name = "gracefulUnload")
    public Object[][] bundleUnloading() {
        // Must be a genuine two-dimensional array; a flat Object[] does not match the return type.
        return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}};
    }

    /**
     * Exercises dispatcher blocking on a shared subscription once its un-acked message limit is hit.
     *
     * <p>Flow: restart the broker, set {@code maxUnackedMessagesPerSubscription} to 100, attach three
     * shared consumers (receiver queue size 10) and publish 200 messages. The consumers first drain
     * whatever is delivered without acking; roughly 100 (+/- 30) messages are expected. After those
     * are acked, the remaining messages must be received and acked within 10 seconds.
     *
     * <p>The original {@code maxUnackedMessagesPerSubscription} value is restored on every exit path.
     *
     * @throws Exception on any broker or client failure
     */
    @Test(enabled = false)
    public void testConsumerBlockingWithUnAckedMessagesAtDispatcher() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final int originalMaxUnacked = this.pulsar.getConfiguration().getMaxUnackedMessagesPerSubscription();
        try {
            stopBroker();
            startBroker();

            final String topicName = "persistent://my-property/my-ns/unacked-topic";
            final int totalProducedMsgs = 200;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerSubscription(100);

            ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName("subscriber-1")
                    .receiverQueueSize(10)
                    .subscriptionType(SubscriptionType.Shared);
            List<Consumer<byte[]>> consumers = Lists.newArrayList(
                    consumerBuilder.subscribe(), consumerBuilder.subscribe(), consumerBuilder.subscribe());

            Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
            for (int i = 0; i < totalProducedMsgs; i++) {
                producer.send(("my-message-" + i).getBytes());
            }

            // Phase 1: drain without acking until every consumer stops receiving.
            HashMap<Message<byte[]>, Consumer<byte[]>> unackedMessages = Maps.newHashMap();
            for (Consumer<byte[]> consumer : consumers) {
                for (int j = 0; j < totalProducedMsgs; j++) {
                    Message<byte[]> msg = consumer.receive(500, TimeUnit.MILLISECONDS);
                    if (msg == null) {
                        break;
                    }
                    unackedMessages.put(msg, consumer);
                }
            }
            Assert.assertEquals(unackedMessages.size(), 100.0f, 30.0f);

            // Phase 2: ack everything received so far so that dispatching can resume.
            unackedMessages.forEach((msg, consumer) -> {
                try {
                    consumer.acknowledge(msg);
                } catch (PulsarClientException e) {
                    Assert.fail("ack failed", e);
                }
            });

            // Phase 3: the rest of the messages must now be delivered.
            Set<MessageId> remainingIds = ConcurrentHashMap.newKeySet();
            final int remainingCount = totalProducedMsgs - unackedMessages.size();
            CountDownLatch remainingLatch = new CountDownLatch(remainingCount);
            for (Consumer<byte[]> consumer : consumers) {
                for (int j = 0; j < totalProducedMsgs; j++) {
                    consumer.receiveAsync().thenAccept(msg -> {
                        remainingIds.add(msg.getMessageId());
                        try {
                            consumer.acknowledge(msg);
                        } catch (PulsarClientException e) {
                            Assert.fail("failed to ack msg", e);
                        }
                        remainingLatch.countDown();
                    });
                }
            }
            // The await result is not checked on purpose: the size assertion below reports the shortfall.
            remainingLatch.await(10L, TimeUnit.SECONDS);
            Assert.assertEquals(remainingIds.size(), remainingCount);

            producer.close();
            for (Consumer<byte[]> consumer : consumers) {
                try {
                    consumer.close();
                } catch (PulsarClientException e) {
                    // Best-effort close; a failure here must not fail the test.
                    log.warn("Failed to close consumer", e);
                }
            }
            log.info("-- Exiting {} test --", this.methodName);
        } finally {
            // Restore the same setting that was modified above (per-subscription, not per-consumer).
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerSubscription(originalMaxUnacked);
        }
    }

    /**
     * Exercises a blocked shared-subscription dispatcher followed by explicit redelivery.
     *
     * <p>Flow: set {@code maxUnackedMessagesPerSubscription} to 100, attach three shared consumers
     * (receiver queue size 10, ack grouping disabled) to a uniquely named topic and publish 150
     * messages. The consumers drain what is delivered without acking; fewer than all 150 are
     * expected. Each consumer then asks for redelivery of exactly the message ids it received, and
     * the test waits until all 150 distinct message ids have been received and acked.
     *
     * <p>The original {@code maxUnackedMessagesPerSubscription} value is restored on every exit path.
     *
     * @throws Exception on any broker or client failure
     */
    @Test(enabled = false)
    public void testConsumerBlockingWithUnAckedMessagesAndRedelivery() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final int originalMaxUnacked = this.pulsar.getConfiguration().getMaxUnackedMessagesPerSubscription();
        try {
            final String topicName = "persistent://my-property/my-ns/unacked-topic-" + UUID.randomUUID();
            final int totalProducedMsgs = 150;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerSubscription(100);

            ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName("subscriber-1")
                    .receiverQueueSize(10)
                    .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                    .subscriptionType(SubscriptionType.Shared);
            List<ConsumerImpl<byte[]>> consumers = Lists.newArrayList(
                    (ConsumerImpl<byte[]>) consumerBuilder.subscribe(),
                    (ConsumerImpl<byte[]>) consumerBuilder.subscribe(),
                    (ConsumerImpl<byte[]>) consumerBuilder.subscribe());

            Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
            for (int i = 0; i < totalProducedMsgs; i++) {
                producer.send(("my-message-" + i).getBytes());
            }

            // Phase 1: drain without acking and remember which consumer saw which message id.
            ArrayListMultimap<ConsumerImpl<byte[]>, MessageId> unackedByConsumer = ArrayListMultimap.create();
            for (ConsumerImpl<byte[]> consumer : consumers) {
                for (int j = 0; j < totalProducedMsgs; j++) {
                    Message<byte[]> msg = consumer.receive(500, TimeUnit.MILLISECONDS);
                    if (msg == null) {
                        break;
                    }
                    unackedByConsumer.put(consumer, msg.getMessageId());
                    log.info("Received message: {}", new String(msg.getData()));
                }
            }
            Assert.assertNotEquals(Integer.valueOf(unackedByConsumer.size()), totalProducedMsgs);

            // Phase 2: each consumer requests redelivery of exactly the ids it is holding.
            unackedByConsumer.asMap().forEach((consumer, ids) ->
                    consumer.redeliverUnacknowledgedMessages(
                            ids.stream().map(id -> (MessageIdImpl) id).collect(Collectors.toSet())));

            // Phase 3: receive and ack until every produced message has been seen.
            Set<MessageId> receivedIds = ConcurrentHashMap.newKeySet();
            for (ConsumerImpl<byte[]> consumer : consumers) {
                for (int j = 0; j < totalProducedMsgs; j++) {
                    consumer.receiveAsync().thenAccept(msg -> {
                        log.info("Received: {}", new String(msg.getData()));
                        receivedIds.add(msg.getMessageId());
                        try {
                            consumer.acknowledge(msg);
                        } catch (PulsarClientException e) {
                            Assert.fail("failed to ack msg", e);
                        }
                    });
                }
            }
            while (receivedIds.size() < totalProducedMsgs) {
                Thread.sleep(100L);
                log.info("Result Size: {}", receivedIds.size());
            }
            Assert.assertEquals(receivedIds.size(), totalProducedMsgs);

            producer.close();
            for (ConsumerImpl<byte[]> consumer : consumers) {
                try {
                    consumer.close();
                } catch (PulsarClientException e) {
                    // Best-effort close; a failure here must not fail the test.
                    log.warn("Failed to close consumer", e);
                }
            }
            log.info("-- Exiting {} test --", this.methodName);
        } finally {
            // Restore the same setting that was modified above (per-subscription, not per-consumer).
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerSubscription(originalMaxUnacked);
        }
    }

    /**
     * Exercises closing a consumer while its shared-subscription dispatcher is blocked on un-acked
     * messages.
     *
     * <p>Flow: set {@code maxUnackedMessagesPerSubscription} to 100, subscribe one shared consumer
     * (receiver queue size 10) to a uniquely named topic and publish 200 messages. The consumer
     * drains what is delivered without acking; roughly 100 (+/- 20) messages are expected. It is then
     * closed, and a second consumer on the same subscription must receive (and ack) all 200 messages.
     *
     * <p>The original {@code maxUnackedMessagesPerSubscription} value is restored on every exit path.
     *
     * @throws Exception on any broker or client failure
     */
    @Test(enabled = false)
    public void testCloseConsumerBlockedDispatcher() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topicName = "persistent://my-property/my-ns/unacked-topic-" + UUID.randomUUID();
        final int totalProducedMsgs = 200;
        final int originalMaxUnacked = this.pulsar.getConfiguration().getMaxUnackedMessagesPerSubscription();
        try {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerSubscription(100);

            Consumer<byte[]> blockedConsumer = this.pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName("subscriber-1")
                    .receiverQueueSize(10)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe();
            Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
            for (int i = 0; i < totalProducedMsgs; i++) {
                producer.send(("my-message-" + i).getBytes());
            }

            // Phase 1: drain without acking until delivery stops.
            HashMap<Message<byte[]>, Consumer<byte[]>> unackedMessages = Maps.newHashMap();
            for (int i = 0; i < totalProducedMsgs; i++) {
                Message<byte[]> msg = blockedConsumer.receive(500, TimeUnit.MILLISECONDS);
                if (msg == null) {
                    break;
                }
                unackedMessages.put(msg, blockedConsumer);
                log.info("Received message: {}", new String(msg.getData()));
            }
            Assert.assertEquals(unackedMessages.size(), 100.0f, 20.0f);

            // Phase 2: close the consumer that holds the un-acked messages.
            blockedConsumer.close();

            // Phase 3: a fresh consumer on the same subscription must see every message.
            Consumer<byte[]> freshConsumer = this.pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName("subscriber-1")
                    .receiverQueueSize(10)
                    .subscriptionType(SubscriptionType.Shared)
                    .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                    .subscribe();
            HashMap<Message<byte[]>, Consumer<byte[]>> ackedMessages = Maps.newHashMap();
            for (int i = 0; i < totalProducedMsgs; i++) {
                Message<byte[]> msg = freshConsumer.receive(5, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                ackedMessages.put(msg, freshConsumer);
                freshConsumer.acknowledge(msg);
                log.info("Received message: {}", new String(msg.getData()));
            }
            Assert.assertEquals(ackedMessages.size(), totalProducedMsgs);

            log.info("-- Exiting {} test --", this.methodName);
            producer.close();
            freshConsumer.close();
        } finally {
            // Restore the same setting that was modified above (per-subscription, not per-consumer).
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerSubscription(originalMaxUnacked);
        }
    }

    /**
     * Exercises {@code redeliverUnacknowledgedMessages()} while a shared-subscription dispatcher is
     * blocked on un-acked messages.
     *
     * <p>Flow: set {@code maxUnackedMessagesPerSubscription} to 100, attach three shared consumers
     * (receiver queue size 10) to a uniquely named topic and publish 150 messages. The consumers
     * drain what is delivered without acking (roughly 100 +/- 30 expected), then every consumer
     * requests redelivery of all its un-acked messages. After a one-second pause the consumers drain
     * again; the number of distinct redelivered ids must be within 30 of the first count. Those are
     * acked, and the messages not yet seen in the second pass must then arrive.
     *
     * <p>The original {@code maxUnackedMessagesPerSubscription} value is restored on every exit path.
     *
     * @throws Exception on any broker or client failure
     */
    @Test(enabled = false)
    public void testRedeliveryOnBlockedDispatcher() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final int originalMaxUnacked = this.pulsar.getConfiguration().getMaxUnackedMessagesPerSubscription();
        try {
            final String topicName = "persistent://my-property/my-ns/unacked-topic-" + UUID.randomUUID();
            final int totalProducedMsgs = 150;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerSubscription(100);

            ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName("subscriber-1")
                    .receiverQueueSize(10)
                    .subscriptionType(SubscriptionType.Shared);
            List<ConsumerImpl<byte[]>> consumers = Lists.newArrayList(
                    (ConsumerImpl<byte[]>) consumerBuilder.subscribe(),
                    (ConsumerImpl<byte[]>) consumerBuilder.subscribe(),
                    (ConsumerImpl<byte[]>) consumerBuilder.subscribe());

            Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
            for (int i = 0; i < totalProducedMsgs; i++) {
                producer.send(("my-message-" + i).getBytes());
            }

            // Phase 1: drain without acking until delivery stops.
            HashSet<MessageId> firstPassIds = Sets.newHashSet();
            for (ConsumerImpl<byte[]> consumer : consumers) {
                for (int j = 0; j < totalProducedMsgs; j++) {
                    Message<byte[]> msg = consumer.receive(500, TimeUnit.MILLISECONDS);
                    if (msg == null) {
                        break;
                    }
                    firstPassIds.add(msg.getMessageId());
                    log.info("Received message: {}", new String(msg.getData()));
                }
            }
            final int firstPassCount = firstPassIds.size();
            Assert.assertEquals(firstPassCount, 100.0f, 30.0f);

            // Phase 2: ask for redelivery of everything that is still un-acked.
            for (ConsumerImpl<byte[]> consumer : consumers) {
                consumer.redeliverUnacknowledgedMessages();
            }
            Thread.sleep(1000L);

            // Phase 3: drain again, this time tracking the ids per consumer so they can be acked.
            HashMap<ConsumerImpl<byte[]>, Set<MessageId>> redeliveredByConsumer = Maps.newHashMap();
            for (ConsumerImpl<byte[]> consumer : consumers) {
                for (int j = 0; j < totalProducedMsgs; j++) {
                    Message<byte[]> msg = consumer.receive(500, TimeUnit.MILLISECONDS);
                    if (msg == null) {
                        break;
                    }
                    redeliveredByConsumer
                            .computeIfAbsent(consumer, key -> Sets.<MessageId>newHashSet())
                            .add(msg.getMessageId());
                    log.info("Received message: {}", new String(msg.getData()));
                }
            }
            HashSet<MessageId> redeliveredIds = Sets.newHashSet();
            redeliveredByConsumer.values().forEach(redeliveredIds::addAll);
            Assert.assertEquals(firstPassCount, redeliveredIds.size(), 30.0f);

            // Phase 4: ack the redelivered messages so the dispatcher can move on.
            for (java.util.Map.Entry<ConsumerImpl<byte[]>, Set<MessageId>> entry : redeliveredByConsumer.entrySet()) {
                for (MessageId id : entry.getValue()) {
                    try {
                        entry.getKey().acknowledge(id);
                    } catch (PulsarClientException e) {
                        Assert.fail("ack failed", e);
                    }
                }
            }

            // Phase 5: everything not seen in the second pass must now be delivered.
            final int remainingCount = totalProducedMsgs - redeliveredIds.size();
            CountDownLatch remainingLatch = new CountDownLatch(remainingCount);
            ConcurrentLinkedQueue<MessageId> remainingIds = Queues.newConcurrentLinkedQueue();
            for (ConsumerImpl<byte[]> consumer : consumers) {
                for (int j = 0; j < remainingCount; j++) {
                    consumer.receiveAsync().thenAccept(msg -> {
                        remainingIds.add(msg.getMessageId());
                        try {
                            consumer.acknowledge(msg);
                        } catch (PulsarClientException e) {
                            Assert.fail("failed to ack", e);
                        }
                        remainingLatch.countDown();
                    });
                }
            }
            remainingLatch.await();
            Assert.assertTrue(remainingIds.size() >= remainingCount);

            producer.close();
            for (ConsumerImpl<byte[]> consumer : consumers) {
                try {
                    consumer.close();
                } catch (PulsarClientException e) {
                    // Best-effort close; a failure here must not fail the test.
                    log.warn("Failed to close consumer", e);
                }
            }
            log.info("-- Exiting {} test --", this.methodName);
        } finally {
            // Restore the same setting that was modified above (per-subscription, not per-consumer).
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerSubscription(originalMaxUnacked);
        }
    }

    /**
     * Checks the subscription statistics reported while a shared subscription is blocked on
     * un-acked messages.
     *
     * <p>The broker is restarted with {@code maxUnackedMessagesPerSubscription} set to 10. One shared
     * consumer subscribes and the initial stats are verified (one subscription, no backlog, one
     * consumer). After 100 messages are published and never acked, the stats must show a positive
     * backlog and un-acked count, the blocked flag set, positive outbound rate and throughput on the
     * consumer, and a redelivery rate of zero.
     *
     * @throws Exception on any broker or client failure
     */
    @Test
    public void testBlockDispatcherStats() throws Exception {
        final int savedMaxUnacked = this.conf.getMaxUnackedMessagesPerSubscription();
        try {
            final String topicName = "persistent://prop/use/ns-abc/blockDispatch";
            final String subscriptionName = "blockDispatch";

            this.conf.setMaxUnackedMessagesPerSubscription(10);
            stopBroker();
            startBroker();

            Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe();
            Thread.sleep(100L);

            PersistentTopic topicRef =
                    (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
            Assert.assertNotNull(topicRef);

            // Baseline: a single idle subscription with one consumer attached.
            rolloverPerIntervalStats();
            TopicStatsImpl initialStats = topicRef.getStats(false, false);
            SubscriptionStats initialSubStats =
                    (SubscriptionStats) initialStats.getSubscriptions().values().iterator().next();
            Assert.assertEquals(initialStats.getSubscriptions().keySet().size(), 1);
            Assert.assertEquals(initialSubStats.getMsgBacklog(), 0L);
            Assert.assertEquals(initialSubStats.getConsumers().size(), 1);

            Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
            Thread.sleep(100L);
            for (int i = 0; i < 100; i++) {
                producer.send(("my-message-" + i).getBytes());
            }
            Thread.sleep(100L);

            // Nothing was acked, so the subscription is expected to report itself as blocked.
            rolloverPerIntervalStats();
            SubscriptionStats blockedSubStats =
                    (SubscriptionStats) topicRef.getStats(false, false).getSubscriptions().values().iterator().next();
            Assert.assertTrue(blockedSubStats.getMsgBacklog() > 0);
            Assert.assertTrue(blockedSubStats.getUnackedMessages() > 0);
            Assert.assertTrue(blockedSubStats.isBlockedSubscriptionOnUnackedMsgs());
            Assert.assertEquals(((ConsumerStats) blockedSubStats.getConsumers().get(0)).getUnackedMessages(),
                    blockedSubStats.getUnackedMessages());
            Assert.assertTrue(((ConsumerStats) blockedSubStats.getConsumers().get(0)).getMsgRateOut() > 0.0d);
            Assert.assertTrue(((ConsumerStats) blockedSubStats.getConsumers().get(0)).getMsgThroughputOut() > 0.0d);
            Assert.assertEquals(blockedSubStats.getMsgRateRedeliver(), 0.0d);

            producer.close();
            consumer.close();
        } finally {
            this.conf.setMaxUnackedMessagesPerSubscription(savedMaxUnacked);
        }
    }

    /**
     * Checks that messages left un-acked on a shared subscription are delivered again after the
     * broker is restarted.
     *
     * <p>Flow: publish 500 non-batched messages, receive all of them and ack every one except those
     * at a fixed set of receive positions. The consumer is closed, the broker is stopped and
     * started, and a new consumer on the same subscription must receive every payload that was left
     * un-acked.
     *
     * @param z supplied by the {@code "gracefulUnload"} data provider; when {@code true}, the
     *          namespace service provider of the broker is stubbed to hand out a spied
     *          {@link NamespaceService} before the restart
     * @throws Exception on any broker or client failure
     */
    @Test(dataProvider = "gracefulUnload")
    public void testBrokerSubscriptionRecovery(boolean z) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topicName = "persistent://my-property/my-ns/unacked-topic";
        final String subscriptionName = "subscriber-1";
        final int totalProducedMsgs = 500;

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                .subscribe();
        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        CountDownLatch publishedLatch = new CountDownLatch(totalProducedMsgs);
        for (int i = 0; i < totalProducedMsgs; i++) {
            producer.sendAsync(("my-message-" + i).getBytes()).thenAccept(messageId -> publishedLatch.countDown());
        }
        publishedLatch.await();

        // Receive positions whose messages are deliberately left un-acked.
        HashSet<Integer> unackedPositions =
                Sets.newHashSet(5, 10, 20, 21, 22, 23, 25, 26, 30, 32, 40, 80, 160, 320);
        int receivedCount = 0;
        for (int i = 0; i < totalProducedMsgs; i++) {
            Message<byte[]> msg = consumer.receive(500, TimeUnit.MILLISECONDS);
            Assert.assertNotNull(msg);
            if (!unackedPositions.contains(i)) {
                consumer.acknowledge(msg);
            }
            receivedCount++;
        }
        // TestNG argument order is (actual, expected).
        Assert.assertEquals(receivedCount, totalProducedMsgs);
        consumer.close();

        if (z) {
            // A lambda needs a functional-interface target type; doReturn(Object) does not give one,
            // so the supplier is bound to a typed local first.
            java.util.function.Supplier<NamespaceService> namespaceServiceSupplier =
                    () -> BrokerTestUtil.spyWithClassAndConstructorArgs(NamespaceService.class, this.pulsar);
            Mockito.doReturn(namespaceServiceSupplier).when(this.pulsar).getNamespaceServiceProvider();
        }
        stopBroker();
        startBroker();

        Consumer<byte[]> recoveredConsumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();
        Set<String> expectedPayloads = unackedPositions.stream()
                .map(position -> "my-message-" + position)
                .collect(Collectors.toSet());
        HashSet<String> redeliveredPayloads = Sets.newHashSet();
        for (int i = 0; i < totalProducedMsgs; i++) {
            Message<byte[]> msg = recoveredConsumer.receive(500, TimeUnit.MILLISECONDS);
            if (msg == null) {
                break;
            }
            redeliveredPayloads.add(new String(msg.getData()));
        }
        for (String expected : expectedPayloads) {
            Assert.assertTrue(redeliveredPayloads.contains(expected),
                    "un-acked message was not redelivered after broker restart: " + expected);
        }
    }

    /**
     * Exercises broker-wide dispatch blocking driven by {@code maxUnackedMessagesPerBroker}.
     *
     * <p>The broker is restarted with {@code maxUnackedMessagesPerBroker = 200} and
     * {@code maxUnackedMessagesPerSubscriptionOnBrokerBlocked = 10.0}. Three subscriptions are
     * created on one topic, 600 messages are published, and a background task invokes
     * {@code checkUnAckMessageDispatching()} every 10 ms. The broker's private
     * {@code blockedDispatchers} set is read through reflection and checked after each step:
     * <ol>
     *   <li>subscriber-1 drains without acking and must not get all 600 messages; a second consumer
     *       on the same subscription (separate client) must get none; one dispatcher is blocked;</li>
     *   <li>subscriber-2 drains without acking and gets about 20 (+/- 10) messages; two dispatchers
     *       are blocked;</li>
     *   <li>subscriber-3 acks as it goes and receives all 600; still two dispatchers blocked;</li>
     *   <li>subscriber-1 acks its backlog and then receives the rest; no dispatcher is blocked;</li>
     *   <li>subscriber-2 requests redelivery and then receives and acks all 600 messages.</li>
     * </ol>
     *
     * <p>Any checked exception fails the test with the exception attached as the cause. Both broker
     * settings are restored and the scheduler is shut down on every exit path.
     */
    @Test(timeOut = 60000)
    public void testBlockBrokerDispatching() {
        log.info("-- Starting {} test --", this.methodName);
        final int originalMaxUnackedPerBroker = this.pulsar.getConfiguration().getMaxUnackedMessagesPerBroker();
        final double originalMaxUnackedOnBlocked =
                this.pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked();
        final ScheduledExecutorService checkScheduler = Executors.newScheduledThreadPool(1);
        try {
            final String topicName = "persistent://my-property/my-ns/unacked-topic";
            final int totalProducedMsgs = 600;

            this.pulsar.getConfiguration().setMaxUnackedMessagesPerBroker(200);
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerSubscriptionOnBrokerBlocked(10.0d);
            stopBroker();
            startBroker();

            // The set of blocked dispatchers is private to BrokerService, hence the reflection.
            Field blockedDispatchersField = BrokerService.class.getDeclaredField("blockedDispatchers");
            blockedDispatchersField.setAccessible(true);
            @SuppressWarnings("unchecked")
            ConcurrentOpenHashSet<PersistentDispatcherMultipleConsumers> blockedDispatchers =
                    (ConcurrentOpenHashSet<PersistentDispatcherMultipleConsumers>)
                            blockedDispatchersField.get(this.pulsar.getBrokerService());

            Consumer<byte[]> consumer1Sub1 = this.pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName("subscriber-1")
                    .receiverQueueSize(10)
                    .subscriptionType(SubscriptionType.Shared)
                    .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                    .subscribe();
            // Subscribe and immediately close so that subscriptions 2 and 3 exist before publishing.
            this.pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName("subscriber-2")
                    .receiverQueueSize(10)
                    .subscriptionType(SubscriptionType.Shared)
                    .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                    .subscribe()
                    .close();
            this.pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName("subscriber-3")
                    .receiverQueueSize(10)
                    .subscriptionType(SubscriptionType.Shared)
                    .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                    .subscribe()
                    .close();

            Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
            checkScheduler.scheduleAtFixedRate(
                    () -> this.pulsar.getBrokerService().checkUnAckMessageDispatching(),
                    10L, 10L, TimeUnit.MILLISECONDS);
            for (int i = 0; i < totalProducedMsgs; i++) {
                producer.send(("my-message-" + i).getBytes());
            }

            // Step 1: subscriber-1 drains without acking.
            HashSet<MessageId> sub1Ids = Sets.newHashSet();
            for (int j = 0; j < totalProducedMsgs; j++) {
                Message<byte[]> msg = consumer1Sub1.receive(500, TimeUnit.MILLISECONDS);
                if (msg == null) {
                    break;
                }
                sub1Ids.add(msg.getMessageId());
                if (j == 200) {
                    Thread.sleep(500L);
                }
            }
            Assert.assertNotEquals(Integer.valueOf(sub1Ids.size()), totalProducedMsgs);

            try (PulsarClient secondClient = newPulsarClient(this.lookupUrl.toString(), 0)) {
                // A second consumer on the blocked subscription must not be served anything.
                Consumer<byte[]> consumer2Sub1 = secondClient.newConsumer()
                        .topic(topicName)
                        .subscriptionName("subscriber-1")
                        .receiverQueueSize(10)
                        .subscriptionType(SubscriptionType.Shared)
                        .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                        .subscribe();
                int consumer2Received = 0;
                for (int j = 0; j < totalProducedMsgs; j++) {
                    if (consumer2Sub1.receive(500, TimeUnit.MILLISECONDS) == null) {
                        break;
                    }
                    consumer2Received++;
                }
                Assert.assertEquals(consumer2Received, 0);
                consumer2Sub1.close();

                Assert.assertEquals(blockedDispatchers.size(), 1L);
                String dispatcherName = blockedDispatchers.values().get(0).getName();
                Assert.assertEquals(
                        dispatcherName.substring(dispatcherName.lastIndexOf("/") + 2, dispatcherName.length()),
                        "subscriber-1");

                // Step 2: subscriber-2 drains without acking.
                Consumer<byte[]> consumerSub2 = this.pulsarClient.newConsumer()
                        .topic(topicName)
                        .subscriptionName("subscriber-2")
                        .receiverQueueSize(10)
                        .subscriptionType(SubscriptionType.Shared)
                        .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                        .subscribe();
                HashSet<MessageId> sub2Ids = Sets.newHashSet();
                for (int j = 0; j < totalProducedMsgs; j++) {
                    Message<byte[]> msg = consumerSub2.receive(500, TimeUnit.MILLISECONDS);
                    if (msg == null) {
                        break;
                    }
                    sub2Ids.add(msg.getMessageId());
                }
                Assert.assertEquals(sub2Ids.size(), 20, 10.0f);
                Assert.assertEquals(blockedDispatchers.size(), 2L);

                // Step 3: subscriber-3 acks every message and therefore must receive all of them.
                Consumer<byte[]> consumerSub3 = this.pulsarClient.newConsumer()
                        .topic(topicName)
                        .subscriptionName("subscriber-3")
                        .receiverQueueSize(10)
                        .subscriptionType(SubscriptionType.Shared)
                        .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                        .subscribe();
                int sub3Received = 0;
                for (int j = 0; j < totalProducedMsgs; j++) {
                    Message<byte[]> msg = consumerSub3.receive();
                    if (msg == null) {
                        break;
                    }
                    sub3Received++;
                    consumerSub3.acknowledge(msg);
                }
                Assert.assertEquals(sub3Received, totalProducedMsgs);
                Assert.assertEquals(blockedDispatchers.size(), 2L);

                // Step 4: subscriber-1 acks its backlog, then receives the remainder.
                sub1Ids.forEach(consumer1Sub1::acknowledgeAsync);
                Thread.sleep(1000L);
                for (int j = 0; j < totalProducedMsgs; j++) {
                    Message<byte[]> msg = consumer1Sub1.receive(1, TimeUnit.SECONDS);
                    if (msg == null) {
                        break;
                    }
                    sub1Ids.add(msg.getMessageId());
                    consumer1Sub1.acknowledge(msg);
                }
                Assert.assertEquals(sub1Ids.size(), totalProducedMsgs);
                Assert.assertEquals(blockedDispatchers.size(), 0L);

                // Step 5: subscriber-2 asks for redelivery and consumes everything.
                consumerSub2.redeliverUnacknowledgedMessages();
                AtomicInteger sub2Received = new AtomicInteger(0);
                CountDownLatch sub2Latch = new CountDownLatch(totalProducedMsgs);
                for (int j = 0; j < totalProducedMsgs; j++) {
                    consumerSub2.receiveAsync().thenAccept(msg -> {
                        sub2Received.incrementAndGet();
                        sub2Latch.countDown();
                        try {
                            consumerSub2.acknowledge(msg);
                        } catch (PulsarClientException e) {
                            Assert.fail("failed to ack msg", e);
                        }
                    });
                }
                sub2Latch.await();
                Assert.assertEquals(sub2Received.get(), totalProducedMsgs);

                consumer1Sub1.close();
                consumerSub2.close();
                consumerSub3.close();
                log.info("-- Exiting {} test --", this.methodName);
            }
        } catch (Exception e) {
            // Keep the cause so the failure report shows what actually went wrong.
            Assert.fail("testBlockBrokerDispatching failed with an unexpected exception", e);
        } finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerBroker(originalMaxUnackedPerBroker);
            this.pulsar.getConfiguration()
                    .setMaxUnackedMessagesPerSubscriptionOnBrokerBlocked(originalMaxUnackedOnBlocked);
            checkScheduler.shutdownNow();
        }
    }

    /**
     * Exercises broker-wide blocking of dispatchers with too many unacked messages, and the unblocking of a
     * dispatcher once its subscription acknowledges enough messages.
     *
     * <p>The test lowers {@code maxUnackedMessagesPerBroker} to 200 and
     * {@code maxUnackedMessagesPerSubscriptionOnBrokerBlocked} to 10, restarts the broker, runs the scenario in
     * {@link #runBrokerDispatchBlockAndSubAckScenario(ScheduledExecutorService)} and finally restores both
     * settings to the values they had on entry, whether the scenario passed or failed.
     *
     * <p>Any checked or runtime exception raised by the scenario fails the test with that exception attached as
     * the cause; assertion errors propagate unchanged.
     */
    @Test
    public void testBrokerDispatchBlockAndSubAckBackRequiredMsgs() {
        log.info("-- Starting {} test --", this.methodName);
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        try {
            // Snapshot the limits before touching them so the finally block can put them back.
            int savedMaxUnAckPerBroker = this.pulsar.getConfiguration().getMaxUnackedMessagesPerBroker();
            double savedUnAckPercentage =
                    this.pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked();
            try {
                runBrokerDispatchBlockAndSubAckScenario(executor);
            } catch (Exception e) {
                // Keep the original exception as the cause so the failure report shows what actually went wrong.
                Assert.fail("testBrokerDispatchBlockAndSubAckBackRequiredMsgs failed with an unexpected exception", e);
            } finally {
                this.pulsar.getConfiguration().setMaxUnackedMessagesPerBroker(savedMaxUnAckPerBroker);
                this.pulsar.getConfiguration()
                        .setMaxUnackedMessagesPerSubscriptionOnBrokerBlocked(savedUnAckPercentage);
            }
        } finally {
            // Stops the periodic checkUnAckMessageDispatching task scheduled by the scenario.
            executor.shutdownNow();
        }
    }

    /**
     * Runs the body of {@link #testBrokerDispatchBlockAndSubAckBackRequiredMsgs()}: configures the broker limits,
     * restarts the broker, and drives two shared subscriptions on one topic through block and unblock.
     *
     * @param executor scheduler used to invoke {@code checkUnAckMessageDispatching()} on the broker service
     *                 every 10 ms; the caller owns it and shuts it down
     * @throws Exception if broker restart, reflection access, or any client operation fails
     */
    private void runBrokerDispatchBlockAndSubAckScenario(ScheduledExecutorService executor) throws Exception {
        final int maxUnAckPerBroker = 200;
        final double unAckMsgPercentagePerDispatcher = 10.0d;
        // Expected per-dispatcher limit; NOTE(review): presumably 10% of 200 given the two settings above.
        final int maxUnAckPerDispatcher = 20;
        final int receiverQueueSize = 10;
        final int totalProducedMsgs = 600;
        final String topicName = "persistent://my-property/my-ns/unacked-topic";
        final String subscriberName1 = "subscriber-1";
        final String subscriberName2 = "subscriber-2";

        this.pulsar.getConfiguration().setMaxUnackedMessagesPerBroker(maxUnAckPerBroker);
        this.pulsar.getConfiguration()
                .setMaxUnackedMessagesPerSubscriptionOnBrokerBlocked(unAckMsgPercentagePerDispatcher);
        stopBroker();
        startBroker();

        // The set of blocked dispatchers is private to BrokerService, so it is read reflectively.
        Field blockedDispatchersField = BrokerService.class.getDeclaredField("blockedDispatchers");
        blockedDispatchersField.setAccessible(true);
        @SuppressWarnings("unchecked")
        ConcurrentOpenHashSet<PersistentDispatcherMultipleConsumers> blockedDispatchers =
                (ConcurrentOpenHashSet<PersistentDispatcherMultipleConsumers>) blockedDispatchersField
                        .get(this.pulsar.getBrokerService());

        Consumer<byte[]> consumer1Sub1 = this.pulsarClient.newConsumer().topic(topicName)
                .subscriptionName(subscriberName1).receiverQueueSize(receiverQueueSize)
                .subscriptionType(SubscriptionType.Shared).subscribe();
        // Subscribe and immediately close so that subscriber-2 exists before messages are produced.
        this.pulsarClient.newConsumer().topic(topicName)
                .subscriptionName(subscriberName2).receiverQueueSize(receiverQueueSize)
                .subscriptionType(SubscriptionType.Shared).subscribe().close();

        // Continuously re-evaluate unacked-message dispatching on the broker.
        executor.scheduleAtFixedRate(
                () -> this.pulsar.getBrokerService().checkUnAckMessageDispatching(), 10L, 10L,
                TimeUnit.MILLISECONDS);

        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
        for (int i = 0; i < totalProducedMsgs; i++) {
            producer.send(("my-message-" + i).getBytes());
        }

        // (1) Consume on subscriber-1 without acking until receive() times out.
        Set<MessageId> messages1 = new HashSet<>();
        for (int j = 0; j < totalProducedMsgs; j++) {
            Message<byte[]> msg = consumer1Sub1.receive(100, TimeUnit.MILLISECONDS);
            if (msg == null) {
                break;
            }
            messages1.add(msg.getMessageId());
            if (j == maxUnAckPerBroker) {
                // Pause once the broker-wide limit has been reached so the scheduled check gets a chance to run.
                Thread.sleep(200L);
            }
        }
        // The consumer must not have been able to drain everything that was produced.
        Assert.assertNotEquals(messages1.size(), totalProducedMsgs);

        try (PulsarClient newPulsarClient = newPulsarClient(this.lookupUrl.toString(), 0)) {
            // (1.b) A second consumer on the same subscription, via a separate client, must receive nothing.
            Consumer<byte[]> consumer2Sub1 = newPulsarClient.newConsumer().topic(topicName)
                    .subscriptionName(subscriberName1).receiverQueueSize(receiverQueueSize)
                    .subscriptionType(SubscriptionType.Shared).subscribe();
            int consumer2Msgs = 0;
            for (int j = 0; j < totalProducedMsgs; j++) {
                if (consumer2Sub1.receive(100, TimeUnit.MILLISECONDS) == null) {
                    break;
                }
                consumer2Msgs++;
            }
            Assert.assertEquals(consumer2Msgs, 0);
            consumer2Sub1.close();

            // (1.c) Exactly one dispatcher is blocked and its name ends with subscriber-1.
            Assert.assertEquals(blockedDispatchers.size(), 1L);
            String dispatcherName = blockedDispatchers.values().get(0).getName();
            // +2 skips the last '/' and the single character after it (presumably a space separator).
            String subName = dispatcherName.substring(dispatcherName.lastIndexOf("/") + 2);
            Assert.assertEquals(subName, subscriberName1);

            // (2) subscriber-2 can still consume, but only up to roughly the per-dispatcher limit.
            Consumer<byte[]> consumer1Sub2 = this.pulsarClient.newConsumer().topic(topicName)
                    .subscriptionName(subscriberName2).receiverQueueSize(receiverQueueSize)
                    .subscriptionType(SubscriptionType.Shared).subscribe();
            Set<MessageId> messages2 = new HashSet<>();
            for (int j = 0; j < totalProducedMsgs; j++) {
                Message<byte[]> msg = consumer1Sub2.receive(100, TimeUnit.MILLISECONDS);
                if (msg == null) {
                    break;
                }
                messages2.add(msg.getMessageId());
            }
            // Tolerance of 10 around the limit; NOTE(review): equals the receiver queue size, likely on purpose.
            Assert.assertEquals(messages2.size(), maxUnAckPerDispatcher, 10.0f);
            Assert.assertEquals(blockedDispatchers.size(), 2L);

            // (3) Ack everything received beyond the limit plus half of the limit, then expect the unblock.
            Iterator<MessageId> receivedIds = messages2.iterator();
            int additionalMsgConsumedAfterBlocked = (messages2.size() - maxUnAckPerDispatcher) + 1;
            int acksToSend = additionalMsgConsumedAfterBlocked + (maxUnAckPerDispatcher / 2);
            for (int i = 0; i < acksToSend; i++) {
                consumer1Sub2.acknowledge(receivedIds.next());
            }
            // Give the acknowledgements time to reach the broker.
            Thread.sleep(1000L);
            Assert.assertEquals(blockedDispatchers.size(), 1L);

            // Drain the rest, acking as we go; the id set must end up covering every produced message.
            for (int j = 0; j < totalProducedMsgs; j++) {
                Message<byte[]> msg = consumer1Sub2.receive(200, TimeUnit.MILLISECONDS);
                if (msg == null) {
                    break;
                }
                messages2.add(msg.getMessageId());
                consumer1Sub2.acknowledge(msg);
            }
            Assert.assertEquals(messages2.size(), totalProducedMsgs);

            consumer1Sub1.close();
            consumer1Sub2.close();
            log.info("-- Exiting {} test --", this.methodName);
        }
    }

    /**
     * Submits {@code updateRates()} of the broker service to the Pulsar executor and blocks until the task has
     * finished.
     *
     * <p>This is best effort: any failure is logged and not propagated. If the wait is interrupted, the thread's
     * interrupt status is set again before returning so callers can still observe the interruption.
     */
    private void rolloverPerIntervalStats() {
        try {
            this.pulsar.getExecutor().submit(() -> {
                this.pulsar.getBrokerService().updateRates();
            }).get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The interrupt status is cleared when InterruptedException is thrown; restore it.
                Thread.currentThread().interrupt();
            }
            log.error("Stats executor error", e);
        }
    }
}
