/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker.rest.api;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Base64;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.utils.ComponentTypeUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.utils.functions.Functions;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.rest.RestUtils;
import org.apache.pulsar.functions.worker.service.api.Component;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.base.Utf8;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.buffer.ByteBufUtil;
import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
import org.apache.pulsar.shade.javax.ws.rs.WebApplicationException;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.apache.pulsar.shade.javax.ws.rs.core.StreamingOutput;
import org.apache.pulsar.shade.javax.ws.rs.core.UriBuilder;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.StorageClient;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.Table;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.result.KeyValue;
import org.apache.pulsar.shade.org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.pulsar.shade.org.apache.bookkeeper.clients.admin.StorageAdminClient;
import org.apache.pulsar.shade.org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.pulsar.shade.org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
import org.apache.pulsar.shade.org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.shade.org.apache.commons.io.IOUtils;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.shade.org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.shade.org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.shade.org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.FunctionStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.shade.org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ComponentImpl
implements Component<PulsarWorkerService> {
    private static final Logger log = LoggerFactory.getLogger(ComponentImpl.class);
    private final AtomicReference<StorageClient> storageClient = new AtomicReference();
    protected final Supplier<PulsarWorkerService> workerServiceSupplier;
    protected final Function.FunctionDetails.ComponentType componentType;

    public ComponentImpl(Supplier<PulsarWorkerService> workerServiceSupplier, Function.FunctionDetails.ComponentType componentType) {
        this.workerServiceSupplier = workerServiceSupplier;
        this.componentType = componentType;
    }

    @Override
    public PulsarWorkerService worker() {
        try {
            return Preconditions.checkNotNull(this.workerServiceSupplier.get());
        }
        catch (Throwable t) {
            log.info("Failed to get worker service", t);
            throw t;
        }
    }

    boolean isWorkerServiceAvailable() {
        WorkerService workerService = this.workerServiceSupplier.get();
        if (workerService == null) {
            return false;
        }
        return workerService.isInitialized();
    }

    Function.PackageLocationMetaData.Builder getFunctionPackageLocation(Function.FunctionMetaData functionMetaData, String functionPkgUrl, FormDataContentDisposition fileDetail, File uploadedInputStreamAsFile) throws Exception {
        Function.FunctionDetails functionDetails = functionMetaData.getFunctionDetails();
        String tenant = functionDetails.getTenant();
        String namespace = functionDetails.getNamespace();
        String componentName = functionDetails.getName();
        Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder = Function.PackageLocationMetaData.newBuilder();
        boolean isBuiltin = FunctionCommon.isFunctionCodeBuiltin(functionDetails);
        boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl);
        if (this.worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
            if (isBuiltin) {
                if (this.worker().getWorkerConfig().getUploadBuiltinSinksSources().booleanValue()) {
                    File sinkOrSource;
                    if (this.componentType == Function.FunctionDetails.ComponentType.SOURCE) {
                        String archiveName = functionDetails.getSource().getBuiltin();
                        sinkOrSource = this.worker().getConnectorsManager().getSourceArchive(archiveName).toFile();
                    } else {
                        String archiveName = functionDetails.getSink().getBuiltin();
                        sinkOrSource = this.worker().getConnectorsManager().getSinkArchive(archiveName).toFile();
                    }
                    packageLocationMetaDataBuilder.setPackagePath(ComponentImpl.createPackagePath(tenant, namespace, componentName, sinkOrSource.getName()));
                    packageLocationMetaDataBuilder.setOriginalFileName(sinkOrSource.getName());
                    log.info("Uploading {} package to {}", (Object)ComponentTypeUtils.toString(this.componentType), (Object)packageLocationMetaDataBuilder.getPackagePath());
                    WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), sinkOrSource, this.worker().getDlogNamespace());
                } else {
                    log.info("Skipping upload for the built-in package {}", (Object)ComponentTypeUtils.toString(this.componentType));
                    packageLocationMetaDataBuilder.setPackagePath("builtin://" + this.getFunctionCodeBuiltin(functionDetails));
                }
            } else if (isPkgUrlProvided) {
                packageLocationMetaDataBuilder.setPackagePath(ComponentImpl.createPackagePath(tenant, namespace, componentName, uploadedInputStreamAsFile.getName()));
                packageLocationMetaDataBuilder.setOriginalFileName(uploadedInputStreamAsFile.getName());
                log.info("Uploading {} package to {}", (Object)ComponentTypeUtils.toString(this.componentType), (Object)packageLocationMetaDataBuilder.getPackagePath());
                WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, this.worker().getDlogNamespace());
            } else if (functionMetaData.getPackageLocation().getPackagePath().startsWith("http") || functionMetaData.getPackageLocation().getPackagePath().startsWith("file")) {
                String fileName = new File(new URL(functionMetaData.getPackageLocation().getPackagePath()).toURI()).getName();
                packageLocationMetaDataBuilder.setPackagePath(ComponentImpl.createPackagePath(tenant, namespace, componentName, fileName));
                packageLocationMetaDataBuilder.setOriginalFileName(fileName);
                log.info("Uploading {} package to {}", (Object)ComponentTypeUtils.toString(this.componentType), (Object)packageLocationMetaDataBuilder.getPackagePath());
                WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, this.worker().getDlogNamespace());
            } else {
                packageLocationMetaDataBuilder.setPackagePath(ComponentImpl.createPackagePath(tenant, namespace, componentName, fileDetail.getFileName()));
                packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
                log.info("Uploading {} package to {}", (Object)ComponentTypeUtils.toString(this.componentType), (Object)packageLocationMetaDataBuilder.getPackagePath());
                WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, this.worker().getDlogNamespace());
            }
        } else if (isBuiltin) {
            packageLocationMetaDataBuilder.setPackagePath("builtin://" + this.getFunctionCodeBuiltin(functionDetails));
        } else if (isPkgUrlProvided) {
            packageLocationMetaDataBuilder.setPackagePath(functionPkgUrl);
        } else if (functionMetaData.getPackageLocation().getPackagePath().startsWith("http") || functionMetaData.getPackageLocation().getPackagePath().startsWith("file")) {
            packageLocationMetaDataBuilder.setPackagePath(functionMetaData.getPackageLocation().getPackagePath());
        } else {
            packageLocationMetaDataBuilder.setPackagePath(ComponentImpl.createPackagePath(tenant, namespace, componentName, fileDetail.getFileName()));
            packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
            log.info("Uploading {} package to {}", (Object)ComponentTypeUtils.toString(this.componentType), (Object)packageLocationMetaDataBuilder.getPackagePath());
            WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, this.worker().getDlogNamespace());
        }
        return packageLocationMetaDataBuilder;
    }

    private void deleteStatestoreTableAsync(String namespace, String table) {
        StorageAdminClient adminClient = this.worker().getStateStoreAdminClient();
        if (adminClient != null) {
            adminClient.deleteStream(namespace, table).whenComplete((res, throwable) -> {
                if (throwable == null && res.booleanValue() || throwable instanceof NamespaceNotFoundException || throwable instanceof StreamNotFoundException) {
                    log.info("{}/{} table deleted successfully", (Object)namespace, (Object)table);
                } else if (throwable != null) {
                    log.error("{}/{} table deletion failed {}  but moving on", new Object[]{namespace, table, throwable});
                } else {
                    log.error("{}/{} table deletion failed but moving on", (Object)namespace, (Object)table);
                }
            });
        }
    }

    @Override
    public void deregisterFunction(String tenant, String namespace, String componentName, String clientRole, AuthenticationDataHttps clientAuthenticationDataHttps) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        try {
            if (!this.isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                log.warn("{}/{}/{} Client [{}] is not authorized to deregister {}", new Object[]{tenant, namespace, componentName, clientRole, ComponentTypeUtils.toString(this.componentType)});
                throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
            }
        }
        catch (PulsarAdminException e) {
            log.error("{}/{}/{} Failed to authorize [{}]", new Object[]{tenant, namespace, componentName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        try {
            this.validateDeregisterRequestParams(tenant, namespace, componentName, this.componentType);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid deregister {} request @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName, e});
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.error("{} to deregister does not exist @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(this.componentType)) {
            log.error("{}/{}/{} is not a {}", new Object[]{tenant, namespace, componentName, ComponentTypeUtils.toString(this.componentType)});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }
        Function.FunctionMetaData newVersionedMetaData = FunctionMetaDataUtils.incrMetadataVersion(functionMetaData, functionMetaData);
        this.internalProcessFunctionRequest(newVersionedMetaData.getFunctionDetails().getTenant(), newVersionedMetaData.getFunctionDetails().getNamespace(), newVersionedMetaData.getFunctionDetails().getName(), newVersionedMetaData, true, String.format("Error deleting %s @ /%s/%s/%s", ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName));
        String functionPackagePath = functionMetaData.getPackageLocation().getPackagePath();
        if (!(functionPackagePath.startsWith("http") || functionPackagePath.startsWith("file") || functionPackagePath.startsWith("builtin"))) {
            try {
                WorkerUtils.deleteFromBookkeeper(this.worker().getDlogNamespace(), functionMetaData.getPackageLocation().getPackagePath());
            }
            catch (IOException e) {
                log.error("{}/{}/{} Failed to cleanup package in BK with path {}", new Object[]{tenant, namespace, componentName, functionMetaData.getPackageLocation().getPackagePath(), e});
            }
        }
        this.deleteStatestoreTableAsync(FunctionCommon.getStateNamespace(tenant, namespace), componentName);
    }

    @Override
    public FunctionConfig getFunctionInfo(String tenant, String namespace, String componentName, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        try {
            if (!this.isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                log.warn("{}/{}/{} Client [{}] is not authorized to get {}", new Object[]{tenant, namespace, componentName, clientRole, ComponentTypeUtils.toString(this.componentType)});
                throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
            }
        }
        catch (PulsarAdminException e) {
            log.error("{}/{}/{} Failed to authorize [{}]", new Object[]{tenant, namespace, componentName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        try {
            this.validateGetFunctionRequestParams(tenant, namespace, componentName, this.componentType);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid get {} request @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName, e});
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.error("{} does not exist @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName});
            throw new RestException(Response.Status.NOT_FOUND, String.format(ComponentTypeUtils.toString(this.componentType) + " %s doesn't exist", componentName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(this.componentType)) {
            log.error("{}/{}/{} is not a {}", new Object[]{tenant, namespace, componentName, ComponentTypeUtils.toString(this.componentType)});
            throw new RestException(Response.Status.NOT_FOUND, String.format(ComponentTypeUtils.toString(this.componentType) + " %s doesn't exist", componentName));
        }
        FunctionConfig config = FunctionConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
        return config;
    }

    @Override
    public void stopFunctionInstance(String tenant, String namespace, String componentName, String instanceId, URI uri, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        this.changeFunctionInstanceStatus(tenant, namespace, componentName, instanceId, false, uri, clientRole, clientAuthenticationDataHttps);
    }

    @Override
    public void startFunctionInstance(String tenant, String namespace, String componentName, String instanceId, URI uri, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        this.changeFunctionInstanceStatus(tenant, namespace, componentName, instanceId, true, uri, clientRole, clientAuthenticationDataHttps);
    }

    public void changeFunctionInstanceStatus(String tenant, String namespace, String componentName, String instanceId, boolean start, URI uri, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        try {
            if (!this.isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                log.warn("{}/{}/{} Client [{}] is not authorized to start/stop {}", new Object[]{tenant, namespace, componentName, clientRole, ComponentTypeUtils.toString(this.componentType)});
                throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
            }
        }
        catch (PulsarAdminException e) {
            log.error("{}/{}/{} Failed to authorize [{}]", new Object[]{tenant, namespace, componentName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        try {
            this.validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, this.componentType, instanceId);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid start/stop {} request @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName, e});
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.error("{} does not exist @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(this.componentType)) {
            log.error("{}/{}/{} is not a {}", new Object[]{tenant, namespace, componentName, ComponentTypeUtils.toString(this.componentType)});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }
        if (!FunctionMetaDataUtils.canChangeState(functionMetaData, Integer.parseInt(instanceId), start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) {
            log.error("Operation not permitted on {}/{}/{}", new Object[]{tenant, namespace, componentName});
            throw new RestException(Response.Status.BAD_REQUEST, "Operation not permitted");
        }
        Function.FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, Integer.parseInt(instanceId), start);
        this.internalProcessFunctionRequest(tenant, namespace, componentName, newFunctionMetaData, false, String.format("Failed to start/stop %s: %s/%s/%s/%s", ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName, instanceId));
    }

    @Override
    public void restartFunctionInstance(String tenant, String namespace, String componentName, String instanceId, URI uri, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        try {
            if (!this.isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                log.warn("{}/{}/{} Client [{}] is not authorized to restart {}", new Object[]{tenant, namespace, componentName, clientRole, ComponentTypeUtils.toString(this.componentType)});
                throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
            }
        }
        catch (PulsarAdminException e) {
            log.error("{}/{}/{} Failed to authorize [{}]", new Object[]{tenant, namespace, componentName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        try {
            this.validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, this.componentType, instanceId);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid restart {} request @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName, e});
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.error("{} does not exist @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(this.componentType)) {
            log.error("{}/{}/{} is not a {}", new Object[]{tenant, namespace, componentName, ComponentTypeUtils.toString(this.componentType)});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }
        FunctionRuntimeManager functionRuntimeManager = this.worker().getFunctionRuntimeManager();
        try {
            functionRuntimeManager.restartFunctionInstance(tenant, namespace, componentName, Integer.parseInt(instanceId), uri);
        }
        catch (WebApplicationException we) {
            throw we;
        }
        catch (Exception e) {
            log.error("Failed to restart {}: {}/{}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName, instanceId, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    @Override
    public void stopFunctionInstances(String tenant, String namespace, String componentName, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        this.changeFunctionStatusAllInstances(tenant, namespace, componentName, false, clientRole, clientAuthenticationDataHttps);
    }

    @Override
    public void startFunctionInstances(String tenant, String namespace, String componentName, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        this.changeFunctionStatusAllInstances(tenant, namespace, componentName, true, clientRole, clientAuthenticationDataHttps);
    }

    public void changeFunctionStatusAllInstances(String tenant, String namespace, String componentName, boolean start, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        try {
            if (!this.isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                log.warn("{}/{}/{} Client [{}] is not authorized to start/stop {}", new Object[]{tenant, namespace, componentName, clientRole, ComponentTypeUtils.toString(this.componentType)});
                throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
            }
        }
        catch (PulsarAdminException e) {
            log.error("{}/{}/{} Failed to authorize [{}]", new Object[]{tenant, namespace, componentName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        try {
            this.validateGetFunctionRequestParams(tenant, namespace, componentName, this.componentType);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid start/stop {} request @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName, e});
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.warn("{} in stopFunctionInstances does not exist @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(this.componentType)) {
            log.error("{}/{}/{} is not a {}", new Object[]{tenant, namespace, componentName, ComponentTypeUtils.toString(this.componentType)});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }
        if (!FunctionMetaDataUtils.canChangeState(functionMetaData, -1, start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) {
            log.error("Operation not permitted on {}/{}/{}", new Object[]{tenant, namespace, componentName});
            throw new RestException(Response.Status.BAD_REQUEST, "Operation not permitted");
        }
        Function.FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, -1, start);
        this.internalProcessFunctionRequest(tenant, namespace, componentName, newFunctionMetaData, false, String.format("Failed to start/stop %s: %s/%s/%s", ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName));
    }

    @Override
    public void restartFunctionInstances(String tenant, String namespace, String componentName, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        try {
            if (!this.isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                log.warn("{}/{}/{} Client [{}] is not authorized to restart {}", new Object[]{tenant, namespace, componentName, clientRole, ComponentTypeUtils.toString(this.componentType)});
                throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
            }
        }
        catch (PulsarAdminException e) {
            log.error("{}/{}/{} Failed to authorize [{}]", new Object[]{tenant, namespace, componentName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        try {
            this.validateGetFunctionRequestParams(tenant, namespace, componentName, this.componentType);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid restart {} request @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName, e});
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.warn("{} in stopFunctionInstances does not exist @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(this.componentType)) {
            log.error("{}/{}/{} is not a {}", new Object[]{tenant, namespace, componentName, ComponentTypeUtils.toString(this.componentType)});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }
        FunctionRuntimeManager functionRuntimeManager = this.worker().getFunctionRuntimeManager();
        try {
            functionRuntimeManager.restartFunctionInstances(tenant, namespace, componentName);
        }
        catch (WebApplicationException we) {
            throw we;
        }
        catch (Exception e) {
            log.error("Failed to restart {}: {}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    @Override
    public FunctionStatsImpl getFunctionStats(String tenant, String namespace, String componentName, URI uri, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        FunctionStatsImpl functionStats;
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        try {
            if (!this.isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                log.warn("{}/{}/{} Client [{}] is not authorized to get stats for {}", new Object[]{tenant, namespace, componentName, clientRole, ComponentTypeUtils.toString(this.componentType)});
                throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
            }
        }
        catch (PulsarAdminException e) {
            log.error("{}/{}/{} Failed to authorize [{}]", new Object[]{tenant, namespace, componentName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        try {
            this.validateGetFunctionRequestParams(tenant, namespace, componentName, this.componentType);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid get {} Stats request @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName, e});
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.warn("{} in get {} Stats does not exist @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), this.componentType, tenant, namespace, componentName});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(this.componentType)) {
            log.error("{}/{}/{} is not a {}", new Object[]{tenant, namespace, componentName, ComponentTypeUtils.toString(this.componentType)});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }
        FunctionRuntimeManager functionRuntimeManager = this.worker().getFunctionRuntimeManager();
        try {
            functionStats = functionRuntimeManager.getFunctionStats(tenant, namespace, componentName, uri);
        }
        catch (WebApplicationException we) {
            throw we;
        }
        catch (Exception e) {
            log.error("{}/{}/{} Got Exception Getting Stats", new Object[]{tenant, namespace, componentName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        return functionStats;
    }

    @Override
    public FunctionInstanceStatsDataImpl getFunctionsInstanceStats(String tenant, String namespace, String componentName, String instanceId, URI uri, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        FunctionInstanceStatsDataImpl functionInstanceStatsData;
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        try {
            if (!this.isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                log.warn("{}/{}/{} Client [{}] is not authorized to get stats for {}", new Object[]{tenant, namespace, componentName, clientRole, ComponentTypeUtils.toString(this.componentType)});
                throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
            }
        }
        catch (PulsarAdminException e) {
            log.error("{}/{}/{} Failed to authorize [{}]", new Object[]{tenant, namespace, componentName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        try {
            this.validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, this.componentType, instanceId);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid get {} Stats request @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName, e});
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.warn("{} in get {} Stats does not exist @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), this.componentType, tenant, namespace, componentName});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(this.componentType)) {
            log.error("{}/{}/{} is not a {}", new Object[]{tenant, namespace, componentName, ComponentTypeUtils.toString(this.componentType)});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }
        int instanceIdInt = Integer.parseInt(instanceId);
        if (instanceIdInt < 0 || instanceIdInt >= functionMetaData.getFunctionDetails().getParallelism()) {
            log.error("instanceId in get {} Stats out of bounds @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName});
            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s doesn't have instance with id %s", ComponentTypeUtils.toString(this.componentType), componentName, instanceId));
        }
        FunctionRuntimeManager functionRuntimeManager = this.worker().getFunctionRuntimeManager();
        try {
            functionInstanceStatsData = functionRuntimeManager.getFunctionInstanceStats(tenant, namespace, componentName, Integer.parseInt(instanceId), uri);
        }
        catch (WebApplicationException we) {
            throw we;
        }
        catch (Exception e) {
            log.error("{}/{}/{} Got Exception Getting Stats", new Object[]{tenant, namespace, componentName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        return functionInstanceStatsData;
    }

    @Override
    public List<String> listFunctions(String tenant, String namespace, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        try {
            if (!this.isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                log.warn("{}/{} Client [{}] is not authorized to list {}", new Object[]{tenant, namespace, clientRole, ComponentTypeUtils.toString(this.componentType)});
                throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
            }
        }
        catch (PulsarAdminException e) {
            log.error("{}/{} Failed to authorize [{}]", new Object[]{tenant, namespace, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        try {
            this.validateListFunctionRequestParams(tenant, namespace);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid list {} request @ /{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), tenant, namespace, e});
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        Collection<Function.FunctionMetaData> functionStateList = functionMetaDataManager.listFunctions(tenant, namespace);
        LinkedList<String> retVals = new LinkedList<String>();
        for (Function.FunctionMetaData functionMetaData : functionStateList) {
            if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(this.componentType)) continue;
            retVals.add(functionMetaData.getFunctionDetails().getName());
        }
        return retVals;
    }

    void updateRequest(Function.FunctionMetaData existingFunctionMetaData, Function.FunctionMetaData functionMetaData) {
        Function.FunctionMetaData updatedVersionMetaData = FunctionMetaDataUtils.incrMetadataVersion(existingFunctionMetaData, functionMetaData);
        this.internalProcessFunctionRequest(updatedVersionMetaData.getFunctionDetails().getTenant(), updatedVersionMetaData.getFunctionDetails().getNamespace(), updatedVersionMetaData.getFunctionDetails().getName(), updatedVersionMetaData, false, "Update Failed");
    }

    @Override
    public List<ConnectorDefinition> getListOfConnectors() {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        return this.worker().getConnectorsManager().getConnectorDefinitions();
    }

    @Override
    public void reloadConnectors(String clientRole, AuthenticationDataSource authenticationData) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        if (this.worker().getWorkerConfig().isAuthorizationEnabled() && !this.isSuperUser(clientRole, authenticationData)) {
            throw new RestException(Response.Status.UNAUTHORIZED, "This operation requires super-user access");
        }
        try {
            this.worker().getConnectorsManager().reloadConnectors(this.worker().getWorkerConfig());
        }
        catch (IOException e) {
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public String triggerFunction(String tenant, String namespace, String functionName, String input, InputStream uploadedInputStream, String topic, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        String inputTopicToWrite;
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        try {
            if (!this.isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                log.warn("{}/{}/{} Client [{}] is not authorized to trigger {}", new Object[]{tenant, namespace, functionName, clientRole, ComponentTypeUtils.toString(this.componentType)});
                throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
            }
        }
        catch (PulsarAdminException e) {
            log.error("{}/{}/{} Failed to authorize [{}]", new Object[]{tenant, namespace, functionName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        try {
            this.validateTriggerRequestParams(tenant, namespace, functionName, topic, input, uploadedInputStream);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid trigger function request @ /{}/{}/{}", new Object[]{tenant, namespace, functionName, e});
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
            log.warn("Function in trigger function does not exist @ /{}/{}/{}", new Object[]{tenant, namespace, functionName});
            throw new RestException(Response.Status.NOT_FOUND, String.format("Function %s doesn't exist", functionName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);
        if (topic != null) {
            inputTopicToWrite = topic;
        } else {
            if (functionMetaData.getFunctionDetails().getSource().getInputSpecsCount() != 1) {
                log.error("Function in trigger function has more than 1 input topics @ /{}/{}/{}", new Object[]{tenant, namespace, functionName});
                throw new RestException(Response.Status.BAD_REQUEST, "Function in trigger function has more than 1 input topics");
            }
            inputTopicToWrite = functionMetaData.getFunctionDetails().getSource().getInputSpecsMap().keySet().iterator().next();
        }
        if (functionMetaData.getFunctionDetails().getSource().getInputSpecsCount() == 0 || !functionMetaData.getFunctionDetails().getSource().getInputSpecsMap().containsKey(inputTopicToWrite)) {
            log.error("Function in trigger function has unidentified topic @ /{}/{}/{} {}", new Object[]{tenant, namespace, functionName, inputTopicToWrite});
            throw new RestException(Response.Status.BAD_REQUEST, "Function in trigger function has unidentified topic");
        }
        try {
            this.worker().getBrokerAdmin().topics().getSubscriptions(inputTopicToWrite);
        }
        catch (PulsarAdminException e) {
            log.error("Function in trigger function is not ready @ /{}/{}/{}", new Object[]{tenant, namespace, functionName});
            throw new RestException(Response.Status.BAD_REQUEST, "Function in trigger function is not ready");
        }
        String outputTopic = functionMetaData.getFunctionDetails().getSink().getTopic();
        Reader<byte[]> reader = null;
        Producer<byte[]> producer = null;
        try {
            byte[] targetArray;
            if (outputTopic != null && !outputTopic.isEmpty()) {
                reader = this.worker().getClient().newReader().topic(outputTopic).startMessageId(MessageId.latest).readerName(this.worker().getWorkerConfig().getWorkerId() + "-trigger-" + FunctionCommon.getFullyQualifiedName(tenant, namespace, functionName)).create();
            }
            producer = this.worker().getClient().newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(inputTopicToWrite).producerName(this.worker().getWorkerConfig().getWorkerId() + "-trigger-" + FunctionCommon.getFullyQualifiedName(tenant, namespace, functionName)).create();
            if (uploadedInputStream != null) {
                targetArray = new byte[uploadedInputStream.available()];
                uploadedInputStream.read(targetArray);
            } else {
                targetArray = input.getBytes();
            }
            MessageId msgId = producer.send(targetArray);
            if (reader == null) {
                String string = null;
                return string;
            }
            long curTime = System.currentTimeMillis();
            long maxTime = curTime + 1000L;
            while (curTime < maxTime) {
                MessageId newMsgId;
                Message<byte[]> msg = reader.readNext(10000, TimeUnit.MILLISECONDS);
                if (msg == null) {
                    throw new RestException(Response.Status.REQUEST_TIMEOUT, "Request Timed Out");
                }
                if (msg.getProperties().containsKey("__pfn_input_msg_id__") && msg.getProperties().containsKey("__pfn_input_topic__") && msgId.equals(newMsgId = MessageId.fromByteArray(Base64.getDecoder().decode(msg.getProperties().get("__pfn_input_msg_id__")))) && msg.getProperties().get("__pfn_input_topic__").equals(TopicName.get(inputTopicToWrite).toString())) {
                    String string = new String(msg.getData());
                    return string;
                }
                curTime = System.currentTimeMillis();
            }
            throw new RestException(Response.Status.REQUEST_TIMEOUT, "Request Timed Out");
        }
        catch (SchemaSerializationException e) {
            throw new RestException(Response.Status.BAD_REQUEST, String.format("Failed to serialize input with error: %s. Please check if input data conforms with the schema of the input topic.", e.getMessage()));
        }
        catch (IOException e) {
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        finally {
            if (reader != null) {
                reader.closeAsync();
            }
            if (producer != null) {
                producer.closeAsync();
            }
        }
    }

    @Override
    public FunctionState getFunctionState(String tenant, String namespace, String functionName, String key, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        FunctionState value;
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        try {
            if (!this.isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                log.warn("{}/{}/{} Client [{}] is not authorized to get state for {}", new Object[]{tenant, namespace, functionName, clientRole, ComponentTypeUtils.toString(this.componentType)});
                throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
            }
        }
        catch (PulsarAdminException e) {
            log.error("{}/{}/{} Failed to authorize [{}]", new Object[]{tenant, namespace, functionName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        if (null == this.worker().getStateStoreAdminClient()) {
            this.throwStateStoreUnvailableResponse();
        }
        try {
            this.validateFunctionStateParams(tenant, namespace, functionName, key);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid getFunctionState request @ /{}/{}/{}/{}", new Object[]{tenant, namespace, functionName, key, e});
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        String tableNs = FunctionCommon.getStateNamespace(tenant, namespace);
        String tableName = functionName;
        String stateStorageServiceUrl = this.worker().getWorkerConfig().getStateStorageServiceUrl();
        if (this.storageClient.get() == null) {
            this.storageClient.compareAndSet(null, StorageClientBuilder.newBuilder().withSettings(StorageClientSettings.newBuilder().serviceUri(stateStorageServiceUrl).clientName("functions-admin").build()).withNamespace(tableNs).build());
        }
        try (Table<ByteBuf, ByteBuf> table = FutureUtils.result(this.storageClient.get().openTable(tableName));
             KeyValue kv = FutureUtils.result(table.getKv(Unpooled.wrappedBuffer(key.getBytes(StandardCharsets.UTF_8))));){
            byte[] bytes;
            if (null == kv) {
                throw new RestException(Response.Status.NOT_FOUND, "key '" + key + "' doesn't exist.");
            }
            value = kv.isNumber() ? new FunctionState(key, null, null, kv.numberValue(), kv.version()) : (Utf8.isWellFormed(bytes = ByteBufUtil.getBytes((ByteBuf)kv.value())) ? new FunctionState(key, new String(bytes, StandardCharsets.UTF_8), null, null, kv.version()) : new FunctionState(key, null, bytes, null, kv.version()));
        }
        catch (RestException e) {
            throw e;
        }
        catch (NamespaceNotFoundException | StreamNotFoundException e) {
            log.debug("State not found while processing getFunctionState request @ /{}/{}/{}/{}", new Object[]{tenant, namespace, functionName, key, e});
            throw new RestException(Response.Status.NOT_FOUND, e.getMessage());
        }
        catch (Exception e) {
            log.error("Error while getFunctionState request @ /{}/{}/{}/{}", new Object[]{tenant, namespace, functionName, key, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        return value;
    }

    @Override
    public void putFunctionState(String tenant, String namespace, String functionName, String key, FunctionState state, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        if (null == this.worker().getStateStoreAdminClient()) {
            this.throwStateStoreUnvailableResponse();
        }
        try {
            if (!this.isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                log.warn("{}/{}/{} Client [{}] is not authorized to put state for {}", new Object[]{tenant, namespace, functionName, clientRole, ComponentTypeUtils.toString(this.componentType)});
                throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
            }
        }
        catch (PulsarAdminException e) {
            log.error("{}/{}/{} Failed to authorize [{}]", new Object[]{tenant, namespace, functionName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        if (!key.equals(state.getKey())) {
            log.error("{}/{}/{} Bad putFunction Request, path key doesn't match key in json", new Object[]{tenant, namespace, functionName});
            throw new RestException(Response.Status.BAD_REQUEST, "Path key doesn't match key in json");
        }
        if (state.getStringValue() == null && state.getByteValue() == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Setting Counter values not supported in put state");
        }
        try {
            this.validateFunctionStateParams(tenant, namespace, functionName, key);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid putFunctionState request @ /{}/{}/{}/{}", new Object[]{tenant, namespace, functionName, key, e});
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        String tableNs = FunctionCommon.getStateNamespace(tenant, namespace);
        String tableName = functionName;
        String stateStorageServiceUrl = this.worker().getWorkerConfig().getStateStorageServiceUrl();
        if (this.storageClient.get() == null) {
            this.storageClient.compareAndSet(null, StorageClientBuilder.newBuilder().withSettings(StorageClientSettings.newBuilder().serviceUri(stateStorageServiceUrl).clientName("functions-admin").build()).withNamespace(tableNs).build());
        }
        ByteBuf value = !StringUtils.isEmpty(state.getStringValue()) ? Unpooled.wrappedBuffer(state.getStringValue().getBytes()) : Unpooled.wrappedBuffer(state.getByteValue());
        try (Table<ByteBuf, ByteBuf> table = FutureUtils.result(this.storageClient.get().openTable(tableName));){
            FutureUtils.result(table.put(Unpooled.wrappedBuffer(key.getBytes(StandardCharsets.UTF_8)), value));
        }
        catch (NamespaceNotFoundException | StreamNotFoundException e) {
            log.debug("State not found while processing putFunctionState request @ /{}/{}/{}/{}", new Object[]{tenant, namespace, functionName, key, e});
            throw new RestException(Response.Status.NOT_FOUND, e.getMessage());
        }
        catch (Exception e) {
            log.error("Error while putFunctionState request @ /{}/{}/{}/{}", new Object[]{tenant, namespace, functionName, key, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    @Override
    public void uploadFunction(InputStream uploadedInputStream, String path, String clientRole, AuthenticationDataSource authenticationData) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        if (this.worker().getWorkerConfig().isAuthorizationEnabled() && !this.isSuperUser(clientRole, authenticationData)) {
            throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
        }
        try {
            if (uploadedInputStream == null || path == null) {
                throw new IllegalArgumentException("Function Package is not provided " + path);
            }
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid upload function request @ /{}", (Object)path, (Object)e);
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        try {
            log.info("Uploading function package to {}", (Object)path);
            WorkerUtils.uploadToBookKeeper(this.worker().getDlogNamespace(), uploadedInputStream, path);
        }
        catch (IOException e) {
            log.error("Error uploading file {}", (Object)path, (Object)e);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    @Override
    public StreamingOutput downloadFunction(String tenant, String namespace, String componentName, String clientRole, AuthenticationDataHttps clientAuthenticationDataHttps) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        try {
            if (!this.isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                log.warn("{}/{}/{} Client [{}] is not admin and authorized to download package for {} ", new Object[]{tenant, namespace, componentName, clientRole, ComponentTypeUtils.toString(this.componentType)});
                throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
            }
        }
        catch (PulsarAdminException e) {
            log.error("{}/{}/{} Failed to authorize [{}]", new Object[]{tenant, namespace, componentName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.error("{} does not exist @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }
        String pkgPath = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName).getPackageLocation().getPackagePath();
        return this.getStreamingOutput(pkgPath);
    }

    private StreamingOutput getStreamingOutput(String pkgPath) {
        StreamingOutput streamingOutput = output -> {
            if (pkgPath.startsWith("http")) {
                URL url = URI.create(pkgPath).toURL();
                try (InputStream inputStream = url.openStream();){
                    IOUtils.copy(inputStream, output);
                }
            } else if (pkgPath.startsWith("file")) {
                URI url = URI.create(pkgPath);
                File file = new File(url.getPath());
                Files.copy(file.toPath(), output);
            } else if (pkgPath.startsWith("builtin") && !this.worker().getWorkerConfig().getUploadBuiltinSinksSources().booleanValue()) {
                String sType = pkgPath.replaceFirst("^builtin://", "");
                String connectorsDir = this.worker().getWorkerConfig().getConnectorsDirectory();
                log.warn("Processing package {} ; looking at the dir {}", (Object)pkgPath, (Object)connectorsDir);
                Functions sinksOrSources = FunctionUtils.searchForFunctions(connectorsDir, true);
                Path narPath = sinksOrSources.getFunctions().get(sType);
                if (narPath == null) {
                    throw new IllegalStateException("Didn't find " + pkgPath + " in " + connectorsDir);
                }
                log.info("Loading {} from {}", (Object)pkgPath, (Object)narPath);
                try (FileInputStream in = new FileInputStream(narPath.toString());){
                    IOUtils.copy((InputStream)in, output, 1024);
                    output.flush();
                }
            } else {
                WorkerUtils.downloadFromBookkeeper(this.worker().getDlogNamespace(), output, pkgPath);
            }
        };
        return streamingOutput;
    }

    @Override
    public StreamingOutput downloadFunction(String path, String clientRole, AuthenticationDataHttps clientAuthenticationDataHttps) {
        block7: {
            if (!this.isWorkerServiceAvailable()) {
                RestUtils.throwUnavailableException();
            }
            if (this.worker().getWorkerConfig().isAuthorizationEnabled()) {
                String[] tokens = path.split("/");
                if (tokens.length == 4) {
                    String tenant = tokens[0];
                    String namespace = tokens[1];
                    String componentName = tokens[2];
                    try {
                        if (!this.isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                            log.warn("{}/{}/{} Client [{}] is not admin and authorized to download package for {} ", new Object[]{tenant, namespace, componentName, clientRole, ComponentTypeUtils.toString(this.componentType)});
                            throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
                        }
                        break block7;
                    }
                    catch (PulsarAdminException e) {
                        log.error("{}/{}/{} Failed to authorize [{}]", new Object[]{tenant, namespace, componentName, e});
                        throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
                    }
                }
                if (!this.isSuperUser(clientRole, clientAuthenticationDataHttps)) {
                    throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
                }
            }
        }
        return this.getStreamingOutput(path);
    }

    private void validateListFunctionRequestParams(String tenant, String namespace) throws IllegalArgumentException {
        if (tenant == null) {
            throw new IllegalArgumentException("Tenant is not provided");
        }
        if (namespace == null) {
            throw new IllegalArgumentException("Namespace is not provided");
        }
    }

    protected void validateGetFunctionInstanceRequestParams(String tenant, String namespace, String componentName, Function.FunctionDetails.ComponentType componentType, String instanceId) throws IllegalArgumentException {
        this.validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
        if (instanceId == null) {
            throw new IllegalArgumentException(String.format("%s Instance Id is not provided", componentType));
        }
    }

    protected void validateGetFunctionRequestParams(String tenant, String namespace, String subject, Function.FunctionDetails.ComponentType componentType) throws IllegalArgumentException {
        if (tenant == null) {
            throw new IllegalArgumentException("Tenant is not provided");
        }
        if (namespace == null) {
            throw new IllegalArgumentException("Namespace is not provided");
        }
        if (subject == null) {
            throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " name is not provided");
        }
    }

    private void validateDeregisterRequestParams(String tenant, String namespace, String subject, Function.FunctionDetails.ComponentType componentType) throws IllegalArgumentException {
        if (tenant == null) {
            throw new IllegalArgumentException("Tenant is not provided");
        }
        if (namespace == null) {
            throw new IllegalArgumentException("Namespace is not provided");
        }
        if (subject == null) {
            throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " name is not provided");
        }
    }

    private void validateFunctionStateParams(String tenant, String namespace, String functionName, String key) throws IllegalArgumentException {
        if (tenant == null) {
            throw new IllegalArgumentException("Tenant is not provided");
        }
        if (namespace == null) {
            throw new IllegalArgumentException("Namespace is not provided");
        }
        if (functionName == null) {
            throw new IllegalArgumentException(ComponentTypeUtils.toString(this.componentType) + " name is not provided");
        }
        if (key == null) {
            throw new IllegalArgumentException("Key is not provided");
        }
    }

    private String getFunctionCodeBuiltin(Function.FunctionDetails functionDetails) {
        Function.SinkSpec sinkSpec;
        Function.SourceSpec sourceSpec;
        if (functionDetails.hasSource() && !StringUtils.isEmpty((sourceSpec = functionDetails.getSource()).getBuiltin())) {
            return sourceSpec.getBuiltin();
        }
        if (functionDetails.hasSink() && !StringUtils.isEmpty((sinkSpec = functionDetails.getSink()).getBuiltin())) {
            return sinkSpec.getBuiltin();
        }
        return null;
    }

    private void validateTriggerRequestParams(String tenant, String namespace, String functionName, String topic, String input, InputStream uploadedInputStream) {
        if (tenant == null) {
            throw new IllegalArgumentException("Tenant is not provided");
        }
        if (namespace == null) {
            throw new IllegalArgumentException("Namespace is not provided");
        }
        if (functionName == null) {
            throw new IllegalArgumentException("Function name is not provided");
        }
        if (uploadedInputStream == null && input == null) {
            throw new IllegalArgumentException("Trigger Data is not provided");
        }
    }

    private void throwStateStoreUnvailableResponse() {
        throw new RestException(Response.Status.SERVICE_UNAVAILABLE, "State storage client is not done initializing. Please try again in a little while.");
    }

    public static String createPackagePath(String tenant, String namespace, String functionName, String fileName) {
        return String.format("%s/%s/%s/%s", tenant, namespace, Codec.encode(functionName), FunctionCommon.getUniquePackageName(Codec.encode(fileName)));
    }

    public boolean isAuthorizedRole(String tenant, String namespace, String clientRole, AuthenticationDataSource authenticationData) throws PulsarAdminException {
        if (this.worker().getWorkerConfig().isAuthorizationEnabled()) {
            if (this.isSuperUser(clientRole, authenticationData)) {
                return true;
            }
            if (clientRole != null) {
                try {
                    TenantInfoImpl tenantInfo = (TenantInfoImpl)this.worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
                    if (tenantInfo != null && this.worker().getAuthorizationService().isTenantAdmin(tenant, clientRole, tenantInfo, authenticationData).get().booleanValue()) {
                        return true;
                    }
                }
                catch (InterruptedException | ExecutionException | PulsarAdminException.NotFoundException exception) {
                    // empty catch block
                }
            }
            if (clientRole != null && authenticationData != null) {
                return this.allowFunctionOps(NamespaceName.get(tenant, namespace), clientRole, authenticationData);
            }
            return false;
        }
        return true;
    }

    protected void componentStatusRequestValidate(String tenant, String namespace, String componentName, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        if (!this.isWorkerServiceAvailable()) {
            throw new RestException(Response.Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
        }
        try {
            if (!this.isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                log.warn("{}/{}/{} Client [{}] is not authorized get status for {}", new Object[]{tenant, namespace, componentName, clientRole, ComponentTypeUtils.toString(this.componentType)});
                throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
            }
        }
        catch (PulsarAdminException e) {
            log.error("{}/{}/{} Failed to authorize [{}]", new Object[]{tenant, namespace, componentName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        try {
            this.validateGetFunctionRequestParams(tenant, namespace, componentName, this.componentType);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid get {} Status request @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName, e});
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.warn("{} in get {} Status does not exist @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), this.componentType, tenant, namespace, componentName});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(this.componentType)) {
            log.error("{}/{}/{} is not a {}", new Object[]{tenant, namespace, componentName, ComponentTypeUtils.toString(this.componentType)});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }
    }

    protected void componentInstanceStatusRequestValidate(String tenant, String namespace, String componentName, int instanceId, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        this.componentStatusRequestValidate(tenant, namespace, componentName, clientRole, clientAuthenticationDataHttps);
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        int parallelism = functionMetaData.getFunctionDetails().getParallelism();
        if (instanceId < 0 || instanceId >= parallelism) {
            log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName});
            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s doesn't have instance with id %s", ComponentTypeUtils.toString(this.componentType), componentName, instanceId));
        }
    }

    public boolean isSuperUser(String clientRole, AuthenticationDataSource authenticationData) {
        if (clientRole != null) {
            try {
                if (this.worker().getWorkerConfig().getSuperUserRoles() != null && this.worker().getWorkerConfig().getSuperUserRoles().contains(clientRole)) {
                    return true;
                }
                return this.worker().getAuthorizationService().isSuperUser(clientRole, authenticationData).get(this.worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                log.warn("Time-out {} sec while checking the role {} is a super user role ", (Object)this.worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), (Object)clientRole);
                throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
            }
            catch (Exception e) {
                log.warn("Admin-client with Role - failed to check the role {} is a super user role {} ", new Object[]{clientRole, e.getMessage(), e});
                throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
            }
        }
        return false;
    }

    public boolean allowFunctionOps(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
        try {
            switch (this.componentType) {
                case SINK: {
                    return this.worker().getAuthorizationService().allowSinkOpsAsync(namespaceName, role, authenticationData).get(this.worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
                }
                case SOURCE: {
                    return this.worker().getAuthorizationService().allowSourceOpsAsync(namespaceName, role, authenticationData).get(this.worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
                }
            }
            return this.worker().getAuthorizationService().allowFunctionOpsAsync(namespaceName, role, authenticationData).get(this.worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            log.warn("Time-out {} sec while checking function authorization on {} ", (Object)this.worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), (Object)namespaceName);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        catch (Exception e) {
            log.warn("Admin-client with Role - {} failed to get function permissions for namespace - {}. {}", new Object[]{role, namespaceName, e.getMessage(), e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    private void internalProcessFunctionRequest(String tenant, String namespace, String functionName, Function.FunctionMetaData functionMetadata, boolean delete, String errorMsg) {
        try {
            if (this.worker().getLeaderService().isLeader()) {
                this.worker().getFunctionMetaDataManager().updateFunctionOnLeader(functionMetadata, delete);
            } else {
                FunctionsImpl functions = (FunctionsImpl)this.worker().getFunctionAdmin().functions();
                functions.updateOnWorkerLeader(tenant, namespace, functionName, functionMetadata.toByteArray(), delete);
            }
        }
        catch (PulsarAdminException e) {
            log.error(errorMsg, (Throwable)e);
            throw new RestException(e.getStatusCode(), e.getMessage());
        }
        catch (IllegalStateException e) {
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        catch (IllegalArgumentException e) {
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
    }

    protected ClassLoader getClassLoaderFromPackage(String className, File packageFile, String narExtractionDirectory) {
        return FunctionCommon.getClassLoaderFromPackage(this.componentType, className, packageFile, narExtractionDirectory);
    }

    protected abstract class GetStatus<S, T> {
        protected GetStatus() {
        }

        public abstract T notScheduledInstance();

        public abstract T fromFunctionStatusProto(InstanceCommunication.FunctionStatus var1, String var2);

        public abstract T notRunning(String var1, String var2);

        public T getComponentInstanceStatus(String tenant, String namespace, String name, int instanceId, URI uri) {
            String workerId;
            Function.Assignment assignment = ComponentImpl.this.worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged() ? ComponentImpl.this.worker().getFunctionRuntimeManager().findFunctionAssignment(tenant, namespace, name, -1) : ComponentImpl.this.worker().getFunctionRuntimeManager().findFunctionAssignment(tenant, namespace, name, instanceId);
            if (assignment == null) {
                return this.notScheduledInstance();
            }
            String assignedWorkerId = assignment.getWorkerId();
            if (assignedWorkerId.equals(workerId = ComponentImpl.this.worker().getWorkerConfig().getWorkerId())) {
                FunctionRuntimeInfo functionRuntimeInfo = ComponentImpl.this.worker().getFunctionRuntimeManager().getFunctionRuntimeInfo(FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()));
                if (functionRuntimeInfo == null) {
                    return this.notRunning(assignedWorkerId, "");
                }
                RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
                if (runtimeSpawner != null) {
                    try {
                        return this.fromFunctionStatusProto(functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus(instanceId).get(), assignedWorkerId);
                    }
                    catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }
                String message = functionRuntimeInfo.getStartupException() != null ? functionRuntimeInfo.getStartupException().getMessage() : "";
                return this.notRunning(assignedWorkerId, message);
            }
            List<WorkerInfo> workerInfoList = ComponentImpl.this.worker().getMembershipManager().getCurrentMembership();
            WorkerInfo workerInfo = null;
            for (WorkerInfo entry : workerInfoList) {
                if (!assignment.getWorkerId().equals(entry.getWorkerId())) continue;
                workerInfo = entry;
            }
            if (workerInfo == null) {
                return this.notScheduledInstance();
            }
            if (uri == null) {
                throw new WebApplicationException(Response.serverError().status(Response.Status.INTERNAL_SERVER_ERROR).build());
            }
            URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build(new Object[0]);
            throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
        }

        public abstract S getStatus(String var1, String var2, String var3, Collection<Function.Assignment> var4, URI var5) throws PulsarAdminException;

        public abstract S getStatusExternal(String var1, String var2, String var3, int var4);

        public abstract S emptyStatus(int var1);

        public S getComponentStatus(String tenant, String namespace, String name, URI uri) {
            Function.FunctionMetaData functionMetaData = ComponentImpl.this.worker().getFunctionMetaDataManager().getFunctionMetaData(tenant, namespace, name);
            Collection<Function.Assignment> assignments = ComponentImpl.this.worker().getFunctionRuntimeManager().findFunctionAssignments(tenant, namespace, name);
            if (ComponentImpl.this.worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
                Function.Assignment assignment = assignments.iterator().next();
                boolean isOwner = ComponentImpl.this.worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
                if (isOwner) {
                    return this.getStatusExternal(tenant, namespace, name, functionMetaData.getFunctionDetails().getParallelism());
                }
                List<WorkerInfo> workerInfoList = ComponentImpl.this.worker().getMembershipManager().getCurrentMembership();
                WorkerInfo workerInfo = null;
                for (WorkerInfo entry : workerInfoList) {
                    if (!assignment.getWorkerId().equals(entry.getWorkerId())) continue;
                    workerInfo = entry;
                }
                if (workerInfo == null) {
                    return this.emptyStatus(functionMetaData.getFunctionDetails().getParallelism());
                }
                if (uri == null) {
                    throw new WebApplicationException(Response.serverError().status(Response.Status.INTERNAL_SERVER_ERROR).build());
                }
                URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build(new Object[0]);
                throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
            }
            try {
                return this.getStatus(tenant, namespace, name, assignments, uri);
            }
            catch (PulsarAdminException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

