/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.utils;

import java.io.IOException;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.BatchSourceConfig;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.config.validation.ConfigValidation;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.pulsar.functions.runtime.shaded.com.google.gson.Gson;
import org.apache.pulsar.functions.runtime.shaded.com.google.gson.reflect.TypeToken;
import org.apache.pulsar.functions.runtime.shaded.net.jodah.typetools.TypeResolver;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.utils.CryptoUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.ResourceConfigUtils;
import org.apache.pulsar.functions.utils.ValidatorUtils;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SourceConfigUtils {
    private static final Logger log = LoggerFactory.getLogger(SourceConfigUtils.class);

    public static Function.FunctionDetails convert(SourceConfig sourceConfig, ExtractedSourceDetails sourceDetails) throws IllegalArgumentException {
        boolean isBuiltin;
        Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder();
        boolean bl = isBuiltin = !StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith("builtin");
        if (sourceConfig.getTenant() != null) {
            functionDetailsBuilder.setTenant(sourceConfig.getTenant());
        }
        if (sourceConfig.getNamespace() != null) {
            functionDetailsBuilder.setNamespace(sourceConfig.getNamespace());
        }
        if (sourceConfig.getName() != null) {
            functionDetailsBuilder.setName(sourceConfig.getName());
        }
        functionDetailsBuilder.setRuntime(Function.FunctionDetails.Runtime.JAVA);
        if (sourceConfig.getParallelism() != null) {
            functionDetailsBuilder.setParallelism(sourceConfig.getParallelism());
        } else {
            functionDetailsBuilder.setParallelism(1);
        }
        functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
        functionDetailsBuilder.setAutoAck(true);
        if (sourceConfig.getProcessingGuarantees() != null) {
            functionDetailsBuilder.setProcessingGuarantees(FunctionCommon.convertProcessingGuarantee(sourceConfig.getProcessingGuarantees()));
        }
        Function.SourceSpec.Builder sourceSpecBuilder = Function.SourceSpec.newBuilder();
        if (sourceDetails.getSourceClassName() != null) {
            sourceSpecBuilder.setClassName(sourceDetails.getSourceClassName());
        }
        if (isBuiltin) {
            String builtin = sourceConfig.getArchive().replaceFirst("^builtin://", "");
            sourceSpecBuilder.setBuiltin(builtin);
        }
        HashMap<String, Object> configs = new HashMap<String, Object>();
        if (sourceConfig.getConfigs() != null) {
            configs.putAll(sourceConfig.getConfigs());
        }
        if (sourceConfig.getBatchSourceConfig() != null) {
            configs.put("__BATCHSOURCECONFIGS__", new Gson().toJson(sourceConfig.getBatchSourceConfig()));
            configs.put("__BATCHSOURCECLASSNAME__", sourceSpecBuilder.getClassName());
            sourceSpecBuilder.setClassName("org.apache.pulsar.functions.source.batch.BatchSourceExecutor");
        }
        sourceSpecBuilder.setConfigs(new Gson().toJson(configs));
        if (sourceConfig.getSecrets() != null && !sourceConfig.getSecrets().isEmpty()) {
            functionDetailsBuilder.setSecretsMap(new Gson().toJson(sourceConfig.getSecrets()));
        }
        if (sourceDetails.getTypeArg() != null) {
            sourceSpecBuilder.setTypeClassName(sourceDetails.getTypeArg());
        }
        functionDetailsBuilder.setSource(sourceSpecBuilder);
        Function.SinkSpec.Builder sinkSpecBuilder = Function.SinkSpec.newBuilder();
        if (!StringUtils.isEmpty(sourceConfig.getSchemaType())) {
            sinkSpecBuilder.setSchemaType(sourceConfig.getSchemaType());
        }
        if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName())) {
            sinkSpecBuilder.setSerDeClassName(sourceConfig.getSerdeClassName());
        }
        sinkSpecBuilder.setTopic(sourceConfig.getTopicName());
        if (sourceDetails.getTypeArg() != null) {
            sinkSpecBuilder.setTypeClassName(sourceDetails.getTypeArg());
        }
        if (sourceConfig.getProducerConfig() != null) {
            ProducerConfig conf = sourceConfig.getProducerConfig();
            Function.ProducerSpec.Builder pbldr = Function.ProducerSpec.newBuilder();
            if (conf.getMaxPendingMessages() != null) {
                pbldr.setMaxPendingMessages(conf.getMaxPendingMessages());
            }
            if (conf.getMaxPendingMessagesAcrossPartitions() != null) {
                pbldr.setMaxPendingMessagesAcrossPartitions(conf.getMaxPendingMessagesAcrossPartitions());
            }
            if (conf.getUseThreadLocalProducers() != null) {
                pbldr.setUseThreadLocalProducers(conf.getUseThreadLocalProducers());
            }
            if (conf.getCryptoConfig() != null) {
                pbldr.setCryptoSpec(CryptoUtils.convert(conf.getCryptoConfig()));
            }
            if (conf.getBatchBuilder() != null) {
                pbldr.setBatchBuilder(conf.getBatchBuilder());
            }
            sinkSpecBuilder.setProducerSpec(pbldr.build());
        }
        if (sourceConfig.getBatchBuilder() != null) {
            Function.ProducerSpec.Builder builder = sinkSpecBuilder.getProducerSpec() != null ? sinkSpecBuilder.getProducerSpec().toBuilder() : Function.ProducerSpec.newBuilder();
            sinkSpecBuilder.setProducerSpec(builder.setBatchBuilder(sourceConfig.getBatchBuilder()).build());
        }
        sinkSpecBuilder.setForwardSourceMessageProperty(true);
        functionDetailsBuilder.setSink(sinkSpecBuilder);
        Resources resources = Resources.mergeWithDefault(sourceConfig.getResources());
        Function.Resources.Builder bldr = Function.Resources.newBuilder();
        bldr.setCpu(resources.getCpu());
        bldr.setRam(resources.getRam());
        bldr.setDisk(resources.getDisk());
        functionDetailsBuilder.setResources(bldr);
        if (!StringUtils.isEmpty(sourceConfig.getRuntimeFlags())) {
            functionDetailsBuilder.setRuntimeFlags(sourceConfig.getRuntimeFlags());
        }
        functionDetailsBuilder.setComponentType(Function.FunctionDetails.ComponentType.SOURCE);
        if (!StringUtils.isEmpty(sourceConfig.getCustomRuntimeOptions())) {
            functionDetailsBuilder.setCustomRuntimeOptions(sourceConfig.getCustomRuntimeOptions());
        }
        return functionDetailsBuilder.build();
    }

    public static SourceConfig convertFromDetails(Function.FunctionDetails functionDetails) {
        Map<String, Object> configMap;
        SourceConfig sourceConfig = new SourceConfig();
        sourceConfig.setTenant(functionDetails.getTenant());
        sourceConfig.setNamespace(functionDetails.getNamespace());
        sourceConfig.setName(functionDetails.getName());
        sourceConfig.setParallelism(functionDetails.getParallelism());
        sourceConfig.setProcessingGuarantees(FunctionCommon.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
        Function.SourceSpec sourceSpec = functionDetails.getSource();
        if (!StringUtils.isEmpty(sourceSpec.getClassName())) {
            sourceConfig.setClassName(sourceSpec.getClassName());
        }
        if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
            sourceConfig.setArchive("builtin://" + sourceSpec.getBuiltin());
        }
        if ((configMap = SourceConfigUtils.extractSourceConfig(sourceSpec, FunctionCommon.getFullyQualifiedName(functionDetails))) != null) {
            BatchSourceConfig batchSourceConfig = SourceConfigUtils.extractBatchSourceConfig(configMap);
            if (batchSourceConfig != null) {
                sourceConfig.setBatchSourceConfig(batchSourceConfig);
                if (configMap.containsKey("__BATCHSOURCECLASSNAME__")) {
                    if (!StringUtils.isEmpty((String)configMap.get("__BATCHSOURCECLASSNAME__"))) {
                        sourceConfig.setClassName((String)configMap.get("__BATCHSOURCECLASSNAME__"));
                    } else {
                        sourceConfig.setClassName(null);
                    }
                }
            }
            configMap.remove("__BATCHSOURCECONFIGS__");
            configMap.remove("__BATCHSOURCECLASSNAME__");
            sourceConfig.setConfigs(configMap);
        }
        if (!StringUtils.isEmpty(functionDetails.getSecretsMap())) {
            Type type = new TypeToken<Map<String, Object>>(){}.getType();
            Map secretsMap = (Map)new Gson().fromJson(functionDetails.getSecretsMap(), type);
            sourceConfig.setSecrets(secretsMap);
        }
        Function.SinkSpec sinkSpec = functionDetails.getSink();
        sourceConfig.setTopicName(sinkSpec.getTopic());
        if (!StringUtils.isEmpty(sinkSpec.getSchemaType())) {
            sourceConfig.setSchemaType(sinkSpec.getSchemaType());
        }
        if (!StringUtils.isEmpty(sinkSpec.getSerDeClassName())) {
            sourceConfig.setSerdeClassName(sinkSpec.getSerDeClassName());
        }
        if (sinkSpec.getProducerSpec() != null) {
            Function.ProducerSpec spec = sinkSpec.getProducerSpec();
            ProducerConfig producerConfig = new ProducerConfig();
            if (spec.getMaxPendingMessages() != 0) {
                producerConfig.setMaxPendingMessages(spec.getMaxPendingMessages());
            }
            if (spec.getMaxPendingMessagesAcrossPartitions() != 0) {
                producerConfig.setMaxPendingMessagesAcrossPartitions(spec.getMaxPendingMessagesAcrossPartitions());
            }
            if (spec.hasCryptoSpec()) {
                producerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(spec.getCryptoSpec()));
            }
            if (spec.getBatchBuilder() != null) {
                producerConfig.setBatchBuilder(spec.getBatchBuilder());
            }
            producerConfig.setUseThreadLocalProducers(spec.getUseThreadLocalProducers());
            sourceConfig.setProducerConfig(producerConfig);
        }
        if (functionDetails.hasResources()) {
            Resources resources = new Resources();
            resources.setCpu(functionDetails.getResources().getCpu());
            resources.setRam(functionDetails.getResources().getRam());
            resources.setDisk(functionDetails.getResources().getDisk());
            sourceConfig.setResources(resources);
        }
        if (!StringUtils.isEmpty(functionDetails.getRuntimeFlags())) {
            sourceConfig.setRuntimeFlags(functionDetails.getRuntimeFlags());
        }
        if (!StringUtils.isEmpty(functionDetails.getCustomRuntimeOptions())) {
            sourceConfig.setCustomRuntimeOptions(functionDetails.getCustomRuntimeOptions());
        }
        return sourceConfig;
    }

    public static ExtractedSourceDetails validateAndExtractDetails(SourceConfig sourceConfig, ClassLoader sourceClassLoader, boolean validateConnectorConfig) {
        Class<?> sourceClass;
        String sourceClassName;
        if (StringUtils.isEmpty(sourceConfig.getTenant())) {
            throw new IllegalArgumentException("Source tenant cannot be null");
        }
        if (StringUtils.isEmpty(sourceConfig.getNamespace())) {
            throw new IllegalArgumentException("Source namespace cannot be null");
        }
        if (StringUtils.isEmpty(sourceConfig.getName())) {
            throw new IllegalArgumentException("Source name cannot be null");
        }
        if (StringUtils.isEmpty(sourceConfig.getTopicName())) {
            throw new IllegalArgumentException("Topic name cannot be null");
        }
        if (!TopicName.isValid(sourceConfig.getTopicName())) {
            throw new IllegalArgumentException("Topic name is invalid");
        }
        if (sourceConfig.getParallelism() != null && sourceConfig.getParallelism() <= 0) {
            throw new IllegalArgumentException("Source parallelism must be a positive number");
        }
        if (sourceConfig.getResources() != null) {
            ResourceConfigUtils.validate(sourceConfig.getResources());
        }
        if ((sourceClassName = sourceConfig.getClassName()) == null) {
            try {
                sourceClassName = ConnectorUtils.getIOSourceClass((NarClassLoader)sourceClassLoader);
            }
            catch (IOException e) {
                throw new IllegalArgumentException("Failed to extract source class from archive", e);
            }
        }
        try {
            sourceClass = sourceClassLoader.loadClass(sourceClassName);
        }
        catch (ClassNotFoundException e) {
            throw new IllegalArgumentException(String.format("Source class %s not found in class loader", sourceClassName, e));
        }
        if (!Source.class.isAssignableFrom(sourceClass) && !BatchSource.class.isAssignableFrom(sourceClass)) {
            throw new IllegalArgumentException(String.format("Source class %s does not implement the correct interface", sourceClass.getName()));
        }
        if (BatchSource.class.isAssignableFrom(sourceClass)) {
            if (sourceConfig.getBatchSourceConfig() != null) {
                SourceConfigUtils.validateBatchSourceConfig(sourceConfig.getBatchSourceConfig());
            } else {
                throw new IllegalArgumentException(String.format("Source class %s implements %s but batch source source config is not specified", sourceClass.getName(), BatchSource.class.getName()));
            }
        }
        Class<?> typeArg = FunctionCommon.getSourceType(sourceClass);
        if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName()) && !StringUtils.isEmpty(sourceConfig.getSchemaType())) {
            throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
        }
        if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName())) {
            ValidatorUtils.validateSerde(sourceConfig.getSerdeClassName(), typeArg, sourceClassLoader, false);
        }
        if (!StringUtils.isEmpty(sourceConfig.getSchemaType())) {
            ValidatorUtils.validateSchema(sourceConfig.getSchemaType(), typeArg, sourceClassLoader, false);
        }
        if (sourceConfig.getProducerConfig() != null && sourceConfig.getProducerConfig().getCryptoConfig() != null) {
            ValidatorUtils.validateCryptoKeyReader(sourceConfig.getProducerConfig().getCryptoConfig(), sourceClassLoader, true);
        }
        if (typeArg.equals(TypeResolver.Unknown.class)) {
            throw new IllegalArgumentException(String.format("Failed to resolve type for Source class %s", sourceClassName));
        }
        if (validateConnectorConfig && sourceClassLoader instanceof NarClassLoader) {
            SourceConfigUtils.validateSourceConfig(sourceConfig, (NarClassLoader)sourceClassLoader);
        }
        return new ExtractedSourceDetails(sourceClassName, typeArg.getName());
    }

    public static SourceConfig clone(SourceConfig sourceConfig) {
        return ObjectMapperFactory.getThreadLocal().readValue(ObjectMapperFactory.getThreadLocal().writeValueAsBytes(sourceConfig), SourceConfig.class);
    }

    public static SourceConfig validateUpdate(SourceConfig existingConfig, SourceConfig newConfig) {
        SourceConfig mergedConfig = SourceConfigUtils.clone(existingConfig);
        if (!existingConfig.getTenant().equals(newConfig.getTenant())) {
            throw new IllegalArgumentException("Tenants differ");
        }
        if (!existingConfig.getNamespace().equals(newConfig.getNamespace())) {
            throw new IllegalArgumentException("Namespaces differ");
        }
        if (!existingConfig.getName().equals(newConfig.getName())) {
            throw new IllegalArgumentException("Function Names differ");
        }
        if (!StringUtils.isEmpty(newConfig.getClassName())) {
            mergedConfig.setClassName(newConfig.getClassName());
        }
        if (!StringUtils.isEmpty(newConfig.getTopicName())) {
            mergedConfig.setTopicName(newConfig.getTopicName());
        }
        if (!StringUtils.isEmpty(newConfig.getSerdeClassName())) {
            mergedConfig.setSerdeClassName(newConfig.getSerdeClassName());
        }
        if (!StringUtils.isEmpty(newConfig.getSchemaType())) {
            mergedConfig.setSchemaType(newConfig.getSchemaType());
        }
        if (newConfig.getConfigs() != null) {
            mergedConfig.setConfigs(newConfig.getConfigs());
        }
        if (newConfig.getSecrets() != null) {
            mergedConfig.setSecrets(newConfig.getSecrets());
        }
        if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals((Object)existingConfig.getProcessingGuarantees())) {
            throw new IllegalArgumentException("Processing Guarantees cannot be altered");
        }
        if (newConfig.getParallelism() != null) {
            mergedConfig.setParallelism(newConfig.getParallelism());
        }
        if (newConfig.getResources() != null) {
            mergedConfig.setResources(ResourceConfigUtils.merge(existingConfig.getResources(), newConfig.getResources()));
        }
        if (!StringUtils.isEmpty(newConfig.getArchive())) {
            mergedConfig.setArchive(newConfig.getArchive());
        }
        if (!StringUtils.isEmpty(newConfig.getRuntimeFlags())) {
            mergedConfig.setRuntimeFlags(newConfig.getRuntimeFlags());
        }
        if (!StringUtils.isEmpty(newConfig.getCustomRuntimeOptions())) {
            mergedConfig.setCustomRuntimeOptions(newConfig.getCustomRuntimeOptions());
        }
        if (SourceConfigUtils.isBatchSource(existingConfig) != SourceConfigUtils.isBatchSource(newConfig)) {
            throw new IllegalArgumentException("Sources cannot be update between regular sources and batchsource");
        }
        if (newConfig.getBatchSourceConfig() != null) {
            SourceConfigUtils.validateBatchSourceConfigUpdate(existingConfig.getBatchSourceConfig(), newConfig.getBatchSourceConfig());
            mergedConfig.setBatchSourceConfig(newConfig.getBatchSourceConfig());
        }
        return mergedConfig;
    }

    public static void validateBatchSourceConfig(BatchSourceConfig batchSourceConfig) throws IllegalArgumentException {
        if (StringUtils.isEmpty(batchSourceConfig.getDiscoveryTriggererClassName())) {
            log.error("BatchSourceConfig does not specify Discovery Trigger ClassName");
            throw new IllegalArgumentException("BatchSourceConfig does not specify Discovery Trigger ClassName");
        }
    }

    public static Map<String, Object> extractSourceConfig(Function.SourceSpec sourceSpec, String fqfn) {
        if (!StringUtils.isEmpty(sourceSpec.getConfigs())) {
            TypeReference<HashMap<String, Object>> typeRef = new TypeReference<HashMap<String, Object>>(){};
            try {
                return ObjectMapperFactory.getThreadLocal().readValue(sourceSpec.getConfigs(), typeRef);
            }
            catch (IOException e) {
                log.error("Failed to read configs for source {}", (Object)fqfn, (Object)e);
                throw new RuntimeException(e);
            }
        }
        return null;
    }

    public static BatchSourceConfig extractBatchSourceConfig(Map<String, Object> configMap) {
        if (configMap.containsKey("__BATCHSOURCECONFIGS__")) {
            String batchSourceConfigJson = (String)configMap.get("__BATCHSOURCECONFIGS__");
            return new Gson().fromJson(batchSourceConfigJson, BatchSourceConfig.class);
        }
        return null;
    }

    public static Map<String, String> computeBatchSourceIntermediateTopicSubscriptions(Function.FunctionDetails details, String fqfn) {
        Map<String, Object> configMap = SourceConfigUtils.extractSourceConfig(details.getSource(), fqfn);
        if (configMap != null) {
            BatchSourceConfig batchSourceConfig = SourceConfigUtils.extractBatchSourceConfig(configMap);
            String intermediateTopicName = SourceConfigUtils.computeBatchSourceIntermediateTopicName(details.getTenant(), details.getNamespace(), details.getName()).toString();
            if (batchSourceConfig != null) {
                HashMap<String, String> subscriptionMap = new HashMap<String, String>();
                subscriptionMap.put(intermediateTopicName, SourceConfigUtils.computeBatchSourceInstanceSubscriptionName(details.getTenant(), details.getNamespace(), details.getName()));
                return subscriptionMap;
            }
        }
        return null;
    }

    public static String computeBatchSourceInstanceSubscriptionName(String tenant, String namespace, String sourceName) {
        return "BatchSourceExecutor-" + tenant + "/" + namespace + "/" + sourceName;
    }

    public static TopicName computeBatchSourceIntermediateTopicName(String tenant, String namespace, String sourceName) {
        return TopicName.get(TopicDomain.persistent.name(), tenant, namespace, sourceName + "-intermediate");
    }

    public static boolean isBatchSource(SourceConfig sourceConfig) {
        return sourceConfig.getBatchSourceConfig() != null;
    }

    public static void validateBatchSourceConfigUpdate(BatchSourceConfig existingConfig, BatchSourceConfig newConfig) {
        if (!existingConfig.getDiscoveryTriggererClassName().equals(newConfig.getDiscoveryTriggererClassName())) {
            throw new IllegalArgumentException("DiscoverTriggerer class cannot be updated for batchsources");
        }
    }

    public static void validateSourceConfig(SourceConfig sourceConfig, NarClassLoader narClassLoader) {
        try {
            ConnectorDefinition defn = ConnectorUtils.getConnectorDefinition(narClassLoader);
            if (defn.getSourceConfigClass() != null) {
                Class<?> configClass = Class.forName(defn.getSourceConfigClass(), true, narClassLoader);
                Object configObject = ObjectMapperFactory.getThreadLocal().convertValue(sourceConfig.getConfigs(), configClass);
                if (configObject != null) {
                    ConfigValidation.validateConfig(configObject);
                }
            }
        }
        catch (IOException e) {
            throw new IllegalArgumentException("Error validating source config", e);
        }
        catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Could not find source config class");
        }
        catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Could not validate source config: " + e.getMessage());
        }
    }

    public static class ExtractedSourceDetails {
        private String sourceClassName;
        private String typeArg;

        public String getSourceClassName() {
            return this.sourceClassName;
        }

        public String getTypeArg() {
            return this.typeArg;
        }

        public void setSourceClassName(String sourceClassName) {
            this.sourceClassName = sourceClassName;
        }

        public void setTypeArg(String typeArg) {
            this.typeArg = typeArg;
        }

        public ExtractedSourceDetails(String sourceClassName, String typeArg) {
            this.sourceClassName = sourceClassName;
            this.typeArg = typeArg;
        }
    }
}

