package ai.chronon.spark.stats;

import ai.chronon.api.Constants$;
import ai.chronon.api.EventSource;
import ai.chronon.api.Extensions$;
import ai.chronon.api.Join;
import ai.chronon.api.Query;
import ai.chronon.api.Source;
import ai.chronon.online.DataMetrics;
import ai.chronon.online.JoinCodec$;
import ai.chronon.spark.Extensions;
import ai.chronon.spark.PartitionRange;
import ai.chronon.spark.TableUtils;
import java.util.HashMap;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import scala.Array$;
import scala.MatchError;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Serializable;
import scala.Some;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.ScalaVersionSpecificCollectionsConverter$;

/* compiled from: ConsistencyJob.scala */
@ScalaSignature(bytes = "\u0006\u000194A\u0001D\u0007\u0001-!A\u0001\u0005\u0001B\u0001B\u0003%\u0011\u0005\u0003\u0005-\u0001\t\u0005\t\u0015!\u0003.\u0011!\u0019\u0004A!A!\u0002\u0013!\u0004\"B \u0001\t\u0003\u0001\u0005b\u0002$\u0001\u0005\u0004%\ta\u0012\u0005\u0007/\u0002\u0001\u000b\u0011\u0002%\t\u000fa\u0003!\u0019!C\u00013\"1a\f\u0001Q\u0001\niCQa\u0018\u0001\u0005\n\u0001DQ!\u0019\u0001\u0005\n\tDQA\u001a\u0001\u0005\u0002\u001d\u0014abQ8og&\u001cH/\u001a8ds*{'M\u0003\u0002\u000f\u001f\u0005)1\u000f^1ug*\u0011\u0001#E\u0001\u0006gB\f'o\u001b\u0006\u0003%M\tqa\u00195s_:|gNC\u0001\u0015\u0003\t\t\u0017n\u0001\u0001\u0014\u0007\u00019R\u0004\u0005\u0002\u001975\t\u0011DC\u0001\u001b\u0003\u0015\u00198-\u00197b\u0013\ta\u0012D\u0001\u0004B]f\u0014VM\u001a\t\u00031yI!aH\r\u0003\u0019M+'/[1mSj\f'\r\\3\u0002\u000fM,7o]5p]B\u0011!EK\u0007\u0002G)\u0011A%J\u0001\u0004gFd'B\u0001\t'\u0015\t9\u0003&\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002S\u0005\u0019qN]4\n\u0005-\u001a#\u0001D*qCJ\\7+Z:tS>t\u0017\u0001\u00036pS:\u001cuN\u001c4\u0011\u00059\nT\"A\u0018\u000b\u0005A\n\u0012aA1qS&\u0011!g\f\u0002\u0005\u0015>Lg.A\u0004f]\u0012$\u0015\r^3\u0011\u0005UbdB\u0001\u001c;!\t9\u0014$D\u00019\u0015\tIT#\u0001\u0004=e>|GOP\u0005\u0003we\ta\u0001\u0015:fI\u00164\u0017BA\u001f?\u0005\u0019\u0019FO]5oO*\u00111(G\u0001\u0007y%t\u0017\u000e\u001e \u0015\t\u0005\u001bE)\u0012\t\u0003\u0005\u0002i\u0011!\u0004\u0005\u0006A\u0011\u0001\r!\t\u0005\u0006Y\u0011\u0001\r!\f\u0005\u0006g\u0011\u0001\r\u0001N\u0001\u000ei\nd\u0007K]8qKJ$\u0018.Z:\u0016\u0003!\u0003B!\u0013(Q!6\t!J\u0003\u0002L\u0019\u0006I\u0011.\\7vi\u0006\u0014G.\u001a\u0006\u0003\u001bf\t!bY8mY\u0016\u001cG/[8o\u0013\ty%JA\u0002NCB\u0004\"!\u0015,\u000e\u0003IS!a\u0015+\u0002\t1\fgn\u001a\u0006\u0002+\u0006!!.\u0019<b\u0013\ti$+\u0001\buE2\u0004&o\u001c9feRLWm\u001d\u0011\u0002\u0015Q\f'\r\\3Vi&d7/F\u0001[!\tYF,D\u0001\u0010\u0013\tivB\u0001\u0006UC\ndW-\u0016;jYN\f1\u0002^1cY\u0016,F/\u001b7tA\u0005\u0019\"-^5mI\u000e{W\u000e]1sSN|gNS8j]R\tQ&\u0001\u000bck&dGmQ8na\u0006\u0014\u0018n]8o)\u0006\u0014G.\u001a\u000b\u0002GB\u0011\u0001\u0004Z\u0005\u0003Kf\u0011A!\u00168ji\u00069\"-^5mI\u000e{gn]5ti\u0016t7-_'fiJL7m\u001d\u000b\u0002QB\u0011\u0011\u000e\\\u0007\u0002U*\u00111.E\u0001\u0007_:d\u0017N\\3\n\u00055T'a\u0003#bi\u0006lU\r\u001e:jGN\u0004")
/* loaded from: input_file:ai/chronon/spark/stats/ConsistencyJob.class */
public class ConsistencyJob implements Serializable {
    private final SparkSession session;
    private final Join joinConf;
    private final String endDate;
    private final Map<String, String> tblProperties;
    private final TableUtils tableUtils;

    public Map<String, String> tblProperties() {
        return this.tblProperties;
    }

    public TableUtils tableUtils() {
        return this.tableUtils;
    }

    private Join buildComparisonJoin() {
        Predef$.MODULE$.println("Building Join With left as logged");
        Join deepCopy = this.joinConf.deepCopy();
        Source source = new Source();
        EventSource eventSource = new EventSource();
        Query query = new Query();
        Tuple2[] tuple2Arr = (Tuple2[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(Extensions$.MODULE$.JoinOps(this.joinConf).leftKeyCols())).map(str -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(str), str);
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class)));
        HashMap hashMap = new HashMap();
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(tuple2Arr)).foreach(tuple2 -> {
            if (tuple2 != null) {
                return (String) hashMap.put((String) tuple2._1(), (String) tuple2._2());
            }
            throw new MatchError(tuple2);
        });
        query.setSelects(hashMap);
        query.setTimeColumn(Constants$.MODULE$.TimeColumn());
        query.setStartPartition(Extensions$.MODULE$.SourceOps(this.joinConf.left).query().startPartition);
        query.setWheres(ScalaVersionSpecificCollectionsConverter$.MODULE$.convertScalaSeqToJava(this.joinConf.metaData.consistencySamplePercent < ((double) 100) ? (Seq) new $colon.colon(new StringBuilder(10).append("RAND() <= ").append(this.joinConf.metaData.consistencySamplePercent / 100).toString(), Nil$.MODULE$) : Nil$.MODULE$));
        eventSource.setQuery(query);
        eventSource.setTable(Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).loggedTable());
        source.setEvents(eventSource);
        deepCopy.setLeft(source);
        deepCopy.metaData.setName(Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).comparisonConfName());
        if (deepCopy.metaData.isSetTableProperties()) {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            deepCopy.metaData.setTableProperties(new HashMap());
        }
        deepCopy.metaData.tableProperties.put(Constants$.MODULE$.ChrononOOCTable(), BoxesRunTime.boxToBoolean(true).toString());
        return deepCopy;
    }

    private void buildComparisonTable() {
        Seq seq = (Seq) tableUtils().unfilledRanges(Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).comparisonTable(), new PartitionRange(null, this.endDate), new Some(new $colon.colon(Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).loggedTable(), Nil$.MODULE$)), tableUtils().unfilledRanges$default$4(), tableUtils().unfilledRanges$default$5(), tableUtils().unfilledRanges$default$6()).getOrElse(() -> {
            return Nil$.MODULE$;
        });
        if (seq.isEmpty()) {
            return;
        }
        ai.chronon.spark.Join join = new ai.chronon.spark.Join(buildComparisonJoin(), ((PartitionRange) seq.last()).end(), new TableUtils(this.session));
        Predef$.MODULE$.println("Starting compute Join for comparison table");
        join.computeJoin(new Some(BoxesRunTime.boxToInteger(30)));
    }

    public DataMetrics buildConsistencyMetrics() {
        if (!this.joinConf.metaData.isSetConsistencySamplePercent()) {
            Predef$.MODULE$.println("consistencySamplePercent is unset and will default to 100");
            this.joinConf.metaData.consistencySamplePercent = 100.0d;
        }
        if (this.joinConf.metaData.consistencySamplePercent == 0) {
            Predef$.MODULE$.println(new StringBuilder(71).append("Exit ConsistencyJob because consistencySamplePercent = 0 for join conf ").append(this.joinConf.metaData.name).toString());
            return new DataMetrics(Nil$.MODULE$);
        }
        buildComparisonTable();
        Predef$.MODULE$.println("Determining Range between consistency table and comparison table");
        Seq seq = (Seq) tableUtils().unfilledRanges(Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).consistencyTable(), new PartitionRange(null, this.endDate), new Some(new $colon.colon(Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).comparisonTable(), Nil$.MODULE$)), tableUtils().unfilledRanges$default$4(), tableUtils().unfilledRanges$default$5(), tableUtils().unfilledRanges$default$6()).getOrElse(() -> {
            return Nil$.MODULE$;
        });
        if (seq.isEmpty()) {
            return null;
        }
        return new DataMetrics((Seq) ((Seq) seq.map(partitionRange -> {
            Dataset<Row> sql = this.tableUtils().sql(partitionRange.genScanQuery(null, Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).comparisonTable(), partitionRange.genScanQuery$default$3()));
            Dataset<Row> select = this.tableUtils().sql(partitionRange.genScanQuery(null, Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).loggedTable(), partitionRange.genScanQuery$default$3())).drop(Constants$.MODULE$.SchemaHash()).select(Predef$.MODULE$.wrapRefArray((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(sql.columns())).map(str -> {
                return functions$.MODULE$.col(str);
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Column.class)))));
            Predef$.MODULE$.println("Starting compare job for stats");
            List convertJavaListToScala = this.joinConf.isSetRowIds() ? ScalaVersionSpecificCollectionsConverter$.MODULE$.convertJavaListToScala(this.joinConf.rowIds) : (List) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(JoinCodec$.MODULE$.timeFields())).map(structField -> {
                return structField.name();
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))))).toList().$plus$plus(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(Extensions$.MODULE$.JoinOps(this.joinConf).leftKeyCols())), List$.MODULE$.canBuildFrom());
            Predef$.MODULE$.println(new StringBuilder(45).append("Using ").append(convertJavaListToScala.mkString("[", ",", "]")).append(" as join keys between log and backfill.").toString());
            Tuple2<Dataset<Row>, DataMetrics> compare = CompareJob$.MODULE$.compare(sql, select, convertJavaListToScala, CompareJob$.MODULE$.compare$default$4());
            if (compare == null) {
                throw new MatchError(compare);
            }
            Tuple2 tuple2 = new Tuple2((Dataset) compare._1(), (DataMetrics) compare._2());
            Dataset<Row> dataset = (Dataset) tuple2._1();
            DataMetrics dataMetrics = (DataMetrics) tuple2._2();
            Predef$.MODULE$.println("Saving output.");
            Extensions.DataframeOps DataframeOps = ai.chronon.spark.Extensions$.MODULE$.DataframeOps(dataset);
            Dataset<Row> withTimeBasedColumn = DataframeOps.withTimeBasedColumn("ds", DataframeOps.withTimeBasedColumn$default$2(), DataframeOps.withTimeBasedColumn$default$3());
            Predef$.MODULE$.println(new StringBuilder(14).append("output schema ").append(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(withTimeBasedColumn.schema().fields())).map(structField2 -> {
                return new Tuple2(structField2.name(), structField2.dataType());
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class))))).toMap(Predef$.MODULE$.$conforms()).mkString("\n - ")).toString());
            this.tableUtils().insertPartitions(withTimeBasedColumn, Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).consistencyTable(), this.tblProperties(), this.tableUtils().insertPartitions$default$4(), this.tableUtils().insertPartitions$default$5(), this.tableUtils().insertPartitions$default$6(), true);
            return dataMetrics;
        }, Seq$.MODULE$.canBuildFrom())).flatMap(dataMetrics -> {
            return dataMetrics.series();
        }, Seq$.MODULE$.canBuildFrom()));
    }

    public ConsistencyJob(SparkSession sparkSession, Join join, String str) {
        this.session = sparkSession;
        this.joinConf = join;
        this.endDate = str;
        this.tblProperties = (Map) Option$.MODULE$.apply(join.metaData.tableProperties).map(map -> {
            return ((TraversableOnce) JavaConverters$.MODULE$.mapAsScalaMapConverter(map).asScala()).toMap(Predef$.MODULE$.$conforms());
        }).getOrElse(() -> {
            return Predef$.MODULE$.Map().empty();
        });
        this.tableUtils = new TableUtils(sparkSession);
    }
}
