package org.apache.pulsar.broker.admin;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
import org.apache.pulsar.broker.admin.v2.PersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
import org.apache.zookeeper.KeeperException;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@PrepareForTest({PersistentTopics.class})
@Test(groups = {"broker"})
@PowerMockIgnore({"com.sun.management.*"})
/* loaded from: input_file:org/apache/pulsar/broker/admin/PersistentTopicsTest.class */
public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
    // Logger used by the tests (e.g. to report an expected creation failure); its initializer is
    // not part of this declaration.
    private static final Logger log;
    // Spied persistent-topics admin resource; rebuilt in setup() before every test method.
    private PersistentTopics persistentTopics;
    // Name of the tenant created in setup(); all tests operate inside it.
    private final String testTenant = "my-tenant";
    // Name of one of the two clusters created in setup().
    private final String testLocalCluster = "use";
    // Name of the namespace created in setup() under the test tenant.
    private final String testNamespace = "my-namespace";
    // Reflective handle on PulsarWebResource's "uri" field, resolved in initPersistentTopics().
    protected Field uriField;
    // Mocked UriInfo created in initPersistentTopics().
    protected UriInfo uriInfo;
    // Spied non-persistent-topics admin resource; rebuilt in setup() before every test method.
    private NonPersistentTopics nonPersistentTopic;
    // Compiler-generated assertion switch; read by the stubbed answer in
    // testCreateNonPartitionedTopicWithInvalidName. Its initializer is not part of this declaration.
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * One-time class fixture: resolves the {@code uri} field declared on
     * {@link PulsarWebResource}, opens it for reflective access, and creates a mocked
     * {@link UriInfo}.
     *
     * @throws Exception if the {@code uri} field cannot be resolved
     */
    @BeforeClass
    public void initPersistentTopics() throws Exception {
        final Field declaredUri = PulsarWebResource.class.getDeclaredField("uri");
        this.uriField = declaredUri;
        declaredUri.setAccessible(true);
        this.uriInfo = Mockito.mock(UriInfo.class);
    }

    /**
     * Per-test fixture. Starts the mocked broker through {@code internalSetup()}, then builds
     * spied {@link PersistentTopics} and {@link NonPersistentTopics} resources whose
     * request-context accessors (ZooKeeper handle, HTTPS flag, principal, client id, domain,
     * auth data) are stubbed and whose tenant-admin validation is disabled for the test tenant.
     * Finally it registers the clusters, tenant and namespace the tests operate on.
     *
     * @throws Exception if the broker cannot be started or an admin call fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();

        // Persistent-topics resource under test.
        this.persistentTopics = Mockito.spy(new PersistentTopics());
        this.persistentTopics.setServletContext(new MockServletContext());
        this.persistentTopics.setPulsar(this.pulsar);
        Mockito.doReturn(this.mockZooKeeper).when(this.persistentTopics).localZk();
        Mockito.doReturn(false).when(this.persistentTopics).isRequestHttps();
        // The explicit Object cast keeps the null from being treated as a varargs array.
        Mockito.doReturn((Object) null).when(this.persistentTopics).originalPrincipal();
        Mockito.doReturn("test").when(this.persistentTopics).clientAppId();
        Mockito.doReturn(TopicDomain.persistent.value()).when(this.persistentTopics).domain();
        Mockito.doNothing().when(this.persistentTopics).validateAdminAccessForTenant(this.testTenant);
        Mockito.doReturn(Mockito.mock(AuthenticationDataHttps.class)).when(this.persistentTopics).clientAuthData();

        // Non-persistent-topics resource, stubbed the same way but reporting its own domain.
        this.nonPersistentTopic = Mockito.spy(new NonPersistentTopics());
        this.nonPersistentTopic.setServletContext(new MockServletContext());
        this.nonPersistentTopic.setPulsar(this.pulsar);
        Mockito.doReturn(this.mockZooKeeper).when(this.nonPersistentTopic).localZk();
        Mockito.doReturn(false).when(this.nonPersistentTopic).isRequestHttps();
        Mockito.doReturn((Object) null).when(this.nonPersistentTopic).originalPrincipal();
        Mockito.doReturn("test").when(this.nonPersistentTopic).clientAppId();
        Mockito.doReturn(TopicDomain.non_persistent.value()).when(this.nonPersistentTopic).domain();
        Mockito.doNothing().when(this.nonPersistentTopic).validateAdminAccessForTenant(this.testTenant);
        Mockito.doReturn(Mockito.mock(AuthenticationDataHttps.class)).when(this.nonPersistentTopic).clientAuthData();

        // Cluster, tenant and namespace shared by every test method.
        this.admin.clusters().createCluster(this.testLocalCluster,
                ClusterData.builder().serviceUrl("http://broker-use.com:8080").build());
        this.admin.clusters().createCluster("test",
                ClusterData.builder().serviceUrl("http://broker-use.com:8080").build());
        this.admin.tenants().createTenant(this.testTenant,
                new TenantInfoImpl(Sets.newHashSet("role1", "role2"),
                        Sets.newHashSet(this.testLocalCluster, "test")));
        this.admin.namespaces().createNamespace(this.testTenant + "/" + this.testNamespace,
                Sets.newHashSet(this.testLocalCluster, "test"));
    }

    /**
     * Per-test teardown: delegates to the base class's {@code internalCleanup()}. Declared with
     * {@code alwaysRun = true} so TestNG invokes it even when the test method did not succeed.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Exercises {@code getSubscriptions} across the life cycle of a partitioned topic: lookups
     * before the topic exists, after it is created, after subscriptions are added and removed on
     * the whole topic and on a single partition, and finally deletes the topic.
     */
    @Test
    public void testGetSubscriptions() {
        final String topic = "topic-not-found";
        final String partition0 = "topic-not-found-partition-0";
        final String partition1 = "topic-not-found-partition-1";
        final int notFound = Response.Status.NOT_FOUND.getStatusCode();
        final int noContent = Response.Status.NO_CONTENT.getStatusCode();

        // 1) Unknown topic -> 404 "Topic not found".
        AsyncResponse response = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.getSubscriptions(response, this.testTenant, this.testNamespace, topic, true);
        ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(errorCaptor.capture());
        Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(), notFound);
        Assert.assertEquals(errorCaptor.getValue().getMessage(), "Topic not found");

        // 2) Unknown partition -> 404 mentioning that the parent has zero partitions.
        response = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.getSubscriptions(response, this.testTenant, this.testNamespace, partition0, true);
        errorCaptor = ArgumentCaptor.forClass(RestException.class);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(errorCaptor.capture());
        Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(), notFound);
        Assert.assertEquals(errorCaptor.getValue().getMessage(),
                "Partitioned Topic not found: persistent://my-tenant/my-namespace/topic-not-found-partition-0 has zero partitions");

        // 3) Create the partitioned topic with three partitions.
        response = Mockito.mock(AsyncResponse.class);
        ArgumentCaptor<Response> okCaptor = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(response, this.testTenant, this.testNamespace, topic, 3, true);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(okCaptor.capture());
        Assert.assertEquals(okCaptor.getValue().getStatus(), noContent);

        // 4) Subscribe on the partitioned topic as a whole.
        response = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createSubscription(response, this.testTenant, this.testNamespace, topic, "test", true,
                MessageId.earliest, false);
        okCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(okCaptor.capture());
        Assert.assertEquals(okCaptor.getValue().getStatus(), noContent);

        // 5) The subscription is visible on partition 0.
        response = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.getSubscriptions(response, this.testTenant, this.testNamespace, partition0, true);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(Lists.newArrayList("test"));

        // 6) Remove the subscription from the whole topic.
        response = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.deleteSubscription(response, this.testTenant, this.testNamespace, topic, "test", false,
                true);
        okCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(okCaptor.capture());
        Assert.assertEquals(okCaptor.getValue().getStatus(), noContent);

        // 7) Partition 0 no longer reports any subscription.
        response = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.getSubscriptions(response, this.testTenant, this.testNamespace, partition0, true);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(Lists.newArrayList());

        // 8) Subscribe on partition 1 only.
        response = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createSubscription(response, this.testTenant, this.testNamespace, partition1, "test",
                true, MessageId.earliest, false);
        okCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(okCaptor.capture());
        Assert.assertEquals(okCaptor.getValue().getStatus(), noContent);

        // 9) Partition 1 reports it ...
        response = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.getSubscriptions(response, this.testTenant, this.testNamespace, partition1, true);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(Lists.newArrayList("test"));

        // 10) ... partition 0 still does not ...
        response = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.getSubscriptions(response, this.testTenant, this.testNamespace, partition0, true);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(Lists.newArrayList());

        // 11) ... and the partitioned topic as a whole does.
        response = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.getSubscriptions(response, this.testTenant, this.testNamespace, topic, true);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(Lists.newArrayList("test"));

        // 12) Delete the partitioned topic.
        response = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.deletePartitionedTopic(response, this.testTenant, this.testNamespace, topic, true, true,
                false);
        okCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(okCaptor.capture());
        Assert.assertEquals(okCaptor.getValue().getStatus(), noContent);
    }

    /**
     * Publishes five messages to a non-partitioned topic and checks the backlog seen by
     * subscriptions created at the earliest position (5), at the latest position (0) and with no
     * message id at all (0).
     *
     * <p>The producer is managed by try-with-resources so it is closed even when one of the
     * assertions fails.
     *
     * @throws Exception if topic creation, publishing or closing the producer fails
     */
    @Test
    public void testCreateSubscriptions() throws Exception {
        final String localTopic = "subWithPositionOrNot";
        final String fullTopic = "persistent://my-tenant/my-namespace/subWithPositionOrNot";
        final int noContent = Response.Status.NO_CONTENT.getStatusCode();

        this.admin.topics().createNonPartitionedTopic(fullTopic);

        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(fullTopic)
                .maxPendingMessages(30000)
                .create()) {
            for (int i = 0; i < 5; i++) {
                System.out.println(producer.send(new byte[10]));
            }

            // Subscription positioned at the earliest message sees the whole backlog.
            AsyncResponse response = Mockito.mock(AsyncResponse.class);
            this.persistentTopics.createSubscription(response, this.testTenant, this.testNamespace, localTopic,
                    "sub-earliest", true, MessageId.earliest, false);
            ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
            Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(captor.capture());
            Assert.assertEquals(captor.getValue().getStatus(), noContent);
            long backlog = ((SubscriptionStats) this.persistentTopics
                    .getStats(this.testTenant, this.testNamespace, localTopic, true, true, false)
                    .getSubscriptions().get("sub-earliest")).getMsgBacklog();
            System.out.println("Message back log for sub-earliest is :" + backlog);
            Assert.assertEquals(backlog, 5L);

            // Subscription positioned at the latest message starts with an empty backlog.
            response = Mockito.mock(AsyncResponse.class);
            this.persistentTopics.createSubscription(response, this.testTenant, this.testNamespace, localTopic,
                    "sub-latest", true, MessageId.latest, false);
            captor = ArgumentCaptor.forClass(Response.class);
            Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(captor.capture());
            Assert.assertEquals(captor.getValue().getStatus(), noContent);
            backlog = ((SubscriptionStats) this.persistentTopics
                    .getStats(this.testTenant, this.testNamespace, localTopic, true, true, false)
                    .getSubscriptions().get("sub-latest")).getMsgBacklog();
            System.out.println("Message back log for sub-latest is :" + backlog);
            Assert.assertEquals(backlog, 0L);

            // Subscription created without a message id also starts with an empty backlog.
            response = Mockito.mock(AsyncResponse.class);
            this.persistentTopics.createSubscription(response, this.testTenant, this.testNamespace, localTopic,
                    "sub-none-message-id", true, (MessageIdImpl) null, false);
            captor = ArgumentCaptor.forClass(Response.class);
            Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(captor.capture());
            Assert.assertEquals(captor.getValue().getStatus(), noContent);
            backlog = ((SubscriptionStats) this.persistentTopics
                    .getStats(this.testTenant, this.testNamespace, localTopic, true, true, false)
                    .getSubscriptions().get("sub-none-message-id")).getMsgBacklog();
            System.out.println("Message back log for sub-none-message-id is :" + backlog);
            Assert.assertEquals(backlog, 0L);
        }
    }

    /**
     * With the resource's domain stubbed to non-persistent, creating a subscription must be
     * answered with a 400 Bad Request error.
     */
    @Test
    public void testCreateSubscriptionForNonPersistentTopic() throws InterruptedException {
        Mockito.doReturn(TopicDomain.non_persistent.value()).when(this.persistentTopics).domain();

        final AsyncResponse response = Mockito.mock(AsyncResponse.class);
        final ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
        this.persistentTopics.createSubscription(response, this.testTenant, this.testNamespace,
                "testCreateSubscriptionForNonPersistentTopic", "sub", true, MessageId.earliest, false);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(errorCaptor.capture());

        final WebApplicationException failure = errorCaptor.getValue();
        Assert.assertEquals(failure.getResponse().getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
    }

    /**
     * Creates a single-partition topic with one subscription, terminates it, and expects the
     * response to carry a one-element list holding {@code MessageIdImpl(3, -1, -1)}.
     */
    @Test
    public void testTerminatePartitionedTopic() {
        final String topic = "topic-not-found";
        final int noContent = Response.Status.NO_CONTENT.getStatusCode();

        // Create the partitioned topic.
        final AsyncResponse createTopicResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createPartitionedTopic(createTopicResponse, this.testTenant, this.testNamespace, topic,
                1, true);
        final ArgumentCaptor<Response> createTopicCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(createTopicResponse, Mockito.timeout(5000L).times(1)).resume(createTopicCaptor.capture());
        Assert.assertEquals(createTopicCaptor.getValue().getStatus(), noContent);

        // Attach a subscription.
        final AsyncResponse subscribeResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createSubscription(subscribeResponse, this.testTenant, this.testNamespace, topic,
                "test", true, MessageId.earliest, false);
        final ArgumentCaptor<Response> subscribeCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(subscribeResponse, Mockito.timeout(5000L).times(1)).resume(subscribeCaptor.capture());
        Assert.assertEquals(subscribeCaptor.getValue().getStatus(), noContent);

        // Terminate and check the reported last message ids.
        final AsyncResponse terminateResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.terminatePartitionedTopic(terminateResponse, this.testTenant, this.testNamespace, topic,
                true);
        Mockito.verify(terminateResponse, Mockito.timeout(5000L).times(1))
                .resume(Arrays.asList(new MessageIdImpl(3L, -1L, -1)));
    }

    /**
     * Checks non-partitioned topic handling: a subscription can be created on one, addressing it
     * with a {@code -partition-0} suffix fails with a "zero partitions" error, and its partitioned
     * metadata reports zero partitions.
     */
    @Test
    public void testNonPartitionedTopics() {
        final String topic = "non-partitioned-topic";

        // Creating a subscription succeeds.
        final AsyncResponse subscribeResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createSubscription(subscribeResponse, this.testTenant, this.testNamespace, topic,
                "test", true, MessageId.latest, false);
        final ArgumentCaptor<Response> subscribeCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(subscribeResponse, Mockito.timeout(5000L).times(1)).resume(subscribeCaptor.capture());
        Assert.assertEquals(subscribeCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

        // Addressing a partition of it is rejected.
        final AsyncResponse listResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.getSubscriptions(listResponse, this.testTenant, this.testNamespace,
                "non-partitioned-topic-partition-0", true);
        final ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
        Mockito.verify(listResponse, Mockito.timeout(5000L).times(1)).resume(errorCaptor.capture());
        final String message = errorCaptor.getValue().getMessage();
        Assert.assertTrue(message.contains("zero partitions"));

        // A second topic is created, then the metadata of the first one is inspected.
        this.persistentTopics.createNonPartitionedTopic(this.testTenant, this.testNamespace,
                "secondary-non-partitioned-topic", true);
        final int partitionsUnchecked = this.persistentTopics
                .getPartitionedMetadata(this.testTenant, this.testNamespace, topic, true, false).partitions;
        Assert.assertEquals(partitionsUnchecked, 0);
        final int partitionsChecked = this.persistentTopics
                .getPartitionedMetadata(this.testTenant, this.testNamespace, topic, true, true).partitions;
        Assert.assertEquals(partitionsChecked, 0);
    }

    /**
     * A topic whose name ends in {@code -partition-a} (a non-numeric suffix) can be created as a
     * non-partitioned topic and reports zero partitions for both values of the last flag.
     */
    @Test
    public void testCreateNonPartitionedTopic() {
        final String topic = "standard-topic-partition-a";
        this.persistentTopics.createNonPartitionedTopic(this.testTenant, this.testNamespace, topic, true);

        final int withoutFlag = this.persistentTopics
                .getPartitionedMetadata(this.testTenant, this.testNamespace, topic, true, false).partitions;
        Assert.assertEquals(withoutFlag, 0);

        final int withFlag = this.persistentTopics
                .getPartitionedMetadata(this.testTenant, this.testNamespace, topic, true, true).partitions;
        Assert.assertEquals(withFlag, 0);
    }

    /**
     * Stubs the partitioned-metadata lookup to report 10 partitions and expects creating the
     * non-partitioned topic {@code standard-topic-partition-10} to throw {@link RestException}.
     */
    @Test(expectedExceptions = {RestException.class})
    public void testCreateNonPartitionedTopicWithInvalidName() {
        Mockito.doAnswer(invocation -> {
            final TopicName requested = invocation.getArgument(0, TopicName.class);
            // Mirrors an assertion: only enforced while assertions are enabled.
            if (!$assertionsDisabled && !requested.getLocalName().equals("standard-topic")) {
                throw new AssertionError();
            }
            return new PartitionedTopicMetadata(10);
        }).when(this.persistentTopics).getPartitionedTopicMetadata(
                ArgumentMatchers.<TopicName>any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());

        this.persistentTopics.createNonPartitionedTopic(this.testTenant, this.testNamespace,
                "standard-topic-partition-10", true);
    }

    /**
     * With the managed-ledger list stubbed to contain {@code special-topic-partition-123},
     * creating the partitioned topic {@code special-topic} must be answered with 409 Conflict.
     */
    @Test
    public void testCreatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws KeeperException, InterruptedException {
        // Route the broker's local ZooKeeper cache to mocks that report two existing ledgers.
        final LocalZooKeeperCacheService cacheService = Mockito.mock(LocalZooKeeperCacheService.class);
        final ZooKeeperManagedLedgerCache ledgerCache = Mockito.mock(ZooKeeperManagedLedgerCache.class);
        Mockito.doReturn(cacheService).when(this.pulsar).getLocalZkCacheService();
        Mockito.doReturn(ledgerCache).when(cacheService).managedLedgerListCache();
        Mockito.doReturn(ImmutableSet.of("standard-topic", "special-topic-partition-123"))
                .when(ledgerCache).get(ArgumentMatchers.anyString());
        Mockito.doReturn(CompletableFuture.completedFuture(
                        ImmutableSet.of("standard-topic", "special-topic-partition-123")))
                .when(ledgerCache).getAsync(ArgumentMatchers.anyString());
        Mockito.doReturn(new Policies()).when(this.persistentTopics)
                .getNamespacePolicies(ArgumentMatchers.<NamespaceName>any());

        final AsyncResponse response = Mockito.mock(AsyncResponse.class);
        final ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
        this.persistentTopics.createPartitionedTopic(response, this.testTenant, this.testNamespace, "special-topic",
                5, true);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(errorCaptor.capture());

        final int status = errorCaptor.getValue().getResponse().getStatus();
        Assert.assertEquals(status, Response.Status.CONFLICT.getStatusCode());
    }

    /**
     * With the managed-ledger list stubbed to contain {@code special-topic-partition-10}, the
     * partitioned topic {@code special-topic} is created with 5 partitions and the subsequent
     * update to 10 partitions is expected to throw {@link RestException}.
     *
     * @throws Exception propagated from the stubbed ZooKeeper cache accessors
     */
    @Test(expectedExceptions = {RestException.class})
    public void testUpdatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws Exception {
        // Route the broker's local ZooKeeper cache to mocks that report one existing ledger.
        final LocalZooKeeperCacheService cacheService = Mockito.mock(LocalZooKeeperCacheService.class);
        final ZooKeeperManagedLedgerCache ledgerCache = Mockito.mock(ZooKeeperManagedLedgerCache.class);
        Mockito.doReturn(cacheService).when(this.pulsar).getLocalZkCacheService();
        Mockito.doReturn(ledgerCache).when(cacheService).managedLedgerListCache();
        Mockito.doReturn(ImmutableSet.of("special-topic-partition-10"))
                .when(ledgerCache).get(ArgumentMatchers.anyString());
        Mockito.doReturn(CompletableFuture.completedFuture(ImmutableSet.of("special-topic-partition-10")))
                .when(ledgerCache).getAsync(ArgumentMatchers.anyString());

        // Replace name validation with an answer that pins the resource to fixed names.
        Mockito.doAnswer(invocation -> {
            this.persistentTopics.namespaceName = NamespaceName.get("tenant", "namespace");
            this.persistentTopics.topicName = TopicName.get("persistent", "tenant", "cluster", "namespace", "topicname");
            return null;
        }).when(this.persistentTopics).validatePartitionedTopicName(
                ArgumentMatchers.<String>any(), ArgumentMatchers.<String>any(), ArgumentMatchers.<String>any());
        Mockito.doNothing().when(this.persistentTopics).validateAdminAccessForTenant(ArgumentMatchers.anyString());

        final AsyncResponse response = Mockito.mock(AsyncResponse.class);
        final ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(response, this.testTenant, this.testNamespace, "special-topic",
                5, true);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

        this.persistentTopics.updatePartitionedTopic(this.testTenant, this.testNamespace, "special-topic", true,
                false, 10);
    }

    /**
     * Exercises {@code unloadTopic} for a missing topic (404), an existing non-partitioned topic
     * and an existing partitioned topic (both 204), then deletes the partitioned topic.
     *
     * <p>Every verification waits at most 5 seconds. The first one previously allowed 45 seconds,
     * which is longer than this test's own 10-second {@code timeOut} and therefore could never
     * produce a Mockito verification failure.
     */
    @Test(timeOut = 10000)
    public void testUnloadTopic() {
        final String plainTopic = "standard-topic-to-be-unload";
        final String partitionedTopic = "partition-topic-to-be-unload";
        final int noContent = Response.Status.NO_CONTENT.getStatusCode();

        // 1) Unloading a topic that does not exist -> 404.
        AsyncResponse response = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.unloadTopic(response, this.testTenant, this.testNamespace, "topic-not-exist", true);
        ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(errorCaptor.capture());
        Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(),
                Response.Status.NOT_FOUND.getStatusCode());

        // 2) Unloading an existing non-partitioned topic -> 204.
        response = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createNonPartitionedTopic(this.testTenant, this.testNamespace, plainTopic, true);
        this.persistentTopics.unloadTopic(response, this.testTenant, this.testNamespace, plainTopic, true);
        ArgumentCaptor<Response> okCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(okCaptor.capture());
        Assert.assertEquals(okCaptor.getValue().getStatus(), noContent);

        // 3) Create a partitioned topic with six partitions.
        response = Mockito.mock(AsyncResponse.class);
        okCaptor = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(response, this.testTenant, this.testNamespace, partitionedTopic,
                6, true);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(okCaptor.capture());
        Assert.assertEquals(okCaptor.getValue().getStatus(), noContent);

        // 4) Unloading the partitioned topic -> 204.
        response = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.unloadTopic(response, this.testTenant, this.testNamespace, partitionedTopic, true);
        okCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(okCaptor.capture());
        Assert.assertEquals(okCaptor.getValue().getStatus(), noContent);

        // 5) Delete the partitioned topic.
        response = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.deletePartitionedTopic(response, this.testTenant, this.testNamespace, partitionedTopic,
                true, true, false);
        okCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(okCaptor.capture());
        Assert.assertEquals(okCaptor.getValue().getStatus(), noContent);
    }

    /**
     * Unloading a topic that was never created must be answered with a 404 error.
     *
     * <p>The verification waits at most 5 seconds so that it stays below this test's 10-second
     * {@code timeOut}; the previous 45-second wait could never expire before TestNG aborted the
     * test.
     */
    @Test(timeOut = 10000)
    public void testUnloadTopicShallThrowNotFoundWhenTopicNotExist() {
        final AsyncResponse response = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.unloadTopic(response, this.testTenant, this.testNamespace, "non-existent-topic", true);

        final ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(errorCaptor.capture());
        Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(),
                Response.Status.NOT_FOUND.getStatusCode());
    }

    /**
     * Creates one persistent and one non-persistent partitioned topic and checks that each
     * resource lists exactly one partitioned topic, belonging to its own domain.
     */
    @Test
    public void testGetPartitionedTopicsList() throws KeeperException, InterruptedException, PulsarAdminException {
        final int noContent = Response.Status.NO_CONTENT.getStatusCode();

        // Persistent partitioned topic.
        final AsyncResponse persistentResponse = Mockito.mock(AsyncResponse.class);
        final ArgumentCaptor<Response> persistentCaptor = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(persistentResponse, this.testTenant, this.testNamespace,
                "test-topic1", 3, true);
        Mockito.verify(persistentResponse, Mockito.timeout(5000L).times(1)).resume(persistentCaptor.capture());
        Assert.assertEquals(persistentCaptor.getValue().getStatus(), noContent);

        // Non-persistent partitioned topic.
        final AsyncResponse nonPersistentResponse = Mockito.mock(AsyncResponse.class);
        final ArgumentCaptor<Response> nonPersistentCaptor = ArgumentCaptor.forClass(Response.class);
        this.nonPersistentTopic.createPartitionedTopic(nonPersistentResponse, this.testTenant, this.testNamespace,
                "test-topic2", 3, true);
        Mockito.verify(nonPersistentResponse, Mockito.timeout(5000L).times(1)).resume(nonPersistentCaptor.capture());
        Assert.assertEquals(nonPersistentCaptor.getValue().getStatus(), noContent);

        // Each resource only lists the topic of its own domain.
        final List<?> persistentList =
                this.persistentTopics.getPartitionedTopicList(this.testTenant, this.testNamespace);
        Assert.assertEquals(persistentList.size(), 1);
        final TopicName persistentName = TopicName.get((String) persistentList.get(0));
        Assert.assertEquals(persistentName.getDomain().value(), TopicDomain.persistent.value());

        final List<?> nonPersistentList =
                this.nonPersistentTopic.getPartitionedTopicList(this.testTenant, this.testNamespace);
        Assert.assertEquals(nonPersistentList.size(), 1);
        final TopicName nonPersistentName = TopicName.get((String) nonPersistentList.get(0));
        Assert.assertEquals(nonPersistentName.getDomain().value(), TopicDomain.non_persistent.value());
    }

    /**
     * Grants the {@code produce} action to a role on a non-partitioned topic and checks that the
     * topic's permissions report exactly that action set for the role.
     */
    @Test
    public void testGrantNonPartitionedTopic() {
        final String topic = "non-partitioned-topic";
        final String role = "role";
        this.persistentTopics.createNonPartitionedTopic(this.testTenant, this.testNamespace, topic, true);

        final Set<AuthAction> granted = Sets.newHashSet(AuthAction.produce);
        this.persistentTopics.grantPermissionsOnTopic(this.testTenant, this.testNamespace, topic, role, granted);

        final Set<?> reported = (Set<?>) this.persistentTopics
                .getPermissionsOnTopic(this.testTenant, this.testNamespace, topic).get(role);
        Assert.assertEquals(reported, granted);
    }

    /**
     * After a partitioned topic has been created, creating a non-partitioned topic named like its
     * first partition must fail with 409 Conflict and the message "This topic already exists".
     *
     * <p>The partitioned-topic creation reports its result through the {@link AsyncResponse}, so
     * the test waits for that response before attempting the conflicting creation; otherwise the
     * conflict check could run before the partitions exist.
     */
    @Test
    public void testCreateExistedPartition() {
        final String topic = "test-create-existed-partition";

        final AsyncResponse response = Mockito.mock(AsyncResponse.class);
        final ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(response, this.testTenant, this.testNamespace, topic, 3, true);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

        final String partitionName = TopicName.get(topic).getPartition(0).getLocalName();
        try {
            this.persistentTopics.createNonPartitionedTopic(this.testTenant, this.testNamespace, partitionName,
                    false);
            Assert.fail();
        } catch (RestException e) {
            log.error("Failed to create {}: {}", partitionName, e.getMessage());
            Assert.assertEquals(e.getResponse().getStatus(), Response.Status.CONFLICT.getStatusCode());
            Assert.assertEquals(e.getMessage(), "This topic already exists");
        }
    }

    /**
     * Creates a 5-partition topic, grants {@code produce} to role "role" on the partitioned
     * topic, and checks that the grant is readable both on the partitioned topic itself and on
     * each of its five partitions.
     */
    @Test
    public void testGrantPartitionedTopic() {
        final String topic = "partitioned-topic";
        final String role = "role";
        final int partitionCount = 5;

        // Create the topic and wait (up to 5s) for the async response to be resumed exactly once.
        final AsyncResponse createResponse = Mockito.mock(AsyncResponse.class);
        final ArgumentCaptor<Response> createCaptor = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(
                createResponse, "my-tenant", "my-namespace", topic, partitionCount, true);
        Mockito.verify(createResponse, Mockito.timeout(5000L).times(1)).resume(createCaptor.capture());
        Assert.assertEquals(createCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

        final Set<AuthAction> expectedActions = Sets.newHashSet(AuthAction.produce);
        this.persistentTopics.grantPermissionsOnTopic("my-tenant", "my-namespace", topic, role, expectedActions);

        final Set<?> grantedOnTopic =
                (Set<?>) this.persistentTopics.getPermissionsOnTopic("my-tenant", "my-namespace", topic).get(role);
        Assert.assertEquals(grantedOnTopic, expectedActions);

        // Every individual partition must report the same grant.
        final TopicName fullName = TopicName.get(TopicDomain.persistent.value(), "my-tenant", "my-namespace", topic);
        for (int partition = 0; partition < partitionCount; partition++) {
            final String partitionLocalName = fullName.getPartition(partition).getEncodedLocalName();
            final Set<?> grantedOnPartition = (Set<?>) this.persistentTopics
                    .getPermissionsOnTopic("my-tenant", "my-namespace", partitionLocalName)
                    .get(role);
            Assert.assertEquals(grantedOnPartition, expectedActions);
        }
    }

    /**
     * Grants and then revokes {@code produce} for role "role" on a non-partitioned topic and
     * checks that the role no longer has an entry in the topic's permissions.
     */
    @Test
    public void testRevokeNonPartitionedTopic() {
        final String topic = "non-partitioned-topic";
        final String role = "role";
        this.persistentTopics.createNonPartitionedTopic("my-tenant", "my-namespace", topic, true);

        final Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce);
        this.persistentTopics.grantPermissionsOnTopic("my-tenant", "my-namespace", topic, role, actions);
        this.persistentTopics.revokePermissionsOnTopic("my-tenant", "my-namespace", topic, role);

        final Set<?> remaining =
                (Set<?>) this.persistentTopics.getPermissionsOnTopic("my-tenant", "my-namespace", topic).get(role);
        Assert.assertEquals(remaining, (Set<?>) null);
    }

    /**
     * Creates a 5-partition topic, grants and then revokes {@code produce} for role "role" on
     * it, and checks that the role has no permission entry left on the partitioned topic or on
     * any of its five partitions.
     */
    @Test
    public void testRevokePartitionedTopic() {
        final String topic = "partitioned-topic";
        final String role = "role";
        final int partitionCount = 5;

        // Create the topic and wait (up to 5s) for the async response to be resumed exactly once.
        final AsyncResponse createResponse = Mockito.mock(AsyncResponse.class);
        final ArgumentCaptor<Response> createCaptor = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(
                createResponse, "my-tenant", "my-namespace", topic, partitionCount, true);
        Mockito.verify(createResponse, Mockito.timeout(5000L).times(1)).resume(createCaptor.capture());
        Assert.assertEquals(createCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

        final Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce);
        this.persistentTopics.grantPermissionsOnTopic("my-tenant", "my-namespace", topic, role, actions);
        this.persistentTopics.revokePermissionsOnTopic("my-tenant", "my-namespace", topic, role);

        final Set<?> remainingOnTopic =
                (Set<?>) this.persistentTopics.getPermissionsOnTopic("my-tenant", "my-namespace", topic).get(role);
        Assert.assertEquals(remainingOnTopic, (Set<?>) null);

        // The revocation must also be visible on every individual partition.
        final TopicName fullName = TopicName.get(TopicDomain.persistent.value(), "my-tenant", "my-namespace", topic);
        for (int partition = 0; partition < partitionCount; partition++) {
            final String partitionLocalName = fullName.getPartition(partition).getEncodedLocalName();
            final Set<?> remainingOnPartition = (Set<?>) this.persistentTopics
                    .getPermissionsOnTopic("my-tenant", "my-namespace", partitionLocalName)
                    .get(role);
            Assert.assertEquals(remainingOnPartition, (Set<?>) null);
        }
    }

    /**
     * Exercises {@code compact} in three situations: a topic that was never created (expects a
     * {@link RestException} with NOT_FOUND), an existing non-partitioned topic (expects
     * NO_CONTENT) and a 2-partition topic (expects NO_CONTENT).
     */
    @Test
    public void testTriggerCompactionTopic() {
        final int notFound = Response.Status.NOT_FOUND.getStatusCode();
        final int noContent = Response.Status.NO_CONTENT.getStatusCode();

        // 1) Compacting a topic that does not exist resumes the response with a NOT_FOUND error.
        final AsyncResponse missingTopicResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.compact(missingTopicResponse, "my-tenant", "my-namespace", "non-existing-topic", true);
        final ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
        Mockito.verify(missingTopicResponse, Mockito.timeout(5000L).times(1)).resume(errorCaptor.capture());
        Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(), notFound);

        // 2) Compacting an existing non-partitioned topic succeeds with NO_CONTENT.
        final AsyncResponse nonPartitionedResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createNonPartitionedTopic("my-tenant", "my-namespace", "test-non-part", true);
        this.persistentTopics.compact(nonPartitionedResponse, "my-tenant", "my-namespace", "test-non-part", true);
        final ArgumentCaptor<Response> nonPartitionedCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(nonPartitionedResponse, Mockito.timeout(5000L).times(1))
                .resume(nonPartitionedCaptor.capture());
        Assert.assertEquals(nonPartitionedCaptor.getValue().getStatus(), noContent);

        // 3) Partitioned topic.
        // NOTE(review): the same mock is handed to both createPartitionedTopic and compact, so
        // the single verified resume() may come from either call -- confirm this is intended.
        final AsyncResponse partitionedResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createPartitionedTopic(
                partitionedResponse, "my-tenant", "my-namespace", "test-part", 2, true);
        this.persistentTopics.compact(partitionedResponse, "my-tenant", "my-namespace", "test-part", true);
        final ArgumentCaptor<Response> partitionedCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(partitionedResponse, Mockito.timeout(5000L).times(1)).resume(partitionedCaptor.capture());
        Assert.assertEquals(partitionedCaptor.getValue().getStatus(), noContent);
    }

    /**
     * Creates a 3-partition topic, publishes 100 string messages through a producer on the
     * partitioned topic, and checks that peeking 5 messages from partition 0 under subscription
     * "sub" returns 5 messages. The test never creates "sub" itself; judging by the test name
     * the subscription is expected not to exist beforehand.
     *
     * @throws Exception if topic creation, publishing, peeking or closing the producer fails
     */
    @Test
    public void testPeekWithSubscriptionNameNotExist() throws Exception {
        final String topic =
                TopicName.get(TopicDomain.persistent.value(), "my-tenant", "my-namespace", "testTopic").toString();
        this.admin.topics().createPartitionedTopicAsync(topic, 3, true).get();
        final String partitionZero = topic + "-partition-0";

        // try-with-resources: the producer is released even when a send or the assertion fails
        // (previously it was closed only on the success path).
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(topic).create()) {
            for (int i = 0; i < 100; i++) {
                producer.send("test" + i);
            }
            Assert.assertEquals(this.admin.topics().peekMessages(partitionZero, "sub", 5).size(), 5);
        }
    }

    /**
     * Checks {@code getLastMessageId} on a topic written first by a batching producer and then
     * by a non-batching one: after ten batched sends the last id must report batch index 9, and
     * after a single non-batched send the last id must be a {@link MessageIdImpl}.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test
    public void testGetLastMessageId() throws Exception {
        final String topic = "persistent://prop-xyz/ns1/testGetLastMessageId";
        this.admin.tenants().createTenant(
                "prop-xyz", new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")));
        this.admin.namespaces().createNamespace("prop-xyz/ns1", Sets.newHashSet("test"));
        this.admin.topics().createNonPartitionedTopic(topic);

        // Both producers are now closed via try-with-resources; previously neither was ever
        // closed, and the second one was not even kept in a variable.
        try (Producer<byte[]> batchProducer = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(true)
                .batchingMaxMessages(100)
                .batchingMaxPublishDelay(2L, TimeUnit.SECONDS)
                .create()) {
            this.admin.topics().createSubscription(topic, "test", MessageId.earliest);

            // Only the future of the last send is awaited before the assertion below.
            CompletableFuture<MessageId> lastSend = new CompletableFuture<>();
            for (int i = 0; i < 10; i++) {
                lastSend = batchProducer.sendAsync("test".getBytes());
            }
            lastSend.get();
            Assert.assertEquals(this.admin.topics().getLastMessageId(topic).getBatchIndex(), 9);

            try (Producer<byte[]> plainProducer =
                    this.pulsarClient.newProducer().topic(topic).enableBatching(false).create()) {
                plainProducer.send("test".getBytes());
                Assert.assertTrue(this.admin.topics().getLastMessageId(topic) instanceof MessageIdImpl);
            }
        }
    }

    /**
     * Checks {@code examineMessage} positioning on partition 0 of a 2-partition topic.
     *
     * <p>Examining the partitioned topic itself must be rejected with a
     * {@link PulsarAdminException}. On the partition, position {@code n} counted from
     * "earliest" must return the n-th message published, and position {@code n} counted from
     * "latest" must return the n-th message from the end.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test
    public void testExamineMessage() throws Exception {
        final String topic = "persistent://tenant-xyz/ns-abc/topic-123";
        final String partitionZero = topic + "-partition-0";
        final int messageCount = 5;

        this.admin.tenants().createTenant(
                "tenant-xyz", new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")));
        this.admin.namespaces().createNamespace("tenant-xyz/ns-abc", Sets.newHashSet("test"));
        this.admin.topics().createPartitionedTopic(topic, 2);

        // try-with-resources so the producer is released however the test ends.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(partitionZero).create()) {
            try {
                this.admin.topics().examineMessage(topic, "earliest", 1L);
                // Without this the test passed silently when no exception was thrown at all.
                // AssertionError is not a PulsarAdminException, so the catch below cannot
                // swallow it.
                Assert.fail("examineMessage on a partitioned topic should have been rejected");
            } catch (PulsarAdminException e) {
                Assert.assertEquals(e.getMessage(), "Examine messages on a partitioned topic is not allowed, please try examine message on specific topic partition");
            }

            // With a single message, position 1 from either end is that message.
            producer.send("message1");
            Assert.assertEquals(
                    new String(this.admin.topics().examineMessage(partitionZero, "earliest", 1L).getData()),
                    "message1");
            Assert.assertEquals(
                    new String(this.admin.topics().examineMessage(partitionZero, "latest", 1L).getData()),
                    "message1");

            for (int i = 2; i <= messageCount; i++) {
                producer.send("message" + i);
            }

            // Counting from the start: position n is message n.
            for (long position = 1; position <= messageCount; position++) {
                Assert.assertEquals(
                        new String(this.admin.topics().examineMessage(partitionZero, "earliest", position).getData()),
                        "message" + position);
            }
            // Counting from the end: position 1 is message 5, position 5 is message 1.
            for (long position = 1; position <= messageCount; position++) {
                Assert.assertEquals(
                        new String(this.admin.topics().examineMessage(partitionZero, "latest", position).getData()),
                        "message" + (messageCount - position + 1));
            }
        }
    }

    /**
     * Publishes one message carrying explicit metadata (key bytes, ordering key, replication
     * clusters, sequence id, producer name, LZ4 compression) to partition 0 of a 2-partition
     * topic and checks that {@code examineMessage} returns a message exposing the same
     * metadata and payload.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test
    public void testExamineMessageMetadata() throws Exception {
        final String partitionZero = "persistent://tenant-xyz/ns-abc/topic-testExamineMessageMetadata-partition-0";

        this.admin.tenants().createTenant(
                "tenant-xyz", new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")));
        this.admin.namespaces().createNamespace("tenant-xyz/ns-abc", Sets.newHashSet("test"));
        this.admin.topics().createPartitionedTopic("persistent://tenant-xyz/ns-abc/topic-testExamineMessageMetadata", 2);

        // try-with-resources so the producer is released however the test ends.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .producerName("testExamineMessageMetadataProducer")
                .compressionType(CompressionType.LZ4)
                .topic(partitionZero)
                .create()) {
            producer.newMessage()
                    .keyBytes("partition123".getBytes())
                    .orderingKey(new byte[]{0})
                    .replicationClusters(Lists.newArrayList("a", "b"))
                    .sequenceId(112233L)
                    .value("data")
                    .send();

            MessageImpl examined = this.admin.topics().examineMessage(partitionZero, "earliest", 1L);

            // Arguments are (actual, expected), the order every other test in this file uses;
            // they were previously reversed here, which mislabels values in failure messages.
            Assert.assertEquals(examined.getSequenceId(), 112233L);
            Assert.assertEquals(examined.getOrderingKey(), new byte[]{0});
            Assert.assertEquals(examined.getKeyBytes(), "partition123".getBytes());
            Assert.assertTrue(examined.hasBase64EncodedKey());
            Assert.assertEquals(examined.getReplicateTo(), Lists.newArrayList("a", "b"));
            Assert.assertEquals(examined.getProducerName(), producer.getProducerName());
            // Compared by ordinal, exactly as before.
            Assert.assertEquals(
                    examined.getMessageBuilder().getCompression().ordinal(), CompressionType.LZ4.ordinal());
            Assert.assertEquals(new String(examined.getData()), "data");
        }
    }

    /**
     * Expects {@code triggerOffload} to reject a {@code null} message id on an existing
     * non-partitioned topic with a {@link RestException} carrying BAD_REQUEST.
     */
    @Test
    public void testOffloadWithNullMessageId() {
        final String topic = "topic-123";
        this.persistentTopics.createNonPartitionedTopic("my-tenant", "my-namespace", topic, true);

        final MessageIdImpl missingMessageId = null;
        try {
            this.persistentTopics.triggerOffload("my-tenant", "my-namespace", topic, true, missingMessageId);
            Assert.fail("should have failed");
        } catch (RestException rejected) {
            final int actualStatus = rejected.getResponse().getStatus();
            Assert.assertEquals(actualStatus, Response.Status.BAD_REQUEST.getStatusCode());
        }
    }

    /**
     * Walks {@code setReplicatedSubscriptionStatus} through the life cycle of a 2-partition
     * topic: the call must fail with NOT_FOUND while the topic does not exist, and succeed with
     * NO_CONTENT (for both values of its final boolean argument, on the partitioned topic and on
     * partition 0) once the topic and the subscription "sub" exist. The subscription and the
     * topic are deleted at the end. Every step waits up to 10s for exactly one resume() call.
     */
    @Test
    public void testSetReplicatedSubscriptionStatus() {
        final String topic = "topic-with-repl-sub";
        final String partitionZero = "topic-with-repl-sub-partition-0";
        final String subscription = "sub";
        final int notFound = Response.Status.NOT_FOUND.getStatusCode();
        final int noContent = Response.Status.NO_CONTENT.getStatusCode();

        // 1) Partitioned-topic name, before the topic exists -> NOT_FOUND.
        final AsyncResponse missingTopicResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.setReplicatedSubscriptionStatus(
                missingTopicResponse, "my-tenant", "my-namespace", topic, subscription, true, true);
        final ArgumentCaptor<RestException> missingTopicError = ArgumentCaptor.forClass(RestException.class);
        Mockito.verify(missingTopicResponse, Mockito.timeout(10000L).times(1)).resume(missingTopicError.capture());
        Assert.assertEquals(missingTopicError.getValue().getResponse().getStatus(), notFound);

        // 2) Partition name, before the topic exists -> NOT_FOUND.
        final AsyncResponse missingPartitionResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.setReplicatedSubscriptionStatus(
                missingPartitionResponse, "my-tenant", "my-namespace", partitionZero, subscription, true, true);
        final ArgumentCaptor<RestException> missingPartitionError = ArgumentCaptor.forClass(RestException.class);
        Mockito.verify(missingPartitionResponse, Mockito.timeout(10000L).times(1))
                .resume(missingPartitionError.capture());
        Assert.assertEquals(missingPartitionError.getValue().getResponse().getStatus(), notFound);

        // 3) Create the 2-partition topic.
        final AsyncResponse createTopicResponse = Mockito.mock(AsyncResponse.class);
        final ArgumentCaptor<Response> createTopicResult = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(
                createTopicResponse, "my-tenant", "my-namespace", topic, 2, false);
        Mockito.verify(createTopicResponse, Mockito.timeout(10000L).times(1)).resume(createTopicResult.capture());
        Assert.assertEquals(createTopicResult.getValue().getStatus(), noContent);

        // 4) Create the subscription at MessageId.latest.
        final AsyncResponse createSubResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createSubscription(
                createSubResponse, "my-tenant", "my-namespace", topic, subscription, true, MessageId.latest, false);
        final ArgumentCaptor<Response> createSubResult = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(createSubResponse, Mockito.timeout(10000L).times(1)).resume(createSubResult.capture());
        Assert.assertEquals(createSubResult.getValue().getStatus(), noContent);

        // 5) Partitioned topic, final flag true (presumably "enable" -- confirm against the API).
        final AsyncResponse topicOnResponse = Mockito.mock(AsyncResponse.class);
        final ArgumentCaptor<Response> topicOnResult = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.setReplicatedSubscriptionStatus(
                topicOnResponse, "my-tenant", "my-namespace", topic, subscription, true, true);
        Mockito.verify(topicOnResponse, Mockito.timeout(10000L).times(1)).resume(topicOnResult.capture());
        Assert.assertEquals(topicOnResult.getValue().getStatus(), noContent);

        // 6) Partitioned topic, final flag false.
        final AsyncResponse topicOffResponse = Mockito.mock(AsyncResponse.class);
        final ArgumentCaptor<Response> topicOffResult = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.setReplicatedSubscriptionStatus(
                topicOffResponse, "my-tenant", "my-namespace", topic, subscription, true, false);
        Mockito.verify(topicOffResponse, Mockito.timeout(10000L).times(1)).resume(topicOffResult.capture());
        Assert.assertEquals(topicOffResult.getValue().getStatus(), noContent);

        // 7) Partition 0, final flag true.
        final AsyncResponse partitionOnResponse = Mockito.mock(AsyncResponse.class);
        final ArgumentCaptor<Response> partitionOnResult = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.setReplicatedSubscriptionStatus(
                partitionOnResponse, "my-tenant", "my-namespace", partitionZero, subscription, true, true);
        Mockito.verify(partitionOnResponse, Mockito.timeout(10000L).times(1)).resume(partitionOnResult.capture());
        Assert.assertEquals(partitionOnResult.getValue().getStatus(), noContent);

        // 8) Partition 0, final flag false.
        final AsyncResponse partitionOffResponse = Mockito.mock(AsyncResponse.class);
        final ArgumentCaptor<Response> partitionOffResult = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.setReplicatedSubscriptionStatus(
                partitionOffResponse, "my-tenant", "my-namespace", partitionZero, subscription, true, false);
        Mockito.verify(partitionOffResponse, Mockito.timeout(10000L).times(1)).resume(partitionOffResult.capture());
        Assert.assertEquals(partitionOffResult.getValue().getStatus(), noContent);

        // 9) Delete the subscription.
        final AsyncResponse deleteSubResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.deleteSubscription(
                deleteSubResponse, "my-tenant", "my-namespace", topic, subscription, false, true);
        final ArgumentCaptor<Response> deleteSubResult = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(deleteSubResponse, Mockito.timeout(10000L).times(1)).resume(deleteSubResult.capture());
        Assert.assertEquals(deleteSubResult.getValue().getStatus(), noContent);

        // 10) Delete the partitioned topic.
        final AsyncResponse deleteTopicResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.deletePartitionedTopic(
                deleteTopicResponse, "my-tenant", "my-namespace", topic, true, true, false);
        final ArgumentCaptor<Response> deleteTopicResult = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(deleteTopicResponse, Mockito.timeout(10000L).times(1)).resume(deleteTopicResult.capture());
        Assert.assertEquals(deleteTopicResult.getValue().getStatus(), noContent);
    }

    // Static initializer: assigns the two class-level fields used by this test class.
    // NOTE(review): both fields are declared outside this view. `$assertionsDisabled` looks
    // like the compiler-synthesized flag backing `assert` statements that a decompiler has
    // surfaced -- confirm before hand-editing or recompiling this block.
    static {
        // Set to true when assertions are NOT enabled for PersistentTopicsTest.
        $assertionsDisabled = !PersistentTopicsTest.class.desiredAssertionStatus();
        // Logger for this test class (used, for example, by testCreateExistedPartition).
        log = LoggerFactory.getLogger(PersistentTopicsTest.class);
    }
}
