/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.bookkeeper.util.PortManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.worker.PulsarFunctionTestTemporaryDirectory;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
import org.apache.pulsar.functions.worker.service.WorkerServiceLoader;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Integration test that registers Pulsar functions through TLS-authenticated admin clients.
 *
 * <p>Every test method gets a fresh local BookKeeper/ZooKeeper ensemble and {@link #BROKER_COUNT}
 * brokers. Each broker only exposes its TLS ports, has TLS authentication and authorization
 * enabled, and runs an embedded functions worker that uses the thread runtime.
 */
@Test(groups = {"functions-worker"})
public class PulsarFunctionTlsTest {
    private static final Logger log = LoggerFactory.getLogger(PulsarFunctionTlsTest.class);
    protected static final int BROKER_COUNT = 2;
    private static final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
    private static final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
    private static final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
    private static final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
    /** Namespace name without the tenant prefix. */
    private static final String TEST_NAMESPACE_LOCAL_NAME = "my-ns";

    LocalBookkeeperEnsemble bkEnsemble;
    protected PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];
    protected ServiceConfiguration[] configurations = new ServiceConfiguration[BROKER_COUNT];
    protected PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];
    protected PulsarService leaderPulsar;
    protected PulsarAdmin leaderAdmin;
    protected String testCluster = "my-cluster";
    protected String testTenant = "my-tenant";
    protected String testNamespace = this.testTenant + "/" + TEST_NAMESPACE_LOCAL_NAME;
    private final PulsarFunctionTestTemporaryDirectory[] tempDirectories =
            new PulsarFunctionTestTemporaryDirectory[BROKER_COUNT];

    /**
     * Starts the ensemble and all brokers, creates one admin client per broker, and finally
     * creates the test tenant and namespace through the first broker's admin client.
     *
     * @throws Exception if any component fails to start or the tenant/namespace cannot be created
     */
    @BeforeMethod
    void setup() throws Exception {
        log.info("---- Initializing PulsarFunctionTlsTest -----");
        this.bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
        this.bkEnsemble.start();

        for (int i = 0; i < BROKER_COUNT; i++) {
            ServiceConfiguration config = createBrokerConfig();
            WorkerConfig workerConfig = createWorkerConfig(config, i);
            WorkerService fnWorkerService = WorkerServiceLoader.load(workerConfig);

            this.configurations[i] = config;
            this.pulsarServices[i] = new PulsarService(config, workerConfig, Optional.of(fnWorkerService), code -> {});
            this.pulsarServices[i].start();

            if (i == 0) {
                // Do not start the next broker until the first one reports itself as leader.
                // NOTE(review): this wait has no upper bound; consider adding a timeout.
                while (!this.pulsarServices[0].getLeaderElectionService().isLeader()) {
                    Thread.sleep(10L);
                }
            }

            this.pulsarAdmins[i] = createAdmin(this.pulsarServices[i]);
        }

        this.leaderPulsar = this.pulsarServices[0];
        this.leaderAdmin = this.pulsarAdmins[0];

        // NOTE(review): fixed pause before the first admin call; presumably lets the function
        // workers finish initializing - confirm whether it is still required.
        Thread.sleep(1000L);

        TenantInfo tenantInfo = TenantInfo.builder()
                .allowedClusters(Collections.singleton(this.testCluster))
                .build();
        this.pulsarAdmins[0].tenants().createTenant(this.testTenant, tenantInfo);
        this.pulsarAdmins[0].namespaces().createNamespace(this.testNamespace, 16);
    }

    /**
     * Builds the configuration of one broker: TLS-only ports, TLS authentication plus
     * authorization, and the embedded functions worker enabled.
     */
    private ServiceConfiguration createBrokerConfig() {
        int brokerPort = PortManager.nextFreePort();
        int webPort = PortManager.nextFreePort();

        ServiceConfiguration config = new ServiceConfiguration();
        config.setBrokerShutdownTimeoutMs(0L);
        // Only the TLS endpoints are configured; the plain-text ports stay empty.
        config.setWebServicePort(Optional.empty());
        config.setWebServicePortTls(Optional.of(webPort));
        config.setBrokerServicePort(Optional.empty());
        config.setBrokerServicePortTls(Optional.of(brokerPort));
        config.setClusterName(this.testCluster);
        config.setAdvertisedAddress("localhost");
        config.setZookeeperServers("127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
        config.setDefaultNumberOfNamespaceBundles(1);
        config.setLoadBalancerEnabled(false);

        Set<String> superUsers = Sets.newHashSet("superUser", "admin");
        config.setSuperUserRoles(superUsers);

        Set<String> providers = new HashSet<>();
        providers.add(AuthenticationProviderTls.class.getName());
        config.setAuthenticationEnabled(true);
        config.setAuthorizationEnabled(true);
        config.setAuthenticationProviders(providers);

        config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
        config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
        config.setTlsAllowInsecureConnection(true);

        config.setBrokerClientTlsEnabled(true);
        config.setBrokerClientTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
        config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
        config.setBrokerClientAuthenticationParameters(
                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + ",tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);

        config.setFunctionsWorkerEnabled(true);
        config.setTlsEnabled(true);
        return config;
    }

    /**
     * Derives the worker configuration from the broker configuration and points it at a fresh
     * temporary directory, which is remembered in {@code tempDirectories[index]} for cleanup.
     */
    private WorkerConfig createWorkerConfig(ServiceConfiguration config, int index) {
        WorkerConfig workerConfig = PulsarService.initializeWorkerConfigFromBrokerConfig(config, null);
        this.tempDirectories[index] = PulsarFunctionTestTemporaryDirectory.create(this.getClass().getSimpleName());
        this.tempDirectories[index].useTemporaryDirectoriesForWorkerConfig(workerConfig);

        workerConfig.setPulsarFunctionsNamespace("public/functions");
        workerConfig.setSchedulerClassName(RoundRobinScheduler.class.getName());
        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
        workerConfig.setFunctionRuntimeFactoryConfigs(ObjectMapperFactory.getThreadLocal()
                .convertValue(new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
        workerConfig.setFailureCheckFreqMs(100L);
        workerConfig.setNumFunctionPackageReplicas(1);
        workerConfig.setClusterCoordinationTopicName("coordinate");
        workerConfig.setFunctionAssignmentTopicName("assignment");
        workerConfig.setFunctionMetadataTopicName("metadata");
        workerConfig.setInstanceLivenessCheckFreqMs(100L);
        workerConfig.setBrokerClientAuthenticationEnabled(true);
        workerConfig.setTlsEnabled(true);
        workerConfig.setUseTls(true);
        return workerConfig;
    }

    /** Creates an admin client that talks to the broker's TLS web endpoint with the client certificate. */
    private static PulsarAdmin createAdmin(PulsarService pulsarService) throws Exception {
        Map<String, String> authParams = new HashMap<>();
        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
        AuthenticationTls authTls = new AuthenticationTls();
        authTls.configure(authParams);

        return PulsarAdmin.builder()
                .serviceHttpUrl(pulsarService.getWebServiceAddressTls())
                .tlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH)
                .allowTlsInsecureConnection(true)
                .authentication(authTls)
                .build();
    }

    /**
     * Releases everything {@link #setup()} created.
     *
     * <p>Because this runs even when {@code setup()} failed part-way, every resource is
     * null-checked, released independently of the others, and its reference cleared afterwards.
     * The first failure is rethrown once all resources were attempted; later failures are attached
     * to it as suppressed exceptions.
     *
     * @throws Exception the first exception raised while releasing a resource
     */
    @AfterMethod(alwaysRun = true)
    void tearDown() throws Exception {
        Exception failure = null;

        for (int i = 0; i < BROKER_COUNT; i++) {
            failure = closeAndRecord(this.pulsarServices[i], failure);
            this.pulsarServices[i] = null;
            failure = closeAndRecord(this.pulsarAdmins[i], failure);
            this.pulsarAdmins[i] = null;
        }
        this.leaderPulsar = null;
        this.leaderAdmin = null;

        if (this.bkEnsemble != null) {
            failure = closeAndRecord(this.bkEnsemble::stop, failure);
            this.bkEnsemble = null;
        }

        for (int i = 0; i < BROKER_COUNT; i++) {
            if (this.tempDirectories[i] != null) {
                failure = closeAndRecord(this.tempDirectories[i]::delete, failure);
                this.tempDirectories[i] = null;
            }
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Closes {@code resource} if it is non-null and folds any exception into {@code failure}.
     *
     * @param resource the resource (or cleanup action) to close; may be {@code null}
     * @param failure  the first exception seen so far, or {@code null} if none
     * @return the first exception seen so far, including the one raised by this call
     */
    private static Exception closeAndRecord(AutoCloseable resource, Exception failure) {
        if (resource == null) {
            return failure;
        }
        try {
            resource.close();
        } catch (Exception e) {
            if (failure == null) {
                return e;
            }
            failure.addSuppressed(e);
        }
        return failure;
    }

    /**
     * For each broker: creates a function from the jar that contains {@link PulsarSink}, reads it
     * back, checks tenant, namespace and name, and deletes it again.
     *
     * @throws Exception if any admin call fails
     */
    @Test
    public void testFunctionsCreation() throws Exception {
        String jarFilePathUrl = String.format("%s:%s", "file",
                PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath());

        for (int i = 0; i < BROKER_COUNT; i++) {
            String functionName = "function-" + i;
            FunctionConfig functionConfig = createFunctionConfig(jarFilePathUrl, this.testTenant,
                    TEST_NAMESPACE_LOCAL_NAME, functionName, "my.*", "sink-topic-" + i, "sub-" + i);

            log.info(" -------- Start test function : {}", functionName);

            this.pulsarAdmins[i].functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);

            FunctionConfig config =
                    this.pulsarAdmins[i].functions().getFunction(this.testTenant, TEST_NAMESPACE_LOCAL_NAME, functionName);
            Assert.assertEquals(config.getTenant(), this.testTenant);
            Assert.assertEquals(config.getNamespace(), TEST_NAMESPACE_LOCAL_NAME);
            Assert.assertEquals(config.getName(), functionName);

            this.pulsarAdmins[i].functions().deleteFunction(config.getTenant(), config.getNamespace(), config.getName());
        }
    }

    /**
     * Builds the configuration of a Java {@link IdentityFunction} that consumes a topic pattern and
     * writes to {@code sinkTopic}, and logs it as pretty-printed JSON.
     *
     * @param jarFile          location of the function jar; only used to create (and immediately
     *                         close) a class loader for it
     * @param tenant           tenant of the function
     * @param namespace        namespace of the function, without the tenant prefix
     * @param functionName     name of the function
     * @param sourceTopic      topic name or pattern, appended to {@code persistent://tenant/namespace/}
     * @param sinkTopic        output topic
     * @param subscriptionName subscription name used by the function
     * @return the populated function configuration
     * @throws JsonProcessingException if the configuration cannot be serialized for logging
     * @throws UncheckedIOException    if creating or closing the class loader fails
     */
    protected static FunctionConfig createFunctionConfig(String jarFile, String tenant, String namespace, String functionName, String sourceTopic, String sinkTopic, String subscriptionName) throws JsonProcessingException {
        // NOTE(review): the test passes a "file:"-prefixed URL string here, which java.io.File
        // treats as a plain path - confirm whether this load step validates anything.
        File file = new File(jarFile);
        try {
            ClassLoader classLoader = ClassLoaderUtils.loadJar(file);
            if (classLoader instanceof Closeable) {
                ((Closeable) classLoader).close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to load user jar " + file, e);
        }

        String sourceTopicPattern = String.format("persistent://%s/%s/%s", tenant, namespace, sourceTopic);

        FunctionConfig functionConfig = new FunctionConfig();
        functionConfig.setTenant(tenant);
        functionConfig.setNamespace(namespace);
        functionConfig.setName(functionName);
        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
        functionConfig.setParallelism(1);
        functionConfig.setClassName(IdentityFunction.class.getName());
        functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
        functionConfig.setSubName(subscriptionName);
        functionConfig.setTopicsPattern(sourceTopicPattern);
        functionConfig.setAutoAck(true);
        functionConfig.setOutput(sinkTopic);

        log.info("Function Config: {}", new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(functionConfig));
        return functionConfig;
    }
}

