package org.apache.pulsar.functions.worker;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import org.apache.bookkeeper.util.PortManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
import org.apache.pulsar.functions.worker.service.WorkerServiceLoader;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Verifies that Pulsar functions can be created, fetched and deleted through the admin API of
 * brokers that expose only TLS endpoints and authenticate clients with TLS certificates.
 *
 * <p>Every test method gets a fresh {@link LocalBookkeeperEnsemble} and {@link #BROKER_COUNT}
 * brokers, each started with the functions worker enabled.
 */
@Test(groups = {"functions-worker"})
public class PulsarFunctionTlsTest {
    private static final Logger log = LoggerFactory.getLogger(PulsarFunctionTlsTest.class);
    protected static final int BROKER_COUNT = 2;
    private static final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
    private static final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
    private static final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
    private static final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
    /** Namespace name (without the tenant prefix) used by the test functions. */
    private static final String TEST_NAMESPACE_NAME = "my-ns";
    /** Upper bound for waiting on the first broker to win the leader election. */
    private static final long LEADER_ELECTION_TIMEOUT_MS = 30_000L;
    /** Shared mapper used only to pretty-print function configs into the log. */
    private static final ObjectMapper LOG_MAPPER = new ObjectMapper();

    LocalBookkeeperEnsemble bkEnsemble;
    protected PulsarService leaderPulsar;
    protected PulsarAdmin leaderAdmin;
    protected PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];
    protected ServiceConfiguration[] configurations = new ServiceConfiguration[BROKER_COUNT];
    protected PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];
    protected String testCluster = "my-cluster";
    protected String testTenant = "my-tenant";
    protected String testNamespace = this.testTenant + "/" + TEST_NAMESPACE_NAME;
    private PulsarFunctionTestTemporaryDirectory[] tempDirectories =
            new PulsarFunctionTestTemporaryDirectory[BROKER_COUNT];

    /**
     * Starts the local ensemble and all brokers, builds one TLS-authenticated admin client per
     * broker, and creates the test tenant and namespace through the first broker.
     *
     * @throws Exception if any component fails to start or the first broker does not become
     *     leader within {@link #LEADER_ELECTION_TIMEOUT_MS}
     */
    @BeforeMethod
    void setup() throws Exception {
        log.info("---- Initializing PulsarFunctionTlsTest -----");
        this.bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
        this.bkEnsemble.start();

        for (int i = 0; i < BROKER_COUNT; i++) {
            ServiceConfiguration brokerConfig = createBrokerConfig();
            WorkerConfig workerConfig = createWorkerConfig(brokerConfig, i);
            WorkerService workerService = WorkerServiceLoader.load(workerConfig);

            this.configurations[i] = brokerConfig;
            this.pulsarServices[i] = new PulsarService(brokerConfig, workerConfig, Optional.of(workerService),
                    exitCode -> { });
            this.pulsarServices[i].start();

            // The first broker must hold leadership before the next one is started.
            if (i == 0) {
                awaitLeadership(this.pulsarServices[0]);
            }
            this.pulsarAdmins[i] = createAdmin(this.pulsarServices[i]);
        }

        this.leaderPulsar = this.pulsarServices[0];
        this.leaderAdmin = this.pulsarAdmins[0];
        Thread.sleep(1000L);

        this.pulsarAdmins[0].tenants().createTenant(this.testTenant,
                TenantInfo.builder().allowedClusters(Collections.singleton(this.testCluster)).build());
        this.pulsarAdmins[0].namespaces().createNamespace(this.testNamespace, 16);
    }

    /**
     * Closes brokers and admin clients, stops the local ensemble and removes the temporary
     * directories.
     *
     * <p>Runs even when {@link #setup()} failed part-way, so every resource is null-checked.
     * The ensemble is stopped and the temporary directories are deleted even if closing a
     * broker or admin client throws.
     */
    @AfterMethod(alwaysRun = true)
    void tearDown() throws Exception {
        try {
            try {
                for (int i = 0; i < BROKER_COUNT; i++) {
                    if (this.pulsarServices[i] != null) {
                        this.pulsarServices[i].close();
                    }
                    if (this.pulsarAdmins[i] != null) {
                        this.pulsarAdmins[i].close();
                    }
                }
            } finally {
                // setup() may have failed before the ensemble was created.
                if (this.bkEnsemble != null) {
                    this.bkEnsemble.stop();
                }
            }
        } finally {
            for (int i = 0; i < BROKER_COUNT; i++) {
                if (this.tempDirectories[i] != null) {
                    this.tempDirectories[i].delete();
                }
            }
        }
    }

    /**
     * Creates one function per broker via that broker's admin client, reads it back, checks its
     * tenant, namespace and name, and deletes it again.
     */
    @Test
    public void testFunctionsCreation() throws Exception {
        String jarFilePathUrl = String.format("%s:%s", "file",
                PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath());

        for (int i = 0; i < BROKER_COUNT; i++) {
            String functionName = "function-" + i;
            FunctionConfig functionConfig = createFunctionConfig(jarFilePathUrl, this.testTenant,
                    TEST_NAMESPACE_NAME, functionName, "my.*", "sink-topic-" + i, "sub-" + i);

            log.info(" -------- Start test function : {}", functionName);
            this.pulsarAdmins[i].functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);

            FunctionConfig fetched = this.pulsarAdmins[i].functions()
                    .getFunction(this.testTenant, TEST_NAMESPACE_NAME, functionName);
            Assert.assertEquals(fetched.getTenant(), this.testTenant);
            Assert.assertEquals(fetched.getNamespace(), TEST_NAMESPACE_NAME);
            Assert.assertEquals(fetched.getName(), functionName);

            this.pulsarAdmins[i].functions()
                    .deleteFunction(fetched.getTenant(), fetched.getNamespace(), fetched.getName());
        }
    }

    /**
     * Builds the configuration for an {@link IdentityFunction} that consumes a topics pattern
     * and writes to a single output topic.
     *
     * @param str location of the function jar; it is handed to {@code new File(...)} and loaded
     *     once up front. NOTE(review): the caller in this class passes a {@code file:}-prefixed
     *     string here - confirm that is what {@code ClassLoaderUtils.loadJar} expects.
     * @param str2 tenant of the function
     * @param str3 namespace of the function (without the tenant prefix)
     * @param str4 function name
     * @param str5 topic name or pattern, appended to {@code persistent://<tenant>/<namespace>/}
     *     to form the input topics pattern
     * @param str6 output topic
     * @param str7 subscription name
     * @return the populated function configuration
     * @throws JsonProcessingException if the configuration cannot be serialized for logging
     * @throws UncheckedIOException if loading (or closing) the jar fails
     */
    protected static FunctionConfig createFunctionConfig(String str, String str2, String str3, String str4, String str5, String str6, String str7) throws JsonProcessingException {
        File file = new File(str);
        // Only the jar load is guarded here: JsonProcessingException is itself an IOException,
        // so a wider try block would misreport serialization failures as jar-loading failures.
        try {
            Object loaded = ClassLoaderUtils.loadJar(file);
            if (loaded instanceof Closeable) {
                ((Closeable) loaded).close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to load user jar " + file, e);
        }

        String topicsPattern = String.format("persistent://%s/%s/%s", str2, str3, str5);
        FunctionConfig functionConfig = new FunctionConfig();
        functionConfig.setTenant(str2);
        functionConfig.setNamespace(str3);
        functionConfig.setName(str4);
        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
        functionConfig.setParallelism(1);
        functionConfig.setClassName(IdentityFunction.class.getName());
        functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
        functionConfig.setSubName(str7);
        functionConfig.setTopicsPattern(topicsPattern);
        functionConfig.setAutoAck(true);
        functionConfig.setOutput(str6);
        log.info("Function Config: {}",
                LOG_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(functionConfig));
        return functionConfig;
    }

    /** Builds a broker configuration with TLS-only ports, TLS authentication and the functions worker enabled. */
    private ServiceConfiguration createBrokerConfig() {
        int brokerServicePortTls = PortManager.nextFreePort();
        int webServicePortTls = PortManager.nextFreePort();

        ServiceConfiguration config = new ServiceConfiguration();
        config.setBrokerShutdownTimeoutMs(0L);
        config.setWebServicePort(Optional.empty());
        config.setWebServicePortTls(Optional.of(webServicePortTls));
        config.setBrokerServicePort(Optional.empty());
        config.setBrokerServicePortTls(Optional.of(brokerServicePortTls));
        config.setClusterName(this.testCluster);
        config.setAdvertisedAddress("localhost");
        config.setZookeeperServers("127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
        config.setDefaultNumberOfNamespaceBundles(1);
        config.setLoadBalancerEnabled(false);
        config.setSuperUserRoles(Sets.newHashSet("superUser", "admin"));

        HashSet<String> authenticationProviders = new HashSet<>();
        authenticationProviders.add(AuthenticationProviderTls.class.getName());
        config.setAuthenticationEnabled(true);
        config.setAuthorizationEnabled(true);
        config.setAuthenticationProviders(authenticationProviders);

        config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
        config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
        config.setTlsAllowInsecureConnection(true);

        config.setBrokerClientTlsEnabled(true);
        config.setBrokerClientTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
        config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
        config.setBrokerClientAuthenticationParameters(
                String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH));

        config.setFunctionsWorkerEnabled(true);
        config.setTlsEnabled(true);
        return config;
    }

    /**
     * Derives the worker configuration for broker {@code index} from its broker configuration
     * and points it at a fresh temporary directory, which is remembered for cleanup.
     */
    private WorkerConfig createWorkerConfig(ServiceConfiguration brokerConfig, int index) throws Exception {
        WorkerConfig workerConfig =
                PulsarService.initializeWorkerConfigFromBrokerConfig(brokerConfig, (String) null);
        this.tempDirectories[index] = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
        this.tempDirectories[index].useTemporaryDirectoriesForWorkerConfig(workerConfig);

        workerConfig.setPulsarFunctionsNamespace("public/functions");
        workerConfig.setSchedulerClassName(RoundRobinScheduler.class.getName());
        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
        workerConfig.setFunctionRuntimeFactoryConfigs(ObjectMapperFactory.getThreadLocal()
                .convertValue(new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
        workerConfig.setFailureCheckFreqMs(100L);
        workerConfig.setNumFunctionPackageReplicas(1);
        workerConfig.setClusterCoordinationTopicName("coordinate");
        workerConfig.setFunctionAssignmentTopicName("assignment");
        workerConfig.setFunctionMetadataTopicName("metadata");
        workerConfig.setInstanceLivenessCheckFreqMs(100L);
        workerConfig.setBrokerClientAuthenticationEnabled(true);
        workerConfig.setTlsEnabled(true);
        workerConfig.setUseTls(true);
        return workerConfig;
    }

    /** Builds an admin client for the broker's TLS web endpoint, authenticating with the client certificate. */
    private static PulsarAdmin createAdmin(PulsarService pulsar) throws Exception {
        Map<String, String> authParams = new HashMap<>();
        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
        AuthenticationTls authentication = new AuthenticationTls();
        authentication.configure(authParams);

        return PulsarAdmin.builder()
                .serviceHttpUrl(pulsar.getWebServiceAddressTls())
                .tlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH)
                .allowTlsInsecureConnection(true)
                .authentication(authentication)
                .build();
    }

    /**
     * Polls until the given broker reports itself as leader.
     *
     * @throws IllegalStateException if leadership is not acquired within
     *     {@link #LEADER_ELECTION_TIMEOUT_MS}, so a stuck election fails the test instead of
     *     hanging the suite
     */
    private static void awaitLeadership(PulsarService pulsar) throws Exception {
        long deadlineNanos = System.nanoTime() + LEADER_ELECTION_TIMEOUT_MS * 1_000_000L;
        while (!pulsar.getLeaderElectionService().isLeader()) {
            if (System.nanoTime() - deadlineNanos > 0) {
                throw new IllegalStateException(
                        "Broker did not become leader within " + LEADER_ELECTION_TIMEOUT_MS + " ms");
            }
            Thread.sleep(10L);
        }
    }
}
