/*
 * Decompiled with CFR 0.152.
 */
package ai.databand;

import ai.databand.DbndClient;
import ai.databand.DbndRun;
import ai.databand.config.DbndConfig;
import ai.databand.id.Sha1Long;
import ai.databand.id.Sha1Short;
import ai.databand.id.Uuid5;
import ai.databand.log.HistogramRequest;
import ai.databand.parameters.Histogram;
import ai.databand.parameters.ParametersPreview;
import ai.databand.parameters.TaskParameterPreview;
import ai.databand.schema.AirflowTaskContext;
import ai.databand.schema.AzkabanTaskContext;
import ai.databand.schema.DatasetOperationStatuses;
import ai.databand.schema.DatasetOperationTypes;
import ai.databand.schema.ErrorInfo;
import ai.databand.schema.LogDataset;
import ai.databand.schema.LogTarget;
import ai.databand.schema.Pair;
import ai.databand.schema.RootRun;
import ai.databand.schema.RunAndDefinition;
import ai.databand.schema.TaskDefinition;
import ai.databand.schema.TaskParamDefinition;
import ai.databand.schema.TaskRun;
import ai.databand.schema.TaskRunParam;
import ai.databand.schema.TaskRunsInfo;
import ai.databand.schema.TrackingSource;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.StageInfo;
import org.apache.spark.sql.Dataset;
import org.apache.spark.util.AccumulatorV2;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Iterator;

public class DefaultDbndRun
implements DbndRun {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultDbndRun.class);
    private final DbndClient dbnd;
    private final List<TaskRun> taskRuns;
    private final List<TaskDefinition> taskDefinitions;
    private final List<List<String>> parentChildMap;
    private final List<List<String>> upstreamsMap;
    private final Deque<TaskRun> stack;
    private final Map<Method, List<TaskParamDefinition>> methodsCache;
    private final Map<Method, TaskRun> methodsRunsCache;
    private final Map<Method, Integer> methodExecutionCounts;
    private final ParametersPreview parameters;
    private final Map<Integer, TaskRun> taskRunOutputs;
    private String rootRunUid;
    private String runId;
    private String jobName;
    private String driverTaskUid;
    private TaskRun driverTask;
    private AirflowTaskContext airflowContext;
    private AzkabanTaskContext azkabanTaskContext;
    private final DbndConfig config;

    public DefaultDbndRun(DbndClient dbndClient, DbndConfig config) {
        this.dbnd = dbndClient;
        this.taskRuns = new ArrayList<TaskRun>(1);
        this.taskDefinitions = new ArrayList<TaskDefinition>(1);
        this.parentChildMap = new ArrayList<List<String>>(1);
        this.upstreamsMap = new ArrayList<List<String>>(1);
        this.stack = new ArrayDeque<TaskRun>(1);
        this.methodsCache = new HashMap<Method, List<TaskParamDefinition>>(1);
        this.methodsRunsCache = new HashMap<Method, TaskRun>(1);
        this.methodExecutionCounts = new HashMap<Method, Integer>(1);
        this.parameters = new ParametersPreview(config.isPreviewEnabled());
        this.taskRunOutputs = new HashMap<Integer, TaskRun>(1);
        this.airflowContext = config.airflowContext().orElse(null);
        this.azkabanTaskContext = config.azkabanContext().orElse(null);
        this.config = config;
    }

    @Override
    public void init(Method method, Object[] args) {
        String annotationValue = this.getTaskName(method);
        this.runId = UUID.randomUUID().toString();
        String user = System.getProperty("user.name");
        String source = null;
        TrackingSource trackingSource = null;
        if (this.airflowContext != null) {
            this.jobName = this.airflowContext.jobName();
            source = "airflow_tracking";
            trackingSource = new TrackingSource(this.airflowContext);
        } else if (this.azkabanTaskContext != null) {
            this.jobName = this.azkabanTaskContext.databandJobName();
            trackingSource = this.azkabanTaskContext.trackingSource();
            if (trackingSource != null) {
                source = "azkaban_tracking";
            }
        } else {
            this.jobName = annotationValue == null || annotationValue.isEmpty() ? method.getName() : annotationValue;
        }
        this.config.jobName().ifPresent(name -> {
            this.jobName = name;
        });
        TaskRunsInfo rootRun = this.buildRootRun(method, args);
        RootRun root = this.config.azkabanContext().isPresent() ? this.config.azkabanContext().get().root() : null;
        this.rootRunUid = this.dbnd.initRun(this.jobName, this.runId, user, this.config.runName(), rootRun, this.airflowContext, root, source, trackingSource, null);
        this.dbnd.setRunState(this.rootRunUid, "RUNNING");
    }

    protected TaskRunsInfo buildRootRun(Method method, Object[] args) {
        ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
        String runUid = new Uuid5("RUN_UID", this.runId).toString();
        this.driverTaskUid = new Uuid5("DRIVER_TASK", this.runId).toString();
        String taskRunEnvUid = new Uuid5("TASK_RUN_ENV_UID", this.runId).toString();
        String taskRunAttemptUid = new Uuid5("TASK_RUN_ATTEMPT", this.runId).toString();
        String cmd = this.config.cmd();
        String version = "";
        Sha1Short taskSignature = new Sha1Short("TASK_SIGNATURE", this.runId);
        String taskDefinitionUid = new Uuid5("TASK_DEFINITION", this.runId).toString();
        String taskAfId = this.getTaskName(method);
        List<TaskParamDefinition> taskParamDefinitions = this.buildTaskParamDefinitions(method);
        String methodName = method == null ? "pipeline" : method.getName();
        Pair<List<TaskRunParam>, List<LogTarget>> paramsAndTargets = this.buildTaskRunParamsAndTargets(method, args, runUid, methodName, taskRunAttemptUid, taskDefinitionUid);
        this.driverTask = new TaskRun(runUid, true, false, null, version, this.driverTaskUid, taskSignature.toString(), this.jobName, paramsAndTargets.left(), taskSignature.toString(), false, now.toLocalDate(), now, "", "RUNNING", taskDefinitionUid, cmd, false, false, taskRunAttemptUid, taskAfId, this.airflowContext != null || this.azkabanTaskContext != null, true, cmd, taskAfId, "jvm", Collections.emptyMap());
        this.driverTask.setStartDate(now);
        String sourceCode = this.extractSourceCode(method);
        TrackingSource trackingSource = null;
        if (this.airflowContext != null) {
            this.parentChildMap.add(Arrays.asList(this.airflowContext.getAfOperatorUid(), this.driverTaskUid));
            this.upstreamsMap.add(Arrays.asList(this.airflowContext.getAfOperatorUid(), this.driverTaskUid));
            trackingSource = new TrackingSource(this.airflowContext);
        }
        if (this.azkabanTaskContext != null) {
            this.parentChildMap.add(Arrays.asList(this.azkabanTaskContext.taskRunUid(), this.driverTaskUid));
            this.upstreamsMap.add(Arrays.asList(this.azkabanTaskContext.taskRunUid(), this.driverTaskUid));
            trackingSource = new TrackingSource(this.azkabanTaskContext);
        }
        return new TaskRunsInfo(taskRunEnvUid, this.parentChildMap, runUid, Collections.singletonList(this.driverTask), Collections.emptyList(), runUid, this.upstreamsMap, false, Collections.singletonList(new TaskDefinition(methodName, sourceCode, new Sha1Long("SOURCE", this.runId).toString(), "", taskDefinitionUid, new Sha1Long("MODULE_SOURCE", this.runId).toString(), taskParamDefinitions, "jvm_task", "java", "")), trackingSource);
    }

    protected String extractSourceCode(Method method) {
        return "";
    }

    @Override
    public void startTask(Method method, Object[] args) {
        TaskRun parent;
        RunAndDefinition runAndDefinition = this.buildRunAndDefinition(method, args, !this.stack.isEmpty());
        TaskRun taskRun = runAndDefinition.taskRun();
        this.taskRuns.add(taskRun);
        TaskDefinition taskDefinition = runAndDefinition.taskDefinition();
        this.taskDefinitions.add(taskDefinition);
        TaskRun taskRun2 = parent = this.stack.isEmpty() ? this.driverTask : this.stack.peek();
        if (!this.stack.isEmpty()) {
            this.upstreamsMap.add(Arrays.asList(parent.getTaskRunUid(), taskRun.getTaskRunUid()));
        }
        taskRun.addUpstream(parent);
        for (Object arg : args) {
            TaskRun parentTask;
            if (arg == null || (parentTask = this.taskRunOutputs.get(arg.hashCode())) == null) continue;
            this.upstreamsMap.add(Arrays.asList(taskRun.getTaskRunUid(), parentTask.getTaskRunUid()));
        }
        this.stack.push(taskRun);
        this.parentChildMap.add(Arrays.asList(parent.getTaskRunUid(), taskRun.getTaskRunUid()));
        this.dbnd.addTaskRuns(this.rootRunUid, this.runId, this.taskRuns, this.taskDefinitions, this.parentChildMap, this.upstreamsMap);
        this.dbnd.logTargets(taskRun.getTaskRunUid(), runAndDefinition.targets());
        this.dbnd.updateTaskRunAttempt(taskRun.getTaskRunUid(), taskRun.getTaskRunAttemptUid(), "RUNNING", null, taskRun.getStartDate());
        LOG.info("TASK: task_id={}", (Object)taskRun.getTaskId());
        LOG.info("TIME: start={}", (Object)taskRun.getExecutionDate());
        LOG.info("TRACKER: {}/app/jobs/{}/{}/{}", new Object[]{this.config.databandUrl(), this.driverTask.getTaskAfId(), this.driverTask.getRunUid(), taskRun.getTaskRunUid()});
    }

    protected List<TaskParamDefinition> buildTaskParamDefinitions(Method method) {
        if (method == null) {
            return Collections.emptyList();
        }
        return this.methodsCache.computeIfAbsent(method, method1 -> {
            ArrayList<TaskParamDefinition> result = new ArrayList<TaskParamDefinition>(method.getParameterCount());
            for (int i = 0; i < method.getParameterCount(); ++i) {
                Parameter parameter = method.getParameters()[i];
                result.add(new TaskParamDefinition(parameter.getName(), "task_input", "user", true, false, parameter.getParameterizedType().getTypeName(), "", ""));
            }
            result.add(new TaskParamDefinition("result", "task_output", "user", true, false, method.getReturnType().getTypeName(), "", ""));
            return result;
        });
    }

    protected Pair<List<TaskRunParam>, List<LogTarget>> buildTaskRunParamsAndTargets(Method method, Object[] args, String taskRunUid, String methodName, String taskRunAttemptUid, String taskDefinitionUid) {
        if (method == null || args == null || args.length == 0) {
            return new Pair<List<TaskRunParam>, List<LogTarget>>(Collections.emptyList(), Collections.emptyList());
        }
        ArrayList<LogTarget> targets = new ArrayList<LogTarget>(1);
        ArrayList<TaskRunParam> params = new ArrayList<TaskRunParam>(method.getParameterCount());
        for (int i = 0; i < method.getParameterCount(); ++i) {
            Parameter parameter = method.getParameters()[i];
            Object parameterValue = args[i];
            TaskParameterPreview preview = this.parameters.get(parameter.getType());
            params.add(new TaskRunParam(preview.compact(parameterValue), "", parameter.getName()));
            targets.add(new LogTarget(this.rootRunUid, taskRunUid, methodName, taskRunAttemptUid, new Sha1Long("TARGET_PATH", preview.compact(parameterValue)).toString(), parameter.getName(), taskDefinitionUid, "read", "OK", preview.full(parameterValue), preview.dimensions(parameterValue), preview.schema(parameterValue), new Sha1Long("", preview.compact(parameterValue)).toString()));
        }
        TaskParameterPreview resultPreview = this.parameters.get(method.getReturnType());
        params.add(new TaskRunParam(resultPreview.typeName(method.getReturnType()), "", "result"));
        return new Pair<List<TaskRunParam>, List<LogTarget>>(params, targets);
    }

    @Override
    public String getTaskName(Method method) {
        if (method == null || method.getName().contains("$anon")) {
            return this.config.sparkAppName();
        }
        Optional<Annotation> taskAnnotation = Arrays.stream(method.getAnnotations()).filter(at -> at.toString().contains("ai.databand.annotations.Task(value=")).findAny();
        if (!taskAnnotation.isPresent()) {
            return method.getName();
        }
        String annotationStr = taskAnnotation.get().toString();
        String annotationValue = annotationStr.substring(annotationStr.indexOf(61) + 1, annotationStr.indexOf(41));
        return annotationValue.isEmpty() ? method.getName() : annotationValue;
    }

    protected RunAndDefinition buildRunAndDefinition(Method method, Object[] args, boolean hasUpstreams) {
        int executionCount = this.methodExecutionCounts.computeIfAbsent(method, m -> 0);
        String taskName = this.getTaskName(method);
        String methodName = ++executionCount == 1 ? taskName : String.format("%s_%s", taskName, executionCount);
        this.methodExecutionCounts.put(method, executionCount);
        List<TaskParamDefinition> paramDefinitions = this.buildTaskParamDefinitions(method);
        String taskRunId = UUID.randomUUID().toString();
        String taskRunUid = new Uuid5("TASK_RUN_UID", taskRunId).toString();
        String taskSignature = new Sha1Short("TASK_SIGNATURE" + methodName, this.runId).toString();
        String taskDefinitionUid = new Uuid5("TASK_DEFINITION" + methodName, this.runId).toString();
        String taskRunAttemptUid = new Uuid5("TASK_RUN_ATTEMPT" + methodName, this.runId).toString();
        String taskAfId = methodName;
        ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
        Pair<List<TaskRunParam>, List<LogTarget>> paramsAndTargets = this.buildTaskRunParamsAndTargets(method, args, taskRunUid, methodName, taskRunAttemptUid, taskDefinitionUid);
        List<TaskRunParam> params = paramsAndTargets.left();
        List<LogTarget> targets = paramsAndTargets.right();
        TaskRun taskRun = new TaskRun(this.rootRunUid, false, false, null, "", taskRunUid, taskSignature, taskAfId, params, taskSignature, false, now.toLocalDate(), now, "", "QUEUED", taskDefinitionUid, methodName, false, hasUpstreams, taskRunAttemptUid, taskAfId, this.airflowContext != null, false, methodName, taskAfId, "jvm", Collections.emptyMap());
        TaskDefinition taskDefinition = new TaskDefinition(methodName, "", new Sha1Long("SOURCE", this.runId).toString(), "", taskDefinitionUid, new Sha1Long("MODULE_SOURCE", this.runId).toString(), paramDefinitions, "jvm_task", "java", "");
        this.methodsRunsCache.put(method, taskRun);
        return new RunAndDefinition(taskRun, taskDefinition, targets);
    }

    @Override
    public void errorTask(Method method, Throwable error) {
        TaskRun task = this.stack.pop();
        if (task == null) {
            return;
        }
        String stackTrace = this.extractStackTrace(error);
        task.appendLog(stackTrace);
        this.dbnd.saveTaskLog(task.getTaskRunAttemptUid(), task.getTaskLog());
        this.dbnd.logMetrics(task.getTaskRunAttemptUid(), task.getMetrics(), "spark");
        ErrorInfo errorInfo = new ErrorInfo(error.getLocalizedMessage(), "", false, stackTrace, "", "", false, error.getClass().getCanonicalName());
        this.dbnd.updateTaskRunAttempt(task.getTaskRunUid(), task.getTaskRunAttemptUid(), "FAILED", errorInfo, task.getStartDate());
    }

    /*
     * Exception decompiling
     */
    protected String extractStackTrace(Throwable error) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    @Override
    public void completeTask(Method method, Object result) {
        TaskRun task = this.stack.pop();
        if (task == null) {
            return;
        }
        if (result != null) {
            TaskParameterPreview taskParameter = this.parameters.get(result.getClass());
            String preview = taskParameter.full(result);
            this.taskRunOutputs.put(result.hashCode(), task);
            this.dbnd.logTargets(task.getTaskRunUid(), Collections.singletonList(new LogTarget(this.rootRunUid, task.getTaskRunUid(), task.getTaskAfId(), task.getTaskRunAttemptUid(), new Sha1Long("TARGET_PATH", preview).toString(), "result", task.getTaskDefinitionUid(), "write", "OK", preview, taskParameter.dimensions(result), taskParameter.schema(result), new Sha1Long("", preview).toString())));
        }
        this.dbnd.saveTaskLog(task.getTaskRunAttemptUid(), task.getTaskLog());
        this.dbnd.logMetrics(task.getTaskRunAttemptUid(), task.getMetrics(), "spark");
        this.dbnd.updateTaskRunAttempt(task.getTaskRunUid(), task.getTaskRunAttemptUid(), "SUCCESS", null, task.getStartDate());
    }

    @Override
    public void stop() {
        this.dbnd.saveTaskLog(this.driverTask.getTaskRunAttemptUid(), this.driverTask.getTaskLog());
        this.dbnd.logMetrics(this.driverTask.getTaskRunAttemptUid(), this.driverTask.getMetrics(), "spark");
        this.dbnd.updateTaskRunAttempt(this.driverTask.getTaskRunUid(), this.driverTask.getTaskRunAttemptUid(), "SUCCESS", null, this.driverTask.getStartDate());
        this.dbnd.setRunState(this.rootRunUid, "SUCCESS");
    }

    @Override
    public void error(Throwable error) {
        String stackTrace = this.extractStackTrace(error);
        ErrorInfo errorInfo = new ErrorInfo(error.getLocalizedMessage(), "", false, stackTrace, "", "", false, error.getClass().getCanonicalName());
        this.driverTask.appendLog(stackTrace);
        this.dbnd.saveTaskLog(this.driverTask.getTaskRunAttemptUid(), this.driverTask.getTaskLog());
        this.dbnd.logMetrics(this.driverTask.getTaskRunAttemptUid(), this.driverTask.getMetrics(), "spark");
        this.dbnd.updateTaskRunAttempt(this.driverTask.getTaskRunUid(), this.driverTask.getTaskRunAttemptUid(), "FAILED", errorInfo, this.driverTask.getStartDate());
        this.dbnd.setRunState(this.rootRunUid, "FAILED");
    }

    @Override
    public void logMetric(String key, Object value) {
        TaskRun currentTask = this.stack.peek();
        if (currentTask == null) {
            currentTask = this.driverTask;
        }
        this.logMetric(currentTask, key, value, null);
    }

    @Override
    public void logMetrics(Map<String, Object> metrics) {
        this.logMetrics(metrics, null);
    }

    @Override
    public void logMetrics(Map<String, Object> metrics, String source) {
        TaskRun currentTask = this.stack.peek();
        if (currentTask == null) {
            currentTask = this.driverTask;
        }
        this.logMetrics(currentTask, metrics, source);
    }

    @Override
    public void logDataframe(String key, Dataset<?> value, HistogramRequest histogramRequest) {
        try {
            TaskRun currentTask = this.stack.peek();
            if (currentTask == null) {
                currentTask = this.driverTask;
            }
            this.logMetric(currentTask, key, value, "user", false);
            this.dbnd.logMetrics(currentTask.getTaskRunAttemptUid(), new Histogram(key, value, histogramRequest).metricValues(), "histograms");
        }
        catch (Exception e) {
            LOG.error("Unable to log dataframe", (Throwable)e);
        }
    }

    @Override
    public void logHistogram(Map<String, Object> histogram) {
        try {
            TaskRun currentTask = this.stack.peek();
            if (currentTask == null) {
                currentTask = this.driverTask;
            }
            this.dbnd.logMetrics(currentTask.getTaskRunAttemptUid(), histogram, "histograms");
        }
        catch (Exception e) {
            LOG.error("Unable to log histogram", (Throwable)e);
        }
    }

    @Override
    public void logDatasetOperation(String operationPath, DatasetOperationTypes operationType, DatasetOperationStatuses operationStatus, String valuePreview, List<Long> dataDimensions, String dataSchema) {
        try {
            TaskRun currentTask = this.stack.peek();
            if (currentTask == null) {
                currentTask = this.driverTask;
            }
            this.dbnd.logDatasetOperations(currentTask.getTaskRunUid(), Collections.singletonList(new LogDataset(currentTask, operationPath, operationType, operationStatus, valuePreview, dataDimensions, dataSchema)));
        }
        catch (Exception e) {
            LOG.error("Unable to log dataset operation", (Throwable)e);
        }
    }

    public void logMetric(TaskRun taskRun, String key, Object value, String source) {
        this.logMetric(taskRun, key, value, source, true);
    }

    public void logMetric(TaskRun taskRun, String key, Object value, String source, boolean compact) {
        try {
            if (taskRun == null) {
                return;
            }
            TaskParameterPreview taskParameter = this.parameters.get(value.getClass());
            this.dbnd.logMetric(taskRun.getTaskRunAttemptUid(), key, compact ? taskParameter.compact(value) : taskParameter.full(value), source);
        }
        catch (Exception e) {
            LOG.error("Unable to log metric", (Throwable)e);
        }
    }

    public void logMetrics(TaskRun taskRun, Map<String, Object> metrics, String source) {
        try {
            if (taskRun == null) {
                return;
            }
            HashMap<String, Object> result = new HashMap<String, Object>(metrics.size());
            for (Map.Entry<String, Object> entry : metrics.entrySet()) {
                TaskParameterPreview taskParameter = this.parameters.get(entry.getValue().getClass());
                result.put(entry.getKey(), taskParameter.compact(entry.getValue()));
            }
            this.dbnd.logMetrics(taskRun.getTaskRunAttemptUid(), result, source);
        }
        catch (Exception e) {
            LOG.error("Unable to log metrics");
        }
    }

    @Override
    public void saveLog(LoggingEvent event, String formattedEvent) {
        try {
            if (this.driverTask == null) {
                return;
            }
            TaskRun currentTask = this.stack.peek();
            if (DbndClient.class.getName().equals(event.getLoggerName())) {
                return;
            }
            if (currentTask == null) {
                this.driverTask.appendLog(formattedEvent);
            } else {
                currentTask.appendLog(formattedEvent);
            }
        }
        catch (Exception e) {
            LOG.error("Unable to save task log", (Throwable)e);
        }
    }

    @Override
    public void saveSparkMetrics(SparkListenerStageCompleted event) {
        try {
            StageInfo stageInfo = event.stageInfo();
            TaskRun currentTask = this.stack.peek();
            if (currentTask == null) {
                currentTask = this.driverTask;
            }
            String transformationName = stageInfo.name().substring(0, stageInfo.name().indexOf(32));
            String metricPrefix = String.format("stage-%s.%s.", stageInfo.stageId(), transformationName);
            Iterator it = stageInfo.taskMetrics().accumulators().iterator();
            HashMap<String, Object> values = new HashMap<String, Object>(1);
            HashMap<String, Object> prefixedValues = new HashMap<String, Object>(1);
            while (it.hasNext()) {
                AccumulatorV2 next = (AccumulatorV2)it.next();
                if (!(next instanceof LongAccumulator)) continue;
                String metricName = (String)next.name().get();
                String value = String.valueOf(next.value());
                prefixedValues.put(metricPrefix + metricName, value);
                values.put(metricName, value);
            }
            currentTask.appendMetrics(values);
            currentTask.appendPrefixedMetrics(prefixedValues);
        }
        catch (Exception e) {
            LOG.error("Unable to save spark metrics", (Throwable)e);
        }
    }
}

