/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.PulsarCommandSenderImpl;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerInterceptors;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProducerAccessMode;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.MockZooKeeper;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class PersistentTopicTest
extends MockedBookKeeperTestCase {
    protected PulsarService pulsar;
    private BrokerService brokerService;
    private ManagedLedgerFactory mlFactoryMock;
    private ServerCnx serverCnx;
    private ManagedLedger ledgerMock;
    private ManagedCursor cursorMock;
    private ConfigurationCacheService configCacheService;
    final String successTopicName = "persistent://prop/use/ns-abc/successTopic";
    final String successPartitionTopicName = "persistent://prop/use/ns-abc/successTopic-partition-0";
    final String failTopicName = "persistent://prop/use/ns-abc/failTopic";
    final String successSubName = "successSub";
    final String successSubName2 = "successSub2";
    final String successSubName3 = "successSub3";
    private static final Logger log = LoggerFactory.getLogger(PersistentTopicTest.class);
    private OrderedExecutor executor;
    private EventLoopGroup eventLoopGroup;

    @BeforeMethod
    public void setup() throws Exception {
        this.eventLoopGroup = new NioEventLoopGroup();
        this.executor = OrderedExecutor.newBuilder().numThreads(1).build();
        ServiceConfiguration svcConfig = (ServiceConfiguration)Mockito.spy((Object)new ServiceConfiguration());
        svcConfig.setAdvertisedAddress("localhost");
        svcConfig.setBrokerShutdownTimeoutMs(0L);
        this.pulsar = (PulsarService)Mockito.spy((Object)new PulsarService(svcConfig));
        ((PulsarService)Mockito.doReturn((Object)svcConfig).when((Object)this.pulsar)).getConfiguration();
        ((PulsarService)Mockito.doReturn((Object)Mockito.mock(Compactor.class)).when((Object)this.pulsar)).getCompactor();
        this.mlFactoryMock = (ManagedLedgerFactory)Mockito.mock(ManagedLedgerFactory.class);
        ((PulsarService)Mockito.doReturn((Object)this.mlFactoryMock).when((Object)this.pulsar)).getManagedLedgerFactory();
        MockZooKeeper mockZk = MockedPulsarServiceBaseTest.createMockZooKeeper();
        ((PulsarService)Mockito.doReturn((Object)mockZk).when((Object)this.pulsar)).getZkClient();
        ((PulsarService)Mockito.doReturn((Object)((Object)MockedPulsarServiceBaseTest.createMockBookKeeper(this.executor))).when((Object)this.pulsar)).getBookKeeperClient();
        ZooKeeperCache cache = (ZooKeeperCache)Mockito.mock(ZooKeeperCache.class);
        ((ZooKeeperCache)Mockito.doReturn((Object)30).when((Object)cache)).getZkOperationTimeoutSeconds();
        ((PulsarService)Mockito.doReturn((Object)cache).when((Object)this.pulsar)).getLocalZkCache();
        this.configCacheService = (ConfigurationCacheService)Mockito.mock(ConfigurationCacheService.class);
        ZooKeeperDataCache zkDataCache = (ZooKeeperDataCache)Mockito.mock(ZooKeeperDataCache.class);
        ((ConfigurationCacheService)Mockito.doReturn((Object)zkDataCache).when((Object)this.configCacheService)).policiesCache();
        ((PulsarService)Mockito.doReturn((Object)this.configCacheService).when((Object)this.pulsar)).getConfigurationCache();
        ((ZooKeeperDataCache)Mockito.doReturn(Optional.empty()).when((Object)zkDataCache)).get((String)ArgumentMatchers.any());
        LocalZooKeeperCacheService zkCache = (LocalZooKeeperCacheService)Mockito.mock(LocalZooKeeperCacheService.class);
        ((ZooKeeperDataCache)Mockito.doReturn(CompletableFuture.completedFuture(Optional.empty())).when((Object)zkDataCache)).getAsync((String)ArgumentMatchers.any());
        ((LocalZooKeeperCacheService)Mockito.doReturn((Object)zkDataCache).when((Object)zkCache)).policiesCache();
        ((PulsarService)Mockito.doReturn((Object)this.configCacheService).when((Object)this.pulsar)).getConfigurationCache();
        ((PulsarService)Mockito.doReturn((Object)zkCache).when((Object)this.pulsar)).getLocalZkCacheService();
        ((PulsarService)Mockito.doReturn((Object)this.executor).when((Object)this.pulsar)).getOrderedExecutor();
        this.brokerService = (BrokerService)Mockito.spy((Object)new BrokerService(this.pulsar, this.eventLoopGroup));
        ((PulsarService)Mockito.doReturn((Object)this.brokerService).when((Object)this.pulsar)).getBrokerService();
        this.serverCnx = (ServerCnx)Mockito.spy((Object)new ServerCnx(this.pulsar));
        ((ServerCnx)Mockito.doReturn((Object)true).when((Object)this.serverCnx)).isActive();
        ((ServerCnx)Mockito.doReturn((Object)true).when((Object)this.serverCnx)).isWritable();
        ((ServerCnx)Mockito.doReturn((Object)new InetSocketAddress("localhost", 1234)).when((Object)this.serverCnx)).clientAddress();
        ((ServerCnx)Mockito.doReturn((Object)new PulsarCommandSenderImpl(null, this.serverCnx)).when((Object)this.serverCnx)).getCommandSender();
        NamespaceService nsSvc = (NamespaceService)Mockito.mock(NamespaceService.class);
        NamespaceBundle bundle = (NamespaceBundle)Mockito.mock(NamespaceBundle.class);
        ((NamespaceService)Mockito.doReturn(CompletableFuture.completedFuture(bundle)).when((Object)nsSvc)).getBundleAsync((TopicName)ArgumentMatchers.any());
        ((PulsarService)Mockito.doReturn((Object)nsSvc).when((Object)this.pulsar)).getNamespaceService();
        ((NamespaceService)Mockito.doReturn((Object)true).when((Object)nsSvc)).isServiceUnitOwned((ServiceUnitId)ArgumentMatchers.any());
        ((NamespaceService)Mockito.doReturn((Object)true).when((Object)nsSvc)).isServiceUnitActive((TopicName)ArgumentMatchers.any());
        ((NamespaceService)Mockito.doReturn(CompletableFuture.completedFuture(true)).when((Object)nsSvc)).checkTopicOwnership((TopicName)ArgumentMatchers.any());
        this.setupMLAsyncCallbackMocks();
    }

    @AfterMethod(alwaysRun=true)
    public void teardown() throws Exception {
        this.brokerService.getTopics().clear();
        this.brokerService.close();
        try {
            this.pulsar.close();
        }
        catch (Exception e) {
            log.warn("Failed to close pulsar service", (Throwable)e);
            throw e;
        }
        this.executor.shutdownNow();
        this.eventLoopGroup.shutdownGracefully().get();
    }

    @Test
    public void testCreateTopic() {
        final ManagedLedger ledgerMock = (ManagedLedger)Mockito.mock(ManagedLedger.class);
        ((ManagedLedger)Mockito.doReturn(new ArrayList()).when((Object)ledgerMock)).getCursors();
        String topicName = "persistent://prop/use/ns-abc/topic1";
        ((ManagedLedgerFactory)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) {
                ((AsyncCallbacks.OpenLedgerCallback)invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
                return null;
            }
        }).when((Object)this.mlFactoryMock)).asyncOpen(Mockito.anyString(), (ManagedLedgerConfig)ArgumentMatchers.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback)ArgumentMatchers.any(AsyncCallbacks.OpenLedgerCallback.class), (Supplier)ArgumentMatchers.any(Supplier.class), ArgumentMatchers.any());
        CompletionStage future = ((CompletableFuture)this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/topic1").thenAccept(topic -> Assert.assertTrue((boolean)topic.toString().contains("persistent://prop/use/ns-abc/topic1")))).exceptionally(t -> {
            Assert.fail((String)"should not fail");
            return null;
        });
        try {
            ((CompletableFuture)future).get(1L, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            Assert.fail((String)"Should not fail or time out");
        }
    }

    @Test
    public void testCreateTopicMLFailure() {
        String jinxedTopicName = "persistent://prop/use/ns-abc/topic3";
        ((ManagedLedgerFactory)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) {
                new Thread(() -> ((AsyncCallbacks.OpenLedgerCallback)invocationOnMock.getArguments()[2]).openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null)).start();
                return null;
            }
        }).when((Object)this.mlFactoryMock)).asyncOpen(Mockito.anyString(), (ManagedLedgerConfig)ArgumentMatchers.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback)ArgumentMatchers.any(AsyncCallbacks.OpenLedgerCallback.class), (Supplier)ArgumentMatchers.any(Supplier.class), ArgumentMatchers.any());
        CompletableFuture future = this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/topic3");
        try {
            future.get(1L, TimeUnit.SECONDS);
            Assert.fail((String)"should have failed");
        }
        catch (TimeoutException e) {
            Assert.fail((String)"Should not time out");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testPublishMessage() throws Exception {
        ((ManagedLedger)Mockito.doAnswer(invocationOnMock -> {
            ByteBuf payload = (ByteBuf)invocationOnMock.getArguments()[0];
            AsyncCallbacks.AddEntryCallback callback = (AsyncCallbacks.AddEntryCallback)invocationOnMock.getArguments()[1];
            Topic.PublishContext ctx = (Topic.PublishContext)invocationOnMock.getArguments()[2];
            callback.addComplete((Position)PositionImpl.latest, payload, (Object)ctx);
            return null;
        }).when((Object)this.ledgerMock)).asyncAddEntry((ByteBuf)ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback)ArgumentMatchers.any(AsyncCallbacks.AddEntryCallback.class), ArgumentMatchers.any());
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        final ByteBuf payload = Unpooled.wrappedBuffer((byte[])"content".getBytes());
        final CountDownLatch latch = new CountDownLatch(1);
        Topic.PublishContext publishContext = new Topic.PublishContext(){

            public void completed(Exception e, long ledgerId, long entryId) {
                Assert.assertEquals((long)ledgerId, (long)PositionImpl.latest.getLedgerId());
                Assert.assertEquals((long)entryId, (long)PositionImpl.latest.getEntryId());
                latch.countDown();
            }

            public void setMetadataFromEntryData(ByteBuf entryData) {
                Assert.assertEquals((long)latch.getCount(), (long)1L);
                Assert.assertEquals((byte[])entryData.array(), (byte[])payload.array());
            }
        };
        topic.publishMessage(payload, publishContext);
        Assert.assertTrue((boolean)latch.await(1L, TimeUnit.SECONDS));
    }

    @Test
    public void testDispatcherMultiConsumerReadFailed() throws Exception {
        PersistentTopic topic = (PersistentTopic)Mockito.spy((Object)new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService));
        ManagedCursor cursor = (ManagedCursor)Mockito.mock(ManagedCursor.class);
        Mockito.when((Object)cursor.getName()).thenReturn((Object)"cursor");
        PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, null);
        dispatcher.readEntriesFailed((ManagedLedgerException)new ManagedLedgerException.InvalidCursorPositionException("failed"), null);
        ((PersistentTopic)Mockito.verify((Object)topic, (VerificationMode)Mockito.atLeast((int)1))).getBrokerService();
    }

    @Test
    public void testDispatcherSingleConsumerReadFailed() throws Exception {
        PersistentTopic topic = (PersistentTopic)Mockito.spy((Object)new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService));
        ManagedCursor cursor = (ManagedCursor)Mockito.mock(ManagedCursor.class);
        Mockito.when((Object)cursor.getName()).thenReturn((Object)"cursor");
        PersistentDispatcherSingleActiveConsumer dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, CommandSubscribe.SubType.Exclusive, 1, topic, null);
        Consumer consumer = (Consumer)Mockito.mock(Consumer.class);
        dispatcher.readEntriesFailed((ManagedLedgerException)new ManagedLedgerException.InvalidCursorPositionException("failed"), (Object)consumer);
        ((PersistentTopic)Mockito.verify((Object)topic, (VerificationMode)Mockito.atLeast((int)1))).getBrokerService();
    }

    @Test
    public void testPublishMessageMLFailure() throws Exception {
        String successTopicName = "persistent://prop/use/ns-abc/successTopic";
        ManagedLedger ledgerMock = (ManagedLedger)Mockito.mock(ManagedLedger.class);
        ((ManagedLedger)Mockito.doReturn(new ArrayList()).when((Object)ledgerMock)).getCursors();
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", ledgerMock, this.brokerService);
        MessageMetadata messageMetadata = new MessageMetadata().setPublishTime(System.currentTimeMillis()).setProducerName("prod-name").setSequenceId(1L);
        ByteBuf payload = Unpooled.wrappedBuffer((byte[])"content".getBytes());
        CountDownLatch latch = new CountDownLatch(1);
        ((ManagedLedger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.AddEntryCallback)invocationOnMock.getArguments()[1]).addFailed(new ManagedLedgerException("Managed ledger failure"), invocationOnMock.getArguments()[2]);
                return null;
            }
        }).when((Object)ledgerMock)).asyncAddEntry((ByteBuf)ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback)ArgumentMatchers.any(AsyncCallbacks.AddEntryCallback.class), ArgumentMatchers.any());
        topic.publishMessage(payload, (exception, ledgerId, entryId) -> {
            if (exception == null) {
                Assert.fail((String)"publish should have failed");
            } else {
                latch.countDown();
            }
        });
        Assert.assertTrue((boolean)latch.await(1L, TimeUnit.SECONDS));
    }

    @Test
    public void testAddRemoveProducer() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        String role = "appid1";
        Producer producer = new Producer((Topic)topic, (TransportCnx)this.serverCnx, 1L, "prod-name", role, false, null, SchemaVersion.Latest, 0L, false, ProducerAccessMode.Shared, Optional.empty());
        topic.addProducer(producer, new CompletableFuture());
        Assert.assertEquals((int)topic.getProducers().size(), (int)1);
        try {
            topic.addProducer(producer, new CompletableFuture()).join();
            Assert.fail((String)"Should have failed with naming exception because producer 'null' is already connected to the topic");
        }
        catch (Exception e) {
            Assert.assertEquals(e.getCause().getClass(), BrokerServiceException.NamingException.class);
        }
        Assert.assertEquals((int)topic.getProducers().size(), (int)1);
        PersistentTopic failTopic = new PersistentTopic("persistent://prop/use/ns-abc/failTopic", this.ledgerMock, this.brokerService);
        Producer failProducer = new Producer((Topic)failTopic, (TransportCnx)this.serverCnx, 2L, "prod-name", role, false, null, SchemaVersion.Latest, 0L, false, ProducerAccessMode.Shared, Optional.empty());
        try {
            topic.addProducer(failProducer, new CompletableFuture());
            Assert.fail((String)"should have failed");
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
        topic.removeProducer(producer);
        Assert.assertEquals((int)topic.getProducers().size(), (int)0);
        topic.removeProducer(producer);
    }

    @Test
    public void testProducerOverwrite() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        String role = "appid1";
        Producer producer1 = new Producer((Topic)topic, (TransportCnx)this.serverCnx, 1L, "prod-name", role, false, null, SchemaVersion.Latest, 0L, true, ProducerAccessMode.Shared, Optional.empty());
        Producer producer2 = new Producer((Topic)topic, (TransportCnx)this.serverCnx, 2L, "prod-name", role, false, null, SchemaVersion.Latest, 0L, true, ProducerAccessMode.Shared, Optional.empty());
        try {
            topic.addProducer(producer1, new CompletableFuture()).join();
            topic.addProducer(producer2, new CompletableFuture()).join();
            Assert.fail((String)"should have failed");
        }
        catch (Exception e) {
            Assert.assertEquals(e.getCause().getClass(), BrokerServiceException.NamingException.class);
        }
        Assert.assertEquals((int)topic.getProducers().size(), (int)1);
        Producer producer3 = new Producer((Topic)topic, (TransportCnx)this.serverCnx, 2L, "prod-name", role, false, null, SchemaVersion.Latest, 1L, false, ProducerAccessMode.Shared, Optional.empty());
        try {
            topic.addProducer(producer3, new CompletableFuture()).join();
            Assert.fail((String)"should have failed");
        }
        catch (Exception e) {
            Assert.assertEquals(e.getCause().getClass(), BrokerServiceException.NamingException.class);
        }
        Assert.assertEquals((int)topic.getProducers().size(), (int)1);
        topic.removeProducer(producer1);
        Assert.assertEquals((int)topic.getProducers().size(), (int)0);
        Producer producer4 = new Producer((Topic)topic, (TransportCnx)this.serverCnx, 2L, "prod-name", role, false, null, SchemaVersion.Latest, 2L, false, ProducerAccessMode.Shared, Optional.empty());
        topic.addProducer(producer3, new CompletableFuture());
        topic.addProducer(producer4, new CompletableFuture());
        Assert.assertEquals((int)topic.getProducers().size(), (int)1);
        topic.getProducers().values().forEach(producer -> Assert.assertEquals((long)producer.getEpoch(), (long)2L));
        topic.removeProducer(producer4);
        Assert.assertEquals((int)topic.getProducers().size(), (int)0);
        Producer producer5 = new Producer((Topic)topic, (TransportCnx)this.serverCnx, 2L, "pulsar.repl.cluster1", role, false, null, SchemaVersion.Latest, 1L, false, ProducerAccessMode.Shared, Optional.empty());
        topic.addProducer(producer5, new CompletableFuture());
        Assert.assertEquals((int)topic.getProducers().size(), (int)1);
        Producer producer6 = new Producer((Topic)topic, (TransportCnx)this.serverCnx, 2L, "pulsar.repl.cluster1", role, false, null, SchemaVersion.Latest, 2L, false, ProducerAccessMode.Shared, Optional.empty());
        topic.addProducer(producer6, new CompletableFuture());
        Assert.assertEquals((int)topic.getProducers().size(), (int)1);
        topic.getProducers().values().forEach(producer -> Assert.assertEquals((long)producer.getEpoch(), (long)2L));
        Producer producer7 = new Producer((Topic)topic, (TransportCnx)this.serverCnx, 2L, "pulsar.repl.cluster1", role, false, null, SchemaVersion.Latest, 3L, true, ProducerAccessMode.Shared, Optional.empty());
        topic.addProducer(producer7, new CompletableFuture());
        Assert.assertEquals((int)topic.getProducers().size(), (int)1);
        topic.getProducers().values().forEach(producer -> Assert.assertEquals((long)producer.getEpoch(), (long)3L));
    }

    private void testMaxProducers() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        String role = "appid1";
        Producer producer = new Producer((Topic)topic, (TransportCnx)this.serverCnx, 1L, "prod-name1", role, false, null, SchemaVersion.Latest, 0L, false, ProducerAccessMode.Shared, Optional.empty());
        topic.addProducer(producer, new CompletableFuture());
        Assert.assertEquals((int)topic.getProducers().size(), (int)1);
        Producer producer2 = new Producer((Topic)topic, (TransportCnx)this.serverCnx, 2L, "prod-name2", role, false, null, SchemaVersion.Latest, 0L, false, ProducerAccessMode.Shared, Optional.empty());
        topic.addProducer(producer2, new CompletableFuture());
        Assert.assertEquals((int)topic.getProducers().size(), (int)2);
        try {
            Producer producer3 = new Producer((Topic)topic, (TransportCnx)this.serverCnx, 3L, "prod-name3", role, false, null, SchemaVersion.Latest, 0L, false, ProducerAccessMode.Shared, Optional.empty());
            topic.addProducer(producer3, new CompletableFuture()).join();
            Assert.fail((String)"should have failed");
        }
        catch (Exception e) {
            Assert.assertEquals(e.getCause().getClass(), BrokerServiceException.ProducerBusyException.class);
        }
    }

    @Test
    public void testMaxProducersForBroker() throws Exception {
        ServiceConfiguration svcConfig = (ServiceConfiguration)Mockito.spy((Object)new ServiceConfiguration());
        ((ServiceConfiguration)Mockito.doReturn((Object)2).when((Object)svcConfig)).getMaxProducersPerTopic();
        ((PulsarService)Mockito.doReturn((Object)svcConfig).when((Object)this.pulsar)).getConfiguration();
        this.testMaxProducers();
    }

    @Test
    public void testMaxProducersForNamespace() throws Exception {
        ServiceConfiguration svcConfig = (ServiceConfiguration)Mockito.spy((Object)new ServiceConfiguration());
        ((PulsarService)Mockito.doReturn((Object)svcConfig).when((Object)this.pulsar)).getConfiguration();
        Policies policies = new Policies();
        policies.max_producers_per_topic = 2;
        Mockito.when((Object)this.pulsar.getConfigurationCache().policiesCache().get(AdminResource.path((String[])new String[]{"policies", TopicName.get((String)"persistent://prop/use/ns-abc/successTopic").getNamespace()}))).thenReturn(Optional.of(policies));
        this.testMaxProducers();
    }

    private Producer getMockedProducerWithSpecificAddress(Topic topic, long producerId, InetAddress address) throws Exception {
        String producerNameBase = "producer";
        String role = "appid1";
        ServerCnx cnx = (ServerCnx)Mockito.spy((Object)new ServerCnx(this.pulsar));
        ((ServerCnx)Mockito.doReturn((Object)true).when((Object)cnx)).isActive();
        ((ServerCnx)Mockito.doReturn((Object)true).when((Object)cnx)).isWritable();
        ((ServerCnx)Mockito.doReturn((Object)new InetSocketAddress(address, 1234)).when((Object)cnx)).clientAddress();
        ((ServerCnx)Mockito.doReturn((Object)address.getHostAddress()).when((Object)cnx)).clientSourceAddress();
        ((ServerCnx)Mockito.doReturn((Object)new PulsarCommandSenderImpl(null, cnx)).when((Object)cnx)).getCommandSender();
        return new Producer(topic, (TransportCnx)cnx, producerId, "producer" + producerId, "appid1", false, null, SchemaVersion.Latest, 0L, false, ProducerAccessMode.Shared, Optional.empty());
    }

    @Test
    public void testMaxSameAddressProducers() throws Exception {
        ServiceConfiguration svcConfig = (ServiceConfiguration)Mockito.spy((Object)new ServiceConfiguration());
        ((ServiceConfiguration)Mockito.doReturn((Object)2).when((Object)svcConfig)).getMaxSameAddressProducersPerTopic();
        ((PulsarService)Mockito.doReturn((Object)svcConfig).when((Object)this.pulsar)).getConfiguration();
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        InetAddress address1 = InetAddress.getByName("127.0.0.1");
        InetAddress address2 = InetAddress.getByName("0.0.0.0");
        String ipAddress1 = address1.getHostAddress();
        String ipAddress2 = address2.getHostAddress();
        Producer producer1 = this.getMockedProducerWithSpecificAddress((Topic)topic, 1L, address1);
        topic.addProducer(producer1, new CompletableFuture());
        Assert.assertEquals((int)topic.getProducers().size(), (int)1);
        Assert.assertEquals((int)topic.getNumberOfSameAddressProducers(ipAddress1), (int)1);
        Producer producer2 = this.getMockedProducerWithSpecificAddress((Topic)topic, 2L, address1);
        topic.addProducer(producer2, new CompletableFuture());
        Assert.assertEquals((int)topic.getProducers().size(), (int)2);
        Assert.assertEquals((int)topic.getNumberOfSameAddressProducers(ipAddress1), (int)2);
        try {
            Producer producer3 = this.getMockedProducerWithSpecificAddress((Topic)topic, 3L, address1);
            topic.addProducer(producer3, new CompletableFuture()).join();
            Assert.fail((String)"should have failed");
        }
        catch (Exception e) {
            Assert.assertEquals(e.getCause().getClass(), BrokerServiceException.ProducerBusyException.class);
        }
        Assert.assertEquals((int)topic.getProducers().size(), (int)2);
        Assert.assertEquals((int)topic.getNumberOfSameAddressProducers(ipAddress1), (int)2);
        Producer producer4 = this.getMockedProducerWithSpecificAddress((Topic)topic, 4L, address2);
        topic.addProducer(producer4, new CompletableFuture());
        Assert.assertEquals((int)topic.getProducers().size(), (int)3);
        Assert.assertEquals((int)topic.getNumberOfSameAddressProducers(ipAddress2), (int)1);
        Producer producer5 = this.getMockedProducerWithSpecificAddress((Topic)topic, 5L, address2);
        topic.addProducer(producer5, new CompletableFuture());
        Assert.assertEquals((int)topic.getProducers().size(), (int)4);
        Assert.assertEquals((int)topic.getNumberOfSameAddressProducers(ipAddress2), (int)2);
        try {
            Producer producer6 = this.getMockedProducerWithSpecificAddress((Topic)topic, 6L, address2);
            topic.addProducer(producer6, new CompletableFuture()).join();
            Assert.fail((String)"should have failed");
        }
        catch (Exception e) {
            Assert.assertEquals(e.getCause().getClass(), BrokerServiceException.ProducerBusyException.class);
        }
        Assert.assertEquals((int)topic.getProducers().size(), (int)4);
        Assert.assertEquals((int)topic.getNumberOfSameAddressProducers(ipAddress2), (int)2);
        topic.removeProducer(producer1);
        Assert.assertEquals((int)topic.getProducers().size(), (int)3);
        Assert.assertEquals((int)topic.getNumberOfSameAddressProducers(ipAddress1), (int)1);
        Producer producer7 = this.getMockedProducerWithSpecificAddress((Topic)topic, 7L, address1);
        topic.addProducer(producer7, new CompletableFuture());
        Assert.assertEquals((int)topic.getProducers().size(), (int)4);
        Assert.assertEquals((int)topic.getNumberOfSameAddressProducers(ipAddress1), (int)2);
    }

    @Test
    public void testSubscribeFail() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        CommandSubscribe cmd = new CommandSubscribe().setConsumerId(1L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("").setConsumerName("consumer-name").setReadCompacted(false).setRequestId(1L).setSubType(CommandSubscribe.SubType.Exclusive);
        CompletableFuture f1 = topic.subscribe((TransportCnx)this.serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.isDurable(), null, Collections.emptyMap(), cmd.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, null);
        try {
            f1.get();
            Assert.fail((String)"should fail with exception");
        }
        catch (ExecutionException ee) {
            Assert.assertTrue((boolean)(ee.getCause() instanceof BrokerServiceException.NamingException));
        }
    }

    @Test
    public void testSubscribeUnsubscribe() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        CommandSubscribe cmd = new CommandSubscribe().setConsumerId(1L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("successSub").setConsumerName("consumer-name").setReadCompacted(false).setRequestId(1L).setSubType(CommandSubscribe.SubType.Exclusive);
        CompletableFuture f1 = topic.subscribe((TransportCnx)this.serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.isDurable(), null, Collections.emptyMap(), cmd.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, null);
        f1.get();
        CompletableFuture f2 = topic.subscribe((TransportCnx)this.serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.isDurable(), null, Collections.emptyMap(), cmd.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, null);
        try {
            f2.get();
            Assert.fail((String)"should fail with exception");
        }
        catch (ExecutionException ee) {
            Assert.assertTrue((boolean)(ee.getCause() instanceof BrokerServiceException.ConsumerBusyException));
        }
        CompletableFuture f3 = topic.unsubscribe("successSub");
        f3.get();
        Assert.assertNull((Object)topic.getSubscription("successSub"));
    }

    @Test
    public void testChangeSubscriptionType() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription sub = new PersistentSubscription(topic, "change-sub-type", this.cursorMock, false);
        Consumer consumer = new Consumer((Subscription)sub, CommandSubscribe.SubType.Exclusive, topic.getName(), 1L, 0, "Cons1", 50000, (TransportCnx)this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT));
        sub.addConsumer(consumer);
        consumer.close();
        CommandSubscribe.SubType previousSubType = CommandSubscribe.SubType.Exclusive;
        for (CommandSubscribe.SubType subType : Lists.newArrayList((Object[])new CommandSubscribe.SubType[]{CommandSubscribe.SubType.Shared, CommandSubscribe.SubType.Failover, CommandSubscribe.SubType.Key_Shared, CommandSubscribe.SubType.Exclusive})) {
            Dispatcher previousDispatcher = sub.getDispatcher();
            consumer = new Consumer((Subscription)sub, subType, topic.getName(), 1L, 0, "Cons1", 50000, (TransportCnx)this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT));
            sub.addConsumer(consumer);
            Assert.assertTrue((boolean)sub.getDispatcher().isConsumerConnected());
            Assert.assertFalse((boolean)sub.getDispatcher().isClosed());
            Assert.assertEquals((Object)sub.getDispatcher().getType(), (Object)subType);
            Assert.assertFalse((boolean)previousDispatcher.isConsumerConnected());
            Assert.assertTrue((boolean)previousDispatcher.isClosed());
            Assert.assertEquals((Object)previousDispatcher.getType(), (Object)previousSubType);
            consumer.close();
            previousSubType = subType;
        }
    }

    @Test
    public void testAddRemoveConsumer() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", this.cursorMock, false);
        Consumer consumer = new Consumer((Subscription)sub, CommandSubscribe.SubType.Exclusive, topic.getName(), 1L, 0, "Cons1", 50000, (TransportCnx)this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null);
        sub.addConsumer(consumer);
        Assert.assertTrue((boolean)sub.getDispatcher().isConsumerConnected());
        try {
            sub.addConsumer(consumer).get();
            Assert.fail((String)"Should fail with ConsumerBusyException");
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof BrokerServiceException.ConsumerBusyException));
        }
        sub.removeConsumer(consumer);
        Assert.assertFalse((boolean)sub.getDispatcher().isConsumerConnected());
        try {
            sub.removeConsumer(consumer);
            Assert.fail((String)"Should fail with ServerMetadataException");
        }
        catch (BrokerServiceException e) {
            Assert.assertTrue((boolean)(e instanceof BrokerServiceException.ServerMetadataException));
        }
    }

    @Test
    public void testAddRemoveConsumerDurableCursor() throws Exception {
        ((ManagedCursor)Mockito.doReturn((Object)false).when((Object)this.cursorMock)).isDurable();
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription sub = new PersistentSubscription(topic, "non-durable-sub", this.cursorMock, false);
        Consumer consumer = new Consumer((Subscription)sub, CommandSubscribe.SubType.Exclusive, topic.getName(), 1L, 0, "Cons1", 50000, (TransportCnx)this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null);
        sub.addConsumer(consumer);
        Assert.assertFalse((boolean)sub.getDispatcher().isClosed());
        sub.removeConsumer(consumer);
        for (int i = 0; i < 100 && !sub.getDispatcher().isClosed(); ++i) {
            Thread.sleep(100L);
        }
        Assert.assertTrue((boolean)sub.getDispatcher().isClosed());
    }

    private void testMaxConsumersShared() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", this.cursorMock, false);
        PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", this.cursorMock, false);
        Method addConsumerToSubscription = AbstractTopic.class.getDeclaredMethod("addConsumerToSubscription", Subscription.class, Consumer.class);
        addConsumerToSubscription.setAccessible(true);
        ConcurrentOpenHashMap subscriptions = new ConcurrentOpenHashMap(16, 1);
        subscriptions.put((Object)"sub-1", (Object)sub);
        subscriptions.put((Object)"sub-2", (Object)sub2);
        Field field = topic.getClass().getDeclaredField("subscriptions");
        field.setAccessible(true);
        field.set(topic, subscriptions);
        Consumer consumer = new Consumer((Subscription)sub, CommandSubscribe.SubType.Shared, topic.getName(), 1L, 0, "Cons1", 50000, (TransportCnx)this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null);
        addConsumerToSubscription.invoke((Object)topic, sub, consumer);
        Assert.assertEquals((int)sub.getConsumers().size(), (int)1);
        Consumer consumer2 = new Consumer((Subscription)sub, CommandSubscribe.SubType.Shared, topic.getName(), 2L, 0, "Cons2", 50000, (TransportCnx)this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null);
        addConsumerToSubscription.invoke((Object)topic, sub, consumer2);
        Assert.assertEquals((int)sub.getConsumers().size(), (int)2);
        try {
            Consumer consumer3 = new Consumer((Subscription)sub, CommandSubscribe.SubType.Shared, topic.getName(), 3L, 0, "Cons3", 50000, (TransportCnx)this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null);
            ((CompletableFuture)addConsumerToSubscription.invoke((Object)topic, sub, consumer3)).get();
            Assert.fail((String)"should have failed");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof BrokerServiceException.ConsumerBusyException));
        }
        Assert.assertEquals((int)topic.getNumberOfConsumers(), (int)2);
        Consumer consumer4 = new Consumer((Subscription)sub2, CommandSubscribe.SubType.Shared, topic.getName(), 4L, 0, "Cons4", 50000, (TransportCnx)this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null);
        addConsumerToSubscription.invoke((Object)topic, sub2, consumer4);
        Assert.assertEquals((int)sub2.getConsumers().size(), (int)1);
        Assert.assertEquals((int)topic.getNumberOfConsumers(), (int)3);
        try {
            Consumer consumer5 = new Consumer((Subscription)sub2, CommandSubscribe.SubType.Shared, topic.getName(), 5L, 0, "Cons5", 50000, (TransportCnx)this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null);
            ((CompletableFuture)addConsumerToSubscription.invoke((Object)topic, sub2, consumer5)).get();
            Assert.fail((String)"should have failed");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof BrokerServiceException.ConsumerBusyException));
        }
    }

    @Test
    public void testMaxConsumersSharedForBroker() throws Exception {
        ServiceConfiguration svcConfig = (ServiceConfiguration)Mockito.spy((Object)new ServiceConfiguration());
        ((ServiceConfiguration)Mockito.doReturn((Object)2).when((Object)svcConfig)).getMaxConsumersPerSubscription();
        ((ServiceConfiguration)Mockito.doReturn((Object)3).when((Object)svcConfig)).getMaxConsumersPerTopic();
        ((PulsarService)Mockito.doReturn((Object)svcConfig).when((Object)this.pulsar)).getConfiguration();
        this.testMaxConsumersShared();
    }

    @Test
    public void testMaxConsumersSharedForNamespace() throws Exception {
        ServiceConfiguration svcConfig = (ServiceConfiguration)Mockito.spy((Object)new ServiceConfiguration());
        ((PulsarService)Mockito.doReturn((Object)svcConfig).when((Object)this.pulsar)).getConfiguration();
        Policies policies = new Policies();
        policies.max_consumers_per_subscription = 2;
        policies.max_consumers_per_topic = 3;
        Mockito.when((Object)this.pulsar.getConfigurationCache().policiesCache().getDataIfPresent(AdminResource.path((String[])new String[]{"policies", TopicName.get((String)"persistent://prop/use/ns-abc/successTopic").getNamespace()}))).thenReturn((Object)policies);
        this.testMaxConsumersShared();
    }

    private void testMaxConsumersFailover() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", this.cursorMock, false);
        PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", this.cursorMock, false);
        Method addConsumerToSubscription = AbstractTopic.class.getDeclaredMethod("addConsumerToSubscription", Subscription.class, Consumer.class);
        addConsumerToSubscription.setAccessible(true);
        ConcurrentOpenHashMap subscriptions = new ConcurrentOpenHashMap(16, 1);
        subscriptions.put((Object)"sub-1", (Object)sub);
        subscriptions.put((Object)"sub-2", (Object)sub2);
        Field field = topic.getClass().getDeclaredField("subscriptions");
        field.setAccessible(true);
        field.set(topic, subscriptions);
        Consumer consumer = new Consumer((Subscription)sub, CommandSubscribe.SubType.Failover, topic.getName(), 1L, 0, "Cons1", 50000, (TransportCnx)this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null);
        addConsumerToSubscription.invoke((Object)topic, sub, consumer);
        Assert.assertEquals((int)sub.getConsumers().size(), (int)1);
        Consumer consumer2 = new Consumer((Subscription)sub, CommandSubscribe.SubType.Failover, topic.getName(), 2L, 0, "Cons2", 50000, (TransportCnx)this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null);
        addConsumerToSubscription.invoke((Object)topic, sub, consumer2);
        Assert.assertEquals((int)sub.getConsumers().size(), (int)2);
        try {
            Consumer consumer3 = new Consumer((Subscription)sub, CommandSubscribe.SubType.Failover, topic.getName(), 3L, 0, "Cons3", 50000, (TransportCnx)this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null);
            ((CompletableFuture)addConsumerToSubscription.invoke((Object)topic, sub, consumer3)).get();
            Assert.fail((String)"should have failed");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof BrokerServiceException.ConsumerBusyException));
        }
        Assert.assertEquals((int)topic.getNumberOfConsumers(), (int)2);
        Consumer consumer4 = new Consumer((Subscription)sub2, CommandSubscribe.SubType.Failover, topic.getName(), 4L, 0, "Cons4", 50000, (TransportCnx)this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null);
        addConsumerToSubscription.invoke((Object)topic, sub2, consumer4);
        Assert.assertEquals((int)sub2.getConsumers().size(), (int)1);
        Assert.assertEquals((int)topic.getNumberOfConsumers(), (int)3);
        try {
            Consumer consumer5 = new Consumer((Subscription)sub2, CommandSubscribe.SubType.Failover, topic.getName(), 5L, 0, "Cons5", 50000, (TransportCnx)this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null);
            ((CompletableFuture)addConsumerToSubscription.invoke((Object)topic, sub2, consumer5)).get();
            Assert.fail((String)"should have failed");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof BrokerServiceException.ConsumerBusyException));
        }
    }

    @Test
    public void testMaxConsumersFailoverForBroker() throws Exception {
        ServiceConfiguration svcConfig = (ServiceConfiguration)Mockito.spy((Object)new ServiceConfiguration());
        ((ServiceConfiguration)Mockito.doReturn((Object)2).when((Object)svcConfig)).getMaxConsumersPerSubscription();
        ((ServiceConfiguration)Mockito.doReturn((Object)3).when((Object)svcConfig)).getMaxConsumersPerTopic();
        ((PulsarService)Mockito.doReturn((Object)svcConfig).when((Object)this.pulsar)).getConfiguration();
        this.testMaxConsumersFailover();
    }

    @Test
    public void testMaxConsumersFailoverForNamespace() throws Exception {
        ServiceConfiguration svcConfig = (ServiceConfiguration)Mockito.spy((Object)new ServiceConfiguration());
        ((PulsarService)Mockito.doReturn((Object)svcConfig).when((Object)this.pulsar)).getConfiguration();
        Policies policies = new Policies();
        policies.max_consumers_per_subscription = 2;
        policies.max_consumers_per_topic = 3;
        Mockito.when((Object)this.pulsar.getConfigurationCache().policiesCache().getDataIfPresent(AdminResource.path((String[])new String[]{"policies", TopicName.get((String)"persistent://prop/use/ns-abc/successTopic").getNamespace()}))).thenReturn((Object)policies);
        this.testMaxConsumersFailover();
    }

    private Consumer getMockedConsumerWithSpecificAddress(Topic topic, Subscription sub, long consumerId, InetAddress address) throws Exception {
        String consumerNameBase = "consumer";
        String role = "appid1";
        ServerCnx cnx = (ServerCnx)Mockito.spy((Object)new ServerCnx(this.pulsar));
        ((ServerCnx)Mockito.doReturn((Object)true).when((Object)cnx)).isActive();
        ((ServerCnx)Mockito.doReturn((Object)true).when((Object)cnx)).isWritable();
        ((ServerCnx)Mockito.doReturn((Object)new InetSocketAddress(address, 1234)).when((Object)cnx)).clientAddress();
        ((ServerCnx)Mockito.doReturn((Object)address.getHostAddress()).when((Object)cnx)).clientSourceAddress();
        ((ServerCnx)Mockito.doReturn((Object)new PulsarCommandSenderImpl(null, cnx)).when((Object)cnx)).getCommandSender();
        return new Consumer(sub, CommandSubscribe.SubType.Shared, topic.getName(), consumerId, 0, "consumer" + consumerId, 50000, (TransportCnx)cnx, "appid1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null);
    }

    @Test
    public void testMaxSameAddressConsumers() throws Exception {
        ServiceConfiguration svcConfig = (ServiceConfiguration)Mockito.spy((Object)new ServiceConfiguration());
        ((ServiceConfiguration)Mockito.doReturn((Object)2).when((Object)svcConfig)).getMaxSameAddressConsumersPerTopic();
        ((PulsarService)Mockito.doReturn((Object)svcConfig).when((Object)this.pulsar)).getConfiguration();
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription sub1 = new PersistentSubscription(topic, "sub1", this.cursorMock, false);
        PersistentSubscription sub2 = new PersistentSubscription(topic, "sub2", this.cursorMock, false);
        InetAddress address1 = InetAddress.getByName("127.0.0.1");
        InetAddress address2 = InetAddress.getByName("0.0.0.0");
        String ipAddress1 = address1.getHostAddress();
        String ipAddress2 = address2.getHostAddress();
        Method addConsumerToSubscription = AbstractTopic.class.getDeclaredMethod("addConsumerToSubscription", Subscription.class, Consumer.class);
        addConsumerToSubscription.setAccessible(true);
        ConcurrentOpenHashMap subscriptions = new ConcurrentOpenHashMap(16, 1);
        subscriptions.put((Object)"sub1", (Object)sub1);
        subscriptions.put((Object)"sub2", (Object)sub2);
        Field field = topic.getClass().getDeclaredField("subscriptions");
        field.setAccessible(true);
        field.set(topic, subscriptions);
        Consumer consumer1 = this.getMockedConsumerWithSpecificAddress((Topic)topic, (Subscription)sub1, 1L, address1);
        ((CompletableFuture)addConsumerToSubscription.invoke((Object)topic, sub1, consumer1)).get();
        Assert.assertEquals((int)topic.getNumberOfConsumers(), (int)1);
        Assert.assertEquals((int)topic.getNumberOfSameAddressConsumers(ipAddress1), (int)1);
        Assert.assertEquals((int)sub1.getNumberOfSameAddressConsumers(ipAddress1), (int)1);
        Consumer consumer2 = this.getMockedConsumerWithSpecificAddress((Topic)topic, (Subscription)sub2, 2L, address1);
        ((CompletableFuture)addConsumerToSubscription.invoke((Object)topic, sub2, consumer2)).get();
        Assert.assertEquals((int)topic.getNumberOfConsumers(), (int)2);
        Assert.assertEquals((int)topic.getNumberOfSameAddressConsumers(ipAddress1), (int)2);
        Assert.assertEquals((int)sub1.getNumberOfSameAddressConsumers(ipAddress1), (int)1);
        Assert.assertEquals((int)sub2.getNumberOfSameAddressConsumers(ipAddress1), (int)1);
        Consumer consumer3 = this.getMockedConsumerWithSpecificAddress((Topic)topic, (Subscription)sub1, 3L, address2);
        ((CompletableFuture)addConsumerToSubscription.invoke((Object)topic, sub1, consumer3)).get();
        Assert.assertEquals((int)topic.getNumberOfConsumers(), (int)3);
        Assert.assertEquals((int)topic.getNumberOfSameAddressConsumers(ipAddress1), (int)2);
        Assert.assertEquals((int)topic.getNumberOfSameAddressConsumers(ipAddress2), (int)1);
        Assert.assertEquals((int)sub1.getNumberOfSameAddressConsumers(ipAddress1), (int)1);
        Assert.assertEquals((int)sub1.getNumberOfSameAddressConsumers(ipAddress2), (int)1);
        Consumer consumer4 = this.getMockedConsumerWithSpecificAddress((Topic)topic, (Subscription)sub2, 4L, address2);
        ((CompletableFuture)addConsumerToSubscription.invoke((Object)topic, sub2, consumer4)).get();
        Assert.assertEquals((int)topic.getNumberOfConsumers(), (int)4);
        Assert.assertEquals((int)topic.getNumberOfSameAddressConsumers(ipAddress1), (int)2);
        Assert.assertEquals((int)topic.getNumberOfSameAddressConsumers(ipAddress2), (int)2);
        Assert.assertEquals((int)sub2.getNumberOfSameAddressConsumers(ipAddress1), (int)1);
        Assert.assertEquals((int)sub2.getNumberOfSameAddressConsumers(ipAddress2), (int)1);
        try {
            Consumer consumer5 = this.getMockedConsumerWithSpecificAddress((Topic)topic, (Subscription)sub1, 5L, address1);
            ((CompletableFuture)addConsumerToSubscription.invoke((Object)topic, sub1, consumer5)).get();
            Assert.fail((String)"should have failed");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof BrokerServiceException.ConsumerBusyException));
        }
        Assert.assertEquals((int)topic.getNumberOfConsumers(), (int)4);
        Assert.assertEquals((int)topic.getNumberOfSameAddressConsumers(ipAddress1), (int)2);
        Assert.assertEquals((int)sub1.getNumberOfSameAddressConsumers(ipAddress1), (int)1);
        try {
            Consumer consumer6 = this.getMockedConsumerWithSpecificAddress((Topic)topic, (Subscription)sub2, 6L, address2);
            ((CompletableFuture)addConsumerToSubscription.invoke((Object)topic, sub2, consumer6)).get();
            Assert.fail((String)"should have failed");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof BrokerServiceException.ConsumerBusyException));
        }
        Assert.assertEquals((int)topic.getNumberOfConsumers(), (int)4);
        Assert.assertEquals((int)topic.getNumberOfSameAddressConsumers(ipAddress2), (int)2);
        Assert.assertEquals((int)sub2.getNumberOfSameAddressConsumers(ipAddress2), (int)1);
        consumer1.close();
        Assert.assertEquals((int)topic.getNumberOfConsumers(), (int)3);
        Assert.assertEquals((int)topic.getNumberOfSameAddressConsumers(ipAddress1), (int)1);
        Assert.assertEquals((int)sub1.getNumberOfSameAddressConsumers(ipAddress1), (int)0);
        Consumer consumer7 = this.getMockedConsumerWithSpecificAddress((Topic)topic, (Subscription)sub1, 7L, address1);
        addConsumerToSubscription.invoke((Object)topic, sub1, consumer7);
        Assert.assertEquals((int)topic.getNumberOfConsumers(), (int)4);
        Assert.assertEquals((int)topic.getNumberOfSameAddressConsumers(ipAddress1), (int)2);
        Assert.assertEquals((int)sub1.getNumberOfSameAddressConsumers(ipAddress1), (int)1);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testUbsubscribeRaceConditions() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", this.cursorMock, false);
        Consumer consumer1 = new Consumer((Subscription)sub, CommandSubscribe.SubType.Exclusive, topic.getName(), 1L, 0, "Cons1", 50000, (TransportCnx)this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null);
        sub.addConsumer(consumer1);
        ((ManagedLedger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.DeleteCursorCallback)invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
                Thread.sleep(1000L);
                return null;
            }
        }).when((Object)this.ledgerMock)).asyncDeleteCursor(Mockito.matches((String)".*success.*"), (AsyncCallbacks.DeleteCursorCallback)ArgumentMatchers.any(AsyncCallbacks.DeleteCursorCallback.class), ArgumentMatchers.any());
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            executor.submit(() -> {
                sub.doUnsubscribe(consumer1);
                return null;
            }).get();
            try {
                Thread.sleep(10L);
                sub.addConsumer(new Consumer((Subscription)sub, CommandSubscribe.SubType.Exclusive, topic.getName(), 2L, 0, "Cons2", 50000, (TransportCnx)this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null)).get();
                Assert.fail();
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)(e.getCause() instanceof BrokerServiceException.SubscriptionFencedException));
            }
        }
        finally {
            if (Collections.singletonList(executor).get(0) != null) {
                executor.shutdownNow();
            }
        }
    }

    @Test
    public void testCloseTopic() throws Exception {
        PersistentTopic topic = (PersistentTopic)this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/successTopic").get();
        Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced");
        isFencedField.setAccessible(true);
        Field isClosingOrDeletingField = PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
        isClosingOrDeletingField.setAccessible(true);
        Assert.assertFalse((boolean)((Boolean)isFencedField.get(topic)));
        Assert.assertFalse((boolean)((Boolean)isClosingOrDeletingField.get(topic)));
        topic.close().get();
        Assert.assertFalse((boolean)this.brokerService.getTopicReference("persistent://prop/use/ns-abc/successTopic").isPresent());
        Assert.assertTrue((boolean)((Boolean)isFencedField.get(topic)));
        Assert.assertTrue((boolean)((Boolean)isClosingOrDeletingField.get(topic)));
        ByteBuf payload = Unpooled.wrappedBuffer((byte[])"content".getBytes());
        CountDownLatch latch = new CountDownLatch(1);
        topic.publishMessage(payload, (exception, ledgerId, entryId) -> {
            Assert.assertTrue((boolean)(exception instanceof BrokerServiceException.TopicFencedException));
            latch.countDown();
        });
        Assert.assertTrue((boolean)latch.await(1L, TimeUnit.SECONDS));
        Assert.assertTrue((boolean)((Boolean)isFencedField.get(topic)));
        Assert.assertTrue((boolean)((Boolean)isClosingOrDeletingField.get(topic)));
    }

    @Test
    public void testDeleteTopic() throws Exception {
        PersistentTopic topic = (PersistentTopic)this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/successTopic").get();
        Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced");
        isFencedField.setAccessible(true);
        Field isClosingOrDeletingField = PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
        isClosingOrDeletingField.setAccessible(true);
        Assert.assertFalse((boolean)((Boolean)isFencedField.get(topic)));
        Assert.assertFalse((boolean)((Boolean)isClosingOrDeletingField.get(topic)));
        String role = "appid1";
        topic.delete().get();
        Assert.assertFalse((boolean)this.brokerService.getTopicReference("persistent://prop/use/ns-abc/successTopic").isPresent());
        Assert.assertTrue((boolean)((Boolean)isFencedField.get(topic)));
        Assert.assertTrue((boolean)((Boolean)isClosingOrDeletingField.get(topic)));
        ByteBuf payload = Unpooled.wrappedBuffer((byte[])"content".getBytes());
        CountDownLatch latch = new CountDownLatch(1);
        topic.publishMessage(payload, (exception, ledgerId, entryId) -> {
            Assert.assertTrue((boolean)(exception instanceof BrokerServiceException.TopicFencedException));
            latch.countDown();
        });
        Assert.assertTrue((boolean)latch.await(1L, TimeUnit.SECONDS));
        Assert.assertTrue((boolean)((Boolean)isFencedField.get(topic)));
        Assert.assertTrue((boolean)((Boolean)isClosingOrDeletingField.get(topic)));
        topic = (PersistentTopic)this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/successTopic").get();
        Producer producer = new Producer((Topic)topic, (TransportCnx)this.serverCnx, 1L, "prod-name", role, false, null, SchemaVersion.Latest, 0L, false, ProducerAccessMode.Shared, Optional.empty());
        topic.addProducer(producer, new CompletableFuture()).join();
        Assert.assertTrue((boolean)topic.delete().isCompletedExceptionally());
        Assert.assertFalse((boolean)((Boolean)isFencedField.get(topic)));
        Assert.assertFalse((boolean)((Boolean)isClosingOrDeletingField.get(topic)));
        topic.removeProducer(producer);
        CommandSubscribe cmd = new CommandSubscribe().setConsumerId(1L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("successSub").setConsumerName("consumer-name").setRequestId(1L).setSubType(CommandSubscribe.SubType.Exclusive);
        CompletableFuture f1 = topic.subscribe((TransportCnx)this.serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.isDurable(), null, Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, 0L, false, null);
        f1.get();
        Assert.assertTrue((boolean)topic.delete().isCompletedExceptionally());
        Assert.assertFalse((boolean)((Boolean)isFencedField.get(topic)));
        Assert.assertFalse((boolean)((Boolean)isClosingOrDeletingField.get(topic)));
        topic.unsubscribe("successSub");
    }

    @Test
    public void testDeleteAndUnsubscribeTopic() throws Exception {
        final PersistentTopic topic = (PersistentTopic)this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/successTopic").get();
        CommandSubscribe cmd = new CommandSubscribe().setConsumerId(1L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("successSub").setConsumerName("consumer-name").setRequestId(1L).setReadCompacted(false).setSubType(CommandSubscribe.SubType.Exclusive);
        CompletableFuture f1 = topic.subscribe((TransportCnx)this.serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.isDurable(), null, Collections.emptyMap(), cmd.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, null);
        f1.get();
        final CyclicBarrier barrier = new CyclicBarrier(2);
        final CountDownLatch counter = new CountDownLatch(2);
        final AtomicBoolean gotException = new AtomicBoolean(false);
        Thread deleter = new Thread(){

            @Override
            public void run() {
                try {
                    barrier.await();
                    Assert.assertFalse((boolean)topic.delete().isCompletedExceptionally());
                }
                catch (Exception e) {
                    e.printStackTrace();
                    gotException.set(true);
                }
                finally {
                    counter.countDown();
                }
            }
        };
        Thread unsubscriber = new Thread(){

            @Override
            public void run() {
                try {
                    barrier.await();
                    topic.unsubscribe("successSub");
                }
                catch (Exception e) {
                    e.printStackTrace();
                    gotException.set(true);
                }
                finally {
                    counter.countDown();
                }
            }
        };
        deleter.start();
        unsubscriber.start();
        counter.await();
        Assert.assertFalse((boolean)gotException.get());
    }

    @Test(enabled=false)
    public void testConcurrentTopicAndSubscriptionDelete() throws Exception {
        final PersistentTopic topic = (PersistentTopic)this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/successTopic").get();
        CommandSubscribe cmd = new CommandSubscribe().setConsumerId(1L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("successSub").setRequestId(1L).setSubType(CommandSubscribe.SubType.Exclusive);
        CompletableFuture f1 = topic.subscribe((TransportCnx)this.serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.isDurable(), null, Collections.emptyMap(), cmd.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, null);
        f1.get();
        final CyclicBarrier barrier = new CyclicBarrier(2);
        final CountDownLatch counter = new CountDownLatch(2);
        final AtomicBoolean gotException = new AtomicBoolean(false);
        Thread deleter = new Thread(){

            @Override
            public void run() {
                try {
                    barrier.await();
                    Thread.sleep(5L, 0);
                    log.info("deleter outcome is {}", topic.delete().get());
                }
                catch (Exception e) {
                    e.printStackTrace();
                    gotException.set(true);
                }
                finally {
                    counter.countDown();
                }
            }
        };
        Thread unsubscriber = new Thread(){

            @Override
            public void run() {
                try {
                    barrier.await();
                    ConcurrentOpenHashMap subscriptions = topic.getSubscriptions();
                    PersistentSubscription ps = (PersistentSubscription)subscriptions.get((Object)"successSub");
                    log.info("unsubscriber outcome is {}", ps.doUnsubscribe((Consumer)ps.getConsumers().get(0)).get());
                }
                catch (Exception e) {
                    e.printStackTrace();
                    gotException.set(true);
                }
                finally {
                    counter.countDown();
                }
            }
        };
        deleter.start();
        unsubscriber.start();
        counter.await();
        Assert.assertFalse((boolean)gotException.get());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDeleteTopicRaceConditions() throws Exception {
        PersistentTopic topic = (PersistentTopic)this.brokerService.getOrCreateTopic("persistent://prop/use/ns-abc/successTopic").get();
        ((ManagedLedger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                Thread.sleep(1000L);
                ((AsyncCallbacks.DeleteLedgerCallback)invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
                return null;
            }
        }).when((Object)this.ledgerMock)).asyncDelete((AsyncCallbacks.DeleteLedgerCallback)ArgumentMatchers.any(AsyncCallbacks.DeleteLedgerCallback.class), ArgumentMatchers.any());
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            executor.submit(() -> {
                topic.delete();
                return null;
            }).get();
            try {
                String role = "appid1";
                Thread.sleep(10L);
                Producer producer = new Producer((Topic)topic, (TransportCnx)this.serverCnx, 1L, "prod-name", role, false, null, SchemaVersion.Latest, 0L, false, ProducerAccessMode.Shared, Optional.empty());
                topic.addProducer(producer, new CompletableFuture()).join();
                Assert.fail((String)"Should have failed");
            }
            catch (Exception e) {
                Assert.assertEquals(e.getCause().getClass(), BrokerServiceException.TopicFencedException.class);
            }
            CommandSubscribe cmd = new CommandSubscribe().setConsumerId(1L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("successSub").setConsumerName("consumer-name").setReadCompacted(false).setRequestId(1L).setSubType(CommandSubscribe.SubType.Exclusive);
            CompletableFuture f = topic.subscribe((TransportCnx)this.serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.isDurable(), null, Collections.emptyMap(), cmd.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, null);
            try {
                f.get();
                Assert.fail((String)"should have failed");
            }
            catch (ExecutionException ee) {
                Assert.assertTrue((boolean)(ee.getCause() instanceof BrokerServiceException.TopicFencedException));
            }
        }
        finally {
            if (Collections.singletonList(executor).get(0) != null) {
                executor.shutdownNow();
            }
        }
    }

    void setupMLAsyncCallbackMocks() {
        this.ledgerMock = (ManagedLedger)Mockito.mock(ManagedLedger.class);
        this.cursorMock = (ManagedCursor)Mockito.mock(ManagedCursorImpl.class);
        final CompletableFuture closeFuture = new CompletableFuture();
        ((ManagedLedger)Mockito.doReturn(new ArrayList()).when((Object)this.ledgerMock)).getCursors();
        ((ManagedCursor)Mockito.doReturn((Object)"mockCursor").when((Object)this.cursorMock)).getName();
        ((ManagedCursor)Mockito.doReturn((Object)true).when((Object)this.cursorMock)).isDurable();
        ((ManagedCursor)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                return closeFuture.complete(null);
            }
        }).when((Object)this.cursorMock)).asyncClose(new AsyncCallbacks.CloseCallback(){

            public void closeComplete(Object ctx) {
                log.info("[{}] Successfully closed cursor ledger", (Object)"mockCursor");
                closeFuture.complete(null);
            }

            public void closeFailed(ManagedLedgerException exception, Object ctx) {
                log.error("Error closing cursor for subscription", (Throwable)exception);
                closeFuture.completeExceptionally((Throwable)new BrokerServiceException.PersistenceException((Throwable)exception));
            }
        }, null);
        ((ManagedLedgerFactory)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.OpenLedgerCallback)invocationOnMock.getArguments()[2]).openLedgerComplete(PersistentTopicTest.this.ledgerMock, null);
                return null;
            }
        }).when((Object)this.mlFactoryMock)).asyncOpen(Mockito.matches((String)".*success.*"), (ManagedLedgerConfig)ArgumentMatchers.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback)ArgumentMatchers.any(AsyncCallbacks.OpenLedgerCallback.class), (Supplier)ArgumentMatchers.any(Supplier.class), ArgumentMatchers.any());
        ((ManagedLedgerFactory)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.OpenLedgerCallback)invocationOnMock.getArguments()[2]).openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null);
                return null;
            }
        }).when((Object)this.mlFactoryMock)).asyncOpen(Mockito.matches((String)".*fail.*"), (ManagedLedgerConfig)ArgumentMatchers.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback)ArgumentMatchers.any(AsyncCallbacks.OpenLedgerCallback.class), (Supplier)ArgumentMatchers.any(Supplier.class), ArgumentMatchers.any());
        ((ManagedLedger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.AddEntryCallback)invocationOnMock.getArguments()[1]).addComplete((Position)new PositionImpl(1L, 1L), null, invocationOnMock.getArguments()[2]);
                return null;
            }
        }).when((Object)this.ledgerMock)).asyncAddEntry((ByteBuf)ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback)ArgumentMatchers.any(AsyncCallbacks.AddEntryCallback.class), ArgumentMatchers.any());
        ((ManagedLedger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.OpenCursorCallback)invocationOnMock.getArguments()[2]).openCursorComplete(PersistentTopicTest.this.cursorMock, null);
                return null;
            }
        }).when((Object)this.ledgerMock)).asyncOpenCursor(Mockito.matches((String)".*success.*"), (CommandSubscribe.InitialPosition)ArgumentMatchers.any(CommandSubscribe.InitialPosition.class), (AsyncCallbacks.OpenCursorCallback)ArgumentMatchers.any(AsyncCallbacks.OpenCursorCallback.class), ArgumentMatchers.any());
        ((ManagedLedger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.OpenCursorCallback)invocationOnMock.getArguments()[3]).openCursorComplete(PersistentTopicTest.this.cursorMock, null);
                return null;
            }
        }).when((Object)this.ledgerMock)).asyncOpenCursor(Mockito.matches((String)".*success.*"), (CommandSubscribe.InitialPosition)ArgumentMatchers.any(CommandSubscribe.InitialPosition.class), (Map)ArgumentMatchers.any(Map.class), (AsyncCallbacks.OpenCursorCallback)ArgumentMatchers.any(AsyncCallbacks.OpenCursorCallback.class), ArgumentMatchers.any());
        ((ManagedLedger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.CloseCallback)invocationOnMock.getArguments()[0]).closeComplete(null);
                return null;
            }
        }).when((Object)this.ledgerMock)).asyncClose((AsyncCallbacks.CloseCallback)ArgumentMatchers.any(AsyncCallbacks.CloseCallback.class), ArgumentMatchers.any());
        ((ManagedLedger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.DeleteLedgerCallback)invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
                return null;
            }
        }).when((Object)this.ledgerMock)).asyncDelete((AsyncCallbacks.DeleteLedgerCallback)ArgumentMatchers.any(AsyncCallbacks.DeleteLedgerCallback.class), ArgumentMatchers.any());
        ((ManagedLedger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.DeleteCursorCallback)invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
                return null;
            }
        }).when((Object)this.ledgerMock)).asyncDeleteCursor(Mockito.matches((String)".*success.*"), (AsyncCallbacks.DeleteCursorCallback)ArgumentMatchers.any(AsyncCallbacks.DeleteCursorCallback.class), ArgumentMatchers.any());
        ((ManagedCursor)Mockito.doAnswer(invokactionOnMock -> {
            ((AsyncCallbacks.MarkDeleteCallback)invokactionOnMock.getArguments()[2]).markDeleteComplete(invokactionOnMock.getArguments()[3]);
            return null;
        }).when((Object)this.cursorMock)).asyncMarkDelete((Position)ArgumentMatchers.any(), (Map)ArgumentMatchers.any(), (AsyncCallbacks.MarkDeleteCallback)ArgumentMatchers.any(AsyncCallbacks.MarkDeleteCallback.class), ArgumentMatchers.any());
    }

    @Test
    public void testFailoverSubscription() throws Exception {
        PersistentTopic topic1 = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        CommandSubscribe cmd1 = new CommandSubscribe().setConsumerId(1L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("successSub").setConsumerName("consumer-name").setReadCompacted(false).setRequestId(1L).setSubType(CommandSubscribe.SubType.Failover);
        CompletableFuture f1 = topic1.subscribe((TransportCnx)this.serverCnx, cmd1.getSubscription(), cmd1.getConsumerId(), cmd1.getSubType(), 0, cmd1.getConsumerName(), cmd1.isDurable(), null, Collections.emptyMap(), cmd1.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, null);
        f1.get();
        PersistentTopic topic2 = new PersistentTopic("persistent://prop/use/ns-abc/successTopic-partition-0", this.ledgerMock, this.brokerService);
        CommandSubscribe cmd2 = new CommandSubscribe().setConsumerId(1L).setConsumerName("C1").setTopic("persistent://prop/use/ns-abc/successTopic-partition-0").setSubscription("successSub").setReadCompacted(false).setRequestId(1L).setSubType(CommandSubscribe.SubType.Failover);
        CompletableFuture f2 = topic2.subscribe((TransportCnx)this.serverCnx, cmd2.getSubscription(), cmd2.getConsumerId(), cmd2.getSubType(), 0, cmd2.getConsumerName(), cmd2.isDurable(), null, Collections.emptyMap(), cmd2.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, null);
        f2.get();
        CommandSubscribe cmd3 = new CommandSubscribe().setConsumerId(2L).setConsumerName("C2").setTopic("persistent://prop/use/ns-abc/successTopic-partition-0").setSubscription("successSub").setReadCompacted(false).setRequestId(1L).setSubType(CommandSubscribe.SubType.Failover);
        CompletableFuture f3 = topic2.subscribe((TransportCnx)this.serverCnx, cmd3.getSubscription(), cmd3.getConsumerId(), cmd3.getSubType(), 0, cmd3.getConsumerName(), cmd3.isDurable(), null, Collections.emptyMap(), cmd3.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, null);
        f3.get();
        Assert.assertEquals((long)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerId(), (long)1L);
        Assert.assertEquals((String)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerName(), (String)"C1");
        Assert.assertEquals((long)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(1)).consumerId(), (long)2L);
        Assert.assertEquals((String)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(1)).consumerName(), (String)"C2");
        CommandSubscribe cmd4 = new CommandSubscribe().setConsumerId(3L).setConsumerName("C1").setTopic("persistent://prop/use/ns-abc/successTopic-partition-0").setSubscription("successSub").setReadCompacted(false).setRequestId(1L).setSubType(CommandSubscribe.SubType.Failover);
        CompletableFuture f4 = topic2.subscribe((TransportCnx)this.serverCnx, cmd4.getSubscription(), cmd4.getConsumerId(), cmd4.getSubType(), 0, cmd4.getConsumerName(), cmd4.isDurable(), null, Collections.emptyMap(), cmd4.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, null);
        f4.get();
        Assert.assertEquals((long)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerId(), (long)1L);
        Assert.assertEquals((String)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerName(), (String)"C1");
        Assert.assertEquals((long)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(1)).consumerId(), (long)3L);
        Assert.assertEquals((String)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(1)).consumerName(), (String)"C1");
        Assert.assertEquals((long)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(2)).consumerId(), (long)2L);
        Assert.assertEquals((String)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(2)).consumerName(), (String)"C2");
        CommandSubscribe cmd5 = new CommandSubscribe().setConsumerId(2L).setConsumerName("C1").setTopic("persistent://prop/use/ns-abc/successTopic-partition-0").setSubscription("successSub").setReadCompacted(false).setRequestId(1L).setSubType(CommandSubscribe.SubType.Exclusive);
        CompletableFuture f5 = topic2.subscribe((TransportCnx)this.serverCnx, cmd5.getSubscription(), cmd5.getConsumerId(), cmd5.getSubType(), 0, cmd5.getConsumerName(), cmd5.isDurable(), null, Collections.emptyMap(), cmd5.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, null);
        try {
            f5.get();
            Assert.fail((String)"should fail with exception");
        }
        catch (ExecutionException ee) {
            Assert.assertTrue((boolean)(ee.getCause() instanceof BrokerServiceException.SubscriptionBusyException));
        }
        CommandSubscribe cmd6 = new CommandSubscribe().setConsumerId(4L).setConsumerName("C3").setTopic("persistent://prop/use/ns-abc/successTopic-partition-0").setSubscription("successSub2").setReadCompacted(false).setRequestId(1L).setSubType(CommandSubscribe.SubType.Exclusive);
        CompletableFuture f6 = topic2.subscribe((TransportCnx)this.serverCnx, cmd6.getSubscription(), cmd6.getConsumerId(), cmd6.getSubType(), 0, cmd6.getConsumerName(), cmd6.isDurable(), null, Collections.emptyMap(), cmd6.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, 0L, false, null);
        f6.get();
        CompletableFuture f7 = topic2.unsubscribe("successSub2");
        f7.get();
        Assert.assertNull((Object)topic2.getSubscription("successSub2"));
        PersistentSubscription sub = topic2.getSubscription("successSub");
        Consumer cons = (Consumer)sub.getDispatcher().getConsumers().get(0);
        sub.removeConsumer(cons);
        Assert.assertEquals((long)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerId(), (long)3L);
        Assert.assertEquals((String)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerName(), (String)"C1");
        cons = (Consumer)sub.getDispatcher().getConsumers().get(0);
        sub.removeConsumer(cons);
        Assert.assertEquals((long)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerId(), (long)2L);
        Assert.assertEquals((String)((Consumer)topic2.getSubscription("successSub").getDispatcher().getConsumers().get(0)).consumerName(), (String)"C2");
        CompletableFuture f8 = topic2.unsubscribe("successSub");
        f8.get();
        Assert.assertNull((Object)topic2.getSubscription("successSub"));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAtomicReplicationRemoval() throws Exception {
        String globalTopicName = "persistent://prop/global/ns-abc/successTopic";
        String localCluster = "local";
        String remoteCluster = "remote";
        ManagedLedger ledgerMock = (ManagedLedger)Mockito.mock(ManagedLedger.class);
        ((ManagedLedger)Mockito.doNothing().when((Object)ledgerMock)).asyncDeleteCursor((String)ArgumentMatchers.any(), (AsyncCallbacks.DeleteCursorCallback)ArgumentMatchers.any(), ArgumentMatchers.any());
        ((ManagedLedger)Mockito.doReturn(new ArrayList()).when((Object)ledgerMock)).getCursors();
        PersistentTopic topic = new PersistentTopic("persistent://prop/global/ns-abc/successTopic", ledgerMock, this.brokerService);
        String remoteReplicatorName = topic.getReplicatorPrefix() + "." + remoteCluster;
        ConcurrentOpenHashMap replicatorMap = topic.getReplicators();
        URL brokerUrl = new URL("http://" + this.pulsar.getAdvertisedAddress() + ":" + this.pulsar.getConfiguration().getBrokerServicePort().get());
        PulsarClient client = PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
        try {
            ManagedCursor cursor = (ManagedCursor)Mockito.mock(ManagedCursorImpl.class);
            ((ManagedCursor)Mockito.doReturn((Object)remoteCluster).when((Object)cursor)).getName();
            this.brokerService.getReplicationClients().put((Object)remoteCluster, (Object)client);
            PersistentReplicator replicator = (PersistentReplicator)Mockito.spy((Object)new PersistentReplicator(topic, cursor, localCluster, remoteCluster, this.brokerService));
            replicatorMap.put((Object)remoteReplicatorName, (Object)replicator);
            Method removeMethod = PersistentTopic.class.getDeclaredMethod("removeReplicator", String.class);
            removeMethod.setAccessible(true);
            removeMethod.invoke((Object)topic, remoteReplicatorName);
            Mockito.when((Object)this.pulsar.getConfigurationCache().policiesCache().get(AdminResource.path((String[])new String[]{"policies", TopicName.get((String)"persistent://prop/global/ns-abc/successTopic").getNamespace()}))).thenReturn(Optional.of(new Policies()));
            topic.startReplProducers();
            ((PersistentReplicator)Mockito.verify((Object)replicator, (VerificationMode)Mockito.times((int)0))).startProducer();
            ArgumentCaptor captor = ArgumentCaptor.forClass(AsyncCallbacks.DeleteCursorCallback.class);
            ((ManagedLedger)Mockito.verify((Object)ledgerMock)).asyncDeleteCursor((String)ArgumentMatchers.any(), (AsyncCallbacks.DeleteCursorCallback)captor.capture(), ArgumentMatchers.any());
            AsyncCallbacks.DeleteCursorCallback callback = (AsyncCallbacks.DeleteCursorCallback)captor.getValue();
            callback.deleteCursorComplete(null);
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testClosingReplicationProducerTwice() throws Exception {
        String globalTopicName = "persistent://prop/global/ns/testClosingReplicationProducerTwice";
        String localCluster = "local";
        String remoteCluster = "remote";
        ManagedLedger ledgerMock = (ManagedLedger)Mockito.mock(ManagedLedger.class);
        ((ManagedLedger)Mockito.doNothing().when((Object)ledgerMock)).asyncDeleteCursor((String)ArgumentMatchers.any(), (AsyncCallbacks.DeleteCursorCallback)ArgumentMatchers.any(), ArgumentMatchers.any());
        ((ManagedLedger)Mockito.doReturn(new ArrayList()).when((Object)ledgerMock)).getCursors();
        PersistentTopic topic = new PersistentTopic("persistent://prop/global/ns/testClosingReplicationProducerTwice", ledgerMock, this.brokerService);
        URL brokerUrl = new URL("http://" + this.pulsar.getAdvertisedAddress() + ":" + this.pulsar.getConfiguration().getBrokerServicePort().get());
        PulsarClient client = (PulsarClient)Mockito.spy((Object)PulsarClient.builder().serviceUrl(brokerUrl.toString()).build());
        try {
            PulsarClientImpl clientImpl = (PulsarClientImpl)client;
            ((PulsarClientImpl)Mockito.doReturn(new CompletableFuture()).when((Object)clientImpl)).createProducerAsync((ProducerConfigurationData)ArgumentMatchers.any(ProducerConfigurationData.class), (Schema)ArgumentMatchers.any(Schema.class));
            ManagedCursor cursor = (ManagedCursor)Mockito.mock(ManagedCursorImpl.class);
            ((ManagedCursor)Mockito.doReturn((Object)remoteCluster).when((Object)cursor)).getName();
            this.brokerService.getReplicationClients().put((Object)remoteCluster, (Object)client);
            PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster, this.brokerService);
            ((PulsarClientImpl)Mockito.verify((Object)clientImpl)).createProducerAsync((ProducerConfigurationData)ArgumentMatchers.any(ProducerConfigurationData.class), (Schema)ArgumentMatchers.any(), (ProducerInterceptors)Mockito.eq(null));
            replicator.disconnect(false);
            replicator.disconnect(false);
            replicator.startProducer();
            ((PulsarClientImpl)Mockito.verify((Object)clientImpl, (VerificationMode)Mockito.times((int)2))).createProducerAsync((ProducerConfigurationData)ArgumentMatchers.any(), (Schema)ArgumentMatchers.any(), (ProducerInterceptors)ArgumentMatchers.any());
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    @Test
    public void testCompactorSubscription() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        CompactedTopic compactedTopic = (CompactedTopic)Mockito.mock(CompactedTopic.class);
        CompactorSubscription sub = new CompactorSubscription(topic, compactedTopic, "__compaction", this.cursorMock);
        PositionImpl position = new PositionImpl(1L, 1L);
        long ledgerId = 202112766L;
        sub.acknowledgeMessage(Collections.singletonList(position), CommandAck.AckType.Cumulative, (Map)ImmutableMap.of((Object)"CompactedTopicLedger", (Object)ledgerId));
        ((CompactedTopic)Mockito.verify((Object)compactedTopic, (VerificationMode)Mockito.times((int)1))).newCompactedLedger((Position)position, ledgerId);
    }

    @Test
    public void testCompactorSubscriptionUpdatedOnInit() throws Exception {
        long ledgerId = 202112766L;
        ImmutableMap properties = ImmutableMap.of((Object)"CompactedTopicLedger", (Object)ledgerId);
        PositionImpl position = new PositionImpl(1L, 1L);
        ((ManagedCursor)Mockito.doAnswer(arg_0 -> PersistentTopicTest.lambda$testCompactorSubscriptionUpdatedOnInit$12((Map)properties, arg_0)).when((Object)this.cursorMock)).getProperties();
        ((ManagedCursor)Mockito.doAnswer(invokactionOnMock -> position).when((Object)this.cursorMock)).getMarkDeletedPosition();
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        CompactedTopic compactedTopic = (CompactedTopic)Mockito.mock(CompactedTopic.class);
        new CompactorSubscription(topic, compactedTopic, "__compaction", this.cursorMock);
        ((CompactedTopic)Mockito.verify((Object)compactedTopic, (VerificationMode)Mockito.times((int)1))).newCompactedLedger((Position)position, ledgerId);
    }

    @Test
    public void testCompactionTriggeredAfterThresholdFirstInvocation() throws Exception {
        CompletableFuture compactPromise = new CompletableFuture();
        Compactor compactor = this.pulsar.getCompactor();
        ((Compactor)Mockito.doReturn(compactPromise).when((Object)compactor)).compact(Mockito.anyString());
        Policies policies = new Policies();
        policies.compaction_threshold = 1L;
        Mockito.when((Object)this.pulsar.getConfigurationCache().policiesCache().get(AdminResource.path((String[])new String[]{"policies", TopicName.get((String)"persistent://prop/use/ns-abc/successTopic").getNamespace()}))).thenReturn(Optional.of(policies));
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        topic.checkCompaction();
        ((Compactor)Mockito.verify((Object)compactor, (VerificationMode)Mockito.times((int)0))).compact(Mockito.anyString());
        ((ManagedLedger)Mockito.doReturn((Object)10L).when((Object)this.ledgerMock)).getTotalSize();
        ((ManagedLedger)Mockito.doReturn((Object)10L).when((Object)this.ledgerMock)).getEstimatedBacklogSize();
        topic.checkCompaction();
        ((Compactor)Mockito.verify((Object)compactor, (VerificationMode)Mockito.times((int)1))).compact(Mockito.anyString());
        topic.checkCompaction();
        ((Compactor)Mockito.verify((Object)compactor, (VerificationMode)Mockito.times((int)1))).compact(Mockito.anyString());
    }

    @Test
    public void testCompactionTriggeredAfterThresholdSecondInvocation() throws Exception {
        CompletableFuture compactPromise = new CompletableFuture();
        Compactor compactor = this.pulsar.getCompactor();
        ((Compactor)Mockito.doReturn(compactPromise).when((Object)compactor)).compact(Mockito.anyString());
        ManagedCursor subCursor = (ManagedCursor)Mockito.mock(ManagedCursor.class);
        ((ManagedLedger)Mockito.doReturn((Object)Lists.newArrayList((Object[])new ManagedCursor[]{subCursor})).when((Object)this.ledgerMock)).getCursors();
        ((ManagedCursor)Mockito.doReturn((Object)"__compaction").when((Object)subCursor)).getName();
        Policies policies = new Policies();
        policies.compaction_threshold = 1L;
        Mockito.when((Object)this.pulsar.getConfigurationCache().policiesCache().get(AdminResource.path((String[])new String[]{"policies", TopicName.get((String)"persistent://prop/use/ns-abc/successTopic").getNamespace()}))).thenReturn(Optional.of(policies));
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        topic.checkCompaction();
        ((Compactor)Mockito.verify((Object)compactor, (VerificationMode)Mockito.times((int)0))).compact(Mockito.anyString());
        ((ManagedCursor)Mockito.doReturn((Object)10L).when((Object)subCursor)).getEstimatedSizeSinceMarkDeletePosition();
        topic.checkCompaction();
        ((Compactor)Mockito.verify((Object)compactor, (VerificationMode)Mockito.times((int)1))).compact(Mockito.anyString());
        topic.checkCompaction();
        ((Compactor)Mockito.verify((Object)compactor, (VerificationMode)Mockito.times((int)1))).compact(Mockito.anyString());
    }

    @Test
    public void testCompactionDisabledWithZeroThreshold() throws Exception {
        CompletableFuture compactPromise = new CompletableFuture();
        Compactor compactor = this.pulsar.getCompactor();
        ((Compactor)Mockito.doReturn(compactPromise).when((Object)compactor)).compact(Mockito.anyString());
        Policies policies = new Policies();
        policies.compaction_threshold = 0L;
        Mockito.when((Object)this.pulsar.getConfigurationCache().policiesCache().get(AdminResource.path((String[])new String[]{"policies", TopicName.get((String)"persistent://prop/use/ns-abc/successTopic").getNamespace()}))).thenReturn(Optional.of(policies));
        ((ManagedLedger)Mockito.doReturn((Object)1000L).when((Object)this.ledgerMock)).getEstimatedBacklogSize();
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        topic.checkCompaction();
        ((Compactor)Mockito.verify((Object)compactor, (VerificationMode)Mockito.times((int)0))).compact(Mockito.anyString());
    }

    @Test
    public void testBacklogCursor() throws Exception {
        int backloggedThreshold = 10;
        this.pulsar.getConfiguration().setManagedLedgerCursorBackloggedThreshold((long)backloggedThreshold);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("cache_backlog_ledger");
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", (ManagedLedger)ledger, this.brokerService);
        ManagedCursor cursor1 = ledger.openCursor("c1");
        PersistentSubscription sub1 = new PersistentSubscription(topic, "sub-1", cursor1, false);
        Consumer consumer1 = new Consumer((Subscription)sub1, CommandSubscribe.SubType.Exclusive, topic.getName(), 1L, 0, "Cons1", 50000, (TransportCnx)this.serverCnx, "myrole-1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null);
        topic.getSubscriptions().put((Object)Codec.decode((String)cursor1.getName()), (Object)sub1);
        sub1.addConsumer(consumer1);
        ManagedCursor cursor2 = ledger.openCursor("c2");
        PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursor2, false);
        Consumer consumer2 = new Consumer((Subscription)sub2, CommandSubscribe.SubType.Exclusive, topic.getName(), 2L, 0, "Cons2", 50000, (TransportCnx)this.serverCnx, "myrole-2", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null);
        topic.getSubscriptions().put((Object)Codec.decode((String)cursor2.getName()), (Object)sub2);
        sub2.addConsumer(consumer2);
        ManagedCursor cursor3 = ledger.openCursor("c3");
        PersistentSubscription sub3 = new PersistentSubscription(topic, "sub-3", cursor3, false);
        Consumer consumer3 = new Consumer((Subscription)sub2, CommandSubscribe.SubType.Exclusive, topic.getName(), 3L, 0, "Cons2", 50000, (TransportCnx)this.serverCnx, "myrole-3", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null);
        topic.getSubscriptions().put((Object)Codec.decode((String)cursor3.getName()), (Object)sub3);
        Assert.assertTrue((boolean)cursor1.isActive());
        Assert.assertTrue((boolean)cursor2.isActive());
        Assert.assertTrue((boolean)cursor3.isActive());
        topic.checkBackloggedCursors();
        Assert.assertTrue((boolean)cursor1.isActive());
        Assert.assertTrue((boolean)cursor2.isActive());
        Assert.assertFalse((boolean)cursor3.isActive());
        final CountDownLatch latch = new CountDownLatch(backloggedThreshold);
        for (int i = 0; i < backloggedThreshold + 1; ++i) {
            String content = "entry";
            final ByteBuf entry = this.getMessageWithMetadata(content.getBytes());
            ledger.asyncAddEntry(entry, new AsyncCallbacks.AddEntryCallback(){

                public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                    latch.countDown();
                    entry.release();
                }

                public void addFailed(ManagedLedgerException exception, Object ctx) {
                    latch.countDown();
                    entry.release();
                }
            }, null);
        }
        latch.await();
        Assert.assertTrue((boolean)cursor1.isActive());
        Assert.assertTrue((boolean)cursor2.isActive());
        Assert.assertFalse((boolean)cursor3.isActive());
        topic.checkBackloggedCursors();
        Assert.assertFalse((boolean)cursor1.isActive());
        Assert.assertFalse((boolean)cursor2.isActive());
        Assert.assertFalse((boolean)cursor3.isActive());
        List entries1 = cursor1.readEntries(50);
        for (final ByteBuf entry : entries1) {
            log.info("Read entry. Position={} Content='{}'", (Object)entry.getPosition(), (Object)new String(entry.getData()));
            entry.release();
        }
        List entries3 = cursor3.readEntries(50);
        for (Entry entry : entries3) {
            log.info("Read entry. Position={} Content='{}'", (Object)entry.getPosition(), (Object)new String(entry.getData()));
            entry.release();
        }
        topic.checkBackloggedCursors();
        Assert.assertTrue((boolean)cursor1.isActive());
        Assert.assertFalse((boolean)cursor2.isActive());
        Assert.assertFalse((boolean)cursor3.isActive());
        sub3.addConsumer(consumer3);
        entries3 = cursor3.readEntries(50);
        for (Entry entry : entries3) {
            log.info("Read entry. Position={} Content='{}'", (Object)entry.getPosition(), (Object)new String(entry.getData()));
            entry.release();
        }
        topic.checkBackloggedCursors();
        Assert.assertTrue((boolean)cursor3.isActive());
    }

    @Test
    public void testCheckInactiveSubscriptions() throws Exception {
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        ConcurrentOpenHashMap subscriptions = new ConcurrentOpenHashMap(16, 1);
        PersistentSubscription nonDeletableSubscription1 = (PersistentSubscription)Mockito.spy((Object)new PersistentSubscription(topic, "nonDeletableSubscription1", this.cursorMock, false));
        subscriptions.put((Object)nonDeletableSubscription1.getName(), (Object)nonDeletableSubscription1);
        PersistentSubscription deletableSubscription1 = (PersistentSubscription)Mockito.spy((Object)new PersistentSubscription(topic, "deletableSubscription1", this.cursorMock, false));
        subscriptions.put((Object)deletableSubscription1.getName(), (Object)deletableSubscription1);
        PersistentSubscription nonDeletableSubscription2 = (PersistentSubscription)Mockito.spy((Object)new PersistentSubscription(topic, "nonDeletableSubscription2", this.cursorMock, true));
        subscriptions.put((Object)nonDeletableSubscription2.getName(), (Object)nonDeletableSubscription2);
        Field field = topic.getClass().getDeclaredField("subscriptions");
        field.setAccessible(true);
        field.set(topic, subscriptions);
        Method addConsumerToSubscription = AbstractTopic.class.getDeclaredMethod("addConsumerToSubscription", Subscription.class, Consumer.class);
        addConsumerToSubscription.setAccessible(true);
        Consumer consumer = new Consumer((Subscription)nonDeletableSubscription1, CommandSubscribe.SubType.Shared, topic.getName(), 1L, 0, "consumer1", 50000, (TransportCnx)this.serverCnx, "app1", Collections.emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null);
        addConsumerToSubscription.invoke((Object)topic, nonDeletableSubscription1, consumer);
        Mockito.when((Object)this.pulsar.getConfigurationCache().policiesCache().get(AdminResource.path((String[])new String[]{"policies", TopicName.get((String)"persistent://prop/use/ns-abc/successTopic").getNamespace()}))).thenReturn(Optional.of(new Policies()));
        ServiceConfiguration svcConfig = (ServiceConfiguration)Mockito.spy((Object)new ServiceConfiguration());
        ((ServiceConfiguration)Mockito.doReturn((Object)5).when((Object)svcConfig)).getSubscriptionExpirationTimeMinutes();
        ((PulsarService)Mockito.doReturn((Object)svcConfig).when((Object)this.pulsar)).getConfiguration();
        ((ManagedCursor)Mockito.doReturn((Object)(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(6L))).when((Object)this.cursorMock)).getLastActive();
        topic.checkInactiveSubscriptions();
        ((PersistentSubscription)Mockito.verify((Object)nonDeletableSubscription1, (VerificationMode)Mockito.times((int)0))).delete();
        ((PersistentSubscription)Mockito.verify((Object)deletableSubscription1, (VerificationMode)Mockito.times((int)1))).delete();
        ((PersistentSubscription)Mockito.verify((Object)nonDeletableSubscription2, (VerificationMode)Mockito.times((int)0))).delete();
    }

    @Test
    public void testTopicFencingTimeout() throws Exception {
        ServiceConfiguration svcConfig = (ServiceConfiguration)Mockito.spy((Object)new ServiceConfiguration());
        ((PulsarService)Mockito.doReturn((Object)svcConfig).when((Object)this.pulsar)).getConfiguration();
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        Method fence = PersistentTopic.class.getDeclaredMethod("fence", new Class[0]);
        fence.setAccessible(true);
        Method unfence = PersistentTopic.class.getDeclaredMethod("unfence", new Class[0]);
        unfence.setAccessible(true);
        Field fencedTopicMonitoringTaskField = PersistentTopic.class.getDeclaredField("fencedTopicMonitoringTask");
        fencedTopicMonitoringTaskField.setAccessible(true);
        Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced");
        isFencedField.setAccessible(true);
        Field isClosingOrDeletingField = PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
        isClosingOrDeletingField.setAccessible(true);
        ((ServiceConfiguration)Mockito.doReturn((Object)10).when((Object)svcConfig)).getTopicFencingTimeoutSeconds();
        fence.invoke((Object)topic, new Object[0]);
        unfence.invoke((Object)topic, new Object[0]);
        ScheduledFuture fencedTopicMonitoringTask = (ScheduledFuture)fencedTopicMonitoringTaskField.get(topic);
        Assert.assertTrue((boolean)fencedTopicMonitoringTask.isDone());
        Assert.assertTrue((boolean)fencedTopicMonitoringTask.isCancelled());
        Assert.assertFalse((boolean)((Boolean)isFencedField.get(topic)));
        Assert.assertFalse((boolean)((Boolean)isClosingOrDeletingField.get(topic)));
        ((ServiceConfiguration)Mockito.doReturn((Object)1).when((Object)svcConfig)).getTopicFencingTimeoutSeconds();
        fence.invoke((Object)topic, new Object[0]);
        Thread.sleep(2000L);
        fencedTopicMonitoringTask = (ScheduledFuture)fencedTopicMonitoringTaskField.get(topic);
        Assert.assertTrue((boolean)fencedTopicMonitoringTask.isDone());
        Assert.assertFalse((boolean)fencedTopicMonitoringTask.isCancelled());
        Assert.assertTrue((boolean)((Boolean)isFencedField.get(topic)));
        Assert.assertTrue((boolean)((Boolean)isClosingOrDeletingField.get(topic)));
    }

    @Test
    public void testGetDurableSubscription() throws Exception {
        ManagedLedger mockLedger = (ManagedLedger)Mockito.mock(ManagedLedger.class);
        ManagedCursor mockCursor = (ManagedCursor)Mockito.mock(ManagedCursorImpl.class);
        Position mockPosition = (Position)Mockito.mock(Position.class);
        ((ManagedCursor)Mockito.doReturn((Object)"test").when((Object)mockCursor)).getName();
        ((ManagedCursor)Mockito.doAnswer(invocationOnMock -> {
            ((AsyncCallbacks.FindEntryCallback)invocationOnMock.getArguments()[2]).findEntryComplete(mockPosition, invocationOnMock.getArguments()[3]);
            return null;
        }).when((Object)mockCursor)).asyncFindNewestMatching((ManagedCursor.FindPositionConstraint)ArgumentMatchers.any(), (Predicate)ArgumentMatchers.any(), (AsyncCallbacks.FindEntryCallback)ArgumentMatchers.any(), ArgumentMatchers.any());
        ((ManagedCursor)Mockito.doAnswer(invocationOnMock -> {
            ((AsyncCallbacks.ResetCursorCallback)invocationOnMock.getArguments()[1]).resetComplete(null);
            return null;
        }).when((Object)mockCursor)).asyncResetCursor((Position)ArgumentMatchers.any(), (AsyncCallbacks.ResetCursorCallback)ArgumentMatchers.any());
        ((ManagedLedger)Mockito.doAnswer(invocationOnMock -> {
            ((AsyncCallbacks.DeleteCursorCallback)invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
            return null;
        }).when((Object)mockLedger)).asyncDeleteCursor(Mockito.matches((String)".*success.*"), (AsyncCallbacks.DeleteCursorCallback)ArgumentMatchers.any(AsyncCallbacks.DeleteCursorCallback.class), ArgumentMatchers.any());
        ((ManagedLedger)Mockito.doAnswer(invocationOnMock -> {
            ((AsyncCallbacks.OpenCursorCallback)invocationOnMock.getArguments()[3]).openCursorComplete(mockCursor, null);
            return null;
        }).when((Object)mockLedger)).asyncOpenCursor((String)ArgumentMatchers.any(), (CommandSubscribe.InitialPosition)ArgumentMatchers.any(), (Map)ArgumentMatchers.any(), (AsyncCallbacks.OpenCursorCallback)ArgumentMatchers.any(), ArgumentMatchers.any());
        PersistentTopic topic = new PersistentTopic("persistent://prop/use/ns-abc/successTopic", mockLedger, this.brokerService);
        CommandSubscribe cmd = new CommandSubscribe().setConsumerId(1L).setDurable(true).setStartMessageRollbackDurationSec(60L).setTopic("persistent://prop/use/ns-abc/successTopic").setSubscription("successSub").setConsumerName("consumer-name").setReadCompacted(false).setRequestId(1L).setSubType(CommandSubscribe.SubType.Exclusive);
        CompletableFuture f1 = topic.subscribe((TransportCnx)this.serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.isDurable(), null, Collections.emptyMap(), cmd.isReadCompacted(), CommandSubscribe.InitialPosition.Latest, cmd.getStartMessageRollbackDurationSec(), false, null);
        f1.get();
        CompletableFuture f2 = topic.unsubscribe("successSub");
        f2.get();
    }

    private ByteBuf getMessageWithMetadata(byte[] data) {
        MessageMetadata messageData = new MessageMetadata().setPublishTime(System.currentTimeMillis()).setProducerName("prod-name").setSequenceId(0L);
        ByteBuf payload = Unpooled.wrappedBuffer((byte[])data, (int)0, (int)data.length);
        int msgMetadataSize = messageData.getSerializedSize();
        int headersSize = 4 + msgMetadataSize;
        ByteBuf headers = PulsarByteBufAllocator.DEFAULT.buffer(headersSize, headersSize);
        headers.writeInt(msgMetadataSize);
        messageData.writeTo(headers);
        return ByteBufPair.coalesce((ByteBufPair)ByteBufPair.get((ByteBuf)headers, (ByteBuf)payload));
    }

    private static /* synthetic */ Object lambda$testCompactorSubscriptionUpdatedOnInit$12(Map properties, InvocationOnMock invokactionOnMock) throws Throwable {
        return properties;
    }
}

