package org.apache.pulsar.broker.stats;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Verifies the per-topic OpenTelemetry metrics published by the broker.
 *
 * <p>Each test starts a broker with OpenTelemetry enabled (see
 * {@link #customizeMainPulsarTestContextBuilder}), drives some traffic against a freshly created
 * non-partitioned topic and then inspects the metrics returned by the test context's metric reader.
 */
public class OpenTelemetryTopicStatsTest extends BrokerTestBase {

    /** Starts the test broker before every test method by delegating to {@code baseSetup()}. */
    @Override
    @BeforeMethod(alwaysRun = true)
    protected void setup() throws Exception {
        super.baseSetup();
    }

    /** Tears the test broker down after every test method by delegating to {@code internalCleanup()}. */
    @Override
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Applies the parent customization and additionally enables OpenTelemetry on the test context,
     * which the tests rely on to read metrics back.
     *
     * @param builder the test context builder being configured
     */
    @Override
    public void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
        super.customizeMainPulsarTestContextBuilder(builder);
        builder.enableOpenTelemetry(true);
    }

    /**
     * Publishes messages from several producers, attaches several consumers to one shared
     * subscription, and checks the topic-level counters and sizes reported through OpenTelemetry.
     *
     * @throws Exception if topic creation, publishing, subscribing or waiting fails
     */
    @Test(timeOut = 30000)
    public void testMessagingMetrics() throws Exception {
        String topicName = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/testMessagingMetrics");
        this.admin.topics().createNonPartitionedTopic(topicName);

        int producerCount = 5;
        int messagesPerProducer = 2;
        int consumerCount = 3;
        int messageCount = producerCount * messagesPerProducer;

        // Producers are handed to registerCloseable and deliberately left open here, so that the
        // producer-count metric can still observe them when the metrics are collected below.
        for (int producerIndex = 0; producerIndex < producerCount; producerIndex++) {
            Producer<byte[]> producer =
                    registerCloseable(this.pulsarClient.newProducer().topic(topicName).create());
            for (int messageIndex = 0; messageIndex < messagesPerProducer; messageIndex++) {
                producer.send(String.format("producer-%d-msg-%d", producerIndex, messageIndex).getBytes());
            }
        }

        // Every consumer issues a single receive bounded by a 100 ms timeout. The latch is counted
        // down whether that receive completes normally or exceptionally, so await() returns once
        // each consumer's receive attempt has finished one way or the other.
        CountDownLatch receiveAttempts = new CountDownLatch(consumerCount);
        for (int consumerIndex = 0; consumerIndex < consumerCount; consumerIndex++) {
            registerCloseable(this.pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName("test")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe())
                    .receiveAsync()
                    .orTimeout(100L, TimeUnit.MILLISECONDS)
                    .handle((ignoredMessage, ignoredError) -> {
                        receiveAttempts.countDown();
                        return null;
                    });
        }
        receiveAttempts.await();

        Attributes attributes = topicAttributes(topicName);
        Collection<MetricData> metrics = this.pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
        Consumer<Long> isPositive = value -> Assertions.assertThat(value).isPositive();

        // All consumers share the single subscription named "test".
        BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics,
                "pulsar.broker.topic.subscription.count", attributes, 1L);
        BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics,
                "pulsar.broker.topic.producer.count", attributes, producerCount);
        BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics,
                "pulsar.broker.topic.consumer.count", attributes, consumerCount);

        BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics,
                "pulsar.broker.topic.message.incoming.count", attributes, messageCount);
        BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics,
                "pulsar.broker.topic.message.outgoing.count", attributes, messageCount);

        // Byte-based metrics are only checked for being positive; exact sizes are not asserted.
        BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics,
                "pulsar.broker.topic.message.incoming.size", attributes, isPositive);
        BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics,
                "pulsar.broker.topic.message.outgoing.size", attributes, isPositive);
        BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics,
                "pulsar.broker.topic.storage.size", attributes, isPositive);
        BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics,
                "pulsar.broker.topic.storage.logical.size", attributes, isPositive);
        BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics,
                "pulsar.broker.topic.storage.backlog.size", attributes, isPositive);

        BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics,
                "pulsar.broker.topic.storage.entry.outgoing.count", attributes, messageCount);
        BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics,
                "pulsar.broker.topic.storage.entry.incoming.count", attributes, messageCount);
    }

    /**
     * Sets a publish-rate policy on a topic, sends one message and checks that the
     * publish-rate-limit counter for the topic reports a value of one.
     *
     * @throws Exception if topic creation, policy update or publishing fails
     */
    @Test(timeOut = 30000)
    public void testPublishRateLimitMetric() throws Exception {
        String topicName = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/testPublishRateLimitMetric");
        this.admin.topics().createNonPartitionedTopic(topicName);

        // NOTE(review): presumably a limit of 1 message with the byte limit disabled (-1) - confirm
        // against PublishRate's constructor.
        PublishRate publishRate = new PublishRate(1, -1L);
        this.admin.topicPolicies().setPublishRate(topicName, publishRate);
        // Wait until the admin API reads back the policy that was just written.
        Awaitility.await().until(
                () -> Objects.equals(publishRate, this.admin.topicPolicies().getPublishRate(topicName)));

        // try-with-resources closes the producer on both the success and the failure path and keeps
        // any close() failure as a suppressed exception instead of masking the original one.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create()) {
            producer.send("msg".getBytes());

            Collection<MetricData> metrics =
                    this.pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
            BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics,
                    "pulsar.broker.topic.publish.rate.limit.count", topicAttributes(topicName), 1L);
        }
    }

    /**
     * Builds the attribute set used to look up metrics of a topic in the
     * {@code persistent://prop/ns-abc} namespace.
     *
     * @param topicName the full topic name, stored as the topic attribute
     * @return attributes carrying domain, tenant, namespace and topic
     */
    private static Attributes topicAttributes(String topicName) {
        return Attributes.builder()
                .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent")
                .put(OpenTelemetryAttributes.PULSAR_TENANT, "prop")
                .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "prop/ns-abc")
                .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName)
                .build();
    }
}
