package org.apache.pulsar.broker.stats;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.TenantResources;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PoliciesUtil;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/stats/TransactionBatchWriterMetricsTest.class */
public class TransactionBatchWriterMetricsTest extends MockedPulsarServiceBaseTest {
    private static final Logger log = LoggerFactory.getLogger(TransactionBatchWriterMetricsTest.class);
    /** Namespace in which the test topic is created. */
    public static final NamespaceName DEFAULT_NAMESPACE = NamespaceName.get("public/default");

    /** Cluster name used by this test. */
    private final String clusterName = "test";
    /** Local (unqualified) name of the test topic. */
    private final String topicNameSuffix = "t-rest-topic";
    /** Persistent topic name of {@link #topicNameSuffix} inside {@link #DEFAULT_NAMESPACE}. */
    private final String topicName = DEFAULT_NAMESPACE.getPersistentTopicName(topicNameSuffix);

    /**
     * Runs once before the tests of this class: initializes the buffered-writer metrics of the
     * transaction metadata store provider and of the pending-ack store provider, then delegates
     * to the base-class setup.
     *
     * @throws Exception if metric initialization or the base-class setup fails
     */
    @BeforeClass
    @Override
    public void setup() throws Exception {
        final String brokerLabel = "localhost";
        MLTransactionMetadataStoreProvider.initBufferedWriterMetrics(brokerLabel);
        MLPendingAckStoreProvider.initBufferedWriterMetrics(brokerLabel);
        super.internalSetup();
    }

    /**
     * Runs once after all tests of this class and delegates to the base-class cleanup.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @AfterClass
    @Override
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Returns the base-class default configuration with system topics, the transaction
     * coordinator, and batched writes for both the transaction log and the pending-ack store
     * switched on.
     *
     * @return the adjusted broker configuration
     */
    @Override
    public ServiceConfiguration getDefaultConf() {
        final int maxRecordsPerBatch = 10;
        ServiceConfiguration conf = super.getDefaultConf();

        conf.setSystemTopicEnabled(true);
        conf.setTransactionCoordinatorEnabled(true);

        // Batched writes of the pending-ack store.
        conf.setTransactionPendingAckBatchedWriteEnabled(true);
        conf.setTransactionPendingAckBatchedWriteMaxRecords(maxRecordsPerBatch);

        // Batched writes of the transaction log.
        conf.setTransactionLogBatchedWriteEnabled(true);
        conf.setTransactionLogBatchedWriteMaxRecords(maxRecordsPerBatch);

        conf.setBrokerShutdownTimeoutMs(5000L);
        return conf;
    }

    /**
     * Starts the broker through the base class and then makes sure the metadata this test relies
     * on is present: the cluster, the {@code public} tenant, the default and system namespaces,
     * and the 16-partition transaction-coordinator assign topic.
     *
     * @throws Exception if the broker cannot be started or any metadata operation fails
     */
    @Override
    public void startBroker() throws Exception {
        super.startBroker();

        ensureClusterExists(pulsar, clusterName);

        TenantResources tenants = pulsar.getPulsarResources().getTenantResources();
        ensureTenantExists(tenants, "public", clusterName);

        NamespaceResources namespaces = pulsar.getPulsarResources().getNamespaceResources();
        ensureNamespaceExists(namespaces, DEFAULT_NAMESPACE, clusterName);
        ensureNamespaceExists(namespaces, NamespaceName.SYSTEM_NAMESPACE, clusterName);

        NamespaceResources.PartitionedTopicResources partitionedTopics =
                pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources();
        ensureTopicExists(partitionedTopics, SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, 16);
    }

    /**
     * Creates a non-partitioned topic, drives transactional traffic through it via
     * {@link #sendAndAckSomeMessages()}, then fetches {@code <brokerUrl>/metrics/get} and asserts
     * that the buffered-writer metrics of the transaction coordinator log
     * ({@code pulsar_txn_tc_bufferedwriter_*}) and of the pending-ack store
     * ({@code pulsar_txn_pending_ack_store_bufferedwriter_*}) are present and greater than zero.
     *
     * @throws Exception if topic administration, message traffic or the HTTP call fails
     */
    @Test
    public void testTransactionMetaLogMetrics() throws Exception {
        String brokerHost = pulsar.getAdvertisedAddress().split(":")[0];
        admin.topics().createNonPartitionedTopic(topicName);
        sendAndAckSomeMessages();

        Client client = ClientBuilder.newClient();
        try {
            Response response = client.target(brokerUrl + "/metrics/get")
                    .request(MediaType.APPLICATION_JSON_TYPE)
                    .buildGet()
                    .invoke();
            try {
                // Same acceptance window as before (integer division: statuses 200-399 pass).
                Assert.assertTrue(response.getStatus() / 200 == 1,
                        "unexpected HTTP status " + response.getStatus());

                List<String> metricLines = parseResponseEntityToList(response).stream()
                        .filter(line -> !line.startsWith("#") && line.contains("bufferedwriter"))
                        .collect(Collectors.toList());

                // The trailing space separates the label set from the sample value.
                String labels = String.format("{cluster=\"%s\",broker=\"%s\"} ", clusterName, brokerHost);
                String[] components = {"pulsar_txn_tc", "pulsar_txn_pending_ack_store"};
                String[] suffixes = {"batch_records_sum", "flush_trigger_max_delay_total", "batch_size_bytes_sum"};
                for (String component : components) {
                    for (String suffix : suffixes) {
                        String prefix = component + "_bufferedwriter_" + suffix + labels;
                        Double value = searchMetricsValue(metricLines, prefix);
                        // Fail with a readable message instead of a NullPointerException on unboxing.
                        Assert.assertNotNull(value, "metric not found: " + prefix);
                        Assert.assertTrue(value > 0.0d, "metric is not positive: " + prefix + value);
                    }
                }
            } finally {
                // Release the HTTP connection even when an assertion above fails.
                response.close();
            }
            admin.topics().delete(topicName, true);
        } finally {
            client.close();
        }
    }

    /**
     * Finds the first non-comment line that starts with {@code str} and returns the second
     * space-separated token of that line parsed as a double.
     *
     * @param list metric lines to scan, in order
     * @param str  prefix the wanted line must start with
     * @return the parsed value, or {@code null} when no line matches
     */
    private static Double searchMetricsValue(List<String> list, String str) {
        return list.stream()
                .filter(line -> !line.startsWith("#"))
                .filter(line -> line.startsWith(str))
                .findFirst()
                .map(line -> Double.valueOf(line.split(" ")[1]))
                .orElse(null);
    }

    /**
     * Produces one plain message and then runs 100 transactions against {@link #topicName}.
     * Each transaction receives one message, publishes two transactional messages, acknowledges
     * the received message inside the transaction and commits.
     *
     * <p>The producer and the consumer are closed when the method returns, whether it completes
     * normally or throws.
     *
     * @throws Exception if creating the clients, sending, receiving or committing fails
     */
    private void sendAndAckSomeMessages() throws Exception {
        try (Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
                     .topic(topicName)
                     .sendTimeout(0, TimeUnit.SECONDS)
                     .enableBatching(false)
                     .batchingMaxMessages(2)
                     .create();
             Consumer<byte[]> consumer = pulsarClient.newConsumer()
                     .subscriptionType(SubscriptionType.Shared)
                     .topic(topicName)
                     .isAckReceiptEnabled(true)
                     .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                     .subscriptionName("my-subscription")
                     .subscribe()) {
            // Seed message so the first transaction has something to receive.
            producer.sendAsync("normal message x".getBytes(StandardCharsets.UTF_8)).get();

            for (int i = 0; i < 100; i++) {
                Transaction txn = pulsarClient.newTransaction()
                        .withTransactionTimeout(10L, TimeUnit.SECONDS)
                        .build()
                        .get();
                Message<byte[]> received = consumer.receive();
                producer.newMessage(txn).value(("tx msg a-" + i).getBytes(StandardCharsets.UTF_8)).sendAsync();
                producer.newMessage(txn).value(("tx msg b-" + i).getBytes(StandardCharsets.UTF_8)).sendAsync();
                consumer.acknowledgeAsync(received.getMessageId(), txn);
                txn.commit().get();
            }
        }
    }

    /**
     * Creates the cluster {@code str} unless it is already registered. The cluster data points
     * both the plain and the TLS URLs at the broker's own web-service and broker-service
     * addresses.
     *
     * @param pulsarService broker whose resources and addresses are used
     * @param str           name of the cluster
     * @throws Exception if the existence check or the creation fails
     */
    private static void ensureClusterExists(PulsarService pulsarService, String str) throws Exception {
        ClusterResources clusters = pulsarService.getPulsarResources().getClusterResources();
        ClusterData clusterData = ClusterData.builder()
                .serviceUrl(pulsarService.getWebServiceAddress())
                .serviceUrlTls(pulsarService.getWebServiceAddress())
                .brokerServiceUrl(pulsarService.getBrokerServiceUrl())
                .brokerServiceUrlTls(pulsarService.getBrokerServiceUrl())
                .build();
        if (!clusters.clusterExists(str)) {
            clusters.createCluster(str, clusterData);
        }
    }

    /**
     * Makes sure the partitioned topic {@code topicName} exists with at least {@code i}
     * partitions: creates it when missing, raises the partition count when it is lower, and
     * otherwise leaves it untouched.
     *
     * @param partitionedTopicResources metadata accessor for partitioned topics
     * @param topicName                 topic to check
     * @param i                         minimum number of partitions
     * @throws Exception if reading or writing the topic metadata fails
     */
    private static void ensureTopicExists(NamespaceResources.PartitionedTopicResources partitionedTopicResources, TopicName topicName, int i) throws Exception {
        Optional<PartitionedTopicMetadata> existing =
                partitionedTopicResources.getPartitionedTopicMetadataAsync(topicName).get();
        if (!existing.isPresent()) {
            partitionedTopicResources.createPartitionedTopic(topicName, new PartitionedTopicMetadata(i));
            return;
        }
        if (existing.get().partitions >= i) {
            return;
        }
        partitionedTopicResources
                .updatePartitionedTopicAsync(topicName, current -> new PartitionedTopicMetadata(i))
                .get();
    }

    /**
     * Makes sure {@code namespaceName} exists and lists {@code str} among its replication
     * clusters. A missing namespace is created with 16 bundles and {@code str} as its only
     * replication cluster; an existing one gets {@code str} added to its replication clusters.
     *
     * @param namespaceResources metadata accessor for namespaces
     * @param namespaceName      namespace to check
     * @param str                cluster name
     * @throws Exception if reading or writing the namespace policies fails
     */
    private static void ensureNamespaceExists(NamespaceResources namespaceResources, NamespaceName namespaceName, String str) throws Exception {
        if (!namespaceResources.namespaceExists(namespaceName)) {
            Policies fresh = new Policies();
            fresh.bundles = PoliciesUtil.getBundles(16);
            fresh.replication_clusters = Collections.singleton(str);
            namespaceResources.createPolicies(namespaceName, fresh);
            return;
        }
        namespaceResources.setPolicies(namespaceName, existing -> {
            existing.replication_clusters.add(str);
            return existing;
        });
    }

    /**
     * Makes sure tenant {@code str} exists and allows cluster {@code str2}. A missing tenant is
     * created with no admin roles and {@code str2} as its only allowed cluster; an existing one
     * gets {@code str2} added to its allowed clusters.
     *
     * @param tenantResources metadata accessor for tenants
     * @param str             tenant name
     * @param str2            cluster name
     * @throws Exception if reading or writing the tenant metadata fails
     */
    private static void ensureTenantExists(TenantResources tenantResources, String str, String str2) throws Exception {
        if (!tenantResources.tenantExists(str)) {
            TenantInfoImpl tenant = new TenantInfoImpl(Collections.emptySet(), Collections.singleton(str2));
            tenantResources.createTenant(str, tenant);
            return;
        }
        tenantResources.updateTenantAsync(str, existing -> {
            existing.getAllowedClusters().add(str2);
            return existing;
        }).get();
    }

    /**
     * Reads the response entity line by line, decoding it as UTF-8.
     *
     * <p>The reader, and with it the underlying entity stream, is closed before this method
     * returns, so the entity cannot be read a second time.
     *
     * @param response response whose entity is an {@link InputStream}
     * @return the lines of the entity in order, without line terminators
     * @throws Exception if the entity cannot be read
     */
    private List<String> parseResponseEntityToList(Response response) throws Exception {
        InputStream entityStream = (InputStream) response.getEntity();
        // Explicit charset: do not depend on the platform default encoding.
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(entityStream, StandardCharsets.UTF_8))) {
            List<String> lines = new ArrayList<>();
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
            return lines;
        }
    }

    /**
     * Builds a transaction-enabled Pulsar client for this test.
     *
     * @param str service URL the client connects to
     * @param i   stats interval in seconds
     * @return the client produced by {@code createNewPulsarClient}
     * @throws PulsarClientException if the client cannot be created
     */
    @Override
    public PulsarClient newPulsarClient(String str, int i) throws PulsarClientException {
        // Fully qualified: the simple name ClientBuilder is taken by the JAX-RS import.
        org.apache.pulsar.client.api.ClientBuilder builder = PulsarClient.builder()
                .serviceUrl(str)
                .enableTransaction(true)
                .statsInterval(i, TimeUnit.SECONDS);
        customizeNewPulsarClientBuilder(builder);
        return createNewPulsarClient(builder);
    }
}
