package cascading.flow.planner;

import cascading.flow.Flow;
import cascading.flow.FlowException;
import cascading.flow.FlowStepStrategy;
import cascading.management.state.ClientState;
import cascading.stats.CascadingStats;
import cascading.stats.FlowNodeStats;
import cascading.stats.FlowStats;
import cascading.stats.FlowStepStats;
import cascading.util.Util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cascading/flow/planner/FlowStepJob.class */
public abstract class FlowStepJob<Config> implements Callable<Throwable> {
    /** Class-level logger, used when the step has no parent flow to report against. */
    private static final Logger LOG = LoggerFactory.getLogger(FlowStepJob.class);
    /** Name of the step this job executes; taken from the flow step in the constructor and used in log messages. */
    protected final String stepName;
    /** Configuration object handed in at construction time and exposed through {@link #getConfig()}. */
    protected final Config jobConfiguration;
    /** Delay between polls, in milliseconds (it is formatted with {@code Util.formatDurationFromMillis}). */
    protected long pollingInterval;
    /** Interval between stats recordings while the job is polled; divided by {@link #pollingInterval} to get a poll count. */
    protected long statsStoreInterval;
    /** Upper bound, in milliseconds, on how long {@link #finalizeNodeSliceCapture()} keeps retrying. */
    protected long blockForCompletedChildDetailDuration;
    /** Jobs that must complete before this one runs; assigned through {@link #setPredecessors(List)}. */
    protected List<FlowStepJob<Config>> predecessors;
    /** Counted down when {@link #start()} finishes; {@link #isSuccessful()} waits on it. */
    private final CountDownLatch latch = new CountDownLatch(1);
    /** Set to {@code true} by the first invocation of {@link #start()}. */
    private AtomicBoolean callableStarted = new AtomicBoolean(false);
    /** Set by {@link #stop()}; polled by the run loop so the job can be abandoned. */
    private volatile boolean stop = false;
    /** The flow step this job runs. */
    protected final BaseFlowStep<Config> flowStep;
    /** Statistics for this step, created by {@link #createStepStats(ClientState)}. */
    protected FlowStepStats flowStepStats;
    /** Failure recorded while running the step, or {@code null}; returned from {@link #call()}. */
    protected Throwable throwable;

    public FlowStepJob(ClientState clientState, Config config, BaseFlowStep<Config> baseFlowStep, long j, long j2, long j3) {
        this.pollingInterval = 1000L;
        this.statsStoreInterval = 60000L;
        this.blockForCompletedChildDetailDuration = 60000L;
        this.jobConfiguration = config;
        this.stepName = baseFlowStep.getName();
        this.pollingInterval = j;
        this.statsStoreInterval = j2;
        this.blockForCompletedChildDetailDuration = j3;
        this.flowStep = baseFlowStep;
        this.flowStepStats = createStepStats(clientState);
        this.flowStepStats.prepare();
        this.flowStepStats.markPending();
        for (FlowNodeStats flowNodeStats : this.flowStepStats.getFlowNodeStats()) {
            flowNodeStats.prepare();
            flowNodeStats.markPending();
        }
    }

    /**
     * Returns the configuration object this job was constructed with.
     *
     * @return the job configuration
     */
    public Config getConfig() {
        return jobConfiguration;
    }

    /**
     * Creates the statistics object for this step. Called once from the constructor, after all
     * other fields have been assigned; the result is stored in {@code flowStepStats}.
     *
     * @param clientState the client state given to the constructor
     * @return the step statistics; the constructor dereferences it immediately, so it must not be {@code null}
     */
    protected abstract FlowStepStats createStepStats(ClientState clientState);

    /**
     * Requests that this job stop.
     * <p>
     * Sets the stop flag, marks the step statistics as stopped unless they are already finished,
     * and then asks the implementation to stop the underlying job via
     * {@link #internalBlockOnStop()}. An {@link IOException} from that call is logged and not
     * propagated; any other exception propagates to the caller.
     * <p>
     * Whatever the outcome, if the statistics are in the stopped state the sinks are rolled back
     * and stop listeners are notified, and the statistics are always cleaned up. This happens
     * exactly once per invocation.
     */
    public synchronized void stop() {
        if (this.flowStep.isInfoEnabled()) {
            this.flowStep.logInfo("stopping: " + this.stepName, new Object[0]);
        }

        this.stop = true;

        // Do not overwrite a terminal state (successful, failed, skipped) with "stopped".
        if (!this.flowStepStats.isFinished()) {
            this.flowStepStats.markStopped();
        }

        try {
            internalBlockOnStop();
        } catch (IOException e) {
            this.flowStep.logWarn("unable to kill job: " + this.stepName, e);
        } finally {
            try {
                // Roll back only after the job has been stopped, and only if it really was stopped.
                if (this.flowStepStats.isStopped()) {
                    this.flowStep.rollbackSinks();
                    this.flowStep.fireOnStopping();
                }
            } finally {
                // Cleanup must run even when rollback or a listener throws.
                this.flowStepStats.cleanup();
            }
        }
    }

    /**
     * Implementation hook invoked by {@link #stop()} after the stop flag has been set.
     * NOTE(review): judging by the name and the "unable to kill job" warning logged on failure,
     * this presumably kills the underlying job and waits for it to end; confirm against implementations.
     *
     * @throws IOException if stopping fails; {@link #stop()} logs it as a warning
     */
    protected abstract void internalBlockOnStop() throws IOException;

    /**
     * Assigns the jobs this job waits on before running. The list is stored as given, not copied.
     *
     * @param list the predecessor jobs
     */
    public void setPredecessors(List<FlowStepJob<Config>> list) {
        predecessors = list;
    }

    /**
     * Runs the step by delegating to {@link #start()}.
     *
     * @return the failure recorded while running the step, or {@code null} if none was recorded
     */
    @Override
    public Throwable call() {
        start();
        return throwable;
    }

    /**
     * Reports whether {@link #start()} has been entered at least once.
     *
     * @return {@code true} once the job has been started
     */
    public boolean isCallableStarted() {
        return callableStarted.get();
    }

    /**
     * Runs this step to completion on the calling thread.
     * <p>
     * Only the first invocation does any work; later invocations return immediately so they
     * cannot release the completion latch while the first one is still running.
     * <p>
     * The step is skipped when {@link #isSkipFlowStep()} says so, and abandoned when
     * {@link #stop()} was called beforehand. Otherwise it is marked started, waits for its
     * predecessors, prepares resources, applies the flow step strategy and blocks until the job
     * finishes.
     * <p>
     * Any {@link Throwable} raised along the way is stored in {@code throwable}, the step is
     * marked failed and listeners are notified. In every case the completion latch is released,
     * final node details are captured and the statistics are cleaned up.
     */
    protected void start() {
        if (this.callableStarted.getAndSet(true)) {
            return;
        }

        try {
            if (isSkipFlowStep()) {
                markSkipped();
                if (this.flowStep.isInfoEnabled() && this.flowStepStats.isSkipped()) {
                    this.flowStep.logInfo("skipping step: " + this.stepName, new Object[0]);
                }
                return;
            }

            // Hold the monitor only for the stop-check and state transition. stop() is
            // synchronized on this object, so holding the lock while the job runs would
            // prevent other threads from ever stopping it.
            synchronized (this) {
                if (this.stop) {
                    if (this.flowStep.isInfoEnabled()) {
                        this.flowStep.logInfo("stop called before start: " + this.stepName, new Object[0]);
                    }
                    return;
                }
                markStarted();
            }

            blockOnPredecessors();
            prepareResources(); // throws to skip the remaining steps
            applyFlowStepConfStrategy();
            blockOnJob();
            internalCleanup();
        } catch (Throwable th) {
            // Store first, in case dumpDebugInfo() itself throws.
            this.throwable = th;
            dumpDebugInfo();

            // If the failure happened before the step was marked started, advance it now.
            if (this.flowStepStats.isPending()) {
                markStarted();
            }
            if (!this.flowStepStats.isFinished()) {
                this.flowStepStats.markFailed(this.throwable);
                this.flowStep.fireOnThrowable(this.throwable);
            }
        } finally {
            // Release successors blocked in isSuccessful() before the potentially slow detail capture.
            this.latch.countDown();
            finalizeNodeSliceCapture();
            this.flowStepStats.cleanup();
        }
    }

    /**
     * Asks the flow step to prepare its resources, unless a stop has been requested.
     *
     * @throws Throwable whatever non-null failure the flow step reports from its preparation
     */
    private void prepareResources() throws Throwable {
        if (this.stop) {
            return;
        }

        Throwable failure = this.flowStep.prepareResources();
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Marks the step statistics as started, unless they are already in a finished state.
     *
     * @return {@code true} if the statistics were marked started, {@code false} if already finished
     */
    private synchronized boolean markStarted() {
        boolean stillOpen = !this.flowStepStats.isFinished();
        if (stillOpen) {
            this.flowStepStats.markStarted();
        }
        return stillOpen;
    }

    /**
     * Applies the parent flow's step strategy, if one is set, passing it the flow, the flow steps
     * of all predecessor jobs, and this job's flow step.
     */
    private void applyFlowStepConfStrategy() {
        // Kept as a raw type, as in the original, so the call does not depend on the strategy's type parameter.
        FlowStepStrategy strategy = this.flowStep.getFlow().getFlowStepStrategy();
        if (strategy != null) {
            List<BaseFlowStep<Config>> predecessorSteps = new ArrayList<BaseFlowStep<Config>>();
            for (FlowStepJob<Config> predecessor : this.predecessors) {
                predecessorSteps.add(predecessor.flowStep);
            }
            strategy.apply(this.flowStep.getFlow(), predecessorSteps, this.flowStep);
        }
    }

    /**
     * Decides whether this step can be skipped.
     * <p>
     * A step is skipped only when the parent flow has a run ID, all of the step's sources exist,
     * and the sources are not newer than the sink modification time.
     *
     * @return {@code true} if the step should be skipped
     * @throws IOException if the checks on sources or sinks fail
     */
    protected boolean isSkipFlowStep() throws IOException {
        return this.flowStep.getFlow().getRunID() != null
            && this.flowStep.allSourcesExist()
            && !this.flowStep.areSourcesNewer(this.flowStep.getSinkModified());
    }

    /**
     * Submits the job, waits until it completes or is stopped, and records the outcome.
     * <p>
     * Does nothing if a stop was already requested. If the job ran to completion unsuccessfully
     * (and was not stopped), the sinks are rolled back, the step is marked failed, and a
     * {@link FlowException} is stored in {@code throwable}; an {@link OutOfMemoryError} reported
     * by the job is rethrown instead. If the job was successful, the sinks are committed and the
     * step is marked successful, or failed when the commit itself reports a failure.
     *
     * @throws IOException if submitting or querying the job fails
     */
    protected void blockOnJob() throws IOException {
        if (this.stop) {
            return;
        }

        if (this.flowStep.isInfoEnabled()) {
            this.flowStep.logInfo("starting step: " + this.stepName, new Object[0]);
        }

        internalNonBlockingStart();
        markSubmitted();
        this.flowStep.fireOnStarting();
        blockTillCompleteOrStopped();

        if (!this.stop && !internalNonBlockingIsSuccessful()) {
            // The job finished on its own and did not succeed.
            if (!this.flowStepStats.isFinished()) {
                this.flowStep.rollbackSinks();
                this.flowStepStats.markFailed(getThrowable());
                updateNodesStatus();
                this.flowStep.fireOnThrowable(getThrowable());
            }

            // Rethrow an unrecoverable error rather than wrapping it.
            if (getThrowable() instanceof OutOfMemoryError) {
                throw ((OutOfMemoryError) getThrowable());
            }

            dumpDebugInfo();

            if (!isRemoteExecution()) {
                this.throwable = new FlowException("local step failed: " + this.stepName, getThrowable());
            } else {
                this.throwable = new FlowException("step failed: " + this.stepName + ", step id: " + getStepStats().getID() + ", job id: " + internalJobId() + ", please see cluster logs for failure messages");
            }
            return;
        }

        // Stopped or successful: commit only when the job actually succeeded and nothing has finalized the stats yet.
        if (internalNonBlockingIsSuccessful() && !this.flowStepStats.isFinished()) {
            this.throwable = this.flowStep.commitSinks();

            if (this.throwable == null) {
                this.flowStepStats.markSuccessful();
                updateNodesStatus();
                this.flowStep.fireOnCompleted();
            } else {
                this.flowStepStats.markFailed(this.throwable);
                updateNodesStatus();
                this.flowStep.fireOnThrowable(this.throwable);
            }
        }
    }

    /**
     * Polls for the final node and slice details after the step has ended.
     * <p>
     * Each round refreshes the node statuses and records child statistics. Polling ends as soon
     * as every tracked node is finished and the final detail has been captured, or once
     * {@code blockForCompletedChildDetailDuration} milliseconds have elapsed. Between rounds the
     * method sleeps for the polling interval and logs progress at most about once per second.
     * If polling ends without complete data, a warning is logged naming the property that
     * extends the wait.
     */
    protected void finalizeNodeSliceCapture() {
        long pollingStarted = System.currentTimeMillis();
        long lastLogged = 0;
        long retries = 0;
        boolean allNodesFinished;

        while (true) {
            allNodesFinished = updateNodesStatus();
            this.flowStepStats.recordChildStats();

            // Everything captured, nothing left to wait for.
            if (allNodesFinished && this.flowStepStats.hasCapturedFinalDetail()) {
                break;
            }

            // Give up once the configured wait duration has elapsed.
            if (System.currentTimeMillis() - pollingStarted >= this.blockForCompletedChildDetailDuration) {
                break;
            }

            // Throttle the progress message to roughly once per second.
            if (System.currentTimeMillis() - lastLogged > 1000) {
                if (allNodesFinished) {
                    this.flowStep.logInfo("did not capture all completed slice details, will retry in {}, prior retries: {}", Util.formatDurationFromMillis(this.pollingInterval), Long.valueOf(retries));
                } else {
                    this.flowStep.logInfo("did not capture all completed node details, will retry in {}, prior retries: {}", Util.formatDurationFromMillis(this.pollingInterval), Long.valueOf(retries));
                }
                lastLogged = System.currentTimeMillis();
            }

            retries++;
            sleepForPollingInterval();
        }

        if (!allNodesFinished) {
            this.flowStep.logWarn("unable to capture all completed node details or determine final state within configured duration: {}, configure property to increase wait duration: '{}'", Util.formatDurationFromMillis(this.blockForCompletedChildDetailDuration), CascadingStats.STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION);
        }

        if (!this.flowStepStats.hasCapturedFinalDetail()) {
            this.flowStep.logWarn("unable to capture all completed slice details within configured duration: {}, configure property to increase wait duration: '{}'", Util.formatDurationFromMillis(this.blockForCompletedChildDetailDuration), CascadingStats.STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION);
        }
    }

    /**
     * Tells {@link #blockOnJob()} which failure message to build: when {@code true} the message
     * points at the cluster logs and includes the job id, otherwise the local cause is attached.
     *
     * @return {@code true} if the job executes remotely
     */
    protected abstract boolean isRemoteExecution();

    /**
     * Identifier of the underlying job; included in the failure message for remote executions.
     *
     * @return the job id
     */
    protected abstract String internalJobId();

    /**
     * Reports, without blocking, whether the underlying job succeeded. Consulted by
     * {@link #blockOnJob()} once polling has ended.
     *
     * @return {@code true} if the job was successful
     * @throws IOException if the job state cannot be queried
     */
    protected abstract boolean internalNonBlockingIsSuccessful() throws IOException;

    /**
     * Failure reported by the underlying job; used by {@link #blockOnJob()} to mark the step
     * failed and as the cause of a local step failure.
     *
     * @return the job's failure; NOTE(review): presumably {@code null} when none is known, confirm against implementations
     */
    protected abstract Throwable getThrowable();

    /**
     * Starts the underlying job. Called by {@link #blockOnJob()} before the step is marked
     * submitted; as the name indicates, it is expected not to wait for the job to finish.
     *
     * @throws IOException if the job cannot be started
     */
    protected abstract void internalNonBlockingStart() throws IOException;

    /**
     * Polls the underlying job until it completes or a stop is requested.
     * <p>
     * On each round the step is moved to the running state once the job reports it has started,
     * node statuses are refreshed while running, and statistics are recorded every
     * {@code statsStoreInterval / pollingInterval} rounds. The thread sleeps for the polling
     * interval between rounds.
     *
     * @throws IOException if the job state cannot be queried
     */
    protected void blockTillCompleteOrStopped() throws IOException {
        final int roundsPerStatsStore = (int) Math.floor(this.statsStoreInterval / this.pollingInterval);
        int roundsSinceStore = 0;

        for (;;) {
            boolean justStartedRunning = this.flowStepStats.isSubmitted() && internalIsStartedRunning() && !this.stop;
            if (justStartedRunning) {
                markRunning();
                this.flowStep.fireOnRunning();
            }

            if (this.flowStepStats.isRunning()) {
                updateNodesStatus();
            }

            if (this.stop || internalNonBlockingIsComplete()) {
                return;
            }

            // Compare first, then advance the counter; reset it whenever stats are stored.
            if (roundsSinceStore++ == roundsPerStatsStore) {
                roundsSinceStore = 0;
                this.flowStepStats.recordStats();
                this.flowStepStats.recordChildStats();
            }

            sleepForPollingInterval();
        }
    }

    /**
     * Moves a started step to the submitted state and marks each of its child node statistics
     * as started, then moves the parent flow's statistics from started to submitted. Logs a
     * warning instead if the step has no parent flow.
     */
    private synchronized void markSubmitted() {
        if (this.flowStepStats.isStarted()) {
            this.flowStepStats.markSubmitted();
            for (FlowNodeStats child : this.flowStepStats.getChildren()) {
                child.markStarted();
            }
        }

        Flow<Config> parentFlow = this.flowStep.getFlow();
        if (parentFlow == null) {
            LOG.warn("no parent flow set");
            return;
        }

        FlowStats parentStats = parentFlow.getFlowStats();
        // The flow statistics are shared between steps, so the check-then-mark is done under their lock.
        synchronized (parentStats) {
            if (parentStats.isStarted()) {
                parentStats.markSubmitted();
            }
        }
    }

    /**
     * Marks the step as skipped and fires the completed event, unless its statistics are already
     * finished. The parent flow is moved to the running state afterwards, even if marking or the
     * listener throws.
     */
    private synchronized void markSkipped() {
        if (!this.flowStepStats.isFinished()) {
            try {
                this.flowStepStats.markSkipped();
                this.flowStep.fireOnCompleted();
            } finally {
                markFlowRunning();
            }
        }
    }

    /** Marks the step statistics as running and moves the parent flow to the running state. */
    private synchronized void markRunning() {
        flowStepStats.markRunning();
        markFlowRunning();
    }

    /**
     * Moves the parent flow's statistics to the running state if they are currently started or
     * submitted. Logs a warning if the step has no parent flow.
     */
    private synchronized void markFlowRunning() {
        Flow<Config> parentFlow = this.flowStep.getFlow();
        if (parentFlow == null) {
            LOG.warn("no parent flow set");
            return;
        }

        FlowStats parentStats = parentFlow.getFlowStats();
        // Check and transition under the flow statistics' own lock.
        synchronized (parentStats) {
            boolean notYetRunning = parentStats.isStarted() || parentStats.isSubmitted();
            if (notYetRunning) {
                parentStats.markRunning();
            }
        }
    }

    /**
     * Refreshes the status of every node that is neither finished nor still pending.
     *
     * @return {@code true} if every node that was refreshed is now finished; nodes that were
     *         skipped (already finished, or pending) do not affect the result
     */
    private boolean updateNodesStatus() {
        boolean allFinished = true;

        for (FlowNodeStats nodeStats : this.flowStepStats.getFlowNodeStats()) {
            if (nodeStats.isFinished() || nodeStats.isPending()) {
                continue;
            }

            updateNodeStatus(nodeStats);
            if (!nodeStats.isFinished()) {
                allFinished = false;
            }
        }

        return allFinished;
    }

    /**
     * Refreshes the state of a single node's statistics. Only called for nodes that are neither
     * finished nor pending.
     *
     * @param flowNodeStats the node statistics to refresh
     */
    protected abstract void updateNodeStatus(FlowNodeStats flowNodeStats);

    /**
     * Reports, without blocking, whether the underlying job has completed. Polled by
     * {@link #blockTillCompleteOrStopped()} to decide when to stop waiting.
     *
     * @return {@code true} if the job has completed
     * @throws IOException if the job state cannot be queried
     */
    protected abstract boolean internalNonBlockingIsComplete() throws IOException;

    /** Pauses the calling thread for the configured polling interval. */
    protected void sleepForPollingInterval() {
        Util.safeSleep(pollingInterval);
    }

    /**
     * Waits for each predecessor job in turn. For every predecessor that did not succeed, a
     * warning is logged and this job is stopped; the remaining predecessors are still waited on.
     */
    protected void blockOnPredecessors() {
        for (FlowStepJob<Config> predecessor : this.predecessors) {
            if (predecessor.isSuccessful()) {
                continue;
            }

            this.flowStep.logWarn("abandoning step: " + this.stepName + ", predecessor failed: " + predecessor.stepName);
            stop();
        }
    }

    /**
     * Emits implementation-specific diagnostics. Invoked when the step fails, from both
     * {@link #start()} and {@link #blockOnJob()}.
     */
    protected abstract void dumpDebugInfo();

    /**
     * Blocks until this job has finished, then reports whether it ended well.
     *
     * @return {@code true} if the step was successful or skipped; {@code false} otherwise, or if
     *         the waiting thread was interrupted (a warning is logged in that case)
     */
    public boolean isSuccessful() {
        try {
            this.latch.await();
            return this.flowStepStats.isSuccessful() || this.flowStepStats.isSkipped();
        } catch (InterruptedException interrupted) {
            this.flowStep.logWarn("latch interrupted", interrupted);
            return false;
        }
    }

    /**
     * Reports whether the underlying job has started running, as determined by
     * {@link #internalIsStartedRunning()}.
     *
     * @return {@code true} if the job has started running
     */
    public boolean isStarted() {
        boolean startedRunning = internalIsStartedRunning();
        return startedRunning;
    }

    /**
     * Reports whether the underlying job has started running. Backs {@link #isStarted()} and is
     * polled by {@link #blockTillCompleteOrStopped()} to move a submitted step to the running state.
     *
     * @return {@code true} if the job has started running
     */
    protected abstract boolean internalIsStartedRunning();

    /**
     * Hook invoked from {@link #start()} after the job has been blocked on.
     * The default implementation does nothing.
     */
    protected void internalCleanup() {
    }

    /**
     * Returns the statistics object tracking this step.
     *
     * @return the step statistics
     */
    public FlowStepStats getStepStats() {
        return flowStepStats;
    }
}
