package com.twitter.ambrose.cascading;

import cascading.flow.Flow;
import cascading.flow.FlowListener;
import cascading.flow.FlowStep;
import cascading.flow.FlowStepListener;
import cascading.flow.Flows;
import cascading.stats.hadoop.HadoopStepStats;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.twitter.ambrose.model.DAGNode;
import com.twitter.ambrose.model.Event;
import com.twitter.ambrose.model.Job;
import com.twitter.ambrose.model.hadoop.MapReduceHelper;
import com.twitter.ambrose.service.StatsWriteService;
import com.twitter.ambrose.util.AmbroseUtils;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobClient;

/* loaded from: input_file:com/twitter/ambrose/cascading/AmbroseCascadingNotifier.class */
public class AmbroseCascadingNotifier implements FlowListener, FlowStepListener {
    private static final Log LOG = LogFactory.getLog(AmbroseCascadingNotifier.class);
    private final StatsWriteService statsWriteService;
    private int totalNumberOfJobs;
    private int runningJobs;
    private String currentFlowId;
    private final MapReduceHelper mapReduceHelper = new MapReduceHelper();
    private final List<Job> jobs = Lists.newArrayList();
    private final Map<String, DAGNode<CascadingJob>> dagNodeNameMap = Maps.newTreeMap();
    private final Map<String, DAGNode<CascadingJob>> dagNodeJobIdMap = Maps.newTreeMap();
    private final Set<String> completedJobIds = Sets.newHashSet();

    public AmbroseCascadingNotifier(StatsWriteService statsWriteService) {
        this.statsWriteService = statsWriteService;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public StatsWriteService getStatsWriteService() {
        return this.statsWriteService;
    }

    public void onStarting(Flow flow) {
        this.totalNumberOfJobs = flow.getFlowSteps().size();
        this.currentFlowId = flow.getID();
        Properties properties = new Properties();
        properties.putAll(flow.getConfigAsProperties());
        try {
            this.statsWriteService.initWriteService(properties);
            new AmbroseCascadingGraphConverter(Flows.getStepGraphFrom(flow), this.dagNodeNameMap).convert();
            AmbroseUtils.sendDagNodeNameMap(this.statsWriteService, this.currentFlowId, this.dagNodeNameMap);
        } catch (IOException e) {
            throw new RuntimeException("Exception while initializing statsWriteService", e);
        }
    }

    public void onStopping(Flow flow) {
    }

    public void onCompleted(Flow flow) {
    }

    public boolean onThrowable(Flow flow, Throwable th) {
        return false;
    }

    public void onStepStarting(FlowStep flowStep) {
        HadoopStepStats flowStepStats = flowStep.getFlowStepStats();
        String jobID = flowStepStats.getJobID();
        String name = flowStep.getName();
        JobClient jobClient = flowStepStats.getJobClient();
        this.runningJobs++;
        DAGNode<CascadingJob> dAGNode = this.dagNodeNameMap.get(name);
        if (dAGNode == null) {
            LOG.warn("jobStartedNotification - unrecognized operator name found (" + name + ") for jobId " + jobID);
            return;
        }
        CascadingJob job = dAGNode.getJob();
        job.setId(jobID);
        job.setJobStats(flowStepStats);
        this.mapReduceHelper.addMapReduceJobState(job, jobClient);
        this.dagNodeJobIdMap.put(job.getId(), dAGNode);
        AmbroseUtils.pushEvent(this.statsWriteService, this.currentFlowId, new Event.JobStartedEvent(dAGNode));
    }

    public void onStepCompleted(FlowStep flowStep) {
        HadoopStepStats hadoopStepStats = (HadoopStepStats) flowStep.getFlowStepStats();
        DAGNode<CascadingJob> dAGNode = this.dagNodeJobIdMap.get(hadoopStepStats.getJobID());
        if (dAGNode == null) {
            LOG.warn("Unrecognized jobId reported for succeeded job: " + hadoopStepStats.getJobID());
            return;
        }
        this.mapReduceHelper.addMapReduceJobState(dAGNode.getJob(), hadoopStepStats.getJobClient());
        addCompletedJobStats((CascadingJob) dAGNode.getJob(), hadoopStepStats);
        AmbroseUtils.pushEvent(this.statsWriteService, this.currentFlowId, new Event.JobFinishedEvent(dAGNode));
    }

    public boolean onStepThrowable(FlowStep flowStep, Throwable th) {
        HadoopStepStats hadoopStepStats = (HadoopStepStats) flowStep.getFlowStepStats();
        DAGNode<CascadingJob> dAGNode = this.dagNodeNameMap.get(flowStep.getName());
        if (dAGNode == null) {
            LOG.warn("Unrecognized jobId reported for succeeded job: " + hadoopStepStats.getJobID());
            return false;
        }
        this.mapReduceHelper.addMapReduceJobState(dAGNode.getJob(), hadoopStepStats.getJobClient());
        addCompletedJobStats((CascadingJob) dAGNode.getJob(), hadoopStepStats);
        AmbroseUtils.pushEvent(this.statsWriteService, this.currentFlowId, new Event.JobFailedEvent(dAGNode));
        return false;
    }

    public void onStepRunning(FlowStep flowStep) {
        HadoopStepStats flowStepStats = flowStep.getFlowStepStats();
        JobClient jobClient = flowStepStats.getJobClient();
        AmbroseUtils.pushWorkflowProgressEvent(this.statsWriteService, this.currentFlowId, (int) (((this.runningJobs * 1.0d) / this.totalNumberOfJobs) * 100.0d));
        DAGNode<CascadingJob> dAGNode = this.dagNodeJobIdMap.get(flowStepStats.getJobID());
        if (dAGNode == null) {
            LOG.warn("Unrecognized jobId reported for succeeded job: " + flowStepStats.getJobID());
            return;
        }
        if (this.completedJobIds.contains(dAGNode.getJob().getId())) {
            return;
        }
        this.mapReduceHelper.addMapReduceJobState(dAGNode.getJob(), jobClient);
        if (dAGNode.getJob().getMapReduceJobState() != null) {
            AmbroseUtils.pushEvent(this.statsWriteService, this.currentFlowId, new Event.JobProgressEvent(dAGNode));
            if (dAGNode.getJob().getMapReduceJobState().isComplete()) {
                this.completedJobIds.add(dAGNode.getJob().getId());
            }
        }
    }

    public void onStepStopping(FlowStep flowStep) {
    }

    private void addCompletedJobStats(CascadingJob cascadingJob, HadoopStepStats hadoopStepStats) {
        cascadingJob.setJobStats(hadoopStepStats);
        this.jobs.add(cascadingJob);
    }
}
