package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/BrokerServiceTest.class */
public class BrokerServiceTest extends BrokerTestBase {
    // SLF4J logger for this test class.
    private static final Logger log = LoggerFactory.getLogger(BrokerServiceTest.class);
    // Relative paths to the TLS certificate/key fixtures under src/test/resources/certificate.
    // NOTE(review): the methods visible in this file spell the server paths out as literals
    // instead of using these fields — confirm whether the rest of the class reads them.
    private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
    private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key";
    private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/certificate/client.crt";
    private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key";

    /**
     * Switches off topic-level policies and system topics on the test configuration,
     * then hands over to the base-class setup.
     *
     * @throws Exception if the base-class setup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        // The two flags are independent plain setters; both must be set before baseSetup().
        this.conf.setTopicLevelPoliciesEnabled(false);
        this.conf.setSystemTopicEnabled(false);
        super.baseSetup();
    }

    /**
     * Runs the base-class cleanup and then calls {@code resetConfig()}.
     *
     * <p>Besides being the {@code @AfterClass} hook (with {@code alwaysRun = true}), this method is
     * invoked directly by tests in this class that need to restart with a different configuration.
     * NOTE(review): {@code resetConfig()} presumably restores the default test configuration —
     * confirm against the base class.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
        resetConfig();
    }

    /**
     * Tears the test environment down and brings it back up by running
     * {@link #cleanup()} followed by {@link #setup()}.
     *
     * @throws Exception if either step fails
     */
    private void resetState() throws Exception {
        this.cleanup();
        this.setup();
    }

    /**
     * Restarts the broker with three default namespace bundles, creates a 12-partition topic with
     * a producer, requests a graceful shutdown with a max-concurrent-unload of 1, and then checks
     * that the topics drain, the broker reaches the {@code Closed} state, and a subsequent send
     * fails with a client timeout. The environment is rebuilt at the end for the following tests.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testShutDownWithMaxConcurrentUnload() throws Exception {
        final String topicName = "persistent://prop/ns-abc/successTopic";
        final int bundleCount = 3;

        cleanup();
        this.conf.setDefaultNumberOfNamespaceBundles(bundleCount);
        setup();

        this.admin.topics().createPartitionedTopic(topicName, 12);
        Producer<byte[]> producer = this.pulsarClient.newProducer(Schema.BYTES).topic(topicName).create();

        Assert.assertEquals(this.admin.namespaces().getBundles("prop/ns-abc").getNumBundles(), bundleCount);
        Assert.assertEquals(this.admin.brokers().getActiveBrokers("test").size(), 1);

        this.admin.brokers().shutDownBrokerGracefully(1, false);

        // The topics must not all be gone before (bundleCount - 1) seconds have elapsed.
        Awaitility.await()
                .atLeast(bundleCount - 1, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(this.pulsar.getBrokerService().getTopics().size(), 0L));

        Awaitility.await()
                .atMost(60L, TimeUnit.SECONDS)
                .untilAsserted(() -> {
                    Assert.assertNull(this.pulsar.getBrokerService());
                    Assert.assertEquals(this.pulsar.getState(), PulsarService.State.Closed);
                });

        try {
            producer.send("1".getBytes(StandardCharsets.UTF_8));
            Assert.fail("sending msg should timeout, because broker is down and there is only one broker");
        } catch (Exception sendFailure) {
            Assert.assertTrue(sendFailure instanceof PulsarClientException.TimeoutException);
        }

        // Drop the reference to the closed broker before rebuilding the environment.
        this.pulsar = null;
        producer.close();
        resetState();
    }

    /**
     * Calls {@code getOrCreateTopic} twice for the same topic: once before and once after an admin
     * lookup of that topic. The first call is expected to complete exceptionally, the second to
     * succeed and leave a topic reference in the broker service.
     *
     * <p>NOTE(review): every assertion below runs inside a callback of a future whose result is
     * discarded, so a failing assertion may not propagate to the test thread — confirm whether
     * this test can actually fail.
     *
     * @throws Exception if waiting on a latch is interrupted or the lookup fails
     */
    @Test
    public void testOwnedNsCheck() throws Exception {
        final String topicName = "persistent://prop/ns-abc/successTopic";
        BrokerService service = this.pulsar.getBrokerService();

        // Phase 1: before the lookup, topic creation is expected to be rejected.
        CountDownLatch rejectedLatch = new CountDownLatch(1);
        service.getOrCreateTopic(topicName).thenAccept(unexpectedTopic -> {
            rejectedLatch.countDown();
            Assert.fail("should fail as NS is not owned");
        }).exceptionally(failure -> {
            Assert.assertTrue(failure.getCause() instanceof IOException);
            rejectedLatch.countDown();
            return null;
        });
        rejectedLatch.await();

        this.admin.lookups().lookupTopic(topicName);

        // Phase 2: after the lookup, the same call is expected to succeed.
        CountDownLatch createdLatch = new CountDownLatch(1);
        service.getOrCreateTopic(topicName).thenAccept(createdTopic -> {
            try {
                Assert.assertNotNull(service.getTopicReference(topicName));
            } catch (Exception unexpected) {
                Assert.fail("should not fail");
            }
            createdLatch.countDown();
        }).exceptionally(failure -> {
            createdLatch.countDown();
            Assert.fail("should not fail");
            return null;
        });
        createdLatch.await();
    }

    /**
     * Checks the stats reported by a persistent topic in three stages: with only a subscriber
     * attached, after ten messages have been published, and after those messages have been
     * acknowledged and the consumer closed.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testBrokerServicePersistentTopicStats() throws Exception {
        final String topicName = "persistent://prop/ns-abc/successTopic";

        resetState();
        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("successSub")
                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                .subscribe();
        Thread.sleep(100L);

        PersistentTopic topic =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull(topic);

        // Stage 1: subscription exists, nothing published yet.
        rolloverPerIntervalStats();
        TopicStatsImpl initialStats = topic.getStats(false, false, false);
        SubscriptionStats initialSub =
                (SubscriptionStats) initialStats.getSubscriptions().values().iterator().next();
        Assert.assertEquals(initialStats.getSubscriptions().keySet().size(), 1);
        Assert.assertEquals(initialSub.getMsgBacklog(), 0L);
        Assert.assertEquals(initialSub.getConsumers().size(), 1);
        Assert.assertEquals(initialStats.getOffloadedStorageSize(), 0L);

        // Stage 2: publish ten messages and compare topic-level with per-publisher/consumer numbers.
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
        Thread.sleep(100L);
        for (int seq = 0; seq < 10; seq++) {
            producer.send(("my-message-" + seq).getBytes());
        }
        Thread.sleep(100L);
        rolloverPerIntervalStats();
        TopicStatsImpl publishedStats = topic.getStats(false, false, false);
        SubscriptionStats publishedSub =
                (SubscriptionStats) publishedStats.getSubscriptions().values().iterator().next();
        Assert.assertEquals(publishedSub.getMsgBacklog(), 10L);
        Assert.assertEquals(publishedStats.getPublishers().size(), 1);

        PublisherStats publisher = (PublisherStats) publishedStats.getPublishers().get(0);
        Assert.assertTrue(publisher.getMsgRateIn() > 0.0d);
        Assert.assertTrue(publisher.getMsgThroughputIn() > 0.0d);
        Assert.assertTrue(publisher.getAverageMsgSize() > 0.0d);
        Assert.assertNotNull(publisher.getClientVersion());
        Assert.assertEquals(publishedStats.getMsgRateIn(), publisher.getMsgRateIn());
        Assert.assertEquals(publishedStats.getMsgThroughputIn(), publisher.getMsgThroughputIn());
        double sizeDelta = Math.abs(publishedStats.getAverageMsgSize() - publisher.getAverageMsgSize());
        Assert.assertTrue(sizeDelta < 1.0E-6d);

        ConsumerStats consumerStats = (ConsumerStats) publishedSub.getConsumers().get(0);
        Assert.assertTrue(consumerStats.getMsgRateOut() > 0.0d);
        Assert.assertTrue(consumerStats.getMsgThroughputOut() > 0.0d);
        Assert.assertEquals(publishedSub.getMsgRateOut(), consumerStats.getMsgRateOut());
        Assert.assertEquals(publishedSub.getMsgThroughputOut(), consumerStats.getMsgThroughputOut());
        Assert.assertEquals(publishedStats.getMsgRateOut(), consumerStats.getMsgRateOut());
        Assert.assertEquals(publishedStats.getMsgThroughputOut(), consumerStats.getMsgThroughputOut());
        Assert.assertNotNull(consumerStats.getClientVersion());
        Assert.assertEquals(publishedStats.getOffloadedStorageSize(), 0L);

        // Stage 3: drain and acknowledge everything; the backlog must return to zero.
        for (int received = 0; received < 10; received++) {
            consumer.acknowledge(consumer.receive());
        }
        consumer.close();
        Thread.sleep(100L);
        rolloverPerIntervalStats();
        TopicStatsImpl drainedStats = topic.getStats(false, false, false);
        SubscriptionStats drainedSub =
                (SubscriptionStats) drainedStats.getSubscriptions().values().iterator().next();
        Assert.assertEquals(drainedStats.getOffloadedStorageSize(), 0L);
        Assert.assertEquals(drainedSub.getMsgBacklog(), 0L);
    }

    /**
     * Restarts the broker under two connection-limit configurations (max connections / max
     * connections per IP of 3/2 and 2/3). In each, two clients must connect, a third must be
     * rejected, and the whole round must finish in under 20 seconds.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testConnectionController() throws Exception {
        // Each row is {brokerMaxConnections, brokerMaxConnectionsPerIp}.
        final int[][] limitRounds = {{3, 2}, {2, 3}};
        final String topicName = "persistent://prop/ns-abc/connection" + UUID.randomUUID();
        final ClientBuilder builder = PulsarClient.builder()
                .operationTimeout(1, TimeUnit.DAYS)
                .connectionTimeout(1, TimeUnit.DAYS);
        final List<PulsarClient> clients = new ArrayList<>();

        for (int[] limits : limitRounds) {
            cleanup();
            this.conf.setBrokerMaxConnections(limits[0]);
            this.conf.setBrokerMaxConnectionsPerIp(limits[1]);
            setup();

            long startedAt = System.currentTimeMillis();
            // The broker URL changes on every restart, so re-point the shared builder.
            builder.serviceUrl(this.brokerUrl.toString());
            clients.add(createNewConnection(topicName, builder));
            clients.add(createNewConnection(topicName, builder));
            createNewConnectionAndCheckFail(topicName, builder);
            Assert.assertTrue(System.currentTimeMillis() - startedAt < 20000);

            cleanClient(clients);
            clients.clear();
        }
    }

    /**
     * Restarts the broker under four connection-limit configurations. For the limit pairs
     * 0/1, 1/0 and 1/1 (max connections / max connections per IP) one client must connect and
     * the next must be rejected; for 0/0 ten clients must all connect. Every round must finish
     * in under 20 seconds.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testConnectionController2() throws Exception {
        // Each row is {brokerMaxConnections, brokerMaxConnectionsPerIp}.
        final int[][] rejectingRounds = {{0, 1}, {1, 0}, {1, 1}};
        final String topicName = "persistent://prop/ns-abc/connection" + UUID.randomUUID();
        final ClientBuilder builder = PulsarClient.builder()
                .operationTimeout(1, TimeUnit.DAYS)
                .connectionTimeout(1, TimeUnit.DAYS);
        final List<PulsarClient> clients = new ArrayList<>();

        for (int[] limits : rejectingRounds) {
            cleanup();
            this.conf.setBrokerMaxConnections(limits[0]);
            this.conf.setBrokerMaxConnectionsPerIp(limits[1]);
            setup();

            long startedAt = System.currentTimeMillis();
            // The broker URL changes on every restart, so re-point the shared builder.
            builder.serviceUrl(this.brokerUrl.toString());
            clients.add(createNewConnection(topicName, builder));
            createNewConnectionAndCheckFail(topicName, builder);
            Assert.assertTrue(System.currentTimeMillis() - startedAt < 20000);

            cleanClient(clients);
            clients.clear();
        }

        // Final round: both limits set to 0, ten connections are expected to succeed.
        cleanup();
        this.conf.setBrokerMaxConnections(0);
        this.conf.setBrokerMaxConnectionsPerIp(0);
        setup();
        builder.serviceUrl(this.brokerUrl.toString());
        long unlimitedStartedAt = System.currentTimeMillis();
        int opened = 0;
        while (opened < 10) {
            clients.add(createNewConnection(topicName, builder));
            opened++;
        }
        Assert.assertTrue(System.currentTimeMillis() - unlimitedStartedAt < 20000);
        cleanClient(clients);
        clients.clear();
    }

    /**
     * Attempts to open one more connection and asserts that it is rejected with a
     * "Reached the maximum number of connections" error.
     *
     * @param str topic name used for the probe producer
     * @param clientBuilder builder from which the probe client is created
     * @throws Exception declared for callers; assertion failures surface as {@link AssertionError}
     */
    private void createNewConnectionAndCheckFail(String str, ClientBuilder clientBuilder) throws Exception {
        try {
            createNewConnection(str, clientBuilder);
            // AssertionError is not an Exception, so it is not swallowed by the catch below.
            Assert.fail("should fail");
        } catch (Exception e) {
            // Guard against a null message so an unrelated failure is reported as an assertion
            // failure carrying the real exception, rather than as a NullPointerException.
            String message = e.getMessage();
            Assert.assertTrue(message != null && message.contains("Reached the maximum number of connections"),
                    "unexpected failure while expecting a connection-limit rejection: " + e);
        }
    }

    /**
     * Builds a client and proves it can connect by creating and immediately closing a producer
     * on the given topic.
     *
     * <p>If the producer cannot be created, the freshly built client is closed before the
     * failure is rethrown, so rejected connection attempts do not leak clients.
     *
     * @param str topic name used for the probe producer
     * @param clientBuilder builder from which the client is created
     * @return the connected client; the caller is responsible for closing it
     * @throws PulsarClientException if the client cannot be built or the producer cannot be created
     */
    private PulsarClient createNewConnection(String str, ClientBuilder clientBuilder) throws PulsarClientException {
        PulsarClient client = clientBuilder.build();
        try {
            client.newProducer().topic(str).create().close();
            return client;
        } catch (PulsarClientException | RuntimeException e) {
            try {
                client.close();
            } catch (PulsarClientException closeFailure) {
                // Keep the original failure as the primary error; record the close problem on it.
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Closes every non-null client in the given list, in iteration order.
     *
     * @param list clients to close; {@code null} entries are skipped
     * @throws Exception if closing a client fails (later clients are then not closed)
     */
    private void cleanClient(List<PulsarClient> list) throws Exception {
        Iterator<PulsarClient> remaining = list.iterator();
        while (remaining.hasNext()) {
            PulsarClient client = remaining.next();
            if (client == null) {
                continue;
            }
            client.close();
        }
    }

    /**
     * Verifies that the {@code storageSize} reported by a persistent topic is zero right after
     * the producer is created and becomes positive once ten 10-byte messages have been sent.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testStatsOfStorageSizeWithSubscription() throws Exception {
        final String topicName = "persistent://prop/ns-abc/no-subscription";

        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topic =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull(topic);

        long sizeBeforePublish = topic.getStats(false, false, false).storageSize;
        Assert.assertEquals(sizeBeforePublish, 0L);

        int sent = 0;
        while (sent < 10) {
            producer.send(new byte[10]);
            sent++;
        }

        long sizeAfterPublish = topic.getStats(false, false, false).storageSize;
        Assert.assertTrue(sizeAfterPublish > 0);
    }

    /**
     * Checks the stats of a topic with a {@code Shared} subscription in four stages: subscriber
     * only, after ten messages are published (all unacked, no redelivery yet), after
     * {@code redeliverUnacknowledgedMessages()} (redelivery rate becomes positive), and after
     * everything has been acknowledged and the consumer closed (backlog back to zero).
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testBrokerServicePersistentRedeliverTopicStats() throws Exception {
        final String topicName = "persistent://prop/ns-abc/successSharedTopic";

        resetState();
        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("successSharedSub")
                .subscriptionType(SubscriptionType.Shared)
                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                .subscribe();
        Thread.sleep(100L);

        PersistentTopic topic =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull(topic);

        // Stage 1: subscription exists, nothing published yet.
        rolloverPerIntervalStats();
        TopicStatsImpl initialStats = topic.getStats(false, false, false);
        SubscriptionStats initialSub =
                (SubscriptionStats) initialStats.getSubscriptions().values().iterator().next();
        Assert.assertEquals(initialStats.getSubscriptions().keySet().size(), 1);
        Assert.assertEquals(initialSub.getMsgBacklog(), 0L);
        Assert.assertEquals(initialSub.getConsumers().size(), 1);

        // Stage 2: publish ten messages; they are dispatched but not yet acknowledged.
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
        Thread.sleep(100L);
        for (int seq = 0; seq < 10; seq++) {
            producer.send(("my-message-" + seq).getBytes());
        }
        Thread.sleep(100L);
        rolloverPerIntervalStats();
        TopicStatsImpl publishedStats = topic.getStats(false, false, false);
        SubscriptionStats publishedSub =
                (SubscriptionStats) publishedStats.getSubscriptions().values().iterator().next();
        Assert.assertEquals(publishedSub.getMsgBacklog(), 10L);
        Assert.assertEquals(publishedStats.getPublishers().size(), 1);

        PublisherStats publisher = (PublisherStats) publishedStats.getPublishers().get(0);
        Assert.assertTrue(publisher.getMsgRateIn() > 0.0d);
        Assert.assertTrue(publisher.getMsgThroughputIn() > 0.0d);
        Assert.assertTrue(publisher.getAverageMsgSize() > 0.0d);
        Assert.assertEquals(publishedStats.getMsgRateIn(), publisher.getMsgRateIn());
        Assert.assertEquals(publishedStats.getMsgThroughputIn(), publisher.getMsgThroughputIn());
        double sizeDelta = Math.abs(publishedStats.getAverageMsgSize() - publisher.getAverageMsgSize());
        Assert.assertTrue(sizeDelta < 1.0E-6d);

        ConsumerStats consumerStats = (ConsumerStats) publishedSub.getConsumers().get(0);
        Assert.assertTrue(consumerStats.getMsgRateOut() > 0.0d);
        Assert.assertTrue(consumerStats.getMsgThroughputOut() > 0.0d);
        Assert.assertEquals(publishedSub.getMsgRateRedeliver(), 0.0d);
        Assert.assertEquals(consumerStats.getUnackedMessages(), 10);
        Assert.assertEquals(publishedSub.getMsgRateOut(), consumerStats.getMsgRateOut());
        Assert.assertEquals(publishedSub.getMsgThroughputOut(), consumerStats.getMsgThroughputOut());
        Assert.assertEquals(publishedSub.getMsgRateRedeliver(), consumerStats.getMsgRateRedeliver());
        Assert.assertEquals(publishedStats.getMsgRateOut(), consumerStats.getMsgRateOut());
        Assert.assertEquals(publishedStats.getMsgThroughputOut(), consumerStats.getMsgThroughputOut());
        Assert.assertEquals(publishedSub.getMsgRateRedeliver(), consumerStats.getMsgRateRedeliver());
        Assert.assertEquals(publishedSub.getUnackedMessages(), consumerStats.getUnackedMessages());

        // Stage 3: ask for redelivery; the redelivery rate must now be positive.
        consumer.redeliverUnacknowledgedMessages();
        Thread.sleep(100L);
        rolloverPerIntervalStats();
        SubscriptionStats redeliveredSub = (SubscriptionStats) topic.getStats(false, false, false)
                .getSubscriptions().values().iterator().next();
        Assert.assertTrue(redeliveredSub.getMsgRateRedeliver() > 0.0d);
        ConsumerStats redeliveredConsumer = (ConsumerStats) redeliveredSub.getConsumers().get(0);
        Assert.assertEquals(redeliveredSub.getMsgRateRedeliver(), redeliveredConsumer.getMsgRateRedeliver());

        // Stage 4: acknowledge everything; the backlog must return to zero.
        for (int received = 0; received < 10; received++) {
            consumer.acknowledge(consumer.receive());
        }
        consumer.close();
        Thread.sleep(100L);
        rolloverPerIntervalStats();
        SubscriptionStats drainedSub = (SubscriptionStats) topic.getStats(false, false, false)
                .getSubscriptions().values().iterator().next();
        Assert.assertEquals(drainedSub.getMsgBacklog(), 0L);
    }

    /**
     * Produces and consumes ten messages on a topic in {@code prop/ns-abc}, then fetches the
     * broker metrics through the admin API and asserts that at least one metric's
     * {@code dimensions} entry mentions that namespace.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testBrokerStatsMetrics() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";
        BrokerStats brokerStats = this.admin.brokerStats();

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("newSub")
                .subscribe();
        Thread.sleep(100L);
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
        Thread.sleep(100L);
        for (int seq = 0; seq < 10; seq++) {
            producer.send(("my-message-" + seq).getBytes());
        }
        Thread.sleep(100L);
        for (int received = 0; received < 10; received++) {
            consumer.acknowledge(consumer.receive());
        }
        consumer.close();
        Thread.sleep(100L);

        JsonArray metrics = new Gson().fromJson(brokerStats.getMetrics(), JsonArray.class);
        boolean namespaceDimensionFound = false;
        boolean topicLoadTimesDimensionFound = false;
        for (int idx = 0; idx < metrics.size(); idx++) {
            try {
                String dimensions = metrics.get(idx).getAsJsonObject().get("dimensions").toString();
                // Both flags are driven by the same substring check.
                boolean mentionsNamespace = dimensions.contains("prop/ns-abc");
                namespaceDimensionFound = namespaceDimensionFound || mentionsNamespace;
                topicLoadTimesDimensionFound = topicLoadTimesDimensionFound || mentionsNamespace;
            } catch (Exception ignored) {
                // Entries whose "dimensions" cannot be read are skipped on purpose.
            }
        }
        Assert.assertTrue(namespaceDimensionFound && topicLoadTimesDimensionFound);
        Thread.sleep(100L);
    }

    /**
     * Creates two namespaces with two producers each, then checks that the broker's topic stats
     * JSON has one entry per namespace and that every topic of each namespace appears under its
     * bundle range in the {@code persistent} section. Producers, topics and namespaces are
     * removed again at the end.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testBrokerServiceNamespaceStats() throws Exception {
        resetState();

        final List<String> namespaces = Lists.newArrayList("prop/stats1", "prop/stats2");
        final List<Producer<byte[]>> producers = new ArrayList<>();
        BrokerStats brokerStats = this.admin.brokerStats();

        for (String namespace : namespaces) {
            this.admin.namespaces().createNamespace(namespace, 4);
            this.admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));
            String firstTopic = String.format("persistent://%s/topic1", namespace);
            String secondTopic = String.format("persistent://%s/topic2", namespace);
            producers.add(this.pulsarClient.newProducer().topic(firstTopic).create());
            producers.add(this.pulsarClient.newProducer().topic(secondTopic).create());
        }

        rolloverPerIntervalStats();
        JsonObject topicStats = new Gson().fromJson(brokerStats.getTopics(), JsonObject.class);
        Assert.assertEquals(topicStats.size(), 2, topicStats.toString());

        for (String namespace : namespaces) {
            JsonObject namespaceStats = topicStats.getAsJsonObject(namespace);
            for (String topic : this.admin.namespaces().getTopics(namespace)) {
                String bundleRange =
                        this.pulsar.getNamespaceService().getBundle(TopicName.get(topic)).getBundleRange();
                JsonObject persistentTopics =
                        namespaceStats.getAsJsonObject(bundleRange).getAsJsonObject("persistent");
                // The topic must be one of the member names of the "persistent" object.
                Assert.assertTrue(persistentTopics.has(topic));
            }
        }

        for (Producer<byte[]> producer : producers) {
            producer.close();
        }
        for (String namespace : namespaces) {
            for (String topic : this.admin.namespaces().getTopics(namespace)) {
                this.admin.topics().delete(topic);
            }
            this.admin.namespaces().deleteNamespace(namespace);
        }
    }

    /**
     * With authentication disabled and no TLS port configured in this test, verifies that:
     * <ol>
     *   <li>a plain-text client can subscribe, and</li>
     *   <li>a TLS client pointed at {@code brokerUrlTls} fails with an error mentioning
     *       {@code ConnectException}.</li>
     * </ol>
     *
     * <p>Each case owns its client through try-with-resources, so the client (and the consumer,
     * when one was created) is closed on every path, including the expected-failure path.
     *
     * @throws Exception if restarting the broker fails
     */
    @Test
    public void testTlsDisabled() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";
        final String subName = "newSub";

        this.conf.setAuthenticationEnabled(false);
        restartBroker();

        // Case 1: access without TLS must succeed.
        try (PulsarClient plainClient = PulsarClient.builder()
                .serviceUrl(this.brokerUrl.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .operationTimeout(1000, TimeUnit.MILLISECONDS)
                .build();
             Consumer<byte[]> consumer = plainClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscribe()) {
            Assert.assertNotNull(consumer);
        } catch (Exception e) {
            Assert.fail("should not fail", e);
        }

        // Case 2: access with TLS must be refused.
        try (PulsarClient tlsClient = PulsarClient.builder()
                .serviceUrl(this.brokerUrlTls.toString())
                .enableTls(true)
                .statsInterval(0L, TimeUnit.SECONDS)
                .operationTimeout(1000, TimeUnit.MILLISECONDS)
                .build();
             Consumer<byte[]> consumer = tlsClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscribe()) {
            // AssertionError is not an Exception, so it bypasses the catch below.
            Assert.fail("TLS connection should fail");
        } catch (Exception e) {
            String message = e.getMessage();
            Assert.assertTrue(message != null && message.contains("ConnectException"),
                    "unexpected failure while expecting a ConnectException: " + e);
        }
    }

    /**
     * Restarts the broker with both plain-text and TLS ports enabled (server certificate and key
     * taken from the test resources) and verifies four access modes in sequence:
     * <ol>
     *   <li>plain-text access succeeds;</li>
     *   <li>TLS access with insecure connections allowed succeeds;</li>
     *   <li>TLS access with insecure connections disallowed and no trust store fails with a
     *       "General OpenSslEngine problem" error;</li>
     *   <li>TLS access with the server certificate configured as trusted succeeds.</li>
     * </ol>
     *
     * <p>Each case owns its client through try-with-resources so that the client (and the
     * consumer, when one was created) is closed on every path.
     *
     * @throws Exception if restarting the broker fails
     */
    @Test
    public void testTlsEnabled() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";
        final String subName = "newSub";
        final String serverCertPath = "./src/test/resources/certificate/server.crt";
        final String serverKeyPath = "./src/test/resources/certificate/server.key";

        this.conf.setAuthenticationEnabled(false);
        this.conf.setBrokerServicePortTls(Optional.of(0));
        this.conf.setWebServicePortTls(Optional.of(0));
        this.conf.setTlsCertificateFilePath(serverCertPath);
        this.conf.setTlsKeyFilePath(serverKeyPath);
        this.conf.setNumExecutorThreadPoolSize(5);
        restartBroker();

        // Case 1: access without TLS.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.brokerUrl.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .operationTimeout(1000, TimeUnit.MILLISECONDS)
                .build();
             Consumer<byte[]> consumer = client.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscribe()) {
            Assert.assertNotNull(consumer);
        } catch (Exception e) {
            Assert.fail("should not fail", e);
        }

        // Case 2: access with TLS, insecure connections allowed.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.brokerUrlTls.toString())
                .enableTls(true)
                .allowTlsInsecureConnection(true)
                .statsInterval(0L, TimeUnit.SECONDS)
                .operationTimeout(1000, TimeUnit.MILLISECONDS)
                .build();
             Consumer<byte[]> consumer = client.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscribe()) {
            Assert.assertNotNull(consumer);
        } catch (Exception e) {
            Assert.fail("should not fail", e);
        }

        // Case 3: access with TLS, insecure connections disallowed and no trusted certificate.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.brokerUrlTls.toString())
                .enableTls(true)
                .allowTlsInsecureConnection(false)
                .statsInterval(0L, TimeUnit.SECONDS)
                .operationTimeout(1000, TimeUnit.MILLISECONDS)
                .build();
             Consumer<byte[]> consumer = client.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscribe()) {
            // AssertionError is not an Exception, so it bypasses the catch below.
            Assert.fail("should fail");
        } catch (Exception e) {
            String message = e.getMessage();
            Assert.assertTrue(message != null && message.contains("General OpenSslEngine problem"),
                    "unexpected failure while expecting a TLS handshake error: " + e);
        }

        // Case 4: access with TLS, server certificate configured as trusted.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.brokerUrlTls.toString())
                .enableTls(true)
                .allowTlsInsecureConnection(false)
                .tlsTrustCertsFilePath(serverCertPath)
                .statsInterval(0L, TimeUnit.SECONDS)
                .operationTimeout(1000, TimeUnit.MILLISECONDS)
                .build();
             Consumer<byte[]> consumer = client.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscribe()) {
            Assert.assertNotNull(consumer);
        } catch (Exception e) {
            Assert.fail("should not fail", e);
        }
    }

    /**
     * Restarts the broker with the non-TLS broker and web service ports removed and verifies that a
     * client which allows insecure TLS connections can still subscribe over the TLS service URL.
     *
     * @throws Exception if restarting the broker, resetting state or closing the client fails
     */
    @Test
    public void testTlsEnabledWithoutNonTlsServicePorts() throws Exception {
        this.conf.setAuthenticationEnabled(false);
        this.conf.setBrokerServicePort(Optional.empty());
        this.conf.setBrokerServicePortTls(Optional.of(0));
        this.conf.setWebServicePort(Optional.empty());
        this.conf.setWebServicePortTls(Optional.of(0));
        this.conf.setTlsCertificateFilePath("./src/test/resources/certificate/server.crt");
        this.conf.setTlsKeyFilePath("./src/test/resources/certificate/server.key");
        this.conf.setNumExecutorThreadPoolSize(5);
        restartBroker();
        try {
            try {
                this.pulsarClient = PulsarClient.builder()
                        .serviceUrl(this.brokerUrlTls.toString())
                        .enableTls(true)
                        .allowTlsInsecureConnection(true)
                        .statsInterval(0L, TimeUnit.SECONDS)
                        .operationTimeout(1000, TimeUnit.MILLISECONDS)
                        .build();
                Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                        .topic("persistent://prop/ns-abc/newTopic")
                        .subscriptionName("newSub")
                        .subscribe();
                consumer.close();
            } catch (Exception e) {
                // Attach the cause so a failing run shows why the subscription was rejected.
                Assert.fail("should not fail", e);
            }
            resetState();
        } finally {
            // Single close point; Assert.fail throws, so nothing placed after it would ever run.
            this.pulsarClient.close();
        }
    }

    /**
     * Restarts the broker with the TLS authentication provider enabled and insecure TLS connections
     * allowed, then checks two things: a client without credentials is rejected with an
     * "Unauthorized" error, and a client configured with the test certificate and key can subscribe.
     *
     * @throws Exception if restarting the broker fails
     */
    @Test
    public void testTlsAuthAllowInsecure() throws Exception {
        HashSet<String> providers = new HashSet<>();
        providers.add("org.apache.pulsar.broker.authentication.AuthenticationProviderTls");
        this.conf.setAuthenticationEnabled(true);
        this.conf.setAuthenticationProviders(providers);
        this.conf.setBrokerServicePortTls(Optional.of(0));
        this.conf.setWebServicePortTls(Optional.of(0));
        this.conf.setTlsCertificateFilePath("./src/test/resources/certificate/server.crt");
        this.conf.setTlsKeyFilePath("./src/test/resources/certificate/server.key");
        this.conf.setTlsAllowInsecureConnection(true);
        this.conf.setNumExecutorThreadPoolSize(5);
        restartBroker();
        HashMap<String, String> authParams = new HashMap<>();
        authParams.put("tlsCertFile", "./src/test/resources/certificate/client.crt");
        authParams.put("tlsKeyFile", "./src/test/resources/certificate/client.key");

        // Phase 1: no client credentials, so the subscription must be refused.
        // try-with-resources closes each resource exactly once and copes with a failed build().
        try (PulsarClient client = PulsarClient.builder()
                        .serviceUrl(this.brokerUrlTls.toString())
                        .enableTls(true)
                        .allowTlsInsecureConnection(true)
                        .statsInterval(0L, TimeUnit.SECONDS)
                        .operationTimeout(1000, TimeUnit.MILLISECONDS)
                        .build();
                Consumer<byte[]> consumer = client.newConsumer()
                        .topic("persistent://prop/ns-abc/newTopic")
                        .subscriptionName("newSub")
                        .subscribe()) {
            // AssertionError is not an Exception, so the catch below does not swallow it.
            Assert.fail("should fail");
        } catch (Exception e) {
            Assert.assertTrue(String.valueOf(e.getMessage()).contains("Unauthorized"),
                    "unexpected failure: " + e);
        }

        // Phase 2: the same subscription with the client certificate must succeed.
        try {
            AuthenticationTls authentication = new AuthenticationTls();
            authentication.configure(authParams);
            try (PulsarClient client = PulsarClient.builder()
                            .authentication(authentication)
                            .serviceUrl(this.brokerUrlTls.toString())
                            .enableTls(true)
                            .allowTlsInsecureConnection(true)
                            .statsInterval(0L, TimeUnit.SECONDS)
                            .operationTimeout(1000, TimeUnit.MILLISECONDS)
                            .build();
                    Consumer<byte[]> consumer = client.newConsumer()
                            .topic("persistent://prop/ns-abc/newTopic")
                            .subscriptionName("newSub")
                            .subscribe()) {
                // A successful subscribe is the whole check; both resources close on exit.
            }
        } catch (Exception e) {
            Assert.fail("should not fail", e);
        }
    }

    /**
     * Restarts the broker with the TLS authentication provider enabled and insecure TLS connections
     * disallowed. Both a client without credentials and a client configured with the test
     * certificate and key are expected to be rejected with an "Unauthorized" error.
     *
     * @throws Exception if restarting the broker fails
     */
    @Test
    public void testTlsAuthDisallowInsecure() throws Exception {
        HashSet<String> providers = new HashSet<>();
        providers.add("org.apache.pulsar.broker.authentication.AuthenticationProviderTls");
        this.conf.setAuthenticationEnabled(true);
        this.conf.setAuthenticationProviders(providers);
        this.conf.setBrokerServicePortTls(Optional.of(0));
        this.conf.setWebServicePortTls(Optional.of(0));
        this.conf.setTlsCertificateFilePath("./src/test/resources/certificate/server.crt");
        this.conf.setTlsKeyFilePath("./src/test/resources/certificate/server.key");
        this.conf.setTlsAllowInsecureConnection(false);
        this.conf.setNumExecutorThreadPoolSize(5);
        restartBroker();
        HashMap<String, String> authParams = new HashMap<>();
        authParams.put("tlsCertFile", "./src/test/resources/certificate/client.crt");
        authParams.put("tlsKeyFile", "./src/test/resources/certificate/client.key");

        // Phase 1: no client credentials.
        // Assert.fail sits inside the try, so it only fires when subscribe() unexpectedly succeeds.
        try (PulsarClient client = PulsarClient.builder()
                        .serviceUrl(this.brokerUrlTls.toString())
                        .enableTls(true)
                        .allowTlsInsecureConnection(true)
                        .statsInterval(0L, TimeUnit.SECONDS)
                        .operationTimeout(1000, TimeUnit.MILLISECONDS)
                        .build();
                Consumer<byte[]> consumer = client.newConsumer()
                        .topic("persistent://prop/my-ns/newTopic")
                        .subscriptionName("newSub")
                        .subscribe()) {
            Assert.fail("should fail");
        } catch (Exception e) {
            Assert.assertTrue(String.valueOf(e.getMessage()).contains("Unauthorized"),
                    "unexpected failure: " + e);
        }

        // Phase 2: client certificate supplied, but the subscription is still expected to be refused.
        try {
            AuthenticationTls authentication = new AuthenticationTls();
            authentication.configure(authParams);
            try (PulsarClient client = PulsarClient.builder()
                            .authentication(authentication)
                            .serviceUrl(this.brokerUrlTls.toString())
                            .enableTls(true)
                            .allowTlsInsecureConnection(true)
                            .statsInterval(0L, TimeUnit.SECONDS)
                            .operationTimeout(1000, TimeUnit.MILLISECONDS)
                            .build();
                    Consumer<byte[]> consumer = client.newConsumer()
                            .topic("persistent://prop/my-ns/newTopic")
                            .subscriptionName("newSub")
                            .subscribe()) {
                Assert.fail("should fail");
            }
        } catch (Exception e) {
            Assert.assertTrue(String.valueOf(e.getMessage()).contains("Unauthorized"),
                    "unexpected failure: " + e);
        }
    }

    /**
     * Restarts the broker with the TLS authentication provider enabled, insecure TLS connections
     * disallowed and the test client certificate configured as the broker's trust certificate.
     * A client without credentials must be rejected with an "Unauthorized" error, while a client
     * configured with the test certificate and key must be able to subscribe.
     *
     * @throws Exception if restarting the broker fails
     */
    @Test
    public void testTlsAuthUseTrustCert() throws Exception {
        HashSet<String> providers = new HashSet<>();
        providers.add("org.apache.pulsar.broker.authentication.AuthenticationProviderTls");
        this.conf.setAuthenticationEnabled(true);
        this.conf.setAuthenticationProviders(providers);
        this.conf.setBrokerServicePortTls(Optional.of(0));
        this.conf.setWebServicePortTls(Optional.of(0));
        this.conf.setTlsCertificateFilePath("./src/test/resources/certificate/server.crt");
        this.conf.setTlsKeyFilePath("./src/test/resources/certificate/server.key");
        this.conf.setTlsAllowInsecureConnection(false);
        this.conf.setTlsTrustCertsFilePath("./src/test/resources/certificate/client.crt");
        this.conf.setNumExecutorThreadPoolSize(5);
        restartBroker();
        HashMap<String, String> authParams = new HashMap<>();
        authParams.put("tlsCertFile", "./src/test/resources/certificate/client.crt");
        authParams.put("tlsKeyFile", "./src/test/resources/certificate/client.key");

        // Phase 1: no client credentials.
        // Assert.fail sits inside the try, so it only fires when subscribe() unexpectedly succeeds.
        try (PulsarClient client = PulsarClient.builder()
                        .serviceUrl(this.brokerUrlTls.toString())
                        .enableTls(true)
                        .allowTlsInsecureConnection(true)
                        .statsInterval(0L, TimeUnit.SECONDS)
                        .operationTimeout(1000, TimeUnit.MILLISECONDS)
                        .build();
                Consumer<byte[]> consumer = client.newConsumer()
                        .topic("persistent://prop/ns-abc/newTopic")
                        .subscriptionName("newSub")
                        .subscribe()) {
            Assert.fail("should fail");
        } catch (Exception e) {
            Assert.assertTrue(String.valueOf(e.getMessage()).contains("Unauthorized"),
                    "unexpected failure: " + e);
        }

        // Phase 2: the trusted client certificate is presented, so the subscription must succeed.
        try {
            AuthenticationTls authentication = new AuthenticationTls();
            authentication.configure(authParams);
            try (PulsarClient client = PulsarClient.builder()
                            .authentication(authentication)
                            .serviceUrl(this.brokerUrlTls.toString())
                            .enableTls(true)
                            .allowTlsInsecureConnection(true)
                            .statsInterval(0L, TimeUnit.SECONDS)
                            .operationTimeout(1000, TimeUnit.MILLISECONDS)
                            .build();
                    Consumer<byte[]> consumer = client.newConsumer()
                            .topic("persistent://prop/ns-abc/newTopic")
                            .subscriptionName("newSub")
                            .subscribe()) {
                // A successful subscribe is the whole check; both resources close on exit.
            }
        } catch (Exception e) {
            Assert.fail("should not fail", e);
        }
    }

    /* JADX WARN: Type inference failed for: r4v2, types: [void, long, java.util.function.Supplier] */
    @Test
    public void testLookupThrottlingForClientByClient() throws Exception {
        PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver();
        pulsarServiceNameResolver.updateServiceUrl(this.pulsar.getBrokerServiceUrl());
        ClientConfigurationData clientConfigurationData = new ClientConfigurationData();
        clientConfigurationData.setConcurrentLookupRequest(1);
        clientConfigurationData.setMaxLookupRequest(2);
        EventLoopGroup newEventLoopGroup = EventLoopUtil.newEventLoopGroup(20, false, new DefaultThreadFactory("test-pool", Thread.currentThread().isDaemon()));
        long j = -559038737;
        AtomicReference atomicReference = new AtomicReference();
        atomicReference.set(new CountDownLatch(1));
        ?? r4 = () -> {
            return new ClientCnx(clientConfigurationData, newEventLoopGroup) { // from class: org.apache.pulsar.broker.service.BrokerServiceTest.1
                protected void handleLookupResponse(CommandLookupTopicResponse commandLookupTopicResponse) {
                    try {
                        ((CountDownLatch) atomicReference.get()).await();
                    } catch (InterruptedException e) {
                    }
                    super.handleLookupResponse(commandLookupTopicResponse);
                }

                protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse commandPartitionedTopicMetadataResponse) {
                    try {
                        ((CountDownLatch) atomicReference.get()).await();
                    } catch (InterruptedException e) {
                    }
                    super.handlePartitionResponse(commandPartitionedTopicMetadataResponse);
                }
            };
        };
        ConnectionPool connectionPool = new ConnectionPool(clientConfigurationData, newEventLoopGroup, (Supplier) r4);
        try {
            ByteBuf newPartitionMetadataRequest = Commands.newPartitionMetadataRequest("persistent://prop/ns-abc/newTopic", -559038737L);
            CompletableFuture thenCompose = connectionPool.getConnection(pulsarServiceNameResolver.resolveHost()).thenCompose(clientCnx -> {
                return clientCnx.newLookup(newPartitionMetadataRequest, j);
            });
            ByteBuf newPartitionMetadataRequest2 = Commands.newPartitionMetadataRequest("persistent://prop/ns-abc/newTopic", (long) r4);
            CompletableFuture thenCompose2 = connectionPool.getConnection(pulsarServiceNameResolver.resolveHost()).thenCompose(clientCnx2 -> {
                CompletableFuture newLookup = clientCnx2.newLookup(newPartitionMetadataRequest2, r4);
                ((CountDownLatch) atomicReference.get()).countDown();
                return newLookup;
            });
            thenCompose.get();
            thenCompose2.get();
            char c = 1;
            atomicReference.set(new CountDownLatch(1));
            ByteBuf newPartitionMetadataRequest3 = Commands.newPartitionMetadataRequest("persistent://prop/ns-abc/newTopic", 1L);
            CompletableFuture thenCompose3 = connectionPool.getConnection(pulsarServiceNameResolver.resolveHost()).thenCompose(clientCnx3 -> {
                return clientCnx3.newLookup(newPartitionMetadataRequest3, c);
            });
            ByteBuf newPartitionMetadataRequest4 = Commands.newPartitionMetadataRequest("persistent://prop/ns-abc/newTopic", 1L);
            CompletableFuture thenCompose4 = connectionPool.getConnection(pulsarServiceNameResolver.resolveHost()).thenCompose(clientCnx4 -> {
                return clientCnx4.newLookup(newPartitionMetadataRequest4, c);
            });
            long j2 = (-559038737) + 1 + 1 + 1 + 1 + 1;
            ByteBuf newPartitionMetadataRequest5 = Commands.newPartitionMetadataRequest("persistent://prop/ns-abc/newTopic", 1L);
            CompletableFuture thenCompose5 = connectionPool.getConnection(pulsarServiceNameResolver.resolveHost()).thenCompose(clientCnx5 -> {
                CompletableFuture newLookup = clientCnx5.newLookup(newPartitionMetadataRequest5, c);
                ((CountDownLatch) atomicReference.get()).countDown();
                return newLookup;
            });
            thenCompose3.get();
            thenCompose4.get();
            try {
                thenCompose5.get();
                Assert.fail("At least one should fail");
            } catch (ExecutionException e) {
                ExecutionException executionException = e;
                while (executionException instanceof ExecutionException) {
                    executionException = executionException.getCause();
                }
                if (!(executionException instanceof PulsarClientException.TooManyRequestsException)) {
                    throw e;
                }
            }
            char c2 = 1;
            atomicReference.set(new CountDownLatch(1));
            long j3 = j2 + 1;
            ByteBuf newLookup = Commands.newLookup("persistent://prop/ns-abc/newTopic", true, 1L);
            CompletableFuture thenCompose6 = connectionPool.getConnection(pulsarServiceNameResolver.resolveHost()).thenCompose(clientCnx6 -> {
                return clientCnx6.newLookup(newLookup, c2);
            });
            ByteBuf newLookup2 = Commands.newLookup("persistent://prop/ns-abc/newTopic", true, j3);
            CompletableFuture thenCompose7 = connectionPool.getConnection(pulsarServiceNameResolver.resolveHost()).thenCompose(clientCnx7 -> {
                CompletableFuture newLookup3 = clientCnx7.newLookup(newLookup2, j3);
                ((CountDownLatch) atomicReference.get()).countDown();
                return newLookup3;
            });
            thenCompose6.get();
            thenCompose7.get();
            char c3 = 1;
            atomicReference.set(new CountDownLatch(1));
            long j4 = j3 + 1 + 1;
            ByteBuf newLookup3 = Commands.newLookup("persistent://prop/ns-abc/newTopic", true, 1L);
            CompletableFuture thenCompose8 = connectionPool.getConnection(pulsarServiceNameResolver.resolveHost()).thenCompose(clientCnx8 -> {
                return clientCnx8.newLookup(newLookup3, c3);
            });
            long j5 = j4 + 1;
            ByteBuf newLookup4 = Commands.newLookup("persistent://prop/ns-abc/newTopic", true, j4);
            CompletableFuture thenCompose9 = connectionPool.getConnection(pulsarServiceNameResolver.resolveHost()).thenCompose(clientCnx9 -> {
                return clientCnx9.newLookup(newLookup4, j4);
            });
            long j6 = j5 + 1;
            ByteBuf newLookup5 = Commands.newLookup("persistent://prop/ns-abc/newTopic", true, j5);
            CompletableFuture thenCompose10 = connectionPool.getConnection(pulsarServiceNameResolver.resolveHost()).thenCompose(clientCnx10 -> {
                CompletableFuture newLookup6 = clientCnx10.newLookup(newLookup5, j5);
                ((CountDownLatch) atomicReference.get()).countDown();
                return newLookup6;
            });
            thenCompose8.get();
            thenCompose9.get();
            try {
                thenCompose10.get();
                Assert.fail("At least one should fail");
            } catch (ExecutionException e2) {
                ExecutionException executionException2 = e2;
                while (executionException2 instanceof ExecutionException) {
                    executionException2 = executionException2.getCause();
                }
                if (!(executionException2 instanceof PulsarClientException.TooManyRequestsException)) {
                    throw e2;
                }
            }
            connectionPool.close();
        } catch (Throwable th) {
            try {
                connectionPool.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    /**
     * Marks the bundle that owns a freshly created topic as disabled and expects a subsequent
     * {@code loadOrCreatePersistentTopic} call to fail with a
     * {@code ServiceUnitNotReadyException} as the cause.
     *
     * @throws Exception if namespace or topic preparation fails
     */
    @Test
    public void testTopicLoadingOnDisableNamespaceBundle() throws Exception {
        final String namespace = "prop/disableBundle";
        final String topic = "persistent://prop/disableBundle/my-topic";
        try {
            this.admin.namespaces().createNamespace(namespace);
        } catch (PulsarAdminException.ConflictException ignored) {
            // A conflict on creation is tolerated (presumably the namespace already exists).
        }
        this.admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));
        TopicName parsedTopic = TopicName.get(topic);
        this.pulsarClient.newProducer().topic(topic).create().close();

        NamespaceService namespaceService = this.pulsar.getNamespaceService();
        NamespaceBundle owningBundle = namespaceService.getBundle(parsedTopic);
        namespaceService.getOwnershipCache().updateBundleState(owningBundle, false).join();

        try {
            this.pulsar.getBrokerService().loadOrCreatePersistentTopic(topic, true, (Map) null).get();
            Assert.fail("Topic creation should fail due to disable bundle");
        } catch (Exception e) {
            boolean notReady = e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException;
            if (!notReady) {
                Assert.fail("Topic creation should fail with ServiceUnitNotReadyException");
            }
        }
    }

    /**
     * Makes {@code getManagedLedgerConfig} on a spied broker service return an exceptionally
     * completed future and checks that {@code getOrCreateTopic} reports the failure within one
     * second instead of hanging.
     */
    @Test(timeOut = 3000)
    public void testTopicFailureShouldNotHaveDeadLock() {
        final String deadLockTopic = "persistent://prop/ns-abc/deadLockTestTopic";
        try {
            this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/ownBundleTopic").create().close();
        } catch (Exception e) {
            Assert.fail(e.getMessage());
        }
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            BrokerService spiedService = Mockito.spy(this.pulsar.getBrokerService());
            CompletableFuture<ManagedLedgerConfig> failedConfig = new CompletableFuture<>();
            failedConfig.completeExceptionally(new NullPointerException("failed to peristent policy"));
            Mockito.doReturn(failedConfig).when(spiedService).getManagedLedgerConfig((TopicName) Mockito.any());

            // Completed by whichever callback fires once topic creation settles.
            CompletableFuture<Void> outcome = new CompletableFuture<>();
            worker.submit(() -> {
                spiedService.getOrCreateTopic(deadLockTopic)
                        .thenAccept(topic -> {
                            outcome.complete(null);
                        })
                        .exceptionally(error -> {
                            outcome.completeExceptionally(error.getCause());
                            return null;
                        });
            });

            try {
                outcome.get(1L, TimeUnit.SECONDS);
            } catch (ExecutionException failure) {
                Assert.assertTrue(failure.getCause() instanceof NullPointerException);
            } catch (InterruptedException | TimeoutException stuck) {
                Assert.fail("there is a dead-lock and it should have been prevented");
            }
        } finally {
            worker.shutdownNow();
        }
    }

    /**
     * Plants an exceptionally completed future in the managed-ledger factory's {@code ledgers} map
     * (via reflection) for the topic under test and checks that {@code getOrCreateTopic} on a
     * spied broker service fails within one second with a {@code PersistenceException} instead of
     * hanging.
     *
     * @throws Exception if reflection on the factory fails or an unexpected error occurs
     */
    @Test
    public void testLedgerOpenFailureShouldNotHaveDeadLock() throws Exception {
        final String deadLockTopic = "persistent://prop/ns-abc/deadLockTestTopic";
        try {
            this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/ownBundleTopic").create().close();
        } catch (Exception e) {
            Assert.fail(e.getMessage());
        }
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            BrokerService spiedService = Mockito.spy(this.pulsar.getBrokerService());
            CompletableFuture<ManagedLedgerConfig> config =
                    CompletableFuture.completedFuture(new ManagedLedgerConfig());
            Mockito.doReturn(config).when(spiedService).getManagedLedgerConfig((TopicName) Mockito.any());

            Field ledgersField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
            ledgersField.setAccessible(true);
            @SuppressWarnings("unchecked") // the field is only used as a String -> future map here
            ConcurrentHashMap<String, CompletableFuture<?>> ledgers =
                    (ConcurrentHashMap<String, CompletableFuture<?>>) ledgersField.get(
                            this.pulsar.getManagedLedgerFactory());

            CompletableFuture<Object> failedOpen = new CompletableFuture<>();
            failedOpen.completeExceptionally(new ManagedLedgerException("ledger opening failed"));
            ledgers.put("prop/ns-abc/persistent/deadLockTestTopic", failedOpen);
            try {
                CompletableFuture<Void> outcome = new CompletableFuture<>();
                worker.submit(() -> {
                    spiedService.getOrCreateTopic(deadLockTopic)
                            .thenAccept(topic -> {
                                outcome.complete(null);
                            })
                            .exceptionally(error -> {
                                outcome.completeExceptionally(error.getCause());
                                return null;
                            });
                });
                outcome.get(1L, TimeUnit.SECONDS);
            } catch (ExecutionException e) {
                Assert.assertEquals(e.getCause().getClass(), BrokerServiceException.PersistenceException.class);
            } catch (InterruptedException | TimeoutException e) {
                Assert.fail("there is a dead-lock and it should have been prevented", e);
            } finally {
                // One cleanup point for every outcome, including assertion failures.
                ledgers.clear();
            }
        } finally {
            worker.shutdownNow();
        }
    }

    /**
     * Creates a namespace with three bundles plus a non-partitioned topic in it, then checks that
     * the namespace's local policies exist and report the same bundle count.
     *
     * @throws Exception if an admin call or the policy lookup fails
     */
    @Test
    public void testCreateNamespacePolicy() throws Exception {
        final String namespace = "prop/testPolicy";
        final int expectedBundles = 3;
        System.err.println("----------------");
        BundlesData bundles = BundlesData.builder().numBundles(expectedBundles).build();
        this.admin.namespaces().createNamespace(namespace, bundles);
        this.admin.topics().createNonPartitionedTopic("prop/testPolicy/test");

        Optional<LocalPolicies> policies = this.pulsar.getPulsarResources()
                .getLocalPolicies()
                .getLocalPolicies(NamespaceName.get(namespace));
        Assert.assertTrue(policies.isPresent());
        Assert.assertEquals(policies.get().bundles.getNumBundles(), expectedBundles);
    }

    /**
     * Replaces the topic's only producer with a spy whose {@code disconnect()} returns a future
     * that never completes, triggers an unload of every owned bundle of the namespace with a
     * two-second timeout, and checks that the topic's managed ledger entry is gone afterwards.
     *
     * @throws Exception if topic setup, reflection or unloading fails
     */
    @Test
    public void testStuckTopicUnloading() throws Exception {
        final String namespace = "prop/ns-abc";
        final String topicName = "persistent://prop/ns-abc/unoadTopic";
        final String ledgerKey = "prop/ns-abc/persistent/unoadTopic";

        this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscriber-name")
                .subscribe()
                .close();
        // The producer is left open so that the topic has a connected producer to swap out below.
        this.pulsarClient.newProducer().topic(topicName).sendTimeout(5, TimeUnit.SECONDS).create();
        PersistentTopic topicRef =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();

        ManagedLedgerFactoryImpl factory = this.pulsar.getManagedLedgerClientFactory().getManagedLedgerFactory();
        Field ledgersField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
        ledgersField.setAccessible(true);
        ConcurrentHashMap<?, ?> ledgers = (ConcurrentHashMap<?, ?>) ledgersField.get(factory);
        Assert.assertNotNull(ledgers.get(ledgerKey));

        Producer stuckProducer = (Producer) Mockito.spy(topicRef.producers.values().toArray()[0]);
        topicRef.producers.clear();
        topicRef.producers.put(stuckProducer.getProducerName(), stuckProducer);
        // A future that is never completed keeps the disconnect pending.
        Mockito.doReturn(new CompletableFuture<>()).when(stuckProducer).disconnect();

        for (NamespaceBundle bundle : this.pulsar.getNamespaceService().getOwnedServiceUnits()) {
            String owningNamespace = bundle.getNamespaceObject().toString();
            System.out.println();
            if (!namespace.equals(owningNamespace)) {
                continue;
            }
            this.pulsar.getNamespaceService().unloadNamespaceBundle(bundle, 2L, TimeUnit.SECONDS);
        }
        Assert.assertNull(ledgers.get(ledgerKey));
    }

    /**
     * Registers a raw Prometheus metrics provider that emits a {@code test_metrics} sample and
     * checks that the broker's {@code /metrics} endpoint includes it.
     *
     * @throws IOException if the HTTP request or reading the response fails
     */
    @Test
    public void testMetricsProvider() throws IOException {
        getPulsar().addPrometheusRawMetricsProvider(stream -> {
            stream.write("test_metrics{label1=\"xyz\"} 10 \n");
        });
        HttpGet request = new HttpGet(getPulsar().getWebServiceAddress() + "/metrics");
        StringBuilder body = new StringBuilder();
        // Closing the reader also closes the response content stream, which was previously leaked.
        // The charset is pinned so the result does not depend on the platform default.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                HttpClientBuilder.create().build().execute(request).getEntity().getContent(),
                StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
        }
        Assert.assertTrue(body.toString().contains("test_metrics"));
    }

    /**
     * Starts a {@code PublishRateLimiterMonitor} with a 100 ms tick, waits until both supplied
     * tasks have run, restarts it with a 500 ms tick and waits again, and finally checks that
     * stopping the monitor resets the reported tick time to zero.
     */
    @Test
    public void testPublishRateLimiterMonitor() {
        BrokerService.PublishRateLimiterMonitor monitor = new BrokerService.PublishRateLimiterMonitor("test");
        AtomicInteger firstTaskRuns = new AtomicInteger(0);
        AtomicInteger secondTaskRuns = new AtomicInteger(0);

        monitor.startOrUpdate(100L, firstTaskRuns::incrementAndGet, secondTaskRuns::incrementAndGet);
        Assert.assertEquals(monitor.getTickTimeMs(), 100L);
        Awaitility.await().until(() -> firstTaskRuns.get() > 0);
        Awaitility.await().until(() -> secondTaskRuns.get() > 0);

        monitor.startOrUpdate(500L, firstTaskRuns::incrementAndGet, secondTaskRuns::incrementAndGet);
        Assert.assertEquals(monitor.getTickTimeMs(), 500L);
        // Reset the counters so the waits below only observe runs after the update.
        firstTaskRuns.set(0);
        secondTaskRuns.set(0);
        Awaitility.await().until(() -> firstTaskRuns.get() > 0);
        Awaitility.await().until(() -> secondTaskRuns.get() > 0);

        monitor.stop();
        Assert.assertEquals(monitor.getTickTimeMs(), 0L);
    }

    /**
     * Restarts the test environment with broker-level publish throttling enabled and checks that
     * dynamic updates of {@code brokerPublisherThrottlingTickTimeMillis} (100 -> 200 -> 0 -> 100)
     * are reflected by the broker publish rate limiter monitor's tick time.
     *
     * @throws Exception if setup, cleanup or a dynamic configuration update fails
     */
    @Test
    public void testDynamicBrokerPublisherThrottlingTickTimeMillis() throws Exception {
        final String configKey = "brokerPublisherThrottlingTickTimeMillis";
        final int initialTickMs = 100;
        final int doubledTickMs = initialTickMs * 2;

        cleanup();
        this.conf.setBrokerPublisherThrottlingMaxMessageRate(1000);
        this.conf.setBrokerPublisherThrottlingTickTimeMillis(initialTickMs);
        setup();

        BrokerService.PublishRateLimiterMonitor monitor =
                this.pulsar.getBrokerService().brokerPublishRateLimiterMonitor;
        Awaitility.await().until(() -> monitor.getTickTimeMs() == initialTickMs);

        this.admin.brokers().updateDynamicConfiguration(configKey, String.valueOf(doubledTickMs));
        Awaitility.await().until(() -> monitor.getTickTimeMs() == doubledTickMs);

        this.admin.brokers().updateDynamicConfiguration(configKey, String.valueOf(0));
        Awaitility.await().until(() -> monitor.getTickTimeMs() == 0);

        this.admin.brokers().updateDynamicConfiguration(configKey, String.valueOf(initialTickMs));
        Awaitility.await().until(() -> monitor.getTickTimeMs() == initialTickMs);
    }

    /**
     * Restarts the test environment with topic-level publish throttling enabled (precise limiter
     * off), opens a producer on a test topic, and checks that dynamic updates of
     * {@code topicPublisherThrottlingTickTimeMillis} (100 -> 200 -> 0 -> 100) are reflected by the
     * topic publish rate limiter monitor's tick time.
     *
     * @throws Exception if setup, cleanup, producer creation or a configuration update fails
     */
    @Test
    public void testDynamicTopicPublisherThrottlingTickTimeMillis() throws Exception {
        final String configKey = "topicPublisherThrottlingTickTimeMillis";
        final int initialTickMs = 100;
        final int doubledTickMs = initialTickMs * 2;

        cleanup();
        this.conf.setPreciseTopicPublishRateLimiterEnable(false);
        this.conf.setMaxPublishRatePerTopicInMessages(1000);
        this.conf.setTopicPublisherThrottlingTickTimeMillis(initialTickMs);
        setup();

        // try-with-resources closes the producer exactly once on every path; the previous
        // hand-rolled cleanup called close() a second time if the first close() threw.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic("persistent://prop/ns-abc/test-topic")
                .create()) {
            BrokerService.PublishRateLimiterMonitor monitor =
                    this.pulsar.getBrokerService().topicPublishRateLimiterMonitor;
            Awaitility.await().until(() -> monitor.getTickTimeMs() == initialTickMs);

            this.admin.brokers().updateDynamicConfiguration(configKey, String.valueOf(doubledTickMs));
            Awaitility.await().until(() -> monitor.getTickTimeMs() == doubledTickMs);

            this.admin.brokers().updateDynamicConfiguration(configKey, String.valueOf(0));
            Awaitility.await().until(() -> monitor.getTickTimeMs() == 0);

            this.admin.brokers().updateDynamicConfiguration(configKey, String.valueOf(initialTickMs));
            Awaitility.await().until(() -> monitor.getTickTimeMs() == initialTickMs);
        }
    }

    /**
     * Checks that a producer can still create a topic when a stats lookup for that same,
     * not-yet-existing topic is issued concurrently.
     *
     * <p>For each of 100 topic names a helper thread requests the topic stats about 1 ms after
     * signalling that it has started, while the test thread creates a producer on the topic.
     * The test passes when every producer is created successfully.
     *
     * @throws Exception if producer creation fails or the test thread is interrupted
     */
    @Test
    public void shouldNotPreventCreatingTopicWhenNonexistingTopicIsCached() throws Exception {
        for (int i = 0; i < 100; i++) {
            final String topicName = "persistent://prop/ns-abc/topic-caching-test-topic" + i;
            final CountDownLatch statsThreadStarted = new CountDownLatch(1);
            Thread statsThread = new Thread(() -> {
                try {
                    // Release the test thread first so the lookup races with topic creation.
                    statsThreadStarted.countDown();
                    Thread.sleep(1L);
                    this.admin.topics().getStats(topicName);
                    // NOTE(review): an AssertionError raised here only terminates this helper
                    // thread; nothing after join() inspects its outcome, so this check cannot
                    // fail the test by itself.
                    Assert.fail("The topic should not exist yet.");
                } catch (PulsarAdminException.NotFoundException expected) {
                    // Expected outcome: the topic has not been created yet.
                } catch (PulsarAdminException | InterruptedException e) {
                    log.error("Exception in {}", Thread.currentThread().getName(), e);
                }
            }, "getStatsThread#" + i);
            statsThread.start();
            statsThreadStarted.await();

            // try-with-resources closes the producer exactly once on every path and keeps a
            // failure from the body as the primary exception if close() fails as well.
            try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create()) {
                Assert.assertNotNull(producer);
                statsThread.join();
            }
        }
    }

    /**
     * Exercises {@code BrokerService#isSystemTopic} with ordinary topic names (expected to be
     * rejected), with system topic names, the transaction coordinator topics and the
     * health-check topics of both heartbeat namespaces (all expected to be accepted).
     */
    @Test
    public void testIsSystemTopic() {
        final BrokerService service = this.pulsar.getBrokerService();

        // Names that must NOT be classified as system topics.
        final String[] regularTopics = {
            "test",
            "public/default/test",
            "healthcheck",
            "public/default/healthcheck",
            "persistent://public/default/test",
            "non-persistent://public/default/test",
        };
        for (String topic : regularTopics) {
            Assert.assertFalse(service.isSystemTopic(TopicName.get(topic)));
        }

        // Names that must be classified as system topics.
        final String[] systemTopics = {
            "__change_events",
            "__change_events-partition-0",
            "__change_events-partition-1",
            "__transaction_buffer_snapshot",
            "__transaction_buffer_snapshot-partition-0",
            "__transaction_buffer_snapshot-partition-1",
            "topicxxx-partition-0-multiTopicsReader-f433329d68__transaction_pending_ack",
            "topicxxx-multiTopicsReader-f433329d68__transaction_pending_ack",
        };
        for (String topic : systemTopics) {
            Assert.assertTrue(service.isSystemTopic(TopicName.get(topic)));
        }

        // Transaction coordinator topics exposed as constants.
        Assert.assertTrue(service.isSystemTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN));
        Assert.assertTrue(service.isSystemTopic(SystemTopicNames.TRANSACTION_COORDINATOR_LOG));

        // Health-check topics in both heartbeat namespaces, passed as plain strings.
        NamespaceName heartbeatV1 =
                NamespaceService.getHeartbeatNamespace(this.pulsar.getAdvertisedAddress(), this.pulsar.getConfig());
        NamespaceName heartbeatV2 =
                NamespaceService.getHeartbeatNamespaceV2(this.pulsar.getAdvertisedAddress(), this.pulsar.getConfig());
        String healthCheckV1 = "persistent://" + heartbeatV1.toString() + "/healthcheck";
        String healthCheckV2 = heartbeatV2.toString() + "/healthcheck";
        Assert.assertTrue(service.isSystemTopic(healthCheckV1));
        Assert.assertTrue(service.isSystemTopic(healthCheckV2));
    }
}
