package com.ebiznext.comet.job.index.kafkaload;

import com.ebiznext.comet.config.Settings;
import com.ebiznext.comet.config.SparkEnv;
import com.ebiznext.comet.schema.model.SinkType;
import com.ebiznext.comet.schema.model.Views;
import com.ebiznext.comet.utils.JobBase;
import com.ebiznext.comet.utils.JobResult;
import com.ebiznext.comet.utils.SparkJob;
import com.ebiznext.comet.utils.SparkJobResult;
import com.ebiznext.comet.utils.Utils$;
import com.ebiznext.comet.utils.kafka.KafkaClient;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.DatasetLogging;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.Trigger;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Try;
import scala.util.Try$;

/* compiled from: KafkaJob.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0015a\u0001B\u0007\u000f\u0001mA\u0001\u0002\u000b\u0001\u0003\u0006\u0004%\t!\u000b\u0005\t]\u0001\u0011\t\u0011)A\u0005U!Aq\u0006\u0001BC\u0002\u0013\r\u0001\u0007\u0003\u00058\u0001\t\u0005\t\u0015!\u00032\u0011\u0015A\u0004\u0001\"\u0001:\u0011\u001dq\u0004A1A\u0005\n}Baa\u0012\u0001!\u0002\u0013\u0001\u0005\"\u0002%\u0001\t\u0003I\u0005\"B*\u0001\t\u0003I\u0005\"\u0002+\u0001\t\u0013)\u0006\"\u0002:\u0001\t\u0003\u001a\b\"\u0002=\u0001\t\u0003J(\u0001C&bM.\f'j\u001c2\u000b\u0005=\u0001\u0012!C6bM.\fGn\\1e\u0015\t\t\"#A\u0003j]\u0012,\u0007P\u0003\u0002\u0014)\u0005\u0019!n\u001c2\u000b\u0005U1\u0012!B2p[\u0016$(BA\f\u0019\u0003!)'-\u001b>oKb$(\"A\r\u0002\u0007\r|Wn\u0001\u0001\u0014\u0007\u0001a\"\u0005\u0005\u0002\u001eA5\taDC\u0001 \u0003\u0015\u00198-\u00197b\u0013\t\tcD\u0001\u0004B]f\u0014VM\u001a\t\u0003G\u0019j\u0011\u0001\n\u0006\u0003KQ\tQ!\u001e;jYNL!a\n\u0013\u0003\u0011M\u0003\u0018M]6K_\n\fab[1gW\u0006TuNY\"p]\u001aLw-F\u0001+!\tYC&D\u0001\u000f\u0013\ticB\u0001\bLC\u001a\\\u0017MS8c\u0007>tg-[4\u0002\u001f-\fgm[1K_\n\u001cuN\u001c4jO\u0002\n\u0001b]3ui&twm]\u000b\u0002cA\u0011!'N\u0007\u0002g)\u0011A\u0007F\u0001\u0007G>tg-[4\n\u0005Y\u001a$\u0001C*fiRLgnZ:\u0002\u0013M,G\u000f^5oON\u0004\u0013A\u0002\u001fj]&$h\b\u0006\u0002;{Q\u00111\b\u0010\t\u0003W\u0001AQaL\u0003A\u0004EBQ\u0001K\u0003A\u0002)\n1\u0002^8qS\u000e\u001cuN\u001c4jOV\t\u0001\t\u0005\u0002B\t:\u0011!GQ\u0005\u0003\u0007N\n\u0001bU3ui&twm]\u0005\u0003\u000b\u001a\u0013\u0001cS1gW\u0006$v\u000e]5d\u0007>tg-[4\u000b\u0005\r\u001b\u0014\u0001\u0004;pa&\u001c7i\u001c8gS\u001e\u0004\u0013aB8gM2|\u0017\r\u001a\u000b\u0002\u0015B\u00191J\u0014)\u000e\u00031S!!\u0014\u0010\u0002\tU$\u0018\u000e\\\u0005\u0003\u001f2\u00131\u0001\u0016:z!\t\u0019\u0013+\u0003\u0002SI\tq1\u000b]1sW*{'MU3tk2$\u0018\u0001\u00027pC\u0012\f\u0001\u0002\u001e:b]N4w.\u001c\u000b\u0003-B\u0004\"aV7\u000f\u0005aSgBA-h\u001d\tQFM\u0004\u0002\\C:\u0011AlX\u0007\u0002;*\u0011aLG\u0001\u0007yI|w\u000e\u001e \n\u0003\u0001\f1a\u001c:h\u0013\t\u00117-\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002A&\u0011QMZ\u0001\u0006gB\f'o\u001b\u0006\u0003E\u000eL!\u0001[5\u0002\u0007M\fHN\u0003\u0002fM&\u00111\u000e\\\u0001\ba\u0006\u001c7.Y4f\u0015\tA\u0017.\u0003\u0002o_\nIA)\u0019;b\rJ\fW.\u001a\u0006\u0003W2DQ!\u001d\u0006A\u0002Y\u000b!\u0001\u001a4\u0002\u0007I,h\u000eF\u0001u!\rYe*\u001e\t\u0003GYL!a\u001e\u0013\u0003\u0013){'MU3tk2$\u0018\u0001\u00028b[\u0016,\u0012A\u001f\t\u0003w~t!\u0001`?\u0011\u0005qs\u0012B\u0001@\u001f\u0003\u0019\u0001&/\u001a3fM&!\u0011\u0011AA\u0002\u0005\u0019\u0019FO]5oO*\u0011aP\b")
/* loaded from: input_file:com/ebiznext/comet/job/index/kafkaload/KafkaJob.class */
public class KafkaJob implements SparkJob {
    private final KafkaJobConfig kafkaJobConfig;
    private final Settings settings;
    private final Settings.KafkaTopicConfig topicConfig;
    private SparkEnv sparkEnv;
    private SparkSession session;
    private final Logger logger;
    private volatile byte bitmap$0;

    @Override // com.ebiznext.comet.utils.SparkJob
    public SparkConf withExtraSparkConf(SparkConf sparkConf) {
        SparkConf withExtraSparkConf;
        withExtraSparkConf = withExtraSparkConf(sparkConf);
        return withExtraSparkConf;
    }

    @Override // com.ebiznext.comet.utils.SparkJob
    public void registerUdf(String str) {
        registerUdf(str);
    }

    @Override // com.ebiznext.comet.utils.SparkJob
    public DataFrameWriter<Row> partitionedDatasetWriter(Dataset<Row> dataset, List<String> list) {
        DataFrameWriter<Row> partitionedDatasetWriter;
        partitionedDatasetWriter = partitionedDatasetWriter(dataset, list);
        return partitionedDatasetWriter;
    }

    @Override // com.ebiznext.comet.utils.SparkJob
    public Dataset<Row> partitionDataset(Dataset<Row> dataset, List<String> list) {
        Dataset<Row> partitionDataset;
        partitionDataset = partitionDataset(dataset, list);
        return partitionDataset;
    }

    @Override // com.ebiznext.comet.utils.SparkJob
    public Object analyze(String str) {
        Object analyze;
        analyze = analyze(str);
        return analyze;
    }

    @Override // com.ebiznext.comet.utils.SparkJob
    public void createSparkViews(Views views, Map<String, String> map) {
        createSparkViews(views, map);
    }

    @Override // com.ebiznext.comet.utils.JobBase
    public Tuple3<SinkType, Option<String>, String> parseViewDefinition(String str) {
        Tuple3<SinkType, Option<String>, String> parseViewDefinition;
        parseViewDefinition = parseViewDefinition(str);
        return parseViewDefinition;
    }

    @Override // org.apache.spark.sql.DatasetLogging
    public <T> DatasetLogging.DatasetHelper<T> DatasetHelper(Dataset<T> dataset) {
        DatasetLogging.DatasetHelper<T> DatasetHelper;
        DatasetHelper = DatasetHelper(dataset);
        return DatasetHelper;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [com.ebiznext.comet.job.index.kafkaload.KafkaJob] */
    private SparkEnv sparkEnv$lzycompute() {
        SparkEnv sparkEnv;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                sparkEnv = sparkEnv();
                this.sparkEnv = sparkEnv;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.sparkEnv;
    }

    @Override // com.ebiznext.comet.utils.SparkJob
    public SparkEnv sparkEnv() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? sparkEnv$lzycompute() : this.sparkEnv;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [com.ebiznext.comet.job.index.kafkaload.KafkaJob] */
    private SparkSession session$lzycompute() {
        SparkSession session;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                session = session();
                this.session = session;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.session;
    }

    @Override // com.ebiznext.comet.utils.SparkJob
    public SparkSession session() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? session$lzycompute() : this.session;
    }

    public Logger logger() {
        return this.logger;
    }

    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger logger) {
        this.logger = logger;
    }

    public KafkaJobConfig kafkaJobConfig() {
        return this.kafkaJobConfig;
    }

    @Override // com.ebiznext.comet.utils.JobBase
    public Settings settings() {
        return this.settings;
    }

    private Settings.KafkaTopicConfig topicConfig() {
        return this.topicConfig;
    }

    public Try<SparkJobResult> offload() {
        return Try$.MODULE$.apply(() -> {
            return !this.kafkaJobConfig().streaming() ? (SparkJobResult) Utils$.MODULE$.withResources(() -> {
                return new KafkaClient(this.settings().comet().kafka(), this.settings());
            }, kafkaClient -> {
                Dataset<Row> coalesce;
                Boolean bool;
                Boolean bool2;
                Tuple2<Dataset<Row>, List<Tuple2<Object, Object>>> consumeTopicBatch = kafkaClient.consumeTopicBatch(this.kafkaJobConfig().topicConfigName(), this.session(), this.topicConfig());
                if (consumeTopicBatch == null) {
                    throw new MatchError(consumeTopicBatch);
                }
                Tuple2 tuple2 = new Tuple2((Dataset) consumeTopicBatch._1(), (List) consumeTopicBatch._2());
                Dataset<Row> dataset = (Dataset) tuple2._1();
                List<Tuple2<Object, Object>> list = (List) tuple2._2();
                Dataset<Row> transfom = this.transfom(dataset);
                Some coalesce2 = this.kafkaJobConfig().coalesce();
                if (None$.MODULE$.equals(coalesce2)) {
                    coalesce = transfom;
                } else {
                    if (!(coalesce2 instanceof Some)) {
                        throw new MatchError(coalesce2);
                    }
                    coalesce = transfom.coalesce(BoxesRunTime.unboxToInt(coalesce2.value()));
                }
                Dataset<Row> dataset2 = coalesce;
                if (this.logger().underlying().isInfoEnabled()) {
                    this.logger().underlying().info("Saving to {}", new Object[]{this.kafkaJobConfig()});
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                } else {
                    BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                }
                dataset2.write().mode(this.kafkaJobConfig().mode()).format(this.kafkaJobConfig().format()).options(this.kafkaJobConfig().writeOptions()).save(this.kafkaJobConfig().path());
                if (this.logger().underlying().isInfoEnabled()) {
                    this.logger().underlying().info("Kafka saved messages to offload -> {}", new Object[]{this.kafkaJobConfig().path()});
                    BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                } else {
                    BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
                }
                Some coalesce3 = this.kafkaJobConfig().coalesce();
                if ((coalesce3 instanceof Some) && 1 == BoxesRunTime.unboxToInt(coalesce3.value()) && this.kafkaJobConfig().coalesceMerge()) {
                    this.kafkaJobConfig().format();
                    Path path = new Path(this.kafkaJobConfig().path());
                    Path path2 = (Path) ((IterableLike) this.settings().storageHandler().list(path, this.settings().storageHandler().list$default$2(), this.settings().storageHandler().list$default$3(), false, this.settings().storageHandler().list$default$5()).filter(path3 -> {
                        return BoxesRunTime.boxToBoolean($anonfun$offload$4(path3));
                    })).head();
                    Path path4 = new Path(new StringBuilder(4).append(path.toString()).append(".tmp").toString());
                    if (this.settings().storageHandler().move(path2, path4)) {
                        this.settings().storageHandler().delete(path);
                        bool2 = BoxesRunTime.boxToBoolean(this.settings().storageHandler().move(path4, path));
                    } else {
                        bool2 = BoxedUnit.UNIT;
                    }
                    bool = bool2;
                } else {
                    bool = BoxedUnit.UNIT;
                }
                kafkaClient.topicSaveOffsets(this.kafkaJobConfig().topicConfigName(), this.topicConfig().accessOptions(), list);
                return new SparkJobResult(new Some(transfom));
            }) : (SparkJobResult) Utils$.MODULE$.withResources(() -> {
                return new KafkaClient(this.settings().comet().kafka(), this.settings());
            }, kafkaClient2 -> {
                Trigger Continuous;
                Dataset<Row> transfom = this.transfom(kafkaClient2.consumeTopicStreaming(this.session(), this.topicConfig()));
                String lowerCase = this.kafkaJobConfig().streamingTrigger().toLowerCase();
                if ("once".equals(lowerCase)) {
                    Continuous = Trigger.Once();
                } else if ("processingtime".equals(lowerCase)) {
                    Continuous = Trigger.ProcessingTime(this.kafkaJobConfig().streamingTriggerOption());
                } else {
                    if (!"continuous".equals(lowerCase)) {
                        throw new MatchError(lowerCase);
                    }
                    Continuous = Trigger.Continuous(this.kafkaJobConfig().streamingTriggerOption());
                }
                DataStreamWriter trigger = transfom.writeStream().outputMode(this.kafkaJobConfig().streamingWriteMode()).format(this.kafkaJobConfig().streamingWriteFormat()).options(this.kafkaJobConfig().writeOptions()).trigger(Continuous);
                Seq<String> streamingWritePartitionBy = this.kafkaJobConfig().streamingWritePartitionBy();
                DataStreamWriter partitionBy = Nil$.MODULE$.equals(streamingWritePartitionBy) ? trigger : trigger.partitionBy(streamingWritePartitionBy);
                if (this.kafkaJobConfig().streamingWriteToTable()) {
                    throw new Exception("streamingWriteToTable Not Supported");
                }
                partitionBy.start(this.kafkaJobConfig().path()).awaitTermination();
                return new SparkJobResult(None$.MODULE$);
            });
        });
    }

    public Try<SparkJobResult> load() {
        return Try$.MODULE$.apply(() -> {
            return (SparkJobResult) Utils$.MODULE$.withResources(() -> {
                return new KafkaClient(this.settings().comet().kafka(), this.settings());
            }, kafkaClient -> {
                Dataset<Row> transfom = this.transfom(this.session().read().format(this.kafkaJobConfig().format()).load(Predef$.MODULE$.wrapRefArray(new StringOps(Predef$.MODULE$.augmentString(this.kafkaJobConfig().path())).split(','))));
                kafkaClient.sinkToTopic(this.topicConfig(), transfom);
                return new SparkJobResult(new Some(transfom));
            });
        });
    }

    private Dataset<Row> transfom(Dataset<Row> dataset) {
        Dataset<Row> dataset2;
        Some transformInstance = kafkaJobConfig().transformInstance();
        if (transformInstance instanceof Some) {
            dataset2 = ((DataFrameTransform) transformInstance.value()).transform(dataset);
        } else {
            if (!None$.MODULE$.equals(transformInstance)) {
                throw new MatchError(transformInstance);
            }
            dataset2 = dataset;
        }
        return dataset2;
    }

    @Override // com.ebiznext.comet.utils.JobBase
    public Try<JobResult> run() {
        return kafkaJobConfig().offload() ? offload() : load();
    }

    @Override // com.ebiznext.comet.utils.JobBase
    public String name() {
        return String.valueOf(kafkaJobConfig().topicConfigName());
    }

    public static final /* synthetic */ boolean $anonfun$offload$4(Path path) {
        return path.getName().startsWith("part-");
    }

    public KafkaJob(KafkaJobConfig kafkaJobConfig, Settings settings) {
        this.kafkaJobConfig = kafkaJobConfig;
        this.settings = settings;
        StrictLogging.$init$(this);
        DatasetLogging.$init$(this);
        JobBase.$init$((JobBase) this);
        SparkJob.$init$((SparkJob) this);
        this.topicConfig = (Settings.KafkaTopicConfig) settings.comet().kafka().topics().apply(kafkaJobConfig.topicConfigName());
    }
}
