package spark.streaming;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import scala.Function0;
import scala.Option;
import scala.ScalaObject;
import scala.collection.IndexedSeqOptimized;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.BufferLike;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassManifest$;
import scala.reflect.ScalaSignature;
import spark.Logging;
import spark.SparkEnv$;

/* compiled from: JobManager.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005=a!B\u0001\u0003\u0001\t1!A\u0003&pE6\u000bg.Y4fe*\u00111\u0001B\u0001\ngR\u0014X-Y7j]\u001eT\u0011!B\u0001\u0006gB\f'o[\n\u0005\u0001\u001dy1\u0003\u0005\u0002\t\u001b5\t\u0011B\u0003\u0002\u000b\u0017\u0005!A.\u00198h\u0015\u0005a\u0011\u0001\u00026bm\u0006L!AD\u0005\u0003\r=\u0013'.Z2u!\t\u0001\u0012#D\u0001\u0005\u0013\t\u0011BAA\u0004M_\u001e<\u0017N\\4\u0011\u0005Q9R\"A\u000b\u000b\u0003Y\tQa]2bY\u0006L!\u0001G\u000b\u0003\u0017M\u001b\u0017\r\\1PE*,7\r\u001e\u0005\t5\u0001\u0011\t\u0011)A\u00059\u0005\u00191o]2\u0004\u0001A\u0011QDH\u0007\u0002\u0005%\u0011qD\u0001\u0002\u0011'R\u0014X-Y7j]\u001e\u001cuN\u001c;fqRD\u0001\"\t\u0001\u0003\u0002\u0003\u0006IAI\u0001\u000b]VlG\u000b\u001b:fC\u0012\u001c\bC\u0001\u000b$\u0013\t!SCA\u0002J]RDQA\n\u0001\u0005\u0002\u001d\na\u0001P5oSRtDc\u0001\u0015*UA\u0011Q\u0004\u0001\u0005\u00065\u0015\u0002\r\u0001\b\u0005\bC\u0015\u0002\n\u00111\u0001#\r\u0011a\u0003\u0001A\u0017\u0003\u0015){'\rS1oI2,'o\u0005\u0003,\u000f9\u001a\u0002C\u0001\u00050\u0013\t\u0001\u0014B\u0001\u0005Sk:t\u0017M\u00197f\u0011!Q2F!A!\u0002\u0013a\u0002\u0002C\u001a,\u0005\u0003\u0005\u000b\u0011\u0002\u001b\u0002\u0007)|'\r\u0005\u0002\u001ek%\u0011aG\u0001\u0002\u0004\u0015>\u0014\u0007\"\u0002\u0014,\t\u0003ADcA\u001d<yA\u0011!hK\u0007\u0002\u0001!)!d\u000ea\u00019!)1g\u000ea\u0001i!)ah\u000bC\u0001\u007f\u0005\u0019!/\u001e8\u0015\u0003\u0001\u0003\"\u0001F!\n\u0005\t+\"\u0001B+oSRDq\u0001\u0012\u0001C\u0002\u0013\u0005Q)A\u0006k_\n,\u00050Z2vi>\u0014X#\u0001$\u0011\u0005\u001dcU\"\u0001%\u000b\u0005%S\u0015AC2p]\u000e,(O]3oi*\u00111jC\u0001\u0005kRLG.\u0003\u0002N\u0011\nyQ\t_3dkR|'oU3sm&\u001cW\r\u0003\u0004P\u0001\u0001\u0006IAR\u0001\rU>\u0014W\t_3dkR|'\u000f\t\u0005\b#\u0002\u0011\r\u0011\"\u0001S\u0003\u0011QwNY:\u0016\u0003M\u0003B\u0001V-\\=6\tQK\u0003\u0002W/\u00069Q.\u001e;bE2,'B\u0001-\u0016\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u00035V\u0013q\u0001S1tQ6\u000b\u0007\u000f\u0005\u0002\u001
e9&\u0011QL\u0001\u0002\u0005)&lW\rE\u0002U?RJ!\u0001Y+\u0003\u0017\u0005\u0013(/Y=Ck\u001a4WM\u001d\u0005\u0007E\u0002\u0001\u000b\u0011B*\u0002\u000b)|'m\u001d\u0011\t\u000b\u0011\u0004A\u0011A3\u0002\rI,hNS8c)\t\u0001e\rC\u00034G\u0002\u0007A\u0007C\u0003i\u0001\u0011\u0005q(\u0001\u0003ti>\u0004\b\"\u00026\u0001\t\u0013Y\u0017\u0001C2mK\u0006\u0014(j\u001c2\u0015\u0005\u0001c\u0007\"B\u001aj\u0001\u0004!\u0004\"\u00028\u0001\t\u0003y\u0017aD4fiB+g\u000eZ5oORKW.Z:\u0015\u0003A\u00042\u0001F9\\\u0013\t\u0011XCA\u0003BeJ\f\u0017p\u0002\u0005u\u0005\u0005\u0005\tR\u0001\u0002v\u0003)QuNY'b]\u0006<WM\u001d\t\u0003;Y4\u0001\"\u0001\u0002\u0002\u0002#\u0015!a^\n\u0004m\u001e\u0019\u0002\"\u0002\u0014w\t\u0003IH#A;\t\u000fm4\u0018\u0013!C\u0001y\u0006q\u0011N\\5uI\u0011,g-Y;mi\u0012\u0012T#A?+\u0005\tr8&A@\u0011\t\u0005\u0005\u00111B\u0007\u0003\u0003\u0007QA!!\u0002\u0002\b\u0005IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0003\u0013)\u0012AC1o]>$\u0018\r^5p]&!\u0011QBA\u0002\u0005E)hn\u00195fG.,GMV1sS\u0006t7-\u001a")
/* loaded from: input_file:spark/streaming/JobManager.class */
/**
 * Submits streaming {@link Job}s to a fixed-size thread pool and keeps, per {@link Time}, the
 * list of jobs that have been submitted but have not yet been cleared.
 *
 * <p>Every read or write of the {@code jobs} map performed inside this class happens while
 * holding the map's own monitor. Note that {@link #jobs()} hands out the map itself, so callers
 * outside this class must synchronize on it themselves.
 */
public class JobManager implements Logging {
    public final StreamingContext ssc;
    private final ExecutorService jobExecutor;
    private final HashMap<Time, ArrayBuffer<Job>> jobs;
    private transient Logger spark$Logging$$log_;

    /* compiled from: JobManager.scala */
    /* loaded from: input_file:spark/streaming/JobManager$JobHandler.class */
    /**
     * Executor task for a single {@link Job}: installs the streaming context's environment,
     * runs the job, logs the outcome, and finally asks the owning manager to clear the job.
     */
    public class JobHandler implements Runnable, ScalaObject {
        private final StreamingContext ssc;
        public final Job spark$streaming$JobManager$JobHandler$$job;
        public final JobManager $outer;

        /**
         * @param jobManager the owning manager; must not be {@code null}
         * @param streamingContext context whose {@code env()} is installed before the job runs
         * @param job the job this handler runs
         * @throws NullPointerException if {@code jobManager} is {@code null}
         */
        public JobHandler(JobManager jobManager, StreamingContext streamingContext, Job job) {
            if (jobManager == null) {
                throw new NullPointerException();
            }
            this.ssc = streamingContext;
            this.spark$streaming$JobManager$JobHandler$$job = job;
            this.$outer = jobManager;
        }

        /**
         * Runs the job and reports the result through the owning manager's logger.
         *
         * <p>An {@link Exception} thrown while running the job (or while logging its result) is
         * logged as an error and not rethrown. The job is cleared from the manager afterwards in
         * both the success and the {@code Exception} case.
         */
        @Override // java.lang.Runnable
        public void run() {
            // Make the streaming context's environment the current one before running the job.
            SparkEnv$.MODULE$.set(this.ssc.env());
            final JobManager owner = spark$streaming$JobManager$JobHandler$$$outer();
            try {
                // The value returned by job.run() is handed to the log-message closure.
                owner.logInfo(new JobManager$JobHandler$$anonfun$run$1(this, this.spark$streaming$JobManager$JobHandler$$job.run()));
            } catch (Exception e) {
                owner.logError(new JobManager$JobHandler$$anonfun$run$2(this), e);
            }
            owner.spark$streaming$JobManager$$clearJob(this.spark$streaming$JobManager$JobHandler$$job);
        }

        /** Returns the {@link JobManager} this handler was created for. */
        public JobManager spark$streaming$JobManager$JobHandler$$$outer() {
            return this.$outer;
        }
    }

    /** Returns the logger currently stored in the {@code spark$Logging$$log_} field. */
    public final Logger spark$Logging$$log_() {
        return this.spark$Logging$$log_;
    }

    /** Replaces the logger stored in the {@code spark$Logging$$log_} field. */
    public final void spark$Logging$$log__$eq(Logger logger) {
        this.spark$Logging$$log_ = logger;
    }

    // ---------------------------------------------------------------------------------------
    // The methods below only forward to the corresponding Logging implementation, passing
    // this instance as the receiver.
    // ---------------------------------------------------------------------------------------

    public Logger log() {
        return Logging.class.log(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    public void initLogging() {
        Logging.class.initLogging(this);
    }

    /** Returns the thread pool that job handlers are submitted to. */
    public ExecutorService jobExecutor() {
        return this.jobExecutor;
    }

    /**
     * Returns the live map from time to the jobs registered for that time. The map is not
     * copied; synchronize on it before reading or modifying it.
     */
    public HashMap<Time, ArrayBuffer<Job>> jobs() {
        return this.jobs;
    }

    /**
     * Registers {@code job} under its time and submits a {@link JobHandler} for it to the
     * executor.
     *
     * @param job the job to run
     */
    public void runJob(Job job) {
        final HashMap<Time, ArrayBuffer<Job>> pending = jobs();
        // Only the map update needs the monitor; submitting and logging do not touch the map.
        synchronized (pending) {
            ((ArrayBuffer) pending.getOrElseUpdate(job.time(), new JobManager$$anonfun$runJob$1(this))).$plus$eq(job);
        }
        jobExecutor().execute(new JobHandler(this, this.ssc, job));
        logInfo(new JobManager$$anonfun$runJob$2(this, job));
    }

    /** Calls {@link ExecutorService#shutdown()} on the job executor. */
    public void stop() {
        jobExecutor().shutdown();
    }

    /**
     * Removes {@code job} from the list registered for its time. When that list becomes empty
     * the time entry itself is removed and the scheduler is asked to clear old metadata for
     * that time.
     *
     * @param job the job to remove
     * @throws IllegalStateException if no entry exists for the job's time
     */
    public final void spark$streaming$JobManager$$clearJob(Job job) {
        final Time time = job.time();
        boolean timeEmptied = false;
        synchronized (jobs()) {
            Option option = jobs().get(time);
            if (!option.isDefined()) {
                throw new IllegalStateException("Job finished for time " + job.time() + " but time does not exist in jobs");
            }
            ((BufferLike) option.get()).$minus$eq(job);
            if (((IndexedSeqOptimized) option.get()).isEmpty()) {
                jobs().$minus$eq(time);
                timeEmptied = true;
            }
        }
        // The scheduler call is made after the monitor on the map has been released.
        if (timeEmptied) {
            this.ssc.scheduler().clearOldMetadata(time);
        }
    }

    /**
     * Returns the keys of the {@code jobs} map, captured as an array while holding the map's
     * monitor.
     */
    public Time[] getPendingTimes() {
        final HashMap<Time, ArrayBuffer<Job>> pending = jobs();
        synchronized (pending) {
            return (Time[]) pending.keySet().toArray(ClassManifest$.MODULE$.classType(Time.class));
        }
    }

    /**
     * @param streamingContext the streaming context jobs are run for
     * @param i number of threads in the fixed-size pool used to run jobs
     */
    public JobManager(StreamingContext streamingContext, int i) {
        this.ssc = streamingContext;
        Logging.class.$init$(this);
        initLogging();
        this.jobExecutor = Executors.newFixedThreadPool(i);
        this.jobs = new HashMap<>();
    }
}
