package streaming.dsl;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.streaming.DataStreamReader;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import streaming.dsl.DslTool;
import streaming.dsl.parser.DSLSQLParser;
import streaming.dsl.template.TemplateMerge$;
import streaming.source.parser.SourceParser$;
import streaming.source.parser.SourceSchema;

/* compiled from: LoadAdaptor.scala */
@ScalaSignature(bytes = "\u0006\u0001\u001d4A!\u0001\u0002\u0001\u000f\t\t2\u000b\u001e:fC6du.\u00193BI\u0006\u0004Ho\u001c:\u000b\u0005\r!\u0011a\u00013tY*\tQ!A\u0005tiJ,\u0017-\\5oO\u000e\u00011c\u0001\u0001\t\u001dA\u0011\u0011\u0002D\u0007\u0002\u0015)\t1\"A\u0003tG\u0006d\u0017-\u0003\u0002\u000e\u0015\t1\u0011I\\=SK\u001a\u0004\"a\u0004\t\u000e\u0003\tI!!\u0005\u0002\u0003\u000f\u0011\u001bH\u000eV8pY\"A1\u0003\u0001B\u0001B\u0003%A#A\u000btGJL\u0007\u000f^*R\u0019\u0016CXm\u0019'jgR,g.\u001a:\u0011\u0005=)\u0012B\u0001\f\u0003\u0005U\u00196M]5qiN\u000bF*\u0012=fG2K7\u000f^3oKJD\u0001\u0002\u0007\u0001\u0003\u0002\u0003\u0006I!G\u0001\u0007_B$\u0018n\u001c8\u0011\tii\u0002\u0005\t\b\u0003\u0013mI!\u0001\b\u0006\u0002\rA\u0013X\rZ3g\u0013\tqrDA\u0002NCBT!\u0001\b\u0006\u0011\u0005i\t\u0013B\u0001\u0012 \u0005\u0019\u0019FO]5oO\"AA\u0005\u0001BA\u0002\u0013\u0005Q%\u0001\u0003qCRDW#\u0001\u0011\t\u0011\u001d\u0002!\u00111A\u0005\u0002!\n\u0001\u0002]1uQ~#S-\u001d\u000b\u0003S1\u0002\"!\u0003\u0016\n\u0005-R!\u0001B+oSRDq!\f\u0014\u0002\u0002\u0003\u0007\u0001%A\u0002yIEB\u0001b\f\u0001\u0003\u0002\u0003\u0006K\u0001I\u0001\u0006a\u0006$\b\u000e\t\u0005\tc\u0001\u0011\t\u0011)A\u0005A\u0005IA/\u00192mK:\u000bW.\u001a\u0005\tg\u0001\u0011\t\u0011)A\u0005A\u00051am\u001c:nCRDQ!\u000e\u0001\u0005\u0002Y\na\u0001P5oSRtDCB\u001c9siZD\b\u0005\u0002\u0010\u0001!)1\u0003\u000ea\u0001)!)\u0001\u0004\u000ea\u00013!)A\u0005\u000ea\u0001A!)\u0011\u0007\u000ea\u0001A!)1\u0007\u000ea\u0001A!)a\b\u0001C\u0001\u007f\u0005iq/\u001b;i/\u0006$XM]'be.$2\u0001Q(d!\r\t%\nT\u0007\u0002\u0005*\u00111\tR\u0001\u0004gFd'BA#G\u0003\u0015\u0019\b/\u0019:l\u0015\t9\u0005*\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0013\u0006\u0019qN]4\n\u0005-\u0013%a\u0002#bi\u0006\u001cX\r\u001e\t\u0003\u00036K!A\u0014\"\u0003\u0007I{w\u000fC\u0003Q{\u0001\u0007\u0011+A\u0003uC\ndW\r\u0005\u0002SA:\u00111K\u0018\b\u0003)vs!!\u0016/\u000f\u0005Y[fBA,[\u001b\u0005A&BA-\u0007\u0003\u0019a$o\\8u}%\t\u0011*\u0003\u0002H\u0011&\u0011QIR\u0005\u0003\u0007\u0012K!a\u0018\"\u0002\u000fA\f7m[1hK&\u0011\u0011M\u0019\u0002\n\t\u0006$\u0018M\u0012:b[\u0016T!a\u0018\"\t\u000bai\u0004\u0019A\r\t\u000b\u0015\u0004A\u0011\u00014\u0002\u000bA\f'o]3\u0016\u0003%\u0002")
/* loaded from: input_file:streaming/dsl/StreamLoadAdaptor.class */
public class StreamLoadAdaptor implements DslTool {
    private final ScriptSQLExecListener scriptSQLExecListener;
    private final Map<String, String> option;
    private String path;
    private final String tableName;
    public final String streaming$dsl$StreamLoadAdaptor$$format;

    @Override // streaming.dsl.DslTool
    public String currentText(DSLSQLParser.SqlContext sqlContext) {
        return DslTool.Cclass.currentText(this, sqlContext);
    }

    @Override // streaming.dsl.DslTool
    public String cleanStr(String str) {
        return DslTool.Cclass.cleanStr(this, str);
    }

    @Override // streaming.dsl.DslTool
    public String cleanBlockStr(String str) {
        return DslTool.Cclass.cleanBlockStr(this, str);
    }

    @Override // streaming.dsl.DslTool
    public String getStrOrBlockStr(DSLSQLParser.ExpressionContext expressionContext) {
        return DslTool.Cclass.getStrOrBlockStr(this, expressionContext);
    }

    @Override // streaming.dsl.DslTool
    public String withPathPrefix(String str, String str2) {
        return DslTool.Cclass.withPathPrefix(this, str, str2);
    }

    @Override // streaming.dsl.DslTool
    public String withPathPrefix(MLSQLExecuteContext mLSQLExecuteContext, String str) {
        return DslTool.Cclass.withPathPrefix(this, mLSQLExecuteContext, str);
    }

    @Override // streaming.dsl.DslTool
    public Tuple2<String, String> parseDBAndTableFromStr(String str) {
        return DslTool.Cclass.parseDBAndTableFromStr(this, str);
    }

    @Override // streaming.dsl.DslTool
    public String resourceRealPath(ScriptSQLExecListener scriptSQLExecListener, Option<String> option, String str) {
        return DslTool.Cclass.resourceRealPath(this, scriptSQLExecListener, option, str);
    }

    public String path() {
        return this.path;
    }

    public void path_$eq(String str) {
        this.path = str;
    }

    public Dataset<Row> withWaterMark(Dataset<Row> dataset, Map<String, String> map) {
        return map.contains("eventTimeCol") ? dataset.withWatermark((String) map.apply("eventTimeCol"), (String) map.apply("delayThreshold")) : dataset;
    }

    public void parse() {
        Dataset<Row> load;
        DataStreamReader readStream = this.scriptSQLExecListener.sparkSession().readStream();
        String cleanStr = cleanStr(path());
        String str = this.streaming$dsl$StreamLoadAdaptor$$format;
        if ("kafka".equals(str) ? true : "socket".equals(str)) {
            if (cleanStr.isEmpty()) {
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                readStream.option("subscribe", cleanStr);
            }
            load = readStream.options(this.option).format(this.streaming$dsl$StreamLoadAdaptor$$format).load();
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        } else {
            if ("kafka8".equals(str) ? true : "kafka9".equals(str)) {
                if (cleanStr.isEmpty()) {
                    BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                } else {
                    readStream.option("topics", cleanStr);
                }
                load = readStream.format("com.hortonworks.spark.sql.kafka08").options(this.option).load();
                BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
            } else if ("mockStream".equals(str)) {
                load = readStream.format("org.apache.spark.sql.execution.streaming.mock.MockStreamSourceProvider").options(this.option.$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("path"), cleanStr(path())))).load();
                BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
            } else {
                load = readStream.format((String) this.option.getOrElse("provider", new StreamLoadAdaptor$$anonfun$2(this))).options(this.option).load();
                BoxedUnit boxedUnit6 = BoxedUnit.UNIT;
            }
        }
        Dataset<Row> withWaterMark = withWaterMark(load, this.option);
        if (this.option.contains("valueSchema") && this.option.contains("valueFormat")) {
            withWaterMark = withWaterMark.withColumn("kafkaValue", functions$.MODULE$.struct((Seq) ((List) List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"key", "partition", "offset", "timestamp", "timestampType", "topic"})).$plus$plus(new StringOps(Predef$.MODULE$.augmentString((String) this.option.getOrElse("keepValue", new StreamLoadAdaptor$$anonfun$3(this)))).toBoolean() ? List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"value"})) : Nil$.MODULE$, List$.MODULE$.canBuildFrom())).map(new StreamLoadAdaptor$$anonfun$parse$4(this), List$.MODULE$.canBuildFrom()))).selectExpr(Predef$.MODULE$.wrapRefArray(new String[]{"CAST(value AS STRING) as tmpValue", "kafkaValue"})).select(Predef$.MODULE$.wrapRefArray(new Column[]{SourceParser$.MODULE$.getSourceParser((String) this.option.apply("valueFormat")).parse(functions$.MODULE$.col("tmpValue"), new SourceSchema((String) this.option.apply("valueSchema")), (Map) Predef$.MODULE$.Map().apply(Nil$.MODULE$)).as("data"), functions$.MODULE$.col("kafkaValue")})).select("data.*", Predef$.MODULE$.wrapRefArray(new String[]{"kafkaValue"}));
        }
        path_$eq(TemplateMerge$.MODULE$.merge(path(), this.scriptSQLExecListener.env().toMap(Predef$.MODULE$.$conforms())));
        withWaterMark.createOrReplaceTempView(this.tableName);
    }

    public StreamLoadAdaptor(ScriptSQLExecListener scriptSQLExecListener, Map<String, String> map, String str, String str2, String str3) {
        this.scriptSQLExecListener = scriptSQLExecListener;
        this.option = map;
        this.path = str;
        this.tableName = str2;
        this.streaming$dsl$StreamLoadAdaptor$$format = str3;
        DslTool.Cclass.$init$(this);
    }
}
