package org.apache.pulsar.client.impl;

import io.netty.channel.EventLoopGroup;
import io.netty.resolver.AbstractAddressResolver;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker-impl"})
/* loaded from: input_file:org/apache/pulsar/client/impl/ConnectionPoolTest.class */
public class ConnectionPoolTest extends MockedPulsarServiceBaseTest {
    String serviceUrl;
    int brokerPort;

    /**
     * Runs the base-class setup, then caches the broker listen port and derives the
     * service URL used by the DNS-stubbing tests. The URL deliberately uses the
     * placeholder host {@code non-existing-dns-name}; those tests stub
     * {@code ConnectionPool.resolveName} for that host.
     *
     * @throws Exception if the base-class setup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        final Integer listenPort = (Integer) pulsar.getBrokerListenPort().get();
        brokerPort = listenPort;
        serviceUrl = "pulsar://non-existing-dns-name:" + brokerPort;
    }

    /**
     * Delegates to the base-class cleanup once all tests in this class have run.
     * {@code alwaysRun = true} asks TestNG to invoke it even when earlier
     * configuration or test methods failed.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Creates a producer through a client whose service-URL host is stubbed to
     * resolve to a single address: {@code 127.0.0.1} on the broker port.
     *
     * <p>The client and the event loop group are released in {@code finally} blocks
     * so that a failing producer creation does not leak them.
     *
     * @throws Exception if the client or producer cannot be created or closed
     */
    @Test
    public void testSingleIpAddress() throws Exception {
        ClientConfigurationData conf = new ClientConfigurationData();
        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("test"));
        try {
            ConnectionPool pool = (ConnectionPool) BrokerTestUtil.spyWithClassAndConstructorArgs(
                    ConnectionPool.class, InstrumentProvider.NOOP, conf, eventLoop);
            conf.setServiceUrl(serviceUrl);
            PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);
            try {
                List<InetSocketAddress> resolved = new ArrayList<>();
                resolved.add(new InetSocketAddress("127.0.0.1", brokerPort));
                // Stub name resolution for the placeholder host used in serviceUrl.
                Mockito.when(pool.resolveName(InetSocketAddress.createUnresolved("non-existing-dns-name", brokerPort)))
                        .thenReturn(CompletableFuture.completedFuture(resolved));

                client.newProducer().topic("persistent://sample/standalone/ns/my-topic").create();
            } finally {
                client.close();
            }
        } finally {
            eventLoop.shutdownGracefully();
        }
    }

    /**
     * Repeatedly simulates a broker-initiated producer close on a client configured
     * with 10 connections per broker, and waits each time for the producer to
     * return to the {@code Ready} state.
     *
     * <p>The producer and client are closed via try-with-resources and the topic is
     * deleted in a {@code finally} block, so a failed assertion does not leave them
     * behind.
     *
     * @throws Exception if topic administration, client creation or producer creation fails
     */
    @Test
    public void testSelectConnectionForSameProducer() throws Exception {
        final String topicName = BrokerTestUtil.newUniqueName("persistent://sample/standalone/ns/tp_");
        admin.topics().createNonPartitionedTopic(topicName);
        try {
            final CommandCloseProducer closeCommand = new CommandCloseProducer();
            try (PulsarClient client = PulsarClient.builder()
                         .connectionsPerBroker(10)
                         .serviceUrl(lookupUrl.toString())
                         .build();
                 // create() is declared to return the Producer interface; the test needs
                 // the implementation type to reach producerId, getClientCnx() and getState().
                 ProducerImpl<byte[]> producer =
                         (ProducerImpl<byte[]>) client.newProducer().topic(topicName).create()) {
                closeCommand.setProducerId(producer.producerId);
                for (int attempt = 0; attempt < 20; attempt++) {
                    ClientCnx cnx = producer.getClientCnx();
                    if (cnx == null) {
                        // No connection attached right now; nothing to close this round.
                        continue;
                    }
                    // Deliver a close-producer command to the connection, then wait for the
                    // producer to report Ready again.
                    cnx.handleCloseProducer(closeCommand);
                    Awaitility.await().untilAsserted(() ->
                            Assert.assertEquals(producer.getState().toString(),
                                    HandlerState.State.Ready.toString(),
                                    "The producer uses a different connection when reconnecting"));
                }
            }
        } finally {
            admin.topics().delete(topicName);
        }
    }

    /**
     * Creates a producer through a client whose service-URL host is stubbed to
     * resolve to two addresses, in order: {@code 127.0.0.99} and {@code 127.0.0.1},
     * both on the broker port.
     *
     * <p>NOTE(review): presumably the first address is not served by the broker so
     * the client has to fall through to the second one; confirm against
     * {@code ConnectionPool}.
     *
     * <p>The client and the event loop group are released in {@code finally} blocks
     * so that a failing producer creation does not leak them.
     *
     * @throws Exception if the client or producer cannot be created or closed
     */
    @Test
    public void testDoubleIpAddress() throws Exception {
        ClientConfigurationData conf = new ClientConfigurationData();
        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("test"));
        try {
            ConnectionPool pool = (ConnectionPool) BrokerTestUtil.spyWithClassAndConstructorArgs(
                    ConnectionPool.class, InstrumentProvider.NOOP, conf, eventLoop);
            conf.setServiceUrl(serviceUrl);
            PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);
            try {
                List<InetSocketAddress> resolved = new ArrayList<>();
                resolved.add(new InetSocketAddress("127.0.0.99", brokerPort));
                resolved.add(new InetSocketAddress("127.0.0.1", brokerPort));
                // Stub name resolution for the placeholder host used in serviceUrl.
                Mockito.when(pool.resolveName(InetSocketAddress.createUnresolved("non-existing-dns-name", brokerPort)))
                        .thenReturn(CompletableFuture.completedFuture(resolved));

                client.newProducer().topic("persistent://sample/standalone/ns/my-topic").create();
            } finally {
                client.close();
            }
        } finally {
            eventLoop.shutdownGracefully();
        }
    }

    /**
     * With {@code connectionsPerBroker} set to 0, requests four connections to the
     * broker and asserts that the pool reports a size of 0.
     *
     * <p>The pool and the event loop group are released in {@code finally} blocks so
     * that a failing assertion does not leak them.
     *
     * @throws Exception if the pool cannot be created or closed
     */
    @Test
    public void testNoConnectionPool() throws Exception {
        ClientConfigurationData conf = new ClientConfigurationData();
        conf.setConnectionsPerBroker(0);
        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, false, new DefaultThreadFactory("test"));
        try {
            ConnectionPool pool = (ConnectionPool) BrokerTestUtil.spyWithClassAndConstructorArgs(
                    ConnectionPool.class, InstrumentProvider.NOOP, conf, eventLoop);
            try {
                InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved("127.0.0.1", brokerPort);
                for (int request = 1; request < 5; request++) {
                    // NOTE(review): the future returned by thenAccept is discarded and nothing
                    // waits for it, so a failing assertion inside the callback would not fail
                    // this test - confirm whether that is intended.
                    pool.getConnection(brokerAddress).thenAccept(cnx -> {
                        Assert.assertTrue(cnx.channel().isActive());
                        pool.releaseConnection(cnx);
                        Assert.assertTrue(cnx.channel().isActive());
                    });
                }
                Assert.assertEquals(pool.getPoolSize(), 0);
            } finally {
                pool.closeAllConnections();
                pool.close();
            }
        } finally {
            eventLoop.shutdownGracefully();
        }
    }

    /**
     * With {@code connectionsPerBroker} set to 5, requests nine connections to the
     * broker and asserts that the pool size is between 1 and 5 inclusive.
     *
     * <p>The pool size is read once so both bounds are checked against the same
     * value, and the value is reported if the assertion fails. The pool and the
     * event loop group are released in {@code finally} blocks so that a failing
     * assertion does not leak them.
     *
     * @throws Exception if the pool cannot be created or closed
     */
    @Test
    public void testEnableConnectionPool() throws Exception {
        ClientConfigurationData conf = new ClientConfigurationData();
        conf.setConnectionsPerBroker(5);
        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, false, new DefaultThreadFactory("test"));
        try {
            ConnectionPool pool = (ConnectionPool) BrokerTestUtil.spyWithClassAndConstructorArgs(
                    ConnectionPool.class, InstrumentProvider.NOOP, conf, eventLoop);
            try {
                InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved("127.0.0.1", brokerPort);
                for (int request = 1; request < 10; request++) {
                    // NOTE(review): the future returned by thenAccept is discarded and nothing
                    // waits for it, so a failing assertion inside the callback would not fail
                    // this test - confirm whether that is intended.
                    pool.getConnection(brokerAddress).thenAccept(cnx -> {
                        Assert.assertTrue(cnx.channel().isActive());
                        pool.releaseConnection(cnx);
                        Assert.assertTrue(cnx.channel().isActive());
                    });
                }
                int poolSize = pool.getPoolSize();
                Assert.assertTrue(poolSize > 0 && poolSize <= 5, "Unexpected pool size: " + poolSize);
            } finally {
                pool.closeAllConnections();
                pool.close();
            }
        } finally {
            eventLoop.shutdownGracefully();
        }
    }

    /**
     * Checks which {@code remoteHostName} and {@code proxyToTargetBrokerAddress} a
     * {@code ClientCnx} ends up with for three logical/physical address combinations:
     * proxy/proxy, broker1/proxy and broker2/broker2.
     *
     * <p>Name resolution is replaced by a stub resolver that only knows the host
     * {@code proxy} and hosts starting with {@code broker}; anything else is an error.
     *
     * <p>The pool and the event loop group are released in {@code finally} blocks so
     * that a failing assertion does not leak them.
     *
     * @throws Exception if a connection cannot be obtained or the pool cannot be closed
     */
    @Test
    public void testSetProxyToTargetBrokerAddress() throws Exception {
        ClientConfigurationData conf = new ClientConfigurationData();
        conf.setConnectionsPerBroker(1);
        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, false, new DefaultThreadFactory("test"));
        try {
            AbstractAddressResolver<InetSocketAddress> resolver =
                    new AbstractAddressResolver<InetSocketAddress>(eventLoop.next()) {
                @Override
                protected boolean doIsResolved(InetSocketAddress address) {
                    return !address.isUnresolved();
                }

                @Override
                protected void doResolve(InetSocketAddress address, Promise<InetSocketAddress> promise)
                        throws Exception {
                    // Only throw: Netty's resolve(...) is expected to fail the promise with the
                    // thrown exception, so failing it here as well would complete it twice.
                    throw new IllegalStateException("Single-address resolution is not expected: " + address);
                }

                @Override
                protected void doResolveAll(InetSocketAddress address, Promise<List<InetSocketAddress>> promise)
                        throws Exception {
                    String hostName = address.getHostName();
                    boolean isProxy = hostName.equals("proxy");
                    boolean isBroker = hostName.startsWith("broker");
                    if (!isProxy && !isBroker) {
                        // Same reasoning as doResolve: let resolveAll(...) fail the promise.
                        throw new IllegalStateException("Unexpected host name: " + hostName);
                    }
                    List<InetSocketAddress> resolved = new ArrayList<>();
                    if (isProxy) {
                        resolved.add(new InetSocketAddress("127.0.0.101", brokerPort));
                        resolved.add(new InetSocketAddress("127.0.0.102", brokerPort));
                    } else if (hostName.equals("broker1")) {
                        resolved.add(new InetSocketAddress("127.0.0.103", brokerPort));
                        resolved.add(new InetSocketAddress("127.0.0.104", brokerPort));
                    } else if (hostName.equals("broker2")) {
                        resolved.add(new InetSocketAddress("127.0.0.105", brokerPort));
                        resolved.add(new InetSocketAddress("127.0.0.106", brokerPort));
                    }
                    // Any other "broker*" host resolves to an empty list.
                    promise.setSuccess(resolved);
                }
            };
            // A lambda needs a functional-interface target type; it cannot be passed
            // directly to the Object... constructor-argument parameter.
            Supplier<ClientCnx> cnxSupplier = () -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop);
            ConnectionPool pool = (ConnectionPool) BrokerTestUtil.spyWithClassAndConstructorArgs(
                    ConnectionPool.class, InstrumentProvider.NOOP, conf, eventLoop, cnxSupplier,
                    Optional.of(resolver));
            try {
                // Logical and physical address are both the proxy: no target broker is set.
                ClientCnx viaProxyOnly = pool.getConnection(
                        InetSocketAddress.createUnresolved("proxy", 9999),
                        InetSocketAddress.createUnresolved("proxy", 9999),
                        pool.genRandomKeyToSelectCon()).get();
                Assert.assertEquals(viaProxyOnly.remoteHostName, "proxy");
                Assert.assertNull(viaProxyOnly.proxyToTargetBrokerAddress);

                // Logical address is a broker reached through the proxy: target broker is set.
                ClientCnx brokerViaProxy = pool.getConnection(
                        InetSocketAddress.createUnresolved("broker1", 9999),
                        InetSocketAddress.createUnresolved("proxy", 9999),
                        pool.genRandomKeyToSelectCon()).get();
                Assert.assertEquals(brokerViaProxy.remoteHostName, "proxy");
                Assert.assertEquals(brokerViaProxy.proxyToTargetBrokerAddress, "broker1:9999");

                // Logical and physical address are the same broker: no target broker is set.
                ClientCnx directBroker = pool.getConnection(
                        InetSocketAddress.createUnresolved("broker2", 9999),
                        InetSocketAddress.createUnresolved("broker2", 9999),
                        pool.genRandomKeyToSelectCon()).get();
                Assert.assertEquals(directBroker.remoteHostName, "broker2");
                Assert.assertNull(directBroker.proxyToTargetBrokerAddress);
            } finally {
                pool.closeAllConnections();
                pool.close();
            }
        } finally {
            eventLoop.shutdownGracefully();
        }
    }
}
