package ai.chronon.spark.consistency;

import ai.chronon.api.Constants$;
import ai.chronon.api.EventSource;
import ai.chronon.api.Extensions$;
import ai.chronon.api.Join;
import ai.chronon.api.Query;
import ai.chronon.api.Source;
import ai.chronon.online.Api;
import ai.chronon.online.DataMetrics;
import ai.chronon.online.Fetcher;
import ai.chronon.online.JoinCodec;
import ai.chronon.online.KVStore;
import ai.chronon.online.MetadataStore;
import ai.chronon.online.MetadataStore$;
import ai.chronon.spark.Extensions;
import ai.chronon.spark.PartitionRange;
import ai.chronon.spark.TableUtils;
import java.util.HashMap;
import jnr.ffi.provider.jffi.JNINativeInterface;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Array$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Serializable;
import scala.Some;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.math.Ordering;
import scala.math.Ordering$String$;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: ConsistencyJob.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005-f\u0001\u0002\u0010 \u0001!B\u0001B\r\u0001\u0003\u0002\u0003\u0006Ia\r\u0005\t}\u0001\u0011\t\u0011)A\u0005\u007f!AQ\t\u0001B\u0001B\u0003%a\t\u0003\u0005R\u0001\t\u0005\t\u0015!\u0003S\u0011\u0015A\u0006\u0001\"\u0001Z\u0011\u001d\u0001\u0007A1A\u0005\u0002\u0005Da!\u001a\u0001!\u0002\u0013\u0011\u0007b\u00024\u0001\u0005\u0004%\ta\u001a\u0005\u0007W\u0002\u0001\u000b\u0011\u00025\t\u000f1\u0004!\u0019!C\u0001[\"1\u0011\u000f\u0001Q\u0001\n9DqA\u001d\u0001C\u0002\u0013\u00051\u000f\u0003\u0004x\u0001\u0001\u0006I\u0001\u001e\u0005\bq\u0002\u0011\r\u0011\"\u0001z\u0011\u0019Q\b\u0001)A\u0005\r\"91\u0010\u0001b\u0001\n\u0003a\bbBA\r\u0001\u0001\u0006I! \u0005\n\u00037\u0001!\u0019!C\u0001\u0003;A\u0001\"a\n\u0001A\u0003%\u0011q\u0004\u0005\b\u0003S\u0001A\u0011BA\u0016\u0011\u001d\t\t\u0005\u0001C\u0005\u0003\u0007Bq!a\u0013\u0001\t\u0013\ti\u0005C\u0004\u0002P\u0001!I!a\u0011\t\u000f\u0005E\u0003\u0001\"\u0001\u0002T\u001d9\u00111L\u0010\t\u0002\u0005ucA\u0002\u0010 \u0011\u0003\ty\u0006\u0003\u0004Y5\u0011\u0005\u0011\u0011\r\u0005\b\u0003GRB\u0011AA3\u0011%\t\tKGA\u0001\n\u0013\t\u0019K\u0001\bD_:\u001c\u0018n\u001d;f]\u000eL(j\u001c2\u000b\u0005\u0001\n\u0013aC2p]NL7\u000f^3oGfT!AI\u0012\u0002\u000bM\u0004\u0018M]6\u000b\u0005\u0011*\u0013aB2ie>twN\u001c\u0006\u0002M\u0005\u0011\u0011-[\u0002\u0001'\r\u0001\u0011f\f\t\u0003U5j\u0011a\u000b\u0006\u0002Y\u0005)1oY1mC&\u0011af\u000b\u0002\u0007\u0003:L(+\u001a4\u0011\u0005)\u0002\u0014BA\u0019,\u00051\u0019VM]5bY&T\u0018M\u00197f\u0003\u001d\u0019Xm]:j_:\u0004\"\u0001\u000e\u001f\u000e\u0003UR!AN\u001c\u0002\u0007M\fHN\u0003\u0002#q)\u0011\u0011HO\u0001\u0007CB\f7\r[3\u000b\u0003m\n1a\u001c:h\u0013\tiTG\u0001\u0007Ta\u0006\u00148nU3tg&|g.\u0001\u0005k_&t7i\u001c8g!\t\u00015)D\u0001B\u0015\t\u00115%A\u0002ba&L!\u0001R!\u0003\t){\u0017N\\\u0001\bK:$G)\u0019;f!\t9eJ\u0004\u0002I\u0019B\u0011\u0011jK\u0007\u0002\u0015*\u00111jJ\u0001\u0007yI|w\u000e\u001e \n\u00055[\u0013A\u0002)sK\u0012,g-\u0003\u0002P!\n11\u000b\u001e:j]\u001eT!!T\u0016\u0002\t%l\u0007\u000f\u001c\t\u0003'Zk\u0011\u0001\u0016\u0006\u0003+\u000e\naa\u001c8mS:,\u0017BA,U\u0005\r\t\u0005/[\u0001\u0007y%t\u0017\u000e\u001e \u0015\u000bicVLX0\u0011\u0005m\u0003Q\"A\u0010\t\u000bI*\u0001\u0019A\u001a\t\u000by*\u0001\u0019A \t\u000b\u0015+\u0001\u0019\u0001$\t\u000bE+\u0001\u0019\u0001*\u0002\u000f-48\u000b^8sKV\t!\r\u0005\u0002TG&\u0011A\r\u0016\u0002\b\u0017Z\u001bFo\u001c:f\u0003!Ygo\u0015;pe\u0016\u0004\u0013!D7fi\u0006$\u0017\r^1Ti>\u0014X-F\u0001i!\t\u0019\u0016.\u0003\u0002k)\niQ*\u001a;bI\u0006$\u0018m\u0015;pe\u0016\fa\"\\3uC\u0012\fG/Y*u_J,\u0007%A\u0004gKR\u001c\u0007.\u001a:\u0016\u00039\u0004\"aU8\n\u0005A$&a\u0002$fi\u000eDWM]\u0001\tM\u0016$8\r[3sA\u0005I!n\\5o\u0007>$WmY\u000b\u0002iB\u00111+^\u0005\u0003mR\u0013\u0011BS8j]\u000e{G-Z2\u0002\u0015)|\u0017N\\\"pI\u0016\u001c\u0007%\u0001\u0005sC^$\u0016M\u00197f+\u00051\u0015!\u0003:boR\u000b'\r\\3!\u00035!(\r\u001c)s_B,'\u000f^5fgV\tQ\u0010E\u0004\u007f\u0003\u000f\tY!a\u0003\u000e\u0003}TA!!\u0001\u0002\u0004\u0005I\u0011.\\7vi\u0006\u0014G.\u001a\u0006\u0004\u0003\u000bY\u0013AC2pY2,7\r^5p]&\u0019\u0011\u0011B@\u0003\u00075\u000b\u0007\u000f\u0005\u0003\u0002\u000e\u0005]QBAA\b\u0015\u0011\t\t\"a\u0005\u0002\t1\fgn\u001a\u0006\u0003\u0003+\tAA[1wC&\u0019q*a\u0004\u0002\u001dQ\u0014G\u000e\u0015:pa\u0016\u0014H/[3tA\u0005QA/\u00192mKV#\u0018\u000e\\:\u0016\u0005\u0005}\u0001\u0003BA\u0011\u0003Gi\u0011!I\u0005\u0004\u0003K\t#A\u0003+bE2,W\u000b^5mg\u0006YA/\u00192mKV#\u0018\u000e\\:!\u00035)hNZ5mY\u0016$'+\u00198hKR1\u0011QFA\u001d\u0003{\u0001RAKA\u0018\u0003gI1!!\r,\u0005\u0019y\u0005\u000f^5p]B!\u0011\u0011EA\u001b\u0013\r\t9$\t\u0002\u000f!\u0006\u0014H/\u001b;j_:\u0014\u0016M\\4f\u0011\u0019\tY\u0004\u0006a\u0001\r\u0006Q\u0011N\u001c9viR\u000b'\r\\3\t\r\u0005}B\u00031\u0001G\u0003-yW\u000f\u001e9viR\u000b'\r\\3\u0002\u001b\t,\u0018\u000e\u001c3M_\u001e$\u0016M\u00197f)\t\t)\u0005E\u0002+\u0003\u000fJ1!!\u0013,\u0005\u0011)f.\u001b;\u0002'\t,\u0018\u000e\u001c3D_6\u0004\u0018M]5t_:Tu.\u001b8\u0015\u0003}\nACY;jY\u0012\u001cu.\u001c9be&\u001cxN\u001c+bE2,\u0017a\u00062vS2$7i\u001c8tSN$XM\\2z\u001b\u0016$(/[2t)\t\t)\u0006E\u0002T\u0003/J1!!\u0017U\u0005-!\u0015\r^1NKR\u0014\u0018nY:\u0002\u001d\r{gn]5ti\u0016t7-\u001f&pEB\u00111LG\n\u00045%zCCAA/\u0003Q1G.\u0019;uK:\\U-\u001f,bYV,')\u001f;fgRA\u0011qMAC\u0003+\u000b9\n\u0005\u0003\u0002j\u0005}d\u0002BA6\u0003wrA!!\u001c\u0002z9!\u0011qNA<\u001d\u0011\t\t(!\u001e\u000f\u0007%\u000b\u0019(C\u0001<\u0013\tI$(\u0003\u0002#q%\u0011agN\u0005\u0004\u0003{*\u0014a\u00029bG.\fw-Z\u0005\u0005\u0003\u0003\u000b\u0019IA\u0005ECR\fgI]1nK*\u0019\u0011QP\u001b\t\u000f\u0005\u001dE\u00041\u0001\u0002\n\u0006)!/Y<EMB)A'a#\u0002\u0010&\u0019\u0011QR\u001b\u0003\u000f\u0011\u000bG/Y:fiB\u0019A'!%\n\u0007\u0005MUGA\u0002S_^DQA\u001d\u000fA\u0002QDq!!'\u001d\u0001\u0004\tY*\u0001\u0006pkR\u0004X\u000f^*ju\u0016\u00042AKAO\u0013\r\tyj\u000b\u0002\u0004\u0013:$\u0018a\u0003:fC\u0012\u0014Vm]8mm\u0016$\"!!*\u0011\t\u00055\u0011qU\u0005\u0005\u0003S\u000byA\u0001\u0004PE*,7\r\u001e")
/* loaded from: input_file:ai/chronon/spark/consistency/ConsistencyJob.class */
public class ConsistencyJob implements Serializable {
    private final SparkSession session;
    private final Join joinConf;
    private final String endDate;
    private final KVStore kvStore;
    private final MetadataStore metadataStore = new MetadataStore(kvStore(), MetadataStore$.MODULE$.$lessinit$greater$default$2(), 10000);
    private final Fetcher fetcher;
    private final JoinCodec joinCodec;
    private final String rawTable;
    private final Map<String, String> tblProperties;
    private final TableUtils tableUtils;

    public static Dataset<Row> flattenKeyValueBytes(Dataset<Row> dataset, JoinCodec joinCodec, int i) {
        return ConsistencyJob$.MODULE$.flattenKeyValueBytes(dataset, joinCodec, i);
    }

    public KVStore kvStore() {
        return this.kvStore;
    }

    public MetadataStore metadataStore() {
        return this.metadataStore;
    }

    public Fetcher fetcher() {
        return this.fetcher;
    }

    public JoinCodec joinCodec() {
        return this.joinCodec;
    }

    public String rawTable() {
        return this.rawTable;
    }

    public Map<String, String> tblProperties() {
        return this.tblProperties;
    }

    public TableUtils tableUtils() {
        return this.tableUtils;
    }

    /* JADX WARN: Multi-variable type inference failed */
    private Option<PartitionRange> unfilledRange(String str, String str2) {
        String nameToFilePath = Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).nameToFilePath();
        Set set = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) this.session.sqlContext().sql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(76).append("\n           |select distinct ").append(Constants$.MODULE$.PartitionColumn()).append("\n           |from ").append(str).append("\n           |where name = '").append(nameToFilePath).append("' ").toString())).stripMargin()).collect())).map(row -> {
            return row.getString(0);
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))))).toSet();
        Ordering apply = package$.MODULE$.Ordering().apply(Ordering$String$.MODULE$);
        Option<A1> reduceOption = set.reduceOption((str3, str4) -> {
            return (String) apply.min(str3, str4);
        });
        Predef$.MODULE$.m1820assert(reduceOption.isDefined(), () -> {
            return new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(120).append("\n           |The join name ").append(nameToFilePath).append(" does not have available logged data yet.\n           |Please double check your logging status").toString())).stripMargin();
        });
        scala.collection.Set set2 = new PartitionRange((String) reduceOption.get(), this.endDate).partitions().toSet();
        Set set3 = (Set) set2.$minus$minus(tableUtils().partitions(str2));
        Set set4 = (Set) set2.$minus$minus(set);
        Set set5 = (Set) set3.$minus$minus(set4);
        Predef$.MODULE$.println(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(JNINativeInterface.ReleaseLongArrayElements).append("\n         |   Unfilled range computation:\n         |   Output table: ").append(str2).append("\n         |   Missing output partitions: ").append(set3).append("\n         |   Missing input partitions: ").append(set4).append("\n         |   Unfilled Partitions: ").append(set5).append("\n         |").toString())).stripMargin());
        if (!set5.isEmpty()) {
            return new Some(new PartitionRange((String) set5.mo2035min(Ordering$String$.MODULE$), (String) set5.mo2034max(Ordering$String$.MODULE$)));
        }
        Predef$.MODULE$.println(new StringBuilder(49).append(str2).append(" seems to be caught up - to either ").append(str).append("(latest ").append(tableUtils().lastAvailablePartition(str)).append(") or ").append(this.endDate).append(".").toString());
        return None$.MODULE$;
    }

    private void buildLogTable() {
        Option<PartitionRange> unfilledRange = unfilledRange(rawTable(), Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).loggedTable());
        if (unfilledRange.isEmpty()) {
            return;
        }
        String nameToFilePath = Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).nameToFilePath();
        PartitionRange partitionRange = unfilledRange.get();
        Dataset<Row> where = tableUtils().sql(partitionRange.genScanQuery(null, rawTable(), partitionRange.genScanQuery$default$3())).where(new StringBuilder(9).append("name = '").append(nameToFilePath).append("'").toString());
        Predef$.MODULE$.println(new StringBuilder(17).append("scanned data for ").append(nameToFilePath).toString());
        tableUtils().insertPartitions(ConsistencyJob$.MODULE$.flattenKeyValueBytes(where, joinCodec(), joinCodec().outputFields().length), Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).loggedTable(), tblProperties(), tableUtils().insertPartitions$default$4(), tableUtils().insertPartitions$default$5(), tableUtils().insertPartitions$default$6(), tableUtils().insertPartitions$default$7());
    }

    private Join buildComparisonJoin() {
        Join deepCopy = this.joinConf.deepCopy();
        Source source = new Source();
        EventSource eventSource = new EventSource();
        Query query = new Query();
        Tuple2[] tuple2Arr = (Tuple2[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(joinCodec().keyFields())).map(structField -> {
            return structField.name();
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))))).map(str -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(str), str);
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class))))).$plus$plus(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(joinCodec().valueFields())).map(structField2 -> {
            return structField2.name();
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))))).map(str2 -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new StringBuilder(0).append(str2).append(ConsistencyMetrics$.MODULE$.loggedSuffix()).toString()), str2);
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class))))), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class))))).$plus$plus(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(joinCodec().timeFields())).map(structField3 -> {
            return structField3.name();
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))))).map(str3 -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(str3), str3);
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class))))), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class)));
        HashMap hashMap = new HashMap();
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(tuple2Arr)).foreach(tuple2 -> {
            if (tuple2 != null) {
                return (String) hashMap.put((String) tuple2.mo1877_1(), (String) tuple2.mo1876_2());
            }
            throw new MatchError(tuple2);
        });
        query.setSelects(hashMap);
        eventSource.setQuery(query);
        eventSource.setTable(Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).loggedTable());
        source.setEvents(eventSource);
        deepCopy.setLeft(source);
        deepCopy.metaData.setName(Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).comparisonConfName());
        return deepCopy;
    }

    private void buildComparisonTable() {
        Option<PartitionRange> unfilledRange = tableUtils().unfilledRange(Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).comparisonTable(), new PartitionRange(null, this.endDate), new Some(Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).loggedTable()));
        if (unfilledRange.isEmpty()) {
            return;
        }
        new ai.chronon.spark.Join(buildComparisonJoin(), unfilledRange.get().end(), new TableUtils(this.session)).computeJoin(new Some(BoxesRunTime.boxToInteger(30)));
    }

    public DataMetrics buildConsistencyMetrics() {
        buildLogTable();
        buildComparisonTable();
        Option<PartitionRange> unfilledRange = tableUtils().unfilledRange(Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).consistencyTable(), new PartitionRange(null, this.endDate), new Some(Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).comparisonTable()));
        if (unfilledRange.isEmpty()) {
            return null;
        }
        TableUtils tableUtils = tableUtils();
        PartitionRange partitionRange = unfilledRange.get();
        Tuple2<Dataset<Row>, DataMetrics> compute = ConsistencyMetrics$.MODULE$.compute(joinCodec().valueFields(), (Dataset) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(joinCodec().valueFields())).foldLeft(tableUtils.sql(partitionRange.genScanQuery(null, Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).comparisonTable(), partitionRange.genScanQuery$default$3())), (dataset, structField) -> {
            return dataset.withColumnRenamed(structField.name(), new StringBuilder(0).append(structField.name()).append(ConsistencyMetrics$.MODULE$.backfilledSuffix()).toString());
        }), ConsistencyMetrics$.MODULE$.compute$default$3());
        if (compute == null) {
            throw new MatchError(compute);
        }
        Tuple2 tuple2 = new Tuple2(compute.mo1877_1(), compute.mo1876_2());
        Dataset<Row> dataset2 = (Dataset) tuple2.mo1877_1();
        DataMetrics dataMetrics = (DataMetrics) tuple2.mo1876_2();
        ai.chronon.spark.Extensions$ extensions$ = ai.chronon.spark.Extensions$.MODULE$;
        Extensions.DataframeOps DataframeOps = ai.chronon.spark.Extensions$.MODULE$.DataframeOps(dataset2);
        extensions$.DataframeOps(DataframeOps.withTimeBasedColumn("ds", DataframeOps.withTimeBasedColumn$default$2(), DataframeOps.withTimeBasedColumn$default$3())).save(Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).consistencyTable(), tblProperties());
        metadataStore().putConsistencyMetrics(this.joinConf, dataMetrics);
        return dataMetrics;
    }

    public ConsistencyJob(SparkSession sparkSession, Join join, String str, Api api) {
        this.session = sparkSession;
        this.joinConf = join;
        this.endDate = str;
        this.kvStore = api.genKvStore();
        this.fetcher = api.fetcher();
        this.joinCodec = fetcher().getJoinCodecs().apply(Extensions$.MODULE$.MetadataOps(join.metaData).nameToFilePath()).get();
        this.rawTable = api.logTable();
        this.tblProperties = (Map) Option$.MODULE$.apply(join.metaData.tableProperties).map(map -> {
            return ((TraversableOnce) JavaConverters$.MODULE$.mapAsScalaMapConverter(map).asScala()).toMap(Predef$.MODULE$.$conforms());
        }).getOrElse(() -> {
            return Predef$.MODULE$.Map().empty2();
        });
        this.tableUtils = new TableUtils(sparkSession);
    }
}
