/*
 * Decompiled with CFR 0.152.
 */
package it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka;

import com.typesafe.config.Config;
import it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaSparkStructuredStreamingReader$;
import it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.TopicModelUtils$;
import it.agilelab.bigdata.wasp.consumers.spark.readers.SparkStructuredStreamingReader;
import it.agilelab.bigdata.wasp.consumers.spark.utils.AvroDeserializerExpression;
import it.agilelab.bigdata.wasp.consumers.spark.utils.SparkUtils$;
import it.agilelab.bigdata.wasp.core.WaspSystem$;
import it.agilelab.bigdata.wasp.core.kafka.CheckOrCreateTopic;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.utils.AvroSchemaConverters$;
import it.agilelab.bigdata.wasp.core.utils.ConfigManager$;
import it.agilelab.bigdata.wasp.core.utils.StringToByteArrayUtil$;
import it.agilelab.bigdata.wasp.models.DatastoreModel;
import it.agilelab.bigdata.wasp.models.MultiTopicModel;
import it.agilelab.bigdata.wasp.models.MultiTopicModel$;
import it.agilelab.bigdata.wasp.models.StreamingReaderModel;
import it.agilelab.bigdata.wasp.models.StructuredStreamingETLModel;
import it.agilelab.bigdata.wasp.models.SubjectStrategy$;
import it.agilelab.bigdata.wasp.models.TopicDataTypes$;
import it.agilelab.bigdata.wasp.models.TopicModel;
import it.agilelab.bigdata.wasp.models.configuration.ConnectionConfig;
import it.agilelab.bigdata.wasp.models.configuration.Handle$;
import it.agilelab.bigdata.wasp.models.configuration.Ignore$;
import it.agilelab.bigdata.wasp.models.configuration.KafkaConfigModel;
import it.agilelab.bigdata.wasp.models.configuration.KafkaEntryConfig;
import it.agilelab.bigdata.wasp.models.configuration.ParsingMode;
import it.agilelab.bigdata.wasp.models.configuration.ParsingMode$;
import it.agilelab.bigdata.wasp.models.configuration.Strict$;
import it.agilelab.bigdata.wasp.repository.core.bl.ConfigBL$;
import it.agilelab.bigdata.wasp.repository.core.bl.TopicBL;
import it.agilelab.bigdata.wasp.spark.sql.kafka011.KafkaSparkSQLSchemas$;
import it.agilelab.darwin.manager.AvroSchemaManager;
import it.agilelab.darwin.manager.AvroSchemaManagerFactory$;
import org.apache.avro.Schema;
import org.apache.spark.SparkException;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.CaseWhen;
import org.apache.spark.sql.catalyst.expressions.CaseWhen$;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.Hex$;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.PartialFunction;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.GenSeq;
import scala.collection.GenTraversableOnce;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.reflect.api.JavaUniverse;
import scala.reflect.api.Mirror;
import scala.reflect.api.Symbols;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.runtime.ObjectRef;
import scala.runtime.VolatileByteRef;
import scala.util.Either;
import scala.util.Left;
import scala.util.Right;

public final class KafkaSparkStructuredStreamingReader$
implements SparkStructuredStreamingReader,
Logging {
    /** Singleton instance of this Scala object; presumably assigned by the constructor, which is outside this view — TODO confirm. */
    public static final KafkaSparkStructuredStreamingReader$ MODULE$;
    /** Alias given to the metadata struct column built by {@code selectMetadata}. */
    private final String KAFKA_METADATA_COL;
    /** Name of the column that keeps a copy of the raw Kafka value (added in {@code createStructuredStream}). */
    private final String it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME;
    /** Column name related to the topic data type; its usage is not visible in this part of the file. */
    private final String DATA_TYPE_ATTRIBUTE_NAME;
    /** Logger exposed through {@link #logger()}. */
    private final WaspLogger logger;

    static {
        // Instantiate the object once when the class is loaded; the result is not stored here.
        new KafkaSparkStructuredStreamingReader$();
    }

    /**
     * Returns the logger held by this object.
     *
     * @return the {@link WaspLogger} stored in the {@code logger} field
     */
    public WaspLogger logger() {
        return logger;
    }

    /**
     * Stores the given logger in the {@code logger} field.
     *
     * <p>NOTE(review): the mangled name suggests this is the setter generated for the
     * {@code Logging} trait's {@code logger} value — confirm against the trait.
     *
     * @param x$1 the logger to store
     */
    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger x$1) {
        logger = x$1;
    }

    /**
     * Accessor for the {@code KAFKA_METADATA_COL} constant.
     *
     * @return the value of the {@code KAFKA_METADATA_COL} field
     */
    private String KAFKA_METADATA_COL() {
        return KAFKA_METADATA_COL;
    }

    /**
     * Accessor for the raw-value column name constant.
     *
     * @return the value of the {@code RAW_VALUE_ATTRIBUTE_NAME} field
     */
    public String it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME() {
        return it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME;
    }

    /**
     * Accessor for the {@code DATA_TYPE_ATTRIBUTE_NAME} constant.
     *
     * @return the value of the {@code DATA_TYPE_ATTRIBUTE_NAME} field
     */
    private String DATA_TYPE_ATTRIBUTE_NAME() {
        return DATA_TYPE_ATTRIBUTE_NAME;
    }

    /**
     * Creates the streaming {@code Dataset} that reads the Kafka topic(s) named by the given
     * streaming reader model.
     *
     * <p>Steps, in order: resolve the topic models, load the Kafka configuration, ask the Kafka
     * admin actor to check or create every topic, build the reader options, open the
     * {@code kafka} stream, copy the value column into the raw-value column, and hand the
     * result to {@code parseDF} together with the parsing mode.
     *
     * @param etl                  the ETL the stream belongs to; used for logging and to compute the trigger interval
     * @param streamingReaderModel the input description (datastore model name, rate limit, options)
     * @param ss                   the Spark session used to open the stream
     * @return the parsed streaming dataset
     * @throws Exception if the check/create of one or more topics did not succeed
     */
    public Dataset<Row> createStructuredStream(StructuredStreamingETLModel etl, StreamingReaderModel streamingReaderModel, SparkSession ss) {
        this.logger().info((Function0)new Serializable(etl, streamingReaderModel){
            public static final long serialVersionUID = 0L;
            private final StructuredStreamingETLModel etl$1;
            private final StreamingReaderModel streamingReaderModel$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Creating stream from input: ", " of ETL: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.streamingReaderModel$1, this.etl$1}));
            }
            {
                this.etl$1 = etl$1;
                this.streamingReaderModel$1 = streamingReaderModel$1;
            }
        });
        this.logger().info((Function0)new Serializable(streamingReaderModel){
            public static final long serialVersionUID = 0L;
            private final StreamingReaderModel streamingReaderModel$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Retrieving topic datastore model with name \"", "\""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.streamingReaderModel$1.datastoreModelName()}));
            }
            {
                this.streamingReaderModel$1 = streamingReaderModel$1;
            }
        });
        // Resolve the datastore model name into the concrete topic model(s).
        TopicBL topicBL = ConfigBL$.MODULE$.topicBL();
        Seq<TopicModel> topics = this.retrieveTopicModelsRecursively(topicBL, streamingReaderModel.datastoreModelName());
        this.logger().info((Function0)new Serializable(topics){
            public static final long serialVersionUID = 0L;
            private final Seq topics$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Retrieved topic model(s): ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.topics$1}));
            }
            {
                this.topics$1 = topics$1;
            }
        });
        KafkaConfigModel kafkaConfig = ConfigManager$.MODULE$.getKafkaConfig();
        this.logger().info((Function0)new Serializable(kafkaConfig){
            public static final long serialVersionUID = 0L;
            private final KafkaConfigModel kafkaConfig$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Kafka configuration: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.kafkaConfig$1}));
            }
            {
                this.kafkaConfig$1 = kafkaConfig$1;
            }
        });
        // Send a CheckOrCreateTopic request to the Kafka admin actor for every topic and AND the outcomes.
        // NOTE(review): reduce has no seed value, so an empty topic list presumably fails here — confirm.
        boolean allCheckOrCreateResult = BoxesRunTime.unboxToBoolean((Object)((TraversableOnce)topics.map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final boolean apply(TopicModel topic) {
                return BoxesRunTime.unboxToBoolean((Object)WaspSystem$.MODULE$.$qmark$qmark(WaspSystem$.MODULE$.kafkaAdminActor(), (Object)new CheckOrCreateTopic(topic.name(), topic.partitions(), topic.replicas()), WaspSystem$.MODULE$.$qmark$qmark$default$3()));
            }
        }, Seq$.MODULE$.canBuildFrom())).reduce((Function2)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final boolean apply(boolean x$1, boolean x$2) {
                return x$1 && x$2;
            }
        }));
        if (allCheckOrCreateResult) {
            long triggerIntervalMs = SparkUtils$.MODULE$.getTriggerIntervalMs(ConfigManager$.MODULE$.getSparkStreamingConfig(), etl);
            // Scale the configured rate limit by the trigger interval expressed in seconds;
            // with a zero interval the rate limit is used unchanged.
            Option maybeRateLimit = streamingReaderModel.rateLimit().map((Function1)new Serializable(triggerIntervalMs){
                public static final long serialVersionUID = 0L;
                private final long triggerIntervalMs$1;

                public final long apply(int x) {
                    return this.apply$mcJI$sp(x);
                }

                public long apply$mcJI$sp(int x) {
                    return this.triggerIntervalMs$1 == 0L ? (long)x : (long)((double)this.triggerIntervalMs$1 / 1000.0 * (double)x);
                }
                {
                    this.triggerIntervalMs$1 = triggerIntervalMs$1;
                }
            });
            // Turn the scaled rate limit, when present, into the "maxOffsetsPerTrigger" option entry.
            Option maybeMaxOffsetsPerTrigger = maybeRateLimit.map((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final Tuple2<String, String> apply(long rateLimit) {
                    return new Tuple2((Object)"maxOffsetsPerTrigger", (Object)((Object)BoxesRunTime.boxToLong((long)rateLimit)).toString());
                }
            });
            // Options are accumulated in this order: base entries (subscribe, bootstrap servers, poll timeout),
            // the optional maxOffsetsPerTrigger, the extra entries of the Kafka configuration, and finally the
            // reader model's own options (added last, so presumably they win on key clashes).
            scala.collection.mutable.Map options = Map$.MODULE$.empty();
            options.$plus$plus$eq((TraversableOnce)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"subscribe"), (Object)((TraversableOnce)topics.map((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply(TopicModel x$3) {
                    return x$3.name();
                }
            }, Seq$.MODULE$.canBuildFrom())).mkString(",")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"kafka.bootstrap.servers"), (Object)((TraversableOnce)kafkaConfig.connections().map((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply(ConnectionConfig x$4) {
                    return x$4.toString();
                }
            }, Seq$.MODULE$.canBuildFrom())).mkString(",")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"kafkaConsumer.pollTimeoutMs"), (Object)((Object)BoxesRunTime.boxToInteger((int)kafkaConfig.ingestRateToMills())).toString())})));
            options.$plus$plus$eq((TraversableOnce)Option$.MODULE$.option2Iterable(maybeMaxOffsetsPerTrigger));
            options.$plus$plus$eq((TraversableOnce)((TraversableOnce)kafkaConfig.others().map((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final Tuple2<String, String> apply(KafkaEntryConfig x$5) {
                    return x$5.toTupla();
                }
            }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
            options.$plus$plus$eq((TraversableOnce)streamingReaderModel.options());
            this.logger().info((Function0)new Serializable(options){
                public static final long serialVersionUID = 0L;
                private final scala.collection.mutable.Map options$1;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Final options to be pushed to DataStreamReader: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.options$1}));
                }
                {
                    this.options$1 = options$1;
                }
            });
            // Open the Kafka stream and keep a copy of the value column under the raw-value column name.
            Dataset df = ss.readStream().format("kafka").options((Map)options).load().withColumn(this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME(), functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME()));
            // Parsing mode comes from the "parsingMode" reader option ("strict" when the option is absent);
            // Strict is also the fallback when fromString yields no value.
            ParsingMode parsingMode = (ParsingMode)ParsingMode$.MODULE$.fromString((String)streamingReaderModel.options().getOrElse((Object)"parsingMode", (Function0)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "strict";
                }
            })).getOrElse((Function0)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final Strict$ apply() {
                    return Strict$.MODULE$;
                }
            });
            return this.parseDF(topics, (Dataset<Row>)df, parsingMode);
        }
        // At least one check/create request reported failure: log the message and abort.
        String msg = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Unable to check/create one or more topic; topics: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topics}));
        this.logger().error((Function0)new Serializable(msg){
            public static final long serialVersionUID = 0L;
            private final String msg$1;

            public final String apply() {
                return this.msg$1;
            }
            {
                this.msg$1 = msg$1;
            }
        });
        throw new Exception(msg);
    }

    /*
     * WARNING - void declaration
     */
    private Column selectMetadata(Column keyCol) {
        void var3_4;
        List allColumnsButValue = ((TraversableOnce)((TraversableLike)KafkaSparkSQLSchemas$.MODULE$.INPUT_SCHEMA().map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply(StructField x$6) {
                return x$6.name();
            }
        }, Seq$.MODULE$.canBuildFrom())).filter((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            /*
             * Enabled force condition propagation
             * Lifted jumps to return sites
             */
            public final boolean apply(String cName) {
                String string = cName;
                String string2 = KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME();
                if (string == null) {
                    if (string2 == null) return false;
                } else if (string.equals(string2)) return false;
                String string3 = cName;
                String string4 = KafkaSparkSQLSchemas$.MODULE$.KEY_ATTRIBUTE_NAME();
                if (string3 == null) {
                    if (string4 == null) return false;
                } else if (string3.equals(string4)) return false;
                String string5 = cName;
                String string6 = KafkaSparkStructuredStreamingReader$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME();
                if (string5 != null) {
                    if (!string5.equals(string6)) return true;
                    return false;
                }
                if (string6 == null) return false;
                return true;
            }
        })).toList();
        Column column = keyCol;
        Column metadataSelectExpr = functions$.MODULE$.struct((Seq)((List)allColumnsButValue.map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Column apply(String colName) {
                return functions$.MODULE$.col(colName);
            }
        }, List$.MODULE$.canBuildFrom())).$colon$colon((Object)column)).as(this.KAFKA_METADATA_COL());
        return var3_4;
    }

    /**
     * Default value for the {@code keyCol} argument of {@code selectMetadata}.
     *
     * @return a column referring to {@code KafkaSparkSQLSchemas.KEY_ATTRIBUTE_NAME}
     */
    private Column selectMetadata$default$1() {
        String keyColumnName = KafkaSparkSQLSchemas$.MODULE$.KEY_ATTRIBUTE_NAME();
        return functions$.MODULE$.col(keyColumnName);
    }

    /**
     * Chooses the parsing strategy for the raw Kafka dataset.
     *
     * <p>The topics must pass {@code MultiTopicModel.areTopicsHealthy}. If they are equal for
     * reading, the first topic is used as the prototype for {@link #selectForOneSchema}.
     * Otherwise they must share the key schema, and the multiple-schema strategy is used.
     *
     * @param topics      the topic models being read
     * @param df          the raw dataset read from Kafka
     * @param parsingMode how parsing failures are treated
     * @return the parsed dataset
     * @throws IllegalArgumentException if the health check or the key-schema check reports an error
     */
    private Dataset<Row> parseDF(Seq<TopicModel> topics, Dataset<Row> df, ParsingMode parsingMode) {
        Either health = MultiTopicModel$.MODULE$.areTopicsHealthy(topics);
        if (health instanceof Left) {
            String healthError = (String)((Left)health).a();
            throw new IllegalArgumentException(healthError);
        }
        if (!(health instanceof Right)) {
            throw new MatchError((Object)health);
        }

        Either<String, BoxedUnit> sameForReading = TopicModelUtils$.MODULE$.areTopicsEqualForReading(topics);
        if (sameForReading instanceof Right) {
            // All topics can be read the same way: the first one acts as prototype.
            return this.selectForOneSchema((TopicModel)topics.head(), df, parsingMode);
        }
        if (!(sameForReading instanceof Left)) {
            throw new MatchError(sameForReading);
        }

        // Topics differ: the difference is only logged, provided they still share the key schema.
        String a = (String)((Left)sameForReading).a();
        Either<String, BoxedUnit> sharedKey = TopicModelUtils$.MODULE$.topicsShareKeySchema(topics);
        if (sharedKey instanceof Left) {
            String error = (String)((Left)sharedKey).a();
            throw new IllegalArgumentException(error);
        }
        if (!(sharedKey instanceof Right)) {
            throw new MatchError(sharedKey);
        }
        this.logger().debug((Function0)new Serializable(a){
            public static final long serialVersionUID = 0L;
            private final String a$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Suppressing error: '", "' and trying with multipleSchema strategy"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.a$1}));
            }
            {
                this.a$1 = a$1;
            }
        });
        return this.selectForMultipleSchema(topics, df, parsingMode);
    }

    /**
     * Parses the raw Kafka dataset according to the data type of a single prototype topic.
     *
     * <p>AVRO and JSON values are parsed and then passed through {@link #checkParsingMode}.
     * PLAINTEXT and BINARY values are selected next to the metadata struct without a parsing-mode
     * check. When the Handle parsing mode is requested for a type other than JSON or AVRO, a
     * warning is logged.
     *
     * @param prototypeTopic the topic whose data type and schema drive the parsing
     * @param df             the raw dataset read from Kafka
     * @param parsingMode    how parsing failures are treated
     * @return the parsed dataset
     * @throws UnsupportedOperationException if the topic data type is none of AVRO, JSON, PLAINTEXT, BINARY
     */
    public Dataset<Row> selectForOneSchema(TopicModel prototypeTopic, Dataset<Row> df, ParsingMode parsingMode) {
        String topicType = prototypeTopic.topicDataType();

        Handle$ handleMode = Handle$.MODULE$;
        boolean handleRequested = parsingMode != null ? parsingMode.equals(handleMode) : handleMode == null;
        if (handleRequested) {
            SeqLike rawCapableTypes = (SeqLike)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{TopicDataTypes$.MODULE$.JSON(), TopicDataTypes$.MODULE$.AVRO()}));
            if (!rawCapableTypes.contains((Object)topicType)) {
                this.logger().warn((Function0)new Serializable(topicType){
                    public static final long serialVersionUID = 0L;
                    private final String topicType$1;

                    public final String apply() {
                        return new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Handle parsing mode is not supported for ", " topic type, it will be managed as in "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.topicType$1}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Strict mode (so no raw column will be produced). To remove this warning please set Strict as parsing mode "})).s((Seq)Nil$.MODULE$)).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"of your topic. Look at KafkaSparkStructuredStreamingReader#createStructuredStream java doc for more details."})).s((Seq)Nil$.MODULE$)).toString();
                    }
                    {
                        this.topicType$1 = topicType$1;
                    }
                });
            }
        }

        // Each type constant is looked up only if the previous ones did not match.
        Dataset result;
        String avroType = TopicDataTypes$.MODULE$.AVRO();
        if (avroType != null ? avroType.equals(topicType) : topicType == null) {
            Dataset parsedDf = df.withColumn(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME(), this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseAvroValue(prototypeTopic));
            result = this.checkParsingMode((Dataset<Row>)parsedDf, parsingMode, TopicDataTypes$.MODULE$.AVRO(), this.parseKey(prototypeTopic));
        } else {
            String jsonType = TopicDataTypes$.MODULE$.JSON();
            if (jsonType != null ? jsonType.equals(topicType) : topicType == null) {
                Dataset parsedDf = df.withColumn(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME(), functions$.MODULE$.from_json(this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseString(), this.getDataType(prototypeTopic.getJsonSchema())));
                result = this.checkParsingMode((Dataset<Row>)parsedDf, parsingMode, TopicDataTypes$.MODULE$.JSON(), this.checkParsingMode$default$4());
            } else {
                String plaintextType = TopicDataTypes$.MODULE$.PLAINTEXT();
                if (plaintextType != null ? plaintextType.equals(topicType) : topicType == null) {
                    result = df.withColumn("value_string", this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseString()).select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{this.selectMetadata(this.selectMetadata$default$1()), functions$.MODULE$.expr("value_string AS value")}));
                } else {
                    String binaryType = TopicDataTypes$.MODULE$.BINARY();
                    if (binaryType != null ? binaryType.equals(topicType) : topicType == null) {
                        result = df.select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{this.selectMetadata(this.selectMetadata$default$1()), functions$.MODULE$.expr(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME())}));
                    } else {
                        throw new UnsupportedOperationException(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Unsupported topic data type \"", "\""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{prototypeTopic.topicDataType()})));
                    }
                }
            }
        }

        Dataset ret = result;
        this.logger().debug((Function0)new Serializable(ret){
            public static final long serialVersionUID = 0L;
            private final Dataset ret$2;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"DataFrame schema: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.ret$2.schema().treeString()}));
            }
            {
                this.ret$2 = ret$2;
            }
        });
        return ret;
    }

    /**
     * Applies the parsing mode to a dataset whose value column has already been parsed.
     *
     * <ul>
     *   <li>Strict: a null parsed value is replaced by a call to the UDF from
     *       {@link #strictExceptionLauncherUdf()}; the result is expanded next to the metadata.</li>
     *   <li>Ignore: the parsed value is expanded next to the metadata, keeping only rows whose
     *       value is not null.</li>
     *   <li>Handle: selects the metadata, the raw value (only where the parsed value is null)
     *       and the parsed value itself, not expanded.</li>
     * </ul>
     *
     * @param df            dataset with the parsed value column and the raw-value column
     * @param parsingMode   the mode to apply
     * @param topicDataType the topic data type, passed to the strict-mode UDF
     * @param metadataKey   the key column to place in the metadata struct
     * @return the dataset shaped according to the parsing mode
     */
    public Dataset<Row> checkParsingMode(Dataset<Row> df, ParsingMode parsingMode, String topicDataType, Column metadataKey) {
        if (Strict$.MODULE$.equals(parsingMode)) {
            String computedValue = "computedValue";
            Column parsedIsNull = functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME()).isNull();
            Column failure = this.strictExceptionLauncherUdf().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.col(this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME()), functions$.MODULE$.lit((Object)topicDataType)}));
            Column valueOrFailure = functions$.MODULE$.when(parsedIsNull, (Object)failure).otherwise((Object)functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME()));
            Dataset withComputed = df.withColumn(computedValue, valueOrFailure);
            return withComputed.select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{this.selectMetadata(metadataKey), functions$.MODULE$.col(computedValue + ".*")}));
        }
        if (Ignore$.MODULE$.equals(parsingMode)) {
            Dataset expanded = df.select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{this.selectMetadata(metadataKey), functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME() + ".*")}));
            return expanded.where(functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME()).isNotNull());
        }
        if (Handle$.MODULE$.equals(parsingMode)) {
            Column metadata = this.selectMetadata(metadataKey);
            // The raw value is exposed only for rows whose parsed value is null.
            Column rawWhenUnparsed = functions$.MODULE$.when(functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME()).isNull(), (Object)functions$.MODULE$.col(this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME())).otherwise(null).as(this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME());
            Column parsedValue = functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME());
            return df.select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{metadata, rawWhenUnparsed, parsedValue}));
        }
        throw new MatchError((Object)parsingMode);
    }

    /**
     * Default value for the {@code metadataKey} argument of {@link #checkParsingMode}.
     *
     * @return a column referring to {@code KafkaSparkSQLSchemas.KEY_ATTRIBUTE_NAME}
     */
    public Column checkParsingMode$default$4() {
        String keyColumnName = KafkaSparkSQLSchemas$.MODULE$.KEY_ATTRIBUTE_NAME();
        return functions$.MODULE$.col(keyColumnName);
    }

    /**
     * Builds the Spark UDF used in Strict parsing mode to fail on an unparsable record.
     *
     * <p>The UDF takes the raw value bytes and the topic data type and never returns normally:
     * for JSON and AVRO it throws a {@code SparkException} whose message contains the raw value
     * (rendered as a string for JSON, as hex for AVRO); for any other data type it throws a
     * {@code MatchError}.
     *
     * @return a two-argument UDF registered with type tags for {@code Nothing}, {@code Array[Byte]} and {@code String}
     */
    public UserDefinedFunction strictExceptionLauncherUdf() {
        // Universe/mirror pairs used below to materialize the type tags of the two UDF arguments.
        JavaUniverse $u = package$.MODULE$.universe();
        JavaUniverse.JavaMirror $m = package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
        JavaUniverse $u2 = package$.MODULE$.universe();
        JavaUniverse.JavaMirror $m2 = package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
        // Type creator for the first argument: scala.Array applied to scala.Byte.
        public final class It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaSparkStructuredStreamingReader$$typecreator1$1
        extends TypeCreator {
            public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                Universe $u = $m$untyped.universe();
                Mirror<U> $m = $m$untyped;
                return $u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().ThisType($m.staticPackage("scala").asModule().moduleClass()), (Symbols.SymbolApi)$m.staticClass("scala.Array"), List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Types.TypeApi[]{$m.staticClass("scala.Byte").asType().toTypeConstructor()})));
            }

            public It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaSparkStructuredStreamingReader$$typecreator1$1() {
            }
        }
        // Type creator for the second argument: the String type selected from scala.Predef.
        public final class It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaSparkStructuredStreamingReader$$typecreator2$1
        extends TypeCreator {
            public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                Universe $u = $m$untyped.universe();
                Mirror<U> $m = $m$untyped;
                return $u.internal().reificationSupport().TypeRef($u.internal().reificationSupport().SingleType($u.internal().reificationSupport().ThisType($m.staticPackage("scala").asModule().moduleClass()), (Symbols.SymbolApi)$m.staticModule("scala.Predef")), (Symbols.SymbolApi)$u.internal().reificationSupport().selectType($m.staticModule("scala.Predef").asModule().moduleClass(), "String"), (List)Nil$.MODULE$);
            }

            public It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaSparkStructuredStreamingReader$$typecreator2$1() {
            }
        }
        return functions$.MODULE$.udf((Function2)new Serializable(){
            public static final long serialVersionUID = 0L;

            // Every path throws, hence the Nothing$ return type.
            public final Nothing$ apply(byte[] raw, String topicDataType) {
                String string = topicDataType;
                String string2 = TopicDataTypes$.MODULE$.JSON();
                String string3 = string;
                if (!(string2 != null ? !string2.equals(string3) : string3 != null)) {
                    // JSON: the raw bytes are rendered as a string in the error message.
                    throw new SparkException(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Unable to parse raw value [", "] to ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{StringToByteArrayUtil$.MODULE$.byteArrayToString(raw), topicDataType})));
                }
                String string4 = TopicDataTypes$.MODULE$.AVRO();
                String string5 = string;
                if (!(string4 != null ? !string4.equals(string5) : string5 != null)) {
                    // AVRO: the raw bytes are rendered through Hex in the error message.
                    throw new SparkException(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Unable to parse raw value [", "] to ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Hex$.MODULE$.hex(raw), topicDataType})));
                }
                throw new MatchError((Object)string);
            }
        }, ((TypeTags)package$.MODULE$.universe()).TypeTag().Nothing(), ((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaSparkStructuredStreamingReader$$typecreator1$1()), ((TypeTags)$u2).TypeTag().apply((Mirror)$m2, (TypeCreator)new It_agilelab_bigdata_wasp_consumers_spark_plugins_kafka_KafkaSparkStructuredStreamingReader$$typecreator2$1()));
    }

    /**
     * Builds the projection used when one stream carries several topics with different schemas.
     *
     * <p>One column is produced per topic through {@code parseIfMyTopicOrNull}, using the parser that
     * matches the topic data type (AVRO, JSON, PLAINTEXT or BINARY). The selected columns are, in
     * order: the metadata column, the raw value column, then the per-topic columns in the same order
     * as {@code topics}. The projection is finally passed to {@code checkParsingModeMultipleTopics}.
     *
     * <p>The key expression given to {@code selectMetadata} is {@code parseKey} of the first AVRO
     * topic, if there is one; otherwise {@code selectMetadata}'s default argument is used.
     *
     * @param topics      topic models read by the stream
     * @param df          dataset to project
     * @param parsingMode parsing mode forwarded to {@code checkParsingModeMultipleTopics}
     * @return the dataset returned by {@code checkParsingModeMultipleTopics}
     * @throws UnsupportedOperationException if a topic declares a data type other than
     *                                       AVRO, JSON, PLAINTEXT or BINARY
     */
    public Dataset<Row> selectForMultipleSchema(Seq<TopicModel> topics, Dataset<Row> df, ParsingMode parsingMode) {
        Option option;
        block4: {
            Column metadataExpr;
            Column column;
            List valueColumns;
            block3: {
                block2: {
                    // Columns are prepended while folding, hence the reverse() to restore topic order.
                    valueColumns = ((List)topics.foldLeft((Object)List$.MODULE$.empty(), (Function2)new Serializable(){
                        public static final long serialVersionUID = 0L;

                        public final List<Column> apply(List<Column> cols, TopicModel t) {
                            block6: {
                                List list;
                                block3: {
                                    String string;
                                    String dataType;
                                    block5: {
                                        block4: {
                                            block2: {
                                                string = dataType = t.topicDataType();
                                                // AVRO topic
                                                String string2 = TopicDataTypes$.MODULE$.AVRO();
                                                String string3 = string;
                                                if (string2 != null ? !string2.equals(string3) : string3 != null) break block2;
                                                Column column = KafkaSparkStructuredStreamingReader$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseIfMyTopicOrNull(t.name(), KafkaSparkStructuredStreamingReader$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseAvroValue(t), dataType);
                                                list = cols.$colon$colon((Object)column);
                                                break block3;
                                            }
                                            // JSON topic
                                            String string4 = TopicDataTypes$.MODULE$.JSON();
                                            String string5 = string;
                                            if (string4 != null ? !string4.equals(string5) : string5 != null) break block4;
                                            Column column = KafkaSparkStructuredStreamingReader$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseIfMyTopicOrNull(t.name(), KafkaSparkStructuredStreamingReader$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseJson(t), dataType);
                                            list = cols.$colon$colon((Object)column);
                                            break block3;
                                        }
                                        // PLAINTEXT topic
                                        String string6 = TopicDataTypes$.MODULE$.PLAINTEXT();
                                        String string7 = string;
                                        if (string6 != null ? !string6.equals(string7) : string7 != null) break block5;
                                        Column column = KafkaSparkStructuredStreamingReader$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseIfMyTopicOrNull(t.name(), KafkaSparkStructuredStreamingReader$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseString(), dataType);
                                        list = cols.$colon$colon((Object)column);
                                        break block3;
                                    }
                                    // BINARY topic
                                    String string8 = TopicDataTypes$.MODULE$.BINARY();
                                    String string9 = string;
                                    if (string8 != null ? !string8.equals(string9) : string9 != null) break block6;
                                    Column column = KafkaSparkStructuredStreamingReader$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseIfMyTopicOrNull(t.name(), KafkaSparkStructuredStreamingReader$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseBinary(), dataType);
                                    list = cols.$colon$colon((Object)column);
                                }
                                return list;
                            }
                            throw new UnsupportedOperationException(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Unsupported topic data type \"", "\""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{t.topicDataType()})));
                        }
                    })).reverse();
                    this.logger().info((Function0)new Serializable(valueColumns){
                        public static final long serialVersionUID = 0L;
                        private final List valueColumns$1;

                        public final String apply() {
                            return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Selecting the following columns:\\n", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.valueColumns$1.mkString("\t- ", "\n\t- ", "")}));
                        }
                        {
                            this.valueColumns$1 = valueColumns$1;
                        }
                    });
                    // Pick the first AVRO topic (if any): its key schema drives how the key is parsed.
                    option = topics.collectFirst((PartialFunction)new Serializable(){
                        public static final long serialVersionUID = 0L;

                        /*
                         * Enabled aggressive block sorting
                         */
                        public final <A1 extends TopicModel, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                            Object object;
                            A1 A1 = x1;
                            if (A1 != null) {
                                String string = A1.topicDataType();
                                String string2 = TopicDataTypes$.MODULE$.AVRO();
                                String string3 = string;
                                if (!(string2 != null ? !string2.equals(string3) : string3 != null)) {
                                    object = A1;
                                    return (B1)object;
                                }
                            }
                            object = function1.apply(x1);
                            return (B1)object;
                        }

                        /*
                         * Enabled force condition propagation
                         * Lifted jumps to return sites
                         */
                        public final boolean isDefinedAt(TopicModel x1) {
                            TopicModel topicModel = x1;
                            if (topicModel == null) return false;
                            String string = topicModel.topicDataType();
                            String string2 = TopicDataTypes$.MODULE$.AVRO();
                            String string3 = string;
                            if (string2 != null) {
                                if (!string2.equals(string3)) return false;
                                return true;
                            }
                            if (string3 == null) return true;
                            return false;
                        }
                    });
                    if (!(option instanceof Some)) break block2;
                    Some some = (Some)option;
                    TopicModel t = (TopicModel)some.x();
                    column = this.selectMetadata(this.parseKey(t));
                    break block3;
                }
                if (!None$.MODULE$.equals(option)) break block4;
                // No AVRO topic: fall back to selectMetadata's default key expression.
                column = this.selectMetadata(this.selectMetadata$default$1());
            }
            Column column2 = metadataExpr = column;
            Column column3 = functions$.MODULE$.col(this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME());
            // Final column order: metadata, raw value, then one column per topic.
            Dataset ret = df.select((Seq)valueColumns.$colon$colon((Object)column3).$colon$colon((Object)column2));
            this.logger().debug((Function0)new Serializable(ret){
                public static final long serialVersionUID = 0L;
                private final Dataset ret$1;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"DataFrame schema before parsing mode check: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.ret$1.schema().treeString()}));
                }
                {
                    this.ret$1 = ret$1;
                }
            });
            Dataset<Row> retChecked = this.checkParsingModeMultipleTopics((Dataset<Row>)ret, parsingMode);
            // Log the schema of the checked dataset; logging `ret` again would just repeat the
            // "before" schema and hide what the parsing mode check changed.
            this.logger().debug((Function0)new Serializable(retChecked){
                public static final long serialVersionUID = 0L;
                private final Dataset retChecked$1;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"DataFrame schema after parsing mode check: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.retChecked$1.schema().treeString()}));
                }
                {
                    this.retChecked$1 = retChecked$1;
                }
            });
            return retChecked;
        }
        throw new MatchError((Object)option);
    }

    /**
     * Applies the requested parsing mode to a multi-topic projection.
     *
     * <p>Every column of {@code df} other than the metadata column and the raw value column is
     * treated as a per-topic parsed column, i.e. a struct with a value field and a data type field.
     * <ul>
     *   <li>Strict: adds a {@code workingColumn} that calls the strict exception UDF for rows whose
     *       parsed value is null, then selects the metadata column plus the unwrapped values.</li>
     *   <li>Ignore: keeps only rows satisfying the "no parsing error" condition, then selects the
     *       metadata column plus the unwrapped values.</li>
     *   <li>Handle: keeps all rows; the raw value column is retained only for rows that fail the
     *       "no parsing error" condition and is null otherwise.</li>
     * </ul>
     *
     * @param df          projection containing the metadata column, the raw value column and one
     *                    struct column per topic
     * @param parsingMode one of Strict, Ignore or Handle
     * @return the dataset transformed according to {@code parsingMode}
     * @throws MatchError if {@code parsingMode} is none of the three handled modes
     */
    public Dataset<Row> checkParsingModeMultipleTopics(Dataset<Row> df, ParsingMode parsingMode) {
        ParsingMode parsingMode2;
        block5: {
            Dataset dataset;
            block3: {
                Column parsingErrorFilteredCondition;
                Column[] goodCaseSelect;
                block4: {
                    block2: {
                        // Per-topic columns: everything except the metadata and raw value columns.
                        String[] parsedCols = (String[])Predef$.MODULE$.refArrayOps((Object[])df.columns()).diff((GenSeq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{this.KAFKA_METADATA_COL(), this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME()})));
                        // Unwrap each struct column to its value field (null when the struct is null), keeping the column name.
                        goodCaseSelect = (Column[])Predef$.MODULE$.refArrayOps((Object[])parsedCols).map((Function1)new Serializable(){
                            public static final long serialVersionUID = 0L;

                            public final Column apply(String c) {
                                return functions$.MODULE$.when(functions$.MODULE$.col(c).isNotNull(), (Object)KafkaSparkStructuredStreamingReader$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parsedValueCol$1(c)).otherwise(null).as(c);
                            }
                        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Column.class)));
                        // Per column: struct is null, OR its data type is not one that is checked, OR it is checked and
                        // the parsed value is non-null. The per-column conditions are then AND-ed together.
                        parsingErrorFilteredCondition = (Column)Predef$.MODULE$.refArrayOps((Object[])Predef$.MODULE$.refArrayOps((Object[])parsedCols).map((Function1)new Serializable(){
                            public static final long serialVersionUID = 0L;

                            public final Column apply(String c) {
                                return functions$.MODULE$.col(c).isNull().$bar$bar((Object)KafkaSparkStructuredStreamingReader$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$dataTypeToCheckCondition$1(c).unary_$bang()).$bar$bar((Object)KafkaSparkStructuredStreamingReader$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$dataTypeToCheckCondition$1(c).$amp$amp((Object)KafkaSparkStructuredStreamingReader$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parsedValueCol$1(c).isNotNull()));
                            }
                        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Column.class)))).reduce((Function2)new Serializable(){
                            public static final long serialVersionUID = 0L;

                            public final Column apply(Column x$14, Column x$15) {
                                return x$14.and(x$15);
                            }
                        });
                        parsingMode2 = parsingMode;
                        // --- Strict ---
                        if (!Strict$.MODULE$.equals(parsingMode2)) break block2;
                        Column column = functions$.MODULE$.col(this.KAFKA_METADATA_COL());
                        // Each fold step (re)defines "workingColumn": the exception UDF is applied to the raw value and
                        // the column's data type when the struct is present but its parsed value is null.
                        // NOTE(review): "workingColumn" is overwritten on every step and is not part of the final
                        // select below; confirm that Spark still evaluates the UDF for every topic column.
                        dataset = ((Dataset)Predef$.MODULE$.refArrayOps((Object[])parsedCols).foldLeft(df, (Function2)new Serializable(){
                            public static final long serialVersionUID = 0L;

                            public final Dataset<Row> apply(Dataset<Row> df, String c) {
                                return df.withColumn("workingColumn", functions$.MODULE$.when(KafkaSparkStructuredStreamingReader$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$dataTypeToCheckCondition$1(c).$amp$amp((Object)functions$.MODULE$.col(c).isNotNull()).$amp$amp((Object)KafkaSparkStructuredStreamingReader$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parsedValueCol$1(c).isNull()), (Object)KafkaSparkStructuredStreamingReader$.MODULE$.strictExceptionLauncherUdf().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.col(KafkaSparkStructuredStreamingReader$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME()), KafkaSparkStructuredStreamingReader$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parsedDataTypeCol$1(c)}))).otherwise(null));
                            }
                        })).select((Seq)Predef$.MODULE$.wrapRefArray((Object[])Predef$.MODULE$.refArrayOps((Object[])goodCaseSelect).$plus$colon((Object)column, ClassTag$.MODULE$.apply(Column.class))));
                        break block3;
                    }
                    // --- Ignore: drop rows that fail the condition, then select metadata + unwrapped values ---
                    if (!Ignore$.MODULE$.equals(parsingMode2)) break block4;
                    Column column = functions$.MODULE$.col(this.KAFKA_METADATA_COL());
                    dataset = df.where(parsingErrorFilteredCondition).select((Seq)Predef$.MODULE$.wrapRefArray((Object[])Predef$.MODULE$.refArrayOps((Object[])goodCaseSelect).$plus$colon((Object)column, ClassTag$.MODULE$.apply(Column.class))));
                    break block3;
                }
                // --- Handle: keep every row; expose the raw value only where the condition is false ---
                if (!Handle$.MODULE$.equals(parsingMode2)) break block5;
                Column rawCol = functions$.MODULE$.when(parsingErrorFilteredCondition.unary_$bang(), (Object)functions$.MODULE$.col(this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME())).otherwise(null).as(this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME());
                dataset = df.select((Seq)((TraversableLike)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.col(this.KAFKA_METADATA_COL()), rawCol}))).$plus$plus((GenTraversableOnce)Predef$.MODULE$.refArrayOps((Object[])goodCaseSelect), Seq$.MODULE$.canBuildFrom()));
            }
            return dataset;
        }
        // Reached only when parsingMode matched none of Strict, Ignore, Handle.
        throw new MatchError((Object)parsingMode2);
    }

    /**
     * Parser for binary topics: no conversion is applied.
     *
     * @return a reference to the raw value column
     */
    public Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseBinary() {
        Column rawValue = functions$.MODULE$.col(this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME());
        return rawValue;
    }

    /**
     * Parser for plaintext topics.
     *
     * @return the raw value column cast to the Spark string type
     */
    public Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseString() {
        Column rawValue = functions$.MODULE$.col(this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME());
        return rawValue.cast((DataType)StringType$.MODULE$);
    }

    /**
     * Parser for JSON topics.
     *
     * @param t topic model whose JSON schema string is converted to a Spark data type
     * @return a {@code from_json} expression over the raw value read as a string
     */
    public Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseJson(TopicModel t) {
        Column valueAsText = this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseString();
        DataType targetType = this.getDataType(t.getJsonSchema());
        return functions$.MODULE$.from_json(valueAsText, targetType);
    }

    /**
     * Builds the expression used to read the Kafka key of a topic.
     *
     * <p>When the topic declares a key schema, the key column is wrapped in an
     * {@code AvroDeserializerExpression}; when it declares none, the key column is returned as is.
     *
     * @param t topic model providing the key schema and the schema-manager flag
     * @return the key column, Avro-deserialized if a key schema is present
     * @throws MatchError if {@code t.keySchema()} is neither {@code Some} nor {@code None}
     */
    private Column parseKey(TopicModel t) {
        Option option;
        block4: {
            Column column;
            block3: {
                block2: {
                    // Holders backing the lazily initialised schema manager (see avroSchemaManager$1).
                    VolatileByteRef bitmap$0 = VolatileByteRef.create((byte)0);
                    option = t.keySchema();
                    if (!(option instanceof Some)) break block2;
                    Some some = (Some)option;
                    String keySchema = (String)some.x();
                    ObjectRef avroSchemaManager$lzy = ObjectRef.zero();
                    // NOTE(review): this message closure parses keySchema; an empty keySchema (handled below)
                    // would presumably fail to parse if the closure is evaluated - confirm.
                    this.logger().debug((Function0)new Serializable(keySchema){
                        public static final long serialVersionUID = 0L;
                        private final String keySchema$1;

                        public final String apply() {
                            return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"AVRO key schema: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{new Schema.Parser().parse(this.keySchema$1).toString(true)}));
                        }
                        {
                            this.keySchema$1 = keySchema$1;
                        }
                    });
                    // Schema manager configuration, present only when the topic asks for the schema manager.
                    None$ darwinConf = t.useAvroSchemaManager() ? new Some((Object)ConfigManager$.MODULE$.getAvroSchemaManagerConfig()) : None$.MODULE$;
                    // Empty key schema: look up the latest schema for the topic's key subject through the
                    // schema manager. Otherwise use the declared key schema directly.
                    Some schemaToUse = keySchema.isEmpty() ? this.avroSchemaManager$1((Option)darwinConf, avroSchemaManager$lzy, bitmap$0).flatMap((Function1)new Serializable(t){
                        public static final long serialVersionUID = 0L;
                        private final TopicModel t$2;

                        public final Option<String> apply(AvroSchemaManager sm) {
                            // Third argument `true` is passed for the key lookup (parseAvroValue passes `false`).
                            return SubjectStrategy$.MODULE$.subjectFor(this.t$2.getJsonSchema(), this.t$2, true).map((Function1)new Serializable(this, sm){
                                public static final long serialVersionUID = 0L;
                                private final AvroSchemaManager sm$1;

                                public final String apply(String subj) {
                                    // Fails with a RuntimeException when no schema is registered for the subject.
                                    Tuple2 idAndSchema = (Tuple2)this.sm$1.retrieveLatestSchema(subj).getOrElse((Function0)new Serializable(this, subj){
                                        public static final long serialVersionUID = 0L;
                                        private final String subj$1;

                                        public final Nothing$ apply() {
                                            throw new RuntimeException(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Reader schema not specified and fetching latest schema with subject '", "' failed."})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.subj$1})));
                                        }
                                        {
                                            this.subj$1 = subj$1;
                                        }
                                    });
                                    return ((Schema)idAndSchema._2()).toString();
                                }
                                {
                                    this.sm$1 = sm$1;
                                }
                            });
                        }
                        {
                            this.t$2 = t$2;
                        }
                    }) : new Some((Object)keySchema);
                    // NOTE(review): schemaToUse.get() presumably throws when the lookup above yields no value
                    // (e.g. empty key schema without a schema manager) - confirm this is intended.
                    AvroDeserializerExpression avroKeyConversion = new AvroDeserializerExpression(functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.KEY_ATTRIBUTE_NAME()).expr(), (String)schemaToUse.get(), (Option)darwinConf, true);
                    column = new Column((Expression)avroKeyConversion);
                    break block3;
                }
                if (!None$.MODULE$.equals(option)) break block4;
                // No key schema declared: pass the key column through untouched.
                column = functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.KEY_ATTRIBUTE_NAME());
            }
            return column;
        }
        throw new MatchError((Object)option);
    }

    /**
     * Parser for AVRO topics: wraps the raw value column in an {@code AvroDeserializerExpression}.
     *
     * <p>The reader schema is {@code t.getJsonSchema()} unless {@code t.schema()} is empty, in which
     * case the latest schema registered for the topic's value subject is fetched through the
     * schema manager.
     *
     * @param t topic model providing the schema and the schema-manager flag
     * @return a column that deserializes the raw value with the chosen schema
     */
    public Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseAvroValue(TopicModel t) {
        // Holders backing the lazily initialised schema manager (see avroSchemaManager$2).
        ObjectRef avroSchemaManager$lzy = ObjectRef.zero();
        VolatileByteRef bitmap$0 = VolatileByteRef.create((byte)0);
        this.logger().debug((Function0)new Serializable(t){
            public static final long serialVersionUID = 0L;
            private final TopicModel t$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"AVRO value schema: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{new Schema.Parser().parse(this.t$1.getJsonSchema()).toString(true)}));
            }
            {
                this.t$1 = t$1;
            }
        });
        // Schema manager configuration, present only when the topic asks for the schema manager.
        None$ darwinConf = t.useAvroSchemaManager() ? new Some((Object)ConfigManager$.MODULE$.getAvroSchemaManagerConfig()) : None$.MODULE$;
        // Empty topic schema: resolve the latest schema for the value subject; otherwise use the topic's JSON schema.
        Some schemaToUse = t.schema().isEmpty() ? this.avroSchemaManager$2((Option)darwinConf, avroSchemaManager$lzy, bitmap$0).flatMap((Function1)new Serializable(t){
            public static final long serialVersionUID = 0L;
            private final TopicModel t$1;

            public final Option<String> apply(AvroSchemaManager sm) {
                // Third argument `false` is passed for the value lookup (parseKey passes `true`).
                return SubjectStrategy$.MODULE$.subjectFor(this.t$1.getJsonSchema(), this.t$1, false).map((Function1)new Serializable(this, sm){
                    public static final long serialVersionUID = 0L;
                    private final AvroSchemaManager sm$2;

                    public final String apply(String subj) {
                        // Fails with a RuntimeException when no schema is registered for the subject.
                        Tuple2 idAndSchema = (Tuple2)this.sm$2.retrieveLatestSchema(subj).getOrElse((Function0)new Serializable(this, subj){
                            public static final long serialVersionUID = 0L;
                            private final String subj$2;

                            public final Nothing$ apply() {
                                throw new RuntimeException(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Reader schema not specified and fetching latest schema with subject '", "' failed."})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.subj$2})));
                            }
                            {
                                this.subj$2 = subj$2;
                            }
                        });
                        return ((Schema)idAndSchema._2()).toString();
                    }
                    {
                        this.sm$2 = sm$2;
                    }
                });
            }
            {
                this.t$1 = t$1;
            }
        }) : new Some((Object)t.getJsonSchema());
        // NOTE(review): schemaToUse.get() presumably throws when the lookup above yields no value
        // (e.g. empty schema without a schema manager) - confirm this is intended.
        AvroDeserializerExpression avroToRowConversion = new AvroDeserializerExpression(functions$.MODULE$.col(this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME()).expr(), (String)schemaToUse.get(), (Option)darwinConf, true);
        return new Column((Expression)avroToRowConversion);
    }

    /**
     * Resolves a topic datastore model name to the topic models it stands for.
     * All the work is delegated to the recursive helper.
     *
     * @param topicBL                 lookup service for topic datastore models
     * @param topicDatastoreModelName name of the model to resolve
     * @return the resolved topic models
     */
    private Seq<TopicModel> retrieveTopicModelsRecursively(TopicBL topicBL, String topicDatastoreModelName) {
        Seq resolvedModels = this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$innerRetrieveTopicModelsRecursively$1(topicDatastoreModelName, topicBL);
        return resolvedModels;
    }

    /**
     * Converts an Avro schema, given as a string, to the corresponding Spark SQL data type.
     *
     * @param schema Avro schema text to parse
     * @return the data type reported by {@code AvroSchemaConverters.toSqlType} for the parsed schema
     */
    private DataType getDataType(String schema) {
        return AvroSchemaConverters$.MODULE$.toSqlType(new Schema.Parser().parse(schema)).dataType();
    }

    /**
     * Wraps a parsing expression so that it only applies to records of one topic.
     *
     * <p>For rows whose topic column equals {@code topicName} the result is a struct holding the
     * parsed value and the data type literal; for every other row it is null. The resulting column
     * is named after {@code TopicModelUtils.topicNameToColumnName(topicName)}.
     *
     * @param topicName topic the expression belongs to
     * @param parseCol  expression that parses the value for this topic
     * @param dataType  data type stored alongside the parsed value
     * @return the guarded, aliased struct column
     */
    public Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseIfMyTopicOrNull(String topicName, Column parseCol, String dataType) {
        Column belongsToTopic = functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.TOPIC_ATTRIBUTE_NAME()).equalTo((Object)topicName);
        Column valueField = parseCol.as(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME());
        Column dataTypeField = functions$.MODULE$.lit((Object)dataType).as(this.DATA_TYPE_ATTRIBUTE_NAME());
        Column parsedStruct = functions$.MODULE$.struct((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{valueField, dataTypeField}));
        Column structOrNull = this.when$1(belongsToTopic, parsedStruct).otherwise(null);
        return structOrNull.as(TopicModelUtils$.MODULE$.topicNameToColumnName(topicName));
    }

    /**
     * References the value field nested inside a per-topic struct column.
     *
     * @param colName name of the struct column
     * @return the column addressed as {@code <colName>.<value attribute name>}
     */
    public final Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parsedValueCol$1(String colName) {
        Seq pathTemplate = (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", ".", ""});
        Seq pathSegments = (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{colName, KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME()});
        String nestedPath = new StringContext(pathTemplate).s(pathSegments);
        return functions$.MODULE$.col(nestedPath);
    }

    /**
     * References the data type field nested inside a per-topic struct column.
     *
     * @param colName name of the struct column
     * @return the column addressed as {@code <colName>.<data type attribute name>}
     */
    public final Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parsedDataTypeCol$1(String colName) {
        Seq pathTemplate = (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", ".", ""});
        Seq pathSegments = (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{colName, this.DATA_TYPE_ATTRIBUTE_NAME()});
        String nestedPath = new StringContext(pathTemplate).s(pathSegments);
        return functions$.MODULE$.col(nestedPath);
    }

    /**
     * Condition selecting the per-topic columns whose parsing outcome is checked.
     *
     * @param colName name of the struct column
     * @return a boolean column that is true when the struct column is null or when its
     *         lower-cased data type field is the JSON or the AVRO data type
     */
    public final Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$dataTypeToCheckCondition$1(String colName) {
        Column structIsNull = functions$.MODULE$.col(colName).isNull();
        Column loweredDataType = functions$.MODULE$.lower(this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parsedDataTypeCol$1(colName));
        Column isCheckedDataType = loweredDataType.isin((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{TopicDataTypes$.MODULE$.JSON(), TopicDataTypes$.MODULE$.AVRO()}));
        return structIsNull.$bar$bar((Object)isCheckedDataType);
    }

    /**
     * Slow path of the lazily initialised schema manager used by {@code parseKey}.
     *
     * <p>While holding the monitor of this object, initialises the holder from {@code darwinConf$1}
     * if bit 0 of the bitmap is still clear, then marks it as initialised.
     *
     * @param darwinConf$1            optional schema manager configuration
     * @param avroSchemaManager$lzy$1 holder of the computed option
     * @param bitmap$0$1              initialisation flag holder (bit 0)
     * @return the option stored in the holder
     */
    private final Option avroSchemaManager$lzycompute$1(Option darwinConf$1, ObjectRef avroSchemaManager$lzy$1, VolatileByteRef bitmap$0$1) {
        synchronized (this) {
            // Re-check under the lock: someone else may have initialised the holder already.
            if ((byte)(bitmap$0$1.elem & 1) != 0) {
                return (Option)avroSchemaManager$lzy$1.elem;
            }
            avroSchemaManager$lzy$1.elem = darwinConf$1.map((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final AvroSchemaManager apply(Config config) {
                    return AvroSchemaManagerFactory$.MODULE$.initialize(config);
                }
            });
            bitmap$0$1.elem = (byte)(bitmap$0$1.elem | 1);
            return (Option)avroSchemaManager$lzy$1.elem;
        }
    }

    /**
     * Accessor of the lazily initialised schema manager used by {@code parseKey}.
     *
     * @param darwinConf$1            optional schema manager configuration
     * @param avroSchemaManager$lzy$1 holder of the computed option
     * @param bitmap$0$1              initialisation flag holder (bit 0)
     * @return the cached option when bit 0 is set, otherwise the result of the slow path
     */
    private final Option avroSchemaManager$1(Option darwinConf$1, ObjectRef avroSchemaManager$lzy$1, VolatileByteRef bitmap$0$1) {
        if ((byte)(bitmap$0$1.elem & 1) != 0) {
            return (Option)avroSchemaManager$lzy$1.elem;
        }
        return this.avroSchemaManager$lzycompute$1(darwinConf$1, avroSchemaManager$lzy$1, bitmap$0$1);
    }

    /**
     * Slow path of the lazily initialised schema manager used by the AVRO value parser.
     *
     * <p>While holding the monitor of this object, initialises the holder from {@code darwinConf$2}
     * if bit 0 of the bitmap is still clear, then marks it as initialised.
     *
     * @param darwinConf$2            optional schema manager configuration
     * @param avroSchemaManager$lzy$2 holder of the computed option
     * @param bitmap$0$2              initialisation flag holder (bit 0)
     * @return the option stored in the holder
     */
    private final Option avroSchemaManager$lzycompute$2(Option darwinConf$2, ObjectRef avroSchemaManager$lzy$2, VolatileByteRef bitmap$0$2) {
        synchronized (this) {
            // Re-check under the lock: someone else may have initialised the holder already.
            if ((byte)(bitmap$0$2.elem & 1) != 0) {
                return (Option)avroSchemaManager$lzy$2.elem;
            }
            avroSchemaManager$lzy$2.elem = darwinConf$2.map((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final AvroSchemaManager apply(Config config) {
                    return AvroSchemaManagerFactory$.MODULE$.initialize(config);
                }
            });
            bitmap$0$2.elem = (byte)(bitmap$0$2.elem | 1);
            return (Option)avroSchemaManager$lzy$2.elem;
        }
    }

    /**
     * Accessor of the lazily initialised schema manager used by the AVRO value parser.
     *
     * @param darwinConf$2            optional schema manager configuration
     * @param avroSchemaManager$lzy$2 holder of the computed option
     * @param bitmap$0$2              initialisation flag holder (bit 0)
     * @return the cached option when bit 0 is set, otherwise the result of the slow path
     */
    private final Option avroSchemaManager$2(Option darwinConf$2, ObjectRef avroSchemaManager$lzy$2, VolatileByteRef bitmap$0$2) {
        if ((byte)(bitmap$0$2.elem & 1) != 0) {
            return (Option)avroSchemaManager$lzy$2.elem;
        }
        return this.avroSchemaManager$lzycompute$2(darwinConf$2, avroSchemaManager$lzy$2, bitmap$0$2);
    }

    /**
     * Recursively expands a topic datastore model name into plain topic models.
     *
     * <p>A {@code TopicModel} yields a one-element sequence; a {@code MultiTopicModel} is expanded by
     * resolving each of its topic model names in turn and concatenating the results.
     *
     * @param topicDatastoreModelName name of the model to resolve
     * @param topicBL$1               lookup service for topic datastore models
     * @return the topic models reachable from the given name
     * @throws IllegalArgumentException if no model with that name exists
     * @throws MatchError               if the model is neither a topic nor a multi-topic model
     */
    public final Seq it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$innerRetrieveTopicModelsRecursively$1(String topicDatastoreModelName, TopicBL topicBL$1) {
        DatastoreModel resolved = (DatastoreModel)topicBL$1.getByName(topicDatastoreModelName).getOrElse((Function0)new Serializable(topicDatastoreModelName){
            public static final long serialVersionUID = 0L;
            private final String topicDatastoreModelName$1;

            public final Nothing$ apply() {
                throw new IllegalArgumentException(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Cannot find topic with name: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.topicDatastoreModelName$1})));
            }
            {
                this.topicDatastoreModelName$1 = topicDatastoreModelName$1;
            }
        });
        // Leaf: a single topic.
        if (resolved instanceof TopicModel) {
            TopicModel singleTopic = (TopicModel)resolved;
            return (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicModel[]{singleTopic}));
        }
        // Composite: resolve every referenced name and flatten.
        if (resolved instanceof MultiTopicModel) {
            MultiTopicModel composite = (MultiTopicModel)resolved;
            return (Seq)composite.topicModelNames().flatMap((Function1)new Serializable(topicBL$1){
                public static final long serialVersionUID = 0L;
                private final TopicBL topicBL$1;

                public final Seq<TopicModel> apply(String nestedModelName) {
                    return KafkaSparkStructuredStreamingReader$.MODULE$.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$innerRetrieveTopicModelsRecursively$1(nestedModelName, this.topicBL$1);
                }
                {
                    this.topicBL$1 = topicBL$1;
                }
            }, Seq$.MODULE$.canBuildFrom());
        }
        throw new MatchError((Object)resolved);
    }

    /**
     * Builds a single-branch {@code CaseWhen} column from the expressions of the given columns,
     * using {@code CaseWhen}'s default second argument.
     *
     * @param condition branch condition
     * @param value     value produced when the condition holds
     * @return a column wrapping the {@code CaseWhen} expression
     */
    private final Column when$1(Column condition, Column value) {
        Tuple2 onlyBranch = new Tuple2((Object)condition.expr(), (Object)value.expr());
        Seq branches = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{onlyBranch}));
        return new Column((Expression)new CaseWhen(branches, CaseWhen$.MODULE$.apply$default$2()));
    }

    /**
     * Private constructor of the singleton: publishes the instance through {@code MODULE$},
     * runs the {@code Logging} initialiser and sets the column-name constants.
     */
    private KafkaSparkStructuredStreamingReader$() {
        // Publish the singleton before anything else runs.
        MODULE$ = this;
        Logging.class.$init$((Logging)this);
        // Name of the metadata column.
        this.KAFKA_METADATA_COL = "kafkaMetadata";
        // Name of the column holding the unparsed record value.
        this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME = "raw";
        // Name of the data type field stored inside each per-topic struct.
        this.DATA_TYPE_ATTRIBUTE_NAME = "dataType";
    }
}

