package org.apache.pulsar.client.impl;

import com.google.common.collect.Sets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Tests for client connection handling, run against the mocked broker provided by
 * {@link MockedPulsarServiceBaseTest}.
 *
 * <p>{@link #setup()} creates the cluster, tenant and namespace used by the tests, plus a
 * fixed-size thread pool used to issue concurrent requests.
 */
@Test(groups = {"broker-impl"})
public class ClientCnxTest extends MockedPulsarServiceBaseTest {
    public static final String CLUSTER_NAME = "test";
    public static final String TENANT = "tnx";
    public static final String NAMESPACE = "tnx/ns1";
    public static String persistentTopic = "persistent://tnx/ns1/test";
    ExecutorService executorService;

    /**
     * Starts the mocked service, registers the test cluster/tenant/namespace and creates the
     * worker pool shared by the tests.
     *
     * @throws Exception if the service cannot be started or an admin call fails
     */
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster(CLUSTER_NAME,
                ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        this.admin.tenants().createTenant(TENANT,
                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
        this.admin.namespaces().createNamespace(NAMESPACE);
        this.executorService = Executors.newFixedThreadPool(20);
    }

    /**
     * Tears down the mocked service and stops the worker pool.
     *
     * @throws Exception if the service teardown fails
     */
    @Override
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        try {
            super.internalCleanup();
        } finally {
            // alwaysRun = true means this may execute after a failed setup(), in which case
            // the pool was never created; it must also be stopped when the teardown throws.
            if (this.executorService != null) {
                this.executorService.shutdownNow();
            }
        }
    }

    /**
     * Issues many concurrent {@code getLastMessageIdAsync()} requests while repeatedly closing
     * the consumer's connection, then waits until every request future has completed (either
     * normally or exceptionally).
     *
     * <p>NOTE(review): judging by the test name, this is meant to check that requests pending
     * on a connection are removed and completed when that connection goes away - confirm.
     *
     * @throws Exception if the consumer cannot be created or the requests do not all complete
     *                   within Awaitility's default timeout
     */
    @Test
    public void testRemoveAndHandlePendingRequestInCnx() throws Exception {
        final int requestCount = 5000;
        final CountDownLatch countDownLatch = new CountDownLatch(requestCount);

        // subscribe() is declared to return Consumer<byte[]>; the downcast gives access to
        // getClientCnx(). try-with-resources closes the consumer even if the wait times out.
        try (ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) this.pulsarClient.newConsumer()
                .topic(persistentTopic)
                .subscriptionName("sub")
                .subscribe()) {

            // Submit the requests from a separate thread so that they overlap with the
            // connection closes performed below on the test thread.
            new Thread(() -> {
                for (int n = 0; n < requestCount; n++) {
                    this.executorService.submit(() -> {
                        // Count the request as done whether it succeeded or failed.
                        consumer.getLastMessageIdAsync().whenComplete((messageId, error) -> {
                            countDownLatch.countDown();
                        });
                    });
                }
            }).start();

            for (int n = 0; n < requestCount; n++) {
                ClientCnx clientCnx = consumer.getClientCnx();
                // The consumer may have no connection at this moment, so guard before closing.
                if (clientCnx != null && clientCnx.ctx() != null) {
                    clientCnx.ctx().close();
                }
            }

            // Poll the latch rather than blocking inside the condition.
            Awaitility.await().until(() -> countDownLatch.getCount() == 0);
        }
    }

    /**
     * Checks that the client version reported in the topic stats for both a producer and a
     * consumer equals {@code "Pulsar-Java-v" + PulsarVersion.getVersion()}.
     *
     * @throws Exception if the producer/consumer cannot be created or the stats lookup fails
     */
    @Test
    public void testClientVersion() throws Exception {
        final String expectedVersion = String.format("Pulsar-Java-v%s", PulsarVersion.getVersion());
        final String topic = "persistent://tnx/ns1/testClientVersion";
        final String subscriptionName = "my-sub";

        // try-with-resources releases both clients even when an assertion fails.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                     .topic(topic)
                     .create();
             Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                     .subscriptionName(subscriptionName)
                     .topic(topic)
                     .subscribe()) {

            PublisherStats publisher = this.admin.topics().getStats(topic).getPublishers().get(0);
            Assert.assertEquals(publisher.getClientVersion(), expectedVersion);

            SubscriptionStats subscription =
                    this.admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
            ConsumerStats consumerStats = subscription.getConsumers().get(0);
            Assert.assertEquals(consumerStats.getClientVersion(), expectedVersion);
        }
    }
}
