package org.gorpipe.spark;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.gson.reflect.TypeToken;
import gorsat.Utilities.StringUtilities;
import gorsat.process.SparkPipeInstance;
import io.kubernetes.client.openapi.ApiCallback;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.apis.CustomObjectsApi;
import io.kubernetes.client.openapi.models.V1DeleteOptions;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.Watch;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.spark.sql.SparkSession;
import org.gorpipe.exceptions.GorSystemException;
import org.gorpipe.gor.model.DriverBackedFileReader;
import org.gorpipe.gor.monitor.GorMonitor;
import org.gorpipe.gor.util.Util;
import org.gorpipe.spark.redis.RedisBatchConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Creates {@code sparkapplications} custom objects (API group {@code sparkoperator.k8s.io},
 * version {@code v1beta2}) through the Kubernetes Java client and polls or watches them
 * until they reach a terminal state.
 *
 * <p>The instance keeps the namespace and job name of the most recently loaded application
 * body in mutable fields ({@link #namespace}, {@link #jobName}); every API call made
 * afterwards uses that namespace.
 */
public class SparkOperatorRunner {
    private static final Logger log = LoggerFactory.getLogger(SparkOperatorRunner.class);
    public static final String SPARKAPPLICATION_COMPLETED_STATE = "COMPLETED";
    public static final String SPARKAPPLICATION_FAILED_STATE = "FAILED";
    public static final String SPARKAPPLICATION_RUNNING_STATE = "RUNNING";
    private static final String GOR_PROJECT_MOUNT_NAME = "gorproject";
    private static final String BASE_NFS_MOUNT_POINT = "/mnt/csa/";

    /** Coordinates of the SparkApplication custom resource. */
    private static final String SPARK_OPERATOR_GROUP = "sparkoperator.k8s.io";
    private static final String SPARK_OPERATOR_VERSION = "v1beta2";
    private static final String SPARK_APPLICATIONS_PLURAL = "sparkapplications";

    /** Namespace used when the application metadata does not name one. */
    private static final String DEFAULT_NAMESPACE = "gorkube";
    /** Persistent volume claim mounted as the project volume when {@link #hostMount} is off. */
    private static final String NFS_CLAIM_NAME = "pvc-gor-nfs-v2";
    /** Name of the bundled fallback template, resolved relative to {@code SparkPipeInstance}. */
    private static final String OPERATOR_YAML_RESOURCE = "sparkoperator.yaml";

    private static final String NAME_PLACEHOLDER = "${name.val}";
    private static final String ARGUMENTS_PLACEHOLDER = "${arguments.val}";
    private static final String RESULT_DIR_PLACEHOLDER = "#{result_dir}";
    private static final String JOB_NAME_FLAG = " -j ";

    CustomObjectsApi apiInstance;
    CoreV1Api core;
    ObjectMapper objectMapper;
    String jobName;
    String namespace;
    SparkSession sparkSession;
    String securityContext;
    private static final boolean debug = false;
    boolean hostMount = false;
    ApiClient client = Config.defaultClient();

    /**
     * Wires the runner to the default Kubernetes client and to the given GOR Spark session.
     *
     * @param gorSparkSession session supplying the Spark session and the security context
     * @throws IOException if the default Kubernetes client configuration cannot be loaded
     */
    public SparkOperatorRunner(GorSparkSession gorSparkSession) throws IOException {
        Configuration.setDefaultApiClient(this.client);
        this.apiInstance = new CustomObjectsApi();
        this.core = new CoreV1Api(this.client);
        this.objectMapper = new ObjectMapper(new YAMLFactory());
        this.sparkSession = gorSparkSession.getSparkSession();
        this.securityContext = gorSparkSession.getProjectContext().getFileReader().getSecurityContext();
    }

    /**
     * Parses an application description and turns it into the body to submit.
     *
     * <p>Side effects: sets {@link #jobName} from {@code metadata.name} and {@link #namespace}
     * from the metadata namespace (falling back to {@code gorkube}).
     *
     * <p>If the description has a top-level {@code yaml} key, the referenced template file is
     * read relative to {@code str2}, its {@code ${name.val}} / {@code ${arguments.val}}
     * placeholders are filled from the description, and every {@code ${key.val}} occurrence is
     * replaced with the matching value from {@code map}.
     *
     * @param str  YAML text of the application description; must contain {@code metadata.name}
     * @param str2 project root used to resolve a referenced template file
     * @param str3 value substituted for a {@code #{result_dir}} entry in {@code spec.arguments}
     * @param map  placeholder values keyed by placeholder name (without the {@code .val} suffix)
     * @return the application body as a nested map
     * @throws IOException if the description or the template cannot be parsed
     */
    @SuppressWarnings("unchecked")
    Map<String, Object> loadBody(String str, String str2, String str3, Map<String, Object> map) throws IOException {
        Map<String, Object> body = this.objectMapper.readValue(str, Map.class);
        Map<String, Object> metadata = (Map<String, Object>) body.get("metadata");
        this.jobName = metadata.get("name").toString();

        if (body.containsKey("yaml")) {
            this.objectMapper = new ObjectMapper(new YAMLFactory());
            Path templatePath = Paths.get(str2).resolve(Paths.get(body.get("yaml").toString()));
            DriverBackedFileReader fileReader = new DriverBackedFileReader(this.securityContext, str2, (Object[]) null);
            String templateUrl = fileReader.resolveUrl(templatePath.toString()).getSourceReference().getUrl();
            String templateText = (String) fileReader.readFile(templateUrl).collect(Collectors.joining("\n"));

            Map<String, Object> template = this.objectMapper.readValue(templateText, Map.class);
            Map<String, Object> templateMetadata = (Map<String, Object>) template.get("metadata");
            this.namespace = templateMetadata.containsKey("namespace")
                    ? templateMetadata.get("namespace").toString()
                    : DEFAULT_NAMESPACE;
            // Constant-first comparison: a template without a name is simply left untouched.
            if (NAME_PLACEHOLDER.equals(templateMetadata.get("name"))) {
                templateMetadata.put("name", this.jobName);
            }

            List<Object> requestArguments = (List<Object>) ((Map<String, Object>) body.get("spec")).get("arguments");
            Map<String, Object> templateSpec = (Map<String, Object>) template.get("spec");
            Object templateArguments = templateSpec.get("arguments");
            if (templateArguments != null && ARGUMENTS_PLACEHOLDER.equals(templateArguments.toString())) {
                templateSpec.put("arguments", requestArguments);
            }

            // NOTE(review): substring(4) presumably drops the "---\n" document marker that the
            // YAML mapper writes at the start of its output - confirm against the YAMLFactory config.
            String rendered = this.objectMapper.writeValueAsString(template).substring(4);
            for (Map.Entry<String, Object> placeholder : map.entrySet()) {
                rendered = rendered.replace("${" + placeholder.getKey() + ".val}", placeholder.getValue().toString());
            }
            body = this.objectMapper.readValue(rendered, Map.class);
        } else {
            this.namespace = metadata.containsKey("namespace")
                    ? metadata.get("namespace").toString()
                    : DEFAULT_NAMESPACE;
        }

        if (body.containsKey("spec")) {
            Map<String, Object> spec = (Map<String, Object>) body.get("spec");
            if (spec.containsKey("arguments")) {
                List<Object> arguments = (List<Object>) spec.get("arguments");
                int resultDirIndex = arguments.indexOf(RESULT_DIR_PLACEHOLDER);
                if (resultDirIndex != -1) {
                    arguments.set(resultDirIndex, str3);
                }
            }
            if (spec.containsKey("executor")) {
                Map<String, Object> executor = (Map<String, Object>) spec.get("executor");
                // Placeholder substitution produces strings; these two are stored as integers.
                coerceToInteger(executor, "cores");
                coerceToInteger(executor, "instances");
            }
        }
        return body;
    }

    /** Replaces a string value under {@code key} with its parsed integer; other values are left alone. */
    private static void coerceToInteger(Map<String, Object> section, String key) {
        Object value = section.get(key);
        if (value instanceof String) {
            section.put(key, Integer.valueOf(Integer.parseInt((String) value)));
        }
    }

    /**
     * Looks up the current state of a SparkApplication in {@link #namespace}.
     *
     * @param str name of the SparkApplication
     * @return the value of {@code status.applicationState.state}, or an empty string when the
     *         application does not exist or has not reported a state yet
     * @throws ApiException for any API failure other than "not found"
     */
    public String getSparkApplicationState(String str) throws ApiException {
        try {
            Map<?, ?> application = (Map<?, ?>) this.apiInstance.getNamespacedCustomObject(
                    SPARK_OPERATOR_GROUP, SPARK_OPERATOR_VERSION, this.namespace, SPARK_APPLICATIONS_PLURAL, str);
            Map<?, ?> status = (Map<?, ?>) application.get("status");
            if (status == null) {
                return "";
            }
            // The status may exist before the operator has filled in applicationState.
            Map<?, ?> applicationState = (Map<?, ?>) status.get("applicationState");
            if (applicationState == null) {
                return "";
            }
            Object state = applicationState.get("state");
            return state instanceof String ? (String) state : "";
        } catch (ApiException e) {
            // getMessage() may be null, so guard it; the HTTP status code is checked as well.
            String message = e.getMessage();
            if (e.getCode() == 404 || (message != null && message.contains("Not Found"))) {
                return "";
            }
            throw e;
        }
    }

    /**
     * Deletes a SparkApplication from {@link #namespace}.
     *
     * @param str name of the SparkApplication
     * @throws ApiException if the delete call fails
     */
    public void deleteSparkApplication(String str) throws ApiException {
        this.apiInstance.deleteNamespacedCustomObject(SPARK_OPERATOR_GROUP, SPARK_OPERATOR_VERSION, this.namespace,
                SPARK_APPLICATIONS_PLURAL, str, (Integer) null, (Boolean) null, (String) null, (String) null,
                new V1DeleteOptions());
    }

    /**
     * Polls the application state once per second until it completes, fails or is cancelled.
     *
     * @param gorMonitor optional cancellation monitor; may be {@code null}
     * @param str        name of the SparkApplication
     * @return {@code true} when the application completed, {@code false} when the monitor was
     *         cancelled (the application is deleted in that case)
     * @throws GorSystemException   if the application reports the {@code FAILED} state
     * @throws ApiException         if a state lookup or the delete call fails
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    public boolean waitForSparkApplicationToComplete(GorMonitor gorMonitor, String str) throws ApiException, InterruptedException {
        while (true) {
            String state = getSparkApplicationState(str);
            if (SPARKAPPLICATION_COMPLETED_STATE.equals(state)) {
                return true;
            }
            if (SPARKAPPLICATION_FAILED_STATE.equals(state)) {
                throw new GorSystemException(state, (Throwable) null);
            }
            if (gorMonitor != null && gorMonitor.isCancelled()) {
                deleteSparkApplication(str);
                return false;
            }
            Thread.sleep(1000L);
        }
    }

    /**
     * Watches pod events in {@link #namespace} until a modified pod reaches the requested phase.
     *
     * <p>The watch is always closed. A pod in phase {@code Failed} or {@code Error} raises a
     * {@link GorSystemException} that propagates to the caller; any other failure of the watch
     * itself is logged and ends the wait. Cancellation deletes the application and stops
     * watching.
     *
     * @param gorMonitor optional cancellation monitor; may be {@code null}
     * @param str        name of the SparkApplication to delete on cancellation
     * @param str2       pod phase to wait for
     * @throws GorSystemException if a watched pod fails
     * @throws ApiException       declared for callers; API failures are currently logged
     */
    void waitSparkApplicationState(GorMonitor gorMonitor, String str, String str2) throws ApiException {
        try (Watch<V1Pod> watch = Watch.createWatch(
                this.client,
                this.core.listNamespacedPodCall(this.namespace, (String) null, (Boolean) null, (String) null, (String) null, (String) null, (Integer) null, (String) null, (String) null, 120, (Boolean) null, (ApiCallback) null),
                new TypeToken<Watch.Response<V1Pod>>() {
                }.getType())) {
            for (Watch.Response<V1Pod> event : watch) {
                if ("MODIFIED".equals(event.type) && event.object != null && event.object.getStatus() != null) {
                    String phase = event.object.getStatus().getPhase();
                    if (str2.equals(phase)) {
                        break;
                    }
                    if ("Failed".equals(phase) || "Error".equals(phase)) {
                        throw new GorSystemException(event.object.toString(), (Throwable) null);
                    }
                }
                if (gorMonitor != null && gorMonitor.isCancelled()) {
                    deleteSparkApplication(str);
                    // Stop here: continuing would re-issue the delete on every later event.
                    break;
                }
            }
        } catch (GorSystemException e) {
            // A failed pod is a real error for the caller, not a watch hiccup.
            throw e;
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Creates a SparkApplication from a JSON document.
     *
     * @param str JSON text of the application body
     * @throws ApiException            if the create call fails
     * @throws JsonProcessingException if the text cannot be parsed
     */
    public void createSparkApplicationFromJson(String str) throws ApiException, JsonProcessingException {
        createSparkApplication(new ObjectMapper(), str);
    }

    /**
     * Creates a SparkApplication from a YAML document.
     *
     * @param str YAML text of the application body
     * @throws ApiException            if the create call fails
     * @throws JsonProcessingException if the text cannot be parsed
     */
    public void createSparkApplicationFromYaml(String str) throws ApiException, JsonProcessingException {
        createSparkApplication(new ObjectMapper(new YAMLFactory()), str);
    }

    /**
     * Parses {@code str} with the given mapper and submits it as a SparkApplication in
     * {@link #namespace}.
     *
     * @param objectMapper mapper matching the format of {@code str}
     * @param str          text of the application body
     * @throws ApiException            if the create call fails
     * @throws JsonProcessingException if the text cannot be parsed
     */
    public void createSparkApplication(ObjectMapper objectMapper, String str) throws ApiException, JsonProcessingException {
        Object body = objectMapper.readValue(str, Object.class);
        this.apiInstance.createNamespacedCustomObject(SPARK_OPERATOR_GROUP, SPARK_OPERATOR_VERSION, this.namespace,
                SPARK_APPLICATIONS_PLURAL, body, "true", (String) null, (String) null);
    }

    /**
     * Returns the Spark operator template for a project.
     *
     * <p>{@code config/sparkoperator.yaml} under the project directory wins when it exists;
     * otherwise the {@code sparkoperator.yaml} resource next to {@code SparkPipeInstance} is used.
     *
     * @param str project directory
     * @return the template text
     * @throws IOException if the chosen source cannot be read
     */
    public static String getSparkOperatorYaml(String str) throws IOException {
        Path projectDir = Paths.get(str);
        if (Files.exists(projectDir)) {
            Path projectTemplate = projectDir.resolve("config/sparkoperator.yaml");
            if (Files.exists(projectTemplate)) {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                return new String(Files.readAllBytes(projectTemplate), java.nio.charset.StandardCharsets.UTF_8);
            }
        }
        return Util.readAndCloseStream(SparkPipeInstance.class.getResourceAsStream(OPERATOR_YAML_RESOURCE));
    }

    /**
     * Packs the query handler inputs into the application argument list and runs the operator.
     *
     * <p>The argument list is {@code str2}, {@code str3}, the project path, then the joined
     * arrays in the order {@code strArr} (joined with {@code ";;"}), {@code strArr2},
     * {@code strArr4}, {@code strArr3} (each joined with {@code ";"}).
     *
     * @param str        name of the SparkApplication
     * @param str2       first application argument, passed through unchanged
     * @param str3       second application argument, passed through unchanged
     * @param path       project directory, mounted into the pods
     * @param gorMonitor optional cancellation monitor; may be {@code null}
     * @param strArr     values joined with {@code ";;"}
     * @param strArr2    values joined with {@code ";"}
     * @param strArr3    values joined with {@code ";"}; emitted after {@code strArr4}
     * @param strArr4    values joined with {@code ";"}; emitted before {@code strArr3}
     * @param strArr5    {@code key=value} overrides applied to the application spec
     * @throws IOException          if the template or project path cannot be read
     * @throws ApiException         if a Kubernetes call fails
     * @throws InterruptedException if interrupted while waiting for completion
     */
    public void runQueryHandler(String str, String str2, String str3, Path path, GorMonitor gorMonitor, String[] strArr, String[] strArr2, String[] strArr3, String[] strArr4, String[] strArr5) throws IOException, ApiException, InterruptedException {
        String[] arguments = {
                str2,
                str3,
                path.toString(),
                String.join(";;", strArr),
                String.join(";", strArr2),
                String.join(";", strArr4),
                String.join(";", strArr3)
        };
        runSparkOperator(gorMonitor, str, path, arguments, strArr5);
    }

    /**
     * Builds the spec overrides for one application, submits it and waits for it to finish.
     *
     * @param gorMonitor optional cancellation monitor; may be {@code null}
     * @param str        name of the SparkApplication ({@code metadata.name})
     * @param path       project directory; mounted into driver and executors
     * @param strArr     application arguments ({@code spec.arguments})
     * @param strArr2    {@code key=value} overrides; values that parse as integers are stored as
     *                   integers, blank entries are ignored
     * @throws IllegalArgumentException if a non-blank override has no {@code =}
     * @throws IOException              if the project path or the template cannot be read
     * @throws ApiException             if a Kubernetes call fails
     * @throws InterruptedException     if interrupted while waiting for completion
     */
    public void runSparkOperator(GorMonitor gorMonitor, String str, Path path, String[] strArr, String[] strArr2) throws IOException, ApiException, InterruptedException {
        SparkOperatorSpecs specs = new SparkOperatorSpecs();

        List<Map<String, Object>> volumeMounts = new ArrayList<>();
        volumeMounts.add(Map.<String, Object>of("name", GOR_PROJECT_MOUNT_NAME, "mountPath", path));
        specs.addConfig("spec.executor.volumeMounts", volumeMounts);
        specs.addConfig("spec.driver.volumeMounts", volumeMounts);

        Path realProjectPath = path.toRealPath().toAbsolutePath();
        List<Map<String, Object>> volumes = new ArrayList<>();
        if (this.hostMount) {
            volumes.add(Map.<String, Object>of("name", GOR_PROJECT_MOUNT_NAME,
                    "hostPath", Map.<String, Object>of("path", path, "type", "Directory")));
            specs.addConfig("spec.volumes", volumes);
            String hostPath = realProjectPath.toString();
            specs.addDriverHostPath(GOR_PROJECT_MOUNT_NAME, hostPath, hostPath, null, false);
            specs.addExecutorHostPath(GOR_PROJECT_MOUNT_NAME, hostPath, hostPath, null, false);
        } else {
            // Sub path of the project inside the shared volume, relative to the NFS mount point.
            Path subPath = Paths.get(BASE_NFS_MOUNT_POINT).relativize(realProjectPath);
            volumes.add(Map.<String, Object>of("name", GOR_PROJECT_MOUNT_NAME,
                    "persistentVolumeClaim", Map.of("claimName", NFS_CLAIM_NAME)));
            specs.addConfig("spec.volumes", volumes);
            specs.addDriverVolumeClaim(GOR_PROJECT_MOUNT_NAME, NFS_CLAIM_NAME, realProjectPath.toString(), subPath.toString(), false);
            specs.addExecutorVolumeClaim(GOR_PROJECT_MOUNT_NAME, NFS_CLAIM_NAME, realProjectPath.toString(), subPath.toString(), false);
        }

        specs.addConfig("spec.arguments", Arrays.asList(strArr));
        specs.addConfig("metadata.name", str);

        for (String override : strArr2) {
            if (override.trim().isEmpty()) {
                // Splitting an option string on single spaces yields empty entries for repeated blanks.
                continue;
            }
            // Limit 2 keeps '=' characters that belong to the value.
            String[] keyValue = override.split("=", 2);
            if (keyValue.length < 2) {
                throw new IllegalArgumentException("Malformed spark operator config, expected key=value: " + override);
            }
            Integer numericValue = parseIntegerOrNull(keyValue[1]);
            if (numericValue != null) {
                specs.addConfig(keyValue[0], numericValue);
            } else {
                specs.addConfig(keyValue[0], keyValue[1]);
            }
        }

        runYaml(getSparkOperatorYaml(path.toString()), path.toString(), specs);
        waitForSparkApplicationToComplete(gorMonitor, str);
    }

    /** Returns {@code text} parsed as an integer, or {@code null} when it is not one. */
    private static Integer parseIntegerOrNull(String text) {
        try {
            return Integer.valueOf(Integer.parseInt(text));
        } catch (NumberFormatException ignored) {
            // Not numeric: the caller stores the raw string instead.
            return null;
        }
    }

    /**
     * Runs a query through the Spark operator unless its result already exists.
     *
     * <p>The last command in {@code strArr2[0]} may carry a {@code " -j <name>"} flag; it is
     * removed from the command and used as the job name. Without it (or with an empty name) the
     * MD5 of the full command text is used. The application is named {@code gorquery-<job name>}.
     *
     * @param str        first application argument, passed through unchanged
     * @param str2       second application argument, passed through unchanged
     * @param str3       project directory
     * @param gorMonitor optional cancellation monitor; may be {@code null}
     * @param strArr     commands; all but the last are prepended to the query, joined with {@code ";"}
     * @param strArr2    index 0 is the query text, index 1 the space separated
     *                   {@code key=value} spec overrides
     * @param str4       result path, or {@code null} to use
     *                   {@code <project>/result_cache/<md5>.parquet}
     * @return the path of the result
     * @throws IOException          if the template or project path cannot be read
     * @throws ApiException         if a Kubernetes call fails
     * @throws InterruptedException if interrupted while waiting for completion
     */
    public Path run(String str, String str2, String str3, GorMonitor gorMonitor, String[] strArr, String[] strArr2, String str4) throws IOException, ApiException, InterruptedException {
        String query = strArr2[0];
        String jobId = null;
        int flagIndex = query.indexOf(JOB_NAME_FLAG);
        if (flagIndex > 0) {
            int valueStart = flagIndex + JOB_NAME_FLAG.length();
            int valueEnd = query.indexOf(' ', valueStart);
            if (valueEnd == -1) {
                // The flag is the last token of the query.
                valueEnd = query.length();
            }
            String flagValue = query.substring(valueStart, valueEnd).trim();
            if (!flagValue.isEmpty()) {
                jobId = flagValue;
            }
            query = query.substring(0, flagIndex) + query.substring(valueEnd);
        }

        String fullQuery = strArr.length > 1
                ? String.join(";", Arrays.copyOfRange(strArr, 0, strArr.length - 1)) + ";" + query
                : query;
        String queryHash = StringUtilities.createMD5(fullQuery);

        Path projectPath = Paths.get(str3);
        Path resultPath;
        if (str4 == null) {
            resultPath = projectPath.resolve("result_cache").resolve(queryHash + ".parquet");
            str4 = resultPath.toAbsolutePath().normalize().toString();
        } else {
            resultPath = Paths.get(str4);
        }
        if (jobId == null) {
            jobId = queryHash;
        }

        // An existing result is reused as-is.
        if (!Files.exists(resultPath)) {
            String[] arguments = {str, str2, str3, fullQuery, queryHash, str4, jobId};
            runSparkOperator(gorMonitor, "gorquery-" + jobId, projectPath, arguments, strArr2[1].split(" "));
        }
        return resultPath;
    }

    /**
     * Runs one job in-process through a {@link RedisBatchConsumer} and waits for all of its
     * futures.
     *
     * @param sparkSession session handed to the consumer
     * @param strArr       seven arguments laid out as built by {@link #run}
     * @throws GorSystemException if the job fails or the wait is interrupted
     */
    private void runLocal(SparkSession sparkSession, String[] strArr) {
        String consumerArgument = strArr[0];
        // The consumer expects the remaining arguments in this order, not in argument order.
        String[] job = {strArr[3], strArr[4], strArr[2], strArr[1], strArr[6], strArr[5]};
        RedisBatchConsumer consumer = new RedisBatchConsumer(sparkSession, consumerArgument);
        try {
            Iterator<Future<List<String>>> results = consumer.runJobBatch(Collections.singletonList(job)).values().iterator();
            while (results.hasNext()) {
                results.next().get();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            throw new GorSystemException(e);
        } catch (ExecutionException e) {
            throw new GorSystemException(e);
        } finally {
            consumer.close();
        }
    }

    /**
     * Loads an application description, applies the spec overrides and submits it.
     *
     * @param str                YAML text of the application description
     * @param str2               project root; the current working directory when {@code null} or empty
     * @param sparkOperatorSpecs overrides applied to the loaded body before submission
     * @throws IOException  if the description cannot be parsed
     * @throws ApiException if the create call fails
     */
    public void runYaml(String str, String str2, SparkOperatorSpecs sparkOperatorSpecs) throws IOException, ApiException {
        if (str2 == null || str2.isEmpty()) {
            str2 = Paths.get(".").toAbsolutePath().normalize().toString();
        }
        Map<String, Object> body = loadBody(str, str2, "", new HashMap<>());
        sparkOperatorSpecs.apply(body);
        this.apiInstance.createNamespacedCustomObject(SPARK_OPERATOR_GROUP, SPARK_OPERATOR_VERSION, this.namespace,
                SPARK_APPLICATIONS_PLURAL, body, "true", (String) null, (String) null);
    }
}
