package org.cg.spark.databroker;

import com.typesafe.config.Config;
import java.net.URL;
import kafka.serializer.StringDecoder;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.spark.Logging;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.streaming.Seconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.dstream.InputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils$;
import org.cg.monadic.transformer.TransformationPipelineContext;
import org.cg.spark.databroker.StreamingCoder;
import org.slf4j.Logger;
import scala.Function0;
import scala.None$;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Try$;
import spark.jobserver.SparkJobValidation;
import spark.jobserver.SparkStreamingJob;

/* compiled from: ChannelSparkBroker.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005%f\u0001B\u0001\u0003\u0001-\u0011!c\u00115b]:,Gn\u00159be.\u0014%o\\6fe*\u00111\u0001B\u0001\u000bI\u0006$\u0018M\u0019:pW\u0016\u0014(BA\u0003\u0007\u0003\u0015\u0019\b/\u0019:l\u0015\t9\u0001\"\u0001\u0002dO*\t\u0011\"A\u0002pe\u001e\u001c\u0001!F\u0002\rWa\u001aB\u0001A\u0007\u00145A\u0011a\"E\u0007\u0002\u001f)\t\u0001#A\u0003tG\u0006d\u0017-\u0003\u0002\u0013\u001f\t1\u0011I\\=SK\u001a\u0004\"\u0001\u0006\r\u000e\u0003UQ!!\u0002\f\u000b\u0005]A\u0011AB1qC\u000eDW-\u0003\u0002\u001a+\t9Aj\\4hS:<\u0007CA\u000e \u001b\u0005a\"BA\u000f\u001f\u0003%QwNY:feZ,'OC\u0001\u0006\u0013\t\u0001CDA\tTa\u0006\u00148n\u0015;sK\u0006l\u0017N\\4K_\nD\u0001B\t\u0001\u0003\u0004\u0003\u0006YaI\u0001\u000bKZLG-\u001a8dK\u0012\n\u0004c\u0001\u0013(S5\tQE\u0003\u0002'\u001f\u00059!/\u001a4mK\u000e$\u0018B\u0001\u0015&\u0005!\u0019E.Y:t)\u0006<\u0007C\u0001\u0016,\u0019\u0001!Q\u0001\f\u0001C\u00025\u0012Q!\u0012,F\u001dR\u000b\"AL\u0019\u0011\u00059y\u0013B\u0001\u0019\u0010\u0005\u001dqu\u000e\u001e5j]\u001e\u0004\"A\u0004\u001a\n\u0005Mz!aA!os\"AQ\u0007\u0001B\u0002B\u0003-a'\u0001\u0006fm&$WM\\2fII\u00022\u0001J\u00148!\tQ\u0003\bB\u0003:\u0001\t\u0007!HA\u0004E\u000b\u000e{E)\u0012*\u0012\u00059Z\u0004c\u0001\u001f>S5\t!!\u0003\u0002?\u0005\tq1\u000b\u001e:fC6LgnZ\"pI\u0016\u0014\b\"\u0002!\u0001\t\u0003\t\u0015A\u0002\u001fj]&$h\bF\u0001C)\r\u0019E)\u0012\t\u0005y\u0001Is\u0007C\u0003#\u007f\u0001\u000f1\u0005C\u00036\u007f\u0001\u000fa\u0007C\u0004H\u0001\t\u0007IQ\u0001%\u0002\u001b\t\u0013vjS#S?\u000e{eJR%H+\u0005Iu\"\u0001&\"\u0003-\u000baA\u0019:pW\u0016\u0014\bBB'\u0001A\u00035\u0011*\u0001\bC%>[UIU0D\u001f:3\u0015j\u0012\u0011\t\u000f=\u0003!\u0019!C\u0003!\u0006Q1IR$`\u0005J{5*\u0012*\u0016\u0003E{\u0011AU\u0011\u0002'\u0006\t\"m\\8ugR\u0014\u0018\r\u001d\u0018tKJ4XM]:\t\rU\u0003\u0001\u0015!\u0004R\u0003-\u0019eiR0C%>[UI\u0015\u0011\t\u000f]\u0003!\u0019!C\u00031\u0006Q1IR$`)>\u0003\u0016jQ*\u0016\u0003e{\u0011AW\u0011\u00027\
u00061Ao\u001c9jGNDa!\u0018\u0001!\u0002\u001bI\u0016aC\"G\u000f~#v\nU%D'\u0002Bqa\u0018\u0001C\u0002\u0013\u0015\u0001-\u0001\nD\r\u001e{&)\u0011+D\u0011~Ke\nV#S-\u0006cU#A1\u0010\u0003\t\f\u0013aY\u0001\u000fE\u0006$8\r\u001b\u0018j]R,'O^1m\u0011\u0019)\u0007\u0001)A\u0007C\u0006\u00192IR$`\u0005\u0006#6\tS0J\u001dR+%KV!MA!9q\r\u0001b\u0001\n\u000bA\u0017\u0001E\"G\u000f~\u0003\u0016\nU#M\u0013:+ul\u0011'[+\u0005Iw\"\u00016\"\u0003-\fa\u0002]5qK2Lg.\u001a\u0018dY\u0006\u001c8\u000f\u0003\u0004n\u0001\u0001\u0006i![\u0001\u0012\u0007\u001a;u\fU%Q\u000b2Ke*R0D\u0019j\u0003\u0003bB8\u0001\u0005\u0004%)\u0001]\u0001\r\u0007\u001a;ul\u0011%L!~#\u0015JU\u000b\u0002c>\t!/I\u0001t\u00039\u0019\u0007.Z2la>Lg\u000e\u001e\u0018eSJDa!\u001e\u0001!\u0002\u001b\t\u0018!D\"G\u000f~\u001b\u0005j\u0013)`\t&\u0013\u0006\u0005C\u0004x\u0001\t\u0007IQ\u0001=\u0002#\r3uiX\"I\u0017B{\u0016J\u0014+F%Z\u000bE*F\u0001z\u001f\u0005Q\u0018%A>\u0002'\rDWmY6q_&tGOL5oi\u0016\u0014h/\u00197\t\ru\u0004\u0001\u0015!\u0004z\u0003I\u0019eiR0D\u0011.\u0003v,\u0013(U\u000bJ3\u0016\t\u0014\u0011\t\u0011}\u0004!\u0019!C\u0001\u0003\u0003\tAbY8oM&<\u0007*\u001a7qKJ,\"!a\u0001\u0011\t\u0005\u0015\u0011qB\u0007\u0003\u0003\u000fQA!!\u0003\u0002\f\u0005YAO]1og\u001a|'/\\3s\u0015\r\tiAB\u0001\b[>t\u0017\rZ5d\u0013\u0011\t\t\"a\u0002\u0003;Q\u0013\u0018M\\:g_Jl\u0017\r^5p]BK\u0007/\u001a7j]\u0016\u001cuN\u001c;fqRD\u0001\"!\u0006\u0001A\u0003%\u00111A\u0001\u000eG>tg-[4IK2\u0004XM\u001d\u0011\t\u000f\u0005e\u0001\u0001\"\u0001\u0002\u001c\u0005QAn\\1e\u0007>tg-[4\u0015\t\u0005u\u0011\u0011\u0007\t\u0005\u0003?\ti#\u0004\u0002\u0002\")!\u00111EA\u0013\u0003\u0019\u0019wN\u001c4jO*!\u0011qEA\u0015\u0003!!\u0018\u0010]3tC\u001a,'BAA\u0016\u0003\r\u0019w.\\\u0005\u0005\u0003_\t\tC\u0001\u0004D_:4\u0017n\u001a\u0005\t\u0003g\t9\u00021\u0001\u00026\u0005!an\u001c3f!\u0011\t9$!\u0010\u000f\u00079\tI$C\u0002\u0002<=\ta\u0001\u0015:fI\u00164\u0017\u0002BA 
\u0003\u0003\u0012aa\u0015;sS:<'bAA\u001e\u001f!9\u0011Q\t\u0001\u0005B\u0005\u001d\u0013\u0001\u0003<bY&$\u0017\r^3\u0015\r\u0005%\u0013qJA0!\rY\u00121J\u0005\u0004\u0003\u001bb\"AE*qCJ\\'j\u001c2WC2LG-\u0019;j_:D\u0001\"!\u0015\u0002D\u0001\u0007\u00111K\u0001\u0003g\u000e\u0004B!!\u0016\u0002\\5\u0011\u0011q\u000b\u0006\u0004\u00033*\u0012!C:ue\u0016\fW.\u001b8h\u0013\u0011\ti&a\u0016\u0003!M#(/Z1nS:<7i\u001c8uKb$\b\u0002CA\u0012\u0003\u0007\u0002\r!!\b\t\u000f\u0005\r\u0004\u0001\"\u0011\u0002f\u00051!/\u001e8K_\n$R!MA4\u0003WB\u0001\"!\u001b\u0002b\u0001\u0007\u00111K\u0001\u0004gN\u001c\u0007\u0002CA\u0012\u0003C\u0002\r!!\b\t\u000f\u0005=\u0004\u0001\"\u0001\u0002r\u0005Q\u0011N\\5u\u0007>tg-[4\u0015\t\u0005u\u00111\u000f\u0005\t\u0003G\ti\u00071\u0001\u0002\u001e!9\u0011q\u000e\u0001\u0005\u0002\u0005]D\u0003BA\u000f\u0003sB\u0001\"a\u001f\u0002v\u0001\u0007\u0011QP\u0001\u0005CJ<7\u000fE\u0003\u000f\u0003\u007f\n)$C\u0002\u0002\u0002>\u0011Q!\u0011:sCfDq!!\"\u0001\t\u0003\t9)A\u0006j]R,'O\\1m%VtG#C\u0019\u0002\n\u0006M\u0015QSAM\u0011\u001dY\u00161\u0011a\u0001\u0003\u0017\u0003RADA@\u0003\u001b\u00032\u0001PAH\u0013\r\t\tJ\u0001\u0002\u0006)>\u0004\u0018n\u0019\u0005\t\u0003S\n\u0019\t1\u0001\u0002T!A\u0011qSAB\u0001\u0004\ti\"A\u0005tiJ,\u0017-\\\"gO\"A\u00111EAB\u0001\u0004\ti\u0002C\u0004\u0002\u001e\u0002!\t!a(\u0002\t5\f\u0017N\u001c\u000b\u0005\u0003C\u000b9\u000bE\u0002\u000f\u0003GK1!!*\u0010\u0005\u0011)f.\u001b;\t\u0011\u0005m\u00141\u0014a\u0001\u0003{\u0002")
/* loaded from: input_file:org/cg/spark/databroker/ChannelSparkBroker.class */
/**
 * Spark Streaming job that opens a Kafka direct stream and hands it to a pipeline object
 * whose class name is read from configuration.
 *
 * <p>The job can be started in two ways: through {@link #runJob(StreamingContext, Config)}
 * (the {@code SparkStreamingJob} entry point) or through {@link #main(String[])}, which builds
 * its own {@code StreamingContext}. Both paths end up in
 * {@link #internalRun(Topic[], StreamingContext, Config, Config)}.
 *
 * <p>NOTE(review): this source is decompiler output of a Scala class; the
 * {@code Logging.class.xxx(this)} calls are the decompiler's rendering of the Scala trait
 * forwarders and are kept exactly as found.
 *
 * @param <EVENT>   value type of the Kafka messages
 * @param <DECODER> decoder used for the Kafka message values
 */
public class ChannelSparkBroker<EVENT, DECODER extends StreamingCoder<EVENT>> implements Logging, SparkStreamingJob {
    private final ClassTag<EVENT> evidence$1;
    private final ClassTag<DECODER> evidence$2;

    // Configuration keys. They are initialised here so the final fields are never left blank
    // and so the accessors below are the single source of truth for every key.
    private final String BROKER_CONFIG = "broker";
    private final String CFG_BROKER = "bootstrap.servers";
    private final String CFG_TOPICS = "topics";
    private final String CFG_BATCH_INTERVAL = "batch.interval";
    private final String CFG_PIPELINE_CLZ = "pipeline.class";
    private final String CFG_CHKP_DIR = "checkpoint.dir";
    private final String CFG_CHKP_INTERVAL = "checkpoint.interval";

    private final TransformationPipelineContext configHelper;
    private transient Logger org$apache$spark$Logging$$log_;

    // ---- Logging trait plumbing (forwarders kept as emitted by the decompiler) ----

    public Logger org$apache$spark$Logging$$log_() {
        return this.org$apache$spark$Logging$$log_;
    }

    public void org$apache$spark$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$Logging$$log_ = logger;
    }

    public String logName() {
        return Logging.class.logName(this);
    }

    public Logger log() {
        return Logging.class.log(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled(this);
    }

    // ---- Configuration keys ----

    /** @return name of the configuration node loaded by both {@code initConfig} overloads */
    public final String BROKER_CONFIG() {
        return this.BROKER_CONFIG;
    }

    /** @return key of the Kafka broker list inside the broker configuration */
    public final String CFG_BROKER() {
        return this.CFG_BROKER;
    }

    /** @return key of the Kafka topic list inside the broker configuration */
    public final String CFG_TOPICS() {
        return this.CFG_TOPICS;
    }

    /** @return key of the streaming batch interval (used as seconds by {@link #main}) */
    public final String CFG_BATCH_INTERVAL() {
        return this.CFG_BATCH_INTERVAL;
    }

    /** @return key holding the fully qualified class name of the pipeline to instantiate */
    public final String CFG_PIPELINE_CLZ() {
        return this.CFG_PIPELINE_CLZ;
    }

    /** @return key of the checkpoint base directory */
    public final String CFG_CHKP_DIR() {
        return this.CFG_CHKP_DIR;
    }

    /** @return key of the checkpoint interval */
    public final String CFG_CHKP_INTERVAL() {
        return this.CFG_CHKP_INTERVAL;
    }

    /** @return the helper through which all configuration is initialised and loaded */
    public TransformationPipelineContext configHelper() {
        return this.configHelper;
    }

    /**
     * Loads a configuration node by delegating to {@link #configHelper()}.
     *
     * @param str name of the node to load
     * @return the configuration returned by the helper
     */
    public Config loadConfig(String str) {
        return configHelper().loadConfig(str);
    }

    /**
     * Validates the job configuration.
     *
     * <p>The check itself lives in the generated function classes: {@code $anonfun$validate$1}
     * is evaluated inside a {@code Try}, a successful result is mapped through
     * {@code $anonfun$validate$2}, and {@code $anonfun$validate$3} supplies the result on
     * failure. The streaming context is not used by the code visible here.
     *
     * @param streamingContext streaming context of the job
     * @param config           job configuration to validate
     * @return the validation outcome
     */
    public SparkJobValidation validate(StreamingContext streamingContext, Config config) {
        return (SparkJobValidation) Try$.MODULE$.apply(new ChannelSparkBroker$$anonfun$validate$1(this, config)).map(new ChannelSparkBroker$$anonfun$validate$2(this)).getOrElse(new ChannelSparkBroker$$anonfun$validate$3(this));
    }

    /**
     * Job entry point: logs the job configuration, parses the topics from the
     * {@code input.string} setting, wires the stream and then blocks until the streaming
     * context terminates.
     *
     * @param streamingContext streaming context supplied by the caller
     * @param config           job configuration; must contain {@code input.string}
     * @return {@code BoxedUnit.UNIT}
     */
    public Object runJob(StreamingContext streamingContext, Config config) {
        log().info("============JOB CONFIG BEGIN==============");
        log().info(config.toString());
        log().info("============JOB CONFIG END==============");
        log().info("============JOB PARAMS BEGIN==============");
        Topic[] topics = TopicUtil$.MODULE$.stringToTopicSet(config.getString("input.string"));
        Predef$.MODULE$.refArrayOps(topics).foreach(new ChannelSparkBroker$$anonfun$runJob$1(this));
        log().info("============JOB PARAMS END==============");
        Config streamConfig = initConfig(config);
        log().info("============STREAM CONFIG BEGIN==============");
        log().info(streamConfig.toString());
        log().info("============STREAM CONFIG END==============");
        internalRun(topics, streamingContext, streamConfig, config);
        log().info("Engine starting");
        streamingContext.start();
        streamingContext.awaitTermination();
        return BoxedUnit.UNIT;
    }

    /**
     * Initialises the configuration helper from an existing {@code Config} (only if it is
     * still undefined, per the helper's method name) and loads the broker node.
     *
     * @param config configuration to initialise the helper with
     * @return the broker configuration node
     */
    public Config initConfig(Config config) {
        configHelper().initIfUndefined(config);
        return loadConfig(BROKER_CONFIG());
    }

    /**
     * Initialises the configuration helper from the URL given as the first program argument
     * and loads the broker node.
     *
     * @param strArr program arguments; element 0 is the configuration URL
     * @return the broker configuration node
     */
    public Config initConfig(String[] strArr) {
        String configUrl = strArr[0];
        try {
            URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
        } catch (Throwable ignored) {
            // Best effort on purpose: the JVM accepts a stream handler factory only once and
            // raises an Error on every later attempt. Keep going, but leave a trace instead
            // of swallowing the failure silently.
            log().debug("URL stream handler factory was not installed", ignored);
        }
        configHelper().initIfUndefined(new URL(configUrl));
        return loadConfig(BROKER_CONFIG());
    }

    /**
     * Creates the Kafka direct stream described by {@code config} and passes it to a freshly
     * instantiated pipeline.
     *
     * <p>Steps, in order: read broker list and topics, resolve the optional checkpoint
     * directory and interval, instantiate the pipeline class, enable checkpointing when a
     * directory is available, run {@code $anonfun$internalRun$1} for every configuration entry
     * (it receives the Hadoop configuration of the Spark context), create the stream and call
     * {@code handle} on the pipeline.
     *
     * @param topicArr         topics forwarded to the pipeline
     * @param streamingContext streaming context the stream is created on
     * @param config           broker/stream configuration
     * @param config2          configuration forwarded unchanged to the pipeline
     * @return {@code BoxedUnit.UNIT}
     * @throws ClassCastException if the configured pipeline class is not a
     *                            {@code ChannelJobPipeline}
     */
    public Object internalRun(Topic[] topicArr, StreamingContext streamingContext, Config config, Config config2) {
        String brokerList = config.getString(CFG_BROKER());
        Set kafkaTopics = ((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(config.getStringList(CFG_TOPICS())).asScala()).toSet();

        // null means "checkpointing disabled": the probe in $anonfun$4 failed.
        // Otherwise the directory is <checkpoint.dir>/<runtime class name>.
        String checkpointDir = Try$.MODULE$.apply(new ChannelSparkBroker$$anonfun$4(this, config)).isFailure()
                ? null
                : config.getString(CFG_CHKP_DIR()) + "/" + getClass().getName();
        long checkpointIntervalSeconds = BoxesRunTime.unboxToLong(Try$.MODULE$.apply(new ChannelSparkBroker$$anonfun$1(this, config)).getOrElse(new ChannelSparkBroker$$anonfun$2(this)));

        Class<?> pipelineClass = Class.forName(config.getString(CFG_PIPELINE_CLZ()));
        Object pipeline = pipelineClass.newInstance();

        if (checkpointDir != null) {
            streamingContext.checkpoint(checkpointDir);
        }
        ((IterableLike) JavaConverters$.MODULE$.asScalaSetConverter(config.entrySet()).asScala()).foreach(new ChannelSparkBroker$$anonfun$internalRun$1(this, streamingContext.sparkContext().hadoopConfiguration()));
        log().info("load pipeline " + pipelineClass);

        InputDStream stream = KafkaUtils$.MODULE$.createDirectStream(streamingContext, Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc("metadata.broker.list"), brokerList)})), kafkaTopics, ClassTag$.MODULE$.apply(String.class), this.evidence$1, ClassTag$.MODULE$.apply(StringDecoder.class), this.evidence$2);
        if (checkpointDir != null) {
            stream.checkpoint(Seconds$.MODULE$.apply(checkpointIntervalSeconds));
        }

        if (!(pipeline instanceof ChannelJobPipeline)) {
            log().error("wrong event type " + pipeline);
            throw new ClassCastException(pipelineClass.getName() + " is not a ChannelJobPipeline");
        }
        log().info("begin handling");
        ((ChannelJobPipeline) pipeline).handle(streamingContext, stream, topicArr, config2);
        return BoxedUnit.UNIT;
    }

    /**
     * Stand-alone entry point: builds a {@code StreamingContext}, wires the stream and blocks
     * until termination.
     *
     * @param strArr {@code [0]} configuration URL, {@code [1]} topic definition string; the
     *               usage text is printed and the JVM exits with status 1 when either is
     *               missing
     */
    public void main(String[] strArr) {
        String name = getClass().getName();
        // Both arguments are read below, so require two of them up front rather than failing
        // with an ArrayIndexOutOfBoundsException after the Spark context has been created.
        if (strArr.length < 2) {
            System.err.println(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n        |Usage: ", " <configurationUrl> <topics> \n        |  <configurationUrl> the URL path of the config file \n        |  <topics> topic1|select * from topic1|5|5||topic2|select * from topic2|10|10\n        |  \n        "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{name})))).stripMargin());
            System.exit(1);
        }
        Config brokerConfig = initConfig(strArr);

        // getLong throws when the key is absent, so the fallback function could never be
        // reached through Option(getLong(..)).getOrElse(..); test for the key explicitly.
        long batchIntervalSeconds = brokerConfig.hasPath(CFG_BATCH_INTERVAL())
                ? brokerConfig.getLong(CFG_BATCH_INTERVAL())
                : BoxesRunTime.unboxToLong(new ChannelSparkBroker$$anonfun$3(this).apply());

        SparkConf sparkConf = new SparkConf().setAppName("ChannelEngine");
        StreamingContext streamingContext = new StreamingContext(new SparkContext(sparkConf), Seconds$.MODULE$.apply(batchIntervalSeconds));
        internalRun(TopicUtil$.MODULE$.stringToTopicSet(strArr[1]), streamingContext, brokerConfig, (Config) configHelper().config().get());
        streamingContext.start();
        streamingContext.awaitTermination();
    }

    /**
     * @param classTag  class tag for the event type, passed to the Kafka stream factory
     * @param classTag2 class tag for the decoder type, passed to the Kafka stream factory
     */
    public ChannelSparkBroker(ClassTag<EVENT> classTag, ClassTag<DECODER> classTag2) {
        this.evidence$1 = classTag;
        this.evidence$2 = classTag2;
        Logging.class.$init$(this);
        this.configHelper = new TransformationPipelineContext();
    }
}
