/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.source.metrics;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Registers and updates the metrics exposed by a Kafka source reader.
 *
 * <p>All metrics are registered under the {@value #KAFKA_SOURCE_READER_METRIC_GROUP} sub-group of
 * the parent metric group passed to the constructor:
 *
 * <ul>
 *   <li>two counters for succeeded and failed offset commits,
 *   <li>per topic-partition gauges for the current and the committed offset,
 *   <li>optionally, one gauge per metric reported by a {@link KafkaConsumer}.
 * </ul>
 *
 * <p>NOTE(review): offsets are kept in a plain {@link HashMap} with non-volatile fields. If the
 * gauges are read from a different thread than the one recording offsets, updates are not
 * guaranteed to be visible -- confirm the threading model with the callers.
 */
public class KafkaSourceReaderMetrics {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceReaderMetrics.class);

    // Names of metric groups, gauges and counters
    public static final String KAFKA_SOURCE_READER_METRIC_GROUP = "KafkaSourceReader";
    public static final String TOPIC_GROUP = "topic";
    public static final String PARTITION_GROUP = "partition";
    public static final String CURRENT_OFFSET_METRIC_GAUGE = "currentOffset";
    public static final String COMMITTED_OFFSET_METRIC_GAUGE = "committedOffset";
    public static final String COMMITS_SUCCEEDED_METRIC_COUNTER = "commitsSucceeded";
    public static final String COMMITS_FAILED_METRIC_COUNTER = "commitsFailed";
    public static final String KAFKA_CONSUMER_METRIC_GROUP = "KafkaConsumer";

    /** Value reported by the offset gauges until an offset has been recorded. */
    public static final long INITIAL_OFFSET = -1L;

    private final MetricGroup kafkaSourceReaderMetricGroup;
    private final Counter commitsSucceeded;
    private final Counter commitsFailed;

    // Current / committed offsets of every registered topic partition
    private final Map<TopicPartition, Offset> offsets = new HashMap<>();

    /**
     * Creates the reader metric group below {@code parentMetricGroup} and registers the commit
     * counters in it.
     *
     * @param parentMetricGroup group under which the reader metric group is added
     */
    public KafkaSourceReaderMetrics(MetricGroup parentMetricGroup) {
        this.kafkaSourceReaderMetricGroup =
                parentMetricGroup.addGroup(KAFKA_SOURCE_READER_METRIC_GROUP);
        this.commitsSucceeded =
                this.kafkaSourceReaderMetricGroup.counter(COMMITS_SUCCEEDED_METRIC_COUNTER);
        this.commitsFailed =
                this.kafkaSourceReaderMetricGroup.counter(COMMITS_FAILED_METRIC_COUNTER);
    }

    /**
     * Exposes every metric returned by {@code kafkaConsumer.metrics()} as a gauge in the
     * {@value #KAFKA_CONSUMER_METRIC_GROUP} sub-group. Each gauge is named after
     * {@code MetricName.name()} and evaluates {@code Metric.metricValue()} on every read.
     *
     * <p>If the consumer returns {@code null} for its metrics, a warning is logged and nothing is
     * registered.
     *
     * @param kafkaConsumer consumer whose metrics are forwarded
     */
    public void registerKafkaConsumerMetrics(KafkaConsumer<?, ?> kafkaConsumer) {
        final Map<MetricName, Metric> kafkaConsumerMetrics = kafkaConsumer.metrics();
        if (kafkaConsumerMetrics == null) {
            LOG.warn("Consumer implementation does not support metrics");
            return;
        }

        final MetricGroup kafkaConsumerMetricGroup =
                this.kafkaSourceReaderMetricGroup.addGroup(KAFKA_CONSUMER_METRIC_GROUP);
        kafkaConsumerMetrics.forEach(
                (name, metric) ->
                        kafkaConsumerMetricGroup.gauge(name.name(), () -> metric.metricValue()));
    }

    /**
     * Starts tracking {@code tp}: stores an entry with both offsets set to
     * {@link #INITIAL_OFFSET} (replacing any existing entry) and registers the offset gauges for
     * the partition.
     *
     * @param tp topic partition to track
     */
    public void registerTopicPartition(TopicPartition tp) {
        this.offsets.put(tp, new Offset(INITIAL_OFFSET, INITIAL_OFFSET));
        this.registerOffsetMetricsForTopicPartition(tp);
    }

    /**
     * Updates the current offset of a tracked topic partition.
     *
     * @param tp topic partition whose offset is updated
     * @param offset new current offset
     * @throws IllegalArgumentException if {@code tp} has not been registered
     */
    public void recordCurrentOffset(TopicPartition tp, long offset) {
        this.requireTrackedOffset(tp).currentOffset = offset;
    }

    /**
     * Updates the committed offset of a tracked topic partition.
     *
     * @param tp topic partition whose offset is updated
     * @param offset new committed offset
     * @throws IllegalArgumentException if {@code tp} has not been registered
     */
    public void recordCommittedOffset(TopicPartition tp, long offset) {
        this.requireTrackedOffset(tp).committedOffset = offset;
    }

    /** Increments the {@value #COMMITS_SUCCEEDED_METRIC_COUNTER} counter. */
    public void recordSucceededCommit() {
        this.commitsSucceeded.inc();
    }

    /** Increments the {@value #COMMITS_FAILED_METRIC_COUNTER} counter. */
    public void recordFailedCommit() {
        this.commitsFailed.inc();
    }

    /**
     * Registers the current-offset and committed-offset gauges for {@code tp} in the group
     * {@code topic.<topic>.partition.<partition>} below the reader metric group.
     */
    private void registerOffsetMetricsForTopicPartition(TopicPartition tp) {
        final MetricGroup topicPartitionGroup =
                this.kafkaSourceReaderMetricGroup
                        .addGroup(TOPIC_GROUP, tp.topic())
                        .addGroup(PARTITION_GROUP, String.valueOf(tp.partition()));

        // The gauges look the entry up on every read so that they always reflect the entry
        // currently stored in the map; an untracked partition reports INITIAL_OFFSET.
        topicPartitionGroup.gauge(
                CURRENT_OFFSET_METRIC_GAUGE,
                () ->
                        this.offsets.getOrDefault(tp, new Offset(INITIAL_OFFSET, INITIAL_OFFSET))
                                .currentOffset);
        topicPartitionGroup.gauge(
                COMMITTED_OFFSET_METRIC_GAUGE,
                () ->
                        this.offsets.getOrDefault(tp, new Offset(INITIAL_OFFSET, INITIAL_OFFSET))
                                .committedOffset);
    }

    /**
     * Returns the offset entry of {@code tp} with a single map lookup.
     *
     * @throws IllegalArgumentException if {@code tp} has not been registered
     */
    private Offset requireTrackedOffset(TopicPartition tp) {
        final Offset tracked = this.offsets.get(tp);
        // The map never stores null values, so a null result means the key is absent.
        if (tracked == null) {
            throw new IllegalArgumentException(
                    String.format("TopicPartition %s is not tracked", tp));
        }
        return tracked;
    }

    /** Mutable holder for the current and committed offset of one topic partition. */
    private static class Offset {
        long currentOffset;
        long committedOffset;

        Offset(long currentOffset, long committedOffset) {
            this.currentOffset = currentOffset;
            this.committedOffset = committedOffset;
        }
    }
}

