package org.apache.spark.streaming.scheduler;

import java.io.File;
import java.util.concurrent.CountDownLatch;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFunSuite;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.BatchCounter;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.TestReceiver;
import org.apache.spark.streaming.TestServer;
import org.apache.spark.streaming.TestSuiteBase;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.dstream.ReceiverInputDStream;
import org.apache.spark.util.ManualClock;
import org.apache.spark.util.Utils$;
import org.scalactic.Bool$;
import org.scalactic.Equality$;
import org.scalactic.Prettifier$;
import org.scalactic.TripleEqualsSupport;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import org.scalatest.BeforeAndAfterEach;
import org.scalatest.concurrent.Eventually$;
import org.scalatest.concurrent.PatienceConfiguration;
import org.scalatest.enablers.Retrying$;
import org.scalatest.time.Span$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.Predef$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.Statics;

/* compiled from: JobGeneratorSuite.scala */
@ScalaSignature(bytes = "\u0006\u0005\u00193A\u0001C\u0005\u0001)!)Q\u0004\u0001C\u0001=\u001d)\u0011%\u0003E\u0001E\u0019)\u0001\"\u0003E\u0001G!)Qd\u0001C\u0001e!91g\u0001b\u0001\n\u0003!\u0004BB\u001f\u0004A\u0003%Q\u0007C\u0004?\u0007\u0005\u0005I\u0011B \u0003#){'mR3oKJ\fGo\u001c:Tk&$XM\u0003\u0002\u000b\u0017\u0005I1o\u00195fIVdWM\u001d\u0006\u0003\u00195\t\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u00059y\u0011!B:qCJ\\'B\u0001\t\u0012\u0003\u0019\t\u0007/Y2iK*\t!#A\u0002pe\u001e\u001c\u0001aE\u0002\u0001+e\u0001\"AF\f\u000e\u00035I!\u0001G\u0007\u0003\u001bM\u0003\u0018M]6Gk:\u001cV/\u001b;f!\tQ2$D\u0001\f\u0013\ta2BA\u0007UKN$8+^5uK\n\u000b7/Z\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003}\u0001\"\u0001\t\u0001\u000e\u0003%\t\u0011CS8c\u000f\u0016tWM]1u_J\u001cV/\u001b;f!\t\u00013aE\u0002\u0004I)\u0002\"!\n\u0015\u000e\u0003\u0019R\u0011aJ\u0001\u0006g\u000e\fG.Y\u0005\u0003S\u0019\u0012a!\u00118z%\u00164\u0007CA\u00161\u001b\u0005a#BA\u0017/\u0003\tIwNC\u00010\u0003\u0011Q\u0017M^1\n\u0005Eb#\u0001D*fe&\fG.\u001b>bE2,G#\u0001\u0012\u0002\u0013]\f\u0017\u000e\u001e'bi\u000eDW#A\u001b\u0011\u0005YZT\"A\u001c\u000b\u0005aJ\u0014AC2p]\u000e,(O]3oi*\u0011!HL\u0001\u0005kRLG.\u0003\u0002=o\tq1i\\;oi\u0012{wO\u001c'bi\u000eD\u0017AC<bSRd\u0015\r^2iA\u0005aqO]5uKJ+\u0007\u000f\\1dKR\t\u0001\t\u0005\u0002B\t6\t!I\u0003\u0002D]\u0005!A.\u00198h\u0013\t)%I\u0001\u0004PE*,7\r\u001e")
/* loaded from: input_file:org/apache/spark/streaming/scheduler/JobGeneratorSuite.class */
public class JobGeneratorSuite extends SparkFunSuite implements TestSuiteBase {
    private String checkpointDir;
    private SparkConf conf;
    private PatienceConfiguration.Timeout eventuallyTimeout;
    private volatile boolean bitmap$0;

    public static CountDownLatch waitLatch() {
        return JobGeneratorSuite$.MODULE$.waitLatch();
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public /* synthetic */ void org$apache$spark$streaming$TestSuiteBase$$super$beforeEach() {
        BeforeAndAfterEach.beforeEach$(this);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public /* synthetic */ void org$apache$spark$streaming$TestSuiteBase$$super$afterEach() {
        BeforeAndAfterEach.afterEach$(this);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public String framework() {
        String framework;
        framework = framework();
        return framework;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public String master() {
        String master;
        master = master();
        return master;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public Duration batchDuration() {
        Duration batchDuration;
        batchDuration = batchDuration();
        return batchDuration;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public int numInputPartitions() {
        int numInputPartitions;
        numInputPartitions = numInputPartitions();
        return numInputPartitions;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public int maxWaitTimeMillis() {
        int maxWaitTimeMillis;
        maxWaitTimeMillis = maxWaitTimeMillis();
        return maxWaitTimeMillis;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public boolean useManualClock() {
        boolean useManualClock;
        useManualClock = useManualClock();
        return useManualClock;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public boolean actuallyWait() {
        boolean actuallyWait;
        actuallyWait = actuallyWait();
        return actuallyWait;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public void beforeFunction() {
        beforeFunction();
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public void afterFunction() {
        afterFunction();
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public void beforeEach() {
        beforeEach();
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public void afterEach() {
        afterEach();
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <R> R withStreamingContext(StreamingContext streamingContext, Function1<StreamingContext, R> function1) {
        Object withStreamingContext;
        withStreamingContext = withStreamingContext(streamingContext, function1);
        return (R) withStreamingContext;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <R> R withTestServer(TestServer testServer, Function1<TestServer, R> function1) {
        Object withTestServer;
        withTestServer = withTestServer(testServer, function1);
        return (R) withTestServer;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V> StreamingContext setupStreams(Seq<Seq<U>> seq, Function1<DStream<U>, DStream<V>> function1, int i, ClassTag<U> classTag, ClassTag<V> classTag2) {
        StreamingContext streamingContext;
        streamingContext = setupStreams(seq, function1, i, classTag, classTag2);
        return streamingContext;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V> int setupStreams$default$3() {
        int i;
        i = setupStreams$default$3();
        return i;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V, W> StreamingContext setupStreams(Seq<Seq<U>> seq, Seq<Seq<V>> seq2, Function2<DStream<U>, DStream<V>, DStream<W>> function2, ClassTag<U> classTag, ClassTag<V> classTag2, ClassTag<W> classTag3) {
        StreamingContext streamingContext;
        streamingContext = setupStreams(seq, seq2, function2, classTag, classTag2, classTag3);
        return streamingContext;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <V> Seq<Seq<V>> runStreams(StreamingContext streamingContext, int i, int i2, Function0<BoxedUnit> function0, ClassTag<V> classTag) {
        Seq<Seq<V>> runStreams;
        runStreams = runStreams(streamingContext, i, i2, function0, classTag);
        return runStreams;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <V> Function0<BoxedUnit> runStreams$default$4() {
        Function0<BoxedUnit> runStreams$default$4;
        runStreams$default$4 = runStreams$default$4();
        return runStreams$default$4;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <V> Seq<Seq<Seq<V>>> runStreamsWithPartitions(StreamingContext streamingContext, int i, int i2, Function0<BoxedUnit> function0, ClassTag<V> classTag) {
        Seq<Seq<Seq<V>>> runStreamsWithPartitions;
        runStreamsWithPartitions = runStreamsWithPartitions(streamingContext, i, i2, function0, classTag);
        return runStreamsWithPartitions;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <V> Function0<BoxedUnit> runStreamsWithPartitions$default$4() {
        Function0<BoxedUnit> runStreamsWithPartitions$default$4;
        runStreamsWithPartitions$default$4 = runStreamsWithPartitions$default$4();
        return runStreamsWithPartitions$default$4;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <V> void verifyOutput(Seq<Seq<V>> seq, Seq<Seq<V>> seq2, boolean z, ClassTag<V> classTag) {
        verifyOutput(seq, seq2, z, classTag);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V> void testOperation(Seq<Seq<U>> seq, Function1<DStream<U>, DStream<V>> function1, Seq<Seq<V>> seq2, boolean z, ClassTag<U> classTag, ClassTag<V> classTag2) {
        testOperation(seq, function1, seq2, z, classTag, classTag2);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V> boolean testOperation$default$4() {
        boolean testOperation$default$4;
        testOperation$default$4 = testOperation$default$4();
        return testOperation$default$4;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V> void testOperation(Seq<Seq<U>> seq, Function1<DStream<U>, DStream<V>> function1, Seq<Seq<V>> seq2, int i, boolean z, ClassTag<U> classTag, ClassTag<V> classTag2) {
        testOperation(seq, function1, seq2, i, z, classTag, classTag2);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V, W> void testOperation(Seq<Seq<U>> seq, Seq<Seq<V>> seq2, Function2<DStream<U>, DStream<V>, DStream<W>> function2, Seq<Seq<W>> seq3, boolean z, ClassTag<U> classTag, ClassTag<V> classTag2, ClassTag<W> classTag3) {
        testOperation(seq, seq2, function2, seq3, z, classTag, classTag2, classTag3);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V, W> void testOperation(Seq<Seq<U>> seq, Seq<Seq<V>> seq2, Function2<DStream<U>, DStream<V>, DStream<W>> function2, Seq<Seq<W>> seq3, int i, boolean z, ClassTag<U> classTag, ClassTag<V> classTag2, ClassTag<W> classTag3) {
        testOperation(seq, seq2, function2, seq3, i, z, classTag, classTag2, classTag3);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [org.apache.spark.streaming.scheduler.JobGeneratorSuite] */
    private String checkpointDir$lzycompute() {
        String checkpointDir;
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                checkpointDir = checkpointDir();
                this.checkpointDir = checkpointDir;
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        return this.checkpointDir;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public String checkpointDir() {
        return !this.bitmap$0 ? checkpointDir$lzycompute() : this.checkpointDir;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public SparkConf conf() {
        return this.conf;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public PatienceConfiguration.Timeout eventuallyTimeout() {
        return this.eventuallyTimeout;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public void org$apache$spark$streaming$TestSuiteBase$_setter_$conf_$eq(SparkConf sparkConf) {
        this.conf = sparkConf;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public void org$apache$spark$streaming$TestSuiteBase$_setter_$eventuallyTimeout_$eq(PatienceConfiguration.Timeout timeout) {
        this.eventuallyTimeout = timeout;
    }

    public static final /* synthetic */ void $anonfun$new$3(long j, RDD rdd, Time time) {
        if (time.milliseconds() == j) {
            while (JobGeneratorSuite$.MODULE$.waitLatch().getCount() > 0) {
                JobGeneratorSuite$.MODULE$.waitLatch().await();
            }
        }
    }

    private static final Seq getBlocksOfBatch$1(long j, ReceiverTracker receiverTracker, ReceiverInputDStream receiverInputDStream) {
        return receiverTracker.getBlocksOfBatchAndStream(new Time(j), receiverInputDStream.id());
    }

    private static final void waitForNewReceivedBlocks$1(PatienceConfiguration.Timeout timeout, ReceiverTracker receiverTracker) {
        Eventually$.MODULE$.eventually(timeout, () -> {
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(receiverTracker.hasUnallocatedBlocks(), "receiverTracker.hasUnallocatedBlocks", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("JobGeneratorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 98));
        }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("JobGeneratorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 97));
    }

    private static final void waitForBlocksToBeAllocatedToBatch$1(long j, PatienceConfiguration.Timeout timeout, ReceiverTracker receiverTracker, ReceiverInputDStream receiverInputDStream) {
        Eventually$.MODULE$.eventually(timeout, () -> {
            Seq blocksOfBatch$1 = getBlocksOfBatch$1(j, receiverTracker, receiverInputDStream);
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(blocksOfBatch$1, "nonEmpty", blocksOfBatch$1.nonEmpty(), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("JobGeneratorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 105));
        }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("JobGeneratorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 104));
    }

    public static final /* synthetic */ void $anonfun$new$2(JobGeneratorSuite jobGeneratorSuite, File file, StreamingContext streamingContext) {
        ManualClock clock = streamingContext.scheduler().clock();
        int i = 10;
        long milliseconds = 3 * jobGeneratorSuite.batchDuration().milliseconds();
        PatienceConfiguration.Timeout timeout = Eventually$.MODULE$.timeout(Span$.MODULE$.convertDurationToSpan(new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds()));
        ReceiverInputDStream receiverStream = streamingContext.receiverStream(new TestReceiver(), ClassTag$.MODULE$.Int());
        receiverStream.foreachRDD((rdd, time) -> {
            $anonfun$new$3(milliseconds, rdd, time);
            return BoxedUnit.UNIT;
        });
        BatchCounter batchCounter = new BatchCounter(streamingContext);
        streamingContext.checkpoint(file.getAbsolutePath());
        streamingContext.start();
        TripleEqualsSupport.Equalizer convertToEqualizer = jobGeneratorSuite.convertToEqualizer(receiverStream.rememberDuration());
        Duration batchDuration = jobGeneratorSuite.batchDuration();
        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", batchDuration, convertToEqualizer.$eq$eq$eq(batchDuration, Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("JobGeneratorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 87));
        ReceiverTracker receiverTracker = streamingContext.scheduler().receiverTracker();
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 10).foreach$mVc$sp(i2 -> {
            waitForNewReceivedBlocks$1(timeout, receiverTracker);
            clock.advance(jobGeneratorSuite.batchDuration().milliseconds());
            waitForBlocksToBeAllocatedToBatch$1(clock.getTimeMillis(), timeout, receiverTracker, receiverStream);
        });
        Eventually$.MODULE$.eventually(timeout, () -> {
            return streamingContext.scheduler().getPendingTimes().contains(new Time(i * jobGeneratorSuite.batchDuration().milliseconds()));
        }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("JobGeneratorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 117));
        Seq blocksOfBatch$1 = getBlocksOfBatch$1(milliseconds, receiverTracker, receiverStream);
        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(blocksOfBatch$1, "nonEmpty", blocksOfBatch$1.nonEmpty(), Prettifier$.MODULE$.default()), "blocks of incomplete batch already deleted", Prettifier$.MODULE$.default(), new Position("JobGeneratorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 122));
        int numCompletedBatches = batchCounter.getNumCompletedBatches();
        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToInteger(numCompletedBatches), "<", BoxesRunTime.boxToInteger(3), numCompletedBatches < 3, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("JobGeneratorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 123));
        JobGeneratorSuite$.MODULE$.waitLatch().countDown();
        streamingContext.stop(streamingContext.stop$default$1());
    }

    public JobGeneratorSuite() {
        TestSuiteBase.$init$(this);
        test("SPARK-6222: Do not clear received block data too soon", Nil$.MODULE$, () -> {
            File createTempDir = Utils$.MODULE$.createTempDir();
            SparkConf conf = this.conf();
            conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
            conf.set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1");
            this.withStreamingContext(new StreamingContext(conf, this.batchDuration()), streamingContext -> {
                $anonfun$new$2(this, createTempDir, streamingContext);
                return BoxedUnit.UNIT;
            });
        }, new Position("JobGeneratorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 58));
        Statics.releaseFence();
    }
}
