/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.security.KeyStore;
import java.security.PrivateKey;
import java.security.SecureRandom;
import java.security.cert.Certificate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.discovery.service.DiscoveryService;
import org.apache.pulsar.discovery.service.server.ServiceConfig;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.zookeeper.ZooKeeper;
import org.asynchttpclient.AsyncCompletionHandler;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
import org.asynchttpclient.channel.KeepAliveStrategy;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker-api"})
public class BrokerServiceLookupTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(BrokerServiceLookupTest.class);

    /**
     * Prepares the fixture before every test method.
     *
     * <p>Sets the {@code isTcpLookup} flag and limits the default number of namespace bundles
     * to one, then delegates to {@code internalSetup()} and {@code producerBaseSetup()}.
     *
     * @throws Exception if one of the delegated setup steps fails
     */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        isTcpLookup = true;
        conf.setDefaultNumberOfNamespaceBundles(1);
        internalSetup();
        producerBaseSetup();
    }

    /**
     * Tears the fixture down after every test method by delegating to
     * {@code internalCleanup()}; runs even when the test method failed
     * ({@code alwaysRun = true}).
     *
     * @throws Exception if the delegated cleanup fails
     */
    @Override
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        internalCleanup();
    }

    /**
     * Verifies that produce/consume works when two brokers serve the same cluster.
     *
     * <p>A second broker is started, the {@code loadManager} field of both brokers'
     * {@link NamespaceService} is replaced through reflection with a spy that reports itself
     * as centralized and returns a resource unit built from the second broker's web-service
     * address, and then ten messages are produced through the shared client and consumed
     * through a client connected to the second broker.
     *
     * @throws Exception if the second broker cannot be started, the reflective field swap
     *     fails, or any produce/consume call fails
     */
    @Test
    public void testMultipleBrokerLookup() throws Exception {
        log.info("-- Starting {} test --", methodName);
        final String topic = "persistent://my-property/my-ns/my-topic1";
        final int numMessages = 10;

        ServiceConfiguration conf2 = new ServiceConfiguration();
        conf2.setBrokerShutdownTimeoutMs(0L);
        conf2.setBrokerServicePort(Optional.of(0));
        conf2.setWebServicePort(Optional.of(0));
        conf2.setAdvertisedAddress("localhost");
        conf2.setClusterName(conf.getClusterName());
        conf2.setZookeeperServers("localhost:2181");
        conf2.setConfigurationStoreServers("localhost:3181");

        // try-with-resources closes the client first and the broker last, and keeps the
        // primary failure visible (close() failures are attached as suppressed exceptions).
        try (PulsarService pulsar2 = startBroker(conf2)) {
            pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
            pulsar2.getLoadManager().get().writeLoadReportOnZookeeper();

            LoadManager loadManager1 = Mockito.spy(pulsar.getLoadManager().get());
            LoadManager loadManager2 = Mockito.spy(pulsar2.getLoadManager().get());
            Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager");
            loadManagerField.setAccessible(true);

            // Install the spy on the second broker.
            Mockito.doReturn(true).when(loadManager2).isCentralized();
            loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<LoadManager>(loadManager2));

            // Both spies hand out the second broker as the least-loaded resource unit.
            Mockito.doReturn(true).when(loadManager1).isCentralized();
            SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getSafeWebServiceAddress(), null);
            Mockito.doReturn(Optional.of(resourceUnit)).when(loadManager1)
                    .getLeastLoaded(ArgumentMatchers.any(ServiceUnitId.class));
            Mockito.doReturn(Optional.of(resourceUnit)).when(loadManager2)
                    .getLeastLoaded(ArgumentMatchers.any(ServiceUnitId.class));
            loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<LoadManager>(loadManager1));

            try (PulsarClient pulsarClient2 = PulsarClient.builder().serviceUrl(pulsar2.getBrokerServiceUrl()).build()) {
                Consumer<byte[]> consumer = pulsarClient2.newConsumer()
                        .topic(topic)
                        .subscriptionName("my-subscriber-name")
                        .subscribe();
                Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create();

                for (int i = 0; i < numMessages; i++) {
                    String message = "my-message-" + i;
                    producer.send(message.getBytes());
                }

                Message<byte[]> msg = null;
                Set<String> messageSet = new HashSet<>();
                for (int i = 0; i < numMessages; i++) {
                    msg = consumer.receive(5, TimeUnit.SECONDS);
                    // Fail with a readable message instead of a NullPointerException on timeout.
                    Assert.assertNotNull(msg, "Timed out waiting for message " + i);
                    String receivedMessage = new String(msg.getData());
                    log.debug("Received message: [{}]", receivedMessage);
                    String expectedMessage = "my-message-" + i;
                    testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
                }

                // Acknowledge everything up to and including the last received message.
                consumer.acknowledgeCumulative(msg);
                consumer.close();
                producer.close();
            }
        }
    }

    /**
     * Exercises load-report generation and load-report writes from a thread pool while the
     * calling thread keeps refreshing and clearing the local broker data.
     *
     * <p>The broker service is mocked to return 100 bundle-stat entries. For each of 1000
     * iterations the local broker data is updated and cleared on the calling thread, and two
     * tasks are submitted to a 10-thread pool: one asserting that
     * {@code generateLoadReport()} is non-null and one calling
     * {@code writeLoadReportOnZookeeper()}. Every task is awaited, so any exception thrown
     * inside a task fails the test.
     *
     * @throws Exception if any submitted task fails or the calling thread is interrupted
     */
    @Test
    public void testConcurrentWriteBrokerData() throws Exception {
        Map<String, NamespaceBundleStats> bundleStats = new ConcurrentHashMap<>();
        for (int i = 0; i < 100; i++) {
            bundleStats.put("key" + i, new NamespaceBundleStats());
        }

        BrokerService brokerService = Mockito.mock(BrokerService.class);
        Mockito.doReturn(brokerService).when(pulsar).getBrokerService();
        Mockito.doReturn(bundleStats).when(brokerService).getBundleStats();
        ModularLoadManagerWrapper loadManager = (ModularLoadManagerWrapper) pulsar.getLoadManager().get();

        ExecutorService executor = Executors.newFixedThreadPool(10);
        try {
            ArrayList<Future<?>> pending = new ArrayList<>();
            for (int i = 0; i < 1000; i++) {
                // Mutate the shared broker data on this thread while pool threads read it.
                LocalBrokerData localBrokerData = loadManager.getLoadManager().updateLocalBrokerData();
                localBrokerData.cleanDeltas();
                localBrokerData.getBundles().clear();

                pending.add(executor.submit(() -> {
                    try {
                        Assert.assertNotNull(loadManager.generateLoadReport());
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }));
                pending.add(executor.submit(() -> {
                    try {
                        loadManager.writeLoadReportOnZookeeper();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }));
            }

            // Surface any task failure as an ExecutionException.
            for (Future<?> future : pending) {
                future.get();
            }
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * (Disabled) Verifies produce/consume for a namespace that belongs to a second cluster
     * served by a second broker.
     *
     * <p>Creates cluster {@code use2}, tenant {@code my-property2} and namespace
     * {@code my-property2/use2/my-ns}, starts a second broker for that cluster, restarts the
     * first broker with authorization enabled, installs a spy of the second broker's load
     * manager into the first broker's {@link NamespaceService}, and then produces ten
     * messages through a client built for the second broker and consumes them through the
     * shared client.
     *
     * @throws Exception if admin setup, broker start/restart, the reflective field swap, or
     *     any produce/consume call fails
     */
    @Test(enabled = false)
    public void testMultipleBrokerDifferentClusterLookup() throws Exception {
        log.info("-- Starting {} test --", methodName);
        final String newCluster = "use2";
        final String property = "my-property2";
        final String namespace = property + "/" + newCluster + "/my-ns";
        final String topic = "persistent://" + namespace + "/my-topic1";
        final int numMessages = 10;

        ServiceConfiguration conf2 = new ServiceConfiguration();
        conf2.setAdvertisedAddress("localhost");
        conf2.setBrokerShutdownTimeoutMs(0L);
        conf2.setBrokerServicePort(Optional.of(0));
        conf2.setWebServicePort(Optional.of(0));
        conf2.setClusterName(newCluster);
        conf2.setZookeeperServers("localhost:2181");
        conf2.setConfigurationStoreServers("localhost:3181");

        // NOTE(review): the broker has not been started yet and the port was configured as 0
        // just above, so this evaluates to "pulsar://localhost:0" - confirm the intended URL
        // before re-enabling this test.
        String broker2ServiceUrl = "pulsar://localhost:" + conf2.getBrokerServicePort().get();

        admin.clusters().createCluster(newCluster, ClusterData.builder()
                .serviceUrl(pulsar.getWebServiceAddress())
                .brokerServiceUrl(broker2ServiceUrl)
                .build());
        admin.tenants().createTenant(property,
                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(newCluster)));
        admin.namespaces().createNamespace(namespace);

        // try-with-resources closes the client first and the broker last, and keeps the
        // primary failure visible (close() failures are attached as suppressed exceptions).
        try (PulsarService pulsar2 = startBroker(conf2)) {
            pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
            pulsar2.getLoadManager().get().writeLoadReportOnZookeeper();

            URI brokerServiceUrl = new URI(broker2ServiceUrl);
            try (PulsarClient pulsarClient2 = PulsarClient.builder().serviceUrl(brokerServiceUrl.toString()).build()) {
                // Restart the first broker with authorization switched on.
                pulsar.getConfiguration().setAuthorizationEnabled(true);
                stopBroker();
                startBroker();

                LoadManager loadManager2 = Mockito.spy(pulsar2.getLoadManager().get());
                Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager");
                loadManagerField.setAccessible(true);

                Mockito.doReturn(true).when(loadManager2).isCentralized();
                SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getSafeWebServiceAddress(), null);
                Mockito.doReturn(Optional.of(resourceUnit)).when(loadManager2)
                        .getLeastLoaded(ArgumentMatchers.any(ServiceUnitId.class));
                // The first broker's namespace service gets the spy of the second broker's manager.
                loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<LoadManager>(loadManager2));

                Consumer<byte[]> consumer = pulsarClient.newConsumer()
                        .topic(topic)
                        .subscriptionName("my-subscriber-name")
                        .subscribe();
                Producer<byte[]> producer = pulsarClient2.newProducer(Schema.BYTES).topic(topic).create();

                for (int i = 0; i < numMessages; i++) {
                    String message = "my-message-" + i;
                    producer.send(message.getBytes());
                }

                Message<byte[]> msg = null;
                Set<String> messageSet = new HashSet<>();
                for (int i = 0; i < numMessages; i++) {
                    msg = consumer.receive(5, TimeUnit.SECONDS);
                    // Fail with a readable message instead of a NullPointerException on timeout.
                    Assert.assertNotNull(msg, "Timed out waiting for message " + i);
                    String receivedMessage = new String(msg.getData());
                    log.debug("Received message: [{}]", receivedMessage);
                    String expectedMessage = "my-message-" + i;
                    testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
                }

                // Acknowledge everything up to and including the last received message.
                consumer.acknowledgeCumulative(msg);
                consumer.close();
                producer.close();

                pulsar.getConfiguration().setAuthorizationEnabled(false);
            }
        }
    }

    /**
     * Verifies produce/consume on a partitioned topic while two brokers serve the cluster.
     *
     * <p>Creates an 8-partition topic, starts a second broker, replaces the
     * {@code loadManager} field of both brokers' {@link NamespaceService} with a spy that
     * reports itself as centralized, and then sends 20 unbatched messages with round-robin
     * partition routing. Each received message must be non-null and must not have been seen
     * before; ordering is not checked. The topic is deleted at the end of a successful run.
     *
     * @throws Exception if topic creation, broker startup, the reflective field swap, or any
     *     produce/consume call fails
     */
    @Test
    public void testPartitionTopicLookup() throws Exception {
        log.info("-- Starting {} test --", methodName);
        final int numPartitions = 8;
        final int numMessages = 20;
        TopicName topicName = TopicName.get("persistent://my-property/my-ns/my-partitionedtopic1");
        admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);

        ServiceConfiguration conf2 = new ServiceConfiguration();
        conf2.setAdvertisedAddress("localhost");
        conf2.setBrokerShutdownTimeoutMs(0L);
        conf2.setBrokerServicePort(Optional.of(0));
        conf2.setWebServicePort(Optional.of(0));
        conf2.setClusterName(pulsar.getConfiguration().getClusterName());
        conf2.setZookeeperServers("localhost:2181");
        conf2.setConfigurationStoreServers("localhost:3181");

        // try-with-resources closes the second broker even when an assertion fails, and keeps
        // the primary failure visible if close() throws as well.
        try (PulsarService pulsar2 = startBroker(conf2)) {
            pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
            pulsar2.getLoadManager().get().writeLoadReportOnZookeeper();

            LoadManager loadManager1 = Mockito.spy(pulsar.getLoadManager().get());
            LoadManager loadManager2 = Mockito.spy(pulsar2.getLoadManager().get());
            Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager");
            loadManagerField.setAccessible(true);

            Mockito.doReturn(true).when(loadManager1).isCentralized();
            loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<LoadManager>(loadManager1));
            Mockito.doReturn(true).when(loadManager2).isCentralized();
            loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<LoadManager>(loadManager2));

            Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
                    .topic(topicName.toString())
                    .enableBatching(false)
                    .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                    .create();
            Consumer<byte[]> consumer = pulsarClient.newConsumer()
                    .topic(topicName.toString())
                    .subscriptionName("my-partitioned-subscriber")
                    .subscribe();

            for (int i = 0; i < numMessages; i++) {
                String message = "my-message-" + i;
                producer.send(message.getBytes());
            }

            Set<String> messageSet = new HashSet<>();
            for (int i = 0; i < numMessages; i++) {
                Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
                Assert.assertNotNull(msg, "Message should not be null");
                consumer.acknowledge(msg);
                String receivedMessage = new String(msg.getData());
                log.debug("Received message: [{}]", receivedMessage);
                // Only uniqueness is asserted here, not the order of arrival.
                Assert.assertTrue(messageSet.add(receivedMessage), "Message " + receivedMessage + " already received");
            }

            producer.close();
            consumer.unsubscribe();
            consumer.close();
            admin.topics().deletePartitionedTopic(topicName.toString());
            log.info("-- Exiting {} test --", methodName);
        }
    }

    /**
     * Verifies that an HTTPS lookup request sent to the second broker ends up on the first
     * broker's TLS web-service port.
     *
     * <p>Starts a TLS-enabled second broker, restarts the first broker with TLS enabled,
     * installs load-manager spies on both brokers that report themselves as centralized and
     * return a resource unit built from the first broker's web-service address, and then
     * opens an {@link HttpsURLConnection}-backed connection (client certificate, trust-all
     * trust manager) to the second broker's lookup endpoint. The connection's port must equal
     * the second broker's TLS web port right after {@code connect()} and the first broker's
     * TLS web port once the response stream has been obtained.
     *
     * <p>NOTE(review): this installs a JVM-wide default SSL socket factory via
     * {@code HttpsURLConnection.setDefaultSSLSocketFactory} and does not restore the previous
     * one - confirm that no other test depends on the default.
     *
     * @throws Exception if broker start/restart, key material loading, the reflective field
     *     swap, or the HTTPS request fails
     */
    @Test
    public void testWebserviceServiceTls() throws Exception {
        log.info("-- Starting {} test --", methodName);
        final String tlsServerCertFilePath = "./src/test/resources/certificate/server.crt";
        final String tlsServerKeyFilePath = "./src/test/resources/certificate/server.key";
        final String tlsClientCertFilePath = "./src/test/resources/certificate/client.crt";
        final String tlsClientKeyFilePath = "./src/test/resources/certificate/client.key";
        final String lookupResourceUrl = "/lookup/v2/topic/persistent/my-property/my-ns/my-topic1";

        ServiceConfiguration conf2 = new ServiceConfiguration();
        conf2.setAdvertisedAddress("localhost");
        conf2.setBrokerShutdownTimeoutMs(0L);
        conf2.setBrokerServicePort(Optional.of(0));
        conf2.setBrokerServicePortTls(Optional.of(0));
        conf2.setWebServicePort(Optional.of(0));
        conf2.setWebServicePortTls(Optional.of(0));
        conf2.setTlsAllowInsecureConnection(true);
        conf2.setTlsCertificateFilePath(tlsServerCertFilePath);
        conf2.setTlsKeyFilePath(tlsServerKeyFilePath);
        conf2.setClusterName(conf.getClusterName());
        conf2.setZookeeperServers("localhost:2181");
        conf2.setConfigurationStoreServers("localhost:3181");

        // try-with-resources closes the client first and the broker last, and keeps the
        // primary failure visible (close() failures are attached as suppressed exceptions).
        try (PulsarService pulsar2 = startBroker(conf2)) {
            // Restart the first broker with TLS enabled.
            conf.setBrokerServicePortTls(Optional.of(0));
            conf.setWebServicePortTls(Optional.of(0));
            conf.setTlsAllowInsecureConnection(true);
            conf.setTlsCertificateFilePath(tlsServerCertFilePath);
            conf.setTlsKeyFilePath(tlsServerKeyFilePath);
            conf.setNumExecutorThreadPoolSize(5);
            stopBroker();
            startBroker();

            pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
            pulsar2.getLoadManager().get().writeLoadReportOnZookeeper();

            LoadManager loadManager1 = Mockito.spy(pulsar.getLoadManager().get());
            LoadManager loadManager2 = Mockito.spy(pulsar2.getLoadManager().get());
            Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager");
            loadManagerField.setAccessible(true);

            // Stub both spies before installing them on the brokers.
            Mockito.doReturn(true).when(loadManager1).isCentralized();
            Mockito.doReturn(true).when(loadManager2).isCentralized();
            loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<LoadManager>(loadManager2));
            loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<LoadManager>(loadManager1));

            // Both spies hand out the first broker as the least-loaded resource unit.
            SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getWebServiceAddress(), null);
            Mockito.doReturn(Optional.of(resourceUnit)).when(loadManager2)
                    .getLeastLoaded(ArgumentMatchers.any(ServiceUnitId.class));
            Mockito.doReturn(Optional.of(resourceUnit)).when(loadManager1)
                    .getLeastLoaded(ArgumentMatchers.any(ServiceUnitId.class));

            URI brokerServiceUrl = new URI("pulsar://localhost:" + conf2.getBrokerServicePort().get());
            try (PulsarClient pulsarClient2 = PulsarClient.builder().serviceUrl(brokerServiceUrl.toString()).build()) {
                // Build an SSL context that presents the client certificate and trusts any server.
                Certificate[] tlsCert = SecurityUtility.loadCertificatesFromPemFile(tlsClientCertFilePath);
                PrivateKey tlsKey = SecurityUtility.loadPrivateKeyFromPemFile(tlsClientKeyFilePath);
                KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
                ks.load(null, null);
                ks.setKeyEntry("private", tlsKey, "".toCharArray(), tlsCert);
                KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
                kmf.init(ks, "".toCharArray());
                KeyManager[] keyManagers = kmf.getKeyManagers();
                TrustManager[] trustManagers = InsecureTrustManagerFactory.INSTANCE.getTrustManagers();
                SSLContext sslCtx = SSLContext.getInstance("TLS");
                sslCtx.init(keyManagers, trustManagers, new SecureRandom());
                HttpsURLConnection.setDefaultSSLSocketFactory(sslCtx.getSocketFactory());

                URLConnection con = new URL(pulsar2.getWebServiceAddressTls() + lookupResourceUrl).openConnection();
                log.info("orignal url: {}", con.getURL());
                con.connect();
                log.info("connected url: {} ", con.getURL());
                // Right after connect() the connection still targets the second broker.
                Assert.assertEquals(con.getURL().getPort(), conf2.getWebServicePortTls().get().intValue());

                // Obtaining the response stream is what makes the connection report the
                // redirected URL; the stream is closed even if the assertion fails.
                try (InputStream is = con.getInputStream()) {
                    log.info("redirected url: {}", con.getURL());
                    Assert.assertEquals(con.getURL().getPort(), conf.getWebServicePortTls().get().intValue());
                }
            }
        }
    }

    /**
     * Verifies produce/consume through a client whose service URL points at a discovery
     * service instead of a broker.
     *
     * <p>Starts a discovery service bound to localhost on an ephemeral port, builds a client
     * from its service URL, and checks that ten messages sent to
     * {@code persistent://my-property2/use2/my-ns/my-topic1} are received in order and
     * without duplicates.
     *
     * @throws Exception if the discovery service cannot be started or any produce/consume
     *     call fails
     */
    @Test
    public void testDiscoveryLookup() throws Exception {
        final String topic = "persistent://my-property2/use2/my-ns/my-topic1";
        final int numMessages = 10;

        ServiceConfig config = new ServiceConfig();
        config.setServicePort(Optional.of(0));
        config.setBindOnLocalhost(true);

        // try-with-resources closes the client first and the discovery service last, and
        // keeps the primary failure visible if a close() call throws as well.
        try (DiscoveryService discoveryService = createAndStartDiscoveryService(config)) {
            String discoverySvcUrl = discoveryService.getServiceUrl();
            try (PulsarClient pulsarClient2 = PulsarClient.builder().serviceUrl(discoverySvcUrl).build()) {
                Consumer<byte[]> consumer = pulsarClient2.newConsumer()
                        .topic(topic)
                        .subscriptionName("my-subscriber-name")
                        .subscribe();
                Producer<byte[]> producer = pulsarClient2.newProducer(Schema.BYTES).topic(topic).create();

                for (int i = 0; i < numMessages; i++) {
                    String message = "my-message-" + i;
                    producer.send(message.getBytes());
                }

                Message<byte[]> msg = null;
                Set<String> messageSet = new HashSet<>();
                for (int i = 0; i < numMessages; i++) {
                    msg = consumer.receive(5, TimeUnit.SECONDS);
                    // Fail with a readable message instead of a NullPointerException on timeout.
                    Assert.assertNotNull(msg, "Timed out waiting for message " + i);
                    String receivedMessage = new String(msg.getData());
                    log.debug("Received message: [{}]", receivedMessage);
                    String expectedMessage = "my-message-" + i;
                    testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
                }

                // Acknowledge everything up to and including the last received message.
                consumer.acknowledgeCumulative(msg);
                consumer.close();
                producer.close();
            }
        }
    }

    /**
     * Verifies produce/consume through a discovery service over TLS with TLS client
     * authentication.
     *
     * <p>Restarts the broker with TLS ports and the test server certificate, starts a
     * TLS-enabled discovery service bound to localhost, builds a client from the discovery
     * service's TLS URL using {@link AuthenticationTls} with the test client certificate, and
     * checks that ten messages sent to
     * {@code persistent://my-property2/use2/my-ns/my-topic1} are received in order and
     * without duplicates.
     *
     * @throws Exception if the broker restart, the discovery service startup, or any
     *     produce/consume call fails
     */
    @Test
    public void testDiscoveryLookupTls() throws Exception {
        final String tlsServerCertFilePath = "./src/test/resources/certificate/server.crt";
        final String tlsServerKeyFilePath = "./src/test/resources/certificate/server.key";
        final String tlsClientCertFilePath = "./src/test/resources/certificate/client.crt";
        final String tlsClientKeyFilePath = "./src/test/resources/certificate/client.key";
        final String topic = "persistent://my-property2/use2/my-ns/my-topic1";
        final int numMessages = 10;

        // Restart the broker with TLS enabled.
        conf.setBrokerServicePortTls(Optional.of(0));
        conf.setWebServicePortTls(Optional.of(0));
        conf.setTlsAllowInsecureConnection(true);
        conf.setTlsCertificateFilePath(tlsServerCertFilePath);
        conf.setTlsKeyFilePath(tlsServerKeyFilePath);
        conf.setNumExecutorThreadPoolSize(5);
        stopBroker();
        startBroker();

        ServiceConfig config = new ServiceConfig();
        config.setServicePort(Optional.of(0));
        config.setServicePortTls(Optional.of(0));
        config.setBindOnLocalhost(true);
        config.setTlsCertificateFilePath(tlsServerCertFilePath);
        config.setTlsKeyFilePath(tlsServerKeyFilePath);

        // try-with-resources closes the client first and the discovery service last, and
        // keeps the primary failure visible if a close() call throws as well.
        try (DiscoveryService discoveryService = createAndStartDiscoveryService(config)) {
            String discoverySvcUrl = discoveryService.getServiceUrlTls();

            Map<String, String> authParams = new HashMap<>();
            authParams.put("tlsCertFile", tlsClientCertFilePath);
            authParams.put("tlsKeyFile", tlsClientKeyFilePath);
            AuthenticationTls auth = new AuthenticationTls();
            auth.configure(authParams);

            try (PulsarClient pulsarClient2 = PulsarClient.builder()
                    .serviceUrl(discoverySvcUrl)
                    .authentication(auth)
                    .enableTls(true)
                    .allowTlsInsecureConnection(true)
                    .build()) {
                Consumer<byte[]> consumer = pulsarClient2.newConsumer()
                        .topic(topic)
                        .subscriptionName("my-subscriber-name")
                        .subscribe();
                Producer<byte[]> producer = pulsarClient2.newProducer(Schema.BYTES).topic(topic).create();

                for (int i = 0; i < numMessages; i++) {
                    String message = "my-message-" + i;
                    producer.send(message.getBytes());
                }

                Message<byte[]> msg = null;
                Set<String> messageSet = new HashSet<>();
                for (int i = 0; i < numMessages; i++) {
                    msg = consumer.receive(5, TimeUnit.SECONDS);
                    // Fail with a readable message instead of a NullPointerException on timeout.
                    Assert.assertNotNull(msg, "Timed out waiting for message " + i);
                    String receivedMessage = new String(msg.getData());
                    log.debug("Received message: [{}]", receivedMessage);
                    String expectedMessage = "my-message-" + i;
                    testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
                }

                // Acknowledge everything up to and including the last received message.
                consumer.acknowledgeCumulative(msg);
                consumer.close();
                producer.close();
            }
        }
    }

    /**
     * Verifies produce/consume through a discovery service that has authentication and
     * authorization enabled, using a client whose credentials are expected to be accepted.
     *
     * <p>The discovery service is configured with {@code MockAuthenticationProvider} as its
     * only authentication provider. The client uses an inline {@link Authentication} whose
     * method name is {@code "auth"} and whose data provider overrides nothing. Ten messages
     * sent to {@code persistent://my-property/use/my-ns/my-topic1} must be received in order
     * and without duplicates.
     *
     * @throws Exception if the discovery service cannot be started or any produce/consume
     *     call fails
     */
    @Test
    public void testDiscoveryLookupAuthAndAuthSuccess() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";
        final int numMessages = 10;

        ServiceConfig config = new ServiceConfig();
        config.setServicePort(Optional.of(0));
        config.setBindOnLocalhost(true);
        Set<String> providersClassNames = Sets.newHashSet(MockAuthenticationProvider.class.getName());
        config.setAuthenticationProviders(providersClassNames);
        config.setAuthenticationEnabled(true);
        config.setAuthorizationEnabled(true);
        config.setZookeeperServers("localhost:2181");
        config.setConfigurationStoreServers("localhost:3181");

        // try-with-resources closes the client first and the discovery service last, and
        // keeps the primary failure visible if a close() call throws as well.
        try (DiscoveryService discoveryService = createAndStartDiscoveryService(config)) {
            String discoverySvcUrl = discoveryService.getServiceUrl();

            // Minimal client-side authentication: only the method name carries information.
            Authentication auth = new Authentication() {
                private static final long serialVersionUID = 1L;

                @Override
                public void close() throws IOException {
                }

                @Override
                public String getAuthMethodName() {
                    return "auth";
                }

                @Override
                public AuthenticationDataProvider getAuthData() throws PulsarClientException {
                    return new AuthenticationDataProvider() {
                        private static final long serialVersionUID = 1L;
                    };
                }

                @Override
                public void configure(Map<String, String> authParams) {
                }

                @Override
                public void start() throws PulsarClientException {
                }
            };

            // Named authClient so it does not shadow the inherited pulsarClient field.
            try (PulsarClient authClient = PulsarClient.builder().serviceUrl(discoverySvcUrl).authentication(auth).build()) {
                Consumer<byte[]> consumer = authClient.newConsumer()
                        .topic(topic)
                        .subscriptionName("my-subscriber-name")
                        .subscribe();
                Producer<byte[]> producer = authClient.newProducer(Schema.BYTES).topic(topic).create();

                for (int i = 0; i < numMessages; i++) {
                    String message = "my-message-" + i;
                    producer.send(message.getBytes());
                }

                Message<byte[]> msg = null;
                Set<String> messageSet = new HashSet<>();
                for (int i = 0; i < numMessages; i++) {
                    msg = consumer.receive(5, TimeUnit.SECONDS);
                    // Fail with a readable message instead of a NullPointerException on timeout.
                    Assert.assertNotNull(msg, "Timed out waiting for message " + i);
                    String receivedMessage = new String(msg.getData());
                    log.debug("Received message: [{}]", receivedMessage);
                    String expectedMessage = "my-message-" + i;
                    testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
                }

                // Acknowledge everything up to and including the last received message.
                consumer.acknowledgeCumulative(msg);
                consumer.close();
                producer.close();
            }
        }
    }

    /**
     * Checks that subscribing through a discovery service fails with a
     * {@link PulsarClientException} when the service is configured with
     * {@link MockAuthenticationProviderFail}, whose {@code authenticate} always throws.
     *
     * @throws Exception if the discovery service or the client cannot be started or closed
     */
    @Test
    public void testDiscoveryLookupAuthenticationFailure() throws Exception {
        ServiceConfig discoveryConfig = new ServiceConfig();
        discoveryConfig.setServicePort(Optional.of(0));
        discoveryConfig.setBindOnLocalhost(true);
        Set<String> providers = Sets.newHashSet(MockAuthenticationProviderFail.class.getName());
        discoveryConfig.setAuthenticationProviders(providers);
        discoveryConfig.setAuthenticationEnabled(true);
        discoveryConfig.setAuthorizationEnabled(true);

        DiscoveryService discoveryService = this.createAndStartDiscoveryService(discoveryConfig);
        try {
            String serviceUrl = discoveryService.getServiceUrl();
            // Client-side plugin that only announces the "auth" method and carries no data.
            Authentication clientAuth = new Authentication() {
                private static final long serialVersionUID = 1L;

                @Override
                public String getAuthMethodName() {
                    return "auth";
                }

                @Override
                public AuthenticationDataProvider getAuthData() throws PulsarClientException {
                    return new AuthenticationDataProvider() {
                        private static final long serialVersionUID = 1L;
                    };
                }

                @Override
                public void configure(Map<String, String> authParams) {
                }

                @Override
                public void start() throws PulsarClientException {
                }

                @Override
                public void close() throws IOException {
                }
            };
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl(serviceUrl)
                    .authentication(clientAuth)
                    .operationTimeout(1000, TimeUnit.MILLISECONDS)
                    .build();
            try {
                client.newConsumer()
                        .topic("persistent://my-property/use2/my-ns/my-topic1")
                        .subscriptionName("my-subscriber-name")
                        .subscribe();
                Assert.fail("should have failed due to authentication");
            }
            catch (PulsarClientException expected) {
                // Expected outcome: the subscribe attempt is rejected.
            }
            finally {
                if (client != null) {
                    client.close();
                }
            }
        }
        finally {
            if (discoveryService != null) {
                discoveryService.close();
            }
        }
    }

    /**
     * Checks that subscribing through a discovery service fails with a
     * {@link PulsarClientException.LookupException} when the service is configured with
     * {@link MockAuthorizationProviderFail}, whose {@code authenticate} returns
     * {@code "invalid"}.
     *
     * @throws Exception if the discovery service or the client cannot be started or closed
     */
    @Test
    public void testDiscoveryLookupAuthorizationFailure() throws Exception {
        ServiceConfig config = new ServiceConfig();
        config.setServicePort(Optional.of(0));
        config.setBindOnLocalhost(true);
        Set<String> providersClassNames = Sets.newHashSet(MockAuthorizationProviderFail.class.getName());
        config.setAuthenticationProviders(providersClassNames);
        config.setAuthenticationEnabled(true);
        config.setAuthorizationEnabled(true);
        DiscoveryService discoveryService = this.createAndStartDiscoveryService(config);
        try {
            String discoverySvcUrl = discoveryService.getServiceUrl();
            // Minimal client-side authentication plugin: only the method name carries information.
            Authentication auth = new Authentication() {
                private static final long serialVersionUID = 1L;

                @Override
                public void close() throws IOException {
                    // nothing to release
                }

                @Override
                public String getAuthMethodName() {
                    return "auth";
                }

                @Override
                public AuthenticationDataProvider getAuthData() throws PulsarClientException {
                    return new AuthenticationDataProvider() {
                        private static final long serialVersionUID = 1L;
                    };
                }

                @Override
                public void configure(Map<String, String> authParams) {
                    // no parameters needed
                }

                @Override
                public void start() throws PulsarClientException {
                    // nothing to start
                }
            };
            PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(discoverySvcUrl).authentication(auth).operationTimeout(1000, TimeUnit.MILLISECONDS).build();
            try {
                try {
                    pulsarClient.newConsumer().topic("persistent://my-property/use2/my-ns/my-topic1").subscriptionName("my-subscriber-name").subscribe();
                    // This test covers authorization, so the message names it rather than authentication.
                    Assert.fail("should have failed due to authorization");
                }
                catch (PulsarClientException e) {
                    Assert.assertTrue(e instanceof PulsarClientException.LookupException,
                            "expected a LookupException but got: " + e);
                }
            }
            finally {
                if (pulsarClient != null) {
                    pulsarClient.close();
                }
            }
        }
        finally {
            if (discoveryService != null) {
                discoveryService.close();
            }
        }
    }

    /**
     * Starts a second broker, splits (with unload) the single bundle of
     * {@code my-property/my-ns}, and checks that a topic looked up afterwards no longer maps
     * to the unsplit bundle on the second broker.
     *
     * <p>Both brokers' load managers are replaced by Mockito spies that report themselves as
     * centralized and always return this test's primary broker from {@code getLeastLoaded}.
     *
     * <p>The final assertion compares the bundle's string form with the unsplit bundle name.
     * Comparing the {@code NamespaceBundle} object itself with a {@code String} can never be
     * equal, which would let the check pass regardless of the outcome.
     *
     * @throws Exception if a broker or client cannot be started, or an admin call fails
     */
    @Test(timeOut=5000L)
    public void testSplitUnloadLookupTest() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String namespace = "my-property/my-ns";
        final String fullBundleRange = "0x00000000_0xffffffff";
        final String unsplitBundle = namespace + "/" + fullBundleRange;
        final String topic1 = "persistent://" + namespace + "/topic1";
        final String topic2 = "persistent://" + namespace + "/topic2";

        ServiceConfiguration conf2 = new ServiceConfiguration();
        conf2.setAdvertisedAddress("localhost");
        conf2.setBrokerShutdownTimeoutMs(0L);
        conf2.setBrokerServicePort(Optional.of(0));
        conf2.setWebServicePort(Optional.of(0));
        conf2.setClusterName(this.conf.getClusterName());
        conf2.setZookeeperServers("localhost:2181");
        conf2.setConfigurationStoreServers("localhost:3181");
        PulsarService pulsar2 = this.startBroker(conf2);
        try {
            // NOTE(review): each report is written twice, as in the original; the reason for
            // the repetition is not visible here, so it is kept unchanged.
            this.pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
            pulsar2.getLoadManager().get().writeLoadReportOnZookeeper();
            this.pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
            pulsar2.getLoadManager().get().writeLoadReportOnZookeeper();

            LoadManager loadManager1 = Mockito.spy(this.pulsar.getLoadManager().get());
            LoadManager loadManager2 = Mockito.spy(pulsar2.getLoadManager().get());
            // NamespaceService keeps its load manager in a private field; swap in the spies reflectively.
            Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager");
            loadManagerField.setAccessible(true);
            Mockito.doReturn(true).when(loadManager2).isCentralized();
            loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<LoadManager>(loadManager2));
            Mockito.doReturn(true).when(loadManager1).isCentralized();
            SimpleResourceUnit resourceUnit = new SimpleResourceUnit(this.pulsar.getSafeWebServiceAddress(), null);
            Mockito.doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(ArgumentMatchers.any(ServiceUnitId.class));
            Mockito.doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(ArgumentMatchers.any(ServiceUnitId.class));
            loadManagerField.set(this.pulsar.getNamespaceService(), new AtomicReference<LoadManager>(loadManager1));

            PulsarClient pulsarClient2 = PulsarClient.builder().serviceUrl(pulsar2.getBrokerServiceUrl()).build();
            try {
                Consumer<byte[]> consumer1 = pulsarClient2.newConsumer().topic(topic1).subscriptionName("my-subscriber-name").subscribe();
                Set<String> serviceUnits1 = this.pulsar.getNamespaceService().getOwnedServiceUnits().stream()
                        .map(nb -> nb.toString())
                        .collect(Collectors.toSet());
                Assert.assertTrue(serviceUnits1.contains(unsplitBundle));

                TopicName topicName = TopicName.get(topic1);
                NamespaceBundle bundleInBroker2 = pulsar2.getNamespaceService().getBundle(topicName);
                Assert.assertEquals(bundleInBroker2.toString(), unsplitBundle);

                this.admin.namespaces().splitNamespaceBundle(namespace, fullBundleRange, true, null);

                // Give the second broker up to (retry - 1) * 200 ms to observe the split.
                final int retry = 5;
                for (int i = 0; i < retry - 1 && pulsar2.getNamespaceService().getBundle(topicName).equals(bundleInBroker2); ++i) {
                    Thread.sleep(200L);
                }

                Consumer<byte[]> consumer2 = this.pulsarClient.newConsumer().topic(topic2).subscriptionName("my-subscriber-name").subscribe();
                NamespaceBundle bundleInBroker1AfterSplit = pulsar2.getNamespaceService().getBundle(TopicName.get(topic2));
                // Compare string forms so the assertion can actually fail.
                Assert.assertNotEquals(bundleInBroker1AfterSplit.toString(), unsplitBundle);
                consumer1.close();
                consumer2.close();
            }
            finally {
                if (pulsarClient2 != null) {
                    pulsarClient2.close();
                }
            }
        }
        finally {
            if (pulsar2 != null) {
                pulsar2.close();
            }
        }
    }

    /**
     * Restarts the primary broker with {@link ModularLoadManagerImpl}, starts a second broker
     * with automatic bundle split and unload enabled and a limit of one topic per bundle, and
     * checks that after {@code checkNamespaceBundleSplit} the topics of
     * {@code my-property/my-ns} no longer map to the single unsplit bundle.
     *
     * <p>Both brokers' load managers are replaced by Mockito spies that report themselves as
     * centralized and always return the primary broker from {@code getLeastLoaded}. The
     * load manager class name configured on {@code this.conf} is restored when the test ends.
     *
     * @throws Exception if a broker or client cannot be started, or a reflective call fails
     */
    @Test(timeOut=20000L)
    public void testModularLoadManagerSplitBundle() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String namespace = "my-property/my-ns";
        final String unsplitBundle = namespace + "/0x00000000_0xffffffff";
        final String topic1 = "persistent://" + namespace + "/topic1";
        final String topic2 = "persistent://" + namespace + "/topic2";
        final String topic3 = "persistent://" + namespace + "/topic3";

        String loadBalancerName = this.conf.getLoadManagerClassName();
        try {
            ServiceConfiguration conf2 = new ServiceConfiguration();
            conf2.setBrokerShutdownTimeoutMs(0L);
            conf2.setAdvertisedAddress("localhost");
            conf2.setBrokerServicePort(Optional.of(0));
            conf2.setWebServicePort(Optional.of(0));
            conf2.setClusterName(this.conf.getClusterName());
            conf2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
            conf2.setZookeeperServers("localhost:2181");
            conf2.setConfigurationStoreServers("localhost:3181");
            conf2.setLoadBalancerAutoBundleSplitEnabled(true);
            conf2.setLoadBalancerAutoUnloadSplitBundlesEnabled(true);
            conf2.setLoadBalancerNamespaceBundleMaxTopics(1);

            // Restart the primary broker so it also runs the modular load manager.
            this.stopBroker();
            this.conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
            this.startBroker();

            PulsarService pulsar2 = this.startBroker(conf2);
            try {
                this.pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
                pulsar2.getLoadManager().get().writeLoadReportOnZookeeper();

                LoadManager loadManager1 = Mockito.spy(this.pulsar.getLoadManager().get());
                LoadManager loadManager2 = Mockito.spy(pulsar2.getLoadManager().get());
                // NamespaceService keeps its load manager in a private field; swap in the spies reflectively.
                Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager");
                loadManagerField.setAccessible(true);
                Mockito.doReturn(true).when(loadManager2).isCentralized();
                loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<LoadManager>(loadManager2));
                Mockito.doReturn(true).when(loadManager1).isCentralized();
                SimpleResourceUnit resourceUnit = new SimpleResourceUnit(this.pulsar.getSafeWebServiceAddress(), null);
                Optional<SimpleResourceUnit> res = Optional.of(resourceUnit);
                Mockito.doReturn(res).when(loadManager1).getLeastLoaded(ArgumentMatchers.any(ServiceUnitId.class));
                Mockito.doReturn(res).when(loadManager2).getLeastLoaded(ArgumentMatchers.any(ServiceUnitId.class));
                loadManagerField.set(this.pulsar.getNamespaceService(), new AtomicReference<LoadManager>(loadManager1));

                PulsarClient pulsarClient2 = PulsarClient.builder().serviceUrl(pulsar2.getBrokerServiceUrl()).build();
                try {
                    Consumer<byte[]> consumer1 = pulsarClient2.newConsumer().topic(topic1).subscriptionName("my-subscriber-name").subscribe();
                    try {
                        Consumer<byte[]> consumer2 = pulsarClient2.newConsumer().topic(topic2).subscriptionName("my-subscriber-name").subscribe();
                        try {
                            // Wait until the primary broker owns the single, unsplit bundle.
                            Awaitility.await().until(() -> this.pulsar.getNamespaceService().getOwnedServiceUnits().stream().map(nb -> nb.toString()).collect(Collectors.toSet()).contains(unsplitBundle));
                            TopicName topicName = TopicName.get(topic1);
                            NamespaceBundle bundleInBroker2 = pulsar2.getNamespaceService().getBundle(topicName);
                            Assert.assertEquals(bundleInBroker2.toString(), unsplitBundle);

                            this.pulsar.getBrokerService().updateRates();
                            this.pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
                            this.pulsar.getLoadManager().get().writeResourceQuotasToZooKeeper();
                            pulsar2.getLoadManager().get().writeLoadReportOnZookeeper();

                            // updateAll is not accessible from here, hence the reflective call.
                            Method updateAllMethod = ModularLoadManagerImpl.class.getDeclaredMethod("updateAll");
                            updateAllMethod.setAccessible(true);
                            this.pulsar.getLeaderElectionService().close();
                            ModularLoadManagerImpl loadManager = (ModularLoadManagerImpl)((ModularLoadManagerWrapper)pulsar2.getLoadManager().get()).getLoadManager();
                            updateAllMethod.invoke(loadManager);
                            loadManager.checkNamespaceBundleSplit();

                            Awaitility.await().untilAsserted(() -> Assert.assertNotEquals(pulsar2.getNamespaceService().getBundle(topicName), bundleInBroker2));

                            Consumer<byte[]> consumer3 = pulsarClient2.newConsumer().topic(topic3).subscriptionName("my-subscriber-name").subscribe();
                            try {
                                Awaitility.await().untilAsserted(() -> {
                                    NamespaceBundle bundleInBroker1AfterSplit = pulsar2.getNamespaceService().getBundle(TopicName.get(topic3));
                                    Assert.assertNotEquals(bundleInBroker1AfterSplit.toString(), unsplitBundle);
                                });
                            }
                            finally {
                                if (consumer3 != null) {
                                    consumer3.close();
                                }
                            }
                        }
                        finally {
                            if (consumer2 != null) {
                                consumer2.close();
                            }
                        }
                    }
                    finally {
                        if (consumer1 != null) {
                            consumer1.close();
                        }
                    }
                }
                finally {
                    if (pulsarClient2 != null) {
                        pulsarClient2.close();
                    }
                }
            }
            finally {
                if (pulsar2 != null) {
                    pulsar2.close();
                }
            }
        }
        finally {
            // Restore the shared configuration for the tests that run after this one.
            this.conf.setLoadManagerClassName(loadBalancerName);
        }
    }

    /**
     * Creates a ten-partition topic, restarts the broker with the client library version
     * check enabled, and requests the topic's partition metadata over HTTP with several
     * user-agent strings.
     *
     * <p>The request is expected to succeed for {@code Pulsar-Java-1.20},
     * {@code Pulsar-CPP-v1.21} and {@code Pulsar-CPP-v1.21-SNAPSHOT}, and to fail for the
     * empty string, {@code Pulsar-CPP-v1.20-SNAPSHOT} and {@code Pulsar-CPP-v1.20}.
     *
     * @throws Exception if an admin call, the broker restart or an HTTP request fails
     *                   unexpectedly
     */
    @Test(timeOut=10000L)
    public void testPartitionedMetadataWithDeprecatedVersion() throws Exception {
        final String cluster = "use2";
        final String property = "my-property2";
        final String namespace = "my-ns";
        final String topicName = "my-partitioned";
        final int totalPartitions = 10;

        TopicName dest = TopicName.get("persistent", property, cluster, namespace, topicName);
        this.admin.clusters().createCluster(cluster, ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        this.admin.tenants().createTenant(property, new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
        this.admin.namespaces().createNamespace(property + "/" + cluster + "/" + namespace);
        this.admin.topics().createPartitionedTopic(dest.toString(), totalPartitions);

        this.stopBroker();
        this.conf.setClientLibraryVersionCheckEnabled(true);
        this.startBroker();

        URL url = new URI(this.pulsar.getSafeWebServiceAddress()).toURL();
        String path = String.format("admin/%s/partitions", dest.getLookupName());

        String[] acceptedVersions = {"Pulsar-Java-1.20", "Pulsar-CPP-v1.21", "Pulsar-CPP-v1.21-SNAPSHOT"};
        for (String version : acceptedVersions) {
            this.assertPartitionedMetadataAccepted(version, url, path, totalPartitions);
        }

        String[] rejectedVersions = {"", "Pulsar-CPP-v1.20-SNAPSHOT", "Pulsar-CPP-v1.20"};
        for (String version : rejectedVersions) {
            this.assertPartitionedMetadataRejected(version, url, path);
        }
    }

    /**
     * Requests partition metadata with {@code version} as the user agent and asserts that
     * the reported partition count equals {@code expectedPartitions}. The HTTP client is
     * closed even when the request or the assertion fails.
     */
    private void assertPartitionedMetadataAccepted(String version, URL url, String path, int expectedPartitions) throws Exception {
        AsyncHttpClient httpClient = this.getHttpClient(version);
        try {
            PartitionedTopicMetadata metadata = this.getPartitionedMetadata(httpClient, url, path);
            Assert.assertEquals(metadata.partitions, expectedPartitions);
        }
        finally {
            httpClient.close();
        }
    }

    /**
     * Requests partition metadata with {@code version} as the user agent and asserts that
     * the request fails with an {@link ExecutionException} caused by a
     * {@link PulsarClientException}. The HTTP client is closed in every case.
     */
    private void assertPartitionedMetadataRejected(String version, URL url, String path) throws Exception {
        AsyncHttpClient httpClient = this.getHttpClient(version);
        try {
            this.getPartitionedMetadata(httpClient, url, path);
            Assert.fail("should have failed due to invalid version");
        }
        catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof PulsarClientException);
        }
        finally {
            httpClient.close();
        }
    }

    /**
     * Performs an HTTP GET on {@code path} resolved against {@code url} and blocks until the
     * response body has been decoded into a {@link PartitionedTopicMetadata}.
     *
     * @param httpClient client used to issue the request; it is not closed by this method
     * @param url        base URL of the broker's web service
     * @param path       request path, resolved relative to {@code url}
     * @return the metadata decoded from the JSON response body
     * @throws Exception an {@link ExecutionException} whose cause is a
     *                   {@link PulsarClientException} when the request cannot be built or
     *                   sent, the status code is not 200, or the body cannot be decoded
     */
    private PartitionedTopicMetadata getPartitionedMetadata(AsyncHttpClient httpClient, URL url, String path) throws Exception {
        final CompletableFuture<PartitionedTopicMetadata> future = new CompletableFuture<>();
        try {
            final String requestUrl = new URL(url, path).toString();
            BoundRequestBuilder builder = httpClient.prepareGet(requestUrl);
            builder.setHeader("Accept", "application/json");
            final ListenableFuture<Response> responseFuture = builder.execute(new AsyncCompletionHandler<Response>() {

                @Override
                public Response onCompleted(Response response) throws Exception {
                    return response;
                }

                @Override
                public void onThrowable(Throwable t) {
                    log.warn("[{}] Failed to perform http request: {}", requestUrl, t.getMessage());
                    future.completeExceptionally(new PulsarClientException(t));
                }
            });
            // directExecutor runs the listener on whichever thread completes the response future.
            responseFuture.addListener(() -> {
                try {
                    Response response = responseFuture.get();
                    if (response.getStatusCode() != 200) {
                        log.warn("[{}] HTTP get request failed: {}", requestUrl, response.getStatusText());
                        future.completeExceptionally(new PulsarClientException("HTTP get request failed: " + response.getStatusText()));
                        return;
                    }
                    PartitionedTopicMetadata data = ObjectMapperFactory.getThreadLocal().readValue(response.getResponseBodyAsBytes(), PartitionedTopicMetadata.class);
                    future.complete(data);
                }
                catch (Exception e) {
                    log.warn("[{}] Error during HTTP get request: {}", requestUrl, e.getMessage());
                    future.completeExceptionally(new PulsarClientException(e));
                }
            }, MoreExecutors.directExecutor());
        }
        catch (Exception e) {
            log.warn("[{}] Failed to get authentication data for lookup: {}", path, e.getMessage());
            // Wrap only exceptions that are not already a PulsarClientException.
            if (e instanceof PulsarClientException) {
                future.completeExceptionally(e);
            } else {
                future.completeExceptionally(new PulsarClientException(e));
            }
        }
        return future.get();
    }

    /**
     * Builds an HTTP client that follows redirects and sends {@code version} as its
     * user-agent string.
     *
     * <p>Connections are not kept alive after a response whose status code is in the 5xx
     * range; for every other response the default keep-alive decision applies.
     *
     * @param version value to send as the user agent
     * @return a new client; the caller is responsible for closing it
     */
    private AsyncHttpClient getHttpClient(String version) {
        KeepAliveStrategy dropAfterServerError = new DefaultKeepAliveStrategy() {

            @Override
            public boolean keepAlive(InetSocketAddress remoteAddress, Request ahcRequest, HttpRequest request, HttpResponse response) {
                boolean serverError = response.status().code() / 100 == 5;
                if (serverError) {
                    return false;
                }
                return super.keepAlive(remoteAddress, ahcRequest, request, response);
            }
        };

        DefaultAsyncHttpClientConfig.Builder clientSettings = new DefaultAsyncHttpClientConfig.Builder();
        clientSettings.setUserAgent(version);
        clientSettings.setFollowRedirect(true);
        clientSettings.setKeepAliveStrategy(dropAfterServerError);
        return new DefaultAsyncHttpClient(clientSettings.build());
    }

    /**
     * Creates a {@link DiscoveryService} for {@code config}, wraps it in a Mockito spy whose
     * metadata-store factory methods return stores built on this test's mock ZooKeeper
     * instances, and starts it.
     *
     * @param config configuration for the discovery service
     * @return the started, spied discovery service; the caller is responsible for closing it
     * @throws Exception if the service fails to start
     */
    private DiscoveryService createAndStartDiscoveryService(ServiceConfig config) throws Exception {
        ZKMetadataStore localMetadatastore = new ZKMetadataStore((ZooKeeper)this.mockZooKeeper);
        ZKMetadataStore configMetadatastore = new ZKMetadataStore((ZooKeeper)this.mockZooKeeperGlobal);
        DiscoveryService discoveryService = Mockito.spy(new DiscoveryService(config));
        // Each factory method is stubbed once; repeating an identical stubbing adds nothing.
        Mockito.doReturn(localMetadatastore).when(discoveryService).createLocalMetadataStore();
        Mockito.doReturn(configMetadatastore).when(discoveryService).createConfigurationMetadataStore();
        discoveryService.start();
        return discoveryService;
    }

    /**
     * Authentication provider that reports every client as {@code "invalid"}.
     *
     * <p>Used by {@code testDiscoveryLookupAuthorizationFailure}; presumably {@code "invalid"}
     * is a role without permissions, so authentication passes but authorization fails.
     */
    public static class MockAuthorizationProviderFail extends MockAuthenticationProvider {

        /**
         * @return always {@code "invalid"}, regardless of {@code authData}
         */
        @Override
        public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
            final String role = "invalid";
            return role;
        }
    }

    /**
     * Authentication provider that rejects every client by throwing from
     * {@link #authenticate(AuthenticationDataSource)}.
     */
    public static class MockAuthenticationProviderFail extends MockAuthenticationProvider {

        /**
         * @throws AuthenticationException always, with the message "authentication failed"
         */
        @Override
        public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
            AuthenticationException rejection = new AuthenticationException("authentication failed");
            throw rejection;
        }
    }

    public static class MockAuthenticationProvider
    implements AuthenticationProvider {
        public void close() throws IOException {
        }

        public void initialize(ServiceConfiguration config) throws IOException {
        }

        public String getAuthMethodName() {
            return "auth";
        }

        public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
            return "appid1";
        }
    }
}

