package org.apache.pulsar.functions.worker;

import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionInstanceId;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.javax.ws.rs.WebApplicationException;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.apache.pulsar.shade.javax.ws.rs.core.UriBuilder;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.FunctionInstanceStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.FunctionStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Reflections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/worker/FunctionRuntimeManager.class */
public class FunctionRuntimeManager implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(FunctionRuntimeManager.class);
    final WorkerConfig workerConfig;
    private FunctionActioner functionActioner;
    // Selected in the constructor from the worker configuration (custom class, thread, process or Kubernetes).
    private RuntimeFactory runtimeFactory;
    // Consulted to check whether the worker owning an assignment is part of the current membership.
    private MembershipManager membershipManager;
    // Admin client obtained from the worker service; used to forward restart and stats requests.
    private final PulsarAdmin functionAdmin;
    private PulsarWorkerService workerService;
    private final FunctionMetaDataManager functionMetaDataManager;
    private final WorkerStatsManager workerStatsManager;
    private final ErrorNotifier errorNotifier;
    // Assignments grouped by worker id; each inner map is keyed by the fully qualified instance id.
    Map<String, Map<String, Function.Assignment>> workerIdToAssignments = new ConcurrentHashMap();
    // Runtime infos keyed by fully qualified instance id.
    final FunctionRuntimeInfos functionRuntimeInfos = new FunctionRuntimeInfos();
    // True only while initialize() replays the assignment topic; put/remove on functionRuntimeInfos are no-ops while it is set.
    boolean isInitializePhase = false;
    // Completed (with null) at the end of a successful initialize().
    private final CompletableFuture<Void> isInitialized = new CompletableFuture<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/functions/worker/FunctionRuntimeManager$FunctionRuntimeInfos.class */
    /**
     * Holder for the {@link FunctionRuntimeInfo} objects tracked by the enclosing manager,
     * keyed by the string handed to {@link #put(String, FunctionRuntimeInfo)}.
     *
     * <p>While the enclosing manager's {@code isInitializePhase} flag is {@code true},
     * {@link #put(String, FunctionRuntimeInfo)} and {@link #remove(String)} silently do nothing;
     * reads are never gated.
     */
    public class FunctionRuntimeInfos {
        // Diamond instead of the raw type so the assignment is checked by the compiler.
        private Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new ConcurrentHashMap<>();

        FunctionRuntimeInfos() {
        }

        /**
         * @param str key to look up
         * @return the stored runtime info, or {@code null} when the key is absent
         */
        public FunctionRuntimeInfo get(String str) {
            return this.functionRuntimeInfoMap.get(str);
        }

        /**
         * Stores a runtime info under the given key; ignored during the initialize phase.
         *
         * @param str key to store under
         * @param functionRuntimeInfo value to store
         */
        public void put(String str, FunctionRuntimeInfo functionRuntimeInfo) {
            if (FunctionRuntimeManager.this.isInitializePhase) {
                return;
            }
            this.functionRuntimeInfoMap.put(str, functionRuntimeInfo);
        }

        /**
         * Removes the entry for the given key; ignored during the initialize phase.
         *
         * @param str key to remove
         */
        public void remove(String str) {
            if (FunctionRuntimeManager.this.isInitializePhase) {
                return;
            }
            this.functionRuntimeInfoMap.remove(str);
        }

        /**
         * @return the backing map itself (not a copy)
         */
        public Map<String, FunctionRuntimeInfo> getAll() {
            return this.functionRuntimeInfoMap;
        }

        /**
         * @return number of stored runtime infos
         */
        public int size() {
            return this.functionRuntimeInfoMap.size();
        }
    }

    /**
     * Wires up the runtime manager from the worker configuration.
     *
     * <p>In order, it: creates and initializes the secrets provider configurator, builds the
     * authentication config and optional function auth provider when authentication is enabled,
     * loads and initializes the optional runtime customizer, picks a {@link RuntimeFactory}
     * (explicit class name first, then thread, process, Kubernetes), initializes that factory and
     * finally creates the {@link FunctionActioner}.
     *
     * @throws RuntimeException when none of the runtime factory options is configured
     * @throws Exception propagated from the reflective/initialization calls made here
     */
    public FunctionRuntimeManager(WorkerConfig workerConfig, PulsarWorkerService pulsarWorkerService, Namespace namespace, MembershipManager membershipManager, ConnectorsManager connectorsManager, FunctionsManager functionsManager, FunctionMetaDataManager functionMetaDataManager, WorkerStatsManager workerStatsManager, ErrorNotifier errorNotifier) throws Exception {
        this.workerConfig = workerConfig;
        this.workerService = pulsarWorkerService;
        this.functionAdmin = pulsarWorkerService.getFunctionAdmin();

        // Declared as the interface type: a reflectively created configurator is only known to be a
        // SecretsProviderConfigurator, not necessarily the default implementation.
        final SecretsProviderConfigurator secretsProviderConfigurator;
        if (!StringUtils.isEmpty(workerConfig.getSecretsProviderConfiguratorClassName())) {
            secretsProviderConfigurator = (SecretsProviderConfigurator) Reflections.createInstance(workerConfig.getSecretsProviderConfiguratorClassName(), ClassLoader.getSystemClassLoader());
        } else {
            secretsProviderConfigurator = new DefaultSecretsProviderConfigurator();
        }
        log.info("Initializing secrets provider configurator {} with configs: {}", secretsProviderConfigurator.getClass().getName(), workerConfig.getSecretsProviderConfiguratorConfig());
        secretsProviderConfigurator.init(workerConfig.getSecretsProviderConfiguratorConfig());

        // Authentication settings are only assembled when authentication is enabled.
        Optional<FunctionAuthProvider> functionAuthProvider = Optional.empty();
        AuthenticationConfig authenticationConfig = null;
        if (workerConfig.isAuthenticationEnabled()) {
            authenticationConfig = AuthenticationConfig.builder().clientAuthenticationPlugin(workerConfig.getBrokerClientAuthenticationPlugin()).clientAuthenticationParameters(workerConfig.getBrokerClientAuthenticationParameters()).tlsTrustCertsFilePath(workerConfig.getTlsTrustCertsFilePath()).useTls(workerConfig.isUseTls()).tlsAllowInsecureConnection(workerConfig.isTlsAllowInsecureConnection()).tlsHostnameVerificationEnable(workerConfig.isTlsEnableHostnameVerification()).build();
            if (!StringUtils.isEmpty(workerConfig.getFunctionAuthProviderClassName())) {
                functionAuthProvider = Optional.of(FunctionAuthProvider.getAuthProvider(workerConfig.getFunctionAuthProviderClassName()));
            }
        }

        // Optional runtime customizer; a missing config is replaced by an empty map.
        Optional<RuntimeCustomizer> runtimeCustomizer = Optional.empty();
        if (!StringUtils.isEmpty(workerConfig.getRuntimeCustomizerClassName())) {
            runtimeCustomizer = Optional.of(RuntimeCustomizer.getRuntimeCustomizer(workerConfig.getRuntimeCustomizerClassName()));
            runtimeCustomizer.get().initialize((Map) Optional.ofNullable(workerConfig.getRuntimeCustomizerConfig()).orElse(Collections.emptyMap()));
        }

        // Runtime factory selection: explicit class name wins, then thread, process, Kubernetes.
        if (!StringUtils.isEmpty(workerConfig.getFunctionRuntimeFactoryClassName())) {
            this.runtimeFactory = RuntimeFactory.getFuntionRuntimeFactory(workerConfig.getFunctionRuntimeFactoryClassName());
        } else if (workerConfig.getThreadContainerFactory() != null) {
            this.runtimeFactory = new ThreadRuntimeFactory();
            workerConfig.setFunctionRuntimeFactoryConfigs((Map) ObjectMapperFactory.getThreadLocal().convertValue(workerConfig.getThreadContainerFactory(), Map.class));
        } else if (workerConfig.getProcessContainerFactory() != null) {
            this.runtimeFactory = new ProcessRuntimeFactory();
            workerConfig.setFunctionRuntimeFactoryConfigs((Map) ObjectMapperFactory.getThreadLocal().convertValue(workerConfig.getProcessContainerFactory(), Map.class));
        } else {
            if (workerConfig.getKubernetesContainerFactory() == null) {
                throw new RuntimeException("A Function Runtime Factory needs to be set");
            }
            this.runtimeFactory = new KubernetesRuntimeFactory();
            workerConfig.setFunctionRuntimeFactoryConfigs((Map) ObjectMapperFactory.getThreadLocal().convertValue(workerConfig.getKubernetesContainerFactory(), Map.class));
        }
        this.runtimeFactory.initialize(workerConfig, authenticationConfig, secretsProviderConfigurator, connectorsManager, functionAuthProvider, runtimeCustomizer);

        this.functionActioner = new FunctionActioner(this.workerConfig, this.runtimeFactory, namespace, connectorsManager, functionsManager, pulsarWorkerService.getBrokerAdmin());
        this.membershipManager = membershipManager;
        this.functionMetaDataManager = functionMetaDataManager;
        this.workerStatsManager = workerStatsManager;
        this.errorNotifier = errorNotifier;
    }

    /**
     * Replays the function assignment topic from the earliest message and then starts the
     * assignments owned by this worker.
     *
     * <p>{@code isInitializePhase} is {@code true} for the duration of the replay and reset before
     * the owned assignments are started. On success the {@code isInitialized} future is completed.
     * The reader is closed on every exit path, including failures.
     *
     * @return the id of the last message read, or {@link MessageId#earliest} when the topic had none
     * @throws RuntimeException wrapping any failure raised while reading or processing
     */
    public MessageId initialize() {
        try (Reader<byte[]> reader = WorkerUtils.createReader(
                this.workerService.getClient().newReader(),
                this.workerConfig.getWorkerId() + "-function-assignment-initialize",
                this.workerConfig.getFunctionAssignmentTopic(),
                MessageId.earliest)) {
            // Replay phase: every existing assignment message is applied.
            this.isInitializePhase = true;
            MessageId lastMessageRead = MessageId.earliest;
            while (reader.hasMessageAvailable()) {
                Message<byte[]> message = reader.readNext();
                lastMessageRead = message.getMessageId();
                processAssignmentMessage(message);
            }
            this.isInitializePhase = false;

            // Start whatever ended up assigned to this worker after the replay.
            Map<String, Function.Assignment> ownAssignments = this.workerIdToAssignments.get(this.workerConfig.getWorkerId());
            if (ownAssignments != null) {
                for (Function.Assignment assignment : ownAssignments.values()) {
                    if (needsStart(assignment)) {
                        startFunctionInstance(assignment);
                    }
                }
            }
            this.isInitialized.complete(null);
            return lastMessageRead;
        } catch (Exception e) {
            log.error("Failed to initialize function runtime manager: {}", e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Takes a two-level copy of the current assignments.
     *
     * @return a new map from worker id to a new map of that worker's assignments; changing the
     *         returned maps does not affect the manager's own state
     */
    public synchronized Map<String, Map<String, Function.Assignment>> getCurrentAssignments() {
        final Map<String, Map<String, Function.Assignment>> snapshot = new HashMap<>();
        this.workerIdToAssignments.forEach(
                (workerId, perWorker) -> snapshot.put(workerId, new HashMap<>(perWorker)));
        return snapshot;
    }

    /**
     * Synchronized entry point for looking up a single instance's assignment.
     *
     * @param str tenant
     * @param str2 namespace
     * @param str3 function name
     * @param i instance id
     * @return the matching assignment, or {@code null} when none is recorded
     */
    public synchronized Function.Assignment findFunctionAssignment(String str, String str2, String str3, int i) {
        final Function.Assignment match = findAssignment(str, str2, str3, i);
        return match;
    }

    /**
     * Collects every assignment of the given function across all workers known to this manager.
     *
     * @param str tenant
     * @param str2 namespace
     * @param str3 function name
     * @return the matching assignments; empty when there are none
     */
    public synchronized Collection<Function.Assignment> findFunctionAssignments(String str, String str2, String str3) {
        final Map<String, Map<String, Function.Assignment>> allAssignments = this.workerIdToAssignments;
        return findFunctionAssignments(str, str2, str3, allAssignments);
    }

    /**
     * Scans the supplied assignment table for assignments whose function details match the given
     * tenant, namespace and name.
     *
     * @param str tenant to match
     * @param str2 namespace to match
     * @param str3 function name to match
     * @param map assignments grouped by worker id
     * @return the matching assignments in encounter order; empty when nothing matches
     */
    public static Collection<Function.Assignment> findFunctionAssignments(String str, String str2, String str3, Map<String, Map<String, Function.Assignment>> map) {
        final Collection<Function.Assignment> matches = new LinkedList<>();
        for (Map<String, Function.Assignment> perWorker : map.values()) {
            for (Function.Assignment candidate : perWorker.values()) {
                final Function.FunctionDetails details = candidate.getInstance().getFunctionMetaData().getFunctionDetails();
                final boolean sameFunction = str.equals(details.getTenant())
                        && str2.equals(details.getNamespace())
                        && str3.equals(details.getName());
                if (sameFunction) {
                    matches.add(candidate);
                }
            }
        }
        return matches;
    }

    /**
     * Drops each of the given assignments from the assignment table.
     *
     * @param collection assignments to remove
     */
    public synchronized void removeAssignments(Collection<Function.Assignment> collection) {
        for (Function.Assignment doomed : collection) {
            deleteAssignment(doomed);
        }
    }

    /**
     * Restarts one function instance.
     *
     * <p>If the instance is assigned to this worker it is stopped and started again locally.
     * Otherwise the method never returns normally: it raises a redirect to the owning worker, or
     * an error response when that is not possible.
     *
     * @param str tenant
     * @param str2 namespace
     * @param str3 function name
     * @param i instance id
     * @param uri request URI used to build the redirect; may be {@code null}
     * @throws WebApplicationException NOT_IMPLEMENTED for externally managed runtimes, BAD_REQUEST
     *         when the assignment or its worker is unknown, a temporary redirect to the owning
     *         worker, or INTERNAL_SERVER_ERROR when {@code uri} is {@code null}
     * @throws Exception propagated from the local restart
     */
    public synchronized void restartFunctionInstance(String str, String str2, String str3, int i, URI uri) throws Exception {
        if (this.runtimeFactory.externallyManaged()) {
            throw new WebApplicationException(Response.serverError()
                    .status(Response.Status.NOT_IMPLEMENTED)
                    .type("application/json")
                    .entity(new ErrorData("Externally managed schedulers can't do per instance stop"))
                    .build());
        }

        final Function.Assignment target = findAssignment(str, str2, str3, i);
        final String instanceLabel = String.format("%s/%s/%s/%s", str, str2, str3, i);
        if (target == null) {
            throw new WebApplicationException(Response.serverError()
                    .status(Response.Status.BAD_REQUEST)
                    .type("application/json")
                    .entity(new ErrorData(instanceLabel + " doesn't exist"))
                    .build());
        }

        // Local instance: restart in place.
        if (target.getWorkerId().equals(this.workerConfig.getWorkerId())) {
            stopFunction(FunctionCommon.getFullyQualifiedInstanceId(target.getInstance()), true);
            return;
        }

        // Remote instance: locate the owning worker in the current membership (last match wins).
        WorkerInfo owner = null;
        for (WorkerInfo member : this.membershipManager.getCurrentMembership()) {
            if (target.getWorkerId().equals(member.getWorkerId())) {
                owner = member;
            }
        }
        if (owner == null) {
            throw new WebApplicationException(Response.serverError()
                    .status(Response.Status.BAD_REQUEST)
                    .type("application/json")
                    .entity(new ErrorData(instanceLabel + " has not been assigned yet"))
                    .build());
        }
        if (uri == null) {
            throw new WebApplicationException(Response.serverError().status(Response.Status.INTERNAL_SERVER_ERROR).build());
        }
        final URI redirectTarget = UriBuilder.fromUri(uri).host(owner.getWorkerHostname()).port(owner.getPort()).build(new Object[0]);
        throw new WebApplicationException(Response.temporaryRedirect(redirectTarget).build());
    }

    /**
     * Restarts all instances of a function.
     *
     * <p>For externally managed runtimes only the first assignment is considered: it is restarted
     * locally when owned by this worker, otherwise the whole function is restarted through the
     * admin client. For other runtimes each assignment is restarted locally or, when owned by a
     * worker in the current membership, through the admin client per instance; assignments whose
     * worker is not in the membership are skipped with a debug log.
     *
     * @param str tenant
     * @param str2 namespace
     * @param str3 function name
     * @throws WebApplicationException BAD_REQUEST when the function has no assignments, or when an
     *         externally managed function's worker is not in the current membership
     * @throws Exception propagated from the local restart or the admin call
     */
    public synchronized void restartFunctionInstances(String str, String str2, String str3) throws Exception {
        final String functionLabel = String.format("%s/%s/%s", str, str2, str3);
        final Collection<Function.Assignment> assignments = findFunctionAssignments(str, str2, str3);
        if (assignments.isEmpty()) {
            throw new WebApplicationException(Response.serverError()
                    .status(Response.Status.BAD_REQUEST)
                    .type("application/json")
                    .entity(new ErrorData(functionLabel + " has not been assigned yet"))
                    .build());
        }

        if (this.runtimeFactory.externallyManaged()) {
            final Function.Assignment first = assignments.iterator().next();
            final String firstInstanceId = FunctionCommon.getFullyQualifiedInstanceId(first.getInstance());
            if (first.getWorkerId().equals(this.workerConfig.getWorkerId())) {
                stopFunction(firstInstanceId, true);
                return;
            }
            boolean ownerIsMember = false;
            for (WorkerInfo member : this.membershipManager.getCurrentMembership()) {
                if (first.getWorkerId().equals(member.getWorkerId())) {
                    ownerIsMember = true;
                }
            }
            if (!ownerIsMember) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] has not been assigned yet", firstInstanceId);
                }
                throw new WebApplicationException(Response.serverError()
                        .status(Response.Status.BAD_REQUEST)
                        .type("application/json")
                        .entity(new ErrorData(functionLabel + " has not been assigned yet"))
                        .build());
            }
            restartFunctionUsingPulsarAdmin(first, str, str2, str3, true);
            return;
        }

        for (Function.Assignment each : assignments) {
            final String instanceId = FunctionCommon.getFullyQualifiedInstanceId(each.getInstance());
            if (each.getWorkerId().equals(this.workerConfig.getWorkerId())) {
                stopFunction(instanceId, true);
                continue;
            }
            boolean ownerIsMember = false;
            for (WorkerInfo member : this.membershipManager.getCurrentMembership()) {
                if (each.getWorkerId().equals(member.getWorkerId())) {
                    ownerIsMember = true;
                }
            }
            if (ownerIsMember) {
                restartFunctionUsingPulsarAdmin(each, str, str2, str3, false);
            } else if (log.isDebugEnabled()) {
                log.debug("[{}] has not been assigned yet", instanceId);
            }
        }
    }

    /**
     * Issues a restart through the admin client, choosing the sources, sinks or functions API
     * from the assignment's component type.
     *
     * @param assignment assignment whose component type (and instance id) is used
     * @param str tenant
     * @param str2 namespace
     * @param str3 component name
     * @param z {@code true} to restart the whole component, {@code false} to restart only the
     *          assignment's instance
     * @throws PulsarAdminException when the admin call fails
     */
    @VisibleForTesting
    void restartFunctionUsingPulsarAdmin(Function.Assignment assignment, String str, String str2, String str3, boolean z) throws PulsarAdminException {
        final Function.FunctionDetails.ComponentType kind = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType();
        if (kind == Function.FunctionDetails.ComponentType.SOURCE) {
            if (z) {
                this.functionAdmin.sources().restartSource(str, str2, str3);
            } else {
                this.functionAdmin.sources().restartSource(str, str2, str3, assignment.getInstance().getInstanceId());
            }
        } else if (kind == Function.FunctionDetails.ComponentType.SINK) {
            if (z) {
                this.functionAdmin.sinks().restartSink(str, str2, str3);
            } else {
                this.functionAdmin.sinks().restartSink(str, str2, str3, assignment.getInstance().getInstanceId());
            }
        } else if (z) {
            this.functionAdmin.functions().restartFunction(str, str2, str3);
        } else {
            this.functionAdmin.functions().restartFunction(str, str2, str3, assignment.getInstance().getInstanceId());
        }
    }

    /**
     * Stops every function instance assigned to this worker, in key order of a sorted copy of
     * the worker's assignments. Does nothing (beyond a warning) for externally managed runtimes.
     * A failure to stop one instance is logged and does not prevent the others from being stopped.
     */
    public void stopAllOwnedFunctions() {
        if (this.runtimeFactory.externallyManaged()) {
            log.warn("Will not stop any functions since they are externally managed");
            return;
        }
        final Map<String, Function.Assignment> owned = this.workerIdToAssignments.get(this.workerConfig.getWorkerId());
        if (owned == null) {
            return;
        }
        // Iterate over a sorted snapshot rather than the live map.
        final Map<String, Function.Assignment> ordered = new TreeMap<>(owned);
        for (Function.Assignment assignment : ordered.values()) {
            final String instanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
            try {
                stopFunction(instanceId, false);
            } catch (Exception e) {
                log.warn("Failed to stop function {} - {}", instanceId, e.getMessage());
            }
        }
    }

    /**
     * Stops the given instance and optionally starts it again. Does nothing when no runtime info
     * is registered for the instance.
     *
     * @param str fully qualified instance id
     * @param z {@code true} to start the instance again after stopping it
     * @throws Exception when the restart fails; the exception is also recorded on the runtime
     *         info as its startup exception
     */
    @VisibleForTesting
    void stopFunction(String str, boolean z) throws Exception {
        final String action = z ? "restarting" : "stopping";
        log.info("[{}] {}..", action, str);

        final FunctionRuntimeInfo runtimeInfo = getFunctionRuntimeInfo(str);
        if (runtimeInfo == null) {
            return;
        }
        conditionallyStopFunction(runtimeInfo);
        if (!z) {
            return;
        }
        try {
            conditionallyStartFunction(runtimeInfo);
        } catch (Exception e) {
            log.info("{} Error re-starting function", str, e);
            runtimeInfo.setStartupException(e);
            throw e;
        }
    }

    /**
     * Returns the metrics of one function instance.
     *
     * <p>When the instance is assigned to this worker the metrics are read locally; empty stats
     * are returned if no runtime info or no runtime spawner is registered for it. When it is
     * assigned to another worker that is part of the current membership, the call does not return
     * normally and instead raises a redirect to that worker.
     *
     * @param str tenant
     * @param str2 namespace
     * @param str3 function name
     * @param i instance id
     * @param uri request URI used to build the redirect; may be {@code null}
     * @return the instance metrics, or an empty object when the assignment, its worker or its
     *         local runtime is unknown
     * @throws WebApplicationException temporary redirect to the owning worker, or
     *         INTERNAL_SERVER_ERROR when a redirect is needed but {@code uri} is {@code null}
     */
    public FunctionInstanceStatsDataImpl getFunctionInstanceStats(String str, String str2, String str3, int i, URI uri) {
        // Externally managed runtimes are looked up under instance id -1 instead of the requested one.
        final Function.Assignment assignment = this.runtimeFactory.externallyManaged()
                ? findAssignment(str, str2, str3, -1)
                : findAssignment(str, str2, str3, i);
        if (assignment == null) {
            return new FunctionInstanceStatsDataImpl();
        }

        if (assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) {
            final String instanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
            final FunctionRuntimeInfo runtimeInfo = getFunctionRuntimeInfo(instanceId);
            // The runtime info can be missing even though the assignment exists (other callers in this
            // class null-check it too); report empty stats rather than failing with an NPE.
            if (runtimeInfo == null || runtimeInfo.getRuntimeSpawner() == null) {
                return new FunctionInstanceStatsDataImpl();
            }
            return (FunctionInstanceStatsDataImpl) WorkerUtils.getFunctionInstanceStats(instanceId, runtimeInfo, i).getMetrics();
        }

        // Assigned elsewhere: find the owning worker in the current membership (last match wins).
        WorkerInfo owner = null;
        for (WorkerInfo member : this.membershipManager.getCurrentMembership()) {
            if (assignment.getWorkerId().equals(member.getWorkerId())) {
                owner = member;
            }
        }
        if (owner == null) {
            return new FunctionInstanceStatsDataImpl();
        }
        if (uri == null) {
            throw new WebApplicationException(Response.serverError().status(Response.Status.INTERNAL_SERVER_ERROR).build());
        }
        throw new WebApplicationException(Response.temporaryRedirect(UriBuilder.fromUri(uri).host(owner.getWorkerHostname()).port(owner.getPort()).build(new Object[0])).build());
    }

    /**
     * Aggregates the per-instance metrics of a function.
     *
     * <p>For externally managed runtimes only the first assignment is inspected: when it belongs
     * to another worker the call redirects there (or returns the empty stats object if that worker
     * is not in the current membership); otherwise one entry per instance index up to the
     * function's parallelism is collected locally. For other runtimes each assignment is read
     * locally when owned by this worker and through the admin client otherwise.
     *
     * @param str tenant
     * @param str2 namespace
     * @param str3 function name
     * @param uri request URI used to build a redirect; may be {@code null}
     * @return the aggregated stats; an empty stats object when there are no assignments
     * @throws PulsarAdminException when a remote stats call fails
     * @throws WebApplicationException temporary redirect to the owning worker, or
     *         INTERNAL_SERVER_ERROR when a redirect is needed but {@code uri} is {@code null}
     */
    public FunctionStatsImpl getFunctionStats(String str, String str2, String str3, URI uri) throws PulsarAdminException {
        final Collection<Function.Assignment> assignments = findFunctionAssignments(str, str2, str3);
        final FunctionStatsImpl aggregate = new FunctionStatsImpl();
        if (assignments.isEmpty()) {
            return aggregate;
        }

        if (!this.runtimeFactory.externallyManaged()) {
            for (Function.Assignment each : assignments) {
                final FunctionInstanceStatsDataImpl metrics;
                if (this.workerConfig.getWorkerId().equals(each.getWorkerId())) {
                    metrics = getFunctionInstanceStats(str, str2, str3, each.getInstance().getInstanceId(), null);
                } else {
                    final Function.FunctionDetails details = each.getInstance().getFunctionMetaData().getFunctionDetails();
                    metrics = (FunctionInstanceStatsDataImpl) this.functionAdmin.functions().getFunctionStats(details.getTenant(), details.getNamespace(), details.getName(), each.getInstance().getInstanceId());
                }
                final FunctionInstanceStatsImpl entry = new FunctionInstanceStatsImpl();
                entry.setInstanceId(each.getInstance().getInstanceId());
                entry.setMetrics(metrics);
                aggregate.addInstance(entry);
            }
            return aggregate.calculateOverall();
        }

        final Function.Assignment first = assignments.iterator().next();
        if (!this.workerConfig.getWorkerId().equals(first.getWorkerId())) {
            // Owned by another worker: redirect there if it is a current member (last match wins).
            WorkerInfo owner = null;
            for (WorkerInfo member : this.membershipManager.getCurrentMembership()) {
                if (first.getWorkerId().equals(member.getWorkerId())) {
                    owner = member;
                }
            }
            if (owner == null) {
                return aggregate;
            }
            if (uri == null) {
                throw new WebApplicationException(Response.serverError().status(Response.Status.INTERNAL_SERVER_ERROR).build());
            }
            throw new WebApplicationException(Response.temporaryRedirect(UriBuilder.fromUri(uri).host(owner.getWorkerHostname()).port(owner.getPort()).build(new Object[0])).build());
        }

        final int parallelism = first.getInstance().getFunctionMetaData().getFunctionDetails().getParallelism();
        int index = 0;
        while (index < parallelism) {
            final FunctionInstanceStatsImpl entry = new FunctionInstanceStatsImpl();
            final FunctionInstanceStatsDataImpl metrics = getFunctionInstanceStats(str, str2, str3, index, null);
            entry.setInstanceId(index);
            entry.setMetrics(metrics);
            aggregate.addInstance(entry);
            index++;
        }
        return aggregate.calculateOverall();
    }

    /**
     * Applies one message from the assignment topic.
     *
     * <p>A message with a {@code null} or empty payload is treated as a delete of the assignment
     * identified by the message key; any other payload is parsed as an assignment and processed
     * as an add or update.
     *
     * @param message assignment topic message
     * @throws RuntimeException wrapping the {@link IOException} when the payload cannot be parsed
     */
    public synchronized void processAssignmentMessage(Message<byte[]> message) {
        // Read the payload once instead of calling getData() for every check.
        final byte[] payload = message.getData();
        if (payload == null || payload.length == 0) {
            log.info("Received assignment delete: {}", message.getKey());
            deleteAssignment(message.getKey());
            return;
        }
        try {
            Function.Assignment assignment = Function.Assignment.parseFrom(payload);
            log.info("Received assignment update: {}", assignment);
            processAssignment(assignment);
        } catch (IOException e) {
            // One placeholder for the message id; the trailing exception is logged as the throwable.
            log.error("Received bad assignment update at message {}", message.getMessageId(), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Routes an assignment to the update path when some worker already has an entry for the same
     * fully qualified instance id, and to the add path otherwise.
     *
     * @param assignment assignment to apply
     */
    public synchronized void processAssignment(Function.Assignment assignment) {
        final String instanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
        boolean alreadyKnown = false;
        for (Map<String, Function.Assignment> perWorker : this.workerIdToAssignments.values()) {
            if (perWorker.containsKey(instanceId)) {
                alreadyKnown = true;
                break;
            }
        }
        if (!alreadyKnown) {
            addAssignment(assignment);
            return;
        }
        updateAssignment(assignment);
    }

    /**
     * Replaces an existing assignment with a changed one and brings the local runtime in line.
     *
     * <p>Nothing happens when the recorded assignment equals the new one. Otherwise:
     * <ul>
     *   <li>if the runtime is not externally managed, or the instance itself changed, any local
     *       runtime is stopped and the instance is started again here when it is assigned to this
     *       worker and passes {@code needsStart}; when assigned elsewhere its runtime info is
     *       dropped;</li>
     *   <li>if the runtime is externally managed and the instance is unchanged, a runtime spawner
     *       is obtained and reinitialized when the instance is assigned to this worker, and the
     *       runtime info is dropped otherwise.</li>
     * </ul>
     * Finally the old assignment entry is removed and the new one recorded.
     *
     * @param assignment the new assignment; an entry for the same instance is expected to exist
     */
    private void updateAssignment(Function.Assignment assignment) {
        final String instanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
        final Function.Assignment recorded = findAssignment(assignment);
        if (recorded.equals(assignment)) {
            return;
        }
        final FunctionRuntimeInfo existingInfo = _getFunctionRuntimeInfo(instanceId);

        // Short-circuit order matters: the instance comparison only runs for externally managed runtimes.
        final boolean stopAndRestart = !this.runtimeFactory.externallyManaged()
                || !assignment.getInstance().equals(recorded.getInstance());
        if (stopAndRestart) {
            if (existingInfo != null) {
                conditionallyStopFunction(existingInfo);
            }
            if (assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) {
                if (needsStart(assignment)) {
                    final FunctionRuntimeInfo replacement = new FunctionRuntimeInfo();
                    replacement.setFunctionInstance(assignment.getInstance());
                    conditionallyStartFunction(replacement);
                    this.functionRuntimeInfos.put(instanceId, replacement);
                }
            } else {
                this.functionRuntimeInfos.remove(instanceId);
            }
        } else if (!assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) {
            this.functionRuntimeInfos.remove(instanceId);
        } else {
            // Externally managed, same instance, now owned here: attach to the runtime without a restart.
            final FunctionRuntimeInfo adopted = new FunctionRuntimeInfo();
            adopted.setFunctionInstance(assignment.getInstance());
            final RuntimeSpawner spawner = this.functionActioner.getRuntimeSpawner(assignment.getInstance(), assignment.getInstance().getFunctionMetaData().getPackageLocation().getPackagePath());
            spawner.getRuntime().reinitialize();
            adopted.setRuntimeSpawner(spawner);
            this.functionRuntimeInfos.put(instanceId, adopted);
        }

        // Swap the table entry: drop whatever is recorded now, then store the new assignment.
        final Function.Assignment stale = findAssignment(assignment);
        if (stale != null) {
            deleteAssignment(stale);
        }
        setAssignment(assignment);
    }

    /**
     * Removes the assignment with the given fully qualified instance id and shuts down its local
     * runtime, if any.
     *
     * <p>The runtime is stopped when the function is still present in the metadata manager or
     * when more than one assignment of the function remains in the table; otherwise it is
     * terminated. The id is then removed from the first worker map that contains it, and that
     * worker's entry is dropped when its map becomes empty.
     *
     * @param str fully qualified instance id
     */
    public synchronized void deleteAssignment(String str) {
        final FunctionRuntimeInfo runtimeInfo = _getFunctionRuntimeInfo(str);
        if (runtimeInfo != null) {
            final Function.FunctionDetails details = runtimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
            boolean stopOnly = this.functionMetaDataManager.containsFunction(details.getTenant(), details.getNamespace(), details.getName());
            if (!stopOnly) {
                // Function metadata is gone: only terminate when this is the last remaining assignment.
                final FunctionInstanceId parsedId = new FunctionInstanceId(str);
                stopOnly = findFunctionAssignments(parsedId.getTenant(), parsedId.getNamespace(), parsedId.getName(), this.workerIdToAssignments).size() > 1;
            }
            if (stopOnly) {
                conditionallyStopFunction(runtimeInfo);
            } else {
                conditionallyTerminateFunction(runtimeInfo);
            }
            this.functionRuntimeInfos.remove(str);
        }

        String owningWorker = null;
        for (Map.Entry<String, Map<String, Function.Assignment>> entry : this.workerIdToAssignments.entrySet()) {
            if (entry.getValue().remove(str) != null) {
                owningWorker = entry.getKey();
                break;
            }
        }
        if (owningWorker == null) {
            return;
        }
        final Map<String, Function.Assignment> remaining = this.workerIdToAssignments.get(owningWorker);
        if (remaining != null && remaining.isEmpty()) {
            this.workerIdToAssignments.remove(owningWorker);
        }
    }

    /**
     * Removes the given assignment from its worker's entry in the assignment table, dropping the
     * worker's entry altogether once it is empty. Runtime state is not touched here.
     *
     * @param assignment assignment to remove
     */
    void deleteAssignment(Function.Assignment assignment) {
        final String instanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
        final Map<String, Function.Assignment> perWorker = this.workerIdToAssignments.get(assignment.getWorkerId());
        if (perWorker == null) {
            return;
        }
        perWorker.remove(instanceId);
        if (perWorker.isEmpty()) {
            this.workerIdToAssignments.remove(assignment.getWorkerId());
        }
    }

    /**
     * Records the assignment and, when it targets this worker and
     * {@link #needsStart} says so, starts the corresponding function instance.
     *
     * @param assignment the assignment to record
     */
    private void addAssignment(Function.Assignment assignment) {
        setAssignment(assignment);

        boolean assignedToThisWorker = assignment.getWorkerId().equals(this.workerConfig.getWorkerId());
        if (!assignedToThisWorker) {
            // Assignments for other workers are only tracked, never started here.
            return;
        }
        if (needsStart(assignment)) {
            startFunctionInstance(assignment);
        }
    }

    /**
     * Starts the function instance described by the assignment.
     *
     * <p>If runtime info is already registered under the instance's fully qualified
     * id, a warning is logged and the existing runtime is stopped first. Otherwise
     * a new {@code FunctionRuntimeInfo} is created for the instance and registered.
     * In both cases the runtime info is then handed to
     * {@link #conditionallyStartFunction}.
     *
     * @param assignment the assignment whose instance should be started
     */
    private void startFunctionInstance(Function.Assignment assignment) {
        String instanceKey = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
        FunctionRuntimeInfo runtimeInfo = _getFunctionRuntimeInfo(instanceKey);
        if (runtimeInfo != null) {
            log.warn("Function {} already running. Going to restart function.", runtimeInfo);
            conditionallyStopFunction(runtimeInfo);
        } else {
            runtimeInfo = new FunctionRuntimeInfo().setFunctionInstance(assignment.getInstance());
            this.functionRuntimeInfos.put(instanceKey, runtimeInfo);
        }
        conditionallyStartFunction(runtimeInfo);
    }

    /**
     * Returns the runtime infos known to this manager, as produced by
     * {@code functionRuntimeInfos.getAll()}.
     *
     * @return a map of runtime infos; presumably keyed by fully qualified instance
     *         id, as that is the key used when entries are put into the registry
     */
    public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() {
        Map<String, FunctionRuntimeInfo> allRuntimeInfos = this.functionRuntimeInfos.getAll();
        return allRuntimeInfos;
    }

    /**
     * Searches every worker's assignment table for the instance identified by the
     * given coordinates.
     *
     * <p>The four arguments are forwarded, in order, to
     * {@code FunctionCommon.getFullyQualifiedInstanceId}; the single-argument
     * overload in this class passes tenant, namespace, function name and instance id.
     *
     * @param str  first id component (tenant, per the overload in this class)
     * @param str2 second id component (namespace, per the overload in this class)
     * @param str3 third id component (function name, per the overload in this class)
     * @param i    instance id
     * @return the first matching assignment found, or {@code null} if no worker has one
     */
    private Function.Assignment findAssignment(String str, String str2, String str3, int i) {
        String instanceKey = FunctionCommon.getFullyQualifiedInstanceId(str, str2, str3, i);
        for (Map<String, Function.Assignment> assignmentsOfWorker : this.workerIdToAssignments.values()) {
            Function.Assignment match = assignmentsOfWorker.get(instanceKey);
            if (match != null) {
                return match;
            }
        }
        return null;
    }

    /**
     * Looks up the currently recorded assignment for the same function instance
     * as the given one.
     *
     * <p>Only the tenant, namespace, name and instance id of the argument are
     * used; its worker id plays no part in the lookup.
     *
     * @param assignment the assignment whose instance coordinates are searched for
     * @return the recorded assignment for that instance, or {@code null} if none
     */
    private Function.Assignment findAssignment(Function.Assignment assignment) {
        Function.FunctionDetails details =
                assignment.getInstance().getFunctionMetaData().getFunctionDetails();
        int instanceId = assignment.getInstance().getInstanceId();
        return findAssignment(details.getTenant(), details.getNamespace(), details.getName(), instanceId);
    }

    /**
     * Records the assignment in the worker-to-assignments table, creating the
     * per-worker table on first use.
     *
     * <p>The assignment is stored under the fully qualified instance id of its
     * instance and replaces any entry already stored under that id for the same
     * worker.
     *
     * @param assignment the assignment to record
     */
    @VisibleForTesting
    void setAssignment(Function.Assignment assignment) {
        // One computeIfAbsent call replaces the containsKey/put/get sequence, so the
        // per-worker table is obtained in a single map operation, and the diamond
        // operator removes the raw HashMap construction.
        this.workerIdToAssignments
                .computeIfAbsent(assignment.getWorkerId(), workerId -> new HashMap<>())
                .put(FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()), assignment);
    }

    /**
     * Stops all function instances owned by this worker and then closes the
     * runtime factory, if one is set.
     *
     * @throws Exception declared per {@link AutoCloseable#close()}
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        stopAllOwnedFunctions();
        if (this.runtimeFactory == null) {
            // No factory is set, so there is nothing further to release.
            return;
        }
        this.runtimeFactory.close();
    }

    /**
     * Synchronized public entry point for looking up runtime info by id;
     * delegates to {@link #_getFunctionRuntimeInfo}.
     *
     * @param str the key to look up (fully qualified instance id elsewhere in this class)
     * @return the registered runtime info, or {@code null} if none is registered
     */
    public synchronized FunctionRuntimeInfo getFunctionRuntimeInfo(String str) {
        FunctionRuntimeInfo runtimeInfo = _getFunctionRuntimeInfo(str);
        return runtimeInfo;
    }

    /**
     * Looks up the runtime info registered under the given id.
     *
     * <p>When nothing is registered, a sanity check is attempted that would log an
     * error if this worker's assignments still mention the id.
     *
     * @param str the key to look up (fully qualified instance id elsewhere in this class)
     * @return the registered runtime info, or {@code null} if none is registered
     */
    private FunctionRuntimeInfo _getFunctionRuntimeInfo(String str) {
        FunctionRuntimeInfo runtimeInfo = this.functionRuntimeInfos.get(str);
        if (runtimeInfo != null) {
            return runtimeInfo;
        }

        // NOTE(review): both lookups below use containsValue with a String, but
        // workerIdToAssignments maps worker id -> (instance id -> Assignment), so the
        // ids are keys, not values. As written the condition appears to never hold and
        // the error is never logged. Switching to containsKey would enable the check,
        // but it would then also fire for a first start, because addAssignment records
        // the assignment before the runtime info exists. Confirm the intended behavior
        // before changing it.
        String myWorkerId = this.workerConfig.getWorkerId();
        if (this.workerIdToAssignments.containsValue(myWorkerId)
                && this.workerIdToAssignments.get(myWorkerId).containsValue(str)) {
            log.error("Assignments and RuntimeInfos are inconsistent. FunctionRuntimeInfo missing for " + str);
        }
        return null;
    }

    /**
     * Decides whether the instance referenced by the assignment should be started,
     * based on the per-instance states stored in its function metadata.
     *
     * <ul>
     *   <li>No recorded states at all: start.</li>
     *   <li>Negative instance id: start if any recorded state is {@code RUNNING}.</li>
     *   <li>Otherwise: start if the state recorded for that instance id is
     *       {@code RUNNING}; an instance id with no recorded state defaults to
     *       {@code RUNNING}.</li>
     * </ul>
     *
     * @param assignment the assignment to evaluate
     * @return {@code true} if the instance should be started
     */
    private boolean needsStart(Function.Assignment assignment) {
        Function.FunctionMetaData metaData = assignment.getInstance().getFunctionMetaData();
        if (metaData.getInstanceStatesMap() == null || metaData.getInstanceStatesMap().isEmpty()) {
            return true;
        }

        int instanceId = assignment.getInstance().getInstanceId();
        if (instanceId >= 0) {
            Function.FunctionState state =
                    metaData.getInstanceStatesOrDefault(instanceId, Function.FunctionState.RUNNING);
            return state == Function.FunctionState.RUNNING;
        }

        // Negative instance id: one RUNNING entry among the recorded states is enough.
        for (Function.FunctionState state : metaData.getInstanceStatesMap().values()) {
            if (state == Function.FunctionState.RUNNING) {
                return true;
            }
        }
        return false;
    }

    /**
     * Starts the function through the function actioner unless
     * {@code isInitializePhase} is set, in which case nothing is done.
     *
     * <p>The actioner call is bracketed by the worker stats manager's
     * start-instance timing hooks.
     *
     * @param functionRuntimeInfo runtime info of the function to start
     */
    private void conditionallyStartFunction(FunctionRuntimeInfo functionRuntimeInfo) {
        if (!this.isInitializePhase) {
            this.workerStatsManager.startInstanceProcessTimeStart();
            this.functionActioner.startFunction(functionRuntimeInfo);
            this.workerStatsManager.startInstanceProcessTimeEnd();
        }
    }

    /**
     * Stops the function through the function actioner unless
     * {@code isInitializePhase} is set, in which case nothing is done.
     *
     * <p>The actioner call is bracketed by the worker stats manager's
     * stop-instance timing hooks.
     *
     * @param functionRuntimeInfo runtime info of the function to stop
     */
    private void conditionallyStopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
        if (!this.isInitializePhase) {
            this.workerStatsManager.stopInstanceProcessTimeStart();
            this.functionActioner.stopFunction(functionRuntimeInfo);
            this.workerStatsManager.stopInstanceProcessTimeEnd();
        }
    }

    /**
     * Terminates the function through the function actioner unless
     * {@code isInitializePhase} is set, in which case nothing is done.
     *
     * @param functionRuntimeInfo runtime info of the function to terminate
     */
    private void conditionallyTerminateFunction(FunctionRuntimeInfo functionRuntimeInfo) {
        if (!this.isInitializePhase) {
            // NOTE(review): the termination is timed with the start-instance hooks, the
            // same ones conditionallyStartFunction uses, whereas conditionallyStopFunction
            // uses the stop-instance hooks. Confirm this is intended and not a copy-paste
            // slip; behavior is left unchanged here.
            this.workerStatsManager.startInstanceProcessTimeStart();
            this.functionActioner.terminateFunction(functionRuntimeInfo);
            this.workerStatsManager.startInstanceProcessTimeEnd();
        }
    }

    /**
     * Counts the assignments currently recorded for this worker.
     *
     * @return the size of this worker's assignment table, or {@code 0} when the
     *         worker has no table
     */
    public int getMyInstances() {
        Map<String, Function.Assignment> ownAssignments =
                this.workerIdToAssignments.get(this.workerConfig.getWorkerId());
        return ownAssignments == null ? 0 : ownAssignments.size();
    }

    /**
     * @return the worker configuration held by this manager
     */
    public WorkerConfig getWorkerConfig() {
        WorkerConfig config = this.workerConfig;
        return config;
    }

    /**
     * Replaces the actioner used to start, stop and terminate functions.
     *
     * @param functionActioner the actioner to use from now on
     */
    public void setFunctionActioner(FunctionActioner functionActioner) {
        FunctionActioner replacement = functionActioner;
        this.functionActioner = replacement;
    }

    /**
     * @return the actioner currently used to start, stop and terminate functions
     */
    public FunctionActioner getFunctionActioner() {
        FunctionActioner actioner = this.functionActioner;
        return actioner;
    }

    /**
     * @return the runtime factory held by this manager; may be {@code null}, as
     *         {@link #close()} guards against that case
     */
    public RuntimeFactory getRuntimeFactory() {
        RuntimeFactory factory = this.runtimeFactory;
        return factory;
    }

    /**
     * @return the worker service held by this manager
     */
    public PulsarWorkerService getWorkerService() {
        PulsarWorkerService service = this.workerService;
        return service;
    }

    /**
     * @return the {@code isInitialized} future held by this manager; presumably
     *         completed once initialization has finished — confirm against the
     *         code that completes it
     */
    public CompletableFuture<Void> getIsInitialized() {
        CompletableFuture<Void> initialized = this.isInitialized;
        return initialized;
    }
}
