package it.agilelab.bigdata.wasp.consumers.spark.plugins.elastic;

import akka.actor.ActorRef;
import akka.actor.Props$;
import akka.pattern.AskableActorRef$;
import akka.pattern.package$;
import akka.util.Timeout;
import it.agilelab.bigdata.wasp.consumers.spark.plugins.WaspConsumersSparkPlugin;
import it.agilelab.bigdata.wasp.consumers.spark.readers.SparkBatchReader;
import it.agilelab.bigdata.wasp.consumers.spark.readers.SparkStructuredStreamingReader;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkBatchWriter;
import it.agilelab.bigdata.wasp.core.WaspSystem$;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.models.configuration.ValidationRule;
import it.agilelab.bigdata.wasp.core.utils.ConfigManager$;
import it.agilelab.bigdata.wasp.datastores.DatastoreProduct;
import it.agilelab.bigdata.wasp.datastores.DatastoreProduct$;
import it.agilelab.bigdata.wasp.models.IndexModel;
import it.agilelab.bigdata.wasp.models.ReaderModel;
import it.agilelab.bigdata.wasp.models.StreamingReaderModel;
import it.agilelab.bigdata.wasp.models.StructuredStreamingETLModel;
import it.agilelab.bigdata.wasp.models.WriterModel;
import it.agilelab.bigdata.wasp.repository.core.bl.ConfigBL$;
import it.agilelab.bigdata.wasp.repository.core.bl.IndexBL;
import it.agilelab.bigdata.wasp.repository.core.db.WaspDB;
import java.util.concurrent.TimeUnit;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SparkSession;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.Nil$;
import scala.concurrent.Await$;
import scala.concurrent.duration.Duration$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: ElasticConsumersSpark.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005Mf\u0001B\u0001\u0003\u0001M\u0011Q#\u00127bgRL7mQ8ogVlWM]:Ta\u0006\u00148N\u0003\u0002\u0004\t\u00059Q\r\\1ti&\u001c'BA\u0003\u0007\u0003\u001d\u0001H.^4j]NT!a\u0002\u0005\u0002\u000bM\u0004\u0018M]6\u000b\u0005%Q\u0011!C2p]N,X.\u001a:t\u0015\tYA\"\u0001\u0003xCN\u0004(BA\u0007\u000f\u0003\u001d\u0011\u0017n\u001a3bi\u0006T!a\u0004\t\u0002\u0011\u0005<\u0017\u000e\\3mC\nT\u0011!E\u0001\u0003SR\u001c\u0001a\u0005\u0003\u0001)iq\u0002CA\u000b\u0019\u001b\u00051\"\"A\f\u0002\u000bM\u001c\u0017\r\\1\n\u0005e1\"AB!osJ+g\r\u0005\u0002\u001c95\tA!\u0003\u0002\u001e\t\tAr+Y:q\u0007>t7/^7feN\u001c\u0006/\u0019:l!2,x-\u001b8\u0011\u0005}!S\"\u0001\u0011\u000b\u0005\u0005\u0012\u0013a\u00027pO\u001eLgn\u001a\u0006\u0003G)\tAaY8sK&\u0011Q\u0005\t\u0002\b\u0019><w-\u001b8h\u0011\u00159\u0003\u0001\"\u0001)\u0003\u0019a\u0014N\\5u}Q\t\u0011\u0006\u0005\u0002+\u00015\t!\u0001C\u0005-\u0001\u0001\u0007\t\u0019!C\u0001[\u00059\u0011N\u001c3fq\ncU#\u0001\u0018\u0011\u0005=*T\"\u0001\u0019\u000b\u0005E\u0012\u0014A\u00012m\u0015\t\u00193G\u0003\u00025\u0015\u0005Q!/\u001a9pg&$xN]=\n\u0005Y\u0002$aB%oI\u0016D(\t\u0014\u0005\nq\u0001\u0001\r\u00111A\u0005\u0002e\n1\"\u001b8eKb\u0014Ej\u0018\u0013fcR\u0011!(\u0010\t\u0003+mJ!\u0001\u0010\f\u0003\tUs\u0017\u000e\u001e\u0005\b}]\n\t\u00111\u0001/\u0003\rAH%\r\u0005\u0007\u0001\u0002\u0001\u000b\u0015\u0002\u0018\u0002\u0011%tG-\u001a=C\u0019\u0002B\u0011B\u0011\u0001A\u0002\u0003\u0007I\u0011A\"\u0002%\u0015d\u0017m\u001d;jG\u0006#W.\u001b8BGR|'oX\u000b\u0002\tB\u0011QIS\u0007\u0002\r*\u0011q\tS\u0001\u0006C\u000e$xN\u001d\u0006\u0002\u0013\u0006!\u0011m[6b\u0013\tYeI\u0001\u0005BGR|'OU3g\u0011%i\u0005\u00011AA\u0002\u0013\u0005a*\u0001\ffY\u0006\u001cH/[2BI6Lg.Q2u_J|v\fJ3r)\tQt\nC\u0004?\u0019\u0006\u0005\t\u0019\u0001#\t\rE\u0003\u0001\u0015)\u0003E\u0003M)G.Y:uS\u000e\fE-\\5o\u0003\u000e$xN]0!\u0011\u0015\u0019\u0006\u0001\"\u0011U\u0003A!\u0017\r^1ti>\u0014X\r\u0015:pIV\u001cG/F\u0001V!\t1\u0
016,D\u0001X\u0015\tA&\"\u0001\u0006eCR\f7\u000f^8sKNL!AW,\u0003!\u0011\u000bG/Y:u_J,\u0007K]8ek\u000e$\b\"\u0002/\u0001\t\u0003j\u0016AC5oSRL\u0017\r\\5{KR\u0011!H\u0018\u0005\u0006?n\u0003\r\u0001Y\u0001\u0007o\u0006\u001c\b\u000f\u0012\"\u0011\u0005\u0005$W\"\u00012\u000b\u0005\r\u0014\u0014A\u00013c\u0013\t)'M\u0001\u0004XCN\u0004HI\u0011\u0005\u0006O\u0002!\t\u0005[\u0001\u0013O\u0016$h+\u00197jI\u0006$\u0018n\u001c8Sk2,7/F\u0001j!\rQ'/\u001e\b\u0003WBt!\u0001\\8\u000e\u00035T!A\u001c\n\u0002\rq\u0012xn\u001c;?\u0013\u00059\u0012BA9\u0017\u0003\u001d\u0001\u0018mY6bO\u0016L!a\u001d;\u0003\u0007M+\u0017O\u0003\u0002r-A\u0011ao_\u0007\u0002o*\u0011\u00010_\u0001\u000eG>tg-[4ve\u0006$\u0018n\u001c8\u000b\u0005i\u0014\u0013AB7pI\u0016d7/\u0003\u0002}o\nqa+\u00197jI\u0006$\u0018n\u001c8Sk2,\u0007\"\u0002@\u0001\t\u0003z\u0018!I4fiN\u0003\u0018M]6TiJ,8\r^;sK\u0012\u001cFO]3b[&twm\u0016:ji\u0016\u0014H\u0003CA\u0001\u0003\u000f\t\t#a\f\u0011\u0007)\n\u0019!C\u0002\u0002\u0006\t\u00111&\u00127bgRL7m]3be\u000eD7\u000b]1sWN#(/^2ukJ,Gm\u0015;sK\u0006l\u0017N\\4Xe&$XM\u001d\u0005\b\u0003\u0013i\b\u0019AA\u0006\u0003\t\u00198\u000f\u0005\u0003\u0002\u000e\u0005uQBAA\b\u0015\u0011\t\t\"a\u0005\u0002\u0007M\fHNC\u0002\b\u0003+QA!a\u0006\u0002\u001a\u00051\u0011\r]1dQ\u0016T!!a\u0007\u0002\u0007=\u0014x-\u0003\u0003\u0002 
\u0005=!\u0001D*qCJ\\7+Z:tS>t\u0007bBA\u0012{\u0002\u0007\u0011QE\u0001\u001cgR\u0014Xo\u0019;ve\u0016$7\u000b\u001e:fC6LgnZ#U\u00196{G-\u001a7\u0011\t\u0005\u001d\u00121F\u0007\u0003\u0003SQ!A\u001f\u0006\n\t\u00055\u0012\u0011\u0006\u0002\u001c'R\u0014Xo\u0019;ve\u0016$7\u000b\u001e:fC6LgnZ#U\u00196{G-\u001a7\t\u000f\u0005ER\u00101\u0001\u00024\u0005YqO]5uKJlu\u000eZ3m!\u0011\t9#!\u000e\n\t\u0005]\u0012\u0011\u0006\u0002\f/JLG/\u001a:N_\u0012,G\u000eC\u0004\u0002<\u0001!\t%!\u0010\u0002C\u001d,Go\u00159be.\u001cFO];diV\u0014X\rZ*ue\u0016\fW.\u001b8h%\u0016\fG-\u001a:\u0015\u0011\u0005}\u00121JA'\u0003\u001f\u0002B!!\u0011\u0002H5\u0011\u00111\t\u0006\u0004\u0003\u000b2\u0011a\u0002:fC\u0012,'o]\u0005\u0005\u0003\u0013\n\u0019E\u0001\u0010Ta\u0006\u00148n\u0015;sk\u000e$XO]3e'R\u0014X-Y7j]\u001e\u0014V-\u00193fe\"A\u0011\u0011BA\u001d\u0001\u0004\tY\u0001\u0003\u0005\u0002$\u0005e\u0002\u0019AA\u0013\u0011!\t\t&!\u000fA\u0002\u0005M\u0013\u0001F:ue\u0016\fW.\u001b8h%\u0016\fG-\u001a:N_\u0012,G\u000e\u0005\u0003\u0002(\u0005U\u0013\u0002BA,\u0003S\u0011Ac\u0015;sK\u0006l\u0017N\\4SK\u0006$WM]'pI\u0016d\u0007bBA.\u0001\u0011\u0005\u0013QL\u0001\u0014O\u0016$8\u000b]1sW\n\u000bGo\u00195Xe&$XM\u001d\u000b\u0007\u0003?\nY'a\u001e\u0011\t\u0005\u0005\u0014qM\u0007\u0003\u0003GR1!!\u001a\u0007\u0003\u001d9(/\u001b;feNLA!!\u001b\u0002d\t\u00012\u000b]1sW\n\u000bGo\u00195Xe&$XM\u001d\u0005\t\u0003[\nI\u00061\u0001\u0002p\u0005\u00111o\u0019\t\u0005\u0003c\n\u0019(\u0004\u0002\u0002\u0014%!\u0011QOA\n\u00051\u0019\u0006/\u0019:l\u0007>tG/\u001a=u\u0011!\t\t$!\u0017A\u0002\u0005M\u0002bBA>\u0001\u0011\u0005\u0013QP\u0001\u0014O\u0016$8\u000b]1sW\n\u000bGo\u00195SK\u0006$WM\u001d\u000b\u0007\u0003\u007f\n))a\"\u0011\t\u0005\u0005\u0013\u0011Q\u0005\u0005\u0003\u0007\u000b\u0019E\u0001\tTa\u0006\u00148NQ1uG\"\u0014V-\u00193fe\"A\u0011QNA=\u0001\u0004\ty\u0007\u0003\u0005\u0002\n\u0006e\u0004\u0019AAF\u0003-\u0011X-\u00193fe6{G-\u001a7\u0011\t\u0005\u001d\u0012QR\u0005\u0005\u00
03\u001f\u000bICA\u0006SK\u0006$WM]'pI\u0016d\u0007bBAJ\u0001\u0011%\u0011QS\u0001\u000fgR\f'\u000f^;q\u000b2\f7\u000f^5d)\u0011\t9*!+\u0015\u0007i\nI\n\u0003\u0005\u0002\u001c\u0006E\u00059AAO\u0003\u001d!\u0018.\\3pkR\u0004B!a(\u0002&6\u0011\u0011\u0011\u0015\u0006\u0004\u0003GC\u0015\u0001B;uS2LA!a*\u0002\"\n9A+[7f_V$\b\u0002CAV\u0003#\u0003\r!!,\u0002+M,'O^5dKN$\u0016.\\3pkRl\u0015\u000e\u001c7jgB\u0019Q#a,\n\u0007\u0005EfC\u0001\u0003M_:<\u0007")
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/elastic/ElasticConsumersSpark.class */
/**
 * Spark consumers plugin for the Elastic datastore product.
 *
 * <p>On {@link #initialize(WaspDB)} it resolves the {@link IndexBL}, creates the
 * {@code ElasticAdminActor} and waits for that actor to answer an {@code Initialization}
 * message. Afterwards it hands out Elasticsearch batch readers/writers and structured streaming
 * writers; structured streaming reads are rejected with an
 * {@link UnsupportedOperationException}.
 *
 * <p>This source is decompiled from Scala: log calls receive generated
 * {@code ElasticConsumersSpark$$anonfun$...} closure objects instead of plain strings, and some
 * methods throw {@link Exception} without declaring it.
 */
public class ElasticConsumersSpark implements WaspConsumersSparkPlugin, Logging {
    /** Index business logic; set by {@link #initialize(WaspDB)}. */
    private IndexBL indexBL;

    /** Reference to the Elastic admin actor; set by {@link #initialize(WaspDB)}. */
    private ActorRef elasticAdminActor_;

    /**
     * Logger backing {@link #logger()}.
     *
     * <p>Deliberately not {@code final}: it is assigned through the {@code Logging} trait setter
     * below, and Java rejects assignment to a final field outside a constructor or initializer.
     */
    private WaspLogger logger;

    /** Returns the logger backing this plugin. */
    public WaspLogger logger() {
        return this.logger;
    }

    /**
     * Setter for the {@code logger} value declared by the {@code Logging} trait.
     *
     * @param waspLogger the logger instance to store
     */
    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    /** Returns the index business logic, or {@code null} before {@link #initialize(WaspDB)}. */
    public IndexBL indexBL() {
        return this.indexBL;
    }

    /**
     * Replaces the index business logic.
     *
     * @param indexBL the new value
     */
    public void indexBL_$eq(IndexBL indexBL) {
        this.indexBL = indexBL;
    }

    /** Returns the Elastic admin actor, or {@code null} before {@link #initialize(WaspDB)}. */
    public ActorRef elasticAdminActor_() {
        return this.elasticAdminActor_;
    }

    /**
     * Replaces the Elastic admin actor reference.
     *
     * @param actorRef the new value
     */
    public void elasticAdminActor__$eq(ActorRef actorRef) {
        this.elasticAdminActor_ = actorRef;
    }

    /** Returns the datastore product handled by this plugin, i.e. {@code ElasticProduct}. */
    public DatastoreProduct datastoreProduct() {
        return DatastoreProduct$.MODULE$.ElasticProduct();
    }

    /**
     * Initializes the plugin: resolves the {@link IndexBL}, creates the Elastic admin actor in the
     * WASP actor system and blocks until the actor has answered the initialization request.
     *
     * @param waspDB not used by this implementation
     */
    public void initialize(WaspDB waspDB) {
        logger().info(new ElasticConsumersSpark$$anonfun$initialize$1(this));
        indexBL_$eq(ConfigBL$.MODULE$.indexBL());
        logger().info(new ElasticConsumersSpark$$anonfun$initialize$2(this));
        elasticAdminActor__$eq(
                WaspSystem$.MODULE$.actorSystem().actorOf(
                        Props$.MODULE$.apply(
                                new ElasticConsumersSpark$$anonfun$initialize$3(this),
                                ClassTag$.MODULE$.apply(ElasticAdminActor.class)),
                        ElasticAdminActor$.MODULE$.name()));
        // Read the configured timeout once; the ask timeout is one second shorter than the
        // await duration passed as the first argument.
        long servicesTimeoutMillis = WaspSystem$.MODULE$.waspConfig().servicesTimeoutMillis();
        startupElastic(servicesTimeoutMillis, new Timeout(servicesTimeoutMillis - 1000, TimeUnit.MILLISECONDS));
    }

    /** Returns the validation rules contributed by this plugin: an empty sequence. */
    public Seq<ValidationRule> getValidationRules() {
        return Seq$.MODULE$.apply(Nil$.MODULE$);
    }

    /* renamed from: getSparkStructuredStreamingWriter, reason: merged with bridge method [inline-methods] */
    /**
     * Creates a structured streaming writer for the datastore model named by the writer model.
     *
     * @param sparkSession the Spark session handed to the writer
     * @param structuredStreamingETLModel not used by this implementation
     * @param writerModel supplies the datastore model name
     * @return a new {@code ElasticsearchSparkStructuredStreamingWriter}
     */
    public ElasticsearchSparkStructuredStreamingWriter m21getSparkStructuredStreamingWriter(SparkSession sparkSession, StructuredStreamingETLModel structuredStreamingETLModel, WriterModel writerModel) {
        logger().info(new ElasticConsumersSpark$$anonfun$getSparkStructuredStreamingWriter$1(this, writerModel));
        return new ElasticsearchSparkStructuredStreamingWriter(indexBL(), sparkSession, writerModel.datastoreModelName(), elasticAdminActor_());
    }

    /**
     * Always fails: this plugin does not provide a structured streaming reader.
     *
     * @param sparkSession not used
     * @param structuredStreamingETLModel not used
     * @param streamingReaderModel only used to build the error message
     * @return never returns normally
     * @throws UnsupportedOperationException always, after logging the same message as an error
     */
    public SparkStructuredStreamingReader getSparkStructuredStreamingReader(SparkSession sparkSession, StructuredStreamingETLModel structuredStreamingETLModel, StreamingReaderModel streamingReaderModel) {
        String message = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"The datastore product ", " is not a valid streaming source! Reader model ", " is not valid."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{datastoreProduct(), streamingReaderModel}));
        logger().error(new ElasticConsumersSpark$$anonfun$getSparkStructuredStreamingReader$1(this, message));
        throw new UnsupportedOperationException(message);
    }

    /**
     * Creates a batch writer for the index named by the writer model.
     *
     * @param sparkContext the Spark context handed to the writer
     * @param writerModel supplies the name passed to the writer
     * @return a new {@code ElasticsearchSparkBatchWriter}
     */
    public SparkBatchWriter getSparkBatchWriter(SparkContext sparkContext, WriterModel writerModel) {
        logger().info(new ElasticConsumersSpark$$anonfun$getSparkBatchWriter$1(this, writerModel));
        return new ElasticsearchSparkBatchWriter(indexBL(), sparkContext, writerModel.name(), elasticAdminActor_());
    }

    /**
     * Creates a batch reader for the index named by the reader model.
     *
     * <p>The index model is looked up by name, validated (it must have a schema and an
     * all-lowercase name) and a {@code CheckOrCreateIndex} request is sent to the admin actor;
     * the reader is only returned when that request answers {@code true}.
     *
     * @param sparkContext not used by this implementation
     * @param readerModel supplies the index model name
     * @return a new {@code ElasticsearchSparkBatchReader} for the resolved index model
     * @throws Exception (undeclared, Scala origin) when the index model is missing, has no
     *     schema, has a non-lowercase name, or the check-or-create request answers {@code false}
     */
    public SparkBatchReader getSparkBatchReader(SparkContext sparkContext, ReaderModel readerModel) {
        Option<?> lookup = indexBL().getByName(readerModel.name());
        if (!lookup.isDefined()) {
            String notFound = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Index model not found: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{readerModel}));
            logger().error(new ElasticConsumersSpark$$anonfun$getSparkBatchReader$3(this, notFound));
            throw new Exception(notFound);
        }
        IndexModel indexModel = (IndexModel) lookup.get();
        String eventuallyTimedName = indexModel.eventuallyTimedName();
        logger().info(new ElasticConsumersSpark$$anonfun$getSparkBatchReader$1(this, indexModel, eventuallyTimedName));
        if (indexModel.schema().isEmpty()) {
            throw new Exception(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"There no define schema in the index configuration: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{indexModel})));
        }
        // Null-safe equality between the lower-cased name and the name itself.
        String lowerCased = indexModel.name().toLowerCase();
        String declaredName = indexModel.name();
        boolean sameName = lowerCased == null ? declaredName == null : lowerCased.equals(declaredName);
        if (!sameName) {
            throw new Exception(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"The index name must be all lowercase: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{indexModel})));
        }
        boolean indexReady = BoxesRunTime.unboxToBoolean(WaspSystem$.MODULE$.$qmark$qmark(elasticAdminActor_(), new CheckOrCreateIndex(eventuallyTimedName, indexModel.name(), indexModel.dataType(), indexModel.getJsonSchema()), WaspSystem$.MODULE$.$qmark$qmark$default$3()));
        if (indexReady) {
            return new ElasticsearchSparkBatchReader(indexModel);
        }
        String creationFailed = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Error creating elastic index: ", " with this index name ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{indexModel, eventuallyTimedName}));
        logger().error(new ElasticConsumersSpark$$anonfun$getSparkBatchReader$2(this, creationFailed));
        throw new Exception(creationFailed);
    }

    /**
     * Sends an {@code Initialization} message (built from the Elastic configuration) to the admin
     * actor and waits for the reply.
     *
     * @param j maximum time to wait for the reply, in milliseconds
     * @param timeout timeout applied to the ask itself
     * @throws Exception (undeclared, Scala origin) wrapping the failure cause when the reply
     *     future completed with a {@code Failure}
     * @throws UnknownError when the future has no value after the wait
     * @throws MatchError when the future value is neither {@code Some(Failure)},
     *     {@code Some(Success)} nor {@code None}
     */
    private void startupElastic(long j, Timeout timeout) {
        logger().info(new ElasticConsumersSpark$$anonfun$startupElastic$1(this));
        ActorRef ask = package$.MODULE$.ask(elasticAdminActor_());
        Initialization initialization = new Initialization(ConfigManager$.MODULE$.getElasticConfig());
        Option value = Await$.MODULE$.ready(AskableActorRef$.MODULE$.$qmark$extension1(ask, initialization, timeout, AskableActorRef$.MODULE$.$qmark$default$3$extension(ask, initialization)), Duration$.MODULE$.apply(j, TimeUnit.MILLISECONDS)).value();
        if (value instanceof Some) {
            Object outcome = ((Some<?>) value).x();
            if (outcome instanceof Failure) {
                Throwable exception = ((Failure<?>) outcome).exception();
                logger().error(new ElasticConsumersSpark$$anonfun$startupElastic$2(this, exception));
                throw new Exception(exception);
            }
            if (outcome instanceof Success) {
                logger().info(new ElasticConsumersSpark$$anonfun$startupElastic$3(this));
                return;
            }
        }
        // Anything that is not an explicit None at this point is an unmatched shape.
        if (!None$.MODULE$.equals(value)) {
            throw new MatchError(value);
        }
        throw new UnknownError("Unknown error during Elastic connection initialization");
    }

    /** Creates the plugin and runs the {@code Logging} trait initializer. */
    public ElasticConsumersSpark() {
        // NOTE(review): this line looks like the decompiler's rendering of the Scala trait
        // initializer call; presumably it assigns the logger through the setter above. Confirm
        // against the original Scala source before relying on it.
        Logging.class.$init$(this);
    }
}
