package ai.chronon.spark;

import ai.chronon.api.Source;
import ai.chronon.api.ThriftJsonCodec$;
import ai.chronon.online.Api;
import ai.chronon.spark.streaming.JoinSourceRunner;
import ai.chronon.spark.streaming.TopicChecker$;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.commons.io.FileUtils;
import org.apache.http.cookie.ClientCookie;
import org.apache.spark.SparkFiles$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.apache.thrift.protocol.TMultiplexedProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.ArrayOps$;
import scala.collection.immutable.Seq;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ManifestFactory$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyRef;
import scala.runtime.ScalaRunTime$;

/* compiled from: Driver.scala */
/* loaded from: input_file:ai/chronon/spark/Driver$GroupByStreaming$.class */
public class Driver$GroupByStreaming$ {
    public static final Driver$GroupByStreaming$ MODULE$ = new Driver$GroupByStreaming$();
    private static transient Logger logger;
    private static volatile transient boolean bitmap$trans$0;

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8 */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!bitmap$trans$0) {
                logger = LoggerFactory.getLogger(getClass());
                r0 = 1;
                bitmap$trans$0 = true;
            }
        }
        return logger;
    }

    public Logger logger() {
        return !bitmap$trans$0 ? logger$lzycompute() : logger;
    }

    public Dataset<Row> dataStream(SparkSession sparkSession, String str, String str2) {
        TopicChecker$.MODULE$.topicShouldExist(str2, str);
        sparkSession.streams().addListener(new StreamingQueryListener() { // from class: ai.chronon.spark.Driver$GroupByStreaming$$anon$1
            public void onQueryStarted(StreamingQueryListener.QueryStartedEvent queryStartedEvent) {
                Driver$GroupByStreaming$.MODULE$.logger().info(new StringBuilder(15).append("Query started: ").append(queryStartedEvent.id()).toString());
            }

            public void onQueryTerminated(StreamingQueryListener.QueryTerminatedEvent queryTerminatedEvent) {
                Driver$GroupByStreaming$.MODULE$.logger().info(new StringBuilder(18).append("Query terminated: ").append(queryTerminatedEvent.id()).toString());
            }

            public void onQueryProgress(StreamingQueryListener.QueryProgressEvent queryProgressEvent) {
                Driver$GroupByStreaming$.MODULE$.logger().info(new StringBuilder(21).append("Query made progress: ").append(queryProgressEvent.progress()).toString());
            }
        });
        return sparkSession.readStream().format("kafka").option("kafka.bootstrap.servers", str).option("subscribe", str2).option("enable.auto.commit", "true").load().selectExpr(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"value"}));
    }

    public Option<String> findFile(String str) {
        String str2 = (String) ArrayOps$.MODULE$.last$extension(Predef$.MODULE$.refArrayOps(str.split("/")));
        Seq map = package$.MODULE$.Seq().apply2((Seq) ScalaRunTime$.MODULE$.wrapRefArray(new String[]{str, str2, SparkFiles$.MODULE$.get(str2)})).map(str3 -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(str3), BoxesRunTime.boxToBoolean(new File(str3).exists()));
        });
        logger().info(new StringBuilder(17).append("File Statuses:\n  ").append(map.map(tuple2 -> {
            String str4;
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            String str5 = (String) tuple2.mo1989_1();
            if (tuple2._2$mcZ$sp()) {
                str4 = new StringBuilder(7).append("exists ").append(FileUtils.byteCountToDisplaySize(Files.size(Paths.get(str5, new String[0])))).toString();
            } else {
                str4 = "is not found";
            }
            return new StringBuilder(1).append(str5).append(" ").append(str4).toString();
        }).mkString("\n  ")).toString());
        return map.find(tuple22 -> {
            return BoxesRunTime.boxToBoolean($anonfun$findFile$3(tuple22));
        }).map(tuple23 -> {
            return (String) tuple23.mo1989_1();
        });
    }

    public void run(Driver$GroupByStreaming$Args driver$GroupByStreaming$Args) {
        StreamingQuery run;
        SparkSession buildStreaming = SparkSessionBuilder$.MODULE$.buildStreaming(BoxesRunTime.unboxToBoolean(driver$GroupByStreaming$Args.debug().apply()));
        ai.chronon.api.GroupBy groupBy = (ai.chronon.api.GroupBy) findFile(driver$GroupByStreaming$Args.confPath().apply()).map(str -> {
            return (ai.chronon.api.GroupBy) ThriftJsonCodec$.MODULE$.fromJsonFile(str, false, ManifestFactory$.MODULE$.classType(ai.chronon.api.GroupBy.class), ClassTag$.MODULE$.apply(ai.chronon.api.GroupBy.class));
        }).getOrElse(() -> {
            return (ai.chronon.api.GroupBy) driver$GroupByStreaming$Args.metaDataStore().getConf(driver$GroupByStreaming$Args.confPath().apply(), ManifestFactory$.MODULE$.classType(ai.chronon.api.GroupBy.class)).get();
        });
        Option<String> findFile = findFile(driver$GroupByStreaming$Args.onlineJar().apply());
        if (BoxesRunTime.unboxToBoolean(driver$GroupByStreaming$Args.debug().apply())) {
            findFile.foreach(str2 -> {
                $anonfun$run$23(buildStreaming, str2);
                return BoxedUnit.UNIT;
            });
        }
        Api impl = driver$GroupByStreaming$Args.impl(driver$GroupByStreaming$Args.serializableProps());
        if (ai.chronon.api.Extensions$.MODULE$.GroupByOps(groupBy).streamingSource().get().isSetJoinSource()) {
            run = new JoinSourceRunner(groupBy, driver$GroupByStreaming$Args.serializableProps(), BoxesRunTime.unboxToBoolean(driver$GroupByStreaming$Args.debug().apply()), BoxesRunTime.unboxToInt(driver$GroupByStreaming$Args.lagMillis().getOrElse(() -> {
                return 2000;
            })), buildStreaming, impl).chainedStreamingQuery().start();
        } else {
            LazyRef lazyRef = new LazyRef();
            LazyRef lazyRef2 = new LazyRef();
            Option<Source> streamingSource = ai.chronon.api.Extensions$.MODULE$.GroupByOps(groupBy).streamingSource();
            Predef$.MODULE$.m1938assert(streamingSource.isDefined(), () -> {
                return "There is no valid streaming source - with a valid topic, and endDate < today";
            });
            if (!driver$GroupByStreaming$Args.kafkaBootstrap().isDefined()) {
                Predef$.MODULE$.m1938assert(host$1(lazyRef, streamingSource).isDefined() && port$1(lazyRef2, streamingSource).isDefined(), () -> {
                    return "Either specify a kafkaBootstrap url or provide host and port in your topic definition as topic/host=host/port=port";
                });
            }
            ai.chronon.spark.streaming.GroupBy groupBy2 = new ai.chronon.spark.streaming.GroupBy(dataStream(buildStreaming, driver$GroupByStreaming$Args.kafkaBootstrap().getOrElse(() -> {
                return new StringBuilder(1).append(host$1(lazyRef, streamingSource).get()).append(TMultiplexedProtocol.SEPARATOR).append(port$1(lazyRef2, streamingSource).get()).toString();
            }), ai.chronon.api.Extensions$.MODULE$.SourceOps(streamingSource.get()).cleanTopic()), buildStreaming, groupBy, driver$GroupByStreaming$Args.impl(driver$GroupByStreaming$Args.serializableProps()), BoxesRunTime.unboxToBoolean(driver$GroupByStreaming$Args.debug().apply()));
            run = groupBy2.run(groupBy2.run$default$1());
        }
        run.awaitTermination();
    }

    public static final /* synthetic */ boolean $anonfun$findFile$3(Tuple2 tuple2) {
        return tuple2._2$mcZ$sp();
    }

    public static final /* synthetic */ void $anonfun$run$23(SparkSession sparkSession, String str) {
        sparkSession.sparkContext().addJar(str);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private static final /* synthetic */ Option host$lzycompute$1(LazyRef lazyRef, Option option) {
        Option option2;
        synchronized (lazyRef) {
            option2 = lazyRef.initialized() ? (Option) lazyRef.value() : (Option) lazyRef.initialize(ai.chronon.api.Extensions$.MODULE$.SourceOps((Source) option.get()).topicTokens().get("host"));
        }
        return option2;
    }

    private static final Option host$1(LazyRef lazyRef, Option option) {
        return lazyRef.initialized() ? (Option) lazyRef.value() : host$lzycompute$1(lazyRef, option);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private static final /* synthetic */ Option port$lzycompute$1(LazyRef lazyRef, Option option) {
        Option option2;
        synchronized (lazyRef) {
            option2 = lazyRef.initialized() ? (Option) lazyRef.value() : (Option) lazyRef.initialize(ai.chronon.api.Extensions$.MODULE$.SourceOps((Source) option.get()).topicTokens().get(ClientCookie.PORT_ATTR));
        }
        return option2;
    }

    private static final Option port$1(LazyRef lazyRef, Option option) {
        return lazyRef.initialized() ? (Option) lazyRef.value() : port$lzycompute$1(lazyRef, option);
    }
}
