package org.apache.pulsar.broker.stats;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PulsarTestClient;
import org.apache.pulsar.common.stats.Metrics;
import org.awaitility.Awaitility;
import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Broker-level tests for {@link ManagedCursorMetrics}.
 *
 * <p>Each test creates real subscriptions against the mocked broker provided by
 * {@link MockedPulsarServiceBaseTest}, drives the subscription cursors through
 * acknowledgements, and then checks the {@code brk_ml_cursor_*} values returned by
 * {@link ManagedCursorMetrics#generate()}.
 */
@Test(groups = {"broker"})
public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {

    /** Starts the mocked broker once for all tests in this class. */
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
    }

    /** Stops the mocked broker after all tests in this class have run. */
    @Override
    @AfterClass
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Builds the client used by the tests through {@link PulsarTestClient} instead of the
     * default factory of the base class.
     *
     * @param clientBuilder the pre-configured builder handed over by the base class
     * @return the client created by {@link PulsarTestClient#create(ClientBuilder)}
     * @throws PulsarClientException if the client cannot be created
     */
    @Override
    protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws PulsarClientException {
        return PulsarTestClient.create(clientBuilder);
    }

    /**
     * Walks a single subscription cursor through four phases and checks the persist /
     * non-contiguous-range metrics after each one: freshly subscribed, acknowledged with a
     * gap, gap closed, and cursor ledger closed.
     *
     * @throws Exception on any client, broker or timeout failure
     */
    @Test
    public void testManagedCursorMetrics() throws Exception {
        final String subName = "my-sub";
        final String topicName = "persistent://my-namespace/use/my-ns/my-topic1";
        final int messageCount = 30;

        ManagedCursorMetrics metrics = new ManagedCursorMetrics(this.pulsar);
        // No subscription exists yet, so there is nothing to report.
        Assert.assertTrue(metrics.generate().isEmpty());

        // try-with-resources closes the producer first and the consumer second, on both the
        // success and the failure path.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(topicName)
                     .subscriptionType(SubscriptionType.Shared)
                     .ackTimeout(1L, TimeUnit.SECONDS)
                     .subscriptionName(subName)
                     .isAckReceiptEnabled(true)
                     .subscribe();
             Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .topic(topicName)
                     .enableBatching(false)
                     .create()) {

            // NOTE(review): this file was recovered from bytecode; the subscription may need
            // a cast to its persistent implementation before getCursor() resolves - confirm
            // against the Subscription interface.
            ManagedCursorImpl cursor = (ManagedCursorImpl) this.pulsar.getBrokerService()
                    .getTopic(topicName, false).get().get()
                    .getSubscription(subName)
                    .getCursor();
            ManagedCursorMXBean stats = cursor.getStats();

            // Phase 1: freshly created subscription.
            List<Metrics> metricsList = metrics.generate();
            Assert.assertFalse(metricsList.isEmpty());
            Metrics cursorMetrics = metricsList.get(0);
            Assert.assertEquals(cursorMetrics.getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 1L);
            Assert.assertEquals(cursorMetrics.getMetrics().get("brk_ml_cursor_persistLedgerErrors"), 0L);
            Assert.assertEquals(cursorMetrics.getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L);
            Assert.assertEquals(cursorMetrics.getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L);
            Assert.assertEquals(
                    cursorMetrics.getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L);

            // Phase 2: acknowledge messages 0-9 and 21-29 right away but hold back 10-20, so
            // the acknowledged set contains a range that is not contiguous with the rest.
            List<MessageId> heldBack = new ArrayList<>();
            for (int i = 0; i < messageCount; i++) {
                producer.send(("my-message-" + i).getBytes());
                MessageId messageId = consumer.receive().getMessageId();
                if (i < 10 || i > 20) {
                    consumer.acknowledge(messageId);
                } else {
                    heldBack.add(messageId);
                }
            }

            Awaitility.await().atMost(2L, TimeUnit.SECONDS)
                    .until(() -> stats.getPersistLedgerSucceed() > 0);
            metricsList = metrics.generate();
            Assert.assertFalse(metricsList.isEmpty());
            cursorMetrics = metricsList.get(0);
            Assert.assertNotEquals(cursorMetrics.getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 1L);
            Assert.assertEquals(cursorMetrics.getMetrics().get("brk_ml_cursor_persistLedgerErrors"), 0L);
            Assert.assertEquals(cursorMetrics.getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L);
            Assert.assertEquals(cursorMetrics.getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L);
            Assert.assertNotEquals(
                    cursorMetrics.getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L);

            // Phase 3: acknowledge the held-back messages, which closes the gap.
            for (MessageId messageId : heldBack) {
                consumer.acknowledge(messageId);
            }

            Awaitility.await().atMost(2L, TimeUnit.SECONDS)
                    .until(() -> cursor.getTotalNonContiguousDeletedMessagesRange() == 0);
            metricsList = metrics.generate();
            Assert.assertFalse(metricsList.isEmpty());
            cursorMetrics = metricsList.get(0);
            Assert.assertNotEquals(cursorMetrics.getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 1L);
            Assert.assertEquals(cursorMetrics.getMetrics().get("brk_ml_cursor_persistLedgerErrors"), 0L);
            Assert.assertEquals(cursorMetrics.getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L);
            Assert.assertEquals(cursorMetrics.getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L);
            Assert.assertEquals(
                    cursorMetrics.getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L);

            // Phase 4: on every poll, acknowledge one more message and close the cursor's
            // current ledger handle, until both a ledger persist error and a ZooKeeper
            // persist success have been counted.
            Awaitility.await().atMost(2L, TimeUnit.SECONDS).until(() -> {
                producer.send(UUID.randomUUID().toString().getBytes());
                consumer.acknowledge(consumer.receive().getMessageId());
                LedgerHandle cursorLedger = (LedgerHandle) Whitebox.getInternalState(cursor, "cursorLedger");
                cursorLedger.close();
                return stats.getPersistLedgerErrors() > 0 && stats.getPersistZookeeperSucceed() > 0;
            });
            metricsList = metrics.generate();
            Assert.assertFalse(metricsList.isEmpty());
            cursorMetrics = metricsList.get(0);
            Assert.assertNotEquals(cursorMetrics.getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 1L);
            Assert.assertNotEquals(cursorMetrics.getMetrics().get("brk_ml_cursor_persistLedgerErrors"), 0L);
            Assert.assertNotEquals(cursorMetrics.getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L);
            Assert.assertEquals(cursorMetrics.getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L);
            Assert.assertEquals(
                    cursorMetrics.getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L);

            // Explicit cleanup so the topic is deleted only after its clients are gone.
            // try-with-resources will call close() on producer and consumer a second time,
            // exactly as the previous hand-written cleanup did.
            this.mockZooKeeper.unsetAlwaysFail();
            producer.close();
            consumer.close();
            cursor.close();
            this.admin.topics().delete(topicName, true);
        }
    }

    /**
     * Looks up the statistics bean of the cursor that backs a subscription.
     *
     * @param str  fully qualified topic name; the topic is not created when missing
     * @param str2 subscription name on that topic
     * @return the value of {@code getStats()} on the subscription's cursor
     * @throws ExecutionException   if the asynchronous topic lookup fails
     * @throws InterruptedException if the thread is interrupted while waiting for the lookup
     */
    private ManagedCursorMXBean getManagedCursorMXBean(String str, String str2)
            throws ExecutionException, InterruptedException {
        // NOTE(review): same decompilation caveat as in testManagedCursorMetrics - the
        // subscription may need a cast to its persistent implementation here.
        Topic topic = this.pulsar.getBrokerService().getTopic(str, false).get().get();
        return topic.getSubscription(str2).getCursor().getStats();
    }

    /**
     * Creates two subscriptions on one topic, acknowledges messages on both, and checks
     * that each cursor reports a non-zero write size and a zero read size.
     *
     * @throws Exception on any client, broker or timeout failure
     */
    @Test
    public void testCursorReadWriteMetrics() throws Exception {
        final String subName1 = "read-write-sub-1";
        final String subName2 = "read-write-sub-2";
        final String topicName = "persistent://my-namespace/use/my-ns/read-write";
        final int messageCount = 10;

        ManagedCursorMetrics metrics = new ManagedCursorMetrics(this.pulsar);
        // Generating twice must stay empty while no subscription exists.
        Assert.assertTrue(metrics.generate().isEmpty());
        Assert.assertTrue(metrics.generate().isEmpty());

        // Each consumer has its own variable. Previously both shared a single reference, so
        // the first consumer was overwritten without ever being closed and never
        // acknowledged anything.
        try (Consumer<byte[]> consumer1 = this.pulsarClient.newConsumer()
                     .topic(topicName)
                     .subscriptionType(SubscriptionType.Shared)
                     .ackTimeout(1L, TimeUnit.SECONDS)
                     .subscriptionName(subName1)
                     .subscribe();
             Consumer<byte[]> consumer2 = this.pulsarClient.newConsumer()
                     .topic(topicName)
                     .subscriptionType(SubscriptionType.Shared)
                     .ackTimeout(1L, TimeUnit.SECONDS)
                     .subscriptionName(subName2)
                     .subscribe();
             Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .topic(topicName)
                     .create()) {

            // Alternate between the subscriptions so that both cursors see acknowledgements.
            // NOTE(review): the even/odd assignment is reconstructed - the recovered code had
            // identical branches - confirm against the upstream test.
            for (int i = 0; i < messageCount; i++) {
                producer.send(("my-message-" + i).getBytes());
                Consumer<byte[]> active = (i % 2 == 0) ? consumer1 : consumer2;
                active.acknowledge(active.receive().getMessageId());
            }

            ManagedCursorMXBean cursorStats1 = getManagedCursorMXBean(topicName, subName1);
            ManagedCursorMXBean cursorStats2 = getManagedCursorMXBean(topicName, subName2);
            // Wait until each cursor has recorded at least one cursor-ledger write.
            Awaitility.await().until(() -> cursorStats1.getWriteCursorLedgerLogicalSize() > 0);
            Awaitility.await().until(() -> cursorStats2.getWriteCursorLedgerLogicalSize() > 0);

            List<Metrics> metricsList = metrics.generate();
            Assert.assertEquals(metricsList.size(), 2);
            for (Metrics cursorMetrics : metricsList) {
                Assert.assertNotEquals(cursorMetrics.getMetrics().get("brk_ml_cursor_writeLedgerSize"), 0L);
                Assert.assertNotEquals(
                        cursorMetrics.getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 0L);
                Assert.assertEquals(cursorMetrics.getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);
            }

            // Explicit cleanup so the topic is deleted only after its clients are gone.
            // try-with-resources will call close() again afterwards, as the previous
            // hand-written cleanup did.
            consumer1.close();
            consumer2.close();
            producer.close();
            this.admin.topics().delete(topicName, true);
        }
    }
}
