package org.gorpipe.spark;

import io.projectglow.GlowBase;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.spark.SparkConf;
import org.apache.spark.api.python.Py4JServer;
import org.apache.spark.ml.linalg.SQLDataTypes;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.gorpipe.spark.udfs.CharToDoubleArray;
import org.gorpipe.spark.udfs.CommaToDoubleArray;
import org.gorpipe.spark.udfs.CommaToDoubleMatrix;
import org.gorpipe.spark.udfs.CommaToIntArray;
import org.gorpipe.util.standalone.GorStandalone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/gorpipe/spark/GorSparkUtilities.class */
public class GorSparkUtilities {
    private static SparkSession spark;
    private static Py4JServer py4jServer;
    private static Optional<Process> jupyterProcess;
    private static ExecutorService es;
    private static final Logger log = LoggerFactory.getLogger(GorSparkUtilities.class);
    private static Optional<String> jupyterPath = Optional.empty();

    private GorSparkUtilities() {
    }

    public static Py4JServer getPyServer() {
        return py4jServer;
    }

    public static int getPyServerPort() {
        if (py4jServer != null) {
            return py4jServer.getListeningPort();
        }
        return 0;
    }

    public static String getPyServerSecret() {
        return py4jServer != null ? py4jServer.secret() : "";
    }

    public static Optional<String> getJupyterPath() {
        return jupyterPath;
    }

    public static void closePySpark() {
        if (py4jServer != null) {
            py4jServer.shutdown();
        }
        jupyterProcess.ifPresent((v0) -> {
            v0.destroy();
        });
        if (es != null) {
            es.shutdown();
        }
    }

    public static void initPySpark(Optional<String> optional) {
        String str = System.getenv("PYSPARK_PIN_THREAD");
        if (py4jServer != null || str == null || str.length() <= 0) {
            return;
        }
        py4jServer = new Py4JServer(spark.sparkContext().conf());
        py4jServer.start();
        getSparkSession();
        ProcessBuilder processBuilder = new ProcessBuilder("/usr/local/bin/jupyter", "notebook", "--NotebookApp.allow_origin='https://colab.research.google.com'", "--port=8888", "--NotebookApp.port_retries=0");
        optional.ifPresent(str2 -> {
            processBuilder.directory(Paths.get(str2, new String[0]).toFile());
        });
        Map<String, String> environment = processBuilder.environment();
        environment.put("PYSPARK_GATEWAY_PORT", Integer.toString(getPyServerPort()));
        environment.put("PYSPARK_GATEWAY_SECRET", getPyServerSecret());
        environment.put("PYSPARK_PIN_THREAD", "true");
        try {
            Process start = processBuilder.start();
            jupyterProcess = Optional.of(start);
            es = Executors.newFixedThreadPool(2);
            es.submit(() -> {
                InputStream inputStream = start.getInputStream();
                try {
                    jupyterPath = new BufferedReader(new InputStreamReader(inputStream)).lines().map((v0) -> {
                        return v0.trim();
                    }).filter(str3 -> {
                        return str3.startsWith("http://localhost:8888/?token=");
                    }).findFirst();
                    if (inputStream == null) {
                        return null;
                    }
                    inputStream.close();
                    return null;
                } catch (Throwable th) {
                    if (inputStream != null) {
                        try {
                            inputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            });
            es.submit(() -> {
                InputStream errorStream = start.getErrorStream();
                try {
                    Stream<String> lines = new BufferedReader(new InputStreamReader(errorStream)).lines();
                    PrintStream printStream = System.err;
                    Objects.requireNonNull(printStream);
                    jupyterPath = lines.peek(printStream::println).map((v0) -> {
                        return v0.trim();
                    }).filter(str3 -> {
                        return str3.startsWith("http://localhost:8888/?token=");
                    }).findFirst();
                    if (errorStream == null) {
                        return null;
                    }
                    errorStream.close();
                    return null;
                } catch (Throwable th) {
                    if (errorStream != null) {
                        try {
                            errorStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            });
        } catch (IOException e) {
            log.info(e.getMessage());
            jupyterProcess = Optional.empty();
        }
    }

    private static String constructRedisUri(String str) {
        String property = System.getProperty("spark.redis.port");
        String property2 = System.getProperty("spark.redis.db");
        String str2 = str + ":" + ((property == null || property.length() <= 0) ? "6379" : property);
        return (property2 == null || property2.length() <= 0) ? str2 : str2 + "/" + property2;
    }

    public static String getSparkGorRedisUri() {
        String property = System.getProperty("spark.redis.host");
        return (property == null || property.length() <= 0) ? "" : constructRedisUri(property);
    }

    private static SparkSession newSparkSession() {
        SparkConf sparkConf = new SparkConf();
        SparkSession.Builder builder = SparkSession.builder();
        if (!sparkConf.contains("spark.master")) {
            builder = builder.master("local[*]");
        }
        SparkSession orCreate = builder.config(sparkConf).getOrCreate();
        orCreate.udf().register("chartodoublearray", new CharToDoubleArray(), DataTypes.createArrayType(DataTypes.DoubleType));
        orCreate.udf().register("todoublearray", new CommaToDoubleArray(), DataTypes.createArrayType(DataTypes.DoubleType));
        orCreate.udf().register("todoublematrix", new CommaToDoubleMatrix(), SQLDataTypes.MatrixType());
        orCreate.udf().register("tointarray", new CommaToIntArray(), DataTypes.createArrayType(DataTypes.IntegerType));
        new GlowBase().register(orCreate);
        return orCreate;
    }

    public static SparkSession getSparkSession() {
        if (spark == null) {
            if (SparkSession.getDefaultSession().isEmpty()) {
                spark = newSparkSession();
            } else {
                log.debug("SparkSession from default");
                spark = (SparkSession) SparkSession.getDefaultSession().get();
            }
            initPySpark(GorStandalone.isStandalone() ? Optional.of(GorStandalone.getStandaloneRoot()) : Optional.empty());
        }
        return spark;
    }

    public static List<Row> stream2SparkRowList(Stream<org.gorpipe.gor.model.Row> stream, StructType structType) {
        return (List) stream.map(row -> {
            return new SparkGorRow(row, structType);
        }).collect(Collectors.toList());
    }
}
