package org.apache.spark.sql.connect.service;

import java.util.UUID;
import org.apache.spark.SparkFunSuite;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connect.service.SparkConnectStreamingQueryCache;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryManager;
import org.apache.spark.util.ManualClock;
import org.mockito.MockSettings;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.scalactic.Bool$;
import org.scalactic.Prettifier$;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import org.scalatest.concurrent.Eventually$;
import org.scalatest.concurrent.Futures$;
import org.scalatest.enablers.Retrying$;
import org.scalatest.time.Span$;
import org.scalatestplus.mockito.MockitoSugar;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.collection.immutable.Nil$;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: SparkConnectStreamingQueryCacheSuite.scala */
@ScalaSignature(bytes = "\u0006\u0005\u001d2Aa\u0001\u0003\u0001#!)a\u0004\u0001C\u0001?!)!\u0005\u0001C\u0005G\t!3\u000b]1sW\u000e{gN\\3diN#(/Z1nS:<\u0017+^3ss\u000e\u000b7\r[3Tk&$XM\u0003\u0002\u0006\r\u000591/\u001a:wS\u000e,'BA\u0004\t\u0003\u001d\u0019wN\u001c8fGRT!!\u0003\u0006\u0002\u0007M\fHN\u0003\u0002\f\u0019\u0005)1\u000f]1sW*\u0011QBD\u0001\u0007CB\f7\r[3\u000b\u0003=\t1a\u001c:h\u0007\u0001\u00192\u0001\u0001\n\u0017!\t\u0019B#D\u0001\u000b\u0013\t)\"BA\u0007Ta\u0006\u00148NR;o'VLG/\u001a\t\u0003/qi\u0011\u0001\u0007\u0006\u00033i\tq!\\8dW&$xN\u0003\u0002\u001c\u001d\u0005i1oY1mCR,7\u000f\u001e9mkNL!!\b\r\u0003\u00195{7m[5u_N+x-\u0019:\u0002\rqJg.\u001b;?)\u0005\u0001\u0003CA\u0011\u0001\u001b\u0005!\u0011\u0001F2sK\u0006$XmU3tg&|g.T1oC\u001e,'\u000fF\u0001%!\t\tS%\u0003\u0002'\t\ty2\u000b]1sW\u000e{gN\\3diN#(/Z1nS:<\u0017+^3ss\u000e\u000b7\r[3")
/* loaded from: input_file:org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCacheSuite.class */
public class SparkConnectStreamingQueryCacheSuite extends SparkFunSuite implements MockitoSugar {
    /**
     * Forwarder for the {@link MockitoSugar} mixin: hands this suite and the
     * given class tag to the static {@code MockitoSugar.mock$} implementation.
     *
     * @param classTag runtime class descriptor of the type to be mocked
     * @param <T> type of the returned object
     * @return whatever {@code MockitoSugar.mock$} produces, cast to {@code T}
     */
    public <T> T mock(ClassTag<T> classTag) {
        final T mocked = (T) MockitoSugar.mock$(this, classTag);
        return mocked;
    }

    /**
     * Forwarder for the {@link MockitoSugar} mixin: hands this suite, the
     * supplied {@link Answer} and the class tag to the static
     * {@code MockitoSugar.mock$} implementation.
     *
     * @param answer answer object passed through unchanged to {@code mock$}
     * @param classTag runtime class descriptor of the type to be mocked
     * @param <T> type of the returned object
     * @return whatever {@code MockitoSugar.mock$} produces, cast to {@code T}
     */
    public <T> T mock(Answer<?> answer, ClassTag<T> classTag) {
        final T mocked = (T) MockitoSugar.mock$(this, answer, classTag);
        return mocked;
    }

    /**
     * Forwarder for the {@link MockitoSugar} mixin: hands this suite, the
     * supplied {@link MockSettings} and the class tag to the static
     * {@code MockitoSugar.mock$} implementation.
     *
     * @param mockSettings settings object passed through unchanged to {@code mock$}
     * @param classTag runtime class descriptor of the type to be mocked
     * @param <T> type of the returned object
     * @return whatever {@code MockitoSugar.mock$} produces, cast to {@code T}
     */
    public <T> T mock(MockSettings mockSettings, ClassTag<T> classTag) {
        final T mocked = (T) MockitoSugar.mock$(this, mockSettings, classTag);
        return mocked;
    }

    /**
     * Forwarder for the {@link MockitoSugar} mixin: hands this suite, the
     * supplied string and the class tag to the static
     * {@code MockitoSugar.mock$} implementation.
     *
     * @param str string passed through unchanged to {@code mock$}
     *            (presumably the mock's name; confirm against MockitoSugar)
     * @param classTag runtime class descriptor of the type to be mocked
     * @param <T> type of the returned object
     * @return whatever {@code MockitoSugar.mock$} produces, cast to {@code T}
     */
    public <T> T mock(String str, ClassTag<T> classTag) {
        final T mocked = (T) MockitoSugar.mock$(this, str, classTag);
        return mocked;
    }

    private SparkConnectStreamingQueryCache createSessionManager() {
        return new SparkConnectStreamingQueryCache(new ManualClock(), new package.DurationInt(package$.MODULE$.DurationInt(1)).minute(), new package.DurationInt(package$.MODULE$.DurationInt(20)).milliseconds());
    }

    /**
     * Initializes the MockitoSugar mixin and registers the suite's single test,
     * "Session cache functionality with a streaming query".
     *
     * The test body drives a mocked streaming query through the cache:
     * registration, lookup by (query id, run id), lookup with a different
     * session, deactivation and expiry, expiry extension on access, registering
     * a second run under the same query id, and removal of both runs after the
     * manual clock has been advanced.
     *
     * The line numbers passed to each {@code Position} refer to the original
     * SparkConnectStreamingQueryCacheSuite.scala, not to this file.
     */
    public SparkConnectStreamingQueryCacheSuite() {
        MockitoSugar.$init$(this);
        test("Session cache functionality with a streaming query", Nil$.MODULE$, () -> {
            // uuid is used as the query id and uuid2 as the run id of the first run.
            String uuid = UUID.randomUUID().toString();
            String uuid2 = UUID.randomUUID().toString();
            SparkSession sparkSession = (SparkSession) this.mock(ClassTag$.MODULE$.apply(SparkSession.class));
            StreamingQuery streamingQuery = (StreamingQuery) this.mock(ClassTag$.MODULE$.apply(StreamingQuery.class));
            StreamingQueryManager streamingQueryManager = (StreamingQueryManager) this.mock(ClassTag$.MODULE$.apply(StreamingQueryManager.class));
            SessionHolder sessionHolder = new SessionHolder("test_user_1", "test_session_1", sparkSession);
            SparkConnectStreamingQueryCache createSessionManager = this.createSessionManager();
            // Clock owned by the cache; it is moved forward explicitly with clock.advance(...) below.
            ManualClock clock = createSessionManager.clock();
            // Stub the mocks: the query reports its ids and is active; the session's
            // query manager returns the query when asked for its id.
            Mockito.when(streamingQuery.id()).thenReturn(UUID.fromString(uuid));
            Mockito.when(streamingQuery.runId()).thenReturn(UUID.fromString(uuid2));
            Mockito.when(BoxesRunTime.boxToBoolean(streamingQuery.isActive())).thenReturn(BoxesRunTime.boxToBoolean(true));
            Mockito.when(sparkSession.streams()).thenReturn(streamingQueryManager);
            Mockito.when(streamingQueryManager.get(uuid)).thenReturn(streamingQuery);
            // Register the query, then check the cached entry for (query id, run id).
            createSessionManager.registerNewStreamingQuery(sessionHolder, streamingQuery);
            // NOTE(review): declared as Some although the else branch compares against None;
            // this looks like a decompiler typing artifact and getCachedValue presumably
            // returns Option. Confirm before relying on the declared type.
            Some cachedValue = createSessionManager.getCachedValue(uuid, uuid2);
            if (cachedValue instanceof Some) {
                // Entry found: it must carry the holder's session id and no expiry time yet.
                SparkConnectStreamingQueryCache.QueryCacheValue queryCacheValue = (SparkConnectStreamingQueryCache.QueryCacheValue) cachedValue.value();
                String sessionId = queryCacheValue.sessionId();
                String sessionId2 = sessionHolder.sessionId();
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(sessionId, "==", sessionId2, sessionId != null ? sessionId.equals(sessionId2) : sessionId2 == null, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("SparkConnectStreamingQueryCacheSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 74));
                Option expiresAtMs = queryCacheValue.expiresAtMs();
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(expiresAtMs, "isEmpty", expiresAtMs.isEmpty(), Prettifier$.MODULE$.default()), "No expiry time should be set for active query", Prettifier$.MODULE$.default(), new Position("SparkConnectStreamingQueryCacheSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 75));
            } else {
                // Anything that is neither Some nor None is a pattern-match failure.
                if (!None$.MODULE$.equals(cachedValue)) {
                    throw new MatchError(cachedValue);
                }
                // None: asserting on a constant false fails the test with "Query should be found".
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(false, "", Prettifier$.MODULE$.default()), "Query should be found", Prettifier$.MODULE$.default(), new Position("SparkConnectStreamingQueryCacheSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 77));
            }
            // Looking the query up with a different (freshly mocked) session must return nothing.
            Option cachedQuery = createSessionManager.getCachedQuery(uuid, uuid2, (SparkSession) this.mock(ClassTag$.MODULE$.apply(SparkSession.class)));
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(cachedQuery, "isEmpty", cachedQuery.isEmpty(), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("SparkConnectStreamingQueryCacheSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 81));
            // Looking it up with the session it was registered under must return the query.
            Option cachedQuery2 = createSessionManager.getCachedQuery(uuid, uuid2, sparkSession);
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(cachedQuery2, "contains", streamingQuery, cachedQuery2.contains(streamingQuery), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("SparkConnectStreamingQueryCacheSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 83));
            // Make the query report inactive; the expected expiry is the current clock time plus 1 minute.
            Mockito.when(BoxesRunTime.boxToBoolean(streamingQuery.isActive())).thenReturn(BoxesRunTime.boxToBoolean(false));
            long timeMillis = createSessionManager.clock().getTimeMillis() + new package.DurationInt(package$.MODULE$.DurationInt(1)).minute().toMillis();
            // Wait (timeout 1 minute) until the cached entry's expiresAtMs equals the expected value.
            Eventually$.MODULE$.eventually(Futures$.MODULE$.timeout(Span$.MODULE$.convertDurationToSpan(new package.DurationInt(package$.MODULE$.DurationInt(1)).minute())), () -> {
                Option flatMap = createSessionManager.getCachedValue(uuid, uuid2).flatMap(queryCacheValue2 -> {
                    return queryCacheValue2.expiresAtMs();
                });
                return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(flatMap, "contains", BoxesRunTime.boxToLong(timeMillis), flatMap.contains(BoxesRunTime.boxToLong(timeMillis)), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("SparkConnectStreamingQueryCacheSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 93));
            }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("SparkConnectStreamingQueryCacheSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 91));
            // Remember the current expiry, advance the clock by 30 seconds and access the query again;
            // the expiry is then expected to have moved forward by those same 30 seconds.
            long unboxToLong = BoxesRunTime.unboxToLong(((SparkConnectStreamingQueryCache.QueryCacheValue) createSessionManager.getCachedValue(uuid, uuid2).get()).expiresAtMs().get());
            clock.advance(new package.DurationInt(package$.MODULE$.DurationInt(30)).seconds().toMillis());
            Option cachedQuery3 = createSessionManager.getCachedQuery(uuid, uuid2, sparkSession);
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(cachedQuery3, "contains", streamingQuery, cachedQuery3.contains(streamingQuery), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("SparkConnectStreamingQueryCacheSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 102));
            long unboxToLong2 = BoxesRunTime.unboxToLong(((SparkConnectStreamingQueryCache.QueryCacheValue) createSessionManager.getCachedValue(uuid, uuid2).get()).expiresAtMs().get());
            long millis = unboxToLong + new package.DurationInt(package$.MODULE$.DurationInt(30)).seconds().toMillis();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToLong(unboxToLong2), "==", BoxesRunTime.boxToLong(millis), unboxToLong2 == millis, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("SparkConnectStreamingQueryCacheSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 104));
            // Register a second, active mock query with the same query id but a new run id (uuid3).
            String uuid3 = UUID.randomUUID().toString();
            StreamingQuery streamingQuery2 = (StreamingQuery) this.mock(ClassTag$.MODULE$.apply(StreamingQuery.class));
            Mockito.when(streamingQuery2.id()).thenReturn(UUID.fromString(uuid));
            Mockito.when(streamingQuery2.runId()).thenReturn(UUID.fromString(uuid3));
            Mockito.when(BoxesRunTime.boxToBoolean(streamingQuery2.isActive())).thenReturn(BoxesRunTime.boxToBoolean(true));
            Mockito.when(streamingQueryManager.get(uuid)).thenReturn(streamingQuery2);
            createSessionManager.registerNewStreamingQuery(sessionHolder, streamingQuery2);
            // Both runs must be cached side by side, each under its own run id.
            Option map = createSessionManager.getCachedValue(uuid, uuid2).map(queryCacheValue2 -> {
                return queryCacheValue2.query();
            });
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(map, "contains", streamingQuery, map.contains(streamingQuery), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("SparkConnectStreamingQueryCacheSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 118));
            Option map2 = createSessionManager.getCachedValue(uuid, uuid3).map(queryCacheValue3 -> {
                return queryCacheValue3.query();
            });
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(map2, "contains", streamingQuery2, map2.contains(streamingQuery2), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("SparkConnectStreamingQueryCacheSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 119));
            // Advance the clock by 1 minute and wait until the first run's entry is gone from the cache.
            clock.advance(new package.DurationInt(package$.MODULE$.DurationInt(1)).minute().toMillis());
            Eventually$.MODULE$.eventually(Futures$.MODULE$.timeout(Span$.MODULE$.convertDurationToSpan(new package.DurationInt(package$.MODULE$.DurationInt(1)).minute())), () -> {
                Option cachedValue2 = createSessionManager.getCachedValue(uuid, uuid2);
                return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(cachedValue2, "isEmpty", cachedValue2.isEmpty(), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("SparkConnectStreamingQueryCacheSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 125));
            }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("SparkConnectStreamingQueryCacheSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 124));
            // Make the second run report inactive and wait until its entry has an expiry time set.
            Mockito.when(BoxesRunTime.boxToBoolean(streamingQuery2.isActive())).thenReturn(BoxesRunTime.boxToBoolean(false));
            Eventually$.MODULE$.eventually(Futures$.MODULE$.timeout(Span$.MODULE$.convertDurationToSpan(new package.DurationInt(package$.MODULE$.DurationInt(1)).minute())), () -> {
                Option flatMap = createSessionManager.getCachedValue(uuid, uuid3).flatMap(queryCacheValue4 -> {
                    return queryCacheValue4.expiresAtMs();
                });
                return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(flatMap, "nonEmpty", flatMap.nonEmpty(), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("SparkConnectStreamingQueryCacheSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 131));
            }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("SparkConnectStreamingQueryCacheSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 130));
            // Advance the clock by another minute and wait until the second run's entry is gone as well.
            clock.advance(new package.DurationInt(package$.MODULE$.DurationInt(1)).minute().toMillis());
            Eventually$.MODULE$.eventually(Futures$.MODULE$.timeout(Span$.MODULE$.convertDurationToSpan(new package.DurationInt(package$.MODULE$.DurationInt(1)).minute())), () -> {
                Option cachedValue2 = createSessionManager.getCachedValue(uuid, uuid3);
                return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(cachedValue2, "isEmpty", cachedValue2.isEmpty(), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("SparkConnectStreamingQueryCacheSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 137));
            }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("SparkConnectStreamingQueryCacheSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 136));
            // Shut the cache down at the end of the test (presumably stops its background checker; confirm).
            createSessionManager.shutdown();
        }, new Position("SparkConnectStreamingQueryCacheSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 46));
    }
}
