package tech.mlsql.plugins.app.pythoncontroller;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.TaskCompletionListener;
import scala.Array$;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq$;
import scala.collection.immutable.Map;
import scala.collection.immutable.MapLike;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import streaming.dsl.MLSQLExecuteContext;
import streaming.dsl.ScriptSQLExec$;
import tech.mlsql.app.CustomController;
import tech.mlsql.arrow.python.PythonWorkerFactory$Tool$;
import tech.mlsql.arrow.python.runner.PythonProjectRunner;
import tech.mlsql.job.JobManager$;
import tech.mlsql.job.MLSQLJobInfo;
import tech.mlsql.job.MLSQLJobType$;
import tech.mlsql.session.SetSession;

/* compiled from: PythonApp.scala */
@ScalaSignature(bytes = "\u0006\u000193AAB\u0004\u0001%!)a\u0004\u0001C\u0001?!)!\u0005\u0001C!G!)A\u0007\u0001C\u0001k!)Q\b\u0001C\u0001}!)q\t\u0001C\u0001\u0011\n\u0001\u0002+\u001f;i_:\u001cuN\u001c;s_2dWM\u001d\u0006\u0003\u0011%\t\u0001\u0003]=uQ>t7m\u001c8ue>dG.\u001a:\u000b\u0005)Y\u0011aA1qa*\u0011A\"D\u0001\ba2,x-\u001b8t\u0015\tqq\"A\u0003nYN\fHNC\u0001\u0011\u0003\u0011!Xm\u00195\u0004\u0001M\u0019\u0001aE\r\u0011\u0005Q9R\"A\u000b\u000b\u0003Y\tQa]2bY\u0006L!\u0001G\u000b\u0003\r\u0005s\u0017PU3g!\tQB$D\u0001\u001c\u0015\tQQ\"\u0003\u0002\u001e7\t\u00012)^:u_6\u001cuN\u001c;s_2dWM]\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003\u0001\u0002\"!\t\u0001\u000e\u0003\u001d\t1A];o)\t!s\u0006\u0005\u0002&Y9\u0011aE\u000b\t\u0003OUi\u0011\u0001\u000b\u0006\u0003SE\ta\u0001\u0010:p_Rt\u0014BA\u0016\u0016\u0003\u0019\u0001&/\u001a3fM&\u0011QF\f\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005-*\u0002\"\u0002\u0019\u0003\u0001\u0004\t\u0014A\u00029be\u0006l7\u000f\u0005\u0003&e\u0011\"\u0013BA\u001a/\u0005\ri\u0015\r]\u0001\u0011G>tg-[4ve\u0016dunZ\"p]\u001a$\u0012A\u000e\t\u0005oq\"C%D\u00019\u0015\tI$(A\u0005j[6,H/\u00192mK*\u00111(F\u0001\u000bG>dG.Z2uS>t\u0017BA\u001a9\u0003A9W\r^*dQ\u0016l\u0017-\u00118e\u0007>tg\r\u0006\u00027\u007f!)\u0001\t\u0002a\u0001\u0003\u0006QQM\u001c<TKN\u001c\u0018n\u001c8\u0011\u0005\t+U\"A\"\u000b\u0005\u0011k\u0011aB:fgNLwN\\\u0005\u0003\r\u000e\u0013!bU3u'\u0016\u001c8/[8o\u00035I7\u000fT8dC2l\u0015m\u001d;feR\u0011\u0011\n\u0014\t\u0003))K!aS\u000b\u0003\u000f\t{w\u000e\\3b]\")Q*\u0002a\u0001c\u0005!1m\u001c8g\u0001")
/* loaded from: input_file:tech/mlsql/plugins/app/pythoncontroller/PythonController.class */
public class PythonController implements CustomController {
    public String run(Map<String, String> map) {
        Map apply;
        MLSQLExecuteContext context = ScriptSQLExec$.MODULE$.context();
        SparkSession sparkSession = context.execListener().sparkSession();
        SetSession setSession = new SetSession(sparkSession, context.owner());
        Some fetchPythonEnv = setSession.fetchPythonEnv();
        if (fetchPythonEnv instanceof Some) {
            apply = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) ((Dataset) fetchPythonEnv.value()).collect())).map(setItem -> {
                String k = setItem.k();
                return (k != null ? !k.equals("PYTHON_ENV") : "PYTHON_ENV" != 0) ? new Tuple2(setItem.k(), setItem.v()) : new Tuple2(setItem.k(), new StringBuilder(38).append(setItem.v()).append(" && export ARROW_PRE_0_15_IPC_FORMAT=1").toString());
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class))))).toMap(Predef$.MODULE$.$conforms());
        } else {
            if (!None$.MODULE$.equals(fetchPythonEnv)) {
                throw new MatchError(fetchPythonEnv);
            }
            apply = Predef$.MODULE$.Map().apply(Nil$.MODULE$);
        }
        Map $plus$plus = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("PY_EXECUTE_USER"), context.owner()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("PYTHON_ENV"), "export ARROW_PRE_0_15_IPC_FORMAT=1")})).$plus$plus(apply);
        int i = new StringOps(Predef$.MODULE$.augmentString((String) map.apply("scriptId"))).toInt();
        String str = (String) map.apply("sql");
        Map $plus$plus2 = getSchemaAndConf(setSession).$plus$plus(configureLogConf());
        String str2 = (String) $plus$plus2.getOrElse("runIn", () -> {
            return "executor";
        });
        String groupId = ScriptSQLExec$.MODULE$.context().groupId();
        MLSQLJobInfo jobInfo = JobManager$.MODULE$.getJobInfo((String) map.apply("owner"), (String) map.getOrElse("jobType", () -> {
            return MLSQLJobType$.MODULE$.SCRIPT();
        }), (String) map.apply("jobName"), (String) map.apply("sql"), new StringOps(Predef$.MODULE$.augmentString((String) map.getOrElse("timeout", () -> {
            return "-1";
        }))).toLong());
        MLSQLJobInfo copy = jobInfo.copy(jobInfo.copy$default$1(), jobInfo.copy$default$2(), jobInfo.copy$default$3(), jobInfo.copy$default$4(), groupId, jobInfo.copy$default$6(), jobInfo.copy$default$7(), jobInfo.copy$default$8());
        ObjectRef create = ObjectRef.create("");
        JobManager$.MODULE$.run(sparkSession, copy, () -> {
            if ("executor".equals(str2)) {
                create.elem = (String) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) sparkSession.sparkContext().parallelize(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{str})), 1, ClassTag$.MODULE$.apply(String.class)).map(str3 -> {
                    final AtomicReference atomicReference = new AtomicReference();
                    final PythonController pythonController = null;
                    TaskContext$.MODULE$.get().addTaskCompletionListener(new TaskCompletionListener(pythonController, atomicReference) { // from class: tech.mlsql.plugins.app.pythoncontroller.PythonController$$anon$1
                        private final AtomicReference pythonProjectRunnerRef$1;

                        public void onTaskCompletion(TaskContext taskContext) {
                            if (this.pythonProjectRunnerRef$1.get() != null) {
                                ((PythonProjectRunner) this.pythonProjectRunnerRef$1.get()).getPythonProcess().map(subProcess -> {
                                    subProcess.close();
                                    return BoxedUnit.UNIT;
                                });
                            }
                        }

                        {
                            this.pythonProjectRunnerRef$1 = atomicReference;
                        }
                    });
                    final ObjectRef create2 = ObjectRef.create("[]");
                    final PythonController pythonController2 = null;
                    Thread thread = new Thread(new Runnable(pythonController2, create2, atomicReference, str3, i, $plus$plus, $plus$plus2) { // from class: tech.mlsql.plugins.app.pythoncontroller.PythonController$$anon$2
                        private final ObjectRef wow$1;
                        private final AtomicReference pythonProjectRunnerRef$1;
                        private final String item$1;
                        private final int scriptId$1;
                        private final Map envs$1;
                        private final Map runnerConf$1;

                        @Override // java.lang.Runnable
                        public void run() {
                            this.wow$1.elem = PyRunner$.MODULE$.runPython(this.pythonProjectRunnerRef$1, this.item$1, this.scriptId$1, this.envs$1, this.runnerConf$1);
                        }

                        {
                            this.wow$1 = create2;
                            this.pythonProjectRunnerRef$1 = atomicReference;
                            this.item$1 = str3;
                            this.scriptId$1 = i;
                            this.envs$1 = $plus$plus;
                            this.runnerConf$1 = $plus$plus2;
                        }
                    });
                    thread.start();
                    while (!TaskContext$.MODULE$.get().isInterrupted() && thread.isAlive()) {
                        Thread.sleep(1000L);
                    }
                    return (String) create2.elem;
                }, ClassTag$.MODULE$.apply(String.class)).collect())).head();
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                if (!"driver".equals(str2)) {
                    throw new MatchError(str2);
                }
                create.elem = PyRunner$.MODULE$.runPython(new AtomicReference<>(), str, i, $plus$plus, $plus$plus2);
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
        });
        return (String) create.elem;
    }

    public Map<String, String> configureLogConf() {
        MLSQLExecuteContext context = ScriptSQLExec$.MODULE$.context();
        Map<String, String> allConfs = context.execListener().sparkSession().sqlContext().getAllConfs();
        return ((MapLike) allConfs.filter(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$configureLogConf$1(tuple2));
        })).$plus$plus(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("PY_EXECUTE_USER"), context.owner()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("groupId"), context.groupId())}))).$plus$plus(isLocalMaster(allConfs) ? Predef$.MODULE$.Map().apply(Nil$.MODULE$) : Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(PythonWorkerFactory$Tool$.MODULE$.REDIRECT_IMPL()), "tech.mlsql.log.RedirectStreamsToSocketServer")})));
    }

    public Map<String, String> getSchemaAndConf(SetSession setSession) {
        Map<String, String> apply;
        Some fetchPythonRunnerConf = setSession.fetchPythonRunnerConf();
        if (fetchPythonRunnerConf instanceof Some) {
            apply = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) ((Dataset) fetchPythonRunnerConf.value()).collect())).map(setItem -> {
                return new Tuple2(setItem.k(), setItem.v());
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class))))).toMap(Predef$.MODULE$.$conforms());
        } else {
            if (!None$.MODULE$.equals(fetchPythonRunnerConf)) {
                throw new MatchError(fetchPythonRunnerConf);
            }
            apply = Predef$.MODULE$.Map().apply(Nil$.MODULE$);
        }
        return apply;
    }

    public boolean isLocalMaster(Map<String, String> map) {
        String str = (String) map.getOrElse("spark.master", () -> {
            return "";
        });
        if (str != null ? !str.equals("local") : "local" != 0) {
            if (!str.startsWith("local[")) {
                return false;
            }
        }
        return true;
    }

    public static final /* synthetic */ boolean $anonfun$configureLogConf$1(Tuple2 tuple2) {
        return ((String) tuple2._1()).startsWith("spark.mlsql.log.driver");
    }
}
