/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"flaky"})
public class PersistentTopicE2ETest
extends BrokerTestBase {
    private final List<AutoCloseable> closeables = new ArrayList<AutoCloseable>();

    @Override
    @BeforeMethod(alwaysRun=true)
    protected void setup() throws Exception {
        super.baseSetup();
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        for (AutoCloseable closeable : this.closeables) {
            try {
                closeable.close();
            }
            catch (Exception exception) {}
        }
        this.closeables.clear();
        super.internalCleanup();
    }

    @Test
    public void testSimpleProducerEvents() throws Exception {
        String topicName = "persistent://prop/ns-abc/topic0";
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic0").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/topic0").get();
        Assert.assertNotNull((Object)topicRef);
        Assert.assertEquals((int)topicRef.getProducers().size(), (int)1);
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        this.rolloverPerIntervalStats();
        Assert.assertTrue((((Producer)topicRef.getProducers().values().iterator().next()).getStats().msgRateIn > 0.0 ? 1 : 0) != 0);
        producer.close();
        Thread.sleep(100L);
        Assert.assertEquals((int)topicRef.getProducers().size(), (int)0);
    }

    @Test
    public void testSimpleConsumerEvents() throws Exception {
        int i;
        String topicName = "persistent://prop/ns-abc/topic1";
        String subName = "sub1";
        int numMsgs = 10;
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/topic1"}).subscriptionName("sub1").subscribe();
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/topic1").get();
        PersistentSubscription subRef = topicRef.getSubscription("sub1");
        Assert.assertNotNull((Object)topicRef);
        Assert.assertNotNull((Object)subRef);
        Assert.assertTrue((boolean)subRef.getDispatcher().isConsumerConnected());
        Thread.sleep(100L);
        Assert.assertEquals((int)this.getAvailablePermits(subRef), (int)1000);
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic1").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        for (int i2 = 0; i2 < 20; ++i2) {
            String message = "my-message-" + i2;
            producer.send((Object)message.getBytes());
        }
        Assert.assertTrue((boolean)subRef.getDispatcher().isConsumerConnected());
        this.rolloverPerIntervalStats();
        Assert.assertEquals((long)subRef.getNumberOfEntriesInBacklog(false), (long)20L);
        Thread.sleep(100L);
        Assert.assertEquals((int)this.getAvailablePermits(subRef), (int)980);
        Message msg = null;
        for (i = 0; i < 10; ++i) {
            msg = consumer.receive();
            Assert.assertEquals((String)new String(msg.getData()), (String)("my-message-" + i));
            consumer.acknowledge(msg);
        }
        this.rolloverPerIntervalStats();
        Thread.sleep(100L);
        Assert.assertEquals((long)subRef.getNumberOfEntriesInBacklog(false), (long)10L);
        for (i = 0; i < 10; ++i) {
            msg = consumer.receive();
            if (i != 9) continue;
            consumer.acknowledgeCumulative(msg);
        }
        this.rolloverPerIntervalStats();
        Thread.sleep(100L);
        Assert.assertEquals((long)subRef.getNumberOfEntriesInBacklog(false), (long)0L);
        consumer.unsubscribe();
        consumer.close();
        try {
            consumer.unsubscribe();
            Assert.fail((String)"Should have failed");
        }
        catch (PulsarClientException.AlreadyClosedException alreadyClosedException) {
            // empty catch block
        }
        Thread.sleep(100L);
        subRef = topicRef.getSubscription("sub1");
        Assert.assertNull((Object)subRef);
        producer.close();
        Thread.sleep(100L);
    }

    @Test
    public void testConsumerFlowControl() throws Exception {
        String topicName = "persistent://prop/ns-abc/topic2";
        String subName = "sub2";
        int recvQueueSize = 4;
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/topic2"}).subscriptionName("sub2").receiverQueueSize(recvQueueSize).subscribe();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic2").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/topic2").get();
        Assert.assertNotNull((Object)topicRef);
        PersistentSubscription subRef = topicRef.getSubscription("sub2");
        Assert.assertNotNull((Object)subRef);
        Thread.sleep(100L);
        Assert.assertEquals((int)this.getAvailablePermits(subRef), (int)recvQueueSize);
        for (int i = 0; i < recvQueueSize / 2; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
            Message msg = consumer.receive();
            consumer.acknowledge(msg);
        }
        Thread.sleep(100L);
        Assert.assertEquals((int)this.getAvailablePermits(subRef), (int)recvQueueSize);
        consumer.close();
        Assert.assertFalse((boolean)subRef.getDispatcher().isConsumerConnected());
    }

    @Test
    public void testActiveSubscriptionWithCache() throws Exception {
        String topicName = "persistent://prop/ns-abc/topic2";
        String subName = "sub2";
        int recvQueueSize = 4;
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/topic2"}).subscriptionName("sub2").receiverQueueSize(recvQueueSize).subscribe();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic2").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        for (int i = 0; i < recvQueueSize / 2; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
            Message msg = consumer.receive();
            consumer.acknowledge(msg);
        }
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/topic2").get();
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)topicRef.getManagedLedger();
        Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache");
        cacheField.setAccessible(true);
        EntryCacheImpl entryCache = (EntryCacheImpl)cacheField.get(ledger);
        ManagedCursorContainer activeCursors = ledger.getActiveCursors();
        ManagedCursor curosr = (ManagedCursor)activeCursors.iterator().next();
        Assert.assertNotNull((Object)curosr);
        Assert.assertEquals((String)"sub2", (String)curosr.getName());
        consumer.close();
        Thread.sleep(1000L);
        Assert.assertFalse((boolean)ledger.getActiveCursors().iterator().hasNext());
        Assert.assertEquals((long)entryCache.getSize(), (long)0L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(enabled=false)
    public void testConcurrentConsumerThreads() throws Exception {
        String topicName = "persistent://prop/ns-abc/topic3";
        String subName = "sub3";
        int recvQueueSize = 100;
        int numConsumersThreads = 10;
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            final CyclicBarrier barrier = new CyclicBarrier(11);
            for (int i = 0; i < 10; ++i) {
                executor.submit(new Callable<Void>(){

                    @Override
                    public Void call() throws Exception {
                        barrier.await();
                        org.apache.pulsar.client.api.Consumer consumer = PersistentTopicE2ETest.this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/topic3"}).subscriptionName("sub3").receiverQueueSize(100).subscribe();
                        for (int i = 0; i < 10; ++i) {
                            Message msg = consumer.receive();
                            consumer.acknowledge(msg);
                        }
                        return null;
                    }
                });
            }
            org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic3").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            for (int i = 0; i < 1000; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
            }
            barrier.await();
            Thread.sleep(100L);
            PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/topic3").get();
            PersistentSubscription subRef = topicRef.getSubscription("sub3");
            Assert.assertEquals((long)subRef.getNumberOfEntriesInBacklog(false), (long)0L);
            Thread.sleep(100L);
            Assert.assertEquals((int)this.getAvailablePermits(subRef), (int)100);
        }
        finally {
            if (Collections.singletonList(executor).get(0) != null) {
                executor.shutdownNow();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(enabled=false)
    public void testGracefulClose() throws Exception {
        String topicName = "persistent://prop/ns-abc/topic4";
        String subName = "sub4";
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic4").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        Thread.sleep(100L);
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/topic4").get();
        Assert.assertNotNull((Object)topicRef);
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CountDownLatch latch = new CountDownLatch(1);
            executor.submit(() -> {
                for (int i = 0; i < 10; ++i) {
                    String message = "my-message-" + i;
                    producer.send((Object)message.getBytes());
                }
                latch.countDown();
                return null;
            });
            producer.close();
            Assert.assertEquals((long)((Producer)topicRef.getProducers().values().iterator().next()).getPendingPublishAcks(), (long)0L);
            latch.await();
            org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/topic4"}).subscriptionName("sub4").subscribe();
            PersistentSubscription subRef = topicRef.getSubscription("sub4");
            Assert.assertNotNull((Object)subRef);
            Message msg = null;
            for (int i = 0; i < 10; ++i) {
                msg = consumer.receive();
            }
            try {
                consumer.close();
                Assert.fail((String)"should have failed");
            }
            catch (IllegalStateException illegalStateException) {
                // empty catch block
            }
            consumer.acknowledgeCumulative(msg);
            Thread.sleep(100L);
            consumer.close();
            Thread.sleep(100L);
            Assert.assertTrue((boolean)subRef.getDispatcher().isConsumerConnected());
        }
        finally {
            if (Collections.singletonList(executor).get(0) != null) {
                executor.shutdownNow();
            }
        }
    }

    @Test
    public void testSimpleCloseTopic() throws Exception {
        String topicName = "persistent://prop/ns-abc/topic5";
        String subName = "sub5";
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/topic5"}).subscriptionName("sub5").subscribe();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic5").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/topic5").get();
        Assert.assertNotNull((Object)topicRef);
        PersistentSubscription subRef = topicRef.getSubscription("sub5");
        Assert.assertNotNull((Object)subRef);
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
            Message msg = consumer.receive();
            consumer.acknowledge(msg);
        }
        producer.close();
        consumer.close();
        topicRef.close().get();
        Assert.assertFalse((boolean)this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/topic5").isPresent());
    }

    @Test
    public void testSingleClientMultipleSubscriptions() throws Exception {
        String topicName = "persistent://prop/ns-abc/topic6";
        String subName = "sub6";
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/topic6"}).subscriptionName("sub6").subscribe();
        this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic6").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        try {
            this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/topic6"}).subscriptionName("sub6").subscribe();
            Assert.fail((String)"Should have thrown an exception since one consumer is already connected");
        }
        catch (PulsarClientException cce) {
            Assert.assertTrue((boolean)cce.getMessage().contains("Exclusive consumer is already connected"));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMultipleClientsMultipleSubscriptions() throws Exception {
        String topicName = "persistent://prop/ns-abc/topic7";
        String subName = "sub7";
        PulsarClient client1 = PulsarClient.builder().serviceUrl(this.brokerUrl.toString()).build();
        try {
            PulsarClient client2 = PulsarClient.builder().serviceUrl(this.brokerUrl.toString()).build();
            try {
                try {
                    client1.newConsumer().topic(new String[]{"persistent://prop/ns-abc/topic7"}).subscriptionName("sub7").subscribe();
                    client1.newProducer().topic("persistent://prop/ns-abc/topic7").create();
                    client2.newProducer().topic("persistent://prop/ns-abc/topic7").create();
                    client2.newConsumer().topic(new String[]{"persistent://prop/ns-abc/topic7"}).subscriptionName("sub7").subscribe();
                    Assert.fail((String)"Should have thrown an exception since one consumer is already connected");
                }
                catch (PulsarClientException cce) {
                    Assert.assertTrue((boolean)cce.getMessage().contains("Exclusive consumer is already connected"));
                }
            }
            finally {
                if (Collections.singletonList(client2).get(0) != null) {
                    client2.shutdown();
                }
            }
        }
        finally {
            if (Collections.singletonList(client1).get(0) != null) {
                client1.shutdown();
            }
        }
    }

    @Test
    public void testTopicDeleteWithDisconnectedSubscription() throws Exception {
        String topicName = "persistent://prop/ns-abc/topic8";
        String subName = "sub1";
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/topic8"}).subscriptionName("sub1").subscribe();
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/topic8").get();
        PersistentSubscription subRef = topicRef.getSubscription("sub1");
        Assert.assertNotNull((Object)topicRef);
        Assert.assertNotNull((Object)subRef);
        Assert.assertTrue((boolean)subRef.getDispatcher().isConsumerConnected());
        consumer.close();
        Assert.assertFalse((boolean)subRef.getDispatcher().isConsumerConnected());
        this.admin.topics().delete("persistent://prop/ns-abc/topic8");
        try {
            this.admin.topics().getStats("persistent://prop/ns-abc/topic8");
        }
        catch (PulsarAdminException pulsarAdminException) {
            // empty catch block
        }
    }

    int getAvailablePermits(PersistentSubscription sub) {
        return ((Consumer)sub.getDispatcher().getConsumers().get(0)).getAvailablePermits();
    }

    @Test(enabled=false)
    public void testUnloadNamespace() throws Exception {
        String topic = "persistent://prop/ns-abc/topic-9";
        TopicName topicName = TopicName.get((String)topic);
        this.pulsarClient.newProducer().topic(topic).create();
        this.pulsarClient.close();
        Assert.assertNotNull((Object)this.pulsar.getBrokerService().getTopicReference(topic));
        Assert.assertTrue((boolean)((ManagedLedgerFactoryImpl)this.pulsar.getManagedLedgerFactory()).getManagedLedgers().containsKey(topicName.getPersistenceNamingEncoding()));
        this.admin.namespaces().unload("prop/ns-abc");
        int i = 0;
        for (i = 0; i < 30 && this.pulsar.getBrokerService().getTopicReference(topic).isPresent(); ++i) {
            Thread.sleep(1000L);
        }
        if (i == 30) {
            Assert.fail((String)"The topic reference should be null");
        }
        Assert.assertFalse((boolean)((ManagedLedgerFactoryImpl)this.pulsar.getManagedLedgerFactory()).getManagedLedgers().containsKey(topicName.getPersistenceNamingEncoding()));
    }

    @Test
    public void testGC() throws Exception {
        String topicName = "persistent://prop/ns-abc/topic-10";
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topicName).create();
        producer.close();
        Assert.assertTrue((boolean)this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
        this.runGC();
        Assert.assertFalse((boolean)this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
        String subName = "sub1";
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subName).subscribe();
        this.runGC();
        Assert.assertTrue((boolean)this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
        consumer.close();
        this.runGC();
        Assert.assertTrue((boolean)this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
        this.admin.topics().deleteSubscription(topicName, subName);
        this.runGC();
        Assert.assertFalse((boolean)this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
    }

    private Optional<Topic> getTopic(String topicName) {
        return this.pulsar.getBrokerService().getTopicReference(topicName);
    }

    private boolean topicHasSchema(String topicName) {
        String base = TopicName.get((String)topicName).getPartitionedTopicName();
        String schemaName = TopicName.get((String)base).getSchemaName();
        SchemaRegistry.SchemaAndMetadata result = (SchemaRegistry.SchemaAndMetadata)this.pulsar.getSchemaRegistryService().getSchema(schemaName).join();
        return result != null && !result.schema.isDeleted();
    }

    @Test
    public void testGCWillDeleteSchema() throws Exception {
        String topicName = "persistent://prop/ns-abc/topic-1";
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topicName).create();
        producer.close();
        Optional<Topic> topic = this.getTopic(topicName);
        Assert.assertTrue((boolean)topic.isPresent());
        byte[] data = JSONSchema.of((SchemaDefinition)SchemaDefinition.builder().withPojo(Foo.class).build()).getSchemaInfo().getSchema();
        SchemaData schemaData = SchemaData.builder().data(data).type(SchemaType.BYTES).user("foo").build();
        topic.get().addSchema(schemaData).join();
        Assert.assertTrue((boolean)this.topicHasSchema(topicName));
        this.runGC();
        topic = this.getTopic(topicName);
        Assert.assertFalse((boolean)topic.isPresent());
        Assert.assertFalse((boolean)this.topicHasSchema(topicName));
        topicName = "persistent://prop/ns-abc/topic-2";
        String subName = "sub1";
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subName).subscribe();
        topic = this.getTopic(topicName);
        Assert.assertTrue((boolean)topic.isPresent());
        topic.get().addSchema(schemaData).join();
        Assert.assertTrue((boolean)this.topicHasSchema(topicName));
        this.runGC();
        topic = this.getTopic(topicName);
        Assert.assertTrue((boolean)topic.isPresent());
        Assert.assertTrue((boolean)this.topicHasSchema(topicName));
        consumer.close();
        this.runGC();
        topic = this.getTopic(topicName);
        Assert.assertTrue((boolean)topic.isPresent());
        Assert.assertTrue((boolean)this.topicHasSchema(topicName));
        this.admin.topics().deleteSubscription(topicName, subName);
        this.runGC();
        topic = this.getTopic(topicName);
        Assert.assertFalse((boolean)topic.isPresent());
        Assert.assertFalse((boolean)this.topicHasSchema(topicName));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDeleteSchema() throws Exception {
        PulsarClientImpl httpProtocolClient = (PulsarClientImpl)PulsarClient.builder().serviceUrl(this.brokerUrl.toString()).build();
        try {
            PulsarClientImpl binaryProtocolClient = (PulsarClientImpl)this.pulsarClient;
            LookupService binaryLookupService = binaryProtocolClient.getLookup();
            LookupService httpLookupService = httpProtocolClient.getLookup();
            String topicName = "persistent://prop/ns-abc/topic-1";
            org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topicName).create();
            try {
                Optional<Topic> topic = this.getTopic(topicName);
                Assert.assertTrue((boolean)topic.isPresent());
                byte[] data = JSONSchema.of((SchemaDefinition)SchemaDefinition.builder().withPojo(Foo.class).build()).getSchemaInfo().getSchema();
                SchemaData schemaData = SchemaData.builder().data(data).type(SchemaType.BYTES).user("foo").build();
                topic.get().addSchema(schemaData).join();
                Assert.assertTrue((boolean)this.topicHasSchema(topicName));
                Assert.assertEquals((int)this.admin.schemas().getAllSchemas(topicName).size(), (int)1);
                Assert.assertTrue((boolean)((Optional)httpLookupService.getSchema(TopicName.get((String)topicName), ByteBuffer.allocate(8).putLong(0L).array()).get()).isPresent());
                Assert.assertTrue((boolean)((Optional)binaryLookupService.getSchema(TopicName.get((String)topicName), ByteBuffer.allocate(8).putLong(0L).array()).get()).isPresent());
                topic.get().deleteSchema().join();
                Assert.assertEquals((int)this.admin.schemas().getAllSchemas(topicName).size(), (int)0);
                Assert.assertFalse((boolean)((Optional)httpLookupService.getSchema(TopicName.get((String)topicName), ByteBuffer.allocate(8).putLong(0L).array()).get()).isPresent());
                Assert.assertFalse((boolean)((Optional)binaryLookupService.getSchema(TopicName.get((String)topicName), ByteBuffer.allocate(8).putLong(0L).array()).get()).isPresent());
                Assert.assertFalse((boolean)this.topicHasSchema(topicName));
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(httpProtocolClient).get(0) != null) {
                httpProtocolClient.close();
            }
        }
    }

    @Test
    public void testGcAndRetentionPolicy() throws Exception {
        this.admin.namespaces().setRetention("prop/ns-abc", new RetentionPolicies(10, 10));
        String topicName = "persistent://prop/ns-abc/topic-10";
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topicName).create();
        producer.close();
        Assert.assertNotNull((Object)this.pulsar.getBrokerService().getTopicReference(topicName));
        this.runGC();
        Assert.assertNotNull((Object)this.pulsar.getBrokerService().getTopicReference(topicName));
        this.admin.namespaces().setRetention("prop/ns-abc", new RetentionPolicies());
        Thread.sleep(300L);
        String subName = "sub1";
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subName).subscribe();
        this.runGC();
        Assert.assertNotNull((Object)this.pulsar.getBrokerService().getTopicReference(topicName));
        consumer.close();
        this.runGC();
        Assert.assertNotNull((Object)this.pulsar.getBrokerService().getTopicReference(topicName));
        this.admin.topics().deleteSubscription(topicName, subName);
        this.runGC();
        Assert.assertFalse((boolean)this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
    }

    @Test
    public void testInfiniteRetentionPolicy() throws Exception {
        this.admin.namespaces().setRetention("prop/ns-abc", new RetentionPolicies(-1, -1));
        String topicName = "persistent://prop/ns-abc/topic-10";
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topicName).create();
        producer.close();
        Assert.assertNotNull((Object)this.pulsar.getBrokerService().getTopicReference(topicName));
        this.runGC();
        Assert.assertNotNull((Object)this.pulsar.getBrokerService().getTopicReference(topicName));
        this.admin.namespaces().setRetention("prop/ns-abc", new RetentionPolicies());
        Thread.sleep(300L);
        String subName = "sub1";
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subName).subscribe();
        this.runGC();
        Assert.assertNotNull((Object)this.pulsar.getBrokerService().getTopicReference(topicName));
        consumer.close();
        this.runGC();
        Assert.assertNotNull((Object)this.pulsar.getBrokerService().getTopicReference(topicName));
        this.admin.topics().deleteSubscription(topicName, subName);
        this.runGC();
        Assert.assertFalse((boolean)this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
    }

    @Test
    public void testServiceConfigurationRetentionPolicy() throws Exception {
        this.pulsar.getConfiguration().setDefaultRetentionSizeInMB(-1);
        this.pulsar.getConfiguration().setDefaultRetentionTimeInMinutes(-1);
        String namespaceName = "prop/ns-default-retention-policy";
        this.admin.namespaces().createNamespace(namespaceName);
        String topicName = "persistent://prop/ns-abc/topic-10";
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topicName).create();
        producer.close();
        Assert.assertTrue((boolean)this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
        this.runGC();
        Assert.assertTrue((boolean)this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
        this.admin.namespaces().setRetention("prop/ns-abc", new RetentionPolicies());
        Thread.sleep(300L);
        String subName = "sub1";
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subName).subscribe();
        this.runGC();
        Assert.assertTrue((boolean)this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
        consumer.close();
        this.runGC();
        Assert.assertTrue((boolean)this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
        this.admin.topics().deleteSubscription(topicName, subName);
        this.runGC();
        Assert.assertFalse((boolean)this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
    }

    @Test
    public void testMessageExpiry() throws Exception {
        int messageTTLSecs = 1;
        String namespaceName = "prop/expiry-check";
        this.admin.namespaces().createNamespace(namespaceName);
        this.admin.namespaces().setNamespaceReplicationClusters(namespaceName, (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        this.admin.namespaces().setNamespaceMessageTTL(namespaceName, messageTTLSecs);
        String topicName = "persistent://prop/expiry-check/topic1";
        String subName = "sub1";
        int numMsgs = 10;
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/expiry-check/topic1"}).subscriptionName("sub1").subscribe();
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference("persistent://prop/expiry-check/topic1").get();
        PersistentSubscription subRef = topicRef.getSubscription("sub1");
        consumer.close();
        Assert.assertFalse((boolean)subRef.getDispatcher().isConsumerConnected());
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/expiry-check/topic1").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        this.rolloverPerIntervalStats();
        Assert.assertEquals((long)subRef.getNumberOfEntriesInBacklog(false), (long)10L);
        Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs));
        this.runMessageExpiryCheck();
        Assert.assertEquals((long)subRef.getNumberOfEntriesInBacklog(false), (long)0L);
        producer.close();
        consumer.close();
        this.admin.topics().deleteSubscription("persistent://prop/expiry-check/topic1", "sub1");
        this.admin.topics().delete("persistent://prop/expiry-check/topic1");
        this.admin.namespaces().deleteNamespace(namespaceName);
    }

    @Test
    public void testMessageExpiryWithTopicMessageTTL() throws Exception {
        String message;
        int i;
        int namespaceMessageTTLSecs = 10;
        int topicMessageTTLSecs = 2;
        String namespaceName = "prop/expiry-check-2";
        this.cleanup();
        this.conf.setSystemTopicEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(true);
        this.setup();
        this.admin.namespaces().createNamespace(namespaceName);
        this.admin.namespaces().setNamespaceReplicationClusters(namespaceName, (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        this.admin.namespaces().setNamespaceMessageTTL(namespaceName, namespaceMessageTTLSecs);
        String topicName = "persistent://prop/expiry-check-2/topic2";
        String subName = "sub1";
        int numMsgs = 10;
        this.admin.topics().createNonPartitionedTopic("persistent://prop/expiry-check-2/topic2");
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/expiry-check-2/topic2"}).subscriptionName("sub1").subscribe();
        Thread.sleep(3000L);
        this.admin.topics().setMessageTTL("persistent://prop/expiry-check-2/topic2", topicMessageTTLSecs);
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference("persistent://prop/expiry-check-2/topic2").get();
        PersistentSubscription subRef = topicRef.getSubscription("sub1");
        consumer.close();
        Assert.assertFalse((boolean)subRef.getDispatcher().isConsumerConnected());
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/expiry-check-2/topic2").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        for (i = 0; i < 10; ++i) {
            message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        this.rolloverPerIntervalStats();
        Assert.assertEquals((long)subRef.getNumberOfEntriesInBacklog(false), (long)10L);
        Thread.sleep(TimeUnit.SECONDS.toMillis(topicMessageTTLSecs));
        this.runMessageExpiryCheck();
        Assert.assertEquals((long)subRef.getNumberOfEntriesInBacklog(false), (long)0L);
        producer.close();
        Thread.sleep(3000L);
        this.admin.topics().removeMessageTTL("persistent://prop/expiry-check-2/topic2");
        consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/expiry-check-2/topic2"}).subscriptionName("sub1").subscribe();
        topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference("persistent://prop/expiry-check-2/topic2").get();
        subRef = topicRef.getSubscription("sub1");
        consumer.close();
        Assert.assertFalse((boolean)subRef.getDispatcher().isConsumerConnected());
        producer = this.pulsarClient.newProducer().topic("persistent://prop/expiry-check-2/topic2").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        for (i = 0; i < 10; ++i) {
            message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Thread.sleep(TimeUnit.SECONDS.toMillis(topicMessageTTLSecs));
        this.rolloverPerIntervalStats();
        Assert.assertEquals((long)subRef.getNumberOfEntriesInBacklog(false), (long)10L);
        Thread.sleep(TimeUnit.SECONDS.toMillis(namespaceMessageTTLSecs - topicMessageTTLSecs));
        this.runMessageExpiryCheck();
        Assert.assertEquals((long)subRef.getNumberOfEntriesInBacklog(false), (long)0L);
        try {
            producer.close();
            consumer.close();
            this.admin.topics().deleteSubscription("persistent://prop/expiry-check-2/topic2", "sub1");
            this.admin.topics().delete("persistent://prop/expiry-check-2/topic2");
            this.admin.namespaces().deleteNamespace(namespaceName);
        }
        catch (PulsarAdminException e) {
            Assert.assertEquals((int)e.getStatusCode(), (int)500);
        }
    }

    @Test
    public void testMessageExpiryWithFewExpiredBacklog() throws Exception {
        int messageTTLSecs = 10;
        String namespaceName = "prop/expiry-check-1";
        this.admin.namespaces().createNamespace(namespaceName);
        this.admin.namespaces().setNamespaceReplicationClusters(namespaceName, (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        this.admin.namespaces().setNamespaceMessageTTL(namespaceName, messageTTLSecs);
        String topicName = "persistent://prop/expiry-check-1/topic1";
        String subName = "sub1";
        int numMsgs = 10;
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/expiry-check-1/topic1"}).subscriptionName("sub1").subscribe();
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference("persistent://prop/expiry-check-1/topic1").get();
        PersistentSubscription subRef = topicRef.getSubscription("sub1");
        Assert.assertTrue((boolean)subRef.getDispatcher().isConsumerConnected());
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/expiry-check-1/topic1").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        this.rolloverPerIntervalStats();
        Assert.assertEquals((long)subRef.getNumberOfEntriesInBacklog(false), (long)10L);
        Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs));
        this.runMessageExpiryCheck();
        Assert.assertEquals((long)subRef.getNumberOfEntriesInBacklog(false), (long)10L);
        Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs / 2));
        this.runMessageExpiryCheck();
        Assert.assertEquals((long)subRef.getNumberOfEntriesInBacklog(false), (long)0L);
    }

    @Test
    public void testSubscriptionTypeTransitions() throws Exception {
        PulsarClient pulsarClient;
        String topicName = "persistent://prop/ns-abc/shared-topic2";
        String subName = "sub2";
        org.apache.pulsar.client.api.Consumer consumer1 = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/shared-topic2"}).subscriptionName("sub2").subscriptionType(SubscriptionType.Exclusive).subscribe();
        org.apache.pulsar.client.api.Consumer consumer2 = null;
        org.apache.pulsar.client.api.Consumer consumer3 = null;
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/shared-topic2").get();
        PersistentSubscription subRef = topicRef.getSubscription("sub2");
        try {
            pulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
            this.closeables.add((AutoCloseable)pulsarClient);
            consumer2 = pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/shared-topic2"}).subscriptionName("sub2").subscriptionType(SubscriptionType.Shared).subscribe();
            Assert.fail((String)"should have failed");
        }
        catch (PulsarClientException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Subscription is of different type"));
        }
        try {
            pulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
            this.closeables.add((AutoCloseable)pulsarClient);
            consumer3 = pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/shared-topic2"}).subscriptionName("sub2").subscriptionType(SubscriptionType.Failover).subscribe();
            Assert.fail((String)"should have failed");
        }
        catch (PulsarClientException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Subscription is of different type"));
        }
        consumer1.close();
        try {
            pulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
            this.closeables.add((AutoCloseable)pulsarClient);
            consumer2 = pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/shared-topic2"}).subscriptionName("sub2").subscriptionType(SubscriptionType.Shared).subscribe();
            Assert.assertEquals((Object)subRef.getDispatcher().getType(), (Object)CommandSubscribe.SubType.Shared);
        }
        catch (PulsarClientException e) {
            Assert.fail((String)"should not fail");
        }
        try {
            pulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
            this.closeables.add((AutoCloseable)pulsarClient);
            consumer1 = pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/shared-topic2"}).subscriptionName("sub2").subscriptionType(SubscriptionType.Exclusive).subscribe();
            Assert.fail((String)"should have failed");
        }
        catch (PulsarClientException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Subscription is of different type"));
        }
        consumer2.close();
        try {
            pulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
            this.closeables.add((AutoCloseable)pulsarClient);
            consumer3 = pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/shared-topic2"}).subscriptionName("sub2").subscriptionType(SubscriptionType.Failover).subscribe();
            Assert.assertEquals((Object)subRef.getDispatcher().getType(), (Object)CommandSubscribe.SubType.Failover);
        }
        catch (PulsarClientException e) {
            Assert.fail((String)"should not fail");
        }
        consumer3.close();
        try {
            pulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
            this.closeables.add((AutoCloseable)pulsarClient);
            consumer1 = pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/shared-topic2"}).subscriptionName("sub2").subscriptionType(SubscriptionType.Exclusive).subscribe();
            Assert.assertEquals((Object)subRef.getDispatcher().getType(), (Object)CommandSubscribe.SubType.Exclusive);
        }
        catch (PulsarClientException e) {
            Assert.fail((String)"should not fail");
        }
        consumer1.close();
        this.admin.topics().delete("persistent://prop/ns-abc/shared-topic2");
    }

    @Test
    public void testReceiveWithTimeout() throws Exception {
        String topicName = "persistent://prop/ns-abc/topic-receive-timeout";
        String subName = "sub";
        ConsumerImpl consumer = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/topic-receive-timeout"}).subscriptionName("sub").receiverQueueSize(1000).subscribe();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic-receive-timeout").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        Assert.assertEquals((int)consumer.getAvailablePermits(), (int)0);
        Message msg = consumer.receive(10, TimeUnit.MILLISECONDS);
        Assert.assertNull((Object)msg);
        Assert.assertEquals((int)consumer.getAvailablePermits(), (int)0);
        producer.send((Object)"test".getBytes());
        Thread.sleep(100L);
        Assert.assertEquals((int)consumer.getAvailablePermits(), (int)0);
        msg = consumer.receive(10, TimeUnit.MILLISECONDS);
        Assert.assertNotNull((Object)msg);
        Assert.assertEquals((int)consumer.getAvailablePermits(), (int)1);
        msg = consumer.receive(10, TimeUnit.MILLISECONDS);
        Assert.assertNull((Object)msg);
        Assert.assertEquals((int)consumer.getAvailablePermits(), (int)1);
    }

    @Test
    public void testProducerReturnedMessageId() throws Exception {
        String topicName = "persistent://prop/ns-abc/topic-xyz";
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic-xyz").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/topic-xyz").get();
        Assert.assertNotNull((Object)topicRef);
        Assert.assertEquals((int)topicRef.getProducers().size(), (int)1);
        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)topicRef.getManagedLedger();
        long ledgerId = ((MLDataFormats.ManagedLedgerInfo.LedgerInfo)managedLedger.getLedgersInfoAsList().get(0)).getLedgerId();
        int SyncMessages = 10;
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            MessageId receivedMessageId = producer.send((Object)message.getBytes());
            Assert.assertEquals((Object)receivedMessageId, (Object)new MessageIdImpl(ledgerId, (long)i, -1));
        }
        int AsyncMessages = 10;
        CountDownLatch counter = new CountDownLatch(10);
        int i = 10;
        while (i < 20) {
            String content = "my-message-" + i;
            int index = i++;
            ((CompletableFuture)producer.sendAsync((Object)content.getBytes()).thenAccept(msgId -> {
                Assert.assertEquals((Object)msgId, (Object)new MessageIdImpl(ledgerId, (long)index, -1));
                counter.countDown();
            })).exceptionally(ex -> null);
        }
        counter.await();
        producer.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testProducerQueueFullBlocking() throws Exception {
        String topicName = "persistent://prop/ns-abc/topic-xyzx";
        int messages = 10;
        PulsarClient client = PulsarClient.builder().serviceUrl(this.brokerUrl.toString()).build();
        try {
            ProducerImpl producer = (ProducerImpl)client.newProducer().topic("persistent://prop/ns-abc/topic-xyzx").maxPendingMessages(10).blockIfQueueFull(true).sendTimeout(1, TimeUnit.SECONDS).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            super.internalCleanup();
            long startTime = System.nanoTime();
            for (int i = 0; i < 10; ++i) {
                producer.sendAsync((Object)"msg".getBytes());
            }
            long delayNs = System.nanoTime() - startTime;
            Assert.assertTrue((delayNs < TimeUnit.SECONDS.toNanos(1L) ? 1 : 0) != 0);
            Assert.assertEquals((int)producer.getPendingQueueSize(), (int)10);
            startTime = System.nanoTime();
            producer.sendAsync((Object)"msg".getBytes());
            delayNs = System.nanoTime() - startTime;
            Assert.assertTrue((delayNs > TimeUnit.MILLISECONDS.toNanos(500L) ? 1 : 0) != 0);
            Assert.assertTrue((delayNs < TimeUnit.MILLISECONDS.toNanos(1500L) ? 1 : 0) != 0);
            Assert.assertEquals((int)producer.getPendingQueueSize(), (int)1);
            producer.close();
            this.setup();
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testProducerQueueFullNonBlocking() throws Exception {
        String topicName = "persistent://prop/ns-abc/topic-xyzx";
        int messages = 10;
        PulsarClient client = PulsarClient.builder().serviceUrl(this.brokerUrl.toString()).build();
        try {
            ProducerImpl producer = (ProducerImpl)client.newProducer().topic("persistent://prop/ns-abc/topic-xyzx").maxPendingMessages(10).blockIfQueueFull(false).sendTimeout(1, TimeUnit.SECONDS).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            super.internalCleanup();
            long startTime = System.nanoTime();
            for (int i = 0; i < 10; ++i) {
                producer.sendAsync((Object)"msg".getBytes());
            }
            long delayNs = System.nanoTime() - startTime;
            Assert.assertTrue((delayNs < TimeUnit.SECONDS.toNanos(1L) ? 1 : 0) != 0);
            Assert.assertEquals((int)producer.getPendingQueueSize(), (int)10);
            startTime = System.nanoTime();
            try {
                producer.send((Object)"msg".getBytes());
                Assert.fail((String)"Send should have failed");
            }
            catch (PulsarClientException.ProducerQueueIsFullError producerQueueIsFullError) {
                // empty catch block
            }
            delayNs = System.nanoTime() - startTime;
            Assert.assertTrue((delayNs < TimeUnit.SECONDS.toNanos(1L) ? 1 : 0) != 0);
            Assert.assertEquals((int)producer.getPendingQueueSize(), (int)10);
            producer.close();
            this.setup();
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    @Test
    public void testDeleteTopics() throws Exception {
        BrokerService brokerService = this.pulsar.getBrokerService();
        org.apache.pulsar.client.api.Producer producer1 = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic-1").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic-2").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        brokerService.updateRates();
        Map bundleStatsMap = brokerService.getBundleStats();
        Assert.assertEquals((int)bundleStatsMap.size(), (int)1);
        NamespaceBundleStats bundleStats = (NamespaceBundleStats)bundleStatsMap.get("prop/ns-abc/0x00000000_0xffffffff");
        Assert.assertNotNull((Object)bundleStats);
        producer1.close();
        this.admin.topics().delete("persistent://prop/ns-abc/topic-1");
        brokerService.updateRates();
        bundleStatsMap = brokerService.getBundleStats();
        Assert.assertEquals((int)bundleStatsMap.size(), (int)1);
        bundleStats = (NamespaceBundleStats)bundleStatsMap.get("prop/ns-abc/0x00000000_0xffffffff");
        Assert.assertNotNull((Object)bundleStats);
    }

    @DataProvider(name="codec")
    public Object[][] codecProvider() {
        return new Object[][]{{CompressionType.NONE}, {CompressionType.LZ4}, {CompressionType.ZLIB}};
    }

    @Test(dataProvider="codec")
    public void testCompression(CompressionType compressionType) throws Exception {
        int i;
        String topicName = "persistent://prop/ns-abc/topic0" + compressionType;
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).compressionType(compressionType).create();
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("my-sub").subscribe();
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull((Object)topicRef);
        Assert.assertEquals((int)topicRef.getProducers().size(), (int)1);
        for (i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        for (i = 0; i < 10; ++i) {
            Message msg = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull((Object)msg);
            Assert.assertEquals((byte[])msg.getData(), (byte[])("my-message-" + i).getBytes());
        }
        producer.close();
        consumer.close();
    }

    @Test
    public void testBrokerTopicStats() throws Exception {
        BrokerService brokerService = this.pulsar.getBrokerService();
        Field field = BrokerService.class.getDeclaredField("statsUpdater");
        field.setAccessible(true);
        ScheduledExecutorService statsUpdater = (ScheduledExecutorService)field.get(brokerService);
        statsUpdater.shutdownNow();
        String namespace = "prop/ns-abc";
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic0").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Metrics metric = null;
        Thread.sleep(1000L);
        brokerService.updateRates();
        List metrics = brokerService.getTopicMetrics();
        for (Metrics value : metrics) {
            if (!value.getDimension("namespace").equalsIgnoreCase("prop/ns-abc")) continue;
            metric = value;
            break;
        }
        Assert.assertNotNull(metric);
        double msgInRate = (Double)((Metrics)metrics.get(0)).getMetrics().get("brk_in_rate");
        Assert.assertTrue((msgInRate > 0.0 ? 1 : 0) != 0);
    }

    @Test(groups={"quarantine"})
    public void testBrokerConnectionStats() throws Exception {
        BrokerService brokerService = this.pulsar.getBrokerService();
        String namespace = "prop/ns-abc";
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic0").create();
        Map map = null;
        brokerService.updateRates();
        List metrics = brokerService.getTopicMetrics();
        for (int i = 0; i < metrics.size(); ++i) {
            if (!((Metrics)metrics.get(i)).getDimensions().containsValue("broker_connection")) continue;
            map = ((Metrics)metrics.get(i)).getMetrics();
            break;
        }
        Assert.assertNotNull(map);
        Assert.assertEquals((long)((Long)map.get("brk_connection_created_total_count")), (long)1L);
        Assert.assertEquals((long)((Long)map.get("brk_active_connections")), (long)1L);
        Assert.assertEquals((long)((Long)map.get("brk_connection_closed_total_count")), (long)0L);
        Assert.assertEquals((long)((Long)map.get("brk_connection_create_success_count")), (long)1L);
        Assert.assertEquals((long)((Long)map.get("brk_connection_create_fail_count")), (long)0L);
        producer.close();
        this.pulsarClient.close();
        Awaitility.await().until(() -> {
            brokerService.updateRates();
            List closeMetrics = brokerService.getTopicMetrics();
            Map closeMap = null;
            for (int i = 0; i < closeMetrics.size(); ++i) {
                if (!((Metrics)closeMetrics.get(i)).getDimensions().containsValue("broker_connection")) continue;
                closeMap = ((Metrics)closeMetrics.get(i)).getMetrics();
                break;
            }
            if (closeMap != null && (Long)closeMap.get("brk_connection_created_total_count") == 1L && (Long)closeMap.get("brk_active_connections") == 0L && (Long)closeMap.get("brk_connection_closed_total_count") == 1L && (Long)closeMap.get("brk_connection_create_fail_count") == 0L && (Long)closeMap.get("brk_connection_create_success_count") == 1L) {
                return true;
            }
            return false;
        });
        this.pulsar.getConfiguration().setAuthenticationEnabled(true);
        if (this.pulsarClient != null) {
            this.pulsarClient.shutdown();
        }
        this.pulsarClient = PulsarClient.builder().serviceUrl(this.lookupUrl.toString()).operationTimeout(1, TimeUnit.MILLISECONDS).build();
        try {
            this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic0").create();
            Assert.fail();
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)(e instanceof PulsarClientException.AuthenticationException));
        }
        brokerService.updateRates();
        metrics = brokerService.getTopicMetrics();
        for (int i = 0; i < metrics.size(); ++i) {
            if (!((Metrics)metrics.get(i)).getDimensions().containsValue("broker_connection")) continue;
            map = ((Metrics)metrics.get(i)).getMetrics();
            break;
        }
        Assert.assertNotNull((Object)map);
        Assert.assertEquals((long)((Long)map.get("brk_connection_created_total_count")), (long)2L);
        Assert.assertEquals((long)((Long)map.get("brk_active_connections")), (long)0L);
        Assert.assertEquals((long)((Long)map.get("brk_connection_closed_total_count")), (long)2L);
        Assert.assertEquals((long)((Long)map.get("brk_connection_create_success_count")), (long)1L);
        Assert.assertEquals((long)((Long)map.get("brk_connection_create_fail_count")), (long)1L);
    }

    @Test
    public void testPayloadCorruptionDetection() throws Exception {
        String topicName = "persistent://prop/ns-abc/topic1";
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic1").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/topic1"}).subscriptionName("my-sub").subscribe();
        CompletableFuture future1 = producer.newMessage().value((Object)"message-1".getBytes()).sendAsync();
        this.stopBroker();
        byte[] a2 = "message-2".getBytes();
        TypedMessageBuilder msg2 = producer.newMessage().value((Object)a2);
        CompletableFuture future2 = msg2.sendAsync();
        ((TypedMessageBuilderImpl)msg2).getContent().put(a2.length - 1, (byte)51);
        this.startBroker();
        future1.get();
        try {
            future2.get();
            Assert.fail((String)"since we corrupted the message, it should be rejected by the broker");
        }
        catch (Exception exception) {
            // empty catch block
        }
        Message msg = consumer.receive(1, TimeUnit.SECONDS);
        Assert.assertEquals((String)new String(msg.getData()), (String)"message-1");
        while ((msg = consumer.receive(1, TimeUnit.SECONDS)) != null) {
            Assert.assertEquals((String)new String(msg.getData()), (String)"message-1");
        }
    }

    @Test
    public void testMessageRedelivery() throws Exception {
        Message msg;
        int i;
        String topicName = "persistent://prop/ns-abc/topic2";
        String subName = "sub2";
        ArrayList<Message> unackedMessages = new ArrayList<Message>();
        int totalMessages = 20;
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://prop/ns-abc/topic2"}).subscriptionName("sub2").subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://prop/ns-abc/topic2").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        for (i = 0; i < totalMessages; ++i) {
            producer.send((Object)("my-message-" + i));
        }
        for (i = 0; i < totalMessages; ++i) {
            msg = consumer.receive();
            if (i >= 10) {
                unackedMessages.add(msg);
                continue;
            }
            consumer.acknowledge(msg);
        }
        consumer.redeliverUnacknowledgedMessages();
        for (i = 0; i < 10; ++i) {
            try {
                Message redeliveredMsg = consumer.receive(1, TimeUnit.SECONDS);
                unackedMessages.removeIf(unackedMessage -> ((String)unackedMessage.getValue()).equals(redeliveredMsg.getValue()));
                continue;
            }
            catch (Exception e) {
                Assert.fail((String)"msg should be redelivered ", (Throwable)e);
            }
        }
        Assert.assertEquals((int)unackedMessages.size(), (int)0);
        msg = consumer.receive(100, TimeUnit.MILLISECONDS);
        Assert.assertNull((Object)msg);
        consumer.close();
        producer.close();
    }

    @Test
    public void testMessageReplay() throws Exception {
        Message msg;
        String topicName = "persistent://prop/ns-abc/topic2";
        String subName = "sub2";
        int totalMessages = 10;
        int replayIndex = totalMessages / 2;
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/topic2"}).subscriptionName("sub2").subscriptionType(SubscriptionType.Shared).receiverQueueSize(1).subscribe();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic2").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/topic2").get();
        Assert.assertNotNull((Object)topicRef);
        PersistentSubscription subRef = topicRef.getSubscription("sub2");
        PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers)subRef.getDispatcher();
        Field replayMap = PersistentDispatcherMultipleConsumers.class.getDeclaredField("messagesToRedeliver");
        replayMap.setAccessible(true);
        ConcurrentLongPairSet messagesToReplay = new ConcurrentLongPairSet(64, 1);
        Assert.assertNotNull((Object)subRef);
        for (int i = 0; i < totalMessages; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        MessageIdImpl firstAckedMsg = null;
        for (int i = 0; i < totalMessages; ++i) {
            msg = consumer.receive();
            consumer.acknowledge(msg);
            MessageIdImpl msgId = (MessageIdImpl)msg.getMessageId();
            if (i == 0) {
                firstAckedMsg = msgId;
            }
            if (i >= replayIndex) continue;
            messagesToReplay.add(msgId.getLedgerId(), msgId.getEntryId());
        }
        Thread.sleep(1000L);
        replayMap.set(dispatcher, messagesToReplay);
        dispatcher.redeliverUnacknowledgedMessages((Consumer)dispatcher.getConsumers().get(0));
        Assert.assertEquals((long)messagesToReplay.size(), (long)0L);
        messagesToReplay.add(firstAckedMsg.getLedgerId(), firstAckedMsg.getEntryId());
        replayMap.set(dispatcher, messagesToReplay);
        String testMsg = "testMsg";
        producer.send((Object)"testMsg".getBytes());
        dispatcher.consumerFlow((Consumer)dispatcher.getConsumers().get(0), 1);
        msg = consumer.receive(1, TimeUnit.SECONDS);
        Assert.assertNotNull((Object)msg);
        Assert.assertEquals((byte[])msg.getData(), (byte[])"testMsg".getBytes());
        consumer.close();
        producer.close();
    }

    @Test
    public void testCreateProducerWithSameName() throws Exception {
        String topic = "persistent://prop/ns-abc/testCreateProducerWithSameName";
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic(topic).producerName("test-producer-a").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition);
        org.apache.pulsar.client.api.Producer p1 = producerBuilder.create();
        try {
            producerBuilder.create();
            Assert.fail((String)"Should have thrown ProducerBusyException");
        }
        catch (PulsarClientException.ProducerBusyException producerBusyException) {
            // empty catch block
        }
        p1.close();
        org.apache.pulsar.client.api.Producer p2 = producerBuilder.create();
        p2.close();
    }

    @Test
    public void testGetOrCreateTopic() throws Exception {
        String topicName = "persistent://prop/ns-abc/testGetOrCreateTopic";
        this.admin.lookups().lookupTopic(topicName);
        Topic topic = (Topic)this.pulsar.getBrokerService().getOrCreateTopic(topicName).get();
        Assert.assertNotNull((Object)topic);
        Optional t = this.pulsar.getBrokerService().getTopicReference(topicName);
        Assert.assertTrue((boolean)t.isPresent());
    }

    @Test
    public void testGetTopicIfExists() throws Exception {
        String topicName = "persistent://prop/ns-abc/testGetTopicIfExists";
        this.admin.lookups().lookupTopic(topicName);
        Optional topic = (Optional)this.pulsar.getBrokerService().getTopicIfExists(topicName).join();
        Assert.assertFalse((boolean)topic.isPresent());
        Optional t = this.pulsar.getBrokerService().getTopicReference(topicName);
        Assert.assertFalse((boolean)t.isPresent());
    }

    @Test
    public void testWithEventTime() throws Exception {
        String topicName = "prop/ns-abc/topic-event-time";
        String subName = "sub";
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"prop/ns-abc/topic-event-time"}).subscriptionName("sub").subscribe();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic("prop/ns-abc/topic-event-time").create();
        producer.newMessage().value((Object)"test").eventTime(5L).send();
        Message msg = consumer.receive();
        Assert.assertNotNull((Object)msg);
        Assert.assertEquals((String)((String)msg.getValue()), (String)"test");
        Assert.assertEquals((long)msg.getEventTime(), (long)5L);
    }

    private static class Foo {
        private String field1;
        private String field2;
        private int field3;

        public String getField1() {
            return this.field1;
        }

        public String getField2() {
            return this.field2;
        }

        public int getField3() {
            return this.field3;
        }

        public void setField1(String field1) {
            this.field1 = field1;
        }

        public void setField2(String field2) {
            this.field2 = field2;
        }

        public void setField3(int field3) {
            this.field3 = field3;
        }

        public String toString() {
            return "PersistentTopicE2ETest.Foo(field1=" + this.getField1() + ", field2=" + this.getField2() + ", field3=" + this.getField3() + ")";
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof Foo)) {
                return false;
            }
            Foo other = (Foo)o;
            if (!other.canEqual(this)) {
                return false;
            }
            if (this.getField3() != other.getField3()) {
                return false;
            }
            String this$field1 = this.getField1();
            String other$field1 = other.getField1();
            if (this$field1 == null ? other$field1 != null : !this$field1.equals(other$field1)) {
                return false;
            }
            String this$field2 = this.getField2();
            String other$field2 = other.getField2();
            return !(this$field2 == null ? other$field2 != null : !this$field2.equals(other$field2));
        }

        protected boolean canEqual(Object other) {
            return other instanceof Foo;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            result = result * 59 + this.getField3();
            String $field1 = this.getField1();
            result = result * 59 + ($field1 == null ? 43 : $field1.hashCode());
            String $field2 = this.getField2();
            result = result * 59 + ($field2 == null ? 43 : $field2.hashCode());
            return result;
        }
    }
}

