package org.apache.pulsar.broker.stats;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.io.ByteArrayOutputStream;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilterProducerTest;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/stats/ConsumerStatsTest.class */
public class ConsumerStatsTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(ConsumerStatsTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        this.conf.setMaxUnackedMessagesPerConsumer(0);
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer() throws PulsarClientException, InterruptedException, PulsarAdminException {
        Assert.assertEquals(this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer(), 0);
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer"}).subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).subscriptionName("sub").subscribe();
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer").create();
        for (int i = 0; i < 10; i++) {
            create.send(("message-" + i).getBytes());
        }
        int i2 = 0;
        for (int i3 = 0; i3 < 10; i3++) {
            subscribe.receive();
            i2++;
        }
        Assert.assertEquals(i2, 10);
        int i4 = 0;
        TopicStats stats = this.admin.topics().getStats("persistent://my-property/my-ns/testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer");
        Assert.assertEquals(stats.getSubscriptions().size(), 1);
        Assert.assertEquals(((SubscriptionStats) ((Map.Entry) stats.getSubscriptions().entrySet().iterator().next()).getValue()).getConsumers().size(), 1);
        Assert.assertFalse(((ConsumerStats) ((SubscriptionStats) ((Map.Entry) stats.getSubscriptions().entrySet().iterator().next()).getValue()).getConsumers().get(0)).isBlockedConsumerOnUnackedMsgs());
        Assert.assertEquals(((ConsumerStats) ((SubscriptionStats) ((Map.Entry) stats.getSubscriptions().entrySet().iterator().next()).getValue()).getConsumers().get(0)).getUnackedMessages(), 10);
        for (int i5 = 0; i5 < 10; i5++) {
            subscribe.acknowledge(subscribe.receive());
            i4++;
        }
        Assert.assertEquals(i4, 10);
        Thread.sleep(2000L);
        TopicStats stats2 = this.admin.topics().getStats("persistent://my-property/my-ns/testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer");
        Assert.assertFalse(((ConsumerStats) ((SubscriptionStats) ((Map.Entry) stats2.getSubscriptions().entrySet().iterator().next()).getValue()).getConsumers().get(0)).isBlockedConsumerOnUnackedMsgs());
        Assert.assertEquals(((ConsumerStats) ((SubscriptionStats) ((Map.Entry) stats2.getSubscriptions().entrySet().iterator().next()).getValue()).getConsumers().get(0)).getUnackedMessages(), 0);
    }

    @Test
    public void testAckStatsOnPartitionedTopicForExclusiveSubscription() throws PulsarAdminException, PulsarClientException, InterruptedException {
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/testAckStatsOnPartitionedTopicForExclusiveSubscription", 3);
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/testAckStatsOnPartitionedTopicForExclusiveSubscription"}).subscriptionType(SubscriptionType.Exclusive).subscriptionName("sub").subscribe();
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testAckStatsOnPartitionedTopicForExclusiveSubscription").create();
        for (int i = 0; i < 10; i++) {
            create.send(("message-" + i).getBytes());
        }
        int i2 = 0;
        for (int i3 = 0; i3 < 10; i3++) {
            subscribe.acknowledge(subscribe.receive());
            i2++;
        }
        Assert.assertEquals(10, i2);
        Thread.sleep(2000L);
        for (int i4 = 0; i4 < 3; i4++) {
            TopicStats stats = this.admin.topics().getStats("persistent://my-property/my-ns/testAckStatsOnPartitionedTopicForExclusiveSubscription-partition-" + i4);
            Assert.assertEquals(stats.getSubscriptions().size(), 1);
            Assert.assertEquals(((SubscriptionStats) ((Map.Entry) stats.getSubscriptions().entrySet().iterator().next()).getValue()).getConsumers().size(), 1);
            Assert.assertEquals(((ConsumerStats) ((SubscriptionStats) ((Map.Entry) stats.getSubscriptions().entrySet().iterator().next()).getValue()).getConsumers().get(0)).getUnackedMessages(), 0);
        }
    }

    @Test
    public void testUpdateStatsForActiveConsumerAndSubscription() throws Exception {
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/use/ns-abc/testUpdateStatsForActiveConsumerAndSubscription"}).subscriptionType(SubscriptionType.Shared).subscriptionName("my-subscription").subscribe();
        PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/use/ns-abc/testUpdateStatsForActiveConsumerAndSubscription").get();
        Assert.assertNotNull(persistentTopic);
        Assert.assertEquals(persistentTopic.getSubscriptions().size(), 1L);
        List consumers = ((PersistentSubscription) persistentTopic.getSubscriptions().get("my-subscription")).getConsumers();
        Assert.assertEquals(consumers.size(), 1);
        ConsumerStatsImpl consumerStatsImpl = new ConsumerStatsImpl();
        consumerStatsImpl.msgOutCounter = 10L;
        consumerStatsImpl.bytesOutCounter = 1280L;
        ((org.apache.pulsar.broker.service.Consumer) consumers.get(0)).updateStats(consumerStatsImpl);
        ConsumerStatsImpl stats = ((org.apache.pulsar.broker.service.Consumer) consumers.get(0)).getStats();
        Assert.assertEquals(stats.getMsgOutCounter(), 10L);
        Assert.assertEquals(stats.getBytesOutCounter(), 1280L);
    }

    @Test
    public void testConsumerStatsOutput() throws Exception {
        HashSet newHashSet = Sets.newHashSet(new String[]{"msgRateOut", "msgThroughputOut", "bytesOutCounter", "msgOutCounter", "messageAckRate", "msgRateRedeliver", "chunkedMessageRate", "consumerName", "availablePermits", "unackedMessages", "avgMessagesPerEntry", "blockedConsumerOnUnackedMsgs", "readPositionWhenJoining", "lastAckedTimestamp", "lastConsumedTimestamp", "lastConsumedFlowTimestamp", "keyHashRanges", "metadata", "address", "connectedSince", "clientVersion"});
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/use/ns-abc/testConsumerStatsOutput"}).subscriptionType(SubscriptionType.Shared).subscriptionName("my-subscription").subscribe();
        TopicStats stats = this.admin.topics().getStats("persistent://prop/use/ns-abc/testConsumerStatsOutput");
        ObjectMapper create = ObjectMapperFactory.create();
        ConsumerStats consumerStats = (ConsumerStats) ((SubscriptionStats) stats.getSubscriptions().get("my-subscription")).getConsumers().get(0);
        Assert.assertTrue(consumerStats.getLastConsumedFlowTimestamp() > 0);
        Iterator fieldNames = create.readTree(create.writer().writeValueAsString(consumerStats)).fieldNames();
        while (fieldNames.hasNext()) {
            String str = (String) fieldNames.next();
            Assert.assertTrue(newHashSet.contains(str), str + " should not be exposed");
        }
        subscribe.close();
    }

    @Test
    public void testPersistentTopicMessageAckRateMetricTopicLevel() throws Exception {
        testMessageAckRateMetric("persistent://public/default/msg_ack_rate" + UUID.randomUUID(), true);
    }

    @Test
    public void testPersistentTopicMessageAckRateMetricNamespaceLevel() throws Exception {
        testMessageAckRateMetric("persistent://public/default/msg_ack_rate" + UUID.randomUUID(), false);
    }

    /* JADX WARN: Finally extract failed */
    private void testMessageAckRateMetric(String str, boolean z) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1000);
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).enableBatching(true).batchingMaxMessages(10).create();
        try {
            MessageListener messageListener = (consumer, message) -> {
                try {
                    consumer.acknowledge(message);
                    countDownLatch.countDown();
                } catch (PulsarClientException e) {
                }
            };
            Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName("test_sub").subscriptionType(SubscriptionType.Shared).messageListener(messageListener).subscribe();
            try {
                Consumer subscribe2 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName("test_sub").subscriptionType(SubscriptionType.Shared).messageListener(messageListener).subscribe();
                try {
                    String namespace = TopicName.get(str).getNamespace();
                    for (int i = 0; i < 1000; i++) {
                        create.sendAsync(UUID.randomUUID().toString());
                    }
                    create.flush();
                    countDownLatch.await(20L, TimeUnit.SECONDS);
                    TimeUnit.SECONDS.sleep(1L);
                    List consumers = ((Topic) ((Optional) this.pulsar.getBrokerService().getTopic(str, false).get()).get()).getSubscription("test_sub").getConsumers();
                    Assert.assertEquals(consumers.size(), 2);
                    org.apache.pulsar.broker.service.Consumer consumer2 = (org.apache.pulsar.broker.service.Consumer) consumers.get(0);
                    org.apache.pulsar.broker.service.Consumer consumer3 = (org.apache.pulsar.broker.service.Consumer) consumers.get(1);
                    consumer2.updateRates();
                    consumer3.updateRates();
                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                    PrometheusMetricsGenerator.generate(this.pulsar, z, true, true, byteArrayOutputStream);
                    Multimap<String, PrometheusMetricsTest.Metric> parseMetrics = PrometheusMetricsTest.parseMetrics(byteArrayOutputStream.toString(StandardCharsets.UTF_8));
                    Collection collection = parseMetrics.get("pulsar_consumer_msg_ack_rate");
                    Collection collection2 = parseMetrics.get(z ? "pulsar_consumer_msg_rate_out" : "pulsar_rate_out");
                    Assert.assertTrue(collection.size() > 0);
                    Assert.assertTrue(collection2.size() > 0);
                    if (z) {
                        String consumerName = consumer2.consumerName();
                        String consumerName2 = consumer3.consumerName();
                        double sum = collection.stream().filter(metric -> {
                            return metric.tags.get("consumer_name").equals(consumerName) || metric.tags.get("consumer_name").equals(consumerName2);
                        }).mapToDouble(metric2 -> {
                            return metric2.value;
                        }).sum();
                        double sum2 = collection2.stream().filter(metric3 -> {
                            return metric3.tags.get("consumer_name").equals(consumerName) || metric3.tags.get("consumer_name").equals(consumerName2);
                        }).mapToDouble(metric4 -> {
                            return metric4.value;
                        }).sum();
                        Assert.assertTrue(sum > 0.0d);
                        Assert.assertTrue(sum2 > 0.0d);
                        Assert.assertEquals(sum, sum2, sum2 * 0.1d);
                    } else {
                        double sum3 = collection.stream().filter(metric5 -> {
                            return namespace.equals(metric5.tags.get("namespace"));
                        }).mapToDouble(metric6 -> {
                            return metric6.value;
                        }).sum();
                        double sum4 = collection2.stream().filter(metric7 -> {
                            return namespace.equals(metric7.tags.get("namespace"));
                        }).mapToDouble(metric8 -> {
                            return metric8.value;
                        }).sum();
                        Assert.assertTrue(sum3 > 0.0d);
                        Assert.assertTrue(sum4 > 0.0d);
                        Assert.assertEquals(sum3, sum4, sum4 * 0.1d);
                    }
                    if (Collections.singletonList(subscribe2).get(0) != null) {
                        subscribe2.close();
                    }
                    if (Collections.singletonList(subscribe).get(0) != null) {
                        subscribe.close();
                    }
                } catch (Throwable th) {
                    if (Collections.singletonList(subscribe2).get(0) != null) {
                        subscribe2.close();
                    }
                    throw th;
                }
            } catch (Throwable th2) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th2;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    public PulsarService newPulsarService(ServiceConfiguration serviceConfiguration) throws Exception {
        return new PulsarService(serviceConfiguration) { // from class: org.apache.pulsar.broker.stats.ConsumerStatsTest.1
            protected BrokerService newBrokerService(PulsarService pulsarService) throws Exception {
                return (BrokerService) Mockito.spy(new BrokerService(this, this.ioEventLoopGroup));
            }
        };
    }

    @Test
    public void testAvgMessagesPerEntry() throws Exception {
        Producer create = this.pulsarClient.newProducer(Schema.STRING).producerName("producer1").enableBatching(true).topic("persistent://public/default/testFilterState").batchingMaxMessages(20).batchingMaxPublishDelay(5L, TimeUnit.SECONDS).batchingMaxBytes(Integer.MAX_VALUE).create();
        create.send("first-message");
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 20; i++) {
            arrayList.add(create.sendAsync("message"));
        }
        FutureUtil.waitForAll(arrayList);
        create.close();
        Producer create2 = this.pulsarClient.newProducer(Schema.STRING).producerName("producer2").enableBatching(false).topic("persistent://public/default/testFilterState").create();
        create2.newMessage().value("producer2-message").send();
        create2.close();
        ((BrokerService) Mockito.doReturn(ImmutableMap.of("filter", (EntryFilterWithClassLoader) BrokerTestUtil.spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, new EntryFilterProducerTest(), (NarClassLoader) Mockito.mock(NarClassLoader.class)))).when(this.pulsar.getBrokerService())).getEntryFilters();
        HashMap hashMap = new HashMap();
        hashMap.put("matchValueAccept", "producer1");
        hashMap.put("matchValueReschedule", "producer2");
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://public/default/testFilterState"}).properties(hashMap).subscriptionName("sub").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        int i2 = 0;
        while (true) {
            try {
                Message receive = subscribe.receive(10, TimeUnit.SECONDS);
                if (receive == null) {
                    break;
                }
                i2++;
                Assert.assertNotEquals(receive.getValue(), "producer2-message");
                subscribe.acknowledge(receive);
            } catch (Throwable th) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th;
            }
        }
        AssertJUnit.assertEquals(21, i2);
        ConsumerStats consumerStats = (ConsumerStats) ((SubscriptionStats) this.admin.topics().getStats("persistent://public/default/testFilterState").getSubscriptions().get("sub")).getConsumers().get(0);
        AssertJUnit.assertEquals(21L, consumerStats.getMsgOutCounter());
        AssertJUnit.assertEquals(3, consumerStats.getAvgMessagesPerEntry());
        if (Collections.singletonList(subscribe).get(0) != null) {
            subscribe.close();
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 889373328:
                if (implMethodName.equals("lambda$testMessageAckRateMetric$35305259$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case SHARED_VALUE:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/broker/stats/ConsumerStatsTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/CountDownLatch;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    CountDownLatch countDownLatch = (CountDownLatch) serializedLambda.getCapturedArg(0);
                    return (consumer, message) -> {
                        try {
                            consumer.acknowledge(message);
                            countDownLatch.countDown();
                        } catch (PulsarClientException e) {
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
