package org.apache.pulsar.broker.stats.prometheus;

import io.netty.util.concurrent.FastThreadLocal;
import java.util.Optional;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.CompactorMXBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.class */
public class NamespaceStatsAggregator {
    private static final Logger log = LoggerFactory.getLogger(NamespaceStatsAggregator.class);
    private static FastThreadLocal<AggregatedNamespaceStats> localNamespaceStats = new FastThreadLocal<AggregatedNamespaceStats>() { // from class: org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.1
        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: initialValue, reason: merged with bridge method [inline-methods] */
        public AggregatedNamespaceStats m495initialValue() throws Exception {
            return new AggregatedNamespaceStats();
        }
    };
    private static FastThreadLocal<TopicStats> localTopicStats = new FastThreadLocal<TopicStats>() { // from class: org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.2
        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: initialValue, reason: merged with bridge method [inline-methods] */
        public TopicStats m496initialValue() throws Exception {
            return new TopicStats();
        }
    };

    public static void generate(PulsarService pulsarService, boolean z, boolean z2, boolean z3, boolean z4, SimpleTextOutputStream simpleTextOutputStream) {
        String clusterName = pulsarService.getConfiguration().getClusterName();
        AggregatedNamespaceStats aggregatedNamespaceStats = (AggregatedNamespaceStats) localNamespaceStats.get();
        TopicStats.resetTypes();
        TopicStats topicStats = (TopicStats) localTopicStats.get();
        printDefaultBrokerStats(simpleTextOutputStream, clusterName);
        Optional<CompactorMXBean> compactorMXBean = getCompactorMXBean(pulsarService);
        LongAdder longAdder = new LongAdder();
        pulsarService.getBrokerService().getMultiLayerTopicMap().forEach((str, concurrentOpenHashMap) -> {
            aggregatedNamespaceStats.reset();
            longAdder.reset();
            concurrentOpenHashMap.forEach((str, concurrentOpenHashMap) -> {
                concurrentOpenHashMap.forEach((str, topic) -> {
                    getTopicStats(topic, topicStats, z2, z3, pulsarService.getConfiguration().isExposePreciseBacklogInPrometheus(), pulsarService.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus(), compactorMXBean);
                    if (!z) {
                        aggregatedNamespaceStats.updateStats(topicStats);
                    } else {
                        longAdder.add(1L);
                        TopicStats.printTopicStats(simpleTextOutputStream, clusterName, str, str, topicStats, compactorMXBean, z4);
                    }
                });
            });
            if (z) {
                printTopicsCountStats(simpleTextOutputStream, clusterName, str, longAdder);
            } else {
                printNamespaceStats(simpleTextOutputStream, clusterName, str, aggregatedNamespaceStats);
            }
        });
    }

    private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService pulsarService) {
        Compactor compactor = null;
        try {
            compactor = pulsarService.getCompactor(false);
        } catch (PulsarServerException e) {
            log.error("get compactor error", e);
        }
        return Optional.ofNullable(compactor).map(compactor2 -> {
            return compactor2.getStats();
        });
    }

    private static void getTopicStats(Topic topic, TopicStats topicStats, boolean z, boolean z2, boolean z3, boolean z4, Optional<CompactorMXBean> optional) {
        topicStats.reset();
        if (topic instanceof PersistentTopic) {
            ManagedLedger managedLedger = ((PersistentTopic) topic).getManagedLedger();
            ManagedLedgerMBeanImpl stats = managedLedger.getStats();
            topicStats.managedLedgerStats.storageSize = stats.getStoredMessagesSize();
            topicStats.managedLedgerStats.storageLogicalSize = stats.getStoredMessagesLogicalSize();
            topicStats.managedLedgerStats.backlogSize = managedLedger.getEstimatedBacklogSize();
            topicStats.managedLedgerStats.offloadedStorageUsed = managedLedger.getOffloadedSize();
            topicStats.backlogQuotaLimit = topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize();
            topicStats.backlogQuotaLimitTime = topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime();
            topicStats.managedLedgerStats.storageWriteLatencyBuckets.addAll(stats.getInternalAddEntryLatencyBuckets());
            topicStats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
            topicStats.managedLedgerStats.storageLedgerWriteLatencyBuckets.addAll(stats.getInternalLedgerAddEntryLatencyBuckets());
            topicStats.managedLedgerStats.storageLedgerWriteLatencyBuckets.refresh();
            topicStats.managedLedgerStats.entrySizeBuckets.addAll(stats.getInternalEntrySizeBuckets());
            topicStats.managedLedgerStats.entrySizeBuckets.refresh();
            topicStats.managedLedgerStats.storageWriteRate = stats.getAddEntryMessagesRate();
            topicStats.managedLedgerStats.storageReadRate = stats.getReadEntriesRate();
        }
        TopicStatsImpl mo147getStats = topic.mo147getStats(z3, z4);
        topicStats.msgInCounter = mo147getStats.msgInCounter;
        topicStats.bytesInCounter = mo147getStats.bytesInCounter;
        topicStats.msgOutCounter = mo147getStats.msgOutCounter;
        topicStats.bytesOutCounter = mo147getStats.bytesOutCounter;
        topicStats.averageMsgSize = mo147getStats.averageMsgSize;
        topicStats.publishRateLimitedTimes = mo147getStats.publishRateLimitedTimes;
        topicStats.producersCount = 0;
        topic.getProducers().values().forEach(producer -> {
            if (producer.isRemote()) {
                AggregatedReplicationStats computeIfAbsent = topicStats.replicationStats.computeIfAbsent(producer.getRemoteCluster(), str -> {
                    return new AggregatedReplicationStats();
                });
                computeIfAbsent.msgRateIn += producer.getStats().msgRateIn;
                computeIfAbsent.msgThroughputIn += producer.getStats().msgThroughputIn;
                return;
            }
            topicStats.producersCount++;
            topicStats.rateIn += producer.getStats().msgRateIn;
            topicStats.throughputIn += producer.getStats().msgThroughputIn;
            if (z2) {
                AggregatedProducerStats computeIfAbsent2 = topicStats.producerStats.computeIfAbsent(producer.getProducerName(), str2 -> {
                    return new AggregatedProducerStats();
                });
                computeIfAbsent2.producerId = producer.getStats().producerId;
                computeIfAbsent2.msgRateIn = producer.getStats().msgRateIn;
                computeIfAbsent2.msgThroughputIn = producer.getStats().msgThroughputIn;
                computeIfAbsent2.averageMsgSize = producer.getStats().averageMsgSize;
            }
        });
        mo147getStats.subscriptions.forEach((str, subscriptionStatsImpl) -> {
            topicStats.subscriptionsCount++;
            topicStats.msgBacklog += subscriptionStatsImpl.msgBacklog;
            AggregatedSubscriptionStats computeIfAbsent = topicStats.subscriptionStats.computeIfAbsent(str, str -> {
                return new AggregatedSubscriptionStats();
            });
            computeIfAbsent.msgBacklog = subscriptionStatsImpl.msgBacklog;
            computeIfAbsent.msgDelayed = subscriptionStatsImpl.msgDelayed;
            computeIfAbsent.msgRateExpired = subscriptionStatsImpl.msgRateExpired;
            computeIfAbsent.totalMsgExpired = subscriptionStatsImpl.totalMsgExpired;
            computeIfAbsent.msgBacklogNoDelayed = computeIfAbsent.msgBacklog - computeIfAbsent.msgDelayed;
            computeIfAbsent.lastExpireTimestamp = subscriptionStatsImpl.lastExpireTimestamp;
            computeIfAbsent.lastAckedTimestamp = subscriptionStatsImpl.lastAckedTimestamp;
            computeIfAbsent.lastConsumedFlowTimestamp = subscriptionStatsImpl.lastConsumedFlowTimestamp;
            computeIfAbsent.lastConsumedTimestamp = subscriptionStatsImpl.lastConsumedTimestamp;
            computeIfAbsent.lastMarkDeleteAdvancedTimestamp = subscriptionStatsImpl.lastMarkDeleteAdvancedTimestamp;
            subscriptionStatsImpl.consumers.forEach(consumerStatsImpl -> {
                topicStats.consumersCount++;
                computeIfAbsent.unackedMessages += consumerStatsImpl.unackedMessages;
                computeIfAbsent.msgRateRedeliver += consumerStatsImpl.msgRateRedeliver;
                computeIfAbsent.msgRateOut += consumerStatsImpl.msgRateOut;
                computeIfAbsent.msgThroughputOut += consumerStatsImpl.msgThroughputOut;
                computeIfAbsent.bytesOutCounter += consumerStatsImpl.bytesOutCounter;
                computeIfAbsent.msgOutCounter += consumerStatsImpl.msgOutCounter;
                if (computeIfAbsent.blockedSubscriptionOnUnackedMsgs || !consumerStatsImpl.blockedConsumerOnUnackedMsgs) {
                    return;
                }
                computeIfAbsent.blockedSubscriptionOnUnackedMsgs = true;
            });
            topicStats.rateOut += computeIfAbsent.msgRateOut;
            topicStats.throughputOut += computeIfAbsent.msgThroughputOut;
        });
        if (z) {
            topic.getSubscriptions().forEach((str2, subscription) -> {
                AggregatedSubscriptionStats computeIfAbsent = topicStats.subscriptionStats.computeIfAbsent(str2, str2 -> {
                    return new AggregatedSubscriptionStats();
                });
                subscription.getConsumers().forEach(consumer -> {
                    ConsumerStatsImpl stats2 = consumer.getStats();
                    AggregatedConsumerStats computeIfAbsent2 = computeIfAbsent.consumerStat.computeIfAbsent(consumer, consumer -> {
                        return new AggregatedConsumerStats();
                    });
                    computeIfAbsent2.unackedMessages = stats2.unackedMessages;
                    computeIfAbsent2.msgRateRedeliver = stats2.msgRateRedeliver;
                    computeIfAbsent2.msgRateOut = stats2.msgRateOut;
                    computeIfAbsent2.msgAckRate = stats2.messageAckRate;
                    computeIfAbsent2.msgThroughputOut = stats2.msgThroughputOut;
                    computeIfAbsent2.bytesOutCounter = stats2.bytesOutCounter;
                    computeIfAbsent2.msgOutCounter = stats2.msgOutCounter;
                    computeIfAbsent2.availablePermits = stats2.availablePermits;
                    computeIfAbsent2.blockedSubscriptionOnUnackedMsgs = stats2.blockedConsumerOnUnackedMsgs;
                });
            });
        }
        topic.getReplicators().forEach((str3, replicator) -> {
            AggregatedReplicationStats computeIfAbsent = topicStats.replicationStats.computeIfAbsent(str3, str3 -> {
                return new AggregatedReplicationStats();
            });
            ReplicatorStatsImpl mo138getStats = replicator.mo138getStats();
            computeIfAbsent.msgRateOut += mo138getStats.msgRateOut;
            computeIfAbsent.msgThroughputOut += mo138getStats.msgThroughputOut;
            computeIfAbsent.replicationBacklog += mo138getStats.replicationBacklog;
            computeIfAbsent.msgRateIn += mo138getStats.msgRateIn;
            computeIfAbsent.msgThroughputIn += mo138getStats.msgThroughputIn;
            computeIfAbsent.msgRateExpired += mo138getStats.msgRateExpired;
            computeIfAbsent.connectedCount += mo138getStats.connected ? 1L : 0L;
            computeIfAbsent.replicationDelayInSeconds += mo138getStats.replicationDelayInSeconds;
        });
        optional.flatMap(compactorMXBean -> {
            return compactorMXBean.getCompactionRecordForTopic(topic.getName());
        }).map(compactionRecord -> {
            topicStats.compactionRemovedEventCount = compactionRecord.getCompactionRemovedEventCount();
            topicStats.compactionSucceedCount = compactionRecord.getCompactionSucceedCount();
            topicStats.compactionFailedCount = compactionRecord.getCompactionFailedCount();
            topicStats.compactionDurationTimeInMills = compactionRecord.getCompactionDurationTimeInMills();
            topicStats.compactionReadThroughput = compactionRecord.getCompactionReadThroughput();
            topicStats.compactionWriteThroughput = compactionRecord.getCompactionWriteThroughput();
            topicStats.compactionLatencyBuckets.addAll(compactionRecord.getCompactionLatencyStats());
            topicStats.compactionLatencyBuckets.refresh();
            Optional<CompactedTopicContext> compactedTopicContext = ((PersistentTopic) topic).getCompactedTopicContext();
            if (compactedTopicContext.isPresent()) {
                LedgerHandle ledger = compactedTopicContext.get().getLedger();
                long lastAddConfirmed = ledger.getLastAddConfirmed() + 1;
                long length = ledger.getLength();
                topicStats.compactionCompactedEntriesCount = lastAddConfirmed;
                topicStats.compactionCompactedEntriesSize = length;
            }
            return compactionRecord;
        });
    }

    private static void printDefaultBrokerStats(SimpleTextOutputStream simpleTextOutputStream, String str) {
        metric(simpleTextOutputStream, str, "pulsar_topics_count", 0L);
        metric(simpleTextOutputStream, str, "pulsar_subscriptions_count", 0L);
        metric(simpleTextOutputStream, str, "pulsar_producers_count", 0L);
        metric(simpleTextOutputStream, str, "pulsar_consumers_count", 0L);
        metric(simpleTextOutputStream, str, "pulsar_rate_in", 0L);
        metric(simpleTextOutputStream, str, "pulsar_rate_out", 0L);
        metric(simpleTextOutputStream, str, "pulsar_throughput_in", 0L);
        metric(simpleTextOutputStream, str, "pulsar_throughput_out", 0L);
        metric(simpleTextOutputStream, str, "pulsar_storage_size", 0L);
        metric(simpleTextOutputStream, str, "pulsar_storage_logical_size", 0L);
        metric(simpleTextOutputStream, str, "pulsar_storage_write_rate", 0L);
        metric(simpleTextOutputStream, str, "pulsar_storage_read_rate", 0L);
        metric(simpleTextOutputStream, str, "pulsar_msg_backlog", 0L);
    }

    private static void printTopicsCountStats(SimpleTextOutputStream simpleTextOutputStream, String str, String str2, LongAdder longAdder) {
        metric(simpleTextOutputStream, str, str2, "pulsar_topics_count", longAdder.sum());
    }

    private static void printNamespaceStats(SimpleTextOutputStream simpleTextOutputStream, String str, String str2, AggregatedNamespaceStats aggregatedNamespaceStats) {
        metric(simpleTextOutputStream, str, str2, "pulsar_topics_count", aggregatedNamespaceStats.topicsCount);
        metric(simpleTextOutputStream, str, str2, "pulsar_subscriptions_count", aggregatedNamespaceStats.subscriptionsCount);
        metric(simpleTextOutputStream, str, str2, "pulsar_producers_count", aggregatedNamespaceStats.producersCount);
        metric(simpleTextOutputStream, str, str2, "pulsar_consumers_count", aggregatedNamespaceStats.consumersCount);
        metric(simpleTextOutputStream, str, str2, "pulsar_rate_in", aggregatedNamespaceStats.rateIn);
        metric(simpleTextOutputStream, str, str2, "pulsar_rate_out", aggregatedNamespaceStats.rateOut);
        metric(simpleTextOutputStream, str, str2, "pulsar_throughput_in", aggregatedNamespaceStats.throughputIn);
        metric(simpleTextOutputStream, str, str2, "pulsar_throughput_out", aggregatedNamespaceStats.throughputOut);
        metric(simpleTextOutputStream, str, str2, "pulsar_consumer_msg_ack_rate", aggregatedNamespaceStats.messageAckRate);
        metric(simpleTextOutputStream, str, str2, "pulsar_in_bytes_total", aggregatedNamespaceStats.bytesInCounter);
        metric(simpleTextOutputStream, str, str2, "pulsar_in_messages_total", aggregatedNamespaceStats.msgInCounter);
        metric(simpleTextOutputStream, str, str2, "pulsar_out_bytes_total", aggregatedNamespaceStats.bytesOutCounter);
        metric(simpleTextOutputStream, str, str2, "pulsar_out_messages_total", aggregatedNamespaceStats.msgOutCounter);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_size", aggregatedNamespaceStats.managedLedgerStats.storageSize);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_logical_size", aggregatedNamespaceStats.managedLedgerStats.storageLogicalSize);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_backlog_size", aggregatedNamespaceStats.managedLedgerStats.backlogSize);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_offloaded_size", aggregatedNamespaceStats.managedLedgerStats.offloadedStorageUsed);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_rate", aggregatedNamespaceStats.managedLedgerStats.storageWriteRate);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_read_rate", aggregatedNamespaceStats.managedLedgerStats.storageReadRate);
        metric(simpleTextOutputStream, str, str2, "pulsar_subscription_delayed", aggregatedNamespaceStats.msgDelayed);
        metricWithRemoteCluster(simpleTextOutputStream, str, str2, "pulsar_msg_backlog", "local", aggregatedNamespaceStats.msgBacklog);
        aggregatedNamespaceStats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
        long[] buckets = aggregatedNamespaceStats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_le_0_5", buckets[0]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_le_1", buckets[1]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_le_5", buckets[2]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_le_10", buckets[3]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_le_20", buckets[4]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_le_50", buckets[5]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_le_100", buckets[6]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_le_200", buckets[7]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_le_1000", buckets[8]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_overflow", buckets[9]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_count", aggregatedNamespaceStats.managedLedgerStats.storageWriteLatencyBuckets.getCount());
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_sum", aggregatedNamespaceStats.managedLedgerStats.storageWriteLatencyBuckets.getSum());
        aggregatedNamespaceStats.managedLedgerStats.storageLedgerWriteLatencyBuckets.refresh();
        long[] buckets2 = aggregatedNamespaceStats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_ledger_write_latency_le_0_5", buckets2[0]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_ledger_write_latency_le_1", buckets2[1]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_ledger_write_latency_le_5", buckets2[2]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_ledger_write_latency_le_10", buckets2[3]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_ledger_write_latency_le_20", buckets2[4]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_ledger_write_latency_le_50", buckets2[5]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_ledger_write_latency_le_100", buckets2[6]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_ledger_write_latency_le_200", buckets2[7]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_ledger_write_latency_le_1000", buckets2[8]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_ledger_write_latency_overflow", buckets2[9]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_ledger_write_latency_count", aggregatedNamespaceStats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount());
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_ledger_write_latency_sum", aggregatedNamespaceStats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum());
        aggregatedNamespaceStats.managedLedgerStats.entrySizeBuckets.refresh();
        long[] buckets3 = aggregatedNamespaceStats.managedLedgerStats.entrySizeBuckets.getBuckets();
        metric(simpleTextOutputStream, str, str2, "pulsar_entry_size_le_128", buckets3[0]);
        metric(simpleTextOutputStream, str, str2, "pulsar_entry_size_le_512", buckets3[1]);
        metric(simpleTextOutputStream, str, str2, "pulsar_entry_size_le_1_kb", buckets3[2]);
        metric(simpleTextOutputStream, str, str2, "pulsar_entry_size_le_2_kb", buckets3[3]);
        metric(simpleTextOutputStream, str, str2, "pulsar_entry_size_le_4_kb", buckets3[4]);
        metric(simpleTextOutputStream, str, str2, "pulsar_entry_size_le_16_kb", buckets3[5]);
        metric(simpleTextOutputStream, str, str2, "pulsar_entry_size_le_100_kb", buckets3[6]);
        metric(simpleTextOutputStream, str, str2, "pulsar_entry_size_le_1_mb", buckets3[7]);
        metric(simpleTextOutputStream, str, str2, "pulsar_entry_size_le_overflow", buckets3[8]);
        metric(simpleTextOutputStream, str, str2, "pulsar_entry_size_count", aggregatedNamespaceStats.managedLedgerStats.entrySizeBuckets.getCount());
        metric(simpleTextOutputStream, str, str2, "pulsar_entry_size_sum", aggregatedNamespaceStats.managedLedgerStats.entrySizeBuckets.getSum());
        if (aggregatedNamespaceStats.replicationStats.isEmpty()) {
            return;
        }
        aggregatedNamespaceStats.replicationStats.forEach((str3, aggregatedReplicationStats) -> {
            metricWithRemoteCluster(simpleTextOutputStream, str, str2, "pulsar_replication_rate_in", str3, aggregatedReplicationStats.msgRateIn);
            metricWithRemoteCluster(simpleTextOutputStream, str, str2, "pulsar_replication_rate_out", str3, aggregatedReplicationStats.msgRateOut);
            metricWithRemoteCluster(simpleTextOutputStream, str, str2, "pulsar_replication_throughput_in", str3, aggregatedReplicationStats.msgThroughputIn);
            metricWithRemoteCluster(simpleTextOutputStream, str, str2, "pulsar_replication_throughput_out", str3, aggregatedReplicationStats.msgThroughputOut);
            metricWithRemoteCluster(simpleTextOutputStream, str, str2, "pulsar_replication_backlog", str3, aggregatedReplicationStats.replicationBacklog);
            metricWithRemoteCluster(simpleTextOutputStream, str, str2, "pulsar_replication_connected_count", str3, aggregatedReplicationStats.connectedCount);
            metricWithRemoteCluster(simpleTextOutputStream, str, str2, "pulsar_replication_rate_expired", str3, aggregatedReplicationStats.msgRateExpired);
            metricWithRemoteCluster(simpleTextOutputStream, str, str2, "pulsar_replication_delay_in_seconds", str3, aggregatedReplicationStats.replicationDelayInSeconds);
        });
    }

    private static void metric(SimpleTextOutputStream simpleTextOutputStream, String str, String str2, long j) {
        TopicStats.metricType(simpleTextOutputStream, str2);
        simpleTextOutputStream.write(str2).write("{cluster=\"").write(str).write("\"} ").write(j).write(' ').write(System.currentTimeMillis()).write('\n');
    }

    private static void metric(SimpleTextOutputStream simpleTextOutputStream, String str, String str2, String str3, long j) {
        TopicStats.metricType(simpleTextOutputStream, str3);
        simpleTextOutputStream.write(str3).write("{cluster=\"").write(str).write("\",namespace=\"").write(str2).write("\"} ");
        simpleTextOutputStream.write(j).write(' ').write(System.currentTimeMillis()).write('\n');
    }

    private static void metric(SimpleTextOutputStream simpleTextOutputStream, String str, String str2, String str3, double d) {
        TopicStats.metricType(simpleTextOutputStream, str3);
        simpleTextOutputStream.write(str3).write("{cluster=\"").write(str).write("\",namespace=\"").write(str2).write("\"} ");
        simpleTextOutputStream.write(d).write(' ').write(System.currentTimeMillis()).write('\n');
    }

    private static void metricWithRemoteCluster(SimpleTextOutputStream simpleTextOutputStream, String str, String str2, String str3, String str4, double d) {
        TopicStats.metricType(simpleTextOutputStream, str3);
        simpleTextOutputStream.write(str3).write("{cluster=\"").write(str).write("\",namespace=\"").write(str2);
        simpleTextOutputStream.write("\",remote_cluster=\"").write(str4).write("\"} ");
        simpleTextOutputStream.write(d).write(' ').write(System.currentTimeMillis()).write('\n');
    }
}
