package com.twitter.ambrose.cascading;

import cascading.flow.Flow;
import cascading.flow.FlowListener;
import cascading.flow.FlowStep;
import cascading.flow.FlowStepListener;
import cascading.flow.Flows;
import cascading.stats.hadoop.HadoopStepStats;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.twitter.ambrose.model.DAGNode;
import com.twitter.ambrose.model.Event;
import com.twitter.ambrose.model.hadoop.MapReduceHelper;
import com.twitter.ambrose.service.StatsWriteService;
import com.twitter.ambrose.util.AmbroseUtils;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/twitter/ambrose/cascading/AmbroseCascadingNotifier.class */
/**
 * Cascading flow and flow-step listener that forwards lifecycle callbacks to an Ambrose
 * {@link StatsWriteService}.
 *
 * <p>On flow start the step graph is converted into {@link DAGNode}s keyed by step name; each
 * subsequent step callback looks the node up by name, refreshes it from the step's Hadoop stats
 * and pushes the corresponding Ambrose event.
 *
 * <p>NOTE(review): the counters and collections below are mutated from listener callbacks without
 * any synchronization; whether Cascading may invoke those callbacks concurrently is not visible
 * from this file — confirm before relying on thread-safety.
 */
public class AmbroseCascadingNotifier implements FlowListener, FlowStepListener {
    private static final Log LOG = LogFactory.getLog(AmbroseCascadingNotifier.class);

    /** Sink that receives the DAG description and all job / workflow events. */
    private final StatsWriteService statsWriteService;

    /** Number of steps in the current flow, captured in {@link #onStarting(Flow)}. */
    private int totalNumberOfJobs;

    /** Number of steps that have started so far; incremented in onStepStarting, never decremented. */
    private int runningJobs;

    /** Id of the flow captured in {@link #onStarting(Flow)}; used as the workflow id for events. */
    private String currentFlowId;

    private final MapReduceHelper mapReduceHelper = new MapReduceHelper();

    /** DAG nodes keyed by flow-step name, populated by the graph converter in onStarting. */
    private final Map<String, DAGNode<CascadingJob>> nodesByName = Maps.newTreeMap();

    /** Names of steps whose map-reduce state has already been observed as complete. */
    private final Set<String> completedStepNames = Sets.newHashSet();

    /**
     * Creates a notifier that reports to the given service.
     *
     * @param statsWriteService destination for DAG and event updates
     */
    public AmbroseCascadingNotifier(StatsWriteService statsWriteService) {
        this.statsWriteService = statsWriteService;
    }

    /**
     * Returns the service this notifier writes to.
     *
     * @return the stats write service supplied at construction time
     */
    public StatsWriteService getStatsWriteService() {
        return this.statsWriteService;
    }

    /**
     * Looks up the DAG node registered under the step's name.
     *
     * @param flowStep step whose node is wanted
     * @return the node registered for {@code flowStep.getName()}
     * @throws IllegalStateException if no node is registered under that name
     */
    private DAGNode<CascadingJob> getNode(FlowStep flowStep) {
        String name = flowStep.getName();
        DAGNode<CascadingJob> node = this.nodesByName.get(name);
        if (node == null) {
            throw new IllegalStateException(String.format("Node with name '%s' not found", name));
        }
        return node;
    }

    /**
     * Refreshes the step's DAG node with the current Hadoop job id, step stats and map-reduce
     * job state.
     *
     * @param flowStep step whose node should be refreshed
     * @return the refreshed node
     * @throws IllegalStateException if the step has no registered node
     * @throws ClassCastException if the step's stats are not {@link HadoopStepStats}
     */
    private DAGNode<CascadingJob> updateNode(FlowStep flowStep) {
        DAGNode<CascadingJob> node = getNode(flowStep);
        CascadingJob job = node.getJob();
        HadoopStepStats hadoopStepStats = (HadoopStepStats) flowStep.getFlowStepStats();
        job.setId(hadoopStepStats.getJobID());
        job.setJobStats(hadoopStepStats);
        this.mapReduceHelper.addMapReduceJobState(job, hadoopStepStats.getJobClient());
        return node;
    }

    /**
     * Computes the workflow progress as the percentage of steps that have started.
     *
     * <p>The division is carried out in floating point: an {@code int / int} quotient would
     * truncate to 0 until every step had started. A non-positive step count yields 0 instead of
     * an {@link ArithmeticException}.
     *
     * @return progress in the range 0..100
     */
    private int computeWorkflowProgress() {
        if (this.totalNumberOfJobs <= 0) {
            return 0;
        }
        int percent = (int) (((double) this.runningJobs / this.totalNumberOfJobs) * 100.0d);
        // Clamp in case more start callbacks arrive than there are steps.
        return Math.min(percent, 100);
    }

    /**
     * Captures the flow's step count and id, initializes the write service with the flow's
     * configuration, converts the step graph into DAG nodes and sends the node map.
     *
     * <p>A failure to initialize the write service is logged and processing continues.
     *
     * @param flow the flow that is starting
     */
    public void onStarting(Flow flow) {
        this.totalNumberOfJobs = flow.getFlowSteps().size();
        this.currentFlowId = flow.getID();
        Properties properties = new Properties();
        properties.putAll(flow.getConfigAsProperties());
        try {
            this.statsWriteService.initWriteService(properties);
        } catch (IOException e) {
            LOG.error("Failed to initialize statsWriteService", e);
        }
        new AmbroseCascadingGraphConverter(Flows.getStepGraphFrom(flow), this.nodesByName).convert();
        AmbroseUtils.sendDagNodeNameMap(this.statsWriteService, this.currentFlowId, this.nodesByName);
    }

    /**
     * No-op.
     *
     * @param flow the flow that is stopping
     */
    public void onStopping(Flow flow) {
    }

    /**
     * Takes no action on flow-level failures.
     *
     * @param flow the flow that failed
     * @param th the failure
     * @return always {@code false}; presumably this tells Cascading the throwable was not
     *     handled — confirm against the FlowListener contract
     */
    public boolean onThrowable(Flow flow, Throwable th) {
        return false;
    }

    /**
     * Pushes a final workflow progress event of 100.
     *
     * @param flow the flow that completed
     */
    public void onCompleted(Flow flow) {
        AmbroseUtils.pushWorkflowProgressEvent(this.statsWriteService, this.currentFlowId, 100);
    }

    /**
     * Counts the step as started and pushes a job-started event for it. Failures are logged and
     * not propagated.
     *
     * @param flowStep the step that is starting
     */
    public void onStepStarting(FlowStep flowStep) {
        this.runningJobs++;
        try {
            AmbroseUtils.pushEvent(this.statsWriteService, this.currentFlowId, new Event.JobStartedEvent(updateNode(flowStep)));
        } catch (Exception e) {
            LOG.error("Failed to handle onStepStarting event", e);
        }
    }

    /**
     * Pushes a job-finished event for the step. Failures are logged and not propagated.
     *
     * @param flowStep the step that completed
     */
    public void onStepCompleted(FlowStep flowStep) {
        try {
            AmbroseUtils.pushEvent(this.statsWriteService, this.currentFlowId, new Event.JobFinishedEvent(updateNode(flowStep)));
        } catch (Exception e) {
            LOG.error("Failed to handle onStepCompleted event", e);
        }
    }

    /**
     * Pushes a job-failed event for the step. Failures while reporting are logged and not
     * propagated.
     *
     * @param flowStep the step that failed
     * @param th the failure reported for the step
     * @return always {@code false}; presumably this tells Cascading the throwable was not
     *     handled — confirm against the FlowStepListener contract
     */
    public boolean onStepThrowable(FlowStep flowStep, Throwable th) {
        try {
            AmbroseUtils.pushEvent(this.statsWriteService, this.currentFlowId, new Event.JobFailedEvent(updateNode(flowStep)));
        } catch (Exception e) {
            LOG.error("Failed to handle onStepThrowable event", e);
        }
        return false;
    }

    /**
     * Pushes the overall workflow progress and, unless the step was already seen as complete,
     * a job-progress event for the step.
     *
     * @param flowStep the step that is running
     */
    public void onStepRunning(FlowStep flowStep) {
        AmbroseUtils.pushWorkflowProgressEvent(this.statsWriteService, this.currentFlowId, computeWorkflowProgress());
        if (this.completedStepNames.contains(flowStep.getName())) {
            // The step's map-reduce state was already reported as complete; skip further
            // per-job progress events.
            return;
        }
        try {
            DAGNode<CascadingJob> node = updateNode(flowStep);
            if (node.getJob().getMapReduceJobState() != null) {
                AmbroseUtils.pushEvent(this.statsWriteService, this.currentFlowId, new Event.JobProgressEvent(node));
                if (node.getJob().getMapReduceJobState().isComplete()) {
                    this.completedStepNames.add(flowStep.getName());
                }
            }
        } catch (Exception e) {
            LOG.error("Failed to handle onStepRunning event", e);
        }
    }

    /**
     * No-op.
     *
     * @param flowStep the step that is stopping
     */
    public void onStepStopping(FlowStep flowStep) {
    }
}
