/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.connector.kafka.source.metrics;

import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Gauge;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Tag;
import io.mantisrx.runtime.Context;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/**
 * Publishes the metrics of a single Kafka consumer to a Spectator {@link Registry}.
 *
 * <p>Every counter and gauge created in the constructor is named
 * {@code MantisKafkaConsumer_<metricName>} and carries the common tags
 * {@code mantisWorkerNum}, {@code mantisWorkerIndex}, {@code mantisJobName},
 * {@code mantisJobId} and {@code consumerId}. Per-partition offset gauges are
 * created lazily the first time an offset is recorded for a partition and
 * additionally carry {@code topic} and {@code partition} tags.
 */
public class ConsumerMetrics {
    private static final String METRICS_PREFIX = "MantisKafkaConsumer_";
    private static final String METRIC_KAFKA_IN_COUNT = "kafkaInCount";
    private static final String METRIC_KAFKA_PROCESSED_COUNT = "kafkaProcessedCount";
    private static final String METRIC_KAFKA_ERROR_COUNT = "kafkaErrorCount";
    private static final String METRIC_KAFKA_WAIT_FOR_DATA_COUNT = "kafkaWaitForDataCount";
    private static final String METRIC_KAFKA_COMMIT_COUNT = "kafkaCommitCount";
    private static final String METRIC_CHECKPOINT_DELAY = "checkpointDelay";
    private static final String METRIC_PARSE_FAILURE_COUNT = "parseFailureCount";
    private static final String METRIC_KAFKA_MSG_VALUE_NULL_COUNT = "kafkaMessageValueNull";
    private static final String METRIC_TIME_SINCE_LAST_POLL_MS = "timeSinceLastPollMs";
    private static final String METRIC_TIME_SINCE_LAST_POLL_WITH_DATA_MS = "timeSinceLastPollWithDataMs";
    private static final String METRIC_KAFKA_PAUSE_PARTITIONS = "kafkaPausePartitions";
    private static final String METRIC_KAFKA_RESUME_PARTITIONS = "kafkaResumePartitions";
    // NOTE(review): unlike the metrics above, the two per-partition gauge names are
    // registered WITHOUT METRICS_PREFIX. They are kept as-is so the emitted metric
    // names do not change; confirm whether the missing prefix is intentional.
    private static final String METRIC_COMMITTED_OFFSETS = "committedOffsets";
    private static final String METRIC_MIN_READ_OFFSETS = "minReadOffsets";
    private static final String TAG_TOPIC = "topic";
    private static final String TAG_PARTITION = "partition";

    private final Registry registry;
    private final List<Tag> commonTags;
    private final Counter kafkaInCount;
    private final Counter kafkaProcessedCount;
    private final Counter kafkaErrorCount;
    private final Counter kafkaWaitForDataCount;
    private final Counter kafkaCommitCount;
    private final Counter parseFailureCount;
    private final Counter kafkaPausePartitions;
    private final Counter kafkaResumePartitions;
    private final Counter kafkaMsgValueNullCount;
    private final Gauge checkpointDelay;
    private final Gauge timeSinceLastPollMs;
    private final Gauge timeSinceLastPollWithDataMs;
    /** Lazily created gauges holding the last committed offset per partition. */
    private final ConcurrentMap<TopicPartition, Gauge> committedOffsets = new ConcurrentHashMap<>();
    /** Lazily created gauges holding the last recorded read offset per partition. */
    private final ConcurrentMap<TopicPartition, Gauge> readOffsets = new ConcurrentHashMap<>();

    /**
     * Registers all fixed counters and gauges for one consumer.
     *
     * @param registry   Spectator registry the meters are created in
     * @param consumerId identifier of the consumer, published as the {@code consumerId} tag
     * @param context    Mantis context supplying the worker number, worker index,
     *                   job cluster name and job id used as common tags
     */
    public ConsumerMetrics(Registry registry, int consumerId, Context context) {
        this.registry = registry;
        // commonTags must be assigned before any createId(...) call below, which reads it.
        this.commonTags = createCommonTags(context, consumerId);
        this.kafkaErrorCount = registry.counter(createId(METRIC_KAFKA_ERROR_COUNT));
        this.kafkaInCount = registry.counter(createId(METRIC_KAFKA_IN_COUNT));
        this.kafkaProcessedCount = registry.counter(createId(METRIC_KAFKA_PROCESSED_COUNT));
        this.kafkaWaitForDataCount = registry.counter(createId(METRIC_KAFKA_WAIT_FOR_DATA_COUNT));
        this.kafkaCommitCount = registry.counter(createId(METRIC_KAFKA_COMMIT_COUNT));
        this.checkpointDelay = registry.gauge(createId(METRIC_CHECKPOINT_DELAY));
        this.timeSinceLastPollMs = registry.gauge(createId(METRIC_TIME_SINCE_LAST_POLL_MS));
        this.timeSinceLastPollWithDataMs = registry.gauge(createId(METRIC_TIME_SINCE_LAST_POLL_WITH_DATA_MS));
        this.parseFailureCount = registry.counter(createId(METRIC_PARSE_FAILURE_COUNT));
        this.kafkaPausePartitions = registry.counter(createId(METRIC_KAFKA_PAUSE_PARTITIONS));
        this.kafkaResumePartitions = registry.counter(createId(METRIC_KAFKA_RESUME_PARTITIONS));
        this.kafkaMsgValueNullCount = registry.counter(createId(METRIC_KAFKA_MSG_VALUE_NULL_COUNT));
    }

    /** Builds the tags attached to every metric published by this instance. */
    private List<Tag> createCommonTags(Context context, int consumerId) {
        return Arrays.asList(
                Tag.of("mantisWorkerNum", Integer.toString(context.getWorkerInfo().getWorkerNumber())),
                Tag.of("mantisWorkerIndex", Integer.toString(context.getWorkerInfo().getWorkerIndex())),
                Tag.of("mantisJobName", context.getWorkerInfo().getJobClusterName()),
                Tag.of("mantisJobId", context.getJobId()),
                Tag.of("consumerId", String.valueOf(consumerId)));
    }

    /** Creates the id {@code MantisKafkaConsumer_<metricName>} carrying the common tags. */
    private Id createId(String metricName) {
        return registry.createId(METRICS_PREFIX + metricName, commonTags);
    }

    /**
     * Registers a gauge named {@code metricName} tagged with the common tags plus
     * the topic and partition of {@code tp}.
     */
    private Gauge createPartitionGauge(String metricName, TopicPartition tp) {
        List<Tag> tags = new ArrayList<>(commonTags);
        tags.add(Tag.of(TAG_TOPIC, tp.topic()));
        tags.add(Tag.of(TAG_PARTITION, String.valueOf(tp.partition())));
        return registry.gauge(metricName, tags);
    }

    /** Sets the {@code checkpointDelay} gauge to {@code value}. */
    public void recordCheckpointDelay(long value) {
        checkpointDelay.set(value);
    }

    /** Sets the {@code timeSinceLastPollMs} gauge to {@code value}. */
    public void recordTimeSinceLastPollMs(long value) {
        timeSinceLastPollMs.set(value);
    }

    /** Sets the {@code timeSinceLastPollWithDataMs} gauge to {@code value}. */
    public void recordTimeSinceLastPollWithDataMs(long value) {
        timeSinceLastPollWithDataMs.set(value);
    }

    /** Increments the {@code kafkaInCount} counter by one. */
    public void incrementInCount() {
        kafkaInCount.increment();
    }

    /** Increments the {@code kafkaProcessedCount} counter by one. */
    public void incrementProcessedCount() {
        kafkaProcessedCount.increment();
    }

    /** Increments the {@code kafkaErrorCount} counter by one. */
    public void incrementErrorCount() {
        kafkaErrorCount.increment();
    }

    /** Increments the {@code kafkaWaitForDataCount} counter by one. */
    public void incrementWaitForDataCount() {
        kafkaWaitForDataCount.increment();
    }

    /** Increments the {@code kafkaCommitCount} counter by one. */
    public void incrementCommitCount() {
        kafkaCommitCount.increment();
    }

    /** Increments the {@code parseFailureCount} counter by one. */
    public void incrementParseFailureCount() {
        parseFailureCount.increment();
    }

    /** Increments the {@code kafkaPausePartitions} counter by one. */
    public void incrementPausePartitionCount() {
        kafkaPausePartitions.increment();
    }

    /** Increments the {@code kafkaResumePartitions} counter by one. */
    public void incrementResumePartitionCount() {
        kafkaResumePartitions.increment();
    }

    /** Increments the {@code kafkaMessageValueNull} counter by one. */
    public void incrementKafkaMessageValueNullCount() {
        kafkaMsgValueNullCount.increment();
    }

    /**
     * Publishes the committed offset of every partition in {@code checkpoint} to
     * that partition's {@code committedOffsets} gauge, creating the gauge on first use.
     *
     * @param checkpoint committed offsets keyed by partition
     */
    public void recordCommittedOffset(Map<TopicPartition, OffsetAndMetadata> checkpoint) {
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : checkpoint.entrySet()) {
            // computeIfAbsent does the lookup and the one-time gauge creation in a
            // single atomic map operation instead of containsKey/putIfAbsent/get.
            Gauge gauge = committedOffsets.computeIfAbsent(
                    entry.getKey(), key -> createPartitionGauge(METRIC_COMMITTED_OFFSETS, key));
            gauge.set(entry.getValue().offset());
        }
    }

    /**
     * Publishes {@code offset} to the {@code minReadOffsets} gauge of partition
     * {@code tp}, creating the gauge on first use.
     *
     * @param tp     partition the offset belongs to
     * @param offset offset value to publish
     */
    public void recordReadOffset(TopicPartition tp, long offset) {
        Gauge gauge = readOffsets.computeIfAbsent(
                tp, key -> createPartitionGauge(METRIC_MIN_READ_OFFSETS, key));
        gauge.set(offset);
    }
}

