package gorsat.commands;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.spark.api.python.Py4JServer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.gorpipe.exceptions.GorResourceException;
import org.gorpipe.exceptions.GorSystemException;
import org.gorpipe.spark.GorSparkUtilities;

/* loaded from: input_file:gorsat/Commands/PysparkAnalysis.class */
public class PysparkAnalysis implements AutoCloseable {
    public static Map<String, PysparkAnalysis> datasetMap = new HashMap();
    ExecutorService es;
    Process pythonProcess;
    Dataset<? extends Row> ds;
    List<String> cmds = new ArrayList();
    CountDownLatch latch = new CountDownLatch(1);
    ByteArrayOutputStream baOutput = new ByteArrayOutputStream();
    ByteArrayOutputStream baError = new ByteArrayOutputStream();

    public Dataset<Row> getDataset() {
        return this.ds;
    }

    public void setDataset(Dataset<Row> dataset) {
        this.ds = dataset;
    }

    public void markDone() throws InterruptedException {
        this.latch.countDown();
        this.latch = new CountDownLatch(1);
        this.latch.await();
    }

    public void waitFor() throws InterruptedException {
        this.latch.await();
    }

    public Dataset<Row> pyspark(String str, Dataset<? extends Row> dataset, String str2) throws IOException, InterruptedException {
        datasetMap.put(str, this);
        this.ds = dataset;
        GorSparkUtilities.getSparkSession();
        Py4JServer initPy4jServer = GorSparkUtilities.initPy4jServer();
        String[] split = str2.trim().split(" ");
        String str3 = System.getenv("PYSPARK_PYTHON");
        this.cmds.add(str3 != null ? str3 : "python3");
        this.cmds.add(split[0]);
        this.cmds.add(str);
        for (int i = 1; i < split.length; i++) {
            this.cmds.add(split[i]);
        }
        ProcessBuilder processBuilder = new ProcessBuilder(this.cmds);
        Map<String, String> environment = processBuilder.environment();
        String num = Integer.toString(initPy4jServer.getListeningPort());
        String secret = initPy4jServer.secret();
        environment.put("PYSPARK_GATEWAY_PORT", num);
        environment.put("PYSPARK_GATEWAY_SECRET", secret);
        environment.put("PYSPARK_PIN_THREAD", "true");
        this.pythonProcess = processBuilder.start();
        this.es = Executors.newFixedThreadPool(2);
        this.es.submit(() -> {
            return Long.valueOf(this.pythonProcess.getErrorStream().transferTo(this.baOutput));
        });
        this.es.submit(() -> {
            return Long.valueOf(this.pythonProcess.getInputStream().transferTo(this.baError));
        });
        waitFor();
        return getDataset();
    }

    public String cmdString() {
        return String.join(" ", this.cmds);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.pythonProcess != null) {
            try {
                this.latch.countDown();
                int waitFor = this.pythonProcess.waitFor();
                if (waitFor != 0) {
                    throw new GorResourceException("Non zero exit code " + waitFor + "\n" + this.baOutput + "\n" + this.baError, cmdString());
                }
            } catch (InterruptedException e) {
                throw new GorSystemException(e);
            }
        }
        if (this.es != null) {
            this.es.shutdown();
        }
    }
}
