package ai.databand;

import ai.databand.config.DbndConfig;
import ai.databand.id.Sha1Long;
import ai.databand.id.Sha1Short;
import ai.databand.id.Uuid5;
import ai.databand.log.HistogramRequest;
import ai.databand.parameters.DatasetOperationPreview;
import ai.databand.parameters.Histogram;
import ai.databand.parameters.NullPreview;
import ai.databand.parameters.ParametersPreview;
import ai.databand.parameters.TaskParameterPreview;
import ai.databand.schema.AirflowTaskContext;
import ai.databand.schema.AzkabanTaskContext;
import ai.databand.schema.DatasetOperationStatus;
import ai.databand.schema.DatasetOperationType;
import ai.databand.schema.ErrorInfo;
import ai.databand.schema.LogDataset;
import ai.databand.schema.LogTarget;
import ai.databand.schema.Pair;
import ai.databand.schema.RunAndDefinition;
import ai.databand.schema.TaskDefinition;
import ai.databand.schema.TaskParamDefinition;
import ai.databand.schema.TaskRun;
import ai.databand.schema.TaskRunParam;
import ai.databand.schema.TaskRunsInfo;
import ai.databand.schema.TrackingSource;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.StageInfo;
import org.apache.spark.sql.Dataset;
import org.apache.spark.util.AccumulatorV2;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Iterator;

/* loaded from: input_file:ai/databand/DefaultDbndRun.class */
public class DefaultDbndRun implements DbndRun {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultDbndRun.class);
    private final DbndClient dbnd;
    private final ParametersPreview parameters;
    private String rootRunUid;
    private String runId;
    private String jobName;
    private String driverTaskUid;
    private TaskRun driverTask;
    private AirflowTaskContext airflowContext;
    private AzkabanTaskContext azkabanTaskContext;
    private final DbndConfig config;
    private final List<TaskRun> taskRuns = new ArrayList(1);
    private final List<TaskDefinition> taskDefinitions = new ArrayList(1);
    private final List<List<String>> parentChildMap = new ArrayList(1);
    private final List<List<String>> upstreamsMap = new ArrayList(1);
    private final Deque<TaskRun> stack = new ArrayDeque(1);
    private final Map<Method, List<TaskParamDefinition>> methodsCache = new HashMap(1);
    private final Map<Method, TaskRun> methodsRunsCache = new HashMap(1);
    private final Map<Method, Integer> methodExecutionCounts = new HashMap(1);
    private final Map<Integer, TaskRun> taskRunOutputs = new HashMap(1);

    /**
     * Creates a run tracker bound to the given client and configuration.
     *
     * <p>The Airflow and Azkaban contexts are read once from the configuration; each is stored
     * as {@code null} when the configuration does not provide one.
     *
     * @param dbndClient client every tracking call of this run is sent through
     * @param dbndConfig configuration supplying the preview flag and the orchestrator contexts
     */
    public DefaultDbndRun(DbndClient dbndClient, DbndConfig dbndConfig) {
        this.config = dbndConfig;
        this.dbnd = dbndClient;
        this.parameters = new ParametersPreview(dbndConfig.isPreviewEnabled());
        this.airflowContext = dbndConfig.airflowContext().orElse(null);
        this.azkabanTaskContext = dbndConfig.azkabanContext().orElse(null);
    }

    /**
     * Opens a new tracked run: generates the run id, resolves the job name and tracking source,
     * registers the root run through the client and marks it {@code RUNNING}.
     *
     * <p>Job name resolution: the Airflow context wins, then the Azkaban context, otherwise the
     * task name derived from {@code method}. A job name present in the configuration overrides
     * all of them.
     *
     * @param method entry-point method of the pipeline; may be {@code null}
     * @param objArr arguments the entry point was invoked with
     */
    @Override // ai.databand.DbndRun
    public void init(Method method, Object[] objArr) {
        String taskName = getTaskName(method);
        this.runId = UUID.randomUUID().toString();
        String userName = System.getProperty("user.name");
        String sourceName = null;
        TrackingSource trackingSource = null;
        if (this.airflowContext != null) {
            this.jobName = this.airflowContext.jobName();
            sourceName = "airflow_tracking";
            trackingSource = new TrackingSource(this.airflowContext);
        } else if (this.azkabanTaskContext != null) {
            this.jobName = this.azkabanTaskContext.databandJobName();
            trackingSource = this.azkabanTaskContext.trackingSource();
            if (trackingSource != null) {
                sourceName = "azkaban_tracking";
            }
        } else if (taskName != null && !taskName.isEmpty()) {
            this.jobName = taskName;
        } else {
            // getTaskName() and buildRootRun() both accept a null method, so this fallback must
            // not dereference it; "pipeline" is the name buildRootRun() uses in that case.
            this.jobName = method == null ? "pipeline" : method.getName();
        }
        // An explicitly configured job name takes precedence over anything derived above.
        this.config.jobName().ifPresent(configuredName -> {
            this.jobName = configuredName;
        });
        this.rootRunUid = this.dbnd.initRun(
            this.jobName,
            this.runId,
            userName,
            this.config.runName(),
            buildRootRun(method, objArr),
            this.airflowContext,
            this.config.azkabanContext().isPresent() ? this.config.azkabanContext().get().root() : null,
            sourceName,
            trackingSource,
            null);
        this.dbnd.setRunState(this.rootRunUid, "RUNNING");
    }

    /**
     * Builds the payload describing the root (driver) task of this run and stores the created
     * driver task in {@code driverTask}.
     *
     * <p>All identifiers are derived from {@code runId}. When an Airflow and/or Azkaban context
     * is present, the driver task is linked under the orchestrator's task in both the
     * parent/child and the upstream maps; if both are present the Azkaban tracking source is
     * the one reported.
     *
     * @param method entry-point method, or {@code null} (the task is then named "pipeline")
     * @param objArr arguments the entry point was invoked with
     * @return run description containing the driver task and its definition
     */
    protected TaskRunsInfo buildRootRun(Method method, Object[] objArr) {
        ZonedDateTime startedAt = ZonedDateTime.now(ZoneOffset.UTC);
        String runUid = new Uuid5("RUN_UID", this.runId).toString();
        this.driverTaskUid = new Uuid5("DRIVER_TASK", this.runId).toString();
        String taskRunEnvUid = new Uuid5("TASK_RUN_ENV_UID", this.runId).toString();
        String attemptUid = new Uuid5("TASK_RUN_ATTEMPT", this.runId).toString();
        String command = this.config.cmd();
        String signature = new Sha1Short("TASK_SIGNATURE", this.runId).toString();
        String definitionUid = new Uuid5("TASK_DEFINITION", this.runId).toString();
        String taskName = getTaskName(method);
        List<TaskParamDefinition> paramDefinitions = buildTaskParamDefinitions(method);

        String definitionName;
        if (method == null) {
            definitionName = "pipeline";
        } else {
            definitionName = method.getName();
        }

        List<TaskRunParam> runParams =
            buildTaskRunParamsAndTargets(method, objArr, runUid, definitionName, attemptUid, definitionUid).left();
        boolean launchedByOrchestrator = this.airflowContext != null || this.azkabanTaskContext != null;

        this.driverTask = new TaskRun(
            runUid,
            true,
            false,
            null,
            "",
            this.driverTaskUid,
            signature,
            this.jobName,
            runParams,
            signature,
            false,
            startedAt.toLocalDate(),
            startedAt,
            "",
            "RUNNING",
            definitionUid,
            command,
            false,
            false,
            attemptUid,
            taskName,
            launchedByOrchestrator,
            true,
            command,
            taskName,
            "jvm",
            Collections.emptyMap());
        this.driverTask.setStartDate(startedAt);

        String sourceCode = extractSourceCode(method);
        TrackingSource trackingSource = null;
        if (this.airflowContext != null) {
            String operatorUid = this.airflowContext.getAfOperatorUid();
            this.parentChildMap.add(Arrays.asList(operatorUid, this.driverTaskUid));
            this.upstreamsMap.add(Arrays.asList(operatorUid, this.driverTaskUid));
            trackingSource = new TrackingSource(this.airflowContext);
        }
        if (this.azkabanTaskContext != null) {
            String azkabanTaskUid = this.azkabanTaskContext.taskRunUid();
            this.parentChildMap.add(Arrays.asList(azkabanTaskUid, this.driverTaskUid));
            this.upstreamsMap.add(Arrays.asList(azkabanTaskUid, this.driverTaskUid));
            trackingSource = new TrackingSource(this.azkabanTaskContext);
        }

        TaskDefinition rootDefinition = new TaskDefinition(
            definitionName,
            sourceCode,
            new Sha1Long("SOURCE", this.runId).toString(),
            "",
            definitionUid,
            new Sha1Long("MODULE_SOURCE", this.runId).toString(),
            paramDefinitions,
            "jvm_task",
            "java",
            "");

        return new TaskRunsInfo(
            taskRunEnvUid,
            this.parentChildMap,
            runUid,
            Collections.singletonList(this.driverTask),
            Collections.emptyList(),
            runUid,
            this.upstreamsMap,
            false,
            Collections.singletonList(rootDefinition),
            trackingSource);
    }

    /**
     * Extension hook supplying the source text recorded in the root task definition.
     *
     * <p>This base implementation ignores {@code method} and always yields an empty string.
     *
     * @param method entry-point method whose source is requested
     * @return the empty string
     */
    protected String extractSourceCode(Method method) {
        final String noSource = "";
        return noSource;
    }

    /**
     * Registers the start of a task: builds its run and definition, links it to its parent
     * (the task on top of the stack, or the driver task when the stack is empty), pushes it on
     * the stack and reports it through the client as {@code RUNNING}.
     *
     * <p>An extra upstream edge is recorded for every non-null argument whose
     * {@code hashCode()} matches a result previously stored by {@code completeTask}.
     *
     * @param method method being entered
     * @param objArr arguments of the invocation; may be {@code null}
     */
    @Override // ai.databand.DbndRun
    public void startTask(Method method, Object[] objArr) {
        RunAndDefinition runAndDefinition = buildRunAndDefinition(method, objArr, !this.stack.isEmpty());
        TaskRun started = runAndDefinition.taskRun();
        this.taskRuns.add(started);
        this.taskDefinitions.add(runAndDefinition.taskDefinition());

        TaskRun parent = this.stack.isEmpty() ? this.driverTask : this.stack.peek();
        if (!this.stack.isEmpty()) {
            // Nested task: also record the enclosing task in the upstream map.
            this.upstreamsMap.add(Arrays.asList(parent.getTaskRunUid(), started.getTaskRunUid()));
        }
        started.addUpstream(parent);

        // buildTaskRunParamsAndTargets() tolerates a null argument array, so this loop must too.
        if (objArr != null) {
            for (Object argument : objArr) {
                if (argument == null) {
                    continue;
                }
                TaskRun producer = this.taskRunOutputs.get(argument.hashCode());
                if (producer != null) {
                    this.upstreamsMap.add(Arrays.asList(started.getTaskRunUid(), producer.getTaskRunUid()));
                }
            }
        }

        this.stack.push(started);
        this.parentChildMap.add(Arrays.asList(parent.getTaskRunUid(), started.getTaskRunUid()));
        this.dbnd.addTaskRuns(this.rootRunUid, this.runId, this.taskRuns, this.taskDefinitions, this.parentChildMap, this.upstreamsMap);
        this.dbnd.logTargets(started.getTaskRunUid(), runAndDefinition.targets());
        this.dbnd.updateTaskRunAttempt(started.getTaskRunUid(), started.getTaskRunAttemptUid(), "RUNNING", null, started.getStartDate());

        LOG.info("TASK: task_id={}", started.getTaskId());
        LOG.info("TIME: start={}", started.getExecutionDate());
        LOG.info("TRACKER: {}/app/jobs/{}/{}/{}", this.config.databandUrl(), this.driverTask.getTaskAfId(), this.driverTask.getRunUid(), started.getTaskRunUid());
    }

    /**
     * Returns the parameter definitions of a task method: one {@code task_input} entry per
     * declared parameter followed by a single {@code task_output} entry named {@code result}.
     *
     * <p>Results are cached per method in {@code methodsCache}.
     *
     * @param method task method, or {@code null}
     * @return the definitions; an empty list when {@code method} is {@code null}
     */
    protected List<TaskParamDefinition> buildTaskParamDefinitions(Method method) {
        if (method == null) {
            return Collections.emptyList();
        }
        return this.methodsCache.computeIfAbsent(method, key -> {
            // getParameters() clones its array on every call, so fetch it once.
            Parameter[] declared = key.getParameters();
            // One extra slot for the trailing "result" definition.
            List<TaskParamDefinition> definitions = new ArrayList<>(declared.length + 1);
            for (Parameter parameter : declared) {
                definitions.add(new TaskParamDefinition(parameter.getName(), "task_input", "user", true, false, parameter.getParameterizedType().getTypeName(), "", ""));
            }
            definitions.add(new TaskParamDefinition("result", "task_output", "user", true, false, key.getReturnType().getTypeName(), "", ""));
            return definitions;
        });
    }

    /**
     * Builds the run parameters and the "read" log targets for one task invocation.
     *
     * <p>Each argument is previewed with the preview registered for its declared parameter
     * type. A final {@code result} run parameter carrying the return type's name is always
     * appended to the parameter list.
     *
     * @param method task method; {@code null} yields two empty lists
     * @param objArr invocation arguments; {@code null} or empty yields two empty lists
     * @param str    task run uid placed in each log target
     * @param str2   task name placed in each log target
     * @param str3   task run attempt uid placed in each log target
     * @param str4   task definition uid placed in each log target
     * @return pair of (run parameters, log targets)
     */
    protected Pair<List<TaskRunParam>, List<LogTarget>> buildTaskRunParamsAndTargets(Method method, Object[] objArr, String str, String str2, String str3, String str4) {
        if (method == null || objArr == null || objArr.length == 0) {
            return new Pair<>(Collections.emptyList(), Collections.emptyList());
        }
        // getParameters() clones its array on every call, so fetch it once.
        Parameter[] declared = method.getParameters();
        // Never index past the supplied arguments if fewer were passed than are declared.
        int count = Math.min(declared.length, objArr.length);
        List<LogTarget> targets = new ArrayList<>(count);
        List<TaskRunParam> runParams = new ArrayList<>(count + 1);
        for (int i = 0; i < count; i++) {
            Parameter parameter = declared[i];
            Object argument = objArr[i];
            TaskParameterPreview preview = this.parameters.get(parameter.getType());
            String compact = preview.compact(argument);
            runParams.add(new TaskRunParam(compact, "", parameter.getName()));
            targets.add(new LogTarget(
                this.rootRunUid,
                str,
                str2,
                str3,
                String.format("%s.%s", method.getName(), parameter.getName()),
                parameter.getName(),
                str4,
                "read",
                "OK",
                preview.full(argument),
                preview.dimensions(argument),
                preview.schema(argument),
                new Sha1Long("", compact).toString()));
        }
        Class<?> returnType = method.getReturnType();
        runParams.add(new TaskRunParam(this.parameters.get(returnType).typeName(returnType), "", "result"));
        return new Pair<>(runParams, targets);
    }

    /**
     * Resolves the display name of a task.
     *
     * <p>For a {@code null} method, or a method whose name contains {@code $anon}, the
     * configured Spark application name is returned. Otherwise the {@code value} of an
     * {@code ai.databand.annotations.Task} annotation on the method is used when it is
     * non-empty, and the plain method name in every other case.
     *
     * @param method task method, or {@code null}
     * @return the resolved task name
     */
    @Override // ai.databand.DbndRun
    public String getTaskName(Method method) {
        if (method == null || method.getName().contains("$anon")) {
            return this.config.sparkAppName();
        }
        for (Annotation annotation : method.getAnnotations()) {
            // Match by type name rather than by class reference or toString() output.
            if ("ai.databand.annotations.Task".equals(annotation.annotationType().getName())) {
                String declaredName = readTaskAnnotationValue(annotation);
                return declaredName.isEmpty() ? method.getName() : declaredName;
            }
        }
        return method.getName();
    }

    /**
     * Reads the {@code value} element of a task annotation.
     *
     * <p>The element is read reflectively because the textual form produced by
     * {@code Annotation.toString()} differs between JDK releases (for example string values
     * are quoted on newer ones). Parsing that text is kept only as a fallback.
     *
     * @param annotation the task annotation
     * @return the declared value, or an empty string when none could be determined
     */
    private static String readTaskAnnotationValue(Annotation annotation) {
        try {
            Object value = annotation.annotationType().getMethod("value").invoke(annotation);
            if (value instanceof String) {
                return (String) value;
            }
        } catch (ReflectiveOperationException | RuntimeException e) {
            LOG.debug("Unable to read task annotation value reflectively, parsing its text instead", e);
        }
        String rendered = annotation.toString();
        int open = rendered.indexOf('(');
        int close = rendered.lastIndexOf(')');
        if (open < 0 || close <= open) {
            return "";
        }
        String raw = rendered.substring(open + 1, close);
        if (raw.startsWith("value=")) {
            raw = raw.substring("value=".length());
        }
        // Newer JDKs render string members with surrounding double quotes.
        if (raw.length() >= 2 && raw.startsWith("\"") && raw.endsWith("\"")) {
            raw = raw.substring(1, raw.length() - 1);
        }
        return raw;
    }

    /**
     * Creates the task run, task definition and input log targets for one invocation of a
     * task method.
     *
     * <p>The first invocation of a method is named after the task; each later invocation gets
     * the invocation count appended as {@code name_N}. The created run is also remembered in
     * {@code methodsRunsCache}.
     *
     * @param method task method being invoked
     * @param objArr invocation arguments
     * @param z      flag forwarded to the {@code TaskRun} constructor; {@code startTask} passes
     *               whether the task stack was non-empty
     * @return the run, its definition and its input targets
     */
    protected RunAndDefinition buildRunAndDefinition(Method method, Object[] objArr, boolean z) {
        int invocation = this.methodExecutionCounts.getOrDefault(method, 0) + 1;
        String baseName = getTaskName(method);
        String runName;
        if (invocation == 1) {
            runName = baseName;
        } else {
            runName = baseName + "_" + invocation;
        }
        this.methodExecutionCounts.put(method, invocation);

        List<TaskParamDefinition> paramDefinitions = buildTaskParamDefinitions(method);
        String taskRunUid = new Uuid5("TASK_RUN_UID", UUID.randomUUID().toString()).toString();
        String signature = new Sha1Short("TASK_SIGNATURE" + runName, this.runId).toString();
        String definitionUid = new Uuid5("TASK_DEFINITION" + runName, this.runId).toString();
        String attemptUid = new Uuid5("TASK_RUN_ATTEMPT" + runName, this.runId).toString();
        ZonedDateTime createdAt = ZonedDateTime.now(ZoneOffset.UTC);

        Pair<List<TaskRunParam>, List<LogTarget>> paramsAndTargets =
            buildTaskRunParamsAndTargets(method, objArr, taskRunUid, runName, attemptUid, definitionUid);

        TaskRun taskRun = new TaskRun(
            this.rootRunUid,
            false,
            false,
            null,
            "",
            taskRunUid,
            signature,
            runName,
            paramsAndTargets.left(),
            signature,
            false,
            createdAt.toLocalDate(),
            createdAt,
            "",
            "QUEUED",
            definitionUid,
            runName,
            false,
            z,
            attemptUid,
            runName,
            this.airflowContext != null,
            false,
            runName,
            runName,
            "jvm",
            Collections.emptyMap());

        TaskDefinition taskDefinition = new TaskDefinition(
            runName,
            "",
            new Sha1Long("SOURCE", this.runId).toString(),
            "",
            definitionUid,
            new Sha1Long("MODULE_SOURCE", this.runId).toString(),
            paramDefinitions,
            "jvm_task",
            "java",
            "");

        this.methodsRunsCache.put(method, taskRun);
        return new RunAndDefinition(taskRun, taskDefinition, paramsAndTargets.right());
    }

    /**
     * Marks the task on top of the stack as {@code FAILED}: appends the stack trace to its log,
     * flushes its log and metrics through the client and reports the error details.
     *
     * <p>Does nothing when no task is currently on the stack.
     *
     * @param method method that failed (not used by this implementation)
     * @param th     the failure
     */
    @Override // ai.databand.DbndRun
    public void errorTask(Method method, Throwable th) {
        // poll() returns null on an empty deque; pop() would throw NoSuchElementException and
        // make the guard below unreachable.
        TaskRun failed = this.stack.poll();
        if (failed == null) {
            return;
        }
        String stackTrace = extractStackTrace(th);
        failed.appendLog(stackTrace);
        this.dbnd.saveTaskLog(failed.getTaskRunAttemptUid(), failed.getTaskLog());
        this.dbnd.logMetrics(failed.getTaskRunAttemptUid(), failed.getMetrics(), "spark");
        ErrorInfo errorInfo = new ErrorInfo(th.getLocalizedMessage(), "", false, stackTrace, "", "", false, th.getClass().getCanonicalName());
        this.dbnd.updateTaskRunAttempt(failed.getTaskRunUid(), failed.getTaskRunAttemptUid(), "FAILED", errorInfo, failed.getStartDate());
    }

    /**
     * Renders the stack trace of a throwable as text.
     *
     * @param th throwable to render; {@code null} yields an empty string
     * @return the text written by {@link Throwable#printStackTrace(PrintWriter)}, or an empty
     *         string when closing the underlying writer fails
     */
    protected String extractStackTrace(Throwable th) {
        if (th == null) {
            return "";
        }
        try (StringWriter buffer = new StringWriter();
             PrintWriter writer = new PrintWriter(buffer)) {
            th.printStackTrace(writer);
            writer.flush();
            return buffer.toString();
        } catch (IOException e) {
            // StringWriter.close() declares IOException, hence the handler.
            LOG.error("Unable to extract stack trace from error", e);
            return "";
        }
    }

    /**
     * Marks the task on top of the stack as {@code SUCCESS} and flushes its log and metrics
     * through the client.
     *
     * <p>A non-null result is reported as a "write" log target and remembered under its
     * {@code hashCode()} in {@code taskRunOutputs}. Does nothing when no task is currently on
     * the stack.
     *
     * @param method method that completed (not used by this implementation)
     * @param obj    value returned by the task; may be {@code null}
     */
    @Override // ai.databand.DbndRun
    public void completeTask(Method method, Object obj) {
        // poll() returns null on an empty deque; pop() would throw NoSuchElementException and
        // make the guard below unreachable.
        TaskRun finished = this.stack.poll();
        if (finished == null) {
            return;
        }
        if (obj != null) {
            TaskParameterPreview preview = this.parameters.get(obj.getClass());
            String full = preview.full(obj);
            this.taskRunOutputs.put(obj.hashCode(), finished);
            LogTarget resultTarget = new LogTarget(
                this.rootRunUid,
                finished.getTaskRunUid(),
                finished.getTaskAfId(),
                finished.getTaskRunAttemptUid(),
                new Sha1Long("TARGET_PATH", full).toString(),
                "result",
                finished.getTaskDefinitionUid(),
                "write",
                "OK",
                full,
                preview.dimensions(obj),
                preview.schema(obj),
                new Sha1Long("", full).toString());
            this.dbnd.logTargets(finished.getTaskRunUid(), Collections.singletonList(resultTarget));
        }
        this.dbnd.saveTaskLog(finished.getTaskRunAttemptUid(), finished.getTaskLog());
        this.dbnd.logMetrics(finished.getTaskRunAttemptUid(), finished.getMetrics(), "spark");
        this.dbnd.updateTaskRunAttempt(finished.getTaskRunUid(), finished.getTaskRunAttemptUid(), "SUCCESS", null, finished.getStartDate());
    }

    /**
     * Finishes the run successfully: flushes the driver task's log and metrics, marks its
     * attempt as {@code SUCCESS} and, when a root run uid is known, sets the run state to
     * {@code SUCCESS}.
     */
    @Override // ai.databand.DbndRun
    public void stop() {
        TaskRun driver = this.driverTask;
        this.dbnd.saveTaskLog(driver.getTaskRunAttemptUid(), driver.getTaskLog());
        this.dbnd.logMetrics(driver.getTaskRunAttemptUid(), driver.getMetrics(), "spark");
        this.dbnd.updateTaskRunAttempt(driver.getTaskRunUid(), driver.getTaskRunAttemptUid(), "SUCCESS", null, driver.getStartDate());
        if (this.rootRunUid != null) {
            this.dbnd.setRunState(this.rootRunUid, "SUCCESS");
        }
    }

    /**
     * Finishes the run as failed: appends the stack trace to the driver task's log, flushes its
     * log and metrics, marks its attempt as {@code FAILED} with the error details and, when a
     * root run uid is known, sets the run state to {@code FAILED}.
     *
     * @param th the failure that ended the run
     */
    @Override // ai.databand.DbndRun
    public void error(Throwable th) {
        String stackTrace = extractStackTrace(th);
        ErrorInfo errorInfo = new ErrorInfo(th.getLocalizedMessage(), "", false, stackTrace, "", "", false, th.getClass().getCanonicalName());
        this.driverTask.appendLog(stackTrace);
        this.dbnd.saveTaskLog(this.driverTask.getTaskRunAttemptUid(), this.driverTask.getTaskLog());
        this.dbnd.logMetrics(this.driverTask.getTaskRunAttemptUid(), this.driverTask.getMetrics(), "spark");
        this.dbnd.updateTaskRunAttempt(this.driverTask.getTaskRunUid(), this.driverTask.getTaskRunAttemptUid(), "FAILED", errorInfo, this.driverTask.getStartDate());
        // Same guard as stop(): without a root run uid there is no run whose state can be set.
        if (this.rootRunUid == null) {
            return;
        }
        this.dbnd.setRunState(this.rootRunUid, "FAILED");
    }

    /**
     * Logs a single metric against the task on top of the stack, or against the driver task
     * when the stack is empty.
     *
     * @param str metric key
     * @param obj metric value
     */
    @Override // ai.databand.DbndRun
    public void logMetric(String str, Object obj) {
        // ArrayDeque never holds null elements, so an empty stack is the only "no task" case.
        TaskRun target = this.stack.isEmpty() ? this.driverTask : this.stack.peek();
        logMetric(target, str, obj, null);
    }

    /**
     * Logs a batch of metrics without a source; shorthand for the two-argument overload with a
     * {@code null} source.
     *
     * @param map metric keys and values
     */
    @Override // ai.databand.DbndRun
    public void logMetrics(Map<String, Object> map) {
        final String noSource = null;
        logMetrics(map, noSource);
    }

    /**
     * Logs a batch of metrics against the task on top of the stack, or against the driver task
     * when the stack is empty.
     *
     * @param map metric keys and values
     * @param str metric source passed through to the client; may be {@code null}
     */
    @Override // ai.databand.DbndRun
    public void logMetrics(Map<String, Object> map, String str) {
        // ArrayDeque never holds null elements, so an empty stack is the only "no task" case.
        TaskRun target = this.stack.isEmpty() ? this.driverTask : this.stack.peek();
        logMetrics(target, map, str);
    }

    /**
     * Logs a dataset as a metric (full preview, source "user") and then its histogram metric
     * values under the "histograms" source, against the current task or the driver task.
     *
     * <p>Any exception is logged and swallowed.
     *
     * @param str              metric key
     * @param dataset          dataset to describe
     * @param histogramRequest histogram settings handed to {@code Histogram}
     */
    @Override // ai.databand.DbndRun
    public void logDataframe(String str, Dataset<?> dataset, HistogramRequest histogramRequest) {
        try {
            TaskRun target = this.stack.isEmpty() ? this.driverTask : this.stack.peek();
            logMetric(target, str, dataset, "user", false);
            String attemptUid = target.getTaskRunAttemptUid();
            Histogram histogram = new Histogram(str, dataset, histogramRequest);
            this.dbnd.logMetrics(attemptUid, histogram.metricValues(), "histograms");
        } catch (Exception e) {
            LOG.error("Unable to log dataframe", e);
        }
    }

    /**
     * Sends already computed histogram values under the "histograms" source, against the
     * current task or the driver task. Any exception is logged and swallowed.
     *
     * @param map histogram metric values
     */
    @Override // ai.databand.DbndRun
    public void logHistogram(Map<String, Object> map) {
        try {
            TaskRun target = this.stack.isEmpty() ? this.driverTask : this.stack.peek();
            String attemptUid = target.getTaskRunAttemptUid();
            this.dbnd.logMetrics(attemptUid, map, "histograms");
        } catch (Exception e) {
            LOG.error("Unable to log histogram", e);
        }
    }

    /**
     * Reports one dataset operation, described by explicit fields, against the current task or
     * the driver task. Any exception is logged and swallowed.
     *
     * <p>All arguments are handed unchanged to the {@code LogDataset} constructor together with
     * the target task run.
     *
     * @param str                    first {@code LogDataset} argument after the task run
     * @param datasetOperationType   kind of operation
     * @param datasetOperationStatus outcome of the operation
     * @param str2                   the Dataset-based overload passes the error stack trace here
     * @param str3                   the Dataset-based overload passes the full preview here
     * @param list                   the Dataset-based overload passes the dimensions here
     * @param obj                    the Dataset-based overload passes the schema here
     * @param bool                   flag forwarded as-is
     */
    @Override // ai.databand.DbndRun
    public void logDatasetOperation(String str, DatasetOperationType datasetOperationType, DatasetOperationStatus datasetOperationStatus, String str2, String str3, List<Long> list, Object obj, Boolean bool) {
        try {
            TaskRun target = this.stack.isEmpty() ? this.driverTask : this.stack.peek();
            LogDataset operation = new LogDataset(target, str, datasetOperationType, datasetOperationStatus, str2, str3, list, obj, bool);
            this.dbnd.logDatasetOperations(target.getTaskRunUid(), Collections.singletonList(operation));
        } catch (Exception e) {
            LOG.error("Unable to log dataset operation", e);
        }
    }

    /**
     * Reports one dataset operation for a Spark dataset: renders the optional failure as a
     * stack trace, derives preview, dimensions and schema from the dataset and delegates to the
     * field-based overload.
     *
     * @param str                    forwarded unchanged to the field-based overload
     * @param datasetOperationType   kind of operation
     * @param datasetOperationStatus outcome of the operation
     * @param dataset                dataset the operation worked on
     * @param th                     failure to report, or {@code null}
     * @param z                      not used by this implementation
     * @param z2                     {@code true} to describe the dataset with
     *                               {@code DatasetOperationPreview}, {@code false} to use
     *                               {@code NullPreview}
     * @param bool                   flag forwarded as-is
     */
    @Override // ai.databand.DbndRun
    public void logDatasetOperation(String str, DatasetOperationType datasetOperationType, DatasetOperationStatus datasetOperationStatus, Dataset<?> dataset, Throwable th, boolean z, boolean z2, Boolean bool) {
        TaskParameterPreview preview = z2 ? new DatasetOperationPreview() : new NullPreview();
        String errorTrace = null;
        if (th != null) {
            StringWriter buffer = new StringWriter();
            try (PrintWriter writer = new PrintWriter(buffer)) {
                th.printStackTrace(writer);
            }
            errorTrace = buffer.toString();
        }
        logDatasetOperation(str, datasetOperationType, datasetOperationStatus, errorTrace, preview.full(dataset), preview.dimensions(dataset), preview.schema(dataset), bool);
    }

    /**
     * Logs a single metric against the given task run using the compact preview of the value.
     *
     * @param taskRun task run the metric belongs to
     * @param str     metric key
     * @param obj     metric value
     * @param str2    metric source passed through to the client; may be {@code null}
     */
    public void logMetric(TaskRun taskRun, String str, Object obj, String str2) {
        final boolean compactPreview = true;
        logMetric(taskRun, str, obj, str2, compactPreview);
    }

    /**
     * Logs a single metric against the given task run. Any exception is logged and swallowed.
     *
     * @param taskRun task run the metric belongs to; {@code null} makes the call a no-op
     * @param str     metric key
     * @param obj     metric value, rendered with the preview registered for its runtime class
     * @param str2    metric source passed through to the client; may be {@code null}
     * @param z       {@code true} to send the compact preview, {@code false} for the full one
     */
    public void logMetric(TaskRun taskRun, String str, Object obj, String str2, boolean z) {
        if (taskRun != null) {
            try {
                TaskParameterPreview preview = this.parameters.get(obj.getClass());
                String attemptUid = taskRun.getTaskRunAttemptUid();
                String rendered;
                if (z) {
                    rendered = preview.compact(obj);
                } else {
                    rendered = preview.full(obj);
                }
                this.dbnd.logMetric(attemptUid, str, rendered, str2);
            } catch (Exception e) {
                LOG.error("Unable to log metric", e);
            }
        }
    }

    /**
     * Logs a batch of metrics against the given task run, sending the compact preview of every
     * value. Any exception is logged (with its cause) and swallowed.
     *
     * @param taskRun task run the metrics belong to; {@code null} makes the call a no-op
     * @param map     metric keys and values
     * @param str     metric source passed through to the client; may be {@code null}
     */
    public void logMetrics(TaskRun taskRun, Map<String, Object> map, String str) {
        if (taskRun == null) {
            return;
        }
        try {
            Map<String, Object> rendered = new HashMap<>(map.size());
            for (Map.Entry<String, Object> entry : map.entrySet()) {
                Object value = entry.getValue();
                rendered.put(entry.getKey(), this.parameters.get(value.getClass()).compact(value));
            }
            this.dbnd.logMetrics(taskRun.getTaskRunAttemptUid(), rendered, str);
        } catch (Exception e) {
            // Pass the exception along so the cause is not lost.
            LOG.error("Unable to log metrics", e);
        }
    }

    /**
     * Appends a formatted log line to the current task's log, or to the driver task's log when
     * no task is on the stack.
     *
     * <p>Nothing is recorded while no driver task exists, and events emitted by the
     * {@code DbndClient} logger are skipped. Any exception is logged and swallowed.
     *
     * @param loggingEvent event the line originates from; only its logger name is inspected
     * @param str          formatted log line to append
     */
    @Override // ai.databand.DbndRun
    public void saveLog(LoggingEvent loggingEvent, String str) {
        try {
            if (this.driverTask == null) {
                return;
            }
            TaskRun current = this.stack.peek();
            boolean emittedByClient = DbndClient.class.getName().equals(loggingEvent.getLoggerName());
            if (emittedByClient) {
                return;
            }
            TaskRun target = current != null ? current : this.driverTask;
            target.appendLog(str);
        } catch (Exception e) {
            LOG.error("Unable to save task log", e);
        }
    }

    /**
     * Collects the {@code LongAccumulator} values of a completed Spark stage and appends them
     * to the current task (or the driver task) twice: under the bare accumulator name, and
     * under the name prefixed with {@code stage-<id>.<first word of the stage name>.}.
     *
     * <p>Any exception is logged and swallowed.
     *
     * @param sparkListenerStageCompleted stage-completed event delivered by Spark
     */
    @Override // ai.databand.DbndRun
    public void saveSparkMetrics(SparkListenerStageCompleted sparkListenerStageCompleted) {
        try {
            StageInfo stageInfo = sparkListenerStageCompleted.stageInfo();
            TaskRun target = this.stack.peek();
            if (target == null) {
                target = this.driverTask;
            }
            // A stage name without a space used to make substring(0, -1) throw, which dropped
            // every metric of the stage; use the whole name in that case.
            String stageName = stageInfo.name();
            int firstSpace = stageName.indexOf(' ');
            String shortName = firstSpace < 0 ? stageName : stageName.substring(0, firstSpace);
            String prefix = String.format("stage-%s.%s.", stageInfo.stageId(), shortName);

            Iterator it = stageInfo.taskMetrics().accumulators().iterator();
            // NOTE(review): kept raw because the parameter types of TaskRun.appendMetrics /
            // appendPrefixedMetrics are not visible from this file.
            HashMap plain = new HashMap(1);
            HashMap prefixed = new HashMap(1);
            while (it.hasNext()) {
                AccumulatorV2 accumulator = (AccumulatorV2) it.next();
                // Unnamed accumulators cannot be keyed; Option.get() on them would throw.
                if (accumulator instanceof LongAccumulator && accumulator.name().isDefined()) {
                    String name = (String) accumulator.name().get();
                    String value = String.valueOf(accumulator.value());
                    prefixed.put(prefix + name, value);
                    plain.put(name, value);
                }
            }
            target.appendMetrics(plain);
            target.appendPrefixedMetrics(prefixed);
        } catch (Exception e) {
            LOG.error("Unable to save spark metrics", e);
        }
    }

    /**
     * Replaces the driver task, i.e. the task used as the target whenever no task is on the
     * stack.
     *
     * @param taskRun the new driver task
     */
    @Override // ai.databand.DbndRun
    public void setDriverTask(TaskRun taskRun) {
        final TaskRun replacement = taskRun;
        this.driverTask = replacement;
    }
}
