package io.openlineage.spark.agent.lifecycle;

import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientUtils;
import io.openlineage.spark.agent.lifecycle.plan.column.ColumnLevelLineageUtils;
import io.openlineage.spark.agent.lifecycle.plan.column.ColumnLevelLineageVisitor;
import io.openlineage.spark.agent.util.FacetUtils;
import io.openlineage.spark.agent.util.PlanUtils;
import io.openlineage.spark.agent.util.RemovePathPatternUtils;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.api.CustomFacetBuilder;
import io.openlineage.spark.api.OpenLineageContext;
import io.openlineage.spark.api.OpenLineageEventHandlerFactory;
import io.openlineage.spark.api.QueryPlanVisitor;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.scheduler.Stage;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.PartialFunction;

/* loaded from: input_file:io/openlineage/spark/agent/lifecycle/OpenLineageRunEventBuilder.class */
class OpenLineageRunEventBuilder {
    private static final Logger log = LoggerFactory.getLogger(OpenLineageRunEventBuilder.class);

    @NonNull
    private final OpenLineageContext openLineageContext;

    @NonNull
    private final Collection<PartialFunction<Object, List<OpenLineage.InputDataset>>> inputDatasetBuilders;

    @NonNull
    private final Collection<PartialFunction<LogicalPlan, List<OpenLineage.InputDataset>>> inputDatasetQueryPlanVisitors;

    @NonNull
    private final Collection<PartialFunction<Object, List<OpenLineage.OutputDataset>>> outputDatasetBuilders;

    @NonNull
    private final Collection<PartialFunction<LogicalPlan, List<OpenLineage.OutputDataset>>> outputDatasetQueryPlanVisitors;

    @NonNull
    private final Collection<CustomFacetBuilder<?, ? extends OpenLineage.DatasetFacet>> datasetFacetBuilders;

    @NonNull
    private final Collection<CustomFacetBuilder<?, ? extends OpenLineage.InputDatasetFacet>> inputDatasetFacetBuilders;

    @NonNull
    private final Collection<CustomFacetBuilder<?, ? extends OpenLineage.OutputDatasetFacet>> outputDatasetFacetBuilders;

    @NonNull
    private final Collection<CustomFacetBuilder<?, ? extends OpenLineage.RunFacet>> runFacetBuilders;

    @NonNull
    private final Collection<CustomFacetBuilder<?, ? extends OpenLineage.JobFacet>> jobFacetBuilders;

    @NonNull
    private final Collection<ColumnLevelLineageVisitor> columnLineageVisitors;
    private final UnknownEntryFacetListener unknownEntryFacetListener;
    private final Map<Integer, ActiveJob> jobMap;
    private final Map<Integer, Stage> stageMap;

    /* JADX INFO: Access modifiers changed from: package-private */
    public OpenLineageRunEventBuilder(OpenLineageContext openLineageContext, OpenLineageEventHandlerFactory openLineageEventHandlerFactory) {
        this(openLineageContext, openLineageEventHandlerFactory.createInputDatasetBuilder(openLineageContext), openLineageEventHandlerFactory.createInputDatasetQueryPlanVisitors(openLineageContext), openLineageEventHandlerFactory.createOutputDatasetBuilder(openLineageContext), openLineageEventHandlerFactory.createOutputDatasetQueryPlanVisitors(openLineageContext), openLineageEventHandlerFactory.createDatasetFacetBuilders(openLineageContext), openLineageEventHandlerFactory.createInputDatasetFacetBuilders(openLineageContext), openLineageEventHandlerFactory.createOutputDatasetFacetBuilders(openLineageContext), openLineageEventHandlerFactory.createRunFacetBuilders(openLineageContext), openLineageEventHandlerFactory.createJobFacetBuilders(openLineageContext), openLineageEventHandlerFactory.createColumnLevelLineageVisitors(openLineageContext));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void registerJob(ActiveJob activeJob) {
        this.jobMap.put(Integer.valueOf(activeJob.jobId()), activeJob);
        this.stageMap.put(Integer.valueOf(activeJob.finalStage().id()), activeJob.finalStage());
        activeJob.finalStage().parents().forall(ScalaConversionUtils.toScalaFn(stage -> {
            this.stageMap.put(Integer.valueOf(stage.id()), stage);
            return true;
        }));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public OpenLineage.RunEvent buildRun(OpenLineageRunEventContext openLineageRunEventContext) {
        OpenLineage openLineage = this.openLineageContext.getOpenLineage();
        List<Object> loadNodes = openLineageRunEventContext.loadNodes(this.stageMap, this.jobMap);
        UUID orElse = openLineageRunEventContext.getOverwriteRunId().orElse(this.openLineageContext.getRunUuid());
        OpenLineage.RunFacetsBuilder constructRunFacetsBuilder = constructRunFacetsBuilder(openLineageRunEventContext, openLineage);
        constructRunFacetsBuilder.parent(openLineageRunEventContext.getApplicationParentRunFacet());
        OpenLineage.JobFacets buildJobFacets = buildJobFacets(loadNodes, this.jobFacetBuilders, openLineageRunEventContext.getJobFacetsBuilder());
        List<OpenLineage.InputDataset> buildInputDatasets = buildInputDatasets(loadNodes);
        List<OpenLineage.OutputDataset> buildOutputDatasets = buildOutputDatasets(loadNodes);
        this.openLineageContext.getQueryExecution().filter(queryExecution -> {
            return !FacetUtils.isFacetDisabled(this.openLineageContext, "spark_unknown");
        }).flatMap(queryExecution2 -> {
            return (Optional) this.openLineageContext.getMeterRegistry().timer("openlineage.spark.unknownFacet.time", new String[0]).record(() -> {
                return this.unknownEntryFacetListener.build(queryExecution2.optimizedPlan());
            });
        }).ifPresent(unknownEntryFacet -> {
            constructRunFacetsBuilder.put("spark_unknown", unknownEntryFacet);
        });
        this.unknownEntryFacetListener.clear();
        openLineageRunEventContext.getRunEventBuilder().run(openLineage.newRunBuilder().runId(orElse).facets(buildRunFacets(loadNodes, this.runFacetBuilders, constructRunFacetsBuilder)).build()).job(openLineageRunEventContext.getJobBuilder().facets(buildJobFacets).build()).inputs(RemovePathPatternUtils.removeInputsPathPattern(this.openLineageContext, buildInputDatasets)).outputs(RemovePathPatternUtils.removeOutputsPathPattern(this.openLineageContext, buildOutputDatasets));
        return openLineageRunEventContext.getRunEventBuilder().build();
    }

    OpenLineage.RunFacetsBuilder constructRunFacetsBuilder(OpenLineageRunEventContext openLineageRunEventContext, OpenLineage openLineage) {
        OpenLineage.RunFacetsBuilder runFacetsBuilder = openLineageRunEventContext.getRunFacetsBuilder();
        return runFacetsBuilder == null ? openLineage.newRunFacetsBuilder() : runFacetsBuilder;
    }

    private List<OpenLineage.InputDataset> buildInputDatasets(List<Object> list) {
        this.openLineageContext.getQueryExecution().ifPresent(queryExecution -> {
            if (log.isDebugEnabled()) {
                log.debug("Traversing optimized plan {}", queryExecution.optimizedPlan().toJSON());
                log.debug("Physical plan executed {}", queryExecution.executedPlan().toJSON());
            }
        });
        if (log.isDebugEnabled()) {
            log.debug("Visiting query plan {} with input dataset builders {}", this.openLineageContext.getQueryExecution(), this.inputDatasetBuilders);
        }
        Function1 visitLogicalPlan = visitLogicalPlan(PlanUtils.merge(this.inputDatasetQueryPlanVisitors));
        List<OpenLineage.InputDataset> list2 = (List) Stream.concat(buildDatasets(list, this.inputDatasetBuilders), (Stream) this.openLineageContext.getQueryExecution().map(queryExecution2 -> {
            Stream flatMap = ScalaConversionUtils.fromSeq(queryExecution2.optimizedPlan().map(visitLogicalPlan)).stream().flatMap((v0) -> {
                return v0.stream();
            });
            Class<OpenLineage.InputDataset> cls = OpenLineage.InputDataset.class;
            Objects.requireNonNull(OpenLineage.InputDataset.class);
            return flatMap.map((v1) -> {
                return r1.cast(v1);
            });
        }).orElse(Stream.empty())).collect(Collectors.toList());
        OpenLineage openLineage = this.openLineageContext.getOpenLineage();
        if (list2.isEmpty()) {
            return list2;
        }
        HashMap hashMap = new HashMap();
        list.forEach(obj -> {
            this.inputDatasetFacetBuilders.forEach(customFacetBuilder -> {
                Objects.requireNonNull(hashMap);
                customFacetBuilder.accept(obj, (v1, v2) -> {
                    r2.put(v1, v2);
                });
            });
        });
        HashMap hashMap2 = new HashMap();
        list.forEach(obj2 -> {
            this.inputDatasetFacetBuilders.forEach(customFacetBuilder -> {
                Objects.requireNonNull(hashMap);
                customFacetBuilder.accept(obj2, (v1, v2) -> {
                    r2.put(v1, v2);
                });
            });
        });
        return (List) list2.stream().map(inputDataset -> {
            return openLineage.newInputDatasetBuilder().name(inputDataset.getName()).namespace(inputDataset.getNamespace()).inputFacets((OpenLineage.InputDatasetInputFacets) OpenLineageClientUtils.mergeFacets(hashMap, inputDataset.getInputFacets(), OpenLineage.InputDatasetInputFacets.class)).facets((OpenLineage.DatasetFacets) OpenLineageClientUtils.mergeFacets(hashMap2, inputDataset.getFacets(), OpenLineage.DatasetFacets.class)).build();
        }).collect(Collectors.toList());
    }

    private <D> Function1<LogicalPlan, Collection<D>> visitLogicalPlan(PartialFunction<LogicalPlan, Collection<D>> partialFunction) {
        return (Function1) this.openLineageContext.getMeterRegistry().timer("openlineage.spark.dataset.input.execution.time", new String[0]).record(() -> {
            return ScalaConversionUtils.toScalaFn(logicalPlan -> {
                return (Collection) partialFunction.andThen(ScalaConversionUtils.toScalaFn(collection -> {
                    if (!FacetUtils.isFacetDisabled(this.openLineageContext, "spark_unknown")) {
                        this.unknownEntryFacetListener.accept(logicalPlan);
                    }
                    return collection;
                })).applyOrElse(logicalPlan, ScalaConversionUtils.toScalaFn(logicalPlan -> {
                    return Collections.emptyList();
                }));
            });
        });
    }

    private List<OpenLineage.OutputDataset> buildOutputDatasets(List<Object> list) {
        if (log.isDebugEnabled()) {
            log.debug("Visiting query plan {} with output dataset builders {}", this.openLineageContext.getQueryExecution(), this.outputDatasetBuilders);
        }
        Function1 visitLogicalPlan = visitLogicalPlan(PlanUtils.merge((Collection) this.outputDatasetQueryPlanVisitors.stream().filter(partialFunction -> {
            return partialFunction instanceof QueryPlanVisitor;
        }).filter(partialFunction2 -> {
            return !list.isEmpty() && (list.get(0) instanceof SparkListenerEvent);
        }).filter(partialFunction3 -> {
            return ((QueryPlanVisitor) partialFunction3).isDefinedAt((SparkListenerEvent) list.get(0));
        }).collect(Collectors.toList())));
        List<OpenLineage.OutputDataset> list2 = (List) Stream.concat(buildDatasets(list, this.outputDatasetBuilders), (Stream) this.openLineageContext.getQueryExecution().map(queryExecution -> {
            return (Collection) visitLogicalPlan.apply(queryExecution.optimizedPlan());
        }).map((v0) -> {
            return v0.stream();
        }).orElse(Stream.empty())).collect(Collectors.toList());
        OpenLineage openLineage = this.openLineageContext.getOpenLineage();
        if (list2.isEmpty()) {
            return list2;
        }
        HashMap hashMap = new HashMap();
        list.forEach(obj -> {
            this.outputDatasetFacetBuilders.forEach(customFacetBuilder -> {
                Objects.requireNonNull(hashMap);
                customFacetBuilder.accept(obj, (v1, v2) -> {
                    r2.put(v1, v2);
                });
            });
        });
        HashMap hashMap2 = new HashMap();
        list.forEach(obj2 -> {
            this.datasetFacetBuilders.forEach(customFacetBuilder -> {
                Objects.requireNonNull(hashMap2);
                customFacetBuilder.accept(obj2, (v1, v2) -> {
                    r2.put(v1, v2);
                });
            });
        });
        return (List) list2.stream().map(outputDataset -> {
            HashMap hashMap3 = new HashMap(hashMap2);
            ColumnLevelLineageUtils.buildColumnLineageDatasetFacet((SparkListenerEvent) Optional.of(list.get(0)).filter(obj3 -> {
                return obj3 instanceof SparkListenerEvent;
            }).map(obj4 -> {
                return (SparkListenerEvent) obj4;
            }).orElse(null), this.openLineageContext, outputDataset.getFacets().getSchema()).ifPresent(columnLineageDatasetFacet -> {
                hashMap3.put("columnLineage", columnLineageDatasetFacet);
            });
            return openLineage.newOutputDatasetBuilder().name(outputDataset.getName()).namespace(outputDataset.getNamespace()).outputFacets((OpenLineage.OutputDatasetOutputFacets) OpenLineageClientUtils.mergeFacets(hashMap, outputDataset.getOutputFacets(), OpenLineage.OutputDatasetOutputFacets.class)).facets((OpenLineage.DatasetFacets) OpenLineageClientUtils.mergeFacets(hashMap3, outputDataset.getFacets(), OpenLineage.DatasetFacets.class)).build();
        }).collect(Collectors.toList());
    }

    private <T extends OpenLineage.Dataset> Stream<T> buildDatasets(List<Object> list, Collection<PartialFunction<Object, List<T>>> collection) {
        return (Stream<T>) list.stream().flatMap(obj -> {
            return collection.stream().filter(partialFunction -> {
                return PlanUtils.safeIsDefinedAt(partialFunction, obj);
            }).map(partialFunction2 -> {
                return PlanUtils.safeApply(partialFunction2, obj);
            }).flatMap((v0) -> {
                return v0.stream();
            });
        });
    }

    private OpenLineage.JobFacets buildJobFacets(List<Object> list, Collection<CustomFacetBuilder<?, ? extends OpenLineage.JobFacet>> collection, OpenLineage.JobFacetsBuilder jobFacetsBuilder) {
        list.forEach(obj -> {
            collection.forEach(customFacetBuilder -> {
                this.openLineageContext.getMeterRegistry().timer("openlineage.spark.facets.job.execution.time", "facet.builder", customFacetBuilder.getClass().getCanonicalName()).record(() -> {
                    Objects.requireNonNull(jobFacetsBuilder);
                    customFacetBuilder.accept(obj, jobFacetsBuilder::put);
                });
            });
        });
        return jobFacetsBuilder.build();
    }

    private OpenLineage.RunFacets buildRunFacets(List<Object> list, Collection<CustomFacetBuilder<?, ? extends OpenLineage.RunFacet>> collection, OpenLineage.RunFacetsBuilder runFacetsBuilder) {
        list.forEach(obj -> {
            collection.forEach(customFacetBuilder -> {
                this.openLineageContext.getMeterRegistry().timer("openlineage.spark.facets.run.execution.time", "facet.builder", customFacetBuilder.getClass().getCanonicalName()).record(() -> {
                    Objects.requireNonNull(runFacetsBuilder);
                    customFacetBuilder.accept(obj, runFacetsBuilder::put);
                });
            });
        });
        return runFacetsBuilder.build();
    }

    public OpenLineageRunEventBuilder(@NonNull OpenLineageContext openLineageContext, @NonNull Collection<PartialFunction<Object, List<OpenLineage.InputDataset>>> collection, @NonNull Collection<PartialFunction<LogicalPlan, List<OpenLineage.InputDataset>>> collection2, @NonNull Collection<PartialFunction<Object, List<OpenLineage.OutputDataset>>> collection3, @NonNull Collection<PartialFunction<LogicalPlan, List<OpenLineage.OutputDataset>>> collection4, @NonNull Collection<CustomFacetBuilder<?, ? extends OpenLineage.DatasetFacet>> collection5, @NonNull Collection<CustomFacetBuilder<?, ? extends OpenLineage.InputDatasetFacet>> collection6, @NonNull Collection<CustomFacetBuilder<?, ? extends OpenLineage.OutputDatasetFacet>> collection7, @NonNull Collection<CustomFacetBuilder<?, ? extends OpenLineage.RunFacet>> collection8, @NonNull Collection<CustomFacetBuilder<?, ? extends OpenLineage.JobFacet>> collection9, @NonNull Collection<ColumnLevelLineageVisitor> collection10) {
        this.unknownEntryFacetListener = UnknownEntryFacetListener.getInstance();
        this.jobMap = new HashMap();
        this.stageMap = new HashMap();
        if (openLineageContext == null) {
            throw new NullPointerException("openLineageContext is marked non-null but is null");
        }
        if (collection == null) {
            throw new NullPointerException("inputDatasetBuilders is marked non-null but is null");
        }
        if (collection2 == null) {
            throw new NullPointerException("inputDatasetQueryPlanVisitors is marked non-null but is null");
        }
        if (collection3 == null) {
            throw new NullPointerException("outputDatasetBuilders is marked non-null but is null");
        }
        if (collection4 == null) {
            throw new NullPointerException("outputDatasetQueryPlanVisitors is marked non-null but is null");
        }
        if (collection5 == null) {
            throw new NullPointerException("datasetFacetBuilders is marked non-null but is null");
        }
        if (collection6 == null) {
            throw new NullPointerException("inputDatasetFacetBuilders is marked non-null but is null");
        }
        if (collection7 == null) {
            throw new NullPointerException("outputDatasetFacetBuilders is marked non-null but is null");
        }
        if (collection8 == null) {
            throw new NullPointerException("runFacetBuilders is marked non-null but is null");
        }
        if (collection9 == null) {
            throw new NullPointerException("jobFacetBuilders is marked non-null but is null");
        }
        if (collection10 == null) {
            throw new NullPointerException("columnLineageVisitors is marked non-null but is null");
        }
        this.openLineageContext = openLineageContext;
        this.inputDatasetBuilders = collection;
        this.inputDatasetQueryPlanVisitors = collection2;
        this.outputDatasetBuilders = collection3;
        this.outputDatasetQueryPlanVisitors = collection4;
        this.datasetFacetBuilders = collection5;
        this.inputDatasetFacetBuilders = collection6;
        this.outputDatasetFacetBuilders = collection7;
        this.runFacetBuilders = collection8;
        this.jobFacetBuilders = collection9;
        this.columnLineageVisitors = collection10;
    }
}
