package io.openlineage.spark.agent.lifecycle;

import io.openlineage.client.OpenLineage;
import io.openlineage.client.utils.DatasetIdentifier;
import io.openlineage.client.utils.UUIDUtils;
import io.openlineage.spark.agent.EventEmitter;
import io.openlineage.spark.agent.OpenLineageSparkListener;
import io.openlineage.spark.agent.Versions;
import io.openlineage.spark.agent.facets.ErrorFacet;
import io.openlineage.spark.agent.facets.builder.GcpJobFacetBuilder;
import io.openlineage.spark.agent.facets.builder.GcpRunFacetBuilder;
import io.openlineage.spark.agent.facets.builder.SparkJobDetailsFacetBuilder;
import io.openlineage.spark.agent.facets.builder.SparkProcessingEngineRunFacetBuilderDelegate;
import io.openlineage.spark.agent.facets.builder.SparkPropertyFacetBuilder;
import io.openlineage.spark.agent.util.GCPUtils;
import io.openlineage.spark.agent.util.PathUtils;
import io.openlineage.spark.agent.util.PlanUtils;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.agent.util.StreamingContextUtils;
import io.openlineage.spark.agent.util.TimeUtils;
import io.openlineage.spark.api.JobNameSuffixProvider;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.Strings;
import org.apache.spark.Dependency;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkContext$;
import org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil;
import org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil;
import org.apache.spark.rdd.HadoopRDD;
import org.apache.spark.rdd.MapPartitionsRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.JobFailed;
import org.apache.spark.scheduler.JobResult;
import org.apache.spark.scheduler.ResultStage;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
import org.apache.spark.util.SerializableJobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/openlineage/spark/agent/lifecycle/RddExecutionContext.class */
/**
 * {@link ExecutionContext} for Spark jobs submitted through the RDD API.
 *
 * <p>{@link #setActiveJob(ActiveJob)} derives a job-name suffix plus input and output locations
 * from the final stage's RDD. {@link #start(SparkListenerJobStart)} and
 * {@link #end(SparkListenerJobEnd)} then turn that state into OpenLineage run events and hand them
 * to the {@link EventEmitter}. The remaining listener callbacks are no-ops or only log.
 */
public class RddExecutionContext implements ExecutionContext {
    private static final Logger log = LoggerFactory.getLogger(RddExecutionContext.class);
    private static final String SPARK_PROCESSING_TYPE_BATCH = "BATCH";
    private static final String SPARK_PROCESSING_TYPE_STREAMING = "STREAMING";
    private static final String SPARK_JOB_TYPE = "RDD_JOB";
    private final EventEmitter eventEmitter;
    private final Optional<SparkContext> sparkContextOption;
    /** Suffix computed by {@link #nameRDD(RDD)}; {@code null} until an active job is set. */
    private String jobSuffix;
    private final UUID runId = UUIDUtils.generateNewUUID();
    private List<URI> inputs = Collections.emptyList();
    private List<URI> outputs = Collections.emptyList();
    private OpenLineage openLineage = new OpenLineage(Versions.OPEN_LINEAGE_PRODUCER_URI);

    /**
     * Creates a context that emits through {@code eventEmitter} and captures the currently active
     * {@link SparkContext}, if there is one.
     *
     * @param eventEmitter sink for the run events built by this context
     */
    public RddExecutionContext(EventEmitter eventEmitter) {
        this.eventEmitter = eventEmitter;
        Option<SparkContext> active = SparkContext$.MODULE$.getActive();
        this.sparkContextOption = active.isDefined() ? Optional.of(active.get()) : Optional.empty();
    }

    /** No-op: stage submission is not tracked by this context. */
    @Override
    public void start(SparkListenerStageSubmitted sparkListenerStageSubmitted) {
    }

    /** No-op: stage completion is not tracked by this context. */
    @Override
    public void end(SparkListenerStageCompleted sparkListenerStageCompleted) {
    }

    /** No-op: application start is not tracked by this context. */
    @Override
    public void start(SparkListenerApplicationStart sparkListenerApplicationStart) {
    }

    /** No-op: application end is not tracked by this context. */
    @Override
    public void end(SparkListenerApplicationEnd sparkListenerApplicationEnd) {
    }

    /**
     * Records the job suffix, inputs and outputs for {@code activeJob}.
     *
     * <p>For a {@link ResultStage} the Hadoop configuration is read reflectively from the stage
     * function; otherwise it is obtained from
     * {@link OpenLineageSparkListener#getConfigForRDD}.
     *
     * @param activeJob the job whose final stage RDD is inspected
     */
    @Override
    public void setActiveJob(ActiveJob activeJob) {
        log.debug("setActiveJob within RddExecutionContext {}", activeJob);
        RDD<?> finalRdd = activeJob.finalStage().rdd();
        this.jobSuffix = nameRDD(finalRdd);
        Set<RDD<?>> flattenRDDs = Rdds.flattenRDDs(finalRdd);
        log.debug("flattenRDDs {}", flattenRDDs);
        this.inputs = findInputs(flattenRDDs);

        // Typed as Configuration because findOutputs only needs a Configuration.
        Configuration jobConf;
        if (activeJob.finalStage() instanceof ResultStage) {
            jobConf = extractJobConf((ResultStage) activeJob.finalStage());
            log.info("Found job conf from RDD {}", jobConf);
        } else {
            jobConf = OpenLineageSparkListener.getConfigForRDD(finalRdd);
        }
        this.outputs = findOutputs(finalRdd, jobConf);
    }

    /**
     * Reads the Hadoop configuration captured by the result stage's function.
     *
     * @return the captured configuration, or a fresh empty {@link JobConf} when the captured
     *     object is of an unexpected type or reflection fails
     */
    private Configuration extractJobConf(ResultStage resultStage) {
        Configuration jobConf = new JobConf();
        try {
            Field configField = getConfigField(resultStage);
            configField.setAccessible(true);
            Object writeConfig = configField.get(resultStage.func());
            if (writeConfig instanceof HadoopMapRedWriteConfigUtil) {
                jobConf = readSerializableJobConf(HadoopMapRedWriteConfigUtil.class, writeConfig);
            } else if (writeConfig instanceof HadoopMapReduceWriteConfigUtil) {
                // NOTE(review): assumes this utility's "conf" field also holds a
                // SerializableJobConf; a different type would raise ClassCastException - confirm.
                jobConf = readSerializableJobConf(HadoopMapReduceWriteConfigUtil.class, writeConfig);
            } else {
                // The reflected field may legitimately hold null; avoid an NPE while logging.
                String actualType = writeConfig == null ? null : writeConfig.getClass().getCanonicalName();
                log.info("Config field is not HadoopMapRedWriteConfigUtil or HadoopMapReduceWriteConfigUtil, it's {}", actualType);
            }
        } catch (IllegalAccessException | NoSuchFieldException e) {
            log.warn("Unable to access job conf from RDD", e);
        }
        return jobConf;
    }

    /** Reads the private {@code conf} field declared on {@code owner} and unwraps it. */
    private static JobConf readSerializableJobConf(Class<?> owner, Object instance)
            throws NoSuchFieldException, IllegalAccessException {
        Field confField = owner.getDeclaredField("conf");
        confField.setAccessible(true);
        return ((SerializableJobConf) confField.get(instance)).value();
    }

    /**
     * Locates the field on the stage function that captures the write configuration.
     *
     * <p>Tries {@code config$1} first and falls back to {@code arg$1}.
     *
     * @throws NoSuchFieldException if neither field is declared on the function's class
     */
    private Field getConfigField(ResultStage resultStage) throws NoSuchFieldException {
        try {
            return resultStage.func().getClass().getDeclaredField("config$1");
        } catch (NoSuchFieldException e) {
            return resultStage.func().getClass().getDeclaredField("arg$1");
        }
    }

    /**
     * Builds a name for {@code rdd} from its own name and, recursively, its dependencies' names.
     *
     * <p>The RDD's own name is replaced by a snake-cased form of its class name (with a trailing
     * {@code RDD} and digits stripped) when the name is {@code null}, when a {@link HadoopRDD}
     * input path contains it, or when a {@link MapPartitionsRDD} shares it with its parent.
     * Dependency names are joined with {@link JobNameSuffixProvider#SUFFIX_DELIMITER}; the RDD's
     * own name is prepended unless the joined string already starts with it.
     *
     * @param rdd the RDD to name
     * @return the derived name
     */
    static String nameRDD(RDD<?> rdd) {
        String rddName = rdd.name();
        if (rddName == null || isNamedAfterInputPath(rdd, rddName) || isNamedAfterParent(rdd, rddName)) {
            rddName = rdd.getClass().getSimpleName()
                    .replaceAll("RDD\\d*$", "")
                    .replaceAll(ExecutionContext.CAMEL_TO_SNAKE_CASE, "_$1")
                    .toLowerCase(Locale.ROOT);
        }

        List<?> dependencies = ScalaConversionUtils.fromSeq(rdd.dependencies());
        if (dependencies.isEmpty()) {
            return rddName;
        }

        List<String> dependencyNames = new ArrayList<>();
        for (Object dependency : dependencies) {
            dependencyNames.add(nameRDD(((Dependency<?>) dependency).rdd()));
        }
        String joined = Strings.join(dependencyNames, JobNameSuffixProvider.SUFFIX_DELIMITER);
        if (joined.startsWith(rddName)) {
            return joined;
        }
        return rddName + JobNameSuffixProvider.SUFFIX_DELIMITER + joined;
    }

    /** True when {@code rdd} is a {@link HadoopRDD} and one of its input paths contains {@code rddName}. */
    private static boolean isNamedAfterInputPath(RDD<?> rdd, String rddName) {
        if (!(rdd instanceof HadoopRDD)) {
            return false;
        }
        Path[] inputPaths = FileInputFormat.getInputPaths(((HadoopRDD<?, ?>) rdd).getJobConf());
        return Arrays.stream(inputPaths).anyMatch(path -> path.toString().contains(rddName));
    }

    /** True when {@code rdd} is a {@link MapPartitionsRDD} whose parent carries the same name. */
    private static boolean isNamedAfterParent(RDD<?> rdd, String rddName) {
        return rdd instanceof MapPartitionsRDD
                && rddName.equals(((MapPartitionsRDD<?, ?>) rdd).prev().name());
    }

    /** Only logs the event at debug level. */
    @Override
    public void start(SparkListenerSQLExecutionStart sparkListenerSQLExecutionStart) {
        log.debug("start SparkListenerSQLExecutionStart {}", sparkListenerSQLExecutionStart);
    }

    /** Only logs the event at debug level. */
    @Override
    public void end(SparkListenerSQLExecutionEnd sparkListenerSQLExecutionEnd) {
        log.debug("end SparkListenerSQLExecutionEnd {}", sparkListenerSQLExecutionEnd);
    }

    /**
     * Emits a START run event for the job, unless no outputs were found.
     *
     * @param sparkListenerJobStart the job-start event supplying the event time and job id
     */
    @Override
    public void start(SparkListenerJobStart sparkListenerJobStart) {
        log.debug("start SparkListenerJobStart {}", sparkListenerJobStart);
        if (this.outputs.isEmpty()) {
            log.info("Output RDDs are empty: skipping sending OpenLineage event");
            return;
        }
        // Run and job are each built exactly once; building them twice only repeated the
        // facet construction and discarded the first result.
        OpenLineage.Run run = this.openLineage.newRunBuilder()
                .runId(this.runId)
                .facets(buildRunFacets(null, sparkListenerJobStart))
                .build();
        OpenLineage.RunEvent event = this.openLineage.newRunEventBuilder()
                .eventTime(TimeUtils.toZonedTime(sparkListenerJobStart.time()))
                .eventType(OpenLineage.RunEvent.EventType.START)
                .inputs(buildInputs(this.inputs))
                .outputs(buildOutputs(this.outputs))
                .run(run)
                .job(buildJob(sparkListenerJobStart.jobId(), sparkListenerJobStart))
                .build();
        log.debug("Posting event for start {}: {}", sparkListenerJobStart, event);
        this.eventEmitter.emit(event);
    }

    /**
     * Emits a COMPLETE or FAIL run event for the job.
     *
     * <p>The event is skipped when no outputs were found and the job did not fail; a failed job
     * is always reported.
     *
     * @param sparkListenerJobEnd the job-end event supplying the event time, job id and result
     */
    @Override
    public void end(SparkListenerJobEnd sparkListenerJobEnd) {
        log.debug("end SparkListenerJobEnd {}", sparkListenerJobEnd);
        JobResult jobResult = sparkListenerJobEnd.jobResult();
        if (this.outputs.isEmpty() && !(jobResult instanceof JobFailed)) {
            log.info("Output RDDs are empty: skipping sending OpenLineage event");
            return;
        }
        OpenLineage.Run run = this.openLineage.newRunBuilder()
                .runId(this.runId)
                .facets(buildRunFacets(buildJobErrorFacet(sparkListenerJobEnd.jobResult()), sparkListenerJobEnd))
                .build();
        OpenLineage.RunEvent event = this.openLineage.newRunEventBuilder()
                .eventTime(TimeUtils.toZonedTime(sparkListenerJobEnd.time()))
                .eventType(getEventType(sparkListenerJobEnd.jobResult()))
                .inputs(buildInputs(this.inputs))
                .outputs(buildOutputs(this.outputs))
                .run(run)
                .job(buildJob(sparkListenerJobEnd.jobId(), sparkListenerJobEnd))
                .build();
        log.debug("Posting event for end {}: {}", sparkListenerJobEnd, event);
        this.eventEmitter.emit(event);
    }

    /**
     * Assembles the run facets: parent run, optional error, processing engine, Spark properties,
     * GCP run facet and Spark job details.
     *
     * @param errorFacet error to attach under {@code spark.exception}, or {@code null} for none
     * @param sparkListenerEvent the listener event passed to the individual facet builders
     * @return the built run facets
     */
    protected OpenLineage.RunFacets buildRunFacets(ErrorFacet errorFacet, SparkListenerEvent sparkListenerEvent) {
        OpenLineage.RunFacetsBuilder runFacetsBuilder = this.openLineage.newRunFacetsBuilder();
        runFacetsBuilder.parent(buildApplicationParentFacet());
        if (errorFacet != null) {
            runFacetsBuilder.put("spark.exception", errorFacet);
        }
        addProcessingEventFacet(runFacetsBuilder);
        addSparkPropertyFacet(runFacetsBuilder, sparkListenerEvent);
        addGcpRunFacet(runFacetsBuilder, sparkListenerEvent);
        addSparkJobDetailsFacet(runFacetsBuilder, sparkListenerEvent);
        return runFacetsBuilder.build();
    }

    /** Adds the processing-engine facet when a Spark context was captured. */
    private void addProcessingEventFacet(OpenLineage.RunFacetsBuilder runFacetsBuilder) {
        this.sparkContextOption.ifPresent(sparkContext ->
                runFacetsBuilder.processing_engine(
                        new SparkProcessingEngineRunFacetBuilderDelegate(this.openLineage, sparkContext).buildFacet()));
    }

    /** Adds the {@code spark_properties} facet built from the event. */
    private void addSparkPropertyFacet(OpenLineage.RunFacetsBuilder runFacetsBuilder, SparkListenerEvent sparkListenerEvent) {
        runFacetsBuilder.put("spark_properties", new SparkPropertyFacetBuilder().buildFacet(sparkListenerEvent));
    }

    /** Adds GCP run facets when running on Dataproc and a Spark context was captured. */
    private void addGcpRunFacet(OpenLineage.RunFacetsBuilder runFacetsBuilder, SparkListenerEvent sparkListenerEvent) {
        if (!GCPUtils.isDataprocRuntime()) {
            return;
        }
        this.sparkContextOption.ifPresent(sparkContext -> {
            GcpRunFacetBuilder gcpRunFacetBuilder = new GcpRunFacetBuilder(sparkContext);
            gcpRunFacetBuilder.accept((Object) sparkListenerEvent, runFacetsBuilder::put);
        });
    }

    /** Adds the {@code spark_jobDetails} facet built from the event. */
    private void addSparkJobDetailsFacet(OpenLineage.RunFacetsBuilder runFacetsBuilder, SparkListenerEvent sparkListenerEvent) {
        runFacetsBuilder.put("spark_jobDetails", new SparkJobDetailsFacetBuilder().buildFacet(sparkListenerEvent));
    }

    /** Builds the parent-run facet from the emitter's application run id, job name and namespace. */
    private OpenLineage.ParentRunFacet buildApplicationParentFacet() {
        return PlanUtils.parentRunFacet(
                this.eventEmitter.getApplicationRunId(),
                this.eventEmitter.getApplicationJobName(),
                this.eventEmitter.getJobNamespace());
    }

    /**
     * Builds job facets containing only the GCP job facets (when applicable).
     *
     * @param sparkListenerEvent the listener event passed to the GCP facet builder
     * @return the built job facets
     */
    protected OpenLineage.JobFacets buildJobFacets(SparkListenerEvent sparkListenerEvent) {
        OpenLineage.JobFacetsBuilder jobFacetsBuilder = this.openLineage.newJobFacetsBuilder();
        addGcpJobFacets(jobFacetsBuilder, sparkListenerEvent);
        return jobFacetsBuilder.build();
    }

    /** Adds GCP job facets when running on Dataproc and a Spark context was captured. */
    private void addGcpJobFacets(OpenLineage.JobFacetsBuilder jobFacetsBuilder, SparkListenerEvent sparkListenerEvent) {
        if (!GCPUtils.isDataprocRuntime()) {
            return;
        }
        this.sparkContextOption.ifPresent(sparkContext -> {
            GcpJobFacetBuilder gcpJobFacetBuilder = new GcpJobFacetBuilder(sparkContext);
            gcpJobFacetBuilder.accept((Object) sparkListenerEvent, jobFacetsBuilder::put);
        });
    }

    /**
     * Wraps a job failure's exception in an {@link ErrorFacet}.
     *
     * @param jobResult the job result to inspect
     * @return the error facet, or {@code null} when the job did not fail or carries no exception
     */
    protected ErrorFacet buildJobErrorFacet(JobResult jobResult) {
        if (jobResult instanceof JobFailed) {
            JobFailed failure = (JobFailed) jobResult;
            if (failure.exception() != null) {
                return ErrorFacet.builder().exception(failure.exception()).build();
            }
        }
        return null;
    }

    /**
     * Builds the OpenLineage job descriptor.
     *
     * <p>The name is {@code <appName>.<suffix>}, snake-cased and lower-cased. The application name
     * is the emitter's overridden name, else the Spark context's app name, else {@code "unknown"}.
     * The suffix is the stored job suffix, or the job id when no suffix has been computed.
     *
     * @param i the Spark job id, used as the suffix fallback
     * @param sparkListenerEvent the triggering event (not read by this implementation)
     * @return the built job
     */
    protected OpenLineage.Job buildJob(int i, SparkListenerEvent sparkListenerEvent) {
        String suffix = this.jobSuffix != null ? this.jobSuffix : String.valueOf(i);
        String rawName = this.eventEmitter.getOverriddenAppName()
                .orElse(this.sparkContextOption.map(SparkContext::appName).orElse("unknown"))
                + "." + suffix;
        String jobName = rawName.replaceAll(ExecutionContext.CAMEL_TO_SNAKE_CASE, "_$1").toLowerCase(Locale.ROOT);

        String processingType = StreamingContextUtils.hasActiveStreamingContext()
                ? SPARK_PROCESSING_TYPE_STREAMING
                : SPARK_PROCESSING_TYPE_BATCH;
        OpenLineage.JobFacets jobFacets = this.openLineage.newJobFacetsBuilder()
                .jobType(this.openLineage.newJobTypeJobFacetBuilder()
                        .jobType(SPARK_JOB_TYPE)
                        .processingType(processingType)
                        .integration("SPARK")
                        .build())
                .build();

        return this.openLineage.newJobBuilder()
                .namespace(this.eventEmitter.getJobNamespace())
                .name(jobName)
                .facets(jobFacets)
                .build();
    }

    /**
     * Converts output URIs into output datasets.
     *
     * @param list the output locations
     * @return one dataset per URI, in the same order
     */
    protected List<OpenLineage.OutputDataset> buildOutputs(List<URI> list) {
        return list.stream().map(this::buildOutputDataset).collect(Collectors.toList());
    }

    /**
     * Builds an input dataset whose name and namespace come from {@link PathUtils#fromURI}.
     *
     * @param uri the dataset location
     * @return the input dataset
     */
    protected OpenLineage.InputDataset buildInputDataset(URI uri) {
        DatasetIdentifier identifier = PathUtils.fromURI(uri);
        return this.openLineage.newInputDatasetBuilder()
                .name(identifier.getName())
                .namespace(identifier.getNamespace())
                .build();
    }

    /**
     * Builds an output dataset whose name and namespace come from {@link PathUtils#fromURI}.
     *
     * @param uri the dataset location
     * @return the output dataset
     */
    protected OpenLineage.OutputDataset buildOutputDataset(URI uri) {
        DatasetIdentifier identifier = PathUtils.fromURI(uri);
        return this.openLineage.newOutputDatasetBuilder()
                .name(identifier.getName())
                .namespace(identifier.getNamespace())
                .build();
    }

    /**
     * Converts input URIs into input datasets.
     *
     * @param list the input locations
     * @return one dataset per URI, in the same order
     */
    protected List<OpenLineage.InputDataset> buildInputs(List<URI> list) {
        return list.stream().map(this::buildInputDataset).collect(Collectors.toList());
    }

    /**
     * Resolves the output location of {@code rdd}.
     *
     * @param rdd the final RDD of the job
     * @param configuration the Hadoop configuration to read the output path from; may be null
     * @return a single-element list with the output URI, or an empty list when none was found
     */
    protected List<URI> findOutputs(RDD<?> rdd, Configuration configuration) {
        Path outputPath = getOutputPath(rdd, configuration);
        log.info("Found output path {} from RDD {}", outputPath, rdd);
        if (outputPath == null) {
            log.debug("Output path is null");
            return Collections.emptyList();
        }
        return Collections.singletonList(outputPath.toUri());
    }

    /**
     * Resolves the input locations of the given RDDs via {@link PlanUtils#findRDDPaths}.
     *
     * @param set the RDDs to inspect
     * @return the URIs of the paths that were found
     */
    protected List<URI> findInputs(Set<RDD<?>> set) {
        log.debug("find Inputs within RddExecutionContext {}", set);
        List<RDD<?>> rdds = new ArrayList<>(set);
        return PlanUtils.findRDDPaths(rdds).stream().map(Path::toUri).collect(Collectors.toList());
    }

    /**
     * Recursively walks the dependency graph of {@code rdd}.
     *
     * <p>Despite its name this implementation produces no output; it only traverses.
     *
     * @param str indentation prefix, extended by two spaces per level
     * @param rdd the RDD whose dependencies are visited
     */
    protected void printRDDs(String str, RDD<?> rdd) {
        for (Object dependency : ScalaConversionUtils.fromSeq(rdd.dependencies())) {
            printRDDs(str + "  ", ((Dependency<?>) dependency).rdd());
        }
    }

    /**
     * Determines the output path for {@code rdd}.
     *
     * <p>Lookup order: the {@code mapred} {@link FileOutputFormat} setting, then the
     * {@code mapreduce} {@code FileOutputFormat} setting, then the first path reported by
     * {@link PlanUtils#findRDDPaths} for the RDD itself.
     *
     * @param rdd the RDD used as the final fallback
     * @param configuration the Hadoop configuration; may be null, in which case only the RDD
     *     fallback is used
     * @return the output path, or {@code null} when none could be determined
     */
    protected static Path getOutputPath(RDD<?> rdd, Configuration configuration) {
        Path outputPath = null;
        if (configuration != null) {
            JobConf jobConf = configuration instanceof JobConf ? (JobConf) configuration : new JobConf(configuration);
            log.debug("JobConf {}", jobConf);
            outputPath = FileOutputFormat.getOutputPath(jobConf);
            if (outputPath == null) {
                try {
                    log.debug("Path is null, trying to use old fashioned mapreduce api");
                    outputPath = org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath(new Job(jobConf));
                } catch (IOException e) {
                    // Best effort: report through the logger and fall through to the RDD lookup.
                    log.warn("Unable to read output path using the mapreduce api", e);
                }
            }
        }
        if (outputPath == null) {
            outputPath = PlanUtils.findRDDPaths(Collections.singletonList(rdd)).stream().findFirst().orElse(null);
        }
        return outputPath;
    }

    /**
     * Maps a Spark job result onto an OpenLineage event type.
     *
     * <p>The result is matched by simple class name rather than by type.
     *
     * @param jobResult the job result
     * @return COMPLETE when the result's simple class name starts with {@code JobSucceeded},
     *     otherwise FAIL
     */
    protected OpenLineage.RunEvent.EventType getEventType(JobResult jobResult) {
        boolean succeeded = jobResult.getClass().getSimpleName().startsWith("JobSucceeded");
        return succeeded ? OpenLineage.RunEvent.EventType.COMPLETE : OpenLineage.RunEvent.EventType.FAIL;
    }
}
