package cascading.flow;

import cascading.CascadingException;
import cascading.cascade.Cascade;
import cascading.flow.planner.BaseFlowNode;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.flow.planner.PlannerInfo;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.graph.ElementGraphs;
import cascading.flow.planner.graph.FlowElementGraph;
import cascading.flow.planner.process.FlowStepGraph;
import cascading.flow.planner.process.ProcessGraphs;
import cascading.flow.planner.process.ProcessModel;
import cascading.management.CascadingServices;
import cascading.management.UnitOfWorkExecutorStrategy;
import cascading.management.UnitOfWorkSpawnStrategy;
import cascading.management.state.ClientState;
import cascading.property.AppProps;
import cascading.property.PropertyUtil;
import cascading.stats.FlowStats;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.util.ProcessLogger;
import cascading.util.ShutdownUtil;
import cascading.util.Util;
import cascading.util.Version;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import riffle.process.DependencyIncoming;
import riffle.process.DependencyOutgoing;
import riffle.process.Process;
import riffle.process.ProcessCleanup;
import riffle.process.ProcessComplete;
import riffle.process.ProcessPrepare;
import riffle.process.ProcessStart;
import riffle.process.ProcessStop;

@Process
/* loaded from: input_file:cascading/flow/BaseFlow.class */
public abstract class BaseFlow<Config> implements Flow<Config>, ProcessLogger {
    // Logger is keyed to Flow.class rather than BaseFlow.class.
    private static final Logger LOG = LoggerFactory.getLogger(Flow.class);
    private static final int LOG_FLOW_NAME_MAX = 25;
    // Planner/platform descriptors; every constructor starts them at the NULL placeholders.
    private PlannerInfo plannerInfo;
    protected PlatformInfo platformInfo;
    // Lazily generated by getID().
    private String id;
    private String name;
    private String runID;
    private List<String> classPath;
    private String tags;
    // Lazily created by getListeners(); may be null until the first listener is requested.
    private List<BaseFlow<Config>.SafeFlowListener> listeners;
    private FlowSkipStrategy flowSkipStrategy;
    protected FlowStats flowStats;
    // Tap maps start out as empty maps in every constructor.
    protected Map<String, Tap> sources;
    protected Map<String, Tap> sinks;
    private Map<String, Tap> traps;
    private Map<String, Tap> checkpoints;
    protected boolean stopJobsOnExit;
    // setSubmitPriority() only accepts values in 1..10; constructors default it to 5.
    private int submitPriority;
    protected FlowStepGraph flowStepGraph;
    // Thread created and started by start(); reset to null at the end of complete().
    protected transient Thread thread;
    protected Throwable throwable;
    // Set to true by stop().
    protected volatile boolean stop;
    // Set to true at the end of complete().
    protected volatile boolean completed;
    // Cached by getFlowCanonicalHash().
    protected String flowCanonicalHash;
    protected FlowElementGraph flowElementGraph;
    // Lazily created by getCascadingServices().
    private transient CascadingServices cascadingServices;
    private FlowStepStrategy<Config> flowStepStrategy;
    // Cached topological ordering of the step graph, built by getFlowSteps().
    protected transient List<FlowStep<Config>> steps;
    private transient Map<String, FlowStepJob<Config>> jobsMap;
    private transient UnitOfWorkSpawnStrategy spawnStrategy;
    // Held for the duration of stop(); probed by ifStoppingBlockUntilComplete().
    private transient ReentrantLock stopLock;
    protected ShutdownUtil.Hook shutdownHook;
    protected HashMap<String, String> flowDescriptor;

    /* loaded from: input_file:cascading/flow/BaseFlow$FlowHolder.class */
    /**
     * Minimal mutable holder around a single {@link Flow} reference.
     */
    public static class FlowHolder {
        /** The held flow; stays {@code null} when the no-arg constructor is used. */
        public Flow flow;

        /** Creates a holder without a flow. */
        public FlowHolder() {
            // nothing to initialise
        }

        /**
         * Creates a holder for the given flow.
         *
         * @param flow the flow to hold
         */
        public FlowHolder(Flow flow) {
            this();
            this.flow = flow;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cascading/flow/BaseFlow$SafeFlowListener.class */
    /**
     * Decorates a {@link FlowListener} so that anything the delegate throws is
     * recorded in {@link #throwable}, logged as a warning, and followed by a call
     * to the enclosing flow's {@code stop()} instead of propagating to the caller.
     */
    public class SafeFlowListener implements FlowListener {
        /** The wrapped listener every callback is forwarded to. */
        final FlowListener flowListener;
        /** Last throwable raised by the wrapped listener, if any. */
        Throwable throwable;

        private SafeFlowListener(FlowListener flowListener) {
            this.flowListener = flowListener;
        }

        @Override // cascading.flow.FlowListener
        public void onStarting(Flow flow) {
            try {
                flowListener.onStarting(flow);
            } catch (Throwable failure) {
                handleThrowable(failure);
            }
        }

        @Override // cascading.flow.FlowListener
        public void onStopping(Flow flow) {
            try {
                flowListener.onStopping(flow);
            } catch (Throwable failure) {
                handleThrowable(failure);
            }
        }

        @Override // cascading.flow.FlowListener
        public void onCompleted(Flow flow) {
            try {
                flowListener.onCompleted(flow);
            } catch (Throwable failure) {
                handleThrowable(failure);
            }
        }

        @Override // cascading.flow.FlowListener
        public boolean onThrowable(Flow flow, Throwable th) {
            // Stays false when the delegate itself fails.
            boolean handled = false;
            try {
                handled = flowListener.onThrowable(flow, th);
            } catch (Throwable failure) {
                handleThrowable(failure);
            }
            return handled;
        }

        /** Records the failure, logs it and stops the enclosing flow. */
        private void handleThrowable(Throwable th) {
            throwable = th;
            String message = String.format("flow listener %s threw throwable", flowListener);
            BaseFlow.this.logWarn(message, th);
            BaseFlow.this.stop();
        }

        /**
         * Compares the wrapped listeners; a non-wrapper argument is compared
         * directly against the wrapped listener.
         */
        @Override
        public boolean equals(Object obj) {
            if (obj instanceof SafeFlowListener) {
                return flowListener.equals(((SafeFlowListener) obj).flowListener);
            }
            return flowListener.equals(obj);
        }

        /** Delegates to the wrapped listener's hash code. */
        @Override
        public int hashCode() {
            return flowListener.hashCode();
        }
    }

    /**
     * Reads the {@code FlowProps.STOP_JOBS_ON_EXIT} property from the given map,
     * falling back to {@code "true"}.
     *
     * @param map properties to read from
     * @return the parsed boolean value of the property
     */
    static boolean getStopJobsOnExit(Map<Object, Object> map) {
        String configured = (String) PropertyUtil.getProperty(map, FlowProps.STOP_JOBS_ON_EXIT, "true");
        return Boolean.parseBoolean(configured);
    }

    /**
     * Creates a flow named {@code "NA"} with default settings and a freshly
     * prepared stats object.
     */
    protected BaseFlow() {
        // The default field values below are repeated in every constructor.
        this.plannerInfo = PlannerInfo.NULL;
        this.platformInfo = PlatformInfo.NULL;
        this.flowSkipStrategy = new FlowSkipIfSinkNotStale();
        this.sources = Collections.emptyMap();
        this.sinks = Collections.emptyMap();
        this.traps = Collections.emptyMap();
        this.checkpoints = Collections.emptyMap();
        this.stopJobsOnExit = true;
        this.submitPriority = 5;
        this.completed = false;
        this.flowStepStrategy = null;
        this.spawnStrategy = new UnitOfWorkExecutorStrategy();
        this.stopLock = new ReentrantLock(true);
        this.name = "NA";
        // NOTE(review): calls an overridable method during construction.
        this.flowStats = createPrepareFlowStats();
    }

    /**
     * Creates a named flow with default settings; no config or stats are set up here.
     *
     * @param platformInfo platform descriptor; {@code PlatformInfo.NULL} is kept when null
     * @param str          the flow name
     */
    protected BaseFlow(PlatformInfo platformInfo, String str) {
        // The default field values below are repeated in every constructor.
        this.plannerInfo = PlannerInfo.NULL;
        this.platformInfo = PlatformInfo.NULL;
        this.flowSkipStrategy = new FlowSkipIfSinkNotStale();
        this.sources = Collections.emptyMap();
        this.sinks = Collections.emptyMap();
        this.traps = Collections.emptyMap();
        this.checkpoints = Collections.emptyMap();
        this.stopJobsOnExit = true;
        this.submitPriority = 5;
        this.completed = false;
        this.flowStepStrategy = null;
        this.spawnStrategy = new UnitOfWorkExecutorStrategy();
        this.stopLock = new ReentrantLock(true);
        if (platformInfo != null) {
            this.platformInfo = platformInfo;
        }
        this.name = str;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a named flow, records session properties in the given map and
     * initialises the configuration from it.
     *
     * @param platformInfo platform descriptor; {@code PlatformInfo.NULL} is kept when null
     * @param map          properties passed to {@code addSessionProperties} and {@code initConfig}
     * @param config       parent configuration passed to {@code initConfig}
     * @param str          the flow name
     * @param map2         optional flow descriptor entries; copied when non-null
     */
    public BaseFlow(PlatformInfo platformInfo, Map<Object, Object> map, Config config, String str, Map<String, String> map2) {
        // The default field values below are repeated in every constructor.
        this.plannerInfo = PlannerInfo.NULL;
        this.platformInfo = PlatformInfo.NULL;
        this.flowSkipStrategy = new FlowSkipIfSinkNotStale();
        this.sources = Collections.emptyMap();
        this.sinks = Collections.emptyMap();
        this.traps = Collections.emptyMap();
        this.checkpoints = Collections.emptyMap();
        this.stopJobsOnExit = true;
        this.submitPriority = 5;
        this.completed = false;
        this.flowStepStrategy = null;
        this.spawnStrategy = new UnitOfWorkExecutorStrategy();
        this.stopLock = new ReentrantLock(true);
        if (platformInfo != null) {
            this.platformInfo = platformInfo;
        }
        this.name = str;
        if (map2 != null) {
            // Defensive copy that preserves the caller's iteration order.
            this.flowDescriptor = new LinkedHashMap(map2);
        }
        addSessionProperties(map);
        // NOTE(review): abstract hook invoked during construction.
        initConfig(map, config);
    }

    /**
     * Creates a flow from a {@link FlowDef}: copies name, tags, run id, class path
     * and descriptor, initialises the configuration from a flattened copy of the
     * properties, then registers the taps and retrieves their fields.
     *
     * @param platformInfo platform descriptor; {@code PlatformInfo.NULL} is kept when null
     * @param map          properties, flattened via {@code PropertyUtil.asFlatMap}
     * @param config       parent configuration passed to {@code initConfig}
     * @param flowDef      definition supplying name, tags, run id, class path, descriptor and taps
     */
    protected BaseFlow(PlatformInfo platformInfo, Map<Object, Object> map, Config config, FlowDef flowDef) {
        // The default field values below are repeated in every constructor.
        this.plannerInfo = PlannerInfo.NULL;
        this.platformInfo = PlatformInfo.NULL;
        this.flowSkipStrategy = new FlowSkipIfSinkNotStale();
        this.sources = Collections.emptyMap();
        this.sinks = Collections.emptyMap();
        this.traps = Collections.emptyMap();
        this.checkpoints = Collections.emptyMap();
        this.stopJobsOnExit = true;
        this.submitPriority = 5;
        this.completed = false;
        this.flowStepStrategy = null;
        this.spawnStrategy = new UnitOfWorkExecutorStrategy();
        this.stopLock = new ReentrantLock(true);
        Map<Object, Object> asFlatMap = PropertyUtil.asFlatMap(map);
        if (platformInfo != null) {
            this.platformInfo = platformInfo;
        }
        this.name = flowDef.getName();
        this.tags = flowDef.getTags();
        this.runID = flowDef.getRunID();
        this.classPath = flowDef.getClassPath();
        if (!flowDef.getFlowDescriptor().isEmpty()) {
            this.flowDescriptor = new LinkedHashMap(flowDef.getFlowDescriptor());
        }
        // Name and tags are assigned above, before the session properties and config are built.
        addSessionProperties(asFlatMap);
        initConfig(asFlatMap, config);
        setSources(flowDef.getSourcesCopy());
        setSinks(flowDef.getSinksCopy());
        setTraps(flowDef.getTrapsCopy());
        setCheckpoints(flowDef.getCheckpointsCopy());
        // Only sources, sinks and traps are passed to flowConfInit; checkpoints are not.
        initFromTaps();
        retrieveSourceFields();
        retrieveSinkFields();
    }

    /**
     * Replaces the planner descriptor of this flow.
     *
     * @param plannerInfo the new planner descriptor
     */
    public void setPlannerInfo(PlannerInfo plannerInfo) {
        // The parameter shadows the field, so the qualifier is required.
        this.plannerInfo = plannerInfo;
    }

    /** Returns the planner descriptor currently assigned to this flow. */
    @Override // cascading.flow.Flow
    public PlannerInfo getPlannerInfo() {
        return plannerInfo;
    }

    /** Returns the platform descriptor currently assigned to this flow. */
    @Override // cascading.flow.Flow
    public PlatformInfo getPlatformInfo() {
        return platformInfo;
    }

    /**
     * Attaches the planned graphs to this flow and rebuilds the state derived
     * from them (steps, stats, jobs map, child stats).
     *
     * @param flowElementGraph the element graph of this flow
     * @param flowStepGraph    the step graph of this flow
     */
    public void initialize(FlowElementGraph flowElementGraph, FlowStepGraph flowStepGraph) {
        // Planner properties are written into the config before anything else.
        addPlannerProperties();
        this.flowElementGraph = flowElementGraph;
        this.flowStepGraph = flowStepGraph;
        // Needs flowStepGraph to be assigned already.
        initSteps();
        // Replaces any stats object created earlier (e.g. by the no-arg constructor).
        this.flowStats = createPrepareFlowStats();
        initializeNewJobsMap();
        initializeChildStats();
    }

    /**
     * Presents the resolved fields to the source and sink taps, then returns a
     * new graph built from the given one.
     *
     * @param flowElementGraph graph whose taps should receive their fields
     * @return a new {@link FlowElementGraph} constructed from {@code flowElementGraph}
     */
    public FlowElementGraph updateSchemes(FlowElementGraph flowElementGraph) {
        presentSourceFields(flowElementGraph);
        presentSinkFields(flowElementGraph);

        FlowElementGraph copy = new FlowElementGraph(flowElementGraph);
        return copy;
    }

    /** Asks every source tap to retrieve its source fields using this flow's process. */
    protected void retrieveSourceFields() {
        for (Tap source : sources.values()) {
            source.retrieveSourceFields(getFlowProcess());
        }
    }

    /**
     * Presents source fields to every source and checkpoint tap that is a vertex
     * of the given graph.
     *
     * @param flowElementGraph graph used to look up each tap's fields
     */
    protected void presentSourceFields(FlowElementGraph flowElementGraph) {
        for (Tap source : sources.values()) {
            if (!flowElementGraph.containsVertex(source)) {
                continue;
            }
            source.presentSourceFields(getFlowProcess(), getFieldsFor(flowElementGraph, source));
        }

        for (Tap checkpoint : checkpoints.values()) {
            if (!flowElementGraph.containsVertex(checkpoint)) {
                continue;
            }
            checkpoint.presentSourceFields(getFlowProcess(), getFieldsFor(flowElementGraph, checkpoint));
        }
    }

    /** Asks every sink tap to retrieve its sink fields using this flow's process. */
    protected void retrieveSinkFields() {
        for (Tap sink : sinks.values()) {
            sink.retrieveSinkFields(getFlowProcess());
        }
    }

    /**
     * Presents sink fields to every sink and checkpoint tap that is a vertex of
     * the given graph.
     *
     * @param flowElementGraph graph used to look up each tap's fields
     */
    protected void presentSinkFields(FlowElementGraph flowElementGraph) {
        for (Tap sink : sinks.values()) {
            if (!flowElementGraph.containsVertex(sink)) {
                continue;
            }
            sink.presentSinkFields(getFlowProcess(), getFieldsFor(flowElementGraph, sink));
        }

        for (Tap checkpoint : checkpoints.values()) {
            if (!flowElementGraph.containsVertex(checkpoint)) {
                continue;
            }
            checkpoint.presentSinkFields(getFlowProcess(), getFieldsFor(flowElementGraph, checkpoint));
        }
    }

    /**
     * Returns the outgoing value fields of the first outgoing edge of the tap.
     *
     * @param flowElementGraph graph containing the tap
     * @param tap              the tap whose fields are wanted
     * @return the out-values fields of the first edge leaving {@code tap}
     */
    protected Fields getFieldsFor(FlowElementGraph flowElementGraph, Tap tap) {
        // Assumes the tap has at least one outgoing edge; next() fails otherwise.
        return flowElementGraph
            .outgoingEdgesOf(tap)
            .iterator()
            .next()
            .getOutValuesFields();
    }

    /**
     * Writes the flow id, tags, application id, application name and application
     * version into the given properties; does nothing for a null map.
     *
     * @param map properties to update in place, may be null
     */
    protected void addSessionProperties(Map<Object, Object> map) {
        if (map != null) {
            PropertyUtil.setProperty(map, Flow.CASCADING_FLOW_ID, getID());
            PropertyUtil.setProperty(map, "cascading.flow.tags", getTags());
            AppProps.setApplicationID(map);
            PropertyUtil.setProperty(map, AppProps.APP_NAME, makeAppName(map));
            PropertyUtil.setProperty(map, AppProps.APP_VERSION, makeAppVersion(map));
        }
    }

    /** Copies the planner's name, platform and registry into the flow config. */
    protected void addPlannerProperties() {
        PlannerInfo info = getPlannerInfo();

        setConfigProperty(getConfig(), "cascading.flow.planner", info.name);
        setConfigProperty(getConfig(), "cascading.flow.platform", info.platform);
        setConfigProperty(getConfig(), "cascading.flow.registry", info.registry);
    }

    /**
     * Resolves the application name: the configured name when present, otherwise
     * a name derived from the application jar path.
     *
     * @param map properties to consult, may be null
     * @return the application name, or null when {@code map} is null
     */
    private String makeAppName(Map<Object, Object> map) {
        if (map == null) {
            return null;
        }

        String configured = AppProps.getApplicationName(map);
        if (configured != null) {
            return configured;
        }
        return Util.findName(AppProps.getApplicationJarPath(map));
    }

    /**
     * Resolves the application version: the configured version when present,
     * otherwise a version derived from the application jar path.
     *
     * @param map properties to consult, may be null
     * @return the application version, or null when {@code map} is null
     */
    private String makeAppVersion(Map<Object, Object> map) {
        if (map == null) {
            return null;
        }

        String configured = AppProps.getApplicationVersion(map);
        if (configured != null) {
            return configured;
        }
        return Util.findVersion(AppProps.getApplicationJarPath(map));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a new stats object, prepares it and marks it pending.
     *
     * @return the prepared, pending stats instance
     */
    public FlowStats createPrepareFlowStats() {
        FlowStats stats = createFlowStats();

        stats.prepare();
        stats.markPending();

        return stats;
    }

    /** Creates the stats object for this flow, backed by a fresh client state. */
    protected FlowStats createFlowStats() {
        ClientState clientState = getClientState();
        return new FlowStats(this, clientState);
    }

    /**
     * Returns the services instance for this flow, creating it from the config
     * properties on first use.
     */
    public CascadingServices getCascadingServices() {
        if (cascadingServices != null) {
            return cascadingServices;
        }

        cascadingServices = new CascadingServices(getConfigAsProperties());
        return cascadingServices;
    }

    /** Creates a client state for this flow's id through a new flow session. */
    protected ClientState getClientState() {
        FlowSession session = getFlowSession();
        return session.getCascadingServices().createClientState(getID());
    }

    /**
     * Links every step of the step graph back to this flow, and every node of a
     * step back to that step. Does nothing when no step graph is set.
     */
    protected void initSteps() {
        if (flowStepGraph != null) {
            for (FlowStep flowStep : flowStepGraph.vertexSet()) {
                ((BaseFlowStep) flowStep).setFlow(this);

                for (Object node : flowStep.getFlowNodeGraph().vertexSet()) {
                    ((BaseFlowNode) node).setFlowStep(flowStep);
                }
            }
        }
    }

    /** Runs tap-driven flow initialisation for sources, sinks and traps, in that order. */
    private void initFromTaps() {
        // Checkpoints are intentionally not part of this sequence in the original code.
        initFromTaps(sources);
        initFromTaps(sinks);
        initFromTaps(traps);
    }

    /**
     * Lets every tap in the map initialise this flow via {@code flowConfInit}.
     *
     * @param map taps to visit
     */
    private void initFromTaps(Map<String, Tap> map) {
        for (Tap tap : map.values()) {
            tap.flowConfInit(this);
        }
    }

    /** Returns the name of this flow. */
    @Override // cascading.flow.Flow, cascading.management.UnitOfWork
    public String getName() {
        return name;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Renames this flow.
     *
     * @param str the new flow name
     */
    public void setName(String str) {
        name = str;
    }

    /** Returns the id of this flow, generating a unique one on first access. */
    @Override // cascading.flow.Flow, cascading.management.UnitOfWork
    public String getID() {
        if (id != null) {
            return id;
        }

        id = Util.createUniqueID();
        return id;
    }

    /** Returns the tags assigned to this flow, possibly null. */
    @Override // cascading.flow.Flow, cascading.management.UnitOfWork
    public String getTags() {
        return tags;
    }

    /** Returns the submit priority of this flow. */
    @Override // cascading.flow.Flow
    public int getSubmitPriority() {
        return submitPriority;
    }

    /**
     * Sets the submit priority of this flow.
     *
     * @param i the new priority, from 1 to 10 inclusive
     * @throws IllegalArgumentException when {@code i} is outside 1..10
     */
    @Override // cascading.flow.Flow
    public void setSubmitPriority(int i) {
        boolean inRange = i >= 1 && i <= 10;

        if (!inRange) {
            throw new IllegalArgumentException("submitPriority must be between 1 and 10 inclusive, was: " + i);
        }
        submitPriority = i;
    }

    /**
     * Returns the canonical hash of the element graph, computing and caching it
     * on first use. Returns the cached value (possibly null) when no element
     * graph is set.
     */
    public String getFlowCanonicalHash() {
        if (flowCanonicalHash == null && flowElementGraph != null) {
            // The computation is guarded by the graph's monitor.
            synchronized (flowElementGraph) {
                flowCanonicalHash = createFlowCanonicalHash(flowElementGraph);
            }
        }
        return flowCanonicalHash;
    }

    /**
     * Computes the canonical hash of the given graph.
     *
     * @param flowElementGraph the graph to hash, may be null
     * @return the hash, or null when the graph is null
     */
    protected String createFlowCanonicalHash(FlowElementGraph flowElementGraph) {
        return flowElementGraph == null ? null : ElementGraphs.canonicalHash(flowElementGraph);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** Returns the element graph of this flow, possibly null. */
    public FlowElementGraph getFlowElementGraph() {
        return flowElementGraph;
    }

    /**
     * Replaces the element graph of this flow.
     *
     * @param flowElementGraph the new element graph
     */
    protected void setFlowElementGraph(FlowElementGraph flowElementGraph) {
        // The parameter shadows the field, so the qualifier is required.
        this.flowElementGraph = flowElementGraph;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** Returns the step graph of this flow, possibly null. */
    public FlowStepGraph getFlowStepGraph() {
        return flowStepGraph;
    }

    /**
     * Replaces the step graph of this flow.
     *
     * @param flowStepGraph the new step graph
     */
    protected void setFlowStepGraph(FlowStepGraph flowStepGraph) {
        // The parameter shadows the field, so the qualifier is required.
        this.flowStepGraph = flowStepGraph;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Installs the source taps and registers any that are flow listeners.
     * A null map leaves the current sources untouched.
     *
     * @param map source taps keyed by name, may be null
     */
    public void setSources(Map<String, Tap> map) {
        if (map != null) {
            addListeners(map.values());
            sources = map;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Installs the sink taps and registers any that are flow listeners.
     * A null map leaves the current sinks untouched.
     *
     * @param map sink taps keyed by name, may be null
     */
    public void setSinks(Map<String, Tap> map) {
        if (map != null) {
            addListeners(map.values());
            sinks = map;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Installs the trap taps and registers any that are flow listeners.
     * A null map leaves the current traps untouched, matching the behaviour of
     * {@code setSources} and {@code setSinks}; previously a null map raised a
     * {@link NullPointerException}.
     *
     * @param map trap taps keyed by name, may be null
     */
    public void setTraps(Map<String, Tap> map) {
        if (map == null) {
            // Keep the existing (by default empty) map so later accessors never see null.
            return;
        }
        addListeners(map.values());
        this.traps = map;
    }

    /**
     * Installs the checkpoint taps and registers any that are flow listeners.
     * A null map leaves the current checkpoints untouched, matching the behaviour
     * of {@code setSources} and {@code setSinks}; previously a null map raised a
     * {@link NullPointerException}.
     *
     * @param map checkpoint taps keyed by name, may be null
     */
    protected void setCheckpoints(Map<String, Tap> map) {
        if (map == null) {
            // Keep the existing (by default empty) map so later accessors never see null.
            return;
        }
        addListeners(map.values());
        this.checkpoints = map;
    }

    /**
     * Initialises the flow's configuration. Called from the constructors after
     * the session properties have been added to {@code map}.
     *
     * @param map    the properties handed to the constructor (flattened in the FlowDef constructor)
     * @param config the parent configuration handed to the constructor
     */
    protected abstract void initConfig(Map<Object, Object> map, Config config);

    /**
     * Builds a new config from {@code config} and copies every non-null entry of
     * {@code map} into it. For {@link Properties} inputs the string property
     * names and {@code getProperty} lookups are consulted as well.
     *
     * @param map    properties to copy, may be null
     * @param config parent configuration passed to {@code newConfig}
     * @return the new, populated configuration
     */
    public Config createConfig(Map<Object, Object> map, Config config) {
        Config result = newConfig(config);

        if (map != null) {
            boolean isProperties = map instanceof Properties;
            HashSet<Object> keys = new HashSet<Object>(map.keySet());

            if (isProperties) {
                keys.addAll(((Properties) map).stringPropertyNames());
            }

            for (Object key : keys) {
                Object value = map.get(key);

                // Fall back to getProperty for keys the plain map lookup did not resolve.
                if (value == null && isProperties && key instanceof String) {
                    value = ((Properties) map).getProperty((String) key);
                }

                if (value == null) {
                    continue;
                }
                setConfigProperty(result, key, value);
            }
        }

        return result;
    }

    /**
     * Stores a single property in the given configuration. Used by
     * {@code createConfig} for each non-null entry and by
     * {@code addPlannerProperties}.
     *
     * @param config the configuration to update
     * @param obj    the property key
     * @param obj2   the property value
     */
    protected abstract void setConfigProperty(Config config, Object obj, Object obj2);

    /**
     * Creates the configuration instance that {@code createConfig} populates.
     *
     * @param config the configuration handed to {@code createConfig}
     * @return a configuration object; presumably derived from {@code config} (confirm in subclasses)
     */
    protected abstract Config newConfig(Config config);

    /**
     * Updates flow settings from the given properties; currently only the
     * stop-jobs-on-exit flag.
     *
     * @param map properties to read from
     */
    protected void initFromProperties(Map<Object, Object> map) {
        stopJobsOnExit = getStopJobsOnExit(map);
    }

    /** Returns a new flow session bound to this flow's services. */
    public FlowSession getFlowSession() {
        CascadingServices services = getCascadingServices();
        return new FlowSession(services);
    }

    /** Returns the stats object of this flow. */
    @Override // cascading.flow.Flow
    public FlowStats getFlowStats() {
        return flowStats;
    }

    /**
     * Returns a read-only view of the flow descriptor, or an empty map when no
     * descriptor was supplied.
     */
    @Override // cascading.flow.Flow
    public Map<String, String> getFlowDescriptor() {
        if (flowDescriptor == null) {
            return Collections.emptyMap();
        }
        return Collections.unmodifiableMap(flowDescriptor);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    /** Returns the same stats object as {@link #getFlowStats()}. */
    @Override // cascading.management.UnitOfWork
    public FlowStats getStats() {
        FlowStats stats = getFlowStats();
        return stats;
    }

    /**
     * Registers every element of the collection that is a {@link FlowListener}.
     *
     * @param collection candidates to inspect
     */
    void addListeners(Collection collection) {
        for (Object candidate : collection) {
            if (!(candidate instanceof FlowListener)) {
                continue;
            }
            addListener((FlowListener) candidate);
        }
    }

    /**
     * Unregisters every element of the collection that is a {@link FlowListener}.
     * A null or empty collection is ignored.
     *
     * @param collection candidates to inspect, may be null
     */
    protected void removeListeners(Collection collection) {
        if (collection != null && !collection.isEmpty()) {
            for (Object candidate : collection) {
                if (!(candidate instanceof FlowListener)) {
                    continue;
                }
                removeListener((FlowListener) candidate);
            }
        }
    }

    /** Returns the listener list, creating it on first access. */
    List<BaseFlow<Config>.SafeFlowListener> getListeners() {
        if (listeners != null) {
            return listeners;
        }

        listeners = new LinkedList();
        return listeners;
    }

    /** Returns true when at least one flow listener is registered. */
    @Override // cascading.flow.Flow
    public boolean hasListeners() {
        return listeners != null && !listeners.isEmpty();
    }

    /**
     * Registers a flow listener, wrapped in a {@code SafeFlowListener}.
     *
     * @param flowListener the listener to register
     */
    @Override // cascading.flow.Flow
    public void addListener(FlowListener flowListener) {
        SafeFlowListener wrapped = new SafeFlowListener(flowListener);
        getListeners().add(wrapped);
    }

    /**
     * Unregisters a flow listener.
     *
     * @param flowListener the listener to remove
     * @return true when a matching registration was removed
     */
    @Override // cascading.flow.Flow
    public boolean removeListener(FlowListener flowListener) {
        // A throwaway wrapper is used as the lookup key; SafeFlowListener.equals compares delegates.
        SafeFlowListener probe = new SafeFlowListener(flowListener);
        return getListeners().remove(probe);
    }

    /** Returns true when any step of this flow has listeners. */
    @Override // cascading.flow.Flow
    public boolean hasStepListeners() {
        boolean any = false;

        // Deliberately non-short-circuit: every step is queried.
        for (FlowStep<Config> step : getFlowSteps()) {
            any |= step.hasListeners();
        }
        return any;
    }

    /**
     * Registers the listener on every step of this flow.
     *
     * @param flowStepListener the listener to register
     */
    @Override // cascading.flow.Flow
    public void addStepListener(FlowStepListener flowStepListener) {
        for (FlowStep<Config> step : getFlowSteps()) {
            step.addListener(flowStepListener);
        }
    }

    /**
     * Removes the listener from every step of this flow.
     *
     * @param flowStepListener the listener to remove
     * @return true only when every step reported a successful removal
     */
    @Override // cascading.flow.Flow
    public boolean removeStepListener(FlowStepListener flowStepListener) {
        boolean allRemoved = true;

        // Deliberately non-short-circuit: removal is attempted on every step.
        for (FlowStep<Config> step : getFlowSteps()) {
            allRemoved &= step.removeListener(flowStepListener);
        }
        return allRemoved;
    }

    /** Returns a read-only view of the source taps keyed by name. */
    @Override // cascading.flow.Flow
    public Map<String, Tap> getSources() {
        return Collections.unmodifiableMap(sources);
    }

    /** Returns a new list holding the names of all source taps. */
    @Override // cascading.flow.Flow
    public List<String> getSourceNames() {
        return new ArrayList<String>(sources.keySet());
    }

    /**
     * Looks up a source tap by name.
     *
     * @param str the source name
     * @return the tap, or null when no source has that name
     */
    @Override // cascading.flow.Flow
    public Tap getSource(String str) {
        return sources.get(str);
    }

    /** Returns the source taps as a read-only collection. */
    @Override // cascading.flow.Flow
    @DependencyIncoming
    public Collection<Tap> getSourcesCollection() {
        Map<String, Tap> readOnly = getSources();
        return readOnly.values();
    }

    /** Returns a read-only view of the sink taps keyed by name. */
    @Override // cascading.flow.Flow
    public Map<String, Tap> getSinks() {
        return Collections.unmodifiableMap(sinks);
    }

    /** Returns a new list holding the names of all sink taps. */
    @Override // cascading.flow.Flow
    public List<String> getSinkNames() {
        return new ArrayList<String>(sinks.keySet());
    }

    /**
     * Looks up a sink tap by name.
     *
     * @param str the sink name
     * @return the tap, or null when no sink has that name
     */
    @Override // cascading.flow.Flow
    public Tap getSink(String str) {
        return sinks.get(str);
    }

    /** Returns the sink taps as a read-only collection. */
    @Override // cascading.flow.Flow
    @DependencyOutgoing
    public Collection<Tap> getSinksCollection() {
        Map<String, Tap> readOnly = getSinks();
        return readOnly.values();
    }

    /**
     * Returns the first sink tap in iteration order.
     * Fails with {@code NoSuchElementException} when the flow has no sinks.
     */
    @Override // cascading.flow.Flow
    public Tap getSink() {
        Iterator<Tap> sinkTaps = sinks.values().iterator();
        return sinkTaps.next();
    }

    /** Returns a read-only view of the trap taps keyed by name. */
    @Override // cascading.flow.Flow
    public Map<String, Tap> getTraps() {
        return Collections.unmodifiableMap(traps);
    }

    /** Returns a new list holding the names of all trap taps. */
    @Override // cascading.flow.Flow
    public List<String> getTrapNames() {
        return new ArrayList<String>(traps.keySet());
    }

    /** Returns the trap taps as a read-only collection. */
    @Override // cascading.flow.Flow
    public Collection<Tap> getTrapsCollection() {
        Map<String, Tap> readOnly = getTraps();
        return readOnly.values();
    }

    /** Returns a read-only view of the checkpoint taps keyed by name. */
    @Override // cascading.flow.Flow
    public Map<String, Tap> getCheckpoints() {
        return Collections.unmodifiableMap(checkpoints);
    }

    /** Returns a new list holding the names of all checkpoint taps. */
    @Override // cascading.flow.Flow
    public List<String> getCheckpointNames() {
        return new ArrayList<String>(checkpoints.keySet());
    }

    /** Returns the checkpoint taps as a read-only collection. */
    @Override // cascading.flow.Flow
    public Collection<Tap> getCheckpointsCollection() {
        Map<String, Tap> readOnly = getCheckpoints();
        return readOnly.values();
    }

    /** Returns the stop-jobs-on-exit flag of this flow. */
    @Override // cascading.flow.Flow
    public boolean isStopJobsOnExit() {
        return stopJobsOnExit;
    }

    /** Returns the strategy used by {@code isSkipFlow()}. */
    @Override // cascading.flow.Flow
    public FlowSkipStrategy getFlowSkipStrategy() {
        return flowSkipStrategy;
    }

    /**
     * Replaces the skip strategy of this flow.
     *
     * @param flowSkipStrategy the new strategy, must not be null
     * @return the strategy that was in place before this call
     * @throws IllegalArgumentException when {@code flowSkipStrategy} is null
     */
    @Override // cascading.flow.Flow
    public FlowSkipStrategy setFlowSkipStrategy(FlowSkipStrategy flowSkipStrategy) {
        if (flowSkipStrategy == null) {
            throw new IllegalArgumentException("flowSkipStrategy may not be null");
        }

        FlowSkipStrategy previous = this.flowSkipStrategy;
        this.flowSkipStrategy = flowSkipStrategy;
        return previous;
    }

    /**
     * Asks the current skip strategy whether this flow should be skipped.
     *
     * @throws IOException when the strategy fails
     */
    @Override // cascading.flow.Flow
    public boolean isSkipFlow() throws IOException {
        boolean skip = flowSkipStrategy.skipFlow(this);
        return skip;
    }

    /**
     * Returns true when the sources are newer than the sink modification time
     * reported by {@code getSinkModified()}.
     *
     * @throws IOException when a modification time cannot be read
     */
    @Override // cascading.flow.Flow
    public boolean areSinksStale() throws IOException {
        long sinkModified = getSinkModified();
        return areSourcesNewer(sinkModified);
    }

    /**
     * Returns true when the source modification time is later than {@code j}.
     * The resolved source time is logged at info level on every exit path; when
     * the lookup fails the logged time is the epoch.
     *
     * @param j the reference modification time to compare against
     * @throws IOException when a source modification time cannot be read
     */
    @Override // cascading.flow.Flow
    public boolean areSourcesNewer(long j) throws IOException {
        // Stays at 0 when the lookup below throws, which is what gets logged in that case.
        long sourceModified = 0L;

        try {
            sourceModified = Util.getSourceModified(getConfig(), sources.values().iterator(), j);
            return j < sourceModified;
        } finally {
            if (LOG.isInfoEnabled()) {
                logInfo("source modification date at: " + new Date(sourceModified), new Object[0]);
            }
        }
    }

    /**
     * Returns the sink modification time computed by {@code Util.getSinkModified}
     * and logs its meaning at info level: {@code -1} is reported as a sink marked
     * for delete, {@code 0} as a missing sink, anything else as a date.
     *
     * <p>Exactly one message is logged per call; previously a {@code -1} result
     * also produced a misleading "oldest modified date" line.
     *
     * @return the value reported by {@code Util.getSinkModified}
     * @throws IOException when a sink modification time cannot be read
     */
    @Override // cascading.flow.Flow
    public long getSinkModified() throws IOException {
        long sinkModified = Util.getSinkModified(getConfig(), this.sinks.values());
        if (LOG.isInfoEnabled()) {
            if (sinkModified == -1) {
                logInfo("at least one sink is marked for delete", new Object[0]);
            } else if (sinkModified == 0) {
                logInfo("at least one sink does not exist", new Object[0]);
            } else {
                logInfo("sink oldest modified date: " + new Date(sinkModified), new Object[0]);
            }
        }
        return sinkModified;
    }

    /** Returns the step strategy of this flow, possibly null. */
    @Override // cascading.flow.Flow
    public FlowStepStrategy getFlowStepStrategy() {
        return flowStepStrategy;
    }

    /**
     * Replaces the step strategy of this flow.
     *
     * @param flowStepStrategy the new strategy, may be null
     */
    @Override // cascading.flow.Flow
    public void setFlowStepStrategy(FlowStepStrategy flowStepStrategy) {
        // The parameter shadows the field, so the qualifier is required.
        this.flowStepStrategy = flowStepStrategy;
    }

    /**
     * Returns the steps of this flow in topological order. The list is built once
     * and cached; an empty list is returned (and nothing cached) while no step
     * graph is set.
     */
    @Override // cascading.flow.Flow
    public List<FlowStep<Config>> getFlowSteps() {
        if (steps == null) {
            if (flowStepGraph == null) {
                return Collections.emptyList();
            }

            Iterator<FlowStep> ordered = flowStepGraph.getTopologicalIterator();
            steps = new ArrayList();
            while (ordered.hasNext()) {
                steps.add(ordered.next());
            }
        }
        return steps;
    }

    /**
     * Prepares the flow for execution by running the delete-if-not-update step
     * for sinks, traps and checkpoints, in that order.
     *
     * @throws FlowTapException when any of those steps fails with an I/O error
     */
    @Override // cascading.flow.Flow, cascading.management.UnitOfWork
    @ProcessPrepare
    public void prepare() {
        try {
            deleteSinksIfNotUpdate();
            deleteTrapsIfNotUpdate();
            deleteCheckpointsIfNotUpdate();
        } catch (IOException ioFailure) {
            throw new FlowTapException("unable to prepare flow", ioFailure);
        }
    }

    /**
     * Starts the flow on a new thread. Does nothing when a thread already exists
     * or the flow has been stopped.
     */
    @Override // cascading.flow.Flow, cascading.management.UnitOfWork
    @ProcessStart
    public synchronized void start() {
        if (thread != null || stop) {
            return;
        }

        registerShutdownHook();
        internalStart();

        String threadName = ("flow " + Util.toNull(getName())).trim();
        thread = createFlowThread(threadName);
        thread.start();
    }

    /**
     * Creates (but does not start) the thread that executes this flow's
     * {@code run()} method.
     *
     * @param str the thread name
     * @return the new, unstarted thread
     */
    protected Thread createFlowThread(String str) {
        Runnable flowRunner = new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                BaseFlow.this.run();
            }
        };
        return new Thread(flowRunner, str);
    }

    /**
     * Subclass hook invoked by {@code start()} after the shutdown hook is
     * registered and before the flow thread is created.
     */
    protected abstract void internalStart();

    /**
     * Stops the flow. The first call marks the flow stopped, notifies listeners,
     * stops all jobs, shuts down the executor and cleans up; later calls skip
     * that work. Every call then cleans up the stats, releases the stop lock
     * and interrupts and joins the flow thread, if one exists.
     *
     * <p>The stop lock is now released exactly once on every path. Previously a
     * failure in the stop sequence unlocked it twice, so the resulting
     * {@link IllegalMonitorStateException} replaced the original exception.
     */
    @Override // cascading.flow.Flow, cascading.management.UnitOfWork
    @ProcessStop
    public synchronized void stop() {
        this.stopLock.lock();
        try {
            if (this.stop) {
                return;
            }
            this.stop = true;
            fireOnStopping();
            if (!this.flowStats.isFinished()) {
                this.flowStats.markStopped();
            }
            internalStopAllJobs();
            handleExecutorShutdown();
            internalClean(true);
        } finally {
            try {
                this.flowStats.cleanup();
            } finally {
                // Must run even when cleanup() fails, otherwise the lock would leak.
                this.stopLock.unlock();

                // Snapshot the field: complete() may null it out concurrently.
                Thread flowThread = this.thread;
                if (flowThread != null) {
                    try {
                        flowThread.interrupt();
                        flowThread.join();
                    } catch (InterruptedException ignored) {
                        // Swallowed on purpose, as before. NOTE(review): stop() can run on the
                        // flow thread itself (via a failing listener), where join() is expected
                        // to be interrupted immediately - confirm before changing this.
                    }
                }
            }
        }
    }

    /**
     * Subclass cleanup hook; {@code stop()} invokes it with {@code true}.
     *
     * @param z presumably indicates that the flow is being stopped - TODO confirm against subclasses
     */
    protected abstract void internalClean(boolean z);

    /**
     * Starts the flow if it has not completed yet, then blocks until the flow
     * thread finishes and any in-progress {@code stop()} has released its lock.
     * Afterwards the recorded failure, if any, is rethrown; a failure recorded by
     * a flow listener is reported as a {@link FlowException} whose cause is now
     * the listener's own throwable (it was previously always null).
     *
     * <p>On exit the flow is marked completed and the thread and throwable
     * references are cleared.
     *
     * @throws CascadingException when the flow recorded one
     * @throws OutOfMemoryError   when the flow recorded one
     * @throws FlowException      for any other recorded failure, a listener failure,
     *                            or when the waiting thread is interrupted
     */
    @Override // cascading.flow.Flow, cascading.management.UnitOfWork
    @ProcessComplete
    public void complete() {
        if (!this.completed) {
            start();
        }
        try {
            synchronized (this) {
                // Wait until the flow thread exists, unless the flow completed or was stopped.
                while (!this.completed && this.thread == null && !this.stop) {
                    Util.safeSleep(10L);
                }
            }
            if (this.thread != null) {
                this.thread.join();
            }
            ifStoppingBlockUntilComplete();
            if (this.throwable instanceof FlowException) {
                ((FlowException) this.throwable).setFlowName(getName());
            }
            if (this.throwable instanceof CascadingException) {
                throw ((CascadingException) this.throwable);
            }
            if (this.throwable instanceof OutOfMemoryError) {
                throw ((OutOfMemoryError) this.throwable);
            }
            if (this.throwable != null) {
                throw new FlowException(getName(), "unhandled exception", this.throwable);
            }
            if (hasListeners()) {
                for (BaseFlow<Config>.SafeFlowListener listener : getListeners()) {
                    if (listener.throwable != null) {
                        // this.throwable is null here, so chain the listener's own failure.
                        throw new FlowException(getName(), "unhandled listener exception", listener.throwable);
                    }
                }
            }
        } catch (InterruptedException e) {
            throw new FlowException(getName(), "thread interrupted", e);
        } finally {
            this.completed = true;
            this.thread = null;
            this.throwable = null;
        }
    }

    /**
     * If {@code stopLock} is currently held, waits for it by acquiring and immediately
     * releasing it; returns right away when the lock is free.
     */
    protected void ifStoppingBlockUntilComplete() {
        if (!this.stopLock.isLocked()) {
            return;
        }
        try {
            // blocks until the current holder releases the lock
            this.stopLock.lock();
        } finally {
            this.stopLock.unlock();
        }
    }

    /**
     * Commits every trap tap against the flow configuration.
     * <p>
     * Failures are logged rather than thrown: a {@code false} result from
     * {@code commitResource} and an {@link IOException} are both reported via {@code logError},
     * and the remaining traps are still processed.
     */
    private void commitTraps() {
        for (Tap tap : this.traps.values()) {
            try {
                if (!tap.commitResource(getConfig())) {
                    // pass the config itself; casting it to Tap would fail with ClassCastException
                    logError("unable to commit trap: " + tap.getFullIdentifier(getConfig()), new Object[0]);
                }
            } catch (IOException e) {
                logError("unable to commit trap: " + tap.getFullIdentifier(getConfig()), e);
            }
        }
    }

    /**
     * Cleanup callback; this implementation intentionally does nothing.
     */
    @Override // cascading.flow.Flow, cascading.management.UnitOfWork
    @ProcessCleanup
    public void cleanup() {
        // no-op
    }

    /**
     * Opens the first source tap (in the iteration order of the sources map) for reading.
     *
     * @return an iterator over the tuples of the first source
     * @throws IOException if the tap cannot be opened
     */
    @Override // cascading.flow.Flow
    public TupleEntryIterator openSource() throws IOException {
        Tap firstSource = this.sources.values().iterator().next();
        return firstSource.openForRead(getFlowProcess());
    }

    /**
     * Opens the named source tap for reading.
     *
     * @param str the source name
     * @return an iterator over the tuples of that source
     * @throws IOException              if the tap cannot be opened
     * @throws IllegalArgumentException if no source is registered under the given name
     */
    @Override // cascading.flow.Flow
    public TupleEntryIterator openSource(String str) throws IOException {
        if (!this.sources.containsKey(str)) {
            throw new IllegalArgumentException("source does not exist: " + str);
        }
        return this.sources.get(str).openForRead(getFlowProcess());
    }

    /**
     * Opens the first sink tap (in the iteration order of the sinks map) for reading.
     *
     * @return an iterator over the tuples of the first sink
     * @throws IOException if the tap cannot be opened
     */
    @Override // cascading.flow.Flow
    public TupleEntryIterator openSink() throws IOException {
        Tap firstSink = this.sinks.values().iterator().next();
        return firstSink.openForRead(getFlowProcess());
    }

    /**
     * Opens the named sink tap for reading.
     *
     * @param str the sink name
     * @return an iterator over the tuples of that sink
     * @throws IOException              if the tap cannot be opened
     * @throws IllegalArgumentException if no sink is registered under the given name
     */
    @Override // cascading.flow.Flow
    public TupleEntryIterator openSink(String str) throws IOException {
        if (!this.sinks.containsKey(str)) {
            throw new IllegalArgumentException("sink does not exist: " + str);
        }
        return this.sinks.get(str).openForRead(getFlowProcess());
    }

    /**
     * Opens the first trap tap (in the iteration order of the traps map) for reading.
     *
     * @return an iterator over the tuples of the first trap
     * @throws IOException if the tap cannot be opened
     */
    @Override // cascading.flow.Flow
    public TupleEntryIterator openTrap() throws IOException {
        Tap firstTrap = this.traps.values().iterator().next();
        return firstTrap.openForRead(getFlowProcess());
    }

    /**
     * Opens the named trap tap for reading.
     *
     * @param str the trap name
     * @return an iterator over the tuples of that trap
     * @throws IOException              if the tap cannot be opened
     * @throws IllegalArgumentException if no trap is registered under the given name
     */
    @Override // cascading.flow.Flow
    public TupleEntryIterator openTrap(String str) throws IOException {
        if (!this.traps.containsKey(str)) {
            throw new IllegalArgumentException("trap does not exist: " + str);
        }
        return this.traps.get(str).openForRead(getFlowProcess());
    }

    /**
     * Deletes the resource behind every sink tap, failing on the first one that cannot be deleted.
     *
     * @throws IOException if checking or deleting a resource fails
     */
    public void deleteSinks() throws IOException {
        for (Tap sink : this.sinks.values()) {
            deleteOrFail(sink);
        }
    }

    /**
     * Deletes the resource behind the given tap if it exists.
     *
     * @param tap the tap whose resource should be removed
     * @throws IOException      if the existence check or the delete itself fails
     * @throws FlowTapException if the resource exists but {@code deleteResource} reports failure
     */
    private void deleteOrFail(Tap tap) throws IOException {
        // pass the config itself; casting it to Tap would fail with ClassCastException
        if (!tap.resourceExists(getConfig())) {
            return;
        }
        if (!tap.deleteResource(getConfig())) {
            throw new FlowTapException("unable to delete resource: " + tap.getFullIdentifier((FlowProcess) getFlowProcess()));
        }
    }

    /**
     * Deletes the resource of every sink tap that is not in update mode.
     *
     * @throws IOException if checking or deleting a resource fails
     */
    public void deleteSinksIfNotUpdate() throws IOException {
        Iterator<Tap> sinkTaps = this.sinks.values().iterator();
        while (sinkTaps.hasNext()) {
            Tap sink = sinkTaps.next();
            if (sink.isUpdate()) {
                continue;
            }
            deleteOrFail(sink);
        }
    }

    /**
     * Validates and prepares the sinks before a run.
     * <p>
     * First pass: if any sink is in KEEP mode and its resource already exists, fail before
     * anything is deleted. Second pass: delete the resource of every sink in REPLACE mode.
     *
     * @throws IOException      if checking or deleting a resource fails
     * @throws FlowTapException if a KEEP sink already exists, or a REPLACE sink cannot be deleted
     */
    public void deleteSinksIfReplace() throws IOException {
        for (Tap tap : this.sinks.values()) {
            // pass the config itself; casting it to Tap would fail with ClassCastException
            if (tap.isKeep() && tap.resourceExists(getConfig())) {
                throw new FlowTapException("resource exists and sink mode is KEEP, cannot overwrite: " + tap.getFullIdentifier((FlowProcess) getFlowProcess()));
            }
        }
        for (Tap tap2 : this.sinks.values()) {
            if (tap2.isReplace()) {
                deleteOrFail(tap2);
            }
        }
    }

    /**
     * Deletes the resource of every trap tap that is not in update mode.
     *
     * @throws IOException if checking or deleting a resource fails
     */
    public void deleteTrapsIfNotUpdate() throws IOException {
        Iterator<Tap> trapTaps = this.traps.values().iterator();
        while (trapTaps.hasNext()) {
            Tap trap = trapTaps.next();
            if (trap.isUpdate()) {
                continue;
            }
            deleteOrFail(trap);
        }
    }

    /**
     * Deletes the resource of every checkpoint tap that is not in update mode.
     *
     * @throws IOException if checking or deleting a resource fails
     */
    public void deleteCheckpointsIfNotUpdate() throws IOException {
        Iterator<Tap> checkpointTaps = this.checkpoints.values().iterator();
        while (checkpointTaps.hasNext()) {
            Tap checkpoint = checkpointTaps.next();
            if (checkpoint.isUpdate()) {
                continue;
            }
            deleteOrFail(checkpoint);
        }
    }

    /**
     * Deletes the resource of every trap tap that is in replace mode.
     *
     * @throws IOException if checking or deleting a resource fails
     */
    public void deleteTrapsIfReplace() throws IOException {
        Iterator<Tap> trapTaps = this.traps.values().iterator();
        while (trapTaps.hasNext()) {
            Tap trap = trapTaps.next();
            if (!trap.isReplace()) {
                continue;
            }
            deleteOrFail(trap);
        }
    }

    /**
     * Deletes the resource of every checkpoint tap that is in replace mode.
     *
     * @throws IOException if checking or deleting a resource fails
     */
    public void deleteCheckpointsIfReplace() throws IOException {
        Iterator<Tap> checkpointTaps = this.checkpoints.values().iterator();
        while (checkpointTaps.hasNext()) {
            Tap checkpoint = checkpointTaps.next();
            if (!checkpoint.isReplace()) {
                continue;
            }
            deleteOrFail(checkpoint);
        }
    }

    /**
     * Reports whether the resource behind the given tap exists, using this flow's configuration.
     *
     * @param tap the tap to check
     * @return the result of {@code tap.resourceExists} for the flow configuration
     * @throws IOException if the check fails
     */
    @Override // cascading.flow.Flow
    public boolean resourceExists(Tap tap) throws IOException {
        // pass the config itself; casting it to Tap would fail with ClassCastException
        return tap.resourceExists(getConfig());
    }

    /**
     * Opens the given tap for reading using this flow's {@code FlowProcess}.
     *
     * @param tap the tap to read from
     * @return the iterator returned by {@code tap.openForRead}
     * @throws IOException if the tap cannot be opened
     */
    @Override // cascading.flow.Flow
    public TupleEntryIterator openTapForRead(Tap tap) throws IOException {
        return tap.openForRead(getFlowProcess());
    }

    /**
     * Opens the given tap for writing using this flow's {@code FlowProcess}.
     *
     * @param tap the tap to write to
     * @return the collector returned by {@code tap.openForWrite}
     * @throws IOException if the tap cannot be opened
     */
    @Override // cascading.flow.Flow
    public TupleEntryCollector openTapForWrite(Tap tap) throws IOException {
        return tap.openForWrite(getFlowProcess());
    }

    /**
     * Body of the flow thread: runs all steps and then performs the completion sequence exactly once.
     * <p>
     * Work phase: unless the flow has already been stopped, marks the stats started, fires
     * {@code onStarting}, logs sources and sinks, and spawns the step jobs. Any throwable raised
     * here is captured in {@code throwable} instead of propagating.
     * <p>
     * Completion phase (always executed): waits for a concurrent stop to finish, records failure or
     * success in the stats, cleans up via {@link #internalClean(boolean)}, commits traps, fires
     * {@code onCompleted}, and finally logs the duration, cleans up the stats, shuts down and
     * deregisters the shutdown hook. The last group runs even if a listener throws.
     * <p>
     * Public only because it is invoked from a nested class; callers should use {@code start()} or
     * {@code complete()}.
     *
     * @throws IllegalStateException if invoked while no flow thread is set
     */
    public void run() {
        if (this.thread == null) {
            throw new IllegalStateException("to start a Flow call start() or complete(), not Runnable#run()");
        }
        Version.printBanner();
        try {
            if (this.stop) {
                // nothing to execute; the finally block still performs the completion sequence
                return;
            }
            this.flowStats.markStarted();
            fireOnStarting();
            if (isInfoEnabled()) {
                logInfo("starting", new Object[0]);
                for (Tap source : getSourcesCollection()) {
                    logInfo(" source: " + source, new Object[0]);
                }
                for (Tap sink : getSinksCollection()) {
                    logInfo(" sink: " + sink, new Object[0]);
                }
            }
            spawnSteps();
        } catch (Throwable t) {
            // remembered so handleThrowableAndMarkFailed() and complete() can report it
            this.throwable = t;
        } finally {
            ifStoppingBlockUntilComplete();
            handleThrowableAndMarkFailed();
            if (!this.stop && !this.flowStats.isFinished()) {
                this.flowStats.markSuccessful();
            }
            // what gets cleaned may depend on whether the flow was stopped
            internalClean(this.stop);
            commitTraps();
            try {
                fireOnCompleted();
            } finally {
                if (LOG.isInfoEnabled()) {
                    long cpuMillis = getTotalSliceCPUMilliSeconds();
                    if (cpuMillis == -1) {
                        logInfo(" completed in: " + Util.formatDurationFromMillis(this.flowStats.getDuration()), new Object[0]);
                    } else {
                        logInfo(" completed in: " + Util.formatDurationFromMillis(this.flowStats.getDuration()) + ", using cpu time: " + Util.formatDurationFromMillis(cpuMillis), new Object[0]);
                    }
                }
                this.flowStats.cleanup();
                internalShutdown();
                deregisterShutdownHook();
            }
        }
    }

    /**
     * Submits all eligible step jobs and waits for each of them in turn.
     * <p>
     * The number of management threads is {@link #getMaxNumParallelSteps()}, or the number of
     * eligible jobs when that value is 0. The first job that reports a non-null throwable aborts
     * the wait: remaining jobs are stopped (unless the flow is already stopping), the executor is
     * shut down, and {@code false} is returned.
     *
     * @return {@code true} if every job finished without reporting a throwable, {@code false} otherwise
     * @throws IllegalStateException if there are no jobs to run
     * @throws InterruptedException  declared for spawning the jobs
     * @throws ExecutionException    if a job future fails
     */
    protected boolean spawnSteps() throws InterruptedException, ExecutionException {
        int threadCount = getMaxNumParallelSteps();
        final int stepCount = getEligibleJobsSize();
        if (threadCount == 0) {
            // 0 means "no limit": use one thread per eligible job
            threadCount = stepCount;
        }
        if (threadCount == 0) {
            throw new IllegalStateException("no jobs rendered for flow: " + getName());
        }
        if (LOG.isInfoEnabled()) {
            logInfo(" parallel execution of steps is enabled: " + (getMaxNumParallelSteps() != 1), new Object[0]);
            logInfo(" executing total steps: " + stepCount, new Object[0]);
            logInfo(" allocating management threads: " + threadCount, new Object[0]);
        }
        for (Future<Throwable> pending : spawnJobs(threadCount)) {
            try {
                this.throwable = pending.get();
            } catch (InterruptedException ignored) {
                // interruption while waiting on a single job is ignored; continue with the checks below
            }
            if (this.throwable == null) {
                continue;
            }
            if (!this.stop) {
                internalStopAllJobs();
            }
            handleExecutorShutdown();
            return false;
        }
        return true;
    }

    /**
     * Returns the total CPU time used by the flow in milliseconds.
     * <p>
     * This base implementation returns {@code -1}, which {@link #run()} treats as "not available"
     * and omits from the completion log line.
     *
     * @return {@code -1}
     */
    protected long getTotalSliceCPUMilliSeconds() {
        return -1L;
    }

    /**
     * Returns the maximum number of steps to run in parallel.
     * <p>
     * {@link #spawnSteps()} interprets {@code 0} as "use the number of eligible jobs".
     *
     * @return the parallel step limit
     */
    protected abstract int getMaxNumParallelSteps();

    /**
     * Subclass hook invoked by {@link #run()} at the end of the completion sequence, after the
     * flow stats have been cleaned up and before the shutdown hook is deregistered.
     */
    protected abstract void internalShutdown();

    /**
     * Hands the not-yet-started job callables to the spawn strategy.
     *
     * @param i the number of threads the strategy may use
     * @return the futures of the started jobs; a new empty list if no strategy is set (which is
     *         logged as an error) or the flow is stopping
     * @throws InterruptedException if the strategy is interrupted while starting the jobs
     */
    private List<Future<Throwable>> spawnJobs(int i) throws InterruptedException {
        if (this.spawnStrategy != null) {
            if (!this.stop) {
                return this.spawnStrategy.start(this, i, getJobMapCallables());
            }
            return new ArrayList<Future<Throwable>>();
        }
        logError("no spawnStrategy set", new Object[0]);
        return new ArrayList<Future<Throwable>>();
    }

    /**
     * If a throwable was captured and the flow is not being stopped, marks the flow stats as
     * failed with that throwable and notifies the listeners.
     */
    private void handleThrowableAndMarkFailed() {
        if (this.throwable != null && !this.stop) {
            this.flowStats.markFailed(this.throwable);
            fireOnThrowable();
        }
    }

    /**
     * Returns the internal jobs map (not a copy), keyed by flow step ID.
     *
     * @return the current jobs map; may be {@code null} (see {@link #isJobsMapInitialized()})
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<String, FlowStepJob<Config>> getJobsMap() {
        return this.jobsMap;
    }

    /**
     * Reports whether the jobs map has been created.
     *
     * @return {@code true} if {@code jobsMap} is non-null
     */
    protected boolean isJobsMapInitialized() {
        return this.jobsMap != null;
    }

    /**
     * Returns the number of jobs whose callable has not been started yet.
     *
     * @return the size of the list produced by {@link #getJobMapCallables()}
     */
    protected int getEligibleJobsSize() {
        return getJobMapCallables().size();
    }

    /**
     * Collects, in jobs-map iteration order, every job whose callable has not been started yet.
     *
     * @return a new list of the pending job callables
     */
    protected List<Callable<Throwable>> getJobMapCallables() {
        List<Callable<Throwable>> pending = new ArrayList<Callable<Throwable>>();
        Iterator<FlowStepJob<Config>> jobs = this.jobsMap.values().iterator();
        while (jobs.hasNext()) {
            FlowStepJob<Config> job = jobs.next();
            if (job.isCallableStarted()) {
                continue;
            }
            pending.add(job);
        }
        return pending;
    }

    /**
     * Builds the jobs map from scratch for the current flow step graph.
     */
    protected void initializeNewJobsMap() {
        Map<String, FlowStepJob<Config>> fresh = new LinkedHashMap<String, FlowStepJob<Config>>();
        this.jobsMap = updateJobsMap(this.flowStepGraph, fresh);
    }

    /**
     * Rebuilds the jobs map for the current flow step graph, starting from a copy of the existing
     * map so that already-created jobs are reused.
     */
    protected void updateJobsMap() {
        Map<String, FlowStepJob<Config>> copy = new LinkedHashMap<String, FlowStepJob<Config>>(this.jobsMap);
        this.jobsMap = updateJobsMap(this.flowStepGraph, copy);
    }

    /**
     * Ensures the given map holds a job for every step of the graph and (re)wires each job's
     * predecessors.
     * <p>
     * Steps are visited in topological order. A job is created only for steps that have no entry
     * yet; the predecessor list is rebuilt for every step. Predecessor lookups rely on that order:
     * jobs for upstream steps are already in the map when a downstream step is processed.
     *
     * @param flowStepGraph the step graph to walk
     * @param map           the map to populate, keyed by step ID; modified in place
     * @return the same {@code map} instance
     */
    Map<String, FlowStepJob<Config>> updateJobsMap(FlowStepGraph flowStepGraph, Map<String, FlowStepJob<Config>> map) {
        for (Iterator<FlowStep> steps = flowStepGraph.getTopologicalIterator(); steps.hasNext(); ) {
            BaseFlowStep step = (BaseFlowStep) steps.next();
            FlowStepJob<Config> job = map.get(step.getID());
            if (job == null) {
                job = step.getCreateFlowStepJob(getFlowProcess(), getConfig());
                map.put(step.getID(), job);
            }
            // raw list kept on purpose: the element type expected by setPredecessors is not visible here
            List predecessorJobs = new ArrayList();
            for (Iterator<ProcessModel> upstream = ProcessGraphs.predecessorListOf(flowStepGraph, step).iterator(); upstream.hasNext(); ) {
                predecessorJobs.add(map.get(((FlowStep) upstream.next()).getID()));
            }
            job.setPredecessors(predecessorJobs);
        }
        return map;
    }

    /**
     * Registers the step stats of every job in the jobs map with the flow stats.
     */
    protected void initializeChildStats() {
        for (FlowStepJob<Config> job : this.jobsMap.values()) {
            this.flowStats.addStepStats(job.getStepStats());
        }
    }

    /**
     * Stops every job in the jobs map, in reverse map order, logging before and after.
     * <p>
     * The "stopped all jobs" message is logged whether or not stopping succeeded; a failure while
     * stopping is re-thrown after the message. Does nothing but log when there is no jobs map.
     */
    protected void internalStopAllJobs() {
        logInfo("stopping all jobs", new Object[0]);
        try {
            if (this.jobsMap != null) {
                List<FlowStepJob<Config>> reversed = new ArrayList<FlowStepJob<Config>>(this.jobsMap.values());
                Collections.reverse(reversed);
                for (FlowStepJob<Config> job : reversed) {
                    job.stop();
                }
            }
            logInfo("stopped all jobs", new Object[0]);
        } catch (Throwable failure) {
            logInfo("stopped all jobs", new Object[0]);
            throw failure;
        }
    }

    /**
     * Asks the spawn strategy to complete, waiting up to 300 seconds, unless no strategy is set or
     * it already reports completion for this flow. An interruption while waiting is ignored.
     */
    protected void handleExecutorShutdown() {
        if (this.spawnStrategy != null && !this.spawnStrategy.isCompleted(this)) {
            logDebug("shutting down job executor", new Object[0]);
            try {
                this.spawnStrategy.complete(this, 300, TimeUnit.SECONDS);
            } catch (InterruptedException ignored) {
                // deliberately ignored; shutdown is best-effort
            }
            logDebug("shutdown of job executor complete", new Object[0]);
        }
    }

    /**
     * Notifies every registered listener that the flow has completed.
     */
    public void fireOnCompleted() {
        if (!hasListeners()) {
            return;
        }
        if (isDebugEnabled()) {
            logDebug("firing onCompleted event: " + getListeners().size(), new Object[0]);
        }
        for (SafeFlowListener listener : getListeners()) {
            listener.onCompleted(this);
        }
    }

    /**
     * Records the given throwable as this flow's failure and notifies the listeners.
     *
     * @param th the throwable to store in {@code throwable} before firing the event
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public void fireOnThrowable(Throwable th) {
        this.throwable = th;
        fireOnThrowable();
    }

    /**
     * Passes the stored throwable to every registered listener.
     * <p>
     * All listeners are always invoked. If at least one of them returns {@code true} from
     * {@code onThrowable}, the stored throwable is cleared.
     */
    protected void fireOnThrowable() {
        if (!hasListeners()) {
            return;
        }
        if (isDebugEnabled()) {
            logDebug("firing onThrowable event: " + getListeners().size(), new Object[0]);
        }
        boolean handled = false;
        for (SafeFlowListener listener : getListeners()) {
            // call first, then combine, so no listener is skipped by short-circuiting
            boolean handledByThis = listener.onThrowable(this, this.throwable);
            if (handledByThis) {
                handled = true;
            }
        }
        if (handled) {
            this.throwable = null;
        }
    }

    /**
     * Notifies every registered listener that the flow is stopping.
     */
    public void fireOnStopping() {
        if (!hasListeners()) {
            return;
        }
        if (isDebugEnabled()) {
            logDebug("firing onStopping event: " + getListeners().size(), new Object[0]);
        }
        for (SafeFlowListener listener : getListeners()) {
            listener.onStopping(this);
        }
    }

    /**
     * Notifies every registered listener that the flow is starting.
     */
    public void fireOnStarting() {
        if (!hasListeners()) {
            return;
        }
        if (isDebugEnabled()) {
            logDebug("firing onStarting event: " + getListeners().size(), new Object[0]);
        }
        for (SafeFlowListener listener : getListeners()) {
            listener.onStarting(this);
        }
    }

    /**
     * Renders the flow as its name followed by {@code ": "} (when a name is set) and the string
     * form of each flow step, concatenated without separators.
     */
    public String toString() {
        StringBuilder text = new StringBuilder();
        if (getName() != null) {
            text.append(getName()).append(": ");
        }
        for (FlowStep<Config> step : getFlowSteps()) {
            text.append(step);
        }
        return text.toString();
    }

    /**
     * @return whether INFO logging is enabled on this class's logger
     */
    @Override // cascading.util.ProcessLogger
    public final boolean isInfoEnabled() {
        return LOG.isInfoEnabled();
    }

    /**
     * @return whether DEBUG logging is enabled on this class's logger
     */
    @Override // cascading.util.ProcessLogger
    public final boolean isDebugEnabled() {
        return LOG.isDebugEnabled();
    }

    /**
     * Logs at INFO level, prefixing the message with the (truncated) flow name in brackets.
     *
     * @param str    the message, may contain logger placeholders
     * @param objArr the placeholder arguments
     */
    @Override // cascading.util.ProcessLogger
    public void logInfo(String str, Object... objArr) {
        String flowTag = "[" + Util.truncate(getName(), LOG_FLOW_NAME_MAX) + "] ";
        LOG.info(flowTag + str, objArr);
    }

    /**
     * Logs at DEBUG level, prefixing the message with the (truncated) flow name in brackets.
     *
     * @param str    the message, may contain logger placeholders
     * @param objArr the placeholder arguments
     */
    @Override // cascading.util.ProcessLogger
    public void logDebug(String str, Object... objArr) {
        String flowTag = "[" + Util.truncate(getName(), LOG_FLOW_NAME_MAX) + "] ";
        LOG.debug(flowTag + str, objArr);
    }

    /**
     * Logs at WARN level, prefixing the message with the (truncated) flow name in brackets.
     *
     * @param str the message
     */
    @Override // cascading.util.ProcessLogger
    public void logWarn(String str) {
        String flowTag = "[" + Util.truncate(getName(), LOG_FLOW_NAME_MAX) + "] ";
        LOG.warn(flowTag + str);
    }

    /**
     * Logs a message and throwable at WARN level, prefixing the message with the (truncated)
     * flow name in brackets.
     *
     * @param str the message
     * @param th  the throwable to log
     */
    @Override // cascading.util.ProcessLogger
    public void logWarn(String str, Throwable th) {
        String flowTag = "[" + Util.truncate(getName(), LOG_FLOW_NAME_MAX) + "] ";
        LOG.warn(flowTag + str, th);
    }

    /**
     * Logs at WARN level with placeholder arguments, prefixing the message with the (truncated)
     * flow name in brackets.
     *
     * @param str    the message, may contain logger placeholders
     * @param objArr the placeholder arguments
     */
    @Override // cascading.util.ProcessLogger
    public void logWarn(String str, Object... objArr) {
        String flowTag = "[" + Util.truncate(getName(), LOG_FLOW_NAME_MAX) + "] ";
        LOG.warn(flowTag + str, objArr);
    }

    /**
     * Logs at ERROR level with placeholder arguments, prefixing the message with the (truncated)
     * flow name in brackets.
     *
     * @param str    the message, may contain logger placeholders
     * @param objArr the placeholder arguments
     */
    @Override // cascading.util.ProcessLogger
    public void logError(String str, Object... objArr) {
        String flowTag = "[" + Util.truncate(getName(), LOG_FLOW_NAME_MAX) + "] ";
        LOG.error(flowTag + str, objArr);
    }

    /**
     * Logs a message and throwable at ERROR level, prefixing the message with the (truncated)
     * flow name in brackets.
     *
     * @param str the message
     * @param th  the throwable to log
     */
    @Override // cascading.util.ProcessLogger
    public void logError(String str, Throwable th) {
        String flowTag = "[" + Util.truncate(getName(), LOG_FLOW_NAME_MAX) + "] ";
        LOG.error(flowTag + str, th);
    }

    /**
     * Writes the flow element graph as a DOT file.
     *
     * @param str the argument forwarded to the element graph's {@code writeDOT}
     * @throws UnsupportedOperationException if this flow has no element graph
     */
    @Override // cascading.flow.Flow
    public void writeDOT(String str) {
        if (this.flowElementGraph != null) {
            this.flowElementGraph.writeDOT(str);
            return;
        }
        throw new UnsupportedOperationException("this flow instance cannot write a DOT file");
    }

    /**
     * Writes the flow step graph as a DOT file.
     *
     * @param str the argument forwarded to the step graph's {@code writeDOT}
     * @throws UnsupportedOperationException if this flow has no step graph
     */
    @Override // cascading.flow.Flow
    public void writeStepsDOT(String str) {
        if (this.flowStepGraph != null) {
            this.flowStepGraph.writeDOT(str);
            return;
        }
        throw new UnsupportedOperationException("this flow instance cannot write a DOT file");
    }

    /**
     * Creates a new {@link FlowHolder} wrapping this flow; a fresh holder is returned on each call.
     *
     * @return a new holder for this flow
     */
    public FlowHolder getHolder() {
        return new FlowHolder(this);
    }

    /**
     * Associates this flow with a cascade by storing the cascade's ID in the flow configuration
     * under {@code cascading.cascade.id}, then records the flow info in the stats.
     *
     * @param cascade the owning cascade
     */
    public void setCascade(Cascade cascade) {
        Config conf = getConfig();
        setConfigProperty(conf, "cascading.cascade.id", cascade.getID());
        this.flowStats.recordInfo();
    }

    /**
     * Returns the value of the {@code cascading.cascade.id} property, which
     * {@link #setCascade(Cascade)} writes.
     *
     * @return the property value as returned by {@code getProperty}
     */
    @Override // cascading.flow.Flow
    public String getCascadeID() {
        return getProperty("cascading.cascade.id");
    }

    /**
     * @return the value of the {@code runID} field
     */
    @Override // cascading.flow.Flow
    public String getRunID() {
        return this.runID;
    }

    /**
     * @return the internal {@code classPath} list (not a copy)
     */
    public List<String> getClassPath() {
        return this.classPath;
    }

    /**
     * Sets the strategy used to start and complete the step jobs.
     *
     * @param unitOfWorkSpawnStrategy the strategy to use; when {@code null}, {@code spawnJobs}
     *                                logs an error and starts nothing
     */
    @Override // cascading.management.UnitOfWork
    public void setSpawnStrategy(UnitOfWorkSpawnStrategy unitOfWorkSpawnStrategy) {
        this.spawnStrategy = unitOfWorkSpawnStrategy;
    }

    /**
     * @return the currently configured spawn strategy, possibly {@code null}
     */
    @Override // cascading.management.UnitOfWork
    public UnitOfWorkSpawnStrategy getSpawnStrategy() {
        return this.spawnStrategy;
    }

    /**
     * When jobs should be stopped on JVM exit, creates a shutdown hook with
     * {@code WORK_CHILD} priority that calls {@code stop()} on this flow, stores it in
     * {@code shutdownHook} and registers it with {@link ShutdownUtil}.
     */
    protected void registerShutdownHook() {
        if (!isStopJobsOnExit()) {
            return;
        }
        this.shutdownHook = new ShutdownUtil.Hook() {
            @Override
            public ShutdownUtil.Hook.Priority priority() {
                return ShutdownUtil.Hook.Priority.WORK_CHILD;
            }

            @Override
            public void execute() {
                BaseFlow.this.logInfo("shutdown hook calling stop on flow", new Object[0]);
                BaseFlow.this.stop();
            }
        };
        ShutdownUtil.addHook(this.shutdownHook);
    }

    /**
     * Removes the shutdown hook, but only if hooks are in use and the flow is not being stopped.
     */
    private void deregisterShutdownHook() {
        if (isStopJobsOnExit() && !this.stop) {
            ShutdownUtil.removeHook(this.shutdownHook);
        }
    }
}
