package streaming.dsl.mmlib.algs;

import java.io.File;
import java.lang.ref.SoftReference;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import org.apache.commons.io.FileUtils;
import org.apache.http.client.fluent.Form;
import org.apache.http.client.fluent.Request;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.MapType$;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.util.ExternalCommandRunner$;
import org.apache.spark.util.TaskContextUtil$;
import scala.Function1;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Iterable$;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.StringBuilder;
import scala.runtime.AbstractFunction1;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.DoubleRef;
import scala.runtime.EmptyMethodCache;
import scala.runtime.IntRef;
import scala.runtime.MethodCache;
import scala.runtime.ObjectRef;
import scala.runtime.ScalaRunTime$;
import streaming.common.HDFSOperator$;
import streaming.common.NetUtils;
import streaming.dsl.mmlib.algs.python.PythonScript;
import streaming.dsl.mmlib.algs.tf.cluster.ClusterSpec;
import streaming.dsl.mmlib.algs.tf.cluster.ClusterSpec$;

/* compiled from: SQLDTFAlg.scala */
/* loaded from: input_file:streaming/dsl/mmlib/algs/SQLDTFAlg$$anonfun$11.class */
public final class SQLDTFAlg$$anonfun$11 extends AbstractFunction1<Tuple2<Object, Map<String, String>>, Row> implements Serializable {
    public static final long serialVersionUID = 0;
    public final String path$1;
    private final boolean keepVersion$1;
    private final boolean enableDataLocal$1;
    public final ObjectRef kafkaParam$1;
    private final IntRef stopFlagNum$1;
    private final Map systemParam$1;
    private final String pythonPath$1;
    private final String[] pythonParam$1;
    private final ObjectRef tempDataLocalPath$1;
    private final HashMap tfDataMap$1;
    private final Option userPythonScript$1;
    public final Broadcast rowsBr$1;
    private final String clusterUuid$1;
    private final String driverHost$1;
    private final int LIMIT$1;
    private final int driverPort$1;

    public final Row apply(Tuple2<Object, Map<String, String>> tuple2) {
        Map<String, String> map = (Map) tuple2._2();
        final int _1$mcI$sp = tuple2._1$mcI$sp();
        String str = (String) map.apply("jobName");
        int i = new StringOps(Predef$.MODULE$.augmentString((String) map.apply("taskIndex"))).toInt();
        JSONObject jSONObject = new JSONObject();
        jSONObject.put("jobName", str);
        jSONObject.put("taskIndex", BoxesRunTime.boxToInteger(i));
        String host = NetUtils.getHost();
        ServerSocket availableAndReturn = NetUtils.availableAndReturn(ClusterSpec$.MODULE$.MIN_PORT_NUMBER(), ClusterSpec$.MODULE$.MAX_PORT_NUMBER());
        if (availableAndReturn == null) {
            throw new RuntimeException(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Fail to create tensorflow cluster, maybe executor cannot bind port "})).s(Nil$.MODULE$));
        }
        int localPort = availableAndReturn.getLocalPort();
        reportToMaster$1(reportToMaster$default$1$1(), map, host, localPort);
        int i2 = 0;
        boolean z = false;
        AtomicReference atomicReference = new AtomicReference();
        while (!z && i2 < 100) {
            JSONArray fromObject = JSONArray.fromObject(fetchClusterFromMaster$1());
            Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Waiting all worker/ps started. Wait times: ", " times. Already registered tf worker/ps: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(i2), BoxesRunTime.boxToInteger(fromObject.size())})));
            if (fromObject.size() == this.LIMIT$1) {
                atomicReference.set(new ClusterSpec(getHostAndPortFromJson$1(fromObject, "worker").toList(), getHostAndPortFromJson$1(fromObject, "ps").toList()));
                z = true;
            } else {
                Thread.sleep(5000L);
                i2++;
            }
        }
        NetUtils.releasePort(availableAndReturn);
        if (atomicReference.get() == null) {
            throw new RuntimeException(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Fail to create tensorflow cluster, this maybe caused by  executor cannot connect driver at ", ":", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.driverHost$1, BoxesRunTime.boxToInteger(this.driverPort$1)})));
        }
        ClusterSpec clusterSpec = (ClusterSpec) atomicReference.get();
        JSONObject jSONObject2 = new JSONObject();
        jSONObject2.put("worker", JavaConverters$.MODULE$.seqAsJavaListConverter(clusterSpec.worker()).asJava());
        jSONObject2.put("ps", JavaConverters$.MODULE$.seqAsJavaListConverter(clusterSpec.ps()).asJava());
        Predef$.MODULE$.println(jSONObject2.toString());
        String str2 = (String) this.tempDataLocalPath$1.elem;
        if (!isPs$1(str) && this.enableDataLocal$1) {
            String str3 = (String) this.tfDataMap$1.apply(BoxesRunTime.boxToInteger(_1$mcI$sp));
            str2 = new StringBuilder().append(str2).append("/").append(BoxesRunTime.boxToInteger(_1$mcI$sp)).toString();
            Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Copy HDFS ", " to local ", " "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str3, str2})));
            SQLPythonFunc$.MODULE$.recordSingleLineLog((Map) this.kafkaParam$1.elem, new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Copy HDFS ", " to local ", " "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str3, str2})), SQLPythonFunc$.MODULE$.recordSingleLineLog$default$3());
            HDFSOperator$.MODULE$.copyToLocalFile(new StringBuilder().append(str2).append("/").append(Predef$.MODULE$.refArrayOps(str3.split("/")).last()).toString(), str3, true);
        }
        final java.util.HashMap hashMap = new java.util.HashMap();
        java.util.Map map2 = (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(map).asJava();
        if (!map.contains("modelPath")) {
            map2 = (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(map.$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("modelPath"), this.path$1))).asJava();
        }
        ObjectRef create = ObjectRef.create(Predef$.MODULE$.Map().empty());
        if (((TraversableOnce) map.keys().map(new SQLDTFAlg$$anonfun$11$$anonfun$apply$1(this), Iterable$.MODULE$.canBuildFrom())).toSet().contains("resource")) {
            Functions$.MODULE$.mapParams(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"resource"})).s(Nil$.MODULE$), map).foreach(new SQLDTFAlg$$anonfun$11$$anonfun$apply$2(this, create));
        }
        final PythonScript pythonScript = (PythonScript) this.userPythonScript$1.get();
        String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", "/", "/", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{SQLPythonFunc$.MODULE$.getLocalBasePath(), UUID.randomUUID().toString(), BoxesRunTime.boxToInteger(_1$mcI$sp)}));
        String s2 = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", "/", "/", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{SQLPythonFunc$.MODULE$.getLocalBasePath(), UUID.randomUUID().toString(), BoxesRunTime.boxToInteger(_1$mcI$sp)}));
        hashMap.put("fitParam", map2);
        hashMap.put("kafkaParam", JavaConverters$.MODULE$.mapAsJavaMapConverter(((Map) this.kafkaParam$1.elem).$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("group_id"), new StringBuilder().append((String) ((Map) this.kafkaParam$1.elem).apply("group_id")).append("_").append(BoxesRunTime.boxToInteger(_1$mcI$sp)).toString()))).asJava());
        hashMap.put("internalSystemParam", JavaConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("stopFlagNum"), BoxesRunTime.boxToInteger(this.stopFlagNum$1.elem)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("tempModelLocalPath"), s), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("tempDataLocalPath"), str2), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("resource"), JavaConverters$.MODULE$.mapAsJavaMapConverter((Map) create.elem).asJava()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("clusterSpec"), jSONObject2.toString()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("roleSpec"), jSONObject.toString()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("checkpointDir"), s2)}))).asJava());
        hashMap.put("systemParam", JavaConverters$.MODULE$.mapAsJavaMapConverter(this.systemParam$1).asJava());
        final Seq seq = (Seq) ((TraversableLike) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{this.pythonPath$1})).$plus$plus(Predef$.MODULE$.refArrayOps(this.pythonParam$1), Seq$.MODULE$.canBuildFrom())).$plus$plus(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{pythonScript.fileName()})), Seq$.MODULE$.canBuildFrom());
        String localRunPath = SQLPythonFunc$.MODULE$.getLocalRunPath(UUID.randomUUID().toString());
        long currentTimeMillis = System.currentTimeMillis();
        final DoubleRef create2 = DoubleRef.create(0.0d);
        final BooleanRef create3 = BooleanRef.create(false);
        if (isPs$1(str)) {
            final AtomicReference atomicReference2 = new AtomicReference();
            final TaskContext taskContext = TaskContext$.MODULE$.get();
            Thread thread = new Thread(new Runnable(this, _1$mcI$sp, hashMap, pythonScript, seq, create2, create3, atomicReference2, taskContext) { // from class: streaming.dsl.mmlib.algs.SQLDTFAlg$$anonfun$11$$anon$1
                private final /* synthetic */ SQLDTFAlg$$anonfun$11 $outer;
                private final int algIndex$1;
                private final java.util.HashMap paramMap$1;
                private final PythonScript pythonScript$1;
                private final Seq command$1;
                private final DoubleRef score$1;
                private final BooleanRef trainFailFlag$1;
                private final AtomicReference pythonWorker$1;
                private final TaskContext context$1;
                private static Class[] reflParams$Cache1 = new Class[0];
                private static volatile SoftReference reflPoly$Cache1 = new SoftReference(new EmptyMethodCache());

                public static Method reflMethod$Method1(Class cls) {
                    EmptyMethodCache emptyMethodCache = (MethodCache) reflPoly$Cache1.get();
                    if (emptyMethodCache == null) {
                        emptyMethodCache = new EmptyMethodCache();
                        reflPoly$Cache1 = new SoftReference(emptyMethodCache);
                    }
                    Method find = emptyMethodCache.find(cls);
                    if (find != null) {
                        return find;
                    }
                    Method ensureAccessible = ScalaRunTime$.MODULE$.ensureAccessible(cls.getMethod("getWorker", reflParams$Cache1));
                    reflPoly$Cache1 = new SoftReference(emptyMethodCache.add(cls, ensureAccessible));
                    return ensureAccessible;
                }

                @Override // java.lang.Runnable
                public void run() {
                    TaskContextUtil$.MODULE$.setContext(this.context$1);
                    String localRunPath2 = SQLPythonFunc$.MODULE$.getLocalRunPath(UUID.randomUUID().toString());
                    try {
                        Seq seq2 = this.command$1;
                        java.util.HashMap hashMap2 = this.paramMap$1;
                        MapType apply = MapType$.MODULE$.apply(StringType$.MODULE$, MapType$.MODULE$.apply(StringType$.MODULE$, StringType$.MODULE$));
                        String fileContent = this.pythonScript$1.fileContent();
                        String fileName = this.pythonScript$1.fileName();
                        Function1<Object, Seq<Object>> recordAnyLog = SQLPythonFunc$.MODULE$.recordAnyLog((Map) this.$outer.kafkaParam$1.elem, SQLPythonFunc$.MODULE$.recordAnyLog$default$2());
                        Iterator<String> run = ExternalCommandRunner$.MODULE$.run(localRunPath2, seq2, hashMap2, apply, fileContent, fileName, this.$outer.path$1, recordAnyLog, (byte[][]) this.$outer.rowsBr$1.value(), ExternalCommandRunner$.MODULE$.run$default$10(), ExternalCommandRunner$.MODULE$.run$default$11(), ExternalCommandRunner$.MODULE$.run$default$12(), ExternalCommandRunner$.MODULE$.run$default$13(), ExternalCommandRunner$.MODULE$.run$default$14());
                        try {
                            this.pythonWorker$1.set((Process) reflMethod$Method1(run.getClass()).invoke(run, new Object[0]));
                            this.score$1.elem = SQLPythonFunc$.MODULE$.recordUserLog(this.algIndex$1, this.pythonScript$1, (Map) this.$outer.kafkaParam$1.elem, run, SQLPythonFunc$.MODULE$.recordUserLog$default$5());
                        } catch (InvocationTargetException e) {
                            throw e.getCause();
                        }
                    } catch (Exception e2) {
                        e2.printStackTrace();
                        this.trainFailFlag$1.elem = true;
                    }
                }

                {
                    if (this == null) {
                        throw null;
                    }
                    this.$outer = this;
                    this.algIndex$1 = _1$mcI$sp;
                    this.paramMap$1 = hashMap;
                    this.pythonScript$1 = pythonScript;
                    this.command$1 = seq;
                    this.score$1 = create2;
                    this.trainFailFlag$1 = create3;
                    this.pythonWorker$1 = atomicReference2;
                    this.context$1 = taskContext;
                }
            });
            thread.setDaemon(true);
            thread.start();
            boolean z2 = true;
            while (z2) {
                String fetchClusterStatusFromMaster$1 = fetchClusterStatusFromMaster$1();
                if (new StringOps(Predef$.MODULE$.augmentString(fetchClusterStatusFromMaster$1)).toInt() == clusterSpec.worker().size()) {
                    z2 = false;
                }
                try {
                    Thread.sleep(10000L);
                } catch (Exception e) {
                    z2 = false;
                }
                Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"check worker finish size. targetSize:", " vs ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(clusterSpec.worker().size()), BoxesRunTime.boxToInteger(new StringOps(Predef$.MODULE$.augmentString(fetchClusterStatusFromMaster$1)).toInt())})));
            }
            ((Process) atomicReference2.get()).destroy();
            thread.interrupt();
        } else {
            try {
                create2.elem = SQLPythonFunc$.MODULE$.recordUserLog(_1$mcI$sp, pythonScript, (Map) this.kafkaParam$1.elem, ExternalCommandRunner$.MODULE$.run(localRunPath, seq, hashMap, MapType$.MODULE$.apply(StringType$.MODULE$, MapType$.MODULE$.apply(StringType$.MODULE$, StringType$.MODULE$)), pythonScript.fileContent(), pythonScript.fileName(), this.path$1, SQLPythonFunc$.MODULE$.recordAnyLog((Map) this.kafkaParam$1.elem, SQLPythonFunc$.MODULE$.recordAnyLog$default$2()), (byte[][]) this.rowsBr$1.value(), ExternalCommandRunner$.MODULE$.run$default$10(), ExternalCommandRunner$.MODULE$.run$default$11(), ExternalCommandRunner$.MODULE$.run$default$12(), ExternalCommandRunner$.MODULE$.run$default$13(), ExternalCommandRunner$.MODULE$.run$default$14()), SQLPythonFunc$.MODULE$.recordUserLog$default$5());
            } catch (Exception e2) {
                e2.printStackTrace();
                create3.elem = true;
            }
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        if (isPs$1(str)) {
            return Row$.MODULE$.fromSeq(Seq$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{"", BoxesRunTime.boxToInteger(_1$mcI$sp), pythonScript.fileName(), BoxesRunTime.boxToDouble(create2.elem), create3.elem ? "fail" : "success", BoxesRunTime.boxToLong(currentTimeMillis), BoxesRunTime.boxToLong(currentTimeMillis2), map})));
        }
        if (create3.elem) {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            reportToMaster$1("/cluster/worker/status", map, host, localPort);
        }
        long currentTimeMillis3 = System.currentTimeMillis();
        String stringBuilder = new StringBuilder().append(SQLPythonFunc$.MODULE$.getAlgModelPath(this.path$1, this.keepVersion$1)).append("/").append(BoxesRunTime.boxToInteger(_1$mcI$sp)).toString();
        try {
            try {
                if (this.keepVersion$1) {
                    BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                } else {
                    BoxesRunTime.boxToBoolean(HDFSOperator$.MODULE$.deleteDir(stringBuilder));
                }
                if (new File(s).exists()) {
                    HDFSOperator$.MODULE$.copyToHDFS(s, stringBuilder, true, false);
                } else if (new File(s2).exists()) {
                    HDFSOperator$.MODULE$.copyToHDFS(s2, stringBuilder, true, false);
                }
            } catch (Exception e3) {
                create3.elem = true;
            }
            FileUtils.deleteDirectory(new File(s));
            FileUtils.deleteDirectory(new File(str2));
            FileUtils.deleteDirectory(new File(s2));
            return Row$.MODULE$.fromSeq(Seq$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{stringBuilder, BoxesRunTime.boxToInteger(_1$mcI$sp), pythonScript.fileName(), BoxesRunTime.boxToDouble(create2.elem), create3.elem ? "fail" : "success", BoxesRunTime.boxToLong(currentTimeMillis), BoxesRunTime.boxToLong(currentTimeMillis3), map})));
        } catch (Throwable th) {
            FileUtils.deleteDirectory(new File(s));
            FileUtils.deleteDirectory(new File(str2));
            FileUtils.deleteDirectory(new File(s2));
            throw th;
        }
    }

    private final boolean isPs$1(String str) {
        return str != null ? str.equals("ps") : "ps" == 0;
    }

    private final String reportToMaster$1(String str, Map map, String str2, int i) {
        return Request.Post(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"http://", ":", "", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.driverHost$1, BoxesRunTime.boxToInteger(this.driverPort$1), str}))).bodyForm(Form.form().add("cluster", this.clusterUuid$1).add("hostAndPort", new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", ":", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str2, BoxesRunTime.boxToInteger(i)}))).add("jobName", (String) map.apply("jobName")).add("taskIndex", (String) map.apply("taskIndex")).build()).execute().returnContent().asString();
    }

    private final String reportToMaster$default$1$1() {
        return "/cluster/register";
    }

    private final String fetchClusterFromMaster$1() {
        return Request.Get(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"http://", ":", "/cluster?cluster=", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.driverHost$1, BoxesRunTime.boxToInteger(this.driverPort$1), this.clusterUuid$1}))).execute().returnContent().asString();
    }

    private final Buffer getHostAndPortFromJson$1(JSONArray jSONArray, String str) {
        return (Buffer) ((TraversableLike) ((TraversableLike) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(jSONArray).asScala()).map(new SQLDTFAlg$$anonfun$11$$anonfun$getHostAndPortFromJson$1$1(this), Buffer$.MODULE$.canBuildFrom())).filter(new SQLDTFAlg$$anonfun$11$$anonfun$getHostAndPortFromJson$1$2(this, str))).map(new SQLDTFAlg$$anonfun$11$$anonfun$getHostAndPortFromJson$1$3(this), Buffer$.MODULE$.canBuildFrom());
    }

    private final String fetchClusterStatusFromMaster$1() {
        return Request.Get(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"http://", ":", "/cluster/worker/finish?cluster=", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.driverHost$1, BoxesRunTime.boxToInteger(this.driverPort$1), this.clusterUuid$1}))).execute().returnContent().asString();
    }

    public SQLDTFAlg$$anonfun$11(SQLDTFAlg sQLDTFAlg, String str, boolean z, boolean z2, ObjectRef objectRef, IntRef intRef, Map map, String str2, String[] strArr, ObjectRef objectRef2, HashMap hashMap, Option option, Broadcast broadcast, String str3, String str4, int i, int i2) {
        this.path$1 = str;
        this.keepVersion$1 = z;
        this.enableDataLocal$1 = z2;
        this.kafkaParam$1 = objectRef;
        this.stopFlagNum$1 = intRef;
        this.systemParam$1 = map;
        this.pythonPath$1 = str2;
        this.pythonParam$1 = strArr;
        this.tempDataLocalPath$1 = objectRef2;
        this.tfDataMap$1 = hashMap;
        this.userPythonScript$1 = option;
        this.rowsBr$1 = broadcast;
        this.clusterUuid$1 = str3;
        this.driverHost$1 = str4;
        this.LIMIT$1 = i;
        this.driverPort$1 = i2;
    }
}
