package za.co.absa.spline.agent;

import java.util.UUID;
import org.apache.commons.configuration.Configuration;
import org.apache.spark.SparkContext;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.QueryExecution;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.concurrent.duration.Duration;
import scala.runtime.BoxedUnit;
import scala.util.Either;
import scala.util.control.NonFatal$;
import za.co.absa.spline.harvester.HarvestingContext;
import za.co.absa.spline.harvester.IdGeneratorsBundle;
import za.co.absa.spline.harvester.LineageHarvester;
import za.co.absa.spline.harvester.UUIDGenerator;
import za.co.absa.spline.harvester.builder.OperationNodeBuilderFactory;
import za.co.absa.spline.harvester.builder.dsformat.PluggableDataSourceFormatResolver;
import za.co.absa.spline.harvester.builder.read.PluggableReadCommandExtractor;
import za.co.absa.spline.harvester.builder.write.PluggableWriteCommandExtractor;
import za.co.absa.spline.harvester.converter.DataConverter;
import za.co.absa.spline.harvester.dispatcher.LineageDispatcher;
import za.co.absa.spline.harvester.iwd.IgnoredWriteDetectionStrategy;
import za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry;
import za.co.absa.spline.harvester.postprocessing.AbstractInternalPostProcessingFilter;
import za.co.absa.spline.harvester.postprocessing.AttributeReorderingFilter;
import za.co.absa.spline.harvester.postprocessing.OneRowRelationFilter;
import za.co.absa.spline.harvester.postprocessing.PostProcessingFilter;
import za.co.absa.spline.harvester.postprocessing.PostProcessor;
import za.co.absa.spline.harvester.postprocessing.ViewAttributeAddingFilter;
import za.co.absa.spline.harvester.qualifier.HDFSPathQualifier;
import za.co.absa.spline.producer.model.ExecutionEvent;
import za.co.absa.spline.producer.model.ExecutionPlan;

/* compiled from: SplineAgent.scala */
/* loaded from: input_file:za/co/absa/spline/agent/SplineAgent$.class */
/**
 * Singleton holder (Scala companion-object style) that builds {@link SplineAgent} instances.
 *
 * <p>The single instance is published through {@link #MODULE$} by the private constructor, which
 * is triggered from the static initializer. The class also mixes in Spark's {@link Logging}; all
 * logging methods below simply forward to the implementations inherited from that interface.
 */
public final class SplineAgent$ implements Logging {
    /** The singleton instance; assigned once by the private constructor. */
    public static SplineAgent$ MODULE$;

    /** Filters that are always applied, ahead of the optional caller-supplied filter. */
    private final Seq<AbstractInternalPostProcessingFilter> InternalPostProcessingFilters;

    /** Backing field for the logger accessor pair required by {@link Logging}. */
    private transient Logger org$apache$spark$internal$Logging$$log_;

    static {
        // Instantiating the class publishes the instance via MODULE$ (see the constructor).
        new SplineAgent$();
    }

    // ---------------------------------------------------------------------------------------
    // Logging forwarders.
    //
    // Each of these must delegate to the implementation inherited from the Logging interface.
    // Calling the method by its plain name would resolve to this very override and recurse
    // until the stack overflows, hence the explicit `Logging.super.` qualifier.
    // NOTE(review): this relies on Logging exposing these methods as interface default
    // methods (Scala 2.12+ trait encoding) - confirm against the Spark version on the classpath.
    // ---------------------------------------------------------------------------------------

    @Override // org.apache.spark.internal.Logging
    public String logName() {
        return Logging.super.logName();
    }

    @Override // org.apache.spark.internal.Logging
    public Logger log() {
        return Logging.super.log();
    }

    @Override // org.apache.spark.internal.Logging
    public void logInfo(Function0<String> function0) {
        Logging.super.logInfo(function0);
    }

    @Override // org.apache.spark.internal.Logging
    public void logDebug(Function0<String> function0) {
        Logging.super.logDebug(function0);
    }

    @Override // org.apache.spark.internal.Logging
    public void logTrace(Function0<String> function0) {
        Logging.super.logTrace(function0);
    }

    @Override // org.apache.spark.internal.Logging
    public void logWarning(Function0<String> function0) {
        Logging.super.logWarning(function0);
    }

    @Override // org.apache.spark.internal.Logging
    public void logError(Function0<String> function0) {
        Logging.super.logError(function0);
    }

    @Override // org.apache.spark.internal.Logging
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.super.logInfo(function0, th);
    }

    @Override // org.apache.spark.internal.Logging
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.super.logDebug(function0, th);
    }

    @Override // org.apache.spark.internal.Logging
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.super.logTrace(function0, th);
    }

    @Override // org.apache.spark.internal.Logging
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.super.logWarning(function0, th);
    }

    @Override // org.apache.spark.internal.Logging
    public void logError(Function0<String> function0, Throwable th) {
        Logging.super.logError(function0, th);
    }

    @Override // org.apache.spark.internal.Logging
    public boolean isTraceEnabled() {
        return Logging.super.isTraceEnabled();
    }

    @Override // org.apache.spark.internal.Logging
    public void initializeLogIfNecessary(boolean z) {
        Logging.super.initializeLogIfNecessary(z);
    }

    @Override // org.apache.spark.internal.Logging
    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.super.initializeLogIfNecessary(z, z2);
    }

    @Override // org.apache.spark.internal.Logging
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.super.initializeLogIfNecessary$default$2();
    }

    /** Returns the logger currently stored in the backing field (may be {@code null}). */
    @Override // org.apache.spark.internal.Logging
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Stores the given logger in the backing field. */
    @Override // org.apache.spark.internal.Logging
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    private Seq<AbstractInternalPostProcessingFilter> InternalPostProcessingFilters() {
        return this.InternalPostProcessingFilters;
    }

    /**
     * Builds a {@link SplineAgent} wired to the given session and dispatcher.
     *
     * <p>The effective filter chain is the built-in filters followed by the optional
     * caller-supplied one. A plugin registry is created from {@code configuration}, an
     * {@link HDFSPathQualifier} based on the session's Hadoop configuration, and the session
     * itself; the read/write command extractors are built on top of that registry.
     *
     * <p>The returned agent's {@code handle} harvests lineage for a query execution and, for
     * each harvested (plan, event) pair, sends the plan and then the event through
     * {@code lineageDispatcher}. Non-fatal errors raised while doing so are logged, not rethrown.
     *
     * @param configuration                 configuration handed to the plugin registry
     * @param sparkSession                  session used for harvesting and for error reporting
     * @param lineageDispatcher             receives every harvested execution plan and event
     * @param option                        optional extra post-processing filter, appended last
     * @param ignoredWriteDetectionStrategy strategy passed to the lineage harvester
     * @param function1                     factory passed to the {@link IdGeneratorsBundle}
     * @return a new agent instance
     */
    public SplineAgent create(Configuration configuration, final SparkSession sparkSession, final LineageDispatcher lineageDispatcher, Option<PostProcessingFilter> option, final IgnoredWriteDetectionStrategy ignoredWriteDetectionStrategy, final Function1<UUID, UUIDGenerator<ExecutionPlan>> function1) {
        final Seq seq = (Seq) InternalPostProcessingFilters().$plus$plus(Option$.MODULE$.option2Iterable(option), Seq$.MODULE$.canBuildFrom());
        AutoDiscoveryPluginRegistry autoDiscoveryPluginRegistry = new AutoDiscoveryPluginRegistry(configuration, Predef$.MODULE$.wrapRefArray(new Object[]{new HDFSPathQualifier(sparkSession.sparkContext().hadoopConfiguration()), sparkSession}));
        PluggableDataSourceFormatResolver pluggableDataSourceFormatResolver = new PluggableDataSourceFormatResolver(autoDiscoveryPluginRegistry);
        final PluggableWriteCommandExtractor pluggableWriteCommandExtractor = new PluggableWriteCommandExtractor(autoDiscoveryPluginRegistry, pluggableDataSourceFormatResolver);
        final PluggableReadCommandExtractor pluggableReadCommandExtractor = new PluggableReadCommandExtractor(autoDiscoveryPluginRegistry, pluggableDataSourceFormatResolver);
        return new SplineAgent(function1, sparkSession, seq, pluggableWriteCommandExtractor, pluggableReadCommandExtractor, ignoredWriteDetectionStrategy, lineageDispatcher) { // from class: za.co.absa.spline.agent.SplineAgent$$anon$1
            private final Function1 execPlanUUIDGeneratorFactory$1;
            private final SparkSession session$1;
            private final Seq filters$1;
            private final PluggableWriteCommandExtractor writeCommandExtractor$1;
            private final PluggableReadCommandExtractor readCommandExtractor$1;
            private final IgnoredWriteDetectionStrategy iwdStrategy$1;
            private final LineageDispatcher lineageDispatcher$1;

            /**
             * Harvests lineage for one query execution and dispatches every resulting
             * (plan, event) pair. The whole operation runs under {@code withErrorHandling}.
             */
            @Override // za.co.absa.spline.agent.SplineAgent
            public void handle(String str, QueryExecution queryExecution, Either<Throwable, Duration> either) {
                withErrorHandling(() -> {
                    IdGeneratorsBundle idGeneratorsBundle = new IdGeneratorsBundle(this.execPlanUUIDGeneratorFactory$1);
                    HarvestingContext harvestingContext = new HarvestingContext(str, queryExecution.analyzed(), new Some(queryExecution.executedPlan()), this.session$1, idGeneratorsBundle);
                    PostProcessor postProcessor = new PostProcessor(this.filters$1, harvestingContext);
                    SplineAgent$$anon$1$$anon$2 splineAgent$$anon$1$$anon$2 = new SplineAgent$$anon$1$$anon$2(null, idGeneratorsBundle);
                    new LineageHarvester(harvestingContext, this.writeCommandExtractor$1, this.readCommandExtractor$1, this.iwdStrategy$1, postProcessor, splineAgent$$anon$1$$anon$2, new OperationNodeBuilderFactory(postProcessor, splineAgent$$anon$1$$anon$2, new DataConverter(), idGeneratorsBundle)).harvest(either).foreach(tuple2 -> {
                        $anonfun$handle$2(this, tuple2);
                        return BoxedUnit.UNIT;
                    });
                    // Function0<BoxedUnit> must yield a value; Unit is the only sensible one.
                    return BoxedUnit.UNIT;
                });
            }

            /**
             * Runs {@code function0}; a throwable that {@code NonFatal} does not match is
             * rethrown unchanged, anything else is logged together with the Spark
             * application name and id and then dropped.
             */
            private void withErrorHandling(Function0<BoxedUnit> function0) {
                try {
                    function0.apply$mcV$sp();
                } catch (Throwable th) {
                    Option unapply = NonFatal$.MODULE$.unapply(th);
                    if (unapply.isEmpty()) {
                        // Not matched by NonFatal: propagate untouched.
                        throw th;
                    }
                    Throwable th2 = (Throwable) unapply.get();
                    SparkContext sparkContext = this.session$1.sparkContext();
                    SplineAgent$.MODULE$.logError(() -> {
                        return new StringBuilder(71).append("Unexpected error occurred during lineage processing for application: ").append(sparkContext.appName()).append(" #").append(sparkContext.applicationId()).toString();
                    }, th2);
                }
            }

            /**
             * Sends the plan (first tuple element) and then the event (second tuple element)
             * through the agent's dispatcher. A {@code null} tuple raises {@link MatchError}.
             */
            public static final /* synthetic */ void $anonfun$handle$2(SplineAgent$$anon$1 splineAgent$$anon$1, Tuple2 tuple2) {
                if (tuple2 == null) {
                    throw new MatchError(tuple2);
                }
                ExecutionPlan executionPlan = (ExecutionPlan) tuple2._1();
                ExecutionEvent executionEvent = (ExecutionEvent) tuple2._2();
                splineAgent$$anon$1.lineageDispatcher$1.send(executionPlan);
                splineAgent$$anon$1.lineageDispatcher$1.send(executionEvent);
            }

            {
                // Capture the collaborators built in create(...) for later use by handle(...).
                this.execPlanUUIDGeneratorFactory$1 = function1;
                this.session$1 = sparkSession;
                this.filters$1 = seq;
                this.writeCommandExtractor$1 = pluggableWriteCommandExtractor;
                this.readCommandExtractor$1 = pluggableReadCommandExtractor;
                this.iwdStrategy$1 = ignoredWriteDetectionStrategy;
                this.lineageDispatcher$1 = lineageDispatcher;
            }
        };
    }

    /**
     * Publishes the singleton, resets the logger field, and builds the fixed list of internal
     * filters: attribute reordering, one-row-relation, view-attribute-adding (in that order).
     */
    private SplineAgent$() {
        MODULE$ = this;
        org$apache$spark$internal$Logging$$log__$eq(null);
        this.InternalPostProcessingFilters = new $colon.colon<>(new AttributeReorderingFilter(), new $colon.colon(new OneRowRelationFilter(), new $colon.colon(new ViewAttributeAddingFilter(), Nil$.MODULE$)));
    }
}
