/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.utils;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLConnection;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Collection;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import net.bytebuddy.description.type.TypeDefinition;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.description.type.TypeList;
import net.bytebuddy.pool.TypePool;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.auth.AuthenticationDataBasic;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.WindowFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.shaded.com.google.protobuf.AbstractMessage;
import org.apache.pulsar.functions.runtime.shaded.com.google.protobuf.Message;
import org.apache.pulsar.functions.runtime.shaded.com.google.protobuf.MessageOrBuilder;
import org.apache.pulsar.functions.runtime.shaded.com.google.protobuf.util.JsonFormat;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FunctionCommon {
    private static final Logger log = LoggerFactory.getLogger(FunctionCommon.class);

    /**
     * Serializes a protobuf message (or builder) to JSON text.
     *
     * @param msg the message or builder to print
     * @return the text produced by {@code JsonFormat.printer().print(msg)}
     * @throws IOException if the printer reports a failure
     */
    public static String printJson(MessageOrBuilder msg) throws IOException {
        final String json = JsonFormat.printer().print(msg);
        return json;
    }

    /**
     * Parses {@code json} and merges its contents into {@code builder}.
     *
     * @param json    the JSON text to parse
     * @param builder the builder that receives the parsed fields; it is handed to the
     *                parser as a {@code Message.Builder}
     * @throws IOException if the parser reports a failure
     */
    public static void mergeJson(String json, AbstractMessage.Builder builder) throws IOException {
        final Message.Builder target = (Message.Builder) builder;
        JsonFormat.parser().merge(json, target);
    }

    /**
     * Asks the operating system for a currently unused TCP port.
     *
     * <p>A server socket is bound to port {@code 0} (which lets the OS choose), its local
     * port is read, and the socket is closed again. Because the socket is released before
     * this method returns, another process may claim the port before the caller binds it.
     *
     * @return the port number that was assigned to the temporary socket
     * @throws RuntimeException wrapping the {@link IOException} if the socket cannot be
     *                          opened or closed
     */
    public static int findAvailablePort() {
        // try-with-resources guarantees the socket is released on every exit path.
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException ex) {
            throw new RuntimeException("No free port found", ex);
        }
    }

    /**
     * Resolves the input/output types of the function class named in {@code functionConfig}.
     *
     * <p>The class name from the config is described and resolved through {@code typePool},
     * then the work is delegated to the {@code (FunctionConfig, TypeDefinition)} overload.
     *
     * @param functionConfig the function configuration supplying the class name and window config
     * @param typePool       the pool used to look up the class description
     * @return the two-element type array produced by the delegate overload
     * @throws ClassNotFoundException declared by the signature; the visible body does not
     *                                throw it directly
     */
    public static TypeDefinition[] getFunctionTypes(FunctionConfig functionConfig, TypePool typePool) throws ClassNotFoundException {
        final String className = functionConfig.getClassName();
        final TypeDefinition functionClass = (TypeDefinition) typePool.describe(className).resolve();
        return FunctionCommon.getFunctionTypes(functionConfig, functionClass);
    }

    /**
     * Resolves the input/output types of {@code functionClass}, treating the function as
     * windowed when {@code functionConfig} carries a non-null window config.
     *
     * @param functionConfig the configuration whose window config is inspected
     * @param functionClass  the already-resolved description of the function class
     * @return the two-element type array produced by the {@code (TypeDefinition, boolean)} overload
     */
    public static TypeDefinition[] getFunctionTypes(FunctionConfig functionConfig, TypeDefinition functionClass) {
        return FunctionCommon.getFunctionTypes(functionClass, functionConfig.getWindowConfig() != null);
    }

    /**
     * Determines the effective input and output types of a user function class.
     *
     * <p>The two type arguments of the function interface chosen by
     * {@code getFunctionClassParent} are read from {@code userClass} and then adjusted:
     * <ul>
     *   <li>when a window config is present and the interface is
     *       {@code java.util.function.Function}, the input must be a {@link Collection}
     *       and is replaced by that collection's first type argument;</li>
     *   <li>an output assignable to {@code Record} is replaced by its first type argument;</li>
     *   <li>an output assignable to {@link CompletableFuture} is then replaced by its first
     *       type argument.</li>
     * </ul>
     *
     * @param userClass             description of the user-supplied function class
     * @param isWindowConfigPresent whether the function is configured with a window
     * @return a two-element array: index 0 is the input type, index 1 the output type
     * @throws IllegalArgumentException if the class exposes fewer than two type arguments
     *                                  for the function interface, or if a windowed
     *                                  {@code java.util.function.Function} does not take a
     *                                  collection as input
     */
    public static TypeDefinition[] getFunctionTypes(TypeDefinition userClass, boolean isWindowConfigPresent) {
        Class<?> classParent = FunctionCommon.getFunctionClassParent(userClass, isWindowConfigPresent);
        TypeList.Generic typeArgsList = FunctionCommon.resolveInterfaceTypeArguments(userClass, classParent);
        // Fail with a descriptive error instead of a NullPointerException or
        // IndexOutOfBoundsException when the interface is missing or implemented raw.
        if (typeArgsList == null || typeArgsList.size() < 2) {
            throw new IllegalArgumentException(String.format(
                    "Function class %s does not declare input and output type arguments for %s",
                    userClass.getActualName(), classParent.getName()));
        }
        TypeDescription.Generic[] typeArgs = new TypeDescription.Generic[]{typeArgsList.get(0), typeArgsList.get(1)};
        // Fully qualified: the simple name Function is also used for the protobuf outer class.
        if (isWindowConfigPresent && classParent.equals(java.util.function.Function.class)) {
            if (!typeArgs[0].asErasure().isAssignableTo(Collection.class)) {
                throw new IllegalArgumentException("Window function must take a collection as input");
            }
            // A plain windowed function receives Collection<T>; report T as the input type.
            typeArgs[0] = typeArgs[0].getTypeArguments().get(0);
        }
        // Unwrap Record<T> first, then CompletableFuture<T>, in that order.
        if (typeArgs[1].asErasure().isAssignableTo(Record.class)) {
            typeArgs[1] = typeArgs[1].getTypeArguments().get(0);
        }
        if (typeArgs[1].asErasure().isAssignableTo(CompletableFuture.class)) {
            typeArgs[1] = typeArgs[1].getTypeArguments().get(0);
        }
        return typeArgs;
    }

    /**
     * Finds the type arguments with which {@code userClass}, or one of its superclasses,
     * implements {@code interfaceClass}.
     *
     * <p>The class hierarchy is walked from {@code userClass} upwards. At each level the
     * directly declared interfaces are scanned and the type arguments of the first one whose
     * erasure is assignable to {@code interfaceClass} are returned.
     *
     * @param userClass      the class description to inspect
     * @param interfaceClass the interface whose type arguments are wanted
     * @return the type arguments of the first matching declared interface
     * @throws IllegalArgumentException if {@code interfaceClass} is not an interface, or if
     *                                  no class in the hierarchy declares a matching interface
     */
    private static TypeList.Generic resolveInterfaceTypeArguments(TypeDefinition userClass, Class<?> interfaceClass) {
        if (!interfaceClass.isInterface()) {
            throw new IllegalArgumentException("interfaceClass must be an interface");
        }
        // The interface may be implemented by a superclass, so climb until the chain ends.
        for (TypeDefinition current = userClass; current != null; current = current.getSuperClass()) {
            for (TypeDescription.Generic candidate : current.getInterfaces()) {
                if (candidate.asErasure().isAssignableTo(interfaceClass)) {
                    return candidate.getTypeArguments();
                }
            }
        }
        // Every caller dereferences the result immediately, so report the problem here
        // rather than returning null and provoking a NullPointerException later.
        throw new IllegalArgumentException(String.format(
                "Class %s does not implement %s",
                userClass == null ? null : userClass.getActualName(), interfaceClass.getName()));
    }

    /**
     * Returns the two type arguments of the function interface implemented by
     * {@code userClass}, without any unwrapping of collections, records or futures.
     *
     * @param userClass             description of the user-supplied function class
     * @param isWindowConfigPresent whether the function is configured with a window; this
     *                              influences which function interface is looked up
     * @return a two-element array holding the first and second type argument
     * @throws IllegalArgumentException if the class exposes fewer than two type arguments
     *                                  for the function interface
     */
    public static TypeDescription.Generic[] getRawFunctionTypes(TypeDefinition userClass, boolean isWindowConfigPresent) {
        Class<?> classParent = FunctionCommon.getFunctionClassParent(userClass, isWindowConfigPresent);
        TypeList.Generic typeArgsList = FunctionCommon.resolveInterfaceTypeArguments(userClass, classParent);
        // Fail with a descriptive error instead of a NullPointerException or
        // IndexOutOfBoundsException when the interface is missing or implemented raw.
        if (typeArgsList == null || typeArgsList.size() < 2) {
            throw new IllegalArgumentException(String.format(
                    "Function class %s does not declare input and output type arguments for %s",
                    userClass.getActualName(), classParent.getName()));
        }
        return new TypeDescription.Generic[]{typeArgsList.get(0), typeArgsList.get(1)};
    }

    /**
     * Chooses the function interface against which {@code userClass} should be inspected.
     *
     * <p>With a window config the preferred interface is {@code WindowFunction}; without
     * one it is the Pulsar {@code org.apache.pulsar.functions.api.Function}. If the class is
     * not assignable to the preferred interface, {@code java.util.function.Function} is
     * returned instead.
     *
     * @param userClass             description of the user-supplied function class
     * @param isWindowConfigPresent whether the function is configured with a window
     * @return the interface class selected as described above
     */
    public static Class<?> getFunctionClassParent(TypeDefinition userClass, boolean isWindowConfigPresent) {
        final Class<?> preferred = isWindowConfigPresent
                ? WindowFunction.class
                : org.apache.pulsar.functions.api.Function.class;
        if (userClass.asErasure().isAssignableTo(preferred)) {
            return preferred;
        }
        // Fallback for classes written against the plain JDK functional interface.
        return java.util.function.Function.class;
    }

    /**
     * Maps a config-level runtime to the protobuf runtime constant with the same name.
     *
     * @param runtime the config-level runtime
     * @return the protobuf runtime whose {@code name()} equals {@code runtime.name()}
     * @throws RuntimeException if no protobuf constant has a matching name
     */
    public static Function.FunctionDetails.Runtime convertRuntime(FunctionConfig.Runtime runtime) {
        final Function.FunctionDetails.Runtime[] candidates = Function.FunctionDetails.Runtime.values();
        for (int i = 0; i < candidates.length; i++) {
            if (candidates[i].name().equals(runtime.name())) {
                return candidates[i];
            }
        }
        throw new RuntimeException("Unrecognized runtime: " + runtime.name());
    }

    /**
     * Maps a protobuf runtime to the config-level runtime constant with the same name.
     *
     * @param runtime the protobuf runtime
     * @return the config-level runtime whose {@code name()} equals {@code runtime.name()}
     * @throws RuntimeException if no config-level constant has a matching name
     */
    public static FunctionConfig.Runtime convertRuntime(Function.FunctionDetails.Runtime runtime) {
        final FunctionConfig.Runtime[] candidates = FunctionConfig.Runtime.values();
        for (int i = 0; i < candidates.length; i++) {
            if (candidates[i].name().equals(runtime.name())) {
                return candidates[i];
            }
        }
        throw new RuntimeException("Unrecognized runtime: " + runtime.name());
    }

    /**
     * Maps a config-level processing guarantee to the protobuf constant with the same name.
     *
     * @param processingGuarantees the config-level guarantee
     * @return the protobuf guarantee whose {@code name()} matches
     * @throws RuntimeException if no protobuf constant has a matching name
     */
    public static Function.ProcessingGuarantees convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees processingGuarantees) {
        final Function.ProcessingGuarantees[] candidates = Function.ProcessingGuarantees.values();
        for (int i = 0; i < candidates.length; i++) {
            if (candidates[i].name().equals(processingGuarantees.name())) {
                return candidates[i];
            }
        }
        throw new RuntimeException("Unrecognized processing guarantee: " + processingGuarantees.name());
    }

    /**
     * Maps a protobuf processing guarantee to the config-level constant with the same name.
     *
     * @param processingGuarantees the protobuf guarantee
     * @return the config-level guarantee whose {@code name()} matches
     * @throws RuntimeException if no config-level constant has a matching name
     */
    public static FunctionConfig.ProcessingGuarantees convertProcessingGuarantee(Function.ProcessingGuarantees processingGuarantees) {
        final FunctionConfig.ProcessingGuarantees[] candidates = FunctionConfig.ProcessingGuarantees.values();
        for (int i = 0; i < candidates.length; i++) {
            if (candidates[i].name().equals(processingGuarantees.name())) {
                return candidates[i];
            }
        }
        throw new RuntimeException("Unrecognized processing guarantee: " + processingGuarantees.name());
    }

    /**
     * Resolves the record type produced by the source class named {@code className}.
     *
     * @param className the name of the source class
     * @param typePool  the pool used to look up the class description
     * @return the type returned by the {@code TypeDefinition} overload for that class
     */
    public static TypeDefinition getSourceType(String className, TypePool typePool) {
        final TypeDefinition sourceClass = (TypeDefinition) typePool.describe(className).resolve();
        return FunctionCommon.getSourceType(sourceClass);
    }

    /**
     * Returns the first type argument with which {@code sourceClass} implements
     * {@code Source} or, failing that, {@code BatchSource}.
     *
     * @param sourceClass description of the source class
     * @return the first type argument of the implemented source interface
     * @throws IllegalArgumentException if the class is assignable to neither interface
     */
    public static TypeDefinition getSourceType(TypeDefinition sourceClass) {
        final Class<?> implemented;
        // Source takes precedence over BatchSource when both would match.
        if (sourceClass.asErasure().isAssignableTo(Source.class)) {
            implemented = Source.class;
        } else if (sourceClass.asErasure().isAssignableTo(BatchSource.class)) {
            implemented = BatchSource.class;
        } else {
            throw new IllegalArgumentException(String.format("Source class %s does not implement the correct interface", sourceClass.getActualName()));
        }
        return (TypeDefinition) FunctionCommon.resolveInterfaceTypeArguments(sourceClass, implemented).get(0);
    }

    /**
     * Resolves the record type consumed by the sink class named {@code className}.
     *
     * @param className the name of the sink class
     * @param typePool  the pool used to look up the class description
     * @return the type returned by the {@code TypeDefinition} overload for that class
     */
    public static TypeDefinition getSinkType(String className, TypePool typePool) {
        final TypeDefinition sinkClass = (TypeDefinition) typePool.describe(className).resolve();
        return FunctionCommon.getSinkType(sinkClass);
    }

    /**
     * Returns the first type argument with which {@code sinkClass} implements {@code Sink}.
     *
     * @param sinkClass description of the sink class
     * @return the first type argument of the implemented {@code Sink} interface
     * @throws IllegalArgumentException if the class is not assignable to {@code Sink}
     */
    public static TypeDefinition getSinkType(TypeDefinition sinkClass) {
        if (!sinkClass.asErasure().isAssignableTo(Sink.class)) {
            throw new IllegalArgumentException(String.format("Sink class %s does not implement the correct interface", sinkClass.getActualName()));
        }
        final TypeList.Generic sinkTypeArgs = FunctionCommon.resolveInterfaceTypeArguments(sinkClass, Sink.class);
        return (TypeDefinition) sinkTypeArgs.get(0);
    }

    /**
     * Downloads the resource at {@code destPkgUrl} into {@code targetFile}, replacing any
     * existing file.
     *
     * <p>If the URL carries user-info (the part before {@code @} in the authority), it is
     * passed to {@code AuthenticationDataBasic} and the headers that object supplies are set
     * on the connection. The user-info is masked in log output so that credentials embedded
     * in the URL are not written to the log.
     *
     * @param destPkgUrl the URL to download from
     * @param targetFile the file that receives the downloaded bytes
     * @throws IOException if the URL is malformed, the connection cannot be opened or read,
     *                     or the file cannot be written
     */
    public static void downloadFromHttpUrl(String destPkgUrl, File targetFile) throws IOException {
        URL url = new URL(destPkgUrl);
        URLConnection connection = url.openConnection();
        String userInfo = url.getUserInfo();
        String loggableUrl = destPkgUrl;
        if (StringUtils.isNotEmpty(userInfo)) {
            AuthenticationDataBasic authBasic = new AuthenticationDataBasic(userInfo);
            for (Map.Entry<String, String> header : authBasic.getHttpHeaders()) {
                connection.setRequestProperty(header.getKey(), header.getValue());
            }
            // The user-info typically holds "user:password"; never log it verbatim.
            loggableUrl = destPkgUrl.replace(userInfo + "@", "*****@");
        }
        File absoluteTarget = targetFile.getAbsoluteFile();
        try (InputStream in = connection.getInputStream()) {
            log.info("Downloading function package from {} to {} ...", loggableUrl, absoluteTarget);
            Files.copy(in, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
        }
        log.info("Downloading function package from {} to {} completed!", loggableUrl, absoluteTarget);
    }

    /**
     * Creates an empty temporary file for holding a function package.
     *
     * @return a new file in the default temporary directory whose name starts with
     *         {@code functions} and ends with {@code .tmp}
     * @throws IOException if the file cannot be created
     */
    public static File createPkgTempFile() throws IOException {
        final String prefix = "functions";
        final String suffix = ".tmp";
        return File.createTempFile(prefix, suffix);
    }

    /**
     * Turns a package URL into a local file.
     *
     * <ul>
     *   <li>URLs starting with {@code file} are converted to a {@link File}, which must
     *       already exist.</li>
     *   <li>URLs starting with {@code http} (which also covers {@code https}) are downloaded
     *       into a fresh temporary file that is registered for deletion on JVM exit. If the
     *       download fails, the temporary file is removed immediately.</li>
     * </ul>
     *
     * @param destPkgUrl the package URL
     * @return the existing local file, or the temporary file holding the download
     * @throws IOException              if a {@code file} URL points at a non-existent file,
     *                                  or the download fails
     * @throws URISyntaxException       if a {@code file} URL cannot be converted to a URI
     * @throws IllegalArgumentException if the URL starts with neither {@code file} nor {@code http}
     */
    public static File extractFileFromPkgURL(String destPkgUrl) throws IOException, URISyntaxException {
        if (destPkgUrl.startsWith("file")) {
            URL url = new URL(destPkgUrl);
            File file = new File(url.toURI());
            if (!file.exists()) {
                throw new IOException(destPkgUrl + " does not exists locally");
            }
            return file;
        }
        if (destPkgUrl.startsWith("http")) {
            File tempFile = FunctionCommon.createPkgTempFile();
            tempFile.deleteOnExit();
            try {
                FunctionCommon.downloadFromHttpUrl(destPkgUrl, tempFile);
            } catch (IOException | RuntimeException e) {
                // Do not leave an empty or partial package behind until the JVM exits.
                try {
                    Files.deleteIfExists(tempFile.toPath());
                } catch (IOException cleanupFailure) {
                    e.addSuppressed(cleanupFailure);
                }
                throw e;
            }
            return tempFile;
        }
        throw new IllegalArgumentException("Unsupported url protocol " + destPkgUrl + ", supported url protocols: [file/http/https]");
    }

    /**
     * Builds the fully qualified instance id for {@code instance} from the tenant, namespace
     * and name in its function details together with its instance id.
     *
     * @param instance the function instance
     * @return the id produced by the {@code (String, String, String, int)} overload
     */
    public static String getFullyQualifiedInstanceId(Function.Instance instance) {
        final Function.FunctionDetails details = instance.getFunctionMetaData().getFunctionDetails();
        return FunctionCommon.getFullyQualifiedInstanceId(
                details.getTenant(),
                details.getNamespace(),
                details.getName(),
                instance.getInstanceId());
    }

    /**
     * Builds an instance identifier of the form {@code tenant/namespace/functionName:instanceId}.
     *
     * <p>The pieces are concatenated directly rather than formatted with {@code %d}, because
     * {@code String.format} renders integers with the digits of the default locale and the
     * identifier must look the same on every machine.
     *
     * @param tenant       the tenant name
     * @param namespace    the namespace name
     * @param functionName the function name
     * @param instanceId   the numeric instance id
     * @return the assembled identifier; a {@code null} string argument appears as {@code "null"}
     */
    public static String getFullyQualifiedInstanceId(String tenant, String namespace, String functionName, int instanceId) {
        return tenant + "/" + namespace + "/" + functionName + ":" + instanceId;
    }

    /**
     * Packs the ledger id and entry id of a message id into a single {@code long}.
     *
     * <p>The ledger id is shifted left by 28 bits and OR-ed with the entry id.
     *
     * @param messageId the message id; it is cast to {@code MessageIdAdv}
     * @return {@code (ledgerId << 28) | entryId}
     */
    public static final long getSequenceId(MessageId messageId) {
        final MessageIdAdv advanced = (MessageIdAdv) messageId;
        final long shiftedLedger = advanced.getLedgerId() << 28;
        return shiftedLedger | advanced.getEntryId();
    }

    /**
     * Unpacks a sequence id into a message id.
     *
     * <p>The upper bits (unsigned shift right by 28) become the ledger id and the low 28 bits
     * become the entry id; the partition index is set to {@code -1}.
     *
     * @param sequenceId the packed sequence id
     * @return a {@code MessageIdImpl} built from the unpacked ledger and entry ids
     */
    public static final MessageId getMessageId(long sequenceId) {
        final long entryMask = (1L << 28) - 1; // low 28 bits, i.e. 0xFFFFFFF
        return new MessageIdImpl(sequenceId >>> 28, sequenceId & entryMask, -1);
    }

    /**
     * Prefixes {@code packageName} with a freshly generated random UUID and a hyphen.
     *
     * @param packageName the original package name
     * @return {@code <random-uuid>-<packageName>}
     */
    public static String getUniquePackageName(String packageName) {
        final UUID prefix = UUID.randomUUID();
        return prefix + "-" + packageName;
    }

    /**
     * Derives the state namespace name for a tenant/namespace pair.
     *
     * <p>The two names are joined with an underscore and every hyphen in the result is
     * replaced by an underscore.
     *
     * @param tenant    the tenant name
     * @param namespace the namespace name
     * @return {@code tenant_namespace} with all {@code -} turned into {@code _}
     */
    public static String getStateNamespace(String tenant, String namespace) {
        final String joined = tenant + "_" + namespace;
        return joined.replace('-', '_');
    }

    /**
     * Builds the fully qualified name of the function described by {@code functionDetails}.
     *
     * @param functionDetails the details supplying tenant, namespace and name
     * @return the name produced by the {@code (String, String, String)} overload
     */
    public static String getFullyQualifiedName(Function.FunctionDetails functionDetails) {
        final String tenant = functionDetails.getTenant();
        final String namespace = functionDetails.getNamespace();
        final String functionName = functionDetails.getName();
        return FunctionCommon.getFullyQualifiedName(tenant, namespace, functionName);
    }

    /**
     * Joins tenant, namespace and function name with {@code /} separators.
     *
     * @param tenant       the tenant name
     * @param namespace    the namespace name
     * @param functionName the function name
     * @return {@code tenant/namespace/functionName}; a {@code null} argument appears as {@code "null"}
     */
    public static String getFullyQualifiedName(String tenant, String namespace, String functionName) {
        return String.join("/", tenant, namespace, functionName);
    }

    /**
     * Returns the tenant, i.e. the first {@code /}-separated segment of {@code fqfn}.
     *
     * @param fqfn a fully qualified function name
     * @return segment 0 of the name
     * @throws RuntimeException if the name has fewer than three segments
     */
    public static String extractTenantFromFullyQualifiedName(String fqfn) {
        final int tenantIndex = 0;
        return FunctionCommon.extractFromFullyQualifiedName(fqfn, tenantIndex);
    }

    /**
     * Returns the namespace, i.e. the second {@code /}-separated segment of {@code fqfn}.
     *
     * @param fqfn a fully qualified function name
     * @return segment 1 of the name
     * @throws RuntimeException if the name has fewer than three segments
     */
    public static String extractNamespaceFromFullyQualifiedName(String fqfn) {
        final int namespaceIndex = 1;
        return FunctionCommon.extractFromFullyQualifiedName(fqfn, namespaceIndex);
    }

    /**
     * Returns the function name, i.e. the third {@code /}-separated segment of {@code fqfn}.
     *
     * @param fqfn a fully qualified function name
     * @return segment 2 of the name
     * @throws RuntimeException if the name has fewer than three segments
     */
    public static String extractNameFromFullyQualifiedName(String fqfn) {
        final int nameIndex = 2;
        return FunctionCommon.extractFromFullyQualifiedName(fqfn, nameIndex);
    }

    /**
     * Splits {@code fqfn} on {@code /} and returns the segment at {@code index}.
     *
     * @param fqfn  a fully qualified function name
     * @param index the zero-based segment to return
     * @return the requested segment
     * @throws RuntimeException if the name splits into fewer than three segments
     */
    private static String extractFromFullyQualifiedName(String fqfn, int index) {
        final String[] segments = fqfn.split("/");
        if (segments.length < 3) {
            throw new RuntimeException("Invalid Fully Qualified Function Name " + fqfn);
        }
        return segments[index];
    }

    /**
     * Rounds {@code value} to {@code places} decimal places.
     *
     * <p>The value is scaled by {@code 10^places}, rounded with {@link Math#round(double)}
     * and scaled back. {@code Math.round} saturates at the {@code long} range, so a finite
     * value whose scaled magnitude reaches {@code 2^63} is returned unchanged; a double that
     * large has no fractional digits left to round. {@code NaN} and infinite inputs take
     * the plain scale-round-unscale path, so {@code NaN} yields {@code 0.0}.
     *
     * @param value  the number to round
     * @param places the number of decimal places to keep
     * @return the rounded value
     */
    public static double roundDecimal(double value, int places) {
        double scale = Math.pow(10.0, places);
        double scaled = value * scale;
        // 0x1p63 is 2^63, the first magnitude at which Math.round starts clamping.
        if (!Double.isInfinite(value) && Math.abs(scaled) >= 0x1p63) {
            return value;
        }
        return (double) Math.round(scaled) / scale;
    }

    /**
     * Renders an enum constant with only its first letter capitalized, e.g. a constant whose
     * {@code toString()} is {@code "SINK"} becomes {@code "Sink"}.
     *
     * <p>Lower-casing uses {@link Locale#ROOT} so the result does not depend on the JVM's
     * default locale (in a Turkish locale, for instance, {@code "I"} would otherwise become
     * a dotless {@code "ı"}).
     *
     * @param en the enum constant to render
     * @return the constant's {@code toString()} value, lower-cased and then capitalized
     */
    public static String capFirstLetter(Enum<?> en) {
        return StringUtils.capitalize(en.toString().toLowerCase(Locale.ROOT));
    }

    /**
     * Reports whether the component described by {@code functionDetail} uses builtin code,
     * judged against the component type stored in the details themselves.
     *
     * @param functionDetail the function details (or builder) to inspect
     * @return the result of the two-argument overload for the details' own component type
     */
    public static boolean isFunctionCodeBuiltin(Function.FunctionDetailsOrBuilder functionDetail) {
        final Function.FunctionDetails.ComponentType ownType = functionDetail.getComponentType();
        return FunctionCommon.isFunctionCodeBuiltin(functionDetail, ownType);
    }

    /**
     * Reports whether the component of the given type uses builtin code.
     *
     * <ul>
     *   <li>{@code SOURCE}: true when a source spec is present and its builtin field is non-empty.</li>
     *   <li>{@code SINK}: true when a sink spec is present and its builtin field is non-empty.</li>
     *   <li>{@code FUNCTION}: true when the details' own builtin field is non-empty.</li>
     *   <li>Any other value (including {@code null}): false.</li>
     * </ul>
     *
     * @param functionDetails the function details (or builder) to inspect
     * @param componentType   the component type that decides which builtin field is checked
     * @return whether the relevant builtin field is set
     */
    public static boolean isFunctionCodeBuiltin(Function.FunctionDetailsOrBuilder functionDetails, Function.FunctionDetails.ComponentType componentType) {
        if (componentType == Function.FunctionDetails.ComponentType.SOURCE) {
            return functionDetails.hasSource()
                    && StringUtils.isNotEmpty(functionDetails.getSource().getBuiltin());
        }
        if (componentType == Function.FunctionDetails.ComponentType.SINK) {
            return functionDetails.hasSink()
                    && StringUtils.isNotEmpty(functionDetails.getSink().getBuiltin());
        }
        if (componentType == Function.FunctionDetails.ComponentType.FUNCTION) {
            return StringUtils.isNotEmpty(functionDetails.getBuiltin());
        }
        return false;
    }

    /**
     * Converts a protobuf subscription position to the client's initial position.
     *
     * @param subscriptionPosition the protobuf position; may be {@code null}
     * @return {@code Earliest} for {@code EARLIEST}, otherwise {@code Latest}
     */
    public static SubscriptionInitialPosition convertFromFunctionDetailsSubscriptionPosition(Function.SubscriptionPosition subscriptionPosition) {
        return subscriptionPosition == Function.SubscriptionPosition.EARLIEST
                ? SubscriptionInitialPosition.Earliest
                : SubscriptionInitialPosition.Latest;
    }

    /**
     * Converts a protobuf compression type to the client compression type.
     *
     * @param compressionType the protobuf compression type; may be {@code null}
     * @return the matching client constant for {@code NONE}, {@code ZLIB}, {@code ZSTD} and
     *         {@code SNAPPY}; {@code LZ4} for {@code null} and every other value
     */
    public static CompressionType convertFromFunctionDetailsCompressionType(Function.CompressionType compressionType) {
        if (compressionType == Function.CompressionType.NONE) {
            return CompressionType.NONE;
        }
        if (compressionType == Function.CompressionType.ZLIB) {
            return CompressionType.ZLIB;
        }
        if (compressionType == Function.CompressionType.ZSTD) {
            return CompressionType.ZSTD;
        }
        if (compressionType == Function.CompressionType.SNAPPY) {
            return CompressionType.SNAPPY;
        }
        // null and any remaining constant fall back to LZ4.
        return CompressionType.LZ4;
    }

    /**
     * Converts a client compression type to the protobuf compression type.
     *
     * @param compressionType the client compression type; may be {@code null}
     * @return the matching protobuf constant for {@code NONE}, {@code ZLIB}, {@code ZSTD} and
     *         {@code SNAPPY}; {@code LZ4} for {@code null} and every other value
     */
    public static Function.CompressionType convertFromCompressionType(CompressionType compressionType) {
        if (compressionType == CompressionType.NONE) {
            return Function.CompressionType.NONE;
        }
        if (compressionType == CompressionType.ZLIB) {
            return Function.CompressionType.ZLIB;
        }
        if (compressionType == CompressionType.ZSTD) {
            return Function.CompressionType.ZSTD;
        }
        if (compressionType == CompressionType.SNAPPY) {
            return Function.CompressionType.SNAPPY;
        }
        // null and any remaining constant fall back to LZ4.
        return Function.CompressionType.LZ4;
    }

    /**
     * Private no-op constructor: code outside this class cannot instantiate it.
     */
    private FunctionCommon() {
    }
}

