package datadog.trace.instrumentation.spark;

import datadog.trace.api.DDTags;
import datadog.trace.api.DDTraceId;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import de.thetaphi.forbiddenapis.SuppressForbidden;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import net.bytebuddy.utility.JavaConstant;
import org.apache.spark.ExceptionFailure;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskFailedReason;
import org.apache.spark.scheduler.JobFailed;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerExecutorAdded;
import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.scheduler.StageInfo;
import scala.Tuple2;

/* loaded from: input_file:inst/datadog/trace/instrumentation/spark/DatadogSparkListener.classdata */
public class DatadogSparkListener extends SparkListener {
    public static volatile DatadogSparkListener listener = null;
    public static volatile boolean finishTraceOnApplicationEnd = true;
    private final SparkConf sparkConf;
    private final String sparkVersion;
    private final String appId;
    private AgentSpan applicationSpan;
    private final boolean isRunningOnDatabricks;
    private String lastJobFailedMessage;
    private String lastJobFailedStackTrace;
    private final int MAX_COLLECTION_SIZE = 1000;
    private final HashMap<Integer, AgentSpan> jobSpans = new HashMap<>();
    private final HashMap<Long, AgentSpan> stageSpans = new HashMap<>();
    private final HashMap<Integer, Integer> stageToJob = new HashMap<>();
    private final SparkAggregatedTaskMetrics applicationMetrics = new SparkAggregatedTaskMetrics();
    private final HashMap<Integer, SparkAggregatedTaskMetrics> jobMetrics = new HashMap<>();
    private final HashMap<Long, SparkAggregatedTaskMetrics> stageMetrics = new HashMap<>();
    private final HashMap<String, SparkListenerExecutorAdded> liveExecutors = new HashMap<>();
    private boolean lastJobFailed = false;
    private int currentExecutorCount = 0;
    private int maxExecutorCount = 0;
    private long availableExecutorTime = 0;
    private boolean applicationEnded = false;
    private final AgentTracer.TracerAPI tracer = AgentTracer.get();

    public DatadogSparkListener(SparkConf sparkConf, String str, String str2) {
        this.sparkConf = sparkConf;
        this.appId = str;
        this.sparkVersion = str2;
        this.isRunningOnDatabricks = sparkConf.contains("spark.databricks.sparkContextId");
    }

    public synchronized void onApplicationStart(SparkListenerApplicationStart sparkListenerApplicationStart) {
        this.applicationSpan = buildSparkSpan("spark.application").withStartTimestamp(sparkListenerApplicationStart.time() * 1000).withTag("application_name", sparkListenerApplicationStart.appName()).withTag("spark_user", sparkListenerApplicationStart.sparkUser()).start();
        this.applicationSpan.setMeasured(true);
        if (sparkListenerApplicationStart.appAttemptId().isDefined()) {
            this.applicationSpan.m1483setTag("app_attempt_id", (String) sparkListenerApplicationStart.appAttemptId().get());
        }
        for (Tuple2 tuple2 : this.sparkConf.getAll()) {
            if (SparkConfAllowList.canCaptureApplicationParameter((String) tuple2._1)) {
                this.applicationSpan.m1483setTag("config." + ((String) tuple2._1).replace(".", JavaConstant.Dynamic.DEFAULT_NAME), (String) tuple2._2);
            }
        }
        this.applicationSpan.m1483setTag("config.spark_version", this.sparkVersion);
    }

    public void onApplicationEnd(SparkListenerApplicationEnd sparkListenerApplicationEnd) {
        if (finishTraceOnApplicationEnd) {
            finishApplication(sparkListenerApplicationEnd.time(), null, 0, null);
        }
    }

    public synchronized void finishApplication(long j, Throwable th, int i, String str) {
        if (this.applicationEnded) {
            return;
        }
        this.applicationEnded = true;
        if (th != null) {
            this.applicationSpan.addThrowable(th);
        } else if (i != 0) {
            this.applicationSpan.setError(true);
            this.applicationSpan.m1483setTag(DDTags.ERROR_TYPE, "Spark Application Failed with exit code " + i);
            this.applicationSpan.m1483setTag(DDTags.ERROR_MSG, getErrorMessageWithoutStackTrace(str));
            this.applicationSpan.m1483setTag(DDTags.ERROR_STACK, str);
        } else if (this.lastJobFailed) {
            this.applicationSpan.setError(true);
            this.applicationSpan.m1483setTag(DDTags.ERROR_TYPE, "Spark Application Failed");
            this.applicationSpan.m1483setTag(DDTags.ERROR_MSG, this.lastJobFailedMessage);
            this.applicationSpan.m1483setTag(DDTags.ERROR_STACK, this.lastJobFailedStackTrace);
        }
        this.applicationMetrics.setSpanMetrics(this.applicationSpan, "spark_application_metrics");
        this.applicationSpan.setMetric((CharSequence) "spark_application_metrics.max_executor_count", this.maxExecutorCount);
        this.applicationSpan.setMetric((CharSequence) "spark_application_metrics.available_executor_time", computeCurrentAvailableExecutorTime(j));
        this.applicationSpan.finish(j * 1000);
    }

    public synchronized void onJobStart(SparkListenerJobStart sparkListenerJobStart) {
        if (this.jobSpans.size() > 1000) {
            return;
        }
        AgentTracer.SpanBuilder withTag = buildSparkSpan("spark.job").withStartTimestamp(sparkListenerJobStart.time() * 1000).withTag("job_id", (Number) Integer.valueOf(sparkListenerJobStart.jobId())).withTag("stage_count", (Number) Integer.valueOf(sparkListenerJobStart.stageInfos().size()));
        if (this.isRunningOnDatabricks) {
            for (Tuple2 tuple2 : this.sparkConf.getAll()) {
                if (SparkConfAllowList.canCaptureApplicationParameter((String) tuple2._1)) {
                    withTag.withTag("config." + ((String) tuple2._1).replace(".", JavaConstant.Dynamic.DEFAULT_NAME), (String) tuple2._2);
                }
            }
            if (sparkListenerJobStart.properties() != null) {
                String str = (String) sparkListenerJobStart.properties().get("spark.databricks.job.id");
                String databricksJobRunId = getDatabricksJobRunId(sparkListenerJobStart.properties());
                String str2 = (String) sparkListenerJobStart.properties().get("spark.databricks.job.runId");
                withTag.withTag("databricks_job_id", str);
                withTag.withTag("databricks_job_run_id", databricksJobRunId);
                withTag.withTag("databricks_task_run_id", str2);
                DatabricksParentContext databricksParentContext = new DatabricksParentContext(str, databricksJobRunId, str2);
                if (databricksParentContext.getTraceId() != DDTraceId.ZERO) {
                    withTag.asChildOf(databricksParentContext);
                }
            }
        } else {
            withTag.asChildOf(this.applicationSpan.context());
        }
        if (sparkListenerJobStart.stageInfos().nonEmpty()) {
            withTag.withTag(DDTags.RESOURCE_NAME, ((StageInfo) sparkListenerJobStart.stageInfos().last()).name());
        }
        AgentSpan start = withTag.start();
        start.setMeasured(true);
        if (sparkListenerJobStart.properties() != null) {
            for (Map.Entry entry : sparkListenerJobStart.properties().entrySet()) {
                if (SparkConfAllowList.canCaptureJobParameter(entry.getKey().toString())) {
                    start.setTag("config." + entry.getKey().toString().replace('.', '_'), entry.getValue());
                }
            }
        }
        start.m1483setTag("config.spark_version", this.sparkVersion);
        sparkListenerJobStart.stageInfos().foreach(stageInfo -> {
            return this.stageToJob.put(Integer.valueOf(stageInfo.stageId()), Integer.valueOf(sparkListenerJobStart.jobId()));
        });
        this.jobSpans.put(Integer.valueOf(sparkListenerJobStart.jobId()), start);
    }

    public synchronized void onJobEnd(SparkListenerJobEnd sparkListenerJobEnd) {
        AgentSpan remove = this.jobSpans.remove(Integer.valueOf(sparkListenerJobEnd.jobId()));
        if (remove == null) {
            return;
        }
        this.lastJobFailed = false;
        if (sparkListenerJobEnd.jobResult() instanceof JobFailed) {
            Exception exception = sparkListenerJobEnd.jobResult().exception();
            String errorMessageWithoutStackTrace = getErrorMessageWithoutStackTrace(exception.getMessage());
            String stackTraceToString = stackTraceToString(exception);
            remove.setError(true);
            remove.setErrorMessage(errorMessageWithoutStackTrace);
            remove.m1483setTag(DDTags.ERROR_STACK, stackTraceToString);
            remove.m1483setTag(DDTags.ERROR_TYPE, "Spark Job Failed");
            this.lastJobFailed = true;
            this.lastJobFailedMessage = errorMessageWithoutStackTrace;
            this.lastJobFailedStackTrace = stackTraceToString;
        }
        SparkAggregatedTaskMetrics remove2 = this.jobMetrics.remove(Integer.valueOf(sparkListenerJobEnd.jobId()));
        if (remove2 != null) {
            remove2.setSpanMetrics(remove, "spark_job_metrics");
        }
        remove.finish(sparkListenerJobEnd.time() * 1000);
    }

    public synchronized void onStageSubmitted(SparkListenerStageSubmitted sparkListenerStageSubmitted) {
        AgentSpan agentSpan;
        if (this.stageSpans.size() > 1000) {
            return;
        }
        int stageId = sparkListenerStageSubmitted.stageInfo().stageId();
        int attemptNumber = sparkListenerStageSubmitted.stageInfo().attemptNumber();
        Integer num = this.stageToJob.get(Integer.valueOf(stageId));
        if (num == null || (agentSpan = this.jobSpans.get(num)) == null) {
            return;
        }
        long longValue = sparkListenerStageSubmitted.stageInfo().submissionTime().isDefined() ? ((Long) sparkListenerStageSubmitted.stageInfo().submissionTime().get()).longValue() : System.currentTimeMillis();
        this.stageMetrics.put(Long.valueOf(stageSpanKey(stageId, attemptNumber)), new SparkAggregatedTaskMetrics(computeCurrentAvailableExecutorTime(longValue)));
        AgentSpan start = buildSparkSpan("spark.stage").asChildOf(agentSpan.context()).withStartTimestamp(longValue * 1000).withTag("stage_id", (Number) Integer.valueOf(stageId)).withTag("task_count", (Number) Integer.valueOf(sparkListenerStageSubmitted.stageInfo().numTasks())).withTag("attempt_id", (Number) Integer.valueOf(attemptNumber)).withTag("parent_stages_ids", sparkListenerStageSubmitted.stageInfo().parentIds()).withTag("details", sparkListenerStageSubmitted.stageInfo().details()).withTag(DDTags.RESOURCE_NAME, sparkListenerStageSubmitted.stageInfo().name()).start();
        start.setMeasured(true);
        this.stageSpans.put(Long.valueOf(stageSpanKey(stageId, attemptNumber)), start);
    }

    public synchronized void onStageCompleted(SparkListenerStageCompleted sparkListenerStageCompleted) {
        StageInfo stageInfo = sparkListenerStageCompleted.stageInfo();
        int stageId = stageInfo.stageId();
        int attemptNumber = stageInfo.attemptNumber();
        Integer num = this.stageToJob.get(Integer.valueOf(stageId));
        if (num == null) {
            return;
        }
        long stageSpanKey = stageSpanKey(stageId, attemptNumber);
        AgentSpan remove = this.stageSpans.remove(Long.valueOf(stageSpanKey));
        if (remove == null) {
            return;
        }
        if (stageInfo.failureReason().isDefined()) {
            remove.setError(true);
            remove.setErrorMessage(getErrorMessageWithoutStackTrace((String) stageInfo.failureReason().get()));
            remove.m1483setTag(DDTags.ERROR_STACK, (String) stageInfo.failureReason().get());
            remove.m1483setTag(DDTags.ERROR_TYPE, "Spark Stage Failed");
        }
        stageInfo.rddInfos().foreach(rDDInfo -> {
            return remove.m1483setTag("rdd." + rDDInfo.name(), rDDInfo.toString());
        });
        long longValue = sparkListenerStageCompleted.stageInfo().completionTime().isDefined() ? ((Long) sparkListenerStageCompleted.stageInfo().completionTime().get()).longValue() : System.currentTimeMillis();
        long computeCurrentAvailableExecutorTime = computeCurrentAvailableExecutorTime(longValue);
        Iterator<SparkAggregatedTaskMetrics> it = this.stageMetrics.values().iterator();
        while (it.hasNext()) {
            it.next().allocateAvailableExecutorTime(computeCurrentAvailableExecutorTime);
        }
        SparkAggregatedTaskMetrics remove2 = this.stageMetrics.remove(Long.valueOf(stageSpanKey));
        if (remove2 != null) {
            remove2.setSpanMetrics(remove, "spark_stage_metrics");
            this.applicationMetrics.accumulateStageMetrics(remove2);
            this.jobMetrics.computeIfAbsent(num, num2 -> {
                return new SparkAggregatedTaskMetrics();
            }).accumulateStageMetrics(remove2);
        }
        remove.finish(longValue * 1000);
    }

    public void onTaskEnd(SparkListenerTaskEnd sparkListenerTaskEnd) {
        AgentSpan agentSpan;
        long stageSpanKey = stageSpanKey(sparkListenerTaskEnd.stageId(), sparkListenerTaskEnd.stageAttemptId());
        SparkAggregatedTaskMetrics sparkAggregatedTaskMetrics = this.stageMetrics.get(Long.valueOf(stageSpanKey));
        if (sparkAggregatedTaskMetrics != null) {
            sparkAggregatedTaskMetrics.addTaskMetrics(sparkListenerTaskEnd);
        }
        if (sparkListenerTaskEnd.taskMetrics() != null) {
            long computeTaskRunTime = SparkAggregatedTaskMetrics.computeTaskRunTime(sparkListenerTaskEnd.taskMetrics());
            Iterator<SparkAggregatedTaskMetrics> it = this.stageMetrics.values().iterator();
            while (it.hasNext()) {
                it.next().recordTotalTaskRunTime(computeTaskRunTime);
            }
        }
        if ((sparkListenerTaskEnd.reason() instanceof TaskFailedReason) && sparkListenerTaskEnd.reason().countTowardsTaskFailures() && (agentSpan = this.stageSpans.get(Long.valueOf(stageSpanKey))) != null) {
            sendTaskSpan(agentSpan, sparkListenerTaskEnd);
        }
    }

    private void sendTaskSpan(AgentSpan agentSpan, SparkListenerTaskEnd sparkListenerTaskEnd) {
        AgentSpan start = buildSparkSpan("spark.task").asChildOf(agentSpan.context()).withStartTimestamp(sparkListenerTaskEnd.taskInfo().launchTime() * 1000).withTag("task_id", (Number) Long.valueOf(sparkListenerTaskEnd.taskInfo().taskId())).withTag("task_attempt_id", (Number) Integer.valueOf(sparkListenerTaskEnd.taskInfo().attemptNumber())).withTag("task_type", sparkListenerTaskEnd.taskType()).withTag("stage_id", (Number) Integer.valueOf(sparkListenerTaskEnd.stageId())).withTag("stage_attempt_id", (Number) Integer.valueOf(sparkListenerTaskEnd.stageAttemptId())).withTag("executor_id", sparkListenerTaskEnd.taskInfo().executorId()).withTag("host", sparkListenerTaskEnd.taskInfo().host()).withTag("task_locality", sparkListenerTaskEnd.taskInfo().taskLocality().toString()).withTag("speculative", sparkListenerTaskEnd.taskInfo().speculative()).withTag("status", sparkListenerTaskEnd.taskInfo().status()).start();
        if (sparkListenerTaskEnd.reason() instanceof TaskFailedReason) {
            ExceptionFailure exceptionFailure = (TaskFailedReason) sparkListenerTaskEnd.reason();
            start.setError(true);
            start.m1483setTag(DDTags.ERROR_TYPE, "Spark Task Failed");
            if (exceptionFailure instanceof ExceptionFailure) {
                ExceptionFailure exceptionFailure2 = exceptionFailure;
                start.setErrorMessage(String.format("%s: %s", exceptionFailure2.className(), exceptionFailure2.description()));
                start.m1483setTag(DDTags.ERROR_STACK, exceptionFailure2.fullStackTrace());
            } else {
                start.setErrorMessage(exceptionFailure.toErrorString());
            }
            start.m1482setTag("count_towards_task_failures", exceptionFailure.countTowardsTaskFailures());
        }
        start.finish(sparkListenerTaskEnd.taskInfo().finishTime() * 1000);
    }

    public synchronized void onExecutorAdded(SparkListenerExecutorAdded sparkListenerExecutorAdded) {
        this.currentExecutorCount++;
        this.maxExecutorCount = Math.max(this.maxExecutorCount, this.currentExecutorCount);
        if (this.liveExecutors.size() <= 1000) {
            this.liveExecutors.put(sparkListenerExecutorAdded.executorId(), sparkListenerExecutorAdded);
        }
    }

    public synchronized void onExecutorRemoved(SparkListenerExecutorRemoved sparkListenerExecutorRemoved) {
        this.currentExecutorCount--;
        SparkListenerExecutorAdded remove = this.liveExecutors.remove(sparkListenerExecutorRemoved.executorId());
        if (remove != null) {
            this.availableExecutorTime += (sparkListenerExecutorRemoved.time() - remove.time()) * remove.executorInfo().totalCores();
        }
    }

    private AgentTracer.SpanBuilder buildSparkSpan(String str) {
        return this.tracer.buildSpan(str).withSpanType("spark").withTag("app_id", this.appId);
    }

    private long stageSpanKey(int i, int i2) {
        return (i << 32) + i2;
    }

    @SuppressForbidden
    private static String getDatabricksJobRunId(Properties properties) {
        String str = (String) properties.get("spark.databricks.clusterUsageTags.clusterName");
        if (str == null) {
            return null;
        }
        String[] split = str.split("-");
        if (split.length > 3) {
            return split[3];
        }
        return null;
    }

    private String stackTraceToString(Throwable th) {
        StringWriter stringWriter = new StringWriter();
        th.printStackTrace(new PrintWriter(stringWriter));
        return stringWriter.toString();
    }

    private String getErrorMessageWithoutStackTrace(String str) {
        if (str == null) {
            return null;
        }
        int indexOf = str.indexOf("\tat ");
        return indexOf != -1 ? str.substring(0, indexOf) : str;
    }

    private long computeCurrentAvailableExecutorTime(long j) {
        long j2 = this.availableExecutorTime;
        Iterator<SparkListenerExecutorAdded> it = this.liveExecutors.values().iterator();
        while (it.hasNext()) {
            j2 += (j - it.next().time()) * r0.executorInfo().totalCores();
        }
        return j2;
    }
}
