package org.apache.pulsar.functions.runtime.thread;

import java.util.Optional;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.io.netty.util.internal.PlatformDependent;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Reflections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.class */
/**
 * {@link RuntimeFactory} that produces {@link ThreadRuntime} instances. All runtimes created by one
 * factory share the same {@link ThreadGroup}, function cache, Pulsar client and (optionally)
 * Pulsar admin client.
 *
 * <p>A factory is either fully set up by one of the argument-taking constructors, or created with
 * the no-arg constructor and set up later through
 * {@link #initialize(WorkerConfig, AuthenticationConfig, SecretsProviderConfigurator, ConnectorsManager, Optional, Optional)}.
 * Until one of those has completed, the shared resources are {@code null}.
 */
public class ThreadRuntimeFactory implements RuntimeFactory {
    private static final Logger log = LoggerFactory.getLogger(ThreadRuntimeFactory.class);
    private ThreadGroup threadGroup;
    private FunctionCacheManager fnCache;
    private ClientBuilder clientBuilder;
    private PulsarClient pulsarClient;
    private PulsarAdmin pulsarAdmin;
    private String stateStorageImplClass;
    private String storageServiceUrl;
    private SecretsProvider defaultSecretsProvider;
    private FunctionCollectorRegistry collectorRegistry;
    private String narExtractionDirectory;
    private volatile boolean closed;
    private SecretsProviderConfigurator secretsProviderConfigurator;
    private ClassLoader rootClassLoader;
    private Optional<ConnectorsManager> connectorsManager;

    /**
     * Creates a fully initialized factory that uses a freshly created
     * {@link FunctionCacheManagerImpl}, no client memory limit, no secrets provider configurator
     * and no connectors manager.
     *
     * @param threadGroupName name of the thread group shared by the created runtimes
     * @param pulsarServiceUrl service URL used to build the Pulsar client
     * @param stateStorageImplClass state storage implementation class name handed to each runtime
     * @param storageServiceUrl state storage service URL handed to each runtime
     * @param authenticationConfig authentication settings for the Pulsar (admin) client
     * @param secretsProvider secrets provider handed to every runtime; when {@code null},
     *        {@link #createContainer} needs a secrets provider configurator, which this
     *        constructor does not supply
     * @param functionCollectorRegistry metrics registry handed to each runtime
     * @param narExtractionDirectory NAR extraction directory handed to each runtime
     * @param classLoader root class loader; the current thread's context class loader is used
     *        when {@code null}
     * @param exposePulsarAdminClientEnabled whether a Pulsar admin client is created
     * @param pulsarWebServiceUrl web service URL for the admin client
     * @throws Exception if the Pulsar client cannot be built
     */
    public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String stateStorageImplClass, String storageServiceUrl, AuthenticationConfig authenticationConfig, SecretsProvider secretsProvider, FunctionCollectorRegistry functionCollectorRegistry, String narExtractionDirectory, ClassLoader classLoader, boolean exposePulsarAdminClientEnabled, String pulsarWebServiceUrl) throws Exception {
        initialize(threadGroupName, Optional.empty(), pulsarServiceUrl, authenticationConfig, stateStorageImplClass, storageServiceUrl, null, secretsProvider, functionCollectorRegistry, narExtractionDirectory, classLoader, exposePulsarAdminClientEnabled, pulsarWebServiceUrl, Optional.empty(), null);
    }

    /**
     * Same as the eleven-argument constructor, but lets the caller supply the function cache.
     *
     * @param functionCacheManager function cache to share between runtimes; a new
     *        {@link FunctionCacheManagerImpl} is created when {@code null}
     * @throws Exception if the Pulsar client cannot be built
     */
    public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String stateStorageImplClass, String storageServiceUrl, AuthenticationConfig authenticationConfig, SecretsProvider secretsProvider, FunctionCollectorRegistry functionCollectorRegistry, String narExtractionDirectory, ClassLoader classLoader, boolean exposePulsarAdminClientEnabled, String pulsarWebServiceUrl, FunctionCacheManager functionCacheManager) throws Exception {
        initialize(threadGroupName, Optional.empty(), pulsarServiceUrl, authenticationConfig, stateStorageImplClass, storageServiceUrl, null, secretsProvider, functionCollectorRegistry, narExtractionDirectory, classLoader, exposePulsarAdminClientEnabled, pulsarWebServiceUrl, Optional.empty(), functionCacheManager);
    }

    /**
     * Creates an uninitialized factory. The
     * {@link #initialize(WorkerConfig, AuthenticationConfig, SecretsProviderConfigurator, ConnectorsManager, Optional, Optional)}
     * method must be called before {@link #createContainer} is used.
     */
    public ThreadRuntimeFactory() {
    }

    /**
     * Shared set-up used by the constructors and by the {@link RuntimeFactory} initialize method.
     *
     * @throws IllegalArgumentException if the configured memory limit is out of range
     * @throws PulsarClientException if the Pulsar client cannot be built
     */
    private void initialize(String threadGroupName, Optional<ThreadRuntimeFactoryConfig.MemoryLimit> memoryLimit, String pulsarServiceUrl, AuthenticationConfig authenticationConfig, String stateStorageImplClass, String storageServiceUrl, SecretsProviderConfigurator secretsProviderConfigurator, SecretsProvider secretsProvider, FunctionCollectorRegistry functionCollectorRegistry, String narExtractionDirectory, ClassLoader classLoader, boolean exposePulsarAdminClientEnabled, String pulsarWebServiceUrl, Optional<ConnectorsManager> connectorsManager, FunctionCacheManager functionCacheManager) throws PulsarClientException {
        // Validate the configuration before acquiring anything, so that a bad memory limit
        // cannot leave a half-built factory holding resources nobody will close.
        final Optional<Long> clientMemoryLimit = calculateClientMemoryLimit(memoryLimit);

        final ClassLoader effectiveClassLoader =
                classLoader != null ? classLoader : Thread.currentThread().getContextClassLoader();
        this.rootClassLoader = effectiveClassLoader;
        this.secretsProviderConfigurator = secretsProviderConfigurator;
        this.defaultSecretsProvider = secretsProvider;
        this.fnCache = functionCacheManager != null
                ? functionCacheManager
                : new FunctionCacheManagerImpl(effectiveClassLoader);
        this.threadGroup = new ThreadGroup(threadGroupName);
        this.stateStorageImplClass = stateStorageImplClass;
        this.storageServiceUrl = storageServiceUrl;
        this.collectorRegistry = functionCollectorRegistry;
        this.narExtractionDirectory = narExtractionDirectory;
        this.connectorsManager = connectorsManager;

        this.pulsarAdmin = exposePulsarAdminClientEnabled
                ? InstanceUtils.createPulsarAdminClient(pulsarWebServiceUrl, authenticationConfig)
                : null;
        try {
            this.clientBuilder = InstanceUtils.createPulsarClientBuilder(pulsarServiceUrl, authenticationConfig, clientMemoryLimit);
            this.pulsarClient = this.clientBuilder.build();
        } catch (PulsarClientException | RuntimeException e) {
            // The admin client was already created; release it, because when this runs from a
            // constructor the caller never obtains a reference on which to call close().
            releaseAdminAfterFailedInit(e);
            throw e;
        }
    }

    /** Closes the admin client (if any) after a failed set-up, recording any close failure on {@code cause}. */
    private void releaseAdminAfterFailedInit(Exception cause) {
        final PulsarAdmin admin = this.pulsarAdmin;
        this.pulsarAdmin = null;
        if (admin == null) {
            return;
        }
        try {
            admin.close();
        } catch (RuntimeException closeFailure) {
            // Keep the original failure as the primary exception.
            cause.addSuppressed(closeFailure);
        }
    }

    /**
     * Resolves the configured client memory limit to a byte count.
     *
     * <p>When both an absolute value and a percentage of max direct memory are configured, the
     * smaller of the two wins. When only one is configured, that one is used.
     *
     * @param memoryLimit optional memory limit configuration
     * @return the limit in bytes, or empty when nothing is configured
     * @throws IllegalArgumentException if the absolute value is not positive or the percentage is
     *         outside {@code (0, 100]}
     */
    private Optional<Long> calculateClientMemoryLimit(Optional<ThreadRuntimeFactoryConfig.MemoryLimit> memoryLimit) {
        if (!memoryLimit.isPresent()) {
            return Optional.empty();
        }
        final ThreadRuntimeFactoryConfig.MemoryLimit limit = memoryLimit.get();
        final Long absoluteValue = limit.getAbsoluteValue();
        final Double percentOfMaxDirectMemory = limit.getPercentOfMaxDirectMemory();

        if (absoluteValue != null) {
            Preconditions.checkArgument(absoluteValue.longValue() > 0, "Absolute memory limit for Pulsar client has to be positive");
        }
        if (percentOfMaxDirectMemory != null) {
            Preconditions.checkArgument(percentOfMaxDirectMemory.doubleValue() > 0.0d && percentOfMaxDirectMemory.doubleValue() <= 100.0d, "Percent of max direct memory limit for Pulsar client must be between 0 and 100");
        }

        if (absoluteValue != null && percentOfMaxDirectMemory != null) {
            return Optional.of(Long.valueOf(Math.min(absoluteValue.longValue(), getBytesPercentDirectMem(percentOfMaxDirectMemory.doubleValue()))));
        }
        if (absoluteValue != null) {
            return Optional.of(absoluteValue);
        }
        if (percentOfMaxDirectMemory != null) {
            return Optional.of(Long.valueOf(getBytesPercentDirectMem(percentOfMaxDirectMemory.doubleValue())));
        }
        return Optional.empty();
    }

    /**
     * @param percent percentage in the range {@code (0, 100]}
     * @return that percentage of {@link PlatformDependent#maxDirectMemory()}, truncated to a long
     */
    private long getBytesPercentDirectMem(double percent) {
        return (long) (PlatformDependent.maxDirectMemory() * (percent / 100.0d));
    }

    /**
     * Initializes this factory from the worker configuration. The thread group name and the
     * optional client memory limit come from the {@link ThreadRuntimeFactoryConfig} section of
     * the worker's runtime factory configs.
     *
     * <p>No default secrets provider, collector registry, class loader or function cache is
     * passed on, so the secrets provider is resolved per function in {@link #createContainer}.
     * The two {@code Optional} arguments are not used by this implementation.
     *
     * @throws Exception if the configuration is invalid or the Pulsar client cannot be built
     */
    @Override // org.apache.pulsar.functions.runtime.RuntimeFactory
    public void initialize(WorkerConfig workerConfig, AuthenticationConfig authenticationConfig, SecretsProviderConfigurator secretsProviderConfigurator, ConnectorsManager connectorsManager, Optional<FunctionAuthProvider> optional, Optional<RuntimeCustomizer> optional2) throws Exception {
        ThreadRuntimeFactoryConfig threadRuntimeFactoryConfig = (ThreadRuntimeFactoryConfig) RuntimeUtils.getRuntimeFunctionConfig(workerConfig.getFunctionRuntimeFactoryConfigs(), ThreadRuntimeFactoryConfig.class);
        initialize(threadRuntimeFactoryConfig.getThreadGroupName(), Optional.ofNullable(threadRuntimeFactoryConfig.getPulsarClientMemoryLimit()), workerConfig.getPulsarServiceUrl(), authenticationConfig, workerConfig.getStateStorageProviderImplementation(), workerConfig.getStateStorageServiceUrl(), secretsProviderConfigurator, null, null, workerConfig.getNarExtractionDirectory(), null, workerConfig.isExposeAdminClientEnabled(), workerConfig.getPulsarWebServiceUrl(), Optional.of(connectorsManager), null);
    }

    /**
     * Creates a {@link ThreadRuntime} for the given instance, wired to this factory's shared
     * resources.
     *
     * <p>When no default secrets provider was supplied, one is instantiated reflectively from the
     * class name reported by the secrets provider configurator for this function, and initialized
     * with the configurator's config for it.
     *
     * @param instanceConfig configuration of the function instance to run
     * @param str forwarded unchanged to the {@link ThreadRuntime} constructor
     * @param str2 not used by this implementation
     * @param l not used by this implementation
     * @return the new runtime
     */
    @Override // org.apache.pulsar.functions.runtime.RuntimeFactory
    public ThreadRuntime createContainer(InstanceConfig instanceConfig, String str, String str2, Long l) {
        SecretsProvider secretsProvider = this.defaultSecretsProvider;
        if (secretsProvider == null) {
            secretsProvider = (SecretsProvider) Reflections.createInstance(this.secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails()), this.rootClassLoader);
            log.info("Initializing secrets provider {} with configs: {}", secretsProvider.getClass().getName(), this.secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()));
            secretsProvider.init(this.secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()));
        }
        return new ThreadRuntime(instanceConfig, this.fnCache, this.threadGroup, str, this.pulsarClient, this.clientBuilder, this.pulsarAdmin, this.stateStorageImplClass, this.storageServiceUrl, secretsProvider, this.collectorRegistry, this.narExtractionDirectory, this.connectorsManager);
    }

    /**
     * Interrupts the shared thread group, closes the function cache, the Pulsar client and the
     * admin client, and shuts down the {@link InstanceCache}. Calls after the first are no-ops.
     *
     * <p>Safe to call on a factory that was never initialized: resources that were never created
     * are skipped.
     */
    @Override // org.apache.pulsar.functions.runtime.RuntimeFactory, java.lang.AutoCloseable
    public void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        // Every shared resource is null until initialize(...) has run, hence the guards.
        if (this.threadGroup != null) {
            this.threadGroup.interrupt();
        }
        if (this.fnCache != null) {
            this.fnCache.close();
        }
        if (this.pulsarClient != null) {
            try {
                this.pulsarClient.close();
            } catch (PulsarClientException e) {
                log.warn("Failed to close pulsar client when closing function container factory", e);
            }
        }
        if (this.pulsarAdmin != null) {
            this.pulsarAdmin.close();
        }
        InstanceCache.shutdown();
    }

    /**
     * @return the thread group shared by the runtimes of this factory, or {@code null} if the
     *         factory has not been initialized yet
     */
    public ThreadGroup getThreadGroup() {
        return this.threadGroup;
    }
}
