package ai.starlake.job.ingest;

import ai.starlake.config.Settings;
import ai.starlake.job.sink.bigquery.BigQueryLoadConfig;
import ai.starlake.job.sink.bigquery.BigQueryLoadConfig$;
import ai.starlake.job.sink.bigquery.BigQueryNativeJob;
import ai.starlake.job.sink.bigquery.BigQueryNativeJob$;
import ai.starlake.job.sink.jdbc.ConnectionLoadConfig$;
import ai.starlake.job.sink.jdbc.ConnectionLoadJob;
import ai.starlake.schema.model.BigQuerySink;
import ai.starlake.schema.model.EsSink;
import ai.starlake.schema.model.FsSink;
import ai.starlake.schema.model.JdbcSink;
import ai.starlake.schema.model.NoneSink;
import ai.starlake.schema.model.Sink;
import ai.starlake.utils.FileLock;
import ai.starlake.utils.Utils$;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import java.sql.Timestamp;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.AtomicType;
import org.apache.spark.sql.types.BooleanType$;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.Tuple12;
import scala.Tuple3;
import scala.collection.Seq;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: AuditLog.scala */
/* loaded from: input_file:ai/starlake/job/ingest/AuditLog$.class */
public final class AuditLog$ implements StrictLogging, Serializable {
    public static AuditLog$ MODULE$;
    private final List<Tuple3<String, StandardSQLTypeName, AtomicType>> auditCols;
    private final Logger logger;

    static {
        new AuditLog$();
    }

    public Logger logger() {
        return this.logger;
    }

    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger logger) {
        this.logger = logger;
    }

    public List<Tuple3<String, StandardSQLTypeName, AtomicType>> auditCols() {
        return this.auditCols;
    }

    private Schema bqSchema() {
        return Schema.of((Field[]) ((List) auditCols().map(tuple3 -> {
            if (tuple3 != null) {
                return Field.newBuilder((String) tuple3._1(), (StandardSQLTypeName) tuple3._2(), new Field[0]).setMode(Field.Mode.NULLABLE).setDescription("").build();
            }
            throw new MatchError(tuple3);
        }, List$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(Field.class)));
    }

    public Object sink(SparkSession sparkSession, AuditLog auditLog, Settings settings) {
        if (settings.comet().sinkToFile()) {
            sinkToFile$1(auditLog, settings, sparkSession);
        }
        Sink sink = settings.comet().audit().sink();
        if (sink instanceof JdbcSink) {
            JdbcSink jdbcSink = (JdbcSink) sink;
            return new ConnectionLoadJob(ConnectionLoadConfig$.MODULE$.fromComet(jdbcSink.connection(), settings.comet(), package$.MODULE$.Right().apply(sparkSession.createDataFrame(sparkSession.implicits().rddToDatasetHolder(sparkSession.sparkContext().parallelize(new $colon.colon(auditLog, Nil$.MODULE$), sparkSession.sparkContext().parallelize$default$2(), ClassTag$.MODULE$.apply(AuditLog.class)), sparkSession.implicits().newProductEncoder(scala.reflect.runtime.package$.MODULE$.universe().TypeTag().apply(scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: ai.starlake.job.ingest.AuditLog$$typecreator6$2
                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                    mirror.universe();
                    return mirror.staticClass("ai.starlake.job.ingest.AuditLog").asType().toTypeConstructor();
                }
            }))).toDF().rdd(), StructType$.MODULE$.apply((Seq) auditCols().map(tuple3 -> {
                if (tuple3 != null) {
                    return new StructField((String) tuple3._1(), (AtomicType) tuple3._3(), true, StructField$.MODULE$.apply$default$4());
                }
                throw new MatchError(tuple3);
            }, List$.MODULE$.canBuildFrom()))).toDF((Seq) auditCols().map(tuple32 -> {
                if (tuple32 != null) {
                    return (String) tuple32._1();
                }
                throw new MatchError(tuple32);
            }, List$.MODULE$.canBuildFrom()))), "audit", ConnectionLoadConfig$.MODULE$.fromComet$default$5(), ConnectionLoadConfig$.MODULE$.fromComet$default$6(), BoxesRunTime.unboxToInt(jdbcSink.partitions().getOrElse(() -> {
                return 1;
            })), BoxesRunTime.unboxToInt(jdbcSink.batchsize().getOrElse(() -> {
                return 1000;
            })), jdbcSink.getOptions(), ConnectionLoadConfig$.MODULE$.fromComet$default$10()), settings).run();
        }
        if (sink instanceof BigQuerySink) {
            BigQuerySink bigQuerySink = (BigQuerySink) sink;
            String str = (String) bigQuerySink.name().getOrElse(() -> {
                return "audit";
            });
            BigQueryNativeJob$.MODULE$.createTable(str, "audit", bqSchema());
            BigQueryLoadConfig bigQueryLoadConfig = new BigQueryLoadConfig(package$.MODULE$.Left().apply("ignore"), str, "audit", None$.MODULE$, Nil$.MODULE$, settings.comet().defaultFormat(), "CREATE_IF_NEEDED", "WRITE_APPEND", None$.MODULE$, None$.MODULE$, BigQueryLoadConfig$.MODULE$.apply$default$11(), BigQueryLoadConfig$.MODULE$.apply$default$12(), BigQueryLoadConfig$.MODULE$.apply$default$13(), bigQuerySink.getOptions(), BigQueryLoadConfig$.MODULE$.apply$default$15(), BigQueryLoadConfig$.MODULE$.apply$default$16(), BigQueryLoadConfig$.MODULE$.apply$default$17(), BigQueryLoadConfig$.MODULE$.apply$default$18());
            return Utils$.MODULE$.logFailure(new BigQueryNativeJob(bigQueryLoadConfig, auditLog.asBqInsert(new StringBuilder(1).append(bigQueryLoadConfig.outputDataset()).append(".").append(bigQueryLoadConfig.outputTable()).toString()), None$.MODULE$, settings).runBatchQuery(), logger());
        }
        if (sink instanceof EsSink) {
            throw new Exception("Sinking Audit log to Elasticsearch not yet supported");
        }
        if ((sink instanceof NoneSink ? true : sink instanceof FsSink) && !settings.comet().sinkToFile()) {
            sinkToFile$1(auditLog, settings, sparkSession);
            return BoxedUnit.UNIT;
        }
        if ((sink instanceof NoneSink ? true : sink instanceof FsSink) && settings.comet().sinkToFile()) {
            return BoxedUnit.UNIT;
        }
        throw new MatchError(sink);
    }

    public AuditLog apply(String str, String str2, String str3, String str4, boolean z, long j, long j2, long j3, Timestamp timestamp, long j4, String str5, String str6) {
        return new AuditLog(str, str2, str3, str4, z, j, j2, j3, timestamp, j4, str5, str6);
    }

    public Option<Tuple12<String, String, String, String, Object, Object, Object, Object, Timestamp, Object, String, String>> unapply(AuditLog auditLog) {
        return auditLog == null ? None$.MODULE$ : new Some(new Tuple12(auditLog.jobid(), auditLog.paths(), auditLog.domain(), auditLog.schema(), BoxesRunTime.boxToBoolean(auditLog.success()), BoxesRunTime.boxToLong(auditLog.count()), BoxesRunTime.boxToLong(auditLog.countAccepted()), BoxesRunTime.boxToLong(auditLog.countRejected()), auditLog.timestamp(), BoxesRunTime.boxToLong(auditLog.duration()), auditLog.message(), auditLog.step()));
    }

    private Object readResolve() {
        return MODULE$;
    }

    private static final void sinkToFile$1(AuditLog auditLog, Settings settings, SparkSession sparkSession) {
        FileLock fileLock = new FileLock(new Path(settings.comet().audit().path(), "audit.lock"), settings.storageHandler());
        fileLock.doExclusively(fileLock.doExclusively$default$1(), () -> {
            Path path = new Path(settings.comet().audit().path(), "ingestion-log");
            DataFrameWriter mode = sparkSession.implicits().localSeqToDatasetHolder(new $colon.colon(auditLog, Nil$.MODULE$), sparkSession.implicits().newProductEncoder(scala.reflect.runtime.package$.MODULE$.universe().TypeTag().apply(scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(MODULE$.getClass().getClassLoader()), new TypeCreator() { // from class: ai.starlake.job.ingest.AuditLog$$typecreator6$1
                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                    mirror.universe();
                    return mirror.staticClass("ai.starlake.job.ingest.AuditLog").asType().toTypeConstructor();
                }
            }))).toDF().write().mode(SaveMode.Append);
            if (MODULE$.logger().underlying().isInfoEnabled()) {
                MODULE$.logger().underlying().info("Saving audit to path {}", new Object[]{path});
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
            if (!settings.comet().hive()) {
                if (MODULE$.logger().underlying().isInfoEnabled()) {
                    MODULE$.logger().underlying().info("Saving audit to file {}", new Object[]{path});
                    BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                } else {
                    BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
                }
                mode.format(settings.comet().defaultAuditWriteFormat()).option("path", path.toString()).save();
                return;
            }
            String str = (String) settings.comet().audit().sink().name().getOrElse(() -> {
                return "audit";
            });
            String sb = new StringBuilder(1).append(str).append(".").append("audit").toString();
            sparkSession.sql(new StringBuilder(30).append("create database if not exists ").append(str).toString());
            sparkSession.sql(new StringBuilder(4).append("use ").append(str).toString());
            if (MODULE$.logger().underlying().isInfoEnabled()) {
                MODULE$.logger().underlying().info("Saving audit to table {}", new Object[]{sb});
                BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit6 = BoxedUnit.UNIT;
            }
            mode.format(settings.comet().defaultAuditWriteFormat()).saveAsTable(sb);
        });
    }

    private AuditLog$() {
        MODULE$ = this;
        StrictLogging.$init$(this);
        this.auditCols = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple3[]{new Tuple3("jobid", StandardSQLTypeName.STRING, StringType$.MODULE$), new Tuple3("paths", StandardSQLTypeName.STRING, StringType$.MODULE$), new Tuple3("domain", StandardSQLTypeName.STRING, StringType$.MODULE$), new Tuple3("schema", StandardSQLTypeName.STRING, StringType$.MODULE$), new Tuple3("success", StandardSQLTypeName.BOOL, BooleanType$.MODULE$), new Tuple3("count", StandardSQLTypeName.INT64, LongType$.MODULE$), new Tuple3("countAccepted", StandardSQLTypeName.INT64, LongType$.MODULE$), new Tuple3("countRejected", StandardSQLTypeName.INT64, LongType$.MODULE$), new Tuple3("timestamp", StandardSQLTypeName.TIMESTAMP, TimestampType$.MODULE$), new Tuple3("duration", StandardSQLTypeName.INT64, LongType$.MODULE$), new Tuple3("message", StandardSQLTypeName.STRING, StringType$.MODULE$), new Tuple3("step", StandardSQLTypeName.STRING, StringType$.MODULE$)}));
    }
}
