package streaming.dsl.mmlib.algs.python;

import java.io.File;
import java.io.FileWriter;
import java.util.HashMap;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.types.MapType$;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.util.PythonProjectExecuteRunner;
import org.apache.spark.util.PythonProjectExecuteRunner$;
import scala.Array$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction2;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;
import scala.runtime.ScalaRunTime$;
import streaming.common.NetUtils;
import streaming.dsl.MLSQLExecuteContext;
import streaming.dsl.ScriptSQLExec$;
import streaming.dsl.mmlib.algs.SQLPythonAlg$;
import streaming.dsl.mmlib.algs.SQLPythonFunc$;

/* compiled from: PythonTrain.scala */
/* loaded from: input_file:streaming/dsl/mmlib/algs/python/PythonTrain$$anonfun$4.class */
/**
 * Compiler-generated two-argument closure (decompiled from Scala) that runs the Python
 * training command for one partition and reports the outcome as a single {@link Row}.
 *
 * <p>Given a partition index and an iterator of strings, {@link #apply(int, Iterator)}:
 * <ol>
 *   <li>installs the captured MLSQL execute context,</li>
 *   <li>optionally writes the partition's strings to a local {@code .json} file,</li>
 *   <li>assembles the parameter map handed to the Python process,</li>
 *   <li>downloads the Python project and runs its train command,</li>
 *   <li>copies the local model directory to the target path through Hadoop
 *       {@link FileSystem}, and</li>
 *   <li>cleans up the local directories.</li>
 * </ol>
 *
 * <p>NOTE(review): presumably passed to a per-partition-with-index transformation in
 * {@code PythonTrain}; the call site is not visible here - confirm.
 *
 * <p>NOTE(review): this is decompiler output. Some constructs (the always-false null check on
 * a freshly built tuple, an empty {@code try} body) are artifacts and may not reflect the
 * structure of the original Scala source.
 */
public final class PythonTrain$$anonfun$4 extends AbstractFunction2<Object, Iterator<String>, Iterator<Row>> implements Serializable {
    public static final long serialVersionUID = 0;
    // Enclosing PythonTrain instance; used here only for logInfo/logError.
    private final /* synthetic */ PythonTrain $outer;
    // Target path: default "modelPath" fit parameter, input to LocalPathConfig and to getAlgModelPath.
    private final String path$2;
    // Passed to getAlgModelPath; when false the destination is deleted before the model is copied.
    private final boolean keepVersion$1;
    // Passed to the runner; when false the local data directory is deleted in the final cleanup.
    private final boolean keepLocalDirectory$1;
    // When defined, the output sub-directory is named "<key>=<partitionIndex>" instead of "<partitionIndex>".
    private final Option partitionKey$1;
    // Holder of a Map; when non-empty it is forwarded as "kafkaParam" with a per-partition group_id.
    private final ObjectRef kafkaParam$1;
    // When true the partition's strings are written to a local JSON file before training.
    private final boolean enableDataLocal$1;
    // Forwarded to the Python side as the "stopFlagNum" internal system parameter.
    private final IntRef stopFlagNum$1;
    // Option holding the PythonScript to run; accessed with get(), so it is expected to be defined.
    public final Option pythonProject$1;
    // Context installed via ScriptSQLExec.setContext at the start of apply.
    public final MLSQLExecuteContext mlsqlContext$1;
    // Option holding the project location to download; None results in a RuntimeException.
    public final Option pythonProjectPath$1;
    // Appended to the local run path to form the task directory.
    private final String projectName$1;
    // Compared against MLFlow$ to decide the download target directory.
    private final PythonScriptType projectType$1;
    // Holder of the fit-parameter Map forwarded as "fitParam" and echoed in the result row.
    private final ObjectRef fitParam$1;
    // Environment map passed to PythonAlgExecCommand and PythonProjectExecuteRunner.
    private final Map envs$1;

    /**
     * Runs training for one partition.
     *
     * @param i        partition index; used in local/remote directory names and the Kafka group id
     * @param iterator the partition's string records; only consumed when data-local mode is enabled
     * @return an iterator over exactly one {@link Row} with, in order: model path, partition
     *         index, script file name, score, {@code "fail"}/{@code "success"}, start time
     *         (ms), end time (ms), the fit-parameter map, and the logged run description
     * @throws RuntimeException if {@code pythonProjectPath$1} is {@code None}
     */
    public final Iterator<Row> apply(int i, Iterator<String> iterator) {
        boolean z;
        Object boxToInteger;
        String stringBuilder;
        Option<String> downloadPythonProject;
        // Pattern-match artifact: the tuple was just constructed, so the null branch cannot be taken.
        Tuple2 tuple2 = new Tuple2(BoxesRunTime.boxToInteger(i), iterator);
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        int _1$mcI$sp = tuple2._1$mcI$sp();
        Iterator iterator2 = (Iterator) tuple2._2();
        ScriptSQLExec$.MODULE$.setContext(this.mlsqlContext$1);
        LocalPathConfig buildFromParams = LocalPathConfig$.MODULE$.buildFromParams(this.path$2);
        String localDataPath = buildFromParams.localDataPath();
        if (this.enableDataLocal$1) {
            // Give each partition its own sub-directory: "<localDataPath>/<partitionIndex>".
            localDataPath = new StringBuilder().append(localDataPath).append("/").append(BoxesRunTime.boxToInteger(_1$mcI$sp)).toString();
            this.$outer.logInfo(new PythonTrain$$anonfun$4$$anonfun$apply$1(this, new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"dataLocalFormat enabled ,system will generate data in ", " "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{localDataPath}))));
            if (!new File(localDataPath).exists()) {
                FileUtils.forceMkdir(new File(localDataPath));
            }
            // Records go to a randomly named "<uuid>.json" file; the writer is always closed.
            // NOTE(review): FileWriter uses the platform default charset - confirm this is intended.
            FileWriter fileWriter = new FileWriter(new File(localDataPath, new StringBuilder().append(UUID.randomUUID().toString()).append(".json").toString()));
            try {
                iterator2.foreach(new PythonTrain$$anonfun$4$$anonfun$apply$2(this, fileWriter));
            } finally {
                fileWriter.close();
            }
        }
        // Parameter map handed to the Python process.
        HashMap hashMap = new HashMap();
        java.util.Map map = (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter((Map) this.fitParam$1.elem).asJava();
        // Default "modelPath" to the target path when the caller did not supply one.
        if (!((Map) this.fitParam$1.elem).contains("modelPath")) {
            map = (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(((Map) this.fitParam$1.elem).$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("modelPath"), this.path$2))).asJava();
        }
        Map<String, String> loadResourceInTrain = new ResourceManager((Map) this.fitParam$1.elem).loadResourceInTrain();
        // stringBuilder2 = local task directory: "<localRunPath>/<projectName>".
        String stringBuilder2 = new StringBuilder().append(buildFromParams.localRunPath()).append("/").append(this.projectName$1).toString();
        // s = local model directory for this partition: "<localModelPath>/<partitionIndex>".
        String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", "/", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{buildFromParams.localModelPath(), BoxesRunTime.boxToInteger(_1$mcI$sp)}));
        hashMap.put("fitParam", map);
        if (((Map) this.kafkaParam$1.elem).isEmpty()) {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            // Suffix group_id with "_<partitionIndex>" so each partition gets a distinct group id.
            hashMap.put("kafkaParam", JavaConverters$.MODULE$.mapAsJavaMapConverter(((Map) this.kafkaParam$1.elem).$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("group_id"), new StringBuilder().append((String) ((Map) this.kafkaParam$1.elem).apply("group_id")).append("_").append(BoxesRunTime.boxToInteger(_1$mcI$sp)).toString()))).asJava());
        }
        // Internal system parameters: stopFlagNum, tempModelLocalPath, tempDataLocalPath, resource.
        hashMap.put(RunPythonConfig$.MODULE$.internalSystemParam(), JavaConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("stopFlagNum"), BoxesRunTime.boxToInteger(this.stopFlagNum$1.elem)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("tempModelLocalPath"), s), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("tempDataLocalPath"), localDataPath), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("resource"), JavaConverters$.MODULE$.mapAsJavaMapConverter(loadResourceInTrain).asJava())}))).asJava());
        Option option = this.pythonProjectPath$1;
        // NOTE(review): the try body is empty in this decompiled output, so as written the
        // catch (which logs and sets the failure flag) guards nothing. The original source
        // presumably wrapped the download/run statements below - confirm against PythonTrain.scala.
        try {
        } catch (Exception e) {
            this.$outer.logError(new PythonTrain$$anonfun$4$$anonfun$apply$8(this, e));
            e.printStackTrace();
            z = true;
        }
        // A project path is mandatory: None raises a RuntimeException, anything else a MatchError.
        if (!(option instanceof Some)) {
            if (None$.MODULE$.equals(option)) {
                throw new RuntimeException("The project or script you configured in pythonScriptPath is not a validate project");
            }
            throw new MatchError(option);
        }
        PythonScriptType pythonScriptType = this.projectType$1;
        MLFlow$ mLFlow$ = MLFlow$.MODULE$;
        // Null-safe "projectType != MLFlow": non-MLFlow projects are downloaded into
        // "<taskDirectory>/<last path segment>", MLFlow projects directly into the task directory.
        if (pythonScriptType != null ? !pythonScriptType.equals(mLFlow$) : mLFlow$ != null) {
            String stringBuilder3 = new StringBuilder().append(stringBuilder2).append("/").append(Predef$.MODULE$.refArrayOps(((String) this.pythonProjectPath$1.get()).split("/")).last()).toString();
            this.$outer.logInfo(new PythonTrain$$anonfun$4$$anonfun$apply$4(this, stringBuilder3));
            downloadPythonProject = SQLPythonAlg$.MODULE$.downloadPythonProject(stringBuilder3, this.pythonProjectPath$1);
        } else {
            this.$outer.logInfo(new PythonTrain$$anonfun$4$$anonfun$apply$3(this, stringBuilder2));
            downloadPythonProject = SQLPythonAlg$.MODULE$.downloadPythonProject(stringBuilder2, this.pythonProjectPath$1);
        }
        Seq<String> generateCommand = new PythonAlgExecCommand((PythonScript) this.pythonProject$1.get(), None$.MODULE$, None$.MODULE$, this.envs$1).generateCommand(MLProject$.MODULE$.train_command());
        // Human-readable run description (host, command, directories); logged and returned in the row.
        String stripMargin = new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n           |\n           |----------------------------------------------------------------\n           |host: ", "\n           |\n           |Command: ", "\n           |TaskDirectory: ", "\n           |DataDirectory: ", "\n           |ModelDirectory: ", "\n           |\n           |Notice:\n           |\n           |If you wanna keep [", "] for debug, please set\n           |keepLocalDirectory=true in train statement.\n           |\n           |e.g:\n           |\n           |train data as PythonParallelExt.`/tmp/abc` where keepLocalDirectory=\"true\";\n           |----------------------------------------------------------------\n         "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{NetUtils.getHost(), generateCommand.mkString(" "), stringBuilder2, localDataPath, s, stringBuilder2})))).stripMargin();
        this.$outer.logInfo(new PythonTrain$$anonfun$4$$anonfun$apply$5(this, stripMargin));
        // Start timestamp reported in the result row.
        long currentTimeMillis = System.currentTimeMillis();
        // z is the "training failed" flag.
        z = false;
        // Run the train command in the task directory and map its output through anonfun$6.
        // NOTE(review): anonfun$6 is not visible here; presumably it applies the score filter
        // defined in this class to each output line - verify.
        Seq seq = new PythonProjectExecuteRunner(stringBuilder2, this.keepLocalDirectory$1, this.envs$1, new PythonTrain$$anonfun$4$$anonfun$5(this), PythonProjectExecuteRunner$.MODULE$.$lessinit$greater$default$5(), PythonProjectExecuteRunner$.MODULE$.$lessinit$greater$default$6(), PythonProjectExecuteRunner$.MODULE$.$lessinit$greater$default$7()).run(generateCommand, hashMap, MapType$.MODULE$.apply(StringType$.MODULE$, MapType$.MODULE$.apply(StringType$.MODULE$, StringType$.MODULE$)), ((PythonScript) this.pythonProject$1.get()).fileContent(), ((PythonScript) this.pythonProject$1.get()).fileName(), (byte[][]) Array$.MODULE$.apply(Nil$.MODULE$, ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Byte.TYPE)))).map(new PythonTrain$$anonfun$4$$anonfun$6(this)).toSeq();
        // Score is the first mapped element, or 0.0 when the run produced none.
        double unboxToDouble = seq.size() > 0 ? BoxesRunTime.unboxToDouble(seq.head()) : 0.0d;
        // End timestamp reported in the result row.
        long currentTimeMillis2 = System.currentTimeMillis();
        Some some = this.partitionKey$1;
        try {
            try {
                // Output sub-directory name: "<partitionKey>=<index>" when a key is set, else "<index>".
                if (some instanceof Some) {
                    boxToInteger = new StringBuilder().append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", "="})).s(Predef$.MODULE$.genericWrapArray(new Object[]{(String) some.x()}))).append(BoxesRunTime.boxToInteger(_1$mcI$sp)).toString();
                } else {
                    if (!None$.MODULE$.equals(some)) {
                        throw new MatchError(some);
                    }
                    boxToInteger = BoxesRunTime.boxToInteger(_1$mcI$sp);
                }
                stringBuilder = new StringBuilder().append(SQLPythonFunc$.MODULE$.getAlgModelPath(this.path$2, this.keepVersion$1)).append("/").append(boxToInteger).toString();
                // Publish the model only when training did not fail.
                if (!z) {
                    FileSystem fileSystem = FileSystem.get(new Configuration());
                    if (this.keepVersion$1) {
                        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                    } else {
                        // Without versioning, remove any previous model at the destination first.
                        BoxesRunTime.boxToBoolean(fileSystem.delete(new Path(stringBuilder), true));
                    }
                    fileSystem.copyFromLocalFile(new Path(s), new Path(stringBuilder));
                }
            } catch (Exception e2) {
                // A failure while publishing the model is reported as a failed run, not rethrown.
                e2.printStackTrace();
                z = true;
            }
            // Single result row; column order is documented on this method.
            // NOTE(review): as decompiled, stringBuilder may be unassigned here if the block above
            // threw before assigning it - another decompilation artifact to check against the source.
            return Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Row[]{Row$.MODULE$.fromSeq(Seq$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{stringBuilder, BoxesRunTime.boxToInteger(_1$mcI$sp), ((PythonScript) this.pythonProject$1.get()).fileName(), BoxesRunTime.boxToDouble(unboxToDouble), z ? "fail" : "success", BoxesRunTime.boxToLong(currentTimeMillis), BoxesRunTime.boxToLong(currentTimeMillis2), (Map) this.fitParam$1.elem, stripMargin})))})).toIterator();
        } finally {
            // Cleanup runs on every exit path.
            // NOTE(review): anonfun$apply$9 is not visible; presumably it cleans up each loaded resource - verify.
            loadResourceInTrain.foreach(new PythonTrain$$anonfun$4$$anonfun$apply$9(this));
            // The local model directory is always removed; the data directory only when not kept.
            FileUtils.deleteQuietly(new File(s));
            if (!this.keepLocalDirectory$1) {
                FileUtils.deleteQuietly(new File(localDataPath));
            }
        }
    }

    /** Returns the enclosing {@code PythonTrain} instance captured by this closure. */
    public /* synthetic */ PythonTrain streaming$dsl$mmlib$algs$python$PythonTrain$$anonfun$$$outer() {
        return this.$outer;
    }

    /** Erased bridge: unboxes the partition index and delegates to {@link #apply(int, Iterator)}. */
    public final /* bridge */ /* synthetic */ Object apply(Object obj, Object obj2) {
        return apply(BoxesRunTime.unboxToInt(obj), (Iterator<String>) obj2);
    }

    /**
     * Extracts a validation score from one line of text.
     *
     * @param str a line of text; may be {@code null}
     * @return {@code 0.0} when {@code str} is {@code null} or does not start with
     *         {@code "mlsql_validation_score:"}; otherwise the text after the last {@code ':'}
     *         parsed as a double
     */
    public final double streaming$dsl$mmlib$algs$python$PythonTrain$$anonfun$$filterScore$1(String str) {
        if (str == null || !str.startsWith("mlsql_validation_score:")) {
            return 0.0d;
        }
        // NOTE(review): a non-numeric last segment (e.g. nothing after the prefix) is expected to
        // make toDouble throw NumberFormatException - confirm this is the desired failure mode.
        return new StringOps(Predef$.MODULE$.augmentString((String) Predef$.MODULE$.refArrayOps(str.split(":")).last())).toDouble();
    }

    /**
     * Captures the enclosing instance and every local the closure body reads.
     *
     * @param pythonTrain enclosing instance; a {@code null} value triggers {@code throw null},
     *                    i.e. a {@link NullPointerException}
     */
    public PythonTrain$$anonfun$4(PythonTrain pythonTrain, String str, boolean z, boolean z2, Option option, ObjectRef objectRef, boolean z3, IntRef intRef, Option option2, MLSQLExecuteContext mLSQLExecuteContext, Option option3, String str2, PythonScriptType pythonScriptType, ObjectRef objectRef2, Map map) {
        if (pythonTrain == null) {
            throw null;
        }
        this.$outer = pythonTrain;
        this.path$2 = str;
        this.keepVersion$1 = z;
        this.keepLocalDirectory$1 = z2;
        this.partitionKey$1 = option;
        this.kafkaParam$1 = objectRef;
        this.enableDataLocal$1 = z3;
        this.stopFlagNum$1 = intRef;
        this.pythonProject$1 = option2;
        this.mlsqlContext$1 = mLSQLExecuteContext;
        this.pythonProjectPath$1 = option3;
        this.projectName$1 = str2;
        this.projectType$1 = pythonScriptType;
        this.fitParam$1 = objectRef2;
        this.envs$1 = map;
    }
}
