package org.apache.pulsar.broker.stats;

import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.io.ByteArrayOutputStream;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.PrometheusMetricsTestUtil;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.PendingAcksMap;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.StickyKeyDispatcher;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilterProducerTest;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.assertj.core.groups.Tuple;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.SkipException;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/stats/ConsumerStatsTest.class */
public class ConsumerStatsTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(ConsumerStatsTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    public ServiceConfiguration getDefaultConf() {
        ServiceConfiguration defaultConf = super.getDefaultConf();
        defaultConf.setMaxUnackedMessagesPerConsumer(0);
        defaultConf.setBrokerShutdownTimeoutMs(5000L);
        return defaultConf;
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer() throws PulsarClientException, InterruptedException, PulsarAdminException {
        Assert.assertEquals(this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer(), 0);
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer"}).subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).subscriptionName("sub").subscribe();
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer").create();
        for (int i = 0; i < 10; i++) {
            create.send(("message-" + i).getBytes());
        }
        int i2 = 0;
        for (int i3 = 0; i3 < 10; i3++) {
            subscribe.receive();
            i2++;
        }
        Assert.assertEquals(i2, 10);
        int i4 = 0;
        TopicStats stats = this.admin.topics().getStats("persistent://my-property/my-ns/testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer");
        Assert.assertEquals(stats.getSubscriptions().size(), 1);
        Assert.assertEquals(((SubscriptionStats) ((Map.Entry) stats.getSubscriptions().entrySet().iterator().next()).getValue()).getConsumers().size(), 1);
        Assert.assertFalse(((ConsumerStats) ((SubscriptionStats) ((Map.Entry) stats.getSubscriptions().entrySet().iterator().next()).getValue()).getConsumers().get(0)).isBlockedConsumerOnUnackedMsgs());
        Assert.assertEquals(((ConsumerStats) ((SubscriptionStats) ((Map.Entry) stats.getSubscriptions().entrySet().iterator().next()).getValue()).getConsumers().get(0)).getUnackedMessages(), 10);
        for (int i5 = 0; i5 < 10; i5++) {
            subscribe.acknowledge(subscribe.receive());
            i4++;
        }
        Assert.assertEquals(i4, 10);
        Thread.sleep(2000L);
        TopicStats stats2 = this.admin.topics().getStats("persistent://my-property/my-ns/testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer");
        Assert.assertFalse(((ConsumerStats) ((SubscriptionStats) ((Map.Entry) stats2.getSubscriptions().entrySet().iterator().next()).getValue()).getConsumers().get(0)).isBlockedConsumerOnUnackedMsgs());
        Assert.assertEquals(((ConsumerStats) ((SubscriptionStats) ((Map.Entry) stats2.getSubscriptions().entrySet().iterator().next()).getValue()).getConsumers().get(0)).getUnackedMessages(), 0);
    }

    @Test
    public void testAckStatsOnPartitionedTopicForExclusiveSubscription() throws PulsarAdminException, PulsarClientException, InterruptedException {
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/testAckStatsOnPartitionedTopicForExclusiveSubscription", 3);
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/testAckStatsOnPartitionedTopicForExclusiveSubscription"}).subscriptionType(SubscriptionType.Exclusive).subscriptionName("sub").subscribe();
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testAckStatsOnPartitionedTopicForExclusiveSubscription").create();
        for (int i = 0; i < 10; i++) {
            create.send(("message-" + i).getBytes());
        }
        int i2 = 0;
        for (int i3 = 0; i3 < 10; i3++) {
            subscribe.acknowledge(subscribe.receive());
            i2++;
        }
        Assert.assertEquals(10, i2);
        Thread.sleep(2000L);
        for (int i4 = 0; i4 < 3; i4++) {
            TopicStats stats = this.admin.topics().getStats("persistent://my-property/my-ns/testAckStatsOnPartitionedTopicForExclusiveSubscription-partition-" + i4);
            Assert.assertEquals(stats.getSubscriptions().size(), 1);
            Assert.assertEquals(((SubscriptionStats) ((Map.Entry) stats.getSubscriptions().entrySet().iterator().next()).getValue()).getConsumers().size(), 1);
            Assert.assertEquals(((ConsumerStats) ((SubscriptionStats) ((Map.Entry) stats.getSubscriptions().entrySet().iterator().next()).getValue()).getConsumers().get(0)).getUnackedMessages(), 0);
        }
    }

    @Test
    public void testUpdateStatsForActiveConsumerAndSubscription() throws Exception {
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://public/default/testUpdateStatsForActiveConsumerAndSubscription"}).subscriptionType(SubscriptionType.Shared).subscriptionName("my-subscription").subscribe();
        PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://public/default/testUpdateStatsForActiveConsumerAndSubscription").get();
        Assert.assertNotNull(persistentTopic);
        Assert.assertEquals(persistentTopic.getSubscriptions().size(), 1);
        List consumers = ((PersistentSubscription) persistentTopic.getSubscriptions().get("my-subscription")).getConsumers();
        Assert.assertEquals(consumers.size(), 1);
        ConsumerStatsImpl consumerStatsImpl = new ConsumerStatsImpl();
        consumerStatsImpl.msgOutCounter = 10L;
        consumerStatsImpl.bytesOutCounter = 1280L;
        ((org.apache.pulsar.broker.service.Consumer) consumers.get(0)).updateStats(consumerStatsImpl);
        ConsumerStatsImpl stats = ((org.apache.pulsar.broker.service.Consumer) consumers.get(0)).getStats();
        Assert.assertEquals(stats.getMsgOutCounter(), 10L);
        Assert.assertEquals(stats.getBytesOutCounter(), 1280L);
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "classicAndSubscriptionType")
    public Object[][] classicAndSubscriptionType() {
        return new Object[]{new Object[]{false, SubscriptionType.Shared}, new Object[]{true, SubscriptionType.Key_Shared}, new Object[]{false, SubscriptionType.Key_Shared}};
    }

    @Test(dataProvider = "classicAndSubscriptionType")
    public void testConsumerStatsOutput(boolean z, SubscriptionType subscriptionType) throws Exception {
        if (this instanceof AuthenticatedConsumerStatsTest) {
            throw new SkipException("Skip test for AuthenticatedConsumerStatsTest");
        }
        this.conf.setSubscriptionSharedUseClassicPersistentImplementation(z);
        this.conf.setSubscriptionKeySharedUseClassicPersistentImplementation(z);
        HashSet newHashSet = Sets.newHashSet(new String[]{"msgRateOut", "msgThroughputOut", "bytesOutCounter", "msgOutCounter", "messageAckRate", "msgRateRedeliver", "chunkedMessageRate", "consumerName", "availablePermits", "unackedMessages", "avgMessagesPerEntry", "blockedConsumerOnUnackedMsgs", "lastAckedTime", "lastAckedTimestamp", "lastConsumedTime", "lastConsumedTimestamp", "lastConsumedFlowTimestamp", "metadata", "address", "connectedSince", "clientVersion", "drainingHashesCount", "drainingHashesClearedTotal", "drainingHashesUnackedMessages"});
        if (subscriptionType == SubscriptionType.Key_Shared) {
            if (z) {
                newHashSet.addAll(List.of("readPositionWhenJoining", "keyHashRanges"));
            } else {
                newHashSet.addAll(List.of("drainingHashes", "keyHashRangeArrays"));
            }
        }
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/testConsumerStatsOutput");
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{newUniqueName}).subscriptionType(subscriptionType).subscriptionName("my-subscription").subscribe();
        try {
            Assertions.assertThat(ObjectMapperFactory.create().readTree(BrokerTestUtil.getJsonResourceAsString(String.format("%s/admin/v2/%s/stats", this.pulsar.getWebServiceAddress(), newUniqueName.replace("://", "/")))).get("subscriptions").get("my-subscription").get("consumers").get(0).fieldNames()).toIterable().containsExactlyInAnyOrderElementsOf(newHashSet);
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
            throw th;
        }
    }

    @Test
    public void testLastConsumerFlowTimestamp() throws PulsarClientException, PulsarAdminException {
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/testLastConsumerFlowTimestamp");
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{newUniqueName}).subscriptionType(SubscriptionType.Shared).subscriptionName("my-subscription").subscribe();
        try {
            Assert.assertTrue(((ConsumerStats) ((SubscriptionStats) this.admin.topics().getStats(newUniqueName).getSubscriptions().get("my-subscription")).getConsumers().get(0)).getLastConsumedFlowTimestamp() > 0);
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
            throw th;
        }
    }

    @Test
    public void testPersistentTopicMessageAckRateMetricTopicLevel() throws Exception {
        testMessageAckRateMetric("persistent://public/default/msg_ack_rate" + UUID.randomUUID(), true);
    }

    @Test
    public void testPersistentTopicMessageAckRateMetricNamespaceLevel() throws Exception {
        testMessageAckRateMetric("persistent://public/default/msg_ack_rate" + UUID.randomUUID(), false);
    }

    /* JADX WARN: Finally extract failed */
    private void testMessageAckRateMetric(String str, boolean z) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1000);
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).enableBatching(true).batchingMaxMessages(10).create();
        try {
            MessageListener messageListener = (consumer, message) -> {
                try {
                    consumer.acknowledge(message);
                    countDownLatch.countDown();
                } catch (PulsarClientException e) {
                }
            };
            Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName("test_sub").subscriptionType(SubscriptionType.Shared).messageListener(messageListener).subscribe();
            try {
                Consumer subscribe2 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName("test_sub").subscriptionType(SubscriptionType.Shared).messageListener(messageListener).subscribe();
                try {
                    String namespace = TopicName.get(str).getNamespace();
                    for (int i = 0; i < 1000; i++) {
                        create.sendAsync(UUID.randomUUID().toString());
                    }
                    create.flush();
                    countDownLatch.await(20L, TimeUnit.SECONDS);
                    TimeUnit.SECONDS.sleep(1L);
                    List consumers = ((Topic) ((Optional) this.pulsar.getBrokerService().getTopic(str, false).get()).get()).getSubscription("test_sub").getConsumers();
                    Assert.assertEquals(consumers.size(), 2);
                    org.apache.pulsar.broker.service.Consumer consumer2 = (org.apache.pulsar.broker.service.Consumer) consumers.get(0);
                    org.apache.pulsar.broker.service.Consumer consumer3 = (org.apache.pulsar.broker.service.Consumer) consumers.get(1);
                    consumer2.updateRates();
                    consumer3.updateRates();
                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                    PrometheusMetricsTestUtil.generate(this.pulsar, z, true, true, byteArrayOutputStream);
                    Multimap parseMetrics = PrometheusMetricsClient.parseMetrics(byteArrayOutputStream.toString(StandardCharsets.UTF_8));
                    Collection collection = parseMetrics.get("pulsar_consumer_msg_ack_rate");
                    Collection collection2 = parseMetrics.get(z ? "pulsar_consumer_msg_rate_out" : "pulsar_rate_out");
                    Assert.assertTrue(collection.size() > 0);
                    Assert.assertTrue(collection2.size() > 0);
                    if (z) {
                        String consumerName = consumer2.consumerName();
                        String consumerName2 = consumer3.consumerName();
                        double sum = collection.stream().filter(metric -> {
                            return ((String) metric.tags.get("consumer_name")).equals(consumerName) || ((String) metric.tags.get("consumer_name")).equals(consumerName2);
                        }).mapToDouble(metric2 -> {
                            return metric2.value;
                        }).sum();
                        double sum2 = collection2.stream().filter(metric3 -> {
                            return ((String) metric3.tags.get("consumer_name")).equals(consumerName) || ((String) metric3.tags.get("consumer_name")).equals(consumerName2);
                        }).mapToDouble(metric4 -> {
                            return metric4.value;
                        }).sum();
                        Assert.assertTrue(sum > 0.0d);
                        Assert.assertTrue(sum2 > 0.0d);
                        Assert.assertEquals(sum, sum2, sum2 * 0.1d);
                    } else {
                        double sum3 = collection.stream().filter(metric5 -> {
                            return namespace.equals(metric5.tags.get("namespace"));
                        }).mapToDouble(metric6 -> {
                            return metric6.value;
                        }).sum();
                        double sum4 = collection2.stream().filter(metric7 -> {
                            return namespace.equals(metric7.tags.get("namespace"));
                        }).mapToDouble(metric8 -> {
                            return metric8.value;
                        }).sum();
                        Assert.assertTrue(sum3 > 0.0d);
                        Assert.assertTrue(sum4 > 0.0d);
                        Assert.assertEquals(sum3, sum4, sum4 * 0.1d);
                    }
                    if (Collections.singletonList(subscribe2).get(0) != null) {
                        subscribe2.close();
                    }
                    if (Collections.singletonList(subscribe).get(0) != null) {
                        subscribe.close();
                    }
                } catch (Throwable th) {
                    if (Collections.singletonList(subscribe2).get(0) != null) {
                        subscribe2.close();
                    }
                    throw th;
                }
            } catch (Throwable th2) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th2;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    @Test
    public void testAvgMessagesPerEntry() throws Exception {
        this.conf.setAllowOverrideEntryFilters(true);
        Producer create = this.pulsarClient.newProducer(Schema.STRING).producerName("producer1").enableBatching(true).topic("persistent://public/default/testFilterState").batchingMaxMessages(20).batchingMaxPublishDelay(5L, TimeUnit.SECONDS).batchingMaxBytes(Integer.MAX_VALUE).create();
        create.send("first-message");
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 20; i++) {
            arrayList.add(create.sendAsync("message"));
        }
        FutureUtil.waitForAll(arrayList);
        create.close();
        Producer create2 = this.pulsarClient.newProducer(Schema.STRING).producerName("producer2").enableBatching(false).topic("persistent://public/default/testFilterState").create();
        create2.newMessage().value("producer2-message").send();
        create2.close();
        Pair of = Pair.of("filter", List.of((EntryFilterWithClassLoader) BrokerTestUtil.spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, new EntryFilterProducerTest(), (NarClassLoader) Mockito.mock(NarClassLoader.class), false)));
        PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://public/default/testFilterState").get();
        Field declaredField = persistentTopic.getClass().getSuperclass().getDeclaredField("entryFilters");
        declaredField.setAccessible(true);
        declaredField.set(persistentTopic, of);
        HashMap hashMap = new HashMap();
        hashMap.put("matchValueAccept", "producer1");
        hashMap.put("matchValueReschedule", "producer2");
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://public/default/testFilterState"}).properties(hashMap).subscriptionName("sub").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        int i2 = 0;
        while (true) {
            try {
                Message receive = subscribe.receive(10, TimeUnit.SECONDS);
                if (receive == null) {
                    break;
                }
                i2++;
                Assert.assertNotEquals(receive.getValue(), "producer2-message");
                subscribe.acknowledge(receive);
            } catch (Throwable th) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th;
            }
        }
        AssertJUnit.assertEquals(21, i2);
        ConsumerStats consumerStats = (ConsumerStats) ((SubscriptionStats) this.admin.topics().getStats("persistent://public/default/testFilterState").getSubscriptions().get("sub")).getConsumers().get(0);
        AssertJUnit.assertEquals(21L, consumerStats.getMsgOutCounter());
        AssertJUnit.assertEquals(3, consumerStats.getAvgMessagesPerEntry());
        if (Collections.singletonList(subscribe).get(0) != null) {
            subscribe.close();
        }
    }

    @Test
    public void testNonPersistentTopicSharedSubscriptionUnackedMessages() throws Exception {
        String str = "non-persistent://my-property/my-ns/my-topic" + UUID.randomUUID();
        Producer create = this.pulsarClient.newProducer().topic(str).create();
        try {
            Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName("my-sub").subscriptionType(SubscriptionType.Shared).subscribe();
            for (int i = 0; i < 5; i++) {
                try {
                    create.send(("message-" + i).getBytes());
                } catch (Throwable th) {
                    if (Collections.singletonList(subscribe).get(0) != null) {
                        subscribe.close();
                    }
                    throw th;
                }
            }
            for (int i2 = 0; i2 < 5; i2++) {
                subscribe.acknowledge(subscribe.receive(5, TimeUnit.SECONDS));
            }
            TimeUnit.SECONDS.sleep(1L);
            TopicStats stats = this.admin.topics().getStats(str);
            AssertJUnit.assertEquals(1, stats.getSubscriptions().size());
            List consumers = ((SubscriptionStats) stats.getSubscriptions().get("my-sub")).getConsumers();
            AssertJUnit.assertEquals(1, consumers.size());
            AssertJUnit.assertEquals(0, ((ConsumerStats) consumers.get(0)).getUnackedMessages());
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testKeySharedDrainingHashesConsumerStats() throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("testKeySharedDrainingHashesConsumerStats");
        String str = "sub";
        Producer create = this.pulsarClient.newProducer(Schema.INT32).topic(newUniqueName).enableBatching(false).create();
        try {
            Consumer subscribe = this.pulsarClient.newConsumer(Schema.INT32).topic(new String[]{newUniqueName}).consumerName("c1").receiverQueueSize(100).subscriptionName("sub").subscriptionType(SubscriptionType.Key_Shared).subscribe();
            try {
                StickyKeyDispatcher dispatcher = getDispatcher(newUniqueName, "sub");
                StickyKeyConsumerSelector selector = dispatcher.getSelector();
                for (int i = 0; i < 20; i++) {
                    String valueOf = String.valueOf(i % 10);
                    log.info("Sending message with value {} key {} hash {}", new Object[]{valueOf, Integer.valueOf(i), Integer.valueOf(selector.makeStickyKeyHash(valueOf.getBytes(StandardCharsets.UTF_8)))});
                    create.newMessage().key(valueOf).value(Integer.valueOf(i)).send();
                }
                PendingAcksMap pendingAcks = ((org.apache.pulsar.broker.service.Consumer) dispatcher.getConsumers().get(0)).getPendingAcks();
                Awaitility.await().ignoreExceptions().until(() -> {
                    return Boolean.valueOf(pendingAcks.size() == 20);
                });
                Consumer subscribe2 = this.pulsarClient.newConsumer(Schema.INT32).topic(new String[]{newUniqueName}).consumerName("c2").subscriptionName("sub").subscriptionType(SubscriptionType.Key_Shared).subscribe();
                try {
                    SubscriptionStats subscriptionStats = (SubscriptionStats) this.admin.topics().getStats(newUniqueName).getSubscriptions().get("sub");
                    ConsumerStats consumerStats = (ConsumerStats) subscriptionStats.getConsumers().get(0);
                    ConsumerStats consumerStats2 = (ConsumerStats) subscriptionStats.getConsumers().get(1);
                    HashSet hashSet = new HashSet();
                    HashSet hashSet2 = new HashSet();
                    HashMap hashMap = new HashMap();
                    int i2 = 0;
                    for (int i3 = 0; i3 < 20; i3++) {
                        int makeStickyKeyHash = selector.makeStickyKeyHash(String.valueOf(i3 % 10).getBytes(StandardCharsets.UTF_8));
                        if ("c2".equals(findConsumerNameForHash(subscriptionStats, makeStickyKeyHash))) {
                            hashSet.add(Integer.valueOf(makeStickyKeyHash));
                        }
                        if ("c2".equals(selector.select(makeStickyKeyHash).consumerName())) {
                            hashSet2.add(Integer.valueOf(makeStickyKeyHash));
                            hashMap.compute(Integer.valueOf(makeStickyKeyHash), (num, num2) -> {
                                return Integer.valueOf(num2 == null ? 1 : num2.intValue() + 1);
                            });
                            i2++;
                        }
                    }
                    Assertions.assertThat(hashSet).containsExactlyInAnyOrderElementsOf(hashSet2);
                    Assertions.assertThat(consumerStats.getDrainingHashes()).extracting((v0) -> {
                        return v0.getHash();
                    }).containsExactlyInAnyOrderElementsOf(hashSet);
                    Assertions.assertThat(consumerStats.getDrainingHashes()).extracting(new Function[]{(v0) -> {
                        return v0.getHash();
                    }, (v0) -> {
                        return v0.getUnackMsgs();
                    }}).containsExactlyInAnyOrderElementsOf(hashMap.entrySet().stream().map(entry -> {
                        return Tuple.tuple(new Object[]{entry.getKey(), entry.getValue()});
                    }).toList());
                    Assertions.assertThat(consumerStats2.getDrainingHashes()).isEmpty();
                    Assertions.assertThat(consumerStats.getDrainingHashesCount()).isEqualTo(hashSet.size());
                    Assertions.assertThat(consumerStats.getDrainingHashesClearedTotal()).isEqualTo(0L);
                    Assertions.assertThat(consumerStats.getDrainingHashesUnackedMessages()).isEqualTo(i2);
                    Assertions.assertThat(consumerStats2.getDrainingHashesCount()).isEqualTo(0);
                    Assertions.assertThat(consumerStats2.getDrainingHashesClearedTotal()).isEqualTo(0L);
                    Assertions.assertThat(consumerStats2.getDrainingHashesUnackedMessages()).isEqualTo(0);
                    for (int i4 = 0; i4 < 20; i4++) {
                        create.newMessage().key(String.valueOf(i4 % 10)).value(Integer.valueOf(i4)).send();
                    }
                    Awaitility.await().ignoreExceptions().untilAsserted(() -> {
                        Assertions.assertThat(((ConsumerStats) ((SubscriptionStats) this.admin.topics().getStats(newUniqueName).getSubscriptions().get(str)).getConsumers().get(0)).getDrainingHashes()).isNotEmpty().allSatisfy(drainingHash -> {
                            Assertions.assertThat(drainingHash).extracting((v0) -> {
                                return v0.getBlockedAttempts();
                            }).asInstanceOf(InstanceOfAssertFactories.INTEGER).isGreaterThan(0);
                        });
                    });
                    for (int i5 = 0; i5 < 20; i5++) {
                        Message receive = subscribe.receive(1, TimeUnit.SECONDS);
                        log.info("Acking message with value {} key {}", receive.getValue(), receive.getKey());
                        subscribe.acknowledge(receive);
                        if (i5 == 18) {
                            Awaitility.await().pollInterval(Duration.ofSeconds(1L)).atMost(Duration.ofSeconds(3L)).untilAsserted(() -> {
                                Assertions.assertThat((ConsumerStats) ((SubscriptionStats) this.admin.topics().getStats(newUniqueName).getSubscriptions().get(str)).getConsumers().get(0)).satisfies(new ThrowingConsumer[]{consumerStats3 -> {
                                    ((ObjectAssert) Assertions.assertThat(consumerStats3).describedAs("Consumer stats should have one draining hash %s", new Object[]{consumerStats3})).extracting((v0) -> {
                                        return v0.getDrainingHashes();
                                    }).asList().hasSize(1);
                                }});
                            });
                        }
                        if (i5 == 19) {
                            Awaitility.await().pollInterval(Duration.ofSeconds(1L)).atMost(Duration.ofSeconds(3L)).untilAsserted(() -> {
                                Assertions.assertThat((ConsumerStats) ((SubscriptionStats) this.admin.topics().getStats(newUniqueName).getSubscriptions().get(str)).getConsumers().get(0)).satisfies(new ThrowingConsumer[]{consumerStats3 -> {
                                    Assertions.assertThat(consumerStats3).extracting((v0) -> {
                                        return v0.getDrainingHashes();
                                    }).asList().isEmpty();
                                }});
                            });
                        }
                    }
                    SubscriptionStats subscriptionStats2 = (SubscriptionStats) this.admin.topics().getStats(newUniqueName).getSubscriptions().get("sub");
                    ConsumerStats consumerStats3 = (ConsumerStats) subscriptionStats2.getConsumers().get(0);
                    ConsumerStats consumerStats4 = (ConsumerStats) subscriptionStats2.getConsumers().get(1);
                    Assertions.assertThat(consumerStats3.getDrainingHashesCount()).isEqualTo(0);
                    Assertions.assertThat(consumerStats3.getDrainingHashesClearedTotal()).isEqualTo(hashSet.size());
                    Assertions.assertThat(consumerStats3.getDrainingHashesUnackedMessages()).isEqualTo(0);
                    Assertions.assertThat(consumerStats4.getDrainingHashesCount()).isEqualTo(0);
                    Assertions.assertThat(consumerStats4.getDrainingHashesClearedTotal()).isEqualTo(0L);
                    Assertions.assertThat(consumerStats4.getDrainingHashesUnackedMessages()).isEqualTo(0);
                    if (Collections.singletonList(subscribe2).get(0) != null) {
                        subscribe2.close();
                    }
                    if (Collections.singletonList(subscribe).get(0) != null) {
                        subscribe.close();
                    }
                } catch (Throwable th) {
                    if (Collections.singletonList(subscribe2).get(0) != null) {
                        subscribe2.close();
                    }
                    throw th;
                }
            } catch (Throwable th2) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th2;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    private String findConsumerNameForHash(SubscriptionStats subscriptionStats, int i) {
        return (String) findConsumerForHash(subscriptionStats, i).map((v0) -> {
            return v0.getConsumerName();
        }).orElse(null);
    }

    private Optional<? extends ConsumerStats> findConsumerForHash(SubscriptionStats subscriptionStats, int i) {
        return subscriptionStats.getConsumers().stream().filter(consumerStats -> {
            return consumerStats.getKeyHashRangeArrays().stream().anyMatch(iArr -> {
                return iArr[0] <= i && iArr[1] >= i;
            });
        }).findFirst();
    }

    private StickyKeyDispatcher getDispatcher(String str, String str2) {
        return ((Topic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists(str).get()).get()).getSubscription(str2).getDispatcher();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 889373328:
                if (implMethodName.equals("lambda$testMessageAckRateMetric$35305259$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case SHARED_VALUE:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/broker/stats/ConsumerStatsTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/CountDownLatch;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    CountDownLatch countDownLatch = (CountDownLatch) serializedLambda.getCapturedArg(0);
                    return (consumer, message) -> {
                        try {
                            consumer.acknowledge(message);
                            countDownLatch.countDown();
                        } catch (PulsarClientException e) {
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
