package za.co.absa.spline.core.listener;

import org.apache.spark.sql.FileSinkObj$;
import org.apache.spark.sql.execution.datasources.FileFormat;
import org.apache.spark.sql.execution.streaming.ForeachSink;
import org.apache.spark.sql.execution.streaming.MemorySink;
import org.apache.spark.sql.execution.streaming.Sink;
import org.apache.spark.sql.execution.streaming.StreamExecution;
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper;
import org.apache.spark.sql.kafka010.KafkaSinkObj$;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.apache.spark.sql.streaming.StreamingQueryManager;
import org.slf4s.Logger;
import org.slf4s.Logging;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Seq;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import za.co.absa.spline.core.SparkLineageProcessor;
import za.co.absa.spline.core.harvester.LogicalPlanLineageHarvester;
import za.co.absa.spline.coresparkadapterapi.StructuredStreamingListenerAdapter;
import za.co.absa.spline.coresparkadapterapi.StructuredStreamingListenerAdapter$;
import za.co.absa.spline.model.DataLineage;
import za.co.absa.spline.model.endpoint.FileEndpoint;
import za.co.absa.spline.model.endpoint.KafkaEndpoint;

/* compiled from: StructuredStreamingListener.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u001db\u0001B\u0001\u0003\u0001=\u00111d\u0015;sk\u000e$XO]3e'R\u0014X-Y7j]\u001ed\u0015n\u001d;f]\u0016\u0014(BA\u0002\u0005\u0003!a\u0017n\u001d;f]\u0016\u0014(BA\u0003\u0007\u0003\u0011\u0019wN]3\u000b\u0005\u001dA\u0011AB:qY&tWM\u0003\u0002\n\u0015\u0005!\u0011MY:b\u0015\tYA\"\u0001\u0002d_*\tQ\"\u0001\u0002{C\u000e\u00011c\u0001\u0001\u0011=A\u0011\u0011\u0003H\u0007\u0002%)\u00111\u0003F\u0001\ngR\u0014X-Y7j]\u001eT!!\u0006\f\u0002\u0007M\fHN\u0003\u0002\u00181\u0005)1\u000f]1sW*\u0011\u0011DG\u0001\u0007CB\f7\r[3\u000b\u0003m\t1a\u001c:h\u0013\ti\"C\u0001\fTiJ,\u0017-\\5oOF+XM]=MSN$XM\\3s!\ty\"%D\u0001!\u0015\t\t#$A\u0003tY\u001a$4/\u0003\u0002$A\t9Aj\\4hS:<\u0007\u0002C\u0013\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0014\u0002\u0019E,XM]=NC:\fw-\u001a:\u0011\u0005E9\u0013B\u0001\u0015\u0013\u0005U\u0019FO]3b[&tw-U;fefl\u0015M\\1hKJD\u0001B\u000b\u0001\u0003\u0002\u0003\u0006IaK\u0001\u0011Y&tW-Y4f\u0011\u0006\u0014h/Z:uKJ\u0004\"\u0001L\u0018\u000e\u00035R!A\f\u0003\u0002\u0013!\f'O^3ti\u0016\u0014\u0018B\u0001\u0019.\u0005maunZ5dC2\u0004F.\u00198MS:,\u0017mZ3ICJ4Xm\u001d;fe\"A!\u0007\u0001B\u0001B\u0003%1'\u0001\tmS:,\u0017mZ3Qe>\u001cWm]:peB\u0011A'N\u0007\u0002\t%\u0011a\u0007\u0002\u0002\u0016'B\f'o\u001b'j]\u0016\fw-\u001a)s_\u000e,7o]8s\u0011\u0015A\u0004\u0001\"\u0001:\u0003\u0019a\u0014N\\5u}Q!!\bP\u001f?!\tY\u0004!D\u0001\u0003\u0011\u0015)s\u00071\u0001'\u0011\u0015Qs\u00071\u0001,\u0011\u0015\u0011t\u00071\u00014\u0011\u0015\u0001\u0005\u0001\"\u0011B\u00039yg.U;fef\u001cF/\u0019:uK\u0012$\"A\u0011%\u0011\u0005\r3U\"\u0001#\u000b\u0003\u0015\u000bQa]2bY\u0006L!a\u0012#\u0003\tUs\u0017\u000e\u001e\u0005\u0006\u0013~\u0002\rAS\u0001\u0006KZ,g\u000e\u001e\t\u0003\u0017:s!!\u0005'\n\u00055\u0013\u0012AF*ue\u0016\fW.\u001b8h#V,'/\u001f'jgR,g.\u001a:\n\u0005=\u0003&!E)vKJL8\u000b^1si\u0016$WI^3oi*\u0011QJ\u0005\u0005\u0006%\u0002!\teU\u0001\u0010_:\fV/\u001a:z!J|wM]3tgR\u0011!\t\u0016\u0005\u0006\u0013F\u0003\r!\u0016\t\u0003\u0017ZK!a\u0016)\u0003%E+XM]=Qe><'/Z:t\u000bZ,g\u000e\u001e\u0005\u00063\u0002!\tEW\u0001\u0012_:\fV/\u001a:z)\u0016\u0014X.\u001b8bi\u0016$GC\u0001\"\\\u0011\u0015I\u0005\f1\u0001]!\tYU,\u0003\u0002_!\n!\u0012+^3ssR+'/\\5oCR,G-\u0012<f]RDQ\u0001\u0019\u0001\u0005\n\u0005\fA\u0002\u001d:pG\u0016\u001c8/U;fef$\"A\u00112\t\u000b\r|\u0006\u0019\u00013\u0002\u000bE,XM]=\u0011\u0005E)\u0017B\u00014\u0013\u00059\u0019FO]3b[&tw-U;fefDQ\u0001\u001b\u0001\u0005\n%\f\u0001\u0003\u001d:pG\u0016\u001c8/\u0012=fGV$\u0018n\u001c8\u0015\u0005\tS\u0007\"B6h\u0001\u0004a\u0017AA:f!\ti\u0017/D\u0001o\u0015\t\u0019rN\u0003\u0002q)\u0005IQ\r_3dkRLwN\\\u0005\u0003e:\u0014qb\u0015;sK\u0006lW\t_3dkRLwN\u001c\u0005\u0006i\u0002!I!^\u0001\u000fCN\u001c\u0018n\u001a8bE2,gI]8n)\u00111\u00180!\b\u0011\u0005\r;\u0018B\u0001=E\u0005\u001d\u0011un\u001c7fC:DQA_:A\u0002m\fAB];oi&lWm\u00117bgN\u00044\u0001`A\u0006!\u0015i\u0018\u0011AA\u0004\u001d\t\u0019e0\u0003\u0002��\t\u00061\u0001K]3eK\u001aLA!a\u0001\u0002\u0006\t)1\t\\1tg*\u0011q\u0010\u0012\t\u0005\u0003\u0013\tY\u0001\u0004\u0001\u0005\u0017\u00055\u00110!A\u0001\u0002\u000b\u0005\u0011q\u0002\u0002\u0004?\u0012\u0012\u0014\u0003BA\t\u0003/\u00012aQA\n\u0013\r\t)\u0002\u0012\u0002\b\u001d>$\b.\u001b8h!\r\u0019\u0015\u0011D\u0005\u0004\u00037!%aA!os\"9\u0011qD:A\u0002\u0005\u0005\u0012AB1osJ+g\rE\u0002D\u0003GI1!!\nE\u0005\u0019\te.\u001f*fM\u0002")
/* loaded from: input_file:za/co/absa/spline/core/listener/StructuredStreamingListener.class */
public class StructuredStreamingListener extends StreamingQueryListener implements Logging {
    private final StreamingQueryManager queryManager;
    private final LogicalPlanLineageHarvester lineageHarvester;
    private final SparkLineageProcessor lineageProcessor;
    private final Logger log;

    public Logger log() {
        return this.log;
    }

    public void org$slf4s$Logging$_setter_$log_$eq(Logger logger) {
        this.log = logger;
    }

    public void onQueryStarted(StreamingQueryListener.QueryStartedEvent queryStartedEvent) {
        if (log().underlying().isDebugEnabled()) {
            log().underlying().debug(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Structured streaming query(id: ", ", runId: ", ") has started."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{queryStartedEvent.id(), queryStartedEvent.runId()})));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        processQuery(this.queryManager.get(queryStartedEvent.id()));
    }

    public void onQueryProgress(StreamingQueryListener.QueryProgressEvent queryProgressEvent) {
    }

    public void onQueryTerminated(StreamingQueryListener.QueryTerminatedEvent queryTerminatedEvent) {
        BoxedUnit boxedUnit;
        BoxedUnit boxedUnit2;
        String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Structured streaming query(id: ", ", runId: ", ")"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{queryTerminatedEvent.id(), queryTerminatedEvent.runId()}));
        Some exception = queryTerminatedEvent.exception();
        if (None$.MODULE$.equals(exception)) {
            if (log().underlying().isDebugEnabled()) {
                log().underlying().debug(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"'", "' successfully terminated."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{s})));
                boxedUnit2 = BoxedUnit.UNIT;
            } else {
                boxedUnit2 = BoxedUnit.UNIT;
            }
            return;
        }
        if (!(exception instanceof Some)) {
            throw new MatchError(exception);
        }
        String str = (String) exception.x();
        if (log().underlying().isDebugEnabled()) {
            log().underlying().debug(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"'", "' terminated with the exception '", "'"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{s, str})));
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
    }

    private void processQuery(StreamingQuery streamingQuery) {
        BoxedUnit boxedUnit;
        while (true) {
            StreamingQuery streamingQuery2 = streamingQuery;
            if (streamingQuery2 instanceof StreamExecution) {
                processExecution((StreamExecution) streamingQuery2);
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                break;
            } else if (streamingQuery2 instanceof StreamingQueryWrapper) {
                streamingQuery = ((StreamingQueryWrapper) streamingQuery2).streamingQuery();
            } else if (log().underlying().isErrorEnabled()) {
                log().underlying().error(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Trying to process unknown query '", "'."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{streamingQuery2})));
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
        }
        BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
    }

    private void processExecution(StreamExecution streamExecution) {
        Some some;
        Predef$.MODULE$.assume(streamExecution.logicalPlan().resolved(), new StructuredStreamingListener$$anonfun$processExecution$1(this));
        DataLineage harvestLineage = this.lineageHarvester.harvestLineage(streamExecution.sparkSession().sparkContext(), streamExecution.logicalPlan());
        Sink sink = streamExecution.sink();
        Option<Tuple2<String, FileFormat>> unapply = FileSinkObj$.MODULE$.unapply(sink);
        if (unapply.isEmpty()) {
            Option<Tuple2<Seq<String>, Option<String>>> unapply2 = KafkaSinkObj$.MODULE$.unapply(sink);
            if (!unapply2.isEmpty()) {
                some = new Some(new KafkaEndpoint((Seq) ((Tuple2) unapply2.get())._1(), (String) ((Option) ((Tuple2) unapply2.get())._2()).getOrElse(new StructuredStreamingListener$$anonfun$1(this))));
            } else {
                if (!Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Class[]{((StructuredStreamingListenerAdapter) StructuredStreamingListenerAdapter$.MODULE$.instance()).consoleSinkClass(), ForeachSink.class, MemorySink.class})).exists(new StructuredStreamingListener$$anonfun$2(this, sink))) {
                    throw new IllegalArgumentException(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Unsupported sink type: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{sink.getClass()})));
                }
                some = None$.MODULE$;
            }
        } else {
            some = new Some(new FileEndpoint((String) ((Tuple2) unapply.get())._1(), ((FileFormat) ((Tuple2) unapply.get())._2()).toString()));
        }
        this.lineageProcessor.process((DataLineage) Option$.MODULE$.option2Iterable(some).$div$colon(harvestLineage, new StructuredStreamingListener$$anonfun$3(this)));
    }

    public boolean za$co$absa$spline$core$listener$StructuredStreamingListener$$assignableFrom(Class<?> cls, Object obj) {
        return cls.isAssignableFrom(obj.getClass());
    }

    public StructuredStreamingListener(StreamingQueryManager streamingQueryManager, LogicalPlanLineageHarvester logicalPlanLineageHarvester, SparkLineageProcessor sparkLineageProcessor) {
        this.queryManager = streamingQueryManager;
        this.lineageHarvester = logicalPlanLineageHarvester;
        this.lineageProcessor = sparkLineageProcessor;
        Logging.class.$init$(this);
    }
}
