package xyz.mytang0.brook.core;

import com.google.common.base.Joiner;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.validation.ValidationException;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import xyz.mytang0.brook.common.configuration.Configuration;
import xyz.mytang0.brook.common.context.FlowContext;
import xyz.mytang0.brook.common.context.TaskMapperContext;
import xyz.mytang0.brook.common.exception.BizException;
import xyz.mytang0.brook.common.extension.ExtensionDirector;
import xyz.mytang0.brook.common.extension.ExtensionLoader;
import xyz.mytang0.brook.common.metadata.definition.FlowDef;
import xyz.mytang0.brook.common.metadata.definition.TaskDef;
import xyz.mytang0.brook.common.metadata.enums.FlowStatus;
import xyz.mytang0.brook.common.metadata.enums.TaskStatus;
import xyz.mytang0.brook.common.metadata.instance.FlowInstance;
import xyz.mytang0.brook.common.metadata.instance.TaskInstance;
import xyz.mytang0.brook.common.metadata.model.QueueMessage;
import xyz.mytang0.brook.common.metadata.model.SkipTaskReq;
import xyz.mytang0.brook.common.metadata.model.StartFlowReq;
import xyz.mytang0.brook.common.metadata.model.TaskResult;
import xyz.mytang0.brook.common.utils.ExceptionUtils;
import xyz.mytang0.brook.common.utils.JsonUtils;
import xyz.mytang0.brook.common.utils.TimeUtils;
import xyz.mytang0.brook.core.aspect.FlowAspect;
import xyz.mytang0.brook.core.aspect.TaskAspect;
import xyz.mytang0.brook.core.constants.FlowConstants;
import xyz.mytang0.brook.core.exception.FlowErrorCode;
import xyz.mytang0.brook.core.exception.FlowException;
import xyz.mytang0.brook.core.exception.TerminateException;
import xyz.mytang0.brook.core.execution.ExecutionProperties;
import xyz.mytang0.brook.core.executor.ExecutorEnum;
import xyz.mytang0.brook.core.lock.FlowLockFacade;
import xyz.mytang0.brook.core.metadata.MetadataFacade;
import xyz.mytang0.brook.core.metadata.MetadataProperties;
import xyz.mytang0.brook.core.monitor.DelayedTaskMonitor;
import xyz.mytang0.brook.core.monitor.DelayedTaskMonitorProperties;
import xyz.mytang0.brook.core.queue.QueueProperties;
import xyz.mytang0.brook.core.utils.ParameterUtils;
import xyz.mytang0.brook.core.utils.QueueUtils;
import xyz.mytang0.brook.spi.cache.FlowCache;
import xyz.mytang0.brook.spi.cache.FlowCacheFactory;
import xyz.mytang0.brook.spi.computing.EngineActuator;
import xyz.mytang0.brook.spi.execution.ExecutionDAO;
import xyz.mytang0.brook.spi.executor.ExecutorFactory;
import xyz.mytang0.brook.spi.metadata.MetadataService;
import xyz.mytang0.brook.spi.queue.QueueService;
import xyz.mytang0.brook.spi.task.FlowTask;

/* loaded from: input_file:xyz/mytang0/brook/core/FlowExecutor.class */
public class FlowExecutor<T extends FlowTask> {
    private static final Logger log;
    private final MetadataService metadataService;
    private final FlowTaskRegistry<T> flowTaskRegistry;
    private final FlowLockFacade flowLockFacade;
    private final QueueProperties queueProperties;
    private final ExecutionProperties executionProperties;
    private final DelayedTaskMonitorProperties delayedTaskMonitorProperties;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final FlowAspect flowAspect = new FlowAspect();
    private final TaskAspect taskAspect = new TaskAspect();
    private final EngineActuator engineActuator = (EngineActuator) ExtensionDirector.getExtensionLoader(EngineActuator.class).getDefaultExtension();
    private final FlowCacheFactory flowCacheFactory = (FlowCacheFactory) ExtensionDirector.getExtensionLoader(FlowCacheFactory.class).getDefaultExtension();
    private final ExecutorService flowStarter = ((ExecutorFactory) ExtensionDirector.getExtensionLoader(ExecutorFactory.class).getDefaultExtension()).getExecutor(ExecutorEnum.FLOW_STARTER);
    private final ExecutorService asyncExecutor = ((ExecutorFactory) ExtensionDirector.getExtensionLoader(ExecutorFactory.class).getDefaultExtension()).getExecutor(ExecutorEnum.ASYNC_EXECUTOR);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: xyz.mytang0.brook.core.FlowExecutor$1, reason: invalid class name */
    /* loaded from: input_file:xyz/mytang0/brook/core/FlowExecutor$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$xyz$mytang0$brook$common$metadata$definition$FlowDef$TimeoutPolicy;
        static final /* synthetic */ int[] $SwitchMap$xyz$mytang0$brook$common$metadata$definition$TaskDef$TimeoutPolicy;
        static final /* synthetic */ int[] $SwitchMap$xyz$mytang0$brook$common$metadata$enums$TaskStatus;
        static final /* synthetic */ int[] $SwitchMap$xyz$mytang0$brook$common$metadata$definition$TaskDef$RetryLogic;
        static final /* synthetic */ int[] $SwitchMap$xyz$mytang0$brook$common$metadata$enums$FlowStatus = new int[FlowStatus.values().length];

        static {
            try {
                $SwitchMap$xyz$mytang0$brook$common$metadata$enums$FlowStatus[FlowStatus.COMPLETED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$xyz$mytang0$brook$common$metadata$enums$FlowStatus[FlowStatus.FAILED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$xyz$mytang0$brook$common$metadata$enums$FlowStatus[FlowStatus.TERMINATED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$xyz$mytang0$brook$common$metadata$enums$FlowStatus[FlowStatus.PAUSED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$xyz$mytang0$brook$common$metadata$enums$FlowStatus[FlowStatus.TIMED_OUT.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$xyz$mytang0$brook$common$metadata$enums$FlowStatus[FlowStatus.RUNNING.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            $SwitchMap$xyz$mytang0$brook$common$metadata$definition$TaskDef$RetryLogic = new int[TaskDef.RetryLogic.values().length];
            try {
                $SwitchMap$xyz$mytang0$brook$common$metadata$definition$TaskDef$RetryLogic[TaskDef.RetryLogic.FIXED.ordinal()] = 1;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$xyz$mytang0$brook$common$metadata$definition$TaskDef$RetryLogic[TaskDef.RetryLogic.EXPONENTIAL_BACKOFF.ordinal()] = 2;
            } catch (NoSuchFieldError e8) {
            }
            $SwitchMap$xyz$mytang0$brook$common$metadata$enums$TaskStatus = new int[TaskStatus.values().length];
            try {
                $SwitchMap$xyz$mytang0$brook$common$metadata$enums$TaskStatus[TaskStatus.CANCELED.ordinal()] = 1;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$xyz$mytang0$brook$common$metadata$enums$TaskStatus[TaskStatus.TIMED_OUT.ordinal()] = 2;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$xyz$mytang0$brook$common$metadata$enums$TaskStatus[TaskStatus.RETRIED.ordinal()] = 3;
            } catch (NoSuchFieldError e11) {
            }
            $SwitchMap$xyz$mytang0$brook$common$metadata$definition$TaskDef$TimeoutPolicy = new int[TaskDef.TimeoutPolicy.values().length];
            try {
                $SwitchMap$xyz$mytang0$brook$common$metadata$definition$TaskDef$TimeoutPolicy[TaskDef.TimeoutPolicy.ALERT_ONLY.ordinal()] = 1;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$xyz$mytang0$brook$common$metadata$definition$TaskDef$TimeoutPolicy[TaskDef.TimeoutPolicy.RETRY.ordinal()] = 2;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$xyz$mytang0$brook$common$metadata$definition$TaskDef$TimeoutPolicy[TaskDef.TimeoutPolicy.TIME_OUT.ordinal()] = 3;
            } catch (NoSuchFieldError e14) {
            }
            $SwitchMap$xyz$mytang0$brook$common$metadata$definition$FlowDef$TimeoutPolicy = new int[FlowDef.TimeoutPolicy.values().length];
            try {
                $SwitchMap$xyz$mytang0$brook$common$metadata$definition$FlowDef$TimeoutPolicy[FlowDef.TimeoutPolicy.ALERT_ONLY.ordinal()] = 1;
            } catch (NoSuchFieldError e15) {
            }
            try {
                $SwitchMap$xyz$mytang0$brook$common$metadata$definition$FlowDef$TimeoutPolicy[FlowDef.TimeoutPolicy.TIME_OUT.ordinal()] = 2;
            } catch (NoSuchFieldError e16) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:xyz/mytang0/brook/core/FlowExecutor$DecideResult.class */
    public static class DecideResult {
        boolean complete;
        List<TaskInstance> tasksToBeScheduled = new LinkedList();
        List<TaskInstance> tasksToBeUpdated = new LinkedList();
        List<TaskInstance> tasksToBeRetried = new LinkedList();

        public boolean isComplete() {
            return this.complete;
        }

        public List<TaskInstance> getTasksToBeScheduled() {
            return this.tasksToBeScheduled;
        }

        public List<TaskInstance> getTasksToBeUpdated() {
            return this.tasksToBeUpdated;
        }

        public List<TaskInstance> getTasksToBeRetried() {
            return this.tasksToBeRetried;
        }

        public void setComplete(boolean z) {
            this.complete = z;
        }

        public void setTasksToBeScheduled(List<TaskInstance> list) {
            this.tasksToBeScheduled = list;
        }

        public void setTasksToBeUpdated(List<TaskInstance> list) {
            this.tasksToBeUpdated = list;
        }

        public void setTasksToBeRetried(List<TaskInstance> list) {
            this.tasksToBeRetried = list;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof DecideResult)) {
                return false;
            }
            DecideResult decideResult = (DecideResult) obj;
            if (!decideResult.canEqual(this) || isComplete() != decideResult.isComplete()) {
                return false;
            }
            List<TaskInstance> tasksToBeScheduled = getTasksToBeScheduled();
            List<TaskInstance> tasksToBeScheduled2 = decideResult.getTasksToBeScheduled();
            if (tasksToBeScheduled == null) {
                if (tasksToBeScheduled2 != null) {
                    return false;
                }
            } else if (!tasksToBeScheduled.equals(tasksToBeScheduled2)) {
                return false;
            }
            List<TaskInstance> tasksToBeUpdated = getTasksToBeUpdated();
            List<TaskInstance> tasksToBeUpdated2 = decideResult.getTasksToBeUpdated();
            if (tasksToBeUpdated == null) {
                if (tasksToBeUpdated2 != null) {
                    return false;
                }
            } else if (!tasksToBeUpdated.equals(tasksToBeUpdated2)) {
                return false;
            }
            List<TaskInstance> tasksToBeRetried = getTasksToBeRetried();
            List<TaskInstance> tasksToBeRetried2 = decideResult.getTasksToBeRetried();
            return tasksToBeRetried == null ? tasksToBeRetried2 == null : tasksToBeRetried.equals(tasksToBeRetried2);
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof DecideResult;
        }

        public int hashCode() {
            int i = (1 * 59) + (isComplete() ? 79 : 97);
            List<TaskInstance> tasksToBeScheduled = getTasksToBeScheduled();
            int hashCode = (i * 59) + (tasksToBeScheduled == null ? 43 : tasksToBeScheduled.hashCode());
            List<TaskInstance> tasksToBeUpdated = getTasksToBeUpdated();
            int hashCode2 = (hashCode * 59) + (tasksToBeUpdated == null ? 43 : tasksToBeUpdated.hashCode());
            List<TaskInstance> tasksToBeRetried = getTasksToBeRetried();
            return (hashCode2 * 59) + (tasksToBeRetried == null ? 43 : tasksToBeRetried.hashCode());
        }

        public String toString() {
            return "FlowExecutor.DecideResult(complete=" + isComplete() + ", tasksToBeScheduled=" + getTasksToBeScheduled() + ", tasksToBeUpdated=" + getTasksToBeUpdated() + ", tasksToBeRetried=" + getTasksToBeRetried() + ")";
        }
    }

    public FlowExecutor(FlowLockFacade flowLockFacade, FlowTaskRegistry<T> flowTaskRegistry, QueueProperties queueProperties, MetadataProperties metadataProperties, ExecutionProperties executionProperties, DelayedTaskMonitorProperties delayedTaskMonitorProperties) {
        this.flowLockFacade = flowLockFacade;
        this.flowTaskRegistry = flowTaskRegistry;
        this.metadataService = new MetadataFacade(metadataProperties);
        this.queueProperties = queueProperties;
        this.executionProperties = executionProperties;
        this.delayedTaskMonitorProperties = delayedTaskMonitorProperties;
        DelayedTaskMonitor.init(this, flowLockFacade, delayedTaskMonitorProperties);
    }

    public String startFlow(StartFlowReq startFlowReq) {
        if (startFlowReq.getFlowDef() == null) {
            Objects.requireNonNull(startFlowReq.getName(), "The 'flowName' is null");
            startFlowReq.setFlowDef(this.metadataService.getFlow(startFlowReq.getName()));
        }
        Objects.requireNonNull(startFlowReq.getFlowDef(), String.format("The flowDef of '%s' not exist", startFlowReq.getName()));
        FlowInstance newFlowInstance = newFlowInstance(startFlowReq);
        checkConcurrency(newFlowInstance);
        createFlow(newFlowInstance);
        this.flowStarter.execute(() -> {
            execute(newFlowInstance);
        });
        return newFlowInstance.getFlowId();
    }

    public FlowInstance requestFlow(StartFlowReq startFlowReq) {
        long currentTimeMillis = TimeUtils.currentTimeMillis();
        if (startFlowReq.getFlowDef() == null) {
            Objects.requireNonNull(startFlowReq.getName(), "The 'flowName' is null");
            startFlowReq.setFlowDef(this.metadataService.getFlow(startFlowReq.getName()));
        }
        Objects.requireNonNull(startFlowReq.getFlowDef(), String.format("The flowDef of '%s' not exist", startFlowReq.getName()));
        FlowInstance newFlowInstance = newFlowInstance(startFlowReq);
        createFlow(newFlowInstance);
        Future<?> submit = this.flowStarter.submit(() -> {
            execute(newFlowInstance);
        });
        long j = 5000;
        FlowDef.ControlDef controlDef = newFlowInstance.getFlowDef().getControlDef();
        if (controlDef != null && controlDef.getTimeoutMs() > 0) {
            j = controlDef.getTimeoutMs();
        }
        try {
            try {
                submit.get(remainWaitTime(j, currentTimeMillis), TimeUnit.MILLISECONDS);
                try {
                    getExecutionDAO(newFlowInstance).deleteFlow(newFlowInstance.getFlowId());
                } catch (Throwable th) {
                    log.warn("Request-level flow failed to delete flow instance: {}", newFlowInstance.getFlowId());
                }
            } catch (Throwable th2) {
                try {
                    getExecutionDAO(newFlowInstance).deleteFlow(newFlowInstance.getFlowId());
                } catch (Throwable th3) {
                    log.warn("Request-level flow failed to delete flow instance: {}", newFlowInstance.getFlowId());
                }
                throw th2;
            }
        } catch (TimeoutException e) {
            newFlowInstance.setStatus(FlowStatus.TIMED_OUT);
            try {
                getExecutionDAO(newFlowInstance).deleteFlow(newFlowInstance.getFlowId());
            } catch (Throwable th4) {
                log.warn("Request-level flow failed to delete flow instance: {}", newFlowInstance.getFlowId());
            }
        } catch (Throwable th5) {
            newFlowInstance.setStatus(FlowStatus.FAILED);
            newFlowInstance.setReasonForNotCompleting(ExceptionUtils.getMessage(th5));
            try {
                getExecutionDAO(newFlowInstance).deleteFlow(newFlowInstance.getFlowId());
            } catch (Throwable th6) {
                log.warn("Request-level flow failed to delete flow instance: {}", newFlowInstance.getFlowId());
            }
        }
        return newFlowInstance;
    }

    public void executeTask(String str) {
        TaskInstance task = getTask(str);
        if (task == null) {
            log.warn("Execute taskId: {}, but no such task found.", str);
        } else {
            execute(task);
        }
    }

    public void execute(String str) {
        if (this.flowLockFacade.acquireLock(str)) {
            try {
                FlowInstance flow = getFlow(str);
                if (flow == null) {
                    log.warn("Execute flowId: {}, but no such flow found.", str);
                } else {
                    executePerfectlyUnsafe(flow);
                }
            } finally {
                this.flowLockFacade.releaseLock(str);
            }
        }
    }

    public void execute(FlowInstance flowInstance) {
        if (this.flowLockFacade.acquireLock(flowInstance.getFlowId())) {
            try {
                executePerfectlyUnsafe(flowInstance);
            } finally {
                this.flowLockFacade.releaseLock(flowInstance.getFlowId());
            }
        }
    }

    private void executePerfectlyUnsafe(FlowInstance flowInstance) {
        try {
            try {
                fillDef(flowInstance);
                FlowContext.setCurrentFlow(flowInstance);
                executeUnsafe(flowInstance);
                FlowContext.removeCurrentFlow();
            } catch (Throwable th) {
                exceptionHandler(flowInstance, th);
                executeUnsafe(flowInstance);
                FlowContext.removeCurrentFlow();
            }
        } catch (Throwable th2) {
            FlowContext.removeCurrentFlow();
            throw th2;
        }
    }

    public void execute(TaskInstance taskInstance) {
        boolean z;
        if (taskInstance.getStatus().isFinished()) {
            return;
        }
        try {
            z = executeUnsafe(taskInstance);
        } catch (TerminateException e) {
            FlowStatus flowStatus = e.getFlowStatus();
            if (flowStatus != null) {
                taskInstance.setStatus(convertStatus(flowStatus));
            } else {
                taskInstance.setStatus(TaskStatus.FAILED);
            }
            taskInstance.setReasonForNotCompleting(e.getLocalizedMessage());
            z = true;
        } catch (Throwable th) {
            taskInstance.setStatus(TaskStatus.FAILED);
            taskInstance.setReasonForNotCompleting("Execute exception: " + ExceptionUtils.getMessage(th));
            z = true;
        }
        if (z) {
            TaskResult taskResult = new TaskResult(taskInstance.getFlowId(), taskInstance.getTaskId(), taskInstance.getStatus());
            taskResult.setOutput(taskInstance.getOutput());
            taskResult.setProgress(taskInstance.getProgress());
            taskResult.setReasonForNotCompleting(taskInstance.getReasonForNotCompleting());
            updateTask(taskResult);
        }
    }

    private void executeUnsafe(FlowInstance flowInstance) {
        if (flowInstance.getStatus().isTerminal()) {
            terminal(flowInstance);
            return;
        }
        DecideResult decide = decide(flowInstance);
        if (decide.isComplete()) {
            updateTasks(decide.getTasksToBeUpdated());
            completeFlow(flowInstance);
            executeUnsafe(flowInstance);
            return;
        }
        List<TaskInstance> tasksToBeScheduled = decide.getTasksToBeScheduled();
        List<TaskInstance> tasksToBeUpdated = decide.getTasksToBeUpdated();
        List<TaskInstance> tasksToBeRetried = decide.getTasksToBeRetried();
        if (CollectionUtils.isNotEmpty(tasksToBeScheduled)) {
            scheduleTasks(flowInstance, tasksToBeScheduled).forEach(taskInstance -> {
                if (executeUnsafe(taskInstance)) {
                    this.taskAspect.onTerminated(taskInstance);
                    tasksToBeUpdated.add(taskInstance);
                }
            });
        }
        if (CollectionUtils.isNotEmpty(tasksToBeUpdated)) {
            updateTasks(tasksToBeUpdated);
        }
        if (CollectionUtils.isNotEmpty(tasksToBeRetried)) {
            addToQueue(tasksToBeRetried);
        }
        if (CollectionUtils.isNotEmpty(tasksToBeUpdated)) {
            executeUnsafe(flowInstance);
        }
    }

    public void terminate(String str, String str2) {
        boolean acquireLock = this.flowLockFacade.acquireLock(str, FlowConstants.LOCK_TRY_TIME_MS);
        if (!acquireLock) {
            log.warn("Safe point not reached, force terminate flow: {}.", str);
        }
        try {
            Optional.ofNullable(getExecutionDAO().getFlow(str)).ifPresent(flowInstance -> {
                if (flowInstance.getStatus().isTerminal()) {
                    return;
                }
                fillDef(flowInstance);
                flowInstance.setStatus(FlowStatus.TERMINATED);
                flowInstance.setReasonForNotCompleting(str2);
                Optional.ofNullable(flowInstance.getFlowDef()).ifPresent(flowDef -> {
                    triggerFailureFlow(flowInstance, flowDef.getFailureFlowName());
                });
                executePerfectlyUnsafe(flowInstance);
            });
            if (acquireLock) {
                this.flowLockFacade.releaseLock(str);
            }
        } catch (Throwable th) {
            if (acquireLock) {
                this.flowLockFacade.releaseLock(str);
            }
            throw th;
        }
    }

    public void pause(String str) {
        if (!this.flowLockFacade.acquireLock(str, FlowConstants.LOCK_TRY_TIME_MS)) {
            throw new IllegalStateException(String.format("Error acquiring lock when pause flow: %s", str));
        }
        try {
            Optional.ofNullable(getExecutionDAO().getFlow(str)).ifPresent(flowInstance -> {
                if (flowInstance.getStatus().isTerminal() || flowInstance.getStatus().isPaused()) {
                    return;
                }
                fillDef(flowInstance);
                flowInstance.setStatus(FlowStatus.PAUSED);
                Optional.ofNullable(flowInstance.getFlowDef()).ifPresent(flowDef -> {
                    triggerFailureFlow(flowInstance, flowDef.getFailureFlowName());
                });
                executePerfectlyUnsafe(flowInstance);
            });
        } finally {
            this.flowLockFacade.releaseLock(str);
        }
    }

    public void resume(String str) {
        if (!this.flowLockFacade.acquireLock(str, FlowConstants.LOCK_TRY_TIME_MS)) {
            throw new IllegalStateException(String.format("Error acquiring lock when resume flow: %s", str));
        }
        try {
            Optional.ofNullable(getExecutionDAO().getFlow(str)).ifPresent(flowInstance -> {
                if (flowInstance.getStatus().isPaused()) {
                    fillDef(flowInstance);
                    flowInstance.setStatus(FlowStatus.RUNNING);
                    flowInstance.setLastUpdated(TimeUtils.currentTimeMillis());
                    executePerfectlyUnsafe(flowInstance);
                }
            });
        } finally {
            this.flowLockFacade.releaseLock(str);
        }
    }

    public void retry(String str, boolean z) {
        FlowInstance flow = getFlow(str);
        if (flow == null || CollectionUtils.isEmpty(flow.getTaskInstances())) {
            return;
        }
        if (!flow.getStatus().isTerminal()) {
            throw new IllegalStateException(String.format("The flow instance %s is still running and therefore cannot be retied", str));
        }
        FlowInstance findLastFailedIfAny = findLastFailedIfAny(flow);
        if (findLastFailedIfAny == null) {
            return;
        }
        if (z && !StringUtils.equals(str, findLastFailedIfAny.getFlowId()) && StringUtils.isNotBlank(findLastFailedIfAny.getParentFlowId())) {
            findLastFailedIfAny = getFlow(findLastFailedIfAny.getParentFlowId());
        }
        try {
            fillDef(findLastFailedIfAny);
            FlowContext.setCurrentFlow(findLastFailedIfAny);
            retry(findLastFailedIfAny);
        } finally {
            FlowContext.removeCurrentFlow();
        }
    }

    private void retry(FlowInstance flowInstance) {
        List list = (List) flowInstance.getTaskInstances().stream().filter((v0) -> {
            return v0.isHanging();
        }).map((v0) -> {
            return v0.getTaskId();
        }).collect(Collectors.toList());
        flowInstance.setTaskInstances((List) flowInstance.getTaskInstances().stream().filter(taskInstance -> {
            return !taskInstance.isHanging();
        }).collect(Collectors.toList()));
        List list2 = (List) flowInstance.getTaskInstances().stream().filter(taskInstance2 -> {
            return taskInstance2.getStatus().isUnsuccessfullyTerminated();
        }).collect(Collectors.toList());
        if (CollectionUtils.isEmpty(list2)) {
            return;
        }
        flowInstance.setStatus(FlowStatus.RUNNING);
        flowInstance.setReasonForNotCompleting((String) null);
        updateFlow(flowInstance);
        list.forEach(this::deleteTask);
        for (TaskInstance taskInstance3 : (List) list2.stream().map(taskInstance4 -> {
            return taskToBeRescheduled(taskInstance4, 0L);
        }).collect(Collectors.toList())) {
            if (executeUnsafe(taskInstance3)) {
                this.taskAspect.onTerminated(taskInstance3);
                updateTask(taskInstance3);
                execute(flowInstance.getFlowId());
            }
        }
        updateParents(flowInstance);
    }

    private TaskInstance taskToBeRescheduled(TaskInstance taskInstance, long j) {
        TaskInstance copy = taskInstance.copy();
        copy.setStartDelayMs(j);
        copy.setRetryCount(taskInstance.getRetryCount() + 1);
        copy.setStatus(TaskStatus.SCHEDULED);
        copy.setReasonForNotCompleting((String) null);
        copy.setSubFlowId((String) null);
        copy.setRetryTime(TimeUtils.currentTimeMillis() + copy.getStartDelayMs());
        copy.setScheduledTime(0L);
        copy.setStartTime(0L);
        copy.setEndTime(0L);
        taskInstance.setRetryCount(copy.getRetryCount());
        taskInstance.setRetryTime(copy.getRetryTime());
        return copy;
    }

    private void updateParents(FlowInstance flowInstance) {
        while (flowInstance.hasParent()) {
            TaskInstance task = getTask(flowInstance.getParentTaskId());
            task.setStatus(TaskStatus.IN_PROGRESS);
            task.setReasonForNotCompleting((String) null);
            updateTask(task);
            FlowInstance flow = getFlow(flowInstance.getParentFlowId());
            flow.setStatus(FlowStatus.RUNNING);
            flow.setReasonForNotCompleting((String) null);
            updateFlow(flow);
            flowInstance = flow;
        }
    }

    public void skipTask(SkipTaskReq skipTaskReq) {
        FlowInstance flow = getFlow(skipTaskReq.getFlowId());
        if (!flow.getStatus().isRunning()) {
            throw new IllegalStateException(String.format("The flow instance %s is not running so the task %s cannot be skipped", skipTaskReq.getFlowId(), skipTaskReq.getTaskName()));
        }
        fillDef(flow);
        TaskDef taskDef = getTaskDef(flow, skipTaskReq.getTaskName());
        if (taskDef == null) {
            throw new IllegalStateException(String.format("The task %s does not exist in the flowDef %s", skipTaskReq.getTaskName(), flow.getFlowName()));
        }
        if (flow.getTaskByName(skipTaskReq.getTaskName()).isPresent()) {
            throw new IllegalStateException(String.format("The task %s has already been processed, cannot be skipped", skipTaskReq.getTaskName()));
        }
        TaskInstance create = TaskInstance.create(taskDef);
        create.setStartTime(TimeUtils.currentTimeMillis());
        create.setStatus(TaskStatus.SKIPPED);
        create.setInput(skipTaskReq.getInput());
        create.setOutput(skipTaskReq.getOutput());
        create.setEndTime(TimeUtils.currentTimeMillis());
        if (CollectionUtils.isNotEmpty(createTasks(Collections.singletonList(create)))) {
            this.taskAspect.onTerminated(create);
            executePerfectlyUnsafe(flow);
        }
    }

    public FlowInstance getCropFlow(String str) {
        return crop(getFlow(str));
    }

    public FlowInstance getCropFlowByCorrelationId(String str) {
        return crop(getFlowByCorrelationId(str));
    }

    public FlowInstance getFlow(String str) {
        return getExecutionDAO().getFlow(str);
    }

    public FlowInstance getFlowByCorrelationId(String str) {
        return getExecutionDAO().getFlowByCorrelationId(str);
    }

    private FlowInstance crop(FlowInstance flowInstance) {
        if (flowInstance == null) {
            return null;
        }
        flowInstance.setTaskInstances((List) flowInstance.getTaskInstances().stream().filter(taskInstance -> {
            return !taskInstance.isHanging();
        }).collect(Collectors.toList()));
        return flowInstance;
    }

    public TaskInstance getTask(String str) {
        return getExecutionDAO().getTask(str);
    }

    public List<String> getRunningTaskIds(String str) {
        FlowInstance flow = getFlow(str);
        return flow != null ? (List) flow.getTaskInstances().stream().filter(taskInstance -> {
            return TaskStatus.IN_PROGRESS.equals(taskInstance.getStatus());
        }).map((v0) -> {
            return v0.getTaskId();
        }).collect(Collectors.toList()) : Collections.emptyList();
    }

    private DecideResult decide(FlowInstance flowInstance) {
        TaskDef taskDef;
        DecideResult decideResult = new DecideResult();
        if (flowInstance.getStatus().isTerminal()) {
            return decideResult;
        }
        checkFlowTimeout(flowInstance);
        if (flowInstance.getStatus().isPaused()) {
            return decideResult;
        }
        HashSet hashSet = new HashSet();
        ArrayList<TaskInstance> arrayList = new ArrayList();
        flowInstance.getTaskInstances().forEach(taskInstance -> {
            if (taskInstance.isExecuted()) {
                hashSet.add(taskInstance.getTaskName());
            } else {
                if (taskInstance.getStatus().isSkipped()) {
                    return;
                }
                arrayList.add(taskInstance);
            }
        });
        boolean z = false;
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        if (CollectionUtils.isEmpty(arrayList) && CollectionUtils.isEmpty(hashSet)) {
            if (CollectionUtils.isEmpty(flowInstance.getFlowDef().getTaskDefs())) {
                throw new TerminateException("No tasks found to be executed", FlowStatus.COMPLETED);
            }
            TaskDef taskDef2 = (TaskDef) flowInstance.getFlowDef().getTaskDefs().get(0);
            while (true) {
                taskDef = taskDef2;
                if (!isTaskSkipped(flowInstance, taskDef)) {
                    break;
                }
                taskDef2 = getNextTask(flowInstance, taskDef);
            }
            if (taskDef != null) {
                getMappedTasks(flowInstance, taskDef).forEach(taskInstance2 -> {
                });
            }
        } else {
            for (TaskInstance taskInstance3 : arrayList) {
                if (!taskInstance3.getStatus().isTerminal()) {
                    checkTaskTimeout(taskInstance3);
                    if (taskInstance3.getRetryTime() < TimeUtils.currentTimeMillis()) {
                        linkedHashMap.putIfAbsent(taskInstance3.getTaskName(), taskInstance3);
                        hashSet.remove(taskInstance3.getTaskName());
                    }
                }
                if (!taskInstance3.getStatus().isSuccessful()) {
                    TaskInstance retryTask = getRetryTask(taskInstance3);
                    if (retryTask != null) {
                        decideResult.getTasksToBeRetried().add(retryTask);
                        decideResult.getTasksToBeUpdated().add(taskInstance3);
                    }
                    z = true;
                } else if (!taskInstance3.isExecuted() && taskInstance3.getStatus().isTerminal()) {
                    taskInstance3.setExecuted(true);
                    if (taskInstance3.getStatus().isHanged()) {
                        getMappedTasks(flowInstance, taskInstance3, taskInstance3.getTaskDef().getHangDef().getDetermineTaskDef()).forEach(taskInstance4 -> {
                            if (linkedHashMap.putIfAbsent(taskInstance4.getTaskName(), taskInstance4) == null) {
                                taskInstance4.setHanging(true);
                                taskInstance4.setParentTaskId(taskInstance3.getTaskId());
                                taskInstance3.setHangTaskId(taskInstance4.getTaskId());
                            }
                        });
                    } else if (taskInstance3.isHanging()) {
                        feedbackHang(flowInstance, taskInstance3);
                    } else {
                        Optional.ofNullable(getNextTask(flowInstance, taskInstance3.getTaskDef())).ifPresent(taskDef3 -> {
                            getMappedTasks(flowInstance, taskDef3).forEach(taskInstance5 -> {
                            });
                        });
                    }
                    decideResult.getTasksToBeUpdated().add(taskInstance3);
                }
            }
        }
        if (CollectionUtils.isNotEmpty(hashSet)) {
            decideResult.getTasksToBeScheduled().addAll((Collection) linkedHashMap.values().stream().filter(taskInstance5 -> {
                return !hashSet.contains(taskInstance5.getTaskName());
            }).collect(Collectors.toList()));
        } else {
            decideResult.getTasksToBeScheduled().addAll(linkedHashMap.values());
        }
        if (!z && CollectionUtils.isEmpty(decideResult.getTasksToBeScheduled()) && checkForFlowCompletion(flowInstance)) {
            decideResult.setComplete(true);
        }
        return decideResult;
    }

    public List<TaskInstance> getMappedTasks(FlowInstance flowInstance, TaskDef taskDef) {
        return getMappedTasks(flowInstance, null, taskDef);
    }

    public List<TaskInstance> getMappedTasks(FlowInstance flowInstance, TaskInstance taskInstance, TaskDef taskDef) {
        T flowTask = this.flowTaskRegistry.getFlowTask(taskDef.getType());
        int i = 0;
        try {
            Object orElseGet = Optional.ofNullable(taskInstance).map(taskInstance2 -> {
                return ParameterUtils.getTaskInput(taskInstance2, taskDef);
            }).orElseGet(() -> {
                return ParameterUtils.getTaskInput(flowInstance, taskDef);
            });
            if (orElseGet instanceof Map) {
                flowTask.verify(new Configuration((Map) orElseGet));
            }
            i = 3;
            return flowTask.getMappedTasks(TaskMapperContext.builder().flowInstance(flowInstance).taskDef(taskDef).input(flowTask.getInput(orElseGet)).build());
        } catch (ValidationException | IllegalArgumentException e) {
            throw new TerminateException(String.format("Illegal task(%s) input, verify stage:(%d) illegal details:(%s)", taskDef.getName(), Integer.valueOf(i), e.getLocalizedMessage()));
        }
    }

    private void terminateUnsafe(FlowInstance flowInstance, FlowStatus flowStatus, String str, String str2) {
        if (flowInstance.getStatus().isTerminal()) {
            return;
        }
        String flowId = flowInstance.getFlowId();
        flowInstance.setStatus((FlowStatus) Optional.ofNullable(flowStatus).orElse(FlowStatus.TERMINATED));
        flowInstance.setReasonForNotCompleting(str);
        try {
            triggerFailureFlow(flowInstance, str2);
        } catch (Throwable th) {
            log.error(String.format("Failed to start failure flow: %s", str2), th);
        }
        List<String> cancelNonTerminalTasks = cancelNonTerminalTasks(flowInstance);
        if (CollectionUtils.isNotEmpty(cancelNonTerminalTasks)) {
            throw new FlowException(FlowErrorCode.FLOW_EXECUTION_ERROR, String.format("Error canceling flow: %s tasks: %s", flowId, String.join(",", cancelNonTerminalTasks)));
        }
    }

    private void triggerFailureFlow(FlowInstance flowInstance, String str) {
        if (StringUtils.isNotBlank(str)) {
            Optional.ofNullable(this.metadataService.getFlow(str)).ifPresent(flowDef -> {
                Object mappingValue = ParameterUtils.getMappingValue(flowInstance, flowDef.getInput());
                FlowDef copy = flowDef.copy();
                copy.setInput((Object) null);
                flowInstance.setFailureFlowId(startFlow(StartFlowReq.builder().name(str).flowDef(copy).input(mappingValue).build()));
            });
        }
    }

    private List<String> cancelNonTerminalTasks(FlowInstance flowInstance) {
        LinkedList linkedList = new LinkedList();
        if (CollectionUtils.isNotEmpty(flowInstance.getTaskInstances())) {
            for (TaskInstance taskInstance : flowInstance.getTaskInstances()) {
                if (!taskInstance.getStatus().isTerminal() || taskInstance.getStatus().isHanged() || taskInstance.isRetryable()) {
                    TaskDef taskDef = taskInstance.getTaskDef();
                    T flowTask = this.flowTaskRegistry.getFlowTask(taskDef.getType());
                    try {
                        if (!taskInstance.getStatus().isTerminal() || taskInstance.isRetryable()) {
                            taskInstance.setStatus(TaskStatus.CANCELED);
                        }
                        flowTask.cancel(taskInstance);
                        this.taskAspect.onTerminated(taskInstance);
                    } catch (Throwable th) {
                        taskInstance.setReasonForNotCompleting(th.getLocalizedMessage());
                        linkedList.add(taskInstance.getTaskDef().getName());
                        log.error("Error canceling flow task:{}/{} in flow: {}", new Object[]{taskDef.getType(), taskInstance.getTaskId(), flowInstance.getFlowId(), th});
                    }
                    updateTask(taskInstance);
                }
            }
        }
        return linkedList;
    }

    private void checkFlowTimeout(FlowInstance flowInstance) {
        FlowDef.ControlDef controlDef;
        FlowDef flowDef = flowInstance.getFlowDef();
        if (flowDef == null) {
            log.warn("Missing flow definition : {}", flowInstance.getFlowId());
            return;
        }
        if (flowInstance.getStatus().isTerminal() || (controlDef = flowDef.getControlDef()) == null || controlDef.getTimeoutMs() <= 0) {
            return;
        }
        long currentTimeMillis = TimeUtils.currentTimeMillis() - flowInstance.getStartTime();
        if (currentTimeMillis < controlDef.getTimeoutMs()) {
            return;
        }
        String format = String.format("Flow timed out after %d milliseconds. Timeout configured as %d milliseconds. Timeout policy configured to %s", Long.valueOf(currentTimeMillis), Long.valueOf(controlDef.getTimeoutMs()), controlDef.getTimeoutPolicy().name());
        switch (AnonymousClass1.$SwitchMap$xyz$mytang0$brook$common$metadata$definition$FlowDef$TimeoutPolicy[controlDef.getTimeoutPolicy().ordinal()]) {
            case 1:
                log.warn(format);
                return;
            case 2:
                throw new TerminateException(format, FlowStatus.TIMED_OUT);
            default:
                return;
        }
    }

    private void checkTaskTimeout(TaskInstance taskInstance) {
        TaskDef taskDef = taskInstance.getTaskDef();
        if (taskDef == null) {
            log.warn("Missing task definition for task:{} in flow:{}", taskInstance.getTaskId(), taskInstance.getFlowId());
            return;
        }
        TaskDef.ControlDef controlDef = taskDef.getControlDef();
        if (controlDef != null && !taskInstance.getStatus().isTerminal() && controlDef.getTimeoutMs() > 0 && taskInstance.getStartTime() > 0) {
            long timeoutMs = controlDef.getTimeoutMs();
            long currentTimeMillis = TimeUtils.currentTimeMillis() - (taskInstance.getStartTime() + controlDef.getStartDelayMs());
            if (currentTimeMillis < timeoutMs) {
                return;
            }
            String format = String.format("Task timed out after %d milliseconds. Timeout configured as %d milliseconds. Timeout policy configured to %s", Long.valueOf(currentTimeMillis), Long.valueOf(controlDef.getTimeoutMs()), controlDef.getTimeoutPolicy().name());
            switch (AnonymousClass1.$SwitchMap$xyz$mytang0$brook$common$metadata$definition$TaskDef$TimeoutPolicy[controlDef.getTimeoutPolicy().ordinal()]) {
                case 1:
                    log.warn(format);
                    return;
                case 2:
                    taskInstance.setStatus(TaskStatus.TIMED_OUT);
                    taskInstance.setReasonForNotCompleting(format);
                    return;
                case 3:
                    taskInstance.setStatus(TaskStatus.TIMED_OUT);
                    taskInstance.setReasonForNotCompleting(format);
                    throw new TerminateException(format, FlowStatus.TIMED_OUT, taskInstance);
                default:
                    return;
            }
        }
    }

    public TaskDef getTaskDef(FlowInstance flowInstance, String str) {
        FlowDef flowDef = flowInstance.getFlowDef();
        if (flowDef == null || flowDef.getTaskDefs() == null) {
            return null;
        }
        for (TaskDef taskDef : flowDef.getTaskDefs()) {
            if (str.equals(taskDef.getName())) {
                return taskDef;
            }
            if (taskDef.getHangDef() != null) {
                TaskDef determineTaskDef = taskDef.getHangDef().getDetermineTaskDef();
                if (str.equals(determineTaskDef.getName())) {
                    return determineTaskDef;
                }
            }
        }
        return null;
    }

    public TaskDef getNextTask(FlowInstance flowInstance, TaskDef taskDef) {
        FlowDef flowDef = flowInstance.getFlowDef();
        if (flowDef == null || flowDef.getTaskDefs() == null) {
            return null;
        }
        TaskDef nextTask = getNextTask(flowDef.getTaskDefs().iterator(), taskDef);
        while (true) {
            TaskDef taskDef2 = nextTask;
            if (!isTaskSkipped(flowInstance, taskDef2)) {
                return taskDef2;
            }
            nextTask = getNextTask(flowDef.getTaskDefs().iterator(), taskDef2);
        }
    }

    private TaskDef getNextTask(Iterator<TaskDef> it, TaskDef taskDef) {
        while (it.hasNext()) {
            TaskDef next = it.next();
            if (taskDef.getName().equals(next.getName())) {
                break;
            }
            TaskDef nextTask = getNextTask(next, taskDef);
            if (nextTask != null) {
                return nextTask;
            }
        }
        if (it.hasNext()) {
            return it.next();
        }
        return null;
    }

    public TaskDef getNextTask(TaskDef taskDef, TaskDef taskDef2) {
        return this.flowTaskRegistry.getFlowTask(taskDef.getType()).next(taskDef, taskDef2);
    }

    private boolean isTaskSkipped(FlowInstance flowInstance, TaskDef taskDef) {
        TaskDef.SkipDef skipDef;
        boolean z = false;
        if (taskDef != null) {
            try {
                TaskDef.ControlDef controlDef = taskDef.getControlDef();
                if (controlDef != null && (skipDef = controlDef.getSkipDef()) != null && StringUtils.isNotBlank(skipDef.getSkipCondition())) {
                    z = flowInstance.getSkipTasks().contains(taskDef.getName());
                    if (!z) {
                        z = BooleanUtils.toBoolean(String.valueOf(this.engineActuator.compute((String) Optional.ofNullable(skipDef.getEngineType()).orElse(FlowConstants.DEFAULT_ENGINE_TYPE), skipDef.getSkipCondition(), ParameterUtils.flowContext(flowInstance))));
                        if (z) {
                            flowInstance.getSkipTasks().add(taskDef.getName());
                        }
                    }
                }
            } catch (Throwable th) {
                throw new TerminateException(th);
            }
        }
        return z;
    }

    private TaskInstance getRetryTask(TaskInstance taskInstance) {
        FlowStatus flowStatus;
        if (TimeUtils.currentTimeMillis() < taskInstance.getRetryTime()) {
            return null;
        }
        int i = 0;
        TaskDef.ControlDef controlDef = taskInstance.getTaskDef().getControlDef();
        if (controlDef != null) {
            i = controlDef.getRetryCount();
        }
        int retryCount = taskInstance.getRetryCount();
        if (taskInstance.getStatus().isRetryable() && i > retryCount) {
            if (!$assertionsDisabled && controlDef == null) {
                throw new AssertionError();
            }
            long j = 0;
            switch (AnonymousClass1.$SwitchMap$xyz$mytang0$brook$common$metadata$definition$TaskDef$RetryLogic[controlDef.getRetryLogic().ordinal()]) {
                case 1:
                    j = controlDef.getRetryDelayMs();
                    break;
                case 2:
                    long retryDelayMs = controlDef.getRetryDelayMs() * ((long) Math.pow(2.0d, taskInstance.getRetryCount()));
                    j = retryDelayMs < 0 ? 2147483647L : retryDelayMs;
                    break;
            }
            return taskToBeRescheduled(taskInstance, j);
        }
        String reasonForNotCompleting = taskInstance.getReasonForNotCompleting();
        switch (AnonymousClass1.$SwitchMap$xyz$mytang0$brook$common$metadata$enums$TaskStatus[taskInstance.getStatus().ordinal()]) {
            case 1:
                flowStatus = FlowStatus.TERMINATED;
                break;
            case 2:
                flowStatus = FlowStatus.TIMED_OUT;
                break;
            case 3:
                reasonForNotCompleting = String.format("The number of retries %d has been exhausted", Integer.valueOf(retryCount));
                flowStatus = FlowStatus.TIMED_OUT;
                taskInstance.setStatus(TaskStatus.TIMED_OUT);
                break;
            default:
                flowStatus = FlowStatus.FAILED;
                break;
        }
        throw new TerminateException(reasonForNotCompleting, flowStatus, taskInstance);
    }

    private static String deduplicateKey(TaskInstance taskInstance) {
        return taskInstance.getTaskDef().getName();
    }

    private List<TaskInstance> scheduleTasks(FlowInstance flowInstance, List<TaskInstance> list) {
        ArrayList arrayList = new ArrayList();
        List<TaskInstance> deduplicateTasks = deduplicateTasks(flowInstance, list);
        if (CollectionUtils.isEmpty(deduplicateTasks)) {
            return list;
        }
        try {
            List<TaskInstance> createTasks = createTasks(deduplicateTasks);
            if (CollectionUtils.isNotEmpty(createTasks)) {
                createTasks.forEach(taskInstance -> {
                    if (needToQueue(taskInstance)) {
                        arrayList.add(taskInstance);
                    }
                });
                flowInstance.getTaskInstances().addAll(createTasks);
            }
            if (CollectionUtils.isNotEmpty(arrayList)) {
                addToQueue(arrayList);
                list.removeAll(arrayList);
            }
            return list;
        } catch (Throwable th) {
            String format = String.format("Error scheduling tasks: %s, for flow: %s", list.stream().map((v0) -> {
                return v0.getTaskId();
            }).collect(Collectors.toList()), flowInstance.getFlowId());
            log.error(format, th);
            throw new TerminateException(format);
        }
    }

    private List<TaskInstance> deduplicateTasks(FlowInstance flowInstance, List<TaskInstance> list) {
        if (CollectionUtils.isEmpty(list)) {
            return Collections.emptyList();
        }
        List list2 = (List) flowInstance.getTaskInstances().stream().map(FlowExecutor::deduplicateKey).collect(Collectors.toList());
        List<TaskInstance> list3 = list;
        if (CollectionUtils.isNotEmpty(list2)) {
            list3 = (List) list.stream().filter(taskInstance -> {
                return !list2.contains(deduplicateKey(taskInstance));
            }).collect(Collectors.toList());
        }
        return list3;
    }

    private boolean needToQueue(TaskInstance taskInstance) {
        return taskInstance.getStartDelayMs() > 0;
    }

    private void addToQueue(List<TaskInstance> list) {
        getQueueService().offer(QueueUtils.getTaskDelayQueueName(), (List) list.stream().map(taskInstance -> {
            QueueMessage queueMessage = new QueueMessage();
            queueMessage.setType(taskInstance.getTaskName());
            queueMessage.setId(Joiner.on("@").join(taskInstance.getTaskId(), Integer.valueOf(taskInstance.getRetryCount()), new Object[0]));
            queueMessage.setDelayMs(taskInstance.getStartDelayMs());
            return queueMessage;
        }).collect(Collectors.toList()));
    }

    private boolean executeUnsafe(TaskInstance taskInstance) {
        if (taskInstance.getStatus().isFinished()) {
            log.warn("Task instance:{} finished", taskInstance.getTaskId());
            return false;
        }
        if (taskInstance.getStatus().isScheduled() && checkConcurrency(taskInstance)) {
            TaskInstance retryTask = getRetryTask(taskInstance);
            taskInstance.setReasonForNotCompleting(taskInstance.getReasonForNotCompleting() + "(concurrency limit)");
            addToQueue(Collections.singletonList(retryTask));
            return true;
        }
        log.info("Start execute task: {} {}", taskInstance.getTaskName(), taskInstance.getTaskId());
        try {
            try {
                FlowContext.setCurrentTask(taskInstance);
                if (taskInstance.getStatus().isScheduled()) {
                    this.taskAspect.onCreated(taskInstance);
                }
                taskInstance.setStatus(TaskStatus.IN_PROGRESS);
                if (0 == taskInstance.getStartTime()) {
                    taskInstance.setStartTime(TimeUtils.currentTimeMillis());
                }
                boolean isEnableCache = isEnableCache(taskInstance);
                String str = null;
                FlowCache flowCache = null;
                if (isEnableCache) {
                    str = getCacheKey(taskInstance);
                    flowCache = this.flowCacheFactory.getDefaultCache();
                    Object obj = flowCache.get(str);
                    if (obj != null) {
                        taskInstance.setOutput(obj);
                        taskInstance.setStatus(TaskStatus.COMPLETED);
                        taskInstance.setEndTime(TimeUtils.currentTimeMillis());
                        log.info("Finish execute task: {} {}", taskInstance.getTaskName(), taskInstance.getTaskId());
                        FlowContext.removeCurrentTask();
                        return true;
                    }
                }
                boolean execute = this.flowTaskRegistry.getFlowTask(taskInstance.getTaskDef().getType()).execute(taskInstance);
                if (execute) {
                    buildLink(taskInstance);
                    taskInstance.setOutput(ParameterUtils.getTaskOutput(taskInstance));
                    if (taskInstance.getStatus().isCompleted()) {
                        if (isEnableCache) {
                            flowCache.put(str, taskInstance.getOutput());
                        } else {
                            checkAndHang(taskInstance);
                        }
                    }
                }
                return execute;
            } catch (TerminateException e) {
                e.setTaskInstance(taskInstance);
                throw e;
            }
        } finally {
            taskInstance.setEndTime(TimeUtils.currentTimeMillis());
            log.info("Finish execute task: {} {}", taskInstance.getTaskName(), taskInstance.getTaskId());
            FlowContext.removeCurrentTask();
        }
    }

    private boolean isEnableCache(TaskInstance taskInstance) {
        TaskDef.ControlDef controlDef = taskInstance.getTaskDef().getControlDef();
        if (controlDef == null || controlDef.getEnableCache() == null) {
            return false;
        }
        return controlDef.getEnableCache().booleanValue();
    }

    private String getCacheKey(TaskInstance taskInstance) {
        return taskInstance.getTaskName() + ":" + JsonUtils.toJsonString(taskInstance.getInput());
    }

    private void buildLink(TaskInstance taskInstance) {
        TaskDef.LinkDef linkDef = taskInstance.getTaskDef().getLinkDef();
        if (linkDef == null || taskInstance.getLink() != null) {
            return;
        }
        try {
            TaskInstance.Link link = new TaskInstance.Link();
            link.setTitle(linkDef.getTitle());
            Optional.ofNullable(ParameterUtils.getMappingValue(FlowContext.getCurrentFlow(), linkDef.getUrl())).ifPresent(obj -> {
                link.setUrl(String.valueOf(obj));
            });
            taskInstance.setLink(link);
        } catch (Throwable th) {
        }
    }

    private void checkAndHang(TaskInstance taskInstance) {
        if (taskInstance.getTaskDef().getCheckDef() != null) {
            if (!checkSuccess(taskInstance)) {
                log.info("Business failure, reason: {}", taskInstance.getReasonForNotCompleting());
                taskInstance.setStatus(TaskStatus.FAILED_WITH_TERMINAL_ERROR);
                return;
            } else if (checkRetry(taskInstance)) {
                log.info("The retry condition is met, waiting for retry, current {} retries, taskName: {} taskId: {}", new Object[]{Integer.valueOf(taskInstance.getRetryCount()), taskInstance.getTaskName(), taskInstance.getTaskId()});
                taskInstance.setStatus(TaskStatus.RETRIED);
                taskInstance.setReasonForNotCompleting("Waiting for retry");
                return;
            }
        }
        if (taskInstance.getTaskDef().getHangDef() != null) {
            processHang(taskInstance);
        }
    }

    private boolean checkSuccess(TaskInstance taskInstance) {
        TaskDef.CheckDef checkDef = taskInstance.getTaskDef().getCheckDef();
        TaskDef.SuccessDef successDef = checkDef.getSuccessDef();
        if (successDef == null || !StringUtils.isNotBlank(successDef.getSuccessCondition())) {
            return true;
        }
        boolean z = BooleanUtils.toBoolean(String.valueOf(this.engineActuator.compute((String) Optional.ofNullable(checkDef.getEngineType()).orElse(FlowConstants.DEFAULT_ENGINE_TYPE), successDef.getSuccessCondition(), taskInstance.getOutput())));
        if (!z && StringUtils.isNotBlank(successDef.getFailureReasonExpression())) {
            taskInstance.setReasonForNotCompleting(JsonUtils.toJsonString(this.engineActuator.compute((String) Optional.ofNullable(checkDef.getEngineType()).orElse(FlowConstants.DEFAULT_ENGINE_TYPE), successDef.getFailureReasonExpression(), taskInstance.getOutput())));
        }
        return z;
    }

    private boolean checkRetry(TaskInstance taskInstance) {
        TaskDef.CheckDef checkDef = taskInstance.getTaskDef().getCheckDef();
        TaskDef.RetryDef retryDef = checkDef.getRetryDef();
        if (retryDef != null && StringUtils.isNotBlank(retryDef.getRetryCondition())) {
            return BooleanUtils.toBoolean(String.valueOf(this.engineActuator.compute((String) Optional.ofNullable(checkDef.getEngineType()).orElse(FlowConstants.DEFAULT_ENGINE_TYPE), retryDef.getRetryCondition(), taskInstance.getOutput())));
        }
        return false;
    }

    private void processHang(TaskInstance taskInstance) {
        TaskDef.HangDef hangDef = taskInstance.getTaskDef().getHangDef();
        if (hangDef == null || hangDef.getDetermineTaskDef() == null) {
            return;
        }
        taskInstance.setStatus(TaskStatus.HANGED);
    }

    private void feedbackHang(FlowInstance flowInstance, TaskInstance taskInstance) {
        Optional filter = Optional.ofNullable(taskInstance.getParentTaskId()).filter((v0) -> {
            return StringUtils.isNotBlank(v0);
        });
        flowInstance.getClass();
        filter.flatMap(flowInstance::getTaskById).ifPresent(taskInstance2 -> {
            taskInstance2.setExecuted(false);
            if (taskInstance.getStatus().isCompleted()) {
                taskInstance2.setStatus(TaskStatus.COMPLETED);
            } else {
                taskInstance2.setStatus(TaskStatus.FAILED_WITH_TERMINAL_ERROR);
                taskInstance2.setReasonForNotCompleting((String) Optional.ofNullable(taskInstance.getReasonForNotCompleting()).orElse("Hanged"));
            }
            if (taskInstance2.getTaskDef().getHangDef().isFeedbackOutput()) {
                taskInstance2.setOutput(taskInstance.getOutput());
            }
            this.taskAspect.onTerminated(taskInstance2);
            updateTask(taskInstance2);
        });
    }

    private void exceptionHandler(FlowInstance flowInstance, Throwable th) {
        if (th instanceof TerminateException) {
            terminateExceptionHandler(flowInstance, (TerminateException) th);
            return;
        }
        if (th instanceof IllegalArgumentException) {
            terminateExceptionHandler(flowInstance, new TerminateException(th));
        } else if (!(th instanceof BizException)) {
            terminateUnsafe(flowInstance, FlowStatus.FAILED, ExceptionUtils.getMessage(th), flowInstance.getFlowDef().getFailureFlowName());
        } else {
            flowInstance.setStatus(FlowStatus.FAILED);
            flowInstance.setReasonForNotCompleting(th.getMessage());
        }
    }

    private void terminateExceptionHandler(FlowInstance flowInstance, TerminateException terminateException) {
        Optional.ofNullable(terminateException.getTaskInstance()).ifPresent(taskInstance -> {
            if (taskInstance.isHanging()) {
                feedbackHang(flowInstance, taskInstance);
            }
            this.taskAspect.onTerminated(taskInstance);
            updateTask(taskInstance);
        });
        terminateUnsafe(flowInstance, terminateException.getFlowStatus(), terminateException.getLocalizedMessage(), flowInstance.getFlowDef().getFailureFlowName());
    }

    private void terminal(FlowInstance flowInstance) {
        if (flowInstance.getStatus().isTerminal()) {
            if (!flowInstance.getStatus().isSuccessful()) {
                cancelNonTerminalTasks(flowInstance);
            }
            try {
                updateOutput(flowInstance);
                this.flowAspect.onTerminated(flowInstance);
                updateParentFlow(flowInstance);
                updateFlow(flowInstance);
            } finally {
                this.flowLockFacade.deleteLock(flowInstance.getFlowId());
            }
        }
    }

    private void updateOutput(FlowInstance flowInstance) {
        flowInstance.setOutput(ParameterUtils.getFlowOutput(flowInstance));
    }

    private void completeFlow(FlowInstance flowInstance) {
        if (flowInstance.getStatus().equals(FlowStatus.COMPLETED)) {
            return;
        }
        if (flowInstance.getStatus().isTerminal()) {
            throw new FlowException(FlowErrorCode.FLOW_EXECUTION_CONFLICT, "Flow is already in terminal state. Current status: " + flowInstance.getStatus());
        }
        flowInstance.setStatus(FlowStatus.COMPLETED);
    }

    public void createFlow(FlowInstance flowInstance) {
        this.flowAspect.onCreating(flowInstance);
        getExecutionDAO(flowInstance).createFlow(flowInstance);
        this.flowAspect.onCreated(flowInstance);
        log.info("Create new flow instance: {} {}", flowInstance.getFlowName(), flowInstance.getFlowId());
    }

    private void updateFlow(FlowInstance flowInstance) {
        flowInstance.setLastUpdated(TimeUtils.currentTimeMillis());
        if (flowInstance.getStatus().isTerminal()) {
            flowInstance.setEndTime(TimeUtils.currentTimeMillis());
        }
        getExecutionDAO().updateFlow(flowInstance);
    }

    private void updateParentFlow(FlowInstance flowInstance) {
        if (StringUtils.isBlank(flowInstance.getParentFlowId()) || StringUtils.isBlank(flowInstance.getParentTaskId())) {
            return;
        }
        TaskResult taskResult = new TaskResult(flowInstance.getParentFlowId(), flowInstance.getParentTaskId(), convertStatus(flowInstance.getStatus()));
        taskResult.setOutput(flowInstance.getOutput());
        taskResult.setReasonForNotCompleting(flowInstance.getReasonForNotCompleting());
        updateTask(taskResult);
    }

    private TaskStatus convertStatus(FlowStatus flowStatus) {
        switch (AnonymousClass1.$SwitchMap$xyz$mytang0$brook$common$metadata$enums$FlowStatus[flowStatus.ordinal()]) {
            case 1:
                return TaskStatus.COMPLETED;
            case 2:
            case 3:
            case 4:
                return TaskStatus.FAILED;
            case 5:
                return TaskStatus.TIMED_OUT;
            case 6:
                return TaskStatus.IN_PROGRESS;
            default:
                return TaskStatus.FAILED;
        }
    }

    public TaskInstance getTaskByName(String str, String str2) {
        return getExecutionDAO().getTaskByName(str, str2);
    }

    public void updateTask(TaskResult taskResult) {
        if (!this.flowLockFacade.acquireLock(taskResult.getFlowId())) {
            throw new FlowException(FlowErrorCode.FLOW_EXECUTION_ERROR, String.format("Error acquiring lock when update task result, flowId: %s taskId: %s", taskResult.getFlowId(), taskResult.getTaskId()));
        }
        try {
            updateTaskUnsafe(taskResult);
        } finally {
            this.flowLockFacade.releaseLock(taskResult.getFlowId());
        }
    }

    private void updateTaskUnsafe(TaskResult taskResult) {
        FlowInstance flow = getFlow(taskResult.getFlowId());
        if (flow == null) {
            throw new FlowException(FlowErrorCode.FLOW_NOT_EXIST, String.format("No such flow found by flowId: %s", taskResult.getFlowId()));
        }
        if (flow.getStatus().isTerminal()) {
            log.info("Flow: {} has already finished execution. Task update for: {} ignored.", taskResult.getFlowId(), taskResult.getTaskId());
            return;
        }
        fillDef(flow);
        TaskInstance taskInstance = (TaskInstance) flow.getTaskById(taskResult.getTaskId()).orElseThrow(() -> {
            return new FlowException(FlowErrorCode.TASK_NOT_EXIST, String.format("No such task found by taskId: %s", taskResult.getTaskId()));
        });
        taskInstance.setOutput(taskResult.getOutput());
        taskInstance.setStatus(taskResult.getStatus());
        taskInstance.setProgress(taskResult.getProgress());
        taskInstance.setReasonForNotCompleting(taskResult.getReasonForNotCompleting());
        taskInstance.setEndTime(TimeUtils.currentTimeMillis());
        this.taskAspect.onTerminated(taskInstance);
        updateTask(taskInstance);
        executePerfectlyUnsafe(flow);
    }

    private List<TaskInstance> createTasks(List<TaskInstance> list) {
        return getExecutionDAO().createTasks(list);
    }

    private void updateTasks(List<TaskInstance> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        list.forEach(this::fillTask);
        try {
            getExecutionDAO().updateTasks(list);
        } catch (Throwable th) {
            log.error(String.format("Error updating tasks: %s in flow: %s", Joiner.on(",").join((Iterable) list.stream().map((v0) -> {
                return v0.getTaskId();
            }).collect(Collectors.toList())), list.get(0).getFlowId()), th);
            throw th;
        }
    }

    private void updateTask(TaskInstance taskInstance) {
        try {
            getExecutionDAO().updateTask(fillTask(taskInstance));
        } catch (Throwable th) {
            log.error(String.format("Error updating task: %s in flow: %s", taskInstance.getTaskId(), taskInstance.getFlowId()), th);
            throw th;
        }
    }

    private void deleteTask(String str) {
        getExecutionDAO().deleteTask(str);
    }

    private void fillDef(FlowInstance flowInstance) {
        if (flowInstance.getFlowDef() == null) {
            Optional ofNullable = Optional.ofNullable(this.metadataService.getFlow(flowInstance.getFlowName()));
            flowInstance.getClass();
            ofNullable.ifPresent(flowInstance::setFlowDef);
        }
    }

    private TaskInstance fillTask(TaskInstance taskInstance) {
        if (taskInstance.getStatus() != null) {
            taskInstance.setLastUpdated(TimeUtils.currentTimeMillis());
            if (taskInstance.getStatus().isTerminal() && taskInstance.getEndTime() == 0) {
                taskInstance.setEndTime(TimeUtils.currentTimeMillis());
            }
        }
        return taskInstance;
    }

    private boolean checkForFlowCompletion(FlowInstance flowInstance) {
        if (CollectionUtils.isEmpty(flowInstance.getTaskInstances())) {
            return false;
        }
        HashMap hashMap = new HashMap();
        flowInstance.getTaskInstances().forEach(taskInstance -> {
        });
        if (((Stream) flowInstance.getFlowDef().getTaskDefs().stream().parallel()).allMatch(taskDef -> {
            if (flowInstance.getSkipTasks().contains(taskDef.getName())) {
                return true;
            }
            TaskStatus taskStatus = (TaskStatus) hashMap.get(taskDef.getName());
            return taskStatus != null && taskStatus.isSuccessful() && taskStatus.isTerminal();
        }) && hashMap.values().stream().allMatch((v0) -> {
            return v0.isTerminal();
        })) {
            return ((Stream) flowInstance.getTaskInstances().stream().parallel()).noneMatch(taskInstance2 -> {
                TaskDef nextTask = getNextTask(flowInstance, taskInstance2.getTaskDef());
                return (nextTask == null || hashMap.containsKey(nextTask.getName())) ? false : true;
            });
        }
        return false;
    }

    private static FlowInstance newFlowInstance(StartFlowReq startFlowReq) {
        FlowDef flowDef = (FlowDef) Objects.requireNonNull(startFlowReq.getFlowDef());
        FlowInstance create = FlowInstance.create(flowDef);
        create.setParentFlowId(startFlowReq.getParentFlowId());
        create.setParentTaskId(startFlowReq.getParentTaskId());
        create.setCorrelationId(startFlowReq.getCorrelationId());
        create.setInput(ParameterUtils.getFlowInput(flowDef, startFlowReq.getInput(), startFlowReq.getExtension()));
        create.setExtension(startFlowReq.getExtension());
        return create;
    }

    private static long remainWaitTime(long j, long j2) {
        long currentTimeMillis = TimeUtils.currentTimeMillis() - j2;
        return currentTimeMillis < 0 ? j : j - currentTimeMillis;
    }

    private void checkConcurrency(FlowInstance flowInstance) {
        Integer concurrencyLimit;
        FlowDef flowDef = flowInstance.getFlowDef();
        if (flowDef == null || flowDef.getControlDef() == null || (concurrencyLimit = flowDef.getControlDef().getConcurrencyLimit()) == null || 0 >= concurrencyLimit.intValue()) {
            return;
        }
        List runningFlowIds = getExecutionDAO().getRunningFlowIds(flowDef.getName());
        if (CollectionUtils.isNotEmpty(runningFlowIds) && concurrencyLimit.intValue() < runningFlowIds.size()) {
            throw new FlowException(FlowErrorCode.CONCURRENCY_LIMIT, String.format("Trigger concurrency limit, limit: %d current: %d", concurrencyLimit, Integer.valueOf(runningFlowIds.size())));
        }
    }

    private boolean checkConcurrency(TaskInstance taskInstance) {
        Integer concurrencyLimit;
        TaskDef taskDef = taskInstance.getTaskDef();
        if (taskDef == null || taskDef.getControlDef() == null || (concurrencyLimit = taskDef.getControlDef().getConcurrencyLimit()) == null || 0 >= concurrencyLimit.intValue()) {
            return false;
        }
        List runningTaskIds = getExecutionDAO().getRunningTaskIds(taskDef.getName());
        if (!CollectionUtils.isNotEmpty(runningTaskIds) || concurrencyLimit.intValue() >= runningTaskIds.size() || runningTaskIds.contains(taskInstance.getTaskId())) {
            return false;
        }
        log.warn("Task: {} trigger concurrency limit, limit: {} current: {}", new Object[]{taskInstance.getTaskId(), concurrencyLimit, Integer.valueOf(runningTaskIds.size())});
        return true;
    }

    private FlowInstance findLastFailedIfAny(FlowInstance flowInstance) {
        return (FlowInstance) flowInstance.getTaskInstances().stream().filter(taskInstance -> {
            return taskInstance.getStatus().isUnsuccessfullyTerminated();
        }).findFirst().map(taskInstance2 -> {
            return findFirstFailedIfAny(taskInstance2, flowInstance);
        }).orElse(null);
    }

    private FlowInstance findFirstFailedIfAny(TaskInstance taskInstance, FlowInstance flowInstance) {
        if (StringUtils.isNotBlank(taskInstance.getSubFlowId()) && taskInstance.getStatus().isTerminal() && !taskInstance.getStatus().isSuccessful()) {
            FlowInstance flow = getFlow(taskInstance.getSubFlowId());
            Optional findFirst = flow.getTaskInstances().stream().filter(taskInstance2 -> {
                return taskInstance2.getStatus().isUnsuccessfullyTerminated();
            }).findFirst();
            if (findFirst.isPresent()) {
                return findFirstFailedIfAny((TaskInstance) findFirst.get(), flow);
            }
        }
        return flowInstance;
    }

    private ExecutionDAO getExecutionDAO() {
        return getExecutionDAO(null);
    }

    private ExecutionDAO getExecutionDAO(FlowInstance flowInstance) {
        String protocol = this.executionProperties.getProtocol();
        if (flowInstance == null) {
            flowInstance = FlowContext.getCurrentFlow();
        }
        if (flowInstance != null && flowInstance.getFlowDef() != null) {
            FlowDef flowDef = flowInstance.getFlowDef();
            if (flowDef.getControlDef() != null) {
                FlowDef.ControlDef controlDef = flowDef.getControlDef();
                if (StringUtils.isNotBlank(controlDef.getExecutionProtocol())) {
                    protocol = controlDef.getExecutionProtocol();
                }
            }
        }
        return (ExecutionDAO) ExtensionLoader.getExtension(ExecutionDAO.class, protocol);
    }

    private QueueService getQueueService() {
        String protocol = this.queueProperties.getProtocol();
        FlowInstance currentFlow = FlowContext.getCurrentFlow();
        if (currentFlow != null && currentFlow.getFlowDef() != null) {
            FlowDef flowDef = currentFlow.getFlowDef();
            if (flowDef.getControlDef() != null) {
                FlowDef.ControlDef controlDef = flowDef.getControlDef();
                if (StringUtils.isNotBlank(controlDef.getQueueProtocol())) {
                    protocol = controlDef.getQueueProtocol();
                }
            }
        }
        QueueService queueService = (QueueService) ExtensionLoader.getExtension(QueueService.class, protocol);
        if (queueService != null) {
            DelayedTaskMonitor.init(queueService, this, this.flowLockFacade, this.delayedTaskMonitorProperties);
        }
        return queueService;
    }

    static {
        $assertionsDisabled = !FlowExecutor.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(FlowExecutor.class);
    }
}
