package datadog.trace.instrumentation.spark;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import datadog.trace.api.Config;
import datadog.trace.api.DDTags;
import datadog.trace.api.DDTraceId;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import de.thetaphi.forbiddenapis.SuppressForbidden;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.time.OffsetDateTime;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import net.bytebuddy.utility.JavaConstant;
import org.apache.spark.ExceptionFailure;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskFailedReason;
import org.apache.spark.scheduler.JobFailed;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.scheduler.SparkListenerExecutorAdded;
import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.scheduler.StageInfo;
import org.apache.spark.sql.execution.SQLExecution;
import org.apache.spark.sql.execution.streaming.MicroBatchExecution;
import org.apache.spark.sql.execution.streaming.StreamExecution;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
import org.apache.spark.sql.streaming.SourceProgress;
import org.apache.spark.sql.streaming.StateOperatorProgress;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.apache.spark.sql.streaming.StreamingQueryProgress;
import scala.Tuple2;

/* loaded from: input_file:inst/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.classdata */
public abstract class AbstractDatadogSparkListener extends SparkListener {
    public static volatile AbstractDatadogSparkListener listener = null;
    public static volatile boolean finishTraceOnApplicationEnd = true;
    private final SparkConf sparkConf;
    private final String sparkVersion;
    private final String appId;
    private AgentSpan applicationSpan;
    private SparkListenerApplicationStart applicationStart;
    private final boolean isRunningOnDatabricks;
    private final String databricksClusterName;
    private final String databricksServiceName;
    private String lastJobFailedMessage;
    private String lastJobFailedStackTrace;
    private final int MAX_COLLECTION_SIZE = 1000;
    private final String RUNTIME_TAGS_PREFIX = "spark.datadog.tags.";
    private final HashMap<String, AgentSpan> streamingBatchSpans = new HashMap<>();
    private final HashMap<Long, AgentSpan> sqlSpans = new HashMap<>();
    private final HashMap<Integer, AgentSpan> jobSpans = new HashMap<>();
    private final HashMap<Long, AgentSpan> stageSpans = new HashMap<>();
    private final HashMap<Integer, Integer> stageToJob = new HashMap<>();
    private final HashMap<Long, Properties> stageProperties = new HashMap<>();
    private final SparkAggregatedTaskMetrics applicationMetrics = new SparkAggregatedTaskMetrics();
    private final HashMap<String, SparkAggregatedTaskMetrics> streamingBatchMetrics = new HashMap<>();
    private final HashMap<Long, SparkAggregatedTaskMetrics> sqlMetrics = new HashMap<>();
    private final HashMap<Integer, SparkAggregatedTaskMetrics> jobMetrics = new HashMap<>();
    private final HashMap<Long, SparkAggregatedTaskMetrics> stageMetrics = new HashMap<>();
    private final HashMap<UUID, StreamingQueryListener.QueryStartedEvent> streamingQueries = new HashMap<>();
    private final HashMap<Long, SparkListenerSQLExecutionStart> sqlQueries = new HashMap<>();
    private final HashMap<String, SparkListenerExecutorAdded> liveExecutors = new HashMap<>();
    private boolean lastJobFailed = false;
    private int jobCount = 0;
    private int currentExecutorCount = 0;
    private int maxExecutorCount = 0;
    private long availableExecutorTime = 0;
    private boolean applicationEnded = false;
    private final AgentTracer.TracerAPI tracer = AgentTracer.get();

    public AbstractDatadogSparkListener(SparkConf sparkConf, String str, String str2) {
        this.sparkConf = sparkConf;
        this.appId = str;
        this.sparkVersion = str2;
        this.isRunningOnDatabricks = sparkConf.contains("spark.databricks.sparkContextId");
        this.databricksClusterName = sparkConf.get("spark.databricks.clusterUsageTags.clusterName", (String) null);
        this.databricksServiceName = getDatabricksServiceName(sparkConf, this.databricksClusterName);
    }

    protected abstract String getSparkJobName(SparkListenerJobStart sparkListenerJobStart);

    protected abstract ArrayList<Integer> getSparkJobStageIds(SparkListenerJobStart sparkListenerJobStart);

    protected abstract int getStageCount(SparkListenerJobStart sparkListenerJobStart);

    public synchronized void onApplicationStart(SparkListenerApplicationStart sparkListenerApplicationStart) {
        this.applicationStart = sparkListenerApplicationStart;
    }

    private void initApplicationSpanIfNotInitialized() {
        if (this.applicationSpan != null) {
            return;
        }
        AgentTracer.SpanBuilder buildSparkSpan = buildSparkSpan("spark.application", null);
        if (this.applicationStart != null) {
            buildSparkSpan.withStartTimestamp(this.applicationStart.time() * 1000).withTag("application_name", this.applicationStart.appName()).withTag("spark_user", this.applicationStart.sparkUser());
            if (this.applicationStart.appAttemptId().isDefined()) {
                buildSparkSpan.withTag("app_attempt_id", (String) this.applicationStart.appAttemptId().get());
            }
        }
        captureApplicationParameters(buildSparkSpan);
        this.applicationSpan = buildSparkSpan.start();
        this.applicationSpan.setMeasured(true);
    }

    public void onApplicationEnd(SparkListenerApplicationEnd sparkListenerApplicationEnd) {
        if (finishTraceOnApplicationEnd) {
            finishApplication(sparkListenerApplicationEnd.time(), null, 0, null);
        }
    }

    public synchronized void finishApplication(long j, Throwable th, int i, String str) {
        if (this.applicationEnded) {
            return;
        }
        this.applicationEnded = true;
        if (this.applicationSpan != null || this.jobCount <= 0) {
            initApplicationSpanIfNotInitialized();
            if (th != null) {
                this.applicationSpan.addThrowable(th);
            } else if (i != 0) {
                this.applicationSpan.setError(true);
                this.applicationSpan.setTag(DDTags.ERROR_TYPE, "Spark Application Failed with exit code " + i);
                this.applicationSpan.setTag(DDTags.ERROR_MSG, getErrorMessageWithoutStackTrace(str));
                this.applicationSpan.setTag(DDTags.ERROR_STACK, str);
            } else if (this.lastJobFailed) {
                this.applicationSpan.setError(true);
                this.applicationSpan.setTag(DDTags.ERROR_TYPE, "Spark Application Failed");
                this.applicationSpan.setTag(DDTags.ERROR_MSG, this.lastJobFailedMessage);
                this.applicationSpan.setTag(DDTags.ERROR_STACK, this.lastJobFailedStackTrace);
            }
            this.applicationMetrics.setSpanMetrics(this.applicationSpan);
            this.applicationSpan.setMetric((CharSequence) "spark.max_executor_count", this.maxExecutorCount);
            this.applicationSpan.setMetric((CharSequence) "spark.available_executor_time", computeCurrentAvailableExecutorTime(j));
            this.applicationSpan.finish(j * 1000);
        }
    }

    private AgentSpan getOrCreateStreamingBatchSpan(String str, Long l, Properties properties) {
        AgentSpan agentSpan = this.streamingBatchSpans.get(str);
        if (agentSpan != null) {
            return agentSpan;
        }
        AgentTracer.SpanBuilder withStartTimestamp = buildSparkSpan("spark.streaming_batch", properties).withStartTimestamp(l.longValue() * 1000);
        captureApplicationParameters(withStartTimestamp);
        captureJobParameters(withStartTimestamp, properties);
        if (this.isRunningOnDatabricks) {
            addDatabricksSpecificTags(withStartTimestamp, properties, false);
        }
        AgentSpan start = withStartTimestamp.start();
        this.streamingBatchSpans.put(str, start);
        return start;
    }

    private void addDatabricksSpecificTags(AgentTracer.SpanBuilder spanBuilder, Properties properties, boolean z) {
        captureApplicationParameters(spanBuilder);
        captureJobParameters(spanBuilder, properties);
        if (properties != null) {
            String databricksJobId = getDatabricksJobId(properties);
            String databricksJobRunId = getDatabricksJobRunId(properties, this.databricksClusterName);
            String databricksTaskRunId = getDatabricksTaskRunId(properties);
            spanBuilder.withTag("databricks_job_id", databricksJobId);
            spanBuilder.withTag("databricks_job_run_id", databricksJobRunId);
            spanBuilder.withTag("databricks_task_run_id", databricksTaskRunId);
            if (z) {
                DatabricksParentContext databricksParentContext = new DatabricksParentContext(databricksJobId, databricksJobRunId, databricksTaskRunId);
                if (databricksParentContext.getTraceId() != DDTraceId.ZERO) {
                    spanBuilder.asChildOf(databricksParentContext);
                }
            }
        }
    }

    private AgentSpan getOrCreateSqlSpan(long j, String str, Properties properties) {
        AgentSpan agentSpan = this.sqlSpans.get(Long.valueOf(j));
        if (agentSpan != null) {
            return agentSpan;
        }
        SparkListenerSQLExecutionStart sparkListenerSQLExecutionStart = this.sqlQueries.get(Long.valueOf(j));
        if (sparkListenerSQLExecutionStart == null) {
            return null;
        }
        AgentTracer.SpanBuilder withTag = buildSparkSpan("spark.sql", properties).withStartTimestamp(sparkListenerSQLExecutionStart.time() * 1000).withTag("query_id", (Number) Long.valueOf(j)).withTag("description", sparkListenerSQLExecutionStart.description()).withTag("details", sparkListenerSQLExecutionStart.details()).withTag(DDTags.RESOURCE_NAME, sparkListenerSQLExecutionStart.description());
        if (str != null) {
            withTag.asChildOf(getOrCreateStreamingBatchSpan(str, Long.valueOf(sparkListenerSQLExecutionStart.time()), properties).context());
        } else if (this.isRunningOnDatabricks) {
            addDatabricksSpecificTags(withTag, properties, true);
        } else {
            initApplicationSpanIfNotInitialized();
            withTag.asChildOf(this.applicationSpan.context());
        }
        AgentSpan start = withTag.start();
        this.sqlSpans.put(Long.valueOf(j), start);
        return start;
    }

    public synchronized void onJobStart(SparkListenerJobStart sparkListenerJobStart) {
        this.jobCount++;
        if (this.jobSpans.size() > 1000) {
            return;
        }
        AgentTracer.SpanBuilder withTag = buildSparkSpan("spark.job", sparkListenerJobStart.properties()).withStartTimestamp(sparkListenerJobStart.time() * 1000).withTag("job_id", (Number) Integer.valueOf(sparkListenerJobStart.jobId())).withTag("stage_count", (Number) Integer.valueOf(getStageCount(sparkListenerJobStart)));
        String streamingBatchKey = getStreamingBatchKey(sparkListenerJobStart.properties());
        Long sqlExecutionId = getSqlExecutionId(sparkListenerJobStart.properties());
        AgentSpan agentSpan = null;
        if (sqlExecutionId != null) {
            agentSpan = getOrCreateSqlSpan(sqlExecutionId.longValue(), streamingBatchKey, sparkListenerJobStart.properties());
        }
        if (agentSpan != null) {
            withTag.asChildOf(agentSpan.context());
        } else if (streamingBatchKey != null) {
            withTag.asChildOf(getOrCreateStreamingBatchSpan(streamingBatchKey, Long.valueOf(sparkListenerJobStart.time()), sparkListenerJobStart.properties()).context());
        } else if (this.isRunningOnDatabricks) {
            addDatabricksSpecificTags(withTag, sparkListenerJobStart.properties(), true);
        } else {
            initApplicationSpanIfNotInitialized();
            withTag.asChildOf(this.applicationSpan.context());
        }
        withTag.withTag(DDTags.RESOURCE_NAME, getSparkJobName(sparkListenerJobStart));
        captureJobParameters(withTag, sparkListenerJobStart.properties());
        AgentSpan start = withTag.start();
        start.setMeasured(true);
        Iterator<Integer> it = getSparkJobStageIds(sparkListenerJobStart).iterator();
        while (it.hasNext()) {
            this.stageToJob.put(Integer.valueOf(it.next().intValue()), Integer.valueOf(sparkListenerJobStart.jobId()));
        }
        this.jobSpans.put(Integer.valueOf(sparkListenerJobStart.jobId()), start);
    }

    public synchronized void onJobEnd(SparkListenerJobEnd sparkListenerJobEnd) {
        AgentSpan remove = this.jobSpans.remove(Integer.valueOf(sparkListenerJobEnd.jobId()));
        if (remove == null) {
            return;
        }
        this.lastJobFailed = false;
        if (sparkListenerJobEnd.jobResult() instanceof JobFailed) {
            Exception exception = sparkListenerJobEnd.jobResult().exception();
            String errorMessageWithoutStackTrace = getErrorMessageWithoutStackTrace(exception.getMessage());
            String stackTraceToString = stackTraceToString(exception);
            remove.setError(true);
            remove.setErrorMessage(errorMessageWithoutStackTrace);
            remove.setTag(DDTags.ERROR_STACK, stackTraceToString);
            remove.setTag(DDTags.ERROR_TYPE, "Spark Job Failed");
            this.lastJobFailed = true;
            this.lastJobFailedMessage = errorMessageWithoutStackTrace;
            this.lastJobFailedStackTrace = stackTraceToString;
        }
        SparkAggregatedTaskMetrics remove2 = this.jobMetrics.remove(Integer.valueOf(sparkListenerJobEnd.jobId()));
        if (remove2 != null) {
            remove2.setSpanMetrics(remove);
        }
        remove.finish(sparkListenerJobEnd.time() * 1000);
    }

    public synchronized void onStageSubmitted(SparkListenerStageSubmitted sparkListenerStageSubmitted) {
        AgentSpan agentSpan;
        if (this.stageSpans.size() > 1000) {
            return;
        }
        int stageId = sparkListenerStageSubmitted.stageInfo().stageId();
        int attemptNumber = sparkListenerStageSubmitted.stageInfo().attemptNumber();
        Integer num = this.stageToJob.get(Integer.valueOf(stageId));
        if (num == null || (agentSpan = this.jobSpans.get(num)) == null) {
            return;
        }
        long longValue = sparkListenerStageSubmitted.stageInfo().submissionTime().isDefined() ? ((Long) sparkListenerStageSubmitted.stageInfo().submissionTime().get()).longValue() : System.currentTimeMillis();
        long stageSpanKey = stageSpanKey(stageId, attemptNumber);
        this.stageMetrics.put(Long.valueOf(stageSpanKey), new SparkAggregatedTaskMetrics(computeCurrentAvailableExecutorTime(longValue)));
        this.stageProperties.put(Long.valueOf(stageSpanKey), sparkListenerStageSubmitted.properties());
        AgentSpan start = buildSparkSpan("spark.stage", sparkListenerStageSubmitted.properties()).asChildOf(agentSpan.context()).withStartTimestamp(longValue * 1000).withTag("stage_id", (Number) Integer.valueOf(stageId)).withTag("task_count", (Number) Integer.valueOf(sparkListenerStageSubmitted.stageInfo().numTasks())).withTag("attempt_id", (Number) Integer.valueOf(attemptNumber)).withTag("details", sparkListenerStageSubmitted.stageInfo().details()).withTag(DDTags.RESOURCE_NAME, sparkListenerStageSubmitted.stageInfo().name()).start();
        start.setMeasured(true);
        this.stageSpans.put(Long.valueOf(stageSpanKey(stageId, attemptNumber)), start);
    }

    public synchronized void onStageCompleted(SparkListenerStageCompleted sparkListenerStageCompleted) {
        StageInfo stageInfo = sparkListenerStageCompleted.stageInfo();
        int stageId = stageInfo.stageId();
        int attemptNumber = stageInfo.attemptNumber();
        Integer num = this.stageToJob.get(Integer.valueOf(stageId));
        if (num == null) {
            return;
        }
        long stageSpanKey = stageSpanKey(stageId, attemptNumber);
        AgentSpan remove = this.stageSpans.remove(Long.valueOf(stageSpanKey));
        if (remove == null) {
            return;
        }
        if (stageInfo.failureReason().isDefined()) {
            remove.setError(true);
            remove.setErrorMessage(getErrorMessageWithoutStackTrace((String) stageInfo.failureReason().get()));
            remove.setTag(DDTags.ERROR_STACK, (String) stageInfo.failureReason().get());
            remove.setTag(DDTags.ERROR_TYPE, "Spark Stage Failed");
        }
        long longValue = sparkListenerStageCompleted.stageInfo().completionTime().isDefined() ? ((Long) sparkListenerStageCompleted.stageInfo().completionTime().get()).longValue() : System.currentTimeMillis();
        long computeCurrentAvailableExecutorTime = computeCurrentAvailableExecutorTime(longValue);
        Iterator<SparkAggregatedTaskMetrics> it = this.stageMetrics.values().iterator();
        while (it.hasNext()) {
            it.next().allocateAvailableExecutorTime(computeCurrentAvailableExecutorTime);
        }
        SparkAggregatedTaskMetrics remove2 = this.stageMetrics.remove(Long.valueOf(stageSpanKey));
        if (remove2 != null) {
            remove2.computeSkew();
            remove2.setSpanMetrics(remove);
            this.applicationMetrics.accumulateStageMetrics(remove2);
            this.jobMetrics.computeIfAbsent(num, num2 -> {
                return new SparkAggregatedTaskMetrics();
            }).accumulateStageMetrics(remove2);
            Properties remove3 = this.stageProperties.remove(Long.valueOf(stageSpanKey));
            String streamingBatchKey = getStreamingBatchKey(remove3);
            if (streamingBatchKey != null) {
                this.streamingBatchMetrics.computeIfAbsent(streamingBatchKey, str -> {
                    return new SparkAggregatedTaskMetrics();
                }).accumulateStageMetrics(remove2);
            }
            Long sqlExecutionId = getSqlExecutionId(remove3);
            if (sqlExecutionId != null) {
                this.sqlMetrics.computeIfAbsent(sqlExecutionId, l -> {
                    return new SparkAggregatedTaskMetrics();
                }).accumulateStageMetrics(remove2);
            }
        }
        remove.finish(longValue * 1000);
    }

    public void onTaskEnd(SparkListenerTaskEnd sparkListenerTaskEnd) {
        AgentSpan agentSpan;
        long stageSpanKey = stageSpanKey(sparkListenerTaskEnd.stageId(), sparkListenerTaskEnd.stageAttemptId());
        SparkAggregatedTaskMetrics sparkAggregatedTaskMetrics = this.stageMetrics.get(Long.valueOf(stageSpanKey));
        if (sparkAggregatedTaskMetrics != null) {
            sparkAggregatedTaskMetrics.addTaskMetrics(sparkListenerTaskEnd);
        }
        if (sparkListenerTaskEnd.taskMetrics() != null) {
            long computeTaskRunTime = SparkAggregatedTaskMetrics.computeTaskRunTime(sparkListenerTaskEnd.taskMetrics());
            Iterator<SparkAggregatedTaskMetrics> it = this.stageMetrics.values().iterator();
            while (it.hasNext()) {
                it.next().recordTotalTaskRunTime(computeTaskRunTime);
            }
        }
        if ((sparkListenerTaskEnd.reason() instanceof TaskFailedReason) && sparkListenerTaskEnd.reason().countTowardsTaskFailures() && (agentSpan = this.stageSpans.get(Long.valueOf(stageSpanKey))) != null) {
            sendTaskSpan(agentSpan, sparkListenerTaskEnd, this.stageProperties.get(Long.valueOf(stageSpanKey)));
        }
    }

    private void sendTaskSpan(AgentSpan agentSpan, SparkListenerTaskEnd sparkListenerTaskEnd, Properties properties) {
        AgentSpan start = buildSparkSpan("spark.task", properties).asChildOf(agentSpan.context()).withStartTimestamp(sparkListenerTaskEnd.taskInfo().launchTime() * 1000).withTag("task_id", (Number) Long.valueOf(sparkListenerTaskEnd.taskInfo().taskId())).withTag("task_attempt_id", (Number) Integer.valueOf(sparkListenerTaskEnd.taskInfo().attemptNumber())).withTag("task_type", sparkListenerTaskEnd.taskType()).withTag("stage_id", (Number) Integer.valueOf(sparkListenerTaskEnd.stageId())).withTag("stage_attempt_id", (Number) Integer.valueOf(sparkListenerTaskEnd.stageAttemptId())).withTag("executor_id", sparkListenerTaskEnd.taskInfo().executorId()).withTag("host", sparkListenerTaskEnd.taskInfo().host()).withTag("task_locality", sparkListenerTaskEnd.taskInfo().taskLocality().toString()).withTag("speculative", sparkListenerTaskEnd.taskInfo().speculative()).withTag("status", sparkListenerTaskEnd.taskInfo().status()).start();
        if (sparkListenerTaskEnd.reason() instanceof TaskFailedReason) {
            ExceptionFailure exceptionFailure = (TaskFailedReason) sparkListenerTaskEnd.reason();
            start.setError(true);
            start.setTag(DDTags.ERROR_TYPE, "Spark Task Failed");
            if (exceptionFailure instanceof ExceptionFailure) {
                ExceptionFailure exceptionFailure2 = exceptionFailure;
                start.setErrorMessage(String.format("%s: %s", exceptionFailure2.className(), exceptionFailure2.description()));
                start.setTag(DDTags.ERROR_STACK, exceptionFailure2.fullStackTrace());
            } else {
                start.setErrorMessage(exceptionFailure.toErrorString());
            }
            start.setTag("count_towards_task_failures", exceptionFailure.countTowardsTaskFailures());
        }
        start.finish(sparkListenerTaskEnd.taskInfo().finishTime() * 1000);
    }

    public synchronized void onExecutorAdded(SparkListenerExecutorAdded sparkListenerExecutorAdded) {
        this.currentExecutorCount++;
        this.maxExecutorCount = Math.max(this.maxExecutorCount, this.currentExecutorCount);
        if (this.liveExecutors.size() <= 1000) {
            this.liveExecutors.put(sparkListenerExecutorAdded.executorId(), sparkListenerExecutorAdded);
        }
    }

    public synchronized void onExecutorRemoved(SparkListenerExecutorRemoved sparkListenerExecutorRemoved) {
        this.currentExecutorCount--;
        SparkListenerExecutorAdded remove = this.liveExecutors.remove(sparkListenerExecutorRemoved.executorId());
        if (remove != null) {
            this.availableExecutorTime += (sparkListenerExecutorRemoved.time() - remove.time()) * remove.executorInfo().totalCores();
        }
    }

    public void onOtherEvent(SparkListenerEvent sparkListenerEvent) {
        if (sparkListenerEvent instanceof StreamingQueryListener.QueryStartedEvent) {
            onStreamingQueryStartedEvent((StreamingQueryListener.QueryStartedEvent) sparkListenerEvent);
            return;
        }
        if (sparkListenerEvent instanceof StreamingQueryListener.QueryProgressEvent) {
            onStreamingQueryProgressEvent((StreamingQueryListener.QueryProgressEvent) sparkListenerEvent);
            return;
        }
        if (sparkListenerEvent instanceof StreamingQueryListener.QueryTerminatedEvent) {
            onStreamingQueryTerminatedEvent((StreamingQueryListener.QueryTerminatedEvent) sparkListenerEvent);
        } else if (sparkListenerEvent instanceof SparkListenerSQLExecutionStart) {
            onSQLExecutionStart((SparkListenerSQLExecutionStart) sparkListenerEvent);
        } else if (sparkListenerEvent instanceof SparkListenerSQLExecutionEnd) {
            onSQLExecutionEnd((SparkListenerSQLExecutionEnd) sparkListenerEvent);
        }
    }

    private synchronized void onSQLExecutionStart(SparkListenerSQLExecutionStart sparkListenerSQLExecutionStart) {
        this.sqlQueries.put(Long.valueOf(sparkListenerSQLExecutionStart.executionId()), sparkListenerSQLExecutionStart);
    }

    private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sparkListenerSQLExecutionEnd) {
        AgentSpan remove = this.sqlSpans.remove(Long.valueOf(sparkListenerSQLExecutionEnd.executionId()));
        SparkAggregatedTaskMetrics remove2 = this.sqlMetrics.remove(Long.valueOf(sparkListenerSQLExecutionEnd.executionId()));
        this.sqlQueries.remove(Long.valueOf(sparkListenerSQLExecutionEnd.executionId()));
        if (remove != null) {
            if (remove2 != null) {
                remove2.setSpanMetrics(remove);
            }
            remove.finish(sparkListenerSQLExecutionEnd.time() * 1000);
        }
    }

    private synchronized void onStreamingQueryStartedEvent(StreamingQueryListener.QueryStartedEvent queryStartedEvent) {
        if (this.streamingQueries.size() > 1000) {
            return;
        }
        this.streamingQueries.put(queryStartedEvent.id(), queryStartedEvent);
    }

    private synchronized void onStreamingQueryTerminatedEvent(StreamingQueryListener.QueryTerminatedEvent queryTerminatedEvent) {
        StreamingQueryListener.QueryStartedEvent remove = this.streamingQueries.remove(queryTerminatedEvent.id());
        ArrayList arrayList = new ArrayList();
        for (String str : this.streamingBatchSpans.keySet()) {
            if (str.startsWith(queryTerminatedEvent.id().toString())) {
                arrayList.add(str);
            }
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            String str2 = (String) it.next();
            AgentSpan remove2 = this.streamingBatchSpans.remove(str2);
            SparkAggregatedTaskMetrics remove3 = this.streamingBatchMetrics.remove(str2);
            if (remove2 != null) {
                if (remove3 != null) {
                    remove3.setSpanMetrics(remove2);
                }
                remove2.setTag("streaming_query.id", queryTerminatedEvent.id());
                remove2.setTag("streaming_query.run_id", queryTerminatedEvent.runId());
                remove2.setTag("streaming_query.batch_id", getBatchIdFromBatchKey(str2));
                if (remove != null) {
                    remove2.setTag("streaming_query.name", remove.name());
                    remove2.setTag(DDTags.RESOURCE_NAME, remove.name());
                }
                if (queryTerminatedEvent.exception().isDefined()) {
                    String str3 = (String) queryTerminatedEvent.exception().get();
                    remove2.setError(true);
                    remove2.setErrorMessage(getErrorMessageWithoutStackTrace(str3));
                    remove2.setTag(DDTags.ERROR_STACK, str3);
                }
                remove2.finish();
            }
        }
    }

    private synchronized void onStreamingQueryProgressEvent(StreamingQueryListener.QueryProgressEvent queryProgressEvent) {
        StreamingQueryProgress progress = queryProgressEvent.progress();
        String streamingBatchKey = getStreamingBatchKey(progress.id().toString(), String.valueOf(progress.batchId()));
        AgentSpan remove = this.streamingBatchSpans.remove(streamingBatchKey);
        SparkAggregatedTaskMetrics remove2 = this.streamingBatchMetrics.remove(streamingBatchKey);
        if (remove != null) {
            if (remove2 != null) {
                remove2.setSpanMetrics(remove);
            }
            remove.setTag("streaming_query.id", progress.id());
            remove.setTag("streaming_query.run_id", progress.runId());
            remove.setTag("streaming_query.batch_id", progress.batchId());
            remove.setTag("streaming_query.name", progress.name());
            remove.setTag(DDTags.RESOURCE_NAME, progress.name());
            remove.setMetric((CharSequence) "spark.num_input_rows", progress.numInputRows());
            remove.setMetric((CharSequence) "spark.input_rows_per_second", progress.inputRowsPerSecond());
            remove.setMetric((CharSequence) "spark.processed_rows_per_second", progress.processedRowsPerSecond());
            Long convertStringDateToMillis = convertStringDateToMillis((String) progress.eventTime().get("watermark"));
            if (convertStringDateToMillis != null) {
                remove.setMetric((CharSequence) "spark.event_time.watermark", convertStringDateToMillis.longValue());
                Long convertStringDateToMillis2 = convertStringDateToMillis(progress.timestamp());
                if (convertStringDateToMillis.longValue() > 0 && convertStringDateToMillis2 != null) {
                    remove.setMetric((CharSequence) "spark.event_time.watermark_gap", convertStringDateToMillis2.longValue() - convertStringDateToMillis.longValue());
                }
            }
            Long convertStringDateToMillis3 = convertStringDateToMillis((String) progress.eventTime().get("max"));
            if (convertStringDateToMillis3 != null) {
                remove.setMetric((CharSequence) "spark.event_time.max", convertStringDateToMillis3.longValue());
            }
            Long convertStringDateToMillis4 = convertStringDateToMillis((String) progress.eventTime().get("min"));
            if (convertStringDateToMillis4 != null) {
                remove.setMetric((CharSequence) "spark.event_time.min", convertStringDateToMillis4.longValue());
            }
            Long l = (Long) progress.durationMs().get("addBatch");
            if (l != null) {
                remove.setMetric((CharSequence) "spark.add_batch_duration", l.longValue());
            }
            Long l2 = (Long) progress.durationMs().get("getBatch");
            if (l2 != null) {
                remove.setMetric((CharSequence) "spark.get_batch_duration", l2.longValue());
            }
            Long l3 = (Long) progress.durationMs().get("latestOffset");
            if (l3 != null) {
                remove.setMetric((CharSequence) "spark.latest_offset_duration", l3.longValue());
            }
            Long l4 = (Long) progress.durationMs().get("queryPlanning");
            if (l4 != null) {
                remove.setMetric((CharSequence) "spark.query_planing_duration", l4.longValue());
            }
            Long l5 = (Long) progress.durationMs().get("triggerExecution");
            if (l5 != null) {
                remove.setMetric((CharSequence) "spark.trigger_execution_duration", l5.longValue());
            }
            Long l6 = (Long) progress.durationMs().get("walCommit");
            if (l6 != null) {
                remove.setMetric((CharSequence) "spark.wal_commit_duration", l6.longValue());
            }
            for (int i = 0; i < progress.sources().length; i++) {
                SourceProgress sourceProgress = progress.sources()[i];
                String str = "spark.source." + i + ".";
                remove.setTag(str + "description", sourceProgress.description());
                remove.setTag(str + "start_offset", sourceProgress.startOffset());
                remove.setTag(str + "end_offset", sourceProgress.endOffset());
                remove.setTag(str + "num_input_rows", sourceProgress.numInputRows());
                remove.setTag(str + "input_rows_per_second", sourceProgress.inputRowsPerSecond());
                remove.setTag(str + "processed_rows_per_second", sourceProgress.processedRowsPerSecond());
            }
            for (int i2 = 0; i2 < progress.stateOperators().length; i2++) {
                StateOperatorProgress stateOperatorProgress = progress.stateOperators()[i2];
                String str2 = "spark.state." + i2 + ".";
                remove.setTag(str2 + "num_rows_total", stateOperatorProgress.numRowsTotal());
                remove.setTag(str2 + "num_rows_updated", stateOperatorProgress.numRowsUpdated());
                remove.setTag(str2 + "memory_used_bytes", stateOperatorProgress.memoryUsedBytes());
            }
            remove.setTag("spark.sink.description", progress.sink().description());
            remove.finish();
        }
    }

    private AgentTracer.SpanBuilder buildSparkSpan(String str, Properties properties) {
        AgentTracer.SpanBuilder withTag = this.tracer.buildSpan(str).withSpanType("spark").withTag("app_id", this.appId);
        if (this.databricksServiceName != null) {
            withTag.withServiceName(this.databricksServiceName);
        }
        addPropertiesTags(withTag, properties);
        return withTag;
    }

    private void addPropertiesTags(AgentTracer.SpanBuilder spanBuilder, Properties properties) {
        if (properties == null) {
            return;
        }
        for (String str : properties.stringPropertyNames()) {
            if (str.startsWith("spark.datadog.tags.")) {
                spanBuilder.withTag(str.substring("spark.datadog.tags.".length()), properties.getProperty(str));
            }
        }
    }

    private long stageSpanKey(int i, int i2) {
        return (i << 32) + i2;
    }

    @SuppressForbidden
    private static String getDatabricksJobId(Properties properties) {
        String property = properties.getProperty("spark.databricks.job.id");
        if (property != null) {
            return property;
        }
        String property2 = properties.getProperty("spark.jobGroup.id");
        if (property2 != null) {
            int indexOf = property2.indexOf("job-");
            int indexOf2 = property2.indexOf("-run", indexOf);
            if (indexOf != -1 && indexOf2 != -1) {
                return property2.substring(indexOf + 4, indexOf2);
            }
        }
        String property3 = properties.getProperty("spark.databricks.workload.id");
        if (property3 == null) {
            return null;
        }
        String[] split = property3.split("-");
        if (split.length > 1) {
            return split[1];
        }
        return null;
    }

    @SuppressForbidden
    private static String getDatabricksJobRunId(Properties properties, String str) {
        String property = properties.getProperty("spark.databricks.job.parentRunId");
        if (property != null) {
            return property;
        }
        String property2 = properties.getProperty("spark.databricks.clusterUsageTags.clusterName");
        String str2 = property2 == null ? str : property2;
        if (str2 == null) {
            return null;
        }
        String[] split = str2.split("-");
        if (split.length > 3) {
            return split[3];
        }
        return null;
    }

    @SuppressForbidden
    private static String getDatabricksTaskRunId(Properties properties) {
        String property = properties.getProperty("spark.databricks.job.runId");
        if (property != null) {
            return property;
        }
        String property2 = properties.getProperty("spark.jobGroup.id");
        if (property2 != null) {
            int indexOf = property2.indexOf("run-");
            int indexOf2 = property2.indexOf("-action", indexOf);
            if (indexOf != -1 && indexOf2 != -1) {
                return property2.substring(indexOf + 4, indexOf2);
            }
        }
        String property3 = properties.getProperty("spark.databricks.workload.id");
        if (property3 == null) {
            return null;
        }
        String[] split = property3.split("-");
        if (split.length > 2) {
            return split[2];
        }
        return null;
    }

    private String stackTraceToString(Throwable th) {
        StringWriter stringWriter = new StringWriter();
        th.printStackTrace(new PrintWriter(stringWriter));
        return stringWriter.toString();
    }

    private String getErrorMessageWithoutStackTrace(String str) {
        if (str == null) {
            return null;
        }
        int indexOf = str.indexOf("\tat ");
        return indexOf != -1 ? str.substring(0, indexOf) : str;
    }

    private long computeCurrentAvailableExecutorTime(long j) {
        long j2 = this.availableExecutorTime;
        Iterator<SparkListenerExecutorAdded> it = this.liveExecutors.values().iterator();
        while (it.hasNext()) {
            j2 += (j - it.next().time()) * r0.executorInfo().totalCores();
        }
        return j2;
    }

    private void captureApplicationParameters(AgentTracer.SpanBuilder spanBuilder) {
        for (Tuple2 tuple2 : this.sparkConf.getAll()) {
            if (SparkConfAllowList.canCaptureApplicationParameter((String) tuple2._1)) {
                spanBuilder.withTag("config." + ((String) tuple2._1).replace(".", JavaConstant.Dynamic.DEFAULT_NAME), (String) tuple2._2);
            }
        }
        spanBuilder.withTag("config.spark_version", this.sparkVersion);
    }

    private void captureJobParameters(AgentTracer.SpanBuilder spanBuilder, Properties properties) {
        if (properties != null) {
            for (Map.Entry entry : properties.entrySet()) {
                if (SparkConfAllowList.canCaptureJobParameter(entry.getKey().toString())) {
                    spanBuilder.withTag("config." + entry.getKey().toString().replace('.', '_'), entry.getValue());
                }
            }
        }
        spanBuilder.withTag("config.spark_version", this.sparkVersion);
    }

    private static Long getSqlExecutionId(Properties properties) {
        String property;
        if (properties == null || (property = properties.getProperty(SQLExecution.EXECUTION_ID_KEY())) == null) {
            return null;
        }
        try {
            return Long.valueOf(Long.parseLong(property));
        } catch (NumberFormatException e) {
            return null;
        }
    }

    private static Long convertStringDateToMillis(String str) {
        if (str == null) {
            return null;
        }
        try {
            return Long.valueOf(OffsetDateTime.parse(str).toInstant().toEpochMilli());
        } catch (DateTimeParseException e) {
            return null;
        }
    }

    private static String getStreamingBatchKey(Properties properties) {
        if (properties == null) {
            return null;
        }
        Object obj = properties.get(StreamExecution.QUERY_ID_KEY());
        Object obj2 = properties.get(MicroBatchExecution.BATCH_ID_KEY());
        if (obj == null || obj2 == null) {
            return null;
        }
        return getStreamingBatchKey(obj.toString(), obj2.toString());
    }

    private static String getStreamingBatchKey(String str, String str2) {
        return str + "." + str2;
    }

    private static String getBatchIdFromBatchKey(String str) {
        return str.substring(str.lastIndexOf(".") + 1);
    }

    private static String getDatabricksServiceName(SparkConf sparkConf, String str) {
        if (Config.get().isServiceNameSetByUser()) {
            return null;
        }
        String str2 = null;
        String databricksRunName = getDatabricksRunName(sparkConf);
        if (databricksRunName != null) {
            str2 = "databricks.job-cluster." + databricksRunName;
        } else if (str != null) {
            str2 = "databricks.all-purpose-cluster." + str;
        }
        return str2;
    }

    private static String getDatabricksRunName(SparkConf sparkConf) {
        String str = sparkConf.get("spark.databricks.clusterUsageTags.clusterAllTags", (String) null);
        if (str == null) {
            return null;
        }
        try {
            Iterator it = new ObjectMapper().readTree(str).iterator();
            while (it.hasNext()) {
                JsonNode jsonNode = (JsonNode) it.next();
                if ("RunName".equals(jsonNode.get("key").asText())) {
                    return removeUuidFromEndOfString(jsonNode.get("value").asText());
                }
            }
            return null;
        } catch (Exception e) {
            return null;
        }
    }

    @SuppressForbidden
    private static String removeUuidFromEndOfString(String str) {
        return str.replaceAll("_[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$", "");
    }
}
