package ai.chronon.spark.streaming;

import ai.chronon.api.Constants$;
import ai.chronon.api.DataModel$;
import ai.chronon.api.DataType;
import ai.chronon.api.Extensions;
import ai.chronon.api.Extensions$;
import ai.chronon.api.JoinSource;
import ai.chronon.api.Query;
import ai.chronon.api.Source;
import ai.chronon.api.StructType;
import ai.chronon.online.Api;
import ai.chronon.online.AvroConversions$;
import ai.chronon.online.DataStream;
import ai.chronon.online.Fetcher;
import ai.chronon.online.Fetcher$;
import ai.chronon.online.Fetcher$Request$;
import ai.chronon.online.GroupByServingInfoParsed;
import ai.chronon.online.KVStore;
import ai.chronon.online.Metrics;
import ai.chronon.online.Metrics$Context$;
import ai.chronon.online.Metrics$Environment$;
import ai.chronon.online.Metrics$Name$;
import ai.chronon.online.Mutation;
import ai.chronon.online.SparkConversions$;
import ai.chronon.online.StreamBuilder;
import ai.chronon.online.StreamDecoder;
import ai.chronon.online.TopicInfo;
import ai.chronon.online.TopicInfo$;
import ai.chronon.spark.GenericRowHandler$;
import ai.chronon.spark.TableUtils;
import ai.chronon.spark.streaming.JoinSourceRunner;
import com.google.gson.Gson;
import java.io.Serializable;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder$;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.BooleanType$;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType$;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Enumeration;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Product;
import scala.Some;
import scala.Tuple2;
import scala.collection.ArrayOps$;
import scala.collection.IterableOnce;
import scala.collection.IterableOnceOps;
import scala.collection.Iterator;
import scala.collection.SeqOps;
import scala.collection.StringOps$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Seq;
import scala.concurrent.Await$;
import scala.concurrent.duration.Cpackage;
import scala.math.Ordering$Long$;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyBoolean;
import scala.runtime.ScalaRunTime$;
import scala.runtime.Statics;
import scala.util.ScalaJavaConversions$;

/* compiled from: JoinSourceRunner.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0011-b\u0001\u0002:t\u0001qD!\"a\b\u0001\u0005\u0003\u0005\u000b\u0011BA\u0011\u0011)\ti\u0003\u0001B\u0001B\u0003%\u0011q\u0006\u0005\u000b\u0003\u000b\u0002!\u0011!Q\u0001\n\u0005\u001d\u0003BCA'\u0001\t\u0005\t\u0015!\u0003\u0002P!Q\u0011Q\u000b\u0001\u0003\u0002\u0003\u0006Y!a\u0016\t\u0015\u00055\u0004A!A!\u0002\u0017\ty\u0007C\u0004\u0002|\u0001!\t!! \t\u0015\u0005E\u0005\u0001#b\u0001\n\u0007\t\u0019\nC\u0005\u0002*\u0002\u0011\r\u0011\"\u0001\u0002,\"A\u00111\u0018\u0001!\u0002\u0013\tiK\u0002\u0004\u0002>\u0002!\u0015q\u0018\u0005\u000b\u0003\u000f\\!Q3A\u0005\u0002\u0005%\u0007BCAl\u0017\tE\t\u0015!\u0003\u0002L\"Q\u0011\u0011\\\u0006\u0003\u0016\u0004%\t!!3\t\u0015\u0005m7B!E!\u0002\u0013\tY\r\u0003\u0006\u0002^.\u0011)\u001a!C\u0001\u0003\u0013D!\"a8\f\u0005#\u0005\u000b\u0011BAf\u0011)\t\to\u0003BK\u0002\u0013\u0005\u0011\u0011\u001a\u0005\u000b\u0003G\\!\u0011#Q\u0001\n\u0005-\u0007bBA>\u0017\u0011\u0005\u0011Q\u001d\u0005\n\u0003g\\\u0011\u0011!C\u0001\u0003kD\u0011\"a@\f#\u0003%\tA!\u0001\t\u0013\t]1\"%A\u0005\u0002\t\u0005\u0001\"\u0003B\r\u0017E\u0005I\u0011\u0001B\u0001\u0011%\u0011YbCI\u0001\n\u0003\u0011\t\u0001C\u0005\u0003\u001e-\t\t\u0011\"\u0011\u0003 !I!qF\u0006\u0002\u0002\u0013\u0005!\u0011\u0007\u0005\n\u0005gY\u0011\u0011!C\u0001\u0005kA\u0011B!\u0011\f\u0003\u0003%\tEa\u0011\t\u0013\tE3\"!A\u0005\u0002\tM\u0003\"\u0003B,\u0017\u0005\u0005I\u0011\tB-\u0011%\u0011ifCA\u0001\n\u0003\u0012y\u0006C\u0005\u0003b-\t\t\u0011\"\u0011\u0003d!I!QM\u0006\u0002\u0002\u0013\u0005#qM\u0004\n\u0005W\u0002\u0011\u0011!E\u0005\u0005[2\u0011\"!0\u0001\u0003\u0003EIAa\u001c\t\u000f\u0005mD\u0005\"\u0001\u0003\b\"I!\u0011\r\u0013\u0002\u0002\u0013\u0015#1\r\u0005\n\u0005\u0013#\u0013\u0011!CA\u0005\u0017C\u0011B!&%\u0003\u0003%\tIa&\t\u0013\t%\u0006A1A\u0005\u0002\t-\u0006\u0002\u0003BY\u0001\u0001\u0006IA!,\t\u0019\tM\u0006\u0001%A\u0001\u0004\u0003\u0006IA!.\t\u0013\t\u001d\u0007A1A\u0005\u0002\t%\u0007\u0002\u0003Bf\u0001\u0001\u0006IAa/\t\u0013\t5\u0007A1A\u0005\u0002\t=\u0007\u0002\u0003Bi\u0001\u0001\u0006I!a\u0010\t\u0013\tM\u0007A1A\u0005\u0002\tU\u0007\u0002\u0003Bo\u0001\u0001\u0006IAa6\t\u0013\t}\u0007A1A\u0005\u0002\tU\u0007\u0002\u0003Bq\u0001\u0001\u0006IAa6\t\u000f\t\r\b\u0001\"\u0003\u0003f\"I!q\u001e\u0001C\u0002\u0013%!\u0011\u001f\u0005\t\u0005g\u0004\u0001\u0015!\u0003\u0002H!I!Q\u001f\u0001C\u0002\u0013%!q\u001f\u0005\t\u0005\u007f\u0004\u0001\u0015!\u0003\u0003z\"I1\u0011\u0001\u0001C\u0002\u0013%!\u0011\u0007\u0005\t\u0007\u0007\u0001\u0001\u0015!\u0003\u0002P!I1Q\u0001\u0001C\u0002\u0013%!\u0011\u0007\u0005\t\u0007\u000f\u0001\u0001\u0015!\u0003\u0002P!I1\u0011\u0002\u0001C\u0002\u0013%!\u0011\u0007\u0005\t\u0007\u0017\u0001\u0001\u0015!\u0003\u0002P\u001911Q\u0002\u0001E\u0007\u001fA!b!\u0005@\u0005+\u0007I\u0011AAe\u0011)\u0019\u0019b\u0010B\tB\u0003%\u00111\u001a\u0005\b\u0003wzD\u0011AB\u000b\u0011)\t\tj\u0010EC\u0002\u0013\r\u00111\u0013\u0005\n\u0007;y$\u0019!C\u0005\u0007?A\u0001ba\t@A\u0003%1\u0011\u0005\u0005\n\u0007Ky$\u0019!C\u0005\u0007?A\u0001ba\n@A\u0003%1\u0011\u0005\u0005\n\u0007Sy$\u0019!C\u0005\u0005cA\u0001ba\u000b@A\u0003%\u0011q\n\u0005\n\u0007[y$\u0019!C\u0005\u0003\u0013D\u0001ba\f@A\u0003%\u00111\u001a\u0005\n\u0007cy$\u0019!C\u0005\u0005WC\u0001ba\r@A\u0003%!Q\u0016\u0005\u000b\u0007ky\u0004R1A\u0005\n\r]\u0002BCB%\u007f!\u0015\r\u0011\"\u0003\u00048!I1QJ C\u0002\u0013%!q\u001a\u0005\t\u0007\u001fz\u0004\u0015!\u0003\u0002@!91\u0011K \u0005\u0002\rM\u0003\"CAz\u007f\u0005\u0005I\u0011AB7\u0011%\typPI\u0001\n\u0003\u0011\t\u0001C\u0005\u0003\u001e}\n\t\u0011\"\u0011\u0003 !I!qF \u0002\u0002\u0013\u0005!\u0011\u0007\u0005\n\u0005gy\u0014\u0011!C\u0001\u0007cB\u0011B!\u0011@\u0003\u0003%\tEa\u0011\t\u0013\tEs(!A\u0005\u0002\rU\u0004\"\u0003B,\u007f\u0005\u0005I\u0011IB=\u0011%\u0011ifPA\u0001\n\u0003\u0012y\u0006C\u0005\u0003b}\n\t\u0011\"\u0011\u0003d!I!QM \u0002\u0002\u0013\u00053QP\u0004\n\u0007\u0003\u0003\u0011\u0011!E\u0005\u0007\u00073\u0011b!\u0004\u0001\u0003\u0003EIa!\"\t\u000f\u0005mt\f\"\u0001\u0004\u000e\"I!\u0011M0\u0002\u0002\u0013\u0015#1\r\u0005\n\u0005\u0013{\u0016\u0011!CA\u0007\u001fC\u0011B!&`\u0003\u0003%\tia%\t\u000f\re\u0005\u0001\"\u0001\u0004\u001c\"91Q\u0016\u0001\u0005\n\r=\u0006bBBZ\u0001\u0011%1Q\u0017\u0005\b\u0007w\u0003A\u0011BB_\u0011\u001d\u0019)\r\u0001C\u0005\u0007\u000fDqaa5\u0001\t\u0013\u0019)\u000eC\u0004\u0004b\u0002!Iaa9\t\u000f\r=\b\u0001\"\u0001\u0004r\"9AQ\u0001\u0001\u0005\u0002\u0011\u001dq!\u0003C\ng\u0006\u0005\t\u0012\u0001C\u000b\r!\u00118/!A\t\u0002\u0011]\u0001bBA>]\u0012\u0005A\u0011\u0004\u0005\n\t7q\u0017\u0013!C\u0001\t;A\u0011\u0002\"\to\u0003\u0003%I\u0001b\t\u0003!){\u0017N\\*pkJ\u001cWMU;o]\u0016\u0014(B\u0001;v\u0003%\u0019HO]3b[&twM\u0003\u0002wo\u0006)1\u000f]1sW*\u0011\u00010_\u0001\bG\"\u0014xN\\8o\u0015\u0005Q\u0018AA1j\u0007\u0001\u0019B\u0001A?\u0002\bA\u0019a0a\u0001\u000e\u0003}T!!!\u0001\u0002\u000bM\u001c\u0017\r\\1\n\u0007\u0005\u0015qP\u0001\u0004B]f\u0014VM\u001a\t\u0005\u0003\u0013\tIB\u0004\u0003\u0002\f\u0005Ua\u0002BA\u0007\u0003'i!!a\u0004\u000b\u0007\u0005E10\u0001\u0004=e>|GOP\u0005\u0003\u0003\u0003I1!a\u0006��\u0003\u001d\u0001\u0018mY6bO\u0016LA!a\u0007\u0002\u001e\ta1+\u001a:jC2L'0\u00192mK*\u0019\u0011qC@\u0002\u0017\u001d\u0014x.\u001e9Cs\u000e{gN\u001a\t\u0005\u0003G\tI#\u0004\u0002\u0002&)\u0019\u0011qE<\u0002\u0007\u0005\u0004\u0018.\u0003\u0003\u0002,\u0005\u0015\"aB$s_V\u0004()_\u0001\u0005G>tg\r\u0005\u0005\u00022\u0005e\u0012qHA \u001d\u0011\t\u0019$!\u000e\u0011\u0007\u00055q0C\u0002\u00028}\fa\u0001\u0015:fI\u00164\u0017\u0002BA\u001e\u0003{\u00111!T1q\u0015\r\t9d \t\u0005\u0003c\t\t%\u0003\u0003\u0002D\u0005u\"AB*ue&tw-A\u0003eK\n,x\rE\u0002\u007f\u0003\u0013J1!a\u0013��\u0005\u001d\u0011un\u001c7fC:\f\u0011\u0002\\1h\u001b&dG.[:\u0011\u0007y\f\t&C\u0002\u0002T}\u00141!\u00138u\u0003\u001d\u0019Xm]:j_:\u0004B!!\u0017\u0002j5\u0011\u00111\f\u0006\u0005\u0003;\ny&A\u0002tc2T1A^A1\u0015\u0011\t\u0019'!\u001a\u0002\r\u0005\u0004\u0018m\u00195f\u0015\t\t9'A\u0002pe\u001eLA!a\u001b\u0002\\\ta1\u000b]1sWN+7o]5p]\u00069\u0011\r]5J[Bd\u0007\u0003BA9\u0003oj!!a\u001d\u000b\u0007\u0005Ut/\u0001\u0004p]2Lg.Z\u0005\u0005\u0003s\n\u0019HA\u0002Ba&\fa\u0001P5oSRtDCCA@\u0003\u0013\u000bY)!$\u0002\u0010R1\u0011\u0011QAC\u0003\u000f\u00032!a!\u0001\u001b\u0005\u0019\bbBA+\u000f\u0001\u000f\u0011q\u000b\u0005\b\u0003[:\u00019AA8\u0011\u001d\tyb\u0002a\u0001\u0003CA\u0011\"!\f\b!\u0003\u0005\r!a\f\t\u000f\u0005\u0015s\u00011\u0001\u0002H!9\u0011QJ\u0004A\u0002\u0005=\u0013A\u00027pO\u001e,'/\u0006\u0002\u0002\u0016B!\u0011qSAO\u001b\t\tIJ\u0003\u0003\u0002\u001c\u0006\u0015\u0014!B:mMRR\u0017\u0002BAP\u00033\u0013a\u0001T8hO\u0016\u0014\bf\u0001\u0005\u0002$B\u0019a0!*\n\u0007\u0005\u001dvPA\u0005ue\u0006t7/[3oi\u000691m\u001c8uKb$XCAAW!\u0011\ty+!.\u000f\t\u0005E\u0014\u0011W\u0005\u0005\u0003g\u000b\u0019(A\u0004NKR\u0014\u0018nY:\n\t\u0005]\u0016\u0011\u0018\u0002\b\u0007>tG/\u001a=u\u0015\u0011\t\u0019,a\u001d\u0002\u0011\r|g\u000e^3yi\u0002\u0012qaU2iK6\f7o\u0005\u0004\f{\u0006\u001d\u0011\u0011\u0019\t\u0004}\u0006\r\u0017bAAc\u007f\n9\u0001K]8ek\u000e$\u0018\u0001\u00057fMR\u001cFO]3b[N\u001b\u0007.Z7b+\t\tY\r\u0005\u0003\u0002N\u0006MWBAAh\u0015\u0011\t\t.a\u0017\u0002\u000bQL\b/Z:\n\t\u0005U\u0017q\u001a\u0002\u000b'R\u0014Xo\u0019;UsB,\u0017!\u00057fMR\u001cFO]3b[N\u001b\u0007.Z7bA\u0005\u0001B.\u001a4u'>,(oY3TG\",W.Y\u0001\u0012Y\u00164GoU8ve\u000e,7k\u00195f[\u0006\u0004\u0013A\u00036pS:\u001c6\r[3nC\u0006Y!n\\5o'\u000eDW-\\1!\u0003AQw.\u001b8T_V\u00148-Z*dQ\u0016l\u0017-A\tk_&t7k\\;sG\u0016\u001c6\r[3nC\u0002\"\"\"a:\u0002l\u00065\u0018q^Ay!\r\tIoC\u0007\u0002\u0001!9\u0011q\u0019\u000bA\u0002\u0005-\u0007bBAm)\u0001\u0007\u00111\u001a\u0005\b\u0003;$\u0002\u0019AAf\u0011\u001d\t\t\u000f\u0006a\u0001\u0003\u0017\fAaY8qsRQ\u0011q]A|\u0003s\fY0!@\t\u0013\u0005\u001dW\u0003%AA\u0002\u0005-\u0007\"CAm+A\u0005\t\u0019AAf\u0011%\ti.\u0006I\u0001\u0002\u0004\tY\rC\u0005\u0002bV\u0001\n\u00111\u0001\u0002L\u0006q1m\u001c9zI\u0011,g-Y;mi\u0012\nTC\u0001B\u0002U\u0011\tYM!\u0002,\u0005\t\u001d\u0001\u0003\u0002B\u0005\u0005'i!Aa\u0003\u000b\t\t5!qB\u0001\nk:\u001c\u0007.Z2lK\u0012T1A!\u0005��\u0003)\tgN\\8uCRLwN\\\u0005\u0005\u0005+\u0011YAA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016\fabY8qs\u0012\"WMZ1vYR$#'\u0001\bd_BLH\u0005Z3gCVdG\u000fJ\u001a\u0002\u001d\r|\u0007/\u001f\u0013eK\u001a\fW\u000f\u001c;%i\u0005i\u0001O]8ek\u000e$\bK]3gSb,\"A!\t\u0011\t\t\r\"QF\u0007\u0003\u0005KQAAa\n\u0003*\u0005!A.\u00198h\u0015\t\u0011Y#\u0001\u0003kCZ\f\u0017\u0002BA\"\u0005K\tA\u0002\u001d:pIV\u001cG/\u0011:jif,\"!a\u0014\u0002\u001dA\u0014x\u000eZ;di\u0016cW-\\3oiR!!q\u0007B\u001f!\rq(\u0011H\u0005\u0004\u0005wy(aA!os\"I!q\b\u000f\u0002\u0002\u0003\u0007\u0011qJ\u0001\u0004q\u0012\n\u0014a\u00049s_\u0012,8\r^%uKJ\fGo\u001c:\u0016\u0005\t\u0015\u0003C\u0002B$\u0005\u001b\u00129$\u0004\u0002\u0003J)\u0019!1J@\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0003\u0003P\t%#\u0001C%uKJ\fGo\u001c:\u0002\u0011\r\fg.R9vC2$B!a\u0012\u0003V!I!q\b\u0010\u0002\u0002\u0003\u0007!qG\u0001\u0013aJ|G-^2u\u000b2,W.\u001a8u\u001d\u0006lW\r\u0006\u0003\u0003\"\tm\u0003\"\u0003B ?\u0005\u0005\t\u0019AA(\u0003!A\u0017m\u001d5D_\u0012,GCAA(\u0003!!xn\u0015;sS:<GC\u0001B\u0011\u0003\u0019)\u0017/^1mgR!\u0011q\tB5\u0011%\u0011yDIA\u0001\u0002\u0004\u00119$A\u0004TG\",W.Y:\u0011\u0007\u0005%HeE\u0003%\u0005c\u0012i\b\u0005\b\u0003t\te\u00141ZAf\u0003\u0017\fY-a:\u000e\u0005\tU$b\u0001B<\u007f\u00069!/\u001e8uS6,\u0017\u0002\u0002B>\u0005k\u0012\u0011#\u00112tiJ\f7\r\u001e$v]\u000e$\u0018n\u001c85!\u0011\u0011yH!\"\u000e\u0005\t\u0005%\u0002\u0002BB\u0005S\t!![8\n\t\u0005m!\u0011\u0011\u000b\u0003\u0005[\nQ!\u00199qYf$\"\"a:\u0003\u000e\n=%\u0011\u0013BJ\u0011\u001d\t9m\na\u0001\u0003\u0017Dq!!7(\u0001\u0004\tY\rC\u0004\u0002^\u001e\u0002\r!a3\t\u000f\u0005\u0005x\u00051\u0001\u0002L\u00069QO\\1qa2LH\u0003\u0002BM\u0005K\u0003RA BN\u0005?K1A!(��\u0005\u0019y\u0005\u000f^5p]BYaP!)\u0002L\u0006-\u00171ZAf\u0013\r\u0011\u0019k \u0002\u0007)V\u0004H.\u001a\u001b\t\u0013\t\u001d\u0006&!AA\u0002\u0005\u001d\u0018a\u0001=%a\u0005aa/\u00197vKj\u001b6\r[3nCV\u0011!Q\u0016\t\u0005\u0003G\u0011y+\u0003\u0003\u0002V\u0006\u0015\u0012!\u0004<bYV,'lU2iK6\f\u0007%A\u0002yII\u0002rA B\\\u0005w\u000by$C\u0002\u0003:~\u0014a\u0001V;qY\u0016\u0014\u0004C\u0002B_\u0005\u0007\fy$\u0004\u0002\u0003@*!!\u0011\u0019B%\u0003%IW.\\;uC\ndW-\u0003\u0003\u0003F\n}&aA*fc\u0006\t\u0012\r\u001a3ji&|g.\u00197D_2,XN\\:\u0016\u0005\tm\u0016AE1eI&$\u0018n\u001c8bY\u000e{G.^7og\u0002\nq\"\u001a<f]R$\u0016.\\3D_2,XN\\\u000b\u0003\u0003\u007f\t\u0001#\u001a<f]R$\u0016.\\3D_2,XN\u001c\u0011\u0002\u0015-,\u0017pQ8mk6t7/\u0006\u0002\u0003XB)aP!7\u0002@%\u0019!1\\@\u0003\u000b\u0005\u0013(/Y=\u0002\u0017-,\u0017pQ8mk6t7\u000fI\u0001\rm\u0006dW/Z\"pYVlgn]\u0001\u000em\u0006dW/Z\"pYVlgn\u001d\u0011\u0002\u000f\u001d,G\u000f\u0015:paR1\u0011q\bBt\u0005WDqA!;5\u0001\u0004\ty$\u0001\u0003qe>\u0004\bb\u0002Bwi\u0001\u0007\u0011qH\u0001\bI\u00164\u0017-\u001e7u\u0003Q)8/Z#wK:$H+[7f\r>\u0014\u0018+^3ssV\u0011\u0011qI\u0001\u0016kN,WI^3oiRKW.\u001a$peF+XM]=!\u00039!\u0018.\\3QKJ\u001cWM\u001c;jY\u0016,\"A!?\u0011\u0007y\u0014Y0C\u0002\u0003~~\u0014a\u0001R8vE2,\u0017a\u0004;j[\u0016\u0004VM]2f]RLG.\u001a\u0011\u0002'5Lg.[7v[F+XM]=EK2\f\u00170T:\u0002)5Lg.[7v[F+XM]=EK2\f\u00170T:!\u00031\tX/\u001a:z'\"Lg\r^'t\u00035\tX/\u001a:z'\"Lg\r^'tA\u0005AR.[2s_\n\u000bGo\u00195J]R,'O^1m\u001b&dG.[:\u000235L7M]8CCR\u001c\u0007.\u00138uKJ4\u0018\r\\'jY2L7\u000f\t\u0002\u0011!V$(+Z9vKN$\b*\u001a7qKJ\u001cbaP?\u0002\b\u0005\u0005\u0017aC5oaV$8k\u00195f[\u0006\fA\"\u001b8qkR\u001c6\r[3nC\u0002\"Baa\u0006\u0004\u001aA\u0019\u0011\u0011^ \t\u000f\rE!\t1\u0001\u0002L\"\u001a1)a)\u0002\u0015-,\u00170\u00138eS\u000e,7/\u0006\u0002\u0004\"A)aP!7\u0002P\u0005Y1.Z=J]\u0012L7-Z:!\u000311\u0018\r\\;f\u0013:$\u0017nY3t\u000351\u0018\r\\;f\u0013:$\u0017nY3tA\u00059Ao]%oI\u0016D\u0018\u0001\u0003;t\u0013:$W\r\u001f\u0011\u0002\u001d-,\u0017p\u00159be.\u001c6\r[3nC\u0006y1.Z=Ta\u0006\u00148nU2iK6\f\u0007%A\u0005lKf\u001c6\r[3nC\u0006Q1.Z=TG\",W.\u0019\u0011\u0002\u0015-,\u0017\u0010V8CsR,7/\u0006\u0002\u0004:A9apa\u000f\u00038\r}\u0012bAB\u001f\u007f\nIa)\u001e8di&|g.\r\t\u0006}\ne7\u0011\t\t\u0004}\u000e\r\u0013bAB#\u007f\n!!)\u001f;fQ\rq\u00151U\u0001\rm\u0006dW/\u001a+p\u0005f$Xm\u001d\u0015\u0004\u001f\u0006\r\u0016\u0001E:ue\u0016\fW.\u001b8h\t\u0006$\u0018m]3u\u0003E\u0019HO]3b[&tw\rR1uCN,G\u000fI\u0001\ri>\u0004V\u000f\u001e*fcV,7\u000f\u001e\u000b\u0005\u0007+\u001a\u0019\u0007\u0005\u0003\u0004X\ruc\u0002BA9\u00073JAaa\u0017\u0002t\u000591JV*u_J,\u0017\u0002BB0\u0007C\u0012!\u0002U;u%\u0016\fX/Z:u\u0015\u0011\u0019Y&a\u001d\t\u000f\r\u0015$\u000b1\u0001\u0004h\u0005)\u0011N\u001c9viB!\u0011\u0011LB5\u0013\u0011\u0019Y'a\u0017\u0003\u0007I{w\u000f\u0006\u0003\u0004\u0018\r=\u0004\"CB\t'B\u0005\t\u0019AAf)\u0011\u00119da\u001d\t\u0013\t}r+!AA\u0002\u0005=C\u0003BA$\u0007oB\u0011Ba\u0010Z\u0003\u0003\u0005\rAa\u000e\u0015\t\t\u000521\u0010\u0005\n\u0005\u007fQ\u0016\u0011!a\u0001\u0003\u001f\"B!a\u0012\u0004��!I!qH/\u0002\u0002\u0003\u0007!qG\u0001\u0011!V$(+Z9vKN$\b*\u001a7qKJ\u00042!!;`'\u0015y6q\u0011B?!!\u0011\u0019h!#\u0002L\u000e]\u0011\u0002BBF\u0005k\u0012\u0011#\u00112tiJ\f7\r\u001e$v]\u000e$\u0018n\u001c82)\t\u0019\u0019\t\u0006\u0003\u0004\u0018\rE\u0005bBB\tE\u0002\u0007\u00111\u001a\u000b\u0005\u0007+\u001b9\nE\u0003\u007f\u00057\u000bY\rC\u0005\u0003(\u000e\f\t\u00111\u0001\u0004\u0018\u0005aq.\u001e;qkR\u001c6\r[3nCR11QTBQ\u0007G#B!a3\u0004 \"9\u0011Q\u000b3A\u0004\u0005]\u0003bBB\tI\u0002\u0007\u00111\u001a\u0005\b\u0007K#\u0007\u0019ABT\u0003\u0015\tX/\u001a:z!\u0011\t\u0019c!+\n\t\r-\u0016Q\u0005\u0002\u0006#V,'/_\u0001\fK:\u0014\u0018n\u00195Rk\u0016\u0014\u0018\u0010\u0006\u0003\u0004(\u000eE\u0006bBBSK\u0002\u00071qU\u0001\rEVLG\u000eZ*dQ\u0016l\u0017m\u001d\u000b\u0005\u0003O\u001c9\fC\u0004\u0004:\u001a\u0004\r!a3\u0002\u00151,g\r^*dQ\u0016l\u0017-\u0001\ttKJ4\u0018N\\4J]\u001a|\u0007K]8ysV\u00111q\u0018\t\u0005\u0003c\u001a\t-\u0003\u0003\u0004D\u0006M$\u0001G$s_V\u0004()_*feZLgnZ%oM>\u0004\u0016M]:fI\u00061A-Z2pI\u0016$Ba!3\u0004PB!\u0011\u0011OBf\u0013\u0011\u0019i-a\u001d\u0003\u0015\u0011\u000bG/Y*ue\u0016\fW\u000eC\u0004\u0004R\"\u0004\ra!3\u0002\u0015\u0011\fG/Y*ue\u0016\fW.A\u000bj]R,'O\\1m'R\u0014X-Y7Ck&dG-\u001a:\u0015\t\r]7Q\u001c\t\u0005\u0003c\u001aI.\u0003\u0003\u0004\\\u0006M$!D*ue\u0016\fWNQ;jY\u0012,'\u000fC\u0004\u0004`&\u0004\r!a\u0010\u0002\u0015M$(/Z1n)f\u0004X-A\u0006ck&dGm\u0015;sK\u0006lG\u0003BBe\u0007KDqaa:k\u0001\u0004\u0019I/A\u0003u_BL7\r\u0005\u0003\u0002r\r-\u0018\u0002BBw\u0003g\u0012\u0011\u0002V8qS\u000eLeNZ8\u0002\u0015A,'oY3oi&dW\r\u0006\u0004\u0004t\u000emH\u0011\u0001\t\u0006}\nm5Q\u001f\t\u0004}\u000e]\u0018bAB}\u007f\n!Aj\u001c8h\u0011\u001d\u0019ip\u001ba\u0001\u0007\u007f\f1!\u0019:s!\u0015q(\u0011\\B{\u0011\u001d!\u0019a\u001ba\u0001\u0005s\f\u0011\u0001]\u0001\u0016G\"\f\u0017N\\3e'R\u0014X-Y7j]\u001e\fV/\u001a:z+\t!I\u0001\u0005\u0004\u0005\f\u0011=1qM\u0007\u0003\t\u001bQ1\u0001^A.\u0013\u0011!\t\u0002\"\u0004\u0003!\u0011\u000bG/Y*ue\u0016\fWn\u0016:ji\u0016\u0014\u0018\u0001\u0005&pS:\u001cv.\u001e:dKJ+hN\\3s!\r\t\u0019I\\\n\u0005]v\u0014i\b\u0006\u0002\u0005\u0016\u0005YB\u0005\\3tg&t\u0017\u000e\u001e\u0013he\u0016\fG/\u001a:%I\u00164\u0017-\u001e7uII*\"\u0001b\b+\t\u0005=\"QA\u0001\roJLG/\u001a*fa2\f7-\u001a\u000b\u0003\tK\u0001BAa\t\u0005(%!A\u0011\u0006B\u0013\u0005\u0019y%M[3di\u0002")
/* loaded from: input_file:ai/chronon/spark/streaming/JoinSourceRunner.class */
public class JoinSourceRunner implements Serializable {
    private transient Logger logger;
    private volatile JoinSourceRunner$Schemas$ Schemas$module;
    private volatile JoinSourceRunner$PutRequestHelper$ PutRequestHelper$module;
    public final ai.chronon.api.GroupBy ai$chronon$spark$streaming$JoinSourceRunner$$groupByConf;
    private final Map<String, String> conf;
    public final boolean ai$chronon$spark$streaming$JoinSourceRunner$$debug;
    private final int lagMillis;
    private final SparkSession session;
    public final Api ai$chronon$spark$streaming$JoinSourceRunner$$apiImpl;
    private final Metrics.Context context;
    private final StructType valueZSchema;
    private final /* synthetic */ Tuple2 x$2;
    private final Seq<String> additionalColumns;
    private final String eventTimeColumn;
    private final String[] keyColumns;
    private final String[] valueColumns;
    private final boolean ai$chronon$spark$streaming$JoinSourceRunner$$useEventTimeForQuery;
    private final double ai$chronon$spark$streaming$JoinSourceRunner$$timePercentile;
    private final int ai$chronon$spark$streaming$JoinSourceRunner$$minimumQueryDelayMs;
    private final int ai$chronon$spark$streaming$JoinSourceRunner$$queryShiftMs;
    private final int microBatchIntervalMillis;
    private volatile transient boolean bitmap$trans$0;

    /* JADX INFO: Access modifiers changed from: private */
    /* compiled from: JoinSourceRunner.scala */
    /* loaded from: input_file:ai/chronon/spark/streaming/JoinSourceRunner$PutRequestHelper.class */
    public class PutRequestHelper implements Serializable, Product {
        private transient Logger logger;
        private transient Function1<Object, byte[]> keyToBytes;
        private transient Function1<Object, byte[]> valueToBytes;
        private final org.apache.spark.sql.types.StructType inputSchema;
        private final int[] keyIndices;
        private final int[] valueIndices;
        private final int tsIndex;
        private final org.apache.spark.sql.types.StructType keySparkSchema;
        private final StructType keySchema;
        private final String streamingDataset;
        private volatile transient byte bitmap$trans$0;
        public final /* synthetic */ JoinSourceRunner $outer;

        @Override // scala.Product
        public Iterator<String> productElementNames() {
            Iterator<String> productElementNames;
            productElementNames = productElementNames();
            return productElementNames;
        }

        public org.apache.spark.sql.types.StructType inputSchema() {
            return this.inputSchema;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v0 */
        /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
        /* JADX WARN: Type inference failed for: r0v10, types: [ai.chronon.spark.streaming.JoinSourceRunner$PutRequestHelper] */
        private Logger logger$lzycompute() {
            ?? r0 = this;
            synchronized (r0) {
                if (((byte) (this.bitmap$trans$0 & 1)) == 0) {
                    this.logger = LoggerFactory.getLogger(getClass());
                    r0 = this;
                    r0.bitmap$trans$0 = (byte) (this.bitmap$trans$0 | 1);
                }
            }
            return this.logger;
        }

        public Logger logger() {
            return ((byte) (this.bitmap$trans$0 & 1)) == 0 ? logger$lzycompute() : this.logger;
        }

        private int[] keyIndices() {
            return this.keyIndices;
        }

        private int[] valueIndices() {
            return this.valueIndices;
        }

        private int tsIndex() {
            return this.tsIndex;
        }

        private org.apache.spark.sql.types.StructType keySparkSchema() {
            return this.keySparkSchema;
        }

        private StructType keySchema() {
            return this.keySchema;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v0 */
        /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
        /* JADX WARN: Type inference failed for: r0v10, types: [ai.chronon.spark.streaming.JoinSourceRunner$PutRequestHelper] */
        private Function1<Object, byte[]> keyToBytes$lzycompute() {
            ?? r0 = this;
            synchronized (r0) {
                if (((byte) (this.bitmap$trans$0 & 2)) == 0) {
                    this.keyToBytes = AvroConversions$.MODULE$.encodeBytes(keySchema(), GenericRowHandler$.MODULE$.func());
                    r0 = this;
                    r0.bitmap$trans$0 = (byte) (this.bitmap$trans$0 | 2);
                }
            }
            return this.keyToBytes;
        }

        private Function1<Object, byte[]> keyToBytes() {
            return ((byte) (this.bitmap$trans$0 & 2)) == 0 ? keyToBytes$lzycompute() : this.keyToBytes;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v0 */
        /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
        /* JADX WARN: Type inference failed for: r0v10, types: [ai.chronon.spark.streaming.JoinSourceRunner$PutRequestHelper] */
        private Function1<Object, byte[]> valueToBytes$lzycompute() {
            ?? r0 = this;
            synchronized (r0) {
                if (((byte) (this.bitmap$trans$0 & 4)) == 0) {
                    this.valueToBytes = AvroConversions$.MODULE$.encodeBytes(ai$chronon$spark$streaming$JoinSourceRunner$PutRequestHelper$$$outer().valueZSchema(), GenericRowHandler$.MODULE$.func());
                    r0 = this;
                    r0.bitmap$trans$0 = (byte) (this.bitmap$trans$0 | 4);
                }
            }
            return this.valueToBytes;
        }

        private Function1<Object, byte[]> valueToBytes() {
            return ((byte) (this.bitmap$trans$0 & 4)) == 0 ? valueToBytes$lzycompute() : this.valueToBytes;
        }

        private String streamingDataset() {
            return this.streamingDataset;
        }

        public KVStore.PutRequest toPutRequest(Row row) {
            Object[] objArr = (Object[]) ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.intArrayOps(keyIndices()), obj -> {
                return row.get(BoxesRunTime.unboxToInt(obj));
            }, ClassTag$.MODULE$.Any());
            Object[] objArr2 = (Object[]) ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.intArrayOps(valueIndices()), obj2 -> {
                return row.get(BoxesRunTime.unboxToInt(obj2));
            }, ClassTag$.MODULE$.Any());
            ai$chronon$spark$streaming$JoinSourceRunner$PutRequestHelper$$$outer().context().distribution(Metrics$Name$.MODULE$.PutKeyNullPercent(), (ArrayOps$.MODULE$.count$extension(Predef$.MODULE$.genericArrayOps(objArr), obj3 -> {
                return BoxesRunTime.boxToBoolean($anonfun$toPutRequest$3(obj3));
            }) * 100) / objArr.length);
            ai$chronon$spark$streaming$JoinSourceRunner$PutRequestHelper$$$outer().context().distribution(Metrics$Name$.MODULE$.PutValueNullPercent(), (ArrayOps$.MODULE$.count$extension(Predef$.MODULE$.genericArrayOps(objArr2), obj4 -> {
                return BoxesRunTime.boxToBoolean($anonfun$toPutRequest$4(obj4));
            }) * 100) / objArr2.length);
            long unboxToLong = BoxesRunTime.unboxToLong(row.get(tsIndex()));
            byte[] mo2005apply = keyToBytes().mo2005apply(objArr);
            byte[] mo2005apply2 = valueToBytes().mo2005apply(objArr2);
            if (ai$chronon$spark$streaming$JoinSourceRunner$PutRequestHelper$$$outer().ai$chronon$spark$streaming$JoinSourceRunner$$debug) {
                Gson gson = new Gson();
                logger().info(StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString(new StringBuilder(169).append("\n             |dataset: ").append(streamingDataset()).append("\n             |keys: ").append(gson.toJson(objArr)).append("\n             |values: ").append(gson.toJson(objArr2)).append("\n             |keyBytes: ").append(Base64.getEncoder().encodeToString(mo2005apply)).append("\n             |valueBytes: ").append(Base64.getEncoder().encodeToString(mo2005apply2)).append("\n             |ts: ").append(unboxToLong).append("|  UTC: ").append(DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC)).format(Instant.ofEpochMilli(unboxToLong))).append("| PST: ").append(DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("America/Los_Angeles")).format(Instant.ofEpochMilli(unboxToLong))).append("\n             |").toString())));
            }
            return new KVStore.PutRequest(mo2005apply, mo2005apply2, streamingDataset(), Option$.MODULE$.apply(BoxesRunTime.boxToLong(unboxToLong)));
        }

        public PutRequestHelper copy(org.apache.spark.sql.types.StructType structType) {
            return new PutRequestHelper(ai$chronon$spark$streaming$JoinSourceRunner$PutRequestHelper$$$outer(), structType);
        }

        public org.apache.spark.sql.types.StructType copy$default$1() {
            return inputSchema();
        }

        @Override // scala.Product
        public String productPrefix() {
            return "PutRequestHelper";
        }

        @Override // scala.Product
        public int productArity() {
            return 1;
        }

        @Override // scala.Product
        public Object productElement(int i) {
            switch (i) {
                case 0:
                    return inputSchema();
                default:
                    return Statics.ioobe(i);
            }
        }

        @Override // scala.Product
        public Iterator<Object> productIterator() {
            return ScalaRunTime$.MODULE$.typedProductIterator(this);
        }

        @Override // scala.Equals
        public boolean canEqual(Object obj) {
            return obj instanceof PutRequestHelper;
        }

        @Override // scala.Product
        public String productElementName(int i) {
            switch (i) {
                case 0:
                    return "inputSchema";
                default:
                    return (String) Statics.ioobe(i);
            }
        }

        public int hashCode() {
            return ScalaRunTime$.MODULE$._hashCode(this);
        }

        public String toString() {
            return ScalaRunTime$.MODULE$._toString(this);
        }

        @Override // scala.Equals
        public boolean equals(Object obj) {
            boolean z;
            if (this != obj) {
                if ((obj instanceof PutRequestHelper) && ((PutRequestHelper) obj).ai$chronon$spark$streaming$JoinSourceRunner$PutRequestHelper$$$outer() == ai$chronon$spark$streaming$JoinSourceRunner$PutRequestHelper$$$outer()) {
                    PutRequestHelper putRequestHelper = (PutRequestHelper) obj;
                    org.apache.spark.sql.types.StructType inputSchema = inputSchema();
                    org.apache.spark.sql.types.StructType inputSchema2 = putRequestHelper.inputSchema();
                    if (inputSchema != null ? inputSchema.equals(inputSchema2) : inputSchema2 == null) {
                        if (putRequestHelper.canEqual(this)) {
                            z = true;
                            if (!z) {
                            }
                        }
                    }
                    z = false;
                    if (!z) {
                    }
                }
                return false;
            }
            return true;
        }

        public /* synthetic */ JoinSourceRunner ai$chronon$spark$streaming$JoinSourceRunner$PutRequestHelper$$$outer() {
            return this.$outer;
        }

        public static final /* synthetic */ int $anonfun$keyIndices$1(PutRequestHelper putRequestHelper, String str) {
            return putRequestHelper.inputSchema().fieldIndex(str);
        }

        public static final /* synthetic */ int $anonfun$valueIndices$1(PutRequestHelper putRequestHelper, String str) {
            return putRequestHelper.inputSchema().fieldIndex(str);
        }

        public static final /* synthetic */ boolean $anonfun$toPutRequest$3(Object obj) {
            return obj == null;
        }

        public static final /* synthetic */ boolean $anonfun$toPutRequest$4(Object obj) {
            return obj == null;
        }

        public PutRequestHelper(JoinSourceRunner joinSourceRunner, org.apache.spark.sql.types.StructType structType) {
            this.inputSchema = structType;
            if (joinSourceRunner == null) {
                throw null;
            }
            this.$outer = joinSourceRunner;
            Product.$init$(this);
            this.keyIndices = (int[]) ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps(joinSourceRunner.keyColumns()), str -> {
                return BoxesRunTime.boxToInteger($anonfun$keyIndices$1(this, str));
            }, ClassTag$.MODULE$.Int());
            this.valueIndices = (int[]) ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps(joinSourceRunner.valueColumns()), str2 -> {
                return BoxesRunTime.boxToInteger($anonfun$valueIndices$1(this, str2));
            }, ClassTag$.MODULE$.Int());
            this.tsIndex = structType.fieldIndex(joinSourceRunner.eventTimeColumn());
            this.keySparkSchema = new org.apache.spark.sql.types.StructType((StructField[]) ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.intArrayOps(keyIndices()), structType, ClassTag$.MODULE$.apply(StructField.class)));
            this.keySchema = SparkConversions$.MODULE$.toChrononStruct("key", keySparkSchema());
            this.streamingDataset = Extensions$.MODULE$.GroupByOps(joinSourceRunner.ai$chronon$spark$streaming$JoinSourceRunner$$groupByConf).streamingDataset();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* compiled from: JoinSourceRunner.scala */
    /* loaded from: input_file:ai/chronon/spark/streaming/JoinSourceRunner$Schemas.class */
    public class Schemas implements Serializable, Product {
        private final org.apache.spark.sql.types.StructType leftStreamSchema;
        private final org.apache.spark.sql.types.StructType leftSourceSchema;
        private final org.apache.spark.sql.types.StructType joinSchema;
        private final org.apache.spark.sql.types.StructType joinSourceSchema;
        public final /* synthetic */ JoinSourceRunner $outer;

        @Override // scala.Product
        public Iterator<String> productElementNames() {
            Iterator<String> productElementNames;
            productElementNames = productElementNames();
            return productElementNames;
        }

        public org.apache.spark.sql.types.StructType leftStreamSchema() {
            return this.leftStreamSchema;
        }

        public org.apache.spark.sql.types.StructType leftSourceSchema() {
            return this.leftSourceSchema;
        }

        public org.apache.spark.sql.types.StructType joinSchema() {
            return this.joinSchema;
        }

        public org.apache.spark.sql.types.StructType joinSourceSchema() {
            return this.joinSourceSchema;
        }

        public Schemas copy(org.apache.spark.sql.types.StructType structType, org.apache.spark.sql.types.StructType structType2, org.apache.spark.sql.types.StructType structType3, org.apache.spark.sql.types.StructType structType4) {
            return new Schemas(ai$chronon$spark$streaming$JoinSourceRunner$Schemas$$$outer(), structType, structType2, structType3, structType4);
        }

        public org.apache.spark.sql.types.StructType copy$default$1() {
            return leftStreamSchema();
        }

        public org.apache.spark.sql.types.StructType copy$default$2() {
            return leftSourceSchema();
        }

        public org.apache.spark.sql.types.StructType copy$default$3() {
            return joinSchema();
        }

        public org.apache.spark.sql.types.StructType copy$default$4() {
            return joinSourceSchema();
        }

        @Override // scala.Product
        public String productPrefix() {
            return "Schemas";
        }

        @Override // scala.Product
        public int productArity() {
            return 4;
        }

        @Override // scala.Product
        public Object productElement(int i) {
            switch (i) {
                case 0:
                    return leftStreamSchema();
                case 1:
                    return leftSourceSchema();
                case 2:
                    return joinSchema();
                case 3:
                    return joinSourceSchema();
                default:
                    return Statics.ioobe(i);
            }
        }

        @Override // scala.Product
        public Iterator<Object> productIterator() {
            return ScalaRunTime$.MODULE$.typedProductIterator(this);
        }

        @Override // scala.Equals
        public boolean canEqual(Object obj) {
            return obj instanceof Schemas;
        }

        @Override // scala.Product
        public String productElementName(int i) {
            switch (i) {
                case 0:
                    return "leftStreamSchema";
                case 1:
                    return "leftSourceSchema";
                case 2:
                    return "joinSchema";
                case 3:
                    return "joinSourceSchema";
                default:
                    return (String) Statics.ioobe(i);
            }
        }

        public int hashCode() {
            return ScalaRunTime$.MODULE$._hashCode(this);
        }

        public String toString() {
            return ScalaRunTime$.MODULE$._toString(this);
        }

        @Override // scala.Equals
        public boolean equals(Object obj) {
            boolean z;
            if (this != obj) {
                if ((obj instanceof Schemas) && ((Schemas) obj).ai$chronon$spark$streaming$JoinSourceRunner$Schemas$$$outer() == ai$chronon$spark$streaming$JoinSourceRunner$Schemas$$$outer()) {
                    Schemas schemas = (Schemas) obj;
                    org.apache.spark.sql.types.StructType leftStreamSchema = leftStreamSchema();
                    org.apache.spark.sql.types.StructType leftStreamSchema2 = schemas.leftStreamSchema();
                    if (leftStreamSchema != null ? leftStreamSchema.equals(leftStreamSchema2) : leftStreamSchema2 == null) {
                        org.apache.spark.sql.types.StructType leftSourceSchema = leftSourceSchema();
                        org.apache.spark.sql.types.StructType leftSourceSchema2 = schemas.leftSourceSchema();
                        if (leftSourceSchema != null ? leftSourceSchema.equals(leftSourceSchema2) : leftSourceSchema2 == null) {
                            org.apache.spark.sql.types.StructType joinSchema = joinSchema();
                            org.apache.spark.sql.types.StructType joinSchema2 = schemas.joinSchema();
                            if (joinSchema != null ? joinSchema.equals(joinSchema2) : joinSchema2 == null) {
                                org.apache.spark.sql.types.StructType joinSourceSchema = joinSourceSchema();
                                org.apache.spark.sql.types.StructType joinSourceSchema2 = schemas.joinSourceSchema();
                                if (joinSourceSchema != null ? joinSourceSchema.equals(joinSourceSchema2) : joinSourceSchema2 == null) {
                                    if (schemas.canEqual(this)) {
                                        z = true;
                                        if (!z) {
                                        }
                                    }
                                }
                            }
                        }
                    }
                    z = false;
                    if (!z) {
                    }
                }
                return false;
            }
            return true;
        }

        public /* synthetic */ JoinSourceRunner ai$chronon$spark$streaming$JoinSourceRunner$Schemas$$$outer() {
            return this.$outer;
        }

        public Schemas(JoinSourceRunner joinSourceRunner, org.apache.spark.sql.types.StructType structType, org.apache.spark.sql.types.StructType structType2, org.apache.spark.sql.types.StructType structType3, org.apache.spark.sql.types.StructType structType4) {
            this.leftStreamSchema = structType;
            this.leftSourceSchema = structType2;
            this.joinSchema = structType3;
            this.joinSourceSchema = structType4;
            if (joinSourceRunner == null) {
                throw null;
            }
            this.$outer = joinSourceRunner;
            Product.$init$(this);
        }
    }

    private JoinSourceRunner$Schemas$ Schemas() {
        if (this.Schemas$module == null) {
            Schemas$lzycompute$1();
        }
        return this.Schemas$module;
    }

    private JoinSourceRunner$PutRequestHelper$ PutRequestHelper() {
        if (this.PutRequestHelper$module == null) {
            PutRequestHelper$lzycompute$1();
        }
        return this.PutRequestHelper$module;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [ai.chronon.spark.streaming.JoinSourceRunner] */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$trans$0) {
                this.logger = LoggerFactory.getLogger(getClass());
                r0 = this;
                r0.bitmap$trans$0 = true;
            }
        }
        return this.logger;
    }

    public Logger logger() {
        return !this.bitmap$trans$0 ? logger$lzycompute() : this.logger;
    }

    public Metrics.Context context() {
        return this.context;
    }

    public StructType valueZSchema() {
        return this.valueZSchema;
    }

    public Seq<String> additionalColumns() {
        return this.additionalColumns;
    }

    public String eventTimeColumn() {
        return this.eventTimeColumn;
    }

    public String[] keyColumns() {
        return this.keyColumns;
    }

    public String[] valueColumns() {
        return this.valueColumns;
    }

    private String getProp(String str, String str2) {
        return this.session.conf().get(new StringBuilder(27).append("spark.chronon.stream.chain.").append(str).toString(), str2);
    }

    public boolean ai$chronon$spark$streaming$JoinSourceRunner$$useEventTimeForQuery() {
        return this.ai$chronon$spark$streaming$JoinSourceRunner$$useEventTimeForQuery;
    }

    public double ai$chronon$spark$streaming$JoinSourceRunner$$timePercentile() {
        return this.ai$chronon$spark$streaming$JoinSourceRunner$$timePercentile;
    }

    public int ai$chronon$spark$streaming$JoinSourceRunner$$minimumQueryDelayMs() {
        return this.ai$chronon$spark$streaming$JoinSourceRunner$$minimumQueryDelayMs;
    }

    public int ai$chronon$spark$streaming$JoinSourceRunner$$queryShiftMs() {
        return this.ai$chronon$spark$streaming$JoinSourceRunner$$queryShiftMs;
    }

    private int microBatchIntervalMillis() {
        return this.microBatchIntervalMillis;
    }

    public org.apache.spark.sql.types.StructType outputSchema(org.apache.spark.sql.types.StructType structType, Query query, SparkSession sparkSession) {
        if (query.selects == null) {
            return structType;
        }
        return sparkSession.createDataFrame(sparkSession.sparkContext().emptyRDD(ClassTag$.MODULE$.apply(Row.class)), structType).selectExpr(ScalaJavaConversions$.MODULE$.MapOps(query.selects).toScala().map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            return new StringBuilder(6).append("(").append(((String) tuple2.mo1986_2()).toLowerCase()).append(") AS ").append((String) tuple2.mo1987_1()).toString();
        }).toSeq()).schema();
    }

    private Query enrichQuery(Query query) {
        Query deepCopy = query.deepCopy();
        if (Extensions$.MODULE$.GroupByOps(this.ai$chronon$spark$streaming$JoinSourceRunner$$groupByConf).streamingSource().get().getJoinSource().getJoin().getLeft().isSetEntities()) {
            deepCopy.selects.put(Constants$.MODULE$.ReversalColumn(), Constants$.MODULE$.ReversalColumn());
            deepCopy.selects.put(Constants$.MODULE$.MutationTimeColumn(), Constants$.MODULE$.MutationTimeColumn());
        } else if (query.isSetTimeColumn()) {
            deepCopy.selects.put(Constants$.MODULE$.TimeColumn(), deepCopy.timeColumn);
        } else {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
        return deepCopy;
    }

    private Schemas buildSchemas(org.apache.spark.sql.types.StructType structType) {
        Option<Source> streamingSource = Extensions$.MODULE$.GroupByOps(this.ai$chronon$spark$streaming$JoinSourceRunner$$groupByConf).streamingSource();
        Predef$.MODULE$.m1936assert(streamingSource.get().isSetJoinSource(), () -> {
            return new StringBuilder(36).append("No JoinSource found in the groupBy: ").append(this.ai$chronon$spark$streaming$JoinSourceRunner$$groupByConf.metaData.name).toString();
        });
        Predef$.MODULE$.m1936assert(streamingSource.isDefined(), () -> {
            return new StringBuilder(44).append("No streaming source present in the groupBy: ").append(this.ai$chronon$spark$streaming$JoinSourceRunner$$groupByConf.metaData.name).toString();
        });
        JoinSource joinSource = streamingSource.get().getJoinSource();
        Source left = joinSource.getJoin().getLeft();
        Predef$.MODULE$.m1936assert(Extensions$.MODULE$.SourceOps(left).topic() != null, () -> {
            return "join source left side should have a topic";
        });
        org.apache.spark.sql.types.StructType apply = StructType$.MODULE$.apply(package$.MODULE$.Seq().apply2((Seq) ScalaRunTime$.MODULE$.wrapRefArray(new StructField[]{new StructField(Constants$.MODULE$.ReversalColumn(), BooleanType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4()), new StructField(Constants$.MODULE$.MutationTimeColumn(), LongType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4())})));
        org.apache.spark.sql.types.StructType structType2 = structType;
        if (left.isSetEntities()) {
            structType2 = StructType$.MODULE$.apply((Seq) ((SeqOps) apply.$plus$plus(structType2)).distinct());
        }
        org.apache.spark.sql.types.StructType outputSchema = outputSchema(structType2, enrichQuery(Extensions$.MODULE$.SourceOps(left).query()), this.session);
        org.apache.spark.sql.types.StructType apply2 = StructType$.MODULE$.apply((Seq) outputSchema.$plus$plus(SparkConversions$.MODULE$.fromChrononSchema(this.ai$chronon$spark$streaming$JoinSourceRunner$$apiImpl.buildFetcher(this.ai$chronon$spark$streaming$JoinSourceRunner$$debug, this.ai$chronon$spark$streaming$JoinSourceRunner$$apiImpl.buildFetcher$default$2()).buildJoinCodec(joinSource.getJoin()).valueSchema())));
        org.apache.spark.sql.types.StructType outputSchema2 = outputSchema(apply2, enrichQuery(joinSource.query), this.session);
        logger().info(StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString(new StringBuilder(281).append("\n       |Schemas across chain of transformations\n       |leftSchema:\n       |  ").append(structType.catalogString()).append("\n       |left stream Schema:\n       |  ").append(structType2.catalogString()).append("\n       |left schema after applying left query:\n       |  ").append(outputSchema.catalogString()).append("\n       |join schema:\n       |  ").append(apply2.catalogString()).append("\n       |join schema after applying joinSource.query:\n       |  ").append(outputSchema2.catalogString()).append("\n       |").toString())));
        return new Schemas(this, structType2, outputSchema, apply2, outputSchema2);
    }

    private GroupByServingInfoParsed servingInfoProxy() {
        return this.ai$chronon$spark$streaming$JoinSourceRunner$$apiImpl.buildFetcher(this.ai$chronon$spark$streaming$JoinSourceRunner$$debug, this.ai$chronon$spark$streaming$JoinSourceRunner$$apiImpl.buildFetcher$default$2()).getGroupByServingInfo().apply(this.ai$chronon$spark$streaming$JoinSourceRunner$$groupByConf.getMetaData().getName()).get();
    }

    private DataStream decode(DataStream dataStream) {
        StreamDecoder streamDecoder = this.ai$chronon$spark$streaming$JoinSourceRunner$$apiImpl.streamDecoder(servingInfoProxy());
        Dataset<Row> df = dataStream.df();
        Metrics.Context withSuffix = context().withSuffix("ingress");
        Dataset filter = df.as(this.session.implicits().newByteArrayEncoder()).map(bArr -> {
            withSuffix.increment(Metrics$Name$.MODULE$.RowCount());
            withSuffix.count(Metrics$Name$.MODULE$.Bytes(), bArr.length);
            try {
                return streamDecoder.decode(bArr);
            } catch (Throwable th) {
                this.logger().info(new StringBuilder(51).append("Error while decoding streaming events from stream: ").append(dataStream.topicInfo().name()).toString());
                th.printStackTrace();
                withSuffix.incrementException(th, this.logger());
                return null;
            }
        }, Encoders$.MODULE$.kryo(ClassTag$.MODULE$.apply(Mutation.class))).filter(mutation -> {
            return BoxesRunTime.boxToBoolean($anonfun$decode$2(mutation));
        });
        org.apache.spark.sql.types.StructType fromChrononSchema = SparkConversions$.MODULE$.fromChrononSchema(streamDecoder.schema());
        logger().info(StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString(new StringBuilder(99).append("\n         | streaming source: ").append(Extensions$.MODULE$.GroupByOps(this.ai$chronon$spark$streaming$JoinSourceRunner$$groupByConf).streamingSource().get()).append("\n         | streaming dataset: ").append(Extensions$.MODULE$.GroupByOps(this.ai$chronon$spark$streaming$JoinSourceRunner$$groupByConf).streamingDataset()).append("\n         | stream schema: ").append(fromChrononSchema.catalogString()).append("\n         |").toString())));
        return dataStream.copy(filter.flatMap(mutation2 -> {
            return (Seq) package$.MODULE$.Seq().apply2((Seq) ScalaRunTime$.MODULE$.wrapRefArray(new Object[]{mutation2.after(), mutation2.before()})).filter(objArr -> {
                return BoxesRunTime.boxToBoolean($anonfun$decode$4(objArr));
            }).map(objArr2 -> {
                return (Row) SparkConversions$.MODULE$.toSparkRow(objArr2, streamDecoder.schema(), GenericRowHandler$.MODULE$.func());
            });
        }, RowEncoder$.MODULE$.apply(fromChrononSchema)), dataStream.copy$default$2(), dataStream.copy$default$3());
    }

    private StreamBuilder internalStreamBuilder(String str) {
        StreamBuilder generateStreamBuilder = this.ai$chronon$spark$streaming$JoinSourceRunner$$apiImpl.generateStreamBuilder(str);
        if (generateStreamBuilder != null) {
            return generateStreamBuilder;
        }
        if (str != null ? !str.equals("kafka") : "kafka" != 0) {
            throw new RuntimeException(new StringBuilder(95).append("Couldn't access builder for type ").append(str).append(". Please implement one by overriding Api.generateStreamBuilder").toString());
        }
        return KafkaStreamBuilder$.MODULE$;
    }

    private DataStream buildStream(TopicInfo topicInfo) {
        return internalStreamBuilder(topicInfo.topicType()).from(topicInfo, this.session, this.conf);
    }

    public Option<Object> percentile(long[] jArr, double d) {
        if (jArr == null || jArr.length == 0) {
            return None$.MODULE$;
        }
        return new Some(BoxesRunTime.boxToLong(((long[]) ArrayOps$.MODULE$.sorted$extension(Predef$.MODULE$.longArrayOps(jArr), Ordering$Long$.MODULE$))[(int) scala.math.package$.MODULE$.ceil((r0.length - 1) * d)]));
    }

    public DataStreamWriter<Row> chainedStreamingQuery() {
        JoinSource joinSource = Extensions$.MODULE$.GroupByOps(this.ai$chronon$spark$streaming$JoinSourceRunner$$groupByConf).streamingSource().get().getJoinSource();
        Source source = joinSource.join.left;
        DataStream decode = decode(buildStream(TopicInfo$.MODULE$.parse(Extensions$.MODULE$.SourceOps(source).topic())));
        String buildLeftStreamingQuery = Extensions$.MODULE$.GroupByOps(this.ai$chronon$spark$streaming$JoinSourceRunner$$groupByConf).buildLeftStreamingQuery(Extensions$.MODULE$.SourceOps(source).query(), ArrayOps$.MODULE$.toSeq$extension(Predef$.MODULE$.refArrayOps(decode.df().schema().fieldNames())));
        Dataset applyQuery$1 = applyQuery$1(decode.df(), Extensions$.MODULE$.SourceOps(source).query(), decode);
        final String replaceFirst = joinSource.join.metaData.getName().replaceFirst("\\.", "/");
        logger().info(new StringBuilder(28).append("Upstream join request name: ").append(replaceFirst).toString());
        Seq seq = new TableUtils(this.session).getColumnsFromQuery(buildLeftStreamingQuery).map(str -> {
            return str.toLowerCase();
        }).toSet().toSeq();
        Schemas buildSchemas = buildSchemas(new org.apache.spark.sql.types.StructType((StructField[]) ((IterableOnceOps) decode.df().schema().filter(structField -> {
            return BoxesRunTime.boxToBoolean($anonfun$chainedStreamingQuery$6(seq, structField));
        })).toSet().toArray(ClassTag$.MODULE$.apply(StructField.class))));
        final Tuple2<String, DataType>[] chrononSchema = SparkConversions$.MODULE$.toChrononSchema(buildSchemas.joinSchema());
        ExpressionEncoder apply = RowEncoder$.MODULE$.apply(buildSchemas.joinSchema());
        final String[] fieldNames = buildSchemas.joinSchema().fieldNames();
        final String[] fieldNames2 = buildSchemas.leftSourceSchema().fieldNames();
        logger().info(StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString(new StringBuilder(134).append("\n         |left columns ").append(Predef$.MODULE$.wrapRefArray(fieldNames2).mkString(",")).append("\n         |reqColumns ").append(seq.mkString(",")).append("\n         |Fetching upstream join to enrich the stream... Fetching lag time: ").append(this.lagMillis).append("\n         |").toString())));
        Object refArrayOps = Predef$.MODULE$.refArrayOps(fieldNames2);
        final int indexWhere$extension = ArrayOps$.MODULE$.indexWhere$extension(refArrayOps, str2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$chainedStreamingQuery$8(this, str2));
        }, ArrayOps$.MODULE$.indexWhere$default$2$extension(refArrayOps));
        Dataset applyQuery$12 = applyQuery$1(applyQuery$1.mapPartitions(new MapPartitionsFunction<Row, Row>(this, fieldNames2, indexWhere$extension, replaceFirst, fieldNames, chrononSchema) { // from class: ai.chronon.spark.streaming.JoinSourceRunner$$anon$1
            private final /* synthetic */ JoinSourceRunner $outer;
            private final String[] leftColumns$1;
            private final int leftTimeIndex$1;
            private final String joinRequestName$1;
            private final String[] joinFields$1;
            private final Tuple2[] joinChrononSchema$1;

            public java.util.Iterator<Row> call(java.util.Iterator<Row> it) {
                boolean z = Math.random() <= 0.1d;
                Fetcher orSetFetcher = LocalIOCache$.MODULE$.getOrSetFetcher(() -> {
                    this.$outer.logger().info(new StringBuilder(22).append("Initializing Fetcher. ").append(System.currentTimeMillis()).toString());
                    this.$outer.context().increment("chain.fetcher.init");
                    return this.$outer.ai$chronon$spark$streaming$JoinSourceRunner$$apiImpl.buildFetcher(this.$outer.ai$chronon$spark$streaming$JoinSourceRunner$$debug, this.$outer.ai$chronon$spark$streaming$JoinSourceRunner$$apiImpl.buildFetcher$default$2());
                });
                Row[] rowArr = (Row[]) ScalaJavaConversions$.MODULE$.IteratorOps(it).toScala().toArray(ClassTag$.MODULE$.apply(Row.class));
                Fetcher.Request[] requestArr = (Fetcher.Request[]) ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps(rowArr), row -> {
                    Map valuesMap = row.getValuesMap(Predef$.MODULE$.copyArrayToImmutableIndexedSeq(this.leftColumns$1));
                    long unboxToLong = BoxesRunTime.unboxToLong(row.get(this.leftTimeIndex$1));
                    this.$outer.context().distribution(Metrics$Name$.MODULE$.LagMillis(), System.currentTimeMillis() - unboxToLong);
                    return new Fetcher.Request(this.joinRequestName$1, valuesMap, (this.$outer.ai$chronon$spark$streaming$JoinSourceRunner$$useEventTimeForQuery() ? new Some(BoxesRunTime.boxToLong(unboxToLong)) : None$.MODULE$).map(j -> {
                        return j + this.$outer.ai$chronon$spark$streaming$JoinSourceRunner$$queryShiftMs();
                    }), Fetcher$Request$.MODULE$.apply$default$4());
                }, ClassTag$.MODULE$.apply(Fetcher.Request.class));
                Option<Object> percentile = this.$outer.percentile((long[]) ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps(rowArr), row2 -> {
                    return BoxesRunTime.boxToLong($anonfun$call$4(this, row2));
                }, ClassTag$.MODULE$.Long()), this.$outer.ai$chronon$spark$streaming$JoinSourceRunner$$timePercentile());
                if (percentile.isDefined()) {
                    long currentTimeMillis = System.currentTimeMillis() - BoxesRunTime.unboxToLong(percentile.get());
                    this.$outer.context().distribution(Metrics$Name$.MODULE$.BatchLagMillis(), currentTimeMillis);
                    if (this.$outer.ai$chronon$spark$streaming$JoinSourceRunner$$minimumQueryDelayMs() > 0 && currentTimeMillis >= 0 && currentTimeMillis < this.$outer.ai$chronon$spark$streaming$JoinSourceRunner$$minimumQueryDelayMs()) {
                        long ai$chronon$spark$streaming$JoinSourceRunner$$minimumQueryDelayMs = this.$outer.ai$chronon$spark$streaming$JoinSourceRunner$$minimumQueryDelayMs() - currentTimeMillis;
                        Thread.sleep(ai$chronon$spark$streaming$JoinSourceRunner$$minimumQueryDelayMs);
                        this.$outer.context().distribution(Metrics$Name$.MODULE$.QueryDelaySleepMillis(), ai$chronon$spark$streaming$JoinSourceRunner$$minimumQueryDelayMs);
                    }
                }
                if (this.$outer.ai$chronon$spark$streaming$JoinSourceRunner$$debug && z) {
                    ArrayOps$.MODULE$.foreach$extension(Predef$.MODULE$.refArrayOps(requestArr), request -> {
                        $anonfun$call$5(this, request);
                        return BoxedUnit.UNIT;
                    });
                }
                scala.collection.Seq seq2 = (scala.collection.Seq) Await$.MODULE$.result(orSetFetcher.fetchJoin(ArrayOps$.MODULE$.toSeq$extension(Predef$.MODULE$.refArrayOps(requestArr)), orSetFetcher.fetchJoin$default$2()), new Cpackage.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(5)).second());
                if (this.$outer.ai$chronon$spark$streaming$JoinSourceRunner$$debug && z) {
                    this.$outer.logger().info(new StringBuilder(39).append("responses/request size: ").append(seq2.size()).append("/").append(ArrayOps$.MODULE$.size$extension(Predef$.MODULE$.refArrayOps(requestArr))).append("\n  responses: ").append(seq2).toString());
                    seq2.foreach(response -> {
                        $anonfun$call$6(this, response);
                        return BoxedUnit.UNIT;
                    });
                }
                return ScalaJavaConversions$.MODULE$.JIteratorOps(seq2.iterator().map(response2 -> {
                    Map map = (Map) response2.request().keys().$plus$plus2((IterableOnce) response2.values().get());
                    Fetcher$.MODULE$.logResponseStats(response2, this.$outer.context());
                    return (Row) SparkConversions$.MODULE$.toSparkRow(ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps(this.joinFields$1), str3 -> {
                        return map.getOrElse(str3, () -> {
                            return null;
                        });
                    }, ClassTag$.MODULE$.AnyRef()), ai.chronon.api.StructType$.MODULE$.from("record", this.joinChrononSchema$1), SparkConversions$.MODULE$.toSparkRow$default$3());
                })).toJava();
            }

            public static final /* synthetic */ long $anonfun$call$4(JoinSourceRunner$$anon$1 joinSourceRunner$$anon$1, Row row) {
                return BoxesRunTime.unboxToLong(row.get(joinSourceRunner$$anon$1.leftTimeIndex$1));
            }

            public static final /* synthetic */ void $anonfun$call$5(JoinSourceRunner$$anon$1 joinSourceRunner$$anon$1, Fetcher.Request request) {
                joinSourceRunner$$anon$1.$outer.logger().info(new StringBuilder(15).append("request: ").append(request.keys()).append(", ts: ").append(request.atMillis()).toString());
            }

            public static final /* synthetic */ void $anonfun$call$6(JoinSourceRunner$$anon$1 joinSourceRunner$$anon$1, Fetcher.Response response) {
                joinSourceRunner$$anon$1.$outer.logger().info(new StringBuilder(25).append("request: ").append(response.request().keys()).append(", ts: ").append(response.request().atMillis()).append(", values: ").append(response.values()).toString());
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.leftColumns$1 = fieldNames2;
                this.leftTimeIndex$1 = indexWhere$extension;
                this.joinRequestName$1 = replaceFirst;
                this.joinFields$1 = fieldNames;
                this.joinChrononSchema$1 = chrononSchema;
            }
        }, apply), joinSource.query, decode);
        DataStreamWriter trigger = applyQuery$12.writeStream().outputMode("append").trigger(Trigger.ProcessingTime(microBatchIntervalMillis()));
        final PutRequestHelper putRequestHelper = new PutRequestHelper(this, applyQuery$12.schema());
        return trigger.foreachBatch(new VoidFunction2<Dataset<Row>, Long>(this, putRequestHelper) { // from class: ai.chronon.spark.streaming.JoinSourceRunner$$anon$2
            private final /* synthetic */ JoinSourceRunner $outer;
            private final JoinSourceRunner.PutRequestHelper putRequestHelper$1;

            public void call(Dataset<Row> dataset, Long l) {
                KVStore orSetKvStore = LocalIOCache$.MODULE$.getOrSetKvStore(() -> {
                    return this.$outer.ai$chronon$spark$streaming$JoinSourceRunner$$apiImpl.genKvStore();
                });
                Row[] rowArr = (Row[]) dataset.collect();
                KVStore.PutRequest[] putRequestArr = (KVStore.PutRequest[]) ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps(rowArr), row -> {
                    return this.putRequestHelper$1.toPutRequest(row);
                }, ClassTag$.MODULE$.apply(KVStore.PutRequest.class));
                if (this.$outer.ai$chronon$spark$streaming$JoinSourceRunner$$debug) {
                    this.$outer.logger().info(new StringBuilder(25).append(" Final df size to write: ").append(rowArr.length).toString());
                    this.$outer.logger().info(new StringBuilder(34).append(" Size of putRequests to kv store- ").append(putRequestArr.length).toString());
                } else {
                    ArrayOps$.MODULE$.foreach$extension(Predef$.MODULE$.refArrayOps(putRequestArr), putRequest -> {
                        $anonfun$call$12(this, putRequest);
                        return BoxedUnit.UNIT;
                    });
                    orSetKvStore.multiPut(Predef$.MODULE$.wrapRefArray(putRequestArr));
                }
            }

            public static final /* synthetic */ void $anonfun$call$12(JoinSourceRunner$$anon$2 joinSourceRunner$$anon$2, KVStore.PutRequest putRequest) {
                putRequest.tsMillis().foreach(j
                /*  JADX ERROR: Method code generation error
                    jadx.core.utils.exceptions.CodegenException: Error generate insn: 0x000d: INVOKE 
                      (wrap:scala.Option<java.lang.Object>:0x0001: INVOKE (r5v0 'putRequest' ai.chronon.online.KVStore$PutRequest) VIRTUAL call: ai.chronon.online.KVStore.PutRequest.tsMillis():scala.Option A[MD:():scala.Option<java.lang.Object> (m), WRAPPED])
                      (wrap:scala.runtime.java8.JFunction1$mcVJ$sp:0x0006: INVOKE_CUSTOM 
                      (wrap:ai.chronon.online.Metrics$Context:0x000a: INVOKE 
                      (wrap:ai.chronon.online.Metrics$Context:0x0005: INVOKE 
                      (wrap:ai.chronon.spark.streaming.JoinSourceRunner:0x0002: IGET (r4v0 'joinSourceRunner$$anon$2' ai.chronon.spark.streaming.JoinSourceRunner$$anon$2) A[WRAPPED] ai.chronon.spark.streaming.JoinSourceRunner$$anon$2.$outer ai.chronon.spark.streaming.JoinSourceRunner)
                     VIRTUAL call: ai.chronon.spark.streaming.JoinSourceRunner.context():ai.chronon.online.Metrics$Context A[MD:():ai.chronon.online.Metrics$Context (m), WRAPPED])
                      ("egress")
                     VIRTUAL call: ai.chronon.online.Metrics.Context.withSuffix(java.lang.String):ai.chronon.online.Metrics$Context A[MD:(java.lang.String):ai.chronon.online.Metrics$Context (m), WRAPPED])
                      (r5v0 'putRequest' ai.chronon.online.KVStore$PutRequest)
                     A[MD:(ai.chronon.online.Metrics$Context, ai.chronon.online.KVStore$PutRequest):scala.runtime.java8.JFunction1$mcVJ$sp (s), WRAPPED]
                     handle type: INVOKE_STATIC
                     lambda: scala.runtime.java8.JFunction1$mcVJ$sp.apply$mcVJ$sp(long):void
                     call insn: INVOKE (r1 I:ai.chronon.online.Metrics$Context), (r2 I:ai.chronon.online.KVStore$PutRequest), (v2 long) STATIC call: ai.chronon.spark.streaming.JoinSourceRunner.$anonfun$chainedStreamingQuery$9(ai.chronon.online.Metrics$Context, ai.chronon.online.KVStore$PutRequest, long):void A[MD:(ai.chronon.online.Metrics$Context, ai.chronon.online.KVStore$PutRequest, long):void (m)])
                     VIRTUAL call: scala.Option.foreach(scala.Function1):void A[MD:<U>:(scala.Function1<A, U>):void (m)] in method: ai.chronon.spark.streaming.JoinSourceRunner$$anon$2.$anonfun$call$12(ai.chronon.spark.streaming.JoinSourceRunner$$anon$2, ai.chronon.online.KVStore$PutRequest):void, file: input_file:ai/chronon/spark/streaming/JoinSourceRunner$$anon$2.class
                    	at jadx.core.codegen.InsnGen.makeInsn(InsnGen.java:310)
                    	at jadx.core.codegen.InsnGen.makeInsn(InsnGen.java:273)
                    	at jadx.core.codegen.RegionGen.makeSimpleBlock(RegionGen.java:94)
                    	at jadx.core.dex.nodes.IBlock.generate(IBlock.java:15)
                    	at jadx.core.codegen.RegionGen.makeRegion(RegionGen.java:66)
                    	at jadx.core.dex.regions.Region.generate(Region.java:35)
                    	at jadx.core.codegen.RegionGen.makeRegion(RegionGen.java:66)
                    	at jadx.core.codegen.MethodGen.addRegionInsns(MethodGen.java:297)
                    	at jadx.core.codegen.MethodGen.addInstructions(MethodGen.java:276)
                    	at jadx.core.codegen.ClassGen.addMethodCode(ClassGen.java:406)
                    	at jadx.core.codegen.ClassGen.addMethod(ClassGen.java:335)
                    	at jadx.core.codegen.ClassGen.lambda$addInnerClsAndMethods$3(ClassGen.java:301)
                    	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
                    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
                    	at java.base/java.util.stream.SortedOps$RefSortingSink.end(SortedOps.java:395)
                    	at java.base/java.util.stream.Sink$ChainedReference.end(Sink.java:261)
                    Caused by: jadx.core.utils.exceptions.JadxRuntimeException: Unexpected argument type in lambda call: InsnWrapArg
                    	at jadx.core.codegen.InsnGen.makeInlinedLambdaMethod(InsnGen.java:1043)
                    	at jadx.core.codegen.InsnGen.makeInvokeLambda(InsnGen.java:936)
                    	at jadx.core.codegen.InsnGen.makeInvoke(InsnGen.java:827)
                    	at jadx.core.codegen.InsnGen.makeInsnBody(InsnGen.java:422)
                    	at jadx.core.codegen.InsnGen.addWrappedArg(InsnGen.java:145)
                    	at jadx.core.codegen.InsnGen.addArg(InsnGen.java:121)
                    	at jadx.core.codegen.InsnGen.addArg(InsnGen.java:108)
                    	at jadx.core.codegen.InsnGen.generateMethodArguments(InsnGen.java:1117)
                    	at jadx.core.codegen.InsnGen.makeInvoke(InsnGen.java:884)
                    	at jadx.core.codegen.InsnGen.makeInsnBody(InsnGen.java:422)
                    	at jadx.core.codegen.InsnGen.makeInsn(InsnGen.java:303)
                    	... 15 more
                    */
                /*
                    r0 = r5
                    r1 = r4
                    ai.chronon.spark.streaming.JoinSourceRunner r1 = r1.$outer
                    ai.chronon.online.Metrics$Context r1 = r1.context()
                    java.lang.String r2 = "egress"
                    ai.chronon.online.Metrics$Context r1 = r1.withSuffix(r2)
                    ai.chronon.spark.streaming.JoinSourceRunner.ai$chronon$spark$streaming$JoinSourceRunner$$emitRequestMetric$1(r0, r1)
                    return
                */
                throw new UnsupportedOperationException("Method not decompiled: ai.chronon.spark.streaming.JoinSourceRunner$$anon$2.$anonfun$call$12(ai.chronon.spark.streaming.JoinSourceRunner$$anon$2, ai.chronon.online.KVStore$PutRequest):void");
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.putRequestHelper$1 = putRequestHelper;
            }
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5, types: [ai.chronon.spark.streaming.JoinSourceRunner] */
    private final void Schemas$lzycompute$1() {
        ?? r0 = this;
        synchronized (r0) {
            if (this.Schemas$module == null) {
                r0 = this;
                r0.Schemas$module = new JoinSourceRunner$Schemas$(this);
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5, types: [ai.chronon.spark.streaming.JoinSourceRunner] */
    private final void PutRequestHelper$lzycompute$1() {
        ?? r0 = this;
        synchronized (r0) {
            if (this.PutRequestHelper$module == null) {
                r0 = this;
                r0.PutRequestHelper$module = new JoinSourceRunner$PutRequestHelper$(this);
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private static final /* synthetic */ boolean bothNull$lzycompute$1(LazyBoolean lazyBoolean, Mutation mutation) {
        boolean initialize;
        boolean z;
        synchronized (lazyBoolean) {
            if (lazyBoolean.initialized()) {
                initialize = lazyBoolean.value();
            } else {
                initialize = lazyBoolean.initialize((mutation.before() == null || mutation.after() == null) ? false : true);
            }
            z = initialize;
        }
        return z;
    }

    private static final boolean bothNull$1(LazyBoolean lazyBoolean, Mutation mutation) {
        return lazyBoolean.initialized() ? lazyBoolean.value() : bothNull$lzycompute$1(lazyBoolean, mutation);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private static final /* synthetic */ boolean bothSame$lzycompute$1(LazyBoolean lazyBoolean, Mutation mutation) {
        boolean value;
        synchronized (lazyBoolean) {
            value = lazyBoolean.initialized() ? lazyBoolean.value() : lazyBoolean.initialize(Predef$.MODULE$.genericWrapArray(mutation.before()).sameElements(Predef$.MODULE$.genericWrapArray(mutation.after())));
        }
        return value;
    }

    private static final boolean bothSame$1(LazyBoolean lazyBoolean, Mutation mutation) {
        return lazyBoolean.initialized() ? lazyBoolean.value() : bothSame$lzycompute$1(lazyBoolean, mutation);
    }

    public static final /* synthetic */ boolean $anonfun$decode$2(Mutation mutation) {
        return (mutation == null || (bothNull$1(new LazyBoolean(), mutation) && bothSame$1(new LazyBoolean(), mutation))) ? false : true;
    }

    public static final /* synthetic */ boolean $anonfun$decode$4(Object[] objArr) {
        return objArr != null;
    }

    private final Dataset applyQuery$1(Dataset dataset, Query query, DataStream dataStream) {
        Extensions.GroupByOps.QueryParts buildQueryParts = Extensions$.MODULE$.GroupByOps(this.ai$chronon$spark$streaming$JoinSourceRunner$$groupByConf).buildQueryParts(query);
        logger().info(StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString(new StringBuilder(91).append("\n           |decoded schema: ").append(dataStream.df().schema().catalogString()).append("\n           |queryParts: ").append(buildQueryParts).append("\n           |df schema: ").append(dataset.schema().prettyJson()).append("\n           |").toString())));
        return ((Dataset) buildQueryParts.selects().map(seq -> {
            return seq.toSeq();
        }).map(seq2 -> {
            return dataset.selectExpr(seq2);
        }).getOrElse(() -> {
            return dataset;
        })).filter(buildQueryParts.wheres().map(str -> {
            return new StringBuilder(2).append("(").append(str).append(")").toString();
        }).mkString(" AND "));
    }

    public static final /* synthetic */ boolean $anonfun$chainedStreamingQuery$6(Seq seq, StructField structField) {
        return seq.map(str -> {
            return str.contains(".") ? str.split("\\.")[0] : str;
        }).contains(structField.name());
    }

    public static final /* synthetic */ boolean $anonfun$chainedStreamingQuery$8(JoinSourceRunner joinSourceRunner, String str) {
        String eventTimeColumn = joinSourceRunner.eventTimeColumn();
        return str != null ? str.equals(eventTimeColumn) : eventTimeColumn == null;
    }

    public JoinSourceRunner(ai.chronon.api.GroupBy groupBy, Map<String, String> map, boolean z, int i, SparkSession sparkSession, Api api) {
        StructType mutationValueChrononSchema;
        Tuple2 $minus$greater$extension;
        this.ai$chronon$spark$streaming$JoinSourceRunner$$groupByConf = groupBy;
        this.conf = map;
        this.ai$chronon$spark$streaming$JoinSourceRunner$$debug = z;
        this.lagMillis = i;
        this.session = sparkSession;
        this.ai$chronon$spark$streaming$JoinSourceRunner$$apiImpl = api;
        this.context = Metrics$Context$.MODULE$.apply(Metrics$Environment$.MODULE$.GroupByStreaming(), groupBy);
        Enumeration.Value dataModel = Extensions$.MODULE$.GroupByOps(groupBy).dataModel();
        Enumeration.Value Events = DataModel$.MODULE$.Events();
        if (Events != null ? !Events.equals(dataModel) : dataModel != null) {
            Enumeration.Value Entities = DataModel$.MODULE$.Entities();
            if (Entities != null ? !Entities.equals(dataModel) : dataModel != null) {
                throw new MatchError(dataModel);
            }
            mutationValueChrononSchema = servingInfoProxy().mutationValueChrononSchema();
        } else {
            mutationValueChrononSchema = servingInfoProxy().valueChrononSchema();
        }
        this.valueZSchema = mutationValueChrononSchema;
        Enumeration.Value dataModel2 = Extensions$.MODULE$.GroupByOps(groupBy).dataModel();
        Enumeration.Value Entities2 = DataModel$.MODULE$.Entities();
        if (Entities2 != null ? !Entities2.equals(dataModel2) : dataModel2 != null) {
            Enumeration.Value Events2 = DataModel$.MODULE$.Events();
            if (Events2 != null ? !Events2.equals(dataModel2) : dataModel2 != null) {
                throw new MatchError(dataModel2);
            }
            $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(package$.MODULE$.Seq().empty2()), Constants$.MODULE$.TimeColumn());
        } else {
            $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(Constants$.MODULE$.MutationFields().map(structField -> {
                return structField.name();
            })), Constants$.MODULE$.MutationTimeColumn());
        }
        Tuple2 tuple2 = $minus$greater$extension;
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        this.x$2 = new Tuple2((Seq) tuple2.mo1987_1(), (String) tuple2.mo1986_2());
        this.additionalColumns = (Seq) this.x$2.mo1987_1();
        this.eventTimeColumn = (String) this.x$2.mo1986_2();
        this.keyColumns = (String[]) ScalaJavaConversions$.MODULE$.ListOps(groupBy.keyColumns).toScala().toArray(ClassTag$.MODULE$.apply(String.class));
        this.valueColumns = (String[]) ArrayOps$.MODULE$.$plus$plus$extension(Predef$.MODULE$.refArrayOps(Extensions$.MODULE$.GroupByOps(groupBy).aggregationInputs()), (IterableOnce) additionalColumns(), ClassTag$.MODULE$.apply(String.class));
        this.ai$chronon$spark$streaming$JoinSourceRunner$$useEventTimeForQuery = StringOps$.MODULE$.toBoolean$extension(Predef$.MODULE$.augmentString(getProp("event_time_query", "true")));
        this.ai$chronon$spark$streaming$JoinSourceRunner$$timePercentile = StringOps$.MODULE$.toDouble$extension(Predef$.MODULE$.augmentString(getProp("time_percentile", "0.95")));
        this.ai$chronon$spark$streaming$JoinSourceRunner$$minimumQueryDelayMs = StringOps$.MODULE$.toInt$extension(Predef$.MODULE$.augmentString(getProp("query_delay_ms", "0")));
        this.ai$chronon$spark$streaming$JoinSourceRunner$$queryShiftMs = StringOps$.MODULE$.toInt$extension(Predef$.MODULE$.augmentString(getProp("query_shift_ms", "0")));
        this.microBatchIntervalMillis = StringOps$.MODULE$.toInt$extension(Predef$.MODULE$.augmentString(getProp("batch_interval_millis", "1000")));
    }
}
