package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"flaky"})
/* loaded from: input_file:org/apache/pulsar/broker/service/PersistentTopicE2ETest.class */
public class PersistentTopicE2ETest extends BrokerTestBase {
    /** Resources that {@link #cleanup()} closes (best effort) and then clears after each test method. */
    private final List<AutoCloseable> closeables = new ArrayList<>();

    /* loaded from: input_file:org/apache/pulsar/broker/service/PersistentTopicE2ETest$Foo.class */
    /**
     * Mutable value holder with two string fields and one int field. Within this test class it
     * is the POJO handed to the schema definition builder. Equality, hash code and string form
     * are all derived from the three fields.
     */
    private static class Foo {
        private String field1;
        private String field2;
        private int field3;

        public String getField1() {
            return field1;
        }

        public String getField2() {
            return field2;
        }

        public int getField3() {
            return field3;
        }

        public void setField1(String str) {
            field1 = str;
        }

        public void setField2(String str) {
            field2 = str;
        }

        public void setField3(int i) {
            field3 = i;
        }

        @Override
        public String toString() {
            return "PersistentTopicE2ETest.Foo(field1=" + getField1()
                    + ", field2=" + getField2()
                    + ", field3=" + getField3()
                    + ")";
        }

        /**
         * Two instances are equal when both agree that they can be compared (see
         * {@link #canEqual(Object)}) and all three fields match; null strings only match null.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof Foo)) {
                return false;
            }
            Foo that = (Foo) obj;
            if (!that.canEqual(this)) {
                return false;
            }
            if (getField3() != that.getField3()) {
                return false;
            }
            if (!sameText(getField1(), that.getField1())) {
                return false;
            }
            return sameText(getField2(), that.getField2());
        }

        /** Hook that lets a subclass refuse equality with instances of this class. */
        protected boolean canEqual(Object obj) {
            return obj instanceof Foo;
        }

        /** Combines the three fields with multiplier 59; a null string contributes 43. */
        @Override
        public int hashCode() {
            final int multiplier = 59;
            int result = 1;
            result = result * multiplier + getField3();
            result = result * multiplier + textHash(getField1());
            result = result * multiplier + textHash(getField2());
            return result;
        }

        /** Null-safe string equality: null matches only null. */
        private static boolean sameText(String left, String right) {
            return left == null ? right == null : left.equals(right);
        }

        /** Hash contribution of a possibly-null string. */
        private static int textHash(String value) {
            return value == null ? 43 : value.hashCode();
        }
    }

    /**
     * Per-test setup hook; delegates entirely to the inherited {@code baseSetup()}.
     *
     * @throws Exception if the base setup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod(alwaysRun = true)
    protected void setup() throws Exception {
        super.baseSetup();
    }

    /**
     * Per-test teardown: closes every entry of {@code closeables} on a best-effort basis,
     * empties the list, and then runs the inherited {@code internalCleanup()}.
     *
     * @throws Exception if the base cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        for (AutoCloseable resource : this.closeables) {
            try {
                resource.close();
            } catch (Exception ignored) {
                // Best effort: one failing close must not stop the rest of the teardown.
            }
        }
        this.closeables.clear();
        super.internalCleanup();
    }

    /**
     * Checks the topic's producer registry across a producer's lifetime: one entry while a
     * client producer is attached, a positive inbound message rate after ten messages were
     * published, and an empty registry shortly after the producer is closed.
     */
    @Test
    public void testSimpleProducerEvents() throws Exception {
        final String topicName = "persistent://prop/ns-abc/topic0";

        // 1. Attach a producer and verify it is registered on the topic.
        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        PersistentTopic topicRef =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull(topicRef);
        Assert.assertEquals(topicRef.getProducers().size(), 1);

        // 2. Publish, then read the rate after rolloverPerIntervalStats().
        for (int i = 0; i < 10; i++) {
            producer.send(("my-message-" + i).getBytes());
        }
        rolloverPerIntervalStats();
        // The map value is used with its declared type. An explicit (Producer) cast would bind to
        // the imported client API interface org.apache.pulsar.client.api.Producer, which is
        // presumably not the type the topic stores - TODO confirm against PersistentTopic.
        Assert.assertTrue(topicRef.getProducers().values().iterator().next().getStats().msgRateIn > 0.0d);

        // 3. Closing the client producer should remove it from the topic; the short sleep gives
        // the registry time to reflect the close.
        producer.close();
        Thread.sleep(100L);
        Assert.assertEquals(topicRef.getProducers().size(), 0);
    }

    /**
     * Walks one consumer through its lifecycle on a single subscription and checks, at each
     * step, the values observed on the topic's subscription: connection state, available
     * permits, backlog after individual and cumulative acks, and removal after unsubscribe.
     */
    @Test
    public void testSimpleConsumerEvents() throws Exception {
        final String topicName = "persistent://prop/ns-abc/topic1";
        final String subName = "sub1";

        // 1. Subscribe and inspect the subscription as seen from the topic.
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscribe();

        PersistentTopic topicRef =
                (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
        PersistentSubscription subRef = topicRef.getSubscription(subName);

        Assert.assertNotNull(topicRef);
        Assert.assertNotNull(subRef);
        Assert.assertTrue(subRef.getDispatcher().isConsumerConnected());

        Thread.sleep(100L);
        Assert.assertEquals(getAvailablePermits(subRef), 1000);

        // 2. Publish 20 messages; backlog and permits are checked afterwards.
        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        for (int i = 0; i < 20; i++) {
            String payload = "my-message-" + i;
            producer.send(payload.getBytes());
        }

        Assert.assertTrue(subRef.getDispatcher().isConsumerConnected());
        rolloverPerIntervalStats();
        Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(false), 20L);
        Thread.sleep(100L);
        Assert.assertEquals(getAvailablePermits(subRef), 980);

        // 3. Receive the first ten in order and acknowledge each one individually.
        for (int i = 0; i < 10; i++) {
            Message<byte[]> msg = consumer.receive();
            Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
            consumer.acknowledge(msg);
        }

        rolloverPerIntervalStats();
        Thread.sleep(100L);
        Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(false), 10L);

        // 4. Receive the remaining ten and acknowledge cumulatively on the last one only.
        for (int i = 0; i < 10; i++) {
            Message<byte[]> msg = consumer.receive();
            if (i == 9) {
                consumer.acknowledgeCumulative(msg);
            }
        }

        rolloverPerIntervalStats();
        Thread.sleep(100L);
        Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0L);

        // 5. Unsubscribe and close; a second unsubscribe must be rejected as already closed.
        consumer.unsubscribe();
        consumer.close();
        try {
            consumer.unsubscribe();
            Assert.fail("Should have failed");
        } catch (PulsarClientException.AlreadyClosedException expected) {
            // expected: the consumer was closed above
        }

        Thread.sleep(100L);
        Assert.assertNull(topicRef.getSubscription(subName));

        producer.close();
        Thread.sleep(100L);
    }

    /**
     * With a receiver queue of 4, checks that the consumer's available permits read 4 both
     * right after subscribing and after half a queue's worth of messages has been received and
     * acknowledged, and that closing the consumer disconnects it from the dispatcher.
     */
    @Test
    public void testConsumerFlowControl() throws Exception {
        final String topicName = "persistent://prop/ns-abc/topic2";
        final String subName = "sub2";
        final int recvQueueSize = 4;

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .receiverQueueSize(recvQueueSize)
                .subscribe();
        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        PersistentTopic topicRef =
                (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull(topicRef);
        PersistentSubscription subRef = topicRef.getSubscription(subName);
        Assert.assertNotNull(subRef);

        Thread.sleep(100L);
        Assert.assertEquals(getAvailablePermits(subRef), recvQueueSize);

        // Publish and consume half a queue's worth, one message at a time.
        for (int i = 0; i < recvQueueSize / 2; i++) {
            producer.send(("my-message-" + i).getBytes());
            Message<byte[]> msg = consumer.receive();
            consumer.acknowledge(msg);
        }

        Thread.sleep(100L);
        Assert.assertEquals(getAvailablePermits(subRef), recvQueueSize);

        consumer.close();
        Assert.assertFalse(subRef.getDispatcher().isConsumerConnected());
    }

    /**
     * Checks that a subscription's cursor is listed among the managed ledger's active cursors
     * while its consumer is connected, and that one second after the consumer closes there is
     * no active cursor left and the ledger's entry cache reports size 0.
     */
    @Test
    public void testActiveSubscriptionWithCache() throws Exception {
        final String topicName = "persistent://prop/ns-abc/topic2";
        final String subName = "sub2";
        final int recvQueueSize = 4;

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .receiverQueueSize(recvQueueSize)
                .subscribe();
        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        for (int i = 0; i < recvQueueSize / 2; i++) {
            producer.send(("my-message-" + i).getBytes());
            consumer.acknowledge(consumer.receive());
        }

        PersistentTopic topicRef =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        // Explicit downcast; it is a no-op if getManagedLedger() already declares the impl type.
        ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger();

        // The entry cache is a private field of ManagedLedgerImpl, hence the reflective access.
        Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache");
        cacheField.setAccessible(true);
        EntryCacheImpl entryCache = (EntryCacheImpl) cacheField.get(ledger);

        ManagedCursor cursor = (ManagedCursor) ledger.getActiveCursors().iterator().next();
        Assert.assertNotNull(cursor);
        // (actual, expected) order, matching every other assertion in this class.
        Assert.assertEquals(cursor.getName(), subName);

        consumer.close();
        Thread.sleep(1000L);

        Assert.assertFalse(ledger.getActiveCursors().iterator().hasNext());
        Assert.assertEquals(entryCache.getSize(), 0L);
    }

    /**
     * Disabled test. Ten worker threads wait on a shared barrier, then each subscribes to the
     * same subscription name and receives/acknowledges ten messages while the main thread has
     * published 1000. Afterwards the subscription's backlog and available permits are checked.
     * The executor is shut down whether or not the test body succeeds.
     */
    @Test(enabled = false)
    public void testConcurrentConsumerThreads() throws Exception {
        final String topicName = "persistent://prop/ns-abc/topic3";
        final String subName = "sub3";
        final int recvQueueSize = 100;
        final int numConsumersThreads = 10;

        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // One extra party for the main thread, which releases the workers.
            final CyclicBarrier barrier = new CyclicBarrier(numConsumersThreads + 1);

            for (int worker = 0; worker < numConsumersThreads; worker++) {
                Callable<Void> task = () -> {
                    barrier.await();
                    Consumer consumer = pulsarClient.newConsumer()
                            .topic(topicName)
                            .subscriptionName(subName)
                            .receiverQueueSize(recvQueueSize)
                            .subscribe();
                    for (int n = 0; n < recvQueueSize / numConsumersThreads; n++) {
                        consumer.acknowledge(consumer.receive());
                    }
                    return null;
                };
                executor.submit(task);
            }

            Producer<byte[]> producer = pulsarClient.newProducer()
                    .topic(topicName)
                    .enableBatching(false)
                    .messageRoutingMode(MessageRoutingMode.SinglePartition)
                    .create();
            for (int i = 0; i < recvQueueSize * numConsumersThreads; i++) {
                producer.send(("my-message-" + i).getBytes());
            }

            barrier.await();
            Thread.sleep(100L);

            PersistentTopic topicRef =
                    (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
            PersistentSubscription subRef = topicRef.getSubscription(subName);
            Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0L);

            Thread.sleep(100L);
            Assert.assertEquals(getAvailablePermits(subRef), recvQueueSize);
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Disabled test. Publishes ten messages from a background task while the producer is being
     * closed, checks that the topic-side producer reports no pending publish acks, and then
     * exercises consumer close before and after a cumulative acknowledgement. The executor is
     * shut down whether or not the test body succeeds.
     */
    @Test(enabled = false)
    public void testGracefulClose() throws Exception {
        final String topicName = "persistent://prop/ns-abc/topic4";
        final String subName = "sub4";

        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        Thread.sleep(100L);

        PersistentTopic topicRef =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull(topicRef);

        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CountDownLatch published = new CountDownLatch(1);
            executor.submit(() -> {
                for (int i = 0; i < 10; i++) {
                    producer.send(("my-message-" + i).getBytes());
                }
                published.countDown();
                return null;
            });

            producer.close();

            // The map value is used with its declared type. An explicit (Producer) cast would
            // bind to the imported client API interface org.apache.pulsar.client.api.Producer,
            // which is presumably not the type the topic stores - TODO confirm.
            Assert.assertEquals(
                    topicRef.getProducers().values().iterator().next().getPendingPublishAcks(), 0L);

            published.await();

            Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subName)
                    .subscribe();
            PersistentSubscription subRef = topicRef.getSubscription(subName);
            Assert.assertNotNull(subRef);

            Message<byte[]> lastMsg = null;
            for (int i = 0; i < 10; i++) {
                lastMsg = consumer.receive();
            }

            // Closing with unacknowledged messages is expected to be rejected here.
            try {
                consumer.close();
                Assert.fail("should have failed");
            } catch (IllegalStateException expected) {
                // expected
            }

            consumer.acknowledgeCumulative(lastMsg);
            Thread.sleep(100L);
            consumer.close();

            Thread.sleep(100L);
            Assert.assertTrue(subRef.getDispatcher().isConsumerConnected());
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Produces and consumes ten messages on a topic, closes producer and consumer, closes the
     * topic itself, and checks that the broker no longer holds a reference to it.
     */
    @Test
    public void testSimpleCloseTopic() throws Exception {
        final String topicName = "persistent://prop/ns-abc/topic5";
        final String subName = "sub5";

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscribe();
        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        PersistentTopic topicRef =
                (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull(topicRef);
        PersistentSubscription subRef = topicRef.getSubscription(subName);
        Assert.assertNotNull(subRef);

        for (int i = 0; i < 10; i++) {
            producer.send(("my-message-" + i).getBytes());
            Message<byte[]> msg = consumer.receive();
            consumer.acknowledge(msg);
        }

        producer.close();
        consumer.close();

        // Block until the topic close completes before looking it up again.
        topicRef.close().get();
        Assert.assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
    }

    /**
     * With one client, subscribes to a topic and then tries to subscribe again under the same
     * subscription name; the second attempt must fail with a message saying an exclusive
     * consumer is already connected.
     */
    @Test
    public void testSingleClientMultipleSubscriptions() throws Exception {
        final String topicName = "persistent://prop/ns-abc/topic6";
        final String subName = "sub6";

        pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscribe();
        pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        try {
            pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subName)
                    .subscribe();
            Assert.fail("Should have thrown an exception since one consumer is already connected");
        } catch (PulsarClientException rejected) {
            Assert.assertTrue(rejected.getMessage().contains("Exclusive consumer is already connected"));
        }
    }

    /**
     * With two separate clients, the first subscribes to a topic and the second then tries to
     * subscribe under the same subscription name; the second attempt must fail with a message
     * saying an exclusive consumer is already connected.
     *
     * <p>Each client is held in its own variable so that both are shut down exactly once,
     * whatever the outcome of the test body.
     */
    @Test
    public void testMultipleClientsMultipleSubscriptions() throws Exception {
        final String topicName = "persistent://prop/ns-abc/topic7";
        final String subName = "sub7";

        PulsarClient client1 = PulsarClient.builder().serviceUrl(this.brokerUrl.toString()).build();
        try {
            PulsarClient client2 = PulsarClient.builder().serviceUrl(this.brokerUrl.toString()).build();
            try {
                client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
                client1.newProducer().topic(topicName).create();

                client2.newProducer().topic(topicName).create();

                // Same subscription name from the second client: must be rejected.
                client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
                Assert.fail("Should have thrown an exception since one consumer is already connected");
            } catch (PulsarClientException rejected) {
                Assert.assertTrue(rejected.getMessage().contains("Exclusive consumer is already connected"));
            } finally {
                client2.shutdown();
            }
        } finally {
            client1.shutdown();
        }
    }

    /**
     * Subscribes to a topic, closes the consumer so the subscription has no connected consumer,
     * and then deletes the topic through the admin API. A stats lookup is issued afterwards;
     * its outcome is not asserted.
     */
    @Test
    public void testTopicDeleteWithDisconnectedSubscription() throws Exception {
        final String topicName = "persistent://prop/ns-abc/topic8";
        final String subName = "sub1";

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscribe();

        PersistentTopic topicRef =
                (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
        PersistentSubscription subRef = topicRef.getSubscription(subName);

        Assert.assertNotNull(topicRef);
        Assert.assertNotNull(subRef);
        Assert.assertTrue(subRef.getDispatcher().isConsumerConnected());

        consumer.close();
        Assert.assertFalse(subRef.getDispatcher().isConsumerConnected());

        admin.topics().delete(topicName);

        try {
            admin.topics().getStats(topicName);
        } catch (PulsarAdminException ignored) {
            // An admin error on this lookup is tolerated; nothing is asserted either way.
        }
    }

    /**
     * Reads the available permits of the first consumer attached to the subscription's
     * dispatcher.
     *
     * <p>The list element is used with its declared type. An explicit {@code (Consumer)} cast
     * would bind to the imported client API interface
     * {@code org.apache.pulsar.client.api.Consumer}, which is presumably not the type the
     * dispatcher stores - TODO confirm against the dispatcher API.
     *
     * @param persistentSubscription subscription whose dispatcher has at least one consumer
     * @return the value of {@code getAvailablePermits()} on that first consumer
     */
    int getAvailablePermits(PersistentSubscription persistentSubscription) {
        return persistentSubscription.getDispatcher().getConsumers().get(0).getAvailablePermits();
    }

    /**
     * Disabled test. Creates a topic, unloads its namespace through the admin API, and checks
     * that both the broker's topic reference and the managed ledger factory's entry for the
     * topic disappear (waiting up to roughly 30 seconds for the topic reference).
     */
    @Test(enabled = false)
    public void testUnloadNamespace() throws Exception {
        final String topicStr = "persistent://prop/ns-abc/topic-9";
        TopicName topicName = TopicName.get(topicStr);

        this.pulsarClient.newProducer().topic(topicStr).create();
        this.pulsarClient.close();

        // getTopicReference returns an Optional, so a null check on it can never fail; check
        // presence instead.
        Assert.assertTrue(this.pulsar.getBrokerService().getTopicReference(topicStr).isPresent());
        Assert.assertTrue(this.pulsar.getManagedLedgerFactory().getManagedLedgers()
                .containsKey(topicName.getPersistenceNamingEncoding()));

        this.admin.namespaces().unload("prop/ns-abc");

        // Poll once per second, at most 30 times, until the topic reference is gone.
        for (int attempt = 0;
                attempt < 30 && this.pulsar.getBrokerService().getTopicReference(topicStr).isPresent();
                attempt++) {
            Thread.sleep(1000L);
        }
        // Decide on the actual state rather than on the attempt counter, so that a topic which
        // disappears during the final sleep is not reported as a failure.
        Assert.assertFalse(this.pulsar.getBrokerService().getTopicReference(topicStr).isPresent(),
                "The topic reference should be null");

        Assert.assertFalse(this.pulsar.getManagedLedgerFactory().getManagedLedgers()
                .containsKey(topicName.getPersistenceNamingEncoding()));
    }

    /**
     * Checks which topic states survive a GC run: a topic whose only producer was closed is
     * removed; a topic with a subscription stays, whether or not its consumer is connected;
     * once the subscription is deleted the topic is removed again.
     */
    @Test
    public void testGC() throws Exception {
        final String topicName = "persistent://prop/ns-abc/topic-10";
        final String subName = "sub1";

        // 1. Topic with no producer, consumer or subscription left: collected.
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        producer.close();
        Assert.assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
        runGC();
        Assert.assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());

        // 2. Topic with a connected consumer: kept.
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscribe();
        runGC();
        Assert.assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());

        // 3. Consumer closed but subscription still exists: kept.
        consumer.close();
        runGC();
        Assert.assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());

        // 4. Subscription deleted: collected.
        admin.topics().deleteSubscription(topicName, subName);
        runGC();
        Assert.assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
    }

    /**
     * Shorthand for asking the broker service for its reference to a topic.
     *
     * @param str topic name, passed unchanged to {@code getTopicReference}
     * @return the result of {@code getTopicReference} for that name
     */
    private Optional<Topic> getTopic(String str) {
        return this.pulsar.getBrokerService().getTopicReference(str);
    }

    /**
     * Tells whether the schema registry currently holds a non-deleted schema for a topic. The
     * registry is queried under the schema name derived from the topic's partitioned topic
     * name, and the call blocks until the lookup completes.
     *
     * @param str topic name
     * @return {@code true} if a schema entry exists and is not flagged as deleted
     */
    private boolean topicHasSchema(String str) {
        String partitionedName = TopicName.get(str).getPartitionedTopicName();
        String schemaName = TopicName.get(partitionedName).getSchemaName();
        SchemaRegistry.SchemaAndMetadata found = (SchemaRegistry.SchemaAndMetadata)
                this.pulsar.getSchemaRegistryService().getSchema(schemaName).join();
        return found != null && !found.schema.isDeleted();
    }

    /**
     * Checks that a topic's schema follows the topic through GC: when GC removes the topic the
     * schema is gone as well, and while the topic is kept (connected consumer, or a remaining
     * subscription) the schema is kept too.
     */
    @Test
    public void testGCWillDeleteSchema() throws Exception {
        final String firstTopic = "persistent://prop/ns-abc/topic-1";
        final String secondTopic = "persistent://prop/ns-abc/topic-2";
        final String subName = "sub1";

        // 1. Topic without subscription: GC removes topic and schema.
        pulsarClient.newProducer().topic(firstTopic).create().close();

        Optional<Topic> firstRef = getTopic(firstTopic);
        Assert.assertTrue(firstRef.isPresent());

        byte[] schemaBytes = JSONSchema.of(SchemaDefinition.builder().withPojo(Foo.class).build())
                .getSchemaInfo()
                .getSchema();
        SchemaData schemaData = SchemaData.builder()
                .data(schemaBytes)
                .type(SchemaType.BYTES)
                .user("foo")
                .build();

        firstRef.get().addSchema(schemaData).join();
        Assert.assertTrue(topicHasSchema(firstTopic));

        runGC();
        Assert.assertFalse(getTopic(firstTopic).isPresent());
        Assert.assertFalse(topicHasSchema(firstTopic));

        // 2. Topic with a connected consumer: both topic and schema are kept.
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(secondTopic)
                .subscriptionName(subName)
                .subscribe();

        Optional<Topic> secondRef = getTopic(secondTopic);
        Assert.assertTrue(secondRef.isPresent());
        secondRef.get().addSchema(schemaData).join();
        Assert.assertTrue(topicHasSchema(secondTopic));

        runGC();
        Assert.assertTrue(getTopic(secondTopic).isPresent());
        Assert.assertTrue(topicHasSchema(secondTopic));

        // 3. Consumer closed, subscription remains: still kept.
        consumer.close();
        runGC();
        Assert.assertTrue(getTopic(secondTopic).isPresent());
        Assert.assertTrue(topicHasSchema(secondTopic));

        // 4. Subscription deleted: GC removes topic and schema.
        admin.topics().deleteSubscription(secondTopic, subName);
        runGC();
        Assert.assertFalse(getTopic(secondTopic).isPresent());
        Assert.assertFalse(topicHasSchema(secondTopic));
    }

    /**
     * Adds a schema to a topic, checks it is visible through the admin API and through the
     * lookup services of two clients, deletes it on the topic, and checks that all three views
     * report it gone. The producer and the extra client are closed whether or not the
     * assertions succeed.
     */
    @Test
    public void testDeleteSchema() throws Exception {
        final String topicName = "persistent://prop/ns-abc/topic-1";

        PulsarClientImpl extraClient = PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
        try {
            LookupService sharedClientLookup = pulsarClient.getLookup();
            LookupService extraClientLookup = extraClient.getLookup();

            Producer producer = pulsarClient.newProducer().topic(topicName).create();
            try {
                Optional<Topic> topicRef = getTopic(topicName);
                Assert.assertTrue(topicRef.isPresent());

                byte[] schemaBytes = JSONSchema.of(SchemaDefinition.builder().withPojo(Foo.class).build())
                        .getSchemaInfo()
                        .getSchema();
                SchemaData schemaData = SchemaData.builder()
                        .data(schemaBytes)
                        .type(SchemaType.BYTES)
                        .user("foo")
                        .build();
                topicRef.get().addSchema(schemaData).join();

                // Schema is visible everywhere; the lookups ask for version 0 encoded as 8 bytes.
                Assert.assertTrue(topicHasSchema(topicName));
                Assert.assertEquals(admin.schemas().getAllSchemas(topicName).size(), 1);
                Assert.assertTrue(((Optional<?>) extraClientLookup.getSchema(
                        TopicName.get(topicName), ByteBuffer.allocate(8).putLong(0L).array()).get()).isPresent());
                Assert.assertTrue(((Optional<?>) sharedClientLookup.getSchema(
                        TopicName.get(topicName), ByteBuffer.allocate(8).putLong(0L).array()).get()).isPresent());

                topicRef.get().deleteSchema().join();

                // Schema is gone everywhere.
                Assert.assertEquals(admin.schemas().getAllSchemas(topicName).size(), 0);
                Assert.assertFalse(((Optional<?>) extraClientLookup.getSchema(
                        TopicName.get(topicName), ByteBuffer.allocate(8).putLong(0L).array()).get()).isPresent());
                Assert.assertFalse(((Optional<?>) sharedClientLookup.getSchema(
                        TopicName.get(topicName), ByteBuffer.allocate(8).putLong(0L).array()).get()).isPresent());
                Assert.assertFalse(topicHasSchema(topicName));
            } finally {
                producer.close();
            }
        } finally {
            extraClient.close();
        }
    }

    /**
     * Checks that a namespace retention policy of (10, 10) keeps an otherwise idle topic across
     * a GC run, and that after the policy is reset the topic is only removed once its
     * subscription has been deleted.
     *
     * <p>Presence is asserted through {@code isPresent()}: {@code getTopicReference} returns an
     * {@code Optional}, so a null check on its result could never fail.
     */
    @Test
    public void testGcAndRetentionPolicy() throws Exception {
        final String namespace = "prop/ns-abc";
        final String topicName = "persistent://prop/ns-abc/topic-10";
        final String subName = "sub1";

        // 1. With retention configured, GC must keep the idle topic.
        this.admin.namespaces().setRetention(namespace, new RetentionPolicies(10, 10));
        this.pulsarClient.newProducer().topic(topicName).create().close();
        Assert.assertTrue(this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
        runGC();
        Assert.assertTrue(this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());

        // 2. Reset retention; the pause gives the policy change time to take effect.
        this.admin.namespaces().setRetention(namespace, new RetentionPolicies());
        Thread.sleep(300L);

        // 3. Connected consumer: kept.
        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscribe();
        runGC();
        Assert.assertTrue(this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());

        // 4. Consumer closed, subscription remains: kept.
        consumer.close();
        runGC();
        Assert.assertTrue(this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());

        // 5. Subscription deleted: removed.
        this.admin.topics().deleteSubscription(topicName, subName);
        runGC();
        Assert.assertFalse(this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
    }

    /**
     * Checks that a namespace retention policy of (-1, -1) keeps an otherwise idle topic across
     * a GC run, and that after the policy is reset the topic is only removed once its
     * subscription has been deleted.
     *
     * <p>Presence is asserted through {@code isPresent()}: {@code getTopicReference} returns an
     * {@code Optional}, so a null check on its result could never fail.
     */
    @Test
    public void testInfiniteRetentionPolicy() throws Exception {
        final String namespace = "prop/ns-abc";
        final String topicName = "persistent://prop/ns-abc/topic-10";
        final String subName = "sub1";

        // 1. With the (-1, -1) policy, GC must keep the idle topic.
        this.admin.namespaces().setRetention(namespace, new RetentionPolicies(-1, -1));
        this.pulsarClient.newProducer().topic(topicName).create().close();
        Assert.assertTrue(this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
        runGC();
        Assert.assertTrue(this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());

        // 2. Reset retention; the pause gives the policy change time to take effect.
        this.admin.namespaces().setRetention(namespace, new RetentionPolicies());
        Thread.sleep(300L);

        // 3. Connected consumer: kept.
        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscribe();
        runGC();
        Assert.assertTrue(this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());

        // 4. Consumer closed, subscription remains: kept.
        consumer.close();
        runGC();
        Assert.assertTrue(this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());

        // 5. Subscription deleted: removed.
        this.admin.topics().deleteSubscription(topicName, subName);
        runGC();
        Assert.assertFalse(this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
    }

    /**
     * Sets the broker's default retention size and time to -1, creates a namespace, and then
     * checks on a topic in {@code prop/ns-abc} that GC keeps the topic until its retention has
     * been reset and its subscription deleted.
     */
    @Test
    public void testServiceConfigurationRetentionPolicy() throws Exception {
        final String defaultPolicyNamespace = "prop/ns-default-retention-policy";
        final String namespace = "prop/ns-abc";
        final String topicName = "persistent://prop/ns-abc/topic-10";
        final String subName = "sub1";

        pulsar.getConfiguration().setDefaultRetentionSizeInMB(-1);
        pulsar.getConfiguration().setDefaultRetentionTimeInMinutes(-1);
        admin.namespaces().createNamespace(defaultPolicyNamespace);

        // 1. Idle topic is kept across GC.
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        producer.close();
        Assert.assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
        runGC();
        Assert.assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());

        // 2. Reset the namespace retention and pause briefly before continuing.
        admin.namespaces().setRetention(namespace, new RetentionPolicies());
        Thread.sleep(300L);

        // 3. Connected consumer: kept.
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscribe();
        runGC();
        Assert.assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());

        // 4. Consumer closed, subscription remains: kept.
        consumer.close();
        runGC();
        Assert.assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());

        // 5. Subscription deleted: removed.
        admin.topics().deleteSubscription(topicName, subName);
        runGC();
        Assert.assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
    }

    /**
     * Publishes ten messages to a subscription without a connected consumer and checks that the
     * backlog is emptied by the expiry check once the one-second namespace TTL has elapsed.
     */
    @Test
    public void testMessageExpiry() throws Exception {
        final String namespace = "prop/expiry-check";
        final String topicName = "persistent://prop/expiry-check/topic1";
        final String subscriptionName = "sub1";
        final int messageTtlSeconds = 1;

        this.admin.namespaces().createNamespace(namespace);
        this.admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));
        this.admin.namespaces().setNamespaceMessageTTL(namespace, messageTtlSeconds);

        // Create the subscription, then disconnect so that nothing drains the backlog.
        Consumer consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .subscribe();
        PersistentTopic topic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        PersistentSubscription subscription = topic.getSubscription(subscriptionName);
        consumer.close();
        Assert.assertFalse(subscription.getDispatcher().isConsumerConnected());

        Producer producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        for (int sent = 0; sent < 10; sent++) {
            producer.send(("my-message-" + sent).getBytes());
        }
        rolloverPerIntervalStats();
        Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(false), 10L);

        // Wait out the TTL, then trigger the expiry check by hand.
        Thread.sleep(TimeUnit.SECONDS.toMillis(messageTtlSeconds));
        runMessageExpiryCheck();
        Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(false), 0L);

        // Tear down everything this test created.
        producer.close();
        consumer.close();
        this.admin.topics().deleteSubscription(topicName, subscriptionName);
        this.admin.topics().delete(topicName);
        this.admin.namespaces().deleteNamespace(namespace);
    }

    /**
     * Checks message expiry first with a topic-level TTL (2 s) layered over the namespace TTL
     * (10 s), and then again after the topic-level TTL has been removed.
     *
     * <p>The broker is restarted up front with system topics and topic-level policies enabled.
     */
    @Test
    public void testMessageExpiryWithTopicMessageTTL() throws Exception {
        final String namespace = "prop/expiry-check-2";
        final String topicName = "persistent://prop/expiry-check-2/topic2";
        final String subscriptionName = "sub1";
        final int namespaceTtlSeconds = 10;
        final int topicTtlSeconds = 2;

        // Restart with topic-level policies switched on.
        cleanup();
        this.conf.setSystemTopicEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(true);
        setup();

        this.admin.namespaces().createNamespace(namespace);
        this.admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));
        this.admin.namespaces().setNamespaceMessageTTL(namespace, namespaceTtlSeconds);
        this.admin.topics().createNonPartitionedTopic(topicName);

        // ---- Phase 1: topic-level TTL in effect ----
        Consumer firstConsumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .subscribe();
        Thread.sleep(3000L);
        this.admin.topics().setMessageTTL(topicName, topicTtlSeconds);
        PersistentTopic topic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        PersistentSubscription firstSubscription = topic.getSubscription(subscriptionName);
        firstConsumer.close();
        Assert.assertFalse(firstSubscription.getDispatcher().isConsumerConnected());

        Producer firstProducer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        for (int sent = 0; sent < 10; sent++) {
            firstProducer.send(("my-message-" + sent).getBytes());
        }
        rolloverPerIntervalStats();
        Assert.assertEquals(firstSubscription.getNumberOfEntriesInBacklog(false), 10L);
        Thread.sleep(TimeUnit.SECONDS.toMillis(topicTtlSeconds));
        runMessageExpiryCheck();
        Assert.assertEquals(firstSubscription.getNumberOfEntriesInBacklog(false), 0L);
        firstProducer.close();

        // ---- Phase 2: topic-level TTL removed ----
        Thread.sleep(3000L);
        this.admin.topics().removeMessageTTL(topicName);
        Consumer secondConsumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .subscribe();
        PersistentTopic topicAfterRemoval =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        PersistentSubscription secondSubscription = topicAfterRemoval.getSubscription(subscriptionName);
        secondConsumer.close();
        Assert.assertFalse(secondSubscription.getDispatcher().isConsumerConnected());

        Producer secondProducer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        for (int sent = 0; sent < 10; sent++) {
            secondProducer.send(("my-message-" + sent).getBytes());
        }
        // After the former topic TTL has elapsed the messages must still be in the backlog.
        Thread.sleep(TimeUnit.SECONDS.toMillis(topicTtlSeconds));
        rolloverPerIntervalStats();
        Assert.assertEquals(secondSubscription.getNumberOfEntriesInBacklog(false), 10L);
        // Sleep for the remainder of the namespace TTL before checking again.
        Thread.sleep(TimeUnit.SECONDS.toMillis(namespaceTtlSeconds - topicTtlSeconds));
        runMessageExpiryCheck();
        Assert.assertEquals(secondSubscription.getNumberOfEntriesInBacklog(false), 0L);

        // Cleanup; an admin failure is tolerated only when it carries HTTP status 500.
        try {
            secondProducer.close();
            secondConsumer.close();
            this.admin.topics().deleteSubscription(topicName, subscriptionName);
            this.admin.topics().delete(topicName);
            this.admin.namespaces().deleteNamespace(namespace);
        } catch (PulsarAdminException adminFailure) {
            Assert.assertEquals(adminFailure.getStatusCode(), 500);
        }
    }

    /**
     * Runs the expiry check against a subscription whose consumer stays connected.
     *
     * <p>With a 10-second namespace TTL the test asserts that the backlog is still complete after
     * the first check (10 s after publishing) and empty after a second check 5 s later.
     */
    @Test
    public void testMessageExpiryWithFewExpiredBacklog() throws Exception {
        final String namespace = "prop/expiry-check-1";
        final String topicName = "persistent://prop/expiry-check-1/topic1";
        final String subscriptionName = "sub1";
        final int messageTtlSeconds = 10;

        this.admin.namespaces().createNamespace(namespace);
        this.admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));
        this.admin.namespaces().setNamespaceMessageTTL(namespace, messageTtlSeconds);

        // The consumer is deliberately left connected for the whole test.
        this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .subscribe();
        PersistentTopic topic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        PersistentSubscription subscription = topic.getSubscription(subscriptionName);
        Assert.assertTrue(subscription.getDispatcher().isConsumerConnected());

        Producer producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        for (int sent = 0; sent < 10; sent++) {
            producer.send(("my-message-" + sent).getBytes());
        }
        rolloverPerIntervalStats();
        Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(false), 10L);

        // First check, one full TTL after publishing: nothing is expected to be dropped yet.
        Thread.sleep(TimeUnit.SECONDS.toMillis(messageTtlSeconds));
        runMessageExpiryCheck();
        Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(false), 10L);

        // Second check, half a TTL later: the backlog must now be empty.
        Thread.sleep(TimeUnit.SECONDS.toMillis(messageTtlSeconds / 2));
        runMessageExpiryCheck();
        Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(false), 0L);
    }

    /**
     * Walks a single subscription through the Exclusive, Shared and Failover types.
     *
     * <p>While a consumer of one type is attached, subscribing with another type must fail with
     * "Subscription is of different type"; once that consumer is closed the subscription may be
     * taken over with a new type, which the dispatcher must then report.
     */
    @Test
    public void testSubscriptionTypeTransitions() throws Exception {
        final String topicName = "persistent://prop/ns-abc/shared-topic2";
        final String subscriptionName = "sub2";
        final String typeMismatch = "Subscription is of different type";

        Consumer exclusiveConsumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe();
        Consumer sharedConsumer = null;
        Consumer failoverConsumer = null;
        PersistentTopic topic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        PersistentSubscription subscription = topic.getSubscription(subscriptionName);

        // 1. Exclusive consumer attached: Shared must be rejected.
        try {
            AutoCloseable client = newPulsarClient(this.lookupUrl.toString(), 0);
            this.closeables.add(client);
            sharedConsumer = client.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe();
            Assert.fail("should have failed");
        } catch (PulsarClientException rejected) {
            Assert.assertTrue(rejected.getMessage().contains(typeMismatch));
        }

        // 2. Exclusive consumer attached: Failover must be rejected.
        try {
            AutoCloseable client = newPulsarClient(this.lookupUrl.toString(), 0);
            this.closeables.add(client);
            failoverConsumer = client.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Failover)
                    .subscribe();
            Assert.fail("should have failed");
        } catch (PulsarClientException rejected) {
            Assert.assertTrue(rejected.getMessage().contains(typeMismatch));
        }

        // 3. Exclusive consumer gone: Shared may take over.
        exclusiveConsumer.close();
        try {
            AutoCloseable client = newPulsarClient(this.lookupUrl.toString(), 0);
            this.closeables.add(client);
            sharedConsumer = client.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe();
            Assert.assertEquals(subscription.getDispatcher().getType(), CommandSubscribe.SubType.Shared);
        } catch (PulsarClientException unexpected) {
            Assert.fail("should not fail");
        }

        // 4. Shared consumer attached: Exclusive must be rejected.
        try {
            AutoCloseable client = newPulsarClient(this.lookupUrl.toString(), 0);
            this.closeables.add(client);
            exclusiveConsumer = client.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Exclusive)
                    .subscribe();
            Assert.fail("should have failed");
        } catch (PulsarClientException rejected) {
            Assert.assertTrue(rejected.getMessage().contains(typeMismatch));
        }

        // 5. Shared consumer gone: Failover may take over.
        sharedConsumer.close();
        try {
            AutoCloseable client = newPulsarClient(this.lookupUrl.toString(), 0);
            this.closeables.add(client);
            failoverConsumer = client.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Failover)
                    .subscribe();
            Assert.assertEquals(subscription.getDispatcher().getType(), CommandSubscribe.SubType.Failover);
        } catch (PulsarClientException unexpected) {
            Assert.fail("should not fail");
        }

        // 6. Failover consumer gone: Exclusive may take over again.
        failoverConsumer.close();
        try {
            AutoCloseable client = newPulsarClient(this.lookupUrl.toString(), 0);
            this.closeables.add(client);
            exclusiveConsumer = client.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Exclusive)
                    .subscribe();
            Assert.assertEquals(subscription.getDispatcher().getType(), CommandSubscribe.SubType.Exclusive);
        } catch (PulsarClientException unexpected) {
            Assert.fail("should not fail");
        }

        exclusiveConsumer.close();
        this.admin.topics().delete(topicName);
    }

    /**
     * Checks how timed {@code receive} calls affect the consumer's available permits: a receive
     * that times out leaves them untouched, a receive that returns a message raises them by one.
     */
    @Test
    public void testReceiveWithTimeout() throws Exception {
        final String topicName = "persistent://prop/ns-abc/topic-receive-timeout";

        ConsumerImpl consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("sub")
                .receiverQueueSize(1000)
                .subscribe();
        Producer producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        // Nothing published yet: the receive times out and the permits stay at zero.
        Assert.assertEquals(consumer.getAvailablePermits(), 0);
        Assert.assertNull(consumer.receive(10, TimeUnit.MILLISECONDS));
        Assert.assertEquals(consumer.getAvailablePermits(), 0);

        // Publish one message and pause briefly before reading it.
        producer.send("test".getBytes());
        Thread.sleep(100L);
        Assert.assertEquals(consumer.getAvailablePermits(), 0);

        // Taking the message out of the queue accounts for exactly one permit.
        Assert.assertNotNull(consumer.receive(10, TimeUnit.MILLISECONDS));
        Assert.assertEquals(consumer.getAvailablePermits(), 1);

        // A further timed-out receive does not change the count.
        Assert.assertNull(consumer.receive(10, TimeUnit.MILLISECONDS));
        Assert.assertEquals(consumer.getAvailablePermits(), 1);
    }

    /**
     * Verifies that the message ids returned by synchronous and asynchronous sends point at the
     * topic's first ledger with consecutive entry ids (0-9 for sync sends, 10-19 for async sends).
     *
     * <p>The asynchronous checks are collected as futures and joined with a bounded wait, so a
     * mismatching id fails the test instead of leaving it blocked on a latch that is never released.
     */
    @Test
    public void testProducerReturnedMessageId() throws Exception {
        final String topicName = "persistent://prop/ns-abc/topic-xyz";

        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        try {
            PersistentTopic persistentTopic =
                    (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
            Assert.assertNotNull(persistentTopic);
            Assert.assertEquals(persistentTopic.getProducers().size(), 1);
            final long ledgerId = ((MLDataFormats.ManagedLedgerInfo.LedgerInfo)
                    persistentTopic.getManagedLedger().getLedgersInfoAsList().get(0)).getLedgerId();

            // Synchronous sends: entries 0..9.
            for (int i = 0; i < 10; i++) {
                Assert.assertEquals(producer.send(("my-message-" + i).getBytes()), new MessageIdImpl(ledgerId, i, -1));
            }

            // Asynchronous sends: entries 10..19. Each future completes exceptionally when its
            // assertion (or the send itself) fails.
            List<CompletableFuture<Void>> pendingChecks = new ArrayList<>();
            for (int i = 10; i < 20; i++) {
                final int expectedEntryId = i;
                pendingChecks.add(producer.sendAsync(("my-message-" + i).getBytes())
                        .thenAccept(messageId ->
                                Assert.assertEquals(messageId, new MessageIdImpl(ledgerId, expectedEntryId, -1))));
            }
            // Propagates the first failure as an ExecutionException; times out rather than hanging.
            CompletableFuture.allOf(pendingChecks.toArray(new CompletableFuture[0])).get(30, TimeUnit.SECONDS);
        } finally {
            producer.close();
        }
    }

    /**
     * Checks that a producer configured with {@code blockIfQueueFull(true)} blocks on the send
     * that overflows its pending queue.
     *
     * <p>With a queue of 10 and a 1-second send timeout, the first ten async sends must return in
     * under a second, and the eleventh must block for between 0.5 s and 1.5 s, after which only
     * that one message is pending.
     */
    @Test
    public void testProducerQueueFullBlocking() throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.brokerUrl.toString()).build();
        try {
            ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) client.newProducer()
                    .topic("persistent://prop/ns-abc/topic-xyzx")
                    .maxPendingMessages(10)
                    .blockIfQueueFull(true)
                    .sendTimeout(1, TimeUnit.SECONDS)
                    .enableBatching(false)
                    .messageRoutingMode(MessageRoutingMode.SinglePartition)
                    .create();

            // NOTE(review): presumably tears down the broker so that sends stay pending - confirm.
            super.internalCleanup();

            // Filling the queue must not block.
            long fillStart = System.nanoTime();
            for (int i = 0; i < 10; i++) {
                producer.sendAsync("msg".getBytes());
            }
            Assert.assertTrue(System.nanoTime() - fillStart < TimeUnit.SECONDS.toNanos(1L));
            Assert.assertEquals(producer.getPendingQueueSize(), 10);

            // The overflowing send must block for roughly the send timeout.
            long overflowStart = System.nanoTime();
            producer.sendAsync("msg".getBytes());
            long blockedNanos = System.nanoTime() - overflowStart;
            Assert.assertTrue(blockedNanos > TimeUnit.MILLISECONDS.toNanos(500L));
            Assert.assertTrue(blockedNanos < TimeUnit.MILLISECONDS.toNanos(1500L));
            Assert.assertEquals(producer.getPendingQueueSize(), 1);

            producer.close();
            // Re-create the test environment that internalCleanup() removed.
            setup();
        } finally {
            // Single cleanup path for both the success and the failure case.
            client.close();
        }
    }

    /**
     * Checks that a producer configured with {@code blockIfQueueFull(false)} rejects the send that
     * overflows its pending queue instead of blocking.
     *
     * <p>With a queue of 10, the first ten async sends must return in under a second, and the
     * eleventh (synchronous) send must fail with {@code ProducerQueueIsFullError} in under a
     * second, leaving the ten original messages pending.
     */
    @Test
    public void testProducerQueueFullNonBlocking() throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.brokerUrl.toString()).build();
        try {
            ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) client.newProducer()
                    .topic("persistent://prop/ns-abc/topic-xyzx")
                    .maxPendingMessages(10)
                    .blockIfQueueFull(false)
                    .sendTimeout(1, TimeUnit.SECONDS)
                    .enableBatching(false)
                    .messageRoutingMode(MessageRoutingMode.SinglePartition)
                    .create();

            // NOTE(review): presumably tears down the broker so that sends stay pending - confirm.
            super.internalCleanup();

            // Filling the queue must not block.
            long fillStart = System.nanoTime();
            for (int i = 0; i < 10; i++) {
                producer.sendAsync("msg".getBytes());
            }
            Assert.assertTrue(System.nanoTime() - fillStart < TimeUnit.SECONDS.toNanos(1L));
            Assert.assertEquals(producer.getPendingQueueSize(), 10);

            // The overflowing send must be rejected immediately.
            long overflowStart = System.nanoTime();
            try {
                producer.send("msg".getBytes());
                Assert.fail("Send should have failed");
            } catch (PulsarClientException.ProducerQueueIsFullError expected) {
                // Expected: the queue is full and the producer is configured not to block.
            }
            Assert.assertTrue(System.nanoTime() - overflowStart < TimeUnit.SECONDS.toNanos(1L));
            Assert.assertEquals(producer.getPendingQueueSize(), 10);

            producer.close();
            // Re-create the test environment that internalCleanup() removed.
            setup();
        } finally {
            // Single cleanup path for both the success and the failure case.
            client.close();
        }
    }

    /**
     * Checks that the bundle stats still list the {@code prop/ns-abc} bundle after one of the two
     * topics created in it has been deleted.
     */
    @Test
    public void testDeleteTopics() throws Exception {
        final String firstTopic = "persistent://prop/ns-abc/topic-1";
        final String secondTopic = "persistent://prop/ns-abc/topic-2";
        final String bundleKey = "prop/ns-abc/0x00000000_0xffffffff";

        BrokerService brokerService = this.pulsar.getBrokerService();
        Producer firstProducer = this.pulsarClient.newProducer()
                .topic(firstTopic)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        // The producer on the second topic is left open for the rest of the test.
        this.pulsarClient.newProducer()
                .topic(secondTopic)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        // Both topics present: exactly one bundle entry is reported.
        brokerService.updateRates();
        Map statsBeforeDelete = brokerService.getBundleStats();
        Assert.assertEquals(statsBeforeDelete.size(), 1);
        Assert.assertNotNull((NamespaceBundleStats) statsBeforeDelete.get(bundleKey));

        firstProducer.close();
        this.admin.topics().delete(firstTopic);

        // One topic deleted: the same single bundle entry is still reported.
        brokerService.updateRates();
        Map statsAfterDelete = brokerService.getBundleStats();
        Assert.assertEquals(statsAfterDelete.size(), 1);
        Assert.assertNotNull((NamespaceBundleStats) statsAfterDelete.get(bundleKey));
    }

    /**
     * Supplies the compression codecs exercised by the tests that use the {@code "codec"} data provider.
     *
     * @return one single-element row per codec: NONE, LZ4 and ZLIB
     */
    @DataProvider(name = "codec")
    public Object[][] codecProvider() {
        return new Object[][] {
            {CompressionType.NONE},
            {CompressionType.LZ4},
            {CompressionType.ZLIB},
        };
    }

    /**
     * Round-trips ten messages through a producer that uses the given compression codec and checks
     * that the consumer receives the payloads unchanged and in order.
     *
     * @param compressionType codec supplied by the {@code "codec"} data provider
     */
    @Test(dataProvider = "codec")
    public void testCompression(CompressionType compressionType) throws Exception {
        // One topic per codec, so that runs with different codecs do not share data.
        String topicName = "persistent://prop/ns-abc/topic0" + compressionType;

        Producer producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .compressionType(compressionType)
                .create();
        Consumer consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscribe();

        PersistentTopic topic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull(topic);
        Assert.assertEquals(topic.getProducers().size(), 1);

        for (int sent = 0; sent < 10; sent++) {
            producer.send(("my-message-" + sent).getBytes());
        }
        for (int expected = 0; expected < 10; expected++) {
            Message received = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull(received);
            Assert.assertEquals(received.getData(), ("my-message-" + expected).getBytes());
        }

        producer.close();
        consumer.close();
    }

    /**
     * Checks that, after publishing ten messages, the topic metrics contain an entry for the
     * {@code prop/ns-abc} namespace whose {@code brk_in_rate} is greater than zero.
     *
     * <p>The rate assertion is made on the entry located for the namespace rather than on whichever
     * entry happens to be first in the list.
     */
    @Test
    public void testBrokerTopicStats() throws Exception {
        BrokerService brokerService = this.pulsar.getBrokerService();

        // Shut down the broker's own stats updater (a private field, hence the reflection) so
        // the explicit updateRates() call below is the one that refreshes the metrics.
        Field statsUpdaterField = BrokerService.class.getDeclaredField("statsUpdater");
        statsUpdaterField.setAccessible(true);
        ((ScheduledExecutorService) statsUpdaterField.get(brokerService)).shutdownNow();

        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic("persistent://prop/ns-abc/topic0")
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        for (int i = 0; i < 10; i++) {
            producer.send(("my-message-" + i).getBytes());
        }

        Thread.sleep(1000L);
        brokerService.updateRates();

        Metrics namespaceMetrics = null;
        for (Metrics candidate : brokerService.getTopicMetrics()) {
            // Constant-first comparison tolerates entries that carry no "namespace" dimension.
            if ("prop/ns-abc".equalsIgnoreCase(candidate.getDimension("namespace"))) {
                namespaceMetrics = candidate;
                break;
            }
        }
        Assert.assertNotNull(namespaceMetrics);
        Assert.assertTrue(((Double) namespaceMetrics.getMetrics().get("brk_in_rate")).doubleValue() > 0.0d);
    }

    /**
     * Follows the {@code broker_connection} metrics through three stages: one live client
     * connection, the same connection after the client is closed, and a second connection attempt
     * that is rejected once authentication has been enabled on the broker.
     *
     * <p>Each stage looks the metrics up afresh, so a missing {@code broker_connection} entry is
     * reported by the following {@code assertNotNull} instead of being masked by an earlier result.
     */
    @Test(groups = {"quarantine"})
    public void testBrokerConnectionStats() throws Exception {
        final String topicName = "persistent://prop/ns-abc/topic0";
        BrokerService brokerService = this.pulsar.getBrokerService();
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();

        // Stage 1: one connection created and still active.
        Map<String, Object> stats = lookupBrokerConnectionMetrics(brokerService);
        Assert.assertNotNull(stats);
        Assert.assertEquals(((Long) stats.get("brk_connection_created_total_count")).longValue(), 1L);
        Assert.assertEquals(((Long) stats.get("brk_active_connections")).longValue(), 1L);
        Assert.assertEquals(((Long) stats.get("brk_connection_closed_total_count")).longValue(), 0L);
        Assert.assertEquals(((Long) stats.get("brk_connection_create_success_count")).longValue(), 1L);
        Assert.assertEquals(((Long) stats.get("brk_connection_create_fail_count")).longValue(), 0L);

        // Stage 2: close the client and poll until the metrics show the connection as closed.
        producer.close();
        this.pulsarClient.close();
        Awaitility.await().until(() -> {
            Map<String, Object> afterClose = lookupBrokerConnectionMetrics(brokerService);
            return afterClose != null
                    && ((Long) afterClose.get("brk_connection_created_total_count")).longValue() == 1
                    && ((Long) afterClose.get("brk_active_connections")).longValue() == 0
                    && ((Long) afterClose.get("brk_connection_closed_total_count")).longValue() == 1
                    && ((Long) afterClose.get("brk_connection_create_fail_count")).longValue() == 0
                    && ((Long) afterClose.get("brk_connection_create_success_count")).longValue() == 1;
        });

        // Stage 3: with authentication enabled, a new client must be rejected.
        this.pulsar.getConfiguration().setAuthenticationEnabled(true);
        if (this.pulsarClient != null) {
            this.pulsarClient.shutdown();
        }
        this.pulsarClient = PulsarClient.builder()
                .serviceUrl(this.lookupUrl.toString())
                .operationTimeout(1, TimeUnit.MILLISECONDS)
                .build();
        try {
            this.pulsarClient.newProducer().topic(topicName).create();
            Assert.fail();
        } catch (Exception e) {
            Assert.assertTrue(e instanceof PulsarClientException.AuthenticationException);
        }

        stats = lookupBrokerConnectionMetrics(brokerService);
        Assert.assertNotNull(stats);
        Assert.assertEquals(((Long) stats.get("brk_connection_created_total_count")).longValue(), 2L);
        Assert.assertEquals(((Long) stats.get("brk_active_connections")).longValue(), 0L);
        Assert.assertEquals(((Long) stats.get("brk_connection_closed_total_count")).longValue(), 2L);
        Assert.assertEquals(((Long) stats.get("brk_connection_create_success_count")).longValue(), 1L);
        Assert.assertEquals(((Long) stats.get("brk_connection_create_fail_count")).longValue(), 1L);
    }

    /**
     * Refreshes the broker rates and returns the metrics of the first topic-metrics entry that has
     * a dimension with the value {@code "broker_connection"}.
     *
     * @param brokerService broker service to query
     * @return the matching entry's metrics, or {@code null} when no such entry is present
     */
    private static Map<String, Object> lookupBrokerConnectionMetrics(BrokerService brokerService) {
        brokerService.updateRates();
        for (Metrics candidate : brokerService.getTopicMetrics()) {
            if (candidate.getDimensions().containsValue("broker_connection")) {
                return candidate.getMetrics();
            }
        }
        return null;
    }

    /**
     * Sends one intact message and one whose payload is altered after {@code sendAsync} was
     * called, and checks that only the intact message ever reaches the consumer.
     */
    @Test
    public void testPayloadCorruptionDetection() throws Exception {
        final String topicName = "persistent://prop/ns-abc/topic1";

        Producer producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        Consumer consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscribe();

        CompletableFuture intactSend = producer.newMessage().value("message-1".getBytes()).sendAsync();

        // With the broker stopped, queue a second message and overwrite its last payload byte.
        stopBroker();
        byte[] payload = "message-2".getBytes();
        TypedMessageBuilderImpl corruptedBuilder = producer.newMessage().value(payload);
        CompletableFuture corruptedSend = corruptedBuilder.sendAsync();
        corruptedBuilder.getContent().put(payload.length - 1, (byte) 51);
        startBroker();

        intactSend.get();
        try {
            corruptedSend.get();
            Assert.fail("since we corrupted the message, it should be rejected by the broker");
        } catch (Exception expected) {
            // Expected: the send of the altered message must not complete successfully.
        }

        Assert.assertEquals(new String(consumer.receive(1, TimeUnit.SECONDS).getData()), "message-1");

        // Anything else that arrives before a receive times out must again be "message-1".
        Message extra = consumer.receive(1, TimeUnit.SECONDS);
        while (extra != null) {
            Assert.assertEquals(new String(extra.getData()), "message-1");
            extra = consumer.receive(1, TimeUnit.SECONDS);
        }
    }

    /**
     * Checks that {@code redeliverUnacknowledgedMessages()} redelivers exactly the messages that
     * were received but never acknowledged.
     *
     * <p>Twenty messages are published; the first ten received are acknowledged and the remaining
     * ten are kept unacknowledged. After requesting redelivery, each of those ten must arrive
     * again and nothing further may follow.
     */
    @Test
    public void testMessageRedelivery() throws Exception {
        final String topicName = "persistent://prop/ns-abc/topic2";

        List<Message<String>> unacknowledged = new ArrayList<>();
        Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionName("sub2")
                .subscriptionType(SubscriptionType.Shared)
                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                .subscribe();
        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        for (int i = 0; i < 20; i++) {
            producer.send("my-message-" + i);
        }

        // Acknowledge the first half, hold on to the second half.
        for (int i = 0; i < 20; i++) {
            Message<String> received = consumer.receive();
            if (i >= 10) {
                unacknowledged.add(received);
            } else {
                consumer.acknowledge(received);
            }
        }

        consumer.redeliverUnacknowledgedMessages();

        for (int i = 0; i < 10; i++) {
            Message<String> redelivered = consumer.receive(1, TimeUnit.SECONDS);
            // A timed-out receive returns null; report that directly instead of via an NPE.
            Assert.assertNotNull(redelivered, "msg should be redelivered ");
            String redeliveredValue = redelivered.getValue();
            unacknowledged.removeIf(held -> held.getValue().equals(redeliveredValue));
        }

        Assert.assertEquals(unacknowledged.size(), 0);
        Assert.assertNull(consumer.receive(100, TimeUnit.MILLISECONDS));
        consumer.close();
        producer.close();
    }

    /**
     * Injects a hand-built {@link MessageRedeliveryController} into the subscription's dispatcher
     * (via reflection on its private {@code redeliveryMessages} field) and checks two things:
     * that the dispatcher drains the controller after a redelivery request, and that a freshly
     * published message is still delivered when the controller holds a position that has already
     * been acknowledged.
     */
    @Test
    public void testMessageReplay() throws Exception {
        // Number of received messages (out of 10) whose positions are recorded for replay.
        int i = 10 / 2;
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/topic2"}).subscriptionName("sub2").subscriptionType(SubscriptionType.Shared).receiverQueueSize(1).subscribe();
        Producer create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic2").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/topic2").get();
        Assert.assertNotNull(persistentTopic);
        PersistentSubscription subscription = persistentTopic.getSubscription("sub2");
        PersistentDispatcherMultipleConsumers dispatcher = subscription.getDispatcher();
        // The dispatcher's redelivery controller is private, so it is replaced through reflection.
        Field declaredField = PersistentDispatcherMultipleConsumers.class.getDeclaredField("redeliveryMessages");
        declaredField.setAccessible(true);
        MessageRedeliveryController messageRedeliveryController = new MessageRedeliveryController(true);
        Assert.assertNotNull(subscription);
        for (int i2 = 0; i2 < 10; i2++) {
            create.send(("my-message-" + i2).getBytes());
        }
        // Receive and acknowledge all ten messages, remembering the very first message id and
        // recording the positions of the first five in the replacement controller.
        MessageIdImpl messageIdImpl = null;
        for (int i3 = 0; i3 < 10; i3++) {
            Message receive = subscribe.receive();
            subscribe.acknowledge(receive);
            MessageIdImpl messageIdImpl2 = (MessageIdImpl) receive.getMessageId();
            if (i3 == 0) {
                messageIdImpl = messageIdImpl2;
            }
            if (i3 < i) {
                messageRedeliveryController.add(messageIdImpl2.getLedgerId(), messageIdImpl2.getEntryId());
            }
        }
        // NOTE(review): presumably gives the acknowledgements time to reach the broker - confirm.
        Thread.sleep(1000L);
        // Swap in the prepared controller and ask the dispatcher to redeliver for its first consumer.
        declaredField.set(dispatcher, messageRedeliveryController);
        dispatcher.redeliverUnacknowledgedMessages((Consumer) dispatcher.getConsumers().get(0));
        // The dispatcher is expected to empty the controller within ten seconds.
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(messageRedeliveryController.isEmpty());
        });
        Assert.assertTrue(messageRedeliveryController.isEmpty());
        // Re-add the position of the first (already acknowledged) message, publish a new message
        // and grant one permit: the new message must still be delivered.
        messageRedeliveryController.add(messageIdImpl.getLedgerId(), messageIdImpl.getEntryId());
        declaredField.set(dispatcher, messageRedeliveryController);
        create.send("testMsg".getBytes());
        dispatcher.consumerFlow((Consumer) dispatcher.getConsumers().get(0), 1);
        Message receive2 = subscribe.receive(1, TimeUnit.SECONDS);
        Assert.assertNotNull(receive2);
        Assert.assertEquals(receive2.getData(), "testMsg".getBytes());
        subscribe.close();
        create.close();
    }

    /**
     * Checks that a second producer with the same explicit name is rejected while the first is
     * connected, and that the name can be used again once the first producer is closed.
     */
    @Test
    public void testCreateProducerWithSameName() throws Exception {
        ProducerBuilder namedProducerBuilder = this.pulsarClient.newProducer()
                .topic("persistent://prop/ns-abc/testCreateProducerWithSameName")
                .producerName("test-producer-a")
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition);

        Producer firstProducer = namedProducerBuilder.create();

        // While the first producer is connected, its name is taken.
        try {
            namedProducerBuilder.create();
            Assert.fail("Should have thrown ProducerBusyException");
        } catch (PulsarClientException.ProducerBusyException expected) {
            // Expected: the producer name is still in use.
        }

        // After closing it, creating a producer with the same name must succeed.
        firstProducer.close();
        namedProducerBuilder.create().close();
    }

    /**
     * Checks that {@code getOrCreateTopic} yields a topic for a name that was only looked up
     * before, and that the broker holds a reference to that topic afterwards.
     */
    @Test
    public void testGetOrCreateTopic() throws Exception {
        final String topicName = "persistent://prop/ns-abc/testGetOrCreateTopic";

        this.admin.lookups().lookupTopic(topicName);

        Topic createdTopic = (Topic) this.pulsar.getBrokerService().getOrCreateTopic(topicName).get();
        Assert.assertNotNull(createdTopic);

        Assert.assertTrue(this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
    }

    /**
     * Checks that {@code getTopicIfExists} reports an empty result for a name that was only looked
     * up before, and that the call leaves no topic reference behind in the broker.
     */
    @Test
    public void testGetTopicIfExists() throws Exception {
        final String topicName = "persistent://prop/ns-abc/testGetTopicIfExists";

        this.admin.lookups().lookupTopic(topicName);

        Optional existingTopic = (Optional) this.pulsar.getBrokerService().getTopicIfExists(topicName).join();
        Assert.assertFalse(existingTopic.isPresent());

        Assert.assertFalse(this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
    }

    /**
     * Publishes one string message with an explicit event time and checks that the consumer sees
     * both the value and the event time unchanged.
     */
    @Test
    public void testWithEventTime() throws Exception {
        final String topicName = "prop/ns-abc/topic-event-time";

        Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionName("sub")
                .subscribe();

        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
        producer.newMessage().value("test").eventTime(5L).send();

        Message received = consumer.receive();
        Assert.assertNotNull(received);
        Assert.assertEquals((String) received.getValue(), "test");
        Assert.assertEquals(received.getEventTime(), 5L);
    }
}
