/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.worker.FileServer;
import org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest;
import org.apache.pulsar.functions.worker.PulsarFunctionTestTemporaryDirectory;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Base class for Pulsar IO / Functions end-to-end tests.
 *
 * <p>Before every test method {@link #setup(Method)} starts a {@link LocalBookkeeperEnsemble}, a
 * {@link PulsarService} configured for TLS authentication with an embedded {@link PulsarWorkerService},
 * an admin handle, a client, and a {@link FileServer} serving the test NAR/JAR packages. After every
 * test method {@link #shutdown()} releases all of them again.
 */
public abstract class AbstractPulsarE2ETest {
    public static final Logger log = LoggerFactory.getLogger(AbstractPulsarE2ETest.class);

    /** Cluster name used for the broker and for the allowed clusters of the test tenant. */
    private static final String CLUSTER_NAME = "use";
    /** Role that is both a broker super user and an admin role of the test tenant. */
    private static final String SUPER_USER_ROLE = "superUser";
    /** TestNG group whose tests need the connector NARs copied into the worker's connectors directory. */
    private static final String BUILTIN_GROUP = "builtin";
    /** System property pointing the functions runtime at the Java instance jar. */
    private static final String JAVA_INSTANCE_JAR_PROPERTY = "pulsar.functions.java.instance.jar";

    protected final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
    protected final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
    protected final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
    protected final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
    protected final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
    protected final String tenant = "external-repl-prop";
    protected LocalBookkeeperEnsemble bkEnsemble;
    protected ServiceConfiguration config;
    protected WorkerConfig workerConfig;
    protected PulsarService pulsar;
    protected PulsarAdmin admin;
    protected PulsarClient pulsarClient;
    protected BrokerStats brokerStatsClient;
    protected PulsarWorkerService functionsWorkerService;
    protected String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin";
    protected String primaryHost;
    protected String workerId;
    protected PulsarFunctionTestTemporaryDirectory tempDirectory;
    protected FileServer fileServer;

    /** A single shutdown action that is allowed to fail with a checked exception. */
    @FunctionalInterface
    private interface ShutdownStep {
        void run() throws Exception;
    }

    /**
     * Supplies a single boolean argument, once {@code true} and once {@code false}.
     *
     * @return two rows, each holding one {@link Boolean}
     */
    @DataProvider(name = "validRoleName")
    public Object[][] validRoleName() {
        return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}};
    }

    /**
     * Brings up the complete test environment for one test method.
     *
     * <p>Order matters: the BookKeeper ensemble must be running before the broker configuration is built
     * (it supplies the ZooKeeper port), and the worker service must exist before the broker is constructed.
     *
     * @param method the test method about to run; if it is annotated with {@link Test} and belongs to the
     *               {@code "builtin"} group, the connector NARs are copied into the connectors directory
     * @throws Exception if any component fails to start or a required test artifact is missing
     */
    @BeforeMethod(alwaysRun = true)
    public void setup(Method method) throws Exception {
        log.info("--- Setting up method {} ---", method.getName());

        // NOTE(review): arguments presumably mean 3 bookies and OS-assigned ports - confirm against
        // LocalBookkeeperEnsemble.
        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
        bkEnsemble.start();

        config = createBrokerConfig();
        functionsWorkerService = createPulsarFunctionWorker(config);

        if (isInGroup(method, BUILTIN_GROUP)) {
            File connectorsDir = new File(workerConfig.getConnectorsDirectory());
            copyIntoDirectory(PulsarFunctionLocalRunTest.getPulsarIODataGeneratorNar(), connectorsDir);
            copyIntoDirectory(PulsarFunctionLocalRunTest.getPulsarIOBatchDataGeneratorNar(), connectorsDir);
        }

        Optional<PulsarWorkerService> functionWorkerService = Optional.of(functionsWorkerService);
        pulsar = new PulsarService(config, workerConfig, functionWorkerService, exitCode -> {});
        pulsar.start();

        Map<String, String> authParams = new HashMap<>();
        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
        AuthenticationTls authTls = new AuthenticationTls();
        authTls.configure(authParams);

        admin = Mockito.spy(PulsarAdmin.builder()
                .serviceHttpUrl(pulsar.getWebServiceAddressTls())
                .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
                .allowTlsInsecureConnection(true)
                .authentication(authTls)
                .build());
        brokerStatsClient = admin.brokerStats();
        primaryHost = String.format("http://%s:%d", "localhost", pulsar.getListenPortHTTP().get());

        // Register the freshly started broker as the cluster's TLS service URL.
        ClusterData clusterData = new ClusterData(pulsar.getBrokerServiceUrlTls());
        admin.clusters().updateCluster(config.getClusterName(), clusterData);

        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(workerConfig.getPulsarServiceUrl());
        if (StringUtils.isNotBlank(workerConfig.getBrokerClientAuthenticationPlugin())
                && StringUtils.isNotBlank(workerConfig.getBrokerClientAuthenticationParameters())) {
            clientBuilder.enableTls(workerConfig.isUseTls());
            clientBuilder.allowTlsInsecureConnection(workerConfig.isTlsAllowInsecureConnection());
            clientBuilder.authentication(workerConfig.getBrokerClientAuthenticationPlugin(),
                    workerConfig.getBrokerClientAuthenticationParameters());
        }
        // Defensive: release a client left over from a previous setup that was never shut down.
        if (pulsarClient != null) {
            pulsarClient.close();
        }
        pulsarClient = clientBuilder.build();

        TenantInfo propAdmin = new TenantInfo();
        propAdmin.getAdminRoles().add(SUPER_USER_ROLE);
        propAdmin.setAllowedClusters(Sets.newHashSet(CLUSTER_NAME));
        admin.tenants().updateTenant(tenant, propAdmin);

        File dataGeneratorNar = PulsarFunctionLocalRunTest.getPulsarIODataGeneratorNar();
        File batchDataGeneratorNar = PulsarFunctionLocalRunTest.getPulsarIOBatchDataGeneratorNar();
        File apiExamplesJar = PulsarFunctionLocalRunTest.getPulsarApiExamplesJar();
        Assert.assertTrue(dataGeneratorNar.exists(),
                "pulsar-io-data-generator.nar file " + dataGeneratorNar.getAbsolutePath() + " doesn't exist.");
        Assert.assertTrue(batchDataGeneratorNar.exists(),
                "pulsar-io-batch-data-generator.nar file " + batchDataGeneratorNar.getAbsolutePath()
                        + " doesn't exist.");
        Assert.assertTrue(apiExamplesJar.exists(),
                "pulsar-functions-api-examples.jar file " + apiExamplesJar.getAbsolutePath() + " doesn't exist.");

        fileServer = new FileServer();
        fileServer.serveFile("/pulsar-io-data-generator.nar", dataGeneratorNar);
        fileServer.serveFile("/pulsar-io-batch-data-generator.nar", batchDataGeneratorNar);
        fileServer.serveFile("/pulsar-functions-api-examples.jar", apiExamplesJar);
        fileServer.start();

        // Block until the embedded worker reports itself as leader.
        Awaitility.await().until(() -> functionsWorkerService.getLeaderService().isLeader());
    }

    /**
     * Releases everything {@link #setup(Method)} created.
     *
     * <p>Every component is shut down even if an earlier one fails, so a single failing {@code close()}
     * does not leak the broker or BookKeeper ensemble into the next test method. The first failure is
     * rethrown once all steps have run; later failures are attached to it as suppressed exceptions.
     * Released fields are reset to {@code null}. The temporary directory is always deleted last.
     *
     * @throws Exception the first exception raised by any shutdown step
     */
    @AfterMethod(alwaysRun = true)
    void shutdown() throws Exception {
        log.info("--- Shutting down ---");
        try {
            Exception failure = null;
            if (fileServer != null) {
                failure = runShutdownStep("file server", fileServer::stop, failure);
                fileServer = null;
            }
            if (pulsarClient != null) {
                failure = runShutdownStep("pulsar client", pulsarClient::close, failure);
                pulsarClient = null;
            }
            if (admin != null) {
                failure = runShutdownStep("pulsar admin", admin::close, failure);
                admin = null;
            }
            if (functionsWorkerService != null) {
                failure = runShutdownStep("functions worker service", functionsWorkerService::stop, failure);
                functionsWorkerService = null;
            }
            if (pulsar != null) {
                failure = runShutdownStep("pulsar service", pulsar::close, failure);
                pulsar = null;
            }
            if (bkEnsemble != null) {
                failure = runShutdownStep("bookkeeper ensemble", bkEnsemble::stop, failure);
                bkEnsemble = null;
            }
            if (failure != null) {
                throw failure;
            }
        } finally {
            if (tempDirectory != null) {
                tempDirectory.delete();
            }
        }
    }

    /**
     * Runs one shutdown step, logging and recording its failure instead of propagating it.
     *
     * @param component    human-readable name of the component, used in the log message
     * @param step         the action to run
     * @param firstFailure the first failure recorded so far, or {@code null} if none
     * @return the first failure recorded so far (possibly the one raised by this step), or {@code null}
     */
    private static Exception runShutdownStep(String component, ShutdownStep step, Exception firstFailure) {
        try {
            step.run();
        } catch (Exception e) {
            log.warn("Failed to shut down {}", component, e);
            if (firstFailure == null) {
                return e;
            }
            firstFailure.addSuppressed(e);
        }
        return firstFailure;
    }

    /**
     * Tells whether {@code method} carries a {@link Test} annotation listing {@code group}.
     *
     * @param method the test method to inspect
     * @param group  the TestNG group name to look for
     * @return {@code true} only if the method itself is annotated and the group is listed; {@code false}
     *         when the annotation is absent from the method (for example when it is declared on the class)
     */
    private static boolean isInGroup(Method method, String group) {
        Test test = method.getAnnotation(Test.class);
        return test != null && Arrays.asList(test.groups()).contains(group);
    }

    /**
     * Copies {@code source} into {@code targetDir}, keeping its file name.
     *
     * @throws IOException if the copy fails, e.g. because the source is missing or the target exists
     */
    private static void copyIntoDirectory(File source, File targetDir) throws IOException {
        Files.copy(source.toPath(), new File(targetDir, source.getName()).toPath());
    }

    /** Builds the {@code tlsCertFile:...,tlsKeyFile:...} parameter string for the TLS client credentials. */
    private String tlsClientAuthParams() {
        return String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH);
    }

    /**
     * Builds the broker configuration as a Mockito spy: TLS authentication and authorization enabled,
     * ZooKeeper pointed at the running BookKeeper ensemble, and all service ports set to 0.
     *
     * <p>NOTE(review): port 0 presumably lets the OS pick free ports; the actual HTTP port is read back
     * from the started broker in {@link #setup(Method)}.
     */
    private ServiceConfiguration createBrokerConfig() {
        ServiceConfiguration brokerConfig = Mockito.spy(new ServiceConfiguration());
        brokerConfig.setClusterName(CLUSTER_NAME);
        brokerConfig.setSuperUserRoles(Sets.newHashSet(SUPER_USER_ROLE, "admin"));
        brokerConfig.setWebServicePort(Optional.of(0));
        brokerConfig.setWebServicePortTls(Optional.of(0));
        brokerConfig.setZookeeperServers("127.0.0.1:" + bkEnsemble.getZookeeperPort());
        brokerConfig.setBrokerShutdownTimeoutMs(0L);
        brokerConfig.setBrokerServicePort(Optional.of(0));
        brokerConfig.setBrokerServicePortTls(Optional.of(0));
        brokerConfig.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
        brokerConfig.setTlsAllowInsecureConnection(true);
        brokerConfig.setAdvertisedAddress("localhost");

        Set<String> providers = new HashSet<>();
        providers.add(AuthenticationProviderTls.class.getName());
        brokerConfig.setAuthenticationEnabled(true);
        brokerConfig.setAuthenticationProviders(providers);
        brokerConfig.setAuthorizationEnabled(true);
        brokerConfig.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());

        brokerConfig.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
        brokerConfig.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
        brokerConfig.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);

        brokerConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
        brokerConfig.setBrokerClientAuthenticationParameters(tlsClientAuthParams());
        brokerConfig.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
        brokerConfig.setBrokerClientTlsEnabled(true);
        brokerConfig.setAllowAutoTopicCreationType("non-partitioned");
        return brokerConfig;
    }

    /**
     * Creates and initializes the functions worker service, populating {@link #workerConfig},
     * {@link #tempDirectory} and {@link #workerId} as a side effect.
     *
     * @param config broker configuration supplying the cluster name and advertised address
     * @return the initialized (not yet started) worker service
     * @throws IOException declared by the original; presumably raised when the temporary directory cannot
     *                     be created - confirm against PulsarFunctionTestTemporaryDirectory
     */
    private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration config) throws IOException {
        // Use the jar that contains FutureUtil as the Java instance jar.
        System.setProperty(JAVA_INSTANCE_JAR_PROPERTY,
                FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath());

        workerConfig = new WorkerConfig();
        tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
        tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
        workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
        workerConfig.setSchedulerClassName(RoundRobinScheduler.class.getName());
        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
        workerConfig.setFunctionRuntimeFactoryConfigs(ObjectMapperFactory.getThreadLocal()
                .convertValue(new ThreadRuntimeFactoryConfig().setThreadGroupName("use"), Map.class));
        workerConfig.setFailureCheckFreqMs(100L);
        workerConfig.setNumFunctionPackageReplicas(1);
        workerConfig.setClusterCoordinationTopicName("coordinate");
        workerConfig.setFunctionAssignmentTopicName("assignment");
        workerConfig.setFunctionMetadataTopicName("metadata");
        workerConfig.setInstanceLivenessCheckFreqMs(100L);
        workerConfig.setWorkerPort(0);
        workerConfig.setPulsarFunctionsCluster(config.getClusterName());

        String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
        workerId = "c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort();
        workerConfig.setWorkerHostname(hostname);
        workerConfig.setWorkerId(workerId);

        workerConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
        workerConfig.setBrokerClientAuthenticationParameters(tlsClientAuthParams());
        workerConfig.setUseTls(true);
        workerConfig.setTlsAllowInsecureConnection(true);
        workerConfig.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
        workerConfig.setAuthenticationEnabled(true);
        workerConfig.setAuthorizationEnabled(true);

        PulsarWorkerService workerService = new PulsarWorkerService();
        workerService.init(workerConfig, null, false);
        return workerService;
    }
}

