/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.jdbc.runner;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.flows.Concurrency;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.ExecutableUtils;
import io.kestra.core.runners.ExecutionDelay;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.core.runners.ExecutionRunning;
import io.kestra.core.runners.Executor;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.core.runners.ExecutorService;
import io.kestra.core.runners.ExecutorState;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.runners.SubflowExecution;
import io.kestra.core.runners.SubflowExecutionResult;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.runners.WorkerTask;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.runners.WorkerTaskRunning;
import io.kestra.core.runners.WorkerTrigger;
import io.kestra.core.runners.WorkerTriggerRunning;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.services.AbstractFlowTriggerService;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.LogService;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.TaskDefaultService;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.storages.Storage;
import io.kestra.core.tasks.flows.ForEachItem;
import io.kestra.core.tasks.flows.Template;
import io.kestra.core.topologies.FlowTopologyService;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.Either;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.JdbcMapper;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
import io.kestra.jdbc.repository.AbstractJdbcFlowTopologyRepository;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.kestra.jdbc.runner.AbstractJdbcExecutionDelayStorage;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.jdbc.runner.AbstractJdbcSubflowExecutionStorage;
import io.kestra.jdbc.runner.JdbcQueue;
import io.kestra.jdbc.runner.JdbcRunnerEnabled;
import io.kestra.jdbc.runner.JdbcServiceLivenessCoordinator;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.transaction.exceptions.CannotCreateTransactionException;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Generated;
import org.apache.commons.lang3.tuple.Pair;
import org.jooq.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

@Singleton
@JdbcRunnerEnabled
public class JdbcExecutor
implements ExecutorInterface,
Service {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(JdbcExecutor.class);
    private static final ObjectMapper MAPPER = JdbcMapper.of();
    private final ScheduledExecutorService scheduledDelay = Executors.newSingleThreadScheduledExecutor();
    @Inject
    private ApplicationContext applicationContext;
    @Inject
    private AbstractJdbcExecutionRepository executionRepository;
    @Inject
    @Named(value="executionQueue")
    private QueueInterface<Execution> executionQueue;
    @Inject
    @Named(value="workerJobQueue")
    private QueueInterface<WorkerJob> workerTaskQueue;
    @Inject
    @Named(value="workerTaskResultQueue")
    private QueueInterface<WorkerTaskResult> workerTaskResultQueue;
    @Inject
    @Named(value="workerTaskLogQueue")
    private QueueInterface<LogEntry> logQueue;
    @Inject
    @Named(value="flowQueue")
    private QueueInterface<Flow> flowQueue;
    @Inject
    @Named(value="executionKilledQueue")
    protected QueueInterface<ExecutionKilled> killQueue;
    @Inject
    @Named(value="subflowExecutionResultQueue")
    private QueueInterface<SubflowExecutionResult> subflowExecutionResultQueue;
    @Inject
    private RunContextFactory runContextFactory;
    @Inject
    private TaskDefaultService taskDefaultService;
    @Inject
    private Optional<Template.TemplateExecutorInterface> templateExecutorInterface;
    @Inject
    private ExecutorService executorService;
    @Inject
    private ConditionService conditionService;
    @Inject
    private MultipleConditionStorageInterface multipleConditionStorage;
    @Inject
    private AbstractFlowTriggerService flowTriggerService;
    @Inject
    private MetricRegistry metricRegistry;
    @Inject
    protected FlowListenersInterface flowListeners;
    @Inject
    private AbstractJdbcSubflowExecutionStorage subflowExecutionStorage;
    @Inject
    private ExecutionService executionService;
    @Inject
    private AbstractJdbcExecutionDelayStorage executionDelayStorage;
    @Inject
    private AbstractJdbcExecutionQueuedStorage executionQueuedStorage;
    @Inject
    private AbstractJdbcExecutorStateStorage executorStateStorage;
    @Inject
    private FlowTopologyService flowTopologyService;
    protected List<Flow> allFlows;
    @Inject
    private WorkerGroupService workerGroupService;
    @Inject
    private SkipExecutionService skipExecutionService;
    @Inject
    private AbstractJdbcWorkerJobRunningRepository workerJobRunningRepository;
    @Inject
    private LogService logService;
    private final FlowRepositoryInterface flowRepository;
    private final JdbcServiceLivenessCoordinator serviceLivenessCoordinator;
    private final ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher;
    private final AbstractJdbcFlowTopologyRepository flowTopologyRepository;
    private final String id = IdUtils.create();
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final AtomicReference<Service.ServiceState> state = new AtomicReference();

    @Inject
    public JdbcExecutor(JdbcServiceLivenessCoordinator serviceLivenessCoordinator, FlowRepositoryInterface flowRepository, AbstractJdbcFlowTopologyRepository flowTopologyRepository, ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher) {
        this.serviceLivenessCoordinator = serviceLivenessCoordinator;
        this.flowRepository = flowRepository;
        this.flowTopologyRepository = flowTopologyRepository;
        this.eventPublisher = eventPublisher;
    }

    public void run() {
        this.setState(Service.ServiceState.CREATED);
        this.serviceLivenessCoordinator.setExecutor(this);
        this.flowListeners.run();
        this.flowListeners.listen(flows -> {
            this.allFlows = flows;
        });
        Await.until(() -> this.allFlows != null, (Duration)Duration.ofMillis(100L), (Duration)Duration.ofMinutes(5L));
        this.executionQueue.receive(Executor.class, this::executionQueue);
        this.workerTaskResultQueue.receive(Executor.class, this::workerTaskResultQueue);
        this.killQueue.receive(Executor.class, this::killQueue);
        this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue);
        ScheduledFuture<?> scheduledDelayFuture = this.scheduledDelay.scheduleAtFixedRate(this::executionDelaySend, 0L, 1L, TimeUnit.SECONDS);
        Thread scheduledDelayExceptionThread = new Thread(() -> {
            block2: {
                Await.until(scheduledDelayFuture::isDone);
                try {
                    scheduledDelayFuture.get();
                }
                catch (InterruptedException | ExecutionException e) {
                    if (e.getCause().getClass() == CannotCreateTransactionException.class) break block2;
                    log.error("Executor fatal exception in the scheduledDelay thread", (Throwable)e);
                    this.close();
                    KestraContext.getContext().shutdown();
                }
            }
        }, "jdbc-delay-exception-watcher");
        scheduledDelayExceptionThread.start();
        this.flowQueue.receive(FlowTopology.class, either -> {
            Flow flow;
            if (either.isRight()) {
                log.error("Unable to deserialize a flow: {}", (Object)((DeserializationException)((Object)((Object)either.getRight()))).getMessage());
                try {
                    JsonNode jsonNode = MAPPER.readTree(((DeserializationException)((Object)((Object)either.getRight()))).getRecord());
                    flow = (Flow)FlowWithException.from((JsonNode)jsonNode, (Exception)((Exception)either.getRight())).orElseThrow(IOException::new);
                }
                catch (IOException e) {
                    log.error("Unexpected exception when trying to handle a deserialization error", (Throwable)e);
                    return;
                }
            } else {
                flow = (Flow)either.getLeft();
            }
            this.flowTopologyRepository.save(flow, (flow.isDeleted() ? Stream.empty() : this.flowTopologyService.topology(flow, this.allFlows.stream())).distinct().collect(Collectors.toList()));
        });
        this.setState(Service.ServiceState.RUNNING);
    }

    void reEmitWorkerJobsForWorkers(Configuration configuration, List<String> ids) {
        this.workerJobRunningRepository.getWorkerJobWithWorkerDead(configuration.dsl(), ids).forEach(workerJobRunning -> {
            if (workerJobRunning instanceof WorkerTaskRunning) {
                WorkerTaskRunning workerTaskRunning = (WorkerTaskRunning)workerJobRunning;
                if (this.skipExecutionService.skipExecution(workerTaskRunning.getTaskRun().getExecutionId())) {
                    log.warn("Skipping execution {}", (Object)workerTaskRunning.getTaskRun().getId());
                    this.workerJobRunningRepository.deleteByKey(workerTaskRunning.uid());
                } else {
                    this.workerTaskQueue.emit((Object)WorkerTask.builder().taskRun(workerTaskRunning.getTaskRun()).task(workerTaskRunning.getTask()).runContext(workerTaskRunning.getRunContext()).build());
                    this.logService.logTaskRun(workerTaskRunning.getTaskRun(), log, Level.WARN, "Re-emitting WorkerTask.", new Object[0]);
                }
            }
            if (workerJobRunning instanceof WorkerTriggerRunning) {
                WorkerTriggerRunning workerTriggerRunning = (WorkerTriggerRunning)workerJobRunning;
                this.workerTaskQueue.emit((Object)WorkerTrigger.builder().trigger(workerTriggerRunning.getTrigger()).conditionContext(workerTriggerRunning.getConditionContext()).triggerContext(workerTriggerRunning.getTriggerContext()).build());
                this.logService.logTrigger(workerTriggerRunning.getTriggerContext(), log, Level.WARN, "Re-emitting WorkerTrigger.", new Object[0]);
            }
        });
    }

    private void executionQueue(Either<Execution, DeserializationException> either) {
        if (either.isRight()) {
            log.error("Unable to deserialize an execution: {}", (Object)((DeserializationException)((Object)either.getRight())).getMessage());
            return;
        }
        Execution message = (Execution)either.getLeft();
        if (this.skipExecutionService.skipExecution(message.getId())) {
            log.warn("Skipping execution {}", (Object)message.getId());
            return;
        }
        Executor result = this.executionRepository.lock(message.getId(), pair -> {
            Execution execution = (Execution)pair.getLeft();
            ExecutorState executorState = (ExecutorState)pair.getRight();
            Flow flow = this.transform(this.flowRepository.findByExecution(execution), execution);
            Executor executor = new Executor(execution, null).withFlow(flow);
            if (execution.getState().getCurrent() == State.Type.CREATED && flow.getConcurrency() != null) {
                ExecutionCount count = this.executionRepository.executionCounts(flow.getTenantId(), List.of(new io.kestra.core.models.executions.statistics.Flow(flow.getNamespace(), flow.getId())), List.of(State.Type.RUNNING, State.Type.PAUSED), null, null).get(0);
                if ((executor = this.executorService.checkConcurrencyLimit(executor, flow, execution, count.getCount().longValue())).getExecutionRunning() != null && executor.getExecutionRunning().getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
                    this.executionQueuedStorage.save(ExecutionQueued.fromExecutionRunning((ExecutionRunning)executor.getExecutionRunning()));
                    return Pair.of((Object)executor, (Object)executorState);
                }
                if (executor.getExecution().getState().isTerminated()) {
                    return Pair.of((Object)executor, (Object)executorState);
                }
            }
            if (log.isDebugEnabled()) {
                this.executorService.log(log, Boolean.valueOf(true), executor);
            }
            if (!(executor = this.executorService.process(executor)).getNexts().isEmpty() && this.deduplicateNexts(execution, executorState, executor.getNexts())) {
                executor.withExecution(this.executorService.onNexts(executor.getFlow(), executor.getExecution(), executor.getNexts()), "onNexts");
            }
            if (!executor.getWorkerTasks().isEmpty()) {
                List<WorkerTask> workerTasksDedup = executor.getWorkerTasks().stream().filter(workerTask -> this.deduplicateWorkerTask(execution, executorState, workerTask.getTaskRun())).toList();
                workerTasksDedup.stream().filter(workerTask -> workerTask.getTask().isSendToWorkerTask()).forEach(workerTask -> this.workerTaskQueue.emit(this.workerGroupService.resolveGroupFromJob((WorkerJob)workerTask), workerTask));
                workerTasksDedup.stream().filter(workerTask -> workerTask.getTask().isFlowable()).map(workerTask -> new WorkerTaskResult(workerTask.withTaskRun(workerTask.getTaskRun().withState(State.Type.RUNNING)))).forEach(arg_0 -> this.workerTaskResultQueue.emit(arg_0));
            }
            if (!executor.getWorkerTaskResults().isEmpty()) {
                executor.getWorkerTaskResults().forEach(arg_0 -> this.workerTaskResultQueue.emit(arg_0));
            }
            if (!executor.getSubflowExecutionResults().isEmpty()) {
                executor.getSubflowExecutionResults().forEach(arg_0 -> this.subflowExecutionResultQueue.emit(arg_0));
            }
            if (!executor.getExecutionDelays().isEmpty()) {
                executor.getExecutionDelays().forEach(executionDelay -> this.executionDelayStorage.save((ExecutionDelay)executionDelay));
            }
            if (!executor.getSubflowExecutions().isEmpty()) {
                this.subflowExecutionStorage.save(executor.getSubflowExecutions());
                List<SubflowExecution> subflowExecutionDedup = executor.getSubflowExecutions().stream().filter(subflowExecution -> this.deduplicateSubflowExecution(execution, executorState, subflowExecution.getParentTaskRun())).toList();
                subflowExecutionDedup.forEach(subflowExecution -> {
                    String log = "Create new execution for flow '" + subflowExecution.getExecution().getNamespace() + "'.'" + subflowExecution.getExecution().getFlowId() + "' with id '" + subflowExecution.getExecution().getId() + "'";
                    log.info(log);
                    this.logQueue.emit((Object)LogEntry.of((TaskRun)subflowExecution.getParentTaskRun()).toBuilder().level(Level.INFO).message(log).timestamp(subflowExecution.getParentTaskRun().getState().getStartDate()).thread(Thread.currentThread().getName()).build());
                    this.executionQueue.emit((Object)subflowExecution.getExecution());
                    if (((ExecutableTask)subflowExecution.getParentTask()).waitForExecution()) {
                        this.sendSubflowExecutionResult(execution, (SubflowExecution<?>)subflowExecution, subflowExecution.getParentTaskRun());
                    }
                });
            }
            return Pair.of((Object)executor, (Object)executorState);
        });
        if (result != null) {
            this.toExecution(result);
        }
    }

    private void sendSubflowExecutionResult(Execution execution, SubflowExecution<?> subflowExecution, TaskRun taskRun) {
        Flow workerTaskFlow = this.flowRepository.findByExecution(execution);
        ExecutableTask executableTask = (ExecutableTask)subflowExecution.getParentTask();
        RunContext runContext = this.runContextFactory.of(workerTaskFlow, subflowExecution.getParentTask(), execution, subflowExecution.getParentTaskRun());
        try {
            Optional subflowExecutionResult = executableTask.createSubflowExecutionResult(runContext, taskRun, workerTaskFlow, execution);
            subflowExecutionResult.ifPresent(workerTaskResult -> this.subflowExecutionResultQueue.emit(workerTaskResult));
        }
        catch (Exception e) {
            log.error("Unable to create the Subflow Execution Result", (Throwable)e);
            this.subflowExecutionResultQueue.emit((Object)SubflowExecutionResult.builder().executionId(execution.getId()).state(State.Type.FAILED).parentTaskRun(taskRun.withState(State.Type.FAILED).withAttempts(List.of(TaskRunAttempt.builder().state(new State().withState(State.Type.FAILED)).build()))).build());
        }
    }

    private void workerTaskResultQueue(Either<WorkerTaskResult, DeserializationException> either) {
        Executor executor;
        if (either.isRight()) {
            log.error("Unable to deserialize a worker task result: {}", (Object)((DeserializationException)((Object)either.getRight())).getMessage());
            return;
        }
        WorkerTaskResult message = (WorkerTaskResult)either.getLeft();
        if (this.skipExecutionService.skipExecution(message.getTaskRun().getExecutionId())) {
            log.warn("Skipping execution {}", (Object)message.getTaskRun().getExecutionId());
            return;
        }
        if (log.isDebugEnabled()) {
            this.executorService.log(log, Boolean.valueOf(true), message);
        }
        if ((executor = this.executionRepository.lock(message.getTaskRun().getExecutionId(), pair -> {
            Execution execution = (Execution)pair.getLeft();
            Executor current = new Executor(execution, null);
            if (execution == null) {
                throw new IllegalStateException("Execution state don't exist for " + message.getTaskRun().getExecutionId() + ", receive " + message);
            }
            if (execution.hasTaskRunJoinable(message.getTaskRun())) {
                try {
                    Flow flow = this.flowRepository.findByExecution(current.getExecution());
                    Execution newExecution = this.executorService.addDynamicTaskRun(current.getExecution(), flow, message);
                    if (newExecution != null) {
                        current = current.withExecution(newExecution, "addDynamicTaskRun");
                    }
                    TaskRun taskRun = message.getTaskRun();
                    newExecution = current.getExecution().withTaskRun(taskRun);
                    if (taskRun.getState().getCurrent() == State.Type.KILLED && taskRun.getParentTaskRunId() != null) {
                        newExecution = this.executionService.killParentTaskruns(taskRun, newExecution);
                    }
                    current = current.withExecution(newExecution, "joinWorkerResult");
                    if (taskRun.getState().isTerminated()) {
                        this.metricRegistry.counter("executor.taskrun.ended.count", this.metricRegistry.tags(message, new String[0])).increment();
                        this.metricRegistry.timer("executor.taskrun.ended.duration", this.metricRegistry.tags(message, new String[0])).record(taskRun.getState().getDuration());
                        log.trace("TaskRun terminated: {}", (Object)taskRun);
                        this.workerJobRunningRepository.deleteByKey(taskRun.getId());
                    }
                    return Pair.of((Object)current, (Object)((ExecutorState)pair.getRight()));
                }
                catch (InternalException e) {
                    return Pair.of((Object)this.handleFailedExecutionFromExecutor(current, (Exception)((Object)e)), (Object)((ExecutorState)pair.getRight()));
                }
            }
            return null;
        })) != null) {
            this.toExecution(executor);
        }
    }

    private void subflowExecutionResultQueue(Either<SubflowExecutionResult, DeserializationException> either) {
        Executor executor;
        if (either.isRight()) {
            log.error("Unable to deserialize a subflow execution result: {}", (Object)((DeserializationException)((Object)either.getRight())).getMessage());
            return;
        }
        SubflowExecutionResult message = (SubflowExecutionResult)either.getLeft();
        if (this.skipExecutionService.skipExecution(message.getExecutionId())) {
            log.warn("Skipping execution {}", (Object)message.getExecutionId());
            return;
        }
        if (this.skipExecutionService.skipExecution(message.getParentTaskRun().getExecutionId())) {
            log.warn("Skipping execution {}", (Object)message.getParentTaskRun().getExecutionId());
            return;
        }
        if (log.isDebugEnabled()) {
            this.executorService.log(log, Boolean.valueOf(true), message);
        }
        if ((executor = this.executionRepository.lock(message.getParentTaskRun().getExecutionId(), pair -> {
            Execution execution = (Execution)pair.getLeft();
            Executor current = new Executor(execution, null);
            if (execution == null) {
                throw new IllegalStateException("Execution state don't exist for " + message.getParentTaskRun().getExecutionId() + ", receive " + message);
            }
            if (execution.hasTaskRunJoinable(message.getParentTaskRun())) {
                try {
                    TaskRun taskRun;
                    Flow flow = this.flowRepository.findByExecution(current.getExecution());
                    Task task = flow.findTaskByTaskId(message.getParentTaskRun().getTaskId());
                    if (task instanceof ForEachItem.ForEachItemExecutable) {
                        ForEachItem.ForEachItemExecutable forEachItem = (ForEachItem.ForEachItemExecutable)task;
                        RunContext runContext = this.runContextFactory.of(flow, task, current.getExecution(), message.getParentTaskRun());
                        taskRun = ExecutableUtils.manageIterations((Storage)runContext.storage(), (TaskRun)message.getParentTaskRun(), (Execution)current.getExecution(), (boolean)forEachItem.getTransmitFailed(), (boolean)forEachItem.isAllowFailure());
                    } else {
                        taskRun = message.getParentTaskRun();
                    }
                    Execution newExecution = current.getExecution().withTaskRun(taskRun);
                    if (taskRun.getState().getCurrent() == State.Type.KILLED && taskRun.getParentTaskRunId() != null) {
                        newExecution = this.executionService.killParentTaskruns(taskRun, newExecution);
                    }
                    current = current.withExecution(newExecution, "joinSubflowExecutionResult");
                    if (taskRun.getState().isTerminated()) {
                        this.metricRegistry.counter("executor.taskrun.ended.count", this.metricRegistry.tags(message, new String[0])).increment();
                        this.metricRegistry.timer("executor.taskrun.ended.duration", this.metricRegistry.tags(message, new String[0])).record(taskRun.getState().getDuration());
                        log.trace("TaskRun terminated: {}", (Object)taskRun);
                    }
                    return Pair.of((Object)current, (Object)((ExecutorState)pair.getRight()));
                }
                catch (InternalException e) {
                    return Pair.of((Object)this.handleFailedExecutionFromExecutor(current, (Exception)((Object)e)), (Object)((ExecutorState)pair.getRight()));
                }
            }
            return null;
        })) != null) {
            this.toExecution(executor);
        }
    }

    private void killQueue(Either<ExecutionKilled, DeserializationException> either) {
        if (either.isRight()) {
            log.error("Unable to deserialize a killed execution: {}", (Object)((DeserializationException)((Object)either.getRight())).getMessage());
            return;
        }
        ExecutionKilled event = (ExecutionKilled)either.getLeft();
        if (event.getState() == ExecutionKilled.State.EXECUTED) {
            return;
        }
        if (this.skipExecutionService.skipExecution(event.getExecutionId())) {
            log.warn("Skipping execution {}", (Object)event.getExecutionId());
            return;
        }
        if (log.isDebugEnabled()) {
            this.executorService.log(log, Boolean.valueOf(true), event);
        }
        this.killQueue.emit((Object)ExecutionKilled.builder().executionId(event.getExecutionId()).isOnKillCascade(Boolean.valueOf(false)).state(ExecutionKilled.State.EXECUTED).build());
        Executor executor = this.mayTransitExecutionToKillingStateAndGet(event.getExecutionId());
        Boolean isOnKillCascade = Optional.ofNullable(event.getIsOnKillCascade()).orElse(true);
        if (isOnKillCascade.booleanValue()) {
            this.executionService.killSubflowExecutions(event.getTenantId(), event.getExecutionId()).doOnNext(arg_0 -> this.killQueue.emit(arg_0)).blockLast();
        }
        if (executor != null) {
            this.toExecution(executor);
        }
    }

    private Executor mayTransitExecutionToKillingStateAndGet(String executionId) {
        return this.executionRepository.lock(executionId, pair -> {
            Execution currentExecution = (Execution)pair.getLeft();
            Execution killing = this.executionService.kill(currentExecution);
            Executor current = new Executor(currentExecution, null).withExecution(killing, "joinKillingExecution");
            return Pair.of((Object)current, (Object)((ExecutorState)pair.getRight()));
        });
    }

    private void toExecution(Executor executor) {
        boolean shouldSend = false;
        boolean hasFailure = false;
        if (executor.getException() != null) {
            executor = this.handleFailedExecutionFromExecutor(executor, executor.getException());
            shouldSend = true;
            hasFailure = true;
        } else if (executor.isExecutionUpdated()) {
            shouldSend = true;
        }
        if (!shouldSend) {
            return;
        }
        if (log.isDebugEnabled()) {
            this.executorService.log(log, Boolean.valueOf(false), executor);
        }
        if (hasFailure) {
            this.executionQueue.emit((Object)executor.getExecution());
        } else {
            ((JdbcQueue)this.executionQueue).emitOnly(null, executor.getExecution());
        }
        if (this.executorService.canBePurged(executor)) {
            this.executorStateStorage.delete(executor.getExecution());
        }
        if (executor.getFlow() != null && this.conditionService.isTerminatedWithListeners(executor.getFlow(), executor.getExecution())) {
            Execution execution = executor.getExecution();
            this.flowTriggerService.computeExecutionsFromFlowTriggers(execution, this.allFlows, Optional.of(this.multipleConditionStorage)).forEach(arg_0 -> this.executionQueue.emit(arg_0));
            this.subflowExecutionStorage.get(execution.getId()).ifPresent(subflowExecution -> {
                if (subflowExecution.getParentTask() != null && ((ExecutableTask)subflowExecution.getParentTask()).waitForExecution()) {
                    this.sendSubflowExecutionResult(execution, (SubflowExecution<?>)subflowExecution, subflowExecution.getParentTaskRun().withState(execution.getState().getCurrent()));
                }
                this.subflowExecutionStorage.delete((SubflowExecution<?>)subflowExecution);
            });
            if (executor.getFlow().getConcurrency() != null && executor.getFlow().getConcurrency().getBehavior() == Concurrency.Behavior.QUEUE) {
                this.executionQueuedStorage.pop(executor.getFlow().getTenantId(), executor.getFlow().getNamespace(), executor.getFlow().getId(), queued -> this.executionQueue.emit((Object)queued.withState(State.Type.RUNNING)));
            }
        }
    }

    private Flow transform(Flow flow, Execution execution) {
        if (this.templateExecutorInterface.isPresent()) {
            try {
                flow = Template.injectTemplate((Flow)flow, (Execution)execution, (tenantId, namespace, id) -> this.templateExecutorInterface.get().findById(tenantId, namespace, id).orElse(null));
            }
            catch (InternalException e) {
                log.warn("Failed to inject template", (Throwable)e);
            }
        }
        return this.taskDefaultService.injectDefaults(flow, execution);
    }

    private void executionDelaySend() {
        if (this.shutdown.get()) {
            return;
        }
        this.executionDelayStorage.get(executionDelay -> {
            Executor result = this.executionRepository.lock(executionDelay.getExecutionId(), pair -> {
                Executor executor = new Executor((Execution)pair.getLeft(), null);
                try {
                    if (executionDelay.getDelayType().equals((Object)ExecutionDelay.DelayType.RESUME_FLOW)) {
                        Execution markAsExecution = this.executionService.markAs((Execution)pair.getKey(), executionDelay.getTaskRunId(), executionDelay.getState());
                        executor = executor.withExecution(markAsExecution, "pausedRestart");
                    } else if (executionDelay.getDelayType().equals((Object)ExecutionDelay.DelayType.RESTART_FAILED_TASK)) {
                        Execution newAttempt = this.executionService.retryTask((Execution)pair.getKey(), executionDelay.getTaskRunId());
                        executor = executor.withExecution(newAttempt, "retryFailedTask");
                    } else if (executionDelay.getDelayType().equals((Object)ExecutionDelay.DelayType.RESTART_FAILED_FLOW)) {
                        Execution newExecution = this.executionService.replay(executor.getExecution(), null, null);
                        executor = executor.withExecution(newExecution, "retryFailedFlow");
                    }
                }
                catch (Exception e) {
                    executor = this.handleFailedExecutionFromExecutor(executor, e);
                }
                return Pair.of((Object)executor, (Object)((ExecutorState)pair.getRight()));
            });
            if (result != null) {
                this.toExecution(result);
            }
        });
    }

    private boolean deduplicateNexts(Execution execution, ExecutorState executorState, List<TaskRun> taskRuns) {
        return taskRuns.stream().anyMatch(taskRun -> {
            String deduplicationKey = taskRun.getParentTaskRunId() + "-" + taskRun.getTaskId() + "-" + taskRun.getValue() + "-" + (taskRun.getAttempts() != null ? taskRun.getAttempts().size() : 0);
            if (executorState.getChildDeduplication().containsKey(deduplicationKey)) {
                log.trace("Duplicate Nexts on execution '{}' with key '{}'", (Object)execution.getId(), (Object)deduplicationKey);
                return false;
            }
            executorState.getChildDeduplication().put(deduplicationKey, taskRun.getId());
            return true;
        });
    }

    private boolean deduplicateWorkerTask(Execution execution, ExecutorState executorState, TaskRun taskRun) {
        String deduplicationKey = taskRun.getId() + (taskRun.getAttempts() != null ? taskRun.getAttempts().size() : 0);
        State.Type current = (State.Type)executorState.getWorkerTaskDeduplication().get(deduplicationKey);
        if (current == taskRun.getState().getCurrent()) {
            log.trace("Duplicate WorkerTask on execution '{}' for taskRun '{}', value '{}, taskId '{}'", new Object[]{execution.getId(), taskRun.getId(), taskRun.getValue(), taskRun.getTaskId()});
            return false;
        }
        executorState.getWorkerTaskDeduplication().put(deduplicationKey, taskRun.getState().getCurrent());
        return true;
    }

    private boolean deduplicateSubflowExecution(Execution execution, ExecutorState executorState, TaskRun taskRun) {
        String deduplicationKey = taskRun.getId() + (String)(taskRun.getIteration() == null ? "" : "-" + taskRun.getIteration());
        State.Type current = (State.Type)executorState.getSubflowExecutionDeduplication().get(deduplicationKey);
        if (current == taskRun.getState().getCurrent()) {
            log.trace("Duplicate SubflowExecution on execution '{}' for taskRun '{}', value '{}, taskId '{}'", new Object[]{execution.getId(), taskRun.getId(), taskRun.getValue(), taskRun.getTaskId()});
            return false;
        }
        executorState.getSubflowExecutionDeduplication().put(deduplicationKey, taskRun.getState().getCurrent());
        return true;
    }

    private Executor handleFailedExecutionFromExecutor(Executor executor, Exception e) {
        Execution.FailedExecutionWithLog failedExecutionWithLog = executor.getExecution().failedExecutionFromExecutor(e);
        try {
            failedExecutionWithLog.getLogs().forEach(arg_0 -> this.logQueue.emitAsync(arg_0));
        }
        catch (Exception ex) {
            log.error("Failed to produce {}", (Object)e.getMessage(), (Object)ex);
        }
        return executor.withExecution(failedExecutionWithLog.getExecution(), "exception");
    }

    @PreDestroy
    public void close() {
        if (this.shutdown.compareAndSet(false, true)) {
            log.info("Terminating.");
            this.setState(Service.ServiceState.TERMINATING);
            this.scheduledDelay.shutdown();
            this.setState(Service.ServiceState.TERMINATED_GRACEFULLY);
            log.info("Executor closed ({}).", (Object)this.state.get().name().toLowerCase());
        }
    }

    private void setState(Service.ServiceState state) {
        this.state.set(state);
        this.eventPublisher.publishEvent((Object)new ServiceStateChangeEvent((Service)this));
    }

    public String getId() {
        return this.id;
    }

    public Service.ServiceType getType() {
        return Service.ServiceType.EXECUTOR;
    }

    public Service.ServiceState getState() {
        return this.state.get();
    }
}

