package org.apache.pulsar.broker.stats;

import com.google.common.collect.Multimap;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.pulsar.PrometheusMetricsTestUtil;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryFilterSupport;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.plugin.EntryFilterTest;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/stats/SubscriptionStatsTest.class */
public class SubscriptionStatsTest extends ProducerConsumerBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(SubscriptionStatsTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    public ServiceConfiguration getDefaultConf() {
        ServiceConfiguration defaultConf = super.getDefaultConf();
        defaultConf.setBrokerShutdownTimeoutMs(5000L);
        return defaultConf;
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testConsumersAfterMarkDelete() throws PulsarClientException, PulsarAdminException {
        String str = "persistent://my-property/my-ns/testConsumersAfterMarkDelete-" + UUID.randomUUID();
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{str}).receiverQueueSize(10).subscriptionName("my-sub").subscriptionType(SubscriptionType.Key_Shared).subscribe();
        Producer create = this.pulsarClient.newProducer().topic(str).create();
        for (int i = 0; i < 100; i++) {
            create.send(String.valueOf(i).getBytes());
        }
        subscribe.receive();
        Consumer subscribe2 = this.pulsarClient.newConsumer().topic(new String[]{str}).receiverQueueSize(10).subscriptionName("my-sub").subscriptionType(SubscriptionType.Key_Shared).subscribe();
        TopicStats stats = this.admin.topics().getStats(str);
        Assert.assertEquals(stats.getSubscriptions().size(), 1);
        Assert.assertEquals(((SubscriptionStats) ((Map.Entry) stats.getSubscriptions().entrySet().iterator().next()).getValue()).getConsumersAfterMarkDeletePosition().size(), 1);
        subscribe.close();
        subscribe2.close();
        create.close();
    }

    @Test
    public void testNonContiguousDeletedMessagesRanges() throws Exception {
        String str = "persistent://my-property/my-ns/testNonContiguousDeletedMessagesRanges-" + UUID.randomUUID().toString();
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName("my-sub").subscribe();
        try {
            Producer create = this.pulsarClient.newProducer().topic(str).create();
            for (int i = 0; i < 100; i++) {
                try {
                    create.send(String.valueOf(i).getBytes());
                } catch (Throwable th) {
                    if (Collections.singletonList(create).get(0) != null) {
                        create.close();
                    }
                    throw th;
                }
            }
            for (int i2 = 0; i2 < 100; i2++) {
                Message receive = subscribe.receive();
                if (i2 != 50) {
                    subscribe.acknowledge(receive);
                }
            }
            Awaitility.await().untilAsserted(() -> {
                TopicStats stats = this.admin.topics().getStats(str);
                Assert.assertEquals(stats.getNonContiguousDeletedMessagesRanges(), 1);
                Assert.assertEquals(stats.getSubscriptions().size(), 1);
                Assert.assertEquals(((SubscriptionStats) stats.getSubscriptions().get("my-sub")).getNonContiguousDeletedMessagesRanges(), 1);
                Assert.assertTrue(stats.getNonContiguousDeletedMessagesRangesSerializedSize() > 0);
                Assert.assertTrue(((SubscriptionStats) stats.getSubscriptions().get("my-sub")).getNonContiguousDeletedMessagesRangesSerializedSize() > 0);
            });
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } finally {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        }
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "testSubscriptionMetrics")
    public Object[][] topicAndSubscription() {
        return new Object[]{new Object[]{"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true, true}, new Object[]{"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true, true}, new Object[]{"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false, true}, new Object[]{"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false, true}, new Object[]{"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true, false}, new Object[]{"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true, false}, new Object[]{"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false, false}, new Object[]{"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false, false}};
    }

    @Test(dataProvider = "testSubscriptionMetrics")
    public void testSubscriptionStats(String str, String str2, boolean z, boolean z2) throws Exception {
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).enableBatching(false).create();
        try {
            Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionType(SubscriptionType.Exclusive).subscriptionName(str2).subscribe();
            try {
                ((Topic) ((Optional) this.pulsar.getBrokerService().getTopic(str, false).get()).get()).isPersistent();
                Dispatcher dispatcher = ((Topic) ((Optional) this.pulsar.getBrokerService().getTopic(str, false).get()).get()).getSubscription(str2).getDispatcher();
                if (z2) {
                    Field declaredField = EntryFilterSupport.class.getDeclaredField("entryFilters");
                    declaredField.setAccessible(true);
                    Field declaredField2 = EntryFilterSupport.class.getDeclaredField("hasFilter");
                    declaredField2.setAccessible(true);
                    declaredField.set(dispatcher, List.of((EntryFilterWithClassLoader) BrokerTestUtil.spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, new EntryFilterTest(), (NarClassLoader) Mockito.mock(NarClassLoader.class), false)));
                    declaredField2.set(dispatcher, true);
                }
                for (int i = 0; i < 100; i++) {
                    create.newMessage().property("REJECT", " ").value(UUID.randomUUID().toString()).send();
                }
                for (int i2 = 0; i2 < 100; i2++) {
                    create.newMessage().property("ACCEPT", " ").value(UUID.randomUUID().toString()).send();
                }
                for (int i3 = 0; i3 < 100; i3++) {
                    create.newMessage().property("RESCHEDULE", " ").value(UUID.randomUUID().toString()).send();
                }
                for (int i4 = 0; i4 < 100; i4++) {
                    Message receive = subscribe.receive(1, TimeUnit.SECONDS);
                    Assert.assertNotNull(receive);
                    subscribe.acknowledge(receive);
                }
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                PrometheusMetricsTestUtil.generate(this.pulsar, z, false, false, byteArrayOutputStream);
                Multimap<String, PrometheusMetricsClient.Metric> parseMetrics = PrometheusMetricsClient.parseMetrics(byteArrayOutputStream.toString());
                Collection collection = parseMetrics.get("pulsar_subscription_filter_processed_msg_count");
                Collection collection2 = parseMetrics.get("pulsar_subscription_filter_accepted_msg_count");
                Collection collection3 = parseMetrics.get("pulsar_subscription_filter_rejected_msg_count");
                Collection collection4 = parseMetrics.get("pulsar_subscription_filter_rescheduled_msg_count");
                if (z) {
                    Assert.assertTrue(collection.size() > 0);
                    Assert.assertTrue(collection2.size() > 0);
                    Assert.assertTrue(collection3.size() > 0);
                    Assert.assertTrue(collection4.size() > 0);
                    double sum = collection.stream().filter(metric -> {
                        return metric.tags.get("subscription").equals(str2) && metric.tags.get("topic").equals(str);
                    }).mapToDouble(metric2 -> {
                        return metric2.value;
                    }).sum();
                    double sum2 = collection2.stream().filter(metric3 -> {
                        return metric3.tags.get("subscription").equals(str2) && metric3.tags.get("topic").equals(str);
                    }).mapToDouble(metric4 -> {
                        return metric4.value;
                    }).sum();
                    double sum3 = collection3.stream().filter(metric5 -> {
                        return metric5.tags.get("subscription").equals(str2) && metric5.tags.get("topic").equals(str);
                    }).mapToDouble(metric6 -> {
                        return metric6.value;
                    }).sum();
                    double sum4 = collection4.stream().filter(metric7 -> {
                        return metric7.tags.get("subscription").equals(str2) && metric7.tags.get("topic").equals(str);
                    }).mapToDouble(metric8 -> {
                        return metric8.value;
                    }).sum();
                    if (z2) {
                        Assert.assertEquals(sum2, 100);
                        Assert.assertEquals(sum3, 100);
                        Assert.assertEquals(sum, sum2 + sum3 + sum4, 0.01d * sum);
                    } else {
                        Assert.assertEquals(sum, 0.0d);
                        Assert.assertEquals(sum2, 0.0d);
                        Assert.assertEquals(sum3, 0.0d);
                        Assert.assertEquals(sum4, 0.0d);
                    }
                } else {
                    Assert.assertEquals(collection.size(), 0);
                    Assert.assertEquals(collection2.size(), 0);
                    Assert.assertEquals(collection3.size(), 0);
                    Assert.assertEquals(collection4.size(), 0);
                }
                testSubscriptionStatsAdminApi(str, str2, z2, 100, 100);
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    private void testSubscriptionStatsAdminApi(String str, String str2, boolean z, int i, int i2) throws Exception {
        boolean isPersistent = TopicName.get(str).isPersistent();
        SubscriptionStats subscriptionStats = (SubscriptionStats) this.admin.topics().getStats(str).getSubscriptions().get(str2);
        Assert.assertNotNull(subscriptionStats);
        if (z) {
            Assert.assertEquals(subscriptionStats.getFilterAcceptedMsgCount(), i);
            if (isPersistent) {
                Assert.assertEquals(subscriptionStats.getFilterRejectedMsgCount(), i2);
                Assert.assertEquals(subscriptionStats.getFilterProcessedMsgCount(), subscriptionStats.getFilterAcceptedMsgCount() + subscriptionStats.getFilterRejectedMsgCount() + subscriptionStats.getFilterRescheduledMsgCount(), 0.01d * subscriptionStats.getFilterProcessedMsgCount());
                return;
            }
            return;
        }
        Assert.assertEquals(subscriptionStats.getFilterAcceptedMsgCount(), 0L);
        if (isPersistent) {
            Assert.assertEquals(subscriptionStats.getFilterRejectedMsgCount(), 0L);
            Assert.assertEquals(subscriptionStats.getFilterAcceptedMsgCount(), 0L);
            Assert.assertEquals(subscriptionStats.getFilterRescheduledMsgCount(), 0L);
        }
    }
}
