package org.gorpipe.spark.redis;

import com.fasterxml.jackson.databind.ObjectMapper;
import gorsat.Commands.CommandParseUtilities;
import gorsat.Commands.Processor;
import gorsat.process.GenericGorRunner;
import gorsat.process.PipeInstance;
import gorsat.process.PipeOptions;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.gorpipe.exceptions.ExceptionUtilities;
import org.gorpipe.gor.session.GorContext;
import org.gorpipe.gor.session.ProjectContext;
import org.gorpipe.gor.session.SystemContext;
import org.gorpipe.model.gor.iterators.RowSource;
import org.gorpipe.spark.GorQueryRDD;
import org.gorpipe.spark.GorSparkSession;
import org.gorpipe.spark.GorSparkUtilities;
import org.gorpipe.spark.platform.GorQuery;
import org.gorpipe.spark.platform.JedisURIHelper;
import org.gorpipe.spark.platform.SharedRedisPools;
import py4j.Base64;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/* loaded from: input_file:org/gorpipe/spark/redis/GorSparkRedisRunner.class */
public class GorSparkRedisRunner implements Callable<String> {
    public static GorSparkRedisRunner instance;
    private SparkSession sparkSession;
    private String redisUri;
    private JedisPool jedisPool;
    private Map<String, Future<List<String>>> futureActionSet;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/gorpipe/spark/redis/GorSparkRedisRunner$MonitorThread.class */
    public class MonitorThread implements Runnable {
        boolean running = true;

        MonitorThread() {
        }

        public void stopRunning() {
            this.running = false;
        }

        @Override // java.lang.Runnable
        public void run() {
            String str = null;
            while (this.running) {
                try {
                    for (String str2 : GorSparkRedisRunner.this.futureActionSet.keySet()) {
                        Future future = (Future) GorSparkRedisRunner.this.futureActionSet.get(str2);
                        String[] split = str2.split(",");
                        try {
                            str = str2;
                            GorSparkRedisRunner.this.setValues(split, "result", (String[]) ((List) future.get(500L, TimeUnit.MILLISECONDS)).stream().map(str3 -> {
                                return str3.split("\t");
                            }).map(strArr -> {
                                return strArr[2];
                            }).toArray(i -> {
                                return new String[i];
                            }));
                            GorSparkRedisRunner.this.setValue(split, "status", "DONE");
                            break;
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (ExecutionException e2) {
                            str = str2;
                            GorSparkRedisRunner.this.setValue(split, "error", ExceptionUtilities.gorExceptionToJson(e2.getCause()));
                            GorSparkRedisRunner.this.setValue(split, "status", "FAILED");
                        } catch (TimeoutException e3) {
                        }
                    }
                    if (str != null) {
                        GorSparkRedisRunner.this.futureActionSet.remove(str);
                        str = null;
                    }
                    if (GorSparkRedisRunner.this.futureActionSet.isEmpty()) {
                        Thread.sleep(500L);
                    }
                } catch (InterruptedException e4) {
                    e4.printStackTrace();
                    return;
                }
            }
        }
    }

    /* loaded from: input_file:org/gorpipe/spark/redis/GorSparkRedisRunner$SparkGorQuery.class */
    class SparkGorQuery implements Callable<List<String>> {
        GenericGorRunner genericGorRunner = new GenericGorRunner();
        RowSource iterator;
        Processor processor;
        String cachefile;

        public SparkGorQuery(RowSource rowSource, Processor processor, String str) {
            this.iterator = rowSource;
            this.processor = processor;
            this.cachefile = str;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public List<String> call() throws Exception {
            this.genericGorRunner.run(this.iterator, this.processor);
            return Collections.singletonList("a\tb\t" + this.cachefile);
        }
    }

    public GorSparkRedisRunner(GorSparkSession gorSparkSession) {
        init(gorSparkSession.getSparkSession());
    }

    public GorSparkRedisRunner() {
        init(GorSparkUtilities.getSparkSession());
    }

    public void init(SparkSession sparkSession) {
        instance = this;
        this.sparkSession = sparkSession;
        this.redisUri = GorSparkUtilities.getSparkGorRedisUri();
        this.futureActionSet = new ConcurrentHashMap();
        try {
            this.jedisPool = SharedRedisPools.getJedisPool(JedisURIHelper.create(this.redisUri));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.concurrent.Callable
    public String call() throws Exception {
        ExecutorService newWorkStealingPool = Executors.newWorkStealingPool(4);
        MonitorThread monitorThread = new MonitorThread();
        newWorkStealingPool.submit(monitorThread);
        GorSparkSession gorSparkSession = new GorSparkSession("");
        gorSparkSession.setSparkSession(this.sparkSession);
        gorSparkSession.redisUri_$eq(this.redisUri);
        SystemContext build = new SystemContext.Builder().setServer(true).setStartTime(System.currentTimeMillis()).build();
        this.sparkSession.readStream().format("redis").option("stream.keys", "resque").schema(new StructType(new StructField[]{StructField.apply("_id", DataTypes.StringType, true, Metadata.empty()), StructField.apply("job", DataTypes.StringType, true, Metadata.empty()), StructField.apply("field", DataTypes.StringType, true, Metadata.empty()), StructField.apply("value", DataTypes.StringType, true, Metadata.empty())})).load().writeStream().outputMode("update").foreachBatch((dataset, l) -> {
            List list = (List) dataset.collectAsList().stream().filter(row -> {
                return row.getString(2).equals("payload");
            }).map(row2 -> {
                String str;
                String string = row2.getString(1);
                String string2 = row2.getString(3);
                try {
                    Map map = (Map) new ObjectMapper().readValue(string2.substring(1, string2.length() - 1), Map.class);
                    String str2 = (String) map.get("query");
                    String str3 = (String) map.get("fingerprint");
                    String str4 = (String) map.get("projectRoot");
                    String str5 = (String) map.get(GorQuery.REQUEST_ID_FIELD);
                    String str6 = new String(Base64.decode(str2));
                    String str7 = "result_cache/" + str3 + CommandParseUtilities.getExtensionForQuery(str6, false);
                    if (map.containsKey("cachefile") && (str = (String) map.get("cachefile")) != null) {
                        str7 = str;
                    }
                    return new String[]{str6, str3, str4, str5, string, str7};
                } catch (IOException e) {
                    e.printStackTrace();
                    return new String[0];
                }
            }).collect(Collectors.toList());
            Optional findFirst = list.stream().map(strArr -> {
                return strArr[2];
            }).findFirst();
            if (findFirst.isPresent()) {
                String str = (String) findFirst.get();
                String[] strArr2 = (String[]) list.stream().map(strArr3 -> {
                    return strArr3[0];
                }).toArray(i -> {
                    return new String[i];
                });
                String[] strArr4 = (String[]) list.stream().map(strArr5 -> {
                    return strArr5[1];
                }).toArray(i2 -> {
                    return new String[i2];
                });
                String[] strArr6 = (String[]) list.stream().map(strArr7 -> {
                    return strArr7[5];
                }).toArray(i3 -> {
                    return new String[i3];
                });
                String[] strArr8 = (String[]) list.stream().map(strArr9 -> {
                    return strArr9[4];
                }).toArray(i4 -> {
                    return new String[i4];
                });
                String str2 = (String) list.stream().map(strArr10 -> {
                    return strArr10[4];
                }).collect(Collectors.joining(","));
                setValue(strArr8, "status", "RUNNING");
                TreeSet treeSet = new TreeSet();
                for (int i5 = 0; i5 < strArr2.length; i5++) {
                    String str3 = strArr2[i5];
                    String upperCase = str3.toUpperCase();
                    if (upperCase.startsWith("SELECT ") || upperCase.startsWith("SPARK ") || upperCase.startsWith("GORSPARK ") || upperCase.startsWith("NORSPARK ")) {
                        String str4 = strArr8[i5];
                        String substring = str4.substring(str4.lastIndexOf(58) + 1);
                        int indexOf = str3.indexOf(32);
                        String str5 = str3.substring(0, indexOf + 1) + "-j " + substring + str3.substring(indexOf);
                        new PipeOptions().parseOptions(new String[]{str5, "-queryhandler", "spark"});
                        gorSparkSession.init(new ProjectContext.Builder().setRoot(str).setCacheDir("result_cache").setConfigFile((String) null).build(), build, null);
                        PipeInstance pipeInstance = new PipeInstance(new GorContext(gorSparkSession));
                        String str6 = strArr6[i5];
                        pipeInstance.init(str5, false, "");
                        pipeInstance.theInputSource().pushdownWrite(str6);
                        this.futureActionSet.put(str4, newWorkStealingPool.submit(new SparkGorQuery(pipeInstance.getIterator(), pipeInstance.getPipeStep(), str6)));
                    } else {
                        treeSet.add(Integer.valueOf(i5));
                    }
                }
                if (treeSet.size() > 0) {
                    String[] strArr11 = new String[treeSet.size()];
                    String[] strArr12 = new String[treeSet.size()];
                    String[] strArr13 = new String[treeSet.size()];
                    String[] strArr14 = new String[treeSet.size()];
                    int i6 = 0;
                    Iterator it = treeSet.iterator();
                    while (it.hasNext()) {
                        int intValue = ((Integer) it.next()).intValue();
                        strArr11[i6] = strArr2[intValue];
                        strArr12[i6] = strArr4[intValue];
                        strArr14[i6] = strArr8[intValue].substring(strArr8[intValue].lastIndexOf(58) + 1);
                        strArr13[i6] = strArr6[intValue];
                        i6++;
                    }
                    this.futureActionSet.put(str2, new GorQueryRDD(this.sparkSession, strArr11, strArr12, strArr13, str, "result_cache", gorSparkSession.getProjectContext() != null ? gorSparkSession.getProjectContext().getGorConfigFile() : null, gorSparkSession.getProjectContext() != null ? gorSparkSession.getProjectContext().getGorAliasFile() : null, strArr14, this.redisUri).toJavaRDD().collectAsync());
                }
            }
        }).start().awaitTermination();
        monitorThread.stopRunning();
        newWorkStealingPool.shutdown();
        return "";
    }

    public void setValue(String[] strArr, String str, String str2) {
        Jedis resource = this.jedisPool.getResource();
        try {
            for (String str3 : strArr) {
                resource.hset(str3, str, str2);
                resource.expire(str3, (int) getJobExpiration().getSeconds());
            }
            if (resource != null) {
                resource.close();
            }
        } catch (Throwable th) {
            if (resource != null) {
                try {
                    resource.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public void setValues(String[] strArr, String str, String[] strArr2) {
        Jedis resource = this.jedisPool.getResource();
        for (int i = 0; i < strArr.length; i++) {
            try {
                String str2 = strArr[i];
                resource.hset(str2, str, strArr2[i]);
                resource.expire(str2, (int) getJobExpiration().getSeconds());
            } catch (Throwable th) {
                if (resource != null) {
                    try {
                        resource.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (resource != null) {
            resource.close();
        }
    }

    public Duration getJobExpiration() {
        return Duration.ofMinutes(20L);
    }

    public static void main(String[] strArr) {
        try {
            new GorSparkRedisRunner().call();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 400222347:
                if (implMethodName.equals("lambda$call$6c3f590a$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/VoidFunction2") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/gorpipe/spark/redis/GorSparkRedisRunner") && serializedLambda.getImplMethodSignature().equals("(Lorg/gorpipe/spark/GorSparkSession;Lorg/gorpipe/gor/session/SystemContext;Ljava/util/concurrent/ExecutorService;Lorg/apache/spark/sql/Dataset;Ljava/lang/Long;)V")) {
                    GorSparkRedisRunner gorSparkRedisRunner = (GorSparkRedisRunner) serializedLambda.getCapturedArg(0);
                    GorSparkSession gorSparkSession = (GorSparkSession) serializedLambda.getCapturedArg(1);
                    SystemContext systemContext = (SystemContext) serializedLambda.getCapturedArg(2);
                    ExecutorService executorService = (ExecutorService) serializedLambda.getCapturedArg(3);
                    return (dataset, l) -> {
                        List list = (List) dataset.collectAsList().stream().filter(row -> {
                            return row.getString(2).equals("payload");
                        }).map(row2 -> {
                            String str;
                            String string = row2.getString(1);
                            String string2 = row2.getString(3);
                            try {
                                Map map = (Map) new ObjectMapper().readValue(string2.substring(1, string2.length() - 1), Map.class);
                                String str2 = (String) map.get("query");
                                String str3 = (String) map.get("fingerprint");
                                String str4 = (String) map.get("projectRoot");
                                String str5 = (String) map.get(GorQuery.REQUEST_ID_FIELD);
                                String str6 = new String(Base64.decode(str2));
                                String str7 = "result_cache/" + str3 + CommandParseUtilities.getExtensionForQuery(str6, false);
                                if (map.containsKey("cachefile") && (str = (String) map.get("cachefile")) != null) {
                                    str7 = str;
                                }
                                return new String[]{str6, str3, str4, str5, string, str7};
                            } catch (IOException e) {
                                e.printStackTrace();
                                return new String[0];
                            }
                        }).collect(Collectors.toList());
                        Optional findFirst = list.stream().map(strArr -> {
                            return strArr[2];
                        }).findFirst();
                        if (findFirst.isPresent()) {
                            String str = (String) findFirst.get();
                            String[] strArr2 = (String[]) list.stream().map(strArr3 -> {
                                return strArr3[0];
                            }).toArray(i -> {
                                return new String[i];
                            });
                            String[] strArr4 = (String[]) list.stream().map(strArr5 -> {
                                return strArr5[1];
                            }).toArray(i2 -> {
                                return new String[i2];
                            });
                            String[] strArr6 = (String[]) list.stream().map(strArr7 -> {
                                return strArr7[5];
                            }).toArray(i3 -> {
                                return new String[i3];
                            });
                            String[] strArr8 = (String[]) list.stream().map(strArr9 -> {
                                return strArr9[4];
                            }).toArray(i4 -> {
                                return new String[i4];
                            });
                            String str2 = (String) list.stream().map(strArr10 -> {
                                return strArr10[4];
                            }).collect(Collectors.joining(","));
                            setValue(strArr8, "status", "RUNNING");
                            TreeSet treeSet = new TreeSet();
                            for (int i5 = 0; i5 < strArr2.length; i5++) {
                                String str3 = strArr2[i5];
                                String upperCase = str3.toUpperCase();
                                if (upperCase.startsWith("SELECT ") || upperCase.startsWith("SPARK ") || upperCase.startsWith("GORSPARK ") || upperCase.startsWith("NORSPARK ")) {
                                    String str4 = strArr8[i5];
                                    String substring = str4.substring(str4.lastIndexOf(58) + 1);
                                    int indexOf = str3.indexOf(32);
                                    String str5 = str3.substring(0, indexOf + 1) + "-j " + substring + str3.substring(indexOf);
                                    new PipeOptions().parseOptions(new String[]{str5, "-queryhandler", "spark"});
                                    gorSparkSession.init(new ProjectContext.Builder().setRoot(str).setCacheDir("result_cache").setConfigFile((String) null).build(), systemContext, null);
                                    PipeInstance pipeInstance = new PipeInstance(new GorContext(gorSparkSession));
                                    String str6 = strArr6[i5];
                                    pipeInstance.init(str5, false, "");
                                    pipeInstance.theInputSource().pushdownWrite(str6);
                                    this.futureActionSet.put(str4, executorService.submit(new SparkGorQuery(pipeInstance.getIterator(), pipeInstance.getPipeStep(), str6)));
                                } else {
                                    treeSet.add(Integer.valueOf(i5));
                                }
                            }
                            if (treeSet.size() > 0) {
                                String[] strArr11 = new String[treeSet.size()];
                                String[] strArr12 = new String[treeSet.size()];
                                String[] strArr13 = new String[treeSet.size()];
                                String[] strArr14 = new String[treeSet.size()];
                                int i6 = 0;
                                Iterator it = treeSet.iterator();
                                while (it.hasNext()) {
                                    int intValue = ((Integer) it.next()).intValue();
                                    strArr11[i6] = strArr2[intValue];
                                    strArr12[i6] = strArr4[intValue];
                                    strArr14[i6] = strArr8[intValue].substring(strArr8[intValue].lastIndexOf(58) + 1);
                                    strArr13[i6] = strArr6[intValue];
                                    i6++;
                                }
                                this.futureActionSet.put(str2, new GorQueryRDD(this.sparkSession, strArr11, strArr12, strArr13, str, "result_cache", gorSparkSession.getProjectContext() != null ? gorSparkSession.getProjectContext().getGorConfigFile() : null, gorSparkSession.getProjectContext() != null ? gorSparkSession.getProjectContext().getGorAliasFile() : null, strArr14, this.redisUri).toJavaRDD().collectAsync());
                            }
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
