package streaming.dsl.mmlib.algs.python;

import java.io.File;
import java.util.HashMap;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.types.MapType$;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.util.PythonProjectExecuteRunner;
import org.apache.spark.util.PythonProjectExecuteRunner$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;
import streaming.common.HDFSOperator$;
import streaming.common.NetUtils;
import streaming.dsl.MLSQLExecuteContext;
import streaming.dsl.ScriptSQLExec$;
import streaming.dsl.mmlib.algs.SQLPythonAlg$;
import streaming.dsl.mmlib.algs.SQLPythonFunc$;

/* compiled from: PythonTrain.scala */
/* loaded from: input_file:streaming/dsl/mmlib/algs/python/PythonTrain$$anonfun$10.class */
/**
 * Compiler-generated closure (decompiled from PythonTrain.scala) that maps a pair of
 * (integer index, parameter map) to a result {@link Row}.
 *
 * <p>For each pair, {@link #apply(Tuple2)} assembles a parameter container, builds a command via
 * {@code PythonAlgExecCommand}, executes it through {@code PythonProjectExecuteRunner}, copies the
 * local model directory to the Hadoop {@code FileSystem}, deletes local directories and returns a
 * row describing the run.
 *
 * <p>All fields are values captured from the enclosing {@code PythonTrain} scope and are assigned
 * once in the constructor.
 */
public final class PythonTrain$$anonfun$10 extends AbstractFunction1<Tuple2<Object, Map<String, String>>, Row> implements Serializable {
    public static final long serialVersionUID = 0;
    // Enclosing PythonTrain instance; used here for logInfo/logError.
    private final /* synthetic */ PythonTrain $outer;
    // Used as the default "modelPath", as input to LocalPathConfig and to getAlgModelPath.
    private final String path$1;
    // When false, the target model path is deleted before the local model is copied to it.
    private final boolean keepVersion$2;
    // Passed to PythonProjectExecuteRunner; when false, the local data path is deleted afterwards.
    private final boolean keepLocalDirectory$2;
    // Holds a scala Map (see casts in apply); exported as "kafkaParam" when non-empty.
    private final ObjectRef kafkaParam$2;
    // When true, apply() calls HDFSOperator.copyToLocalFile with a per-index local data path.
    private final boolean enableDataLocal$2;
    private final String dataHDFSPath$1;
    // Its value is cast to byte[][] and handed to the runner.
    private final Broadcast rowsBr$1;
    private final IntRef stopFlagNum$2;
    private final Map systemParam$1;
    private final MLFlowConfig mlflowConfig$1;
    private final PythonConfig pythonConfig$1;
    private final Map envs$2;
    // Public (not private), presumably because nested generated closures read them -- TODO confirm.
    public final Option pythonProject$2;
    public final MLSQLExecuteContext mlsqlContext$2;
    public final Option pythonProjectPath$2;
    private final String projectName$2;
    private final PythonScriptType projectType$2;

    /**
     * Runs one execution for the given (index, parameter map) pair and returns a summary row.
     *
     * <p>The returned row contains, in order: the target model path (with the index appended), the
     * index, the script file name, the score (head of the runner's mapped output, or 0.0 when that
     * output is empty), "fail"/"success", the start timestamp, the end timestamp, the parameter map
     * and the human-readable execution description.
     *
     * <p>NOTE(review): this method was flagged by the decompiler ("Finally extract failed"); the
     * try/catch layout below may not match the original Scala source.
     *
     * @param tuple2 {@code _1} is the integer index, {@code _2} the parameter map
     * @return the summary row described above
     * @throws RuntimeException when {@code pythonProjectPath$2} is {@code None}
     */
    /* JADX WARN: Finally extract failed */
    public final Row apply(Tuple2<Object, Map<String, String>> tuple2) {
        // Failure flag; it decides the "fail"/"success" column of the returned row.
        boolean z;
        Option<String> downloadPythonProject;
        ScriptSQLExec$.MODULE$.setContext(this.mlsqlContext$2);
        Map map = (Map) tuple2._2();
        int _1$mcI$sp = tuple2._1$mcI$sp();
        LocalPathConfig buildFromParams = LocalPathConfig$.MODULE$.buildFromParams(this.path$1);
        String localDataPath = buildFromParams.localDataPath();
        if (this.enableDataLocal$2) {
            // Give each index its own local data directory: <localDataPath>/<index>.
            localDataPath = new StringBuilder().append(localDataPath).append("/").append(BoxesRunTime.boxToInteger(_1$mcI$sp)).toString();
            this.$outer.logInfo(new PythonTrain$$anonfun$10$$anonfun$apply$10(this, new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"dataLocalFormat enabled ,system will generate data in ", " "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{localDataPath}))));
            // NOTE(review): presumably copies dataHDFSPath$1 down to localDataPath; argument semantics are not visible here.
            HDFSOperator$.MODULE$.copyToLocalFile(localDataPath, this.dataHDFSPath$1, true);
        }
        // Parameter container handed to the runner below.
        HashMap hashMap = new HashMap();
        java.util.Map map2 = (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(map).asJava();
        if (!map.contains("modelPath")) {
            // Default "modelPath" to path$1 when the parameter map does not provide one.
            map2 = (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(map.$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("modelPath"), this.path$1))).asJava();
        }
        Map<String, String> loadResourceInTrain = new ResourceManager(map).loadResourceInTrain();
        // Task directory: <localRunPath>/<projectName>.
        String stringBuilder = new StringBuilder().append(buildFromParams.localRunPath()).append("/").append(this.projectName$2).toString();
        // Temporary local model directory: <localModelPath>/<index>.
        String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", "/", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{buildFromParams.localModelPath(), BoxesRunTime.boxToInteger(_1$mcI$sp)}));
        hashMap.put("fitParam", map2);
        if (((Map) this.kafkaParam$2.elem).isEmpty()) {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            // Export kafkaParam with "group_id" suffixed by "_<index>".
            hashMap.put("kafkaParam", JavaConverters$.MODULE$.mapAsJavaMapConverter(((Map) this.kafkaParam$2.elem).$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("group_id"), new StringBuilder().append((String) ((Map) this.kafkaParam$2.elem).apply("group_id")).append("_").append(BoxesRunTime.boxToInteger(_1$mcI$sp)).toString()))).asJava());
        }
        // Internal system parameters: stopFlagNum, tempModelLocalPath, tempDataLocalPath and resource.
        hashMap.put(RunPythonConfig$.MODULE$.internalSystemParam(), JavaConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("stopFlagNum"), BoxesRunTime.boxToInteger(this.stopFlagNum$2.elem)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("tempModelLocalPath"), s), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("tempDataLocalPath"), localDataPath), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("resource"), JavaConverters$.MODULE$.mapAsJavaMapConverter(loadResourceInTrain).asJava())}))).asJava());
        hashMap.put(RunPythonConfig$.MODULE$.systemParam(), JavaConverters$.MODULE$.mapAsJavaMapConverter(this.systemParam$1).asJava());
        Option option = this.pythonProjectPath$2;
        // NOTE(review): the try body is empty, so this handler guards nothing as written. It looks
        // like a decompiler artifact; presumably the original try covered the statements that
        // follow (download/command/run) -- confirm against PythonTrain.scala.
        try {
        } catch (Exception e) {
            this.$outer.logError(new PythonTrain$$anonfun$10$$anonfun$apply$16(this, e));
            e.printStackTrace();
            z = true;
        }
        if (!(option instanceof Some)) {
            if (None$.MODULE$.equals(option)) {
                throw new RuntimeException("The project or script you configured in pythonScriptPath is not a validate project");
            }
            throw new MatchError(option);
        }
        PythonScriptType pythonScriptType = this.projectType$2;
        MLFlow$ mLFlow$ = MLFlow$.MODULE$;
        // The condition is true when the project type is NOT MLFlow: download into
        // <taskDirectory>/<last segment of the project path>. For MLFlow, download into the task
        // directory itself.
        // NOTE(review): the value assigned to downloadPythonProject is never read in this method.
        if (pythonScriptType != null ? !pythonScriptType.equals(mLFlow$) : mLFlow$ != null) {
            String stringBuilder2 = new StringBuilder().append(stringBuilder).append("/").append(Predef$.MODULE$.refArrayOps(((String) this.pythonProjectPath$2.get()).split("/")).last()).toString();
            this.$outer.logInfo(new PythonTrain$$anonfun$10$$anonfun$apply$12(this, stringBuilder2));
            downloadPythonProject = SQLPythonAlg$.MODULE$.downloadPythonProject(stringBuilder2, this.pythonProjectPath$2);
        } else {
            this.$outer.logInfo(new PythonTrain$$anonfun$10$$anonfun$apply$11(this, stringBuilder));
            downloadPythonProject = SQLPythonAlg$.MODULE$.downloadPythonProject(stringBuilder, this.pythonProjectPath$2);
        }
        Seq<String> generateCommand = new PythonAlgExecCommand((PythonScript) this.pythonProject$2.get(), Option$.MODULE$.apply(this.mlflowConfig$1), Option$.MODULE$.apply(this.pythonConfig$1), this.envs$2).generateCommand(MLProject$.MODULE$.train_command());
        // Execution description (host, command, task/data/model directories); logged here and
        // returned as the last column of the row.
        String stripMargin = new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n           |\n           |----------------------------------------------------------------\n           |host: ", "\n           |\n           |Command: ", "\n           |TaskDirectory: ", "\n           |DataDirectory: ", "\n           |ModelDirectory: ", "\n           |\n           |Notice:\n           |\n           |If you wanna keep [", "] for debug, please set\n           |keepLocalDirectory=true in train statement.\n           |\n           |e.g:\n           |\n           |train data as PythonAlg.`/tmp/abc` where keepLocalDirectory=\"true\";\n           |----------------------------------------------------------------\n         "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{NetUtils.getHost(), generateCommand.mkString(" "), stringBuilder, localDataPath, s, stringBuilder})))).stripMargin();
        this.$outer.logInfo(new PythonTrain$$anonfun$10$$anonfun$apply$13(this, stripMargin));
        // Start timestamp reported in the row.
        long currentTimeMillis = System.currentTimeMillis();
        z = false;
        // Run the command in the task directory, map the runner output through closure $anonfun$12
        // and materialize it as a Seq.
        Seq seq = new PythonProjectExecuteRunner(stringBuilder, this.keepLocalDirectory$2, this.envs$2, new PythonTrain$$anonfun$10$$anonfun$11(this), PythonProjectExecuteRunner$.MODULE$.$lessinit$greater$default$5(), PythonProjectExecuteRunner$.MODULE$.$lessinit$greater$default$6(), PythonProjectExecuteRunner$.MODULE$.$lessinit$greater$default$7()).run(generateCommand, hashMap, MapType$.MODULE$.apply(StringType$.MODULE$, MapType$.MODULE$.apply(StringType$.MODULE$, StringType$.MODULE$)), ((PythonScript) this.pythonProject$2.get()).fileContent(), ((PythonScript) this.pythonProject$2.get()).fileName(), (byte[][]) this.rowsBr$1.value()).map(new PythonTrain$$anonfun$10$$anonfun$12(this)).toSeq();
        // Score is the first mapped element, or 0.0 when the runner produced nothing.
        double unboxToDouble = seq.size() > 0 ? BoxesRunTime.unboxToDouble(seq.head()) : 0.0d;
        // End timestamp reported in the row.
        long currentTimeMillis2 = System.currentTimeMillis();
        // Target model path: <getAlgModelPath(path, keepVersion)>/<index>.
        String stringBuilder3 = new StringBuilder().append(SQLPythonFunc$.MODULE$.getAlgModelPath(this.path$1, this.keepVersion$2)).append("/").append(BoxesRunTime.boxToInteger(_1$mcI$sp)).toString();
        try {
            // As decompiled, z can only be false here (it was set just before the run).
            if (!z) {
                try {
                    FileSystem fileSystem = FileSystem.get(new Configuration());
                    if (this.keepVersion$2) {
                        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                    } else {
                        // Without versioning, recursively remove the previous model at the target path first.
                        BoxesRunTime.boxToBoolean(fileSystem.delete(new Path(stringBuilder3), true));
                    }
                    fileSystem.copyFromLocalFile(new Path(s), new Path(stringBuilder3));
                } catch (Exception e2) {
                    // A failed upload is not rethrown; it only flips the row status to "fail".
                    e2.printStackTrace();
                    z = true;
                }
            }
            // Cleanup on the normal path: local model directory, optionally the local data path,
            // then closure $anonfun$apply$17 is applied to every loaded resource entry.
            // NOTE(review): FileUtils.forceDelete may throw when the path does not exist -- confirm
            // whether a missing local directory is possible here.
            FileUtils.forceDelete(new File(s));
            if (!this.keepLocalDirectory$2) {
                FileUtils.forceDelete(new File(localDataPath));
            }
            loadResourceInTrain.foreach(new PythonTrain$$anonfun$10$$anonfun$apply$17(this));
            return Row$.MODULE$.fromSeq(Seq$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{stringBuilder3, BoxesRunTime.boxToInteger(_1$mcI$sp), ((PythonScript) this.pythonProject$2.get()).fileName(), BoxesRunTime.boxToDouble(unboxToDouble), z ? "fail" : "success", BoxesRunTime.boxToLong(currentTimeMillis), BoxesRunTime.boxToLong(currentTimeMillis2), map, stripMargin})));
        } catch (Throwable th) {
            // Same cleanup as the normal path, then rethrow.
            // NOTE(review): an exception thrown by the cleanup itself would replace `th`.
            FileUtils.forceDelete(new File(s));
            if (!this.keepLocalDirectory$2) {
                FileUtils.forceDelete(new File(localDataPath));
            }
            loadResourceInTrain.foreach(new PythonTrain$$anonfun$10$$anonfun$apply$17(this));
            throw th;
        }
    }

    /** Synthetic accessor returning the enclosing {@code PythonTrain} instance. */
    public /* synthetic */ PythonTrain streaming$dsl$mmlib$algs$python$PythonTrain$$anonfun$$$outer() {
        return this.$outer;
    }

    /**
     * Extracts a score from a line of text.
     *
     * @param str the line to inspect; may be {@code null}
     * @return 0.0 when {@code str} is {@code null} or does not start with
     *     {@code "mlsql_validation_score:"}; otherwise the last ':'-separated segment converted
     *     with {@code StringOps.toDouble}
     */
    public final double streaming$dsl$mmlib$algs$python$PythonTrain$$anonfun$$filterScore$2(String str) {
        if (str == null || !str.startsWith("mlsql_validation_score:")) {
            return 0.0d;
        }
        // NOTE(review): presumably throws NumberFormatException when the last segment is not numeric -- confirm.
        return new StringOps(Predef$.MODULE$.augmentString((String) Predef$.MODULE$.refArrayOps(str.split(":")).last())).toDouble();
    }

    /**
     * Stores the enclosing instance and every captured value.
     *
     * @throws NullPointerException when {@code pythonTrain} is {@code null} (via {@code throw null})
     */
    public PythonTrain$$anonfun$10(PythonTrain pythonTrain, String str, boolean z, boolean z2, ObjectRef objectRef, boolean z3, String str2, Broadcast broadcast, IntRef intRef, Map map, MLFlowConfig mLFlowConfig, PythonConfig pythonConfig, Map map2, Option option, MLSQLExecuteContext mLSQLExecuteContext, Option option2, String str3, PythonScriptType pythonScriptType) {
        if (pythonTrain == null) {
            throw null;
        }
        this.$outer = pythonTrain;
        this.path$1 = str;
        this.keepVersion$2 = z;
        this.keepLocalDirectory$2 = z2;
        this.kafkaParam$2 = objectRef;
        this.enableDataLocal$2 = z3;
        this.dataHDFSPath$1 = str2;
        this.rowsBr$1 = broadcast;
        this.stopFlagNum$2 = intRef;
        this.systemParam$1 = map;
        this.mlflowConfig$1 = mLFlowConfig;
        this.pythonConfig$1 = pythonConfig;
        this.envs$2 = map2;
        this.pythonProject$2 = option;
        this.mlsqlContext$2 = mLSQLExecuteContext;
        this.pythonProjectPath$2 = option2;
        this.projectName$2 = str3;
        this.projectType$2 = pythonScriptType;
    }
}
