/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.admin;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletContext;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.MockServletContext;
import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
import org.apache.pulsar.broker.admin.v2.PersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
import org.apache.zookeeper.KeeperException;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@PrepareForTest(value={PersistentTopics.class})
@PowerMockIgnore(value={"com.sun.management.*"})
@Test(groups={"broker"})
public class PersistentTopicsTest
extends MockedPulsarServiceBaseTest {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(PersistentTopicsTest.class);
    // Spied v2 PersistentTopics resource under test; rebuilt by setup() before every test method.
    private PersistentTopics persistentTopics;
    // Tenant that setup() creates and for which admin-access validation is stubbed out.
    private final String testTenant = "my-tenant";
    // NOTE(review): presumably the local cluster name; setup() creates a cluster called "use" - confirm.
    private final String testLocalCluster = "use";
    // Namespace local name; setup() creates the namespace "my-tenant/my-namespace".
    private final String testNamespace = "my-namespace";
    // Reflective handle to PulsarWebResource's "uri" field, made accessible in initPersistentTopics().
    protected Field uriField;
    // Mockito mock of UriInfo, created once in initPersistentTopics().
    protected UriInfo uriInfo;
    // Spied v2 NonPersistentTopics resource; rebuilt by setup() before every test method.
    private NonPersistentTopics nonPersistentTopic;

    /**
     * One-time class fixture: looks up the {@code uri} field declared on
     * {@link PulsarWebResource}, opens it for reflective access, and creates a
     * mocked {@link UriInfo}.
     *
     * @throws Exception if the {@code uri} field cannot be found
     */
    @BeforeClass
    public void initPersistentTopics() throws Exception {
        final Field uri = PulsarWebResource.class.getDeclaredField("uri");
        this.uriField = uri;
        uri.setAccessible(true);
        this.uriInfo = Mockito.mock(UriInfo.class);
    }

    /**
     * Per-test fixture. Starts the mocked Pulsar service, builds spied
     * {@link PersistentTopics} and {@link NonPersistentTopics} resources with their
     * request-context accessors stubbed, and then creates the clusters, tenant and
     * namespace the tests operate on.
     *
     * @throws Exception if the base setup or any admin call fails
     */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();

        // Persistent-topic resource: a spy so individual accessors can be stubbed.
        this.persistentTopics = Mockito.spy(new PersistentTopics());
        this.persistentTopics.setServletContext(new MockServletContext());
        this.persistentTopics.setPulsar(this.pulsar);
        Mockito.doReturn(this.mockZooKeeper).when(this.persistentTopics).localZk();
        Mockito.doReturn(false).when(this.persistentTopics).isRequestHttps();
        Mockito.doReturn(null).when(this.persistentTopics).originalPrincipal();
        Mockito.doReturn("test").when(this.persistentTopics).clientAppId();
        Mockito.doReturn(TopicDomain.persistent.value()).when(this.persistentTopics).domain();
        Mockito.doNothing().when(this.persistentTopics).validateAdminAccessForTenant(this.testTenant);
        Mockito.doReturn(Mockito.mock(AuthenticationDataHttps.class)).when(this.persistentTopics).clientAuthData();

        // Non-persistent counterpart, stubbed the same way but reporting the non_persistent domain.
        this.nonPersistentTopic = Mockito.spy(new NonPersistentTopics());
        this.nonPersistentTopic.setServletContext(new MockServletContext());
        this.nonPersistentTopic.setPulsar(this.pulsar);
        Mockito.doReturn(this.mockZooKeeper).when(this.nonPersistentTopic).localZk();
        Mockito.doReturn(false).when(this.nonPersistentTopic).isRequestHttps();
        Mockito.doReturn(null).when(this.nonPersistentTopic).originalPrincipal();
        Mockito.doReturn("test").when(this.nonPersistentTopic).clientAppId();
        Mockito.doReturn(TopicDomain.non_persistent.value()).when(this.nonPersistentTopic).domain();
        Mockito.doNothing().when(this.nonPersistentTopic).validateAdminAccessForTenant(this.testTenant);
        Mockito.doReturn(Mockito.mock(AuthenticationDataHttps.class)).when(this.nonPersistentTopic).clientAuthData();

        // Cluster / tenant / namespace metadata shared by every test.
        this.admin.clusters().createCluster(this.testLocalCluster, new ClusterData("http://broker-use.com:8080"));
        this.admin.clusters().createCluster("test", new ClusterData("http://broker-use.com:8080"));
        final Set<String> adminRoles = Sets.newHashSet("role1", "role2");
        final Set<String> tenantClusters = Sets.newHashSet(this.testLocalCluster, "test");
        this.admin.tenants().createTenant(this.testTenant, new TenantInfo(adminRoles, tenantClusters));
        final Set<String> namespaceClusters = Sets.newHashSet(this.testLocalCluster, "test");
        this.admin.namespaces().createNamespace(this.testTenant + "/" + this.testNamespace, namespaceClusters);
    }

    /**
     * Per-test teardown; delegates to the base class's {@code internalCleanup()}.
     * Registered with {@code alwaysRun=true}.
     *
     * @throws Exception if the base cleanup fails
     */
    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Exercises {@code getSubscriptions} across a partitioned topic's lifecycle:
     * missing topic, missing partition, after the topic and a subscription are
     * created, after the subscription is deleted, and finally deletes the topic.
     */
    @Test
    public void testGetSubscriptions() {
        final String localTopic = "topic-not-found";
        final String firstPartition = localTopic + "-partition-0";

        // 1) Listing subscriptions of a topic that does not exist yields 404.
        AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.getSubscriptions(asyncResponse, this.testTenant, this.testNamespace, localTopic, true);
        ArgumentCaptor<RestException> failure = ArgumentCaptor.forClass(RestException.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(failure.capture());
        Assert.assertEquals(failure.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());
        Assert.assertEquals(failure.getValue().getMessage(), "Topic not found");

        // 2) Same for a partition-style name whose parent has no partitions.
        asyncResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.getSubscriptions(asyncResponse, this.testTenant, this.testNamespace, firstPartition, true);
        failure = ArgumentCaptor.forClass(RestException.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(failure.capture());
        Assert.assertEquals(failure.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());
        Assert.assertEquals(failure.getValue().getMessage(),
                "Partitioned Topic not found: persistent://my-tenant/my-namespace/topic-not-found-partition-0 has zero partitions");

        // 3) Create the partitioned topic with three partitions.
        asyncResponse = Mockito.mock(AsyncResponse.class);
        ArgumentCaptor<Response> success = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(asyncResponse, this.testTenant, this.testNamespace, localTopic, 3);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(success.capture());
        Assert.assertEquals(success.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

        // 4) Create subscription "test" on it.
        asyncResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createSubscription(asyncResponse, this.testTenant, this.testNamespace, localTopic,
                "test", true, (MessageIdImpl) MessageId.earliest, false);
        success = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(success.capture());
        Assert.assertEquals(success.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

        // 5) The first partition now reports exactly that subscription.
        asyncResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.getSubscriptions(asyncResponse, this.testTenant, this.testNamespace, firstPartition, true);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(Lists.newArrayList("test"));

        // 6) Delete the subscription.
        asyncResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.deleteSubscription(asyncResponse, this.testTenant, this.testNamespace, localTopic,
                "test", false, true);
        success = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(success.capture());
        Assert.assertEquals(success.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

        // 7) The partition's subscription list is empty again.
        asyncResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.getSubscriptions(asyncResponse, this.testTenant, this.testNamespace, firstPartition, true);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(Lists.newArrayList());

        // 8) Remove the partitioned topic.
        asyncResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.deletePartitionedTopic(asyncResponse, this.testTenant, this.testNamespace, localTopic,
                true, true, false);
        success = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(success.capture());
        Assert.assertEquals(success.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
    }

    /**
     * Publishes a fixed number of messages to a non-partitioned topic and checks the
     * initial backlog of subscriptions created at the earliest position, at the
     * latest position, and without an explicit message id.
     *
     * <p>The producer is managed by try-with-resources so it is closed even when an
     * assertion fails part-way through.
     *
     * @throws Exception if topic creation, publishing or a stats lookup fails
     */
    @Test
    public void testCreateSubscriptions() throws Exception {
        final int numberOfMessages = 5;
        final String subEarliest = "sub-earliest";
        final String subLatest = "sub-latest";
        final String subNoneMessageId = "sub-none-message-id";
        final String testLocalTopicName = "subWithPositionOrNot";
        final String topicName = "persistent://my-tenant/my-namespace/" + testLocalTopicName;

        this.admin.topics().createNonPartitionedTopic(topicName);
        try (Producer<byte[]> producer =
                this.pulsarClient.newProducer().topic(topicName).maxPendingMessages(30000).create()) {
            for (int i = 0; i < numberOfMessages; ++i) {
                log.info("Published message {}", producer.send(new byte[10]));
            }

            // Earliest position: every published message is still outstanding.
            Assert.assertEquals(
                    createSubscriptionAndReadBacklog(testLocalTopicName, subEarliest, (MessageIdImpl) MessageId.earliest),
                    (long) numberOfMessages);

            // Latest position: nothing to consume yet.
            Assert.assertEquals(
                    createSubscriptionAndReadBacklog(testLocalTopicName, subLatest, (MessageIdImpl) MessageId.latest),
                    0L);

            // No message id supplied: expected to have no backlog either.
            Assert.assertEquals(
                    createSubscriptionAndReadBacklog(testLocalTopicName, subNoneMessageId, null),
                    0L);
        }
    }

    /**
     * Creates {@code subscription} on the given topic through the admin resource,
     * asserts the call completed with 204 No Content, and returns the backlog that
     * topic stats report for the new subscription.
     *
     * @param localTopic   local topic name inside the test tenant/namespace
     * @param subscription name of the subscription to create
     * @param position     start position handed to {@code createSubscription}; may be {@code null}
     * @return the subscription's {@code msgBacklog} from topic stats
     * @throws Exception if the stats lookup fails
     */
    private long createSubscriptionAndReadBacklog(String localTopic, String subscription, MessageIdImpl position)
            throws Exception {
        AsyncResponse response = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createSubscription(response, this.testTenant, this.testNamespace, localTopic,
                subscription, true, position, false);
        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

        TopicStats topicStats =
                this.persistentTopics.getStats(this.testTenant, this.testNamespace, localTopic, true, true, false);
        long msgBacklog = topicStats.subscriptions.get(subscription).msgBacklog;
        log.info("Message backlog for {} is: {}", subscription, msgBacklog);
        return msgBacklog;
    }

    /**
     * With the resource's domain stubbed to non-persistent, creating a subscription
     * must be rejected with 400 Bad Request.
     */
    @Test
    public void testCreateSubscriptionForNonPersistentTopic() throws InterruptedException {
        // Make the spied resource report the non_persistent domain for this test only.
        Mockito.doReturn(TopicDomain.non_persistent.value()).when(this.persistentTopics).domain();

        final AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
        final ArgumentCaptor<RestException> failure = ArgumentCaptor.forClass(RestException.class);
        this.persistentTopics.createSubscription(asyncResponse, this.testTenant, this.testNamespace,
                "testCreateSubscriptionForNonPersistentTopic", "sub", true, (MessageIdImpl) MessageId.earliest, false);

        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(failure.capture());
        final int actualStatus = failure.getValue().getResponse().getStatus();
        Assert.assertEquals(actualStatus, Response.Status.BAD_REQUEST.getStatusCode());
    }

    /**
     * Creates a one-partition topic with a subscription, terminates it, and expects
     * the response to be a single-element list holding message id (3, -1, -1).
     */
    @Test
    public void testTerminatePartitionedTopic() {
        final String localTopic = "topic-not-found";

        // Create the partitioned topic (one partition).
        AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createPartitionedTopic(asyncResponse, this.testTenant, this.testNamespace, localTopic, 1);
        ArgumentCaptor<Response> success = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(success.capture());
        Assert.assertEquals(success.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

        // Attach subscription "test".
        asyncResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createSubscription(asyncResponse, this.testTenant, this.testNamespace, localTopic,
                "test", true, (MessageIdImpl) MessageId.earliest, false);
        success = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(success.capture());
        Assert.assertEquals(success.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

        // Terminate and check the reported last message id per partition.
        asyncResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.terminatePartitionedTopic(asyncResponse, this.testTenant, this.testNamespace,
                localTopic, true);
        final MessageIdImpl expectedLastId = new MessageIdImpl(3L, -1L, -1);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(Arrays.asList(expectedLastId));
    }

    /**
     * A subscription can be created on a plain (non-partitioned) topic; asking for
     * the subscriptions of its "-partition-0" name fails with a "zero partitions"
     * message; and its partitioned metadata reports zero partitions.
     */
    @Test
    public void testNonPartitionedTopics() {
        final String plainTopic = "non-partitioned-topic";

        AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createSubscription(asyncResponse, this.testTenant, this.testNamespace, plainTopic,
                "test", true, (MessageIdImpl) MessageId.latest, false);
        final ArgumentCaptor<Response> success = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(success.capture());
        Assert.assertEquals(success.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

        asyncResponse = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.getSubscriptions(asyncResponse, this.testTenant, this.testNamespace,
                plainTopic + "-partition-0", true);
        final ArgumentCaptor<RestException> failure = ArgumentCaptor.forClass(RestException.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(failure.capture());
        final String failureMessage = failure.getValue().getMessage();
        Assert.assertTrue(failureMessage.contains("zero partitions"));

        final String secondPlainTopic = "secondary-non-partitioned-topic";
        this.persistentTopics.createNonPartitionedTopic(this.testTenant, this.testNamespace, secondPlainTopic, true);

        // Metadata of the first topic is queried with the last boolean argument false, then true.
        for (boolean lastFlag : new boolean[] {false, true}) {
            final PartitionedTopicMetadata metadata = this.persistentTopics.getPartitionedMetadata(
                    this.testTenant, this.testNamespace, plainTopic, true, lastFlag);
            Assert.assertEquals(metadata.partitions, 0);
        }
    }

    /**
     * A topic whose name contains "-partition-" followed by a non-numeric suffix can
     * be created as a non-partitioned topic, and its partitioned metadata reports
     * zero partitions.
     */
    @Test
    public void testCreateNonPartitionedTopic() {
        final String localTopic = "standard-topic-partition-a";
        this.persistentTopics.createNonPartitionedTopic(this.testTenant, this.testNamespace, localTopic, true);

        // Same lookup twice: last boolean argument false first, then true.
        for (boolean lastFlag : new boolean[] {false, true}) {
            final PartitionedTopicMetadata topicMetadata = this.persistentTopics.getPartitionedMetadata(
                    this.testTenant, this.testNamespace, localTopic, true, lastFlag);
            Assert.assertEquals(topicMetadata.partitions, 0);
        }
    }

    /**
     * Creating a non-partitioned topic named like a partition
     * ("standard-topic-partition-10") must fail with a {@link RestException} when the
     * parent topic's metadata reports 10 partitions.
     *
     * <p>The stubbed metadata lookup checks the parent topic name with a TestNG
     * assertion rather than the Java {@code assert} keyword, so the check still runs
     * when the JVM is started without {@code -ea}.
     */
    @Test(expectedExceptions={RestException.class})
    public void testCreateNonPartitionedTopicWithInvalidName() {
        final String topicName = "standard-topic-partition-10";
        Mockito.doAnswer(invocation -> {
            TopicName partitionedTopicName = invocation.getArgument(0, TopicName.class);
            // The lookup must be made for the parent topic, i.e. the name without the partition suffix.
            Assert.assertEquals(partitionedTopicName.getLocalName(), "standard-topic");
            return new PartitionedTopicMetadata(10);
        }).when(this.persistentTopics).getPartitionedTopicMetadata(
                ArgumentMatchers.<TopicName>any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        this.persistentTopics.createNonPartitionedTopic(this.testTenant, this.testNamespace, topicName, true);
    }

    /**
     * When the managed-ledger list already contains "special-topic-partition-123",
     * creating the partitioned topic "special-topic" must be rejected with
     * 409 Conflict.
     */
    @Test
    public void testCreatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws KeeperException, InterruptedException {
        final String existingPlainTopic = "standard-topic";
        final String existingSuffixedTopic = "special-topic-partition-123";
        final String topicToCreate = "special-topic";

        // Route the broker's managed-ledger list cache to a mock returning the two existing topics.
        final LocalZooKeeperCacheService cacheService = Mockito.mock(LocalZooKeeperCacheService.class);
        final ZooKeeperManagedLedgerCache ledgerListCache = Mockito.mock(ZooKeeperManagedLedgerCache.class);
        Mockito.doReturn(cacheService).when(this.pulsar).getLocalZkCacheService();
        Mockito.doReturn(ledgerListCache).when(cacheService).managedLedgerListCache();
        Mockito.doReturn(ImmutableSet.of(existingPlainTopic, existingSuffixedTopic))
                .when(ledgerListCache).get(ArgumentMatchers.anyString());
        Mockito.doReturn(CompletableFuture.completedFuture(ImmutableSet.of(existingPlainTopic, existingSuffixedTopic)))
                .when(ledgerListCache).getAsync(ArgumentMatchers.anyString());
        Mockito.doReturn(new Policies()).when(this.persistentTopics)
                .getNamespacePolicies(ArgumentMatchers.<NamespaceName>any());

        final AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
        final ArgumentCaptor<RestException> failure = ArgumentCaptor.forClass(RestException.class);
        this.persistentTopics.createPartitionedTopic(asyncResponse, this.testTenant, this.testNamespace, topicToCreate, 5);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(failure.capture());
        Assert.assertEquals(failure.getValue().getResponse().getStatus(), Response.Status.CONFLICT.getStatusCode());
    }

    /**
     * With "special-topic-partition-10" present in the managed-ledger list, creates
     * the partitioned topic "special-topic" with 5 partitions (expected to succeed
     * with 204) and then updates it to 10 partitions; the test expects a
     * {@link RestException} to be thrown.
     */
    @Test(expectedExceptions={RestException.class})
    public void testUpdatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws Exception {
        final String existingSuffixedTopic = "special-topic-partition-10";
        final String partitionedTopic = "special-topic";

        // Managed-ledger list cache is mocked to contain only the suffixed topic.
        final LocalZooKeeperCacheService cacheService = Mockito.mock(LocalZooKeeperCacheService.class);
        final ZooKeeperManagedLedgerCache ledgerListCache = Mockito.mock(ZooKeeperManagedLedgerCache.class);
        Mockito.doReturn(cacheService).when(this.pulsar).getLocalZkCacheService();
        Mockito.doReturn(ledgerListCache).when(cacheService).managedLedgerListCache();
        Mockito.doReturn(ImmutableSet.of(existingSuffixedTopic))
                .when(ledgerListCache).get(ArgumentMatchers.anyString());
        Mockito.doReturn(CompletableFuture.completedFuture(ImmutableSet.of(existingSuffixedTopic)))
                .when(ledgerListCache).getAsync(ArgumentMatchers.anyString());

        // Replace name validation with an answer that pins fixed namespace/topic names on the resource.
        Mockito.doAnswer(invocation -> {
            this.persistentTopics.namespaceName = NamespaceName.get("tenant", "namespace");
            this.persistentTopics.topicName = TopicName.get("persistent", "tenant", "cluster", "namespace", "topicname");
            return null;
        }).when(this.persistentTopics).validatePartitionedTopicName(
                ArgumentMatchers.<String>any(), ArgumentMatchers.<String>any(), ArgumentMatchers.<String>any());
        Mockito.doNothing().when(this.persistentTopics).validateAdminAccessForTenant(ArgumentMatchers.anyString());

        final AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
        final ArgumentCaptor<Response> success = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(asyncResponse, this.testTenant, this.testNamespace, partitionedTopic, 5);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(success.capture());
        Assert.assertEquals(success.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

        this.persistentTopics.updatePartitionedTopic(this.testTenant, this.testNamespace, partitionedTopic, true, false, 10);
    }

    /**
     * Unloading a missing topic yields 404; unloading an existing non-partitioned
     * topic and an existing partitioned topic both yield 204. The partitioned topic
     * is deleted at the end.
     *
     * <p>Every verification waits at most 5 seconds. The TestNG limit for this test
     * is 10 seconds, so a longer Mockito wait could never elapse; the test would be
     * aborted by TestNG instead of failing with a verification error.
     */
    @Test(timeOut=10000L)
    public void testUnloadTopic() {
        final String topicName = "standard-topic-to-be-unload";
        final String partitionTopicName = "partition-topic-to-be-unload";

        // Unload of a topic that was never created -> 404.
        AsyncResponse response = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.unloadTopic(response, this.testTenant, this.testNamespace, "topic-not-exist", true);
        ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(errCaptor.capture());
        Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());

        // Unload of an existing non-partitioned topic -> 204.
        response = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createNonPartitionedTopic(this.testTenant, this.testNamespace, topicName, true);
        this.persistentTopics.unloadTopic(response, this.testTenant, this.testNamespace, topicName, true);
        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

        // Create a six-partition topic ...
        response = Mockito.mock(AsyncResponse.class);
        responseCaptor = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(response, this.testTenant, this.testNamespace, partitionTopicName, 6);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

        // ... unload it -> 204 ...
        response = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.unloadTopic(response, this.testTenant, this.testNamespace, partitionTopicName, true);
        responseCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

        // ... and delete it again.
        response = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.deletePartitionedTopic(response, this.testTenant, this.testNamespace, partitionTopicName,
                true, true, false);
        responseCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
    }

    /**
     * Unloading a topic that does not exist must resume the response with a
     * {@link RestException} carrying 404 Not Found.
     *
     * <p>The verification waits at most 5 seconds so that it can fail with a Mockito
     * error before the 10-second TestNG limit aborts the test.
     */
    @Test(timeOut=10000L)
    public void testUnloadTopicShallThrowNotFoundWhenTopicNotExist() {
        AsyncResponse response = Mockito.mock(AsyncResponse.class);
        this.persistentTopics.unloadTopic(response, this.testTenant, this.testNamespace, "non-existent-topic", true);
        ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(errCaptor.capture());
        Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());
    }

    /**
     * Creates one persistent and one non-persistent partitioned topic in the same
     * namespace and checks that each resource lists exactly one partitioned topic,
     * belonging to its own domain.
     */
    @Test
    public void testGetPartitionedTopicsList() throws KeeperException, InterruptedException, PulsarAdminException {
        // Persistent partitioned topic.
        AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
        ArgumentCaptor<Response> success = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(asyncResponse, this.testTenant, this.testNamespace, "test-topic1", 3);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(success.capture());
        Assert.assertEquals(success.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

        // Non-persistent partitioned topic.
        asyncResponse = Mockito.mock(AsyncResponse.class);
        success = ArgumentCaptor.forClass(Response.class);
        this.nonPersistentTopic.createPartitionedTopic(asyncResponse, this.testTenant, this.testNamespace, "test-topic2", 3);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(success.capture());
        Assert.assertEquals(success.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

        final List<String> persistentList =
                this.persistentTopics.getPartitionedTopicList(this.testTenant, this.testNamespace);
        Assert.assertEquals(persistentList.size(), 1);
        final String persistentDomain = TopicName.get(persistentList.get(0)).getDomain().value();
        Assert.assertEquals(persistentDomain, TopicDomain.persistent.value());

        final List<String> nonPersistentList =
                this.nonPersistentTopic.getPartitionedTopicList(this.testTenant, this.testNamespace);
        Assert.assertEquals(nonPersistentList.size(), 1);
        final String nonPersistentDomain = TopicName.get(nonPersistentList.get(0)).getDomain().value();
        Assert.assertEquals(nonPersistentDomain, TopicDomain.non_persistent.value());
    }

    /**
     * Grants the {@code produce} action to a role on a non-partitioned topic and
     * checks that the topic's permissions report exactly that action for the role.
     */
    @Test
    public void testGrantNonPartitionedTopic() {
        final String localTopic = "non-partitioned-topic";
        final String grantee = "role";
        this.persistentTopics.createNonPartitionedTopic(this.testTenant, this.testNamespace, localTopic, true);

        final Set<AuthAction> grantedActions = Sets.newHashSet(AuthAction.produce);
        this.persistentTopics.grantPermissionsOnTopic(this.testTenant, this.testNamespace, localTopic, grantee, grantedActions);

        final Map<String, Set<AuthAction>> topicPermissions =
                this.persistentTopics.getPermissionsOnTopic(this.testTenant, this.testNamespace, localTopic);
        Assert.assertEquals(topicPermissions.get(grantee), grantedActions);
    }

    /**
     * Verifies that creating a non-partitioned topic whose name equals partition 0 of an existing
     * partitioned topic is rejected with HTTP 409 and the message "This topic already exists".
     */
    @Test
    public void testCreateExistedPartition() {
        final String topicName = "test-create-existed-partition";

        AsyncResponse response = Mockito.mock(AsyncResponse.class);
        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(response, "my-tenant", "my-namespace", topicName, 3);
        // The creation result is delivered through the AsyncResponse. Wait for it (as the other
        // tests in this class do) so the conflicting creation below cannot race with it.
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

        String partitionName = TopicName.get(topicName).getPartition(0).getLocalName();
        try {
            this.persistentTopics.createNonPartitionedTopic("my-tenant", "my-namespace", partitionName, false);
            Assert.fail("Creating non-partitioned topic " + partitionName + " should have been rejected");
        } catch (RestException e) {
            log.error("Failed to create {}: {}", partitionName, e.getMessage());
            Assert.assertEquals(e.getResponse().getStatus(), 409);
            Assert.assertEquals(e.getMessage(), "This topic already exists");
        }
    }

    /**
     * Grants the {@code produce} action to a role on a partitioned topic and checks that the same
     * action set is reported for the partitioned topic itself and for each of its partitions.
     */
    @Test
    public void testGrantPartitionedTopic() {
        final String tenant = "my-tenant";
        final String namespace = "my-namespace";
        final String partitionedTopicName = "partitioned-topic";
        final int numPartitions = 5;
        final String role = "role";

        // Create the partitioned topic and wait for the asynchronous completion.
        AsyncResponse response = Mockito.mock(AsyncResponse.class);
        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(response, tenant, namespace, partitionedTopicName, numPartitions);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

        Set<AuthAction> expectActions = new HashSet<>();
        expectActions.add(AuthAction.produce);
        this.persistentTopics.grantPermissionsOnTopic(tenant, namespace, partitionedTopicName, role, expectActions);

        Map<String, Set<AuthAction>> permissions =
                this.persistentTopics.getPermissionsOnTopic(tenant, namespace, partitionedTopicName);
        Set<AuthAction> grantedOnTopic = permissions.get(role);
        Assert.assertEquals(grantedOnTopic, expectActions);

        // Each individual partition must report the same grant.
        TopicName topicName = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, partitionedTopicName);
        for (int partitionIndex = 0; partitionIndex < numPartitions; partitionIndex++) {
            String partitionLocalName = topicName.getPartition(partitionIndex).getEncodedLocalName();
            Map<String, Set<AuthAction>> partitionPermissions =
                    this.persistentTopics.getPermissionsOnTopic(tenant, namespace, partitionLocalName);
            Set<AuthAction> grantedOnPartition = partitionPermissions.get(role);
            Assert.assertEquals(grantedOnPartition, expectActions);
        }
    }

    /**
     * Grants and then revokes a role's permissions on a non-partitioned topic and checks that the
     * role no longer has an entry in the topic's permission map.
     */
    @Test
    public void testRevokeNonPartitionedTopic() {
        final String tenant = "my-tenant";
        final String namespace = "my-namespace";
        final String topicName = "non-partitioned-topic";
        final String role = "role";

        this.persistentTopics.createNonPartitionedTopic(tenant, namespace, topicName, true);

        Set<AuthAction> expectActions = new HashSet<>();
        expectActions.add(AuthAction.produce);
        this.persistentTopics.grantPermissionsOnTopic(tenant, namespace, topicName, role, expectActions);
        this.persistentTopics.revokePermissionsOnTopic(tenant, namespace, topicName, role);

        // After the revoke, looking the role up must yield null.
        Map<String, Set<AuthAction>> permissions =
                this.persistentTopics.getPermissionsOnTopic(tenant, namespace, topicName);
        Set<AuthAction> remaining = permissions.get(role);
        Assert.assertEquals(remaining, null);
    }

    /**
     * Grants and then revokes a role's permissions on a partitioned topic and checks that the role
     * has no entry left on the partitioned topic or on any of its partitions.
     */
    @Test
    public void testRevokePartitionedTopic() {
        final String tenant = "my-tenant";
        final String namespace = "my-namespace";
        final String partitionedTopicName = "partitioned-topic";
        final int numPartitions = 5;
        final String role = "role";

        // Create the partitioned topic and wait for the asynchronous completion.
        AsyncResponse response = Mockito.mock(AsyncResponse.class);
        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(response, tenant, namespace, partitionedTopicName, numPartitions);
        Mockito.verify(response, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

        Set<AuthAction> expectActions = new HashSet<>();
        expectActions.add(AuthAction.produce);
        this.persistentTopics.grantPermissionsOnTopic(tenant, namespace, partitionedTopicName, role, expectActions);
        this.persistentTopics.revokePermissionsOnTopic(tenant, namespace, partitionedTopicName, role);

        Map<String, Set<AuthAction>> permissions =
                this.persistentTopics.getPermissionsOnTopic(tenant, namespace, partitionedTopicName);
        Set<AuthAction> remainingOnTopic = permissions.get(role);
        Assert.assertEquals(remainingOnTopic, null);

        // The revoke must also be visible on every individual partition.
        TopicName topicName = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, partitionedTopicName);
        for (int partitionIndex = 0; partitionIndex < numPartitions; partitionIndex++) {
            String partitionLocalName = topicName.getPartition(partitionIndex).getEncodedLocalName();
            Map<String, Set<AuthAction>> partitionPermissions =
                    this.persistentTopics.getPermissionsOnTopic(tenant, namespace, partitionLocalName);
            Set<AuthAction> remainingOnPartition = partitionPermissions.get(role);
            Assert.assertEquals(remainingOnPartition, null);
        }
    }

    /**
     * Exercises the compaction trigger in three situations: a topic that does not exist (expects a
     * {@code RestException} carrying 404), an existing non-partitioned topic (expects 204) and a
     * partitioned topic (expects 204).
     */
    @Test
    public void testTriggerCompactionTopic() {
        final String tenant = "my-tenant";
        final String namespace = "my-namespace";
        final String partitionTopicName = "test-part";
        final String nonPartitionTopicName = "test-non-part";

        // 1) Unknown topic: the response is resumed with an error carrying NOT_FOUND.
        AsyncResponse missingTopicResponse = Mockito.mock(AsyncResponse.class);
        ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
        this.persistentTopics.compact(missingTopicResponse, tenant, namespace, "non-existing-topic", true);
        Mockito.verify(missingTopicResponse, Mockito.timeout(5000L).times(1)).resume((Throwable) errCaptor.capture());
        Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());

        // 2) Existing non-partitioned topic: triggering compaction answers NO_CONTENT.
        AsyncResponse nonPartitionedResponse = Mockito.mock(AsyncResponse.class);
        ArgumentCaptor<Response> nonPartitionedCaptor = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createNonPartitionedTopic(tenant, namespace, nonPartitionTopicName, true);
        this.persistentTopics.compact(nonPartitionedResponse, tenant, namespace, nonPartitionTopicName, true);
        Mockito.verify(nonPartitionedResponse, Mockito.timeout(5000L).times(1)).resume(nonPartitionedCaptor.capture());
        Assert.assertEquals(nonPartitionedCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

        // 3) Partitioned topic.
        // NOTE(review): the same mock is handed to both createPartitionedTopic and compact, yet
        // exactly one resume(Object) call is expected below - confirm whether topic creation
        // should be awaited on its own AsyncResponse before compaction is triggered.
        AsyncResponse partitionedResponse = Mockito.mock(AsyncResponse.class);
        ArgumentCaptor<Response> partitionedCaptor = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(partitionedResponse, tenant, namespace, partitionTopicName, 2);
        this.persistentTopics.compact(partitionedResponse, tenant, namespace, partitionTopicName, true);
        Mockito.verify(partitionedResponse, Mockito.timeout(5000L).times(1)).resume(partitionedCaptor.capture());
        Assert.assertEquals(partitionedCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
    }

    /**
     * Publishes 100 messages to a 3-partition topic and checks that peeking 5 messages from
     * partition 0 with the subscription name {@code "sub"} (which this test never creates)
     * returns 5 messages.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test
    public void testPeekWithSubscriptionNameNotExist() throws Exception {
        final String topicName = "testTopic";
        final String subscriptionName = "sub";
        final int messagesToPublish = 100;
        final int messagesToPeek = 5;

        String topic = TopicName.get(TopicDomain.persistent.value(), "my-tenant", "my-namespace", topicName).toString();
        this.admin.topics().createPartitionedTopic(topic, 3);
        String firstPartition = topic + "-partition-0";

        // try-with-resources closes the producer even when a send, the peek or the assertion fails.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(topic).create()) {
            for (int i = 0; i < messagesToPublish; i++) {
                producer.send("test" + i);
            }
            List<?> messages = this.admin.topics().peekMessages(firstPartition, subscriptionName, messagesToPeek);
            Assert.assertEquals(messages.size(), messagesToPeek);
        }
    }

    /**
     * Checks the last message id reported by the admin API: after ten batched sends it must be a
     * {@code BatchMessageIdImpl} with batch index 9, and after one more send from a producer with
     * batching disabled it must be an instance of {@code MessageIdImpl}.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test
    public void testGetLastMessageId() throws Exception {
        final String topicName = "persistent://prop-xyz/ns1/testGetLastMessageId";
        final int batchedMessages = 10;

        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
        this.admin.tenants().createTenant("prop-xyz", tenantInfo);
        this.admin.namespaces().createNamespace("prop-xyz/ns1", Sets.newHashSet("test"));
        this.admin.topics().createNonPartitionedTopic(topicName);

        // Both producers are closed by try-with-resources, including when an assertion fails.
        try (Producer<byte[]> batchProducer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(true)
                .batchingMaxMessages(100)
                .batchingMaxPublishDelay(2L, TimeUnit.SECONDS)
                .create()) {
            this.admin.topics().createSubscription(topicName, "test", MessageId.earliest);

            // Start from an already completed future so get() can never block on a placeholder.
            CompletableFuture<MessageId> lastSend = CompletableFuture.completedFuture(null);
            for (int i = 0; i < batchedMessages; i++) {
                lastSend = batchProducer.sendAsync("test".getBytes());
            }
            // Only the last send is awaited before the last message id is queried.
            lastSend.get();

            BatchMessageIdImpl lastBatchedId = (BatchMessageIdImpl) this.admin.topics().getLastMessageId(topicName);
            Assert.assertEquals(lastBatchedId.getBatchIndex(), batchedMessages - 1);

            try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                    .topic(topicName)
                    .enableBatching(false)
                    .create()) {
                producer.send("test".getBytes());
                // NOTE(review): if BatchMessageIdImpl is a subtype of MessageIdImpl this check
                // cannot distinguish the two - confirm whether a stricter assertion is wanted.
                Assert.assertTrue(this.admin.topics().getLastMessageId(topicName) instanceof MessageIdImpl);
            }
        }
    }

    /**
     * Checks the examine-message admin call: it must be rejected for the partitioned topic itself,
     * and on partition 0 it must return the n-th message counted from the {@code "earliest"} or
     * from the {@code "latest"} end of the topic.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test
    public void testExamineMessage() throws Exception {
        final String topicName = "persistent://tenant-xyz/ns-abc/topic-123";
        final String firstPartition = topicName + "-partition-0";
        final int totalMessages = 5;

        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
        this.admin.tenants().createTenant("tenant-xyz", tenantInfo);
        this.admin.namespaces().createNamespace("tenant-xyz/ns-abc", Sets.newHashSet("test"));
        this.admin.topics().createPartitionedTopic(topicName, 2);

        // try-with-resources closes the producer even when an assertion fails.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(firstPartition).create()) {
            try {
                this.admin.topics().examineMessage(topicName, "earliest", 1L);
                // Without this the negative check would pass silently when nothing is thrown.
                Assert.fail("Examining a message on the partitioned topic itself should have been rejected");
            } catch (PulsarAdminException e) {
                Assert.assertEquals(e.getMessage(), "Examine messages on a partitioned topic is not allowed, please try examine message on specific topic partition");
            }

            // With a single message, both ends of the topic resolve to that message.
            producer.send("message1");
            Assert.assertEquals(new String(this.admin.topics().examineMessage(firstPartition, "earliest", 1L).getData()), "message1");
            Assert.assertEquals(new String(this.admin.topics().examineMessage(firstPartition, "latest", 1L).getData()), "message1");

            for (int i = 2; i <= totalMessages; i++) {
                producer.send("message" + i);
            }

            // Counting from the earliest end: position n is the n-th published message.
            for (long position = 1; position <= totalMessages; position++) {
                String payload = new String(this.admin.topics().examineMessage(firstPartition, "earliest", position).getData());
                Assert.assertEquals(payload, "message" + position);
            }
            // Counting from the latest end: position 1 is the newest message.
            for (long position = 1; position <= totalMessages; position++) {
                String payload = new String(this.admin.topics().examineMessage(firstPartition, "latest", position).getData());
                Assert.assertEquals(payload, "message" + (totalMessages - position + 1));
            }
        }
    }

    /**
     * Checks that triggering an offload with a {@code null} message id on an existing topic is
     * rejected with a {@code RestException} carrying HTTP 400 (Bad Request).
     */
    @Test
    public void testOffloadWithNullMessageId() {
        final String tenant = "my-tenant";
        final String namespace = "my-namespace";
        final String topicName = "topic-123";
        this.persistentTopics.createNonPartitionedTopic(tenant, namespace, topicName, true);

        // Capture the rejection first, then assert on it outside the try block.
        RestException rejection = null;
        try {
            this.persistentTopics.triggerOffload(tenant, namespace, topicName, true, null);
        } catch (RestException e) {
            rejection = e;
        }

        if (rejection == null) {
            Assert.fail("should have failed");
        } else {
            Assert.assertEquals(rejection.getResponse().getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
        }
    }
}

