/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io;

import com.google.common.collect.Sets;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.PulsarFunctionTestTemporaryDirectory;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.WorkerServer;
import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker-io"})
public class PulsarFunctionTlsTest {
    // Local bookkeeper ensemble started in setup(); its ZooKeeper port is wired into the broker config.
    LocalBookkeeperEnsemble bkEnsemble;
    // Spied broker configuration built in setup() with TLS authentication and authorization enabled.
    ServiceConfiguration config;
    // Worker configuration populated by createPulsarFunctionWorker().
    WorkerConfig workerConfig;
    // Spied worker service; several of its getters are stubbed in setup().
    PulsarWorkerService functionsWorkerService;
    // Tenant name; it is also the tenant portion of pulsarFunctionsNamespace below.
    final String tenant = "external-repl-prop";
    // Namespace handed to WorkerConfig.setPulsarFunctionsNamespace().
    String pulsarFunctionsNamespace = "external-repl-prop/use/pulsar-function-admin";
    // Worker id assembled in createPulsarFunctionWorker() from cluster name, hostname and worker port.
    String workerId;
    // Worker REST server started in setup() and stopped in shutdown().
    WorkerServer workerServer;
    // Admin client pointed at the worker's HTTPS endpoint, authenticating with the client certificate.
    PulsarAdmin functionAdmin;
    // Backing list returned by the mocked Namespaces.getNamespaces(); tests append namespaces to it.
    private final List<String> namespaceList = new LinkedList<String>();
    // Paths (relative to the working directory) of the TLS material used by the test.
    private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
    private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
    private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
    private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
    private static final Logger log = LoggerFactory.getLogger(PulsarFunctionTlsTest.class);
    // Temporary directory created in createPulsarFunctionWorker() and deleted in shutdown().
    private PulsarFunctionTestTemporaryDirectory tempDirectory;

    /**
     * Builds the fixture for every test method: a local bookkeeper ensemble, a spied broker
     * configuration with TLS authentication enabled, a spied function worker service backed by
     * a mocked {@link PulsarAdmin}, a running {@link WorkerServer}, and an admin client that
     * talks to the worker over HTTPS using the client certificate.
     *
     * @param method the test method about to run; only its name is logged
     * @throws Exception if any component fails to start
     */
    @BeforeMethod
    void setup(Method method) throws Exception {
        log.info("--- Setting up method {} ---", method.getName());

        // Metadata/storage backend.
        this.bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
        this.bkEnsemble.start();

        // Broker configuration: TLS authentication + authorization.
        this.config = Mockito.spy(new ServiceConfiguration());
        this.config.setBrokerShutdownTimeoutMs(0L);
        this.config.setClusterName("use");
        Set<String> superUsers = Sets.newHashSet("superUser", "admin");
        this.config.setSuperUserRoles(superUsers);
        this.config.setZookeeperServers("127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
        Set<String> providers = new HashSet<>();
        providers.add(AuthenticationProviderTls.class.getName());
        this.config.setAuthenticationEnabled(true);
        this.config.setAuthorizationEnabled(true);
        this.config.setAuthenticationProviders(providers);
        this.config.setTlsCertificateFilePath(this.TLS_SERVER_CERT_FILE_PATH);
        this.config.setTlsKeyFilePath(this.TLS_SERVER_KEY_FILE_PATH);
        this.config.setTlsAllowInsecureConnection(true);
        this.config.setAdvertisedAddress("localhost");

        // Mocked broker admin: every tenant lookup returns the same tenant info, and namespace
        // lookups return the mutable namespaceList owned by this test.
        PulsarAdmin brokerAdmin = Mockito.mock(PulsarAdmin.class);
        Tenants tenantsApi = Mockito.mock(Tenants.class);
        Mockito.when(brokerAdmin.tenants()).thenReturn(tenantsApi);
        Set<String> adminRoles = Sets.newHashSet("superUser", "admin");
        TenantInfoImpl tenantInfo = new TenantInfoImpl(adminRoles, null);
        Mockito.when(tenantsApi.getTenantInfo(ArgumentMatchers.<String>any())).thenReturn(tenantInfo);
        Namespaces namespacesApi = Mockito.mock(Namespaces.class);
        Mockito.when(brokerAdmin.namespaces()).thenReturn(namespacesApi);
        Mockito.when(namespacesApi.getNamespaces(ArgumentMatchers.<String>any())).thenReturn(this.namespaceList);

        // Spied worker service; standalone initialisation is stubbed out before init() is called.
        this.functionsWorkerService = Mockito.spy(this.createPulsarFunctionWorker(this.config, brokerAdmin));
        Mockito.doNothing().when(this.functionsWorkerService)
                .initAsStandalone(ArgumentMatchers.any(WorkerConfig.class));
        Mockito.when(this.functionsWorkerService.getBrokerAdmin()).thenReturn(brokerAdmin);
        this.functionsWorkerService.init(this.workerConfig, null, false);

        AuthenticationService authenticationService = new AuthenticationService(this.config);
        AuthorizationService authorizationService =
                new AuthorizationService(this.config, Mockito.mock(ConfigurationCacheService.class));
        Mockito.when(this.functionsWorkerService.getAuthenticationService()).thenReturn(authenticationService);
        Mockito.when(this.functionsWorkerService.getAuthorizationService()).thenReturn(authorizationService);
        Mockito.when(this.functionsWorkerService.isInitialized()).thenReturn(true);

        // The metadata manager reports every function as already existing.
        FunctionMetaDataManager metaDataManager = Mockito.mock(FunctionMetaDataManager.class);
        Mockito.when(metaDataManager.containsFunction(
                ArgumentMatchers.<String>any(),
                ArgumentMatchers.<String>any(),
                ArgumentMatchers.<String>any())).thenReturn(true);
        Mockito.when(this.functionsWorkerService.getFunctionMetaDataManager()).thenReturn(metaDataManager);

        // Start the worker REST server, then pause before reading its HTTPS port.
        this.workerServer = new WorkerServer(this.functionsWorkerService, authenticationService);
        this.workerServer.start();
        Thread.sleep(2000L);

        String functionTlsUrl = String.format(
                "https://%s:%s",
                this.functionsWorkerService.getWorkerConfig().getWorkerHostname(),
                this.workerServer.getListenPortHTTPS().get());

        // Admin client authenticating with the client certificate/key pair.
        Map<String, String> authParams = new HashMap<>();
        authParams.put("tlsCertFile", this.TLS_CLIENT_CERT_FILE_PATH);
        authParams.put("tlsKeyFile", this.TLS_CLIENT_KEY_FILE_PATH);
        AuthenticationTls authTls = new AuthenticationTls();
        authTls.configure(authParams);
        this.functionAdmin = PulsarAdmin.builder()
                .serviceHttpUrl(functionTlsUrl)
                .tlsTrustCertsFilePath(this.TLS_CLIENT_CERT_FILE_PATH)
                .allowTlsInsecureConnection(true)
                .authentication(authTls)
                .build();
        Thread.sleep(100L);
    }

    /**
     * Tears down everything {@code setup} created.
     *
     * <p>Because this method runs with {@code alwaysRun=true}, it may execute after a
     * {@code setup} that failed partway, leaving some fields {@code null}. Each resource is
     * therefore null-checked and stopped independently, so one failing (or missing) component
     * does not prevent the remaining ones from being released. The temporary directory is
     * always deleted. The first failure is rethrown once every stop has been attempted, with
     * later failures attached as suppressed exceptions.
     *
     * @throws Exception the first exception raised while stopping a component, if any
     */
    @AfterMethod(alwaysRun=true)
    void shutdown() throws Exception {
        log.info("--- Shutting down ---");
        Exception failure = null;
        try {
            try {
                if (this.functionAdmin != null) {
                    this.functionAdmin.close();
                }
            } catch (Exception e) {
                failure = PulsarFunctionTlsTest.chainFailure(failure, e);
            }
            try {
                if (this.bkEnsemble != null) {
                    this.bkEnsemble.stop();
                }
            } catch (Exception e) {
                failure = PulsarFunctionTlsTest.chainFailure(failure, e);
            }
            try {
                if (this.workerServer != null) {
                    this.workerServer.stop();
                }
            } catch (Exception e) {
                failure = PulsarFunctionTlsTest.chainFailure(failure, e);
            }
            try {
                if (this.functionsWorkerService != null) {
                    this.functionsWorkerService.stop();
                }
            } catch (Exception e) {
                failure = PulsarFunctionTlsTest.chainFailure(failure, e);
            }
        }
        finally {
            if (this.tempDirectory != null) {
                this.tempDirectory.delete();
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Keeps the first failure as primary and records any later one as suppressed. */
    private static Exception chainFailure(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }

    /**
     * Creates the worker service under test and, as a side effect, initialises
     * {@code workerConfig}, {@code tempDirectory} and {@code workerId}.
     *
     * @param config broker configuration supplying ports, cluster name and advertised address
     * @param mockPulsarAdmin admin instance handed out by the worker's client creator
     * @return a new, not yet initialised worker service
     */
    private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration config, final PulsarAdmin mockPulsarAdmin) {
        this.workerConfig = new WorkerConfig();
        this.tempDirectory = PulsarFunctionTestTemporaryDirectory.create(this.getClass().getSimpleName());
        this.tempDirectory.useTemporaryDirectoriesForWorkerConfig(this.workerConfig);

        // Scheduling and runtime.
        this.workerConfig.setPulsarFunctionsNamespace(this.pulsarFunctionsNamespace);
        this.workerConfig.setSchedulerClassName(RoundRobinScheduler.class.getName());
        this.workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
        this.workerConfig.setFunctionRuntimeFactoryConfigs(
                ObjectMapperFactory.getThreadLocal().convertValue(
                        new ThreadRuntimeFactoryConfig().setThreadGroupName("use"), Map.class));

        // Broker endpoints, internal topics and check intervals.
        this.workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort().get());
        this.workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePort().get());
        this.workerConfig.setFailureCheckFreqMs(100L);
        this.workerConfig.setNumFunctionPackageReplicas(1);
        this.workerConfig.setClusterCoordinationTopicName("coordinate");
        this.workerConfig.setFunctionAssignmentTopicName("assignment");
        this.workerConfig.setFunctionMetadataTopicName("metadata");
        this.workerConfig.setInstanceLivenessCheckFreqMs(100L);
        this.workerConfig.setWorkerPort(0);
        this.workerConfig.setPulsarFunctionsCluster(config.getClusterName());

        // Worker identity.
        String workerHost = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
        this.workerId = "c-" + config.getClusterName() + "-fw-" + workerHost + "-" + this.workerConfig.getWorkerPort();
        this.workerConfig.setWorkerHostname(workerHost);
        this.workerConfig.setWorkerId(this.workerId);

        // TLS client credentials used towards the broker.
        this.workerConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
        this.workerConfig.setBrokerClientAuthenticationParameters(
                String.format("tlsCertFile:%s,tlsKeyFile:%s",
                        this.TLS_CLIENT_CERT_FILE_PATH, this.TLS_CLIENT_KEY_FILE_PATH));
        this.workerConfig.setUseTls(true);
        this.workerConfig.setTlsAllowInsecureConnection(true);
        this.workerConfig.setTlsTrustCertsFilePath(this.TLS_CLIENT_CERT_FILE_PATH);

        // TLS server side of the worker, plus authentication/authorization.
        this.workerConfig.setWorkerPortTls(0);
        this.workerConfig.setTlsEnabled(true);
        this.workerConfig.setTlsCertificateFilePath(this.TLS_SERVER_CERT_FILE_PATH);
        this.workerConfig.setTlsKeyFilePath(this.TLS_SERVER_KEY_FILE_PATH);
        this.workerConfig.setAuthenticationEnabled(true);
        this.workerConfig.setAuthorizationEnabled(true);

        // The creator always hands back the supplied admin and provides no Pulsar client.
        return new PulsarWorkerService(new PulsarWorkerService.PulsarClientCreator() {

            @Override
            public PulsarAdmin newPulsarAdmin(String pulsarServiceUrl, WorkerConfig workerConfig) {
                return mockPulsarAdmin;
            }

            @Override
            public PulsarClient newPulsarClient(String pulsarServiceUrl, WorkerConfig workerConfig) {
                return null;
            }
        });
    }

    /**
     * Creates a function through the TLS-authenticated admin client and expects the call to be
     * rejected with an "already exists" error. Reaching that error means the request got past
     * authentication, since the mocked metadata manager reports every function as existing.
     */
    @Test
    public void testAuthorization() {
        final String namespacePortion = "io";
        final String replNamespace = this.tenant + "/" + namespacePortion;
        final String sinkTopic = "persistent://" + replNamespace + "/output";
        final String functionName = "PulsarSink-test";
        final String subscriptionName = "test-sub";

        // Make the namespace visible through the mocked Namespaces API.
        this.namespaceList.add(replNamespace);

        String jarLocation = PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
        String jarFilePathUrl = String.format("%s:%s", "file", jarLocation);
        FunctionConfig functionConfig = PulsarFunctionTlsTest.createFunctionConfig(
                jarFilePathUrl, this.tenant, namespacePortion, functionName, "my.*", sinkTopic, subscriptionName);

        try {
            this.functionAdmin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
            Assert.fail("Authentication should pass but call should fail with function already exist");
        }
        catch (PulsarAdminException e) {
            Assert.assertTrue(e.getMessage().contains("already exists"));
        }
    }

    /**
     * Builds a Java-runtime {@link FunctionConfig} for an {@link IdentityFunction} that consumes
     * a topic pattern and writes to {@code sinkTopic}.
     *
     * <p>Before building the config, a class loader is created for {@code jarFile} and closed
     * again straight away if it is {@link Closeable}.
     *
     * @param jarFile path handed to {@link File} for the class-loader check
     * @param tenant tenant of the function; also used in the source topic pattern
     * @param namespace namespace of the function; also used in the source topic pattern
     * @param functionName name of the function
     * @param sourceTopic topic (pattern) portion appended to {@code persistent://tenant/namespace/}
     * @param sinkTopic output topic
     * @param subscriptionName subscription name used by the function
     * @return the populated function configuration
     * @throws UncheckedIOException if loading or closing the class loader raises an {@link IOException}
     */
    protected static FunctionConfig createFunctionConfig(String jarFile, String tenant, String namespace, String functionName, String sourceTopic, String sinkTopic, String subscriptionName) {
        final File userJar = new File(jarFile);
        try {
            ClassLoader jarLoader = ClassLoaderUtils.loadJar(userJar);
            if (jarLoader instanceof Closeable) {
                Closeable closeableLoader = (Closeable) jarLoader;
                closeableLoader.close();
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException("Failed to load user jar " + userJar, e);
        }

        FunctionConfig result = new FunctionConfig();

        // Identity of the function.
        result.setTenant(tenant);
        result.setNamespace(namespace);
        result.setName(functionName);

        // Runtime characteristics.
        result.setRuntime(FunctionConfig.Runtime.JAVA);
        result.setParallelism(1);
        result.setClassName(IdentityFunction.class.getName());
        result.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);

        // Input subscription and output topic.
        result.setSubName(subscriptionName);
        result.setTopicsPattern(String.format("persistent://%s/%s/%s", tenant, namespace, sourceTopic));
        result.setAutoAck(true);
        result.setOutput(sinkTopic);
        return result;
    }
}

