/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.stats;

import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class TransactionMetricsTest
extends BrokerTestBase {
    @Override
    @BeforeMethod(alwaysRun=true)
    protected void setup() throws Exception {
        ServiceConfiguration serviceConfiguration = TransactionMetricsTest.getDefaultConf();
        serviceConfiguration.setTransactionCoordinatorEnabled(true);
        super.baseSetup(serviceConfiguration);
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testTransactionCoordinatorMetrics() throws Exception {
        long timeout = 10000L;
        TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get((long)1L);
        TransactionCoordinatorID transactionCoordinatorIDTwo = TransactionCoordinatorID.get((long)2L);
        this.pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(transactionCoordinatorIDOne);
        this.pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(transactionCoordinatorIDTwo);
        Awaitility.await().atMost(2000L, TimeUnit.MILLISECONDS).until(() -> this.pulsar.getTransactionMetadataStoreService().getStores().size() == 2);
        ((TransactionMetadataStore)this.pulsar.getTransactionMetadataStoreService().getStores().get(transactionCoordinatorIDOne)).newTransaction(timeout).get();
        ((TransactionMetadataStore)this.pulsar.getTransactionMetadataStoreService().getStores().get(transactionCoordinatorIDTwo)).newTransaction(timeout).get();
        ((TransactionMetadataStore)this.pulsar.getTransactionMetadataStoreService().getStores().get(transactionCoordinatorIDTwo)).newTransaction(timeout).get();
        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate((PulsarService)this.pulsar, (boolean)true, (boolean)false, (boolean)false, (OutputStream)statsOut);
        String metricsStr = statsOut.toString();
        Multimap<String, PrometheusMetricsTest.Metric> metrics = PrometheusMetricsTest.parseMetrics(metricsStr);
        Collection metric = metrics.get((Object)"pulsar_txn_active_count");
        Assert.assertEquals((int)metric.size(), (int)2);
        metric.forEach(item -> {
            if ("1".equals(item.tags.get("coordinator_id"))) {
                Assert.assertEquals((double)item.value, (double)1.0);
            } else {
                Assert.assertEquals((double)item.value, (double)2.0);
            }
        });
    }

    @Test
    public void testTransactionCoordinatorRateMetrics() throws Exception {
        int i;
        long timeout = 10000L;
        int txnCount = 120;
        String ns1 = "prop/ns-abc1";
        this.admin.namespaces().createNamespace(ns1);
        String topic = "persistent://" + ns1 + "/test_coordinator_metrics";
        String subName = "test_coordinator_metrics";
        TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get((long)1L);
        this.pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(transactionCoordinatorIDOne);
        this.admin.topics().createNonPartitionedTopic(topic);
        this.admin.topics().createSubscription(topic, subName, MessageId.earliest);
        Awaitility.await().atMost(2000L, TimeUnit.MILLISECONDS).until(() -> this.pulsar.getTransactionMetadataStoreService().getStores().size() == 1);
        ArrayList<TxnID> list = new ArrayList<TxnID>();
        for (i = 0; i < txnCount; ++i) {
            TxnID txnID = (TxnID)((TransactionMetadataStore)this.pulsar.getTransactionMetadataStoreService().getStores().get(transactionCoordinatorIDOne)).newTransaction(timeout).get();
            list.add(txnID);
            if (i % 2 == 0) {
                this.pulsar.getTransactionMetadataStoreService().addProducedPartitionToTxn((TxnID)list.get(i), Collections.singletonList(topic)).get();
                continue;
            }
            this.pulsar.getTransactionMetadataStoreService().addAckedPartitionToTxn((TxnID)list.get(i), Collections.singletonList(TransactionSubscription.builder().topic(topic).subscription(subName).build())).get();
        }
        for (i = 0; i < txnCount; ++i) {
            if (i % 2 == 0) {
                this.pulsar.getTransactionMetadataStoreService().endTransaction((TxnID)list.get(i), 0, false).get();
                continue;
            }
            this.pulsar.getTransactionMetadataStoreService().endTransaction((TxnID)list.get(i), 1, false).get();
        }
        this.pulsar.getBrokerService().updateRates();
        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate((PulsarService)this.pulsar, (boolean)true, (boolean)false, (boolean)false, (OutputStream)statsOut);
        String metricsStr = statsOut.toString();
        Multimap<String, PrometheusMetricsTest.Metric> metrics = PrometheusMetricsTest.parseMetrics(metricsStr);
        Collection metric = metrics.get((Object)"pulsar_txn_created_count");
        Assert.assertEquals((int)metric.size(), (int)1);
        metric.forEach(item -> Assert.assertEquals((double)item.value, (double)txnCount));
        metric = metrics.get((Object)"pulsar_txn_committed_count");
        Assert.assertEquals((int)metric.size(), (int)1);
        metric.forEach(item -> Assert.assertEquals((double)item.value, (double)(txnCount / 2)));
        metric = metrics.get((Object)"pulsar_txn_aborted_count");
        Assert.assertEquals((int)metric.size(), (int)1);
        metric.forEach(item -> Assert.assertEquals((double)item.value, (double)(txnCount / 2)));
        TxnID txnID = (TxnID)((TransactionMetadataStore)this.pulsar.getTransactionMetadataStoreService().getStores().get(transactionCoordinatorIDOne)).newTransaction(1000L).get();
        Awaitility.await().atMost(2000L, TimeUnit.SECONDS).until(() -> {
            try {
                TxnMeta txnMeta = (TxnMeta)((TransactionMetadataStore)this.pulsar.getTransactionMetadataStoreService().getStores().get(transactionCoordinatorIDOne)).getTxnMeta(txnID).get();
            }
            catch (Exception e) {
                return true;
            }
            return false;
        });
        statsOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate((PulsarService)this.pulsar, (boolean)true, (boolean)false, (boolean)false, (OutputStream)statsOut);
        metricsStr = statsOut.toString();
        metrics = PrometheusMetricsTest.parseMetrics(metricsStr);
        metric = metrics.get((Object)"pulsar_txn_timeout_count");
        Assert.assertEquals((int)metric.size(), (int)1);
        metric.forEach(item -> Assert.assertEquals((double)item.value, (double)1.0));
        metric = metrics.get((Object)"pulsar_txn_append_log_count");
        Assert.assertEquals((int)metric.size(), (int)1);
        metric.forEach(item -> Assert.assertEquals((double)item.value, (double)(txnCount * 4 + 3)));
        metric = metrics.get((Object)"pulsar_txn_execution_latency_le_5000");
        Assert.assertEquals((int)metric.size(), (int)1);
        metric.forEach(item -> Assert.assertEquals((double)item.value, (double)1.0));
    }

    @Test
    public void testManagedLedgerMetrics() throws Exception {
        String ns1 = "prop/ns-abc1";
        this.admin.namespaces().createNamespace(ns1);
        String topic = "persistent://" + ns1 + "/test_managed_ledger_metrics";
        String subName = "test_managed_ledger_metrics";
        this.admin.topics().createNonPartitionedTopic(topic);
        this.admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), TenantInfo.builder().adminRoles((Set)Sets.newHashSet((Object[])new String[]{"appid1"})).allowedClusters((Set)Sets.newHashSet((Object[])new String[]{"test"})).build());
        this.admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
        this.admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
        TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get((long)0L);
        this.pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(transactionCoordinatorIDOne);
        this.admin.topics().createSubscription(topic, subName, MessageId.earliest);
        Awaitility.await().atMost(2000L, TimeUnit.MILLISECONDS).until(() -> this.pulsar.getTransactionMetadataStoreService().getStores().size() == 1);
        this.pulsarClient = PulsarClient.builder().serviceUrl(this.lookupUrl.toString()).enableTransaction(true).build();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topic}).receiverQueueSize(10).subscriptionName(subName).subscriptionType(SubscriptionType.Key_Shared).subscribe();
        Producer producer = this.pulsarClient.newProducer().topic(topic).create();
        Transaction transaction = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(10L, TimeUnit.SECONDS).build().get();
        producer.send((Object)"hello pulsar".getBytes());
        consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate((PulsarService)this.pulsar, (boolean)true, (boolean)false, (boolean)false, (OutputStream)statsOut);
        String metricsStr = statsOut.toString();
        Multimap<String, PrometheusMetricsTest.Metric> metrics = PrometheusMetricsTest.parseMetrics(metricsStr);
        Collection metric = metrics.get((Object)"pulsar_storage_size");
        this.checkManagedLedgerMetrics(subName, 32.0, metric);
        this.checkManagedLedgerMetrics("transaction.subscription", 252.0, metric);
        metric = metrics.get((Object)"pulsar_storage_logical_size");
        this.checkManagedLedgerMetrics(subName, 16.0, metric);
        this.checkManagedLedgerMetrics("transaction.subscription", 126.0, metric);
        metric = metrics.get((Object)"pulsar_storage_backlog_size");
        this.checkManagedLedgerMetrics(subName, 16.0, metric);
        this.checkManagedLedgerMetrics("transaction.subscription", 126.0, metric);
        statsOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate((PulsarService)this.pulsar, (boolean)false, (boolean)false, (boolean)false, (OutputStream)statsOut);
        metricsStr = statsOut.toString();
        metrics = PrometheusMetricsTest.parseMetrics(metricsStr);
        metric = metrics.get((Object)"pulsar_storage_size");
        Assert.assertEquals((int)metric.size(), (int)3);
        metric = metrics.get((Object)"pulsar_storage_logical_size");
        Assert.assertEquals((int)metric.size(), (int)3);
        metric = metrics.get((Object)"pulsar_storage_backlog_size");
        Assert.assertEquals((int)metric.size(), (int)2);
    }

    private void checkManagedLedgerMetrics(String tag, double value, Collection<PrometheusMetricsTest.Metric> metrics) {
        boolean exist = false;
        for (PrometheusMetricsTest.Metric metric1 : metrics) {
            if (!metric1.tags.containsValue(tag)) continue;
            Assert.assertEquals((double)metric1.value, (double)value);
            exist = true;
        }
        Assert.assertTrue((boolean)exist);
    }
}

