package io.kestra.jdbc.runner;

import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.DefaultFlowExecutor;
import io.kestra.core.runners.Executor;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.core.runners.ExecutorService;
import io.kestra.core.runners.ExecutorState;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.services.AbstractFlowTriggerService;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.TaskDefaultService;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.tasks.flows.Template;
import io.kestra.core.topologies.FlowTopologyService;
import io.kestra.core.utils.Await;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
import io.kestra.jdbc.repository.AbstractJdbcFlowTopologyRepository;
import io.micronaut.context.ApplicationContext;
import io.micronaut.transaction.exceptions.CannotCreateTransactionException;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Generated;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

@Singleton
@JdbcRunnerEnabled
/* loaded from: input_file:io/kestra/jdbc/runner/JdbcExecutor.class */
public class JdbcExecutor implements ExecutorInterface {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(JdbcExecutor.class);

    /** Single-threaded scheduler used by {@link #run()} to invoke {@link #executionDelaySend()} every second. */
    private final ScheduledExecutorService schedulerDelay = Executors.newSingleThreadScheduledExecutor();

    /**
     * Shutdown flag: set by {@link #close()} and checked at the start of {@link #executionDelaySend()},
     * which runs on the {@link #schedulerDelay} thread. Declared {@code volatile} so the write made by the
     * closing thread is guaranteed to be visible to the scheduler thread.
     */
    private volatile Boolean isShutdown = false;

    /** Used to register the {@code DefaultFlowExecutor} singleton and to close the context on fatal errors. */
    @Inject
    private ApplicationContext applicationContext;

    /** Resolves the flow attached to an execution. */
    @Inject
    private FlowRepositoryInterface flowRepository;

    /** Provides the per-execution {@code lock(...)} under which every state transition is computed. */
    @Inject
    private AbstractJdbcExecutionRepository executionRepository;

    /** Queue of executions; consumed by {@link #executionQueue(Execution)} and also written to by this class. */
    @Inject
    @Named("executionQueue")
    private QueueInterface<Execution> executionQueue;

    /** Outgoing queue of worker jobs (bean name {@code workerJobQueue}). */
    @Inject
    @Named("workerJobQueue")
    private QueueInterface<WorkerJob> workerTaskQueue;

    /** Queue of worker task results; consumed by {@link #workerTaskResultQueue(WorkerTaskResult)}. */
    @Inject
    @Named("workerTaskResultQueue")
    private QueueInterface<WorkerTaskResult> workerTaskResultQueue;

    /** Outgoing queue of log entries (bean name {@code workerTaskLogQueue}). */
    @Inject
    @Named("workerTaskLogQueue")
    private QueueInterface<LogEntry> logQueue;

    /** Passed to {@code createWorkerTaskResult} when a watched execution terminates. */
    @Inject
    private RunContextFactory runContextFactory;

    /** Applies task defaults to a flow in {@link #transform(Flow, Execution)}. */
    @Inject
    private TaskDefaultService taskDefaultService;

    /** Optional template lookup used by {@link #transform(Flow, Execution)}; empty when no bean is available. */
    @Inject
    private Optional<Template.TemplateExecutorInterface> templateExecutorInterface;

    /** Core executor logic ({@code process}, {@code onNexts}, {@code addDynamicTaskRun}, {@code canBePurged}, logging). */
    @Inject
    private ExecutorService executorService;

    /** Used to test whether an execution is terminated, listeners included. */
    @Inject
    private ConditionService conditionService;

    /** Storage handed to the flow trigger service when computing triggered executions. */
    @Inject
    private MultipleConditionStorageInterface multipleConditionStorage;

    /** Computes the executions to start from flow triggers once an execution terminates. */
    @Inject
    private AbstractFlowTriggerService flowTriggerService;

    /** Records the task run counter and timer emitted for terminated task runs. */
    @Inject
    private MetricRegistry metricRegistry;

    /** Source of the flow list cached in {@link #allFlows}. */
    @Inject
    protected FlowListenersInterface flowListeners;

    /** Stores the worker task executions this executor is waiting on. */
    @Inject
    private AbstractJdbcWorkerTaskExecutionStorage workerTaskExecutionStorage;

    /** Used by {@link #executionDelaySend()} to mark a paused task run with its target state. */
    @Inject
    private ExecutionService executionService;

    /** Stores execution delays and hands them back to {@link #executionDelaySend()}. */
    @Inject
    private AbstractJdbcExecutionDelayStorage abstractExecutionDelayStorage;

    /** Stores the per-execution {@code ExecutorState}; entries are deleted once an execution can be purged. */
    @Inject
    private AbstractJdbcExecutorStateStorage executorStateStorage;

    /** Computes the topology of a flow against all known flows. */
    @Inject
    private FlowTopologyService flowTopologyService;

    /** Persists the topology computed for each flow received on {@link #flowQueue}. */
    @Inject
    private AbstractJdbcFlowTopologyRepository flowTopologyRepository;

    /** Latest flow list delivered by {@link #flowListeners}; {@code null} until the first notification. */
    protected List<Flow> allFlows;

    /** Queue of flow changes, consumed in {@link #run()} to refresh the flow topology. */
    @Inject
    @Named("flowQueue")
    private QueueInterface<Flow> flowQueue;

    /** Resolves the worker group a worker task must be emitted to. */
    @Inject
    private WorkerGroupService workerGroupService;

    /** Decides whether the messages of a given execution must be skipped. */
    @Inject
    private SkipExecutionService skipExecutionService;

    /**
     * Starts the executor.
     *
     * <ol>
     *   <li>starts the flow listeners and waits (up to 5 minutes) for the first flow list;</li>
     *   <li>registers a {@link DefaultFlowExecutor} singleton in the application context;</li>
     *   <li>subscribes to the execution and worker task result queues;</li>
     *   <li>schedules {@link #executionDelaySend()} every second and starts a {@code jdbc-delay} thread
     *       that watches the scheduled task for a fatal failure;</li>
     *   <li>subscribes to the flow queue to keep the flow topology up to date.</li>
     * </ol>
     */
    public void run() {
        this.flowListeners.run();
        this.flowListeners.listen(flows -> {
            this.allFlows = flows;
        });
        Await.until(() -> this.allFlows != null, Duration.ofMillis(100L), Duration.ofMinutes(5L));

        this.applicationContext.registerSingleton(new DefaultFlowExecutor(this.flowListeners, this.flowRepository));

        this.executionQueue.receive(Executor.class, this::executionQueue);
        this.workerTaskResultQueue.receive(Executor.class, this::workerTaskResultQueue);

        ScheduledFuture<?> delayHandle = this.schedulerDelay.scheduleAtFixedRate(this::executionDelaySend, 0L, 1L, TimeUnit.SECONDS);

        // A periodic task only becomes "done" when it failed or was cancelled, so this thread
        // acts as a watchdog that surfaces failures of the delay task.
        new Thread(() -> {
            Await.until(delayHandle::isDone);
            try {
                delayHandle.get();
            } catch (java.util.concurrent.CancellationException ignored) {
                // Expected path: close() shuts the scheduler down, which cancels the periodic task.
            } catch (InterruptedException e) {
                // An interrupt carries no cause; restore the flag instead of dereferencing a null cause.
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                // A failure to open a transaction is tolerated; anything else is fatal for the process.
                if (cause == null || cause.getClass() != CannotCreateTransactionException.class) {
                    log.error("Executor fatal exception", e);
                    this.applicationContext.close();
                    Runtime.getRuntime().exit(1);
                }
            }
        }, "jdbc-delay").start();

        this.flowQueue.receive(FlowTopology.class, flow -> {
            if (flow == null || (flow instanceof FlowWithException)) {
                return;
            }
            // A deleted flow gets an empty topology; otherwise it is recomputed against all known flows.
            this.flowTopologyRepository.save(flow, (List<FlowTopology>) (flow.isDeleted() ? Stream.empty() : this.flowTopologyService.topology(flow, this.allFlows.stream())).distinct().collect(Collectors.toList()));
        });
    }

    /**
     * Handles one message from the execution queue.
     *
     * <p>Unless the execution is configured to be skipped, the execution is processed under the
     * repository lock: the executor service computes the next state, and the resulting worker tasks,
     * worker task results, delays, sub-executions and flow-triggered executions are emitted or stored.
     * Each kind of output is filtered through the matching {@code deduplicate*} method, which records
     * what was already sent in the {@link ExecutorState}. The locked result, if any, is then published
     * through {@link #toExecution(Executor)}.
     *
     * @param execution the execution message received from the queue
     */
    private void executionQueue(Execution execution) {
        if (this.skipExecutionService.skipExecution(execution.getId())) {
            log.warn("Skipping execution {}", execution.getId());
            return;
        }

        Executor result = this.executionRepository.lock(execution.getId(), pair -> {
            Execution current = (Execution) pair.getLeft();
            ExecutorState executorState = (ExecutorState) pair.getRight();

            Flow flow = transform(this.flowRepository.findByExecution(current), current);
            Executor initial = new Executor(current, (Long) null).withFlow(flow);
            if (log.isDebugEnabled()) {
                this.executorService.log(log, true, initial);
            }

            Executor executor = this.executorService.process(initial);

            // Next task runs: only applied when at least one of them has not been seen yet.
            if (executor.getNexts().size() > 0 && deduplicateNexts(current, executorState, executor.getNexts())) {
                executor.withExecution(this.executorService.onNexts(executor.getFlow(), executor.getExecution(), executor.getNexts()), "onNexts");
            }

            // Worker tasks: deduplicate everything first, then dispatch.
            if (executor.getWorkerTasks().size() > 0) {
                var dedupedTasks = executor.getWorkerTasks().stream()
                    .filter(workerTask -> deduplicateWorkerTask(current, executorState, workerTask.getTaskRun()))
                    .toList();

                // Tasks flagged as "send to worker" go to the worker job queue.
                dedupedTasks.stream()
                    .filter(workerTask -> workerTask.getTask().isSendToWorkerTask())
                    .forEach(workerTask -> this.workerTaskQueue.emit(this.workerGroupService.resolveGroupFromJob(workerTask), workerTask));

                // Flowable tasks are emitted directly as RUNNING worker task results.
                dedupedTasks.stream()
                    .filter(workerTask -> workerTask.getTask().isFlowable())
                    .map(workerTask -> new WorkerTaskResult(workerTask.withTaskRun(workerTask.getTaskRun().withState(State.Type.RUNNING))))
                    .forEach(this.workerTaskResultQueue::emit);
            }

            // Worker task results produced by the executor service itself.
            if (executor.getWorkerTaskResults().size() > 0) {
                executor.getWorkerTaskResults().forEach(this.workerTaskResultQueue::emit);
            }

            // Delays are persisted; executionDelaySend() picks them up later.
            if (executor.getExecutionDelays().size() > 0) {
                executor.getExecutionDelays().forEach(delay -> this.abstractExecutionDelayStorage.save(delay));
            }

            // Sub-executions: store them all, then start the ones not already started.
            if (executor.getWorkerTaskExecutions().size() > 0) {
                this.workerTaskExecutionStorage.save(executor.getWorkerTaskExecutions());

                executor.getWorkerTaskExecutions().stream()
                    .filter(workerTaskExecution -> deduplicateWorkerTaskExecution(current, executorState, workerTaskExecution.getTaskRun()))
                    .collect(Collectors.toList())
                    .forEach(workerTaskExecution -> {
                        String message = "Create new execution for flow '" + workerTaskExecution.getExecution().getNamespace()
                            + "'." + workerTaskExecution.getExecution().getFlowId()
                            + "' with id '" + workerTaskExecution.getExecution().getId()
                            + "' from task '" + workerTaskExecution.getTask().getId()
                            + "' and taskrun '" + workerTaskExecution.getTaskRun().getId()
                            + (workerTaskExecution.getTaskRun().getValue() != null ? " (" + workerTaskExecution.getTaskRun().getValue() + ")" : "")
                            + "'";
                        log.info(message);
                        this.logQueue.emit(LogEntry.of(workerTaskExecution.getTaskRun()).toBuilder()
                            .level(Level.INFO)
                            .message(message)
                            .timestamp(workerTaskExecution.getTaskRun().getState().getStartDate())
                            .thread(Thread.currentThread().getName())
                            .build());
                        this.executionQueue.emit(workerTaskExecution.getExecution());
                    });
            }

            // Flow triggers: fired at most once per terminated execution.
            if (this.conditionService.isTerminatedWithListeners(flow, current) && deduplicateFlowTrigger(current, executorState)) {
                this.flowTriggerService
                    .computeExecutionsFromFlowTriggers(current, this.allFlows, Optional.of(this.multipleConditionStorage))
                    .forEach(this.executionQueue::emit);
            }

            // If this terminated execution is one we were waiting on, emit its result and stop watching it.
            if (this.conditionService.isTerminatedWithListeners(flow, current)) {
                this.workerTaskExecutionStorage.get(current.getId()).ifPresent(workerTaskExecution -> {
                    this.workerTaskResultQueue.emit(workerTaskExecution.getTask().createWorkerTaskResult(this.runContextFactory, workerTaskExecution, this.flowRepository.findByExecution(current), current));
                    this.workerTaskExecutionStorage.delete(workerTaskExecution);
                });
            }

            return Pair.of(executor, executorState);
        });

        if (result != null) {
            toExecution(result);
        }
    }

    /**
     * Handles one message from the worker task result queue.
     *
     * <p>Unless the owning execution is configured to be skipped, records the end-of-task-run metrics
     * for terminated task runs, then joins the task run into its execution under the repository lock.
     * A result whose task run is not joinable is ignored. An {@link InternalException} raised while
     * joining turns the execution into a failed one. The locked result, if any, is published through
     * {@link #toExecution(Executor)}.
     *
     * @param workerTaskResult the worker task result received from the queue
     * @throws IllegalStateException if the lock yields no execution for the task run's execution id
     */
    private void workerTaskResultQueue(WorkerTaskResult workerTaskResult) {
        // The skip list is keyed by execution id (see executionQueue), not by task id.
        if (this.skipExecutionService.skipExecution(workerTaskResult.getTaskRun().getExecutionId())) {
            log.warn("Skipping execution {}", workerTaskResult.getTaskRun().getExecutionId());
            return;
        }

        if (log.isDebugEnabled()) {
            this.executorService.log(log, true, workerTaskResult);
        }

        if (workerTaskResult.getTaskRun().getState().isTerminated()) {
            this.metricRegistry.counter("executor.taskrun.ended.count", this.metricRegistry.tags(workerTaskResult, new String[0])).increment();
            this.metricRegistry.timer("executor.taskrun.ended.duration", this.metricRegistry.tags(workerTaskResult, new String[0])).record(workerTaskResult.getTaskRun().getState().getDuration());
        }

        Executor result = this.executionRepository.lock(workerTaskResult.getTaskRun().getExecutionId(), pair -> {
            Execution execution = (Execution) pair.getLeft();
            // Fail with the descriptive error before anything is built around a missing execution.
            if (execution == null) {
                throw new IllegalStateException("Execution state don't exist for " + workerTaskResult.getTaskRun().getExecutionId() + ", receive " + workerTaskResult);
            }

            // Returning null tells the caller there is nothing to publish for this result.
            if (!execution.hasTaskRunJoinable(workerTaskResult.getTaskRun())) {
                return null;
            }

            Executor executor = new Executor(execution, (Long) null);
            try {
                Execution withDynamicTaskRun = this.executorService.addDynamicTaskRun(executor.getExecution(), this.flowRepository.findByExecution(executor.getExecution()), workerTaskResult);
                if (withDynamicTaskRun != null) {
                    executor = executor.withExecution(withDynamicTaskRun, "addDynamicTaskRun");
                }
                return Pair.of(executor.withExecution(executor.getExecution().withTaskRun(workerTaskResult.getTaskRun()), "joinWorkerResult"), (ExecutorState) pair.getRight());
            } catch (InternalException e) {
                return Pair.of(handleFailedExecutionFromExecutor(executor, e), (ExecutorState) pair.getRight());
            }
        });

        if (result != null) {
            toExecution(result);
        }
    }

    /**
     * Publishes the outcome of a locked processing step.
     *
     * <p>Nothing happens unless the executor carries an exception or reports an updated execution.
     * A failed executor is first converted with {@link #handleFailedExecutionFromExecutor} and then
     * emitted with {@code emit}; a merely updated one is emitted with {@code JdbcQueue#emitOnly}.
     * Finally the stored executor state is deleted when the executor service says it can be purged.
     *
     * @param executor the executor returned by the repository lock
     */
    private void toExecution(Executor executor) {
        final boolean hasFailure = executor.getException() != null;
        if (!hasFailure && !executor.isExecutionUpdated()) {
            return;
        }

        final Executor outcome = hasFailure
            ? handleFailedExecutionFromExecutor(executor, executor.getException())
            : executor;

        if (log.isDebugEnabled()) {
            executorService.log(log, false, outcome);
        }

        if (hasFailure) {
            executionQueue.emit(outcome.getExecution());
        } else {
            ((JdbcQueue) executionQueue).emitOnly(null, outcome.getExecution());
        }

        if (executorService.canBePurged(outcome)) {
            executorStateStorage.delete(outcome.getExecution());
        }
    }

    /**
     * Prepares a flow for execution: injects templates when a template executor is available,
     * then applies task defaults.
     *
     * <p>A template injection failure is logged as a warning and the flow is used as received.
     *
     * @param flow      the flow resolved for the execution
     * @param execution the execution the flow is prepared for
     * @return the flow returned by {@code TaskDefaultService#injectDefaults}
     */
    private Flow transform(Flow flow, Execution execution) {
        Flow resolved = flow;

        if (templateExecutorInterface.isPresent()) {
            try {
                resolved = Template.injectTemplate(
                    flow,
                    execution,
                    (first, second) -> (io.kestra.core.models.templates.Template) templateExecutorInterface.get().findById(first, second).orElse(null)
                );
            } catch (InternalException e) {
                log.warn("Failed to inject template", e);
            }
        }

        return taskDefaultService.injectDefaults(resolved, execution);
    }

    /**
     * Periodic task: processes the delays handed over by the execution delay storage.
     *
     * <p>For each delay, the execution is locked; if the delayed task run is still {@code PAUSED} it is
     * marked with the state carried by the delay. Any exception raised while doing so turns the
     * execution into a failed one. The locked result, if any, is published through
     * {@link #toExecution(Executor)}. Does nothing once the executor has been shut down.
     */
    private void executionDelaySend() {
        if (isShutdown) {
            return;
        }

        abstractExecutionDelayStorage.get(delay -> {
            Executor result = executionRepository.lock(delay.getExecutionId(), pair -> {
                Executor executor = new Executor((Execution) pair.getLeft(), (Long) null);

                try {
                    State.Type currentState = executor.getExecution()
                        .findTaskRunByTaskRunId(delay.getTaskRunId())
                        .getState()
                        .getCurrent();

                    if (currentState == State.Type.PAUSED) {
                        executor = executor.withExecution(
                            executionService.markAs((Execution) pair.getKey(), delay.getTaskRunId(), delay.getState()),
                            "pausedRestart"
                        );
                    }
                } catch (Exception e) {
                    executor = handleFailedExecutionFromExecutor(executor, e);
                }

                return Pair.of(executor, (ExecutorState) pair.getRight());
            });

            if (result != null) {
                toExecution(result);
            }
        });
    }

    /**
     * Tells whether the given next task runs contain one that was not recorded yet.
     *
     * <p>Task runs are examined in order, keyed by {@code parentTaskRunId-taskId-value}. The first
     * unseen key is recorded in the state's child deduplication map and ends the scan, so later task
     * runs in the list are not examined during that call.
     *
     * @param execution     the execution being processed (used for trace logging only)
     * @param executorState the state holding the child deduplication map
     * @param list          the candidate next task runs
     * @return {@code true} if an unseen task run was found and recorded, {@code false} if all were duplicates
     */
    private boolean deduplicateNexts(Execution execution, ExecutorState executorState, List<TaskRun> list) {
        for (TaskRun candidate : list) {
            String key = candidate.getParentTaskRunId() + "-" + candidate.getTaskId() + "-" + candidate.getValue();

            if (!executorState.getChildDeduplication().containsKey(key)) {
                executorState.getChildDeduplication().put(key, candidate.getId());
                return true;
            }

            log.trace("Duplicate Nexts on execution '{}' with key '{}'", execution.getId(), key);
        }

        return false;
    }

    /**
     * Tells whether a worker task must be sent, based on the last state recorded for its task run.
     *
     * @param execution     the execution being processed (used for trace logging only)
     * @param executorState the state holding the worker task deduplication map
     * @param taskRun       the task run of the worker task
     * @return {@code false} if the task run was already recorded with its current state; otherwise
     *     records the current state and returns {@code true}
     */
    private boolean deduplicateWorkerTask(Execution execution, ExecutorState executorState, TaskRun taskRun) {
        String key = taskRun.getId();
        State.Type recorded = (State.Type) executorState.getWorkerTaskDeduplication().get(key);
        boolean isNew = recorded != taskRun.getState().getCurrent();

        if (isNew) {
            executorState.getWorkerTaskDeduplication().put(key, taskRun.getState().getCurrent());
        } else {
            log.trace("Duplicate WorkerTask on execution '{}' for taskRun '{}', value '{}, taskId '{}'", execution.getId(), taskRun.getId(), taskRun.getValue(), taskRun.getTaskId());
        }

        return isNew;
    }

    /**
     * Tells whether a worker task execution must be started, based on the last state recorded for
     * its task run.
     *
     * @param execution     the execution being processed (used for trace logging only)
     * @param executorState the state holding the worker task execution deduplication map
     * @param taskRun       the task run of the worker task execution
     * @return {@code false} if the task run was already recorded with its current state; otherwise
     *     records the current state and returns {@code true}
     */
    private boolean deduplicateWorkerTaskExecution(Execution execution, ExecutorState executorState, TaskRun taskRun) {
        String key = taskRun.getId();
        State.Type recorded = (State.Type) executorState.getWorkerTaskExecutionDeduplication().get(key);
        boolean isNew = recorded != taskRun.getState().getCurrent();

        if (isNew) {
            executorState.getWorkerTaskExecutionDeduplication().put(key, taskRun.getState().getCurrent());
        } else {
            log.trace("Duplicate WorkerTaskExecution on execution '{}' for taskRun '{}', value '{}, taskId '{}'", execution.getId(), taskRun.getId(), taskRun.getValue(), taskRun.getTaskId());
        }

        return isNew;
    }

    /**
     * Tells whether flow triggers still have to be evaluated for this execution.
     *
     * @param execution     the execution being processed (used for trace logging only)
     * @param executorState the state holding the flow trigger deduplication flag
     * @return {@code true} the first time it is called for a state (the flag is then set),
     *     {@code false} once the flag is already set
     */
    private boolean deduplicateFlowTrigger(Execution execution, ExecutorState executorState) {
        boolean alreadyTriggered = executorState.getFlowTriggerDeduplication();

        if (!alreadyTriggered) {
            executorState.setFlowTriggerDeduplication(true);
            return true;
        }

        log.trace("Duplicate Flow Trigger on execution '{}'", execution.getId());
        return false;
    }

    /**
     * Converts an executor into one carrying a failed execution built from the given exception.
     *
     * <p>The logs attached to the failed execution are emitted asynchronously on the log queue.
     * If emitting the logs or updating the executor throws, the error is logged and the executor
     * is returned as received.
     *
     * @param executor the executor whose execution failed
     * @param exc      the exception that caused the failure
     * @return the executor updated with the failed execution, or the given executor if that update failed
     */
    private Executor handleFailedExecutionFromExecutor(Executor executor, Exception exc) {
        Execution.FailedExecutionWithLog failedExecutionWithLog = executor.getExecution().failedExecutionFromExecutor(exc);
        try {
            failedExecutionWithLog.getLogs().forEach(this.logQueue::emitAsync);
            return executor.withExecution(failedExecutionWithLog.getExecution(), "exception");
        } catch (Exception e) {
            // Best effort: keep processing with the untouched executor rather than failing the caller.
            log.error("Failed to produce {}", exc.getMessage(), e);
            return executor;
        }
    }

    /**
     * Stops the executor: raises the shutdown flag, shuts down the delay scheduler, then closes the
     * execution, worker job, worker task result and log queues, in that order.
     *
     * @throws IOException if closing one of the queues fails; queues after the failing one are not closed
     */
    public void close() throws IOException {
        // Flag first so a delay run that is just starting returns immediately.
        isShutdown = true;
        schedulerDelay.shutdown();

        executionQueue.close();
        workerTaskQueue.close();
        workerTaskResultQueue.close();
        logQueue.close();
    }
}
