package org.apache.kafka.streams.processor.internals.metrics;

import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;

/**
 * {@link StreamsMetrics} implementation backed by a {@link Metrics} registry.
 *
 * <p>Besides the public {@link StreamsMetrics} API, this class creates sensors and metrics on
 * several levels (client, thread, task, processor node, record cache, state store) and remembers
 * the names it registered per level so that a whole level can be removed from the registry again
 * with one call.
 *
 * <p>Sensor names are built as {@code <prefix>.s.<sensor name>}, where the prefix encodes the
 * level (for example {@code internal.<thread>.task.<task>.node.<node>}).
 */
public class StreamsMetricsImpl implements StreamsMetrics {
    // Building blocks of the fully qualified sensor names.
    private static final String SENSOR_PREFIX_DELIMITER = ".";
    private static final String SENSOR_NAME_DELIMITER = ".s.";
    private static final String SENSOR_TASK_LABEL = "task";
    private static final String SENSOR_NODE_LABEL = "node";
    private static final String SENSOR_CACHE_LABEL = "cache";
    private static final String SENSOR_STORE_LABEL = "store";
    private static final String SENSOR_ENTITY_LABEL = "entity";
    private static final String SENSOR_EXTERNAL_LABEL = "external";
    private static final String SENSOR_INTERNAL_LABEL = "internal";

    // Tag keys.
    public static final String CLIENT_ID_TAG = "client-id";
    public static final String THREAD_ID_TAG = "thread-id";
    public static final String THREAD_ID_TAG_0100_TO_24 = CLIENT_ID_TAG;
    public static final String TASK_ID_TAG = "task-id";
    public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
    public static final String STORE_ID_TAG = "state-id";
    public static final String BUFFER_ID_TAG = "buffer-id";
    public static final String RECORD_CACHE_ID_TAG = "record-cache-id";

    // Tag value used for the roll-up (parent) sensors of the deprecated API.
    public static final String ROLLUP_VALUE = "all";

    // Metric name suffixes.
    public static final String LATENCY_SUFFIX = "-latency";
    public static final String RECORDS_SUFFIX = "-records";
    public static final String AVG_SUFFIX = "-avg";
    public static final String MAX_SUFFIX = "-max";
    public static final String MIN_SUFFIX = "-min";
    public static final String RATE_SUFFIX = "-rate";
    public static final String TOTAL_SUFFIX = "-total";
    public static final String RATIO_SUFFIX = "-ratio";

    // Metric group names.
    public static final String GROUP_PREFIX_WO_DELIMITER = "stream";
    public static final String GROUP_PREFIX = GROUP_PREFIX_WO_DELIMITER + "-";
    public static final String GROUP_SUFFIX = "-metrics";
    public static final String CLIENT_LEVEL_GROUP = GROUP_PREFIX_WO_DELIMITER + GROUP_SUFFIX;
    public static final String THREAD_LEVEL_GROUP = GROUP_PREFIX + "thread" + GROUP_SUFFIX;
    public static final String THREAD_LEVEL_GROUP_0100_TO_24 = GROUP_PREFIX_WO_DELIMITER + GROUP_SUFFIX;
    public static final String TASK_LEVEL_GROUP = GROUP_PREFIX + "task" + GROUP_SUFFIX;
    public static final String PROCESSOR_NODE_LEVEL_GROUP = GROUP_PREFIX + "processor-node" + GROUP_SUFFIX;
    public static final String STATE_STORE_LEVEL_GROUP = GROUP_PREFIX + "state" + GROUP_SUFFIX;
    public static final String BUFFER_LEVEL_GROUP_0100_TO_24 = GROUP_PREFIX + "buffer" + GROUP_SUFFIX;
    public static final String CACHE_LEVEL_GROUP = GROUP_PREFIX + "record-cache" + GROUP_SUFFIX;

    // Fragments of metric descriptions.
    public static final String OPERATIONS = " operations";
    public static final String TOTAL_DESCRIPTION = "The total number of ";
    public static final String RATE_DESCRIPTION = "The average per-second number of ";
    public static final String AVG_LATENCY_DESCRIPTION = "The average latency of ";
    public static final String MAX_LATENCY_DESCRIPTION = "The maximum latency of ";
    public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
    public static final String RATE_DESCRIPTION_SUFFIX = " per second";

    // End-to-end latency metric name and descriptions.
    public static final String RECORD_E2E_LATENCY = "record-e2e-latency";
    public static final String RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX =
        "end-to-end latency of a record, measuring by comparing the record timestamp with the "
            + "system time when it has been fully processed by the node";
    public static final String RECORD_E2E_LATENCY_AVG_DESCRIPTION =
        "The average " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
    public static final String RECORD_E2E_LATENCY_MIN_DESCRIPTION =
        "The minimum " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
    public static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION =
        "The maximum " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;

    private final Metrics metrics;
    // Child sensor -> roll-up parent sensor, filled by the deprecated add*ThroughputSensor methods.
    private final Map<Sensor, Sensor> parentSensors;
    private final String clientId;
    private final Version version;

    // Names registered per level; each collection is also the monitor guarding its own updates,
    // except for the two store-level maps, which are concurrent maps.
    private final Deque<MetricName> clientLevelMetrics = new LinkedList<>();
    private final Deque<String> clientLevelSensors = new LinkedList<>();
    private final Map<String, Deque<String>> threadLevelSensors = new HashMap<>();
    private final Map<String, Deque<String>> taskLevelSensors = new HashMap<>();
    private final Map<String, Deque<String>> nodeLevelSensors = new HashMap<>();
    private final Map<String, Deque<String>> cacheLevelSensors = new HashMap<>();
    private final ConcurrentMap<String, Deque<String>> storeLevelSensors = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Deque<MetricName>> storeLevelMetrics = new ConcurrentHashMap<>();

    private final RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger;

    /**
     * Gauge that always reports the value it was constructed with.
     *
     * @param <T> type of the reported value
     */
    static class ImmutableMetricValue<T> implements Gauge<T> {
        private final T value;

        public ImmutableMetricValue(T t) {
            this.value = t;
        }

        /** Returns the fixed value; both arguments are ignored. */
        @Override
        public T value(MetricConfig metricConfig, long j) {
            return value;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            ImmutableMetricValue<?> other = (ImmutableMetricValue<?>) obj;
            return Objects.equals(value, other.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(value);
        }
    }

    /** Built-in metrics version; it decides which tag key carries the thread id. */
    public enum Version {
        LATEST,
        FROM_0100_TO_24
    }

    /**
     * Creates the metrics facade.
     *
     * @param metrics the registry all sensors and metrics are added to; must not be null
     * @param clientId value of the {@code client-id} tag on client-level metrics
     * @param builtInMetricsVersion configured built-in metrics version; must not be null
     * @param time clock handed to the {@link RocksDBMetricsRecordingTrigger}
     * @throws NullPointerException if {@code metrics} or {@code builtInMetricsVersion} is null
     */
    public StreamsMetricsImpl(Metrics metrics, String clientId, String builtInMetricsVersion, Time time) {
        Objects.requireNonNull(metrics, "Metrics cannot be null");
        Objects.requireNonNull(builtInMetricsVersion, "Built-in metrics version cannot be null");
        this.metrics = metrics;
        this.clientId = clientId;
        this.version = parseBuiltInMetricsVersion(builtInMetricsVersion);
        this.rocksDBMetricsRecordingTrigger = new RocksDBMetricsRecordingTrigger(time);
        this.parentSensors = new HashMap<>();
    }

    /**
     * Maps the configured version string to a {@link Version}: {@link StreamsConfig#METRICS_LATEST}
     * yields {@link Version#LATEST}, every other value yields {@link Version#FROM_0100_TO_24}.
     */
    private static Version parseBuiltInMetricsVersion(String builtInMetricsVersion) {
        if (builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST)) {
            return Version.LATEST;
        }
        return Version.FROM_0100_TO_24;
    }

    /** Returns the built-in metrics version this instance was configured with. */
    public Version version() {
        return version;
    }

    /** Returns the RocksDB metrics recording trigger created in the constructor. */
    public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() {
        return rocksDBMetricsRecordingTrigger;
    }

    /**
     * Registers a client-level metric that always reports {@code value}.
     *
     * @param name metric name
     * @param description metric description
     * @param recordingLevel recording level stored in the metric's config
     * @param value the constant value to report
     */
    public <T> void addClientLevelImmutableMetric(String name, String description, Sensor.RecordingLevel recordingLevel, T value) {
        registerClientLevelMetric(name, description, recordingLevel, new ImmutableMetricValue<>(value));
    }

    /**
     * Registers a client-level metric whose value is read from {@code valueProvider}.
     *
     * @param name metric name
     * @param description metric description
     * @param recordingLevel recording level stored in the metric's config
     * @param valueProvider gauge supplying the metric value
     */
    public <T> void addClientLevelMutableMetric(String name, String description, Sensor.RecordingLevel recordingLevel, Gauge<T> valueProvider) {
        registerClientLevelMetric(name, description, recordingLevel, valueProvider);
    }

    /** Adds a metric to the client-level group and remembers its name for later removal. */
    private void registerClientLevelMetric(String name, String description, Sensor.RecordingLevel recordingLevel, Gauge<?> valueProvider) {
        MetricName metricName = metrics.metricName(name, CLIENT_LEVEL_GROUP, description, clientLevelTagMap());
        MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
        synchronized (clientLevelMetrics) {
            metrics.addMetric(metricName, metricConfig, valueProvider);
            clientLevelMetrics.push(metricName);
        }
    }

    /**
     * Returns the client-level sensor with the given name, creating and tracking it if the
     * registry does not know it yet.
     *
     * @param sensorName name of the sensor, without the client-level prefix
     * @param recordingLevel recording level used when the sensor is created
     * @param parents parent sensors used when the sensor is created
     * @return the existing or newly created sensor
     */
    public final Sensor clientLevelSensor(String sensorName, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
        synchronized (clientLevelSensors) {
            String fullSensorName = CLIENT_LEVEL_GROUP + SENSOR_NAME_DELIMITER + sensorName;
            Sensor existing = metrics.getSensor(fullSensorName);
            if (existing != null) {
                return existing;
            }
            clientLevelSensors.push(fullSensorName);
            return metrics.sensor(fullSensorName, recordingLevel, parents);
        }
    }

    /**
     * Returns the thread-level sensor with the given name, creating and tracking it if needed.
     *
     * @param threadId id of the thread the sensor belongs to
     * @param sensorName name of the sensor, without the thread-level prefix
     * @param recordingLevel recording level used when the sensor is created
     * @param parents parent sensors used when the sensor is created
     * @return the existing or newly created sensor
     */
    public final Sensor threadLevelSensor(String threadId, String sensorName, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
        String prefix = threadSensorPrefix(threadId);
        synchronized (threadLevelSensors) {
            return getSensors(threadLevelSensors, sensorName, prefix, recordingLevel, parents);
        }
    }

    /** Builds {@code internal.<threadId>}, the root of all internal sensor prefixes. */
    private String threadSensorPrefix(String threadId) {
        return SENSOR_INTERNAL_LABEL + SENSOR_PREFIX_DELIMITER + threadId;
    }

    /** Returns a new, mutable tag map holding only the {@code client-id} tag. */
    public Map<String, String> clientLevelTagMap() {
        Map<String, String> tagMap = new LinkedHashMap<>();
        tagMap.put(CLIENT_ID_TAG, clientId);
        return tagMap;
    }

    /**
     * Returns a new, mutable tag map holding the thread id. The key is {@code thread-id} for
     * {@link Version#LATEST} and {@code client-id} otherwise.
     *
     * @param threadId id of the thread
     */
    public Map<String, String> threadLevelTagMap(String threadId) {
        Map<String, String> tagMap = new LinkedHashMap<>();
        String threadTagKey = version == Version.LATEST ? THREAD_ID_TAG : THREAD_ID_TAG_0100_TO_24;
        tagMap.put(threadTagKey, threadId);
        return tagMap;
    }

    /** Removes every tracked client-level sensor, then every tracked client-level metric. */
    public final void removeAllClientLevelSensorsAndMetrics() {
        removeAllClientLevelSensors();
        removeAllClientLevelMetrics();
    }

    private void removeAllClientLevelMetrics() {
        synchronized (clientLevelMetrics) {
            removeMetrics(clientLevelMetrics);
        }
    }

    private void removeAllClientLevelSensors() {
        synchronized (clientLevelSensors) {
            removeSensors(clientLevelSensors);
        }
    }

    /**
     * Removes all sensors that were created through {@link #threadLevelSensor} for the thread.
     *
     * @param threadId id of the thread whose sensors are removed
     */
    public final void removeAllThreadLevelSensors(String threadId) {
        String prefix = threadSensorPrefix(threadId);
        synchronized (threadLevelSensors) {
            removeSensors(threadLevelSensors.remove(prefix));
        }
    }

    /**
     * Returns the thread-level tags plus the {@code task-id} tag.
     *
     * @param threadId id of the thread
     * @param taskId id of the task
     */
    public Map<String, String> taskLevelTagMap(String threadId, String taskId) {
        Map<String, String> tagMap = threadLevelTagMap(threadId);
        tagMap.put(TASK_ID_TAG, taskId);
        return tagMap;
    }

    /**
     * Returns the task-level tags plus the {@code processor-node-id} tag.
     *
     * @param threadId id of the thread
     * @param taskId id of the task
     * @param processorNodeName value of the {@code processor-node-id} tag
     */
    public Map<String, String> nodeLevelTagMap(String threadId, String taskId, String processorNodeName) {
        Map<String, String> tagMap = taskLevelTagMap(threadId, taskId);
        tagMap.put(PROCESSOR_NODE_ID_TAG, processorNodeName);
        return tagMap;
    }

    /**
     * Returns the task-level tags of the <em>calling thread</em> plus a
     * {@code <storeType>-state-id} tag.
     *
     * @param taskId id of the task
     * @param storeType prefix of the store tag key
     * @param storeName value of the store tag
     */
    public Map<String, String> storeLevelTagMap(String taskId, String storeType, String storeName) {
        Map<String, String> tagMap = taskLevelTagMap(Thread.currentThread().getName(), taskId);
        tagMap.put(storeType + "-" + STORE_ID_TAG, storeName);
        return tagMap;
    }

    /**
     * Returns the task-level tags plus the {@code buffer-id} tag.
     *
     * @param threadId id of the thread
     * @param taskId id of the task
     * @param bufferName value of the {@code buffer-id} tag
     */
    public Map<String, String> bufferLevelTagMap(String threadId, String taskId, String bufferName) {
        Map<String, String> tagMap = taskLevelTagMap(threadId, taskId);
        tagMap.put(BUFFER_ID_TAG, bufferName);
        return tagMap;
    }

    /**
     * Returns the task-level sensor with the given name, creating and tracking it if needed.
     *
     * @param threadId id of the thread
     * @param taskId id of the task
     * @param sensorName name of the sensor, without the task-level prefix
     * @param recordingLevel recording level used when the sensor is created
     * @param parents parent sensors used when the sensor is created
     * @return the existing or newly created sensor
     */
    public final Sensor taskLevelSensor(String threadId, String taskId, String sensorName, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
        String prefix = taskSensorPrefix(threadId, taskId);
        synchronized (taskLevelSensors) {
            return getSensors(taskLevelSensors, sensorName, prefix, recordingLevel, parents);
        }
    }

    /**
     * Removes all sensors that were created through {@link #taskLevelSensor} for the task.
     *
     * @param threadId id of the thread
     * @param taskId id of the task whose sensors are removed
     */
    public final void removeAllTaskLevelSensors(String threadId, String taskId) {
        String prefix = taskSensorPrefix(threadId, taskId);
        synchronized (taskLevelSensors) {
            removeSensors(taskLevelSensors.remove(prefix));
        }
    }

    /** Builds {@code <thread prefix>.task.<taskId>}. */
    private String taskSensorPrefix(String threadId, String taskId) {
        return threadSensorPrefix(threadId)
            + SENSOR_PREFIX_DELIMITER + SENSOR_TASK_LABEL + SENSOR_PREFIX_DELIMITER + taskId;
    }

    /**
     * Returns the processor-node-level sensor with the given name, creating and tracking it if
     * needed.
     *
     * @param threadId id of the thread
     * @param taskId id of the task
     * @param processorNodeName name of the processor node
     * @param sensorName name of the sensor, without the node-level prefix
     * @param recordingLevel recording level used when the sensor is created
     * @param parents parent sensors used when the sensor is created
     * @return the existing or newly created sensor
     */
    public Sensor nodeLevelSensor(String threadId, String taskId, String processorNodeName, String sensorName, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
        String prefix = nodeSensorPrefix(threadId, taskId, processorNodeName);
        synchronized (nodeLevelSensors) {
            return getSensors(nodeLevelSensors, sensorName, prefix, recordingLevel, parents);
        }
    }

    /**
     * Removes all sensors that were created through {@link #nodeLevelSensor} for the node.
     *
     * @param threadId id of the thread
     * @param taskId id of the task
     * @param processorNodeName name of the processor node whose sensors are removed
     */
    public final void removeAllNodeLevelSensors(String threadId, String taskId, String processorNodeName) {
        String prefix = nodeSensorPrefix(threadId, taskId, processorNodeName);
        synchronized (nodeLevelSensors) {
            removeSensors(nodeLevelSensors.remove(prefix));
        }
    }

    /** Builds {@code <task prefix>.node.<processorNodeName>}. */
    private String nodeSensorPrefix(String threadId, String taskId, String processorNodeName) {
        return taskSensorPrefix(threadId, taskId)
            + SENSOR_PREFIX_DELIMITER + SENSOR_NODE_LABEL + SENSOR_PREFIX_DELIMITER + processorNodeName;
    }

    /**
     * Returns the record-cache-level sensor with the given name, creating and tracking it if
     * needed.
     *
     * @param threadId id of the thread
     * @param taskId id of the task
     * @param cacheName name of the record cache
     * @param sensorName name of the sensor, without the cache-level prefix
     * @param recordingLevel recording level used when the sensor is created
     * @param parents parent sensors used when the sensor is created
     * @return the existing or newly created sensor
     */
    public Sensor cacheLevelSensor(String threadId, String taskId, String cacheName, String sensorName, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
        String prefix = cacheSensorPrefix(threadId, taskId, cacheName);
        synchronized (cacheLevelSensors) {
            return getSensors(cacheLevelSensors, sensorName, prefix, recordingLevel, parents);
        }
    }

    /**
     * Returns a new, mutable tag map with the thread id (keyed by version, see
     * {@link #threadLevelTagMap}), the {@code task-id} and the {@code record-cache-id} tags.
     *
     * @param threadId id of the thread
     * @param taskId id of the task
     * @param cacheName value of the {@code record-cache-id} tag
     */
    public Map<String, String> cacheLevelTagMap(String threadId, String taskId, String cacheName) {
        Map<String, String> tagMap = new LinkedHashMap<>();
        String threadTagKey = version == Version.FROM_0100_TO_24 ? THREAD_ID_TAG_0100_TO_24 : THREAD_ID_TAG;
        tagMap.put(threadTagKey, threadId);
        tagMap.put(TASK_ID_TAG, taskId);
        tagMap.put(RECORD_CACHE_ID_TAG, cacheName);
        return tagMap;
    }

    /**
     * Removes all sensors that were created through {@link #cacheLevelSensor} for the cache.
     *
     * @param threadId id of the thread
     * @param taskId id of the task
     * @param cacheName name of the record cache whose sensors are removed
     */
    public final void removeAllCacheLevelSensors(String threadId, String taskId, String cacheName) {
        String prefix = cacheSensorPrefix(threadId, taskId, cacheName);
        synchronized (cacheLevelSensors) {
            removeSensors(cacheLevelSensors.remove(prefix));
        }
    }

    /** Builds {@code <task prefix>.cache.<cacheName>}. */
    private String cacheSensorPrefix(String threadId, String taskId, String cacheName) {
        return taskSensorPrefix(threadId, taskId)
            + SENSOR_PREFIX_DELIMITER + SENSOR_CACHE_LABEL + SENSOR_PREFIX_DELIMITER + cacheName;
    }

    /**
     * Returns the state-store-level sensor with the given name, creating and tracking it if
     * needed. The name of the calling thread is used as thread id in the sensor prefix.
     *
     * @param taskId id of the task
     * @param storeName name of the state store
     * @param sensorName name of the sensor, without the store-level prefix
     * @param recordingLevel recording level used when the sensor is created
     * @param parents parent sensors used when the sensor is created
     * @return the existing or newly created sensor
     */
    public final Sensor storeLevelSensor(String taskId, String storeName, String sensorName, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
        String prefix = storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName);
        return getSensors(storeLevelSensors, sensorName, prefix, recordingLevel, parents);
    }

    /**
     * Registers a state-store-level gauge unless a metric with the same name and tags already
     * exists in the registry. The name of the calling thread is used as thread id.
     *
     * @param taskId id of the task
     * @param metricsScope store type, used as prefix of the store tag key
     * @param storeName name of the state store
     * @param name metric name
     * @param description metric description
     * @param recordingLevel recording level stored in the metric's config
     * @param valueProvider gauge supplying the metric value
     */
    public <T> void addStoreLevelMutableMetric(String taskId, String metricsScope, String storeName, String name, String description, Sensor.RecordingLevel recordingLevel, Gauge<T> valueProvider) {
        MetricName metricName =
            metrics.metricName(name, STATE_STORE_LEVEL_GROUP, description, storeLevelTagMap(taskId, metricsScope, storeName));
        if (metrics.metric(metricName) != null) {
            return;
        }
        MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
        String prefix = storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName);
        metrics.addMetric(metricName, metricConfig, valueProvider);
        storeLevelMetrics.computeIfAbsent(prefix, ignored -> new LinkedList<>()).push(metricName);
    }

    /**
     * Removes all store-level sensors and then all store-level metrics that were tracked for the
     * store under the name of the calling thread.
     *
     * @param taskId id of the task
     * @param storeName name of the state store
     */
    public final void removeAllStoreLevelSensorsAndMetrics(String taskId, String storeName) {
        String threadId = Thread.currentThread().getName();
        removeAllStoreLevelSensors(threadId, taskId, storeName);
        removeAllStoreLevelMetrics(threadId, taskId, storeName);
    }

    private void removeAllStoreLevelSensors(String threadId, String taskId, String storeName) {
        removeSensors(storeLevelSensors.remove(storeSensorPrefix(threadId, taskId, storeName)));
    }

    private void removeAllStoreLevelMetrics(String threadId, String taskId, String storeName) {
        removeMetrics(storeLevelMetrics.remove(storeSensorPrefix(threadId, taskId, storeName)));
    }

    /** Builds {@code <task prefix>.store.<storeName>}. */
    private String storeSensorPrefix(String threadId, String taskId, String storeName) {
        return taskSensorPrefix(threadId, taskId)
            + SENSOR_PREFIX_DELIMITER + SENSOR_STORE_LABEL + SENSOR_PREFIX_DELIMITER + storeName;
    }

    /** Pops every name off {@code sensorNames} and removes that sensor; a null deque is a no-op. */
    private void removeSensors(Deque<String> sensorNames) {
        if (sensorNames == null) {
            return;
        }
        while (!sensorNames.isEmpty()) {
            metrics.removeSensor(sensorNames.pop());
        }
    }

    /** Pops every name off {@code metricNames} and removes that metric; a null deque is a no-op. */
    private void removeMetrics(Deque<MetricName> metricNames) {
        if (metricNames == null) {
            return;
        }
        while (!metricNames.isEmpty()) {
            metrics.removeMetric(metricNames.pop());
        }
    }

    /** Gets or creates the sensor {@code name} directly in the registry; it is not tracked here. */
    @Override
    public Sensor addSensor(String name, Sensor.RecordingLevel recordingLevel) {
        return metrics.sensor(name, recordingLevel);
    }

    /** Gets or creates the sensor {@code name} with the given parents; it is not tracked here. */
    @Override
    public Sensor addSensor(String name, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
        return metrics.sensor(name, recordingLevel, parents);
    }

    /** Returns a read-only view of all metrics in the underlying registry. */
    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        return Collections.unmodifiableMap(metrics.metrics());
    }

    /** Records {@code endNs - startNs} on the sensor. */
    @Override
    @Deprecated
    public void recordLatency(Sensor sensor, long startNs, long endNs) {
        sensor.record(endNs - startNs);
    }

    /** Records {@code value} on the sensor. */
    @Override
    @Deprecated
    public void recordThroughput(Sensor sensor, long value) {
        sensor.record(value);
    }

    /**
     * Builds the tags of a user-defined sensor: the thread-level tags, a {@code <scopeName>-id}
     * tag holding {@code entityName}, and the additional key-value pairs.
     *
     * @throws IllegalArgumentException if {@code additionalTags} has an odd number of elements
     */
    private Map<String, String> customizedTags(String threadId, String scopeName, String entityName, String... additionalTags) {
        Map<String, String> tagMap = threadLevelTagMap(threadId);
        tagMap.put(scopeName + "-id", entityName);
        if (additionalTags != null) {
            if (additionalTags.length % 2 != 0) {
                throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
            }
            for (int i = 0; i < additionalTags.length; i += 2) {
                tagMap.put(additionalTags[i], additionalTags[i + 1]);
            }
        }
        return tagMap;
    }

    /** Creates the external sensor for an operation and adds its rate and total metrics. */
    private Sensor customInvocationRateAndCountSensor(String threadId, String groupName, String entityName, String operationName, Map<String, String> tags, Sensor.RecordingLevel recordingLevel) {
        Sensor sensor = metrics.sensor(externalChildSensorName(threadId, operationName, entityName), recordingLevel);
        String rateDescription = RATE_DESCRIPTION_PREFIX + operationName + OPERATIONS + RATE_DESCRIPTION_SUFFIX;
        String totalDescription = TOTAL_DESCRIPTION + operationName + OPERATIONS;
        addInvocationRateAndCountToSensor(sensor, groupName, tags, operationName, rateDescription, totalDescription);
        return sensor;
    }

    /**
     * Creates a sensor with rate, total, average-latency and max-latency metrics for an
     * operation. The name of the calling thread is used as thread id.
     *
     * @param scopeName scope; determines the group name and the key of the entity tag
     * @param entityName value of the entity tag
     * @param operationName name of the operation; prefix of the metric names
     * @param recordingLevel recording level of the sensor
     * @param tags additional tags as alternating keys and values
     * @return the sensor
     * @throws IllegalArgumentException if {@code tags} has an odd number of elements
     */
    @Override
    public Sensor addLatencyRateTotalSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags) {
        String threadId = Thread.currentThread().getName();
        String group = groupNameFromScope(scopeName);
        Map<String, String> tagMap = customizedTags(threadId, scopeName, entityName, tags);
        Sensor sensor = customInvocationRateAndCountSensor(threadId, group, entityName, operationName, tagMap, recordingLevel);
        addAvgAndMaxToSensor(
            sensor,
            group,
            tagMap,
            operationName + LATENCY_SUFFIX,
            AVG_LATENCY_DESCRIPTION + operationName,
            MAX_LATENCY_DESCRIPTION + operationName
        );
        return sensor;
    }

    /**
     * Creates a sensor with rate and total metrics for an operation. The name of the calling
     * thread is used as thread id.
     *
     * @param scopeName scope; determines the group name and the key of the entity tag
     * @param entityName value of the entity tag
     * @param operationName name of the operation; prefix of the metric names
     * @param recordingLevel recording level of the sensor
     * @param tags additional tags as alternating keys and values
     * @return the sensor
     * @throws IllegalArgumentException if {@code tags} has an odd number of elements
     */
    @Override
    public Sensor addRateTotalSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags) {
        String threadId = Thread.currentThread().getName();
        String group = groupNameFromScope(scopeName);
        Map<String, String> tagMap = customizedTags(threadId, scopeName, entityName, tags);
        return customInvocationRateAndCountSensor(threadId, group, entityName, operationName, tagMap, recordingLevel);
    }

    /**
     * Creates a child sensor with latency and throughput metrics plus a roll-up parent sensor
     * whose entity tag is {@code "all"}. The parent is remembered so that
     * {@link #removeSensor(Sensor)} removes it together with the child.
     *
     * @return the child sensor
     * @throws IllegalArgumentException if {@code tags} has an odd number of elements
     */
    @Override
    @Deprecated
    public Sensor addLatencyAndThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags) {
        String group = groupNameFromScope(scopeName);
        String threadId = Thread.currentThread().getName();
        Map<String, String> tagMap = customizedTags(threadId, scopeName, entityName, tags);
        Map<String, String> allTagMap = customizedTags(threadId, scopeName, ROLLUP_VALUE, tags);

        Sensor parent = metrics.sensor(externalParentSensorName(threadId, operationName), recordingLevel);
        addAvgAndMaxLatencyToSensor(parent, group, allTagMap, operationName);
        addInvocationRateAndCountToSensor(parent, group, allTagMap, operationName);

        Sensor child = metrics.sensor(externalChildSensorName(threadId, operationName, entityName), recordingLevel, parent);
        addAvgAndMaxLatencyToSensor(child, group, tagMap, operationName);
        addInvocationRateAndCountToSensor(child, group, tagMap, operationName);

        parentSensors.put(child, parent);
        return child;
    }

    /**
     * Creates a child sensor with throughput metrics plus a roll-up parent sensor whose entity
     * tag is {@code "all"}. The parent is remembered so that {@link #removeSensor(Sensor)}
     * removes it together with the child.
     *
     * @return the child sensor
     * @throws IllegalArgumentException if {@code tags} has an odd number of elements
     */
    @Override
    @Deprecated
    public Sensor addThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags) {
        String group = groupNameFromScope(scopeName);
        String threadId = Thread.currentThread().getName();
        Map<String, String> tagMap = customizedTags(threadId, scopeName, entityName, tags);
        Map<String, String> allTagMap = customizedTags(threadId, scopeName, ROLLUP_VALUE, tags);

        Sensor parent = metrics.sensor(externalParentSensorName(threadId, operationName), recordingLevel);
        addInvocationRateAndCountToSensor(parent, group, allTagMap, operationName);

        Sensor child = metrics.sensor(externalChildSensorName(threadId, operationName, entityName), recordingLevel, parent);
        addInvocationRateAndCountToSensor(child, group, tagMap, operationName);

        parentSensors.put(child, parent);
        return child;
    }

    /** Builds {@code external.<threadId>.entity.<entityName>.s.<operationName>}. */
    private String externalChildSensorName(String threadId, String operationName, String entityName) {
        return SENSOR_EXTERNAL_LABEL + SENSOR_PREFIX_DELIMITER + threadId
            + SENSOR_PREFIX_DELIMITER + SENSOR_ENTITY_LABEL
            + SENSOR_PREFIX_DELIMITER + entityName
            + SENSOR_NAME_DELIMITER + operationName;
    }

    /** Builds {@code external.<threadId>.s.<operationName>}. */
    private String externalParentSensorName(String threadId, String operationName) {
        return SENSOR_EXTERNAL_LABEL + SENSOR_PREFIX_DELIMITER + threadId + SENSOR_NAME_DELIMITER + operationName;
    }

    /** Adds {@code <prefix>-avg} ({@link Avg}) and {@code <prefix>-max} ({@link Max}) metrics. */
    public static void addAvgAndMaxToSensor(Sensor sensor, String group, Map<String, String> tags, String metricNamePrefix, String descriptionOfAvg, String descriptionOfMax) {
        sensor.add(new MetricName(metricNamePrefix + AVG_SUFFIX, group, descriptionOfAvg, tags), new Avg());
        sensor.add(new MetricName(metricNamePrefix + MAX_SUFFIX, group, descriptionOfMax, tags), new Max());
    }

    /** Adds {@code <prefix>-min} ({@link Min}) and {@code <prefix>-max} ({@link Max}) metrics. */
    public static void addMinAndMaxToSensor(Sensor sensor, String group, Map<String, String> tags, String metricNamePrefix, String descriptionOfMin, String descriptionOfMax) {
        sensor.add(new MetricName(metricNamePrefix + MIN_SUFFIX, group, descriptionOfMin, tags), new Min());
        sensor.add(new MetricName(metricNamePrefix + MAX_SUFFIX, group, descriptionOfMax, tags), new Max());
    }

    /** Adds {@code <operation>-latency-avg} and {@code <operation>-latency-max} metrics. */
    public static void addAvgAndMaxLatencyToSensor(Sensor sensor, String group, Map<String, String> tags, String operation) {
        String latencyPrefix = operation + LATENCY_SUFFIX;
        sensor.add(
            new MetricName(latencyPrefix + AVG_SUFFIX, group, AVG_LATENCY_DESCRIPTION + operation + " operation.", tags),
            new Avg()
        );
        sensor.add(
            new MetricName(latencyPrefix + MAX_SUFFIX, group, MAX_LATENCY_DESCRIPTION + operation + " operation.", tags),
            new Max()
        );
    }

    /** Adds {@code -avg} and {@code -max} metrics first, then the {@code -min} metric. */
    public static void addAvgAndMinAndMaxToSensor(Sensor sensor, String group, Map<String, String> tags, String metricNamePrefix, String descriptionOfAvg, String descriptionOfMin, String descriptionOfMax) {
        addAvgAndMaxToSensor(sensor, group, tags, metricNamePrefix, descriptionOfAvg, descriptionOfMax);
        sensor.add(new MetricName(metricNamePrefix + MIN_SUFFIX, group, descriptionOfMin, tags), new Min());
    }

    /** Adds {@code <operation>-rate} (invocations per second) and {@code <operation>-total} (cumulative count). */
    public static void addInvocationRateAndCountToSensor(Sensor sensor, String group, Map<String, String> tags, String operation, String descriptionOfRate, String descriptionOfCount) {
        addInvocationRateToSensor(sensor, group, tags, operation, descriptionOfRate);
        sensor.add(new MetricName(operation + TOTAL_SUFFIX, group, descriptionOfCount, tags), new CumulativeCount());
    }

    /** Adds {@code <operation>-rate}: a per-second {@link Rate} over a {@link WindowedCount}. */
    public static void addInvocationRateToSensor(Sensor sensor, String group, Map<String, String> tags, String operation, String descriptionOfRate) {
        sensor.add(
            new MetricName(operation + RATE_SUFFIX, group, descriptionOfRate, tags),
            new Rate(TimeUnit.SECONDS, new WindowedCount())
        );
    }

    /** Same as the six-argument overload, with descriptions derived from {@code operation}. */
    public static void addInvocationRateAndCountToSensor(Sensor sensor, String group, Map<String, String> tags, String operation) {
        addInvocationRateAndCountToSensor(
            sensor,
            group,
            tags,
            operation,
            RATE_DESCRIPTION + operation,
            TOTAL_DESCRIPTION + operation
        );
    }

    /** Adds {@code <operation>-rate} (rate of the recorded sum) and {@code <operation>-total} (cumulative sum). */
    public static void addRateOfSumAndSumMetricsToSensor(Sensor sensor, String group, Map<String, String> tags, String operation, String descriptionOfRate, String descriptionOfTotal) {
        addRateOfSumMetricToSensor(sensor, group, tags, operation, descriptionOfRate);
        addSumMetricToSensor(sensor, group, tags, operation, descriptionOfTotal);
    }

    /** Adds {@code <operation>-rate}: a per-second {@link Rate} over a {@link WindowedSum}. */
    public static void addRateOfSumMetricToSensor(Sensor sensor, String group, Map<String, String> tags, String operation, String descriptionOfRate) {
        sensor.add(
            new MetricName(operation + RATE_SUFFIX, group, descriptionOfRate, tags),
            new Rate(TimeUnit.SECONDS, new WindowedSum())
        );
    }

    /** Adds a {@link CumulativeSum} metric named {@code <operation>-total}. */
    public static void addSumMetricToSensor(Sensor sensor, String group, Map<String, String> tags, String operation, String descriptionOfTotal) {
        addSumMetricToSensor(sensor, group, tags, operation, true, descriptionOfTotal);
    }

    /**
     * Adds a {@link CumulativeSum} metric.
     *
     * @param withSuffix if true the metric is named {@code <operation>-total}, otherwise just
     *                   {@code <operation>}
     */
    public static void addSumMetricToSensor(Sensor sensor, String group, Map<String, String> tags, String operation, boolean withSuffix, String descriptionOfTotal) {
        String metricName = withSuffix ? operation + TOTAL_SUFFIX : operation;
        sensor.add(new MetricName(metricName, group, descriptionOfTotal, tags), new CumulativeSum());
    }

    /** Adds a {@link Value} metric with exactly the given name. */
    public static void addValueMetricToSensor(Sensor sensor, String group, Map<String, String> tags, String name, String description) {
        sensor.add(new MetricName(name, group, description, tags), new Value());
    }

    /** Adds {@code <prefix>-avg} ({@link Avg}) and {@code <prefix>-total} ({@link CumulativeSum}) metrics. */
    public static void addAvgAndSumMetricsToSensor(Sensor sensor, String group, Map<String, String> tags, String metricNamePrefix, String descriptionOfAvg, String descriptionOfTotal) {
        sensor.add(new MetricName(metricNamePrefix + AVG_SUFFIX, group, descriptionOfAvg, tags), new Avg());
        sensor.add(new MetricName(metricNamePrefix + TOTAL_SUFFIX, group, descriptionOfTotal, tags), new CumulativeSum());
    }

    /**
     * Runs {@code actionToMeasure} and, if the sensor should record and has metrics, records the
     * elapsed nanoseconds on it. The latency is recorded exactly once, also when the action
     * throws; the exception is propagated unchanged.
     *
     * @param actionToMeasure the action to run
     * @param time clock used to take the start and end timestamps
     * @param sensor sensor the latency is recorded on
     */
    public static void maybeMeasureLatency(Runnable actionToMeasure, Time time, Sensor sensor) {
        if (sensor.shouldRecord() && sensor.hasMetrics()) {
            long startNs = time.nanoseconds();
            try {
                actionToMeasure.run();
            } finally {
                // finally (not catch-and-rethrow) so a failing record() is never retried
                sensor.record(time.nanoseconds() - startNs);
            }
        } else {
            actionToMeasure.run();
        }
    }

    /**
     * Evaluates {@code actionToMeasure} and, if the sensor should record and has metrics, records
     * the elapsed nanoseconds on it. The latency is recorded exactly once, also when the action
     * throws; the exception is propagated unchanged.
     *
     * @param actionToMeasure the action to evaluate
     * @param time clock used to take the start and end timestamps
     * @param sensor sensor the latency is recorded on
     * @return the value produced by {@code actionToMeasure}
     */
    public static <T> T maybeMeasureLatency(Supplier<T> actionToMeasure, Time time, Sensor sensor) {
        if (sensor.shouldRecord() && sensor.hasMetrics()) {
            long startNs = time.nanoseconds();
            try {
                return actionToMeasure.get();
            } finally {
                // finally (not catch-and-rethrow) so a failing record() is never retried
                sensor.record(time.nanoseconds() - startNs);
            }
        }
        return actionToMeasure.get();
    }

    /**
     * Returns the sensor {@code <prefix>.s.<sensorName>} from the registry. If it does not exist,
     * its full name is pushed onto the deque stored under {@code prefix} in
     * {@code sensorNamesByPrefix} and the sensor is created. Callers are responsible for any
     * locking of {@code sensorNamesByPrefix}.
     */
    private Sensor getSensors(Map<String, Deque<String>> sensorNamesByPrefix, String sensorName, String prefix, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
        String fullSensorName = prefix + SENSOR_NAME_DELIMITER + sensorName;
        Sensor existing = metrics.getSensor(fullSensorName);
        if (existing != null) {
            return existing;
        }
        sensorNamesByPrefix.computeIfAbsent(prefix, ignored -> new LinkedList<>()).push(fullSensorName);
        return metrics.sensor(fullSensorName, recordingLevel, parents);
    }

    /**
     * Removes the sensor from the registry and, if a roll-up parent was remembered for it, the
     * parent as well.
     *
     * @throws NullPointerException if {@code sensor} is null
     */
    @Override
    public void removeSensor(Sensor sensor) {
        Objects.requireNonNull(sensor, "Sensor is null");
        metrics.removeSensor(sensor.name());
        Sensor parent = parentSensors.remove(sensor);
        if (parent != null) {
            metrics.removeSensor(parent.name());
        }
    }

    /** Returns a read-only view of the child-to-parent sensor mapping. */
    Map<Sensor, Sensor> parentSensors() {
        return Collections.unmodifiableMap(parentSensors);
    }

    /** Builds the group name {@code stream-<scopeName>-metrics}. */
    private static String groupNameFromScope(String scopeName) {
        return GROUP_PREFIX + scopeName + GROUP_SUFFIX;
    }
}
