package ai.chronon.spark;

import ai.chronon.api.Source;
import ai.chronon.online.Api;
import ai.chronon.spark.streaming.JoinSourceRunner;
import ai.chronon.spark.streaming.TopicChecker$;
import org.apache.http.cookie.ClientCookie;
import org.apache.spark.SparkFiles$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import scala.Option;
import scala.Predef$;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.mutable.StringBuilder;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.VolatileByteRef;

/* compiled from: Driver.scala */
/* loaded from: input_file:ai/chronon/spark/Driver$GroupByStreaming$.class */
public class Driver$GroupByStreaming$ {
    public static final Driver$GroupByStreaming$ MODULE$ = null;

    static {
        new Driver$GroupByStreaming$();
    }

    public Dataset<Row> dataStream(SparkSession sparkSession, String str, String str2) {
        TopicChecker$.MODULE$.topicShouldExist(str2, str);
        sparkSession.streams().addListener(new StreamingQueryListener() { // from class: ai.chronon.spark.Driver$GroupByStreaming$$anon$1
            public void onQueryStarted(StreamingQueryListener.QueryStartedEvent queryStartedEvent) {
                Predef$.MODULE$.println(new StringBuilder().append((Object) "Query started: ").append(queryStartedEvent.id()).toString());
            }

            public void onQueryTerminated(StreamingQueryListener.QueryTerminatedEvent queryTerminatedEvent) {
                Predef$.MODULE$.println(new StringBuilder().append((Object) "Query terminated: ").append(queryTerminatedEvent.id()).toString());
            }

            public void onQueryProgress(StreamingQueryListener.QueryProgressEvent queryProgressEvent) {
                Predef$.MODULE$.println(new StringBuilder().append((Object) "Query made progress: ").append(queryProgressEvent.progress()).toString());
            }
        });
        return sparkSession.readStream().format("kafka").option("kafka.bootstrap.servers", str).option("subscribe", str2).option("enable.auto.commit", "true").load().selectExpr(Predef$.MODULE$.wrapRefArray(new String[]{"value"}));
    }

    /* JADX WARN: Multi-variable type inference failed */
    public Option<String> findFile(String str) {
        String str2 = (String) Predef$.MODULE$.refArrayOps(str.split("/")).mo2134last();
        Seq seq = (Seq) ((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{str, str2, SparkFiles$.MODULE$.get(str2)}))).map(new Driver$GroupByStreaming$$anonfun$52(), Seq$.MODULE$.canBuildFrom());
        Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"File Statuses:\\n  ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{((Seq) seq.map(new Driver$GroupByStreaming$$anonfun$53(), Seq$.MODULE$.canBuildFrom())).mkString("\n  ")})));
        return seq.find(new Driver$GroupByStreaming$$anonfun$findFile$1()).map(new Driver$GroupByStreaming$$anonfun$findFile$2());
    }

    public void run(Driver$GroupByStreaming$Args driver$GroupByStreaming$Args) {
        StreamingQuery run;
        VolatileByteRef create = VolatileByteRef.create((byte) 0);
        SparkSession buildStreaming = SparkSessionBuilder$.MODULE$.buildStreaming(BoxesRunTime.unboxToBoolean(driver$GroupByStreaming$Args.debug().apply()));
        ai.chronon.api.GroupBy groupBy = (ai.chronon.api.GroupBy) findFile(driver$GroupByStreaming$Args.confPath().apply()).map(new Driver$GroupByStreaming$$anonfun$54()).getOrElse(new Driver$GroupByStreaming$$anonfun$55(driver$GroupByStreaming$Args));
        Option<String> findFile = findFile(driver$GroupByStreaming$Args.onlineJar().apply());
        if (BoxesRunTime.unboxToBoolean(driver$GroupByStreaming$Args.debug().apply())) {
            findFile.foreach(new Driver$GroupByStreaming$$anonfun$run$7(buildStreaming));
        }
        Api impl = driver$GroupByStreaming$Args.impl(driver$GroupByStreaming$Args.serializableProps());
        if (ai.chronon.api.Extensions$.MODULE$.GroupByOps(groupBy).streamingSource().get().isSetJoinSource()) {
            run = new JoinSourceRunner(groupBy, driver$GroupByStreaming$Args.serializableProps(), BoxesRunTime.unboxToBoolean(driver$GroupByStreaming$Args.debug().apply()), BoxesRunTime.unboxToInt(driver$GroupByStreaming$Args.lagMillis().getOrElse(new Driver$GroupByStreaming$$anonfun$1())), buildStreaming, impl).chainedStreamingQuery().start();
        } else {
            ObjectRef<Object> zero = ObjectRef.zero();
            ObjectRef<Object> zero2 = ObjectRef.zero();
            Option<Source> streamingSource = ai.chronon.api.Extensions$.MODULE$.GroupByOps(groupBy).streamingSource();
            Predef$.MODULE$.m1892assert(streamingSource.isDefined(), new Driver$GroupByStreaming$$anonfun$56());
            if (!driver$GroupByStreaming$Args.kafkaBootstrap().isDefined()) {
                Predef$.MODULE$.m1892assert(ai$chronon$spark$Driver$GroupByStreaming$$host$1(streamingSource, zero, create).isDefined() && ai$chronon$spark$Driver$GroupByStreaming$$port$1(streamingSource, zero2, create).isDefined(), new Driver$GroupByStreaming$$anonfun$57());
            }
            ai.chronon.spark.streaming.GroupBy groupBy2 = new ai.chronon.spark.streaming.GroupBy(dataStream(buildStreaming, driver$GroupByStreaming$Args.kafkaBootstrap().getOrElse(new Driver$GroupByStreaming$$anonfun$58(streamingSource, zero, zero2, create)), ai.chronon.api.Extensions$.MODULE$.SourceOps(streamingSource.get()).cleanTopic()), buildStreaming, groupBy, driver$GroupByStreaming$Args.impl(driver$GroupByStreaming$Args.serializableProps()), BoxesRunTime.unboxToBoolean(driver$GroupByStreaming$Args.debug().apply()));
            run = groupBy2.run(groupBy2.run$default$1());
        }
        run.awaitTermination();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    /* JADX WARN: Type inference failed for: r1v7, types: [T, scala.Option] */
    private final Option host$lzycompute$1(Option option, ObjectRef objectRef, VolatileByteRef volatileByteRef) {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (volatileByteRef.elem & 1)) == 0) {
                objectRef.elem = ai.chronon.api.Extensions$.MODULE$.SourceOps((Source) option.get()).topicTokens().get("host");
                volatileByteRef.elem = (byte) (volatileByteRef.elem | 1);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return (Option) objectRef.elem;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public final Option ai$chronon$spark$Driver$GroupByStreaming$$host$1(Option option, ObjectRef objectRef, VolatileByteRef volatileByteRef) {
        return ((byte) (volatileByteRef.elem & 1)) == 0 ? host$lzycompute$1(option, objectRef, volatileByteRef) : (Option) objectRef.elem;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    /* JADX WARN: Type inference failed for: r1v7, types: [T, scala.Option] */
    private final Option port$lzycompute$1(Option option, ObjectRef objectRef, VolatileByteRef volatileByteRef) {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (volatileByteRef.elem & 2)) == 0) {
                objectRef.elem = ai.chronon.api.Extensions$.MODULE$.SourceOps((Source) option.get()).topicTokens().get(ClientCookie.PORT_ATTR);
                volatileByteRef.elem = (byte) (volatileByteRef.elem | 2);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return (Option) objectRef.elem;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public final Option ai$chronon$spark$Driver$GroupByStreaming$$port$1(Option option, ObjectRef objectRef, VolatileByteRef volatileByteRef) {
        return ((byte) (volatileByteRef.elem & 2)) == 0 ? port$lzycompute$1(option, objectRef, volatileByteRef) : (Option) objectRef.elem;
    }

    public Driver$GroupByStreaming$() {
        MODULE$ = this;
    }
}
