package it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka;

import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkLegacyStreamingWriter;
import it.agilelab.bigdata.wasp.core.WaspSystem$;
import it.agilelab.bigdata.wasp.core.kafka.CheckOrCreateTopic;
import it.agilelab.bigdata.wasp.core.kafka.WaspKafkaWriter;
import it.agilelab.bigdata.wasp.core.utils.AvroToJsonUtil$;
import it.agilelab.bigdata.wasp.core.utils.ConfigManager$;
import it.agilelab.bigdata.wasp.core.utils.StringToByteArrayUtil$;
import it.agilelab.bigdata.wasp.models.TopicModel;
import it.agilelab.bigdata.wasp.models.configuration.TinyKafkaConfig;
import it.agilelab.bigdata.wasp.repository.core.bl.TopicBL;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.dstream.DStream;
import scala.collection.Iterator;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaSparkLegacyStreamingWriter.scala */
@ScalaSignature(bytes = "\u0006\u0001q3AAB\u0004\u00011!AQ\u0005\u0001B\u0001B\u0003%a\u0005\u0003\u00051\u0001\t\u0005\t\u0015!\u00032\u0011!a\u0004A!A!\u0002\u0013i\u0004\"\u0002%\u0001\t\u0003I\u0005\"B(\u0001\t\u0003\u0002&aH&bM.\f7\u000b]1sW2+w-Y2z'R\u0014X-Y7j]\u001e<&/\u001b;fe*\u0011\u0001\"C\u0001\u0006W\u000647.\u0019\u0006\u0003\u0015-\tq\u0001\u001d7vO&t7O\u0003\u0002\r\u001b\u0005)1\u000f]1sW*\u0011abD\u0001\nG>t7/^7feNT!\u0001E\t\u0002\t]\f7\u000f\u001d\u0006\u0003%M\tqAY5hI\u0006$\u0018M\u0003\u0002\u0015+\u0005A\u0011mZ5mK2\f'MC\u0001\u0017\u0003\tIGo\u0001\u0001\u0014\u0007\u0001Ir\u0004\u0005\u0002\u001b;5\t1DC\u0001\u001d\u0003\u0015\u00198-\u00197b\u0013\tq2D\u0001\u0004B]f\u0014VM\u001a\t\u0003A\rj\u0011!\t\u0006\u0003E-\tqa\u001e:ji\u0016\u00148/\u0003\u0002%C\tQ2\u000b]1sW2+w-Y2z'R\u0014X-Y7j]\u001e<&/\u001b;fe\u00069Ao\u001c9jG\nc\u0005CA\u0014/\u001b\u0005A#BA\u0015+\u0003\t\u0011GN\u0003\u0002,Y\u0005!1m\u001c:f\u0015\tis\"\u0001\u0006sKB|7/\u001b;pefL!a\f\u0015\u0003\u000fQ{\u0007/[2C\u0019\u0006\u00191o]2\u0011\u0005IRT\"A\u001a\u000b\u0005Q*\u0014!C:ue\u0016\fW.\u001b8h\u0015\taaG\u0003\u00028q\u00051\u0011\r]1dQ\u0016T\u0011!O\u0001\u0004_J<\u0017BA\u001e4\u0005A\u0019FO]3b[&twmQ8oi\u0016DH/\u0001\u0003oC6,\u0007C\u0001 F\u001d\ty4\t\u0005\u0002A75\t\u0011I\u0003\u0002C/\u00051AH]8pizJ!\u0001R\u000e\u0002\rA\u0013X\rZ3g\u0013\t1uI\u0001\u0004TiJLgn\u001a\u0006\u0003\tn\ta\u0001P5oSRtD\u0003\u0002&M\u001b:\u0003\"a\u0013\u0001\u000e\u0003\u001dAQ!\n\u0003A\u0002\u0019BQ\u0001\r\u0003A\u0002EBQ\u0001\u0010\u0003A\u0002u\nQa\u001e:ji\u0016$\"!\u0015+\u0011\u0005i\u0011\u0016BA*\u001c\u0005\u0011)f.\u001b;\t\u000bU+\u0001\u0019\u0001,\u0002\rM$(/Z1n!\r9&,P\u0007\u00021*\u0011\u0011lM\u0001\bIN$(/Z1n\u0013\tY\u0006LA\u0004E'R\u0014X-Y7")
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/kafka/KafkaSparkLegacyStreamingWriter.class */
/**
 * Legacy (DStream based) Spark streaming writer that publishes every record of a
 * {@code DStream<String>} to the Kafka topic registered under {@link #name}.
 *
 * <p>The topic definition is looked up through the supplied {@link TopicBL}; the Kafka
 * connection settings come from {@code ConfigManager}. The static {@code $anonfun$write$N}
 * methods are the lifted closure bodies of the original Scala source and are kept with their
 * original signatures.
 */
public class KafkaSparkLegacyStreamingWriter implements SparkLegacyStreamingWriter {
    /** Repository used to resolve the topic model by name. */
    private final TopicBL topicBL;
    /** Streaming context whose Spark context is used to create the broadcast variables. */
    private final StreamingContext ssc;
    /** Name of the topic model to look up and write to. */
    private final String name;

    /**
     * Wires the given stream to Kafka.
     *
     * <p>Reads the Kafka configuration, looks up the topic model called {@link #name} and, for
     * each model the lookup yields, registers the output operation on {@code dStream}. When the
     * lookup yields nothing, no output operation is registered.
     *
     * @param dStream stream of string records to publish
     */
    public void write(DStream<String> dStream) {
        TinyKafkaConfig tinyConfig = ConfigManager$.MODULE$.getKafkaConfig().toTinyConfig();
        this.topicBL.getTopicModelByName(this.name).foreach(topicModel -> {
            $anonfun$write$1(this, tinyConfig, dStream, topicModel);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Encodes a single record according to the topic data type and sends it with a null key.
     *
     * @param broadcast       broadcast holding the topic data type
     * @param broadcast2      broadcast holding the topic JSON schema (used for the Avro encoding)
     * @param topicModel      topic model, consulted for {@code useAvroSchemaManager()}
     * @param waspKafkaWriter writer used to send the encoded record
     * @param broadcast3      broadcast holding the topic name
     * @param str             the record to send
     */
    public static final /* synthetic */ void $anonfun$write$4(Broadcast broadcast, Broadcast broadcast2, TopicModel topicModel, WaspKafkaWriter waspKafkaWriter, Broadcast broadcast3, String str) {
        String dataType = (String) broadcast.value();
        String topicName = (String) broadcast3.value();
        Object payload;
        if ("json".equals(dataType) || "plaintext".equals(dataType)) {
            // Both textual formats are sent as the raw bytes of the record. The previous nested
            // conditional short-circuited "json" to the literal `true`, so a boolean was sent
            // instead of the record content.
            payload = StringToByteArrayUtil$.MODULE$.stringToByteArray(str);
        } else {
            // "avro" and every other data type share the same JSON -> Avro conversion.
            payload = AvroToJsonUtil$.MODULE$.jsonToAvro(str, (String) broadcast2.value(), topicModel.useAvroSchemaManager());
        }
        waspKafkaWriter.send(topicName, (Object) null, payload);
    }

    /**
     * Writes all records of one partition using a writer created for that partition.
     *
     * @param broadcast  broadcast holding the {@link TinyKafkaConfig} used to build the writer
     * @param broadcast2 broadcast holding the topic data type
     * @param broadcast3 broadcast holding the topic JSON schema
     * @param topicModel topic model forwarded to the per-record step
     * @param broadcast4 broadcast holding the topic name
     * @param iterator   records of the partition
     */
    public static final /* synthetic */ void $anonfun$write$3(Broadcast broadcast, Broadcast broadcast2, Broadcast broadcast3, TopicModel topicModel, Broadcast broadcast4, Iterator iterator) {
        WaspKafkaWriter waspKafkaWriter = new WaspKafkaWriter((TinyKafkaConfig) broadcast.value());
        try {
            iterator.foreach(str -> {
                $anonfun$write$4(broadcast2, broadcast3, topicModel, waspKafkaWriter, broadcast4, str);
                return BoxedUnit.UNIT;
            });
        } finally {
            // Close the writer even when encoding or sending a record fails, so a failing
            // partition does not leave the writer open.
            waspKafkaWriter.close();
        }
    }

    /**
     * Runs the per-partition write for every partition of the given RDD.
     *
     * @param broadcast  broadcast holding the {@link TinyKafkaConfig}
     * @param broadcast2 broadcast holding the topic data type
     * @param broadcast3 broadcast holding the topic JSON schema
     * @param topicModel topic model forwarded to the per-partition step
     * @param broadcast4 broadcast holding the topic name
     * @param rdd        the RDD to write
     */
    public static final /* synthetic */ void $anonfun$write$2(Broadcast broadcast, Broadcast broadcast2, Broadcast broadcast3, TopicModel topicModel, Broadcast broadcast4, RDD rdd) {
        rdd.foreachPartition(iterator -> {
            $anonfun$write$3(broadcast, broadcast2, broadcast3, topicModel, broadcast4, iterator);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Checks/creates the Kafka topic, broadcasts what the write needs and registers the
     * {@code foreachRDD} output operation on the stream.
     *
     * <p>Throws an {@code Exception} with message {@code "Error creating topic <name>"} when
     * the {@link CheckOrCreateTopic} request sent to the Kafka admin actor answers
     * {@code false}; the exception type is the one used by the compiled Scala original.
     *
     * @param kafkaSparkLegacyStreamingWriter the writer instance (source of the streaming context)
     * @param tinyKafkaConfig                 Kafka configuration to broadcast
     * @param dStream                         stream to attach the output operation to
     * @param topicModel                      definition of the destination topic
     */
    public static final /* synthetic */ void $anonfun$write$1(KafkaSparkLegacyStreamingWriter kafkaSparkLegacyStreamingWriter, TinyKafkaConfig tinyKafkaConfig, DStream dStream, TopicModel topicModel) {
        CheckOrCreateTopic request = new CheckOrCreateTopic(topicModel.name(), topicModel.partitions(), topicModel.replicas());
        boolean topicReady = BoxesRunTime.unboxToBoolean(WaspSystem$.MODULE$.$qmark$qmark(WaspSystem$.MODULE$.kafkaAdminActor(), request, WaspSystem$.MODULE$.$qmark$qmark$default$3()));
        if (!topicReady) {
            throw new Exception("Error creating topic " + topicModel.name());
        }
        // Broadcast order is kept as in the original: schema, config, topic name, data type.
        Broadcast broadcast = kafkaSparkLegacyStreamingWriter.ssc.sparkContext().broadcast(topicModel.getJsonSchema(), ClassTag$.MODULE$.apply(String.class));
        Broadcast broadcast2 = kafkaSparkLegacyStreamingWriter.ssc.sparkContext().broadcast(tinyKafkaConfig, ClassTag$.MODULE$.apply(TinyKafkaConfig.class));
        Broadcast broadcast3 = kafkaSparkLegacyStreamingWriter.ssc.sparkContext().broadcast(topicModel.name(), ClassTag$.MODULE$.apply(String.class));
        Broadcast broadcast4 = kafkaSparkLegacyStreamingWriter.ssc.sparkContext().broadcast(topicModel.topicDataType(), ClassTag$.MODULE$.apply(String.class));
        dStream.foreachRDD(rdd -> {
            // Argument order expected downstream: config, data type, schema, model, topic name.
            $anonfun$write$2(broadcast2, broadcast4, broadcast, topicModel, broadcast3, rdd);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Creates a writer for the topic with the given name.
     *
     * @param topicBL          repository used to resolve the topic model
     * @param streamingContext streaming context providing the Spark context for broadcasts
     * @param str              name of the topic model to write to
     */
    public KafkaSparkLegacyStreamingWriter(TopicBL topicBL, StreamingContext streamingContext, String str) {
        this.topicBL = topicBL;
        this.ssc = streamingContext;
        this.name = str;
    }
}
