package org.apache.pulsar.client.impl;

import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.socks5.Socks5Server;
import org.apache.pulsar.socks5.auth.DefaultPasswordAuthImpl;
import org.apache.pulsar.socks5.config.Socks5Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.internal.thread.ThreadTimeoutException;

/**
 * Exercises the Pulsar client when its connections are routed through a local SOCKS5 proxy.
 *
 * <p>Each test starts its own {@link Socks5Server} on {@link #SOCKS5_PORT} (with or without
 * authentication) and then produces and/or consumes on {@link #topicName}. The server is shut
 * down and the {@value #SOCKS5_PROXY_ADDRESS_PROPERTY} system property is cleared after every
 * test method.
 */
@Test
public class ClientWithSocks5ProxyTest extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(ClientWithSocks5ProxyTest.class);

    /** Host the test SOCKS5 server is reached on. */
    private static final String SOCKS5_HOST = "localhost";

    /** Port the test SOCKS5 server is configured with and the clients connect to. */
    private static final int SOCKS5_PORT = 11080;

    /** System property set by the tests that configure the proxy address without the builder. */
    private static final String SOCKS5_PROXY_ADDRESS_PROPERTY = "socks5Proxy.address";

    /** Payload sent by every producing test. */
    private static final String MESSAGE = "abc";

    /** SOCKS5 server of the currently running test; {@code null} when none has been started. */
    private Socks5Server server;

    final String topicName = "persistent://public/default/socks5";

    /**
     * Runs the base setup and then creates the tenant, namespace and topic used by the tests.
     *
     * @throws Exception if the base setup or the admin calls fail
     */
    @Override
    @BeforeMethod
    public void setup() throws Exception {
        baseSetup();
        initData();
    }

    /**
     * Points the client builder at the local SOCKS5 server using the default proxy credentials.
     *
     * @param clientBuilder builder to configure
     */
    @Override
    public void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
        clientBuilder
                .socks5ProxyAddress(new InetSocketAddress(SOCKS5_HOST, SOCKS5_PORT))
                .socks5ProxyUsername(DefaultPasswordAuthImpl.DEFAULT_USERNAME)
                .socks5ProxyPassword(DefaultPasswordAuthImpl.DEFAULT_PASSWORD);
    }

    /**
     * Creates a SOCKS5 server on {@link #SOCKS5_PORT} and starts it on a daemon thread.
     *
     * @param enableAuth value passed to {@link Socks5Config#setEnableAuth}
     */
    private void startSocks5Server(boolean enableAuth) {
        Socks5Config config = new Socks5Config();
        config.setPort(SOCKS5_PORT);
        config.setEnableAuth(enableAuth);

        // Capture the instance locally so the starter thread never observes a field that
        // cleanup() has already reset.
        final Socks5Server socks5Server = new Socks5Server(config);
        this.server = socks5Server;

        Thread starter = new Thread(() -> {
            try {
                socks5Server.start();
            } catch (Exception e) {
                log.error("start socks5 server error", e);
            }
        });
        starter.setDaemon(true);
        starter.start();
    }

    /**
     * Tears down the broker, the SOCKS5 server and the proxy system property.
     *
     * <p>Because this runs even when setup or the test failed, the server may never have been
     * started, and every step must run regardless of failures in the previous one; otherwise the
     * proxy port or the system property would leak into subsequent tests.
     *
     * @throws Exception if the base cleanup or the server shutdown fails
     */
    @Override
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        try {
            internalCleanup();
        } finally {
            try {
                if (server != null) {
                    server.shutdown();
                    server = null;
                }
            } finally {
                System.clearProperty(SOCKS5_PROXY_ADDRESS_PROPERTY);
            }
        }
    }

    /**
     * Creates the {@code public} tenant (allowed cluster {@code test}), the
     * {@code public/default} namespace and the non-partitioned test topic.
     *
     * @throws PulsarAdminException if any admin call fails
     */
    private void initData() throws PulsarAdminException {
        this.admin.tenants().createTenant("public", new TenantInfo() {
            @Override
            public Set<String> getAdminRoles() {
                return Collections.emptySet();
            }

            @Override
            public Set<String> getAllowedClusters() {
                Set<String> clusters = new HashSet<>();
                clusters.add("test");
                return clusters;
            }
        });
        this.admin.namespaces().createNamespace("public/default");
        this.admin.topics().createNonPartitionedTopic(topicName);
    }

    /**
     * Replaces the shared client with one built from {@code builder} and sends {@link #MESSAGE}
     * to {@link #topicName} through it.
     *
     * @param builder client builder describing the client under test
     * @throws PulsarClientException if building the client, creating the producer or sending fails
     */
    private void sendWithReplacedClient(ClientBuilder builder) throws PulsarClientException {
        replacePulsarClient(builder)
                .newProducer()
                .topic(topicName)
                .create()
                .send(MESSAGE.getBytes(StandardCharsets.UTF_8));
    }

    /** Sends one message through an authenticating proxy and verifies it is received intact. */
    @Test
    public void testSendAndConsumer() throws PulsarClientException {
        startSocks5Server(true);

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("socks5-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        this.pulsarClient.newProducer()
                .topic(topicName)
                .create()
                .send(MESSAGE.getBytes(StandardCharsets.UTF_8));

        String received = new String(consumer.receive().getData(), StandardCharsets.UTF_8);
        Assert.assertEquals(received, MESSAGE);
        consumer.unsubscribe();
    }

    /** Sends through a proxy that has authentication disabled, using a client without credentials. */
    @Test
    public void testDisableAuth() throws PulsarClientException {
        startSocks5Server(false);
        sendWithReplacedClient(PulsarClient.builder()
                .serviceUrl(this.pulsar.getBrokerServiceUrl())
                .socks5ProxyAddress(new InetSocketAddress(SOCKS5_HOST, SOCKS5_PORT)));
    }

    /** Configures the proxy address only through the system property and sends a message. */
    @Test
    public void testSetFromSystemProperty() throws PulsarClientException {
        startSocks5Server(false);
        System.setProperty(SOCKS5_PROXY_ADDRESS_PROPERTY, "http://" + SOCKS5_HOST + ":" + SOCKS5_PORT);
        sendWithReplacedClient(PulsarClient.builder().serviceUrl(this.pulsar.getBrokerServiceUrl()));
    }

    /**
     * Sets the proxy system property to a value without the scheme prefix used by
     * {@link #testSetFromSystemProperty()} and expects a {@link PulsarClientException}.
     */
    @Test(expectedExceptions = {PulsarClientException.class})
    public void testSetErrorProxyAddress() throws PulsarClientException {
        startSocks5Server(false);
        System.setProperty(SOCKS5_PROXY_ADDRESS_PROPERTY, SOCKS5_HOST + ":" + SOCKS5_PORT);
        sendWithReplacedClient(PulsarClient.builder().serviceUrl(this.pulsar.getBrokerServiceUrl()));
    }

    /**
     * Uses a wrong proxy password against an authenticating proxy; producer creation is expected
     * not to complete within the 5 second TestNG timeout.
     */
    @Test(timeOut = 5000, expectedExceptions = {ThreadTimeoutException.class})
    public void testWithErrorPassword() throws PulsarClientException {
        startSocks5Server(true);
        replacePulsarClient(PulsarClient.builder()
                .serviceUrl(this.pulsar.getBrokerServiceUrl())
                .socks5ProxyAddress(new InetSocketAddress(SOCKS5_HOST, SOCKS5_PORT))
                .socks5ProxyUsername(DefaultPasswordAuthImpl.DEFAULT_USERNAME)
                .socks5ProxyPassword("error-password"))
                .newProducer()
                .topic(topicName)
                .create();
    }
}
