package org.apache.pulsar.broker.stats;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.BatchCallback;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.api.metrics.ObservableMeasurement;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl;
import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;

/**
 * Publishes per-producer OpenTelemetry counters for a broker.
 *
 * <p>The constructor registers three observable counters and one batch callback on the meter
 * obtained from the given {@link PulsarService}. Each time the callback runs, it walks the
 * producers of every topic whose loading future has completed normally with a present topic and
 * records that producer's counters. {@link #close()} unregisters the callback.
 */
public class OpenTelemetryProducerStats implements AutoCloseable {
    /** Instrument name of the received-messages counter. */
    public static final String MESSAGE_IN_COUNTER = "pulsar.broker.producer.message.incoming.count";
    private final ObservableLongMeasurement messageInCounter;

    /** Instrument name of the received-bytes counter. */
    public static final String BYTES_IN_COUNTER = "pulsar.broker.producer.message.incoming.size";
    private final ObservableLongMeasurement bytesInCounter;

    /** Instrument name of the dropped-messages counter. */
    public static final String MESSAGE_DROP_COUNTER = "pulsar.broker.producer.message.drop.count";
    private final ObservableLongMeasurement messageDropCounter;

    /** Handle of the registered batch callback; closed in {@link #close()}. */
    private final BatchCallback batchCallback;

    /**
     * Builds the three observable counters and registers the batch callback that feeds them.
     *
     * @param pulsarService broker service supplying the OpenTelemetry meter and the topic map
     */
    public OpenTelemetryProducerStats(PulsarService pulsarService) {
        Meter meter = pulsarService.getOpenTelemetry().getMeter();

        this.messageInCounter = meter.counterBuilder(MESSAGE_IN_COUNTER)
                .setUnit("{message}")
                .setDescription("The total number of messages received from this producer.")
                .buildObserver();

        this.bytesInCounter = meter.counterBuilder(BYTES_IN_COUNTER)
                .setUnit("By")
                .setDescription("The total number of messages bytes received from this producer.")
                .buildObserver();

        this.messageDropCounter = meter.counterBuilder(MESSAGE_DROP_COUNTER)
                .setUnit("{message}")
                .setDescription("The total number of messages dropped from this producer.")
                .buildObserver();

        this.batchCallback = meter.batchCallback(() -> {
            pulsarService.getBrokerService().getTopics().values().stream()
                    // Skip topics that are still loading or whose load failed, so that the
                    // join() below neither blocks nor throws.
                    .filter(future -> future.isDone() && !future.isCompletedExceptionally())
                    .map(future -> future.join())
                    .filter(optionalTopic -> optionalTopic.isPresent())
                    .flatMap(optionalTopic -> ((Topic) optionalTopic.get()).getProducers().values().stream())
                    .forEach(this::recordMetricsForProducer);
        }, this.messageInCounter, new ObservableMeasurement[]{this.bytesInCounter, this.messageDropCounter});
    }

    /** Closes the batch callback registered by the constructor. */
    @Override
    public void close() {
        this.batchCallback.close();
    }

    /**
     * Records the current counter values of a single producer.
     *
     * <p>The message and byte counters are recorded for every producer. The drop counter is only
     * recorded when the producer's stats object is a {@link NonPersistentPublisherStatsImpl},
     * the type that provides {@code getMsgDropCount()}.
     *
     * @param producer producer whose stats and OpenTelemetry attributes are read
     */
    private void recordMetricsForProducer(Producer producer) {
        Attributes attributes = producer.getOpenTelemetryAttributes();
        // Hold the stats through the base type: declaring the local as the non-persistent
        // subtype made the instanceof guard below meaningless and required an unchecked
        // narrowing of whatever getStats() returns.
        PublisherStatsImpl stats = producer.getStats();

        this.messageInCounter.record(stats.getMsgInCounter(), attributes);
        this.bytesInCounter.record(stats.getBytesInCounter(), attributes);

        if (stats instanceof NonPersistentPublisherStatsImpl) {
            NonPersistentPublisherStatsImpl nonPersistentStats = (NonPersistentPublisherStatsImpl) stats;
            this.messageDropCounter.record(nonPersistentStats.getMsgDropCount(), attributes);
        }
    }
}
