/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.socks5.Socks5Server;
import org.apache.pulsar.socks5.config.Socks5Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.internal.thread.ThreadTimeoutException;

/**
 * Exercises the Pulsar client when its connections are routed through a local
 * SOCKS5 proxy.
 *
 * <p>Each test starts its own {@link Socks5Server} on {@code localhost:11080}
 * (with or without authentication) and then produces/consumes through a client
 * configured either via {@link ClientBuilder} or via the
 * {@code socks5Proxy.address} system property. The proxy and the system
 * property are torn down after every test method.
 */
@Test
public class ClientWithSocks5ProxyTest
extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(ClientWithSocks5ProxyTest.class);

    /** Host the test SOCKS5 proxy is reached on. */
    private static final String PROXY_HOST = "localhost";
    /** Port the test SOCKS5 proxy is configured with. */
    private static final int PROXY_PORT = 11080;
    /** System property set by the tests to configure the proxy address. */
    private static final String PROXY_ADDRESS_PROPERTY = "socks5Proxy.address";

    /** Proxy started by the current test method; {@code null} until a test starts one. */
    private Socks5Server server;
    final String topicName = "persistent://public/default/socks5";

    /**
     * Runs the base broker setup and then creates the tenant, namespace and
     * topic used by the tests.
     *
     * @throws Exception if the base setup or the admin calls fail
     */
    @Override
    @BeforeMethod
    public void setup() throws Exception {
        this.baseSetup();
        this.initData();
    }

    /**
     * Points the given client builder at the test SOCKS5 proxy using the
     * {@code socks5}/{@code pulsar} credentials.
     *
     * @param clientBuilder builder to configure
     */
    @Override
    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
        clientBuilder.socks5ProxyAddress(new InetSocketAddress(PROXY_HOST, PROXY_PORT))
                .socks5ProxyUsername("socks5")
                .socks5ProxyPassword("pulsar");
    }

    /**
     * Creates a SOCKS5 server on {@link #PROXY_PORT} and starts it on a daemon thread.
     *
     * <p>NOTE(review): this method returns as soon as the thread is started; it
     * does not wait for the server to report that it is listening.
     *
     * @param enableAuth value passed to {@link Socks5Config#setEnableAuth}
     */
    private void startSocks5Server(boolean enableAuth) {
        Socks5Config config = new Socks5Config();
        config.setPort(PROXY_PORT);
        config.setEnableAuth(enableAuth);
        this.server = new Socks5Server(config);
        Thread thread = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    ClientWithSocks5ProxyTest.this.server.start();
                } catch (Exception e) {
                    // Failures are only logged; the test itself will then fail to connect.
                    log.error("start socks5 server error", e);
                }
            }
        });
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * Tears down the broker, shuts down the proxy (if one was started) and
     * clears the proxy system property.
     *
     * <p>Because this runs with {@code alwaysRun=true}, it may execute when no
     * proxy was ever started, so the server is null-checked. The proxy shutdown
     * and property reset are placed in {@code finally} blocks so they still
     * happen when an earlier teardown step throws.
     *
     * @throws Exception if the broker cleanup or the proxy shutdown fails
     */
    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        try {
            this.internalCleanup();
        } finally {
            try {
                if (this.server != null) {
                    this.server.shutdown();
                }
            } finally {
                this.server = null;
                System.clearProperty(PROXY_ADDRESS_PROPERTY);
            }
        }
    }

    /**
     * Creates the {@code public} tenant (allowed on cluster {@code test}), the
     * {@code public/default} namespace and the non-partitioned test topic.
     *
     * @throws PulsarAdminException if any admin call fails
     */
    private void initData() throws PulsarAdminException {
        this.admin.tenants().createTenant("public", new TenantInfo() {

            @Override
            public Set<String> getAdminRoles() {
                return Collections.emptySet();
            }

            @Override
            public Set<String> getAllowedClusters() {
                Set<String> clusters = new HashSet<String>();
                clusters.add("test");
                return clusters;
            }
        });
        this.admin.namespaces().createNamespace("public/default");
        this.admin.topics().createNonPartitionedTopic(this.topicName);
    }

    /**
     * With proxy authentication enabled, sends one message through the default
     * client and verifies the consumer receives the same payload.
     *
     * @throws PulsarClientException if any client operation fails
     */
    @Test
    public void testSendAndConsumer() throws PulsarClientException {
        this.startSocks5Server(true);
        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(this.topicName)
                .subscriptionName("socks5-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(this.topicName).create();
        String msg = "abc";
        producer.send(msg.getBytes(StandardCharsets.UTF_8));
        Message<byte[]> message = consumer.receive();
        Assert.assertEquals(new String(message.getData(), StandardCharsets.UTF_8), msg);
        consumer.unsubscribe();
    }

    /**
     * With proxy authentication disabled, a client configured with only the
     * proxy address (no credentials) creates a producer and sends a message.
     *
     * @throws PulsarClientException if any client operation fails
     */
    @Test
    public void testDisableAuth() throws PulsarClientException {
        this.startSocks5Server(false);
        ClientBuilder clientBuilder = PulsarClient.builder()
                .serviceUrl(this.pulsar.getBrokerServiceUrl())
                .socks5ProxyAddress(new InetSocketAddress(PROXY_HOST, PROXY_PORT));
        PulsarClient pulsarClient = this.replacePulsarClient(clientBuilder);
        Producer<byte[]> producer = pulsarClient.newProducer().topic(this.topicName).create();
        String msg = "abc";
        producer.send(msg.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Sets the proxy address through the {@code socks5Proxy.address} system
     * property (URL form) rather than on the builder, then creates a producer
     * and sends a message.
     *
     * @throws PulsarClientException if any client operation fails
     */
    @Test
    public void testSetFromSystemProperty() throws PulsarClientException {
        this.startSocks5Server(false);
        System.setProperty(PROXY_ADDRESS_PROPERTY, "http://localhost:11080");
        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.pulsar.getBrokerServiceUrl());
        PulsarClient pulsarClient = this.replacePulsarClient(clientBuilder);
        Producer<byte[]> producer = pulsarClient.newProducer().topic(this.topicName).create();
        String msg = "abc";
        producer.send(msg.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Sets the proxy system property to {@code localhost:11080} (no scheme) and
     * expects a {@link PulsarClientException} somewhere in the client
     * creation / produce path.
     *
     * @throws PulsarClientException expected by the test
     */
    @Test(expectedExceptions={PulsarClientException.class})
    public void testSetErrorProxyAddress() throws PulsarClientException {
        this.startSocks5Server(false);
        System.setProperty(PROXY_ADDRESS_PROPERTY, "localhost:11080");
        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.pulsar.getBrokerServiceUrl());
        PulsarClient pulsarClient = this.replacePulsarClient(clientBuilder);
        Producer<byte[]> producer = pulsarClient.newProducer().topic(this.topicName).create();
        String msg = "abc";
        producer.send(msg.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * With proxy authentication enabled and a wrong password configured on the
     * client, producer creation is expected not to complete within the 5 second
     * test timeout, surfacing as a {@link ThreadTimeoutException}.
     *
     * @throws PulsarClientException if client creation fails outright
     */
    @Test(timeOut=5000L, expectedExceptions={ThreadTimeoutException.class})
    public void testWithErrorPassword() throws PulsarClientException {
        this.startSocks5Server(true);
        ClientBuilder clientBuilder = PulsarClient.builder()
                .serviceUrl(this.pulsar.getBrokerServiceUrl())
                .socks5ProxyAddress(new InetSocketAddress(PROXY_HOST, PROXY_PORT))
                .socks5ProxyUsername("socks5")
                .socks5ProxyPassword("error-password");
        PulsarClient pulsarClient = this.replacePulsarClient(clientBuilder);
        pulsarClient.newProducer().topic(this.topicName).create();
    }
}

