package io.kestra.jdbc.runner;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.ExecutionKilledExecution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.flows.Concurrency;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.sla.ExecutionMonitoringSLA;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.flows.sla.SLAMonitor;
import io.kestra.core.models.flows.sla.SLAMonitorStorage;
import io.kestra.core.models.flows.sla.Violation;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.ExecutableUtils;
import io.kestra.core.runners.ExecutionDelay;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.core.runners.ExecutionRunning;
import io.kestra.core.runners.Executor;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.core.runners.ExecutorService;
import io.kestra.core.runners.ExecutorState;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.runners.SubflowExecution;
import io.kestra.core.runners.SubflowExecutionResult;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.runners.WorkerTask;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.runners.WorkerTaskRunning;
import io.kestra.core.runners.WorkerTrigger;
import io.kestra.core.runners.WorkerTriggerRunning;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.FlowTriggerService;
import io.kestra.core.services.LogService;
import io.kestra.core.services.PluginDefaultService;
import io.kestra.core.services.SLAService;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.topologies.FlowTopologyService;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.Either;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.Rethrow;
import io.kestra.core.utils.TruthUtils;
import io.kestra.jdbc.JdbcMapper;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
import io.kestra.jdbc.repository.AbstractJdbcFlowTopologyRepository;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.kestra.plugin.core.flow.ForEachItem;
import io.kestra.plugin.core.flow.Template;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.transaction.exceptions.CannotCreateTransactionException;
import jakarta.annotation.Nullable;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import lombok.Generated;
import org.apache.commons.lang3.tuple.Pair;
import org.jooq.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

@Singleton
@JdbcRunnerEnabled
/* loaded from: input_file:io/kestra/jdbc/runner/JdbcExecutor.class */
public class JdbcExecutor implements ExecutorInterface, Service {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(JdbcExecutor.class);
    private static final ObjectMapper MAPPER = JdbcMapper.of();

    @Inject
    private AbstractJdbcExecutionRepository executionRepository;

    @Inject
    @Named("executionQueue")
    private QueueInterface<Execution> executionQueue;

    @Inject
    @Named("workerJobQueue")
    private QueueInterface<WorkerJob> workerTaskQueue;

    @Inject
    @Named("workerTaskResultQueue")
    private QueueInterface<WorkerTaskResult> workerTaskResultQueue;

    @Inject
    @Named("workerTaskLogQueue")
    private QueueInterface<LogEntry> logQueue;

    @Inject
    @Named("flowQueue")
    private QueueInterface<FlowWithSource> flowQueue;

    @Inject
    @Named("executionKilledQueue")
    protected QueueInterface<ExecutionKilled> killQueue;

    @Inject
    @Named("subflowExecutionResultQueue")
    private QueueInterface<SubflowExecutionResult> subflowExecutionResultQueue;

    @Inject
    private RunContextFactory runContextFactory;

    @Inject
    private PluginDefaultService pluginDefaultService;

    @Inject
    private Optional<Template.TemplateExecutorInterface> templateExecutorInterface;

    @Inject
    private ExecutorService executorService;

    @Inject
    private ConditionService conditionService;

    @Inject
    private MultipleConditionStorageInterface multipleConditionStorage;

    @Inject
    private FlowTriggerService flowTriggerService;

    @Inject
    private MetricRegistry metricRegistry;

    @Inject
    protected FlowListenersInterface flowListeners;

    @Inject
    private AbstractJdbcSubflowExecutionStorage subflowExecutionStorage;

    @Inject
    private ExecutionService executionService;

    @Inject
    private AbstractJdbcExecutionDelayStorage executionDelayStorage;

    @Inject
    private AbstractJdbcExecutionQueuedStorage executionQueuedStorage;

    @Inject
    private AbstractJdbcExecutorStateStorage executorStateStorage;

    @Inject
    private FlowTopologyService flowTopologyService;
    protected List<FlowWithSource> allFlows;

    @Inject
    private WorkerGroupService workerGroupService;

    @Inject
    private SkipExecutionService skipExecutionService;

    @Inject
    private AbstractJdbcWorkerJobRunningRepository workerJobRunningRepository;

    @Inject
    private LogService logService;

    @Inject
    private SLAMonitorStorage slaMonitorStorage;

    @Inject
    private SLAService slaService;
    private final FlowRepositoryInterface flowRepository;
    private final JdbcServiceLivenessCoordinator serviceLivenessCoordinator;
    private final ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher;
    private final AbstractJdbcFlowTopologyRepository flowTopologyRepository;
    private final ScheduledExecutorService scheduledDelay = Executors.newSingleThreadScheduledExecutor();
    private final String id = IdUtils.create();
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final AtomicReference<Service.ServiceState> state = new AtomicReference<>();
    private final List<Runnable> receiveCancellations = new ArrayList();

    @Inject
    public JdbcExecutor(@Nullable JdbcServiceLivenessCoordinator jdbcServiceLivenessCoordinator, FlowRepositoryInterface flowRepositoryInterface, AbstractJdbcFlowTopologyRepository abstractJdbcFlowTopologyRepository, ApplicationEventPublisher<ServiceStateChangeEvent> applicationEventPublisher) {
        this.serviceLivenessCoordinator = jdbcServiceLivenessCoordinator;
        this.flowRepository = flowRepositoryInterface;
        this.flowTopologyRepository = abstractJdbcFlowTopologyRepository;
        this.eventPublisher = applicationEventPublisher;
    }

    public void run() {
        setState(Service.ServiceState.CREATED);
        if (this.serviceLivenessCoordinator != null) {
            this.serviceLivenessCoordinator.setExecutor(this);
        }
        this.flowListeners.run();
        this.flowListeners.listen(list -> {
            this.allFlows = list;
        });
        Await.until(() -> {
            return this.allFlows != null;
        }, Duration.ofMillis(100L), Duration.ofMinutes(5L));
        this.receiveCancellations.addFirst(this.executionQueue.receive(Executor.class, this::executionQueue));
        this.receiveCancellations.addFirst(this.workerTaskResultQueue.receive(Executor.class, this::workerTaskResultQueue));
        this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue));
        this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue));
        ScheduledFuture<?> scheduleAtFixedRate = this.scheduledDelay.scheduleAtFixedRate(this::executionDelaySend, 0L, 1L, TimeUnit.SECONDS);
        ScheduledFuture<?> scheduleAtFixedRate2 = this.scheduledDelay.scheduleAtFixedRate(this::executionSLAMonitor, 0L, 1L, TimeUnit.SECONDS);
        Thread.ofVirtual().name("jdbc-delay-exception-watcher").start(() -> {
            Objects.requireNonNull(scheduleAtFixedRate);
            Await.until(scheduleAtFixedRate::isDone);
            try {
                scheduleAtFixedRate.get();
            } catch (InterruptedException | CancellationException | ExecutionException e) {
                if (e.getCause() == null || e.getCause().getClass() == CannotCreateTransactionException.class) {
                    return;
                }
                log.error("Executor fatal exception in the scheduledDelay thread", e);
                close();
                KestraContext.getContext().shutdown();
            }
        });
        Thread.ofVirtual().name("jdbc-sla-monitor-exception-watcher").start(() -> {
            Objects.requireNonNull(scheduleAtFixedRate2);
            Await.until(scheduleAtFixedRate2::isDone);
            try {
                scheduleAtFixedRate2.get();
            } catch (InterruptedException | CancellationException | ExecutionException e) {
                if (e.getCause() == null || e.getCause().getClass() == CannotCreateTransactionException.class) {
                    return;
                }
                log.error("Executor fatal exception in the scheduledSLAMonitor thread", e);
                close();
                KestraContext.getContext().shutdown();
            }
        });
        this.receiveCancellations.addFirst(this.flowQueue.receive(FlowTopology.class, either -> {
            FlowWithSource flowWithSource;
            Stream stream;
            if (either.isRight()) {
                log.error("Unable to deserialize a flow: {}", ((DeserializationException) either.getRight()).getMessage());
                try {
                    flowWithSource = (FlowWithSource) FlowWithException.from(MAPPER.readTree(((DeserializationException) either.getRight()).getRecord()), (Exception) either.getRight()).orElseThrow(IOException::new);
                } catch (IOException e) {
                    log.error("Unexpected exception when trying to handle a deserialization error", e);
                    return;
                }
            } else {
                flowWithSource = (FlowWithSource) either.getLeft();
            }
            try {
                AbstractJdbcFlowTopologyRepository abstractJdbcFlowTopologyRepository = this.flowTopologyRepository;
                FlowWithSource flowWithSource2 = flowWithSource;
                if (flowWithSource.isDeleted()) {
                    stream = Stream.empty();
                } else {
                    FlowWithSource flowWithSource3 = flowWithSource;
                    stream = this.flowTopologyService.topology(flowWithSource, this.allFlows.stream().filter(flowWithSource4 -> {
                        return Objects.equals(flowWithSource4.getTenantId(), flowWithSource3.getTenantId());
                    }).toList());
                }
                abstractJdbcFlowTopologyRepository.save(flowWithSource2, stream.distinct().toList());
            } catch (Exception e2) {
                log.error("Unable to save flow topology", e2);
            }
        }));
        setState(Service.ServiceState.RUNNING);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reEmitWorkerJobsForWorkers(Configuration configuration, List<String> list) {
        this.workerJobRunningRepository.getWorkerJobWithWorkerDead(configuration.dsl(), list).forEach(workerJobRunning -> {
            if (workerJobRunning instanceof WorkerTaskRunning) {
                WorkerTaskRunning workerTaskRunning = (WorkerTaskRunning) workerJobRunning;
                if (this.skipExecutionService.skipExecution(workerTaskRunning.getTaskRun())) {
                    log.warn("Skipping execution {}", workerTaskRunning.getTaskRun().getExecutionId());
                    this.workerJobRunningRepository.deleteByKey(workerTaskRunning.uid());
                } else {
                    try {
                        this.workerTaskQueue.emit(WorkerTask.builder().taskRun(workerTaskRunning.getTaskRun()).task(workerTaskRunning.getTask()).runContext(workerTaskRunning.getRunContext()).build());
                        this.logService.logTaskRun(workerTaskRunning.getTaskRun(), log, Level.WARN, "Re-emitting WorkerTask.", new Object[0]);
                    } catch (QueueException e) {
                        this.logService.logTaskRun(workerTaskRunning.getTaskRun(), log, Level.ERROR, "Unable to re-emit WorkerTask.", new Object[]{e});
                    }
                }
            }
            if (workerJobRunning instanceof WorkerTriggerRunning) {
                WorkerTriggerRunning workerTriggerRunning = (WorkerTriggerRunning) workerJobRunning;
                try {
                    this.workerTaskQueue.emit(WorkerTrigger.builder().trigger(workerTriggerRunning.getTrigger()).conditionContext(workerTriggerRunning.getConditionContext()).triggerContext(workerTriggerRunning.getTriggerContext()).build());
                    this.logService.logTrigger(workerTriggerRunning.getTriggerContext(), log, Level.WARN, "Re-emitting WorkerTrigger.", new Object[0]);
                } catch (QueueException e2) {
                    this.logService.logTrigger(workerTriggerRunning.getTriggerContext(), log, Level.ERROR, "Unable to re-emit WorkerTrigger.", new Object[]{e2});
                }
            }
        });
    }

    private void executionQueue(Either<Execution, DeserializationException> either) {
        if (either.isRight()) {
            log.error("Unable to deserialize an execution: {}", ((DeserializationException) either.getRight()).getMessage());
            return;
        }
        Execution execution = (Execution) either.getLeft();
        if (this.skipExecutionService.skipExecution(execution)) {
            log.warn("Skipping execution {}", execution.getId());
            return;
        }
        Executor lock = this.executionRepository.lock(execution.getId(), Rethrow.throwFunction(pair -> {
            try {
                Execution execution2 = (Execution) pair.getLeft();
                ExecutorState executorState = (ExecutorState) pair.getRight();
                Flow transform = transform(this.flowRepository.findByExecutionWithSource(execution2), execution2);
                Executor withFlow = new Executor(execution2, (Long) null).withFlow(transform);
                if (execution2.getState().getCurrent() == State.Type.CREATED && execution2.getScheduleDate() != null && execution2.getScheduleDate().isAfter(Instant.now())) {
                    this.executionDelayStorage.save(ExecutionDelay.builder().executionId(withFlow.getExecution().getId()).date(execution2.getScheduleDate()).state(State.Type.RUNNING).delayType(ExecutionDelay.DelayType.RESUME_FLOW).build());
                    return Pair.of(withFlow, executorState);
                }
                if (execution2.getState().getCurrent() == State.Type.CREATED && !ListUtils.isEmpty(transform.getSla())) {
                    Stream stream = transform.getSla().stream();
                    Class<ExecutionMonitoringSLA> cls = ExecutionMonitoringSLA.class;
                    Objects.requireNonNull(ExecutionMonitoringSLA.class);
                    Stream filter = stream.filter((v1) -> {
                        return r1.isInstance(v1);
                    });
                    Class<ExecutionMonitoringSLA> cls2 = ExecutionMonitoringSLA.class;
                    Objects.requireNonNull(ExecutionMonitoringSLA.class);
                    filter.map((v1) -> {
                        return r1.cast(v1);
                    }).map(executionMonitoringSLA -> {
                        return SLAMonitor.builder().executionId(execution2.getId()).slaId(((SLA) executionMonitoringSLA).getId()).deadline(execution2.getState().getStartDate().plus((TemporalAmount) executionMonitoringSLA.getDuration())).build();
                    }).toList().forEach(sLAMonitor -> {
                        this.slaMonitorStorage.save(sLAMonitor);
                    });
                }
                if (execution2.getState().getCurrent() == State.Type.CREATED && transform.getConcurrency() != null) {
                    withFlow = this.executorService.checkConcurrencyLimit(withFlow, transform, execution2, ((ExecutionCount) this.executionRepository.executionCounts(transform.getTenantId(), List.of(new io.kestra.core.models.executions.statistics.Flow(transform.getNamespace(), transform.getId())), List.of(State.Type.RUNNING, State.Type.PAUSED), null, null, null).getFirst()).getCount().longValue());
                    if (withFlow.getExecutionRunning() != null && withFlow.getExecutionRunning().getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
                        this.executionQueuedStorage.save(ExecutionQueued.fromExecutionRunning(withFlow.getExecutionRunning()));
                        return Pair.of(withFlow, executorState);
                    }
                    if (withFlow.getExecution().getState().isTerminated()) {
                        return Pair.of(withFlow, executorState);
                    }
                }
                Executor handleExecutionChangedSLA = this.executorService.handleExecutionChangedSLA(withFlow);
                if (log.isDebugEnabled()) {
                    this.executorService.log(log, true, handleExecutionChangedSLA);
                }
                Executor process = this.executorService.process(handleExecutionChangedSLA);
                if (!process.getNexts().isEmpty() && deduplicateNexts(execution2, executorState, process.getNexts())) {
                    process.withExecution(this.executorService.onNexts(process.getFlow(), process.getExecution(), process.getNexts()), "onNexts");
                }
                if (!process.getWorkerTasks().isEmpty()) {
                    process.getWorkerTasks().stream().filter(workerTask -> {
                        return deduplicateWorkerTask(execution2, executorState, workerTask.getTaskRun());
                    }).forEach(Rethrow.throwConsumer(workerTask2 -> {
                        try {
                            if (TruthUtils.isTruthy(workerTask2.getRunContext().render(workerTask2.getTask().getRunIf()))) {
                                if (workerTask2.getTask().isSendToWorkerTask()) {
                                    this.workerTaskQueue.emit((String) this.workerGroupService.resolveGroupFromJob(workerTask2).map(workerGroup -> {
                                        return workerGroup.getKey();
                                    }).orElse(null), workerTask2);
                                }
                                if (workerTask2.getTask().isFlowable()) {
                                    this.workerTaskResultQueue.emit(new WorkerTaskResult(workerTask2.getTaskRun().withState(State.Type.RUNNING)));
                                }
                            } else {
                                this.workerTaskResultQueue.emit(new WorkerTaskResult(workerTask2.getTaskRun().withState(State.Type.SKIPPED)));
                            }
                        } catch (IllegalVariableEvaluationException e) {
                            this.workerTaskResultQueue.emit(new WorkerTaskResult(workerTask2.getTaskRun().withState(State.Type.FAILED)));
                            workerTask2.getRunContext().logger().error("Unable to evaluate the runIf condition for task {}", workerTask2.getTask().getId(), e);
                        }
                    }));
                }
                if (!process.getWorkerTaskResults().isEmpty()) {
                    process.getWorkerTaskResults().forEach(Rethrow.throwConsumer(workerTaskResult -> {
                        this.workerTaskResultQueue.emit(workerTaskResult);
                    }));
                }
                if (!process.getSubflowExecutionResults().isEmpty()) {
                    process.getSubflowExecutionResults().forEach(Rethrow.throwConsumer(subflowExecutionResult -> {
                        this.subflowExecutionResultQueue.emit(subflowExecutionResult);
                    }));
                }
                if (!process.getExecutionDelays().isEmpty()) {
                    process.getExecutionDelays().forEach(executionDelay -> {
                        this.executionDelayStorage.save(executionDelay);
                    });
                }
                if (!process.getSubflowExecutions().isEmpty()) {
                    this.subflowExecutionStorage.save(process.getSubflowExecutions());
                    process.getSubflowExecutions().stream().filter(subflowExecution -> {
                        return deduplicateSubflowExecution(execution2, executorState, subflowExecution.getParentTaskRun());
                    }).toList().forEach(Rethrow.throwConsumer(subflowExecution2 -> {
                        Execution execution3 = subflowExecution2.getExecution();
                        String format = String.format("Created new execution [[link execution=\"%s\" flowId=\"%s\" namespace=\"%s\"]]", execution3.getId(), execution3.getFlowId(), execution3.getNamespace());
                        log.info(format);
                        this.logQueue.emit(LogEntry.of(subflowExecution2.getParentTaskRun()).toBuilder().level(Level.INFO).message(format).timestamp(subflowExecution2.getParentTaskRun().getState().getStartDate()).thread(Thread.currentThread().getName()).build());
                        this.executionQueue.emit(subflowExecution2.getExecution());
                        if (subflowExecution2.getParentTask().waitForExecution()) {
                            sendSubflowExecutionResult(execution2, subflowExecution2, subflowExecution2.getParentTaskRun());
                        }
                    }));
                }
                return Pair.of(process, executorState);
            } catch (QueueException e) {
                try {
                    this.executionQueue.emit(execution.failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED));
                    return null;
                } catch (QueueException e2) {
                    log.error("Unable to emit the execution {}", execution.getId(), e2);
                    return null;
                }
            }
        }));
        if (lock != null) {
            toExecution(lock);
        }
    }

    private void sendSubflowExecutionResult(Execution execution, SubflowExecution<?> subflowExecution, TaskRun taskRun) {
        Flow findByExecution = this.flowRepository.findByExecution(execution);
        try {
            subflowExecution.getParentTask().createSubflowExecutionResult(this.runContextFactory.of(findByExecution, subflowExecution.getParentTask(), execution, subflowExecution.getParentTaskRun()), taskRun, findByExecution, execution).ifPresent(Rethrow.throwConsumer(subflowExecutionResult -> {
                this.subflowExecutionResultQueue.emit(subflowExecutionResult);
            }));
        } catch (Exception e) {
            log.error("Unable to create the Subflow Execution Result", e);
            try {
                this.subflowExecutionResultQueue.emit(SubflowExecutionResult.builder().executionId(execution.getId()).state(State.Type.FAILED).parentTaskRun(taskRun.withState(State.Type.FAILED).withAttempts(List.of(TaskRunAttempt.builder().state(new State().withState(State.Type.FAILED)).build()))).build());
            } catch (QueueException e2) {
                log.error("Unable to emit the subflow execution result", e2);
            }
        }
    }

    private void workerTaskResultQueue(Either<WorkerTaskResult, DeserializationException> either) {
        if (either.isRight()) {
            log.error("Unable to deserialize a worker task result: {}", ((DeserializationException) either.getRight()).getMessage());
            return;
        }
        WorkerTaskResult workerTaskResult = (WorkerTaskResult) either.getLeft();
        if (this.skipExecutionService.skipExecution(workerTaskResult.getTaskRun())) {
            log.warn("Skipping execution {}", workerTaskResult.getTaskRun().getExecutionId());
            return;
        }
        if (log.isDebugEnabled()) {
            this.executorService.log(log, true, workerTaskResult);
        }
        Executor lock = this.executionRepository.lock(workerTaskResult.getTaskRun().getExecutionId(), pair -> {
            Execution execution = (Execution) pair.getLeft();
            Executor executor = new Executor(execution, (Long) null);
            if (execution == null) {
                throw new IllegalStateException("Execution state don't exist for " + workerTaskResult.getTaskRun().getExecutionId() + ", receive " + String.valueOf(workerTaskResult));
            }
            if (!execution.hasTaskRunJoinable(workerTaskResult.getTaskRun())) {
                return null;
            }
            try {
                Execution addDynamicTaskRun = this.executorService.addDynamicTaskRun(executor.getExecution(), this.flowRepository.findByExecution(executor.getExecution()), workerTaskResult);
                if (addDynamicTaskRun != null) {
                    executor = executor.withExecution(addDynamicTaskRun, "addDynamicTaskRun");
                }
                TaskRun taskRun = workerTaskResult.getTaskRun();
                Execution withTaskRun = executor.getExecution().withTaskRun(taskRun);
                if (taskRun.getState().getCurrent() == State.Type.KILLED && taskRun.getParentTaskRunId() != null) {
                    withTaskRun = this.executionService.killParentTaskruns(taskRun, withTaskRun);
                }
                executor = executor.withExecution(withTaskRun, "joinWorkerResult");
                if (taskRun.getState().isTerminated()) {
                    this.metricRegistry.counter("executor.taskrun.ended.count", this.metricRegistry.tags(workerTaskResult, new String[0])).increment();
                    this.metricRegistry.timer("executor.taskrun.ended.duration", this.metricRegistry.tags(workerTaskResult, new String[0])).record(taskRun.getState().getDuration());
                    log.trace("TaskRun terminated: {}", taskRun);
                    this.workerJobRunningRepository.deleteByKey(taskRun.getId());
                }
                return Pair.of(executor, (ExecutorState) pair.getRight());
            } catch (InternalException e) {
                return Pair.of(handleFailedExecutionFromExecutor(executor, e), (ExecutorState) pair.getRight());
            }
        });
        if (lock != null) {
            toExecution(lock);
        }
    }

    private void subflowExecutionResultQueue(Either<SubflowExecutionResult, DeserializationException> either) {
        if (either.isRight()) {
            log.error("Unable to deserialize a subflow execution result: {}", ((DeserializationException) either.getRight()).getMessage());
            return;
        }
        SubflowExecutionResult subflowExecutionResult = (SubflowExecutionResult) either.getLeft();
        if (this.skipExecutionService.skipExecution(subflowExecutionResult.getExecutionId())) {
            log.warn("Skipping execution {}", subflowExecutionResult.getExecutionId());
            return;
        }
        if (this.skipExecutionService.skipExecution(subflowExecutionResult.getParentTaskRun())) {
            log.warn("Skipping execution {}", subflowExecutionResult.getParentTaskRun().getExecutionId());
            return;
        }
        if (log.isDebugEnabled()) {
            this.executorService.log(log, true, subflowExecutionResult);
        }
        Executor lock = this.executionRepository.lock(subflowExecutionResult.getParentTaskRun().getExecutionId(), pair -> {
            TaskRun parentTaskRun;
            Execution execution = (Execution) pair.getLeft();
            Executor executor = new Executor(execution, (Long) null);
            if (execution == null) {
                throw new IllegalStateException("Execution state don't exist for " + subflowExecutionResult.getParentTaskRun().getExecutionId() + ", receive " + String.valueOf(subflowExecutionResult));
            }
            if (!execution.hasTaskRunJoinable(subflowExecutionResult.getParentTaskRun())) {
                return null;
            }
            try {
                Flow findByExecution = this.flowRepository.findByExecution(executor.getExecution());
                ForEachItem.ForEachItemExecutable findTaskByTaskId = findByExecution.findTaskByTaskId(subflowExecutionResult.getParentTaskRun().getTaskId());
                if (findTaskByTaskId instanceof ForEachItem.ForEachItemExecutable) {
                    ForEachItem.ForEachItemExecutable forEachItemExecutable = findTaskByTaskId;
                    parentTaskRun = ExecutableUtils.manageIterations(this.runContextFactory.of(findByExecution, findTaskByTaskId, executor.getExecution(), subflowExecutionResult.getParentTaskRun()).storage(), subflowExecutionResult.getParentTaskRun(), executor.getExecution(), forEachItemExecutable.getTransmitFailed().booleanValue(), forEachItemExecutable.isAllowFailure(), forEachItemExecutable.isAllowWarning());
                } else {
                    parentTaskRun = subflowExecutionResult.getParentTaskRun();
                }
                Execution withTaskRun = executor.getExecution().withTaskRun(parentTaskRun);
                if (parentTaskRun.getState().getCurrent() == State.Type.KILLED && parentTaskRun.getParentTaskRunId() != null) {
                    withTaskRun = this.executionService.killParentTaskruns(parentTaskRun, withTaskRun);
                }
                executor = executor.withExecution(withTaskRun, "joinSubflowExecutionResult");
                if (parentTaskRun.getState().isTerminated()) {
                    this.metricRegistry.counter("executor.taskrun.ended.count", this.metricRegistry.tags(subflowExecutionResult, new String[0])).increment();
                    this.metricRegistry.timer("executor.taskrun.ended.duration", this.metricRegistry.tags(subflowExecutionResult, new String[0])).record(parentTaskRun.getState().getDuration());
                    log.trace("TaskRun terminated: {}", parentTaskRun);
                }
                return Pair.of(executor, (ExecutorState) pair.getRight());
            } catch (InternalException e) {
                return Pair.of(handleFailedExecutionFromExecutor(executor, e), (ExecutorState) pair.getRight());
            }
        });
        if (lock != null) {
            toExecution(lock);
        }
    }

    private void killQueue(Either<ExecutionKilled, DeserializationException> either) {
        if (either.isRight()) {
            log.error("Unable to deserialize a killed execution: {}", ((DeserializationException) either.getRight()).getMessage());
            return;
        }
        ExecutionKilledExecution executionKilledExecution = (ExecutionKilled) either.getLeft();
        if (executionKilledExecution.getState() != ExecutionKilled.State.EXECUTED && (executionKilledExecution instanceof ExecutionKilledExecution)) {
            ExecutionKilledExecution executionKilledExecution2 = executionKilledExecution;
            if (this.skipExecutionService.skipExecution(executionKilledExecution2.getExecutionId())) {
                log.warn("Skipping execution {}", executionKilledExecution2.getExecutionId());
                return;
            }
            if (log.isDebugEnabled()) {
                this.executorService.log(log, true, executionKilledExecution2);
            }
            try {
                this.killQueue.emit(ExecutionKilledExecution.builder().executionId(executionKilledExecution2.getExecutionId()).isOnKillCascade(false).state(ExecutionKilled.State.EXECUTED).tenantId(executionKilledExecution2.getTenantId()).build());
            } catch (QueueException e) {
                log.error("Unable to kill the execution {}", executionKilledExecution2.getExecutionId(), e);
            }
            Executor mayTransitExecutionToKillingStateAndGet = mayTransitExecutionToKillingStateAndGet(executionKilledExecution2.getExecutionId());
            if (((Boolean) Optional.ofNullable(executionKilledExecution2.getIsOnKillCascade()).orElse(true)).booleanValue()) {
                this.executionService.killSubflowExecutions(executionKilledExecution.getTenantId(), executionKilledExecution2.getExecutionId()).doOnNext(executionKilledExecution3 -> {
                    try {
                        this.killQueue.emit(executionKilledExecution3);
                    } catch (QueueException e2) {
                        log.error("Unable to kill the execution {}", executionKilledExecution3.getExecutionId(), e2);
                    }
                }).blockLast();
            }
            if (mayTransitExecutionToKillingStateAndGet != null) {
                toExecution(mayTransitExecutionToKillingStateAndGet, true);
            }
        }
    }

    private Executor mayTransitExecutionToKillingStateAndGet(String str) {
        return this.executionRepository.lock(str, pair -> {
            Execution execution = (Execution) pair.getLeft();
            return Pair.of(new Executor(execution, (Long) null).withExecution(this.executionService.kill(execution, this.flowRepository.findByExecution(execution)), "joinKillingExecution"), (ExecutorState) pair.getRight());
        });
    }

    private void toExecution(Executor executor) {
        toExecution(executor, false);
    }

    private void toExecution(Executor executor, boolean z) {
        try {
            boolean z2 = false;
            boolean z3 = false;
            if (executor.getException() != null) {
                executor = handleFailedExecutionFromExecutor(executor, executor.getException());
                z2 = true;
                z3 = true;
            } else if (executor.isExecutionUpdated()) {
                z2 = true;
            }
            if (z2) {
                if (log.isDebugEnabled()) {
                    this.executorService.log(log, false, executor);
                }
                if (z3) {
                    this.executionQueue.emit(executor.getExecution());
                } else {
                    ((JdbcQueue) this.executionQueue).emitOnly(null, executor.getExecution());
                }
                if (this.executorService.canBePurged(executor)) {
                    this.executorStateStorage.delete(executor.getExecution());
                }
                Execution execution = executor.getExecution();
                if (!execution.getState().getCurrent().equals(executor.getOriginalState())) {
                    this.flowTriggerService.computeExecutionsFromFlowTriggers(execution, this.allFlows.stream().map(flowWithSource -> {
                        return flowWithSource.toFlow();
                    }).toList(), Optional.of(this.multipleConditionStorage)).forEach(Rethrow.throwConsumer(execution2 -> {
                        this.executionQueue.emit(execution2);
                    }));
                }
                if (executor.getFlow() != null && this.conditionService.isTerminatedWithListeners(executor.getFlow(), executor.getExecution())) {
                    this.subflowExecutionStorage.get(execution.getId()).ifPresent(subflowExecution -> {
                        if (subflowExecution.getParentTask() != null && subflowExecution.getParentTask().waitForExecution()) {
                            sendSubflowExecutionResult(execution, subflowExecution, subflowExecution.getParentTaskRun().withState(execution.getState().getCurrent()));
                        }
                        this.subflowExecutionStorage.delete(subflowExecution);
                    });
                    if (!ListUtils.isEmpty(executor.getFlow().getSla())) {
                        Stream stream = executor.getFlow().getSla().stream();
                        Class<ExecutionMonitoringSLA> cls = ExecutionMonitoringSLA.class;
                        Objects.requireNonNull(ExecutionMonitoringSLA.class);
                        if (stream.anyMatch((v1) -> {
                            return r1.isInstance(v1);
                        })) {
                            this.slaMonitorStorage.purge(executor.getExecution().getId());
                        }
                    }
                    if (executor.getFlow().getConcurrency() != null && executor.getFlow().getConcurrency().getBehavior() == Concurrency.Behavior.QUEUE) {
                        this.executionQueuedStorage.pop(executor.getFlow().getTenantId(), executor.getFlow().getNamespace(), executor.getFlow().getId(), Rethrow.throwConsumer(execution3 -> {
                            this.executionQueue.emit(execution3.withState(State.Type.RUNNING));
                        }));
                    }
                }
            }
        } catch (QueueException e) {
            if (z) {
                return;
            }
            this.executionRepository.lock(executor.getExecution().getId(), pair -> {
                Execution execution4 = (Execution) pair.getLeft();
                try {
                    this.executionQueue.emit(execution4.failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED));
                    return null;
                } catch (QueueException e2) {
                    log.error("Unable to emit the execution {}", execution4.getId(), e2);
                    return null;
                }
            });
        }
    }

    private Flow transform(FlowWithSource flowWithSource, Execution execution) {
        if (this.templateExecutorInterface.isPresent()) {
            try {
                flowWithSource = Template.injectTemplate(flowWithSource, execution, (str, str2, str3) -> {
                    return (io.kestra.core.models.templates.Template) this.templateExecutorInterface.get().findById(str, str2, str3).orElse(null);
                }).withSource(flowWithSource.getSource());
            } catch (InternalException e) {
                log.warn("Failed to inject template", e);
            }
        }
        return this.pluginDefaultService.injectDefaults(flowWithSource, execution);
    }

    private void executionDelaySend() {
        if (this.shutdown.get()) {
            return;
        }
        this.executionDelayStorage.get(executionDelay -> {
            Executor lock = this.executionRepository.lock(executionDelay.getExecutionId(), pair -> {
                Executor executor = new Executor((Execution) pair.getLeft(), (Long) null);
                Flow findByExecution = this.flowRepository.findByExecution((Execution) pair.getLeft());
                try {
                    if (executionDelay.getDelayType().equals(ExecutionDelay.DelayType.RESUME_FLOW)) {
                        executor = executionDelay.getTaskRunId() == null ? executor.withExecution(((Execution) pair.getKey()).withState(executionDelay.getState()), "pausedRestart") : executor.withExecution(this.executionService.markAs((Execution) pair.getKey(), findByExecution, executionDelay.getTaskRunId(), executionDelay.getState()), "pausedRestart");
                    } else if (executionDelay.getDelayType().equals(ExecutionDelay.DelayType.RESTART_FAILED_TASK)) {
                        executor = executor.withExecution(this.executionService.retryTask((Execution) pair.getKey(), executionDelay.getTaskRunId()), "retryFailedTask");
                    } else if (executionDelay.getDelayType().equals(ExecutionDelay.DelayType.RESTART_FAILED_FLOW)) {
                        executor = executor.withExecution(this.executionService.replay(executor.getExecution(), (String) null, (Integer) null), "retryFailedFlow");
                    } else if (executionDelay.getDelayType().equals(ExecutionDelay.DelayType.CONTINUE_FLOWABLE)) {
                        executor = executor.withExecution(this.executionService.retryWaitFor(executor.getExecution(), executionDelay.getTaskRunId()), "continueLoop");
                    }
                } catch (Exception e) {
                    executor = handleFailedExecutionFromExecutor(executor, e);
                }
                return Pair.of(executor, (ExecutorState) pair.getRight());
            });
            if (lock != null) {
                toExecution(lock);
            }
        });
    }

    private void executionSLAMonitor() {
        if (this.shutdown.get()) {
            return;
        }
        this.slaMonitorStorage.processExpired(Instant.now(), sLAMonitor -> {
            Executor lock = this.executionRepository.lock(sLAMonitor.getExecutionId(), pair -> {
                Executor executor = new Executor((Execution) pair.getLeft(), (Long) null);
                Optional findFirst = this.flowRepository.findByExecution((Execution) pair.getLeft()).getSla().stream().filter(sla -> {
                    return sla.getId().equals(sLAMonitor.getSlaId());
                }).findFirst();
                if (findFirst.isEmpty()) {
                    log.debug("Cannot find the SLA '{}' if the flow for execution '{}', ignoring it.", sLAMonitor.getSlaId(), sLAMonitor.getExecutionId());
                    return null;
                }
                try {
                    RunContext of = this.runContextFactory.of(executor.getFlow(), executor.getExecution());
                    Optional evaluateExecutionMonitoringSLA = this.slaService.evaluateExecutionMonitoringSLA(of, executor.getExecution(), (SLA) findFirst.get());
                    if (evaluateExecutionMonitoringSLA.isPresent()) {
                        log.info("Processing expired SLA monitor '{}' for execution '{}'.", sLAMonitor.getSlaId(), sLAMonitor.getExecutionId());
                        executor = this.executorService.processViolation(of, executor, (Violation) evaluateExecutionMonitoringSLA.get());
                    }
                } catch (Exception e) {
                    executor = handleFailedExecutionFromExecutor(executor, e);
                }
                return Pair.of(executor, (ExecutorState) pair.getRight());
            });
            if (lock != null) {
                toExecution(lock);
            }
        });
    }

    private boolean deduplicateNexts(Execution execution, ExecutorState executorState, List<TaskRun> list) {
        return list.stream().anyMatch(taskRun -> {
            String str = taskRun.getParentTaskRunId() + "-" + taskRun.getTaskId() + "-" + taskRun.getValue() + "-" + (taskRun.getAttempts() != null ? taskRun.getAttempts().size() : 0) + taskRun.getIteration();
            if (executorState.getChildDeduplication().containsKey(str)) {
                log.trace("Duplicate Nexts on execution '{}' with key '{}'", execution.getId(), str);
                return false;
            }
            executorState.getChildDeduplication().put(str, taskRun.getId());
            return true;
        });
    }

    private boolean deduplicateWorkerTask(Execution execution, ExecutorState executorState, TaskRun taskRun) {
        String str = taskRun.getId() + (taskRun.getAttempts() != null ? taskRun.getAttempts().size() : 0) + taskRun.getIteration();
        if (((State.Type) executorState.getWorkerTaskDeduplication().get(str)) == taskRun.getState().getCurrent()) {
            log.trace("Duplicate WorkerTask on execution '{}' for taskRun '{}', value '{}, taskId '{}'", new Object[]{execution.getId(), taskRun.getId(), taskRun.getValue(), taskRun.getTaskId()});
            return false;
        }
        executorState.getWorkerTaskDeduplication().put(str, taskRun.getState().getCurrent());
        return true;
    }

    private boolean deduplicateSubflowExecution(Execution execution, ExecutorState executorState, TaskRun taskRun) {
        String deduplicationKey = deduplicationKey(taskRun);
        if (((State.Type) executorState.getSubflowExecutionDeduplication().get(deduplicationKey)) != taskRun.getState().getCurrent()) {
            executorState.getSubflowExecutionDeduplication().put(deduplicationKey, taskRun.getState().getCurrent());
            return true;
        }
        Logger logger = log;
        Object[] objArr = new Object[5];
        objArr[0] = execution.getId();
        objArr[1] = taskRun.getId();
        objArr[2] = taskRun.getValue();
        objArr[3] = taskRun.getTaskId();
        objArr[4] = taskRun.getAttempts() == null ? null : Integer.valueOf(taskRun.getAttempts().size() + 1);
        logger.trace("Duplicate SubflowExecution on execution '{}' for taskRun '{}', value '{}', taskId '{}', attempt '{}'", objArr);
        return false;
    }

    private String deduplicationKey(TaskRun taskRun) {
        return taskRun.getId() + (taskRun.getAttempts() != null ? "-" + taskRun.getAttempts().size() : "") + (taskRun.getIteration() == null ? "" : "-" + taskRun.getIteration());
    }

    private Executor handleFailedExecutionFromExecutor(Executor executor, Exception exc) {
        Execution.FailedExecutionWithLog failedExecutionFromExecutor = executor.getExecution().failedExecutionFromExecutor(exc);
        failedExecutionFromExecutor.getLogs().forEach(logEntry -> {
            try {
                this.logQueue.emitAsync(logEntry);
            } catch (QueueException e) {
            }
        });
        return executor.withExecution(failedExecutionFromExecutor.getExecution(), "exception");
    }

    @PreDestroy
    public void close() {
        if (this.shutdown.compareAndSet(false, true)) {
            if (log.isDebugEnabled()) {
                log.debug("Terminating");
            }
            setState(Service.ServiceState.TERMINATING);
            this.receiveCancellations.forEach((v0) -> {
                v0.run();
            });
            this.scheduledDelay.shutdown();
            setState(Service.ServiceState.TERMINATED_GRACEFULLY);
            if (log.isDebugEnabled()) {
                log.debug("Closed ({})", this.state.get().name());
            }
        }
    }

    private void setState(Service.ServiceState serviceState) {
        this.state.set(serviceState);
        this.eventPublisher.publishEvent(new ServiceStateChangeEvent(this));
    }

    public String getId() {
        return this.id;
    }

    public Service.ServiceType getType() {
        return Service.ServiceType.EXECUTOR;
    }

    public Service.ServiceState getState() {
        return this.state.get();
    }
}
