/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.jute.Record;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.mockito.ArgumentMatchers;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class TopicOwnerTest {
    private static final Logger log = LoggerFactory.getLogger(TopicOwnerTest.class);

    /** Number of brokers started for every test method. */
    protected static final int BROKER_COUNT = 5;

    /** Local ZooKeeper/BookKeeper ensemble backing all brokers; (re)created in {@code setup()}. */
    LocalBookkeeperEnsemble bkEnsemble;

    /** Admin clients, one per broker; index {@code i} talks to {@code pulsarServices[i]}. */
    protected PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];
    /** Configuration used to start the broker at the same index. */
    protected ServiceConfiguration[] configurations = new ServiceConfiguration[BROKER_COUNT];
    /** The brokers under test. */
    protected PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];

    /** Broker at index 0, which {@code setup()} waits on until it reports itself as leader. */
    protected PulsarService leaderPulsar;
    /** Admin client bound to {@link #leaderPulsar}. */
    protected PulsarAdmin leaderAdmin;

    protected String testCluster = "my-cluster";
    protected String testTenant = "my-tenant";
    protected String testNamespace = this.testTenant + "/my-ns";

    /**
     * Starts a local BookKeeper/ZooKeeper ensemble and {@link #BROKER_COUNT} brokers, then creates
     * the test cluster, tenant and a 16-bundle namespace through the first broker's admin client.
     *
     * <p>The first broker is started alone and awaited until it reports itself as leader, so that
     * {@code pulsarServices[0]} can be used as the leader by the tests.
     *
     * @throws Exception if the ensemble, a broker, or any admin call fails
     */
    @BeforeMethod
    void setup() throws Exception {
        log.info("---- Initializing TopicOwnerTest -----");
        this.bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
        this.bkEnsemble.start();
        for (int i = 0; i < BROKER_COUNT; ++i) {
            ServiceConfiguration config = new ServiceConfiguration();
            config.setBrokerShutdownTimeoutMs(0L);
            // Port 0 for both endpoints: presumably lets the OS pick free ports -- TODO confirm.
            config.setBrokerServicePort(Optional.of(0));
            config.setClusterName(this.testCluster);
            config.setAdvertisedAddress("localhost");
            config.setWebServicePort(Optional.of(0));
            config.setZookeeperServers("127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
            config.setDefaultNumberOfNamespaceBundles(1);
            config.setLoadBalancerEnabled(false);
            this.configurations[i] = config;
            this.pulsarServices[i] = new PulsarService(config);
            this.pulsarServices[i].start();
            if (i == 0) {
                // Only the first broker is awaited: it must win the election before the others start.
                while (!this.pulsarServices[0].getLeaderElectionService().isLeader()) {
                    Thread.sleep(10L);
                }
            }
            this.pulsarAdmins[i] = PulsarAdmin.builder()
                    .serviceHttpUrl(this.pulsarServices[i].getWebServiceAddress())
                    .build();
        }
        this.leaderPulsar = this.pulsarServices[0];
        this.leaderAdmin = this.pulsarAdmins[0];
        // NOTE(review): fixed pause before issuing admin calls; presumably lets the brokers settle.
        Thread.sleep(1000L);
        this.pulsarAdmins[0].clusters().createCluster(this.testCluster,
                ClusterData.builder().serviceUrl(this.pulsarServices[0].getWebServiceAddress()).build());
        Set<String> allowedClusters = Sets.newHashSet(this.testCluster);
        TenantInfo tenantInfo = TenantInfo.builder().allowedClusters(allowedClusters).build();
        this.pulsarAdmins[0].tenants().createTenant(this.testTenant, tenantInfo);
        this.pulsarAdmins[0].namespaces().createNamespace(this.testNamespace, 16);
    }

    /**
     * Closes every broker and admin client that was started, then stops the local ensemble.
     *
     * <p>This method runs even when {@code setup()} failed ({@code alwaysRun=true}), so any slot of
     * the arrays, and the ensemble itself, may still be {@code null}; those are skipped instead of
     * raising a {@link NullPointerException} that would hide the original failure.
     *
     * @throws Exception if closing a broker, an admin client, or the ensemble fails
     */
    @AfterMethod(alwaysRun=true)
    void tearDown() throws Exception {
        for (int i = 0; i < BROKER_COUNT; ++i) {
            if (this.pulsarServices[i] != null) {
                this.pulsarServices[i].close();
                // Drop the reference so a later partially-failed setup cannot re-close a stale broker.
                this.pulsarServices[i] = null;
            }
            if (this.pulsarAdmins[i] != null) {
                this.pulsarAdmins[i].close();
                this.pulsarAdmins[i] = null;
            }
        }
        if (this.bkEnsemble != null) {
            this.bkEnsemble.stop();
        }
    }

    /**
     * Replaces the leader broker's {@code nsService} field with a spy whose
     * {@code getBrokerServiceUrlAsync} result can be steered by the test.
     *
     * <p>While the returned holder is empty the spy delegates to the real method. Once a broker is
     * stored in the holder, every call answers with a {@link LookupResult} built from that broker's
     * web and broker service URLs.
     *
     * @return a mutable holder; set it to the broker the leader should answer with, or to
     *         {@code null} to restore the real lookup behaviour
     */
    private MutableObject<PulsarService> spyLeaderNamespaceServiceForAuthorizedBroker() {
        NamespaceService leaderNamespaceService = this.leaderPulsar.getNamespaceService();
        NamespaceService spyLeaderNamespaceService = PowerMockito.spy(leaderNamespaceService);
        final MutableObject<PulsarService> leaderAuthorizedBroker = new MutableObject<>();
        // The cast of callRealMethod() is unchecked; it is the stubbed method's own return value.
        @SuppressWarnings("unchecked")
        Answer<CompletableFuture<Optional<LookupResult>>> answer = invocation -> {
            PulsarService pulsarService = leaderAuthorizedBroker.getValue();
            if (pulsarService == null) {
                return (CompletableFuture<Optional<LookupResult>>) invocation.callRealMethod();
            }
            LookupResult lookupResult = new LookupResult(
                    pulsarService.getWebServiceAddress(),
                    pulsarService.getWebServiceAddressTls(),
                    pulsarService.getBrokerServiceUrl(),
                    pulsarService.getBrokerServiceUrlTls(),
                    true);
            return CompletableFuture.completedFuture(Optional.of(lookupResult));
        };
        PowerMockito.doAnswer(answer).when(spyLeaderNamespaceService)
                .getBrokerServiceUrlAsync(ArgumentMatchers.any(TopicName.class), ArgumentMatchers.any(LookupOptions.class));
        Whitebox.setInternalState(this.leaderPulsar, "nsService", spyLeaderNamespaceService);
        return leaderAuthorizedBroker;
    }

    /**
     * Registers a session listener on {@code store} and exposes the first "session is back"
     * notification as a future.
     *
     * @param store the metadata store to observe
     * @return a future completed with {@code null} once the store reports
     *         {@link SessionEvent#Reconnected} or {@link SessionEvent#SessionReestablished}
     */
    private CompletableFuture<Void> watchMetadataStoreReconnect(MetadataStoreExtended store) {
        final CompletableFuture<Void> sessionRestored = new CompletableFuture<>();
        store.registerSessionListener(sessionEvent -> {
            // Every other event leaves the future pending.
            if (sessionEvent != SessionEvent.Reconnected && sessionEvent != SessionEvent.SessionReestablished) {
                return;
            }
            sessionRestored.complete(null);
        });
        return sessionRestored;
    }

    /**
     * Arranges for the given client to be disconnected instead of having its next matching request
     * submitted to the ZooKeeper server.
     *
     * <p>The server object referenced by the client's server-side connection is swapped for a spy.
     * When the spy receives a request from this client's session that {@code matcher} accepts, it
     * puts the real server back on the connection, disconnects the client and does not forward
     * the request. All other requests go to the real {@code submitRequest}.
     *
     * @param zooKeeper the client whose requests are intercepted
     * @param matcher   selects the request that triggers the disconnect
     */
    private void spyZookeeperToDisconnectBeforePersist(ZooKeeper zooKeeper, RequestMatcher matcher) {
        final ZooKeeperServer realServer = this.bkEnsemble.getZkServer();
        final ServerCnxn serverCnxn = this.bkEnsemble.getZookeeperServerConnection(zooKeeper);
        final ZooKeeperServer serverSpy = PowerMockito.spy(realServer);
        Whitebox.setInternalState(serverCnxn, "zkServer", serverSpy);

        Answer<Object> interceptor = invocation -> {
            Request incoming = invocation.getArgument(0);
            // The matcher is consulted only for requests coming from the watched session.
            boolean isTarget = incoming.sessionId == zooKeeper.getSessionId() && matcher.match(incoming);
            if (!isTarget) {
                return invocation.callRealMethod();
            }
            // One-shot: restore the real server before dropping the client connection.
            Whitebox.setInternalState(serverCnxn, "zkServer", realServer);
            this.bkEnsemble.disconnectZookeeper(zooKeeper);
            return null;
        };
        PowerMockito.doAnswer(interceptor).when(serverSpy).submitRequest(ArgumentMatchers.any(Request.class));
    }

    /**
     * Arranges for the given client to be disconnected when the server is about to send the
     * successful response to its next matching request, i.e. after the real {@code submitRequest}
     * has been invoked for it.
     *
     * <p>The server object referenced by the client's server-side connection is swapped for a spy.
     * For a request from this client's session that {@code matcher} accepts, the request's
     * {@code cnxn} field is replaced with a spied connection whose {@code sendResponse} is
     * intercepted, and the request is then submitted to the real server.
     *
     * @param zooKeeper the client whose requests are intercepted
     * @param matcher   selects the request whose successful response triggers the disconnect
     */
    private void spyZookeeperToDisconnectAfterPersist(ZooKeeper zooKeeper, RequestMatcher matcher) {
        ZooKeeperServer zooKeeperServer = this.bkEnsemble.getZkServer();
        ServerCnxn zkServerConnection = this.bkEnsemble.getZookeeperServerConnection(zooKeeper);
        ZooKeeperServer spyZooKeeperServer = (ZooKeeperServer)PowerMockito.spy((Object)zooKeeperServer);
        // Route this connection's requests through the spy instead of the real server.
        Whitebox.setInternalState((Object)zkServerConnection, (String)"zkServer", (Object)spyZooKeeperServer);
        // Set once the client has been disconnected; also serves as the monitor guarding response handling.
        MutableBoolean disconnected = new MutableBoolean();
        ((ZooKeeperServer)PowerMockito.doAnswer(invocation -> {
            Request request = (Request)invocation.getArgument(0);
            // Requests from other sessions pass straight through.
            if (request.sessionId != zooKeeper.getSessionId()) {
                return invocation.callRealMethod();
            }
            // So do requests of this session that the matcher rejects.
            if (!matcher.match(request)) {
                return invocation.callRealMethod();
            }
            ServerCnxn spyZkServerConnection1 = (ServerCnxn)PowerMockito.spy((Object)zkServerConnection);
            ((ServerCnxn)PowerMockito.doAnswer(responseInvocation -> {
                MutableBoolean mutableBoolean = disconnected;
                synchronized (mutableBoolean) {
                    ReplyHeader replyHeader = (ReplyHeader)responseInvocation.getArgument(0);
                    // A reply for the matched request (same xid) with error code 0: restore the real
                    // server, mark the connection as disconnected and drop the client. The real
                    // sendResponse is not called on this path.
                    if (replyHeader.getXid() == request.cxid && replyHeader.getErr() == 0) {
                        Whitebox.setInternalState((Object)zkServerConnection, (String)"zkServer", (Object)zooKeeperServer);
                        disconnected.setTrue();
                        this.bkEnsemble.disconnectZookeeper(zooKeeper);
                    } else if (disconnected.isFalse()) {
                        // Any other reply is delivered normally until the disconnect has happened.
                        return responseInvocation.callRealMethod();
                    }
                    // Replies arriving after the disconnect are dropped.
                    return null;
                }
            }).when((Object)spyZkServerConnection1)).sendResponse((ReplyHeader)ArgumentMatchers.any(ReplyHeader.class), (Record)ArgumentMatchers.nullable(Record.class), (String)ArgumentMatchers.any(String.class));
            // Make the request reply through the spied connection, then process it for real.
            Whitebox.setInternalState((Object)request, (String)"cnxn", (Object)spyZkServerConnection1);
            return invocation.callRealMethod();
        }).when((Object)spyZooKeeperServer)).submitRequest((Request)ArgumentMatchers.any(Request.class));
    }

    /**
     * Makes broker 1 the owner of a topic's bundle, invalidates that bundle in broker 1's local
     * ownership cache, and checks that lookups through the other brokers still resolve to broker 1.
     */
    @Test
    public void testReestablishOwnershipAfterInvalidateCache() throws Exception {
        final String topicName = "persistent://my-tenant/my-ns/topic-1";
        final NamespaceService leaderNsService = this.leaderPulsar.getNamespaceService();
        final NamespaceBundle bundle = leaderNsService.getBundle(TopicName.get(topicName));
        final MutableObject<PulsarService> authorizedBroker = this.spyLeaderNamespaceServiceForAuthorizedBroker();
        final PulsarService owner = this.pulsarServices[1];

        // Steer the leader so that broker 1 acquires the bundle; every broker must agree on it.
        authorizedBroker.setValue(owner);
        for (PulsarAdmin admin : this.pulsarAdmins) {
            Assert.assertEquals(admin.lookups().lookupTopic(topicName), owner.getBrokerServiceUrl());
        }

        OwnershipCache ownerCache = owner.getNamespaceService().getOwnershipCache();
        AsyncLoadingCache ownedBundles =
                (AsyncLoadingCache) Whitebox.getInternalState((Object) ownerCache, "ownedBundlesCache");

        // Back to real lookups on the leader from here on.
        authorizedBroker.setValue(null);

        Assert.assertNotNull(ownerCache.getOwnedBundle(bundle));
        ownedBundles.synchronous().invalidate(bundle);
        Assert.assertNull(ownerCache.getOwnedBundle(bundle));

        // Lookups through every broker except the owner itself (index 1) still point at the owner.
        for (int i = 0; i < this.pulsarAdmins.length; ++i) {
            if (i == 1) {
                continue;
            }
            Assert.assertEquals(this.pulsarAdmins[i].lookups().lookupTopic(topicName), owner.getBrokerServiceUrl());
        }
    }

    /**
     * Checks that a consumer can subscribe to {@code topic-2} through the broker serving
     * {@code topic-1} after the bundle containing {@code topic-1} has been split.
     *
     * <p>The namespace is unloaded repeatedly until the two topics resolve to different brokers,
     * so that the client really connects through a broker other than the one serving
     * {@code topic-2}.
     */
    @Test
    public void testConnectToInvalidateBundleCacheBroker() throws Exception {
        final String namespace = "my-tenant/my-ns";
        Assert.assertEquals(this.pulsarAdmins[0].namespaces().getPolicies(namespace).bundles.getNumBundles(), 16);

        final String topic1 = "persistent://my-tenant/my-ns/topic-1";
        final String topic2 = "persistent://my-tenant/my-ns/topic-2";

        // Looking the topics up makes brokers take ownership of the corresponding bundles.
        String serviceUrlForTopic1 = this.pulsarAdmins[0].lookups().lookupTopic(topic1);
        String serviceUrlForTopic2 = this.pulsarAdmins[0].lookups().lookupTopic(topic2);
        while (serviceUrlForTopic1.equals(serviceUrlForTopic2)) {
            // Retry the distribution until the two topics are served by different brokers.
            this.pulsarAdmins[0].namespaces().unload(namespace);
            serviceUrlForTopic1 = this.pulsarAdmins[0].lookups().lookupTopic(topic1);
            serviceUrlForTopic2 = this.pulsarAdmins[0].lookups().lookupTopic(topic2);
        }

        String bundleRange = this.pulsarServices[0].getNamespaceService()
                .getBundle(TopicName.get(topic1)).getBundleRange();
        this.pulsarAdmins[0].namespaces().splitNamespaceBundle(namespace, bundleRange, true, null);

        // try-with-resources closes the client (and with it the consumer) even if the assertion fails.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrlForTopic1).build()) {
            Consumer<byte[]> consumer = client.newConsumer().topic(topic2).subscriptionName("test").subscribe();
            Assert.assertTrue(consumer.isConnected());
        }
    }

    /**
     * Checks that {@code lookupPartitionedTopic} returns one entry per partition and that each
     * entry agrees (ignoring case) with an individual {@code lookupTopic} of that partition.
     */
    @Test
    public void testLookupPartitionedTopic() throws Exception {
        final int partitions = 5;
        final String topic = "persistent://my-tenant/my-ns/partitionedTopic";
        this.pulsarAdmins[0].topics().createPartitionedTopic(topic, partitions);

        Map<String, String> allPartitionMap = this.pulsarAdmins[0].lookups().lookupPartitionedTopic(topic);
        // TestNG argument order is (actual, expected).
        Assert.assertEquals(allPartitionMap.size(), partitions);

        Map<String, String> partitionedMap = new LinkedHashMap<>();
        for (int i = 0; i < partitions; ++i) {
            String partitionTopicName = topic + "-partition-" + i;
            partitionedMap.put(partitionTopicName, this.pulsarAdmins[0].lookups().lookupTopic(partitionTopicName));
        }
        Assert.assertEquals(allPartitionMap.size(), partitionedMap.size());

        for (Map.Entry<String, String> entry : allPartitionMap.entrySet()) {
            String expectedUrl = partitionedMap.get(entry.getKey());
            Assert.assertTrue(entry.getValue().equalsIgnoreCase(expectedUrl),
                    "Lookup mismatch for " + entry.getKey() + ": " + entry.getValue() + " vs " + expectedUrl);
        }
    }

    /**
     * Creates a 16-partition non-persistent topic, subscribes to it, and checks that listing the
     * namespace's topics returns exactly those 16 non-persistent partitions.
     */
    @Test
    public void testListNonPersistentTopic() throws Exception {
        final String topicName = "non-persistent://my-tenant/my-ns/my-topic";
        final int partitions = 16;
        this.pulsarAdmins[0].topics().createPartitionedTopic(topicName, partitions);

        // Resources are closed in reverse order (consumer, then client), also when an assertion fails.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.pulsarServices[0].getBrokerServiceUrl())
                     .build();
             Consumer<byte[]> consumer = client.newConsumer()
                     .topic(topicName)
                     .subscriptionName("my-sub")
                     .subscribe()) {
            List<String> topics = this.pulsarAdmins[0].topics().getList("my-tenant/my-ns");
            Assert.assertEquals(topics.size(), partitions);
            for (String topic : topics) {
                Assert.assertTrue(topic.contains("non-persistent"));
                Assert.assertTrue(topic.contains("my-tenant/my-ns/my-topic"));
            }
        }
    }

    @FunctionalInterface
    static interface RequestMatcher {
        public boolean match(Request var1) throws Exception;
    }
}

