package ai.chronon.spark.streaming;

import ai.chronon.api.Constants$;
import ai.chronon.api.DataModel$;
import ai.chronon.api.Extensions$;
import ai.chronon.api.Query;
import ai.chronon.api.QueryUtils$;
import ai.chronon.api.Source;
import ai.chronon.api.StructType;
import ai.chronon.online.Api;
import ai.chronon.online.AvroConversions$;
import ai.chronon.online.GroupByServingInfoParsed;
import ai.chronon.online.KVStore;
import ai.chronon.online.Metrics;
import ai.chronon.online.Metrics$Context$;
import ai.chronon.online.Metrics$Environment$;
import ai.chronon.online.Metrics$Name$;
import ai.chronon.online.Mutation;
import ai.chronon.online.SparkConversions$;
import ai.chronon.online.StreamDecoder;
import ai.chronon.spark.GenericRowHandler$;
import com.google.gson.Gson;
import java.io.Serializable;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$implicits$;
import org.apache.spark.sql.catalyst.encoders.RowEncoder$;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.$less$colon$less$;
import scala.Enumeration;
import scala.Function1;
import scala.MatchError;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.ArrayOps$;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.JavaConverters$;
import scala.collection.SeqOps;
import scala.collection.StringOps$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.mutable.Buffer;
import scala.concurrent.duration.package;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.util.Try;

/* compiled from: GroupBy.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005\u001dd\u0001\u0002\n\u0014\u0001qA\u0001b\f\u0001\u0003\u0002\u0003\u0006I\u0001\r\u0005\t\u000b\u0002\u0011\t\u0011)A\u0005\r\"A!\n\u0001B\u0001B\u0003%1\n\u0003\u0005Q\u0001\t\u0005\t\u0015!\u0003R\u0011!9\u0006A!A!\u0002\u0013A\u0006\"B.\u0001\t\u0003a\u0006\u0002\u00033\u0001\u0011\u000b\u0007I\u0011A3\t\u000bA\u0004A\u0011B9\t\u000bq\u0004A\u0011A?\t\u0013\u0005-\u0001!%A\u0005\u0002\u00055\u0001bBA\u0012\u0001\u0011\u0005\u0011Q\u0005\u0005\n\u0003{\u0001\u0011\u0013!C\u0001\u0003\u001b9\u0011\"a\u0010\u0014\u0003\u0003E\t!!\u0011\u0007\u0011I\u0019\u0012\u0011!E\u0001\u0003\u0007Baa\u0017\b\u0005\u0002\u0005M\u0003\"CA+\u001dE\u0005I\u0011AA\u0007\u0011%\t9FDA\u0001\n\u0013\tIFA\u0004He>,\bOQ=\u000b\u0005Q)\u0012!C:ue\u0016\fW.\u001b8h\u0015\t1r#A\u0003ta\u0006\u00148N\u0003\u0002\u00193\u000591\r\u001b:p]>t'\"\u0001\u000e\u0002\u0005\u0005L7\u0001A\n\u0004\u0001u\u0019\u0003C\u0001\u0010\"\u001b\u0005y\"\"\u0001\u0011\u0002\u000bM\u001c\u0017\r\\1\n\u0005\tz\"AB!osJ+g\r\u0005\u0002%Y9\u0011QE\u000b\b\u0003M%j\u0011a\n\u0006\u0003Qm\ta\u0001\u0010:p_Rt\u0014\"\u0001\u0011\n\u0005-z\u0012a\u00029bG.\fw-Z\u0005\u0003[9\u0012AbU3sS\u0006d\u0017N_1cY\u0016T!aK\u0010\u0002\u0017%t\u0007/\u001e;TiJ,\u0017-\u001c\t\u0003c\ts!A\r!\u000f\u0005MjdB\u0001\u001b<\u001d\t)\u0004H\u0004\u0002'm%\tq'A\u0002pe\u001eL!!\u000f\u001e\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u00059\u0014B\u0001\f=\u0015\tI$(\u0003\u0002?\u007f\u0005\u00191/\u001d7\u000b\u0005Ya\u0014BA\u0016B\u0015\tqt(\u0003\u0002D\t\nIA)\u0019;b\rJ\fW.\u001a\u0006\u0003W\u0005\u000bqa]3tg&|g\u000e\u0005\u0002H\u00116\t\u0011)\u0003\u0002J\u0003\na1\u000b]1sWN+7o]5p]\u0006YqM]8va\nK8i\u001c8g!\tau*D\u0001N\u0015\tqu#A\u0002ba&L!AE'\u0002\u0015=tG.\u001b8f\u00136\u0004H\u000e\u0005\u0002S+6\t1K\u0003\u0002U/\u00051qN\u001c7j]\u0016L!AV*\u0003\u0007\u0005\u0003\u0018.A\u0003eK\n,x\r\u0005\u0002\u001f3&\u0011!l\b\u0002\b\u0005>|G.Z1o\u0003\u0019a\u0014N\\5u}Q1Ql\u00181bE
\u000e\u0004\"A\u0018\u0001\u000e\u0003MAQa\f\u0004A\u0002ABQ!\u0012\u0004A\u0002\u0019CQA\u0013\u0004A\u0002-CQ\u0001\u0015\u0004A\u0002ECqa\u0016\u0004\u0011\u0002\u0003\u0007\u0001,\u0001\u0004m_\u001e<WM]\u000b\u0002MB\u0011qM[\u0007\u0002Q*\u0011\u0011NO\u0001\u0006g24GG[\u0005\u0003W\"\u0014a\u0001T8hO\u0016\u0014\bFA\u0004n!\tqb.\u0003\u0002p?\tIAO]1og&,g\u000e^\u0001\u0014EVLG\u000eZ*ue\u0016\fW.\u001b8h#V,'/\u001f\u000b\u0003ej\u0004\"a]<\u000f\u0005Q,\bC\u0001\u0014 \u0013\t1x$\u0001\u0004Qe\u0016$WMZ\u0005\u0003qf\u0014aa\u0015;sS:<'B\u0001< \u0011\u0015Y\b\u00021\u0001s\u0003)Ig\u000e];u)\u0006\u0014G.Z\u0001\u0004eVtGc\u0001@\u0002\bA\u0019q0a\u0001\u000e\u0005\u0005\u0005!B\u0001\u000bB\u0013\u0011\t)!!\u0001\u0003\u001dM#(/Z1nS:<\u0017+^3ss\"A\u0011\u0011B\u0005\u0011\u0002\u0003\u0007\u0001,A\u0003m_\u000e\fG.A\u0007sk:$C-\u001a4bk2$H%M\u000b\u0003\u0003\u001fQ3\u0001WA\tW\t\t\u0019\u0002\u0005\u0003\u0002\u0016\u0005}QBAA\f\u0015\u0011\tI\"a\u0007\u0002\u0013Ut7\r[3dW\u0016$'bAA\u000f?\u0005Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\t\u0005\u0005\u0012q\u0003\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0017a\u00042vS2$G)\u0019;b'R\u0014X-Y7\u0015\t\u0005\u001d\u00121\b\t\u0006\u007f\u0006%\u0012QF\u0005\u0005\u0003W\t\tA\u0001\tECR\f7\u000b\u001e:fC6<&/\u001b;feB!\u0011qFA\u001b\u001d\r\u0011\u0016\u0011G\u0005\u0004\u0003g\u0019\u0016aB&W'R|'/Z\u0005\u0005\u0003o\tID\u0001\u0006QkR\u0014V-];fgRT1!a\rT\u0011!\tIa\u0003I\u0001\u0002\u0004A\u0016!\u00072vS2$G)\u0019;b'R\u0014X-Y7%I\u00164\u0017-\u001e7uIE\nqa\u0012:pkB\u0014\u0015\u0010\u0005\u0002_\u001dM!a\"HA#!\u0011\t9%!\u0015\u000e\u0005\u0005%#\u0002BA&\u0003\u001b\n!![8\u000b\u0005\u0005=\u0013\u0001\u00026bm\u0006L1!LA%)\t\t\t%A\u000e%Y\u0016\u001c8/\u001b8ji\u0012:'/Z1uKJ$C-\u001a4bk2$H%N\u0001\roJLG/\u001a*fa2\f7-\u001a\u000b\u0003\u00037\u0002B!!\u0018\u0002d5\u0011\u0011q\f\u0006\u0005\u0003C\ni%\u0001\u0003mC:<\u0017\u0002BA3\u0003?\u0012aa\u00142kK\u000e$\b")
/* loaded from: input_file:ai/chronon/spark/streaming/GroupBy.class */
public class GroupBy implements Serializable {
    // Lazily created logger; transient, so it is excluded from this Serializable class's serialized form.
    private transient Logger logger;
    // Input dataset; buildDataStream reads it through a byte-array encoder before decoding.
    private final Dataset<Row> inputStream;
    // Spark session used to run the GroupBy setup statements and the streaming SQL query.
    private final SparkSession session;
    // GroupBy configuration: supplies the streaming source, key columns, data model and metadata.
    private final ai.chronon.api.GroupBy groupByConf;
    // Online API implementation: supplies the fetcher and stream decoder, and is handed to the DataWriter.
    private final Api onlineImpl;
    // When true, every converted row is also logged (keys, values, encoded bytes, timestamp).
    private final boolean debug;
    // Set to true once `logger` has been assigned; read by logger() and logger$lzycompute().
    private volatile transient boolean bitmap$trans$0;

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [ai.chronon.spark.streaming.GroupBy] */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$trans$0) {
                this.logger = LoggerFactory.getLogger(getClass());
                r0 = this;
                r0.bitmap$trans$0 = true;
            }
        }
        return this.logger;
    }

    /**
     * Returns the logger, creating it through {@code logger$lzycompute()} if the
     * initialisation flag has not been set yet.
     *
     * @return the logger for this instance
     */
    public Logger logger() {
        if (this.bitmap$trans$0) {
            // Already initialised: no locking needed.
            return this.logger;
        }
        return logger$lzycompute();
    }

    /**
     * Builds the SQL text that is run against the temp view holding the decoded stream.
     *
     * <p>The query is assembled by {@code QueryUtils.build} from the streaming source's
     * selects, the given table name, and a list of where-clauses made of: the source's own
     * wheres, a not-null check on the time column (the mutation time column for Entities,
     * the query's time column for Events), and a parenthesised OR of not-null checks over
     * the key columns. Fill-in columns are passed only when the source defines selects.
     *
     * @param str name of the table (temp view) to select from
     * @return the SQL query string
     * @throws MatchError if the GroupBy's data model is neither Entities nor Events
     */
    private String buildStreamingQuery(String str) {
        Source streamingSource = (Source) Extensions$.MODULE$.GroupByOps(this.groupByConf).streamingSource().get();
        Query query = Extensions$.MODULE$.SourceOps(streamingSource).query();

        // Scala view of the configured selects, or null when the query has none.
        Map selects = (Map) Option$.MODULE$.apply(query.selects)
                .map(javaSelects -> ((IterableOnceOps) JavaConverters$.MODULE$.mapAsScalaMapConverter(javaSelects).asScala()).toMap($less$colon$less$.MODULE$.refl()))
                .orNull($less$colon$less$.MODULE$.refl());
        // Time expression from the query, defaulting to the standard time column name.
        String timeColumn = (String) Option$.MODULE$.apply(query.timeColumn).getOrElse(() -> Constants$.MODULE$.TimeColumn());

        // Columns to fill in when absent from the selects; depends on the data model.
        Enumeration.Value modelForFill = Extensions$.MODULE$.GroupByOps(this.groupByConf).dataModel();
        Enumeration.Value entitiesForFill = DataModel$.MODULE$.Entities();
        boolean fillIsEntities = entitiesForFill != null ? entitiesForFill.equals(modelForFill) : modelForFill == null;
        Map fillIfAbsent;
        if (fillIsEntities) {
            Tuple2 timeFill = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(Constants$.MODULE$.TimeColumn()), timeColumn);
            Tuple2 reversalFill = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(Constants$.MODULE$.ReversalColumn()), (Object) null);
            Tuple2 mutationTimeFill = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(Constants$.MODULE$.MutationTimeColumn()), (Object) null);
            fillIfAbsent = (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{timeFill, reversalFill, mutationTimeFill}));
        } else {
            Enumeration.Value eventsForFill = DataModel$.MODULE$.Events();
            boolean fillIsEvents = eventsForFill != null ? eventsForFill.equals(modelForFill) : modelForFill == null;
            if (!fillIsEvents) {
                throw new MatchError(modelForFill);
            }
            Tuple2 timeFill = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(Constants$.MODULE$.TimeColumn()), timeColumn);
            fillIfAbsent = (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{timeFill}));
        }

        Buffer keyColumns = (Buffer) JavaConverters$.MODULE$.asScalaBufferConverter(this.groupByConf.getKeyColumns()).asScala();
        scala.collection.Seq baseWheres = (scala.collection.Seq) Option$.MODULE$.apply(query.wheres)
                .map(javaWheres -> (Buffer) JavaConverters$.MODULE$.asScalaBufferConverter(javaWheres).asScala())
                .getOrElse(() -> package$.MODULE$.Seq().empty());

        // Each key is checked through its select expression when one exists, else by its own name.
        Map selectsOrEmpty = (Map) Option$.MODULE$.apply(selects).getOrElse(() -> Predef$.MODULE$.Map().empty());
        String keyWhereOption = ((IterableOnceOps) keyColumns.map(key -> {
            Object keyExpression = selectsOrEmpty.getOrElse(key, () -> key);
            return keyExpression + " IS NOT NULL";
        })).mkString(" OR ");

        // Not-null check on the timestamp column appropriate for the data model.
        Enumeration.Value modelForTime = Extensions$.MODULE$.GroupByOps(this.groupByConf).dataModel();
        Enumeration.Value entitiesForTime = DataModel$.MODULE$.Entities();
        boolean timeIsEntities = entitiesForTime != null ? entitiesForTime.equals(modelForTime) : modelForTime == null;
        Seq timeWheres;
        if (timeIsEntities) {
            String mutationTimeClause = Constants$.MODULE$.MutationTimeColumn() + " is NOT NULL";
            timeWheres = package$.MODULE$.Seq().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{mutationTimeClause}));
        } else {
            Enumeration.Value eventsForTime = DataModel$.MODULE$.Events();
            boolean timeIsEvents = eventsForTime != null ? eventsForTime.equals(modelForTime) : modelForTime == null;
            if (!timeIsEvents) {
                throw new MatchError(modelForTime);
            }
            String timeClause = timeColumn + " is NOT NULL";
            timeWheres = package$.MODULE$.Seq().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{timeClause}));
        }

        String keyClause = "(" + keyWhereOption + ")";
        scala.collection.Seq allWheres = (scala.collection.Seq) ((SeqOps) baseWheres.$plus$plus(timeWheres)).$colon$plus(keyClause);
        // Without selects there is nothing to fill into, so no fill map is passed.
        Map fills = selects == null ? null : fillIfAbsent;
        return QueryUtils$.MODULE$.build(selects, str, allWheres, fills);
    }

    /**
     * Builds the streaming writer via {@code buildDataStream} and starts it.
     *
     * @param z forwarded unchanged to {@code buildDataStream}
     * @return the started streaming query
     */
    public StreamingQuery run(boolean z) {
        DataStreamWriter<KVStore.PutRequest> writer = buildDataStream(z);
        return writer.start();
    }

    /**
     * Compiler-generated accessor for the default value of the first parameter of {@code run}.
     *
     * @return {@code false}
     */
    public boolean run$default$1() {
        final boolean defaultValue = false;
        return defaultValue;
    }

    /**
     * Assembles the streaming write pipeline for this GroupBy without starting it.
     *
     * <p>Steps, in order: look up the serving info and stream decoder from {@code onlineImpl};
     * decode the raw input bytes into {@code Mutation}s; expose their before/after rows as a
     * temp view; run the GroupBy's setup statements and the query from
     * {@code buildStreamingQuery}; convert every result row into a {@code KVStore.PutRequest}
     * (encoded key bytes, encoded value bytes, streaming dataset name, timestamp); and attach
     * an append-mode, continuous-trigger {@code foreach} sink backed by a {@code DataWriter}.
     *
     * @param z forwarded to {@code onlineImpl.buildFetcher}
     * @return the configured writer; call {@code start()} on it to launch the query
     * @throws MatchError if the GroupBy's data model is neither Entities nor Events
     */
    public DataStreamWriter<KVStore.PutRequest> buildDataStream(boolean z) {
        Tuple2 $minus$greater$extension;
        StructType mutationValueChrononSchema;
        // Name of the temp view the decoded stream is registered under: "<cleanName>_stream".
        String sb = new StringBuilder(7).append(Extensions$.MODULE$.MetadataOps(this.groupByConf.metaData).cleanName()).append("_stream").toString();
        // Serving info is looked up by GroupBy name; .get() on the Try rethrows a failed lookup.
        GroupByServingInfoParsed groupByServingInfoParsed = (GroupByServingInfoParsed) ((Try) this.onlineImpl.buildFetcher(z).getGroupByServingInfo().apply(this.groupByConf.getMetaData().getName())).get();
        StreamDecoder streamDecoder = this.onlineImpl.streamDecoder(groupByServingInfoParsed);
        Predef$.MODULE$.assert(Extensions$.MODULE$.GroupByOps(this.groupByConf).streamingSource().isDefined(), () -> {
            return "No streaming source defined in GroupBy. Please set a topic/mutationTopic.";
        });
        Source source = (Source) Extensions$.MODULE$.GroupByOps(this.groupByConf).streamingSource().get();
        String buildStreamingQuery = buildStreamingQuery(sb);
        Metrics.Context apply = Metrics$Context$.MODULE$.apply(Metrics$Environment$.MODULE$.GroupByStreaming(), this.groupByConf);
        Metrics.Context withSuffix = apply.withSuffix("ingress");
        // Decode each input record. Row count and byte size are reported per record; a record that
        // fails to decode is logged, counted as an exception and mapped to null, which the
        // following filter removes.
        Dataset filter = this.inputStream.as(this.session.implicits().newByteArrayEncoder()).map(bArr -> {
            withSuffix.increment(Metrics$Name$.MODULE$.RowCount());
            withSuffix.count(Metrics$Name$.MODULE$.Bytes(), bArr.length);
            try {
                return streamDecoder.decode(bArr);
            } catch (Throwable th) {
                this.logger().info(new StringBuilder(0).append(new StringBuilder(48).append("Error while decoding streaming events for ").append(this.groupByConf.getMetaData().getName()).append(" with ").toString()).append(new StringBuilder(7).append("schema ").append(ai.chronon.online.Extensions$.MODULE$.ChrononStructTypeOps(streamDecoder.schema()).catalogString()).toString()).append(new StringBuilder(2).append(" \n").append(Extensions$.MODULE$.ThrowableOps(th).traceString()).toString()).toString());
                withSuffix.incrementException(th);
                return null;
            }
        }, Encoders$.MODULE$.kryo(ClassTag$.MODULE$.apply(Mutation.class))).filter(mutation -> {
            return BoxesRunTime.boxToBoolean($anonfun$buildDataStream$3(mutation));
        });
        org.apache.spark.sql.types.StructType fromChrononSchema = SparkConversions$.MODULE$.fromChrononSchema(streamDecoder.schema());
        logger().info(StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString(new StringBuilder(157).append("\n        | group by serving info: ").append(groupByServingInfoParsed).append("\n        | Streaming source: ").append(source).append("\n        | streaming Query: ").append(buildStreamingQuery).append("\n        | streaming dataset: ").append(Extensions$.MODULE$.GroupByOps(this.groupByConf).streamingDataset()).append("\n        | stream schema: ").append(fromChrononSchema).append("\n        |").toString())));
        // Emit the non-null after/before images of each mutation as Spark rows and register them
        // as the temp view that the streaming query selects from.
        filter.flatMap(mutation2 -> {
            return (Seq) ((IterableOps) package$.MODULE$.Seq().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Object[]{mutation2.after(), mutation2.before()})).filter(objArr -> {
                return BoxesRunTime.boxToBoolean($anonfun$buildDataStream$5(objArr));
            })).map(objArr2 -> {
                return (Row) SparkConversions$.MODULE$.toSparkRow(objArr2, streamDecoder.schema(), GenericRowHandler$.MODULE$.func());
            });
        }, RowEncoder$.MODULE$.apply(fromChrononSchema)).createOrReplaceTempView(sb);
        // Run the GroupBy's setup statements before the streaming query itself.
        Extensions$.MODULE$.GroupByOps(this.groupByConf).setups().foreach(str -> {
            return this.session.sql(str);
        });
        Dataset sql = this.session.sql(buildStreamingQuery);
        Predef$.MODULE$.assert(ArrayOps$.MODULE$.contains$extension(Predef$.MODULE$.refArrayOps(sql.schema().fieldNames()), Constants$.MODULE$.TimeColumn()), () -> {
            return new StringBuilder(44).append("time column ").append(Constants$.MODULE$.TimeColumn()).append(" must be included in the selects").toString();
        });
        Enumeration.Value dataModel = Extensions$.MODULE$.GroupByOps(this.groupByConf).dataModel();
        Enumeration.Value Entities = DataModel$.MODULE$.Entities();
        // Entities additionally require the mutation time column in the query output.
        if (dataModel != null ? dataModel.equals(Entities) : Entities == null) {
            Predef$.MODULE$.assert(ArrayOps$.MODULE$.contains$extension(Predef$.MODULE$.refArrayOps(sql.schema().fieldNames()), Constants$.MODULE$.MutationTimeColumn()), () -> {
                return "Required Mutation ts";
            });
        }
        // iArr: positions of the key columns within the query's output schema.
        String[] strArr = (String[]) ((IterableOnceOps) JavaConverters$.MODULE$.asScalaBufferConverter(this.groupByConf.keyColumns).asScala()).toArray(ClassTag$.MODULE$.apply(String.class));
        ArrayOps$ arrayOps$ = ArrayOps$.MODULE$;
        Object refArrayOps = Predef$.MODULE$.refArrayOps(strArr);
        org.apache.spark.sql.types.StructType schema = sql.schema();
        int[] iArr = (int[]) arrayOps$.map$extension(refArrayOps, str2 -> {
            return BoxesRunTime.boxToInteger(schema.fieldIndex(str2));
        }, ClassTag$.MODULE$.Int());
        // Pick (extra value columns, timestamp column) by data model:
        // Events -> (none, TimeColumn); Entities -> (MutationAvroColumns, MutationTimeColumn).
        Enumeration.Value dataModel2 = Extensions$.MODULE$.GroupByOps(this.groupByConf).dataModel();
        Enumeration.Value Entities2 = DataModel$.MODULE$.Entities();
        if (Entities2 != null ? !Entities2.equals(dataModel2) : dataModel2 != null) {
            Enumeration.Value Events = DataModel$.MODULE$.Events();
            if (Events != null ? !Events.equals(dataModel2) : dataModel2 != null) {
                throw new MatchError(dataModel2);
            }
            $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(package$.MODULE$.Seq().empty()), Constants$.MODULE$.TimeColumn());
        } else {
            $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(Constants$.MODULE$.MutationAvroColumns()), Constants$.MODULE$.MutationTimeColumn());
        }
        Tuple2 tuple2 = $minus$greater$extension;
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        Tuple2 tuple22 = new Tuple2((Seq) tuple2._1(), (String) tuple2._2());
        Seq seq = (Seq) tuple22._1();
        String str3 = (String) tuple22._2();
        // iArr2: positions of the value columns (aggregation inputs followed by the extra columns).
        String[] strArr2 = (String[]) ArrayOps$.MODULE$.$plus$plus$extension(Predef$.MODULE$.refArrayOps(Extensions$.MODULE$.GroupByOps(this.groupByConf).aggregationInputs()), seq, ClassTag$.MODULE$.apply(String.class));
        ArrayOps$ arrayOps$2 = ArrayOps$.MODULE$;
        Object refArrayOps2 = Predef$.MODULE$.refArrayOps(strArr2);
        org.apache.spark.sql.types.StructType schema2 = sql.schema();
        int[] iArr2 = (int[]) arrayOps$2.map$extension(refArrayOps2, str4 -> {
            return BoxesRunTime.boxToInteger(schema2.fieldIndex(str4));
        }, ClassTag$.MODULE$.Int());
        // Position of the timestamp column chosen above.
        int fieldIndex = sql.schema().fieldIndex(str3);
        String streamingDataset = Extensions$.MODULE$.GroupByOps(this.groupByConf).streamingDataset();
        StructType keyChrononSchema = groupByServingInfoParsed.keyChrononSchema();
        // Value schema by data model: Events -> valueChrononSchema, Entities -> mutationValueChrononSchema.
        Enumeration.Value dataModel3 = Extensions$.MODULE$.GroupByOps(this.groupByConf).dataModel();
        Enumeration.Value Events2 = DataModel$.MODULE$.Events();
        if (Events2 != null ? !Events2.equals(dataModel3) : dataModel3 != null) {
            Enumeration.Value Entities3 = DataModel$.MODULE$.Entities();
            if (Entities3 != null ? !Entities3.equals(dataModel3) : dataModel3 != null) {
                throw new MatchError(dataModel3);
            }
            mutationValueChrononSchema = groupByServingInfoParsed.mutationValueChrononSchema();
        } else {
            mutationValueChrononSchema = groupByServingInfoParsed.valueChrononSchema();
        }
        Function1 encodeBytes = AvroConversions$.MODULE$.encodeBytes(keyChrononSchema, GenericRowHandler$.MODULE$.func());
        Function1 encodeBytes2 = AvroConversions$.MODULE$.encodeBytes(mutationValueChrononSchema, GenericRowHandler$.MODULE$.func());
        // NOTE(review): the meaning of the literal 120 passed to DataWriter is not visible here - confirm against DataWriter.
        DataWriter dataWriter = new DataWriter(this.onlineImpl, apply.withSuffix("egress"), 120, this.debug);
        // Row -> PutRequest: pull key and value cells by position, read the timestamp as a long,
        // and encode keys and values with their respective encoders.
        Function1 function1 = row -> {
            Object[] objArr = (Object[]) ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.intArrayOps(iArr), obj -> {
                return row.get(BoxesRunTime.unboxToInt(obj));
            }, ClassTag$.MODULE$.Any());
            Object[] objArr2 = (Object[]) ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.intArrayOps(iArr2), obj2 -> {
                return row.get(BoxesRunTime.unboxToInt(obj2));
            }, ClassTag$.MODULE$.Any());
            long unboxToLong = BoxesRunTime.unboxToLong(row.get(fieldIndex));
            byte[] bArr2 = (byte[]) encodeBytes.apply(objArr);
            byte[] bArr3 = (byte[]) encodeBytes2.apply(objArr2);
            // Debug mode logs the row as JSON, the encoded bytes as Base64, and the timestamp
            // formatted in both UTC and America/Los_Angeles.
            if (this.debug) {
                Gson gson = new Gson();
                this.logger().info(StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString(new StringBuilder(196).append("\n               |streaming dataset: ").append(streamingDataset).append("\n               |keys: ").append(gson.toJson(objArr)).append("\n               |values: ").append(gson.toJson(objArr2)).append("\n               |keyBytes: ").append(Base64.getEncoder().encodeToString(bArr2)).append("\n               |valueBytes: ").append(Base64.getEncoder().encodeToString(bArr3)).append("\n               |ts: ").append(unboxToLong).append("  |  UTC: ").append(DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC)).format(Instant.ofEpochMilli(unboxToLong))).append(" | PST: ").append(DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("America/Los_Angeles")).format(Instant.ofEpochMilli(unboxToLong))).append("\n               |").toString())));
            }
            return new KVStore.PutRequest(bArr2, bArr3, streamingDataset, Option$.MODULE$.apply(BoxesRunTime.boxToLong(unboxToLong)));
        };
        SparkSession$implicits$ implicits = this.session.implicits();
        TypeTags universe = scala.reflect.runtime.package$.MODULE$.universe();
        // NOTE(review): the null `groupBy` handed to the anonymous TypeCreator looks like a decompiler
        // artifact of the compiler-generated type tag for KVStore.PutRequest - confirm before editing.
        final GroupBy groupBy = null;
        // Append output mode, continuous trigger of DurationInt(2).minute(), rows sunk through dataWriter.
        return sql.map(function1, implicits.newProductEncoder(universe.TypeTag().apply(scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(GroupBy.class.getClassLoader()), new TypeCreator(groupBy) { // from class: ai.chronon.spark.streaming.GroupBy$$typecreator14$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe2 = mirror.universe();
                return universe2.internal().reificationSupport().TypeRef(universe2.internal().reificationSupport().SingleType(universe2.internal().reificationSupport().ThisType(mirror.staticPackage("ai.chronon.online").asModule().moduleClass()), mirror.staticModule("ai.chronon.online.KVStore")), mirror.staticClass("ai.chronon.online.KVStore.PutRequest"), Nil$.MODULE$);
            }
        }))).writeStream().outputMode("append").trigger(Trigger.Continuous(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(2)).minute())).foreach(dataWriter);
    }

    /**
     * Compiler-generated accessor for the default value of the first parameter of
     * {@code buildDataStream}.
     *
     * @return {@code false}
     */
    public boolean buildDataStream$default$1() {
        final boolean defaultValue = false;
        return defaultValue;
    }

    /**
     * Filter predicate applied to decoded mutations in {@code buildDataStream}.
     *
     * <p>Rejects {@code null} mutations and mutations whose before and after images are both
     * present and contain the same elements; keeps everything else.
     *
     * @param mutation the decoded mutation, possibly {@code null}
     * @return {@code true} if the mutation should be kept
     */
    public static final /* synthetic */ boolean $anonfun$buildDataStream$3(Mutation mutation) {
        if (mutation == null) {
            return false;
        }
        if (mutation.before() == null) {
            return true;
        }
        if (mutation.after() == null) {
            return true;
        }
        // Both images present: keep only when they differ element-wise.
        boolean unchanged = Predef$.MODULE$.genericWrapArray(mutation.before()).sameElements(Predef$.MODULE$.genericWrapArray(mutation.after()));
        return !unchanged;
    }

    /**
     * Filter predicate applied to a mutation's after/before images in {@code buildDataStream}.
     *
     * @param objArr one row image, possibly {@code null}
     * @return {@code true} when the image is present (non-null)
     */
    public static final /* synthetic */ boolean $anonfun$buildDataStream$5(Object[] objArr) {
        if (objArr == null) {
            return false;
        }
        return true;
    }

    /**
     * Creates a streaming GroupBy job description; no work is done until
     * {@code run} or {@code buildDataStream} is called.
     *
     * @param dataset      input dataset to read the stream from
     * @param sparkSession Spark session used to run SQL
     * @param groupBy      GroupBy configuration
     * @param api          online API implementation
     * @param z            debug flag; enables per-row logging while writing
     */
    public GroupBy(Dataset<Row> dataset, SparkSession sparkSession, ai.chronon.api.GroupBy groupBy, Api api, boolean z) {
        // Plain field captures; the assignments are independent of one another.
        debug = z;
        onlineImpl = api;
        groupByConf = groupBy;
        session = sparkSession;
        inputStream = dataset;
    }
}
