package org.apache.spark.sql.connect.planner;

import java.util.HashMap;
import org.apache.spark.connect.proto.ExecutePlanResponse;
import org.apache.spark.connect.proto.StreamingQueryListenerBusCommand;
import org.apache.spark.connect.proto.StreamingQueryListenerEventsResult;
import org.apache.spark.internal.LogEntry;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.connect.service.ExecuteHolder;
import org.apache.spark.sql.connect.service.ServerSideListenerHolder;
import org.apache.spark.sql.connect.service.SessionHolder;
import org.slf4j.Logger;
import org.sparkproject.connect.grpc.stub.StreamObserver;
import scala.Function0;
import scala.MatchError;
import scala.StringContext;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.control.NonFatal$;

/* compiled from: SparkConnectStreamingQueryListenerHandler.scala */
@ScalaSignature(bytes = "\u0006\u0005\u00014A\u0001C\u0005\u0001-!A1\u0005\u0001B\u0001B\u0003%A\u0005C\u0003+\u0001\u0011\u00051\u0006C\u00040\u0001\t\u0007I\u0011\u0001\u0019\t\rQ\u0002\u0001\u0015!\u00032\u0011\u0019)\u0004\u0001\"\u0001\fm!1!\t\u0001C\u0001\u0017YBQa\u0011\u0001\u0005\u0002\u0011\u0013\u0011f\u00159be.\u001cuN\u001c8fGR\u001cFO]3b[&tw-U;fefd\u0015n\u001d;f]\u0016\u0014\b*\u00198eY\u0016\u0014(B\u0001\u0006\f\u0003\u001d\u0001H.\u00198oKJT!\u0001D\u0007\u0002\u000f\r|gN\\3di*\u0011abD\u0001\u0004gFd'B\u0001\t\u0012\u0003\u0015\u0019\b/\u0019:l\u0015\t\u00112#\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002)\u0005\u0019qN]4\u0004\u0001M\u0019\u0001aF\u000f\u0011\u0005aYR\"A\r\u000b\u0003i\tQa]2bY\u0006L!\u0001H\r\u0003\r\u0005s\u0017PU3g!\tq\u0012%D\u0001 \u0015\t\u0001s\"\u0001\u0005j]R,'O\\1m\u0013\t\u0011sDA\u0004M_\u001e<\u0017N\\4\u0002\u001b\u0015DXmY;uK\"{G\u000eZ3s!\t)\u0003&D\u0001'\u0015\t93\"A\u0004tKJ4\u0018nY3\n\u0005%2#!D#yK\u000e,H/\u001a%pY\u0012,'/\u0001\u0004=S:LGO\u0010\u000b\u0003Y9\u0002\"!\f\u0001\u000e\u0003%AQa\t\u0002A\u0002\u0011\nQb]3tg&|g\u000eS8mI\u0016\u0014X#A\u0019\u0011\u0005\u0015\u0012\u0014BA\u001a'\u00055\u0019Vm]:j_:Du\u000e\u001c3fe\u0006q1/Z:tS>t\u0007j\u001c7eKJ\u0004\u0013AB;tKJLE-F\u00018!\tAtH\u0004\u0002:{A\u0011!(G\u0007\u0002w)\u0011A(F\u0001\u0007yI|w\u000e\u001e \n\u0005yJ\u0012A\u0002)sK\u0012,g-\u0003\u0002A\u0003\n11\u000b\u001e:j]\u001eT!AP\r\u0002\u0013M,7o]5p]&#\u0017!\u00065b]\u0012dW\rT5ti\u0016tWM]\"p[6\fg\u000e\u001a\u000b\u0004\u000b\"\u000b\u0006C\u0001\rG\u0013\t9\u0015D\u0001\u0003V]&$\b\"B%\b\u0001\u0004Q\u0015aB2p[6\fg\u000e\u001a\t\u0003\u0017>k\u0011\u0001\u0014\u0006\u0003\u001b:\u000bQ\u0001\u001d:pi>T!\u0001D\b\n\u0005Ac%\u0001I*ue\u0016\fW.\u001b8h#V,'/\u001f'jgR,g.\u001a:CkN\u001cu.\\7b]\u0012DQAU\u0004A\u0002M\u000b\u0001C]3ta>t7/Z(cg\u0016\u0014h/\u001a:\u0011\u0007Q[V,D\u0001V\u0015\t1v+\u0001\u0003tiV\u0014'B\u0001-Z\u0003\u00119'\u000f]2\u000b\u0003i\u000b!![8\n\u0005q+&AD*ue\u0016\fWn\u00142tKJ4XM\u001d\t\u0003\u0017zK!a\u0018'\u0003'\u0015CXmY;uKBc\u0017M\u001c*fgB|gn]3")
/* loaded from: input_file:org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.class */
public class SparkConnectStreamingQueryListenerHandler implements Logging {
    private final ExecuteHolder executeHolder;
    private final SessionHolder sessionHolder;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public Logging.LogStringContext LogStringContext(StringContext stringContext) {
        return Logging.LogStringContext$(this, stringContext);
    }

    public void withLogContext(HashMap<String, String> hashMap, Function0<BoxedUnit> function0) {
        Logging.withLogContext$(this, hashMap, function0);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logInfo(LogEntry logEntry) {
        Logging.logInfo$(this, logEntry);
    }

    public void logInfo(LogEntry logEntry, Throwable th) {
        Logging.logInfo$(this, logEntry, th);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logDebug(LogEntry logEntry) {
        Logging.logDebug$(this, logEntry);
    }

    public void logDebug(LogEntry logEntry, Throwable th) {
        Logging.logDebug$(this, logEntry, th);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logTrace(LogEntry logEntry) {
        Logging.logTrace$(this, logEntry);
    }

    public void logTrace(LogEntry logEntry, Throwable th) {
        Logging.logTrace$(this, logEntry, th);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logWarning(LogEntry logEntry) {
        Logging.logWarning$(this, logEntry);
    }

    public void logWarning(LogEntry logEntry, Throwable th) {
        Logging.logWarning$(this, logEntry, th);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logError(LogEntry logEntry) {
        Logging.logError$(this, logEntry);
    }

    public void logError(LogEntry logEntry, Throwable th) {
        Logging.logError$(this, logEntry, th);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    public SessionHolder sessionHolder() {
        return this.sessionHolder;
    }

    public String userId() {
        return sessionHolder().userId();
    }

    public String sessionId() {
        return sessionHolder().sessionId();
    }

    public void handleListenerCommand(StreamingQueryListenerBusCommand streamingQueryListenerBusCommand, StreamObserver<ExecutePlanResponse> streamObserver) {
        ServerSideListenerHolder streamingServersideListenerHolder = sessionHolder().streamingServersideListenerHolder();
        StreamingQueryListenerBusCommand.CommandCase commandCase = streamingQueryListenerBusCommand.getCommandCase();
        if (StreamingQueryListenerBusCommand.CommandCase.ADD_LISTENER_BUS_LISTENER.equals(commandCase)) {
            boolean isServerSideListenerRegistered = streamingServersideListenerHolder.isServerSideListenerRegistered();
            if (true == isServerSideListenerRegistered) {
                logWarning(() -> {
                    return "[SessionId: " + this.sessionId() + "][UserId: " + this.userId() + "][operationId: " + this.executeHolder.operationId() + "] Redundant server side listener added. Exiting.";
                });
                return;
            }
            if (false != isServerSideListenerRegistered) {
                throw new MatchError(BoxesRunTime.boxToBoolean(isServerSideListenerRegistered));
            }
            streamingServersideListenerHolder.init(streamObserver);
            try {
                streamObserver.onNext(ExecutePlanResponse.newBuilder().setSessionId(sessionHolder().sessionId()).setServerSideSessionId(sessionHolder().serverSessionId()).setStreamingQueryListenerEventsResult(StreamingQueryListenerEventsResult.newBuilder().setListenerBusListenerAdded(true).build()).build());
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                logInfo(() -> {
                    return "[SessionId: " + this.sessionId() + "][UserId: " + this.userId() + "][operationId: " + this.executeHolder.operationId() + "] Server side listener added. Now blocking until all client side listeners are removed or there is error transmitting the event back.";
                });
                streamingServersideListenerHolder.streamingQueryListenerLatch().await();
                logInfo(() -> {
                    return "[SessionId: " + this.sessionId() + "][UserId: " + this.userId() + "][operationId: " + this.executeHolder.operationId() + "] Server side listener long-running handling thread ended.";
                });
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            } catch (Throwable th) {
                if (!NonFatal$.MODULE$.apply(th)) {
                    throw th;
                }
                logError(() -> {
                    return "[SessionId: " + this.sessionId() + "][UserId: " + this.userId() + "][operationId: " + this.executeHolder.operationId() + "] Error sending listener added response.";
                }, th);
                streamingServersideListenerHolder.cleanUp();
                return;
            }
        } else {
            if (!StreamingQueryListenerBusCommand.CommandCase.REMOVE_LISTENER_BUS_LISTENER.equals(commandCase)) {
                if (!StreamingQueryListenerBusCommand.CommandCase.COMMAND_NOT_SET.equals(commandCase)) {
                    throw new MatchError(commandCase);
                }
                throw new IllegalArgumentException("Missing command in StreamingQueryListenerBusCommand");
            }
            boolean isServerSideListenerRegistered2 = streamingServersideListenerHolder.isServerSideListenerRegistered();
            if (true != isServerSideListenerRegistered2) {
                if (false != isServerSideListenerRegistered2) {
                    throw new MatchError(BoxesRunTime.boxToBoolean(isServerSideListenerRegistered2));
                }
                logWarning(() -> {
                    return "[SessionId: " + this.sessionId() + "][UserId: " + this.userId() + "][operationId: " + this.executeHolder.operationId() + "] No active server side listener bus listener but received remove listener call. Exiting.";
                });
                return;
            } else {
                sessionHolder().streamingServersideListenerHolder().cleanUp();
                BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
            }
        }
        this.executeHolder.eventsManager().postFinished(this.executeHolder.eventsManager().postFinished$default$1(), this.executeHolder.eventsManager().postFinished$default$2());
    }

    public SparkConnectStreamingQueryListenerHandler(ExecuteHolder executeHolder) {
        this.executeHolder = executeHolder;
        Logging.$init$(this);
        this.sessionHolder = executeHolder.sessionHolder();
    }
}
