/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime;

import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.lang.reflect.Type;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.shade.com.beust.jcommander.JCommander;
import org.apache.pulsar.shade.com.beust.jcommander.Parameter;
import org.apache.pulsar.shade.com.beust.jcommander.converters.StringConverter;
import org.apache.pulsar.shade.com.google.gson.Gson;
import org.apache.pulsar.shade.com.google.gson.reflect.TypeToken;
import org.apache.pulsar.shade.com.google.protobuf.Empty;
import org.apache.pulsar.shade.com.google.protobuf.Message;
import org.apache.pulsar.shade.com.google.protobuf.util.JsonFormat;
import org.apache.pulsar.shade.io.prometheus.client.exporter.HTTPServer;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.functions.WindowConfig;
import org.apache.pulsar.shade.org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Reflections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JavaInstanceStarter
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(JavaInstanceStarter.class);
    @Parameter(names={"--function_details"}, description="Function details json\n", required=true)
    public String functionDetailsJsonString;
    @Parameter(names={"--jar"}, description="Path to Jar\n", listConverter=StringConverter.class)
    public String jarFile;
    @Parameter(names={"--instance_id"}, description="Instance Id\n", required=true)
    public int instanceId;
    @Parameter(names={"--function_id"}, description="Function Id\n", required=true)
    public String functionId;
    @Parameter(names={"--function_version"}, description="Function Version\n", required=true)
    public String functionVersion;
    @Parameter(names={"--pulsar_serviceurl"}, description="Pulsar Service Url\n", required=true)
    public String pulsarServiceUrl;
    @Parameter(names={"--client_auth_plugin"}, description="Client auth plugin name\n")
    public String clientAuthenticationPlugin;
    @Parameter(names={"--client_auth_params"}, description="Client auth param\n")
    public String clientAuthenticationParameters;
    @Parameter(names={"--use_tls"}, description="Use tls connection\n")
    public String useTls = Boolean.FALSE.toString();
    @Parameter(names={"--tls_allow_insecure"}, description="Allow insecure tls connection\n")
    public String tlsAllowInsecureConnection = Boolean.TRUE.toString();
    @Parameter(names={"--hostname_verification_enabled"}, description="Enable hostname verification")
    public String tlsHostNameVerificationEnabled = Boolean.FALSE.toString();
    @Parameter(names={"--tls_trust_cert_path"}, description="tls trust cert file path")
    public String tlsTrustCertFilePath;
    @Parameter(names={"--state_storage_impl_class"}, description="State Storage Service Implementation class\n", required=false)
    public String stateStorageImplClass;
    @Parameter(names={"--state_storage_serviceurl"}, description="State Storage Service Url\n", required=false)
    public String stateStorageServiceUrl;
    @Parameter(names={"--port"}, description="Port to listen on\n", required=true)
    public int port;
    @Parameter(names={"--metrics_port"}, description="Port metrics will be exposed on\n", required=true)
    public int metrics_port;
    @Parameter(names={"--max_buffered_tuples"}, description="Maximum number of tuples to buffer\n", required=true)
    public int maxBufferedTuples;
    @Parameter(names={"--expected_healthcheck_interval"}, description="Expected interval in seconds between healtchecks", required=true)
    public int expectedHealthCheckInterval;
    @Parameter(names={"--secrets_provider"}, description="The classname of the secrets provider", required=false)
    public String secretsProviderClassName;
    @Parameter(names={"--secrets_provider_config"}, description="The config that needs to be passed to secrets provider", required=false)
    public String secretsProviderConfig;
    @Parameter(names={"--cluster_name"}, description="The name of the cluster this instance is running on", required=true)
    public String clusterName;
    @Parameter(names={"--nar_extraction_directory"}, description="The directory where extraction of nar packages happen", required=false)
    public String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
    @Parameter(names={"--pending_async_requests"}, description="Max pending async requests per instance", required=false)
    public int maxPendingAsyncRequests = 1000;
    @Parameter(names={"--web_serviceurl"}, description="Pulsar Web Service Url", required=false)
    public String webServiceUrl = null;
    @Parameter(names={"--expose_pulsaradmin"}, description="Whether the pulsar admin client exposed to function context, default is disabled.", required=false)
    public Boolean exposePulsarAdminClientEnabled = false;
    private Server server;
    private RuntimeSpawner runtimeSpawner;
    private ThreadRuntimeFactory containerFactory;
    private Long lastHealthCheckTs = null;
    private HTTPServer metricsServer;
    private ScheduledFuture healthCheckTimer;

    public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassLoader rootClassLoader) throws Exception {
        SecretsProvider secretsProvider;
        Thread.currentThread().setContextClassLoader(functionInstanceClassLoader);
        JCommander jcommander = new JCommander(this);
        jcommander.parse(args);
        InstanceConfig instanceConfig = new InstanceConfig();
        instanceConfig.setFunctionId(this.functionId);
        instanceConfig.setFunctionVersion(this.functionVersion);
        instanceConfig.setInstanceId(this.instanceId);
        instanceConfig.setMaxBufferedTuples(this.maxBufferedTuples);
        instanceConfig.setClusterName(this.clusterName);
        instanceConfig.setMaxPendingAsyncRequests(this.maxPendingAsyncRequests);
        instanceConfig.setExposePulsarAdminClientEnabled(this.exposePulsarAdminClientEnabled);
        Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder();
        if (this.functionDetailsJsonString.charAt(0) == '\'') {
            this.functionDetailsJsonString = this.functionDetailsJsonString.substring(1);
        }
        if (this.functionDetailsJsonString.charAt(this.functionDetailsJsonString.length() - 1) == '\'') {
            this.functionDetailsJsonString = this.functionDetailsJsonString.substring(0, this.functionDetailsJsonString.length() - 1);
        }
        JsonFormat.parser().merge(this.functionDetailsJsonString, (Message.Builder)functionDetailsBuilder);
        this.inferringMissingTypeClassName(functionDetailsBuilder, functionInstanceClassLoader);
        Function.FunctionDetails functionDetails = functionDetailsBuilder.build();
        instanceConfig.setFunctionDetails(functionDetails);
        instanceConfig.setPort(this.port);
        instanceConfig.setMetricsPort(this.metrics_port);
        Map secretsProviderConfigMap = null;
        if (!StringUtils.isEmpty(this.secretsProviderConfig)) {
            if (this.secretsProviderConfig.charAt(0) == '\'') {
                this.secretsProviderConfig = this.secretsProviderConfig.substring(1);
            }
            if (this.secretsProviderConfig.charAt(this.secretsProviderConfig.length() - 1) == '\'') {
                this.secretsProviderConfig = this.secretsProviderConfig.substring(0, this.secretsProviderConfig.length() - 1);
            }
            Type type = new TypeToken<Map<String, String>>(){}.getType();
            secretsProviderConfigMap = (Map)new Gson().fromJson(this.secretsProviderConfig, type);
        }
        if (StringUtils.isEmpty(this.secretsProviderClassName)) {
            this.secretsProviderClassName = ClearTextSecretsProvider.class.getName();
        }
        try {
            secretsProvider = (SecretsProvider)Reflections.createInstance(this.secretsProviderClassName, functionInstanceClassLoader);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        secretsProvider.init(secretsProviderConfigMap);
        FunctionCollectorRegistry collectorRegistry = FunctionCollectorRegistry.getDefaultImplementation();
        RuntimeUtils.registerDefaultCollectors(collectorRegistry);
        this.containerFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup", this.pulsarServiceUrl, this.stateStorageImplClass, this.stateStorageServiceUrl, AuthenticationConfig.builder().clientAuthenticationPlugin(this.clientAuthenticationPlugin).clientAuthenticationParameters(this.clientAuthenticationParameters).useTls(JavaInstanceStarter.isTrue(this.useTls)).tlsAllowInsecureConnection(JavaInstanceStarter.isTrue(this.tlsAllowInsecureConnection)).tlsHostnameVerificationEnable(JavaInstanceStarter.isTrue(this.tlsHostNameVerificationEnabled)).tlsTrustCertsFilePath(this.tlsTrustCertFilePath).build(), secretsProvider, collectorRegistry, this.narExtractionDirectory, rootClassLoader, this.exposePulsarAdminClientEnabled, this.webServiceUrl);
        this.runtimeSpawner = new RuntimeSpawner(instanceConfig, this.jarFile, null, this.containerFactory, this.expectedHealthCheckInterval * 1000);
        this.server = ServerBuilder.forPort((int)this.port).addService((BindableService)new InstanceControlImpl(this.runtimeSpawner)).build().start();
        log.info("JavaInstance Server started, listening on " + this.port);
        java.lang.Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run() {
                try {
                    JavaInstanceStarter.this.close();
                }
                catch (Exception ex) {
                    System.err.println(ex);
                }
            }
        });
        log.info("Starting runtimeSpawner");
        this.runtimeSpawner.start();
        log.info("Starting metrics server on port {}", (Object)this.metrics_port);
        this.metricsServer = new HTTPServer(new InetSocketAddress(this.metrics_port), collectorRegistry, true);
        if (this.expectedHealthCheckInterval > 0) {
            this.healthCheckTimer = InstanceCache.getInstanceCache().getScheduledExecutorService().scheduleAtFixedRate(() -> {
                try {
                    if (System.currentTimeMillis() - this.lastHealthCheckTs > (long)(3 * this.expectedHealthCheckInterval * 1000)) {
                        log.info("Haven't received health check from spawner in a while. Stopping instance...");
                        this.close();
                    }
                }
                catch (Exception e) {
                    log.error("Error occurred when checking for latest health check", (Throwable)e);
                }
            }, this.expectedHealthCheckInterval * 1000, this.expectedHealthCheckInterval * 1000, TimeUnit.MILLISECONDS);
        }
        this.runtimeSpawner.join();
        log.info("RuntimeSpawner quit, shutting down JavaInstance");
        this.close();
    }

    private static boolean isTrue(String param) {
        return Boolean.TRUE.toString().equals(param);
    }

    @Override
    public void close() {
        try {
            if (this.server != null) {
                this.server.shutdown();
            }
            if (this.runtimeSpawner != null) {
                this.runtimeSpawner.close();
            }
            if (this.healthCheckTimer != null) {
                this.healthCheckTimer.cancel(false);
            }
            if (this.containerFactory != null) {
                this.containerFactory.close();
            }
            if (this.metricsServer != null) {
                this.metricsServer.stop();
            }
            InstanceCache.shutdown();
        }
        catch (Exception ex) {
            System.err.println(ex);
        }
    }

    private void inferringMissingTypeClassName(Function.FunctionDetails.Builder functionDetailsBuilder, ClassLoader classLoader) throws ClassNotFoundException {
        switch (functionDetailsBuilder.getComponentType()) {
            case FUNCTION: {
                if ((!functionDetailsBuilder.hasSource() || !functionDetailsBuilder.getSource().getTypeClassName().isEmpty()) && (!functionDetailsBuilder.hasSink() || !functionDetailsBuilder.getSink().getTypeClassName().isEmpty())) break;
                Map userConfigs = (Map)new Gson().fromJson(functionDetailsBuilder.getUserConfig(), new TypeToken<Map<String, Object>>(){}.getType());
                boolean isWindowConfigPresent = userConfigs.containsKey("__WINDOWCONFIGS__");
                String className = functionDetailsBuilder.getClassName();
                if (isWindowConfigPresent) {
                    WindowConfig windowConfig = new Gson().fromJson(new Gson().toJson(userConfigs.get("__WINDOWCONFIGS__")), WindowConfig.class);
                    className = windowConfig.getActualWindowFunctionClassName();
                }
                Class<?>[] typeArgs = FunctionCommon.getFunctionTypes(classLoader.loadClass(className), isWindowConfigPresent);
                if (functionDetailsBuilder.hasSource() && functionDetailsBuilder.getSource().getTypeClassName().isEmpty() && typeArgs[0] != null) {
                    Function.SourceSpec.Builder sourceBuilder = functionDetailsBuilder.getSource().toBuilder();
                    sourceBuilder.setTypeClassName(typeArgs[0].getName());
                    functionDetailsBuilder.setSource(sourceBuilder.build());
                }
                if (!functionDetailsBuilder.hasSink() || !functionDetailsBuilder.getSink().getTypeClassName().isEmpty() || typeArgs[1] == null) break;
                Function.SinkSpec.Builder sinkBuilder = functionDetailsBuilder.getSink().toBuilder();
                sinkBuilder.setTypeClassName(typeArgs[1].getName());
                functionDetailsBuilder.setSink(sinkBuilder.build());
                break;
            }
            case SINK: {
                if (!functionDetailsBuilder.hasSink() || !functionDetailsBuilder.getSink().getTypeClassName().isEmpty()) break;
                String typeArg = FunctionCommon.getSinkType(functionDetailsBuilder.getClassName(), classLoader).getName();
                Function.SinkSpec.Builder sinkBuilder = Function.SinkSpec.newBuilder(functionDetailsBuilder.getSink());
                sinkBuilder.setTypeClassName(typeArg);
                functionDetailsBuilder.setSink(sinkBuilder);
                Function.SourceSpec sourceSpec = functionDetailsBuilder.getSource();
                if (null != sourceSpec && !StringUtils.isEmpty(sourceSpec.getTypeClassName())) break;
                Function.SourceSpec.Builder sourceBuilder = Function.SourceSpec.newBuilder(sourceSpec);
                sourceBuilder.setTypeClassName(typeArg);
                functionDetailsBuilder.setSource(sourceBuilder);
                break;
            }
            case SOURCE: {
                if (!functionDetailsBuilder.hasSource() || !functionDetailsBuilder.getSource().getTypeClassName().isEmpty()) break;
                String typeArg = FunctionCommon.getSourceType(functionDetailsBuilder.getClassName(), classLoader).getName();
                Function.SourceSpec.Builder sourceBuilder = Function.SourceSpec.newBuilder(functionDetailsBuilder.getSource());
                sourceBuilder.setTypeClassName(typeArg);
                functionDetailsBuilder.setSource(sourceBuilder);
                Function.SinkSpec sinkSpec = functionDetailsBuilder.getSink();
                if (null != sinkSpec && !StringUtils.isEmpty(sinkSpec.getTypeClassName())) break;
                Function.SinkSpec.Builder sinkBuilder = Function.SinkSpec.newBuilder(sinkSpec);
                sinkBuilder.setTypeClassName(typeArg);
                functionDetailsBuilder.setSink(sinkBuilder);
            }
        }
    }

    class InstanceControlImpl
    extends InstanceControlGrpc.InstanceControlImplBase {
        private RuntimeSpawner runtimeSpawner;

        public InstanceControlImpl(RuntimeSpawner runtimeSpawner) {
            this.runtimeSpawner = runtimeSpawner;
            JavaInstanceStarter.this.lastHealthCheckTs = System.currentTimeMillis();
        }

        @Override
        public void getFunctionStatus(Empty request, StreamObserver<InstanceCommunication.FunctionStatus> responseObserver) {
            try {
                InstanceCommunication.FunctionStatus response = this.runtimeSpawner.getFunctionStatus(this.runtimeSpawner.getInstanceConfig().getInstanceId()).get();
                responseObserver.onNext((Object)response);
                responseObserver.onCompleted();
            }
            catch (Exception e) {
                log.error("Exception in JavaInstance doing getFunctionStatus", (Throwable)e);
                throw new RuntimeException(e);
            }
        }

        @Override
        public void getAndResetMetrics(Empty request, StreamObserver<InstanceCommunication.MetricsData> responseObserver) {
            Runtime runtime = this.runtimeSpawner.getRuntime();
            if (runtime != null) {
                try {
                    InstanceCommunication.MetricsData metrics = runtime.getAndResetMetrics().get();
                    responseObserver.onNext((Object)metrics);
                    responseObserver.onCompleted();
                }
                catch (InterruptedException | ExecutionException e) {
                    log.error("Exception in JavaInstance doing getAndResetMetrics", (Throwable)e);
                    throw new RuntimeException(e);
                }
            }
        }

        @Override
        public void getMetrics(Empty request, StreamObserver<InstanceCommunication.MetricsData> responseObserver) {
            Runtime runtime = this.runtimeSpawner.getRuntime();
            if (runtime != null) {
                try {
                    InstanceCommunication.MetricsData metrics = runtime.getMetrics(JavaInstanceStarter.this.instanceId).get();
                    responseObserver.onNext((Object)metrics);
                    responseObserver.onCompleted();
                }
                catch (InterruptedException | ExecutionException e) {
                    log.error("Exception in JavaInstance doing getAndResetMetrics", (Throwable)e);
                    throw new RuntimeException(e);
                }
            }
        }

        @Override
        public void resetMetrics(Empty request, StreamObserver<Empty> responseObserver) {
            Runtime runtime = this.runtimeSpawner.getRuntime();
            if (runtime != null) {
                try {
                    runtime.resetMetrics().get();
                    responseObserver.onNext((Object)Empty.getDefaultInstance());
                    responseObserver.onCompleted();
                }
                catch (InterruptedException | ExecutionException e) {
                    log.error("Exception in JavaInstance doing resetMetrics", (Throwable)e);
                    throw new RuntimeException(e);
                }
            }
        }

        @Override
        public void healthCheck(Empty request, StreamObserver<InstanceCommunication.HealthCheckResult> responseObserver) {
            log.debug("Received health check request...");
            InstanceCommunication.HealthCheckResult healthCheckResult = InstanceCommunication.HealthCheckResult.newBuilder().setSuccess(true).build();
            responseObserver.onNext((Object)healthCheckResult);
            responseObserver.onCompleted();
            JavaInstanceStarter.this.lastHealthCheckTs = System.currentTimeMillis();
        }
    }
}

