package org.apache.pulsar.broker.service.schema;

import com.google.common.annotations.VisibleForTesting;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.prometheus.client.Counter;
import io.prometheus.client.Summary;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.MetricsUtil;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/pulsar/broker/service/schema/SchemaRegistryStats.class */
public class SchemaRegistryStats implements AutoCloseable, Runnable {
    public static final String SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME = "pulsar.broker.request.schema_registry.duration";
    private final DoubleHistogram latencyHistogram;
    public static final String COMPATIBLE_COUNTER_METRIC_NAME = "pulsar.broker.operation.schema_registry.compatibility_check.count";
    private final LongCounter schemaCompatibilityCounter;
    private final Map<String, Long> namespaceAccess = new ConcurrentHashMap();
    private final ScheduledFuture<?> future;
    private static final double[] QUANTILES = {0.5d, 0.75d, 0.95d, 0.99d, 0.999d, 0.9999d, 1.0d};
    public static final AttributeKey<String> REQUEST_TYPE_KEY = AttributeKey.stringKey("pulsar.schema_registry.request");
    public static final AttributeKey<String> RESPONSE_TYPE_KEY = AttributeKey.stringKey("pulsar.schema_registry.response");
    public static final AttributeKey<String> COMPATIBILITY_CHECK_RESPONSE_KEY = AttributeKey.stringKey("pulsar.schema_registry.compatibility_check.response");
    private static final String NAMESPACE = "namespace";
    private static final Counter getOpsFailedCounter = Counter.build("pulsar_schema_get_ops_failed_total", "-").labelNames(new String[]{NAMESPACE}).create().register();
    private static final Counter putOpsFailedCounter = Counter.build("pulsar_schema_put_ops_failed_total", "-").labelNames(new String[]{NAMESPACE}).create().register();
    private static final Counter deleteOpsFailedCounter = Counter.build("pulsar_schema_del_ops_failed_total", "-").labelNames(new String[]{NAMESPACE}).create().register();
    private static final Counter compatibleCounter = Counter.build("pulsar_schema_compatible_total", "-").labelNames(new String[]{NAMESPACE}).create().register();
    private static final Counter incompatibleCounter = Counter.build("pulsar_schema_incompatible_total", "-").labelNames(new String[]{NAMESPACE}).create().register();
    private static final Summary deleteOpsLatency = buildSummary("pulsar_schema_del_ops_latency", "-");
    private static final Summary getOpsLatency = buildSummary("pulsar_schema_get_ops_latency", "-");
    private static final Summary putOpsLatency = buildSummary("pulsar_schema_put_ops_latency", "-");

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/pulsar/broker/service/schema/SchemaRegistryStats$CompatibilityCheckResponse.class */
    public enum CompatibilityCheckResponse {
        COMPATIBLE,
        INCOMPATIBLE;

        public final Attributes attributes = Attributes.of(SchemaRegistryStats.COMPATIBILITY_CHECK_RESPONSE_KEY, name().toLowerCase());

        CompatibilityCheckResponse() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/pulsar/broker/service/schema/SchemaRegistryStats$RequestType.class */
    public enum RequestType {
        GET,
        LIST,
        PUT,
        DELETE;

        public final Attributes attributes = Attributes.of(SchemaRegistryStats.REQUEST_TYPE_KEY, name().toLowerCase());

        RequestType() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/pulsar/broker/service/schema/SchemaRegistryStats$ResponseType.class */
    public enum ResponseType {
        SUCCESS,
        FAILURE;

        public final Attributes attributes = Attributes.of(SchemaRegistryStats.RESPONSE_TYPE_KEY, name().toLowerCase());

        ResponseType() {
        }
    }

    public SchemaRegistryStats(PulsarService pulsarService) {
        this.future = pulsarService.getExecutor().scheduleAtFixedRate(this, 1L, 1L, TimeUnit.MINUTES);
        Meter meter = pulsarService.getOpenTelemetry().getMeter();
        this.latencyHistogram = meter.histogramBuilder(SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME).setDescription("The duration of Schema Registry requests.").setUnit("s").build();
        this.schemaCompatibilityCounter = meter.counterBuilder(COMPATIBLE_COUNTER_METRIC_NAME).setDescription("The number of Schema Registry compatibility check operations performed by the broker.").setUnit("{operation}").build();
    }

    private static Summary buildSummary(String str, String str2) {
        Summary.Builder labelNames = Summary.build(str, str2).labelNames(new String[]{NAMESPACE});
        for (double d : QUANTILES) {
            labelNames.quantile(d, 0.01d);
        }
        return labelNames.create().register();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void recordDelFailed(String str, long j) {
        ((Counter.Child) deleteOpsFailedCounter.labels(new String[]{getNamespace(str)})).inc();
        recordOperationLatency(str, j, RequestType.DELETE, ResponseType.FAILURE);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void recordGetFailed(String str, long j) {
        ((Counter.Child) getOpsFailedCounter.labels(new String[]{getNamespace(str)})).inc();
        recordOperationLatency(str, j, RequestType.GET, ResponseType.FAILURE);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void recordListFailed(String str, long j) {
        ((Counter.Child) getOpsFailedCounter.labels(new String[]{getNamespace(str)})).inc();
        recordOperationLatency(str, j, RequestType.LIST, ResponseType.FAILURE);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void recordPutFailed(String str, long j) {
        ((Counter.Child) putOpsFailedCounter.labels(new String[]{getNamespace(str)})).inc();
        recordOperationLatency(str, j, RequestType.PUT, ResponseType.FAILURE);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void recordDelLatency(String str, long j) {
        ((Summary.Child) deleteOpsLatency.labels(new String[]{getNamespace(str)})).observe(j);
        recordOperationLatency(str, j, RequestType.DELETE, ResponseType.SUCCESS);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void recordGetLatency(String str, long j) {
        ((Summary.Child) getOpsLatency.labels(new String[]{getNamespace(str)})).observe(j);
        recordOperationLatency(str, j, RequestType.GET, ResponseType.SUCCESS);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void recordListLatency(String str, long j) {
        ((Summary.Child) getOpsLatency.labels(new String[]{getNamespace(str)})).observe(j);
        recordOperationLatency(str, j, RequestType.LIST, ResponseType.SUCCESS);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void recordPutLatency(String str, long j) {
        ((Summary.Child) putOpsLatency.labels(new String[]{getNamespace(str)})).observe(j);
        recordOperationLatency(str, j, RequestType.PUT, ResponseType.SUCCESS);
    }

    private void recordOperationLatency(String str, long j, RequestType requestType, ResponseType responseType) {
        this.latencyHistogram.record(MetricsUtil.convertToSeconds(j, TimeUnit.MILLISECONDS), Attributes.builder().put(OpenTelemetryAttributes.PULSAR_NAMESPACE, getNamespace(str)).putAll(requestType.attributes).putAll(responseType.attributes).build());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void recordSchemaIncompatible(String str) {
        String namespace = getNamespace(str);
        ((Counter.Child) incompatibleCounter.labels(new String[]{namespace})).inc();
        recordSchemaCompabilityResult(namespace, CompatibilityCheckResponse.INCOMPATIBLE);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void recordSchemaCompatible(String str) {
        String namespace = getNamespace(str);
        ((Counter.Child) compatibleCounter.labels(new String[]{namespace})).inc();
        recordSchemaCompabilityResult(namespace, CompatibilityCheckResponse.COMPATIBLE);
    }

    private void recordSchemaCompabilityResult(String str, CompatibilityCheckResponse compatibilityCheckResponse) {
        this.schemaCompatibilityCounter.add(1L, Attributes.builder().put(OpenTelemetryAttributes.PULSAR_NAMESPACE, str).putAll(compatibilityCheckResponse.attributes).build());
    }

    private String getNamespace(String str) {
        String str2;
        try {
            str2 = TopicName.get(str).getNamespace();
        } catch (IllegalArgumentException e) {
            str2 = "unknown";
        }
        this.namespaceAccess.put(str2, Long.valueOf(System.currentTimeMillis()));
        return str2;
    }

    private void removeChild(String str) {
        getOpsFailedCounter.remove(new String[]{str});
        putOpsFailedCounter.remove(new String[]{str});
        deleteOpsFailedCounter.remove(new String[]{str});
        compatibleCounter.remove(new String[]{str});
        incompatibleCounter.remove(new String[]{str});
        deleteOpsLatency.remove(new String[]{str});
        getOpsLatency.remove(new String[]{str});
        putOpsLatency.remove(new String[]{str});
    }

    @Override // java.lang.AutoCloseable
    public synchronized void close() throws Exception {
        this.namespaceAccess.keySet().forEach(this::removeChild);
        this.future.cancel(false);
    }

    @Override // java.lang.Runnable
    public void run() {
        long currentTimeMillis = System.currentTimeMillis();
        long millis = TimeUnit.MINUTES.toMillis(5L);
        this.namespaceAccess.entrySet().removeIf(entry -> {
            String str = (String) entry.getKey();
            if (currentTimeMillis - ((Long) entry.getValue()).longValue() <= millis) {
                return false;
            }
            removeChild(str);
            return true;
        });
    }
}
