package org.apache.pulsar.broker.service.persistent;

import com.google.common.collect.Multimap;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.PrometheusMetricsTestUtil;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
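/*
 * Runs the tests inherited from DelayedDeliveryTest with the broker configured to use
 * BucketDelayedDeliveryTrackerFactory (see setup()), and adds tests specific to that tracker.
 * Decompiled from: org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.class
 */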
public class BucketDelayedDeliveryTest extends DelayedDeliveryTest {
    /**
     * Selects {@link BucketDelayedDeliveryTrackerFactory} as the delayed-delivery tracker factory,
     * applies the bucket and managed-ledger limits used by this class, and then runs the parent setup.
     *
     * @throws Exception if the parent setup fails
     */
    @Override // org.apache.pulsar.broker.service.persistent.DelayedDeliveryTest, org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    public void setup() throws Exception {
        // Managed-ledger limits: at most 50 entries per ledger, no minimum rollover time.
        conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
        conf.setManagedLedgerMaxEntriesPerLedger(50);

        // Bucket tracker selection and its limits.
        conf.setDelayedDeliveryTrackerFactoryClassName(BucketDelayedDeliveryTrackerFactory.class.getName());
        conf.setDelayedDeliveryMinIndexCountPerBucket(50L);
        conf.setDelayedDeliveryMaxIndexesPerBucketSnapshotSegment(10);
        conf.setDelayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds(1);
        conf.setDelayedDeliveryMaxNumBuckets(10);

        // The configuration must be in place before the parent setup runs.
        super.setup();
    }

    /**
     * Tears down the test environment by delegating to the parent's {@code internalCleanup()}.
     * Runs even when earlier configuration or test methods failed ({@code alwaysRun = true}).
     *
     * @throws Exception if the parent cleanup fails
     */
    @Override // org.apache.pulsar.broker.service.persistent.DelayedDeliveryTest, org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Verifies that delayed-delivery state is kept when every consumer of a shared subscription
     * disconnects: after the only consumer is closed and a new one attaches, the dispatcher still
     * reports 1000 delayed messages and the cursor's {@code "#pulsar.internal."} property keys are
     * the same as before.
     *
     * @throws Exception on any client, broker or assertion failure
     */
    @Test
    public void testBucketDelayedDeliveryWithAllConsumersDisconnecting() throws Exception {
        String topic = BrokerTestUtil.newUniqueName("persistent://public/default/testDelaysWithAllConsumerDis");

        // firstConsumer is also closed explicitly below; the second close performed by
        // try-with-resources only guarantees cleanup when the test fails early.
        try (Consumer<String> firstConsumer = pulsarClient.newConsumer(Schema.STRING)
                     .topic(topic)
                     .subscriptionName("sub")
                     .subscriptionType(SubscriptionType.Shared)
                     .subscribe();
             Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                     .topic(topic)
                     .create()) {
            // The producer must stay open for the whole loop; it is closed once, on exit.
            for (int i = 0; i < 1000; i++) {
                producer.newMessage().value("msg").deliverAfter(1L, TimeUnit.HOURS).send();
            }

            Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).orElseThrow();
            AbstractPersistentDispatcherMultipleConsumers dispatcher =
                    (AbstractPersistentDispatcherMultipleConsumers) topicRef.getSubscription("sub").getDispatcher();
            Awaitility.await().untilAsserted(
                    () -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 1000L));
            List<String> keysBefore = dispatcher.getCursor().getCursorProperties().keySet().stream()
                    .filter(key -> key.startsWith("#pulsar.internal."))
                    .toList();

            // Disconnect the only consumer, then attach a fresh one to the same subscription.
            firstConsumer.close();

            try (Consumer<String> secondConsumer = pulsarClient.newConsumer(Schema.STRING)
                    .topic(topic)
                    .subscriptionName("sub")
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe()) {
                Topic topicRefAfter = pulsar.getBrokerService().getTopicReference(topic).orElseThrow();
                AbstractPersistentDispatcherMultipleConsumers dispatcherAfter =
                        (AbstractPersistentDispatcherMultipleConsumers)
                                topicRefAfter.getSubscription("sub").getDispatcher();
                List<String> keysAfter = dispatcherAfter.getCursor().getCursorProperties().keySet().stream()
                        .filter(key -> key.startsWith("#pulsar.internal."))
                        .toList();

                Awaitility.await().untilAsserted(
                        () -> Assert.assertEquals(dispatcherAfter.getNumberOfDelayedMessages(), 1000L));
                Assert.assertEquals(keysBefore, keysAfter);
            }
        }
    }

    /**
     * Verifies that deleting a subscription also deletes the ledgers referenced by its cursor's
     * {@code "#pulsar.internal.delayed.bucket"} properties: after publishing 1000 delayed messages,
     * closing the consumer, restarting the broker and deleting the subscription, opening any of the
     * recorded ledger ids must fail with {@link BKException.BKNoSuchLedgerExistsException}.
     *
     * @throws Exception on any client, broker, BookKeeper or assertion failure
     */
    @Test
    public void testUnsubscribe() throws Exception {
        String topic = BrokerTestUtil.newUniqueName("persistent://public/default/testUnsubscribes");

        // Both resources are released on every exit path, including assertion failures.
        try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                     .topic(topic)
                     .subscriptionName("sub")
                     .subscriptionType(SubscriptionType.Shared)
                     .subscribe();
             Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                     .topic(topic)
                     .create()) {
            for (int i = 0; i < 1000; i++) {
                producer.newMessage().value("msg").deliverAfter(1L, TimeUnit.HOURS).send();
            }

            Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).orElseThrow();
            AbstractPersistentDispatcherMultipleConsumers dispatcher =
                    (AbstractPersistentDispatcherMultipleConsumers) topicRef.getSubscription("sub").getDispatcher();
            Awaitility.await().untilAsserted(
                    () -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 1000L));

            // The property values are parsed as ledger ids and probed through BookKeeper below.
            List<Long> bucketLedgerIds = dispatcher.getCursor().getCursorProperties().entrySet().stream()
                    .filter(entry -> entry.getKey().startsWith("#pulsar.internal.delayed.bucket"))
                    .map(entry -> Long.valueOf(entry.getValue()))
                    .toList();
            Assert.assertTrue(bucketLedgerIds.size() > 0);

            // Closed explicitly before the restart; the later automatic close is a second close.
            consumer.close();
            restartBroker();
            admin.topics().deleteSubscription(topic, "sub");

            for (long ledgerId : bucketLedgerIds) {
                try {
                    pulsarTestContext.getBookKeeperClient()
                            .openLedger(ledgerId, BookKeeper.DigestType.CRC32C, new byte[0]);
                    Assert.fail("Should fail");
                } catch (BKException.BKNoSuchLedgerExistsException expected) {
                    // Expected: the ledger was removed together with the subscription.
                }
            }
        }
    }

    /**
     * Checks the {@code pulsar_delayed_message_index_*} Prometheus metrics after publishing 101
     * delayed messages to a topic with two shared subscriptions ({@code test_sub}, {@code test_sub2}).
     *
     * <p>For each metric family the per-subscription samples (those carrying a {@code subscription}
     * tag) are validated and summed, and the sum is compared with a sample that has no
     * {@code subscription} tag. The environment is torn down and set up again first, so the test
     * starts from a freshly set-up broker.
     *
     * @throws Exception on any client, broker or assertion failure
     */
    @Test
    public void testBucketDelayedIndexMetrics() throws Exception {
        cleanup();
        setup();

        final int messageCount = 101;
        String topic = BrokerTestUtil.newUniqueName("persistent://public/default/testBucketDelayedIndexMetrics");

        // Each consumer gets its own variable so both (and the producer) are closed exactly once,
        // on every exit path.
        try (Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING)
                     .topic(topic)
                     .subscriptionName("test_sub")
                     .subscriptionType(SubscriptionType.Shared)
                     .subscribe();
             Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
                     .topic(topic)
                     .subscriptionName("test_sub2")
                     .subscriptionType(SubscriptionType.Shared)
                     .subscribe();
             Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                     .topic(topic)
                     .create()) {
            for (int i = 0; i < messageCount; i++) {
                producer.newMessage().value("msg-" + i).deliverAfter(3600 + i, TimeUnit.SECONDS).sendAsync();
            }
            producer.flush();

            // Fixed wait before scraping; presumably gives the broker time to build the buckets.
            Thread.sleep(2000L);

            ByteArrayOutputStream output = new ByteArrayOutputStream();
            PrometheusMetricsTestUtil.generate(pulsar, true, true, true, output);
            Multimap<String, PrometheusMetricsClient.Metric> metrics =
                    PrometheusMetricsClient.parseMetrics(output.toString(StandardCharsets.UTF_8));

            // --- pulsar_delayed_message_index_bucket_total ---
            List<PrometheusMetricsClient.Metric> bucketTotals =
                    metrics.get("pulsar_delayed_message_index_bucket_total").stream()
                            .filter(m -> m.tags.get("topic").equals(topic))
                            .toList();
            MutableInt bucketSum = new MutableInt();
            bucketTotals.stream()
                    .filter(m -> m.tags.containsKey("subscription"))
                    .forEach(m -> {
                        Assert.assertEquals(3.0d, m.value);
                        bucketSum.add(m.value);
                    });
            Assert.assertEquals(6, bucketSum.intValue());
            Optional<PrometheusMetricsClient.Metric> bucketTopicLevel = bucketTotals.stream()
                    .filter(m -> !m.tags.containsKey("subscription"))
                    .findFirst();
            Assert.assertTrue(bucketTopicLevel.isPresent());
            Assert.assertEquals(bucketSum.intValue(), bucketTopicLevel.get().value);

            // --- pulsar_delayed_message_index_loaded ---
            List<PrometheusMetricsClient.Metric> loadedIndexes =
                    metrics.get("pulsar_delayed_message_index_loaded").stream()
                            .filter(m -> m.tags.get("topic").equals(topic))
                            .toList();
            MutableInt loadedSum = new MutableInt();
            long loadedSubscriptionSamples = loadedIndexes.stream()
                    .filter(m -> m.tags.containsKey("subscription"))
                    .peek(m -> {
                        Assert.assertTrue(m.value > 0.0d && m.value <= messageCount);
                        loadedSum.add(m.value);
                    })
                    .count();
            Assert.assertEquals(2L, loadedSubscriptionSamples);
            // NOTE(review): this lookup reads from the bucket_total samples, not from loadedIndexes.
            // Kept as in the original; confirm whether that is intended.
            Optional<PrometheusMetricsClient.Metric> loadedTopicLevel = bucketTotals.stream()
                    .filter(m -> !m.tags.containsKey("subscription"))
                    .findFirst();
            Assert.assertTrue(loadedTopicLevel.isPresent());
            Assert.assertEquals(loadedSum.intValue(), loadedTopicLevel.get().value);

            // --- pulsar_delayed_message_index_bucket_snapshot_size_bytes ---
            List<PrometheusMetricsClient.Metric> snapshotSizes =
                    metrics.get("pulsar_delayed_message_index_bucket_snapshot_size_bytes").stream()
                            .filter(m -> m.tags.get("topic").equals(topic))
                            .toList();
            MutableInt snapshotSizeSum = new MutableInt();
            long snapshotSubscriptionSamples = snapshotSizes.stream()
                    .filter(m -> m.tags.containsKey("subscription"))
                    .peek(m -> {
                        Assert.assertTrue(m.value > 0.0d);
                        snapshotSizeSum.add(m.value);
                    })
                    .count();
            Assert.assertEquals(2L, snapshotSubscriptionSamples);
            Optional<PrometheusMetricsClient.Metric> snapshotTopicLevel = snapshotSizes.stream()
                    .filter(m -> !m.tags.containsKey("subscription"))
                    .findFirst();
            Assert.assertTrue(snapshotTopicLevel.isPresent());
            Assert.assertEquals(snapshotSizeSum.intValue(), snapshotTopicLevel.get().value);

            // --- pulsar_delayed_message_index_bucket_op_count (state=succeed, type=create) ---
            List<PrometheusMetricsClient.Metric> opCounts =
                    metrics.get("pulsar_delayed_message_index_bucket_op_count").stream()
                            .filter(m -> m.tags.get("topic").equals(topic))
                            .toList();
            MutableInt opCountSum = new MutableInt();
            long opCountSubscriptionSamples = opCounts.stream()
                    .filter(m -> m.tags.get("state").equals("succeed")
                            && m.tags.get("type").equals("create")
                            && m.tags.containsKey("subscription"))
                    .peek(m -> {
                        Assert.assertTrue(m.value >= 2.0d);
                        opCountSum.add(m.value);
                    })
                    .count();
            Assert.assertEquals(2L, opCountSubscriptionSamples);
            Optional<PrometheusMetricsClient.Metric> opCountTopicLevel = opCounts.stream()
                    .filter(m -> m.tags.get("state").equals("succeed")
                            && m.tags.get("type").equals("create")
                            && !m.tags.containsKey("subscription"))
                    .findFirst();
            Assert.assertTrue(opCountTopicLevel.isPresent());
            Assert.assertEquals(opCountSum.intValue(), opCountTopicLevel.get().value);

            // --- pulsar_delayed_message_index_bucket_op_latency_ms (type=create) ---
            List<PrometheusMetricsClient.Metric> opLatencies =
                    metrics.get("pulsar_delayed_message_index_bucket_op_latency_ms").stream()
                            .filter(m -> m.tags.get("topic").equals(topic))
                            .toList();
            MutableInt latencySum = new MutableInt();
            long latencySubscriptionSamples = opLatencies.stream()
                    .filter(m -> m.tags.get("type").equals("create") && m.tags.containsKey("subscription"))
                    .peek(m -> {
                        Assert.assertTrue(m.tags.containsKey("quantile"));
                        latencySum.add(m.value);
                    })
                    .count();
            Assert.assertTrue(latencySubscriptionSamples >= 2);
            // NOTE(review): this lookup reads from the op_count samples, not from opLatencies.
            // Kept as in the original; confirm whether that is intended.
            Optional<PrometheusMetricsClient.Metric> latencyTopicLevel = opCounts.stream()
                    .filter(m -> m.tags.get("type").equals("create") && !m.tags.containsKey("subscription"))
                    .findFirst();
            Assert.assertTrue(latencyTopicLevel.isPresent());
            Assert.assertEquals(latencySum.intValue(), latencyTopicLevel.get().value);

            // Second scrape with the first generate() flag set to false (presumably turns off
            // per-topic metrics - confirm); the first bucket_total sample must then equal 6.
            ByteArrayOutputStream secondOutput = new ByteArrayOutputStream();
            PrometheusMetricsTestUtil.generate(pulsar, false, true, true, secondOutput);
            Multimap<String, PrometheusMetricsClient.Metric> secondMetrics =
                    PrometheusMetricsClient.parseMetrics(secondOutput.toString(StandardCharsets.UTF_8));
            Optional<PrometheusMetricsClient.Metric> firstBucketTotal =
                    secondMetrics.get("pulsar_delayed_message_index_bucket_total").stream().findFirst();
            Assert.assertTrue(firstBucketTotal.isPresent());
            Assert.assertEquals(6.0d, firstBucketTotal.get().value);
        }
    }

    /**
     * Verifies that force-deleting a topic also deletes the ledgers referenced by the subscription
     * cursor's {@code "#pulsar.internal.delayed.bucket"} properties: after publishing 1000 delayed
     * messages and deleting the topic, opening any of the recorded ledger ids must fail with
     * {@link BKException.BKNoSuchLedgerExistsException}.
     *
     * @throws Exception on any client, broker, BookKeeper or assertion failure
     */
    @Test
    public void testDelete() throws Exception {
        String topic = BrokerTestUtil.newUniqueName("persistent://public/default/testDelete");

        // Both resources are released on every exit path, including assertion failures.
        try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                     .topic(topic)
                     .subscriptionName("sub")
                     .subscriptionType(SubscriptionType.Shared)
                     .subscribe();
             Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                     .topic(topic)
                     .create()) {
            for (int i = 0; i < 1000; i++) {
                producer.newMessage().value("msg").deliverAfter(1L, TimeUnit.HOURS).send();
            }

            Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).orElseThrow();
            AbstractPersistentDispatcherMultipleConsumers dispatcher =
                    (AbstractPersistentDispatcherMultipleConsumers) topicRef.getSubscription("sub").getDispatcher();
            Awaitility.await().untilAsserted(
                    () -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 1000L));

            // The property values are parsed as ledger ids and probed through BookKeeper below.
            List<Long> bucketLedgerIds = dispatcher.getCursor().getCursorProperties().entrySet().stream()
                    .filter(entry -> entry.getKey().startsWith("#pulsar.internal.delayed.bucket"))
                    .map(entry -> Long.valueOf(entry.getValue()))
                    .toList();
            Assert.assertTrue(bucketLedgerIds.size() > 0);

            // Force-delete the topic while the consumer and producer are still attached.
            admin.topics().delete(topic, true);

            for (long ledgerId : bucketLedgerIds) {
                try {
                    pulsarTestContext.getBookKeeperClient()
                            .openLedger(ledgerId, BookKeeper.DigestType.CRC32C, new byte[0]);
                    Assert.fail("Should fail");
                } catch (BKException.BKNoSuchLedgerExistsException expected) {
                    // Expected: the ledger was removed together with the topic.
                }
            }
        }
    }

    /**
     * TestNG data provider that supplies one row per subscription type exercised by the
     * cursor-property tests: Shared, Key_Shared, Failover and Exclusive.
     *
     * @return a two-dimensional array with a single {@link SubscriptionType} in each row
     */
    @DataProvider(name = "subscriptionTypes")
    public Object[][] subscriptionTypes() {
        return new Object[][]{
                {SubscriptionType.Shared},
                {SubscriptionType.Key_Shared},
                {SubscriptionType.Failover},
                {SubscriptionType.Exclusive}
        };
    }

    /**
     * Creates a non-partitioned topic with subscription {@code s1}, attaches and immediately closes
     * a consumer of the given type, asserts that the cursor has no properties (null or empty), and
     * then deletes the topic.
     *
     * @param subscriptionType subscription type used by the short-lived consumer
     * @throws Exception on any admin, client or assertion failure
     */
    @Test(dataProvider = "subscriptionTypes")
    public void testDeleteTopicIfCursorPropsEmpty(SubscriptionType subscriptionType) throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
        final String subscription = "s1";

        admin.topics().createNonPartitionedTopic(topic);
        admin.topics().createSubscription(topic, subscription, MessageId.earliest);

        // Connect with the requested subscription type and disconnect right away.
        pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName(subscription)
                .subscriptionType(subscriptionType)
                .subscribe()
                .close();

        ManagedCursor cursor = findCursor(topic, subscription);
        Assert.assertNotNull(cursor);
        Assert.assertTrue(cursor.getCursorProperties() == null || cursor.getCursorProperties().isEmpty());

        admin.topics().delete(topic);
    }

    /**
     * Creates a two-partition topic with subscription {@code s1}, attaches and immediately closes a
     * consumer of the given type, asserts that the cursor of partition 0 has no properties (null or
     * empty), and then deletes the partitioned topic.
     *
     * @param subscriptionType subscription type used by the short-lived consumer
     * @throws Exception on any admin, client or assertion failure
     */
    @Test(dataProvider = "subscriptionTypes")
    public void testDeletePartitionedTopicIfCursorPropsEmpty(SubscriptionType subscriptionType) throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
        final String subscription = "s1";

        admin.topics().createPartitionedTopic(topic, 2);
        admin.topics().createSubscription(topic, subscription, MessageId.earliest);

        // Connect with the requested subscription type and disconnect right away.
        pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName(subscription)
                .subscriptionType(subscriptionType)
                .subscribe()
                .close();

        // Only the first partition's cursor is inspected.
        ManagedCursor cursor = findCursor(topic + "-partition-0", subscription);
        Assert.assertNotNull(cursor);
        Assert.assertTrue(cursor.getCursorProperties() == null || cursor.getCursorProperties().isEmpty());

        admin.topics().deletePartitionedTopic(topic);
    }

    /**
     * Creates a non-partitioned topic with subscription {@code s1}, attaches and immediately closes
     * a consumer of the given type, sets a subscription property through the admin API, asserts that
     * the cursor properties went from empty to non-empty, and then deletes the topic.
     *
     * @param subscriptionType subscription type used by the short-lived consumer
     * @throws Exception on any admin, client or assertion failure
     */
    @Test(dataProvider = "subscriptionTypes")
    public void testDeleteTopicIfCursorPropsNotEmpty(SubscriptionType subscriptionType) throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
        final String subscription = "s1";

        admin.topics().createNonPartitionedTopic(topic);
        admin.topics().createSubscription(topic, subscription, MessageId.earliest);

        // Connect with the requested subscription type and disconnect right away.
        pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName(subscription)
                .subscriptionType(subscriptionType)
                .subscribe()
                .close();

        ManagedCursor cursor = findCursor(topic, subscription);
        Assert.assertNotNull(cursor);
        Assert.assertTrue(cursor.getCursorProperties() == null || cursor.getCursorProperties().isEmpty());

        // Put a subscription property so that the cursor properties are no longer empty.
        HashMap<String, String> subscriptionProperties = new HashMap<>();
        subscriptionProperties.put("ignore", "ignore");
        admin.topics().updateSubscriptionProperties(topic, subscription, subscriptionProperties);
        Assert.assertTrue(cursor.getCursorProperties() != null && !cursor.getCursorProperties().isEmpty());

        admin.topics().delete(topic);
    }

    /**
     * Creates a two-partition topic, opens and closes a producer on it, creates subscription
     * {@code s1}, attaches and immediately closes a consumer of the given type, sets a subscription
     * property through the admin API, asserts that the cursor properties of partition 0 went from
     * empty to non-empty, and then deletes the partitioned topic.
     *
     * @param subscriptionType subscription type used by the short-lived consumer
     * @throws Exception on any admin, client or assertion failure
     */
    @Test(dataProvider = "subscriptionTypes")
    public void testDeletePartitionedTopicIfCursorPropsNotEmpty(SubscriptionType subscriptionType) throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
        final String subscription = "s1";

        admin.topics().createPartitionedTopic(topic, 2);
        // Open and close a producer before creating the subscription (presumably to make sure
        // the partitions are loaded - confirm).
        pulsarClient.newProducer().topic(topic).create().close();
        admin.topics().createSubscription(topic, subscription, MessageId.earliest);

        // Connect with the requested subscription type and disconnect right away.
        pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName(subscription)
                .subscriptionType(subscriptionType)
                .subscribe()
                .close();

        // Only the first partition's cursor is inspected.
        ManagedCursor cursor = findCursor(topic + "-partition-0", subscription);
        Assert.assertNotNull(cursor);
        Assert.assertTrue(cursor.getCursorProperties() == null || cursor.getCursorProperties().isEmpty());

        // Put a subscription property so that the cursor properties are no longer empty.
        HashMap<String, String> subscriptionProperties = new HashMap<>();
        subscriptionProperties.put("ignore", "ignore");
        admin.topics().updateSubscriptionProperties(topic, subscription, subscriptionProperties);
        Assert.assertTrue(cursor.getCursorProperties() != null && !cursor.getCursorProperties().isEmpty());

        admin.topics().deletePartitionedTopic(topic);
    }

    /**
     * Looks up a cursor by name on an already existing topic.
     *
     * @param str  full name of the topic; the topic is not created when missing, and a missing
     *             topic makes the lookup throw {@link java.util.NoSuchElementException}
     * @param str2 name of the cursor to find
     * @return the first non-null cursor of the topic's managed ledger whose name equals
     *         {@code str2}, or {@code null} when there is none
     */
    private ManagedCursor findCursor(String str, String str2) {
        Optional<Topic> loadedTopic = pulsar.getBrokerService().getTopic(str, false).join();
        PersistentTopic persistentTopic = (PersistentTopic) loadedTopic.get();

        ManagedCursor match = null;
        for (ManagedCursor candidate : persistentTopic.getManagedLedger().getCursors()) {
            if (candidate == null || !candidate.getName().equals(str2)) {
                continue;
            }
            match = candidate;
            break;
        }
        return match;
    }
}
