package org.apache.spark.streaming.scheduler;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkFunSuite;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Milliseconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.TestServer;
import org.apache.spark.streaming.TestSuiteBase;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.dstream.ReceiverInputDStream;
import org.scalactic.Bool$;
import org.scalactic.Equality$;
import org.scalactic.Prettifier$;
import org.scalactic.TripleEqualsSupport;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import org.scalatest.BeforeAndAfterEach;
import org.scalatest.compatible.Assertion;
import org.scalatest.concurrent.Eventually$;
import org.scalatest.concurrent.PatienceConfiguration;
import org.scalatest.enablers.Retrying$;
import org.scalatest.time.SpanSugar$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.Some;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.package$;
import scala.reflect.ClassTag;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.runtime.Statics;

/* compiled from: RateControllerSuite.scala */
@ScalaSignature(bytes = "\u0006\u0005)2A\u0001B\u0003\u0001!!)\u0011\u0004\u0001C\u00015!)Q\u0004\u0001C!=!)Q\u0005\u0001C!M\t\u0019\"+\u0019;f\u0007>tGO]8mY\u0016\u00148+^5uK*\u0011aaB\u0001\ng\u000eDW\rZ;mKJT!\u0001C\u0005\u0002\u0013M$(/Z1nS:<'B\u0001\u0006\f\u0003\u0015\u0019\b/\u0019:l\u0015\taQ\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u001d\u0005\u0019qN]4\u0004\u0001M\u0019\u0001!E\u000b\u0011\u0005I\u0019R\"A\u0005\n\u0005QI!!D*qCJ\\g)\u001e8Tk&$X\r\u0005\u0002\u0017/5\tq!\u0003\u0002\u0019\u000f\tiA+Z:u'VLG/\u001a\"bg\u0016\fa\u0001P5oSRtD#A\u000e\u0011\u0005q\u0001Q\"A\u0003\u0002\u001dU\u001cX-T1ok\u0006d7\t\\8dWV\tq\u0004\u0005\u0002!G5\t\u0011EC\u0001#\u0003\u0015\u00198-\u00197b\u0013\t!\u0013EA\u0004C_>dW-\u00198\u0002\u001b\t\fGo\u00195EkJ\fG/[8o+\u00059\u0003C\u0001\f)\u0013\tIsA\u0001\u0005EkJ\fG/[8o\u0001")
/* loaded from: input_file:org/apache/spark/streaming/scheduler/RateControllerSuite.class */
public class RateControllerSuite extends SparkFunSuite implements TestSuiteBase {
    private String checkpointDir;
    private SparkConf conf;
    private PatienceConfiguration.Timeout eventuallyTimeout;
    private volatile boolean bitmap$0;

    @Override // org.apache.spark.streaming.TestSuiteBase
    public /* synthetic */ void org$apache$spark$streaming$TestSuiteBase$$super$beforeEach() {
        BeforeAndAfterEach.beforeEach$(this);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public /* synthetic */ void org$apache$spark$streaming$TestSuiteBase$$super$afterEach() {
        BeforeAndAfterEach.afterEach$(this);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public String framework() {
        String framework;
        framework = framework();
        return framework;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public String master() {
        String master;
        master = master();
        return master;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public int numInputPartitions() {
        int numInputPartitions;
        numInputPartitions = numInputPartitions();
        return numInputPartitions;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public int maxWaitTimeMillis() {
        int maxWaitTimeMillis;
        maxWaitTimeMillis = maxWaitTimeMillis();
        return maxWaitTimeMillis;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public boolean actuallyWait() {
        boolean actuallyWait;
        actuallyWait = actuallyWait();
        return actuallyWait;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public void beforeFunction() {
        beforeFunction();
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public void afterFunction() {
        afterFunction();
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public void beforeEach() {
        beforeEach();
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public void afterEach() {
        afterEach();
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <R> R withStreamingContext(StreamingContext streamingContext, Function1<StreamingContext, R> function1) {
        Object withStreamingContext;
        withStreamingContext = withStreamingContext(streamingContext, function1);
        return (R) withStreamingContext;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <R> R withTestServer(TestServer testServer, Function1<TestServer, R> function1) {
        Object withTestServer;
        withTestServer = withTestServer(testServer, function1);
        return (R) withTestServer;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V> StreamingContext setupStreams(Seq<Seq<U>> seq, Function1<DStream<U>, DStream<V>> function1, int i, ClassTag<U> classTag, ClassTag<V> classTag2) {
        StreamingContext streamingContext;
        streamingContext = setupStreams(seq, function1, i, classTag, classTag2);
        return streamingContext;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V> int setupStreams$default$3() {
        int i;
        i = setupStreams$default$3();
        return i;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V, W> StreamingContext setupStreams(Seq<Seq<U>> seq, Seq<Seq<V>> seq2, Function2<DStream<U>, DStream<V>, DStream<W>> function2, ClassTag<U> classTag, ClassTag<V> classTag2, ClassTag<W> classTag3) {
        StreamingContext streamingContext;
        streamingContext = setupStreams(seq, seq2, function2, classTag, classTag2, classTag3);
        return streamingContext;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <V> Seq<Seq<V>> runStreams(StreamingContext streamingContext, int i, int i2, Function0<BoxedUnit> function0, ClassTag<V> classTag) {
        Seq<Seq<V>> runStreams;
        runStreams = runStreams(streamingContext, i, i2, function0, classTag);
        return runStreams;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <V> Function0<BoxedUnit> runStreams$default$4() {
        Function0<BoxedUnit> runStreams$default$4;
        runStreams$default$4 = runStreams$default$4();
        return runStreams$default$4;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <V> Seq<Seq<Seq<V>>> runStreamsWithPartitions(StreamingContext streamingContext, int i, int i2, Function0<BoxedUnit> function0, ClassTag<V> classTag) {
        Seq<Seq<Seq<V>>> runStreamsWithPartitions;
        runStreamsWithPartitions = runStreamsWithPartitions(streamingContext, i, i2, function0, classTag);
        return runStreamsWithPartitions;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <V> Function0<BoxedUnit> runStreamsWithPartitions$default$4() {
        Function0<BoxedUnit> runStreamsWithPartitions$default$4;
        runStreamsWithPartitions$default$4 = runStreamsWithPartitions$default$4();
        return runStreamsWithPartitions$default$4;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <V> void verifyOutput(Seq<Seq<V>> seq, Seq<Seq<V>> seq2, boolean z, ClassTag<V> classTag) {
        verifyOutput(seq, seq2, z, classTag);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V> void testOperation(Seq<Seq<U>> seq, Function1<DStream<U>, DStream<V>> function1, Seq<Seq<V>> seq2, boolean z, ClassTag<U> classTag, ClassTag<V> classTag2) {
        testOperation(seq, function1, seq2, z, classTag, classTag2);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V> boolean testOperation$default$4() {
        boolean testOperation$default$4;
        testOperation$default$4 = testOperation$default$4();
        return testOperation$default$4;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V> void testOperation(Seq<Seq<U>> seq, Function1<DStream<U>, DStream<V>> function1, Seq<Seq<V>> seq2, int i, boolean z, ClassTag<U> classTag, ClassTag<V> classTag2) {
        testOperation(seq, function1, seq2, i, z, classTag, classTag2);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V, W> void testOperation(Seq<Seq<U>> seq, Seq<Seq<V>> seq2, Function2<DStream<U>, DStream<V>, DStream<W>> function2, Seq<Seq<W>> seq3, boolean z, ClassTag<U> classTag, ClassTag<V> classTag2, ClassTag<W> classTag3) {
        testOperation(seq, seq2, function2, seq3, z, classTag, classTag2, classTag3);
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V, W> void testOperation(Seq<Seq<U>> seq, Seq<Seq<V>> seq2, Function2<DStream<U>, DStream<V>, DStream<W>> function2, Seq<Seq<W>> seq3, int i, boolean z, ClassTag<U> classTag, ClassTag<V> classTag2, ClassTag<W> classTag3) {
        testOperation(seq, seq2, function2, seq3, i, z, classTag, classTag2, classTag3);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [org.apache.spark.streaming.scheduler.RateControllerSuite] */
    private String checkpointDir$lzycompute() {
        String checkpointDir;
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                checkpointDir = checkpointDir();
                this.checkpointDir = checkpointDir;
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        return this.checkpointDir;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public String checkpointDir() {
        return !this.bitmap$0 ? checkpointDir$lzycompute() : this.checkpointDir;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public SparkConf conf() {
        return this.conf;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public PatienceConfiguration.Timeout eventuallyTimeout() {
        return this.eventuallyTimeout;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public void org$apache$spark$streaming$TestSuiteBase$_setter_$conf_$eq(SparkConf sparkConf) {
        this.conf = sparkConf;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public void org$apache$spark$streaming$TestSuiteBase$_setter_$eventuallyTimeout_$eq(PatienceConfiguration.Timeout timeout) {
        this.eventuallyTimeout = timeout;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public boolean useManualClock() {
        return false;
    }

    @Override // org.apache.spark.streaming.TestSuiteBase
    public Duration batchDuration() {
        return Milliseconds$.MODULE$.apply(50L);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void updateRateAndVerify$1(long j, ConstantEstimator constantEstimator) {
        constantEstimator.updateRate(j);
        Eventually$.MODULE$.eventually(Eventually$.MODULE$.timeout(SpanSugar$.MODULE$.convertIntToGrainOfTime(5).seconds()), () -> {
            TripleEqualsSupport.Equalizer convertToEqualizer = this.convertToEqualizer(BoxesRunTime.boxToLong(((RateTestReceiver) RateTestReceiver$.MODULE$.getActive().get()).getDefaultBlockGeneratorRateLimit()));
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", BoxesRunTime.boxToLong(j), convertToEqualizer.$eq$eq$eq(BoxesRunTime.boxToLong(j), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("RateControllerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 63));
        }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("RateControllerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 62));
    }

    public static final /* synthetic */ void $anonfun$new$5(RateControllerSuite rateControllerSuite, final StreamingContext streamingContext) {
        final ConstantEstimator constantEstimator = new ConstantEstimator(100L);
        final RateControllerSuite rateControllerSuite2 = null;
        new RateTestInputDStream(rateControllerSuite2, streamingContext, constantEstimator) { // from class: org.apache.spark.streaming.scheduler.RateControllerSuite$$anon$1
            private final Some<ReceiverInputDStream<Object>.ReceiverRateController> rateController;

            @Override // org.apache.spark.streaming.scheduler.RateTestInputDStream
            /* renamed from: rateController, reason: merged with bridge method [inline-methods] */
            public Some<ReceiverInputDStream<Object>.ReceiverRateController> mo2rateController() {
                return this.rateController;
            }

            {
                this.rateController = new Some<>(new ReceiverInputDStream.ReceiverRateController(this, id(), constantEstimator));
            }
        }.register();
        streamingContext.start();
        Eventually$.MODULE$.eventually(Eventually$.MODULE$.timeout(SpanSugar$.MODULE$.convertIntToGrainOfTime(5).seconds()), () -> {
            return RateTestReceiver$.MODULE$.getActive().nonEmpty();
        }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("RateControllerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 55));
        package$.MODULE$.Seq().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{100, 200, 300})).foreach(i -> {
            rateControllerSuite.updateRateAndVerify$1(i, constantEstimator);
        });
    }

    public RateControllerSuite() {
        TestSuiteBase.$init$(this);
        test("RateController - rate controller publishes updates after batches complete", Nil$.MODULE$, () -> {
            return (Assertion) this.withStreamingContext(new StreamingContext(this.conf(), this.batchDuration()), streamingContext -> {
                RateTestInputDStream rateTestInputDStream = new RateTestInputDStream(streamingContext);
                rateTestInputDStream.register();
                streamingContext.start();
                return (Assertion) Eventually$.MODULE$.eventually(Eventually$.MODULE$.timeout(SpanSugar$.MODULE$.convertIntToGrainOfTime(10).seconds()), () -> {
                    int publishedRates = rateTestInputDStream.publishedRates();
                    return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToInteger(publishedRates), ">", BoxesRunTime.boxToInteger(0), publishedRates > 0, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("RateControllerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 39));
                }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("RateControllerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 38));
            });
        }, new Position("RateControllerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 32));
        test("ReceiverRateController - published rates reach receivers", Nil$.MODULE$, () -> {
            this.withStreamingContext(new StreamingContext(this.conf(), this.batchDuration()), streamingContext -> {
                $anonfun$new$5(this, streamingContext);
                return BoxedUnit.UNIT;
            });
        }, new Position("RateControllerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 44));
        Statics.releaseFence();
    }
}
