package org.apache.spark.sql.rikai.model;

import ai.eto.rikai.sql.model.ModelSpec;
import ai.eto.rikai.sql.model.SparkUDFModel;
import ai.eto.rikai.sql.spark.Python$;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.HashMap;
import java.util.List;
import org.apache.spark.api.python.PythonAccumulatorV2;
import org.apache.spark.api.python.PythonEvalType$;
import org.apache.spark.api.python.PythonFunction;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.python.UserDefinedPythonFunction;
import org.apache.spark.sql.rikai.model.ModelResolver;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataType$;
import org.json4s.DefaultFormats$;
import org.json4s.NoTypeHints$;
import org.json4s.jackson.JsonMethods$;
import org.json4s.jackson.Serialization$;
import org.json4s.package$;
import org.mlflow_project.apachecommons.io.IOUtils;
import scala.Predef$;
import scala.Some;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List$;
import scala.collection.immutable.StringOps;
import scala.reflect.ManifestFactory$;
import scala.util.Random$;

/* compiled from: ModelResolver.scala */
/* loaded from: input_file:org/apache/spark/sql/rikai/model/ModelResolver$.class */
public final class ModelResolver$ {
    public static ModelResolver$ MODULE$;

    static {
        new ModelResolver$();
    }

    private void registerUdf(SparkSession sparkSession, Seq<Object> seq, String str, DataType dataType, int i) {
        sparkSession.udf().registerPython(str, new UserDefinedPythonFunction(str, new PythonFunction(seq, new HashMap(), (List) JavaConverters$.MODULE$.seqAsJavaListConverter(List$.MODULE$.empty()).asJava(), Python$.MODULE$.pythonExec(), Python$.MODULE$.pythonVer(), (List) JavaConverters$.MODULE$.seqAsJavaListConverter(Seq$.MODULE$.empty()).asJava(), (PythonAccumulatorV2) null), dataType, i, true));
    }

    public SparkUDFModel resolve(SparkSession sparkSession, String str, ModelSpec modelSpec) {
        Path createTempFile = Files.createTempFile("model-spec", ".json", new FileAttribute[0]);
        Path createTempFile2 = Files.createTempFile("model-code", ".cpt", new FileAttribute[0]);
        Path createTempFile3 = Files.createTempFile("model-type", ".json", new FileAttribute[0]);
        try {
            Files.write(createTempFile, Serialization$.MODULE$.write(modelSpec, DefaultFormats$.MODULE$.preservingEmptyValues()).getBytes(), new OpenOption[0]);
            Python$.MODULE$.execute(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(916).append("from pyspark.serializers import CloudPickleSerializer;\n           |import json\n           |import base64\n           |spec = json.load(open(\"").append(createTempFile).append("\", \"r\"))\n           |from rikai.spark.sql.codegen import command_from_spec\n           |serialize_func, func, deserialize_func, dataType = command_from_spec(\"").append(str).append("\", spec)\n           |pickle = CloudPickleSerializer()\n           |with open(\"").append(createTempFile2).append("\", \"w\") as fobj:\n           |    json.dump({\n           |        \"func\": base64.b64encode(pickle.dumps((func.func, func.returnType))).decode('utf-8'),\n           |        \"serializer\": base64.b64encode(pickle.dumps((serialize_func.func, serialize_func.returnType))).decode('utf-8'),\n           |        \"deserializer\": base64.b64encode(pickle.dumps((deserialize_func.func, deserialize_func.returnType))).decode('utf-8'),\n           |    }, fobj)\n           |with open(\"").append(createTempFile3).append("\", \"w\") as fobj:\n           |    fobj.write(dataType.json())\n           |").toString())).stripMargin(), sparkSession);
            String mkString = ((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(Files.readAllLines(createTempFile2)).asScala()).mkString(IOUtils.LINE_SEPARATOR_UNIX);
            ModelResolver.FuncDesc funcDesc = (ModelResolver.FuncDesc) package$.MODULE$.jvalue2extractable(JsonMethods$.MODULE$.parse(package$.MODULE$.string2JsonInput(mkString), JsonMethods$.MODULE$.parse$default$2(), JsonMethods$.MODULE$.parse$default$3())).extract(Serialization$.MODULE$.formats(NoTypeHints$.MODULE$), ManifestFactory$.MODULE$.classType(ModelResolver.FuncDesc.class));
            DataType fromJson = DataType$.MODULE$.fromJson(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(Files.readAllLines(createTempFile3)).asScala()).mkString(IOUtils.LINE_SEPARATOR_UNIX));
            String sb = new StringBuilder(1).append(modelSpec.name().getOrElse(() -> {
                return "model";
            })).append("_").append(Random$.MODULE$.alphanumeric().take(6).mkString().toLowerCase()).toString();
            String sb2 = new StringBuilder(4).append(sb).append("_pre").toString();
            String sb3 = new StringBuilder(5).append(sb).append("_post").toString();
            registerUdf(sparkSession, funcDesc.funcCmd(), sb, BinaryType$.MODULE$, PythonEvalType$.MODULE$.SQL_SCALAR_PANDAS_ITER_UDF());
            registerUdf(sparkSession, funcDesc.serializerFunc(), sb2, BinaryType$.MODULE$, PythonEvalType$.MODULE$.SQL_BATCHED_UDF());
            registerUdf(sparkSession, funcDesc.deserializerFunc(), sb3, fromJson, PythonEvalType$.MODULE$.SQL_BATCHED_UDF());
            return new SparkUDFModel((String) modelSpec.name().get(), (String) modelSpec.uri().getOrElse(() -> {
                return "";
            }), sb, modelSpec.flavor(), new Some(sb2), new Some(sb3));
        } finally {
            Files.delete(createTempFile2);
            Files.delete(createTempFile);
            Files.delete(createTempFile3);
        }
    }

    private ModelResolver$() {
        MODULE$ = this;
    }
}
