/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats;
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"broker-api"})
public class NonPersistentTopicTest
extends ProducerConsumerBase {
    // Class-wide SLF4J logger used by the tests for start/exit markers and per-message debug output.
    private static final Logger log = LoggerFactory.getLogger(NonPersistentTopicTest.class);
    // Fixed cluster name "r1". Not referenced by the methods visible in this part of the file;
    // presumably consumed by the replication-cluster setup further down (TODO confirm).
    private final String configClusterName = "r1";

    /**
     * Data provider for the tests parameterized by subscription type.
     *
     * @return two single-element rows: {@code Shared} first, then {@code Exclusive}
     */
    @DataProvider(name="subscriptionType")
    public Object[][] getSubscriptionType() {
        Object[] sharedRow = {SubscriptionType.Shared};
        Object[] exclusiveRow = {SubscriptionType.Exclusive};
        return new Object[][]{sharedRow, exclusiveRow};
    }

    /**
     * Data provider for the tests parameterized by load-manager implementation.
     *
     * @return two single-element rows holding the canonical class names of
     *         {@code SimpleLoadManagerImpl} and {@code ModularLoadManagerImpl}, in that order
     */
    @DataProvider(name="loadManager")
    public Object[][] getLoadManager() {
        String simpleLoadManager = SimpleLoadManagerImpl.class.getCanonicalName();
        String modularLoadManager = ModularLoadManagerImpl.class.getCanonicalName();
        return new Object[][]{{simpleLoadManager}, {modularLoadManager}};
    }

    /**
     * Per-test initialization: delegates to the parent's {@code internalSetup()} followed by
     * {@code producerBaseSetup()}.
     *
     * <p>Besides being invoked by TestNG before each test method, some tests in this class call
     * it directly (after {@link #cleanup()}) to restart the environment with a modified {@code conf}.
     *
     * @throws Exception if either parent setup step fails
     */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    /**
     * Per-test teardown: delegates to the parent's {@code internalCleanup()}.
     *
     * <p>Registered with {@code alwaysRun=true}; some tests in this class also call it directly
     * before changing {@code conf} and calling {@link #setup()} again.
     *
     * @throws Exception if the parent cleanup fails
     */
    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * With automatic topic creation disabled, neither subscribing to nor producing on a
     * partition-style topic name ({@code ...-partition-0}) whose partitioned topic was never
     * created may succeed: both attempts must throw {@code TopicDoesNotExistException}.
     *
     * <p>The environment is torn down and set up again so the changed {@code conf} takes effect;
     * the original setting is restored afterwards.
     */
    @Test(timeOut=90000L)
    public void testNonPersistentPartitionsAreNotAutoCreatedWhenThePartitionedTopicDoesNotExist() throws Exception {
        final boolean originalAutoCreate = this.conf.isAllowAutoTopicCreation();
        try {
            // Restart the test environment with automatic topic creation switched off.
            this.cleanup();
            this.conf.setAllowAutoTopicCreation(false);
            this.setup();

            final String partitionTopic = "non-persistent://public/default/issue-9173-partition-0";

            // Subscribing must be rejected ...
            Assert.assertThrows(
                    PulsarClientException.TopicDoesNotExistException.class,
                    () -> this.pulsarClient.newConsumer()
                            .topic(partitionTopic)
                            .subscriptionName("sub-issue-9173")
                            .subscribe());

            // ... and so must producing.
            Assert.assertThrows(
                    PulsarClientException.TopicDoesNotExistException.class,
                    () -> this.pulsarClient.newProducer()
                            .topic(partitionTopic)
                            .create());
        } finally {
            this.conf.setAllowAutoTopicCreation(originalAutoCreate);
        }
    }

    /**
     * With automatic topic creation disabled, clients must still be able to attach to a
     * non-persistent partitioned topic that was created explicitly through the admin API:
     * the consumer ends up with one internal consumer per partition and the producer with one
     * internal producer per partition.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     */
    @Test(timeOut=90000L)
    public void testAutoCreateNonPersistentPartitionsWhenThePartitionedTopicExists() throws Exception {
        final boolean originalAutoCreate = this.conf.isAllowAutoTopicCreation();
        try {
            // Restart the test environment with automatic topic creation switched off.
            this.cleanup();
            this.conf.setAllowAutoTopicCreation(false);
            this.setup();

            final String partitionedTopic = "non-persistent://public/default/issue-9173";
            final int partitionCount = 3;
            this.admin.topics().createPartitionedTopic(partitionedTopic, partitionCount);

            MultiTopicsConsumerImpl<byte[]> consumer = (MultiTopicsConsumerImpl<byte[]>) this.pulsarClient.newConsumer()
                    .topic(partitionedTopic)
                    .subscriptionName("sub-issue-9173")
                    .subscribe();
            Assert.assertEquals(consumer.getConsumers().size(), partitionCount);

            PartitionedProducerImpl<byte[]> producer = (PartitionedProducerImpl<byte[]>) this.pulsarClient.newProducer()
                    .topic(partitionedTopic)
                    .create();
            Assert.assertEquals(producer.getProducers().size(), partitionCount);

            consumer.close();
            producer.close();
        } finally {
            this.conf.setAllowAutoTopicCreation(originalAutoCreate);
        }
    }

    /**
     * Publishes 500 messages to a non-persistent topic (10 ms apart) and checks that a single
     * consumer with the given subscription type receives all of them, in order and without
     * duplicates.
     *
     * @param type subscription type supplied by the {@code subscriptionType} data provider
     */
    @Test(dataProvider="subscriptionType")
    public void testNonPersistentTopic(SubscriptionType type) throws Exception {
        log.info("-- Starting {} test --", this.methodName);

        final String topicName = "non-persistent://my-property/my-ns/unacked-topic";
        final int messageCount = 500;

        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("subscriber-1")
                .subscriptionType(type)
                .subscribe();
        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .create();

        // Publish with a short pause between messages.
        int sent = 0;
        while (sent < messageCount) {
            producer.send(("my-message-" + sent).getBytes());
            Thread.sleep(10L);
            sent++;
        }

        // Drain until everything was received or a receive times out.
        Set<String> received = Sets.newHashSet();
        for (int index = 0; index < messageCount; index++) {
            Message<byte[]> next = consumer.receive(1, TimeUnit.SECONDS);
            if (next == null) {
                break;
            }
            consumer.acknowledge(next);
            String payload = new String(next.getData());
            log.debug("Received message: [{}]", payload);
            this.testMessageOrderAndDuplicates(received, payload, "my-message-" + index);
        }
        Assert.assertEquals(received.size(), messageCount);

        producer.close();
        consumer.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Same scenario as {@code testNonPersistentTopic}, but against a 5-partition non-persistent
     * topic. The producer disables batching and uses {@code SinglePartition} routing; the
     * consumer must receive all 500 messages in order and without duplicates.
     *
     * @param type subscription type supplied by the {@code subscriptionType} data provider
     */
    @Test(dataProvider="subscriptionType")
    public void testPartitionedNonPersistentTopic(SubscriptionType type) throws Exception {
        log.info("-- Starting {} test --", this.methodName);

        final String topicName = "non-persistent://my-property/my-ns/partitioned-topic";
        final int messageCount = 500;
        this.admin.topics().createPartitionedTopic(topicName, 5);

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("subscriber-1")
                .subscriptionType(type)
                .subscribe();
        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        // Publish with a short pause between messages.
        for (int seq = 0; seq < messageCount; seq++) {
            byte[] body = ("my-message-" + seq).getBytes();
            producer.send(body);
            Thread.sleep(10L);
        }

        // Drain until everything was received or a receive times out.
        Set<String> received = Sets.newHashSet();
        int index = 0;
        while (index < messageCount) {
            Message<byte[]> next = consumer.receive(1, TimeUnit.SECONDS);
            if (next == null) {
                break;
            }
            consumer.acknowledge(next);
            String payload = new String(next.getData());
            log.debug("Received message: [{}]", payload);
            this.testMessageOrderAndDuplicates(received, payload, "my-message-" + index);
            index++;
        }
        Assert.assertEquals(received.size(), messageCount);

        producer.close();
        consumer.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Partitioned non-persistent topic scenario where the consumer is created from a dedicated
     * client built with {@code pulsar.getBrokerServiceUrl()}. Note that the producer is still
     * created from the shared {@code pulsarClient}.
     *
     * <p>After subscribing, every partition must be present in the broker's topic map; then 500
     * messages are published and must all be received in order and without duplicates.
     *
     * <p>Changes relative to the decompiled form: the partition check now asserts
     * {@code Optional.isPresent()} (asserting non-null on the returned {@code Optional} could
     * never fail), and the dedicated client is managed with try-with-resources.
     *
     * @param type subscription type supplied by the {@code subscriptionType} data provider
     */
    @Test(dataProvider="subscriptionType")
    public void testPartitionedNonPersistentTopicWithTcpLookup(SubscriptionType type) throws Exception {
        log.info("-- Starting {} test --", this.methodName);

        final int numPartitions = 5;
        final String topic = "non-persistent://my-property/my-ns/partitioned-topic";
        final int totalProduceMsg = 500;
        this.admin.topics().createPartitionedTopic(topic, numPartitions);

        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.pulsar.getBrokerServiceUrl())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            Consumer<byte[]> consumer = client.newConsumer()
                    .topic(topic)
                    .subscriptionName("subscriber-1")
                    .subscriptionType(type)
                    .subscribe();
            Producer<byte[]> producer = this.pulsarClient.newProducer()
                    .topic(topic)
                    .enableBatching(false)
                    .messageRoutingMode(MessageRoutingMode.SinglePartition)
                    .create();

            // Every partition must be known to the broker once the consumer is subscribed.
            for (int i = 0; i < numPartitions; i++) {
                TopicName partition = TopicName.get(topic).getPartition(i);
                Assert.assertTrue(
                        this.pulsar.getBrokerService().getTopicReference(partition.toString()).isPresent(),
                        "partition topic should exist: " + partition);
            }

            // Publish with a short pause between messages.
            for (int i = 0; i < totalProduceMsg; i++) {
                String message = "my-message-" + i;
                producer.send(message.getBytes());
                Thread.sleep(10L);
            }

            // Drain until everything was received or a receive times out.
            Set<String> messageSet = Sets.newHashSet();
            for (int i = 0; i < totalProduceMsg; i++) {
                Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                consumer.acknowledge(msg);
                String receivedMessage = new String(msg.getData());
                log.debug("Received message: [{}]", receivedMessage);
                this.testMessageOrderAndDuplicates(messageSet, receivedMessage, "my-message-" + i);
            }
            Assert.assertEquals(messageSet.size(), totalProduceMsg);

            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", this.methodName);
        }
    }

    /**
     * Publishes 50 messages to a consumer whose receiver queue holds only 10 and which does not
     * call {@code receive} until publishing has finished. The test expects exactly 10 messages
     * (the queue size) to be delivered.
     *
     * @param type subscription type supplied by the {@code subscriptionType} data provider
     */
    @Test(dataProvider="subscriptionType")
    public void testConsumerInternalQueueMaxOut(SubscriptionType type) throws Exception {
        log.info("-- Starting {} test --", this.methodName);

        final String topicName = "non-persistent://my-property/my-ns/unacked-topic";
        final int queueSize = 10;
        final int messageCount = 50;

        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) this.pulsarClient.newConsumer()
                .topic(topicName)
                .receiverQueueSize(queueSize)
                .subscriptionName("subscriber-1")
                .subscriptionType(type)
                .subscribe();
        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .create();

        // Publish everything before the consumer starts draining.
        for (int seq = 0; seq < messageCount; seq++) {
            producer.send(("my-message-" + seq).getBytes());
            Thread.sleep(10L);
        }

        // Drain until a receive times out (or all messages were seen).
        Set<String> received = Sets.newHashSet();
        for (int index = 0; index < messageCount; index++) {
            Message<byte[]> next = consumer.receive(1, TimeUnit.SECONDS);
            if (next == null) {
                break;
            }
            consumer.acknowledge(next);
            String payload = new String(next.getData());
            log.debug("Received message: [{}]", payload);
            this.testMessageOrderAndDuplicates(received, payload, "my-message-" + index);
        }
        Assert.assertEquals(received.size(), queueSize);

        producer.close();
        consumer.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Restarts the broker with {@code maxConcurrentNonPersistentMessagePerConnection = 1},
     * publishes 10 messages concurrently from a 5-thread pool and checks that no send failed
     * on the client side while the consumer did not observe all 10 messages.
     *
     * <p>Changes relative to the decompiled form: the consumer is now closed (it was left open),
     * and the latch is counted down in a {@code finally} so that {@code latch.await()} cannot
     * block forever if a sender task dies with something other than an {@code Exception}.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     */
    @Test
    public void testProducerRateLimit() throws Exception {
        final int defaultNonPersistentMessageRate = this.conf.getMaxConcurrentNonPersistentMessagePerConnection();
        try {
            final String topic = "non-persistent://my-property/my-ns/unacked-topic";

            // Restart the broker with the lowered per-connection limit.
            this.conf.setMaxConcurrentNonPersistentMessagePerConnection(1);
            this.stopBroker();
            this.startBroker();

            ExecutorService executor = Executors.newFixedThreadPool(5);
            try {
                AtomicBoolean failed = new AtomicBoolean(false);
                Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                        .topic(topic)
                        .subscriptionName("subscriber-1")
                        .subscribe();
                Producer<byte[]> producer = this.pulsarClient.newProducer()
                        .topic(topic)
                        .create();

                byte[] msgData = "testData".getBytes();
                final int totalProduceMessages = 10;
                CountDownLatch latch = new CountDownLatch(totalProduceMessages);

                // Publish concurrently.
                for (int i = 0; i < totalProduceMessages; i++) {
                    executor.submit(() -> {
                        try {
                            producer.send(msgData);
                        } catch (Exception e) {
                            log.error("Failed to send message", e);
                            failed.set(true);
                        } finally {
                            // Always release the latch, even if the send blew up unexpectedly.
                            latch.countDown();
                        }
                    });
                }
                latch.await();

                // Drain whatever made it through.
                Set<String> messageSet = Sets.newHashSet();
                for (int i = 0; i < totalProduceMessages; i++) {
                    Message<byte[]> msg = consumer.receive(500, TimeUnit.MILLISECONDS);
                    if (msg == null) {
                        break;
                    }
                    messageSet.add(new String(msg.getData()));
                }

                Assert.assertFalse(failed.get());
                // NOTE(review): every payload is the same string ("testData"), so this set can hold at
                // most one element and the check below cannot fail. Counting received messages would
                // make it meaningful; left unchanged here to avoid altering the test's pass/fail behaviour.
                Assert.assertNotEquals((Object) messageSet.size(), (Object) totalProduceMessages);

                producer.close();
                consumer.close();
            } finally {
                executor.shutdownNow();
            }
        } finally {
            this.conf.setMaxConcurrentNonPersistentMessagePerConnection(defaultNonPersistentMessageRate);
        }
    }

    /**
     * Attaches two {@code Shared} consumers (subscription "subscriber-shared") and two
     * {@code Failover} consumers (subscription "subscriber-fo") to the same non-persistent topic,
     * publishes 500 messages and checks that each subscription as a whole received all 500
     * distinct payloads.
     */
    @Test
    public void testMultipleSubscription() throws Exception {
        log.info("-- Starting {} test --", this.methodName);

        final String topicName = "non-persistent://my-property/my-ns/unacked-topic";
        final int messageCount = 500;

        ConsumerImpl<byte[]> sharedA = (ConsumerImpl<byte[]>) this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("subscriber-shared")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();
        ConsumerImpl<byte[]> sharedB = (ConsumerImpl<byte[]>) this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("subscriber-shared")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();
        ConsumerImpl<byte[]> failoverA = (ConsumerImpl<byte[]>) this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("subscriber-fo")
                .subscriptionType(SubscriptionType.Failover)
                .subscribe();
        ConsumerImpl<byte[]> failoverB = (ConsumerImpl<byte[]>) this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("subscriber-fo")
                .subscriptionType(SubscriptionType.Failover)
                .subscribe();
        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .create();

        // Publish with a short pause between messages.
        for (int seq = 0; seq < messageCount; seq++) {
            producer.send(("my-message-" + seq).getBytes());
            Thread.sleep(10L);
        }

        Set<String> payloads = Sets.newHashSet();
        Message<byte[]> next;

        // Shared subscription: the union of both consumers must cover every message.
        for (int n = 0; n < messageCount; n++) {
            next = sharedA.receive(500, TimeUnit.MILLISECONDS);
            if (next == null) {
                break;
            }
            payloads.add(new String(next.getData()));
        }
        for (int n = 0; n < messageCount; n++) {
            next = sharedB.receive(500, TimeUnit.MILLISECONDS);
            if (next == null) {
                break;
            }
            payloads.add(new String(next.getData()));
        }
        Assert.assertEquals(payloads.size(), messageCount);

        // Failover subscription: same expectation, starting from an empty set.
        payloads.clear();
        for (int n = 0; n < messageCount; n++) {
            next = failoverA.receive(500, TimeUnit.MILLISECONDS);
            if (next == null) {
                break;
            }
            payloads.add(new String(next.getData()));
        }
        for (int n = 0; n < messageCount; n++) {
            next = failoverB.receive(500, TimeUnit.MILLISECONDS);
            if (next == null) {
                break;
            }
            payloads.add(new String(next.getData()));
        }
        Assert.assertEquals(payloads.size(), messageCount);

        producer.close();
        sharedA.close();
        sharedB.close();
        failoverA.close();
        failoverB.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Checks the statistics reported by a non-persistent topic: after one shared consumer
     * subscribes there is exactly one subscription with one consumer; after 100 messages are
     * published the subscription and its consumer report positive out-rate and out-throughput,
     * and a redelivery rate of zero.
     */
    @Test
    public void testTopicStats() throws Exception {
        final String topicName = "non-persistent://my-property/my-ns/unacked-topic";
        final String subName = "non-persistent";
        final long syncPauseMillis = 100L;
        final int totalProducedMessages = 100;

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionName(subName)
                .subscribe();
        Thread.sleep(syncPauseMillis);

        NonPersistentTopic topicRef =
                (NonPersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull(topicRef);

        // Stats before any message is published.
        this.rolloverPerIntervalStats(this.pulsar);
        NonPersistentTopicStats stats = topicRef.getStats(false, false);
        SubscriptionStats subStats = (SubscriptionStats) stats.getSubscriptions().values().iterator().next();
        Assert.assertEquals(stats.getSubscriptions().keySet().size(), 1);
        Assert.assertEquals(subStats.consumers.size(), 1);

        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .create();
        Thread.sleep(syncPauseMillis);

        int sent = 0;
        while (sent < totalProducedMessages) {
            producer.send(("my-message-" + sent).getBytes());
            sent++;
        }
        Thread.sleep(syncPauseMillis);

        // Stats after publishing.
        this.rolloverPerIntervalStats(this.pulsar);
        stats = topicRef.getStats(false, false);
        subStats = (SubscriptionStats) stats.getSubscriptions().values().iterator().next();
        Assert.assertTrue(subStats.msgRateOut > 0.0);
        Assert.assertEquals(subStats.consumers.size(), 1);
        Assert.assertTrue(subStats.msgThroughputOut > 0.0);
        ConsumerStats onlyConsumer = (ConsumerStats) subStats.consumers.get(0);
        Assert.assertTrue(onlyConsumer.msgRateOut > 0.0);
        Assert.assertTrue(onlyConsumer.msgThroughputOut > 0.0);
        Assert.assertEquals(subStats.msgRateRedeliver, 0.0);

        producer.close();
        consumer.close();
    }

    /**
     * Verifies replication of a non-persistent global topic across three clusters provided by
     * {@code ReplicationClusterManager}: replicators towards "r2" and "r3" exist on the first
     * broker, 100 messages published through cluster 1 are received in order by two local
     * subscriptions and by one consumer on each of the other two clusters, and the first
     * broker's subscription stats report outgoing traffic with no redelivery.
     *
     * <p>Changes relative to the decompiled form: {@code consumer2} is now closed (it was the only
     * consumer left open), the four identical receive loops are factored into
     * {@link #assertReplicatedMessagesReceived}, and the three clients are managed by a single
     * try-with-resources (closed in the same reverse order as the nested finally blocks did).
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     */
    @Test
    public void testReplicator() throws Exception {
        ReplicationClusterManager replication = new ReplicationClusterManager();
        replication.setupReplicationCluster();
        try {
            final String globalTopicName = "non-persistent://pulsar/global/ns/nonPersistentTopic";
            final long timeWaitToSync = 100L;
            final int totalProducedMessages = 100;

            try (PulsarClient client1 = PulsarClient.builder().serviceUrl(replication.url1.toString()).build();
                 PulsarClient client2 = PulsarClient.builder().serviceUrl(replication.url2.toString()).build();
                 PulsarClient client3 = PulsarClient.builder().serviceUrl(replication.url3.toString()).build()) {

                ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) client1.newConsumer()
                        .topic(globalTopicName).subscriptionName("subscriber-1").subscribe();
                ConsumerImpl<byte[]> consumer2 = (ConsumerImpl<byte[]>) client1.newConsumer()
                        .topic(globalTopicName).subscriptionName("subscriber-2").subscribe();
                ConsumerImpl<byte[]> repl2Consumer = (ConsumerImpl<byte[]>) client2.newConsumer()
                        .topic(globalTopicName).subscriptionName("subscriber-1").subscribe();
                ConsumerImpl<byte[]> repl3Consumer = (ConsumerImpl<byte[]>) client3.newConsumer()
                        .topic(globalTopicName).subscriptionName("subscriber-1").subscribe();
                Producer<byte[]> producer = client1.newProducer()
                        .topic(globalTopicName)
                        .enableBatching(false)
                        .messageRoutingMode(MessageRoutingMode.SinglePartition)
                        .create();
                Thread.sleep(timeWaitToSync);

                // The first broker must hold the topic plus one replicator per remote cluster.
                PulsarService replicationPulsar = replication.pulsar1;
                NonPersistentTopic topicRef = (NonPersistentTopic) replicationPulsar.getBrokerService()
                        .getTopicReference(globalTopicName).get();
                NonPersistentReplicator replicatorR2 = (NonPersistentReplicator) topicRef.getPersistentReplicator("r2");
                NonPersistentReplicator replicatorR3 = (NonPersistentReplicator) topicRef.getPersistentReplicator("r3");
                Assert.assertNotNull(topicRef);
                Assert.assertNotNull(replicatorR2);
                Assert.assertNotNull(replicatorR3);

                // Stats before publishing: two local subscriptions, one consumer on the first one inspected.
                this.rolloverPerIntervalStats(replicationPulsar);
                NonPersistentTopicStats stats = topicRef.getStats(false, false);
                SubscriptionStats subStats = (SubscriptionStats) stats.getSubscriptions().values().iterator().next();
                Assert.assertEquals(stats.getSubscriptions().keySet().size(), 2);
                Assert.assertEquals(subStats.consumers.size(), 1);
                Thread.sleep(timeWaitToSync);

                for (int i = 0; i < totalProducedMessages; i++) {
                    String message = "my-message-" + i;
                    producer.send(message.getBytes());
                }

                // Local subscriptions first, then the two remote clusters.
                this.assertReplicatedMessagesReceived(consumer1, totalProducedMessages);
                this.assertReplicatedMessagesReceived(consumer2, totalProducedMessages);
                this.assertReplicatedMessagesReceived(repl2Consumer, totalProducedMessages);
                this.assertReplicatedMessagesReceived(repl3Consumer, totalProducedMessages);

                // Stats after publishing.
                Thread.sleep(timeWaitToSync);
                this.rolloverPerIntervalStats(replicationPulsar);
                stats = topicRef.getStats(false, false);
                subStats = (SubscriptionStats) stats.getSubscriptions().values().iterator().next();
                Assert.assertTrue(subStats.msgRateOut > 0.0);
                Assert.assertEquals(subStats.consumers.size(), 1);
                Assert.assertTrue(subStats.msgThroughputOut > 0.0);
                ConsumerStats firstConsumer = (ConsumerStats) subStats.consumers.get(0);
                Assert.assertTrue(firstConsumer.msgRateOut > 0.0);
                Assert.assertTrue(firstConsumer.msgThroughputOut > 0.0);
                Assert.assertEquals(subStats.msgRateRedeliver, 0.0);

                producer.close();
                consumer1.close();
                consumer2.close();
                repl2Consumer.close();
                repl3Consumer.close();
            }
        } finally {
            replication.shutdownReplicationCluster();
        }
    }

    /**
     * Receives up to {@code expectedCount} messages from {@code consumer} (300 ms timeout per
     * receive, stopping at the first timeout), checks each against the expected
     * {@code "my-message-<index>"} payload via {@code testMessageOrderAndDuplicates}, and asserts
     * that {@code expectedCount} distinct messages were collected.
     */
    private void assertReplicatedMessagesReceived(ConsumerImpl<byte[]> consumer, int expectedCount) throws Exception {
        Set<String> received = Sets.newHashSet();
        for (int i = 0; i < expectedCount; i++) {
            Message<byte[]> msg = consumer.receive(300, TimeUnit.MILLISECONDS);
            if (msg == null) {
                break;
            }
            this.testMessageOrderAndDuplicates(received, new String(msg.getData()), "my-message-" + i);
        }
        Assert.assertEquals(received.size(), expectedCount);
    }

    /**
     * With non-persistent topics disabled on the broker, the load manager must not return a
     * broker for the bundle of a non-persistent topic, and creating a producer on such a topic
     * must not leave a topic loaded on the broker.
     *
     * <p>The broker is restarted with the given load manager class; a fresh {@code LoadManager}
     * is then created, started and swapped into {@code PulsarService}'s private
     * {@code loadManager} field via reflection, and the previous instance is stopped.
     *
     * <p>Change relative to the decompiled form: the reflected {@code AtomicReference} is typed
     * as {@code AtomicReference<LoadManager>}; with the raw type, assigning the result of
     * {@code getAndSet} to a {@code LoadManager} does not compile.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param loadManagerName load manager class name supplied by the {@code loadManager} data provider
     */
    @Test(dataProvider="loadManager")
    public void testLoadManagerAssignmentForNonPersistentTestAssignment(String loadManagerName) throws Exception {
        final String topicName = "non-persistent://my-property/my-ns/loadManager";
        final String defaultLoadManagerName = this.conf.getLoadManagerClassName();
        final boolean defaultEnableNonPersistentTopic = this.conf.isEnableNonPersistentTopics();
        try {
            // Restart the broker with non-persistent topics disabled and the requested load manager.
            this.stopBroker();
            this.conf.setEnableNonPersistentTopics(false);
            this.conf.setLoadManagerClassName(loadManagerName);
            this.startBroker();

            // Swap a freshly created load manager into the running PulsarService.
            Field field = PulsarService.class.getDeclaredField("loadManager");
            field.setAccessible(true);
            @SuppressWarnings("unchecked") // the reflected field holds the service's LoadManager reference
            AtomicReference<LoadManager> loadManagerRef = (AtomicReference<LoadManager>) field.get(this.pulsar);
            LoadManager manager = LoadManager.create(this.pulsar);
            manager.start();
            LoadManager oldLoadManager = loadManagerRef.getAndSet(manager);
            oldLoadManager.stop();

            NamespaceBundle fdqn = this.pulsar.getNamespaceService().getBundle(TopicName.get(topicName));
            LoadManager loadManager = (LoadManager) this.pulsar.getLoadManager().get();

            ResourceUnit broker = null;
            try {
                broker = (ResourceUnit) loadManager.getLeastLoaded((ServiceUnitId) fdqn).get();
            } catch (Exception expected) {
                // Deliberately ignored: a failed lookup leaves broker == null, which is asserted below.
            }
            Assert.assertNull(broker);

            try {
                Producer<byte[]> producer = this.pulsarClient.newProducer()
                        .topic(topicName)
                        .createAsync()
                        .get(1L, TimeUnit.SECONDS);
                producer.close();
                // AssertionError is not an Exception, so it is not swallowed by the catch below.
                Assert.fail("topic loading should have failed");
            } catch (Exception expected) {
                // Expected: producer creation fails (or times out) because the topic cannot be loaded.
            }
            Assert.assertFalse(this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
        } finally {
            this.conf.setEnableNonPersistentTopics(defaultEnableNonPersistentTopic);
            this.conf.setLoadManagerClassName(defaultLoadManagerName);
        }
    }

    /**
     * After the broker is restarted with non-persistent topics disabled, creating a producer on
     * a non-persistent topic must fail (within one second) and no topic may be present on the
     * broker afterwards. The original configuration value is restored at the end.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     */
    @Test
    public void testNonPersistentTopicUnderPersistentNamespace() throws Exception {
        final String nonPersistentTopic = "non-persistent://my-property/my-ns/persitentNamespace";
        final boolean originalEnableNonPersistent = this.conf.isEnableNonPersistentTopics();
        try {
            // Restart the broker with non-persistent topics switched off.
            this.conf.setEnableNonPersistentTopics(false);
            this.stopBroker();
            this.startBroker();

            try {
                Producer<byte[]> producer = this.pulsarClient.newProducer()
                        .topic(nonPersistentTopic)
                        .createAsync()
                        .get(1L, TimeUnit.SECONDS);
                producer.close();
                // AssertionError is not an Exception, so it propagates past the catch below.
                Assert.fail("topic loading should have failed");
            } catch (Exception expected) {
                // Expected path: producer creation is rejected or times out.
            }

            boolean topicLoaded = this.pulsar.getBrokerService().getTopicReference(nonPersistentTopic).isPresent();
            Assert.assertFalse(topicLoaded);
        } finally {
            this.conf.setEnableNonPersistentTopics(originalEnableNonPersistent);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Restarts the broker with persistent topics disabled (non-persistent topics enabled) under the given
     * load manager, then checks that:
     * <ul>
     *   <li>the load manager does not hand out a broker for the bundle of a persistent topic,</li>
     *   <li>creating a producer on that persistent topic fails, and</li>
     *   <li>the broker holds no reference to the topic afterwards.</li>
     * </ul>
     * The three configuration values touched by the test are written back in the {@code finally} block.
     *
     * @param loadManagerName load manager class name supplied by the {@code loadManager} data provider
     * @throws Exception if the broker restart or the reflective load-manager swap fails
     */
    @Test(dataProvider="loadManager")
    public void testNonPersistentBrokerModeRejectPersistentTopic(String loadManagerName) throws Exception {
        String topicName = "persistent://my-property/my-ns/loadManager";
        String defaultLoadManagerName = this.conf.getLoadManagerClassName();
        boolean defaultEnablePersistentTopic = this.conf.isEnablePersistentTopics();
        boolean defaultEnableNonPersistentTopic = this.conf.isEnableNonPersistentTopics();
        try {
            this.stopBroker();
            this.conf.setEnableNonPersistentTopics(true);
            this.conf.setEnablePersistentTopics(false);
            this.conf.setLoadManagerClassName(loadManagerName);
            this.startBroker();

            // Replace the broker's load manager, held in the private "loadManager" field, with a fresh
            // instance created from the current configuration.
            Field field = PulsarService.class.getDeclaredField("loadManager");
            field.setAccessible(true);
            // The field is read reflectively, so the element type cannot be checked by the compiler.
            // A raw AtomicReference would make getAndSet(...) return Object and break the assignment below.
            @SuppressWarnings("unchecked")
            AtomicReference<LoadManager> loadManagerRef = (AtomicReference<LoadManager>)field.get(this.pulsar);
            LoadManager manager = LoadManager.create(this.pulsar);
            manager.start();
            LoadManager oldLoadManager = loadManagerRef.getAndSet(manager);
            oldLoadManager.stop();

            NamespaceBundle fdqn = this.pulsar.getNamespaceService().getBundle(TopicName.get(topicName));
            LoadManager loadManager = (LoadManager)this.pulsar.getLoadManager().get();
            ResourceUnit broker = null;
            try {
                broker = (ResourceUnit)loadManager.getLeastLoaded((ServiceUnitId)fdqn).get();
            }
            catch (Exception ignored) {
                // Deliberately ignored: any failure to pick a broker leaves 'broker' null,
                // which is exactly what the assertion below expects.
            }
            Assert.assertNull(broker);

            try {
                Producer producer = (Producer)this.pulsarClient.newProducer().topic(topicName).createAsync().get(1L, TimeUnit.SECONDS);
                producer.close();
                Assert.fail("topic loading should have failed");
            }
            catch (Exception expected) {
                // Producer creation is expected to fail (or time out) while persistent topics are disabled.
            }
            Assert.assertFalse(this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
        }
        finally {
            this.conf.setEnablePersistentTopics(defaultEnablePersistentTopic);
            this.conf.setEnableNonPersistentTopics(defaultEnableNonPersistentTopic);
            this.conf.setLoadManagerClassName(defaultLoadManagerName);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that message drops are reported in the stats of a non-persistent topic.
     *
     * <p>The broker is restarted with {@code maxConcurrentNonPersistentMessagePerConnection} set to 1.
     * Two subscriptions with a receiver queue size of 1 are attached to the topic and 200 messages are
     * published from a pool of 5 threads. The test passes when the publisher and both subscriptions
     * report a {@code msgDropRate} greater than zero. The original limit is written back to the
     * configuration when the test ends.
     *
     * @throws Exception if the broker restart, the client setup or the stats lookup fails
     */
    @Test
    public void testMsgDropStat() throws Exception {
        int defaultNonPersistentMessageRate = this.conf.getMaxConcurrentNonPersistentMessagePerConnection();
        try {
            String topicName = "non-persistent://my-property/my-ns/stats-topic";
            int totalProduceMessages = 200;
            this.conf.setMaxConcurrentNonPersistentMessagePerConnection(1);
            this.stopBroker();
            this.startBroker();
            // try-with-resources guarantees the consumers and the producer are closed even when an
            // assertion or a stats lookup below fails; previously they were closed on success only.
            try (Consumer consumer = this.pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-1").receiverQueueSize(1).subscribe();
                 Consumer consumer2 = this.pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-2").receiverQueueSize(1).subscriptionType(SubscriptionType.Shared).subscribe();
                 ProducerImpl producer = (ProducerImpl)this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create()) {
                ExecutorService executor = Executors.newFixedThreadPool(5);
                try {
                    byte[] msgData = "testData".getBytes();
                    CountDownLatch latch = new CountDownLatch(totalProduceMessages);
                    for (int i = 0; i < totalProduceMessages; ++i) {
                        // handle(...) runs whether the send completed normally or exceptionally,
                        // so the latch is released once per submitted message either way.
                        executor.submit(() -> producer.sendAsync((Object)msgData).handle((msg, e) -> {
                            latch.countDown();
                            return null;
                        }));
                    }
                    latch.await();

                    NonPersistentTopic topic = (NonPersistentTopic)this.pulsar.getBrokerService().getOrCreateTopic(topicName).get();
                    // Refresh the broker's rate figures before reading the stats.
                    this.pulsar.getBrokerService().updateRates();
                    NonPersistentTopicStats stats = topic.getStats(false, false);
                    NonPersistentPublisherStats npStats = (NonPersistentPublisherStats)stats.getPublishers().get(0);
                    NonPersistentSubscriptionStats sub1Stats = (NonPersistentSubscriptionStats)stats.getSubscriptions().get("subscriber-1");
                    NonPersistentSubscriptionStats sub2Stats = (NonPersistentSubscriptionStats)stats.getSubscriptions().get("subscriber-2");
                    Assert.assertTrue(npStats.msgDropRate > 0.0);
                    Assert.assertTrue(sub1Stats.msgDropRate > 0.0);
                    Assert.assertTrue(sub2Stats.msgDropRate > 0.0);
                }
                finally {
                    executor.shutdownNow();
                }
            }
        }
        finally {
            this.conf.setMaxConcurrentNonPersistentMessagePerConnection(defaultNonPersistentMessageRate);
        }
    }

    /**
     * Runs {@code updateRates()} of the given broker's {@link BrokerService} on that broker's executor
     * and blocks until the task has finished. Failures are logged and not propagated to the caller.
     *
     * @param pulsar broker whose rates should be refreshed
     */
    private void rolloverPerIntervalStats(PulsarService pulsar) {
        try {
            pulsar.getExecutor().submit(() -> pulsar.getBrokerService().updateRates()).get();
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Do not lose the interruption: restore the flag so the calling test thread can react to it.
                Thread.currentThread().interrupt();
            }
            log.error("Stats executor error", (Throwable)e);
        }
    }

    /**
     * Test fixture that starts three brokers, one per cluster ({@code r1}, {@code r2}, {@code r3}).
     * Every broker gets its own {@link LocalBookkeeperEnsemble}, and all three point at the same
     * {@link ZookeeperServerTest} instance as configuration store. After the brokers are up the
     * clusters, the {@code pulsar} tenant and the {@code pulsar/global/ns} namespace (replicated to all
     * three clusters) are registered through the admin client of {@code r1}.
     *
     * <p>Call {@link #setupReplicationCluster()} before use and {@link #shutdownReplicationCluster()}
     * afterwards.
     */
    class ReplicationClusterManager {
        URL url1;
        PulsarService pulsar1;
        BrokerService ns1;
        PulsarAdmin admin1;
        LocalBookkeeperEnsemble bkEnsemble1;
        URL url2;
        ServiceConfiguration config2;
        PulsarService pulsar2;
        BrokerService ns2;
        PulsarAdmin admin2;
        LocalBookkeeperEnsemble bkEnsemble2;
        URL url3;
        ServiceConfiguration config3;
        PulsarService pulsar3;
        BrokerService ns3;
        PulsarAdmin admin3;
        LocalBookkeeperEnsemble bkEnsemble3;
        ZookeeperServerTest globalZkS;
        ExecutorService executor = new ThreadPoolExecutor(5, 20, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        /** Backlog quota check interval, in seconds, applied to the brokers of r1 and r2. */
        static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;

        ReplicationClusterManager() {
        }

        /** @return value passed to {@code setBrokerDeleteInactiveTopicsFrequencySeconds} for every broker */
        public int getBrokerServicePurgeInactiveFrequency() {
            return 60;
        }

        /** @return value passed to {@code setBrokerDeleteInactiveTopicsEnabled} for every broker */
        public boolean isBrokerServicePurgeInactiveTopic() {
            return false;
        }

        /**
         * Starts the shared configuration-store ZooKeeper, then for each cluster in turn starts its
         * bookkeeper ensemble and its broker and creates an admin client for it. Finally registers
         * the clusters, tenant and replicated namespace and verifies through the admin client of
         * {@code r2} that the cluster metadata is visible there.
         *
         * @throws Exception if any component fails to start or an admin call fails
         */
        void setupReplicationCluster() throws Exception {
            log.info("--- Starting ReplicatorTestBase::setup ---");
            this.globalZkS = new ZookeeperServerTest(0);
            this.globalZkS.start();

            // Cluster r1
            this.bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
            this.bkEnsemble1.start();
            ServiceConfiguration config1 = this.newBrokerConfig("r1", this.bkEnsemble1);
            config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
            this.pulsar1 = new PulsarService(config1);
            this.pulsar1.start();
            this.ns1 = this.pulsar1.getBrokerService();
            this.url1 = new URL(this.pulsar1.getWebServiceAddress());
            this.admin1 = PulsarAdmin.builder().serviceHttpUrl(this.url1.toString()).build();

            // Cluster r2
            this.bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
            this.bkEnsemble2.start();
            this.config2 = this.newBrokerConfig("r2", this.bkEnsemble2);
            this.config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
            this.pulsar2 = new PulsarService(this.config2);
            this.pulsar2.start();
            this.ns2 = this.pulsar2.getBrokerService();
            this.url2 = new URL(this.pulsar2.getWebServiceAddress());
            this.admin2 = PulsarAdmin.builder().serviceHttpUrl(this.url2.toString()).build();

            // Cluster r3
            // NOTE(review): unlike r1 and r2, r3 never set the backlog quota check interval;
            // that is preserved here - confirm whether the omission is intentional.
            this.bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
            this.bkEnsemble3.start();
            this.config3 = this.newBrokerConfig("r3", this.bkEnsemble3);
            this.pulsar3 = new PulsarService(this.config3);
            this.pulsar3.start();
            this.ns3 = this.pulsar3.getBrokerService();
            this.url3 = new URL(this.pulsar3.getWebServiceAddress());
            this.admin3 = PulsarAdmin.builder().serviceHttpUrl(this.url3.toString()).build();

            // Register every cluster with the URLs of its OWN broker. r2 and r3 previously reused
            // pulsar1's TLS broker URL, a copy-paste slip.
            this.admin1.clusters().createCluster("r1", new ClusterData(this.url1.toString(), null, this.pulsar1.getSafeBrokerServiceUrl(), this.pulsar1.getBrokerServiceUrlTls()));
            this.admin1.clusters().createCluster("r2", new ClusterData(this.url2.toString(), null, this.pulsar2.getSafeBrokerServiceUrl(), this.pulsar2.getBrokerServiceUrlTls()));
            this.admin1.clusters().createCluster("r3", new ClusterData(this.url3.toString(), null, this.pulsar3.getSafeBrokerServiceUrl(), this.pulsar3.getBrokerServiceUrlTls()));
            this.admin1.clusters().createCluster("global", new ClusterData("http://global:8080"));
            this.admin1.tenants().createTenant("pulsar", new TenantInfo(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r2", "r3")));
            this.admin1.namespaces().createNamespace("pulsar/global/ns");
            this.admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", Sets.newHashSet("r1", "r2", "r3"));

            // The metadata written through r1 must be readable through r2.
            Assert.assertEquals(this.admin2.clusters().getCluster("r1").getServiceUrl(), this.url1.toString());
            Assert.assertEquals(this.admin2.clusters().getCluster("r2").getServiceUrl(), this.url2.toString());
            Assert.assertEquals(this.admin2.clusters().getCluster("r3").getServiceUrl(), this.url3.toString());
            Assert.assertEquals(this.admin2.clusters().getCluster("r1").getBrokerServiceUrl(), this.pulsar1.getSafeBrokerServiceUrl());
            Assert.assertEquals(this.admin2.clusters().getCluster("r2").getBrokerServiceUrl(), this.pulsar2.getSafeBrokerServiceUrl());
            Assert.assertEquals(this.admin2.clusters().getCluster("r3").getBrokerServiceUrl(), this.pulsar3.getSafeBrokerServiceUrl());
            Thread.sleep(100L);
            log.info("--- ReplicatorTestBase::setup completed ---");
        }

        /** Builds the broker configuration shared by all three clusters; requires {@code globalZkS} to be started. */
        private ServiceConfiguration newBrokerConfig(String clusterName, LocalBookkeeperEnsemble bkEnsemble) {
            ServiceConfiguration config = new ServiceConfiguration();
            config.setClusterName(clusterName);
            config.setAdvertisedAddress("localhost");
            config.setWebServicePort(Optional.of(0));
            config.setZookeeperServers("127.0.0.1:" + bkEnsemble.getZookeeperPort());
            config.setConfigurationStoreServers("127.0.0.1:" + this.globalZkS.getZookeeperPort() + "/foo");
            config.setBrokerDeleteInactiveTopicsEnabled(this.isBrokerServicePurgeInactiveTopic());
            config.setBrokerDeleteInactiveTopicsFrequencySeconds(this.inSec(this.getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
            config.setBrokerShutdownTimeoutMs(0L);
            config.setBrokerServicePort(Optional.of(0));
            config.setAllowAutoTopicCreationType("non-partitioned");
            return config;
        }

        /**
         * Converts a duration to whole seconds.
         *
         * @param time duration expressed in {@code unit}
         * @param unit unit of {@code time}
         * @return the duration in seconds, narrowed to {@code int}
         */
        private int inSec(int time, TimeUnit unit) {
            return (int)TimeUnit.SECONDS.convert(time, unit);
        }

        /**
         * Stops everything started by {@link #setupReplicationCluster()}: the executor, the admin
         * clients, the brokers (r3, r2, r1), the bookkeeper ensembles and finally the shared
         * configuration-store ZooKeeper.
         *
         * @throws Exception if any component fails to shut down; later components are then not stopped
         */
        void shutdownReplicationCluster() throws Exception {
            log.info("--- Shutting down ---");
            this.executor.shutdownNow();
            this.admin1.close();
            this.admin2.close();
            this.admin3.close();
            this.pulsar3.close();
            this.ns3.close();
            this.pulsar2.close();
            this.ns2.close();
            this.pulsar1.close();
            this.ns1.close();
            this.bkEnsemble1.stop();
            this.bkEnsemble2.stop();
            this.bkEnsemble3.stop();
            this.globalZkS.stop();
        }
    }
}

