package org.apache.pulsar.client.metrics;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.HistogramPointData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.MetricDataType;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Checks the OpenTelemetry metrics reported by a {@link PulsarClient} that was built with
 * {@code openTelemetry(...)}.
 *
 * <p>Each test method gets a fresh {@link InMemoryMetricReader}; the tests read the recorded
 * metrics back from it and compare counter values / histogram sample counts for specific
 * attribute sets.
 */
@Test(groups = {"broker-api"})
public class ClientMetricsTest extends ProducerConsumerBase {
    /** Payload sent by every test; the expected size counters are derived from its length. */
    private static final String PAYLOAD = "Hello";

    InMemoryMetricReader reader;
    OpenTelemetry otel;

    /**
     * Runs the base-class setup and creates a new in-memory metric reader together with an
     * OpenTelemetry SDK instance whose meter provider reports into that reader.
     */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
        this.reader = InMemoryMetricReader.create();
        SdkMeterProvider meterProvider = SdkMeterProvider.builder()
                .registerMetricReader(this.reader)
                .build();
        this.otel = OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build();
    }

    /** Delegates to the base-class cleanup; runs even when the test method failed. */
    @Override
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Collects everything currently held by the in-memory reader.
     *
     * @return the collected metrics keyed (and sorted) by metric name; if two metrics share a
     *         name, the one returned later by the reader wins
     */
    private Map<String, MetricData> collectMetrics() {
        Map<String, MetricData> metricsByName = new TreeMap<>();
        for (MetricData metric : this.reader.collectAllMetrics()) {
            metricsByName.put(metric.getName(), metric);
        }
        return metricsByName;
    }

    /**
     * Asserts that the long-sum metric {@code name} has a point with exactly {@code attributes}
     * whose value equals {@code expected}.
     */
    private void assertCounterValue(Map<String, MetricData> metrics, String name, long expected,
                                    Attributes attributes) {
        Assert.assertEquals(getCounterValue(metrics, name, attributes), expected);
    }

    /**
     * Looks up the value of a long-sum metric point.
     *
     * <p>Fails the test when the metric is missing, is not of type {@code LONG_SUM}, or has no
     * point whose attributes equal {@code attributes}.
     *
     * @return the value of the matching point
     */
    private long getCounterValue(Map<String, MetricData> metrics, String name, Attributes attributes) {
        MetricData metric = metrics.get(name);
        Assert.assertNotNull(metric, "metric not found: " + name);
        Assert.assertEquals(metric.getType(), MetricDataType.LONG_SUM);
        for (LongPointData point : metric.getLongSumData().getPoints()) {
            if (point.getAttributes().equals(attributes)) {
                return point.getValue();
            }
        }
        Assert.fail("metric attributes not found: " + attributes);
        // Not reached at runtime (Assert.fail throws); required so the method compiles.
        return -1L;
    }

    /**
     * Asserts that the histogram metric {@code name} has a point with exactly {@code attributes}
     * whose sample count equals {@code expected}.
     */
    private void assertHistoCountValue(Map<String, MetricData> metrics, String name, long expected,
                                       Attributes attributes) {
        Assert.assertEquals(getHistoCountValue(metrics, name, attributes), expected);
    }

    /**
     * Looks up the sample count of a histogram metric point.
     *
     * <p>Fails the test when the metric is missing, is not of type {@code HISTOGRAM}, or has no
     * point whose attributes equal {@code attributes}.
     *
     * @return the count of the matching point
     */
    private long getHistoCountValue(Map<String, MetricData> metrics, String name, Attributes attributes) {
        MetricData metric = metrics.get(name);
        Assert.assertNotNull(metric, "metric not found: " + name);
        Assert.assertEquals(metric.getType(), MetricDataType.HISTOGRAM);
        for (HistogramPointData point : metric.getHistogramData().getPoints()) {
            if (point.getAttributes().equals(attributes)) {
                return point.getCount();
            }
        }
        Assert.fail("metric attributes not found: " + attributes);
        // Not reached at runtime (Assert.fail throws); required so the method compiles.
        return -1L;
    }

    /**
     * Closes {@code client} unless it already reports itself as closed, so that a test which
     * closes the client explicitly (to observe the "closed" metrics) does not close it twice.
     */
    private static void closeIfOpen(PulsarClient client) throws Exception {
        if (client != null && !client.isClosed()) {
            client.close();
        }
    }

    /**
     * Sends five messages and checks connection, lookup and producer metrics, then closes the
     * producer and the client and checks the corresponding "closed" counters.
     */
    @Test
    public void testProducerMetrics() throws Exception {
        String topic = newTopicName();
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.pulsar.getBrokerServiceUrl())
                .openTelemetry(this.otel)
                .build();
        try {
            Producer<String> producer = client.newProducer(Schema.STRING).topic(topic).create();
            for (int i = 0; i < 5; i++) {
                producer.send(PAYLOAD);
            }

            Attributes nsAttrs = Attributes.builder()
                    .put("pulsar.tenant", "my-property")
                    .put("pulsar.namespace", "my-property/my-ns")
                    .build();
            Attributes nsAttrsSuccess = nsAttrs.toBuilder()
                    .put("pulsar.response.status", "success")
                    .build();
            Attributes topicLookupSuccess = Attributes.builder()
                    .put("pulsar.lookup.transport-type", "binary")
                    .put("pulsar.lookup.type", "topic")
                    .put("pulsar.response.status", "success")
                    .build();
            Attributes metadataLookupSuccess = Attributes.builder()
                    .put("pulsar.lookup.transport-type", "binary")
                    .put("pulsar.lookup.type", "metadata")
                    .put("pulsar.response.status", "success")
                    .build();

            Map<String, MetricData> metrics = collectMetrics();
            assertCounterValue(metrics, "pulsar.client.connection.opened", 1L, Attributes.empty());
            assertCounterValue(metrics, "pulsar.client.producer.message.pending.count", 0L, nsAttrs);
            assertCounterValue(metrics, "pulsar.client.producer.message.pending.size", 0L, nsAttrs);
            assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1L, topicLookupSuccess);
            assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1L, metadataLookupSuccess);
            assertHistoCountValue(metrics, "pulsar.client.producer.message.send.duration", 5L, nsAttrsSuccess);
            assertHistoCountValue(metrics, "pulsar.client.producer.rpc.send.duration", 5L, nsAttrsSuccess);
            assertCounterValue(metrics, "pulsar.client.producer.message.send.size", PAYLOAD.length() * 5, nsAttrs);
            assertCounterValue(metrics, "pulsar.client.producer.opened", 1L, nsAttrs);

            // Close explicitly: the "closed" counters are only checked after both closes.
            producer.close();
            client.close();

            Map<String, MetricData> metricsAfterClose = collectMetrics();
            assertCounterValue(metricsAfterClose, "pulsar.client.producer.closed", 1L, nsAttrs);
            assertCounterValue(metricsAfterClose, "pulsar.client.connection.closed", 1L, Attributes.empty());
        } finally {
            // Only does work when an assertion or call above failed before client.close().
            closeIfOpen(client);
        }
    }

    /**
     * Points a client at an unresolvable address, expects producer creation to throw, and checks
     * that at least one "tcp-failed" connection failure was counted.
     */
    @Test
    public void testConnectionsFailedMetrics() throws Exception {
        String topic = newTopicName();
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://invalid-pulsar-address:1234")
                .operationTimeout(3, TimeUnit.SECONDS)
                .openTelemetry(this.otel)
                .build()) {
            Assertions.assertThatThrownBy(() -> client.newProducer(Schema.STRING).topic(topic).create())
                    .isInstanceOf(Exception.class);

            Attributes tcpFailed = Attributes.builder()
                    .put("pulsar.failure.type", "tcp-failed")
                    .build();
            long failedConnections =
                    getCounterValue(collectMetrics(), "pulsar.client.connection.failed", tcpFailed);
            Assertions.assertThat(failedConnections).isGreaterThanOrEqualTo(1L);
        }
    }

    /**
     * Creates a producer, switches the client to an invalid service URL, expects the next send
     * to fail, and checks that the failure shows up in the producer send metrics.
     */
    @Test
    public void testPublishFailedMetrics() throws Exception {
        String topic = newTopicName();
        // Resources are closed in reverse order: producer first, then client.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.admin.getServiceUrl())
                .operationTimeout(3, TimeUnit.SECONDS)
                .openTelemetry(this.otel)
                .build();
             Producer<String> producer = client.newProducer(Schema.STRING)
                     .topic(topic)
                     .sendTimeout(3, TimeUnit.SECONDS)
                     .create()) {
            client.updateServiceUrl("pulsar://invalid-address:6650");
            try {
                producer.send(PAYLOAD);
                Assert.fail("Should have failed to publish");
            } catch (Exception expected) {
                // Expected: the send must fail. Assert.fail above throws an AssertionError,
                // which is not an Exception and therefore is not swallowed here.
            }

            Map<String, MetricData> metrics = collectMetrics();
            Attributes nsAttrs = Attributes.builder()
                    .put("pulsar.tenant", "my-property")
                    .put("pulsar.namespace", "my-property/my-ns")
                    .build();
            Attributes nsAttrsFailed = nsAttrs.toBuilder()
                    .put("pulsar.response.status", "failed")
                    .build();
            assertCounterValue(metrics, "pulsar.client.producer.message.pending.count", 0L, nsAttrs);
            assertCounterValue(metrics, "pulsar.client.producer.message.pending.size", 0L, nsAttrs);
            assertHistoCountValue(metrics, "pulsar.client.producer.message.send.duration", 1L, nsAttrsFailed);
            assertHistoCountValue(metrics, "pulsar.client.producer.rpc.send.duration", 1L, nsAttrsFailed);
        }
    }

    /**
     * Publishes ten messages to a shared subscription with a one-second ack timeout and checks
     * the consumer metrics after prefetch, after receiving three messages (one acked, one
     * negatively acked, one left unacknowledged), after a further wait, and after client close.
     */
    @Test
    public void testConsumerMetrics() throws Exception {
        String topic = newTopicName();
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.pulsar.getBrokerServiceUrl())
                .openTelemetry(this.otel)
                .build();
        try (Producer<String> producer = client.newProducer(Schema.STRING).topic(topic).create()) {
            Consumer<String> consumer = client.newConsumer(Schema.STRING)
                    .topic(topic)
                    .subscriptionName("my-sub")
                    .ackTimeout(1L, TimeUnit.SECONDS)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe();
            for (int i = 0; i < 10; i++) {
                producer.send(PAYLOAD);
            }
            // NOTE(review): presumably gives the consumer time to prefetch all ten messages
            // into its receive queue before the metrics are read - timing-dependent.
            Thread.sleep(1000L);

            Attributes subAttrs = Attributes.builder()
                    .put("pulsar.tenant", "my-property")
                    .put("pulsar.namespace", "my-property/my-ns")
                    .put("pulsar.subscription", "my-sub")
                    .build();
            Attributes topicLookupSuccess = Attributes.builder()
                    .put("pulsar.lookup.transport-type", "binary")
                    .put("pulsar.lookup.type", "topic")
                    .put("pulsar.response.status", "success")
                    .build();
            Attributes metadataLookupSuccess = Attributes.builder()
                    .put("pulsar.lookup.transport-type", "binary")
                    .put("pulsar.lookup.type", "metadata")
                    .put("pulsar.response.status", "success")
                    .build();

            Map<String, MetricData> metrics = collectMetrics();
            assertCounterValue(metrics, "pulsar.client.connection.opened", 1L, Attributes.empty());
            assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 2L, topicLookupSuccess);
            assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 2L, metadataLookupSuccess);
            assertCounterValue(metrics, "pulsar.client.consumer.receive_queue.count", 10L, subAttrs);
            assertCounterValue(metrics, "pulsar.client.consumer.receive_queue.size", PAYLOAD.length() * 10, subAttrs);
            assertCounterValue(metrics, "pulsar.client.consumer.opened", 1L, subAttrs);

            // Take three messages: ack the first, nack the second, leave the third unacked.
            consumer.acknowledge(consumer.receive());
            consumer.negativeAcknowledge(consumer.receive());
            consumer.receive();

            Map<String, MetricData> metricsAfterReceive = collectMetrics();
            assertCounterValue(metricsAfterReceive, "pulsar.client.consumer.receive_queue.count", 7L, subAttrs);
            assertCounterValue(metricsAfterReceive, "pulsar.client.consumer.receive_queue.size",
                    PAYLOAD.length() * 7, subAttrs);
            assertCounterValue(metricsAfterReceive, "pulsar.client.consumer.message.received.count", 3L, subAttrs);
            assertCounterValue(metricsAfterReceive, "pulsar.client.consumer.message.received.size",
                    PAYLOAD.length() * 3, subAttrs);

            // NOTE(review): presumably long enough for the nack redelivery and the one-second
            // ack timeout to take effect before the next read - timing-dependent.
            Thread.sleep(3000L);

            Map<String, MetricData> metricsAfterWait = collectMetrics();
            assertCounterValue(metricsAfterWait, "pulsar.client.consumer.receive_queue.count", 8L, subAttrs);
            assertCounterValue(metricsAfterWait, "pulsar.client.consumer.receive_queue.size",
                    PAYLOAD.length() * 8, subAttrs);
            assertCounterValue(metricsAfterWait, "pulsar.client.consumer.message.ack", 1L, subAttrs);
            assertCounterValue(metricsAfterWait, "pulsar.client.consumer.message.nack", 1L, subAttrs);
            assertCounterValue(metricsAfterWait, "pulsar.client.consumer.message.ack.timeout", 1L, subAttrs);

            // The consumer is never closed directly; the "closed" counters are read after
            // closing the client.
            client.close();

            Map<String, MetricData> metricsAfterClose = collectMetrics();
            assertCounterValue(metricsAfterClose, "pulsar.client.consumer.closed", 1L, subAttrs);
            assertCounterValue(metricsAfterClose, "pulsar.client.connection.closed", 1L, Attributes.empty());
        } finally {
            // Runs after the producer has been closed by try-with-resources; only does work
            // when the test failed before reaching client.close().
            closeIfOpen(client);
        }
    }
}
