package org.apache.spark.sql.streaming;

import java.io.File;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.test.QueryTest;
import org.apache.spark.sql.test.SQLHelper;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Prettifier$;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import org.scalatest.concurrent.Eventually$;
import org.scalatest.concurrent.Futures$;
import org.scalatest.enablers.Retrying$;
import org.scalatest.time.SpanSugar$;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.ArrayOps;
import scala.collection.ArrayOps$;
import scala.collection.JavaConverters$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.mutable.Set;
import scala.reflect.ScalaSignature;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

/* compiled from: ClientStreamingQuerySuite.scala */
@ScalaSignature(bytes = "\u0006\u0005\t2AAA\u0002\u0001\u001d!)a\u0004\u0001C\u0001?\tI2\t\\5f]R\u001cFO]3b[&tw-U;fef\u001cV/\u001b;f\u0015\t!Q!A\u0005tiJ,\u0017-\\5oO*\u0011aaB\u0001\u0004gFd'B\u0001\u0005\n\u0003\u0015\u0019\b/\u0019:l\u0015\tQ1\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0019\u0005\u0019qN]4\u0004\u0001M!\u0001aD\u000b\u0019!\t\u00012#D\u0001\u0012\u0015\t\u0011R!\u0001\u0003uKN$\u0018B\u0001\u000b\u0012\u0005%\tV/\u001a:z)\u0016\u001cH\u000f\u0005\u0002\u0011-%\u0011q#\u0005\u0002\n'Fc\u0005*\u001a7qKJ\u0004\"!\u0007\u000f\u000e\u0003iQ!aG\u0004\u0002\u0011%tG/\u001a:oC2L!!\b\u000e\u0003\u000f1{wmZ5oO\u00061A(\u001b8jiz\"\u0012\u0001\t\t\u0003C\u0001i\u0011a\u0001")
/* loaded from: input_file:org/apache/spark/sql/streaming/ClientStreamingQuerySuite.class */
public class ClientStreamingQuerySuite extends QueryTest implements SQLHelper, Logging {
    // Backing storage for the logger; exposed through the accessor pair
    // org$apache$spark$internal$Logging$$log_() / org$apache$spark$internal$Logging$$log__$eq(Logger)
    // declared further down. Marked transient, so it is excluded from Java serialization.
    private transient Logger org$apache$spark$internal$Logging$$log_;

    /**
     * Returns the logger name for this suite, as computed by the static
     * {@code Logging.logName$} forwarder for this instance.
     *
     * @return whatever {@code Logging.logName$(this)} yields
     */
    public String logName() {
        final String resolvedName = Logging.logName$(this);
        return resolvedName;
    }

    /**
     * Returns the SLF4J logger for this suite by handing this instance to the
     * static {@code Logging.log$} forwarder.
     *
     * @return the logger produced by {@code Logging.log$(this)}
     */
    public Logger log() {
        final Logger resolvedLogger = Logging.log$(this);
        return resolvedLogger;
    }

    /**
     * Forwards an info-level message to the static {@code Logging.logInfo$} helper.
     *
     * @param function0 supplier of the message text; passed through unevaluated
     *                  (presumably a Scala by-name argument — confirm against the trait)
     */
    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    /**
     * Forwards a debug-level message to the static {@code Logging.logDebug$} helper.
     *
     * @param function0 supplier of the message text; passed through unevaluated
     */
    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    /**
     * Forwards a trace-level message to the static {@code Logging.logTrace$} helper.
     *
     * @param function0 supplier of the message text; passed through unevaluated
     */
    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    /**
     * Forwards a warning-level message to the static {@code Logging.logWarning$} helper.
     *
     * @param function0 supplier of the message text; passed through unevaluated
     */
    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    /**
     * Forwards an error-level message to the static {@code Logging.logError$} helper.
     *
     * @param function0 supplier of the message text; passed through unevaluated
     */
    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    /**
     * Forwards an info-level message together with a throwable to the static
     * {@code Logging.logInfo$} helper.
     *
     * @param function0 supplier of the message text; passed through unevaluated
     * @param th        throwable handed to the helper alongside the message
     */
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    /**
     * Forwards a debug-level message together with a throwable to the static
     * {@code Logging.logDebug$} helper.
     *
     * @param function0 supplier of the message text; passed through unevaluated
     * @param th        throwable handed to the helper alongside the message
     */
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    /**
     * Forwards a trace-level message together with a throwable to the static
     * {@code Logging.logTrace$} helper.
     *
     * @param function0 supplier of the message text; passed through unevaluated
     * @param th        throwable handed to the helper alongside the message
     */
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    /**
     * Forwards a warning-level message together with a throwable to the static
     * {@code Logging.logWarning$} helper.
     *
     * @param function0 supplier of the message text; passed through unevaluated
     * @param th        throwable handed to the helper alongside the message
     */
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    /**
     * Forwards an error-level message together with a throwable to the static
     * {@code Logging.logError$} helper.
     *
     * @param function0 supplier of the message text; passed through unevaluated
     * @param th        throwable handed to the helper alongside the message
     */
    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    /**
     * Reports the trace-enabled flag as answered by the static
     * {@code Logging.isTraceEnabled$} forwarder for this instance.
     *
     * @return the boolean produced by {@code Logging.isTraceEnabled$(this)}
     */
    public boolean isTraceEnabled() {
        final boolean traceEnabled = Logging.isTraceEnabled$(this);
        return traceEnabled;
    }

    /**
     * Single-flag overload: forwards to the static
     * {@code Logging.initializeLogIfNecessary$} helper and returns nothing.
     *
     * @param z flag passed straight through to the helper
     *          (NOTE(review): meaning not visible here — confirm against the Logging trait)
     */
    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    /**
     * Two-flag overload: forwards both flags to the static
     * {@code Logging.initializeLogIfNecessary$} helper and relays its result.
     *
     * @param z  first flag, passed through unchanged
     * @param z2 second flag, passed through unchanged
     * @return the boolean produced by the helper
     */
    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        final boolean outcome = Logging.initializeLogIfNecessary$(this, z, z2);
        return outcome;
    }

    /**
     * Relays the value of the static {@code Logging.initializeLogIfNecessary$default$2$}
     * forwarder. By Scala naming convention this presumably supplies the default for
     * the second parameter of {@code initializeLogIfNecessary} — confirm against the trait.
     *
     * @return the boolean produced by the forwarder
     */
    public boolean initializeLogIfNecessary$default$2() {
        final boolean defaultValue = Logging.initializeLogIfNecessary$default$2$(this);
        return defaultValue;
    }

    /**
     * Forwards both flags to the static {@code Logging.initializeForcefully$} helper.
     *
     * @param z  first flag, passed through unchanged
     * @param z2 second flag, passed through unchanged
     */
    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    /**
     * Runs {@code function0} under the given SQL configuration pairs by delegating to
     * the {@code SQLHelper} trait implementation.
     *
     * <p>An unqualified {@code withSQLConf(seq, function0)} call here would resolve to
     * this very override and recurse until {@code StackOverflowError}. The call is
     * therefore routed through the trait's static forwarder, the same pattern the
     * {@code Logging} forwarders in this class use.
     * NOTE(review): assumes {@code SQLHelper} exposes {@code withSQLConf$} the way
     * {@code Logging} exposes its {@code …$} forwarders — confirm against the compiled trait.
     *
     * @param seq       configuration key/value pairs handed to the trait implementation
     * @param function0 body to execute
     */
    @Override // org.apache.spark.sql.test.SQLHelper
    public void withSQLConf(Seq<Tuple2<String, String>> seq, Function0<BoxedUnit> function0) {
        SQLHelper.withSQLConf$(this, seq, function0);
    }

    /**
     * Delegates to the {@code SQLHelper} trait implementation of {@code withTempDatabase}.
     *
     * <p>An unqualified {@code withTempDatabase(function1)} call here would resolve to
     * this very override and recurse until {@code StackOverflowError}. The call is
     * therefore routed through the trait's static forwarder, the same pattern the
     * {@code Logging} forwarders in this class use.
     * NOTE(review): assumes {@code SQLHelper} exposes {@code withTempDatabase$} —
     * confirm against the compiled trait.
     *
     * @param function1 callback handed to the trait implementation; it receives a
     *                  {@code String} (presumably the temporary database name)
     */
    @Override // org.apache.spark.sql.test.SQLHelper
    public void withTempDatabase(Function1<String, BoxedUnit> function1) {
        SQLHelper.withTempDatabase$(this, function1);
    }

    /**
     * Delegates to the {@code SQLHelper} trait implementation of {@code withTempPath}.
     *
     * <p>An unqualified {@code withTempPath(function1)} call here would resolve to this
     * very override and recurse until {@code StackOverflowError}. The call is therefore
     * routed through the trait's static forwarder, the same pattern the {@code Logging}
     * forwarders in this class use.
     * NOTE(review): assumes {@code SQLHelper} exposes {@code withTempPath$} — confirm
     * against the compiled trait.
     *
     * @param function1 callback handed to the trait implementation; it receives a
     *                  {@link File} (presumably a temporary path)
     */
    @Override // org.apache.spark.sql.test.SQLHelper
    public void withTempPath(Function1<File, BoxedUnit> function1) {
        SQLHelper.withTempPath$(this, function1);
    }

    /**
     * Delegates to the {@code SQLHelper} trait implementation of {@code withTable}.
     *
     * <p>An unqualified {@code withTable(seq, function0)} call here would resolve to this
     * very override and recurse until {@code StackOverflowError}. The call is therefore
     * routed through the trait's static forwarder, the same pattern the {@code Logging}
     * forwarders in this class use.
     * NOTE(review): assumes {@code SQLHelper} exposes {@code withTable$} — confirm
     * against the compiled trait.
     *
     * @param seq       table names handed to the trait implementation
     * @param function0 body to execute
     */
    @Override // org.apache.spark.sql.test.SQLHelper
    public void withTable(Seq<String> seq, Function0<BoxedUnit> function0) {
        SQLHelper.withTable$(this, seq, function0);
    }

    /**
     * Reads the logger backing field.
     *
     * @return the current value of the field; may be {@code null} if nothing has
     *         been stored yet
     */
    public Logger org$apache$spark$internal$Logging$$log_() {
        final Logger current = org$apache$spark$internal$Logging$$log_;
        return current;
    }

    /**
     * Stores the given logger in the backing field, replacing any previous value.
     *
     * @param logger value to store; written as-is with no validation
     */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        org$apache$spark$internal$Logging$$log_ = logger;
    }

    /**
     * Lambda body for the temp-path callback of the "Streaming table API" test.
     *
     * <p>Starts a rate-source stream that writes into table {@code my_table}, using
     * {@code file} as the checkpoint location. It then starts a second stream that
     * reads {@code my_table} into the memory sink {@code my_sink}, drains both, and
     * waits up to 30 seconds for {@code my_sink} to contain at least one row.
     *
     * <p>Cleanup is structured so that every step is attempted:
     * <ul>
     *   <li>the second query is started inside the {@code try}, so a failed start
     *       still stops the first query and drops the table;</li>
     *   <li>the stop and drop steps are chained through nested {@code finally} blocks,
     *       so an exception from one {@code stop()} no longer skips the rest.</li>
     * </ul>
     * The original cleanup order (table writer, sink writer, then DROP TABLE) is kept.
     *
     * @param clientStreamingQuerySuite suite instance whose Spark session is used
     * @param file                      temporary path used as the checkpoint location
     */
    public static final /* synthetic */ void $anonfun$new$7(ClientStreamingQuerySuite clientStreamingQuerySuite, File file) {
        StreamingQuery tableWriter = clientStreamingQuerySuite.spark().readStream().format("rate").load().writeStream().option("checkpointLocation", file.getCanonicalPath()).toTable("my_table");
        // Stays null until the second stream has actually started, so cleanup knows whether to stop it.
        StreamingQuery sinkWriter = null;
        try {
            sinkWriter = clientStreamingQuerySuite.spark().readStream().table("my_table").writeStream().format("memory").queryName("my_sink").start();
            tableWriter.processAllAvailable();
            sinkWriter.processAllAvailable();
            Eventually$.MODULE$.eventually(Futures$.MODULE$.timeout(SpanSugar$.MODULE$.convertIntToGrainOfTime(30).seconds()), () -> {
                long count = clientStreamingQuerySuite.spark().table("my_sink").count();
                return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToLong(count), ">", BoxesRunTime.boxToInteger(0), count > ((long) 0), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 142));
            }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 141));
        } finally {
            try {
                tableWriter.stop();
            } finally {
                try {
                    if (sinkWriter != null) {
                        sinkWriter.stop();
                    }
                } finally {
                    clientStreamingQuerySuite.spark().sql("DROP TABLE my_table");
                }
            }
        }
    }

    public ClientStreamingQuerySuite() {
        SQLHelper.$init$(this);
        Logging.$init$(this);
        test("Streaming API with windowed aggregate query", Nil$.MODULE$, () -> {
            this.withSQLConf(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("spark.sql.shuffle.partitions"), "1")}), () -> {
                boolean z;
                boolean z2;
                boolean z3;
                boolean z4;
                Dataset load = this.spark().readStream().format("rate").option("rowsPerSecond", "10").option("numPartitions", "1").load();
                String ddl = load.schema().toDDL();
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(ddl, "==", "timestamp TIMESTAMP,value BIGINT", ddl != null ? ddl.equals("timestamp TIMESTAMP,value BIGINT") : "timestamp TIMESTAMP,value BIGINT" == 0, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 53));
                Dataset selectExpr = load.withWatermark("timestamp", "10 seconds").groupBy(ScalaRunTime$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.window(functions$.MODULE$.col("timestamp"), "5 seconds")})).count().selectExpr(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"window.start as timestamp", "count as num_events"}));
                String ddl2 = selectExpr.schema().toDDL();
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(ddl2, "==", "timestamp TIMESTAMP,num_events BIGINT NOT NULL", ddl2 != null ? ddl2.equals("timestamp TIMESTAMP,num_events BIGINT NOT NULL") : "timestamp TIMESTAMP,num_events BIGINT NOT NULL" == 0, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 61));
                StreamingQuery start = selectExpr.writeStream().format("memory").queryName("sparkConnectStreamingQuery").trigger(Trigger.ProcessingTime("1 second")).start();
                try {
                    Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(start.isActive(), "query.isActive", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 74));
                    Eventually$.MODULE$.eventually(Futures$.MODULE$.timeout(SpanSugar$.MODULE$.convertIntToGrainOfTime(30).seconds()), () -> {
                        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(start.status().isDataAvailable(), "query.status.isDataAvailable", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 77));
                        Object refArrayOps = Predef$.MODULE$.refArrayOps(start.recentProgress());
                        return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(new ArrayOps(refArrayOps), "nonEmpty", ArrayOps$.MODULE$.nonEmpty$extension(refArrayOps), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 78));
                    }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 76));
                    StreamingQueryProgress lastProgress = start.lastProgress();
                    Bool$ bool$ = Bool$.MODULE$;
                    if (lastProgress == null) {
                        z = 0 != 0;
                    }
                    Assertions$.MODULE$.assertionsHelper().macroAssert(bool$.binaryMacroBool(lastProgress, "!=", (Object) null, z, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 82));
                    String name = lastProgress.name();
                    Bool$ bool$2 = Bool$.MODULE$;
                    if (name == null) {
                        z2 = "sparkConnectStreamingQuery" == 0;
                    }
                    Assertions$.MODULE$.assertionsHelper().macroAssert(bool$2.binaryMacroBool(name, "==", "sparkConnectStreamingQuery", z2, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 83));
                    Bool$ bool$3 = Bool$.MODULE$;
                    Map durationMs = lastProgress.durationMs();
                    Assertions$.MODULE$.assertionsHelper().macroAssert(bool$3.notBool(Bool$.MODULE$.unaryMacroBool(durationMs, "isEmpty", durationMs.isEmpty(), Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 84));
                    Bool$ bool$4 = Bool$.MODULE$;
                    Map eventTime = lastProgress.eventTime();
                    Assertions$.MODULE$.assertionsHelper().macroAssert(bool$4.notBool(Bool$.MODULE$.unaryMacroBool(eventTime, "isEmpty", eventTime.isEmpty(), Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 85));
                    Object refArrayOps = Predef$.MODULE$.refArrayOps(lastProgress.stateOperators());
                    Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(new ArrayOps(refArrayOps), "nonEmpty", ArrayOps$.MODULE$.nonEmpty$extension(refArrayOps), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 86));
                    Set set = (Set) JavaConverters$.MODULE$.asScalaSetConverter(((StateOperatorProgress) ArrayOps$.MODULE$.head$extension(Predef$.MODULE$.refArrayOps(lastProgress.stateOperators()))).customMetrics().keySet()).asScala();
                    scala.collection.immutable.Set set2 = (scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"loadedMapCacheHitCount", "loadedMapCacheMissCount", "stateOnCurrentVersionSizeBytes"}));
                    Bool$ bool$5 = Bool$.MODULE$;
                    if (set == null) {
                        z3 = set2 == null;
                    }
                    Assertions$.MODULE$.assertionsHelper().macroAssert(bool$5.binaryMacroBool(set, "==", set2, z3, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 87));
                    Object refArrayOps2 = Predef$.MODULE$.refArrayOps(lastProgress.sources());
                    Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(new ArrayOps(refArrayOps2), "nonEmpty", ArrayOps$.MODULE$.nonEmpty$extension(refArrayOps2), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 92));
                    String description = lastProgress.sink().description();
                    Bool$ bool$6 = Bool$.MODULE$;
                    if (description == null) {
                        z4 = "MemorySink" == 0;
                    }
                    Assertions$.MODULE$.assertionsHelper().macroAssert(bool$6.binaryMacroBool(description, "==", "MemorySink", z4, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 93));
                    Map observedMetrics = lastProgress.observedMetrics();
                    Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(observedMetrics, "isEmpty", observedMetrics.isEmpty(), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 94));
                    ArrayOps$.MODULE$.foreach$extension(Predef$.MODULE$.refArrayOps(start.recentProgress()), streamingQueryProgress -> {
                        UUID id = streamingQueryProgress.id();
                        UUID id2 = lastProgress.id();
                        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(id, "==", id2, id != null ? id.equals(id2) : id2 == null, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 97));
                        UUID runId = streamingQueryProgress.runId();
                        UUID runId2 = lastProgress.runId();
                        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(runId, "==", runId2, runId != null ? runId.equals(runId2) : runId2 == null, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 98));
                        String name2 = streamingQueryProgress.name();
                        String name3 = lastProgress.name();
                        return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(name2, "==", name3, name2 != null ? name2.equals(name3) : name3 == null, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 99));
                    });
                    start.explain();
                } finally {
                    start.stop();
                    Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.notBool(Bool$.MODULE$.simpleMacroBool(start.isActive(), "query.isActive", Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 111));
                    Object refArrayOps3 = Predef$.MODULE$.refArrayOps(start.recentProgress());
                    Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(new ArrayOps(refArrayOps3), "nonEmpty", ArrayOps$.MODULE$.nonEmpty$extension(refArrayOps3), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 112));
                }
            });
        }, new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 41));
        test("Streaming table API", Nil$.MODULE$, () -> {
            this.withSQLConf(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("spark.sql.shuffle.partitions"), "1")}), () -> {
                this.spark().sql("DROP TABLE IF EXISTS my_table").collect();
                this.withTempPath(file -> {
                    $anonfun$new$7(this, file);
                    return BoxedUnit.UNIT;
                });
            });
        }, new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 117));
        test("awaitTermination", Nil$.MODULE$, () -> {
            this.withSQLConf(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("spark.sql.shuffle.partitions"), "1")}), () -> {
                StreamingQuery start = this.spark().readStream().format("rate").load().writeStream().format("memory").queryName("test").start();
                long nanoTime = System.nanoTime();
                boolean awaitTermination = start.awaitTermination(500L);
                double nanoTime2 = (System.nanoTime() - nanoTime) / 1000000.0d;
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToDouble(nanoTime2), ">=", BoxesRunTime.boxToInteger(500), nanoTime2 >= ((double) 500), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 168));
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.notBool(Bool$.MODULE$.simpleMacroBool(awaitTermination, "terminated", Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 169));
                start.stop();
                Eventually$.MODULE$.eventually(Futures$.MODULE$.timeout(SpanSugar$.MODULE$.convertIntToGrainOfTime(1).minute()), () -> {
                    start.awaitTermination();
                }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 172));
            });
        }, new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 153));
        test("foreach Row", Nil$.MODULE$, () -> {
            StreamingQuery start = this.spark().readStream().format("rate").option("rowsPerSecond", "10").load().writeStream().foreach(new TestForeachWriter()).outputMode("update").start();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(start.isActive(), "query.isActive", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 191));
            Option exception = start.exception();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(exception, "isEmpty", exception.isEmpty(), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 192));
            start.stop();
        }, new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 178));
        test("foreach Int", Nil$.MODULE$, () -> {
            StreamingQuery start = this.spark().readStream().format("rate").option("rowsPerSecond", "10").load().selectExpr(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"CAST(value AS INT)"})).as(this.spark().implicits().newIntEncoder()).writeStream().foreach(new TestForeachWriter()).outputMode("update").start();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(start.isActive(), "query.isActive", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 216));
            Option exception = start.exception();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(exception, "isEmpty", exception.isEmpty(), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 217));
            start.stop();
        }, new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 197));
        test("foreach Custom class", Nil$.MODULE$, () -> {
            SparkSession spark = this.spark();
            final ClientStreamingQuerySuite clientStreamingQuerySuite = null;
            StreamingQuery start = this.spark().readStream().format("rate").option("rowsPerSecond", "10").load().selectExpr(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"CAST(value AS INT)"})).as(spark.implicits().newProductEncoder(package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(ClientStreamingQuerySuite.class.getClassLoader()), new TypeCreator(clientStreamingQuerySuite) { // from class: org.apache.spark.sql.streaming.ClientStreamingQuerySuite$$typecreator5$1
                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                    mirror.universe();
                    return mirror.staticClass("org.apache.spark.sql.streaming.TestClass").asType().toTypeConstructor();
                }
            }))).writeStream().foreach(new TestForeachWriter()).outputMode("update").start();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(start.isActive(), "query.isActive", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 240));
            Option exception = start.exception();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(exception, "isEmpty", exception.isEmpty(), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 241));
            start.stop();
        }, new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 222));
        test("streaming query manager", Nil$.MODULE$, () -> {
            Bool simpleMacroBool;
            Bool simpleMacroBool2;
            Bool simpleMacroBool3;
            Object refArrayOps = Predef$.MODULE$.refArrayOps(this.spark().streams().active());
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(new ArrayOps(refArrayOps), "isEmpty", ArrayOps$.MODULE$.isEmpty$extension(refArrayOps), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 247));
            StreamingQuery start = this.spark().readStream().format("rate").load().writeStream().format("console").start();
            String name = start.name();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(name, "==", (Object) null, name != null ? name.equals(null) : 0 == 0, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 255));
            StreamingQuery streamingQuery = this.spark().streams().get(start.id());
            StreamingQuery streamingQuery2 = this.spark().streams().active()[0];
            UUID id = start.id();
            UUID id2 = streamingQuery.id();
            Bool binaryMacroBool = Bool$.MODULE$.binaryMacroBool(id, "==", id2, id != null ? id.equals(id2) : id2 == null, Prettifier$.MODULE$.default());
            if (binaryMacroBool.value()) {
                UUID id3 = start.id();
                UUID id4 = streamingQuery2.id();
                simpleMacroBool = Bool$.MODULE$.binaryMacroBool(id3, "==", id4, id3 != null ? id3.equals(id4) : id4 == null, Prettifier$.MODULE$.default());
            } else {
                simpleMacroBool = Bool$.MODULE$.simpleMacroBool(false, "", Prettifier$.MODULE$.default());
            }
            Bool bool = simpleMacroBool;
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(binaryMacroBool, "&&", bool, binaryMacroBool.$amp$amp(() -> {
                return bool;
            }), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 258));
            UUID runId = start.runId();
            UUID runId2 = streamingQuery.runId();
            Bool binaryMacroBool2 = Bool$.MODULE$.binaryMacroBool(runId, "==", runId2, runId != null ? runId.equals(runId2) : runId2 == null, Prettifier$.MODULE$.default());
            if (binaryMacroBool2.value()) {
                UUID runId3 = start.runId();
                UUID runId4 = streamingQuery2.runId();
                simpleMacroBool2 = Bool$.MODULE$.binaryMacroBool(runId3, "==", runId4, runId3 != null ? runId3.equals(runId4) : runId4 == null, Prettifier$.MODULE$.default());
            } else {
                simpleMacroBool2 = Bool$.MODULE$.simpleMacroBool(false, "", Prettifier$.MODULE$.default());
            }
            Bool bool2 = simpleMacroBool2;
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(binaryMacroBool2, "&&", bool2, binaryMacroBool2.$amp$amp(() -> {
                return bool2;
            }), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 259));
            String name2 = streamingQuery.name();
            Bool binaryMacroBool3 = Bool$.MODULE$.binaryMacroBool(name2, "==", (Object) null, name2 != null ? name2.equals(null) : 0 == 0, Prettifier$.MODULE$.default());
            if (binaryMacroBool3.value()) {
                String name3 = streamingQuery2.name();
                simpleMacroBool3 = Bool$.MODULE$.binaryMacroBool(name3, "==", (Object) null, name3 != null ? name3.equals(null) : 0 == 0, Prettifier$.MODULE$.default());
            } else {
                simpleMacroBool3 = Bool$.MODULE$.simpleMacroBool(false, "", Prettifier$.MODULE$.default());
            }
            Bool bool3 = simpleMacroBool3;
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(binaryMacroBool3, "&&", bool3, binaryMacroBool3.$amp$amp(() -> {
                return bool3;
            }), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 260));
            this.spark().streams().resetTerminated();
            long nanoTime = System.nanoTime();
            boolean awaitAnyTermination = this.spark().streams().awaitAnyTermination(2600L);
            long nanoTime2 = System.nanoTime() - nanoTime;
            long nanos = TimeUnit.MILLISECONDS.toNanos(2000L);
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToLong(nanoTime2), ">=", BoxesRunTime.boxToLong(nanos), nanoTime2 >= nanos, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 267));
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.notBool(Bool$.MODULE$.simpleMacroBool(awaitAnyTermination, "terminated", Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 268));
            start.stop();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.notBool(Bool$.MODULE$.simpleMacroBool(streamingQuery.isActive(), "q1.isActive", Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 271));
            StreamingQuery streamingQuery3 = this.spark().streams().get(start.id());
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(streamingQuery3, "==", (Object) null, streamingQuery3 != null ? streamingQuery3.equals(null) : 0 == 0, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 273));
        }, new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 246));
        // Registers the "streaming query listener" test case.
        // It checks that a listener attached to the session's StreamingQueryManager is
        // counted by listListeners(), that a rate -> console query stays active while the
        // expected rows show up in `my_listener_table`, and that add/remove calls change
        // the listener count as asserted below.
        // NOTE(review): `my_listener_table` is presumably populated by EventCollector
        // (defined elsewhere) - confirm against its implementation.
        test("streaming query listener", Nil$.MODULE$, () -> {
            // No listener is expected to be registered when the test starts.
            StreamingQueryListener[] listListeners = this.spark().streams().listListeners();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(listListeners, "length", BoxesRunTime.boxToInteger(listListeners.length), BoxesRunTime.boxToInteger(0), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 277));
            EventCollector eventCollector = new EventCollector();
            this.spark().streams().addListener(eventCollector);
            StreamingQuery query = this.spark().readStream().format("rate").load().writeStream().format("console").start();
            try {
                query.processAllAvailable();
                // Retry for up to 30 seconds until the query is active and the table holds the expected rows.
                Eventually$.MODULE$.eventually(Futures$.MODULE$.timeout(SpanSugar$.MODULE$.convertIntToGrainOfTime(30).seconds()), () -> {
                    Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(query.isActive(), "q.isActive", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 292));
                    this.checkAnswer(() -> {
                        return this.spark().table("my_listener_table").toDF();
                    }, (Seq<Row>) scala.package$.MODULE$.Seq().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Row[]{Row$.MODULE$.apply(ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToInteger(2)})), Row$.MODULE$.apply(ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(4), BoxesRunTime.boxToInteger(5)}))})));
                }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 291));
            } finally {
                // Cleanup runs exactly once, whether or not the checks above succeeded:
                // stop the query and drop the table used by the test.
                query.stop();
                this.spark().sql("DROP TABLE IF EXISTS my_listener_table");
            }
            // The listener added above must still be the only one registered.
            StreamingQueryListener[] listListeners2 = this.spark().streams().listListeners();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(listListeners2, "length", BoxesRunTime.boxToInteger(listListeners2.length), BoxesRunTime.boxToInteger(1), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 302));
            // Adding a second, distinct listener raises the count to 2; removing it brings it back to 1.
            EventCollector eventCollector2 = new EventCollector();
            this.spark().streams().addListener(eventCollector2);
            StreamingQueryListener[] listListeners3 = this.spark().streams().listListeners();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(listListeners3, "length", BoxesRunTime.boxToInteger(listListeners3.length), BoxesRunTime.boxToInteger(2), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 307));
            this.spark().streams().removeListener(eventCollector2);
            StreamingQueryListener[] listListeners4 = this.spark().streams().listListeners();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(listListeners4, "length", BoxesRunTime.boxToInteger(listListeners4.length), BoxesRunTime.boxToInteger(1), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 309));
            // Adding the same listener instance a second time is expected to count as a separate registration (2).
            this.spark().streams().addListener(eventCollector);
            StreamingQueryListener[] listListeners5 = this.spark().streams().listListeners();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(listListeners5, "length", BoxesRunTime.boxToInteger(listListeners5.length), BoxesRunTime.boxToInteger(2), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 314));
            // Each removeListener call is expected to drop one registration: 2 -> 1 -> 0.
            this.spark().streams().removeListener(eventCollector);
            StreamingQueryListener[] listListeners6 = this.spark().streams().listListeners();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(listListeners6, "length", BoxesRunTime.boxToInteger(listListeners6.length), BoxesRunTime.boxToInteger(1), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 316));
            this.spark().streams().removeListener(eventCollector);
            StreamingQueryListener[] listListeners7 = this.spark().streams().listListeners();
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(listListeners7, "length", BoxesRunTime.boxToInteger(listListeners7.length), BoxesRunTime.boxToInteger(0), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 320));
        }, new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 276));
        // Registers the "foreachBatch" test case.
        // It starts a rate-source query (10 rows per second, 1 partition) whose sink is
        // foreachBatch(new ForeachBatchFn("test_view")), then waits until the query reports
        // progress with input rows and until `global_temp.test_view` returns at least one row.
        // NOTE(review): ForeachBatchFn is defined elsewhere; presumably it writes each batch
        // to the global temp view named by its argument - confirm.
        test("foreachBatch", Nil$.MODULE$, () -> {
            String str = "test_view";
            String sb = "global_temp." + str;
            // withTable is handed the fully qualified name for the duration of the body.
            this.withTable(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{sb}), () -> {
                StreamingQuery query = this.spark().readStream().format("rate").option("rowsPerSecond", "10").option("numPartitions", "1").load().writeStream().foreachBatch(new ForeachBatchFn(str)).start();
                try {
                    // Retry for up to 30 seconds until the query has a progress report with at least one input row.
                    Eventually$.MODULE$.eventually(Futures$.MODULE$.timeout(SpanSugar$.MODULE$.convertIntToGrainOfTime(30).seconds()), () -> {
                        StreamingQueryProgress lastProgress = query.lastProgress();
                        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(lastProgress, "!=", (Object) null, lastProgress != null && !lastProgress.equals(null), Prettifier$.MODULE$.default()), "Failed to make progress", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 341));
                        // lastProgress() is deliberately queried again, exactly as the original block did.
                        long numInputRows = query.lastProgress().numInputRows();
                        return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToLong(numInputRows), ">", BoxesRunTime.boxToInteger(0), numInputRows > 0L, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 342));
                    }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 340));
                    // Retry for up to 30 seconds until selecting from the table yields at least one row.
                    Eventually$.MODULE$.eventually(Futures$.MODULE$.timeout(SpanSugar$.MODULE$.convertIntToGrainOfTime(30).seconds()), () -> {
                        Seq<?> rows = ArrayOps$.MODULE$.toSeq$extension(Predef$.MODULE$.refArrayOps((Object[]) this.spark().sql("select * from " + sb).collect()));
                        int size = rows.size();
                        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToInteger(size), ">", BoxesRunTime.boxToInteger(0), size > 0, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 351));
                        this.logInfo(() -> {
                            return "Rows in " + sb + ": " + rows;
                        });
                    }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 345));
                } finally {
                    // Stop the query even when one of the checks above times out,
                    // so a failing run does not leave the streaming query running.
                    query.stop();
                }
            });
        }, new Position("ClientStreamingQuerySuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 323));
    }
}
