/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.google.common.collect.Lists;
import java.lang.invoke.LambdaMetafactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker-impl"})
public class PatternTopicsConsumerImplTest
extends ProducerConsumerBase {
    // Timeout budget for the timed tests in this class; the @Test(timeOut=...) annotations use this same value.
    private static final long testTimeout = 90000L;
    private static final Logger log = LoggerFactory.getLogger(PatternTopicsConsumerImplTest.class);
    // Two seconds expressed in milliseconds; passed to ackTimeout(..., TimeUnit.MILLISECONDS) by the consumers below.
    private final long ackTimeOutMillis = TimeUnit.SECONDS.toMillis(2L);

    /**
     * Per-test fixture setup: sets the inherited {@code isTcpLookup} flag, then runs the
     * base class's internal setup followed by its producer setup.
     *
     * @throws Exception if either base-class setup step fails
     */
    @BeforeMethod
    @Override
    public void setup() throws Exception {
        isTcpLookup = true;
        internalSetup();
        producerBaseSetup();
    }

    /**
     * Per-test teardown; delegates to the base class's internal cleanup.
     * Runs even when the test method failed ({@code alwaysRun = true}).
     *
     * @throws Exception if the base-class cleanup fails
     */
    @AfterMethod(alwaysRun = true)
    @Override
    public void cleanup() throws Exception {
        internalCleanup();
    }

    /**
     * Checks that the consumer builder refuses a topics pattern combined with any other topic
     * source: a single topic, a list of topics, or a second pattern supplied as a string.
     * The first two combinations are expected to fail with {@link PulsarClientException}, the
     * third with {@link IllegalArgumentException}.
     *
     * @throws Exception if creating the tenant or the partitioned topics fails
     */
    @Test(timeOut = testTimeout)
    public void testPatternTopicsSubscribeWithBuilderFail() throws Exception {
        String key = "PatternTopicsSubscribeWithBuilderFail";
        String subscriptionName = "my-ex-subscription-" + key;
        String topicName1 = "persistent://my-property/my-ns/topic-1-" + key;
        String topicName2 = "persistent://my-property/my-ns/topic-2-" + key;
        String topicName3 = "persistent://my-property/my-ns/topic-3-" + key;
        String topicName4 = "non-persistent://my-property/my-ns/topic-4-" + key;
        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4);

        // One literal feeds both the compiled pattern and the string-based builder call below.
        String patternString = "persistent://my-property/my-ns/pattern-topic.*";
        Pattern pattern = Pattern.compile(patternString);

        TenantInfo tenantInfo = createDefaultTenantInfo();
        admin.tenants().createTenant("prop", tenantInfo);
        admin.topics().createPartitionedTopic(topicName2, 2);
        admin.topics().createPartitionedTopic(topicName3, 3);

        // Case 1: pattern + single topic.
        try {
            pulsarClient.newConsumer()
                    .topicsPattern(pattern)
                    .topic(topicName1)
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Shared)
                    .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                    .subscribe();
            Assert.fail("subscribe1 with pattern and topic should fail.");
        } catch (PulsarClientException expected) {
            // expected: this combination must be rejected, nothing more to verify
        }

        // Case 2: pattern + list of topics.
        try {
            pulsarClient.newConsumer()
                    .topicsPattern(pattern)
                    .topics(topicNames)
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Shared)
                    .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                    .subscribe();
            Assert.fail("subscribe2 with pattern and topics should fail.");
        } catch (PulsarClientException expected) {
            // expected: this combination must be rejected, nothing more to verify
        }

        // Case 3: compiled pattern + pattern string.
        try {
            pulsarClient.newConsumer()
                    .topicsPattern(pattern)
                    .topicsPattern(patternString)
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Shared)
                    .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                    .subscribe();
            Assert.fail("subscribe3 with pattern and patternString should fail.");
        } catch (IllegalArgumentException expected) {
            // expected: this combination must be rejected, nothing more to verify
        }
    }

    /**
     * Subscribes with a topics pattern and no explicit subscription-topics mode, then verifies
     * the consumer's view of the namespace (6 partitioned topics / 6 internal consumers /
     * 3 topics) and that exactly {@code totalMessages} messages are received even though a
     * fourth producer also publishes to a non-persistent topic.
     *
     * @throws Exception if provisioning, producing or consuming fails
     */
    @Test(timeOut = testTimeout)
    public void testBinaryProtoToGetTopicsOfNamespacePersistent() throws Exception {
        String key = "BinaryProtoToGetTopics";
        String subscriptionName = "my-ex-subscription-" + key;
        String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key;
        String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key;
        String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key;
        String topicName4 = "non-persistent://my-property/my-ns/pattern-topic-4-" + key;
        Pattern pattern = Pattern.compile("my-property/my-ns/pattern-topic.*");

        // Only topics 2 and 3 are created explicitly, as partitioned topics.
        TenantInfo tenantInfo = createDefaultTenantInfo();
        admin.tenants().createTenant("prop", tenantInfo);
        admin.topics().createPartitionedTopic(topicName2, 2);
        admin.topics().createPartitionedTopic(topicName3, 3);

        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 30;

        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        Producer<byte[]> producer4 = pulsarClient.newProducer().topic(topicName4)
                .enableBatching(false)
                .create();

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topicsPattern(pattern)
                .patternAutoDiscoveryPeriod(2)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .receiverQueueSize(4)
                .subscribe();
        Assert.assertTrue(consumer.getTopic().startsWith("MultiTopicsConsumer-"));

        // Typed view of the consumer so the element types of the lists below are known to the compiler.
        PatternMultiTopicsConsumerImpl<byte[]> patternConsumer = (PatternMultiTopicsConsumerImpl<byte[]>) consumer;
        Assert.assertSame(pattern, patternConsumer.getPattern());
        List<String> topics = patternConsumer.getPartitionedTopics();
        List<ConsumerImpl<byte[]>> consumers = patternConsumer.getConsumers();
        Assert.assertEquals(topics.size(), 6);
        Assert.assertEquals(consumers.size(), 6);
        Assert.assertEquals(patternConsumer.getTopics().size(), 3);

        topics.forEach(topic -> log.debug("topic: {}", topic));
        consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));

        // Internal consumers and partitioned topic names must line up index by index.
        IntStream.range(0, topics.size())
                .forEach(index -> Assert.assertEquals(consumers.get(index).getTopic(), topics.get(index)));

        patternConsumer.getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));

        // Every producer, including the non-persistent one, sends totalMessages / 3 messages.
        for (int i = 0; i < totalMessages / 3; i++) {
            producer1.send((messagePredicate + "producer1-" + i).getBytes());
            producer2.send((messagePredicate + "producer2-" + i).getBytes());
            producer3.send((messagePredicate + "producer3-" + i).getBytes());
            producer4.send((messagePredicate + "producer4-" + i).getBytes());
        }

        // Block for the first message, then drain until a 500 ms poll comes back empty.
        int messageSet = 0;
        Message<byte[]> message = consumer.receive();
        do {
            Assert.assertTrue(message instanceof TopicMessageImpl);
            messageSet++;
            consumer.acknowledge(message);
            log.debug("Consumer acknowledged : {}", new String(message.getData()));
            message = consumer.receive(500, TimeUnit.MILLISECONDS);
        } while (message != null);
        Assert.assertEquals(messageSet, totalMessages);

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
        producer4.close();
    }

    /**
     * Re-runs {@link #testBinaryProtoToGetTopicsOfNamespaceNonPersistent()} against a fixture
     * that was rebuilt after setting per-topic publish-rate limits on {@code conf}
     * (10000 bytes and 100 messages).
     *
     * @throws Exception if the rebuild or the delegated test fails
     */
    @Test(timeOut = testTimeout)
    public void testPubRateOnNonPersistent() throws Exception {
        // Discard the fixture created by setup() before changing the configuration.
        internalCleanup();

        conf.setMaxPublishRatePerTopicInBytes(10000L);
        conf.setMaxPublishRatePerTopicInMessages(100);
        Thread.sleep(500L);

        // Same sequence as setup(), now with the rate limits in place.
        isTcpLookup = true;
        internalSetup();
        producerBaseSetup();

        testBinaryProtoToGetTopicsOfNamespaceNonPersistent();
    }

    /**
     * Subscribes with a topics pattern in {@link RegexSubscriptionMode#NonPersistentOnly} mode
     * and verifies that the consumer tracks a single topic (1 partitioned topic / 1 internal
     * consumer / 1 topic) and receives only one producer's share of the traffic
     * ({@code totalMessages / 4}) although four producers publish.
     *
     * @throws Exception if provisioning, producing or consuming fails
     */
    @Test(timeOut = testTimeout)
    public void testBinaryProtoToGetTopicsOfNamespaceNonPersistent() throws Exception {
        String key = "BinaryProtoToGetTopics";
        String subscriptionName = "my-ex-subscription-" + key;
        String topicName1 = "persistent://my-property/my-ns/np-pattern-topic-1-" + key;
        String topicName2 = "persistent://my-property/my-ns/np-pattern-topic-2-" + key;
        String topicName3 = "persistent://my-property/my-ns/np-pattern-topic-3-" + key;
        String topicName4 = "non-persistent://my-property/my-ns/np-pattern-topic-4-" + key;
        Pattern pattern = Pattern.compile("my-property/my-ns/np-pattern-topic.*");

        // Only topics 2 and 3 are created explicitly, as partitioned topics.
        TenantInfo tenantInfo = createDefaultTenantInfo();
        admin.tenants().createTenant("prop", tenantInfo);
        admin.topics().createPartitionedTopic(topicName2, 2);
        admin.topics().createPartitionedTopic(topicName3, 3);

        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 40;

        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        Producer<byte[]> producer4 = pulsarClient.newProducer().topic(topicName4)
                .enableBatching(false)
                .create();

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topicsPattern(pattern)
                .patternAutoDiscoveryPeriod(2)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .subscriptionTopicsMode(RegexSubscriptionMode.NonPersistentOnly)
                .subscribe();

        // Typed view of the consumer so the element types of the lists below are known to the compiler.
        PatternMultiTopicsConsumerImpl<byte[]> patternConsumer = (PatternMultiTopicsConsumerImpl<byte[]>) consumer;
        Assert.assertSame(pattern, patternConsumer.getPattern());
        List<String> topics = patternConsumer.getPartitionedTopics();
        List<ConsumerImpl<byte[]>> consumers = patternConsumer.getConsumers();
        Assert.assertEquals(topics.size(), 1);
        Assert.assertEquals(consumers.size(), 1);
        Assert.assertEquals(patternConsumer.getTopics().size(), 1);

        topics.forEach(topic -> log.debug("topic: {}", topic));
        consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));

        // Internal consumers and partitioned topic names must line up index by index.
        IntStream.range(0, topics.size())
                .forEach(index -> Assert.assertEquals(consumers.get(index).getTopic(), topics.get(index)));

        patternConsumer.getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));

        // Each of the four producers sends totalMessages / 4 messages.
        for (int i = 0; i < totalMessages / 4; i++) {
            producer1.send((messagePredicate + "producer1-" + i).getBytes());
            producer2.send((messagePredicate + "producer2-" + i).getBytes());
            producer3.send((messagePredicate + "producer3-" + i).getBytes());
            producer4.send((messagePredicate + "producer4-" + i).getBytes());
        }

        // Block for the first message, then drain until a 500 ms poll comes back empty.
        int messageSet = 0;
        Message<byte[]> message = consumer.receive();
        do {
            Assert.assertTrue(message instanceof TopicMessageImpl);
            messageSet++;
            consumer.acknowledge(message);
            log.debug("Consumer acknowledged : {}", new String(message.getData()));
            message = consumer.receive(500, TimeUnit.MILLISECONDS);
        } while (message != null);
        // Only one producer's messages are expected to reach this consumer.
        Assert.assertEquals(messageSet, totalMessages / 4);

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
        producer4.close();
    }

    /**
     * Subscribes with a topics pattern in {@link RegexSubscriptionMode#AllTopics} mode and
     * verifies that the consumer tracks every matching topic (7 partitioned topics /
     * 7 internal consumers / 4 topics) and receives all {@code totalMessages} messages
     * published by the four producers.
     *
     * @throws Exception if provisioning, producing or consuming fails
     */
    @Test(timeOut = testTimeout)
    public void testBinaryProtoToGetTopicsOfNamespaceAll() throws Exception {
        String key = "BinaryProtoToGetTopics";
        String subscriptionName = "my-ex-subscription-" + key;
        String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key;
        String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key;
        String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key;
        String topicName4 = "non-persistent://my-property/my-ns/pattern-topic-4-" + key;
        Pattern pattern = Pattern.compile("my-property/my-ns/pattern-topic.*");

        // Only topics 2 and 3 are created explicitly, as partitioned topics.
        TenantInfo tenantInfo = createDefaultTenantInfo();
        admin.tenants().createTenant("prop", tenantInfo);
        admin.topics().createPartitionedTopic(topicName2, 2);
        admin.topics().createPartitionedTopic(topicName3, 3);

        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 40;

        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        Producer<byte[]> producer4 = pulsarClient.newProducer().topic(topicName4)
                .enableBatching(false)
                .create();

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topicsPattern(pattern)
                .patternAutoDiscoveryPeriod(2)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .subscribe();

        // Typed view of the consumer so the element types of the lists below are known to the compiler.
        PatternMultiTopicsConsumerImpl<byte[]> patternConsumer = (PatternMultiTopicsConsumerImpl<byte[]>) consumer;
        Assert.assertSame(pattern, patternConsumer.getPattern());
        List<String> topics = patternConsumer.getPartitionedTopics();
        List<ConsumerImpl<byte[]>> consumers = patternConsumer.getConsumers();
        Assert.assertEquals(topics.size(), 7);
        Assert.assertEquals(consumers.size(), 7);
        Assert.assertEquals(patternConsumer.getTopics().size(), 4);

        topics.forEach(topic -> log.debug("topic: {}", topic));
        consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));

        // Internal consumers and partitioned topic names must line up index by index.
        IntStream.range(0, topics.size())
                .forEach(index -> Assert.assertEquals(consumers.get(index).getTopic(), topics.get(index)));

        patternConsumer.getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));

        // Each of the four producers sends totalMessages / 4 messages.
        for (int i = 0; i < totalMessages / 4; i++) {
            producer1.send((messagePredicate + "producer1-" + i).getBytes());
            producer2.send((messagePredicate + "producer2-" + i).getBytes());
            producer3.send((messagePredicate + "producer3-" + i).getBytes());
            producer4.send((messagePredicate + "producer4-" + i).getBytes());
        }

        // Block for the first message, then drain until a 500 ms poll comes back empty.
        int messageSet = 0;
        Message<byte[]> message = consumer.receive();
        do {
            Assert.assertTrue(message instanceof TopicMessageImpl);
            messageSet++;
            consumer.acknowledge(message);
            log.debug("Consumer acknowledged : {}", new String(message.getData()));
            message = consumer.receive(500, TimeUnit.MILLISECONDS);
        } while (message != null);
        Assert.assertEquals(messageSet, totalMessages);

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
        producer4.close();
    }

    /*
     * Unable to fully structure code
     */
    @Test(timeOut=90000L)
    public void testTopicsPatternFilter() {
        topicName1 = "persistent://my-property/my-ns/pattern-topic-1";
        topicName2 = "persistent://my-property/my-ns/pattern-topic-2";
        topicName3 = "persistent://my-property/my-ns/hello-3";
        topicName4 = "non-persistent://my-property/my-ns/hello-4";
        topicsNames = Lists.newArrayList((Object[])new String[]{topicName1, topicName2, topicName3, topicName4});
        result1 = PulsarClientImpl.topicsPatternFilter((List)topicsNames, (Pattern)(pattern1 = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*")));
        Assert.assertTrue((boolean)(result1.size() == 2 && result1.contains(topicName1) != false && result1.contains(topicName2) != false));
        pattern2 = Pattern.compile("persistent://my-property/my-ns/.*");
        result2 = PulsarClientImpl.topicsPatternFilter((List)topicsNames, (Pattern)pattern2);
        if (result2.size() != 4) ** GOTO lbl-1000
        if (Stream.of(new String[]{topicName1, topicName2, topicName3, topicName4}).allMatch((Predicate<String>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Z, contains(java.lang.Object ), (Ljava/lang/String;)Z)((List)result2))) {
            v0 = true;
        } else lbl-1000:
        // 2 sources

        {
            v0 = false;
        }
        Assert.assertTrue((boolean)v0);
    }

    /**
     * Exercises {@code PatternMultiTopicsConsumerImpl.topicsListsMinus(a, b)} as a list
     * difference: overlapping lists in both directions, disjoint lists, a list minus itself,
     * a list minus an empty list, and an empty list minus a non-empty one.
     */
    @Test(timeOut = testTimeout)
    public void testTopicsListMinus() {
        String topicName1 = "persistent://my-property/my-ns/pattern-topic-1";
        String topicName2 = "persistent://my-property/my-ns/pattern-topic-2";
        String topicName3 = "persistent://my-property/my-ns/pattern-topic-3";
        String topicName4 = "persistent://my-property/my-ns/pattern-topic-4";
        String topicName5 = "persistent://my-property/my-ns/pattern-topic-5";
        String topicName6 = "persistent://my-property/my-ns/pattern-topic-6";

        List<String> oldNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4);
        List<String> newNames = Lists.newArrayList(topicName3, topicName4, topicName5, topicName6);

        // Overlapping lists: new - old gives the additions, old - new gives the removals.
        List<String> addedNames = PatternMultiTopicsConsumerImpl.topicsListsMinus(newNames, oldNames);
        List<String> removedNames = PatternMultiTopicsConsumerImpl.topicsListsMinus(oldNames, newNames);
        Assert.assertEquals(addedNames.size(), 2);
        Assert.assertTrue(addedNames.contains(topicName5) && addedNames.contains(topicName6));
        Assert.assertEquals(removedNames.size(), 2);
        Assert.assertTrue(removedNames.contains(topicName1) && removedNames.contains(topicName2));

        // Disjoint lists: the first list comes back unchanged in content.
        List<String> addedNames2 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames, removedNames);
        Assert.assertEquals(addedNames2.size(), 2);
        Assert.assertTrue(addedNames2.contains(topicName5) && addedNames2.contains(topicName6));

        // A list minus itself is empty.
        List<String> addedNames3 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames, addedNames);
        Assert.assertEquals(addedNames3.size(), 0);

        // A list minus an empty list keeps every element.
        List<String> addedNames4 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames2, addedNames3);
        Assert.assertEquals(addedNames2.size(), addedNames4.size());
        addedNames4.forEach(name -> Assert.assertTrue(addedNames2.contains(name)));

        // An empty list minus anything stays empty.
        List<String> addedNames5 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames3, addedNames2);
        Assert.assertEquals(addedNames5.size(), 0);
    }

    /**
     * Starts a pattern consumer before any producer exists, when only the two explicitly
     * created partitioned topics are present (5 partitions / 5 internal consumers / 2 topics),
     * then creates producers for three topics, triggers a pattern recheck by hand and expects
     * the consumer to grow to 6 / 6 / 3 and to receive all {@code totalMessages} messages.
     *
     * @throws Exception if provisioning, producing or consuming fails
     */
    @Test(timeOut = testTimeout)
    public void testStartEmptyPatternConsumer() throws Exception {
        String key = "StartEmptyPatternConsumerTest";
        String subscriptionName = "my-ex-subscription-" + key;
        String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key;
        String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key;
        String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key;
        Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");

        TenantInfo tenantInfo = createDefaultTenantInfo();
        admin.tenants().createTenant("prop", tenantInfo);
        admin.topics().createPartitionedTopic(topicName2, 2);
        admin.topics().createPartitionedTopic(topicName3, 3);

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topicsPattern(pattern)
                .patternAutoDiscoveryPeriod(2)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .receiverQueueSize(4)
                .subscribe();

        // Before any producer is created only the two partitioned topics are visible.
        PatternMultiTopicsConsumerImpl<byte[]> patternConsumer = (PatternMultiTopicsConsumerImpl<byte[]>) consumer;
        Assert.assertSame(pattern, patternConsumer.getPattern());
        Assert.assertEquals(patternConsumer.getPartitionedTopics().size(), 5);
        Assert.assertEquals(patternConsumer.getConsumers().size(), 5);
        Assert.assertEquals(patternConsumer.getTopics().size(), 2);

        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 30;

        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();

        // Run the recheck task directly instead of waiting for the discovery period to elapse.
        log.debug("recheck topics change");
        patternConsumer.run(patternConsumer.getRecheckPatternTimeout());
        Thread.sleep(100L);

        Assert.assertSame(pattern, patternConsumer.getPattern());
        Assert.assertEquals(patternConsumer.getPartitionedTopics().size(), 6);
        Assert.assertEquals(patternConsumer.getConsumers().size(), 6);
        Assert.assertEquals(patternConsumer.getTopics().size(), 3);

        for (int i = 0; i < totalMessages / 3; i++) {
            producer1.send((messagePredicate + "producer1-" + i).getBytes());
            producer2.send((messagePredicate + "producer2-" + i).getBytes());
            producer3.send((messagePredicate + "producer3-" + i).getBytes());
        }

        // Block for the first message, then drain until a 500 ms poll comes back empty.
        int messageSet = 0;
        Message<byte[]> message = consumer.receive();
        do {
            Assert.assertTrue(message instanceof TopicMessageImpl);
            messageSet++;
            consumer.acknowledge(message);
            log.debug("Consumer acknowledged : {}", new String(message.getData()));
            message = consumer.receive(500, TimeUnit.MILLISECONDS);
        } while (message != null);
        Assert.assertEquals(messageSet, totalMessages);

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
    }

    /**
     * Verifies that a pattern consumer picks up a topic created after subscription.
     * Round one: three topics (6 partitions / 6 internal consumers / 3 topics) and
     * {@code totalMessages} messages. Then a fourth topic with 4 partitions is created, a
     * pattern recheck is triggered by hand, the consumer is expected to grow to 10 / 10 / 4,
     * and a second round of {@code totalMessages} messages is consumed.
     *
     * @throws Exception if provisioning, producing or consuming fails
     */
    @Test(timeOut = testTimeout)
    public void testAutoSubscribePatternConsumer() throws Exception {
        String key = "AutoSubscribePatternConsumer";
        String subscriptionName = "my-ex-subscription-" + key;
        String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key;
        String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key;
        String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key;
        Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");

        TenantInfo tenantInfo = createDefaultTenantInfo();
        admin.tenants().createTenant("prop", tenantInfo);
        admin.topics().createPartitionedTopic(topicName2, 2);
        admin.topics().createPartitionedTopic(topicName3, 3);

        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 30;

        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topicsPattern(pattern)
                .patternAutoDiscoveryPeriod(2)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .receiverQueueSize(4)
                .subscribe();
        Assert.assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);

        PatternMultiTopicsConsumerImpl<byte[]> patternConsumer = (PatternMultiTopicsConsumerImpl<byte[]>) consumer;
        Assert.assertSame(pattern, patternConsumer.getPattern());
        Assert.assertEquals(patternConsumer.getPartitionedTopics().size(), 6);
        Assert.assertEquals(patternConsumer.getConsumers().size(), 6);
        Assert.assertEquals(patternConsumer.getTopics().size(), 3);

        // Round one: three producers, totalMessages / 3 messages each.
        for (int i = 0; i < totalMessages / 3; i++) {
            producer1.send((messagePredicate + "producer1-" + i).getBytes());
            producer2.send((messagePredicate + "producer2-" + i).getBytes());
            producer3.send((messagePredicate + "producer3-" + i).getBytes());
        }

        // Block for the first message, then drain until a 500 ms poll comes back empty.
        int messageSet = 0;
        Message<byte[]> message = consumer.receive();
        do {
            Assert.assertTrue(message instanceof TopicMessageImpl);
            messageSet++;
            consumer.acknowledge(message);
            log.debug("Consumer acknowledged : {}", new String(message.getData()));
            message = consumer.receive(500, TimeUnit.MILLISECONDS);
        } while (message != null);
        Assert.assertEquals(messageSet, totalMessages);

        // Add a fourth matching topic after the subscription is already live.
        String topicName4 = "persistent://my-property/my-ns/pattern-topic-4-" + key;
        admin.topics().createPartitionedTopic(topicName4, 4);
        Producer<byte[]> producer4 = pulsarClient.newProducer().topic(topicName4)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();

        // Run the recheck task directly instead of waiting for the discovery period to elapse.
        log.debug("recheck topics change");
        patternConsumer.run(patternConsumer.getRecheckPatternTimeout());
        Thread.sleep(100L);

        Assert.assertEquals(patternConsumer.getPartitionedTopics().size(), 10);
        Assert.assertEquals(patternConsumer.getConsumers().size(), 10);
        Assert.assertEquals(patternConsumer.getTopics().size(), 4);

        // Round two: topics 3 and 4, totalMessages / 2 messages each.
        // NOTE(review): both payloads carry the "round2-producer4-" label, including the one
        // sent by producer3; only the message count is asserted, so the label is left as is.
        for (int i = 0; i < totalMessages / 2; i++) {
            producer3.send((messagePredicate + "round2-producer4-" + i).getBytes());
            producer4.send((messagePredicate + "round2-producer4-" + i).getBytes());
        }

        messageSet = 0;
        message = consumer.receive();
        do {
            Assert.assertTrue(message instanceof TopicMessageImpl);
            messageSet++;
            consumer.acknowledge(message);
            log.debug("Consumer acknowledged : {}", new String(message.getData()));
            message = consumer.receive(500, TimeUnit.MILLISECONDS);
        } while (message != null);
        Assert.assertEquals(messageSet, totalMessages);

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
        producer4.close();
    }

    /**
     * Verifies that a pattern consumer drops topics that are no longer reported for the
     * namespace. Round one: three topics (6 partitions / 6 internal consumers / 3 topics) and
     * {@code totalMessages} messages. Then the namespace service is stubbed to report only
     * topic 2, a pattern recheck is triggered by hand, the consumer is expected to shrink to
     * 2 / 2 / 1, and {@code totalMessages} further messages sent to topic 2 are consumed.
     *
     * @throws Exception if provisioning, producing or consuming fails
     */
    @Test(timeOut = testTimeout)
    public void testAutoUnsubscribePatternConsumer() throws Exception {
        String key = "AutoUnsubscribePatternConsumer";
        String subscriptionName = "my-ex-subscription-" + key;
        String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key;
        String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key;
        String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key;
        Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");

        TenantInfo tenantInfo = createDefaultTenantInfo();
        admin.tenants().createTenant("prop", tenantInfo);
        admin.topics().createPartitionedTopic(topicName2, 2);
        admin.topics().createPartitionedTopic(topicName3, 3);

        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 30;

        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topicsPattern(pattern)
                .patternAutoDiscoveryPeriod(10, TimeUnit.SECONDS)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .receiverQueueSize(4)
                .subscribe();
        Assert.assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);

        PatternMultiTopicsConsumerImpl<byte[]> patternConsumer = (PatternMultiTopicsConsumerImpl<byte[]>) consumer;
        Assert.assertSame(pattern, patternConsumer.getPattern());
        Assert.assertEquals(patternConsumer.getPartitionedTopics().size(), 6);
        Assert.assertEquals(patternConsumer.getConsumers().size(), 6);
        Assert.assertEquals(patternConsumer.getTopics().size(), 3);

        // Round one: three producers, totalMessages / 3 messages each.
        for (int i = 0; i < totalMessages / 3; i++) {
            producer1.send((messagePredicate + "producer1-" + i).getBytes());
            producer2.send((messagePredicate + "producer2-" + i).getBytes());
            producer3.send((messagePredicate + "producer3-" + i).getBytes());
        }

        // Block for the first message, then drain until a 500 ms poll comes back empty.
        int messageSet = 0;
        Message<byte[]> message = consumer.receive();
        do {
            Assert.assertTrue(message instanceof TopicMessageImpl);
            messageSet++;
            consumer.acknowledge(message);
            log.debug("Consumer acknowledged : {}", new String(message.getData()));
            message = consumer.receive(500, TimeUnit.MILLISECONDS);
        } while (message != null);
        Assert.assertEquals(messageSet, totalMessages);

        // Stub the namespace service so that only topic 2 is listed for the namespace.
        List<String> topicNames = Lists.newArrayList(topicName2);
        NamespaceService nss = pulsar.getNamespaceService();
        Mockito.doReturn(CompletableFuture.completedFuture(topicNames))
                .when(nss)
                .getListOfPersistentTopics(NamespaceName.get("my-property/my-ns"));

        // Run the recheck task directly instead of waiting for the discovery period to elapse.
        log.debug("recheck topics change");
        patternConsumer.run(patternConsumer.getRecheckPatternTimeout());
        Thread.sleep(100L);

        Assert.assertEquals(patternConsumer.getPartitionedTopics().size(), 2);
        Assert.assertEquals(patternConsumer.getConsumers().size(), 2);
        Assert.assertEquals(patternConsumer.getTopics().size(), 1);

        // Round two: only topic 2 is still subscribed, so all traffic goes through producer2.
        for (int i = 0; i < totalMessages; i++) {
            producer2.send((messagePredicate + "round2-producer2-" + i).getBytes());
        }

        messageSet = 0;
        message = consumer.receive();
        do {
            Assert.assertTrue(message instanceof TopicMessageImpl);
            messageSet++;
            consumer.acknowledge(message);
            log.debug("Consumer acknowledged : {}", new String(message.getData()));
            message = consumer.receive(500, TimeUnit.MILLISECONDS);
        } while (message != null);
        Assert.assertEquals(messageSet, totalMessages);

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
    }

    /**
     * Verifies that a pattern consumer keeps receiving from the remaining topic after one of
     * its two matched topics is force-deleted, and that afterwards the broker no longer holds
     * the deleted topic while the surviving one is still present.
     *
     * @throws Exception if producing, consuming or the admin delete fails
     */
    @Test
    public void testTopicDeletion() throws Exception {
        // Timestamp suffix keeps the topic names unique to this run.
        String baseTopicName = "persistent://my-property/my-ns/pattern-topic-" + System.currentTimeMillis();
        Pattern pattern = Pattern.compile(baseTopicName + ".*");

        Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
                .topic(baseTopicName + "-1")
                .create();
        Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
                .topic(baseTopicName + "-2")
                .create();

        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topicsPattern(pattern)
                .patternAutoDiscoveryPeriod(1)
                .subscriptionName("sub")
                .subscribe();
        Assert.assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);

        PatternMultiTopicsConsumerImpl<String> consumerImpl = (PatternMultiTopicsConsumerImpl<String>) consumer;
        Assert.assertSame(consumerImpl.getPattern(), pattern);
        Assert.assertEquals(consumerImpl.getTopics().size(), 2);

        // Consume one message from topic 1; its producer is closed before the topic is deleted.
        producer1.send("msg-1");
        producer1.close();
        Message<String> message = consumer.receive();
        Assert.assertEquals(message.getValue(), "msg-1");
        consumer.acknowledge(message);

        // Force-delete topic 1 while the pattern consumer is still attached.
        admin.topics().delete(baseTopicName + "-1", true);

        // Topic 2 must still deliver.
        producer2.send("msg-2");
        message = consumer.receive();
        Assert.assertEquals(message.getValue(), "msg-2");
        consumer.acknowledge(message);

        Assert.assertEquals(pulsar.getBrokerService().getTopicIfExists(baseTopicName + "-1").join(), Optional.empty());
        Assert.assertTrue(pulsar.getBrokerService().getTopicIfExists(baseTopicName + "-2").join().isPresent());
    }
}

