/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.URL;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.management.MalformedObjectNameException;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.go.GoInstanceConfig;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.pulsar.functions.runtime.shaded.com.google.protobuf.util.JsonFormat;
import org.apache.pulsar.functions.runtime.shaded.io.prometheus.client.hotspot.BufferPoolsExports;
import org.apache.pulsar.functions.runtime.shaded.io.prometheus.client.hotspot.ClassLoadingExports;
import org.apache.pulsar.functions.runtime.shaded.io.prometheus.client.hotspot.GarbageCollectorExports;
import org.apache.pulsar.functions.runtime.shaded.io.prometheus.client.hotspot.MemoryPoolsExports;
import org.apache.pulsar.functions.runtime.shaded.io.prometheus.client.hotspot.StandardExports;
import org.apache.pulsar.functions.runtime.shaded.io.prometheus.client.hotspot.ThreadExports;
import org.apache.pulsar.functions.runtime.shaded.io.prometheus.client.hotspot.VersionInfoExports;
import org.apache.pulsar.functions.runtime.shaded.io.prometheus.jmx.JmxCollector;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.JavaVersion;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RuntimeUtils {
    private static final Logger log = LoggerFactory.getLogger(RuntimeUtils.class);
    private static final String FUNCTIONS_EXTRA_DEPS_PROPERTY = "pulsar.functions.extra.dependencies.dir";
    public static final String FUNCTIONS_INSTANCE_CLASSPATH = "pulsar.functions.instance.classpath";

    public static List<String> composeCmd(InstanceConfig instanceConfig, String instanceFile, String extraDependenciesDir, String logDirectory, String originalCodeFileName, String originalTransformFunctionFileName, String pulsarServiceUrl, String stateStorageServiceUrl, AuthenticationConfig authConfig, String shardId, Integer grpcPort, Long expectedHealthCheckInterval, String logConfigFile, String secretsProviderClassName, String secretsProviderConfig, Boolean installUserCodeDependencies, String pythonDependencyRepository, String pythonExtraDependencyRepository, String narExtractionDirectory, String functionInstanceClassPath, String pulsarWebServiceUrl) throws Exception {
        List<String> cmd = RuntimeUtils.getArgsBeforeCmd(instanceConfig, extraDependenciesDir);
        cmd.addAll(RuntimeUtils.getCmd(instanceConfig, instanceFile, extraDependenciesDir, logDirectory, originalCodeFileName, originalTransformFunctionFileName, pulsarServiceUrl, stateStorageServiceUrl, authConfig, shardId, grpcPort, expectedHealthCheckInterval, logConfigFile, secretsProviderClassName, secretsProviderConfig, installUserCodeDependencies, pythonDependencyRepository, pythonExtraDependencyRepository, narExtractionDirectory, functionInstanceClassPath, false, pulsarWebServiceUrl));
        return cmd;
    }

    public static List<String> getArgsBeforeCmd(InstanceConfig instanceConfig, String extraDependenciesDir) {
        LinkedList<String> args = new LinkedList<String>();
        if (instanceConfig.getFunctionDetails().getRuntime() != Function.FunctionDetails.Runtime.JAVA && instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.PYTHON && StringUtils.isNotEmpty(extraDependenciesDir)) {
            args.add("PYTHONPATH=${PYTHONPATH}:" + extraDependenciesDir);
        }
        return args;
    }

    public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig, AuthenticationConfig authConfig, String originalCodeFileName, String pulsarServiceUrl, boolean k8sRuntime) throws IOException {
        LinkedList<String> args = new LinkedList<String>();
        GoInstanceConfig goInstanceConfig = new GoInstanceConfig();
        if (instanceConfig.getClusterName() != null) {
            goInstanceConfig.setClusterName(instanceConfig.getClusterName());
        }
        if (instanceConfig.getInstanceId() != 0) {
            goInstanceConfig.setInstanceID(instanceConfig.getInstanceId());
        }
        if (instanceConfig.getFunctionId() != null) {
            goInstanceConfig.setFuncID(instanceConfig.getFunctionId());
        }
        if (instanceConfig.getFunctionVersion() != null) {
            goInstanceConfig.setFuncVersion(instanceConfig.getFunctionVersion());
        }
        if (instanceConfig.getFunctionDetails().getAutoAck()) {
            goInstanceConfig.setAutoAck(instanceConfig.getFunctionDetails().getAutoAck());
        }
        if (instanceConfig.getFunctionDetails().getTenant() != null) {
            goInstanceConfig.setTenant(instanceConfig.getFunctionDetails().getTenant());
        }
        if (instanceConfig.getFunctionDetails().getNamespace() != null) {
            goInstanceConfig.setNameSpace(instanceConfig.getFunctionDetails().getNamespace());
        }
        if (instanceConfig.getFunctionDetails().getName() != null) {
            goInstanceConfig.setName(instanceConfig.getFunctionDetails().getName());
        }
        if (instanceConfig.getFunctionDetails().getLogTopic() != null) {
            goInstanceConfig.setLogTopic(instanceConfig.getFunctionDetails().getLogTopic());
        }
        if (instanceConfig.getFunctionDetails().getProcessingGuarantees() != null) {
            goInstanceConfig.setProcessingGuarantees(instanceConfig.getFunctionDetails().getProcessingGuaranteesValue());
        }
        if (instanceConfig.getFunctionDetails().getRuntime() != null) {
            goInstanceConfig.setRuntime(instanceConfig.getFunctionDetails().getRuntimeValue());
        }
        if (instanceConfig.getFunctionDetails().getSecretsMap() != null) {
            goInstanceConfig.setSecretsMap(instanceConfig.getFunctionDetails().getSecretsMap());
        }
        if (instanceConfig.getFunctionDetails().getUserConfig() != null) {
            goInstanceConfig.setUserConfig(instanceConfig.getFunctionDetails().getUserConfig());
        }
        if (instanceConfig.getFunctionDetails().getParallelism() != 0) {
            goInstanceConfig.setParallelism(instanceConfig.getFunctionDetails().getParallelism());
        }
        if (authConfig != null) {
            if (StringUtils.isNotBlank(authConfig.getClientAuthenticationPlugin()) && StringUtils.isNotBlank(authConfig.getClientAuthenticationParameters())) {
                goInstanceConfig.setClientAuthenticationPlugin(authConfig.getClientAuthenticationPlugin());
                goInstanceConfig.setClientAuthenticationParameters(authConfig.getClientAuthenticationParameters());
            }
            goInstanceConfig.setTlsAllowInsecureConnection(authConfig.isTlsAllowInsecureConnection());
            goInstanceConfig.setTlsHostnameVerificationEnable(authConfig.isTlsHostnameVerificationEnable());
            if (StringUtils.isNotBlank(authConfig.getTlsTrustCertsFilePath())) {
                goInstanceConfig.setTlsTrustCertsFilePath(authConfig.getTlsTrustCertsFilePath());
            }
        }
        if (instanceConfig.getMaxBufferedTuples() != 0) {
            goInstanceConfig.setMaxBufTuples(instanceConfig.getMaxBufferedTuples());
        }
        if (pulsarServiceUrl != null) {
            goInstanceConfig.setPulsarServiceURL(pulsarServiceUrl);
        }
        if (instanceConfig.getFunctionDetails().getSource().getCleanupSubscription()) {
            goInstanceConfig.setCleanupSubscription(instanceConfig.getFunctionDetails().getSource().getCleanupSubscription());
        }
        if (instanceConfig.getFunctionDetails().getSource().getSubscriptionName() != null) {
            goInstanceConfig.setSubscriptionName(instanceConfig.getFunctionDetails().getSource().getSubscriptionName());
        }
        goInstanceConfig.setSubscriptionPosition(instanceConfig.getFunctionDetails().getSource().getSubscriptionPosition().getNumber());
        if (instanceConfig.getFunctionDetails().getSource().getInputSpecsMap() != null) {
            HashMap<String, String> sourceInputSpecs = new HashMap<String, String>();
            for (Map.Entry<String, Function.ConsumerSpec> entry : instanceConfig.getFunctionDetails().getSource().getInputSpecsMap().entrySet()) {
                String topic = entry.getKey();
                Function.ConsumerSpec spec = entry.getValue();
                sourceInputSpecs.put(topic, JsonFormat.printer().omittingInsignificantWhitespace().print(spec));
                goInstanceConfig.setSourceSpecsTopic(topic);
            }
            goInstanceConfig.setSourceInputSpecs(sourceInputSpecs);
        }
        if (instanceConfig.getFunctionDetails().getSource().getTimeoutMs() != 0L) {
            goInstanceConfig.setTimeoutMs(instanceConfig.getFunctionDetails().getSource().getTimeoutMs());
        }
        if (instanceConfig.getFunctionDetails().getSink().getTopic() != null) {
            goInstanceConfig.setSinkSpecsTopic(instanceConfig.getFunctionDetails().getSink().getTopic());
        }
        if (instanceConfig.getFunctionDetails().getResources().getCpu() != 0.0) {
            goInstanceConfig.setCpu(instanceConfig.getFunctionDetails().getResources().getCpu());
        }
        if (instanceConfig.getFunctionDetails().getResources().getRam() != 0L) {
            goInstanceConfig.setRam(instanceConfig.getFunctionDetails().getResources().getRam());
        }
        if (instanceConfig.getFunctionDetails().getResources().getDisk() != 0L) {
            goInstanceConfig.setDisk(instanceConfig.getFunctionDetails().getResources().getDisk());
        }
        if (instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic() != null) {
            goInstanceConfig.setDeadLetterTopic(instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
        }
        if (instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries() != 0) {
            goInstanceConfig.setMaxMessageRetries(instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
        }
        if (instanceConfig.hasValidMetricsPort()) {
            goInstanceConfig.setMetricsPort(instanceConfig.getMetricsPort());
        }
        goInstanceConfig.setKillAfterIdleMs(0);
        goInstanceConfig.setPort(instanceConfig.getPort());
        ObjectMapper objectMapper = ObjectMapperFactory.getMapper().getObjectMapper();
        String configContent = objectMapper.writeValueAsString(goInstanceConfig);
        args.add(originalCodeFileName);
        args.add("-instance-conf");
        if (k8sRuntime) {
            args.add("'" + configContent + "'");
        } else {
            args.add(configContent);
        }
        return args;
    }

    public static List<String> getCmd(InstanceConfig instanceConfig, String instanceFile, String extraDependenciesDir, String logDirectory, String originalCodeFileName, String originalTransformFunctionFileName, String pulsarServiceUrl, String stateStorageServiceUrl, AuthenticationConfig authConfig, String shardId, Integer grpcPort, Long expectedHealthCheckInterval, String logConfigFile, String secretsProviderClassName, String secretsProviderConfig, Boolean installUserCodeDependencies, String pythonDependencyRepository, String pythonExtraDependencyRepository, String narExtractionDirectory, String functionInstanceClassPath, boolean k8sRuntime, String pulsarWebServiceUrl) throws Exception {
        LinkedList<String> args = new LinkedList<String>();
        if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.GO) {
            return RuntimeUtils.getGoInstanceCmd(instanceConfig, authConfig, originalCodeFileName, pulsarServiceUrl, k8sRuntime);
        }
        if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
            Function.Resources resources;
            args.add("java");
            args.add("-cp");
            Object classpath = instanceFile;
            if (StringUtils.isNotEmpty(extraDependenciesDir)) {
                classpath = (String)classpath + ":" + extraDependenciesDir + "/*";
            }
            args.add((String)classpath);
            if (StringUtils.isNotEmpty(extraDependenciesDir)) {
                args.add(String.format("-D%s=%s", FUNCTIONS_EXTRA_DEPS_PROPERTY, extraDependenciesDir));
            }
            if (StringUtils.isNotEmpty(functionInstanceClassPath)) {
                args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, functionInstanceClassPath));
            } else {
                String systemFunctionInstanceClasspath = System.getProperty(FUNCTIONS_INSTANCE_CLASSPATH);
                if (systemFunctionInstanceClasspath == null) {
                    log.warn("Property {} is not set.  Falling back to using classpath of current JVM", (Object)FUNCTIONS_INSTANCE_CLASSPATH);
                    systemFunctionInstanceClasspath = System.getProperty("java.class.path");
                }
                args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, systemFunctionInstanceClasspath));
            }
            args.add("-Dlog4j.configurationFile=" + logConfigFile);
            args.add("-Dpulsar.function.log.dir=" + RuntimeUtils.genFunctionLogFolder(logDirectory, instanceConfig));
            args.add("-Dpulsar.function.log.file=" + String.format("%s-%s", instanceConfig.getFunctionDetails().getName(), shardId));
            args.add("-Dio.netty.tryReflectionSetAccessible=true");
            args.add("-Dorg.apache.pulsar.shade.io.netty.tryReflectionSetAccessible=true");
            args.add("-Dio.grpc.netty.shaded.io.netty.tryReflectionSetAccessible=true");
            if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_11)) {
                args.add("--add-opens");
                args.add("java.base/java.nio=ALL-UNNAMED");
                args.add("--add-opens");
                args.add("java.base/jdk.internal.misc=ALL-UNNAMED");
            }
            if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
                args.add("--add-opens");
                args.add("java.base/java.util.zip=ALL-UNNAMED");
            }
            if (instanceConfig.getAdditionalJavaRuntimeArguments() != null) {
                args.addAll(instanceConfig.getAdditionalJavaRuntimeArguments());
            }
            if (!StringUtils.isEmpty(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
                Collections.addAll(args, RuntimeUtils.splitRuntimeArgs(instanceConfig.getFunctionDetails().getRuntimeFlags()));
            }
            if (instanceConfig.getFunctionDetails().getResources() != null && (resources = instanceConfig.getFunctionDetails().getResources()).getRam() != 0L) {
                args.add("-Xmx" + String.valueOf(resources.getRam()));
            }
            args.add("org.apache.pulsar.functions.instance.JavaInstanceMain");
            args.add("--jar");
            args.add(originalCodeFileName);
            if (StringUtils.isNotEmpty(originalTransformFunctionFileName)) {
                args.add("--transform_function_jar");
                args.add(originalTransformFunctionFileName);
                args.add("--transform_function_id");
                args.add(instanceConfig.getTransformFunctionId());
            }
        } else if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.PYTHON) {
            args.add("python3");
            if (!StringUtils.isEmpty(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
                Collections.addAll(args, RuntimeUtils.splitRuntimeArgs(instanceConfig.getFunctionDetails().getRuntimeFlags()));
            }
            args.add(instanceFile);
            args.add("--py");
            args.add(originalCodeFileName);
            args.add("--logging_directory");
            args.add(logDirectory);
            args.add("--logging_file");
            args.add(instanceConfig.getFunctionDetails().getName());
            args.add("--logging_config_file");
            args.add(logConfigFile);
            if (installUserCodeDependencies != null && installUserCodeDependencies.booleanValue()) {
                args.add("--install_usercode_dependencies");
                args.add("True");
            }
            if (!StringUtils.isEmpty(pythonDependencyRepository)) {
                args.add("--dependency_repository");
                args.add(pythonDependencyRepository);
            }
            if (!StringUtils.isEmpty(pythonExtraDependencyRepository)) {
                args.add("--extra_dependency_repository");
                args.add(pythonExtraDependencyRepository);
            }
        }
        args.add("--instance_id");
        args.add(shardId);
        args.add("--function_id");
        args.add(instanceConfig.getFunctionId());
        args.add("--function_version");
        args.add(instanceConfig.getFunctionVersion());
        args.add("--function_details");
        args.add("'" + JsonFormat.printer().omittingInsignificantWhitespace().print(instanceConfig.getFunctionDetails()) + "'");
        args.add("--pulsar_serviceurl");
        args.add(pulsarServiceUrl);
        if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA && instanceConfig.isExposePulsarAdminClientEnabled() && StringUtils.isNotBlank(pulsarWebServiceUrl)) {
            args.add("--web_serviceurl");
            args.add(pulsarWebServiceUrl);
            args.add("--expose_pulsaradmin");
        }
        if (authConfig != null) {
            if (StringUtils.isNotBlank(authConfig.getClientAuthenticationPlugin()) && StringUtils.isNotBlank(authConfig.getClientAuthenticationParameters())) {
                args.add("--client_auth_plugin");
                args.add(authConfig.getClientAuthenticationPlugin());
                args.add("--client_auth_params");
                args.add(authConfig.getClientAuthenticationParameters());
            }
            args.add("--use_tls");
            args.add(Boolean.toString(authConfig.isUseTls()));
            args.add("--tls_allow_insecure");
            args.add(Boolean.toString(authConfig.isTlsAllowInsecureConnection()));
            args.add("--hostname_verification_enabled");
            args.add(Boolean.toString(authConfig.isTlsHostnameVerificationEnable()));
            if (StringUtils.isNotBlank(authConfig.getTlsTrustCertsFilePath())) {
                args.add("--tls_trust_cert_path");
                args.add(authConfig.getTlsTrustCertsFilePath());
            }
        }
        args.add("--max_buffered_tuples");
        args.add(String.valueOf(instanceConfig.getMaxBufferedTuples()));
        args.add("--port");
        args.add(String.valueOf(grpcPort));
        args.add("--metrics_port");
        args.add(String.valueOf(instanceConfig.getMetricsPort()));
        if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
            args.add("--pending_async_requests");
            args.add(String.valueOf(instanceConfig.getMaxPendingAsyncRequests()));
        }
        if (null != stateStorageServiceUrl) {
            args.add("--state_storage_serviceurl");
            args.add(stateStorageServiceUrl);
        }
        args.add("--expected_healthcheck_interval");
        args.add(String.valueOf(expectedHealthCheckInterval));
        if (!StringUtils.isEmpty(secretsProviderClassName)) {
            args.add("--secrets_provider");
            args.add(secretsProviderClassName);
            if (!StringUtils.isEmpty(secretsProviderConfig)) {
                args.add("--secrets_provider_config");
                args.add("'" + secretsProviderConfig + "'");
            }
        }
        args.add("--cluster_name");
        args.add(instanceConfig.getClusterName());
        if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA && !StringUtils.isEmpty(narExtractionDirectory)) {
            args.add("--nar_extraction_directory");
            args.add(narExtractionDirectory);
        }
        return args;
    }

    public static String genFunctionLogFolder(String logDirectory, InstanceConfig instanceConfig) {
        return String.format("%s/%s", logDirectory, FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
    }

    public static String getPrometheusMetrics(int metricsPort) throws IOException {
        String line;
        StringBuilder result = new StringBuilder();
        URL url = new URL(String.format("http://%s:%s", InetAddress.getLocalHost().getHostAddress(), metricsPort));
        HttpURLConnection conn = (HttpURLConnection)url.openConnection();
        conn.setRequestMethod("GET");
        BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream()));
        while ((line = rd.readLine()) != null) {
            result.append(line + System.lineSeparator());
        }
        rd.close();
        return result.toString();
    }

    public static String[] splitRuntimeArgs(String input) {
        return input.split("\\s(?=([^\"]*\"[^\"]*\")*[^\"]*$)");
    }

    public static <T> T getRuntimeFunctionConfig(Map<String, Object> configMap, Class<T> functionRuntimeConfigClass) {
        return ObjectMapperFactory.getMapper().getObjectMapper().convertValue(configMap, functionRuntimeConfigClass);
    }

    public static void registerDefaultCollectors(FunctionCollectorRegistry registry) {
        try {
            new JmxCollector("{}").register(registry);
        }
        catch (MalformedObjectNameException ex) {
            System.err.println(ex);
        }
        new StandardExports().register(registry);
        new MemoryPoolsExports().register(registry);
        new BufferPoolsExports().register(registry);
        new GarbageCollectorExports().register(registry);
        new ThreadExports().register(registry);
        new ClassLoadingExports().register(registry);
        new VersionInfoExports().register(registry);
    }
}

