package za.co.absa.spline.core.listener;

import org.apache.spark.sql.FileSinkObj$;
import org.apache.spark.sql.execution.datasources.FileFormat;
import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
import org.apache.spark.sql.execution.streaming.ForeachSink;
import org.apache.spark.sql.execution.streaming.MemorySink;
import org.apache.spark.sql.execution.streaming.Sink;
import org.apache.spark.sql.execution.streaming.StreamExecution;
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper;
import org.apache.spark.sql.kafka010.KafkaSinkObj$;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.apache.spark.sql.streaming.StreamingQueryManager;
import org.slf4s.Logger;
import org.slf4s.Logging;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Seq;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import za.co.absa.spline.core.SparkLineageProcessor;
import za.co.absa.spline.core.harvester.DataLineageBuilderFactory;
import za.co.absa.spline.coresparkadapterapi.StructuredStreamingListenerAdapter;
import za.co.absa.spline.coresparkadapterapi.StructuredStreamingListenerAdapter$;
import za.co.absa.spline.model.DataLineage;
import za.co.absa.spline.model.endpoint.FileEndpoint;
import za.co.absa.spline.model.endpoint.KafkaEndpoint;

/* compiled from: StructuredStreamingListener.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u001db\u0001B\u0001\u0003\u0001=\u00111d\u0015;sk\u000e$XO]3e'R\u0014X-Y7j]\u001ed\u0015n\u001d;f]\u0016\u0014(BA\u0002\u0005\u0003!a\u0017n\u001d;f]\u0016\u0014(BA\u0003\u0007\u0003\u0011\u0019wN]3\u000b\u0005\u001dA\u0011AB:qY&tWM\u0003\u0002\n\u0015\u0005!\u0011MY:b\u0015\tYA\"\u0001\u0002d_*\tQ\"\u0001\u0002{C\u000e\u00011c\u0001\u0001\u0011=A\u0011\u0011\u0003H\u0007\u0002%)\u00111\u0003F\u0001\ngR\u0014X-Y7j]\u001eT!!\u0006\f\u0002\u0007M\fHN\u0003\u0002\u00181\u0005)1\u000f]1sW*\u0011\u0011DG\u0001\u0007CB\f7\r[3\u000b\u0003m\t1a\u001c:h\u0013\ti\"C\u0001\fTiJ,\u0017-\\5oOF+XM]=MSN$XM\\3s!\ty\"%D\u0001!\u0015\t\t#$A\u0003tY\u001a$4/\u0003\u0002$A\t9Aj\\4hS:<\u0007\u0002C\u0013\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0014\u0002\u0019E,XM]=NC:\fw-\u001a:\u0011\u0005E9\u0013B\u0001\u0015\u0013\u0005U\u0019FO]3b[&tw-U;fefl\u0015M\\1hKJD\u0001B\u000b\u0001\u0003\u0002\u0003\u0006IaK\u0001\u0011Y&tW-Y4f\u0011\u0006\u0014h/Z:uKJ\u0004\"\u0001L\u0018\u000e\u00035R!A\f\u0003\u0002\u0013!\f'O^3ti\u0016\u0014\u0018B\u0001\u0019.\u0005e!\u0015\r^1MS:,\u0017mZ3Ck&dG-\u001a:GC\u000e$xN]=\t\u0011I\u0002!\u0011!Q\u0001\nM\n\u0001\u0003\\5oK\u0006<W\r\u0015:pG\u0016\u001c8o\u001c:\u0011\u0005Q*T\"\u0001\u0003\n\u0005Y\"!!F*qCJ\\G*\u001b8fC\u001e,\u0007K]8dKN\u001cxN\u001d\u0005\u0006q\u0001!\t!O\u0001\u0007y%t\u0017\u000e\u001e \u0015\tibTH\u0010\t\u0003w\u0001i\u0011A\u0001\u0005\u0006K]\u0002\rA\n\u0005\u0006U]\u0002\ra\u000b\u0005\u0006e]\u0002\ra\r\u0005\u0006\u0001\u0002!\t%Q\u0001\u000f_:\fV/\u001a:z'R\f'\u000f^3e)\t\u0011\u0005\n\u0005\u0002D\r6\tAIC\u0001F\u0003\u0015\u00198-\u00197b\u0013\t9EI\u0001\u0003V]&$\b\"B%@\u0001\u0004Q\u0015!B3wK:$\bCA&O\u001d\t\tB*\u0003\u0002N%\u000512\u000b\u001e:fC6LgnZ)vKJLH*[:uK:,'/\u0003\u0002P!\n\t\u0012+^3ssN#\u0018M\u001d;fI\u00163XM\u001c;\u000b\u00055\u0013\u0002\"\u0002*\u0001\t\u0003\u001a\u0016aD8o#V,'/\u001f)s_\u001e\u0014Xm]:\u0015\u0005\t#\u0006\"B%R\u0001\u0004)\u0006CA&W\u0013\t9\u0006K\u0001\nRk\u0016\u0014\u0018\u0010\u0015:pOJ,7o]#wK:$\b\"B-\u0001\t\u0003R\u0016!E8o#V,'/\u001f+fe6Lg.\u0019;fIR\u0011!i\u0017\u0005\u0006\u0013b\u0003\r\u0001\u0018\t\u0003\u0017vK!A\u0018)\u0003)E+XM]=UKJl\u0017N\\1uK\u0012,e/\u001a8u\u0011\u0015\u0001\u0007\u0001\"\u0003b\u00031\u0001(o\\2fgN\fV/\u001a:z)\t\u0011%\rC\u0003d?\u0002\u0007A-A\u0003rk\u0016\u0014\u0018\u0010\u0005\u0002\u0012K&\u0011aM\u0005\u0002\u000f'R\u0014X-Y7j]\u001e\fV/\u001a:z\u0011\u0015A\u0007\u0001\"\u0003j\u0003A\u0001(o\\2fgN,\u00050Z2vi&|g\u000e\u0006\u0002CU\")1n\u001aa\u0001Y\u0006\u00111/\u001a\t\u0003[Fl\u0011A\u001c\u0006\u0003'=T!\u0001\u001d\u000b\u0002\u0013\u0015DXmY;uS>t\u0017B\u0001:o\u0005=\u0019FO]3b[\u0016CXmY;uS>t\u0007\"\u0002;\u0001\t\u0013)\u0018AD1tg&<g.\u00192mK\u001a\u0013x.\u001c\u000b\u0005mf\fi\u0002\u0005\u0002Do&\u0011\u0001\u0010\u0012\u0002\b\u0005>|G.Z1o\u0011\u0015Q8\u000f1\u0001|\u00031\u0011XO\u001c;j[\u0016\u001cE.Y:ta\ra\u00181\u0002\t\u0006{\u0006\u0005\u0011q\u0001\b\u0003\u0007zL!a #\u0002\rA\u0013X\rZ3g\u0013\u0011\t\u0019!!\u0002\u0003\u000b\rc\u0017m]:\u000b\u0005}$\u0005\u0003BA\u0005\u0003\u0017a\u0001\u0001B\u0006\u0002\u000ee\f\t\u0011!A\u0003\u0002\u0005=!aA0%eE!\u0011\u0011CA\f!\r\u0019\u00151C\u0005\u0004\u0003+!%a\u0002(pi\"Lgn\u001a\t\u0004\u0007\u0006e\u0011bAA\u000e\t\n\u0019\u0011I\\=\t\u000f\u0005}1\u000f1\u0001\u0002\"\u00051\u0011M\\=SK\u001a\u00042aQA\u0012\u0013\r\t)\u0003\u0012\u0002\u0007\u0003:L(+\u001a4")
/* loaded from: input_file:za/co/absa/spline/core/listener/StructuredStreamingListener.class */
public class StructuredStreamingListener extends StreamingQueryListener implements Logging {
    private final StreamingQueryManager queryManager;
    private final DataLineageBuilderFactory lineageHarvester;
    private final SparkLineageProcessor lineageProcessor;
    private final Logger log;

    public Logger log() {
        return this.log;
    }

    public void org$slf4s$Logging$_setter_$log_$eq(Logger logger) {
        this.log = logger;
    }

    public void onQueryStarted(StreamingQueryListener.QueryStartedEvent queryStartedEvent) {
        if (log().underlying().isDebugEnabled()) {
            log().underlying().debug(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Structured streaming query(id: ", ", runId: ", ") has started."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{queryStartedEvent.id(), queryStartedEvent.runId()})));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        processQuery(this.queryManager.get(queryStartedEvent.id()));
    }

    public void onQueryProgress(StreamingQueryListener.QueryProgressEvent queryProgressEvent) {
    }

    public void onQueryTerminated(StreamingQueryListener.QueryTerminatedEvent queryTerminatedEvent) {
        BoxedUnit boxedUnit;
        BoxedUnit boxedUnit2;
        String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Structured streaming query(id: ", ", runId: ", ")"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{queryTerminatedEvent.id(), queryTerminatedEvent.runId()}));
        Some exception = queryTerminatedEvent.exception();
        if (None$.MODULE$.equals(exception)) {
            if (log().underlying().isDebugEnabled()) {
                log().underlying().debug(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"'", "' successfully terminated."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{s})));
                boxedUnit2 = BoxedUnit.UNIT;
            } else {
                boxedUnit2 = BoxedUnit.UNIT;
            }
            return;
        }
        if (!(exception instanceof Some)) {
            throw new MatchError(exception);
        }
        String str = (String) exception.x();
        if (log().underlying().isDebugEnabled()) {
            log().underlying().debug(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"'", "' terminated with the exception '", "'"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{s, str})));
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
    }

    private void processQuery(StreamingQuery streamingQuery) {
        BoxedUnit boxedUnit;
        while (true) {
            StreamingQuery streamingQuery2 = streamingQuery;
            if (streamingQuery2 instanceof StreamExecution) {
                processExecution((StreamExecution) streamingQuery2);
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                break;
            } else if (streamingQuery2 instanceof StreamingQueryWrapper) {
                streamingQuery = ((StreamingQueryWrapper) streamingQuery2).streamingQuery();
            } else if (log().underlying().isErrorEnabled()) {
                log().underlying().error(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Trying to process unknown query '", "'."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{streamingQuery2})));
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
        }
        BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
    }

    private void processExecution(StreamExecution streamExecution) {
        Some some;
        Predef$.MODULE$.assume(streamExecution.logicalPlan().resolved(), new StructuredStreamingListener$$anonfun$processExecution$1(this));
        DataLineage buildLineage = this.lineageHarvester.createBuilder(streamExecution.sparkSession().sparkContext()).buildLineage(streamExecution.logicalPlan());
        boolean z = false;
        Sink sink = null;
        BaseStreamingSink sink2 = streamExecution.sink();
        if (sink2 instanceof Sink) {
            z = true;
            sink = (Sink) sink2;
            Option<Tuple2<String, FileFormat>> unapply = FileSinkObj$.MODULE$.unapply(sink);
            if (!unapply.isEmpty()) {
                some = new Some(new FileEndpoint((String) ((Tuple2) unapply.get())._1(), ((FileFormat) ((Tuple2) unapply.get())._2()).toString()));
                this.lineageProcessor.process((DataLineage) Option$.MODULE$.option2Iterable(some).$div$colon(buildLineage, new StructuredStreamingListener$$anonfun$3(this)));
            }
        }
        if (z) {
            Option<Tuple2<Seq<String>, Option<String>>> unapply2 = KafkaSinkObj$.MODULE$.unapply(sink);
            if (!unapply2.isEmpty()) {
                some = new Some(new KafkaEndpoint((Seq) ((Tuple2) unapply2.get())._1(), (String) ((Option) ((Tuple2) unapply2.get())._2()).getOrElse(new StructuredStreamingListener$$anonfun$1(this))));
                this.lineageProcessor.process((DataLineage) Option$.MODULE$.option2Iterable(some).$div$colon(buildLineage, new StructuredStreamingListener$$anonfun$3(this)));
            }
        }
        if (!Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Class[]{((StructuredStreamingListenerAdapter) StructuredStreamingListenerAdapter$.MODULE$.instance()).consoleSinkClass(), ForeachSink.class, MemorySink.class})).exists(new StructuredStreamingListener$$anonfun$2(this, sink2))) {
            throw new IllegalArgumentException(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Unsupported sink type: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{sink2.getClass()})));
        }
        some = None$.MODULE$;
        this.lineageProcessor.process((DataLineage) Option$.MODULE$.option2Iterable(some).$div$colon(buildLineage, new StructuredStreamingListener$$anonfun$3(this)));
    }

    public boolean za$co$absa$spline$core$listener$StructuredStreamingListener$$assignableFrom(Class<?> cls, Object obj) {
        return cls.isAssignableFrom(obj.getClass());
    }

    public StructuredStreamingListener(StreamingQueryManager streamingQueryManager, DataLineageBuilderFactory dataLineageBuilderFactory, SparkLineageProcessor sparkLineageProcessor) {
        this.queryManager = streamingQueryManager;
        this.lineageHarvester = dataLineageBuilderFactory;
        this.lineageProcessor = sparkLineageProcessor;
        Logging.class.$init$(this);
    }
}
