package org.apache.pulsar.broker.service;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import lombok.Generated;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.admin.NamespacesTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/TopicGCTest.class */
public class TopicGCTest extends ProducerConsumerBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(TopicGCTest.class);

    /**
     * The way a test consumer is attached to the topic under test. The value is fed to the data-driven
     * tests through the {@code subscribeTopicTypes} data provider and interpreted by
     * {@code setSubscribeTopic}.
     */
    public enum SubscribeTopicType {
        /** Subscribe by passing the topic name to the consumer builder. */
        MULTI_PARTITIONED_TOPIC,
        /** Subscribe by passing a compiled regular expression to the consumer builder. */
        REGEX_TOPIC
    }

    /**
     * Class-level fixture: runs the inherited internal setup followed by the inherited producer base
     * setup. {@code alwaysRun = true} keeps it active regardless of which test groups are selected.
     *
     * @throws Exception if either inherited setup step fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass(alwaysRun = true)
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    /**
     * Class-level teardown: delegates to the inherited internal cleanup. {@code alwaysRun = true} keeps
     * it active regardless of which test groups are selected.
     *
     * @throws Exception if the inherited cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Applies the inherited configuration and then switches on inactive-topic deletion for the test
     * broker: deletion is enabled, the mode is
     * {@link InactiveTopicDeleteMode#delete_when_subscriptions_caught_up}, and the frequency setting is
     * 10 seconds.
     *
     * @throws Exception if the inherited configuration step fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    public void doInitConf() throws Exception {
        super.doInitConf();
        // Settings below are what allow the GC-oriented tests in this class to observe topic removal.
        conf.setBrokerDeleteInactiveTopicsEnabled(true);
        conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
        conf.setBrokerDeleteInactiveTopicsFrequencySeconds(10);
    }

    /**
     * Data provider that yields one row per {@link SubscribeTopicType}, so that each data-driven test is
     * executed once for the topic-name subscription and once for the regex subscription.
     *
     * @return a two-row table; each row holds a single {@link SubscribeTopicType} argument
     */
    @DataProvider(name = "subscribeTopicTypes")
    public Object[][] subTopicTypes() {
        // The outer array must be Object[][]: a plain Object[] does not satisfy the declared return type.
        return new Object[][]{
                {SubscribeTopicType.MULTI_PARTITIONED_TOPIC},
                {SubscribeTopicType.REGEX_TOPIC}
        };
    }

    /**
     * Configures which topic(s) the given consumer builder subscribes to.
     *
     * <p>The return value of the builder call is discarded, so this method relies on the builder being
     * updated in place.
     *
     * @param consumerBuilder builder to configure
     * @param subscribeTopicType {@link SubscribeTopicType#MULTI_PARTITIONED_TOPIC} to subscribe by topic
     *                           name; any other value subscribes by regular expression
     * @param topicName topic name used for the topic-name subscription
     * @param topicsPattern regular expression source used for the regex subscription
     */
    private void setSubscribeTopic(ConsumerBuilder<?> consumerBuilder, SubscribeTopicType subscribeTopicType,
                                   String topicName, String topicsPattern) {
        if (subscribeTopicType == SubscribeTopicType.MULTI_PARTITIONED_TOPIC) {
            consumerBuilder.topic(topicName);
        } else {
            consumerBuilder.topicsPattern(Pattern.compile(topicsPattern));
        }
    }

    /**
     * Verifies that a consumer can be created again on a two-partition topic after the broker no longer
     * holds partition 0, and that the new consumer still receives a pending message.
     *
     * <p>Flow: publish one message to each partition, skip everything on partition 0, close the
     * partition-0 producer and the consumer, wait until the broker reports partition 0 as absent while
     * partition 1 is still present, then subscribe again and expect a message.
     *
     * @param subscribeTopicType whether the consumers subscribe by topic name or by regex
     * @throws Exception on any admin or client failure
     */
    @Test(dataProvider = "subscribeTopicTypes", timeOut = 300000)
    public void testRecreateConsumerAfterOnePartGc(SubscribeTopicType subscribeTopicType) throws Exception {
        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
        final String topicsPattern = "persistent://public/default/tp.*";
        final String subscriptionName = "s1";
        final String partition0 = topicName + "-partition-0";
        final String partition1 = topicName + "-partition-1";
        admin.topics().createPartitionedTopic(topicName, 2);
        admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);

        Producer<String> producerPartition0 =
                pulsarClient.newProducer(Schema.STRING).topic(partition0).enableBatching(false).create();
        Producer<String> producerPartition1 =
                pulsarClient.newProducer(Schema.STRING).topic(partition1).enableBatching(false).create();
        ConsumerBuilder<String> firstBuilder = pulsarClient.newConsumer(Schema.STRING)
                .subscriptionName(subscriptionName)
                .isAckReceiptEnabled(true)
                .subscriptionType(SubscriptionType.Shared);
        setSubscribeTopic(firstBuilder, subscribeTopicType, topicName, topicsPattern);
        Consumer<String> firstConsumer = firstBuilder.subscribe();

        producerPartition0.send("1");
        producerPartition1.send("2");
        // Leave nothing pending on partition 0 and detach every client from it.
        admin.topics().skipAllMessages(partition0, subscriptionName);
        producerPartition0.close();
        firstConsumer.close();

        // Partition 0 must disappear from the broker while partition 1 stays loaded.
        Awaitility.await().atMost(2L, TimeUnit.MINUTES).untilAsserted(() -> {
            CompletableFuture<Optional<Topic>> tp0 = pulsar.getBrokerService().getTopic(partition0, false);
            CompletableFuture<Optional<Topic>> tp1 = pulsar.getBrokerService().getTopic(partition1, false);
            Assert.assertTrue(tp0 == null || !tp0.get().isPresent());
            Assert.assertTrue(tp1 != null && tp1.get().isPresent());
        });

        ConsumerBuilder<String> secondBuilder = pulsarClient.newConsumer(Schema.STRING)
                .subscriptionName(subscriptionName)
                .isAckReceiptEnabled(true)
                .subscriptionType(SubscriptionType.Shared);
        setSubscribeTopic(secondBuilder, subscribeTopicType, topicName, topicsPattern);
        Consumer<String> secondConsumer = secondBuilder.subscribe();
        Message<String> msg = secondConsumer.receive(2, TimeUnit.SECONDS);
        // receive(timeout) yields null on timeout; fail with a clear assertion instead of an NPE below.
        Assert.assertNotNull(msg, "Expected a message after re-creating the consumer.");
        log.info("received msg: {}", msg.getValue());
        secondConsumer.acknowledge(msg);

        // Cleanup. The partition-0 producer was already closed above.
        secondConsumer.close();
        producerPartition1.close();
        admin.topics().deletePartitionedTopic(topicName);
    }

    /**
     * Verifies that a consumer which stays connected while the broker drops partition 0 keeps receiving
     * messages, and that a second consumer created afterwards on the same subscription also receives
     * messages.
     *
     * <p>Flow: publish one message to partition 0 and three to partition 1, skip everything on
     * partition 0, close the partition-0 producer, wait until the broker reports partition 0 as absent
     * while partition 1 is still present, read two messages with the first consumer (acknowledging only
     * those owned by partition 1), then subscribe again, publish one more message and expect to receive
     * one.
     *
     * @param subscribeTopicType whether the consumers subscribe by topic name or by regex
     * @throws Exception on any admin or client failure
     */
    @Test(dataProvider = "subscribeTopicTypes", timeOut = 300000)
    public void testAppendCreateConsumerAfterOnePartGc(SubscribeTopicType subscribeTopicType) throws Exception {
        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
        final String topicsPattern = "persistent://public/default/tp.*";
        final String subscriptionName = "s1";
        final String partition0 = topicName + "-partition-0";
        final String partition1 = topicName + "-partition-1";
        admin.topics().createPartitionedTopic(topicName, 2);
        admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);

        Producer<String> producerPartition0 =
                pulsarClient.newProducer(Schema.STRING).topic(partition0).enableBatching(false).create();
        Producer<String> producerPartition1 =
                pulsarClient.newProducer(Schema.STRING).topic(partition1).enableBatching(false).create();
        ConsumerBuilder<String> firstBuilder = pulsarClient.newConsumer(Schema.STRING)
                .subscriptionName(subscriptionName)
                .isAckReceiptEnabled(true)
                .subscriptionType(SubscriptionType.Shared);
        setSubscribeTopic(firstBuilder, subscribeTopicType, topicName, topicsPattern);
        Consumer<String> firstConsumer = firstBuilder.subscribe();

        producerPartition0.send("partition-0-1");
        producerPartition1.send("partition-1-1");
        producerPartition1.send("partition-1-2");
        producerPartition1.send("partition-1-4");
        admin.topics().skipAllMessages(partition0, subscriptionName);
        producerPartition0.close();

        // Partition 0 must disappear from the broker while partition 1 stays loaded.
        Awaitility.await().atMost(2L, TimeUnit.MINUTES).untilAsserted(() -> {
            CompletableFuture<Optional<Topic>> tp0 = pulsar.getBrokerService().getTopic(partition0, false);
            CompletableFuture<Optional<Topic>> tp1 = pulsar.getBrokerService().getTopic(partition1, false);
            Assert.assertTrue(tp0 == null || !tp0.get().isPresent());
            Assert.assertTrue(tp1 != null && tp1.get().isPresent());
        });

        for (int i = 0; i < 2; i++) {
            Message<String> msg = firstConsumer.receive(2, TimeUnit.SECONDS);
            Assert.assertNotNull(msg, "Expected at least received 2 messages.");
            log.info("received msg[{}]: {}", i, msg.getValue());
            // getOwnerTopic() is declared on TopicMessageId, not on MessageId, hence the cast.
            TopicMessageId messageId = (TopicMessageId) msg.getMessageId();
            if (messageId.getOwnerTopic().equals(partition1)) {
                firstConsumer.acknowledgeAsync(msg);
            }
        }
        firstConsumer.close();

        ConsumerBuilder<String> secondBuilder = pulsarClient.newConsumer(Schema.STRING)
                .subscriptionName(subscriptionName)
                .isAckReceiptEnabled(true)
                .subscriptionType(SubscriptionType.Shared);
        setSubscribeTopic(secondBuilder, subscribeTopicType, topicName, topicsPattern);
        Consumer<String> secondConsumer = secondBuilder.subscribe();
        producerPartition1.send("partition-1-5");
        Message<String> msg = secondConsumer.receive(2, TimeUnit.SECONDS);
        Assert.assertNotNull(msg);
        log.info("received msg: {}", msg.getValue());
        secondConsumer.acknowledge(msg);

        // Cleanup. The partition-0 producer was already closed above.
        secondConsumer.close();
        producerPartition1.close();
        admin.topics().deletePartitionedTopic(topicName);
    }

    /**
     * Verifies that a regex consumer drops its per-partition consumers one at a time as the partitions of
     * a three-partition topic are force-deleted.
     *
     * <p>The consumer's internal {@code consumers} and {@code partitionedTopics} maps are read
     * reflectively. While at least one partition remains, {@code partitionedTopics} is expected to keep a
     * single entry mapping the topic to 3; after the last partition is deleted both maps are expected to
     * be empty.
     *
     * @throws Exception on any admin or client failure
     */
    @Test(timeOut = NamespacesTest.THREE_MINUTE_MILLIS)
    public void testPhasePartDeletion() throws Exception {
        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
        final String partition0 = topicName + "-partition-0";
        final String partition1 = topicName + "-partition-1";
        final String partition2 = topicName + "-partition-2";
        admin.topics().createPartitionedTopic(topicName, 3);
        // subscribe() is declared to return Consumer; the cast is needed to reach the implementation
        // whose internal state is inspected below.
        PatternMultiTopicsConsumerImpl<String> consumer =
                (PatternMultiTopicsConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
                        .subscriptionName("s1")
                        .isAckReceiptEnabled(true)
                        .subscriptionType(SubscriptionType.Shared)
                        .topicsPattern(Pattern.compile("persistent://public/default/tp.*"))
                        .subscribe();

        // All three partitions are attached.
        Awaitility.await().untilAsserted(() -> {
            ConcurrentHashMap<String, ?> consumers = WhiteboxImpl.getInternalState(consumer, "consumers");
            ConcurrentHashMap<String, Integer> partitionedTopics =
                    WhiteboxImpl.getInternalState(consumer, "partitionedTopics");
            Assert.assertEquals(partitionedTopics.size(), 1);
            Assert.assertEquals(partitionedTopics.get(topicName), 3);
            Assert.assertEquals(consumers.size(), 3);
            Assert.assertTrue(consumers.containsKey(partition0));
            Assert.assertTrue(consumers.containsKey(partition1));
            Assert.assertTrue(consumers.containsKey(partition2));
        });

        // Drop partition 0.
        admin.topics().delete(partition0, true);
        Awaitility.await().untilAsserted(() -> {
            ConcurrentHashMap<String, ?> consumers = WhiteboxImpl.getInternalState(consumer, "consumers");
            ConcurrentHashMap<String, Integer> partitionedTopics =
                    WhiteboxImpl.getInternalState(consumer, "partitionedTopics");
            Assert.assertEquals(partitionedTopics.size(), 1);
            Assert.assertEquals(partitionedTopics.get(topicName), 3);
            Assert.assertEquals(consumers.size(), 2);
            Assert.assertTrue(consumers.containsKey(partition1));
            Assert.assertTrue(consumers.containsKey(partition2));
        });

        // Drop partition 1.
        admin.topics().delete(partition1, true);
        Awaitility.await().untilAsserted(() -> {
            ConcurrentHashMap<String, ?> consumers = WhiteboxImpl.getInternalState(consumer, "consumers");
            ConcurrentHashMap<String, Integer> partitionedTopics =
                    WhiteboxImpl.getInternalState(consumer, "partitionedTopics");
            Assert.assertEquals(partitionedTopics.size(), 1);
            Assert.assertEquals(partitionedTopics.get(topicName), 3);
            Assert.assertEquals(consumers.size(), 1);
            Assert.assertTrue(consumers.containsKey(partition2));
        });

        // Drop the last partition: nothing should remain tracked.
        admin.topics().delete(partition2, true);
        Awaitility.await().untilAsserted(() -> {
            ConcurrentHashMap<String, ?> consumers = WhiteboxImpl.getInternalState(consumer, "consumers");
            ConcurrentHashMap<String, Integer> partitionedTopics =
                    WhiteboxImpl.getInternalState(consumer, "partitionedTopics");
            Assert.assertEquals(partitionedTopics.size(), 0);
            Assert.assertEquals(consumers.size(), 0);
        });

        consumer.close();
        admin.topics().deletePartitionedTopic(topicName);
    }

    /**
     * Verifies that a regex consumer picks up partitions as they appear: first none, then the two
     * original partitions once another consumer subscribes to the topic by name, then a third and a
     * fourth partition after the partition count is updated to 3 and to 4.
     *
     * <p>The consumer's internal {@code consumers} and {@code partitionedTopics} maps are read
     * reflectively after each step.
     *
     * @throws Exception on any admin or client failure
     */
    @Test(timeOut = NamespacesTest.THREE_MINUTE_MILLIS)
    public void testExpandPartitions() throws Exception {
        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
        final String partition0 = topicName + "-partition-0";
        final String partition1 = topicName + "-partition-1";
        final String partition2 = topicName + "-partition-2";
        final String partition3 = topicName + "-partition-3";
        admin.topics().createPartitionedTopic(topicName, 2);
        // Force-delete both partitions so the regex consumer starts with nothing attached.
        admin.topics().delete(partition0, true);
        admin.topics().delete(partition1, true);
        // subscribe() is declared to return Consumer; the cast is needed to reach the implementation
        // whose internal state is inspected below.
        PatternMultiTopicsConsumerImpl<String> consumer =
                (PatternMultiTopicsConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
                        .subscriptionName("s1")
                        .isAckReceiptEnabled(true)
                        .subscriptionType(SubscriptionType.Shared)
                        .topicsPattern(Pattern.compile("persistent://public/default/tp.*"))
                        .subscribe();

        // Nothing is tracked yet.
        Awaitility.await().untilAsserted(() -> {
            ConcurrentHashMap<String, ?> consumers = WhiteboxImpl.getInternalState(consumer, "consumers");
            ConcurrentHashMap<String, Integer> partitionedTopics =
                    WhiteboxImpl.getInternalState(consumer, "partitionedTopics");
            Assert.assertEquals(partitionedTopics.size(), 0);
            Assert.assertEquals(consumers.size(), 0);
        });

        // Subscribing by topic name and closing immediately is expected to bring the two partitions back.
        pulsarClient.newConsumer(Schema.STRING)
                .subscriptionName("s1")
                .subscriptionType(SubscriptionType.Shared)
                .topic(topicName)
                .subscribe()
                .close();
        Awaitility.await().untilAsserted(() -> {
            ConcurrentHashMap<String, ?> consumers = WhiteboxImpl.getInternalState(consumer, "consumers");
            ConcurrentHashMap<String, Integer> partitionedTopics =
                    WhiteboxImpl.getInternalState(consumer, "partitionedTopics");
            Assert.assertEquals(partitionedTopics.size(), 1);
            Assert.assertEquals(partitionedTopics.get(topicName), 2);
            Assert.assertEquals(consumers.size(), 2);
            Assert.assertTrue(consumers.containsKey(partition0));
            Assert.assertTrue(consumers.containsKey(partition1));
        });

        // Expand to three partitions.
        admin.topics().updatePartitionedTopic(topicName, 3);
        Awaitility.await().untilAsserted(() -> {
            ConcurrentHashMap<String, ?> consumers = WhiteboxImpl.getInternalState(consumer, "consumers");
            ConcurrentHashMap<String, Integer> partitionedTopics =
                    WhiteboxImpl.getInternalState(consumer, "partitionedTopics");
            Assert.assertEquals(partitionedTopics.size(), 1);
            Assert.assertEquals(partitionedTopics.get(topicName), 3);
            Assert.assertEquals(consumers.size(), 3);
            Assert.assertTrue(consumers.containsKey(partition0));
            Assert.assertTrue(consumers.containsKey(partition1));
            Assert.assertTrue(consumers.containsKey(partition2));
        });

        // Expand to four partitions.
        admin.topics().updatePartitionedTopic(topicName, 4);
        Awaitility.await().untilAsserted(() -> {
            ConcurrentHashMap<String, ?> consumers = WhiteboxImpl.getInternalState(consumer, "consumers");
            ConcurrentHashMap<String, Integer> partitionedTopics =
                    WhiteboxImpl.getInternalState(consumer, "partitionedTopics");
            Assert.assertEquals(partitionedTopics.size(), 1);
            Assert.assertEquals(partitionedTopics.get(topicName), 4);
            Assert.assertEquals(consumers.size(), 4);
            Assert.assertTrue(consumers.containsKey(partition0));
            Assert.assertTrue(consumers.containsKey(partition1));
            Assert.assertTrue(consumers.containsKey(partition2));
            Assert.assertTrue(consumers.containsKey(partition3));
        });

        consumer.close();
        admin.topics().deletePartitionedTopic(topicName);
    }
}
