package org.apache.spark.sql.connect.service;

import java.util.HashMap;
import org.apache.spark.connect.proto.ExecutePlanResponse;
import org.apache.spark.connect.proto.StreamingQueryEventType;
import org.apache.spark.connect.proto.StreamingQueryListenerEvent;
import org.apache.spark.connect.proto.StreamingQueryListenerEventsResult;
import org.apache.spark.internal.LogEntry;
import org.apache.spark.internal.LogEntry$;
import org.apache.spark.internal.LogKeys$EXCEPTION$;
import org.apache.spark.internal.LogKeys$QUERY_ID$;
import org.apache.spark.internal.LogKeys$QUERY_RUN_ID$;
import org.apache.spark.internal.LogKeys$SESSION_ID$;
import org.apache.spark.internal.LogKeys$USER_ID$;
import org.apache.spark.internal.Logging;
import org.apache.spark.internal.MDC;
import org.apache.spark.sql.connect.execution.ExecuteResponseObserver;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.apache.spark.util.ArrayImplicits$;
import org.slf4j.Logger;
import org.sparkproject.connect.grpc.stub.StreamObserver;
import scala.Function0;
import scala.StringContext;
import scala.collection.immutable.Nil$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.ScalaRunTime$;
import scala.util.control.NonFatal$;

/* compiled from: SparkConnectListenerBusListener.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005\u0015a!\u0002\u0007\u000e\u0001EI\u0002\u0002\u0003\u0014\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0015\t\u00111\u0002!\u0011!Q\u0001\n5BQA\u0010\u0001\u0005\u0002}Bqa\u0011\u0001C\u0002\u0013\u0005A\t\u0003\u0004I\u0001\u0001\u0006I!\u0012\u0005\u0006\u0013\u0002!IA\u0013\u0005\u0006G\u0002!\t\u0001\u001a\u0005\u0006K\u0002!\tE\u001a\u0005\u0006a\u0002!\t%\u001d\u0005\u0006m\u0002!\te\u001e\u0005\u0006y\u0002!\t% \u0002 'B\f'o[\"p]:,7\r\u001e'jgR,g.\u001a:CkNd\u0015n\u001d;f]\u0016\u0014(B\u0001\b\u0010\u0003\u001d\u0019XM\u001d<jG\u0016T!\u0001E\t\u0002\u000f\r|gN\\3di*\u0011!cE\u0001\u0004gFd'B\u0001\u000b\u0016\u0003\u0015\u0019\b/\u0019:l\u0015\t1r#\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u00021\u0005\u0019qN]4\u0014\u0007\u0001Q\u0002\u0005\u0005\u0002\u001c=5\tAD\u0003\u0002\u001e#\u0005I1\u000f\u001e:fC6LgnZ\u0005\u0003?q\u0011ac\u0015;sK\u0006l\u0017N\\4Rk\u0016\u0014\u0018\u0010T5ti\u0016tWM\u001d\t\u0003C\u0011j\u0011A\t\u0006\u0003GM\t\u0001\"\u001b8uKJt\u0017\r\\\u0005\u0003K\t\u0012q\u0001T8hO&tw-\u0001\rtKJ4XM]*jI\u0016d\u0015n\u001d;f]\u0016\u0014\bj\u001c7eKJ\u001c\u0001\u0001\u0005\u0002*U5\tQ\"\u0003\u0002,\u001b\tA2+\u001a:wKJ\u001c\u0016\u000eZ3MSN$XM\\3s\u0011>dG-\u001a:\u0002!I,7\u000f]8og\u0016|%m]3sm\u0016\u0014\bc\u0001\u00186o5\tqF\u0003\u00021c\u0005!1\u000f^;c\u0015\t\u00114'\u0001\u0003heB\u001c'\"\u0001\u001b\u0002\u0005%|\u0017B\u0001\u001c0\u00059\u0019FO]3b[>\u00137/\u001a:wKJ\u0004\"\u0001\u000f\u001f\u000e\u0003eR!AO\u001e\u0002\u000bA\u0014x\u000e^8\u000b\u0005A\u0019\u0012BA\u001f:\u0005M)\u00050Z2vi\u0016\u0004F.\u00198SKN\u0004xN\\:f\u0003\u0019a\u0014N\\5u}Q\u0019\u0001)\u0011\"\u0011\u0005%\u0002\u0001\"\u0002\u0014\u0004\u0001\u0004A\u0003\"\u0002\u0017\u0004\u0001\u0004i\u0013!D:fgNLwN\u001c%pY\u0012,'/F\u0001F!\tIc)\u0003\u0002H\u001b\ti1+Z:tS>t\u0007j\u001c7eKJ\fab]3tg&|g\u000eS8mI\u0016\u0014\b%\u0001\u0003tK:$GcA&R=B\u0011AjT\u0007\u0002\u001b*\ta*A\u0003tG\u0006d\u0017-\u0003\u0002Q\u001b\n!QK\\5u\u0011\u0015\u0011f\u00011\u0001T\u0003%)g/\u001a8u\u0015N|g\u000e\u0005\u0002U7:\u0011Q+\u0017\t\u0003-6k\u0011a\u0016\u0006\u00031\u001e\na\u0001\u0010:p_Rt\u0014B\u0001.N\u0003\u0019\u0001&/\u001a3fM&\u0011A,\u0018\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005ik\u0005\"B0\u0007\u0001\u0004\u0001\u0017!C3wK:$H+\u001f9f!\tA\u0014-\u0003\u0002cs\t92\u000b\u001e:fC6LgnZ)vKJLXI^3oiRK\b/Z\u0001\u0013g\u0016tGMU3tk2$8i\\7qY\u0016$X\rF\u0001L\u00039yg.U;fef\u001cF/\u0019:uK\u0012$\"aS4\t\u000b!D\u0001\u0019A5\u0002\u000b\u00154XM\u001c;\u0011\u0005)lgBA\u000el\u0013\taG$\u0001\fTiJ,\u0017-\\5oOF+XM]=MSN$XM\\3s\u0013\tqwNA\tRk\u0016\u0014\u0018p\u0015;beR,G-\u0012<f]RT!\u0001\u001c\u000f\u0002\u001f=t\u0017+^3ssB\u0013xn\u001a:fgN$\"a\u0013:\t\u000b!L\u0001\u0019A:\u0011\u0005)$\u0018BA;p\u0005I\tV/\u001a:z!J|wM]3tg\u00163XM\u001c;\u0002#=t\u0017+^3ssR+'/\\5oCR,G\r\u0006\u0002Lq\")\u0001N\u0003a\u0001sB\u0011!N_\u0005\u0003w>\u0014A#U;fef$VM]7j]\u0006$X\rZ#wK:$\u0018aC8o#V,'/_%eY\u0016$\"a\u0013@\t\u000b!\\\u0001\u0019A@\u0011\u0007)\f\t!C\u0002\u0002\u0004=\u0014a\"U;fefLE\r\\3Fm\u0016tG\u000f")
/* loaded from: input_file:org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.class */
public class SparkConnectListenerBusListener extends StreamingQueryListener implements Logging {
    private final ServerSideListenerHolder serverSideListenerHolder;
    private final StreamObserver<ExecutePlanResponse> responseObserver;
    private final SessionHolder sessionHolder;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public Logging.LogStringContext LogStringContext(StringContext stringContext) {
        return Logging.LogStringContext$(this, stringContext);
    }

    public void withLogContext(HashMap<String, String> hashMap, Function0<BoxedUnit> function0) {
        Logging.withLogContext$(this, hashMap, function0);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logInfo(LogEntry logEntry) {
        Logging.logInfo$(this, logEntry);
    }

    public void logInfo(LogEntry logEntry, Throwable th) {
        Logging.logInfo$(this, logEntry, th);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logDebug(LogEntry logEntry) {
        Logging.logDebug$(this, logEntry);
    }

    public void logDebug(LogEntry logEntry, Throwable th) {
        Logging.logDebug$(this, logEntry, th);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logTrace(LogEntry logEntry) {
        Logging.logTrace$(this, logEntry);
    }

    public void logTrace(LogEntry logEntry, Throwable th) {
        Logging.logTrace$(this, logEntry, th);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logWarning(LogEntry logEntry) {
        Logging.logWarning$(this, logEntry);
    }

    public void logWarning(LogEntry logEntry, Throwable th) {
        Logging.logWarning$(this, logEntry, th);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logError(LogEntry logEntry) {
        Logging.logError$(this, logEntry);
    }

    public void logError(LogEntry logEntry, Throwable th) {
        Logging.logError$(this, logEntry, th);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    public SessionHolder sessionHolder() {
        return this.sessionHolder;
    }

    private void send(String str, StreamingQueryEventType streamingQueryEventType) {
        try {
            this.responseObserver.onNext(ExecutePlanResponse.newBuilder().setSessionId(sessionHolder().sessionId()).setServerSideSessionId(sessionHolder().serverSessionId()).setStreamingQueryListenerEventsResult(StreamingQueryListenerEventsResult.newBuilder().addAllEvents(CollectionConverters$.MODULE$.SeqHasAsJava(ArrayImplicits$.MODULE$.SparkArrayOps(new StreamingQueryListenerEvent[]{StreamingQueryListenerEvent.newBuilder().setEventJson(str).setEventType(streamingQueryEventType).build()}).toImmutableArraySeq()).asJava()).build()).build());
        } catch (Throwable th) {
            if (!NonFatal$.MODULE$.apply(th)) {
                throw th;
            }
            logError(LogEntry$.MODULE$.from(() -> {
                return this.LogStringContext(new StringContext(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"[SessionId: ", "]"}))).log(ScalaRunTime$.MODULE$.wrapRefArray(new MDC[]{new MDC(LogKeys$SESSION_ID$.MODULE$, this.sessionHolder().sessionId())})).$plus(this.LogStringContext(new StringContext(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"[UserId: ", "] "}))).log(ScalaRunTime$.MODULE$.wrapRefArray(new MDC[]{new MDC(LogKeys$USER_ID$.MODULE$, this.sessionHolder().userId())}))).$plus(this.LogStringContext(new StringContext(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"Removing SparkConnectListenerBusListener and terminating the long-running thread "}))).log(Nil$.MODULE$)).$plus(this.LogStringContext(new StringContext(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"because of exception: ", ""}))).log(ScalaRunTime$.MODULE$.wrapRefArray(new MDC[]{new MDC(LogKeys$EXCEPTION$.MODULE$, th)})));
            }));
            this.serverSideListenerHolder.cleanUp();
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
    }

    public void sendResultComplete() {
        ((ExecuteResponseObserver) this.responseObserver).onNextComplete(ExecutePlanResponse.newBuilder().setResultComplete(ExecutePlanResponse.ResultComplete.newBuilder().build()).build());
    }

    public void onQueryStarted(StreamingQueryListener.QueryStartedEvent queryStartedEvent) {
        this.serverSideListenerHolder.streamingQueryStartedEventCache().put(queryStartedEvent.runId().toString(), queryStartedEvent);
    }

    public void onQueryProgress(StreamingQueryListener.QueryProgressEvent queryProgressEvent) {
        logDebug(() -> {
            return "[SessionId: " + this.sessionHolder().sessionId() + "][UserId: " + this.sessionHolder().userId() + "] Sending QueryProgressEvent to client, id: " + queryProgressEvent.progress().id() + " runId: " + queryProgressEvent.progress().runId() + ", batch: " + queryProgressEvent.progress().batchId() + ".";
        });
        send(queryProgressEvent.json(), StreamingQueryEventType.QUERY_PROGRESS_EVENT);
    }

    public void onQueryTerminated(StreamingQueryListener.QueryTerminatedEvent queryTerminatedEvent) {
        logInfo(LogEntry$.MODULE$.from(() -> {
            return this.LogStringContext(new StringContext(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"[SessionId: ", "]"}))).log(ScalaRunTime$.MODULE$.wrapRefArray(new MDC[]{new MDC(LogKeys$SESSION_ID$.MODULE$, this.sessionHolder().sessionId())})).$plus(this.LogStringContext(new StringContext(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"[UserId: ", "] "}))).log(ScalaRunTime$.MODULE$.wrapRefArray(new MDC[]{new MDC(LogKeys$USER_ID$.MODULE$, this.sessionHolder().userId())}))).$plus(this.LogStringContext(new StringContext(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"Sending QueryTerminatedEvent to client, id: ", " "}))).log(ScalaRunTime$.MODULE$.wrapRefArray(new MDC[]{new MDC(LogKeys$QUERY_ID$.MODULE$, queryTerminatedEvent.id())}))).$plus(this.LogStringContext(new StringContext(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"runId: ", "."}))).log(ScalaRunTime$.MODULE$.wrapRefArray(new MDC[]{new MDC(LogKeys$QUERY_RUN_ID$.MODULE$, queryTerminatedEvent.runId())})));
        }));
        send(queryTerminatedEvent.json(), StreamingQueryEventType.QUERY_TERMINATED_EVENT);
    }

    public void onQueryIdle(StreamingQueryListener.QueryIdleEvent queryIdleEvent) {
        logDebug(() -> {
            return "[SessionId: " + this.sessionHolder().sessionId() + "][UserId: " + this.sessionHolder().userId() + "] Sending QueryIdleEvent to client, id: " + queryIdleEvent.id() + " runId: " + queryIdleEvent.runId() + ".";
        });
        send(queryIdleEvent.json(), StreamingQueryEventType.QUERY_IDLE_EVENT);
    }

    public SparkConnectListenerBusListener(ServerSideListenerHolder serverSideListenerHolder, StreamObserver<ExecutePlanResponse> streamObserver) {
        this.serverSideListenerHolder = serverSideListenerHolder;
        this.responseObserver = streamObserver;
        Logging.$init$(this);
        this.sessionHolder = serverSideListenerHolder.sessionHolder();
    }
}
