package io.kestra.runner.memory;

import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.Executor;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.core.runners.ExecutorService;
import io.kestra.core.runners.MemoryFlowExecutor;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.runners.WorkerTask;
import io.kestra.core.runners.WorkerTaskExecution;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.TaskDefaultService;
import io.kestra.core.tasks.flows.Template;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@MemoryQueueEnabled
/* loaded from: input_file:io/kestra/runner/memory/MemoryExecutor.class */
public class MemoryExecutor implements ExecutorInterface {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(MemoryExecutor.class);
    // Storage handed to FlowService when evaluating multiple-condition flow triggers (see handleExecution).
    private static final MemoryMultipleConditionStorage multipleConditionStorage = new MemoryMultipleConditionStorage();
    // Current state of each known execution, keyed by execution id; entries are removed in toExecution once purgeable.
    private static final ConcurrentHashMap<String, ExecutionState> EXECUTIONS = new ConcurrentHashMap<>();
    // Pending WorkerTaskExecution, keyed by the id of the execution it carries; consumed when that execution terminates.
    private static final ConcurrentHashMap<String, WorkerTaskExecution> WORKERTASKEXECUTIONS_WATCHER = new ConcurrentHashMap<>();
    // Flows loaded once from the repository in run(); used for flow-trigger evaluation.
    private List<Flow> allFlows;
    // Single-threaded scheduler used to re-emit executions once an execution delay has elapsed.
    private final ScheduledExecutorService schedulerDelay = Executors.newSingleThreadScheduledExecutor();

    @Inject
    private ApplicationContext applicationContext;

    @Inject
    private FlowRepositoryInterface flowRepository;

    // Queue consumed in run() and used to publish every updated execution.
    @Inject
    @Named("executionQueue")
    private QueueInterface<Execution> executionQueue;

    // Queue on which tasks flagged isSendToWorkerTask() are emitted.
    @Inject
    @Named("workerTaskQueue")
    private QueueInterface<WorkerTask> workerTaskQueue;

    // Queue consumed in run(); this executor also emits results on it itself (flowable tasks, executor results).
    @Inject
    @Named("workerTaskResultQueue")
    private QueueInterface<WorkerTaskResult> workerTaskResultQueue;

    // Queue receiving the log entries produced when an execution fails inside the executor.
    @Inject
    @Named("workerTaskLogQueue")
    private QueueInterface<LogEntry> logQueue;

    @Inject
    private FlowService flowService;

    @Inject
    private TaskDefaultService taskDefaultService;

    @Inject
    private Template.TemplateExecutorInterface templateExecutorInterface;

    @Inject
    private ExecutorService executorService;

    @Inject
    private ConditionService conditionService;

    @Inject
    private RunContextFactory runContextFactory;

    @Inject
    private MetricRegistry metricRegistry;

    @Inject
    private ExecutionService executionService;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/kestra/runner/memory/MemoryExecutor$ExecutionState.class */
    /**
     * In-memory bookkeeping attached to one execution.
     *
     * <p>Besides the latest {@link Execution} snapshot it keeps three maps that are shared
     * (same instances) between successive states of the same execution:
     * the last task run received from a worker per task-run key, the last state for which a
     * worker task was dispatched, and the keys of the child task runs already created.
     */
    public static class ExecutionState {
        private final Execution execution;
        private final Map<String, TaskRun> taskRuns;
        private final Map<String, State.Type> workerTaskDeduplication;
        private final Map<String, String> childDeduplication;

        /**
         * Creates the initial state of an execution, with empty bookkeeping maps.
         *
         * @param execution the execution snapshot to hold
         */
        public ExecutionState(Execution execution) {
            this.execution = execution;
            this.taskRuns = new ConcurrentHashMap<>();
            this.workerTaskDeduplication = new ConcurrentHashMap<>();
            this.childDeduplication = new ConcurrentHashMap<>();
        }

        /**
         * Creates a new state holding {@code execution} while reusing the bookkeeping maps of
         * {@code executionState} (the maps are shared, not copied).
         *
         * @param executionState the previous state whose maps are carried over
         * @param execution the new execution snapshot
         */
        public ExecutionState(ExecutionState executionState, Execution execution) {
            this.execution = execution;
            this.taskRuns = executionState.taskRuns;
            this.workerTaskDeduplication = executionState.workerTaskDeduplication;
            this.childDeduplication = executionState.childDeduplication;
        }

        /** Builds the key under which a task run is remembered: {@code id + "-" + value}. */
        private static String taskRunKey(TaskRun taskRun) {
            return taskRun.getId() + "-" + (taskRun.getValue() == null ? "null" : taskRun.getValue());
        }

        /**
         * Derives the next state from a freshly received execution.
         *
         * <p>Each task run of {@code execution} is replaced by the task run remembered from a worker
         * result when one exists and {@code execution.hasTaskRunJoinable} accepts it; otherwise the
         * incoming task run is kept.
         *
         * @param execution the execution received from the queue
         * @return a new state sharing this state's bookkeeping maps
         */
        public ExecutionState from(Execution execution) {
            List<TaskRun> incoming = execution.getTaskRunList();
            if (incoming == null) {
                // Executions without any task run yet are accepted upstream; nothing to merge.
                return new ExecutionState(this, execution);
            }
            List<TaskRun> merged = incoming.stream()
                .map(taskRun -> {
                    TaskRun stored = this.taskRuns.get(taskRunKey(taskRun));
                    return stored != null && execution.hasTaskRunJoinable(stored) ? stored : taskRun;
                })
                .collect(Collectors.toList());
            return new ExecutionState(this, execution.withTaskRunList(merged));
        }

        /**
         * Records the task run carried by a worker result and applies any dynamic task run to the
         * held execution.
         *
         * @param workerTaskResult the result received from a worker
         * @param executorService service used to add the dynamic task run
         * @param flowRepositoryInterface repository used to resolve the flow of the held execution
         * @return a new state when {@code addDynamicTaskRun} produced an execution, otherwise {@code this}
         * @throws InternalException propagated from the flow lookup or {@code addDynamicTaskRun}
         */
        public ExecutionState from(WorkerTaskResult workerTaskResult, ExecutorService executorService, FlowRepositoryInterface flowRepositoryInterface) throws InternalException {
            this.taskRuns.put(taskRunKey(workerTaskResult.getTaskRun()), workerTaskResult.getTaskRun());
            Execution updated = executorService.addDynamicTaskRun(this.execution, flowRepositoryInterface.findByExecution(this.execution), workerTaskResult);
            return updated != null ? new ExecutionState(this, updated) : this;
        }
    }

    /**
     * Starts the executor: registers a {@link MemoryFlowExecutor} singleton, loads every flow from
     * the repository and subscribes to the execution and worker-task-result queues.
     */
    public void run() {
        MemoryFlowExecutor flowExecutor = new MemoryFlowExecutor(this.flowRepository);
        this.applicationContext.registerSingleton(flowExecutor);

        this.allFlows = this.flowRepository.findAll();

        this.executionQueue.receive(MemoryExecutor.class, execution -> this.executionQueue(execution));
        this.workerTaskResultQueue.receive(MemoryExecutor.class, result -> this.workerTaskResultQueue(result));
    }

    /**
     * Consumer of the execution queue. Only executions that have no task run yet, or whose state
     * is "created", are saved and handled; every other message is ignored here.
     *
     * @param execution the execution received from the queue
     */
    private void executionQueue(Execution execution) {
        List<TaskRun> taskRuns = execution.getTaskRunList();
        boolean withoutTaskRun = taskRuns == null || taskRuns.isEmpty();
        if (!withoutTaskRun && !execution.getState().isCreated()) {
            return;
        }
        ExecutionState state = saveExecution(execution);
        handleExecution(state);
    }

    /**
     * Prepares a flow for execution: injects templates, then task defaults.
     *
     * <p>If template injection fails with an {@link InternalException} a warning is logged and
     * the flow is used without injected templates.
     *
     * @param flow the flow as loaded from the repository
     * @param execution the execution the flow is prepared for
     * @return the flow returned by {@code taskDefaultService.injectDefaults}
     */
    private Flow transform(Flow flow, Execution execution) {
        Flow withTemplates = flow;
        try {
            withTemplates = Template.injectTemplate(
                flow,
                execution,
                (first, second) -> (io.kestra.core.models.templates.Template) this.templateExecutorInterface.findById(first, second).orElse(null)
            );
        } catch (InternalException e) {
            log.warn("Failed to inject template", e);
        }
        return this.taskDefaultService.injectDefaults(withTemplates, execution);
    }

    /**
     * Runs one executor pass over the given state and dispatches everything it produced.
     *
     * <p>In order: the flow is resolved and transformed, the executor service processes the
     * execution, new child task runs are added (after deduplication), a failed or updated
     * execution is pushed through {@link #toExecution(Executor)}, worker tasks and worker task
     * results are emitted, execution delays are scheduled, worker task executions are registered
     * and emitted, and finally termination side effects are applied.
     *
     * <p>The whole pass runs while holding this instance's monitor.
     *
     * @param executionState the state to process
     */
    private void handleExecution(ExecutionState executionState) {
        synchronized (this) {
            Execution execution = executionState.execution;
            Flow flow = transform(this.flowRepository.findByExecution(execution), execution);
            Executor executor = new Executor(execution, (Long) null).withFlow(flow);
            if (log.isDebugEnabled()) {
                this.executorService.log(log, true, executor);
            }

            Executor processed = this.executorService.process(executor);
            if (!processed.getNexts().isEmpty() && deduplicateNexts(execution, processed.getNexts())) {
                processed.withExecution(this.executorService.onNexts(processed.getFlow(), processed.getExecution(), processed.getNexts()), "onNexts");
            }

            if (processed.getException() != null) {
                handleFailedExecutionFromExecutor(processed, processed.getException());
            } else if (processed.isExecutionUpdated()) {
                toExecution(processed);
            }

            dispatchWorkerTasks(execution, processed);
            processed.getWorkerTaskResults().forEach(this.workerTaskResultQueue::emit);
            scheduleExecutionDelays(processed);

            processed.getWorkerTaskExecutions().forEach(workerTaskExecution -> {
                WORKERTASKEXECUTIONS_WATCHER.put(workerTaskExecution.getExecution().getId(), workerTaskExecution);
                this.executionQueue.emit(workerTaskExecution.getExecution());
            });

            // Same flow and execution for every termination step, so the check is evaluated once.
            if (this.conditionService.isTerminatedWithListeners(flow, execution)) {
                handleTerminatedExecution(flow, execution);
            }
        }
    }

    /**
     * Emits the worker tasks of a processed executor, skipping those already dispatched for the
     * same task-run state: tasks flagged {@code isSendToWorkerTask()} go to the worker task queue,
     * tasks flagged {@code isFlowable()} are emitted as RUNNING worker task results.
     */
    private void dispatchWorkerTasks(Execution execution, Executor processed) {
        // Collected eagerly: the deduplication filter records state and must run exactly once per task.
        List<WorkerTask> pending = processed.getWorkerTasks().stream()
            .filter(workerTask -> deduplicateWorkerTask(execution, workerTask.getTaskRun()))
            .collect(Collectors.toList());

        pending.stream()
            .filter(workerTask -> workerTask.getTask().isSendToWorkerTask())
            .forEach(this.workerTaskQueue::emit);

        pending.stream()
            .filter(workerTask -> workerTask.getTask().isFlowable())
            .map(workerTask -> new WorkerTaskResult(workerTask.withTaskRun(workerTask.getTaskRun().withState(State.Type.RUNNING))))
            .forEach(this.workerTaskResultQueue::emit);
    }

    /**
     * Schedules, for each execution delay, the re-emission of the execution with the delayed task
     * run marked RUNNING once the delay date is reached (at least one microsecond from now).
     */
    private void scheduleExecutionDelays(Executor processed) {
        processed.getExecutionDelays().forEach(executionDelay -> {
            long delayMicros = Math.max(1L, ChronoUnit.MICROS.between(Instant.now(), executionDelay.getDate()));
            this.schedulerDelay.schedule(() -> {
                try {
                    this.executionQueue.emit(this.executionService.markAs(EXECUTIONS.get(executionDelay.getExecutionId()).execution, executionDelay.getTaskRunId(), State.Type.RUNNING));
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, delayMicros, TimeUnit.MICROSECONDS);
        });
    }

    /**
     * Applies the side effects of a terminated execution: a last emit of the execution, evaluation
     * of multiple-condition flow triggers, and the worker task result of a watched
     * worker-task execution, if any.
     */
    private void handleTerminatedExecution(Flow flow, Execution execution) {
        this.executionQueue.emit(execution);

        multipleConditionStorage.save(this.flowService.multipleFlowTrigger(this.allFlows.stream(), flow, execution, multipleConditionStorage));
        this.flowService.flowTriggerExecution(this.allFlows.stream(), execution, multipleConditionStorage)
            .forEach(this.executionQueue::emit);
        this.flowService.multipleFlowToDelete(this.allFlows.stream(), multipleConditionStorage)
            .forEach(multipleConditionStorage::delete);

        WorkerTaskExecution watched = WORKERTASKEXECUTIONS_WATCHER.get(execution.getId());
        if (watched != null) {
            this.workerTaskResultQueue.emit(watched.getTask().createWorkerTaskResult(this.runContextFactory, watched, this.flowRepository.findByExecution(execution), execution));
            // Removed only after the result was emitted, so a failure above keeps the watcher.
            WORKERTASKEXECUTIONS_WATCHER.remove(execution.getId());
        }
    }

    /**
     * Turns an executor failure into a failed execution: the logs describing the failure are
     * emitted on the log queue and the failed execution is pushed through
     * {@link #toExecution(Executor)}.
     *
     * <p>Any exception raised while doing so is logged and not rethrown.
     *
     * @param executor the executor whose processing failed
     * @param exc the exception reported by the executor
     */
    private void handleFailedExecutionFromExecutor(Executor executor, Exception exc) {
        Execution.FailedExecutionWithLog failed = executor.getExecution().failedExecutionFromExecutor(exc);
        try {
            failed.getLogs().forEach(this.logQueue::emit);
            toExecution(executor.withExecution(failed.getExecution(), "exception"));
        } catch (Exception e) {
            log.error("Failed to produce {}", exc.getMessage(), e);
        }
    }

    /**
     * Stores the given execution in {@code EXECUTIONS}: a fresh state is created on first sight,
     * otherwise the existing state is advanced with {@code ExecutionState.from(execution)}.
     *
     * @param execution the execution to store
     * @return the state now associated with the execution id
     */
    private ExecutionState saveExecution(Execution execution) {
        return EXECUTIONS.compute(execution.getId(), (executionId, existing) -> {
            if (existing != null) {
                return existing.from(execution);
            }
            return new ExecutionState(execution);
        });
    }

    /**
     * Publishes the executor's execution, stores it, runs a new handling pass on it, and drops
     * its state from {@code EXECUTIONS} when the executor service reports it can be purged.
     *
     * @param executor the executor carrying the execution to publish
     */
    private void toExecution(Executor executor) {
        if (log.isDebugEnabled()) {
            this.executorService.log(log, false, executor);
        }

        Execution current = executor.getExecution();
        this.executionQueue.emit(current);

        ExecutionState saved = saveExecution(current);
        handleExecution(saved);

        if (!this.executorService.canBePurged(executor)) {
            return;
        }
        EXECUTIONS.remove(current.getId());
    }

    /**
     * Consumer of the worker-task-result queue.
     *
     * <p>Records end-of-task metrics when the task run state reports termination, merges the
     * result into the stored execution state, then pushes the resulting execution through
     * {@link #toExecution(Executor)}. Runs while holding this instance's monitor.
     *
     * @param workerTaskResult the result received from the queue
     * @throws IllegalStateException if no state is stored for the result's execution id
     */
    private void workerTaskResultQueue(WorkerTaskResult workerTaskResult) {
        synchronized (this) {
            if (log.isDebugEnabled()) {
                this.executorService.log(log, true, workerTaskResult);
            }

            TaskRun taskRun = workerTaskResult.getTaskRun();
            if (taskRun.getState().isTerninated()) {
                this.metricRegistry.counter("executor.taskrun.ended.count", this.metricRegistry.tags(workerTaskResult, new String[0])).increment();
                this.metricRegistry.timer("executor.taskrun.ended.duration", this.metricRegistry.tags(workerTaskResult, new String[0])).record(taskRun.getState().getDuration());
            }

            // Use the state returned by compute() rather than re-reading the map: the execution
            // queue consumer can replace the entry without holding this monitor.
            ExecutionState updated = EXECUTIONS.compute(taskRun.getExecutionId(), (executionId, current) -> {
                if (current == null) {
                    throw new IllegalStateException("Execution state don't exist for " + executionId + ", receive " + workerTaskResult);
                }
                if (!current.execution.hasTaskRunJoinable(workerTaskResult.getTaskRun())) {
                    return current;
                }
                try {
                    return current.from(workerTaskResult, this.executorService, this.flowRepository);
                } catch (InternalException e) {
                    // The failure is folded into the execution itself instead of being thrown.
                    return new ExecutionState(current, current.execution.failedExecutionFromExecutor(e).getExecution());
                }
            });

            Execution execution = updated.execution;
            Executor executor = new Executor(execution, (Long) null);
            toExecution(executor.withFlow(transform(this.flowRepository.findByExecution(execution), execution)));
        }
    }

    /**
     * Tells whether a worker task should be dispatched for the given task run, remembering the
     * state it is dispatched for.
     *
     * @param execution the execution owning the task run
     * @param taskRun the task run about to be dispatched
     * @return {@code false} when a worker task was already dispatched for this task run in its
     *     current state, {@code true} otherwise
     */
    private boolean deduplicateWorkerTask(Execution execution, TaskRun taskRun) {
        ExecutionState state = EXECUTIONS.get(execution.getId());
        String key = taskRun.getExecutionId() + "-" + taskRun.getId();

        if (state.workerTaskDeduplication.get(key) != taskRun.getState().getCurrent()) {
            state.workerTaskDeduplication.put(key, taskRun.getState().getCurrent());
            return true;
        }

        log.trace(
            "Duplicate WorkerTask on execution '{}' for taskRun '{}', value '{}, taskId '{}'",
            execution.getId(),
            taskRun.getId(),
            taskRun.getValue(),
            taskRun.getTaskId()
        );
        return false;
    }

    /**
     * Registers the given candidate child task runs and tells whether at least one of them had
     * not been seen before for this execution.
     *
     * <p>Every candidate is registered, not only those up to the first new one, so a later replay
     * of the same candidates is reported as a duplicate.
     *
     * @param execution the execution the candidates belong to
     * @param list the candidate child task runs
     * @return {@code true} if at least one candidate was new, {@code false} if all were duplicates
     */
    private boolean deduplicateNexts(Execution execution, List<TaskRun> list) {
        ExecutionState executionState = EXECUTIONS.get(execution.getId());
        boolean anyNew = false;
        for (TaskRun taskRun : list) {
            String key = taskRun.getParentTaskRunId() + "-" + taskRun.getTaskId() + "-" + taskRun.getValue();
            if (executionState.childDeduplication.putIfAbsent(key, taskRun.getId()) != null) {
                log.trace("Duplicate Nexts on execution '{}' with key '{}'", execution.getId(), key);
            } else {
                anyNew = true;
            }
        }
        return anyNew;
    }

    /**
     * Stops the delay scheduler and closes the four queues used by this executor.
     *
     * <p>The scheduler is stopped first, cancelling delayed re-emissions that have not started
     * yet, so that none of them fires against a queue that is being closed.
     *
     * @throws IOException if closing one of the queues fails
     */
    public void close() throws IOException {
        this.schedulerDelay.shutdownNow();
        this.executionQueue.close();
        this.workerTaskQueue.close();
        this.workerTaskResultQueue.close();
        this.logQueue.close();
    }
}
