package org.gorpipe.spark.redis;

import java.util.concurrent.Callable;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.gorpipe.spark.GorSparkSession;
import org.gorpipe.spark.GorSparkUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/gorpipe/spark/redis/GorSparkRedisRunner.class */
/**
 * Runs a Spark structured-streaming query that reads from the Redis stream key
 * {@code resque} and hands every micro-batch to a {@link RedisBatchConsumer}.
 *
 * <p>The runner can be used as a {@link Callable}: {@link #call()} blocks until the
 * streaming query terminates. Closing the runner closes the underlying Spark session.
 *
 * <p>Each initialisation publishes the runner through the static {@link #instance}
 * field, so that field always refers to the most recently initialised runner.
 */
public class GorSparkRedisRunner implements Callable<String>, AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(GorSparkRedisRunner.class);

    /** The most recently initialised runner; assigned in {@link #init(SparkSession)}. */
    public static GorSparkRedisRunner instance;

    private SparkSession sparkSession;
    private String redisUri;

    /**
     * Creates a runner on top of the Spark session owned by the given GOR session.
     *
     * @param gorSparkSession GOR session whose Spark session is used for streaming
     */
    public GorSparkRedisRunner(GorSparkSession gorSparkSession) {
        init(gorSparkSession.getSparkSession());
    }

    /** Creates a runner using the session returned by {@link GorSparkUtilities#getSparkSession()}. */
    public GorSparkRedisRunner() {
        init(GorSparkUtilities.getSparkSession());
    }

    /**
     * Binds this runner to a Spark session, sets the Spark context log level to
     * {@code DEBUG}, reads the Redis URI from {@link GorSparkUtilities} and registers
     * this runner as {@link #instance}.
     *
     * @param sparkSession session used for the streaming query
     */
    public void init(SparkSession sparkSession) {
        log.info("Initializing GorSparkRedisRunner");
        sparkSession.sparkContext().setLogLevel("DEBUG");
        instance = this;
        this.sparkSession = sparkSession;
        this.redisUri = GorSparkUtilities.getSparkGorRedisUri();
    }

    /**
     * Starts the Redis streaming query and blocks until it terminates.
     *
     * <p>The batch consumer is closed whether the query ends normally or fails. When
     * closing fails after an earlier failure, the close failure is attached to the
     * original one as a suppressed exception instead of replacing it.
     *
     * @return an empty string once the query has terminated
     * @throws Exception if starting or awaiting the query, or closing the consumer, fails
     */
    @Override
    public String call() throws Exception {
        log.info("Starting GorSparkRedisRunner");
        StructType schema = new StructType(new StructField[]{
                nullableString("_id"),
                nullableString("job"),
                nullableString("field"),
                nullableString("value")
        });
        RedisBatchConsumer batchConsumer = new RedisBatchConsumer(this.sparkSession, this.redisUri);
        try {
            StreamingQuery query = this.sparkSession.readStream()
                    .format("redis")
                    .option("stream.keys", "resque")
                    .schema(schema)
                    .load()
                    .writeStream()
                    .outputMode("update")
                    .foreachBatch(batchConsumer)
                    .start();
            log.info("GorSparkRedisRunner is running");
            query.awaitTermination();
            batchConsumer.close();
            log.info("GorSparkRedisRunner has stopped");
            return "";
        } catch (Throwable failure) {
            // Best-effort cleanup: keep the original failure as the primary exception.
            try {
                batchConsumer.close();
            } catch (Throwable closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Command-line entry point: creates a runner, runs it until the stream stops and
     * logs any exception instead of propagating it.
     *
     * @param strArr command-line arguments (unused)
     */
    public static void main(String[] strArr) {
        // try-with-resources guarantees the Spark session is closed even when call() throws;
        // previously close() was skipped on failure.
        try (GorSparkRedisRunner runner = new GorSparkRedisRunner()) {
            runner.call();
        } catch (Exception e) {
            log.error("Error running GorSparkRedisRunner", e);
        }
    }

    /** Closes the Spark session held by this runner, if one has been set. */
    @Override
    public void close() {
        log.info("Closing spark session");
        if (this.sparkSession != null) {
            this.sparkSession.close();
        }
    }

    /** Builds a nullable string column with empty metadata for the stream schema. */
    private static StructField nullableString(String name) {
        return StructField.apply(name, DataTypes.StringType, true, Metadata.empty());
    }
}
