package quix.python;

import com.google.common.io.Resources;
import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import com.zaxxer.nuprocess.NuProcessBuilder;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import monix.eval.Task;
import monix.eval.Task$;
import monix.execution.atomic.AtomicInt;
import monix.execution.atomic.AtomicInt$;
import monix.reactive.Observable;
import monix.reactive.Observable$;
import monix.reactive.OverflowStrategy$Unbounded$;
import py4j.GatewayServer;
import quix.api.execute.ActiveQuery;
import quix.api.execute.AsyncQueryExecutor;
import quix.api.execute.Batch;
import quix.api.execute.Batch$;
import quix.api.execute.BatchColumn$;
import quix.api.execute.Builder;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: PythonExecutor.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u001dd\u0001B\t\u0013\u0001]A\u0001B\u0010\u0001\u0003\u0002\u0003\u0006Ia\u0010\u0005\u0006\u0007\u0002!\t\u0001\u0012\u0005\b\u000f\u0002\u0001\r\u0011\"\u0001I\u0011\u001d\u0019\u0006\u00011A\u0005\u0002QCaA\u0017\u0001!B\u0013I\u0005\"B.\u0001\t\u0003a\u0006\"B9\u0001\t\u0003\u0012\b\"B?\u0001\t\u0003q\bbBA\u0005\u0001\u0011\u0005\u00111\u0002\u0005\b\u0003+\u0001A\u0011BA\f\u0011\u001d\t9\u0003\u0001C\u0001\u0003SAq!!\u0010\u0001\t\u0003\tydB\u0005\u0002HI\t\t\u0011#\u0001\u0002J\u0019A\u0011CEA\u0001\u0012\u0003\tY\u0005\u0003\u0004D\u001d\u0011\u0005\u0011Q\n\u0005\n\u0003\u001fr\u0011\u0013!C\u0001\u0003#\u0012a\u0002U=uQ>tW\t_3dkR|'O\u0003\u0002\u0014)\u00051\u0001/\u001f;i_:T\u0011!F\u0001\u0005cVL\u0007p\u0001\u0001\u0014\t\u0001Ab\u0004\u000e\t\u00033qi\u0011A\u0007\u0006\u00027\u0005)1oY1mC&\u0011QD\u0007\u0002\u0007\u0003:L(+\u001a4\u0011\t}!c%M\u0007\u0002A)\u0011\u0011EI\u0001\bKb,7-\u001e;f\u0015\t\u0019C#A\u0002ba&L!!\n\u0011\u0003%\u0005\u001b\u0018P\\2Rk\u0016\u0014\u00180\u0012=fGV$xN\u001d\t\u0003O9r!\u0001\u000b\u0017\u0011\u0005%RR\"\u0001\u0016\u000b\u0005-2\u0012A\u0002\u001fs_>$h(\u0003\u0002.5\u00051\u0001K]3eK\u001aL!a\f\u0019\u0003\rM#(/\u001b8h\u0015\ti#\u0004\u0005\u0002 e%\u00111\u0007\t\u0002\u0006\u0005\u0006$8\r\u001b\t\u0003kqj\u0011A\u000e\u0006\u0003oa\nAb]2bY\u0006dwnZ4j]\u001eT!!\u000f\u001e\u0002\u0011QL\b/Z:bM\u0016T\u0011aO\u0001\u0004G>l\u0017BA\u001f7\u0005-a\u0015M_=M_\u001e<\u0017N\\4\u0002\r\r|gNZ5h!\t\u0001\u0015)D\u0001\u0013\u0013\t\u0011%C\u0001\u0007QsRDwN\\\"p]\u001aLw-\u0001\u0004=S:LGO\u0010\u000b\u0003\u000b\u001a\u0003\"\u0001\u0011\u0001\t\u000fy\u0012\u0001\u0013!a\u0001\u007f\u0005!\u0001o\u001c:u+\u0005I\u0005C\u0001&R\u001b\u0005Y%B\u0001'N\u0003\u0019\tGo\\7jG*\u0011ajT\u0001\nKb,7-\u001e;j_:T\u0011\u0001U\u0001\u0006[>t\u0017\u000e_\u0005\u0003%.\u0013\u0011\"\u0011;p[&\u001c\u0017J\u001c;\u0002\u0011A|'\u000f^0%KF$\"!\u0016-\u0011\u0005e1\u0016BA,\u001b\u0005\u0011)f.\u001b;\t\u000fe#\u0011\u0011!a\u0001\u0013\u0006\u0019\u0001\u0010J\u0019\u0002\u000bA|'\u000f\u001e\u0011\u0002\t\r|\u0007/\u001f\u000b\u0004;\u000e|\u0007c\u00010b+6\tqL\u0003\u0002a\u001f\u0006!QM^1m\u0013\t\u0011wL\u0001\u0003UCN\\\u0007\"\u00023\u0007\u0001\u0004)\u0017a\u00013jeB\u0011a-\\\u0007\u0002O*\u0011\u0001.[\u0001\u0005M&dWM\u0003\u0002kW\u0006\u0019a.[8\u000b\u00031\fAA[1wC&\u0011an\u001a\u0002\u0005!\u0006$\b\u000eC\u0003q\r\u0001\u0007a%\u0001\u0005gS2,g.Y7f\u0003\u001d\u0011XO\u001c+bg.$2!X:y\u0011\u0015!x\u00011\u0001v\u0003\u0015\tX/\u001a:z!\rybOJ\u0005\u0003o\u0002\u00121\"Q2uSZ,\u0017+^3ss\")\u0011p\u0002a\u0001u\u00069!-^5mI\u0016\u0014\b\u0003B\u0010|MEJ!\u0001 \u0011\u0003\u000f\t+\u0018\u000e\u001c3fe\u0006YQ.Y6f!J|7-Z:t)\ry\u0018q\u0001\t\u0005=\u0006\f\t\u0001E\u0002A\u0003\u0007I1!!\u0002\u0013\u0005Q\u0001\u0016\u0010\u001e5p]J+hN\\5oOB\u0013xnY3tg\")A\u000f\u0003a\u0001k\u0006a\u0001O]3qCJ,g)\u001b7fgR1\u0011QBA\b\u0003'\u00012AX1f\u0011\u001d\t\t\"\u0003a\u0001\u0003\u0003\tq\u0001\u001d:pG\u0016\u001c8\u000fC\u0003u\u0013\u0001\u0007Q/\u0001\nhK:,'/\u0019;f+N,'oU2sSB$HCBA\r\u0003G\t)\u0003\u0005\u0003\u0002\u001c\u0005\u0005RBAA\u000f\u0015\r\tyb[\u0001\u0005Y\u0006tw-C\u00020\u0003;AQ\u0001\u001a\u0006A\u0002\u0015DQ\u0001\u001e\u0006A\u0002U\fa\u0002\u001d:fa\u0006\u0014XmR1uK^\f\u0017\u0010\u0006\u0004\u0002,\u0005e\u00121\b\t\u0005=\u0006\fi\u0003\u0005\u0003\u00020\u0005URBAA\u0019\u0015\t\t\u0019$\u0001\u0003qsRR\u0017\u0002BA\u001c\u0003c\u0011QbR1uK^\f\u0017pU3sm\u0016\u0014\bbBA\t\u0017\u0001\u0007\u0011\u0011\u0001\u0005\u0006i.\u0001\r!^\u0001\u0004eVtGcB/\u0002B\u0005\r\u0013Q\t\u0005\b\u0003#a\u0001\u0019AA\u0001\u0011\u0015!H\u00021\u0001v\u0011\u0015IH\u00021\u0001{\u00039\u0001\u0016\u0010\u001e5p]\u0016CXmY;u_J\u0004\"\u0001\u0011\b\u0014\u00059ABCAA%\u0003m!C.Z:tS:LG\u000fJ4sK\u0006$XM\u001d\u0013eK\u001a\fW\u000f\u001c;%cU\u0011\u00111\u000b\u0016\u0004\u007f\u0005U3FAA,!\u0011\tI&a\u0019\u000e\u0005\u0005m#\u0002BA/\u0003?\n\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\u0005\u0005$$\u0001\u0006b]:|G/\u0019;j_:LA!!\u001a\u0002\\\t\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3")
/* loaded from: input_file:quix/python/PythonExecutor.class */
public class PythonExecutor implements AsyncQueryExecutor<String, Batch>, LazyLogging {
    private final PythonConfig config;
    private AtomicInt port;
    private transient Logger logger;
    private volatile transient boolean bitmap$trans$0;

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [quix.python.PythonExecutor] */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$trans$0) {
                this.logger = LazyLogging.logger$(this);
                r0 = this;
                r0.bitmap$trans$0 = true;
            }
        }
        return this.logger;
    }

    public Logger logger() {
        return !this.bitmap$trans$0 ? logger$lzycompute() : this.logger;
    }

    public AtomicInt port() {
        return this.port;
    }

    public void port_$eq(AtomicInt atomicInt) {
        this.port = atomicInt;
    }

    public Task<BoxedUnit> copy(Path path, String str) {
        return Task$.MODULE$.apply(() -> {
            return Resources.toByteArray(Resources.getResource(str));
        }).flatMap(bArr -> {
            return Task$.MODULE$.apply(() -> {
                return Files.write(Paths.get(path.toString(), str), bArr, new OpenOption[0]);
            }).map(path2 -> {
                $anonfun$copy$4(path2);
                return BoxedUnit.UNIT;
            });
        });
    }

    public Task<BoxedUnit> runTask(ActiveQuery<String> activeQuery, Builder<String, Batch> builder) {
        return makeProcess(activeQuery).bracket(pythonRunningProcess -> {
            return this.run(pythonRunningProcess, activeQuery, builder);
        }, pythonRunningProcess2 -> {
            return pythonRunningProcess2.close();
        });
    }

    public Task<PythonRunningProcess> makeProcess(ActiveQuery<String> activeQuery) {
        PythonRunningProcess pythonRunningProcess = new PythonRunningProcess(activeQuery.id(), PythonRunningProcess$.MODULE$.apply$default$2(), PythonRunningProcess$.MODULE$.apply$default$3(), PythonRunningProcess$.MODULE$.apply$default$4(), PythonRunningProcess$.MODULE$.apply$default$5());
        return prepareFiles(pythonRunningProcess, activeQuery).flatMap(path -> {
            return this.prepareGateway(pythonRunningProcess, activeQuery).map(gatewayServer -> {
                return pythonRunningProcess;
            });
        });
    }

    public Task<Path> prepareFiles(PythonRunningProcess pythonRunningProcess, ActiveQuery<String> activeQuery) {
        Path path = Paths.get(this.config.userScriptsDir(), activeQuery.user().email());
        Path path2 = Paths.get(path.toString(), "bin");
        byte[] bytes = generateUserScript(path, activeQuery).getBytes("UTF-8");
        return Task$.MODULE$.apply(() -> {
            return Files.notExists(path, new LinkOption[0]) ? Files.createDirectories(path, new FileAttribute[0]) : BoxedUnit.UNIT;
        }).flatMap(obj -> {
            return Task$.MODULE$.apply(() -> {
                return Files.notExists(path2, new LinkOption[0]) ? Files.createDirectories(path2, new FileAttribute[0]) : BoxedUnit.UNIT;
            }).flatMap(obj -> {
                return Task$.MODULE$.apply(() -> {
                    return Files.createTempFile(path, "script-", ".py", new FileAttribute[0]);
                }).flatMap(path3 -> {
                    return Task$.MODULE$.apply(() -> {
                        return Files.write(path3, bytes, new OpenOption[0]);
                    }).flatMap(path3 -> {
                        return this.copy(path, "quix.py").flatMap(boxedUnit -> {
                            return this.copy(path, "packages.py").flatMap(boxedUnit -> {
                                return this.copy(path2, "activator.py").flatMap(boxedUnit -> {
                                    return Task$.MODULE$.apply(() -> {
                                        pythonRunningProcess.file_$eq(new Some(path3));
                                    }).map(boxedUnit -> {
                                        return path3;
                                    });
                                });
                            });
                        });
                    });
                });
            });
        });
    }

    private String generateUserScript(Path path, ActiveQuery<String> activeQuery) {
        String stripMargin = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(133).append("\n         |from packages import Packages\n         |packages = Packages('").append(path).append("', '").append(this.config.indexUrl()).append("', '").append(this.config.extraIndexUrl()).append("')\n         |packages.install(").append(((TraversableOnce) this.config.packages().map(str -> {
            return new StringBuilder(11).append('\'').append(str).append('\'').toString();
        }, Seq$.MODULE$.canBuildFrom())).mkString(", ")).append(")\n         |\n         |").toString())).stripMargin();
        return new StringBuilder(0).append(stripMargin).append(new StringOps(Predef$.MODULE$.augmentString("\n         |try:\n         |  from py4j.java_gateway import JavaGateway\n         |except ImportError:\n         |  import sys\n         |  print(\"mandatory py4j package is missing, installing\", file = sys.stderr)\n         |  packages.install('py4j')\n         |\n         |from quix import Quix\n         |\n         |quix = Quix()\n         |\n         |")).stripMargin()).append(this.config.additionalCode()).append(activeQuery.text()).toString();
    }

    public Task<GatewayServer> prepareGateway(PythonRunningProcess pythonRunningProcess, ActiveQuery<String> activeQuery) {
        return Task$.MODULE$.apply(() -> {
            return new PythonBridge(activeQuery.id());
        }).flatMap(pythonBridge -> {
            return Task$.MODULE$.apply(() -> {
                pythonRunningProcess.bridge_$eq(new Some(pythonBridge));
            }).flatMap(boxedUnit -> {
                return Task$.MODULE$.apply(() -> {
                    AtomicInt port = this.port();
                    return new GatewayServer(pythonBridge, port.incrementAndGet(port.incrementAndGet$default$1()));
                }).flatMap(gatewayServer -> {
                    return Task$.MODULE$.apply(() -> {
                        pythonRunningProcess.gatewayServer_$eq(new Some(gatewayServer));
                    }).map(boxedUnit -> {
                        return gatewayServer;
                    });
                });
            });
        });
    }

    public Task<BoxedUnit> run(PythonRunningProcess pythonRunningProcess, ActiveQuery<String> activeQuery, Builder<String, Batch> builder) {
        Observable create = Observable$.MODULE$.create(OverflowStrategy$Unbounded$.MODULE$, Observable$.MODULE$.create$default$2(), sync -> {
            return Task$.MODULE$.apply(() -> {
                return new NuProcessBuilder(new String[]{"python3", "-W", "ignore", pythonRunningProcess.file().getOrElse(() -> {
                    throw new IllegalStateException("No file to execute");
                }).toString(), BoxesRunTime.boxToInteger(((GatewayServer) pythonRunningProcess.gatewayServer().getOrElse(() -> {
                    throw new IllegalStateException("No running gateway");
                })).getPort()).toString(), activeQuery.id(), activeQuery.user().email()});
            }).flatMap(nuProcessBuilder -> {
                return Task$.MODULE$.apply(() -> {
                    return new PythonProcessHandler(activeQuery.id(), sync);
                }).flatMap(pythonProcessHandler -> {
                    return Task$.MODULE$.apply(() -> {
                        nuProcessBuilder.setProcessListener(pythonProcessHandler);
                    }).flatMap(boxedUnit -> {
                        return Task$.MODULE$.apply(() -> {
                            pythonRunningProcess.gatewayServer().foreach(gatewayServer -> {
                                gatewayServer.start();
                                return BoxedUnit.UNIT;
                            });
                        }).flatMap(boxedUnit -> {
                            return Task$.MODULE$.apply(() -> {
                                pythonRunningProcess.process_$eq(new Some(nuProcessBuilder.start()));
                            }).map(boxedUnit -> {
                                $anonfun$run$14(boxedUnit);
                                return BoxedUnit.UNIT;
                            });
                        });
                    });
                });
            }).runToFuture(sync.scheduler());
        });
        Observable apply = Observable$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Observable[]{Observable$.MODULE$.create(OverflowStrategy$Unbounded$.MODULE$, Observable$.MODULE$.create$default$2(), sync2 -> {
            return Task$.MODULE$.apply(() -> {
                pythonRunningProcess.bridge().foreach(pythonBridge -> {
                    pythonBridge.register(sync2);
                    return BoxedUnit.UNIT;
                });
            }).runToFuture(sync2.scheduler());
        }), create}));
        return apply.merge(Predef$.MODULE$.$conforms(), apply.merge$default$2()).takeWhileInclusive(pythonMessage -> {
            return BoxesRunTime.boxToBoolean($anonfun$run$18(activeQuery, pythonMessage));
        }).mapEval(pythonMessage2 -> {
            Task endSubQuery;
            if (pythonMessage2 instanceof ProcessStartSuccess) {
                endSubQuery = builder.start(activeQuery);
            } else if (pythonMessage2 instanceof ProcessStartFailure) {
                endSubQuery = builder.error(activeQuery.id(), ((ProcessStartFailure) pythonMessage2).t());
            } else if (pythonMessage2 instanceof ProcessEndSuccess) {
                endSubQuery = builder.end(activeQuery);
            } else if (pythonMessage2 instanceof ProcessStdout) {
                ProcessStdout processStdout = (ProcessStdout) pythonMessage2;
                endSubQuery = builder.log(processStdout.jobId(), processStdout.line(), "INFO");
            } else if (pythonMessage2 instanceof ProcessStderr) {
                ProcessStderr processStderr = (ProcessStderr) pythonMessage2;
                endSubQuery = builder.log(processStderr.jobId(), processStderr.line(), "ERROR");
            } else if (pythonMessage2 instanceof TabFields) {
                TabFields tabFields = (TabFields) pythonMessage2;
                String tabId = tabFields.tabId();
                endSubQuery = builder.startSubQuery(tabId, tabId, new Batch(Nil$.MODULE$, Option$.MODULE$.apply(tabFields.fields().map(BatchColumn$.MODULE$, Seq$.MODULE$.canBuildFrom())), Batch$.MODULE$.apply$default$3(), Batch$.MODULE$.apply$default$4()));
            } else if (pythonMessage2 instanceof TabRow) {
                TabRow tabRow = (TabRow) pythonMessage2;
                endSubQuery = builder.addSubQuery(tabRow.tabId(), new Batch(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Seq[]{tabRow.row()})), Batch$.MODULE$.apply$default$2(), Batch$.MODULE$.apply$default$3(), Batch$.MODULE$.apply$default$4()));
            } else {
                endSubQuery = pythonMessage2 instanceof TabEnd ? builder.endSubQuery(((TabEnd) pythonMessage2).tabId(), builder.endSubQuery$default$2()) : Task$.MODULE$.apply(() -> {
                    if (!this.logger().underlying().isInfoEnabled()) {
                        BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    } else {
                        this.logger().underlying().info("method=run event=unknown-event query-id={} user={} event={}", new Object[]{activeQuery.id(), activeQuery.user().email(), pythonMessage2});
                        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                    }
                });
            }
            return endSubQuery;
        }).lastL();
    }

    public static final /* synthetic */ void $anonfun$copy$4(Path path) {
    }

    public static final /* synthetic */ void $anonfun$run$14(BoxedUnit boxedUnit) {
    }

    public static final /* synthetic */ boolean $anonfun$run$18(ActiveQuery activeQuery, PythonMessage pythonMessage) {
        return pythonMessage instanceof ProcessEndSuccess ? false : !activeQuery.isCancelled();
    }

    public PythonExecutor(PythonConfig pythonConfig) {
        this.config = pythonConfig;
        LazyLogging.$init$(this);
        this.port = AtomicInt$.MODULE$.apply(25333);
    }
}
