package com.datastax.spark.connector.demo;

import akka.actor.Actor;
import akka.actor.ActorContext;
import akka.actor.ActorLogging;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props$;
import akka.actor.SupervisorStrategy;
import akka.event.LoggingAdapter;
import akka.util.Timeout;
import com.datastax.spark.connector.ColumnSelector;
import com.datastax.spark.connector.SparkContextFunctions;
import com.datastax.spark.connector.demo.CounterActor;
import com.datastax.spark.connector.embedded.Assertions;
import com.datastax.spark.connector.embedded.Event;
import com.datastax.spark.connector.embedded.Event$Completed$;
import com.datastax.spark.connector.mapper.ColumnMapper$;
import com.datastax.spark.connector.rdd.CassandraRDD;
import com.datastax.spark.connector.rdd.ValidRDDType$;
import com.datastax.spark.connector.rdd.reader.RowReaderFactory$;
import com.datastax.spark.connector.streaming.DStreamFunctions;
import com.datastax.spark.connector.streaming.package$;
import com.datastax.spark.connector.util.MagicalTypeTricks$;
import com.datastax.spark.connector.writer.RowWriterFactory$;
import com.datastax.spark.connector.writer.WriteConf;
import org.apache.spark.SparkEnv$;
import org.apache.spark.storage.StorageLevel$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.StreamingContext$;
import scala.Function0;
import scala.Option;
import scala.PartialFunction;
import scala.Predef$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.math.Ordering$String$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: AkkaStreamingDemo.scala */
@ScalaSignature(bytes = "\u0006\u0001]4A!\u0001\u0002\u0001\u001b\taaj\u001c3f\u000fV\f'\u000fZ5b]*\u00111\u0001B\u0001\u0005I\u0016lwN\u0003\u0002\u0006\r\u0005I1m\u001c8oK\u000e$xN\u001d\u0006\u0003\u000f!\tQa\u001d9be.T!!\u0003\u0006\u0002\u0011\u0011\fG/Y:uCbT\u0011aC\u0001\u0004G>l7\u0001A\n\u0006\u00019!\u0002\u0004\t\t\u0003\u001fIi\u0011\u0001\u0005\u0006\u0002#\u0005)1oY1mC&\u00111\u0003\u0005\u0002\u0007\u0003:L(+\u001a4\u0011\u0005U1R\"\u0001\u0002\n\u0005]\u0011!\u0001D\"pk:$XM]!di>\u0014\bCA\r\u001f\u001b\u0005Q\"BA\u000e\u001d\u0003\u0015\t7\r^8s\u0015\u0005i\u0012\u0001B1lW\u0006L!a\b\u000e\u0003\u0019\u0005\u001bGo\u001c:M_\u001e<\u0017N\\4\u0011\u0005\u0005\"S\"\u0001\u0012\u000b\u0005\r\"\u0011\u0001C3nE\u0016$G-\u001a3\n\u0005\u0015\u0012#AC!tg\u0016\u0014H/[8og\"Aq\u0005\u0001B\u0001B\u0003%\u0001&A\u0002tg\u000e\u0004\"!K\u0019\u000e\u0003)R!a\u000b\u0017\u0002\u0013M$(/Z1nS:<'BA\u0004.\u0015\tqs&\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002a\u0005\u0019qN]4\n\u0005IR#\u0001E*ue\u0016\fW.\u001b8h\u0007>tG/\u001a=u\u0011!!\u0004A!A!\u0002\u0013)\u0014\u0001C:fiRLgnZ:\u0011\u0005U1\u0014BA\u001c\u0003\u0005Y\u0019\u0006/\u0019:l\u0007\u0006\u001c8/\u00198ee\u0006\u001cV\r\u001e;j]\u001e\u001c\b\u0002C\u001d\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u001e\u0002\t\u0011\fG/\u0019\t\u0004w\u0001\u0013U\"\u0001\u001f\u000b\u0005ur\u0014!C5n[V$\u0018M\u00197f\u0015\ty\u0004#\u0001\u0006d_2dWm\u0019;j_:L!!\u0011\u001f\u0003\u0007M+G\u000f\u0005\u0002D\r:\u0011q\u0002R\u0005\u0003\u000bB\ta\u0001\u0015:fI\u00164\u0017BA$I\u0005\u0019\u0019FO]5oO*\u0011Q\t\u0005\u0005\u0006\u0015\u0002!\taS\u0001\u0007y%t\u0017\u000e\u001e 
\u0015\t1kej\u0014\t\u0003+\u0001AQaJ%A\u0002!BQ\u0001N%A\u0002UBQ!O%A\u0002iBq!\u0015\u0001C\u0002\u0013\r!+A\u0004uS6,w.\u001e;\u0016\u0003M\u0003\"\u0001V,\u000e\u0003US!A\u0016\u000f\u0002\tU$\u0018\u000e\\\u0005\u00031V\u0013q\u0001V5nK>,H\u000f\u0003\u0004[\u0001\u0001\u0006IaU\u0001\ti&lWm\\;uA!9A\f\u0001b\u0001\n\u0013i\u0016aA:bgV\ta\f\u0005\u0002\u001a?&\u0011\u0001M\u0007\u0002\f\u0003\u000e$xN]*zgR,W\u000e\u0003\u0004c\u0001\u0001\u0006IAX\u0001\u0005g\u0006\u001c\b\u0005C\u0003e\u0001\u0011\u0005Q-A\u0004sK\u000e,\u0017N^3\u0016\u0003\u0019\u0004\"a\u001a6\u000f\u0005eA\u0017BA5\u001b\u0003\u0015\t5\r^8s\u0013\tYGNA\u0004SK\u000e,\u0017N^3\u000b\u0005%T\u0002\"\u00028\u0001\t\u0003)\u0017\u0001\u00023p]\u0016DQ\u0001\u001d\u0001\u0005\u0002E\faA^3sS\u001aLH#\u0001:\u0011\u0005=\u0019\u0018B\u0001;\u0011\u0005\u0011)f.\u001b;\t\u000bY\u0004A\u0011A9\u0002\u0011MDW\u000f\u001e3po:\u0004")
/* loaded from: input_file:com/datastax/spark/connector/demo/NodeGuardian.class */
/*
 * Decompiled form of the Scala actor NodeGuardian (compiled from AkkaStreamingDemo.scala).
 *
 * What the visible code does:
 *   - the constructor wires an actor-backed input stream ("actor-stream") through
 *     flatMap / map / reduceByKey, saves the result to the Cassandra keyspace and table
 *     named by the settings object, and starts the streaming context;
 *   - verify() reads that same table back, waits for a success condition, then switches
 *     the actor to the done() behaviour and sends itself Event.Completed;
 *   - shutdown() stops the actor system and the streaming context.
 *
 * Most other members are forwarders or backing fields generated for the mixed-in traits
 * (Actor, ActorLogging, CounterActor, Assertions).
 *
 * NOTE(review): this is decompiler output, not hand-written Java. Expressions such as
 * "Assertions.class.now(this)" and "CounterActor.Cclass.increment(this)" look like the
 * decompiler's rendering of static calls into the traits' implementation classes, and the
 * "_setter_" methods assign to fields declared final. Neither form is expected to compile
 * as plain Java; treat the file as a reading aid.
 */
public class NodeGuardian implements CounterActor, ActorLogging, Assertions {
    // Streaming context handed to the constructor; started there, read in verify(), stopped in shutdown().
    private final StreamingContext ssc;
    // Source of CassandraKeyspace() and CassandraTable() used for both the write and the read-back.
    private final SparkCassandraSettings settings;
    // Constructor argument kept under a name-mangled public field; it is not read anywhere in this class.
    // Presumably the NodeGuardian$$anonfun$... classes access it - TODO confirm.
    public final Set<String> com$datastax$spark$connector$demo$NodeGuardian$$data;
    // Five-second timeout created in the constructor and exposed through timeout().
    private final Timeout timeout;
    // Actor system obtained from SparkEnv in the constructor; its event stream is subscribed to there.
    private final ActorSystem sas;
    // Backing field for the Assertions trait's "end" member (see the accessor pair below).
    private Duration com$datastax$spark$connector$embedded$Assertions$$end;
    // Backing field for ActorLogging's log; written by akka$actor$ActorLogging$_setter_$log_$eq.
    private final LoggingAdapter log;
    // Backing field for CounterActor's scale; written by the CounterActor _setter_ method.
    private final int scale;
    // Backing field for CounterActor's counter.
    private int com$datastax$spark$connector$demo$CounterActor$$count;
    // Backing fields for Actor's context and self; written by the akka$actor$Actor$_setter_ methods.
    private final ActorContext context;
    private final ActorRef self;

    /** Returns the current value of the Assertions trait's "end" backing field. */
    public Duration com$datastax$spark$connector$embedded$Assertions$$end() {
        return this.com$datastax$spark$connector$embedded$Assertions$$end;
    }

    /** Stores a new value in the Assertions trait's "end" backing field. */
    public void com$datastax$spark$connector$embedded$Assertions$$end_$eq(Duration duration) {
        this.com$datastax$spark$connector$embedded$Assertions$$end = duration;
    }

    /** Forwards to the Assertions trait's implementation of now. */
    public FiniteDuration now() {
        return Assertions.class.now(this);
    }

    /** Forwards to the Assertions trait's implementation of remainingOrDefault. */
    public FiniteDuration remainingOrDefault() {
        return Assertions.class.remainingOrDefault(this);
    }

    /** Forwards to the Assertions trait's implementation of remainingOr, passing the argument through unchanged. */
    public FiniteDuration remainingOr(FiniteDuration finiteDuration) {
        return Assertions.class.remainingOr(this, finiteDuration);
    }

    /**
     * Forwards to the Assertions trait's implementation of awaitCond with all four arguments unchanged.
     * The parameter names are decompiler-generated. Presumably they are the condition, the maximum
     * wait, the polling interval and a failure message - TODO confirm against the Assertions trait.
     */
    public void awaitCond(Function0<Object> function0, Duration duration, Duration duration2, String str) {
        Assertions.class.awaitCond(this, function0, duration, duration2, str);
    }

    /** Default for awaitCond's second parameter, as supplied by the Assertions trait. */
    public Duration awaitCond$default$2() {
        return Assertions.class.awaitCond$default$2(this);
    }

    /** Default for awaitCond's third parameter, as supplied by the Assertions trait. */
    public Duration awaitCond$default$3() {
        return Assertions.class.awaitCond$default$3(this);
    }

    /** Default for awaitCond's fourth parameter, as supplied by the Assertions trait. */
    public String awaitCond$default$4() {
        return Assertions.class.awaitCond$default$4(this);
    }

    /** Returns the logging adapter held in the log field. */
    public LoggingAdapter log() {
        return this.log;
    }

    /**
     * Trait-generated setter for the log field.
     * Presumably invoked from ActorLogging's initializer during construction - TODO confirm.
     */
    public void akka$actor$ActorLogging$_setter_$log_$eq(LoggingAdapter loggingAdapter) {
        // The target field is declared final; see the decompiler note at the top of the class.
        this.log = loggingAdapter;
    }

    /** Returns the scale value; successful$1 compares the reduced count against scale() * 2. */
    @Override // com.datastax.spark.connector.demo.CounterActor
    public int scale() {
        return this.scale;
    }

    /** Returns the current value of CounterActor's counter backing field. */
    @Override // com.datastax.spark.connector.demo.CounterActor
    public int com$datastax$spark$connector$demo$CounterActor$$count() {
        return this.com$datastax$spark$connector$demo$CounterActor$$count;
    }

    /** Stores a new value in CounterActor's counter backing field. */
    @Override // com.datastax.spark.connector.demo.CounterActor
    public void com$datastax$spark$connector$demo$CounterActor$$count_$eq(int i) {
        this.com$datastax$spark$connector$demo$CounterActor$$count = i;
    }

    /** Trait-generated setter for the scale field (the field is declared final; decompiler artifact). */
    @Override // com.datastax.spark.connector.demo.CounterActor
    public void com$datastax$spark$connector$demo$CounterActor$_setter_$scale_$eq(int i) {
        this.scale = i;
    }

    /** Forwards to the CounterActor trait's implementation of increment. */
    @Override // com.datastax.spark.connector.demo.CounterActor
    public void increment() {
        CounterActor.Cclass.increment(this);
    }

    /** Returns the actor context held in the context field. */
    public ActorContext context() {
        return this.context;
    }

    /** Returns this actor's own reference as held in the self field. */
    public final ActorRef self() {
        return this.self;
    }

    /** Trait-generated setter for the context field (the field is declared final; decompiler artifact). */
    public void akka$actor$Actor$_setter_$context_$eq(ActorContext actorContext) {
        this.context = actorContext;
    }

    /** Trait-generated setter for the self field (the field is declared final; decompiler artifact). */
    public final void akka$actor$Actor$_setter_$self_$eq(ActorRef actorRef) {
        this.self = actorRef;
    }

    /** Forwards to the Actor trait's implementation of sender. */
    public final ActorRef sender() {
        return Actor.class.sender(this);
    }

    /** Forwards to the Actor trait's implementation of supervisorStrategy; nothing is customised here. */
    public SupervisorStrategy supervisorStrategy() {
        return Actor.class.supervisorStrategy(this);
    }

    /** Forwards to the Actor trait's implementation of preStart; no extra start-up work is added here. */
    public void preStart() throws Exception {
        Actor.class.preStart(this);
    }

    /** Forwards to the Actor trait's implementation of postStop; no extra clean-up is added here. */
    public void postStop() throws Exception {
        Actor.class.postStop(this);
    }

    /** Forwards to the Actor trait's implementation of preRestart with both arguments unchanged. */
    public void preRestart(Throwable th, Option<Object> option) throws Exception {
        Actor.class.preRestart(this, th, option);
    }

    /** Forwards to the Actor trait's implementation of postRestart with the argument unchanged. */
    public void postRestart(Throwable th) throws Exception {
        Actor.class.postRestart(this, th);
    }

    /** Forwards to the Actor trait's implementation of unhandled with the argument unchanged. */
    public void unhandled(Object obj) {
        Actor.class.unhandled(this, obj);
    }

    /** Returns the five-second timeout created in the constructor. */
    public Timeout timeout() {
        return this.timeout;
    }

    /** Returns the actor system obtained from SparkEnv in the constructor. */
    private ActorSystem sas() {
        return this.sas;
    }

    /**
     * Returns a new instance of the generated partial function NodeGuardian$$anonfun$receive$1.
     * Which messages it handles is defined in that class, which is not part of this file.
     */
    public PartialFunction<Object, BoxedUnit> receive() {
        return new NodeGuardian$$anonfun$receive$1(this);
    }

    /**
     * Returns a new instance of the generated partial function NodeGuardian$$anonfun$done$1.
     * verify() installs it with context().become(...). Its message handling is defined in that
     * class, which is not part of this file.
     */
    public PartialFunction<Object, BoxedUnit> done() {
        return new NodeGuardian$$anonfun$done$1(this);
    }

    /**
     * Reads the configured Cassandra table back as Event.WordCount rows, waits for the check to
     * pass, and then moves the actor into its done() behaviour.
     *
     * Steps, in order:
     *   1. build a CassandraRDD over settings.CassandraKeyspace() / settings.CassandraTable();
     *   2. call awaitCond with NodeGuardian$$anonfun$verify$1 and a three-minute duration
     *      (that function presumably evaluates successful$1 on the RDD - TODO confirm);
     *   3. collect the RDD and apply NodeGuardian$$anonfun$verify$2 to every element;
     *   4. become done() and send Event.Completed to self.
     */
    public void verify() {
        SparkContextFunctions streamingContextFunctions = package$.MODULE$.toStreamingContextFunctions(this.ssc);
        String CassandraKeyspace = this.settings.CassandraKeyspace();
        String CassandraTable = this.settings.CassandraTable();
        ClassTag apply = ClassTag$.MODULE$.apply(Event.WordCount.class);
        RowReaderFactory$ rowReaderFactory$ = RowReaderFactory$.MODULE$;
        TypeTags universe = scala.reflect.runtime.package$.MODULE$.universe();
        // The statement below spans the anonymous TypeCreator: it spells out the implicit arguments
        // (class tag, row reader factory built from a TypeTag for Event.WordCount, RDD type evidence)
        // that the cassandraTable call takes.
        CassandraRDD cassandraTable = streamingContextFunctions.cassandraTable(CassandraKeyspace, CassandraTable, streamingContextFunctions.cassandraTable$default$3(CassandraKeyspace, CassandraTable), apply, rowReaderFactory$.classBasedRowReaderFactory(universe.TypeTag().apply(scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(NodeGuardian.class.getClassLoader()), new TypeCreator(this) { // from class: com.datastax.spark.connector.demo.NodeGuardian$$typecreator1$1
            // Builds the reflective type reference for com.datastax.spark.connector.embedded.Event.WordCount
            // by walking the package chain from the mirror's root class.
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe2 = mirror.universe();
                return universe2.TypeRef().apply(universe2.SingleType().apply(universe2.SingleType().apply(universe2.SingleType().apply(universe2.SingleType().apply(universe2.SingleType().apply(universe2.SingleType().apply(universe2.build().thisPrefix(mirror.RootClass()), mirror.staticPackage("com")), mirror.staticPackage("com.datastax")), mirror.staticPackage("com.datastax.spark")), mirror.staticPackage("com.datastax.spark.connector")), mirror.staticPackage("com.datastax.spark.connector.embedded")), mirror.staticModule("com.datastax.spark.connector.embedded.Event")), mirror.staticClass("com.datastax.spark.connector.embedded.Event.WordCount"), Nil$.MODULE$);
            }
        }), ColumnMapper$.MODULE$.defaultColumnMapper(ClassTag$.MODULE$.apply(Event.WordCount.class)), MagicalTypeTricks$.MODULE$.nsub(), MagicalTypeTricks$.MODULE$.doesntHaveImplicit()), ValidRDDType$.MODULE$.javaSerializableAsValidRDDType());
        // Second argument is three minutes; the third and fourth arguments use the trait's defaults.
        awaitCond(new NodeGuardian$$anonfun$verify$1(this, cassandraTable), new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(3)).minutes(), awaitCond$default$3(), awaitCond$default$4());
        // Collect all rows and hand each one to the generated per-row function.
        Predef$.MODULE$.refArrayOps((Object[]) cassandraTable.collect()).foreach(new NodeGuardian$$anonfun$verify$2(this));
        // Switch behaviour first, then send Completed to self, so the message is processed by done().
        context().become(done(), context().become$default$2());
        akka.actor.package$.MODULE$.actorRef2Scala(self()).$bang(Event$Completed$.MODULE$, self());
    }

    /**
     * Logs completion, unsubscribes this actor from the event stream, shuts down the actor system
     * returned by context().system(), and stops the streaming context.
     *
     * NOTE(review): the constructor subscribes on sas().eventStream(), while this method
     * unsubscribes on context().system().eventStream(). If those are different actor systems the
     * original subscription is never removed - confirm.
     */
    public void shutdown() {
        log().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Assertions successful, shutting down."})).s(Nil$.MODULE$));
        context().system().eventStream().unsubscribe(self());
        log().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Stopping the demo app actor system and '", "'"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.ssc})));
        context().system().shutdown();
        // Presumably (stopSparkContext = true, stopGracefully = false) - confirm against the Spark API in use.
        this.ssc.stop(true, false);
    }

    /**
     * Success predicate over the read-back RDD: true when the collected rows are non-empty and
     * the Int values produced by the generated mapping function reduce to exactly scale() * 2.
     *
     * Both collect() and map(...).reduce(...) are invoked on the RDD, so when the first check
     * passes the data is requested twice per evaluation.
     */
    public final boolean com$datastax$spark$connector$demo$NodeGuardian$$successful$1(CassandraRDD cassandraRDD) {
        return Predef$.MODULE$.refArrayOps((Object[]) cassandraRDD.collect()).nonEmpty() && BoxesRunTime.unboxToInt(cassandraRDD.map(new NodeGuardian$$anonfun$com$datastax$spark$connector$demo$NodeGuardian$$successful$1$2(this), ClassTag$.MODULE$.Int()).reduce(new NodeGuardian$$anonfun$com$datastax$spark$connector$demo$NodeGuardian$$successful$1$1(this))) == scale() * 2;
    }

    /**
     * Stores the arguments, runs the trait initializers, subscribes to receiver-start events,
     * wires the stream-to-Cassandra pipeline and starts the streaming context.
     *
     * @param streamingContext context on which the actor stream is created and started
     * @param sparkCassandraSettings supplies the Cassandra keyspace and table names
     * @param set stored in the name-mangled data field; not otherwise used in this constructor
     */
    public NodeGuardian(StreamingContext streamingContext, SparkCassandraSettings sparkCassandraSettings, Set<String> set) {
        this.ssc = streamingContext;
        this.settings = sparkCassandraSettings;
        this.com$datastax$spark$connector$demo$NodeGuardian$$data = set;
        // Trait initializers run in this order. Presumably these populate context, self, log and
        // scale through the _setter_ methods above - TODO confirm.
        Actor.class.$init$(this);
        CounterActor.Cclass.$init$(this);
        ActorLogging.class.$init$(this);
        Assertions.class.$init$(this);
        this.timeout = new Timeout(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(5)).seconds());
        this.sas = SparkEnv$.MODULE$.get().actorSystem();
        // Register this actor for Event.ReceiverStarted on the SparkEnv actor system's event stream.
        sas().eventStream().subscribe(self(), Event.ReceiverStarted.class);
        // Pipeline: actor stream of String (Streamer props, name "actor-stream", MEMORY_AND_DISK)
        //   -> flatMap to String -> map to Tuple2 -> reduceByKey over (String, Int) pairs.
        DStreamFunctions dStreamFunctions = package$.MODULE$.toDStreamFunctions(StreamingContext$.MODULE$.toPairDStreamFunctions(streamingContext.actorStream(Props$.MODULE$.apply(ClassTag$.MODULE$.apply(Streamer.class)), "actor-stream", StorageLevel$.MODULE$.MEMORY_AND_DISK(), streamingContext.actorStream$default$4(), ClassTag$.MODULE$.apply(String.class)).flatMap(new NodeGuardian$$anonfun$5(this), ClassTag$.MODULE$.apply(String.class)).map(new NodeGuardian$$anonfun$6(this), ClassTag$.MODULE$.apply(Tuple2.class)), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.Int(), Ordering$String$.MODULE$).reduceByKey(new NodeGuardian$$anonfun$1(this)), ClassTag$.MODULE$.apply(Tuple2.class));
        String CassandraKeyspace = sparkCassandraSettings.CassandraKeyspace();
        String CassandraTable = sparkCassandraSettings.CassandraTable();
        // Default column selector and write configuration are taken from the connector.
        ColumnSelector saveToCassandra$default$3 = dStreamFunctions.saveToCassandra$default$3();
        WriteConf saveToCassandra$default$4 = dStreamFunctions.saveToCassandra$default$4();
        dStreamFunctions.saveToCassandra(CassandraKeyspace, CassandraTable, saveToCassandra$default$3, saveToCassandra$default$4, dStreamFunctions.saveToCassandra$default$5(CassandraKeyspace, CassandraTable, saveToCassandra$default$3, saveToCassandra$default$4), RowWriterFactory$.MODULE$.defaultRowWriterFactory(ColumnMapper$.MODULE$.tuple2ColumnMapper()));
        // start() is called only after the save operation has been registered on the stream.
        streamingContext.start();
        log().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Streaming context started."})).s(Nil$.MODULE$));
    }
}
