package streaming.dsl.mmlib.algs;

import java.io.BufferedWriter;
import java.io.File;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.execution.datasources.json.WowJsonInferSchema$;
import org.apache.spark.sql.types.MapType$;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.ExternalCommandRunner$;
import scala.Array$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction2;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.runtime.ObjectRef;
import scala.runtime.ScalaRunTime$;
import streaming.dsl.mmlib.algs.python.PythonScript;

/* compiled from: SQLPythonAlgBatchPrediction.scala */
/* loaded from: input_file:streaming/dsl/mmlib/algs/SQLPythonAlgBatchPrediction$$anonfun$1.class */
public final class SQLPythonAlgBatchPrediction$$anonfun$1 extends AbstractFunction2<Object, Iterator<Row>, Iterator<Nothing$>> implements Serializable {
    public static final long serialVersionUID = 0;
    private final String wowPath$1;
    private final Map kafkaParam$1;
    private final Map systemParam$1;
    private final Map fitParam$1;
    private final Option userPythonScript$1;
    private final StructType schema$1;
    private final ObjectRef resourceParams$1;
    private final String sessionLocalTimeZone$1;
    private final String hdfsModelPath$1;

    public final Iterator<Nothing$> apply(int i, Iterator<Row> iterator) {
        String str = (String) this.systemParam$1.getOrElse("pythonPath", new SQLPythonAlgBatchPrediction$$anonfun$1$$anonfun$2(this));
        String[] strArr = (String[]) Predef$.MODULE$.refArrayOps(((String) this.systemParam$1.getOrElse("pythonParam", new SQLPythonAlgBatchPrediction$$anonfun$1$$anonfun$4(this))).split(",")).filterNot(new SQLPythonAlgBatchPrediction$$anonfun$1$$anonfun$5(this));
        String localTempDataPath = SQLPythonFunc$.MODULE$.getLocalTempDataPath(this.wowPath$1);
        String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", "/", "/", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{SQLPythonFunc$.MODULE$.getLocalBasePath(), UUID.randomUUID().toString(), BoxesRunTime.boxToInteger(i)}));
        String s2 = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", "/", "/", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{SQLPythonFunc$.MODULE$.getLocalBasePath(), UUID.randomUUID().toString(), BoxesRunTime.boxToInteger(i)}));
        String s3 = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", "/data"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.wowPath$1}));
        FileSystem fileSystem = FileSystem.get(new Configuration());
        fileSystem.copyToLocalFile(new Path(this.hdfsModelPath$1), new Path(s));
        String stringBuilder = new StringBuilder().append(localTempDataPath).append("/").append(BoxesRunTime.boxToInteger(i)).toString();
        FileUtils.forceMkdir(new File(stringBuilder));
        BufferedWriter newBufferedWriter = Files.newBufferedWriter(Paths.get(new StringBuilder().append(stringBuilder).append("/0.json").toString(), new String[0]), Charset.forName("utf-8"), new OpenOption[0]);
        try {
            WowJsonInferSchema$.MODULE$.toJson(iterator, this.schema$1, this.sessionLocalTimeZone$1, new SQLPythonAlgBatchPrediction$$anonfun$1$$anonfun$apply$1(this, newBufferedWriter));
            newBufferedWriter.flush();
            newBufferedWriter.close();
            HashMap hashMap = new HashMap();
            PythonScript pythonScript = (PythonScript) this.userPythonScript$1.get();
            hashMap.put("fitParam", JavaConverters$.MODULE$.mapAsJavaMapConverter(this.fitParam$1).asJava());
            hashMap.put("kafkaParam", JavaConverters$.MODULE$.mapAsJavaMapConverter(this.kafkaParam$1.$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("group_id"), new StringBuilder().append((String) this.kafkaParam$1.apply("group_id")).append("_").append(BoxesRunTime.boxToInteger(i)).toString()))).asJava());
            hashMap.put("internalSystemParam", JavaConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("tempModelLocalPath"), s), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("tempDataLocalPath"), stringBuilder), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("tempResultLocalPath"), s2), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("resource"), JavaConverters$.MODULE$.mapAsJavaMapConverter((Map) this.resourceParams$1.elem).asJava())}))).asJava());
            hashMap.put("systemParam", JavaConverters$.MODULE$.mapAsJavaMapConverter(this.systemParam$1).asJava());
            Seq seq = (Seq) ((TraversableLike) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{str})).$plus$plus(Predef$.MODULE$.refArrayOps(strArr), Seq$.MODULE$.canBuildFrom())).$plus$plus(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{pythonScript.fileName()})), Seq$.MODULE$.canBuildFrom());
            System.currentTimeMillis();
            try {
                SQLPythonFunc$.MODULE$.recordUserLog(i, pythonScript, this.kafkaParam$1, ExternalCommandRunner$.MODULE$.run(SQLPythonFunc$.MODULE$.getLocalRunPath(UUID.randomUUID().toString()), seq, hashMap, MapType$.MODULE$.apply(StringType$.MODULE$, MapType$.MODULE$.apply(StringType$.MODULE$, StringType$.MODULE$)), pythonScript.fileContent(), pythonScript.fileName(), "", SQLPythonFunc$.MODULE$.recordAnyLog(this.kafkaParam$1, SQLPythonFunc$.MODULE$.recordAnyLog$default$2()), (byte[][]) Array$.MODULE$.apply(Nil$.MODULE$, ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Byte.TYPE))), ExternalCommandRunner$.MODULE$.run$default$10(), ExternalCommandRunner$.MODULE$.run$default$11(), ExternalCommandRunner$.MODULE$.run$default$12(), ExternalCommandRunner$.MODULE$.run$default$13(), ExternalCommandRunner$.MODULE$.run$default$14()), SQLPythonFunc$.MODULE$.recordUserLog$default$5());
            } catch (Exception e) {
                e.printStackTrace();
            }
            try {
                try {
                    fileSystem.delete(new Path(s3), true);
                    fileSystem.copyFromLocalFile(new Path(s2), new Path(s3));
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
                FileUtils.deleteDirectory(new File(s));
                FileUtils.deleteDirectory(new File(stringBuilder));
                return Seq$.MODULE$.apply(Nil$.MODULE$).toIterator();
            } catch (Throwable th) {
                FileUtils.deleteDirectory(new File(s));
                FileUtils.deleteDirectory(new File(stringBuilder));
                throw th;
            }
        } catch (Throwable th2) {
            newBufferedWriter.close();
            throw th2;
        }
    }

    public final /* bridge */ /* synthetic */ Object apply(Object obj, Object obj2) {
        return apply(BoxesRunTime.unboxToInt(obj), (Iterator<Row>) obj2);
    }

    public SQLPythonAlgBatchPrediction$$anonfun$1(SQLPythonAlgBatchPrediction sQLPythonAlgBatchPrediction, String str, Map map, Map map2, Map map3, Option option, StructType structType, ObjectRef objectRef, String str2, String str3) {
        this.wowPath$1 = str;
        this.kafkaParam$1 = map;
        this.systemParam$1 = map2;
        this.fitParam$1 = map3;
        this.userPythonScript$1 = option;
        this.schema$1 = structType;
        this.resourceParams$1 = objectRef;
        this.sessionLocalTimeZone$1 = str2;
        this.hdfsModelPath$1 = str3;
    }
}
