package org.apache.pulsar.io;

import com.google.common.collect.Sets;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.PulsarFunctionTestTemporaryDirectory;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.rest.WorkerServer;
import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker-io"})
/* loaded from: input_file:org/apache/pulsar/io/PulsarFunctionTlsTest.class */
public class PulsarFunctionTlsTest {
    LocalBookkeeperEnsemble bkEnsemble;
    ServiceConfiguration config;
    WorkerConfig workerConfig;
    PulsarWorkerService functionsWorkerService;
    String workerId;
    WorkerServer workerServer;
    PulsarAdmin functionAdmin;
    private static final Logger log = LoggerFactory.getLogger(PulsarFunctionTlsTest.class);
    private PulsarFunctionTestTemporaryDirectory tempDirectory;
    final String tenant = "external-repl-prop";
    String pulsarFunctionsNamespace = "external-repl-prop/use/pulsar-function-admin";
    private final List<String> namespaceList = new LinkedList();
    private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
    private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
    private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
    private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";

    @BeforeMethod
    void setup(Method method) throws Exception {
        log.info("--- Setting up method {} ---", method.getName());
        this.bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> {
            return 0;
        });
        this.bkEnsemble.start();
        this.config = (ServiceConfiguration) Mockito.spy(new ServiceConfiguration());
        this.config.setBrokerShutdownTimeoutMs(0L);
        this.config.setClusterName("use");
        this.config.setSuperUserRoles(Sets.newHashSet(new String[]{"superUser", "admin"}));
        this.config.setZookeeperServers("127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
        HashSet hashSet = new HashSet();
        hashSet.add(AuthenticationProviderTls.class.getName());
        this.config.setAuthenticationEnabled(true);
        this.config.setAuthorizationEnabled(true);
        this.config.setAuthenticationProviders(hashSet);
        this.config.setTlsCertificateFilePath("./src/test/resources/authentication/tls/broker-cert.pem");
        this.config.setTlsKeyFilePath("./src/test/resources/authentication/tls/broker-key.pem");
        this.config.setTlsAllowInsecureConnection(true);
        this.config.setAdvertisedAddress("localhost");
        PulsarAdmin pulsarAdmin = (PulsarAdmin) Mockito.mock(PulsarAdmin.class);
        Tenants tenants = (Tenants) Mockito.mock(Tenants.class);
        Mockito.when(pulsarAdmin.tenants()).thenReturn(tenants);
        Mockito.when(tenants.getTenantInfo((String) ArgumentMatchers.any())).thenReturn(new TenantInfo(Sets.newHashSet(new String[]{"superUser", "admin"}), (Set) null));
        Namespaces namespaces = (Namespaces) Mockito.mock(Namespaces.class);
        Mockito.when(pulsarAdmin.namespaces()).thenReturn(namespaces);
        Mockito.when(namespaces.getNamespaces((String) ArgumentMatchers.any())).thenReturn(this.namespaceList);
        this.functionsWorkerService = (PulsarWorkerService) Mockito.spy(createPulsarFunctionWorker(this.config, pulsarAdmin));
        ((PulsarWorkerService) Mockito.doNothing().when(this.functionsWorkerService)).initAsStandalone((WorkerConfig) ArgumentMatchers.any(WorkerConfig.class));
        Mockito.when(this.functionsWorkerService.getBrokerAdmin()).thenReturn(pulsarAdmin);
        this.functionsWorkerService.init(this.workerConfig, (URI) null, false);
        AuthenticationService authenticationService = new AuthenticationService(this.config);
        AuthorizationService authorizationService = new AuthorizationService(this.config, (ConfigurationCacheService) Mockito.mock(ConfigurationCacheService.class));
        Mockito.when(this.functionsWorkerService.getAuthenticationService()).thenReturn(authenticationService);
        Mockito.when(this.functionsWorkerService.getAuthorizationService()).thenReturn(authorizationService);
        Mockito.when(Boolean.valueOf(this.functionsWorkerService.isInitialized())).thenReturn(true);
        FunctionMetaDataManager functionMetaDataManager = (FunctionMetaDataManager) Mockito.mock(FunctionMetaDataManager.class);
        Mockito.when(Boolean.valueOf(functionMetaDataManager.containsFunction((String) ArgumentMatchers.any(), (String) ArgumentMatchers.any(), (String) ArgumentMatchers.any()))).thenReturn(true);
        Mockito.when(this.functionsWorkerService.getFunctionMetaDataManager()).thenReturn(functionMetaDataManager);
        this.workerServer = new WorkerServer(this.functionsWorkerService, authenticationService);
        this.workerServer.start();
        Thread.sleep(2000L);
        String format = String.format("https://%s:%s", this.functionsWorkerService.getWorkerConfig().getWorkerHostname(), this.workerServer.getListenPortHTTPS().get());
        HashMap hashMap = new HashMap();
        hashMap.put("tlsCertFile", "./src/test/resources/authentication/tls/client-cert.pem");
        hashMap.put("tlsKeyFile", "./src/test/resources/authentication/tls/client-key.pem");
        AuthenticationTls authenticationTls = new AuthenticationTls();
        authenticationTls.configure(hashMap);
        this.functionAdmin = PulsarAdmin.builder().serviceHttpUrl(format).tlsTrustCertsFilePath("./src/test/resources/authentication/tls/client-cert.pem").allowTlsInsecureConnection(true).authentication(authenticationTls).build();
        Thread.sleep(100L);
    }

    @AfterMethod(alwaysRun = true)
    void shutdown() throws Exception {
        log.info("--- Shutting down ---");
        try {
            this.functionAdmin.close();
            this.bkEnsemble.stop();
            this.workerServer.stop();
            this.functionsWorkerService.stop();
        } finally {
            if (this.tempDirectory != null) {
                this.tempDirectory.delete();
            }
        }
    }

    private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration serviceConfiguration, final PulsarAdmin pulsarAdmin) {
        this.workerConfig = new WorkerConfig();
        this.tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
        this.tempDirectory.useTemporaryDirectoriesForWorkerConfig(this.workerConfig);
        this.workerConfig.setPulsarFunctionsNamespace(this.pulsarFunctionsNamespace);
        this.workerConfig.setSchedulerClassName(RoundRobinScheduler.class.getName());
        this.workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
        this.workerConfig.setFunctionRuntimeFactoryConfigs((Map) ObjectMapperFactory.getThreadLocal().convertValue(new ThreadRuntimeFactoryConfig().setThreadGroupName("use"), Map.class));
        this.workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + serviceConfiguration.getBrokerServicePort().get());
        this.workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + serviceConfiguration.getWebServicePort().get());
        this.workerConfig.setFailureCheckFreqMs(100L);
        this.workerConfig.setNumFunctionPackageReplicas(1);
        this.workerConfig.setClusterCoordinationTopicName("coordinate");
        this.workerConfig.setFunctionAssignmentTopicName("assignment");
        this.workerConfig.setFunctionMetadataTopicName("metadata");
        this.workerConfig.setInstanceLivenessCheckFreqMs(100L);
        this.workerConfig.setWorkerPort(0);
        this.workerConfig.setPulsarFunctionsCluster(serviceConfiguration.getClusterName());
        String defaultOrConfiguredAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(serviceConfiguration.getAdvertisedAddress());
        this.workerId = "c-" + serviceConfiguration.getClusterName() + "-fw-" + defaultOrConfiguredAddress + "-" + this.workerConfig.getWorkerPort();
        this.workerConfig.setWorkerHostname(defaultOrConfiguredAddress);
        this.workerConfig.setWorkerId(this.workerId);
        this.workerConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
        this.workerConfig.setBrokerClientAuthenticationParameters(String.format("tlsCertFile:%s,tlsKeyFile:%s", "./src/test/resources/authentication/tls/client-cert.pem", "./src/test/resources/authentication/tls/client-key.pem"));
        this.workerConfig.setUseTls(true);
        this.workerConfig.setTlsAllowInsecureConnection(true);
        this.workerConfig.setTlsTrustCertsFilePath("./src/test/resources/authentication/tls/client-cert.pem");
        this.workerConfig.setWorkerPortTls(0);
        this.workerConfig.setTlsEnabled(true);
        this.workerConfig.setTlsCertificateFilePath("./src/test/resources/authentication/tls/broker-cert.pem");
        this.workerConfig.setTlsKeyFilePath("./src/test/resources/authentication/tls/broker-key.pem");
        this.workerConfig.setAuthenticationEnabled(true);
        this.workerConfig.setAuthorizationEnabled(true);
        return new PulsarWorkerService(new PulsarWorkerService.PulsarClientCreator() { // from class: org.apache.pulsar.io.PulsarFunctionTlsTest.1
            public PulsarAdmin newPulsarAdmin(String str, WorkerConfig workerConfig) {
                return pulsarAdmin;
            }

            public PulsarClient newPulsarClient(String str, WorkerConfig workerConfig) {
                return null;
            }
        });
    }

    @Test
    public void testAuthorization() {
        this.namespaceList.add("external-repl-prop/io");
        String format = String.format("%s:%s", "file", PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath());
        try {
            this.functionAdmin.functions().createFunctionWithUrl(createFunctionConfig(format, "external-repl-prop", "io", "PulsarSink-test", "my.*", "persistent://external-repl-prop/io/output", "test-sub"), format);
            Assert.fail("Authentication should pass but call should fail with function already exist");
        } catch (PulsarAdminException e) {
            Assert.assertTrue(e.getMessage().contains("already exists"));
        }
    }

    protected static FunctionConfig createFunctionConfig(String str, String str2, String str3, String str4, String str5, String str6, String str7) {
        File file = new File(str);
        try {
            Object loadJar = ClassLoaderUtils.loadJar(file);
            if (loadJar instanceof Closeable) {
                ((Closeable) loadJar).close();
            }
            String format = String.format("persistent://%s/%s/%s", str2, str3, str5);
            FunctionConfig functionConfig = new FunctionConfig();
            functionConfig.setTenant(str2);
            functionConfig.setNamespace(str3);
            functionConfig.setName(str4);
            functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
            functionConfig.setParallelism(1);
            functionConfig.setClassName(IdentityFunction.class.getName());
            functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
            functionConfig.setSubName(str7);
            functionConfig.setTopicsPattern(format);
            functionConfig.setAutoAck(true);
            functionConfig.setOutput(str6);
            return functionConfig;
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to load user jar " + file, e);
        }
    }
}
