package ai.starlake.job.sink.jdbc;

import ai.starlake.config.Settings;
import ai.starlake.config.SparkEnv;
import ai.starlake.extract.JdbcDbUtils$;
import ai.starlake.utils.JobResult;
import ai.starlake.utils.SparkJob;
import ai.starlake.utils.SparkJobResult;
import ai.starlake.utils.SparkUtils$;
import ai.starlake.utils.Utils$;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import java.sql.Connection;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.DatasetLogging;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap;
import org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite;
import org.apache.spark.sql.types.StructType;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.collection.immutable.Seq;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Statics;
import scala.util.Either;
import scala.util.Left;
import scala.util.Right;
import scala.util.Try;
import scala.util.Try$;

/* compiled from: SparkJdbcWriter.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005\u001da\u0001B\u0007\u000f\u0001eA\u0001B\n\u0001\u0003\u0002\u0003\u0006Ia\n\u0005\tW\u0001\u0011)\u0019!C\u0002Y!A1\u0007\u0001B\u0001B\u0003%Q\u0006C\u00035\u0001\u0011\u0005Q\u0007C\u0003;\u0001\u0011\u00053\bC\u0004H\u0001\t\u0007I\u0011\u0001%\t\rQ\u0003\u0001\u0015!\u0003J\u0011\u001d)\u0006A1A\u0005\u0002YCa!\u001c\u0001!\u0002\u00139\u0006\"\u00028\u0001\t\u0013y\u0007\"B:\u0001\t\u0003!\b\"B?\u0001\t\u0003r(aD*qCJ\\'\n\u001a2d/JLG/\u001a:\u000b\u0005=\u0001\u0012\u0001\u00026eE\u000eT!!\u0005\n\u0002\tMLgn\u001b\u0006\u0003'Q\t1A[8c\u0015\t)b#\u0001\u0005ti\u0006\u0014H.Y6f\u0015\u00059\u0012AA1j\u0007\u0001\u00192\u0001\u0001\u000e!!\tYb$D\u0001\u001d\u0015\u0005i\u0012!B:dC2\f\u0017BA\u0010\u001d\u0005\u0019\te.\u001f*fMB\u0011\u0011\u0005J\u0007\u0002E)\u00111\u0005F\u0001\u0006kRLGn]\u0005\u0003K\t\u0012\u0001b\u00159be.TuNY\u0001\nG2L7i\u001c8gS\u001e\u0004\"\u0001K\u0015\u000e\u00039I!A\u000b\b\u00031)#'mY\"p]:,7\r^5p]2{\u0017\rZ\"p]\u001aLw-\u0001\u0005tKR$\u0018N\\4t+\u0005i\u0003C\u0001\u00182\u001b\u0005y#B\u0001\u0019\u0015\u0003\u0019\u0019wN\u001c4jO&\u0011!g\f\u0002\t'\u0016$H/\u001b8hg\u0006I1/\u001a;uS:<7\u000fI\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0005YJDCA\u001c9!\tA\u0003\u0001C\u0003,\t\u0001\u000fQ\u0006C\u0003'\t\u0001\u0007q%\u0001\u0003oC6,W#\u0001\u001f\u0011\u0005u\"eB\u0001 C!\tyD$D\u0001A\u0015\t\t\u0005$\u0001\u0004=e>|GOP\u0005\u0003\u0007r\ta\u0001\u0015:fI\u00164\u0017BA#G\u0005\u0019\u0019FO]5oO*\u00111\tH\u0001\u0005G>tg-F\u0001J!\tQ%+D\u0001L\u0015\t9EJ\u0003\u0002N\u001d\u00061\u0001.\u00193p_BT!a\u0014)\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\t\u0016aA8sO&\u00111k\u0013\u0002\u000e\u0007>tg-[4ve\u0006$\u0018n\u001c8\u0002\u000b\r|gN\u001a\u0011\u0002\u0017)$'mY(qi&|gn]\u000b\u0002/B\u0019\u0001,Y2\u000e\u0003eS!AW.\u0002\tU$\u0018\u000e\u001c\u0006\u00039v\u000b\u0001bY1uC2L8\u000f\u001e\u0006\u0003=~\u000b1a]9m\u0015\t\u0001g*A\u0003ta\u0006\u00148.\u0003\u0002c3\n\u00112)Y:f\u0013:\u001cXM\\:ji&4X-T1q!\t!'N\u0004\u0002fQ6\taM\u0003\u0002h)\u00059Q\r\u001f;sC\u000e$\u0018BA5g\u0003-QEMY2EEV#\u0018\u000e\\:\n\u0005-d'!\u0003+bE2,g*Y7f\u0015\tIg-\u0001\u0007kI\n\u001cw\n\u001d;j_:\u001c\b%\u0001\u0004jg\u001aKG.\u001a\u000b\u0002aB\u00111$]\u0005\u0003er\u0011qAQ8pY\u0016\fg.A\u0004sk:TEIQ\"\u0015\u0003U\u00042A\u001e={\u001b\u00059(B\u0001.\u001d\u0013\tIxOA\u0002Uef\u0004\"!I>\n\u0005q\u0014#AD*qCJ\\'j\u001c2SKN,H\u000e^\u0001\u0004eVtG#A@\u0011\tYD\u0018\u0011\u0001\t\u0004C\u0005\r\u0011bAA\u0003E\tI!j\u001c2SKN,H\u000e\u001e")
/* loaded from: input_file:ai/starlake/job/sink/jdbc/SparkJdbcWriter.class */
public class SparkJdbcWriter implements SparkJob {
    private final JdbcConnectionLoadConfig cliConfig;
    private final Settings settings;
    private final Configuration conf;
    private final CaseInsensitiveMap<String> jdbcOptions;
    private SparkEnv ai$starlake$utils$SparkJob$$sparkEnv;
    private SparkSession session;
    private String appName;
    private Logger logger;
    private volatile byte bitmap$0;

    @Override // ai.starlake.utils.SparkJob
    public SparkConf withExtraSparkConf(SparkConf sparkConf) {
        SparkConf withExtraSparkConf;
        withExtraSparkConf = withExtraSparkConf(sparkConf);
        return withExtraSparkConf;
    }

    @Override // ai.starlake.utils.SparkJob
    public String getTableLocation(String str, String str2) {
        String tableLocation;
        tableLocation = getTableLocation(str, str2);
        return tableLocation;
    }

    @Override // ai.starlake.utils.SparkJob
    public String getTableLocation(String str) {
        String tableLocation;
        tableLocation = getTableLocation(str);
        return tableLocation;
    }

    @Override // ai.starlake.utils.SparkJob
    public void registerUdf(String str) {
        registerUdf(str);
    }

    @Override // ai.starlake.utils.JobBase
    public String applicationId() {
        String applicationId;
        applicationId = applicationId();
        return applicationId;
    }

    @Override // org.apache.spark.sql.DatasetLogging
    public <T> DatasetLogging.DatasetHelper<T> DatasetHelper(Dataset<T> dataset) {
        DatasetLogging.DatasetHelper<T> DatasetHelper;
        DatasetHelper = DatasetHelper(dataset);
        return DatasetHelper;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [ai.starlake.job.sink.jdbc.SparkJdbcWriter] */
    private SparkEnv ai$starlake$utils$SparkJob$$sparkEnv$lzycompute() {
        SparkEnv ai$starlake$utils$SparkJob$$sparkEnv;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                ai$starlake$utils$SparkJob$$sparkEnv = ai$starlake$utils$SparkJob$$sparkEnv();
                this.ai$starlake$utils$SparkJob$$sparkEnv = ai$starlake$utils$SparkJob$$sparkEnv;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.ai$starlake$utils$SparkJob$$sparkEnv;
    }

    @Override // ai.starlake.utils.SparkJob
    public SparkEnv ai$starlake$utils$SparkJob$$sparkEnv() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? ai$starlake$utils$SparkJob$$sparkEnv$lzycompute() : this.ai$starlake$utils$SparkJob$$sparkEnv;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [ai.starlake.job.sink.jdbc.SparkJdbcWriter] */
    private SparkSession session$lzycompute() {
        SparkSession session;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                session = session();
                this.session = session;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.session;
    }

    @Override // ai.starlake.utils.SparkJob
    public SparkSession session() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? session$lzycompute() : this.session;
    }

    @Override // ai.starlake.utils.JobBase
    public String appName() {
        return this.appName;
    }

    @Override // ai.starlake.utils.JobBase
    public void ai$starlake$utils$JobBase$_setter_$appName_$eq(String str) {
        this.appName = str;
    }

    public Logger logger() {
        return this.logger;
    }

    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger logger) {
        this.logger = logger;
    }

    @Override // ai.starlake.utils.JobBase
    public Settings settings() {
        return this.settings;
    }

    @Override // ai.starlake.utils.JobBase
    public String name() {
        return new StringBuilder(13).append("cnxload-JDBC-").append(this.cliConfig.outputDomainAndTableName()).toString();
    }

    public Configuration conf() {
        return this.conf;
    }

    public CaseInsensitiveMap<String> jdbcOptions() {
        return this.jdbcOptions;
    }

    private boolean isFile() {
        return this.cliConfig.sourceFile().isLeft();
    }

    public Try<SparkJobResult> runJDBC() {
        Either<String, Dataset<Row>> sourceFile = this.cliConfig.sourceFile();
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("Input path {}", sourceFile);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        return Try$.MODULE$.apply(() -> {
            Dataset dataset;
            if (sourceFile instanceof Left) {
                dataset = this.session().read().format(this.settings().appConfig().defaultWriteFormat()).load((String) ((Left) sourceFile).value());
            } else {
                if (!(sourceFile instanceof Right)) {
                    throw new MatchError(sourceFile);
                }
                dataset = (Dataset) ((Right) sourceFile).value();
            }
            Dataset dataset2 = dataset;
            String str = this.cliConfig.outputDomainAndTableName().split("\\.")[0];
            String str2 = (String) this.jdbcOptions().apply("url");
            JdbcDbUtils$.MODULE$.withJDBCConnection(this.jdbcOptions(), connection -> {
                $anonfun$runJDBC$2(this, str2, str, dataset2, connection);
                return BoxedUnit.UNIT;
            }, this.settings());
            boolean contains = str2.contains("jdbc:duckdb");
            DataFrameWriter option = dataset2.write().format(contains ? "starlake-duckdb" : "jdbc").option("dbtable", this.cliConfig.outputDomainAndTableName());
            SparkUtils$.MODULE$.dialect(str2);
            JdbcDbUtils$.MODULE$.withJDBCConnection(this.jdbcOptions(), connection2 -> {
                $anonfun$runJDBC$7(this, connection2);
                return BoxedUnit.UNIT;
            }, this.settings());
            DataFrameWriter options = option.mode(SaveMode.Append).options(this.cliConfig.options());
            if (contains) {
                options.option("numPartitions", "1").save();
            } else {
                options.save();
            }
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info("JDBC save done to table {} at {}", new Object[]{this.cliConfig.outputDomainAndTableName(), this.cliConfig.options()});
                BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
            }
            return new SparkJobResult(None$.MODULE$, None$.MODULE$);
        });
    }

    @Override // ai.starlake.utils.JobBase
    public Try<JobResult> run() {
        return Utils$.MODULE$.logFailure(runJDBC(), logger());
    }

    public static final /* synthetic */ boolean $anonfun$runJDBC$5(Connection connection, String str) {
        return JdbcDbUtils$.MODULE$.executeAlterTable(str, connection);
    }

    public static final /* synthetic */ boolean $anonfun$runJDBC$6(Connection connection, String str) {
        return JdbcDbUtils$.MODULE$.executeAlterTable(str, connection);
    }

    public static final /* synthetic */ void $anonfun$runJDBC$2(SparkJdbcWriter sparkJdbcWriter, String str, String str2, Dataset dataset, Connection connection) {
        boolean tableExists = JdbcDbUtils$.MODULE$.tableExists(connection, str, sparkJdbcWriter.cliConfig.outputDomainAndTableName());
        if (!tableExists && sparkJdbcWriter.settings().appConfig().createSchemaIfNotExists()) {
            if (sparkJdbcWriter.logger().underlying().isInfoEnabled()) {
                sparkJdbcWriter.logger().underlying().info("table {} not found, trying to create it", sparkJdbcWriter.cliConfig.outputDomainAndTableName());
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
            JdbcDbUtils$.MODULE$.createSchema(connection, str2);
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        } else if (sparkJdbcWriter.logger().underlying().isInfoEnabled()) {
            sparkJdbcWriter.logger().underlying().info("Schema {} found", str2);
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
        }
        StructType schema = dataset.schema();
        if (!SparkUtils$.MODULE$.isFlat(schema) || !tableExists) {
            JdbcOptionsInWrite jdbcOptionsInWrite = new JdbcOptionsInWrite(str, sparkJdbcWriter.cliConfig.outputDomainAndTableName(), sparkJdbcWriter.jdbcOptions());
            if (sparkJdbcWriter.logger().underlying().isInfoEnabled()) {
                sparkJdbcWriter.logger().underlying().info("Table {} not found, creating it with schema {}", new Object[]{sparkJdbcWriter.cliConfig.outputDomainAndTableName(), schema});
                BoxedUnit boxedUnit6 = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit7 = BoxedUnit.UNIT;
            }
            SparkUtils$.MODULE$.createTable(connection, sparkJdbcWriter.cliConfig.outputDomainAndTableName(), schema, false, jdbcOptionsInWrite, Predef$.MODULE$.Map().empty(), sparkJdbcWriter.settings());
            return;
        }
        Option<StructType> schemaOption = SparkUtils$.MODULE$.getSchemaOption(connection, sparkJdbcWriter.jdbcOptions(), sparkJdbcWriter.cliConfig.outputDomainAndTableName());
        StructType added = SparkUtils$.MODULE$.added(schema, (StructType) schemaOption.getOrElse(() -> {
            return schema;
        }));
        Seq<String> alterTableDropColumnsString = SparkUtils$.MODULE$.alterTableDropColumnsString(SparkUtils$.MODULE$.dropped(schema, (StructType) schemaOption.getOrElse(() -> {
            return schema;
        })), sparkJdbcWriter.cliConfig.outputDomainAndTableName());
        if (alterTableDropColumnsString.nonEmpty()) {
            if (sparkJdbcWriter.logger().underlying().isInfoEnabled()) {
                sparkJdbcWriter.logger().underlying().info("alter table {} with {} columns to drop", new Object[]{sparkJdbcWriter.cliConfig.outputDomainAndTableName(), BoxesRunTime.boxToInteger(alterTableDropColumnsString.size())});
                BoxedUnit boxedUnit8 = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit9 = BoxedUnit.UNIT;
            }
            if (sparkJdbcWriter.logger().underlying().isDebugEnabled()) {
                sparkJdbcWriter.logger().underlying().debug("alter table {}", alterTableDropColumnsString.mkString("\n"));
                BoxedUnit boxedUnit10 = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit11 = BoxedUnit.UNIT;
            }
        } else {
            BoxedUnit boxedUnit12 = BoxedUnit.UNIT;
        }
        Seq<String> alterTableAddColumnsString = SparkUtils$.MODULE$.alterTableAddColumnsString(added, sparkJdbcWriter.cliConfig.outputDomainAndTableName(), Predef$.MODULE$.Map().empty());
        if (alterTableAddColumnsString.nonEmpty()) {
            if (sparkJdbcWriter.logger().underlying().isInfoEnabled()) {
                sparkJdbcWriter.logger().underlying().info("alter table {} with {} columns to add", new Object[]{sparkJdbcWriter.cliConfig.outputDomainAndTableName(), BoxesRunTime.boxToInteger(alterTableAddColumnsString.size())});
                BoxedUnit boxedUnit13 = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit14 = BoxedUnit.UNIT;
            }
            if (sparkJdbcWriter.logger().underlying().isDebugEnabled()) {
                sparkJdbcWriter.logger().underlying().debug("alter table {}", alterTableAddColumnsString.mkString("\n"));
                BoxedUnit boxedUnit15 = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit16 = BoxedUnit.UNIT;
            }
        } else {
            BoxedUnit boxedUnit17 = BoxedUnit.UNIT;
        }
        alterTableDropColumnsString.foreach(str3 -> {
            return BoxesRunTime.boxToBoolean($anonfun$runJDBC$5(connection, str3));
        });
        alterTableAddColumnsString.foreach(str4 -> {
            return BoxesRunTime.boxToBoolean($anonfun$runJDBC$6(connection, str4));
        });
    }

    public static final /* synthetic */ void $anonfun$runJDBC$7(SparkJdbcWriter sparkJdbcWriter, Connection connection) {
        JdbcDbUtils$.MODULE$.truncateTable(connection, sparkJdbcWriter.cliConfig.outputDomainAndTableName());
    }

    public SparkJdbcWriter(JdbcConnectionLoadConfig jdbcConnectionLoadConfig, Settings settings) {
        this.cliConfig = jdbcConnectionLoadConfig;
        this.settings = settings;
        StrictLogging.$init$(this);
        DatasetLogging.$init$(this);
        ai$starlake$utils$JobBase$_setter_$appName_$eq((String) Option$.MODULE$.apply(System.getenv("SL_JOB_ID")).orElse(() -> {
            return this.settings().appConfig().jobIdEnvName().flatMap(str3 -> {
                return Option$.MODULE$.apply(System.getenv(str3));
            });
        }).getOrElse(() -> {
            return new StringBuilder(1).append(this.name()).append("-").append(System.currentTimeMillis()).toString();
        }));
        SparkJob.$init$((SparkJob) this);
        this.conf = session().sparkContext().hadoopConfiguration();
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("JDBC Config {}", jdbcConnectionLoadConfig);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        this.jdbcOptions = JdbcDbUtils$.MODULE$.jdbcOptions(jdbcConnectionLoadConfig.options(), jdbcConnectionLoadConfig.format());
        Statics.releaseFence();
    }
}
