package ai.chronon.spark.stats;

import ai.chronon.aggregator.row.RowAggregator;
import ai.chronon.api.Constants$;
import ai.chronon.api.Extensions$;
import ai.chronon.api.Join;
import ai.chronon.online.AvroConversions$;
import ai.chronon.spark.Extensions;
import ai.chronon.spark.KvRdd;
import ai.chronon.spark.PartitionRange;
import ai.chronon.spark.TableUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.immutable.C$colon$colon;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.ScalaVersionSpecificCollectionsConverter$;

/* compiled from: SummaryJob.scala */
@ScalaSignature(bytes = "\u0006\u0001u4A\u0001E\t\u00015!AA\u0005\u0001B\u0001B\u0003%Q\u0005\u0003\u00051\u0001\t\u0005\t\u0015!\u00032\u0011!9\u0004A!A!\u0002\u0013A\u0004\"B\"\u0001\t\u0003!\u0005b\u0002&\u0001\u0005\u0004%\ta\u0013\u0005\u0007!\u0002\u0001\u000b\u0011\u0002'\t\u000fE\u0003!\u0019!C\u0005%\"11\u000b\u0001Q\u0001\naBq\u0001\u0016\u0001C\u0002\u0013%!\u000b\u0003\u0004V\u0001\u0001\u0006I\u0001\u000f\u0005\b-\u0002\u0011\r\u0011\"\u0003X\u0011\u0019Y\u0006\u0001)A\u00051\")A\f\u0001C\u0001;\"9a\u000eAI\u0001\n\u0003y\u0007b\u0002>\u0001#\u0003%\ta\u001f\u0002\u000b'VlW.\u0019:z\u0015>\u0014'B\u0001\n\u0014\u0003\u0015\u0019H/\u0019;t\u0015\t!R#A\u0003ta\u0006\u00148N\u0003\u0002\u0017/\u000591\r\u001b:p]>t'\"\u0001\r\u0002\u0005\u0005L7\u0001A\n\u0004\u0001m\t\u0003C\u0001\u000f \u001b\u0005i\"\"\u0001\u0010\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0001j\"AB!osJ+g\r\u0005\u0002\u001dE%\u00111%\b\u0002\r'\u0016\u0014\u0018.\u00197ju\u0006\u0014G.Z\u0001\bg\u0016\u001c8/[8o!\t1c&D\u0001(\u0015\tA\u0013&A\u0002tc2T!\u0001\u0006\u0016\u000b\u0005-b\u0013AB1qC\u000eDWMC\u0001.\u0003\ry'oZ\u0005\u0003_\u001d\u0012Ab\u00159be.\u001cVm]:j_:\f\u0001B[8j]\u000e{gN\u001a\t\u0003eUj\u0011a\r\u0006\u0003iU\t1!\u00199j\u0013\t14G\u0001\u0003K_&t\u0017aB3oI\u0012\u000bG/\u001a\t\u0003s\u0001s!A\u000f 
\u0011\u0005mjR\"\u0001\u001f\u000b\u0005uJ\u0012A\u0002\u001fs_>$h(\u0003\u0002@;\u00051\u0001K]3eK\u001aL!!\u0011\"\u0003\rM#(/\u001b8h\u0015\tyT$\u0001\u0004=S:LGO\u0010\u000b\u0005\u000b\u001eC\u0015\n\u0005\u0002G\u00015\t\u0011\u0003C\u0003%\t\u0001\u0007Q\u0005C\u00031\t\u0001\u0007\u0011\u0007C\u00038\t\u0001\u0007\u0001(\u0001\u0006uC\ndW-\u0016;jYN,\u0012\u0001\u0014\t\u0003\u001b:k\u0011aE\u0005\u0003\u001fN\u0011!\u0002V1cY\u0016,F/\u001b7t\u0003-!\u0018M\u00197f+RLGn\u001d\u0011\u0002\u001f\u0011\f\u0017\u000e\\=Ti\u0006$8\u000fV1cY\u0016,\u0012\u0001O\u0001\u0011I\u0006LG._*uCR\u001cH+\u00192mK\u0002\n1\u0003Z1jYf\u001cF/\u0019;t\u0003Z\u0014x\u000eV1cY\u0016\fA\u0003Z1jYf\u001cF/\u0019;t\u0003Z\u0014x\u000eV1cY\u0016\u0004\u0013A\u0003;bE2,\u0007K]8qgV\t\u0001\f\u0005\u0003:3bB\u0014B\u0001.C\u0005\ri\u0015\r]\u0001\fi\u0006\u0014G.\u001a)s_B\u001c\b%\u0001\u0005eC&d\u0017PU;o)\rq\u0016-\u001b\t\u00039}K!\u0001Y\u000f\u0003\tUs\u0017\u000e\u001e\u0005\bE6\u0001\n\u00111\u0001d\u0003!\u0019H/\u001a9ECf\u001c\bc\u0001\u000feM&\u0011Q-\b\u0002\u0007\u001fB$\u0018n\u001c8\u0011\u0005q9\u0017B\u00015\u001e\u0005\rIe\u000e\u001e\u0005\bU6\u0001\n\u00111\u0001l\u0003\u0019\u0019\u0018-\u001c9mKB\u0011A\u0004\\\u0005\u0003[v\u0011a\u0001R8vE2,\u0017A\u00053bS2L(+\u001e8%I\u00164\u0017-\u001e7uIE*\u0012\u0001\u001d\u0016\u0003GF\\\u0013A\u001d\t\u0003gbl\u0011\u0001\u001e\u0006\u0003kZ\f\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0005]l\u0012AC1o]>$\u0018\r^5p]&\u0011\u0011\u0010\u001e\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0017A\u00053bS2L(+\u001e8%I\u00164\u0017-\u001e7uII*\u0012\u0001 \u0016\u0003WF\u0004")
/* loaded from: input_file:ai/chronon/spark/stats/SummaryJob.class */
/*
 * SummaryJob: computes daily summary statistics over the output table of a Chronon Join
 * and writes them to a stats table (plus an Avro-encoded upload table when the join is
 * flagged online).
 *
 * This file is decompiler output of a Scala class (see the header comment naming
 * SummaryJob.scala). Methods named $anonfun$... are the bodies of Scala lambdas and
 * methods named ...$default$N are the accessors scalac generates for default argument
 * values; neither is meant to be called directly by hand-written code.
 */
public class SummaryJob implements Serializable {
    /** Join configuration supplied to the constructor; its metaData drives table names and the online flag. */
    private final Join joinConf;
    /** End date supplied to the constructor; used as the end of the range to fill and as the upload-table partition value. */
    private final String endDate;
    /** Table helper built from the SparkSession passed to the constructor. */
    private final TableUtils tableUtils;
    /** Name of the flat stats table: outputNamespace + . + cleanName + _stats_daily. */
    private final String dailyStatsTable;
    /** Name of the Avro upload table: outputNamespace + . + cleanName + _stats_daily_upload. */
    private final String dailyStatsAvroTable;
    /** Table properties converted from metaData.tableProperties; null when that field is null. */
    private final Map<String, String> tableProps;

    /** Returns the TableUtils instance created in the constructor. */
    public TableUtils tableUtils() {
        return this.tableUtils;
    }

    /** Returns the name of the flat daily stats table. */
    private String dailyStatsTable() {
        return this.dailyStatsTable;
    }

    /** Returns the name of the Avro-encoded daily stats upload table. */
    private String dailyStatsAvroTable() {
        return this.dailyStatsAvroTable;
    }

    /** Returns the table properties passed to every save call; may be null. */
    private Map<String, String> tableProps() {
        return this.tableProps;
    }

    /**
     * Entry point: finds the unfilled partition ranges of the daily stats table and
     * computes statistics for each of them.
     *
     * @param option optional boxed Int; when defined it is passed to PartitionRange.steps
     *               to split each unfilled range into smaller ranges
     * @param d      forwarded unchanged to StatsCompute.dailySummary; presumably a
     *               sampling fraction (the generated default is 0.1) - TODO confirm
     */
    public void dailyRun(Option<Object> option, double d) {
        // Ask for ranges missing from the stats table, bounded by (null, endDate), with the
        // join output table as the single input table; remaining arguments use the Scala
        // defaults. A None result is treated as an empty list.
        Seq seq = (Seq) tableUtils().unfilledRanges(dailyStatsTable(), new PartitionRange(null, this.endDate), new Some(new C$colon$colon(Extensions$.MODULE$.MetadataOps(this.joinConf.metaData).outputTable(), Nil$.MODULE$)), tableUtils().unfilledRanges$default$4(), tableUtils().unfilledRanges$default$5(), tableUtils().unfilledRanges$default$6()).getOrElse(() -> {
            return Nil$.MODULE$;
        });
        if (seq.isEmpty()) {
            // Nothing to do: only a message is printed, no table is written.
            Predef$.MODULE$.println(new StringBuilder(23).append("No data to compute for ").append(dailyStatsTable()).toString());
        } else {
            // Process every unfilled range in order, then report completion.
            seq.foreach(partitionRange -> {
                $anonfun$dailyRun$2(this, option, d, partitionRange);
                return BoxedUnit.UNIT;
            });
            Predef$.MODULE$.println("Finished writing stats.");
        }
    }

    /** Compiler-generated default for the first dailyRun argument: None (no stepping). */
    public Option<Object> dailyRun$default$1() {
        return None$.MODULE$;
    }

    /** Compiler-generated default for the second dailyRun argument: 0.1. */
    public double dailyRun$default$2() {
        return 0.1d;
    }

    /**
     * Lambda body run for each (range, index) pair: computes the summary for one
     * partition range and saves it.
     *
     * @param summaryJob the enclosing job instance
     * @param seq        all step ranges of the current unfilled range; only its size is
     *                   read here, for progress messages
     * @param d          forwarded to StatsCompute.dailySummary
     * @param tuple2     pair of (PartitionRange, zero-based index); a null pair raises
     *                   MatchError
     */
    public static final /* synthetic */ void $anonfun$dailyRun$6(SummaryJob summaryJob, Seq seq, double d, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        // NOTE(review): mo1955_1 looks like a decompiler-renamed Tuple2 first-element
        // accessor - confirm against the scala.Tuple2 actually on the classpath.
        PartitionRange partitionRange = (PartitionRange) tuple2.mo1955_1();
        int _2$mcI$sp = tuple2._2$mcI$sp();
        // Progress message uses a one-based index.
        Predef$.MODULE$.println(new StringBuilder(21).append("Computing range [").append(_2$mcI$sp + 1).append("/").append(seq.size()).append("]: ").append(partitionRange).toString());
        // Select all rows of the join output table whose ds lies within the range
        // (inclusive, via BETWEEN) and wrap them in a StatsCompute keyed by the left key columns.
        // NOTE(review): the filter column is hard-coded as ds while the upload table below
        // uses Constants.PartitionColumn - confirm these always agree.
        StatsCompute statsCompute = new StatsCompute(summaryJob.tableUtils().sql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(107).append("\n               |SELECT *\n               |FROM ").append(Extensions$.MODULE$.MetadataOps(summaryJob.joinConf.metaData).outputTable()).append("\n               |WHERE ds BETWEEN '").append(partitionRange.start()).append("' AND '").append(partitionRange.end()).append("'\n               |").toString())).stripMargin()), Predef$.MODULE$.wrapRefArray(Extensions$.MODULE$.JoinOps(summaryJob.joinConf).leftKeyCols()));
        // The aggregator is built from the metrics and selected dataframe of the StatsCompute.
        RowAggregator buildAggregator = StatsGenerator$.MODULE$.buildAggregator(statsCompute.metrics(), statsCompute.selectedDf());
        KvRdd dailySummary = statsCompute.dailySummary(buildAggregator, d, statsCompute.dailySummary$default$3());
        if (summaryJob.joinConf.metaData.online) {
            // Online joins additionally get an Avro-encoded table. Two extra rows are
            // appended that carry the key schema and value schema as strings, stored under
            // Constants.StatsKeySchemaKey and Constants.StatsValueSchemaKey. Each extra row
            // is (key bytes, schema bytes, key string, schema string) and reuses the schema
            // of the Avro dataframe.
            Dataset<Row> avroDf = dailySummary.toAvroDf();
            // NOTE(review): the partition column is set to endDate for every step range,
            // not to the range being processed - confirm this is intended.
            Extensions.DataframeOps DataframeOps = ai.chronon.spark.Extensions$.MODULE$.DataframeOps(avroDf.union(summaryJob.tableUtils().sparkSession().createDataFrame(summaryJob.tableUtils().sparkSession().sparkContext().parallelize((Seq) ((TraversableLike) new C$colon$colon(Constants$.MODULE$.StatsKeySchemaKey(), new C$colon$colon(Constants$.MODULE$.StatsValueSchemaKey(), Nil$.MODULE$)).zip((Seq) new C$colon$colon(dailySummary.keyZSchema(), new C$colon$colon(dailySummary.valueZSchema(), Nil$.MODULE$)).map(structType -> {
                return AvroConversions$.MODULE$.fromChrononSchema(structType, AvroConversions$.MODULE$.fromChrononSchema$default$2()).toString(true);
            }, Seq$.MODULE$.canBuildFrom()), Seq$.MODULE$.canBuildFrom())).map(tuple22 -> {
                if (tuple22 == null) {
                    throw new MatchError(tuple22);
                }
                // str is the schema key name, str2 the schema text produced above.
                String str = (String) tuple22.mo1955_1();
                String str2 = (String) tuple22.mo1954_2();
                return Row$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{str.getBytes(Constants$.MODULE$.UTF8()), str2.getBytes(Constants$.MODULE$.UTF8()), str, str2}));
            }, Seq$.MODULE$.canBuildFrom()), summaryJob.tableUtils().sparkSession().sparkContext().parallelize$default$2(), ClassTag$.MODULE$.apply(Row.class)), avroDf.schema())).withColumn(Constants$.MODULE$.PartitionColumn(), functions$.MODULE$.lit(summaryJob.endDate)));
            DataframeOps.save(summaryJob.dailyStatsAvroTable(), summaryJob.tableProps(), DataframeOps.save$default$3(), DataframeOps.save$default$4());
        }
        // Always written, online or not: the flat summary with derived metrics added.
        Extensions.DataframeOps DataframeOps2 = ai.chronon.spark.Extensions$.MODULE$.DataframeOps(statsCompute.addDerivedMetrics(dailySummary.toFlatDf(), buildAggregator));
        DataframeOps2.save(summaryJob.dailyStatsTable(), summaryJob.tableProps(), DataframeOps2.save$default$3(), DataframeOps2.save$default$4());
        Predef$.MODULE$.println(new StringBuilder(19).append("Finished range [").append(_2$mcI$sp + 1).append("/").append(seq.size()).append("].").toString());
        // Decompiler residue of the Unit result of the Scala lambda; the local is unused.
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
    }

    /**
     * Lambda body run for each unfilled range: splits the range into step ranges and
     * computes each of them in order.
     *
     * @param summaryJob     the enclosing job instance
     * @param option         optional boxed Int step passed to PartitionRange.steps; when
     *                       empty the whole range is processed as a single step
     * @param d              forwarded to the per-range computation
     * @param partitionRange the unfilled range to process
     */
    public static final /* synthetic */ void $anonfun$dailyRun$2(SummaryJob summaryJob, Option option, double d, PartitionRange partitionRange) {
        Predef$.MODULE$.println(new StringBuilder(47).append("Daily output statistics table ").append(summaryJob.dailyStatsTable()).append(" unfilled range: ").append(partitionRange).toString());
        // Either the ranges returned by steps(n), or a one-element list holding the range itself.
        Seq seq = (Seq) option.map(obj -> {
            return partitionRange.steps(BoxesRunTime.unboxToInt(obj));
        }).getOrElse(() -> {
            return new C$colon$colon(partitionRange, Nil$.MODULE$);
        });
        Predef$.MODULE$.println(new StringBuilder(19).append("Ranges to compute: ").append(Extensions$.MODULE$.StringsOps((Iterable) seq.map(partitionRange2 -> {
            return partitionRange2.toString();
        }, Seq$.MODULE$.canBuildFrom())).pretty()).toString());
        // Pair each step range with its index so progress can be reported as [i/n].
        ((IterableLike) seq.zipWithIndex(Seq$.MODULE$.canBuildFrom())).foreach(tuple2 -> {
            $anonfun$dailyRun$6(summaryJob, seq, d, tuple2);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Builds the job and derives its table names and table properties from the join metadata.
     *
     * @param sparkSession session wrapped in a new TableUtils
     * @param join         join configuration; metaData.outputNamespace, the clean name and
     *                     metaData.tableProperties are read here
     * @param str          end date stored in the endDate field
     */
    public SummaryJob(SparkSession sparkSession, Join join, String str) {
        this.joinConf = join;
        this.endDate = str;
        this.tableUtils = new TableUtils(sparkSession);
        this.dailyStatsTable = new StringBuilder(13).append(join.metaData.outputNamespace).append(".").append(Extensions$.MODULE$.MetadataOps(join.metaData).cleanName()).append("_stats_daily").toString();
        this.dailyStatsAvroTable = new StringBuilder(20).append(join.metaData.outputNamespace).append(".").append(Extensions$.MODULE$.MetadataOps(join.metaData).cleanName()).append("_stats_daily_upload").toString();
        // Option.apply maps a null tableProperties to None, and orNull turns that back into
        // null, so tableProps is null exactly when the metadata carries no table properties.
        this.tableProps = (Map) Option$.MODULE$.apply(join.metaData.tableProperties).map(map -> {
            return ScalaVersionSpecificCollectionsConverter$.MODULE$.convertJavaMapToScala(map).toMap(Predef$.MODULE$.$conforms());
        }).orNull(Predef$.MODULE$.$conforms());
    }
}
