package org.cg.spark.databroker;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.ActorSystem$;
import akka.actor.ScalaActorRef;
import akka.actor.package$;
import akka.cluster.Cluster;
import akka.cluster.Cluster$;
import akka.contrib.pattern.DistributedPubSubExtension$;
import akka.contrib.pattern.DistributedPubSubMediator;
import akka.contrib.pattern.DistributedPubSubMediator$Publish$;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.spark.Logging;
import org.khaleesi.carfield.tools.sparkjobserver.api.ISparkJobServerClient;
import org.khaleesi.carfield.tools.sparkjobserver.api.SparkJobResult;
import org.slf4j.Logger;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.concurrent.Map;
import scala.collection.immutable.Map$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;

/* compiled from: ChannelJobManager.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005=f\u0001B\u0001\u0003\u0001-\u0011\u0011c\u00115b]:,GNS8c\u001b\u0006t\u0017mZ3s\u0015\t\u0019A!\u0001\u0006eCR\f'M]8lKJT!!\u0002\u0004\u0002\u000bM\u0004\u0018M]6\u000b\u0005\u001dA\u0011AA2h\u0015\u0005I\u0011aA8sO\u000e\u00011c\u0001\u0001\r%A\u0011Q\u0002E\u0007\u0002\u001d)\tq\"A\u0003tG\u0006d\u0017-\u0003\u0002\u0012\u001d\t1\u0011I\\=SK\u001a\u0004\"aE\f\u000e\u0003QQ!!B\u000b\u000b\u0005YA\u0011AB1qC\u000eDW-\u0003\u0002\u0019)\t9Aj\\4hS:<\u0007\u0002\u0003\u000e\u0001\u0005\u0003\u0005\u000b\u0011B\u000e\u0002\rE,xN]8n!\taR$D\u0001\u0003\u0013\tq\"AA\bK_\n\u001cVM\u001d<feF+xN];n\u0011\u0015\u0001\u0003\u0001\"\u0001\"\u0003\u0019a\u0014N\\5u}Q\u0011!e\t\t\u00039\u0001AQAG\u0010A\u0002mAq!\n\u0001C\u0002\u0013\u0005a%A\u0004S+:s\u0015JT$\u0016\u0003\u001d\u0002\"\u0001K\u0017\u000e\u0003%R!AK\u0016\u0002\t1\fgn\u001a\u0006\u0002Y\u0005!!.\u0019<b\u0013\tq\u0013F\u0001\u0004TiJLgn\u001a\u0005\u0007a\u0001\u0001\u000b\u0011B\u0014\u0002\u0011I+fJT%O\u000f\u0002BqA\r\u0001C\u0002\u0013\u0005a%A\u0004T)\u0006\u0013F+\u0012#\t\rQ\u0002\u0001\u0015!\u0003(\u0003!\u0019F+\u0011*U\u000b\u0012\u0003\u0003b\u0002\u001c\u0001\u0005\u0004%\tAJ\u0001\u0016!\u0006\u0013\u0016)T0D\u001f:#V\t\u0017+`\r\u0006\u001bEk\u0014*Z\u0011\u0019A\u0004\u0001)A\u0005O\u00051\u0002+\u0011*B\u001b~\u001buJ\u0014+F1R{f)Q\"U\u001fJK\u0006\u0005C\u0004;\u0001\t\u0007I\u0011\u0001\u0014\u00021A\u000b%+Q'`'R\u0013V)T!J\u001d\u001e{\u0016J\u0014+F%Z\u000bE\n\u0003\u0004=\u0001\u0001\u0006IaJ\u0001\u001a!\u0006\u0013\u0016)T0T)J+U*Q%O\u000f~Ke\nV#S-\u0006c\u0005\u0005C\u0004?\u0001\t\u0007I\u0011\u0001\u0014\u0002=A\u000b%+Q'`'R\u0013V)T!J\u001d\u001e{6\u000bV(Q\u000fJ\u000b5)\u0012$V\u00192K\u0006B\u0002!\u0001A\u0003%q%A\u0010Q\u0003J\u000bUjX*U%\u0016k\u0015)\u0013(H?N#v\nU$S\u0003\u000e+e)\u0016'M3\u0002BqA\u0011\u0001C\u0002\u0013\u0005a%A\u000eQ\u0003J\u000bUjX*U%\u0016k\u0015)\u0013(H?N#v\nU\"P\u001dR+\u0005\f\u0016\u0005\u0007\t\u0002\u0001\
u000b\u0011B\u0014\u00029A\u000b%+Q'`'R\u0013V)T!J\u001d\u001e{6\u000bV(Q\u0007>sE+\u0012-UA!)a\t\u0001C\u0001\u000f\u0006yA.[:u\u0007\"\fgN\\3m\u0015>\u00147/F\u0001I!\u0011Ie\n\u0015,\u000e\u0003)S!a\u0013'\u0002\u0013%lW.\u001e;bE2,'BA'\u000f\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0003\u001f*\u00131!T1q!\t\tFK\u0004\u0002\u000e%&\u00111KD\u0001\u0007!J,G-\u001a4\n\u00059*&BA*\u000f!\tar+\u0003\u0002Y\u0005\tQ1\t[1o]\u0016d'j\u001c2\t\u000bi\u0003A\u0011A.\u0002\u001d\u0019Lg\u000eZ\"iC:tW\r\u001c&pER\u0011Al\u0018\t\u0004\u001bu3\u0016B\u00010\u000f\u0005\u0019y\u0005\u000f^5p]\")\u0001-\u0017a\u0001!\u0006Y1\r[1o]\u0016dg*Y7f\u0011\u0015\u0011\u0007\u0001\"\u0001d\u0003Ya\u0017n\u001d;Sk:t\u0017N\\4DQ\u0006tg.\u001a7K_\n\u001cH#\u0001%\t\u000b\u0015\u0004A\u0011\u00014\u0002%\u0019Lg\u000e\u001a*v]:LgnZ\"iC:tW\r\u001c\u000b\u0003\u0011\u001eDQ\u0001\u00193A\u0002ACQ!\u001b\u0001\u0005\u0002)\fQB];o\u0007\"\fgN\\3m\u0015>\u0014GcA6oaB\u0011Q\u0002\\\u0005\u0003[:\u0011qAQ8pY\u0016\fg\u000eC\u0003pQ\u0002\u0007a+A\u0002k_\nDq!\u001d5\u0011\u0002\u0003\u0007!/A\u0004kCJ\u0004\u0016\r\u001e5\u0011\u00075i\u0006\u000bC\u0003u\u0001\u0011\u0005Q/\u0001\bti>\u00048\t[1o]\u0016d'j\u001c2\u0015\u0005YL\bCA\u0007x\u0013\tAhB\u0001\u0003V]&$\b\"\u00021t\u0001\u0004\u0001\u0006\"B>\u0001\t\u0003a\u0018AE:u_B\u001c\u0005.\u00198oK2\u001cuN\u001c;fqR$\"A^?\t\u000b\u0001T\b\u0019\u0001)\t\u0011}\u0004!\u0019!C\u0003\u0003\u0003\t\u0001c\u0011$H?\u000ecUk\u0015+F%~s\u0015)T#\u0016\u0005\u0005\rqBAA\u0003C\t\t9!A\nce>\\WM\u001d\u0018dYV\u001cH/\u001a:/]\u0006lW\r\u0003\u0005\u0002\f\u0001\u0001\u000bQBA\u0002\u0003E\u0019eiR0D\u0019V\u001bF+\u0012*`\u001d\u0006kU\t\t\u0005\n\u0003\u001f\u0001!\u0019!C\u0001\u0003#\taa]=ti\u0016lWCAA\n!\u0011\t)\"a\b\u000e\u0005\u0005]!\u0002BA\r\u00037\tQ!Y2u_JT!!!\b\u0002\t\u0005\\7.Y\u0005\u0005\u0003C\t9BA\u0006BGR|'oU=ti\u0016l\u0007\u0002CA\u0013\u0001\u0001\u0006I!a\u0005\u0002\u000fML8\u000f^3nA!I\u0011\u0011\u0006\u0001C\u0002\u0013\u
0005\u00111F\u0001\bG2,8\u000f^3s+\t\ti\u0003\u0005\u0003\u00020\u0005MRBAA\u0019\u0015\u0011\tI#a\u0007\n\t\u0005U\u0012\u0011\u0007\u0002\b\u00072,8\u000f^3s\u0011!\tI\u0004\u0001Q\u0001\n\u00055\u0012\u0001C2mkN$XM\u001d\u0011\t\u0013\u0005u\u0002A1A\u0005\u0002\u0005}\u0012\u0001C7fI&\fGo\u001c:\u0016\u0005\u0005\u0005\u0003\u0003BA\u000b\u0003\u0007JA!!\u0012\u0002\u0018\tA\u0011i\u0019;peJ+g\r\u0003\u0005\u0002J\u0001\u0001\u000b\u0011BA!\u0003%iW\rZ5bi>\u0014\b\u0005C\u0005\u0002N\u0001\u0011\r\u0011\"\u0001\u0002P\u0005A!/Z4jgR\u0014\u00180\u0006\u0002\u0002RA9\u00111KA-!\u0006\u0005SBAA+\u0015\r\t9\u0006T\u0001\u000bG>t7-\u001e:sK:$\u0018bA(\u0002V!A\u0011Q\f\u0001!\u0002\u0013\t\t&A\u0005sK\u001eL7\u000f\u001e:zA!9\u0011\u0011\r\u0001\u0005\u0002\u0005\r\u0014AD:vEN\u001c'/\u001b2f)>\u0004\u0018n\u0019\u000b\bm\u0006\u0015\u0014\u0011NA7\u0011\u001d\t9'a\u0018A\u0002A\u000bqa\u00195b]:,G\u000eC\u0004\u0002l\u0005}\u0003\u0019\u0001)\u0002\u000bQ|\u0007/[2\t\u0011\u0005=\u0014q\fa\u0001\u0003c\n\u0001\u0002\\5ti\u0016tWM\u001d\t\u00049\u0005M\u0014bAA;\u0005\t\u0001\u0012j\u00115b]:,G\u000eT5ti\u0016tWM\u001d\u0005\b\u0003s\u0002A\u0011AA>\u0003A)hnU;cg\u000e\u0014\u0018NY3U_BL7\rF\u0003w\u0003{\ny\bC\u0004\u0002h\u0005]\u0004\u0019\u0001)\t\u000f\u0005-\u0014q\u000fa\u0001!\"9\u00111\u0011\u0001\u0005\u0002\u0005\u0015\u0015!C:u_B$v\u000e]5d)\r1\u0018q\u0011\u0005\b\u0003\u0013\u000b\t\t1\u0001Q\u000351W\u000f\u001c7U_BL7MT1nK\"9\u0011Q\u0012\u0001\u0005\n\u0005=\u0015!C5t%Vtg.\u001b8h)\rY\u0017\u0011\u0013\u0005\b\u0003'\u000bY\t1\u0001Q\u0003\u0019\u0019H/\u0019;vg\"I\u0011q\u0013\u0001\u0012\u0002\u0013\u0005\u0011\u0011T\u0001\u0018eVt7\t[1o]\u0016d'j\u001c2%I\u00164\u0017-\u001e7uII*\"!a'+\u0007I\fij\u000b\u0002\u0002 B!\u0011\u0011UAV\u001b\t\t\u0019K\u0003\u0003\u0002&\u0006\u001d\u0016!C;oG\",7m[3e\u0015\r\tIKD\u0001\u000bC:tw\u000e^1uS>t\u0017\u0002BAW\u0003G\u0013\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u0001")
/* loaded from: input_file:org/cg/spark/databroker/ChannelJobManager.class */
/**
 * Manages channel jobs on a Spark job server quorum and topic subscriptions that are
 * routed through an Akka distributed pub-sub mediator.
 *
 * <p>This source is decompiler output of a Scala class, so member names such as
 * {@code org$cg$spark$databroker$ChannelJobManager$$quorom} and the upper-case accessors
 * are part of the binary interface and must not be renamed.
 *
 * <p>NOTE(review): the {@code Logging.class.xxx(this)} calls below look like the
 * decompiler's rendering of the Scala trait implementation class; confirm the real target
 * before compiling this file as Java.
 */
public class ChannelJobManager implements Logging {
    /** Quorum of job servers used to look up job clients; supplied to the constructor. */
    public final JobServerQuorum org$cg$spark$databroker$ChannelJobManager$$quorom;
    private final String RUNNING;
    private final String STARTED;
    private final String PARAM_CONTEXT_FACTORY;
    private final String PARAM_STREMAING_INTERVAL;
    private final String PARAM_STREMAING_STOPGRACEFULLY;
    private final String PARAM_STREMAING_STOPCONTEXT;
    // Initialised at the declaration: a blank final field that no constructor assigns is
    // rejected by the Java compiler.
    private final String CFG_CLUSTER_NAME = "broker.cluster.name";
    private final ActorSystem system;
    private final Cluster cluster;
    private final ActorRef mediator;
    /** Concurrent map keyed by cluster topic name (see {@link #unSubscribeTopic}). */
    private final Map<String, ActorRef> registry;
    private transient Logger org$apache$spark$Logging$$log_;

    // ---------------------------------------------------------------------------------
    // Spark Logging trait plumbing: each method forwards to the trait implementation.
    // ---------------------------------------------------------------------------------

    /** Returns the logger instance backing the Logging trait (may be unset). */
    public Logger org$apache$spark$Logging$$log_() {
        return this.org$apache$spark$Logging$$log_;
    }

    /** Stores the logger instance backing the Logging trait. */
    public void org$apache$spark$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$Logging$$log_ = logger;
    }

    public String logName() {
        return Logging.class.logName(this);
    }

    public Logger log() {
        return Logging.class.log(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled(this);
    }

    // ---------------------------------------------------------------------------------
    // Constant accessors (values are assigned in the constructor).
    // ---------------------------------------------------------------------------------

    /** @return the job status string {@code "RUNNING"} */
    public String RUNNING() {
        return this.RUNNING;
    }

    /** @return the job status string {@code "STARTED"} */
    public String STARTED() {
        return this.STARTED;
    }

    /** @return the context parameter key {@code "context-factory"} */
    public String PARAM_CONTEXT_FACTORY() {
        return this.PARAM_CONTEXT_FACTORY;
    }

    /** @return the context parameter key {@code "streaming.batch_interval"} */
    public String PARAM_STREMAING_INTERVAL() {
        return this.PARAM_STREMAING_INTERVAL;
    }

    /** @return the context parameter key {@code "streaming.stopGracefully"} */
    public String PARAM_STREMAING_STOPGRACEFULLY() {
        return this.PARAM_STREMAING_STOPGRACEFULLY;
    }

    /** @return the context parameter key {@code "streaming.stopSparkContext"} */
    public String PARAM_STREMAING_STOPCONTEXT() {
        return this.PARAM_STREMAING_STOPCONTEXT;
    }

    // ---------------------------------------------------------------------------------
    // Channel job queries.
    // ---------------------------------------------------------------------------------

    /**
     * Builds the map of channel jobs by flat-mapping over the quorum's job client map.
     * The mapping closure is defined outside this file; the keys are presumably channel
     * names (see {@link #findChannelJob}) — TODO confirm.
     *
     * @return an immutable map of channel jobs
     */
    public scala.collection.immutable.Map<String, ChannelJob> listChannelJobs() {
        return (scala.collection.immutable.Map) this.org$cg$spark$databroker$ChannelJobManager$$quorom.jobClientsMap().flatMap(new ChannelJobManager$$anonfun$listChannelJobs$1(this), Map$.MODULE$.canBuildFrom());
    }

    /**
     * Looks up a single channel job by key in {@link #listChannelJobs()}.
     *
     * @param str the key to look up
     * @return the job wrapped in an {@code Option}; empty when the key is absent or its
     *         value is {@code null}
     */
    public Option<ChannelJob> findChannelJob(String str) {
        scala.collection.immutable.Map<String, ChannelJob> jobs = listChannelJobs();
        // Map.apply throws NoSuchElementException for a missing key, so test membership
        // first instead of letting the lookup escape as an exception.
        if (!jobs.contains(str)) {
            return Option$.MODULE$.empty();
        }
        // Option.apply keeps the original null-to-None conversion for present keys.
        return Option$.MODULE$.apply(jobs.apply(str));
    }

    /**
     * Filters {@link #listChannelJobs()} with a predicate defined outside this file.
     *
     * @return the subset of channel jobs accepted by that predicate
     */
    public scala.collection.immutable.Map<String, ChannelJob> listRunningChannelJobs() {
        return (scala.collection.immutable.Map) listChannelJobs().filter(new ChannelJobManager$$anonfun$listRunningChannelJobs$1(this));
    }

    /**
     * Filters {@link #listRunningChannelJobs()} with a predicate defined outside this file.
     *
     * <p>NOTE(review): the predicate is constructed without {@code str}, so from this file
     * it is not evident that the result depends on the requested channel name — verify
     * against the closure class.
     *
     * @param str the channel name requested by the caller
     * @return the running jobs accepted by the predicate
     */
    public scala.collection.immutable.Map<String, ChannelJob> findRunningChannel(String str) {
        return (scala.collection.immutable.Map) listRunningChannelJobs().filter(new ChannelJobManager$$anonfun$findRunningChannel$1(this));
    }

    // ---------------------------------------------------------------------------------
    // Channel job lifecycle.
    // ---------------------------------------------------------------------------------

    /**
     * Starts a channel job on the job server, creating its context first when needed.
     *
     * <p>Steps, in order: return early when {@link #findRunningChannel} is non-empty;
     * fail when the mediator is terminated; prune and visit the registry with closures
     * defined outside this file; create the job server context named after the job when
     * it is not listed by the client; start the job and report whether its status is a
     * running one.
     *
     * @param channelJob the job to run
     * @param option     optional jar path (Scala default {@code None}); not read by this
     *                   method body
     * @return {@code true} when the job is already running (an {@code [ERROR]} line is
     *         still printed in that case) or when the started job reports status
     *         {@code "RUNNING"} or {@code "STARTED"}; {@code false} otherwise
     * @throws IllegalStateException when the mediator is terminated, no job server client
     *                               is available, or the context cannot be created
     */
    public boolean runChannelJob(ChannelJob channelJob, Option<String> option) {
        if (!findRunningChannel(channelJob.name()).isEmpty()) {
            Predef$.MODULE$.println(new StringBuilder().append("[ERROR] failed to run job, which is running already.").append(channelJob).toString());
            return true;
        }
        if (mediator().isTerminated()) {
            throw new IllegalStateException("Failed to start job mediator is dead!");
        }
        Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"[INFO] clean the registry for ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{channelJob})));
        registry().retain(new ChannelJobManager$$anonfun$runChannelJob$1(this, channelJob));
        registry().foreach(new ChannelJobManager$$anonfun$runChannelJob$2(this));

        // Parameters handed to the job server when a new context has to be created.
        // The interval is intervalSec * 1000, presumably milliseconds — TODO confirm.
        scala.collection.mutable.Map contextParams = scala.collection.mutable.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc(PARAM_CONTEXT_FACTORY()), "spark.jobserver.context.StreamingContextFactory"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc(PARAM_STREMAING_INTERVAL()), String.valueOf(channelJob.intervalSec() * 1000)),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc(PARAM_STREMAING_STOPCONTEXT()), String.valueOf(true)),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc(PARAM_STREMAING_STOPGRACEFULLY()), String.valueOf(true))}));

        Option<ISparkJobServerClient> jobServerClient = this.org$cg$spark$databroker$ChannelJobManager$$quorom.jobServerClient();
        if (!jobServerClient.isDefined()) {
            String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"failed to run job ", ", quorom is not avaliable"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{channelJob}));
            Predef$.MODULE$.println(new StringBuilder().append("[ERROR] ").append(s).toString());
            throw new IllegalStateException(s);
        }
        ISparkJobServerClient iSparkJobServerClient = (ISparkJobServerClient) jobServerClient.get();

        // The context is named after the job; reuse it when the server already lists it.
        if (iSparkJobServerClient.getContexts().contains(channelJob.name())) {
            Predef$.MODULE$.println(new StringBuilder().append("[INFO] context exists for job ").append(channelJob).toString());
        } else {
            Predef$.MODULE$.println(new StringBuilder().append("[INFO] creating context for job ").append(channelJob).toString());
            if (!iSparkJobServerClient.createContext(channelJob.name(), (java.util.Map) JavaConverters$.MODULE$.mutableMapAsJavaMapConverter(contextParams).asJava())) {
                String s2 = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"failed to create context for job ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{channelJob}));
                Predef$.MODULE$.println(new StringBuilder().append("[ERROR] ").append(s2).toString());
                throw new IllegalStateException(s2);
            }
        }

        Predef$.MODULE$.println(new StringBuilder().append("[INFO] run channel job ").append(channelJob).toString());
        // Job input is built before the job parameters, matching the original evaluation order.
        String jobInput = new StringBuilder().append("input.string = ").append(TopicUtil$.MODULE$.topicsToString(channelJob.topics())).toString();
        scala.collection.mutable.Map jobParams = scala.collection.mutable.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc("appName"), channelJob.className()),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc("context"), channelJob.name()),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc("classPath"), channelJob.className())}));
        SparkJobResult startJob = iSparkJobServerClient.startJob(jobInput, (java.util.Map) JavaConverters$.MODULE$.mutableMapAsJavaMapConverter(jobParams).asJava());
        Predef$.MODULE$.println(new StringBuilder().append("[INFO] run channel job with status ").append(channelJob).append(" result:").append(startJob).toString());
        return org$cg$spark$databroker$ChannelJobManager$$isRunning(startJob.getStatus());
    }

    /**
     * Scala default for the second parameter of {@link #runChannelJob}.
     *
     * @return the empty {@code Option}
     */
    public Option<String> runChannelJob$default$2() {
        // Option.empty yields the None singleton with a type Java accepts as Option<String>.
        return Option$.MODULE$.empty();
    }

    /**
     * Applies the stop-job closure (defined outside this file) to every entry returned by
     * {@link #findRunningChannel}.
     *
     * @param str the channel name
     */
    public void stopChannelJob(String str) {
        findRunningChannel(str).foreach(new ChannelJobManager$$anonfun$stopChannelJob$1(this, str));
    }

    /**
     * Applies the stop-context closure (defined outside this file) to every entry of the
     * quorum's job client map.
     *
     * @param str the channel name
     */
    public void stopChannelContext(String str) {
        this.org$cg$spark$databroker$ChannelJobManager$$quorom.jobClientsMap().foreach(new ChannelJobManager$$anonfun$stopChannelContext$1(this, str));
    }

    // ---------------------------------------------------------------------------------
    // Akka cluster / pub-sub state.
    // ---------------------------------------------------------------------------------

    /** @return the configuration key holding the actor system name */
    public final String CFG_CLUSTER_NAME() {
        return this.CFG_CLUSTER_NAME;
    }

    /** @return the actor system created in the constructor */
    public ActorSystem system() {
        return this.system;
    }

    /** @return the cluster extension obtained for {@link #system()} */
    public Cluster cluster() {
        return this.cluster;
    }

    /** @return the distributed pub-sub mediator actor of {@link #system()} */
    public ActorRef mediator() {
        return this.mediator;
    }

    /** @return the concurrent registry of actor references keyed by cluster topic */
    public Map<String, ActorRef> registry() {
        return this.registry;
    }

    /**
     * Subscribes a listener to a channel topic, unsubscribing any previous one first.
     *
     * <p>The unsubscribe step is best-effort: a failure is reported and subscription
     * continues. The registry is then queried for the cluster topic and, when no entry
     * exists, the default closure (defined outside this file) is evaluated.
     *
     * @param str              the channel name
     * @param str2             the topic name
     * @param iChannelListener the listener handed to the subscription closure
     */
    public void subscribeTopic(String str, String str2, IChannelListener iChannelListener) {
        Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"[INFO] first unsub channel:", " topic: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str, str2})));
        try {
            unSubscribeTopic(str, str2);
        } catch (Exception e) {
            // Deliberately non-fatal, but report the cause rather than discarding it.
            String warning = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"[WARN] failed the attempt to unSubscribe channel:", " topic: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str, str2}));
            Predef$.MODULE$.println(warning + " cause: " + e);
        }
        registry().getOrElse(ChannelUtil$.MODULE$.clusterTopic(str, str2), new ChannelJobManager$$anonfun$subscribeTopic$1(this, str, str2, iChannelListener));
    }

    /**
     * Publishes a stop message to the cluster topic and removes its registry entry.
     *
     * @param str  the channel name
     * @param str2 the topic name
     */
    public void unSubscribeTopic(String str, String str2) {
        stopTopic(ChannelUtil$.MODULE$.clusterTopic(str, str2));
        registry().remove(ChannelUtil$.MODULE$.clusterTopic(str, str2));
    }

    /**
     * Sends a {@code Publish(topic, Stop)} message to the mediator.
     *
     * @param str the full cluster topic name
     */
    public void stopTopic(String str) {
        Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"[INFO] stop topic ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str})));
        ScalaActorRef actorRef2Scala = package$.MODULE$.actorRef2Scala(mediator());
        DistributedPubSubMediator.Publish apply = DistributedPubSubMediator$Publish$.MODULE$.apply(str, ChannelSubcriber$Stop$.MODULE$);
        // `!` (tell) with the Scala default sender argument.
        actorRef2Scala.$bang(apply, actorRef2Scala.$bang$default$2(apply));
    }

    /**
     * Tells whether a job server status string denotes a running job.
     *
     * @param str the status reported by the job server; may be {@code null}
     * @return {@code true} for {@code "RUNNING"} or {@code "STARTED"}
     */
    public boolean org$cg$spark$databroker$ChannelJobManager$$isRunning(String str) {
        return "RUNNING".equals(str) || "STARTED".equals(str);
    }

    /**
     * Creates the manager and starts its Akka infrastructure: loads the
     * {@code channel_subscriber} configuration, creates an actor system named by the
     * {@code broker.cluster.name} setting, and obtains the cluster extension and the
     * distributed pub-sub mediator.
     *
     * @param jobServerQuorum the quorum used to reach job servers
     */
    public ChannelJobManager(JobServerQuorum jobServerQuorum) {
        this.org$cg$spark$databroker$ChannelJobManager$$quorom = jobServerQuorum;
        Logging.class.$init$(this);
        this.RUNNING = "RUNNING";
        this.STARTED = "STARTED";
        this.PARAM_CONTEXT_FACTORY = "context-factory";
        this.PARAM_STREMAING_INTERVAL = "streaming.batch_interval";
        this.PARAM_STREMAING_STOPGRACEFULLY = "streaming.stopGracefully";
        this.PARAM_STREMAING_STOPCONTEXT = "streaming.stopSparkContext";
        Config load = ConfigFactory.load("channel_subscriber");
        this.system = ActorSystem$.MODULE$.apply(load.getString(CFG_CLUSTER_NAME()), load);
        this.cluster = Cluster$.MODULE$.apply(system());
        this.mediator = DistributedPubSubExtension$.MODULE$.apply(system()).mediator();
        this.registry = (Map) scala.collection.convert.package$.MODULE$.decorateAsScala().mapAsScalaConcurrentMapConverter(new ConcurrentHashMap()).asScala();
    }
}
