package ai.starlake.job.sink.kafka;

import ai.starlake.config.DatasetArea$;
import ai.starlake.config.Settings;
import ai.starlake.config.SparkEnv;
import ai.starlake.schema.handlers.SchemaHandler;
import ai.starlake.schema.model.SinkType;
import ai.starlake.schema.model.Views;
import ai.starlake.utils.Formatter$;
import ai.starlake.utils.JobBase;
import ai.starlake.utils.JobResult;
import ai.starlake.utils.SparkJob;
import ai.starlake.utils.SparkJobResult;
import ai.starlake.utils.Utils$;
import ai.starlake.utils.kafka.KafkaClient;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.DatasetLogging;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.UDFRegistration;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.avro.SchemaConverters$;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.Trigger;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Try;
import scala.util.Try$;

/* compiled from: KafkaJob.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005Eg\u0001\u0002\u000f\u001e\u0001!B\u0001\"\u000e\u0001\u0003\u0006\u0004%\tA\u000e\u0005\tw\u0001\u0011\t\u0011)A\u0005o!AA\b\u0001BC\u0002\u0013\rQ\b\u0003\u0005E\u0001\t\u0005\t\u0015!\u0003?\u0011\u0015)\u0005\u0001\"\u0001G\u0011\u001dY\u0005A1A\u0005\u00021Ca!\u0016\u0001!\u0002\u0013i\u0005b\u0002,\u0001\u0005\u0004%Ia\u0016\u0005\u0007?\u0002\u0001\u000b\u0011\u0002-\t\u000f\u0001\u0004!\u0019!C\u0005C\"1Q\u000e\u0001Q\u0001\n\tDqA\u001c\u0001C\u0002\u0013\u0005q\u000e\u0003\u0004t\u0001\u0001\u0006I\u0001\u001d\u0005\bi\u0002\u0011\r\u0011\"\u0001v\u0011\u001d\tI\u0001\u0001Q\u0001\nYDq!a\u0003\u0001\t\u0003\ti\u0001C\u0005\u0002,\u0001\t\n\u0011\"\u0001\u0002.!9\u00111\t\u0001\u0005\u0002\u0005\u0015\u0003\"CA8\u0001\t\u0007I\u0011AA9\u0011!\t)\b\u0001Q\u0001\n\u0005M\u0004bBA<\u0001\u0011\u0005\u0011\u0011\u0010\u0005\b\u0003\u001b\u0003A\u0011AA=\u0011%\ty\t\u0001b\u0001\n\u0013\t\t\n\u0003\u0005\u0002\u001c\u0002\u0001\u000b\u0011BAJ\u0011\u001d\ti\n\u0001C\u0005\u0003?Cq!a1\u0001\t\u0003\n)\r\u0003\u0004\u0002P\u0002!\t%\u0019\u0002\t\u0017\u000647.\u0019&pE*\u0011adH\u0001\u0006W\u000647.\u0019\u0006\u0003A\u0005\nAa]5oW*\u0011!eI\u0001\u0004U>\u0014'B\u0001\u0013&\u0003!\u0019H/\u0019:mC.,'\"\u0001\u0014\u0002\u0005\u0005L7\u0001A\n\u0004\u0001%z\u0003C\u0001\u0016.\u001b\u0005Y#\"\u0001\u0017\u0002\u000bM\u001c\u0017\r\\1\n\u00059Z#AB!osJ+g\r\u0005\u00021g5\t\u0011G\u0003\u00023G\u0005)Q\u000f^5mg&\u0011A'\r\u0002\t'B\f'o\u001b&pE\u0006q1.\u00194lC*{'mQ8oM&<W#A\u001c\u0011\u0005aJT\"A\u000f\n\u0005ij\"AD&bM.\f'j\u001c2D_:4\u0017nZ\u0001\u0010W\u000647.\u0019&pE\u000e{gNZ5hA\u0005A1/\u001a;uS:<7/F\u0001?!\ty$)D\u0001A\u0015\t\t5%\u0001\u0004d_:4\u0017nZ\u0005\u0003\u0007\u0002\u0013\u0001bU3ui&twm]\u0001\ng\u0016$H/\u001b8hg\u0002\na\u0001P5oSRtDCA$K)\tA\u0015\n\u0005\u00029\u0001!)A(\u0002a\u0002}!)Q'\u0002a\u0001o\u0005i1o\u00195f[\u0006D\u0015M\u001c3mKJ,\u0012!\u0014\t\u0003\u001dNk\u0011a\u0014\u0006\u00
03!F\u000b\u0001\u0002[1oI2,'o\u001d\u0006\u0003%\u000e\naa]2iK6\f\u0017B\u0001+P\u00055\u00196\r[3nC\"\u000bg\u000e\u001a7fe\u0006q1o\u00195f[\u0006D\u0015M\u001c3mKJ\u0004\u0013a\u0003;pa&\u001c7i\u001c8gS\u001e,\u0012\u0001\u0017\t\u00033rs!a\u0010.\n\u0005m\u0003\u0015\u0001C*fiRLgnZ:\n\u0005us&\u0001E&bM.\fGk\u001c9jG\u000e{gNZ5h\u0015\tY\u0006)\u0001\u0007u_BL7mQ8oM&<\u0007%A\u0005gS:\fG\u000eU1uQV\t!\r\u0005\u0002dU:\u0011A\r\u001b\t\u0003K.j\u0011A\u001a\u0006\u0003O\u001e\na\u0001\u0010:p_Rt\u0014BA5,\u0003\u0019\u0001&/\u001a3fM&\u00111\u000e\u001c\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005%\\\u0013A\u00034j]\u0006d\u0007+\u0019;iA\u0005\t2o\u00195f[\u0006\u0014VmZ5tiJLXK\u001d7\u0016\u0003A\u00042AK9c\u0013\t\u00118F\u0001\u0004PaRLwN\\\u0001\u0013g\u000eDW-\\1SK\u001eL7\u000f\u001e:z+Jd\u0007%\u0001\u000btG\",W.\u0019*fO&\u001cHO]=DY&,g\u000e^\u000b\u0002mB\u0019!&]<\u0011\u0007a\f)!D\u0001z\u0015\tQ80\u0001\u0004dY&,g\u000e\u001e\u0006\u0003yv\fab]2iK6\f'/Z4jgR\u0014\u0018P\u0003\u0002\u001f}*\u0019q0!\u0001\u0002\u0013\r|gN\u001a7vK:$(BAA\u0002\u0003\tIw.C\u0002\u0002\be\u0014!dQ1dQ\u0016$7k\u00195f[\u0006\u0014VmZ5tiJL8\t\\5f]R\fQc]2iK6\f'+Z4jgR\u0014\u0018p\u00117jK:$\b%A\tm_>\\W\u000f\u001d+pa&\u001c7k\u00195f[\u0006$b!a\u0004\u0002\u001e\u0005\u0005\u0002\u0003\u0002\u0016r\u0003#\u0001B!a\u0005\u0002\u00165\t\u0001!\u0003\u0003\u0002\u0018\u0005e!A\u0004&eE\u000e\u001cuN\u001c4jO:\u000bW.Z\u0005\u0004\u00037\t$a\u0002&pE\n\u000b7/\u001a\u0005\u0007\u0003?\u0001\u0002\u0019\u00012\u0002\u000bQ|\u0007/[2\t\u0013\u0005\r\u0002\u0003%AA\u0002\u0005\u0015\u0012!B5t\u0017\u0016L\bc\u0001\u0016\u0002(%\u0019\u0011\u0011F\u0016\u0003\u000f\t{w\u000e\\3b]\u0006YBn\\8lkB$v\u000e]5d'\u000eDW-\\1%I\u00164\u0017-\u001e7uII*\"!a\f+\t\u0005\u0015\u0012\u0011G\u0016\u0003\u0003g\u0001B!!\u000e\u0002@5\u0011\u0011q\u0007\u0006\u0005\u0003s\tY$A\u0005v]\u000eDWmY6fI*\u0019\u0011QH\u0016\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0003\u0002B\u0005]\"!E;oG\",7m[3e-\u000
6\u0014\u0018.\u00198dK\u00069\u0012M\u001e:p'\u000eDW-\\1U_N\u0003\u0018M]6TG\",W.\u0019\u000b\u0005\u0003\u000f\nY\u0007\u0005\u0003\u0002J\u0005\u0015d\u0002BA&\u0003Cj!!!\u0014\u000b\t\u0005=\u0013\u0011K\u0001\u0005CZ\u0014xN\u0003\u0003\u0002T\u0005U\u0013aA:rY*!\u0011qKA-\u0003\u0015\u0019\b/\u0019:l\u0015\u0011\tY&!\u0018\u0002\r\u0005\u0004\u0018m\u00195f\u0015\t\ty&A\u0002pe\u001eLA!a\u0019\u0002N\u0005\u00012k\u00195f[\u0006\u001cuN\u001c<feR,'o]\u0005\u0005\u0003O\nIG\u0001\u0006TG\",W.\u0019+za\u0016TA!a\u0019\u0002N!1\u0011Q\u000e\nA\u0002\t\f!\"\u0019<s_N\u001b\u0007.Z7b\u00035!gMV1mk\u0016\u001c6\r[3nCV\u0011\u00111\u000f\t\u0005UE\f9%\u0001\beMZ\u000bG.^3TG\",W.\u0019\u0011\u0002\u000f=4g\r\\8bIR\u0011\u00111\u0010\t\u0007\u0003{\n\u0019)a\"\u000e\u0005\u0005}$bAAAW\u0005!Q\u000f^5m\u0013\u0011\t))a \u0003\u0007Q\u0013\u0018\u0010E\u00021\u0003\u0013K1!a#2\u00059\u0019\u0006/\u0019:l\u0015>\u0014'+Z:vYR\fA\u0001\\8bI\u0006\tBO]1og\u001a|'/\\%ogR\fgnY3\u0016\u0005\u0005M\u0005\u0003\u0002\u0016r\u0003+\u00032\u0001OAL\u0013\r\tI*\b\u0002\u0013\t\u0006$\u0018M\u0012:b[\u0016$&/\u00198tM>\u0014X.\u0001\nue\u0006t7OZ8s[&s7\u000f^1oG\u0016\u0004\u0013\u0001\u0003;sC:\u001chm\\7\u0015\t\u0005\u0005\u0016q\u0018\t\u0005\u0003G\u000bIL\u0004\u0003\u0002&\u0006Uf\u0002BAT\u0003gsA!!+\u00022:!\u00111VAX\u001d\r)\u0017QV\u0005\u0003\u0003?JA!a\u0017\u0002^%!\u0011qKA-\u0013\u0011\t\u0019&!\u0016\n\t\u0005]\u0016\u0011K\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\tY,!0\u0003\u0013\u0011\u000bG/\u0019$sC6,'\u0002BA\\\u0003#Bq!!1\u001a\u0001\u0004\t\t+\u0001\u0002eM\u0006\u0019!/\u001e8\u0015\u0005\u0005\u001d\u0007CBA?\u0003\u0007\u000bI\rE\u00021\u0003\u0017L1!!42\u0005%QuN\u0019*fgVdG/\u0001\u0003oC6,\u0007")
/* loaded from: input_file:ai/starlake/job/sink/kafka/KafkaJob.class */
public class KafkaJob implements SparkJob {
    /** Job parameters passed to the constructor. */
    private final KafkaJobConfig kafkaJobConfig;
    /** Application settings passed to the constructor. */
    private final Settings settings;
    /** Schema handler built in the constructor from the metadata storage handler. */
    private final SchemaHandler schemaHandler;
    /** Topic configuration looked up in the settings by {@code kafkaJobConfig.topicConfigName()}. */
    private final Settings.KafkaTopicConfig topicConfig;
    /** Path from the job config after {@code richFormat} substitution; read by offload and load. */
    private final String finalPath;
    /** Value of the {@code schema.registry.url} server option, when present. */
    private final Option<String> schemaRegistryUrl;
    /** Schema registry client, defined only when {@link #schemaRegistryUrl} is defined. */
    private final Option<CachedSchemaRegistryClient> schemaRegistryClient;
    /** Spark schema converted from the topic's latest value schema, when a registry client exists. */
    private final Option<SchemaConverters.SchemaType> dfValueSchema;
    /** Optional transformation applied to every dataset read by offload and load. */
    private final Option<DataFrameTransform> transformInstance;
    /** Backing storage for the lazily initialised Spark environment (guarded by bit 0 of bitmap$0). */
    private SparkEnv ai$starlake$utils$SparkJob$$sparkEnv;
    /** Backing storage for the lazily initialised Spark session (guarded by bit 1 of bitmap$0). */
    private SparkSession session;
    // Deliberately not final: this field is written by
    // com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq rather than assigned in the
    // constructor, which Java rejects for a final field.
    private Logger logger;
    /** Initialisation flags for the lazy fields: mask 1 = sparkEnv, mask 2 = session. */
    private volatile byte bitmap$0;

    /**
     * Forwards to the {@code withExtraSparkConf} implementation inherited from {@link SparkJob}.
     *
     * <p>The decompiled forwarder invoked itself unconditionally and would have ended in a
     * {@link StackOverflowError}; the inherited interface default is called explicitly instead.
     *
     * @param sparkConf Spark configuration handed to the inherited implementation
     * @return whatever the inherited implementation returns
     */
    @Override // ai.starlake.utils.SparkJob
    public SparkConf withExtraSparkConf(SparkConf sparkConf) {
        return SparkJob.super.withExtraSparkConf(sparkConf);
    }

    /**
     * Forwards to the {@code registerUdf} implementation inherited from {@link SparkJob}.
     *
     * <p>The decompiled forwarder invoked itself unconditionally and would have ended in a
     * {@link StackOverflowError}; the inherited interface default is called explicitly instead.
     *
     * @param str argument handed unchanged to the inherited implementation
     */
    @Override // ai.starlake.utils.SparkJob
    public void registerUdf(String str) {
        SparkJob.super.registerUdf(str);
    }

    /**
     * Forwards to the {@code partitionedDatasetWriter} implementation inherited from
     * {@link SparkJob}.
     *
     * <p>The decompiled forwarder invoked itself unconditionally and would have ended in a
     * {@link StackOverflowError}; the inherited interface default is called explicitly instead.
     *
     * @param dataset dataset handed to the inherited implementation
     * @param list    column names handed to the inherited implementation
     * @return the writer produced by the inherited implementation
     */
    @Override // ai.starlake.utils.SparkJob
    public DataFrameWriter<Row> partitionedDatasetWriter(Dataset<Row> dataset, List<String> list) {
        return SparkJob.super.partitionedDatasetWriter(dataset, list);
    }

    /**
     * Forwards to the {@code partitionDataset} implementation inherited from {@link SparkJob}.
     *
     * <p>The decompiled forwarder invoked itself unconditionally and would have ended in a
     * {@link StackOverflowError}; the inherited interface default is called explicitly instead.
     *
     * @param dataset dataset handed to the inherited implementation
     * @param list    column names handed to the inherited implementation
     * @return the dataset produced by the inherited implementation
     */
    @Override // ai.starlake.utils.SparkJob
    public Dataset<Row> partitionDataset(Dataset<Row> dataset, List<String> list) {
        return SparkJob.super.partitionDataset(dataset, list);
    }

    /**
     * Forwards to the {@code analyze} implementation inherited from {@link SparkJob}.
     *
     * <p>The decompiled forwarder invoked itself unconditionally and would have ended in a
     * {@link StackOverflowError}; the inherited interface default is called explicitly instead.
     *
     * @param str argument handed unchanged to the inherited implementation
     * @return whatever the inherited implementation returns
     */
    @Override // ai.starlake.utils.SparkJob
    public Object analyze(String str) {
        return SparkJob.super.analyze(str);
    }

    /**
     * Forwards to the {@code createSparkViews} implementation inherited from {@link SparkJob}.
     *
     * <p>The decompiled forwarder invoked itself unconditionally and would have ended in a
     * {@link StackOverflowError}; the inherited interface default is called explicitly instead.
     *
     * @param views view definitions handed to the inherited implementation
     * @param map   first string map handed to the inherited implementation
     * @param map2  second string map handed to the inherited implementation
     */
    @Override // ai.starlake.utils.SparkJob
    public void createSparkViews(Views views, Map<String, String> map, Map<String, String> map2) {
        SparkJob.super.createSparkViews(views, map, map2);
    }

    /**
     * Forwards to the {@code createSparkView} implementation inherited from {@link SparkJob}.
     *
     * <p>The decompiled forwarder invoked itself unconditionally and would have ended in a
     * {@link StackOverflowError}; the inherited interface default is called explicitly instead.
     *
     * @param sinkType sink type handed to the inherited implementation
     * @param option   optional string handed to the inherited implementation
     * @param str      string handed to the inherited implementation
     * @return the dataset produced by the inherited implementation
     */
    @Override // ai.starlake.utils.SparkJob
    public Dataset<Row> createSparkView(SinkType sinkType, Option<String> option, String str) {
        return SparkJob.super.createSparkView(sinkType, option, str);
    }

    /**
     * Forwards to the {@code parseViewDefinition} implementation this class inherits through
     * {@link SparkJob} (declared in {@code JobBase} according to the decompiler).
     *
     * <p>The decompiled forwarder invoked itself unconditionally and would have ended in a
     * {@link StackOverflowError}. {@code SparkJob} is the only direct super-interface of this
     * class, so the inherited default is reached through it.
     *
     * @param str view definition handed to the inherited implementation
     * @return the triple produced by the inherited implementation
     */
    @Override // ai.starlake.utils.JobBase
    public Tuple3<SinkType, Option<String>, String> parseViewDefinition(String str) {
        return SparkJob.super.parseViewDefinition(str);
    }

    /**
     * Forwards to the {@code DatasetHelper} factory this class inherits through {@link SparkJob}
     * (declared in {@code DatasetLogging} according to the decompiler).
     *
     * <p>The decompiled forwarder invoked itself unconditionally and would have ended in a
     * {@link StackOverflowError}. {@code SparkJob} is the only direct super-interface of this
     * class, so the inherited default is reached through it.
     *
     * @param dataset dataset to wrap
     * @param <T>     element type of the dataset
     * @return the helper produced by the inherited implementation
     */
    @Override // org.apache.spark.sql.DatasetLogging
    public <T> DatasetLogging.DatasetHelper<T> DatasetHelper(Dataset<T> dataset) {
        return SparkJob.super.DatasetHelper(dataset);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [ai.starlake.job.sink.kafka.KafkaJob] */
    private SparkEnv ai$starlake$utils$SparkJob$$sparkEnv$lzycompute() {
        SparkEnv ai$starlake$utils$SparkJob$$sparkEnv;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                ai$starlake$utils$SparkJob$$sparkEnv = ai$starlake$utils$SparkJob$$sparkEnv();
                this.ai$starlake$utils$SparkJob$$sparkEnv = ai$starlake$utils$SparkJob$$sparkEnv;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.ai$starlake$utils$SparkJob$$sparkEnv;
    }

    /**
     * Returns the Spark environment, taking the slow initialisation path only while bit 0 of
     * {@code bitmap$0} is still clear.
     *
     * @return the stored Spark environment, or the result of the lazy initialiser on first use
     */
    @Override // ai.starlake.utils.SparkJob
    public SparkEnv ai$starlake$utils$SparkJob$$sparkEnv() {
        if (((byte) (this.bitmap$0 & 1)) != 0) {
            return this.ai$starlake$utils$SparkJob$$sparkEnv;
        }
        return ai$starlake$utils$SparkJob$$sparkEnv$lzycompute();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [ai.starlake.job.sink.kafka.KafkaJob] */
    private SparkSession session$lzycompute() {
        SparkSession session;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                session = session();
                this.session = session;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.session;
    }

    /**
     * Returns the Spark session, taking the slow initialisation path only while bit 1 of
     * {@code bitmap$0} is still clear.
     *
     * @return the stored session, or the result of the lazy initialiser on first use
     */
    @Override // ai.starlake.utils.SparkJob
    public SparkSession session() {
        if (((byte) (this.bitmap$0 & 2)) != 0) {
            return this.session;
        }
        return session$lzycompute();
    }

    /**
     * Returns the logger stored in this instance's {@code logger} field.
     *
     * @return the logger
     */
    public Logger logger() {
        return logger;
    }

    /**
     * Stores the given logger in this instance's {@code logger} field.
     *
     * <p>NOTE(review): presumably invoked by {@code StrictLogging.$init$(this)} during
     * construction - confirm against the scala-logging trait.
     *
     * @param logger logger to store
     */
    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger logger) {
        this.logger = logger;
    }

    /**
     * Returns the job configuration passed to the constructor.
     *
     * @return the Kafka job configuration
     */
    public KafkaJobConfig kafkaJobConfig() {
        return kafkaJobConfig;
    }

    /**
     * Returns the settings passed to the constructor.
     *
     * @return the application settings
     */
    @Override // ai.starlake.utils.JobBase
    public Settings settings() {
        return settings;
    }

    /**
     * Returns the schema handler created in the constructor.
     *
     * @return the schema handler
     */
    public SchemaHandler schemaHandler() {
        return schemaHandler;
    }

    /**
     * Returns the topic configuration resolved in the constructor.
     *
     * @return the Kafka topic configuration
     */
    private Settings.KafkaTopicConfig topicConfig() {
        return topicConfig;
    }

    /**
     * Returns the formatted path computed in the constructor.
     *
     * @return the path used by offload and load
     */
    private String finalPath() {
        return finalPath;
    }

    /**
     * Returns the {@code schema.registry.url} server option captured in the constructor.
     *
     * @return the registry URL, or an empty option when the server options do not contain it
     */
    public Option<String> schemaRegistryUrl() {
        return schemaRegistryUrl;
    }

    /**
     * Returns the schema registry client created in the constructor.
     *
     * @return the client, or an empty option when no registry URL was configured
     */
    public Option<CachedSchemaRegistryClient> schemaRegistryClient() {
        return schemaRegistryClient;
    }

    /**
     * Fetches the latest registered schema text for a topic from the schema registry.
     *
     * <p>The registry subject is the topic name followed by {@code -key} or {@code -value}.
     *
     * @param str topic name
     * @param z   {@code true} to look up the key schema, {@code false} for the value schema
     * @return the schema string, or an empty option when no registry client is configured
     */
    public Option<String> lookupTopicSchema(String str, boolean z) {
        return schemaRegistryClient().map(
            registry -> registry.getLatestSchemaMetadata(str + (z ? "-key" : "-value")).getSchema());
    }

    /**
     * Default for the second parameter of {@code lookupTopicSchema}: {@code false}, which selects
     * the {@code -value} subject.
     *
     * @return {@code false}
     */
    public boolean lookupTopicSchema$default$2() {
        return false;
    }

    /**
     * Parses an Avro schema definition and converts it to its Spark SQL type.
     *
     * @param str Avro schema text
     * @return the result of {@code SchemaConverters.toSqlType} for the parsed schema
     */
    public SchemaConverters.SchemaType avroSchemaToSparkSchema(String str) {
        Schema.Parser avroParser = new Schema.Parser();
        Schema avroSchema = avroParser.parse(str);
        return SchemaConverters$.MODULE$.toSqlType(avroSchema);
    }

    /**
     * Returns the Spark schema derived in the constructor from the topic's value schema.
     *
     * @return the schema, or an empty option when no schema was looked up
     */
    public Option<SchemaConverters.SchemaType> dfValueSchema() {
        return dfValueSchema;
    }

    /**
     * Offloads the configured Kafka topic to {@code finalPath}.
     *
     * <p>Runs in batch mode unless {@code kafkaJobConfig().streaming()} is set, in which case a
     * streaming query is started and awaited. A {@link KafkaClient} is created for the run and
     * managed by {@code Utils.withResources}. Anything thrown is captured by the returned
     * {@link Try}.
     *
     * @return a {@code Try} holding the transformed dataset (batch) or an empty result (streaming)
     */
    public Try<SparkJobResult> offload() {
        return Try$.MODULE$.apply(() -> {
            if (kafkaJobConfig().streaming()) {
                return (SparkJobResult) Utils$.MODULE$.withResources(
                    () -> new KafkaClient(settings().comet().kafka(), settings()),
                    kafkaClient -> offloadStreaming(kafkaClient));
            }
            return (SparkJobResult) Utils$.MODULE$.withResources(
                () -> new KafkaClient(settings().comet().kafka(), settings()),
                kafkaClient -> offloadBatch(kafkaClient));
        });
    }

    /**
     * Batch offload: consumes the topic, transforms and writes the rows, then saves the consumed
     * offsets.
     */
    private SparkJobResult offloadBatch(KafkaClient kafkaClient) {
        Tuple2<Dataset<Row>, List<Tuple2<Object, Object>>> consumed =
            kafkaClient.consumeTopicBatch(kafkaJobConfig().topicConfigName(), session(), topicConfig());
        if (consumed == null) {
            throw new MatchError(consumed);
        }
        Dataset<Row> consumedRows = consumed._1();
        List<Tuple2<Object, Object>> offsets = consumed._2();
        Dataset<Row> transformed = transfom(consumedRows);

        // When a coalesce value is configured the data is repartitioned to that many partitions.
        Option<?> partitions = kafkaJobConfig().coalesce();
        Dataset<Row> toWrite = partitions.isDefined()
            ? transformed.repartition(BoxesRunTime.unboxToInt(partitions.get()))
            : transformed;

        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("Saving to {}", new Object[]{kafkaJobConfig()});
        }
        toWrite.write()
            .mode(kafkaJobConfig().mode())
            .format(kafkaJobConfig().format())
            .options(kafkaJobConfig().writeOptions())
            .save(finalPath());
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("Kafka saved messages to offload -> {}", new Object[]{finalPath()});
        }

        Option<?> singleFile = kafkaJobConfig().coalesce();
        if (singleFile.isDefined() && BoxesRunTime.unboxToInt(singleFile.get()) == 1) {
            promoteSinglePartFile();
        }

        // Offsets are saved only after the write (and optional file promotion) completed.
        kafkaClient.topicSaveOffsets(
            kafkaJobConfig().topicConfigName(),
            topicConfig().allAccessOptions(settings().comet().kafka().sparkServerOptions()),
            offsets);
        return new SparkJobResult(new Some<>(transformed));
    }

    /**
     * Replaces the output directory at {@code finalPath} by the first {@code part-} file it
     * contains, going through a {@code .tmp} sibling so the directory can be deleted in between.
     * If the first move is refused, nothing else is touched.
     */
    private void promoteSinglePartFile() {
        Path target = new Path(finalPath());
        // head() throws when the listing contains no part- file; the exception is left to
        // propagate into the enclosing Try.
        Path partFile = (Path) ((IterableLike) settings().storageHandler().list(target, settings().storageHandler().list$default$2(), settings().storageHandler().list$default$3(), false, settings().storageHandler().list$default$5()).filter(candidate -> {
            return BoxesRunTime.boxToBoolean($anonfun$offload$4((Path) candidate));
        })).head();
        Path staging = new Path(target.toString() + ".tmp");
        if (settings().storageHandler().move(partFile, staging)) {
            settings().storageHandler().delete(target);
            settings().storageHandler().move(staging, target);
        }
    }

    /**
     * Streaming offload: builds the stream writer from the job configuration, starts the query on
     * {@code finalPath} and blocks until it terminates.
     */
    private SparkJobResult offloadStreaming(KafkaClient kafkaClient) {
        DataStreamWriter<Row> writer = transfom(kafkaClient.consumeTopicStreaming(session(), topicConfig()))
            .writeStream()
            .outputMode(kafkaJobConfig().streamingWriteMode())
            .format(kafkaJobConfig().streamingWriteFormat())
            .options(kafkaJobConfig().writeOptions());

        // Locale.ROOT keeps the lower-casing independent of the JVM default locale; with a
        // Turkish default, "ProcessingTime" and "CONTINUOUS" would not match the names below.
        Option<Trigger> trigger = kafkaJobConfig().streamingTrigger()
            .map(triggerName -> triggerName.toLowerCase(java.util.Locale.ROOT))
            .map(triggerName -> triggerFor(triggerName));
        if (trigger.isDefined()) {
            writer = writer.trigger(trigger.get());
        }

        Seq<String> partitionColumns = kafkaJobConfig().streamingWritePartitionBy();
        if (!Nil$.MODULE$.equals(partitionColumns)) {
            writer = writer.partitionBy(partitionColumns);
        }

        if (kafkaJobConfig().streamingWriteToTable()) {
            // Unchecked on purpose: a raw checked Exception cannot be thrown from the lambda
            // calling this helper. It is still an Exception for anyone inspecting the Try.
            throw new UnsupportedOperationException("streamingWriteToTable Not Supported");
        }
        writer.start(finalPath()).awaitTermination();
        return new SparkJobResult(None$.MODULE$);
    }

    /** Maps an already lower-cased trigger name to a Spark trigger; unknown names raise MatchError. */
    private Trigger triggerFor(String triggerName) {
        switch (triggerName) {
            case "once":
                return Trigger.Once();
            case "processingtime":
                return Trigger.ProcessingTime(kafkaJobConfig().streamingTriggerOption());
            case "continuous":
                return Trigger.Continuous(kafkaJobConfig().streamingTriggerOption());
            default:
                throw new MatchError(triggerName);
        }
    }

    /**
     * Loads data from {@code finalPath} and sinks it to the configured Kafka topic.
     *
     * <p>{@code finalPath} is split on commas and every piece is passed to the Spark reader using
     * the job's configured format. The rows go through {@code transfom} before being sent to the
     * topic. A {@link KafkaClient} is created for the run and managed by
     * {@code Utils.withResources}.
     *
     * @return a {@code Try} holding the transformed dataset that was sent to the topic
     */
    public Try<SparkJobResult> load() {
        return Try$.MODULE$.apply(() -> (SparkJobResult) Utils$.MODULE$.withResources(
            () -> new KafkaClient(settings().comet().kafka(), settings()),
            kafkaClient -> {
                Dataset<Row> outgoing = transfom(
                    session()
                        .read()
                        .format(kafkaJobConfig().format())
                        .load(Predef$.MODULE$.wrapRefArray(
                            new StringOps(Predef$.MODULE$.augmentString(finalPath())).split(','))));
                kafkaClient.sinkToTopic(topicConfig(), outgoing);
                return new SparkJobResult(new Some(outgoing));
            }));
    }

    /**
     * Returns the optional transformation instantiated in the constructor.
     *
     * @return the configured transform, or an empty option when none was requested
     */
    private Option<DataFrameTransform> transformInstance() {
        return transformInstance;
    }

    /**
     * Applies the configured transformation to a dataset, if there is one.
     *
     * @param dataset rows to transform
     * @return the transformed rows, or {@code dataset} itself when no transform is configured
     * @throws MatchError if the transform option is neither {@code Some} nor {@code None}
     */
    private Dataset<Row> transfom(Dataset<Row> dataset) {
        Option<DataFrameTransform> configured = transformInstance();
        if (configured instanceof Some) {
            DataFrameTransform transform = ((Some<DataFrameTransform>) configured).value();
            return transform.transform(dataset);
        }
        if (None$.MODULE$.equals(configured)) {
            return dataset;
        }
        throw new MatchError(configured);
    }

    /**
     * Runs the job: offload when {@code kafkaJobConfig().offload()} is set, load otherwise.
     *
     * <p>When a custom deserializer is configured in the Kafka settings, it is configured with the
     * Kafka server options and a Spark UDF named {@code deserialize} is registered first. The UDF
     * passes the topic name and the raw bytes to {@code CustomDeserializer.deserialize}.
     *
     * <p>The decompiled lambda declared a local named like its own parameter, which does not
     * compile and hid which value is the deserializer class and which is the topic name. The two
     * now have distinct names.
     *
     * @return the result of {@link #offload()} or {@link #load()}
     */
    @Override // ai.starlake.utils.JobBase
    public Try<JobResult> run() {
        settings().comet().kafka().customDeserializer().foreach(deserializerClass -> {
            CustomDeserializer$.MODULE$.configure(deserializerClass, settings().comet().kafka().serverOptions());
            String topicName = topicConfig().topicName();
            UDFRegistration udf = session().udf();
            Function1<byte[], String> deserialize = bytes -> CustomDeserializer$.MODULE$.deserialize(topicName, bytes);

            // Type tag for the UDF result type: scala.Predef.String.
            TypeTags resultUniverse = package$.MODULE$.universe();
            TypeTags.TypeTag resultTag = resultUniverse.TypeTag().apply(package$.MODULE$.universe().runtimeMirror(KafkaJob.class.getClassLoader()), new TypeCreator() {
                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                    Universe u = mirror.universe();
                    return u.internal().reificationSupport().TypeRef(u.internal().reificationSupport().SingleType(mirror.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), mirror.staticModule("scala.Predef")), u.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
                }
            });

            // Type tag for the UDF argument type: scala.Array[scala.Byte].
            TypeTags argumentUniverse = package$.MODULE$.universe();
            TypeTags.TypeTag argumentTag = argumentUniverse.TypeTag().apply(package$.MODULE$.universe().runtimeMirror(KafkaJob.class.getClassLoader()), new TypeCreator() {
                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                    Universe u = mirror.universe();
                    return u.internal().reificationSupport().TypeRef(u.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticClass("scala.Array"), new $colon.colon(mirror.staticClass("scala.Byte").asType().toTypeConstructor(), Nil$.MODULE$));
                }
            });

            return udf.register("deserialize", deserialize, resultTag, argumentTag);
        });

        // scala.util.Try is covariant in Scala but invariant to the Java type checker, so the
        // Try<SparkJobResult> from offload/load has to be widened explicitly.
        @SuppressWarnings("unchecked")
        Try<JobResult> outcome = (Try<JobResult>) (Try<?>) (kafkaJobConfig().offload() ? offload() : load());
        return outcome;
    }

    /**
     * Returns the job name, which is the string form of the configured topic config name.
     *
     * @return {@code String.valueOf} of {@code kafkaJobConfig().topicConfigName()}
     */
    @Override // ai.starlake.utils.JobBase
    public String name() {
        KafkaJobConfig config = kafkaJobConfig();
        return String.valueOf(config.topicConfigName());
    }

    /**
     * Predicate used by the batch offload to pick output files out of a directory listing.
     *
     * @param path candidate path
     * @return {@code true} when the last path component starts with {@code part-}
     */
    public static final /* synthetic */ boolean $anonfun$offload$4(Path path) {
        String fileName = path.getName();
        return fileName.startsWith("part-");
    }

    /**
     * Creates the job and eagerly resolves everything offload and load need: the topic
     * configuration, the formatted target path, the optional schema registry client with the
     * topic's value schema, and the optional dataframe transform.
     *
     * <p>Statement order matters: later initialisers read fields assigned by earlier ones through
     * their accessors.
     *
     * @param kafkaJobConfig job parameters (topic config name, path, transform, ...)
     * @param settings       application settings
     */
    public KafkaJob(KafkaJobConfig kafkaJobConfig, Settings settings) {
        this.kafkaJobConfig = kafkaJobConfig;
        this.settings = settings;
        // Initialisers of the mixed-in Scala traits, in the order emitted by the compiler.
        StrictLogging.$init$(this);
        DatasetLogging.$init$(this);
        JobBase.$init$((JobBase) this);
        SparkJob.$init$((SparkJob) this);
        DatasetArea$.MODULE$.initMetadata(settings.metadataStorageHandler(), settings);
        this.schemaHandler = new SchemaHandler(settings.metadataStorageHandler(), settings);
        // NOTE(review): presumably throws if the name is not a key of the configured topics -
        // confirm against the type returned by topics().
        this.topicConfig = (Settings.KafkaTopicConfig) settings.comet().kafka().topics().apply(kafkaJobConfig.topicConfigName());
        // Formats the configured path with the active environment plus two extra variables:
        // "config" (the topic config name) and "topic" (the resolved topic name).
        this.finalPath = Formatter$.MODULE$.RichFormatter(kafkaJobConfig.path()).richFormat(schemaHandler().activeEnv(), (Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("config"), kafkaJobConfig.topicConfigName()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("topic"), topicConfig().topicName())})), settings);
        this.schemaRegistryUrl = settings.comet().kafka().serverOptions().get("schema.registry.url");
        // NOTE(review): 128 is the client's second constructor argument, presumably a cache
        // capacity - confirm against the Confluent client API.
        this.schemaRegistryClient = schemaRegistryUrl().map(str -> {
            return new CachedSchemaRegistryClient(str, 128);
        });
        // Uses the default isKey = false, i.e. the "-value" subject of the topic.
        this.dfValueSchema = lookupTopicSchema(topicConfig().topicName(), lookupTopicSchema$default$2()).map(str2 -> {
            return this.avroSchemaToSparkSchema(str2);
        });
        // Instantiates the transform named in the job config, then configures it with the topic
        // configuration.
        this.transformInstance = kafkaJobConfig.transform().map(str3 -> {
            return (DataFrameTransform) Utils$.MODULE$.loadInstance(str3);
        }).map(dataFrameTransform -> {
            return dataFrameTransform.configure(this.topicConfig());
        });
    }
}
