package io.openlineage.spark.agent.lifecycle;

import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientUtils;
import io.openlineage.spark.agent.EventEmitter;
import io.openlineage.spark.agent.filters.EventFilterUtils;
import io.openlineage.spark.agent.util.PlanUtils;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.agent.util.TimeUtils;
import io.openlineage.spark.api.OpenLineageContext;
import io.openlineage.spark.api.naming.JobNameBuilder;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Objects;
import java.util.Optional;
import java.util.Stack;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.JobFailed;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.class */
class SparkSQLExecutionContext implements ExecutionContext {
    private static final Logger log = LoggerFactory.getLogger(SparkSQLExecutionContext.class);
    private static final String NO_EXECUTION_INFO = "No execution info {}";
    private static final String SPARK_JOB_TYPE = "SQL_JOB";
    private static final String SPARK_INTEGRATION = "SPARK";
    private static final String SPARK_PROCESSING_TYPE_BATCH = "BATCH";
    private static final String SPARK_PROCESSING_TYPE_STREAMING = "STREAMING";
    private final long executionId;
    private final OpenLineageContext olContext;
    private final EventEmitter eventEmitter;
    private final OpenLineageRunEventBuilder runEventBuilder;
    private Integer activeJobId;
    private boolean emittedOnSqlExecutionStart = false;
    private boolean emittedOnSqlExecutionEnd = false;
    private boolean emittedOnJobStart = false;
    private boolean emittedOnJobEnd = false;
    private AtomicBoolean finished = new AtomicBoolean(false);
    private SparkSQLQueryParser sqlRecorder = new SparkSQLQueryParser();

    public SparkSQLExecutionContext(long j, EventEmitter eventEmitter, OpenLineageContext openLineageContext, OpenLineageRunEventBuilder openLineageRunEventBuilder) {
        this.executionId = j;
        this.eventEmitter = eventEmitter;
        this.olContext = openLineageContext;
        this.runEventBuilder = openLineageRunEventBuilder;
    }

    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void start(SparkListenerSQLExecutionStart sparkListenerSQLExecutionStart) {
        if (log.isDebugEnabled()) {
            log.debug("SparkListenerSQLExecutionStart - executionId: {}", Long.valueOf(sparkListenerSQLExecutionStart.executionId()));
        }
        if (!this.olContext.getQueryExecution().isPresent()) {
            log.info(NO_EXECUTION_INFO, this.olContext);
            return;
        }
        if (EventFilterUtils.isDisabled(this.olContext, sparkListenerSQLExecutionStart)) {
            log.info("OpenLineage received Spark event that is configured to be skipped: SparkListenerSQLExecutionStart");
            return;
        }
        this.olContext.setActiveJobId(this.activeJobId);
        OpenLineage.RunEvent.EventType eventType = this.emittedOnJobStart ? OpenLineage.RunEvent.EventType.RUNNING : OpenLineage.RunEvent.EventType.START;
        this.emittedOnSqlExecutionStart = true;
        OpenLineage.RunEvent buildRun = this.runEventBuilder.buildRun(OpenLineageRunEventContext.builder().applicationParentRunFacet(buildApplicationParentFacet()).event(sparkListenerSQLExecutionStart).runEventBuilder(this.olContext.getOpenLineage().newRunEventBuilder().eventTime(TimeUtils.toZonedTime(sparkListenerSQLExecutionStart.time())).eventType(eventType)).jobBuilder(buildJob()).jobFacetsBuilder(getJobFacetsBuilder(this.olContext.getQueryExecution().get())).build());
        log.debug("Posting event for start {}: {}", Long.valueOf(this.executionId), buildRun);
        this.eventEmitter.emit(buildRun);
    }

    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void end(SparkListenerSQLExecutionEnd sparkListenerSQLExecutionEnd) {
        if (log.isDebugEnabled()) {
            log.debug("SparkListenerSQLExecutionEnd - executionId: {}", Long.valueOf(sparkListenerSQLExecutionEnd.executionId()));
        }
        this.olContext.setActiveJobId(this.activeJobId);
        if (!this.olContext.getQueryExecution().isPresent()) {
            log.info(NO_EXECUTION_INFO, this.olContext);
            return;
        }
        if (EventFilterUtils.isDisabled(this.olContext, sparkListenerSQLExecutionEnd)) {
            log.info("OpenLineage received Spark event that is configured to be skipped: SparkListenerSQLExecutionEnd");
            return;
        }
        OpenLineage.RunEvent.EventType eventType = (!this.emittedOnJobStart || this.emittedOnJobEnd) ? OpenLineage.RunEvent.EventType.COMPLETE : OpenLineage.RunEvent.EventType.RUNNING;
        this.emittedOnSqlExecutionEnd = true;
        OpenLineage.RunEvent buildRun = this.runEventBuilder.buildRun(OpenLineageRunEventContext.builder().applicationParentRunFacet(buildApplicationParentFacet()).event(sparkListenerSQLExecutionEnd).runEventBuilder(this.olContext.getOpenLineage().newRunEventBuilder().eventTime(TimeUtils.toZonedTime(sparkListenerSQLExecutionEnd.time())).eventType(eventType)).jobBuilder(buildJob()).jobFacetsBuilder(getJobFacetsBuilder(this.olContext.getQueryExecution().get())).build());
        if (log.isDebugEnabled()) {
            log.debug("Posting event for end {}: {}", Long.valueOf(this.executionId), OpenLineageClientUtils.toJson(buildRun));
        }
        this.eventEmitter.emit(buildRun);
    }

    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void start(SparkListenerStageSubmitted sparkListenerStageSubmitted) {
        if (!this.olContext.getQueryExecution().isPresent()) {
            log.info(NO_EXECUTION_INFO, this.olContext);
        } else {
            if (EventFilterUtils.isDisabled(this.olContext, sparkListenerStageSubmitted)) {
                log.info("OpenLineage received Spark event that is configured to be skipped: SparkListenerStageSubmitted");
                return;
            }
            OpenLineage.RunEvent buildRun = this.runEventBuilder.buildRun(OpenLineageRunEventContext.builder().applicationParentRunFacet(buildApplicationParentFacet()).event(sparkListenerStageSubmitted).runEventBuilder(this.olContext.getOpenLineage().newRunEventBuilder().eventTime(ZonedDateTime.now(ZoneOffset.UTC)).eventType(OpenLineage.RunEvent.EventType.RUNNING)).jobBuilder(buildJob()).jobFacetsBuilder(getJobFacetsBuilder(this.olContext.getQueryExecution().get())).build());
            log.debug("Posting event for stage submitted {}: {}", Long.valueOf(this.executionId), buildRun);
            this.eventEmitter.emit(buildRun);
        }
    }

    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void end(SparkListenerStageCompleted sparkListenerStageCompleted) {
        if (!this.olContext.getQueryExecution().isPresent()) {
            log.info(NO_EXECUTION_INFO, this.olContext);
        } else {
            if (EventFilterUtils.isDisabled(this.olContext, sparkListenerStageCompleted)) {
                log.info("OpenLineage received Spark event that is configured to be skipped: SparkListenerStageCompleted");
                return;
            }
            OpenLineage.RunEvent buildRun = this.runEventBuilder.buildRun(OpenLineageRunEventContext.builder().applicationParentRunFacet(buildApplicationParentFacet()).event(sparkListenerStageCompleted).runEventBuilder(this.olContext.getOpenLineage().newRunEventBuilder().eventTime(ZonedDateTime.now(ZoneOffset.UTC)).eventType(OpenLineage.RunEvent.EventType.RUNNING)).jobBuilder(buildJob()).jobFacetsBuilder(getJobFacetsBuilder(this.olContext.getQueryExecution().get())).build());
            log.debug("Posting event for stage completed {}: {}", Long.valueOf(this.executionId), buildRun);
            this.eventEmitter.emit(buildRun);
        }
    }

    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public Optional<Integer> getActiveJobId() {
        return Optional.ofNullable(this.activeJobId);
    }

    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void setActiveJobId(Integer num) {
        this.activeJobId = num;
    }

    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void setActiveJob(ActiveJob activeJob) {
        this.olContext.setActiveJobId(Integer.valueOf(activeJob.jobId()));
        this.runEventBuilder.registerJob(activeJob);
        log.debug("Registering jobId: {} into runUid: {}", activeJob, this.olContext.getRunUuid());
    }

    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void start(SparkListenerJobStart sparkListenerJobStart) {
        log.debug("SparkListenerJobStart - executionId: {}", Long.valueOf(this.executionId));
        if (!this.olContext.getQueryExecution().isPresent()) {
            log.info(NO_EXECUTION_INFO, this.olContext);
            return;
        }
        if (EventFilterUtils.isDisabled(this.olContext, sparkListenerJobStart)) {
            log.info("OpenLineage received Spark event that is configured to be skipped: SparkListenerJobStart");
            return;
        }
        OpenLineage.RunEvent.EventType eventType = this.emittedOnSqlExecutionStart ? OpenLineage.RunEvent.EventType.RUNNING : OpenLineage.RunEvent.EventType.START;
        this.emittedOnJobStart = true;
        OpenLineage.RunEvent buildRun = this.runEventBuilder.buildRun(OpenLineageRunEventContext.builder().applicationParentRunFacet(buildApplicationParentFacet()).event(sparkListenerJobStart).runEventBuilder(this.olContext.getOpenLineage().newRunEventBuilder().eventTime(TimeUtils.toZonedTime(sparkListenerJobStart.time())).eventType(eventType)).jobBuilder(buildJob()).jobFacetsBuilder(getJobFacetsBuilder(this.olContext.getQueryExecution().get())).build());
        log.debug("Posting event for start {}: {}", Long.valueOf(this.executionId), buildRun);
        this.eventEmitter.emit(buildRun);
    }

    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void end(SparkListenerJobEnd sparkListenerJobEnd) {
        log.debug("SparkListenerJobEnd - executionId: {}", Long.valueOf(this.executionId));
        this.olContext.setActiveJobId(Integer.valueOf(sparkListenerJobEnd.jobId()));
        if (!this.finished.compareAndSet(false, true)) {
            log.debug("Event already finished, returning");
            return;
        }
        if (!this.olContext.getQueryExecution().isPresent()) {
            log.info(NO_EXECUTION_INFO, this.olContext);
            return;
        }
        if (EventFilterUtils.isDisabled(this.olContext, sparkListenerJobEnd)) {
            log.info("OpenLineage received Spark event that is configured to be skipped: SparkListenerJobEnd");
            return;
        }
        OpenLineage.RunEvent.EventType eventType = sparkListenerJobEnd.jobResult() instanceof JobFailed ? OpenLineage.RunEvent.EventType.FAIL : (!this.emittedOnSqlExecutionStart || this.emittedOnSqlExecutionEnd) ? OpenLineage.RunEvent.EventType.COMPLETE : OpenLineage.RunEvent.EventType.RUNNING;
        this.emittedOnJobEnd = true;
        OpenLineage.RunEvent buildRun = this.runEventBuilder.buildRun(OpenLineageRunEventContext.builder().applicationParentRunFacet(buildApplicationParentFacet()).event(sparkListenerJobEnd).runEventBuilder(this.olContext.getOpenLineage().newRunEventBuilder().eventTime(TimeUtils.toZonedTime(sparkListenerJobEnd.time())).eventType(eventType)).jobBuilder(buildJob()).jobFacetsBuilder(getJobFacetsBuilder(this.olContext.getQueryExecution().get())).build());
        log.debug("Posting event for end {}: {}", Long.valueOf(this.executionId), buildRun);
        this.eventEmitter.emit(buildRun);
    }

    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void start(SparkListenerApplicationStart sparkListenerApplicationStart) {
    }

    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void end(SparkListenerApplicationEnd sparkListenerApplicationEnd) {
    }

    private OpenLineage.ParentRunFacet buildApplicationParentFacet() {
        return PlanUtils.parentRunFacet(this.eventEmitter.getApplicationRunId(), this.eventEmitter.getApplicationJobName(), this.eventEmitter.getJobNamespace());
    }

    protected OpenLineage.JobBuilder buildJob() {
        return this.olContext.getOpenLineage().newJobBuilder().name(JobNameBuilder.build(this.olContext)).namespace(this.eventEmitter.getJobNamespace());
    }

    private OpenLineage.JobTypeJobFacet getJobTypeJobFacet(QueryExecution queryExecution) {
        return this.olContext.getOpenLineage().newJobTypeJobFacetBuilder().jobType(SPARK_JOB_TYPE).processingType(queryExecution.optimizedPlan().isStreaming() ? SPARK_PROCESSING_TYPE_STREAMING : SPARK_PROCESSING_TYPE_BATCH).integration(SPARK_INTEGRATION).build();
    }

    private OpenLineage.JobFacetsBuilder getJobFacetsBuilder(QueryExecution queryExecution) {
        OpenLineage.JobFacetsBuilder jobType = this.olContext.getOpenLineage().newJobFacetsBuilder().jobType(getJobTypeJobFacet(queryExecution));
        Optional<OpenLineage.SQLJobFacet> resolveSQLFacets = resolveSQLFacets(queryExecution);
        Objects.requireNonNull(jobType);
        resolveSQLFacets.ifPresent(jobType::sql);
        return jobType;
    }

    Optional<OpenLineage.SQLJobFacet> resolveSQLFacets(QueryExecution queryExecution) {
        LogicalPlan logical = queryExecution.logical();
        String str = null;
        Stack stack = new Stack();
        if (logical != null) {
            stack.add(logical);
        }
        boolean z = false;
        while (!stack.isEmpty() && !z) {
            int size = stack.size();
            while (true) {
                if (size > 0) {
                    LogicalPlan logicalPlan = (LogicalPlan) stack.pop();
                    if (logicalPlan != null) {
                        Optional<String> parse = this.sqlRecorder.parse(logicalPlan);
                        if (logicalPlan.origin() != null && parse.isPresent()) {
                            str = parse.get();
                            z = true;
                            break;
                        }
                        stack.addAll(ScalaConversionUtils.fromSeq(logicalPlan.children()));
                        size--;
                    }
                } else {
                    break;
                }
            }
        }
        return str == null ? Optional.empty() : Optional.of(this.olContext.getOpenLineage().newSQLJobFacetBuilder().query(str).build());
    }
}
