package org.finos.tracdap.svc.orch.service;

import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.finos.tracdap.common.cache.CacheEntry;
import org.finos.tracdap.common.cache.CacheTicket;
import org.finos.tracdap.common.cache.IJobCache;
import org.finos.tracdap.common.exception.ECache;
import org.finos.tracdap.common.exception.ECacheTicket;
import org.finos.tracdap.common.exception.EExecutor;
import org.finos.tracdap.common.exception.EExecutorUnavailable;
import org.finos.tracdap.common.exception.EJobFailure;
import org.finos.tracdap.common.exception.EStartup;
import org.finos.tracdap.common.exception.ETrac;
import org.finos.tracdap.common.exception.ETracInternal;
import org.finos.tracdap.common.exec.ExecutorJobInfo;
import org.finos.tracdap.common.metadata.MetadataUtil;
import org.finos.tracdap.config.PlatformConfig;
import org.finos.tracdap.config.PluginConfig;
import org.finos.tracdap.metadata.JobStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/finos/tracdap/svc/orch/service/JobManager.class */
public class JobManager {
    // --- Limits -------------------------------------------------------------------------------
    // NOTE(review): the methods below compare against the literal values rather than these names
    // (likely constant inlining); the pairing described here is inferred from matching values.

    // Attempts allowed for one job operation; processRetryOrFail compares retries against 2.
    public static final int PROCESSING_RETRY_LIMIT = 2;
    // Consecutive unexpected cache polling errors tolerated; pollCache exits the JVM at 100.
    public static final int CACHE_POLL_ERROR_LIMIT = 100;
    // Unexpected executor polling errors tolerated; pollExecutor exits the JVM at 20.
    public static final int EXECUTOR_POLL_ERROR_LIMIT = 20;
    // Highest cache revision a job may reach; above 100 getNextOperation removes the job.
    public static final int JOB_REVISION_LIMIT = 100;

    // --- Plugin config property keys (read by the constructor via readIntegerProperty) --------
    public static final String POLL_INTERVAL_CONFIG_KEY = "pollInterval";
    // The constant name is misspelt ("CONFI"); it is public, so renaming it would break callers.
    public static final String TICKET_DURATION_CONFI_KEY = "ticketDuration";
    public static final String MAX_JOBS_CONFIG_KEY = "maxJobs";

    // --- Defaults; the durations are interpreted as seconds (constructor uses Duration.ofSeconds)
    public static final int DEFAULT_CACHE_POLL_INTERVAL = 10;
    public static final int DEFAULT_CACHE_TICKET_DURATION = 10;
    public static final int DEFAULT_EXECUTOR_POLL_INTERVAL = 30;
    public static final int DEFAULT_EXECUTOR_TICKET_DURATION = 120;
    public static final int DEFAULT_EXECUTOR_JOB_LIMIT = 6;

    // --- Collaborators and settings fixed at construction time -------------------------------
    private final JobProcessor processor;
    private final IJobCache<JobState> cache;
    // Runs both periodic polling tasks and the individual job operations.
    private final ScheduledExecutorService javaExecutor;
    private final Duration cachePollInterval;
    private final Duration cacheTicketDuration;
    private final Duration executorPollInterval;
    private final Duration executorTicketDuration;
    // Upper bound on jobs in a running status; pollCache uses it to cap new launches.
    private final int executorJobLimit;

    // Delay before the first run of each polling task (see start()).
    public static final Duration STARTUP_DELAY = Duration.of(10, ChronoUnit.SECONDS);
    // Delay between a job reaching REMOVAL_SCHEDULED and its removal operation being run.
    public static final Duration SCHEDULED_REMOVAL_DURATION = Duration.of(2, ChronoUnit.MINUTES);

    // --- Cache status groups used by the polling loops ---------------------------------------
    // Jobs waiting to be launched, subject to the executor job limit.
    private static final List<String> STATUS_FOR_LAUNCH = List.of(CacheStatus.QUEUED_IN_TRAC);
    // Jobs counted against the executor job limit and polled in pollExecutor.
    private static final List<String> STATUS_FOR_RUNNING_JOBS = List.of(CacheStatus.LAUNCH_SCHEDULED, CacheStatus.SENT_TO_EXECUTOR, CacheStatus.QUEUED_IN_EXECUTOR, CacheStatus.RUNNING_IN_EXECUTOR);
    // Jobs whose next operation is submitted straight away, with no launch limit applied.
    private static final List<String> STATUS_FOR_UPDATE = List.of(CacheStatus.LAUNCH_SCHEDULED, CacheStatus.EXECUTOR_COMPLETE, CacheStatus.EXECUTOR_SUCCEEDED, CacheStatus.EXECUTOR_FAILED, CacheStatus.RESULTS_RECEIVED, CacheStatus.RESULTS_INVALID, CacheStatus.RESULTS_SAVED, CacheStatus.READY_FOR_CLEANUP, CacheStatus.READY_TO_REMOVE, CacheStatus.PROCESSING_FAILED);

    // --- Errors that errorCanRetry treats as retryable ---------------------------------------
    private static final List<Status.Code> GRPC_CAN_RETRY = List.of(Status.Code.UNAVAILABLE, Status.Code.DEADLINE_EXCEEDED);
    private static final List<Class<? extends ETrac>> TRAC_CAN_RETRY = List.of(EExecutorUnavailable.class);

    private final Logger log = LoggerFactory.getLogger(getClass());

    // Handles for the scheduled polling tasks; null until start() has been called.
    private ScheduledFuture<?> cachePollingTask = null;
    private ScheduledFuture<?> executorPollingTask = null;

    // Counts of unexpected polling errors, compared against the limits above.
    private final AtomicInteger cachePollErrorCount = new AtomicInteger(0);
    private final AtomicInteger executorPollErrorCount = new AtomicInteger(0);

    public JobManager(PlatformConfig platformConfig, JobProcessor jobProcessor, IJobCache<JobState> iJobCache, ScheduledExecutorService scheduledExecutorService) {
        this.processor = jobProcessor;
        this.cache = iJobCache;
        this.javaExecutor = scheduledExecutorService;
        this.cachePollInterval = Duration.ofSeconds(readIntegerProperty(platformConfig.getJobCache(), POLL_INTERVAL_CONFIG_KEY, 10));
        this.cacheTicketDuration = Duration.ofSeconds(readIntegerProperty(platformConfig.getJobCache(), TICKET_DURATION_CONFI_KEY, 10));
        this.executorPollInterval = Duration.ofSeconds(readIntegerProperty(platformConfig.getExecutor(), POLL_INTERVAL_CONFIG_KEY, 30));
        this.executorTicketDuration = Duration.ofSeconds(readIntegerProperty(platformConfig.getExecutor(), TICKET_DURATION_CONFI_KEY, DEFAULT_EXECUTOR_TICKET_DURATION));
        this.executorJobLimit = readIntegerProperty(platformConfig.getExecutor(), MAX_JOBS_CONFIG_KEY, 6);
    }

    /**
     * Reads an integer property from a plugin config, falling back to a default when absent.
     *
     * @param pluginConfig the plugin config to read from
     * @param str the property key
     * @param i the value to return when the property is not set
     * @return the parsed property value, or {@code i} if the property is not present
     * @throws EStartup if the property is present but cannot be parsed as an integer; the
     *         original {@link NumberFormatException} is attached as the cause
     */
    private int readIntegerProperty(PluginConfig pluginConfig, String str, int i) {
        if (!pluginConfig.containsProperties(str)) {
            return i;
        }
        String rawValue = pluginConfig.getPropertiesOrDefault(str, Integer.toString(i));
        try {
            // Surrounding whitespace is tolerated, anything else non-numeric is a config error
            return Integer.parseInt(rawValue.trim());
        } catch (NumberFormatException e) {
            String message = String.format("Invalid config property [%s]: Expected an integer, got [%s]", str, rawValue);
            this.log.error(message);
            // Keep the parse failure as the cause so the startup error can be traced
            throw new EStartup(message, e);
        }
    }

    /**
     * Schedules the cache and executor polling tasks on the Java executor.
     *
     * <p>Both tasks first run after {@link #STARTUP_DELAY} and then repeat at their
     * configured poll intervals.
     *
     * @throws EStartup if the executor rejects either polling task
     */
    public void start() {
        try {
            this.log.info("Starting job manager service...");

            long initialDelaySeconds = STARTUP_DELAY.getSeconds();
            long cachePeriodSeconds = this.cachePollInterval.getSeconds();
            this.cachePollingTask = this.javaExecutor.scheduleAtFixedRate(
                    this::pollCache, initialDelaySeconds, cachePeriodSeconds, TimeUnit.SECONDS);

            long executorPeriodSeconds = this.executorPollInterval.getSeconds();
            this.executorPollingTask = this.javaExecutor.scheduleAtFixedRate(
                    this::pollExecutor, initialDelaySeconds, executorPeriodSeconds, TimeUnit.SECONDS);

            this.log.info("Job manager service started OK");
        } catch (RejectedExecutionException rejected) {
            this.log.error("Job manager service failed to start: {}", rejected.getMessage(), rejected);
            throw new EStartup("Job manager service failed to start", rejected);
        }
    }

    /**
     * Cancels the polling tasks created by {@link #start()}, if any.
     *
     * <p>Tasks are cancelled without interruption, so a poll that is already running is
     * allowed to finish. Calling this before {@code start()} only writes the log message.
     */
    public void stop() {
        this.log.info("Stopping job manager service...");

        ScheduledFuture<?>[] pollingTasks = {this.cachePollingTask, this.executorPollingTask};
        for (ScheduledFuture<?> task : pollingTasks) {
            if (task == null) {
                continue;
            }
            task.cancel(false);
        }
    }

    /**
     * Accepts a new job: saves its initial metadata, records it in the job cache as queued
     * and triggers an immediate cache poll.
     *
     * <p>The supplied state is not modified; a clone is marked {@code QUEUED} /
     * {@code QUEUED_IN_TRAC} and handed to the processor. The cache ticket is closed on every
     * path, including when the ticket is superseded or the cache entry cannot be created.
     *
     * @param jobState the requested job
     * @return the job state after the initial metadata has been saved, with its job key set
     * @throws EJobFailure if the job could not be accepted for any reason (the underlying
     *         error is attached as the cause)
     */
    public JobState addNewJob(JobState jobState) {
        try {
            JobState queuedState = jobState.m5clone();
            queuedState.tracStatus = JobStatusCode.QUEUED;
            queuedState.cacheStatus = CacheStatus.QUEUED_IN_TRAC;

            JobState savedState = this.processor.saveInitialMetadata(queuedState);
            savedState.jobKey = MetadataUtil.objectKey(savedState.jobId);

            // try-with-resources guarantees the ticket is released even if creation fails
            try (CacheTicket ticket = this.cache.openNewTicket(savedState.jobKey, this.cacheTicketDuration)) {
                if (ticket.superseded()) {
                    throw new ECacheTicket("Job could not be created because it already exists");
                }
                this.cache.createEntry(ticket, savedState.cacheStatus, savedState);
            }

            // Poll straight away so the new job does not wait for the next scheduled poll
            this.javaExecutor.submit(this::pollCache);
            return savedState;
        } catch (Exception e) {
            String message = String.format("Job was not accepted: %s", e.getMessage());
            this.log.error(message);
            throw new EJobFailure(message, e);
        }
    }

    /**
     * Looks up the cached state of a job.
     *
     * @param str the job key
     * @return the cached job state, or an empty optional if the cache has no entry for the key
     * @throws RuntimeException the entry's cache error, if the entry exists but is not OK
     */
    public Optional<JobState> queryJob(String str) {
        Optional<CacheEntry<JobState>> lookup = this.cache.queryKey(str);
        if (lookup.isEmpty()) {
            return Optional.empty();
        }

        CacheEntry<JobState> entry = lookup.get();
        if (!entry.cacheOk()) {
            // The entry is present but unreadable, surface the cache's own error
            throw entry.cacheError();
        }
        return Optional.of(entry.value());
    }

    /**
     * Polls the job cache and submits the next operation for every job that needs one.
     *
     * <p>Jobs in an update status are always submitted. Queued jobs are submitted only up to
     * the number of free executor slots, i.e. the executor job limit minus the number of jobs
     * currently in a running status.
     *
     * <p>Cache errors ({@link ECache}) are logged and polling continues. Any other exception
     * is counted; once {@link #CACHE_POLL_ERROR_LIMIT} errors occur without a successful
     * poll in between, the JVM is terminated.
     */
    private void pollCache() {
        try {
            List<CacheEntry<JobState>> updateEntries = this.cache.queryStatus(STATUS_FOR_UPDATE);
            for (CacheEntry<JobState> entry : updateEntries) {
                JobOperation operation = getNextOperation(entry);
                this.javaExecutor.submit(() -> processJobOperation(operation));
            }

            List<CacheEntry<JobState>> queuedEntries = this.cache.queryStatus(STATUS_FOR_LAUNCH);
            int runningCount = this.cache.queryStatus(STATUS_FOR_RUNNING_JOBS, true).size();
            int freeSlots = Math.max(this.executorJobLimit - runningCount, 0);

            // Only launch as many queued jobs as there are free executor slots
            List<CacheEntry<JobState>> launchEntries = queuedEntries.size() > freeSlots
                    ? queuedEntries.subList(0, freeSlots)
                    : queuedEntries;
            for (CacheEntry<JobState> entry : launchEntries) {
                JobOperation operation = getNextOperation(entry);
                this.javaExecutor.submit(() -> processJobOperation(operation));
            }

            // A clean poll clears the error count, so only consecutive failures are fatal
            this.cachePollErrorCount.set(0);
        } catch (ECache e) {
            this.log.warn("There was a problem talking to the job cache: " + e.getMessage(), e);
            this.log.warn("Polling will continue, the cache may become available at a later time");
        } catch (Exception e) {
            this.log.error("Unexpected error in cache polling loop: " + e.getMessage(), e);
            this.log.error("This is probably a bug, if it continues the orchestrator will be shut down");

            int errorCount = this.cachePollErrorCount.incrementAndGet();
            if (errorCount > 1 && errorCount < CACHE_POLL_ERROR_LIMIT) {
                this.log.error("Cache polling error has occurred [{}] times, the limit is [{}]", errorCount, CACHE_POLL_ERROR_LIMIT);
            }
            if (errorCount >= CACHE_POLL_ERROR_LIMIT) {
                // Logged at error level: this is the last message before the process exits
                this.log.error("FATAL: Cache polling error has occurred [{}] times, the limit has been reached", errorCount);
                this.log.error("FATAL: The orchestrator service will now be terminated");
                System.exit(-1);
            }
        }
    }

    /**
     * Polls the executor for every running job and submits a status update operation for
     * each job whose executor status has changed.
     *
     * <p>Only cache entries that are OK and already have executor state are polled. Poll
     * results are matched to the polled entries by position.
     *
     * <p>Cache and executor errors ({@link ECache}, {@link EExecutor}) are logged and polling
     * continues. Any other exception is counted; once {@link #EXECUTOR_POLL_ERROR_LIMIT}
     * errors occur without a successful poll in between, the JVM is terminated.
     */
    private void pollExecutor() {
        try {
            List<CacheEntry<JobState>> runningEntries = this.cache.queryStatus(STATUS_FOR_RUNNING_JOBS);
            List<CacheEntry<JobState>> pollRequests = runningEntries.stream()
                    .filter(CacheEntry::cacheOk)
                    .filter(entry -> entry.value().executorState != null)
                    .collect(Collectors.toList());

            List<ExecutorJobInfo> pollResults = this.processor.pollExecutorJobs(pollRequests);

            for (int index = 0; index < pollRequests.size(); index++) {
                CacheEntry<JobState> entry = pollRequests.get(index);
                ExecutorJobInfo jobInfo = pollResults.get(index);

                // Nothing to record if the executor reports the status we already hold
                if (jobInfo.getStatus() == entry.value().executorStatus) {
                    continue;
                }
                JobOperation operation = getNextOperation(entry, jobInfo);
                this.javaExecutor.submit(() -> processJobOperation(operation));
            }

            // A clean poll clears the error count, matching the behaviour of pollCache
            this.executorPollErrorCount.set(0);
        } catch (ECache | EExecutor e) {
            // Expected, recoverable failures: must be caught before the generic handler
            String target = e instanceof ECache ? "job cache" : "executor";
            this.log.warn("There was a problem talking to the {}: {}", target, e.getMessage(), e);
            this.log.warn("Polling will continue, the {} may become available at a later time", target);
        } catch (Exception e) {
            this.log.error("Unexpected error in executor polling loop: " + e.getMessage(), e);
            this.log.error("This is probably a bug, if it continues the orchestrator will be shut down");

            int errorCount = this.executorPollErrorCount.incrementAndGet();
            if (errorCount > 1 && errorCount < EXECUTOR_POLL_ERROR_LIMIT) {
                this.log.error("Executor polling error has occurred [{}] times, the limit is [{}]", errorCount, EXECUTOR_POLL_ERROR_LIMIT);
            }
            if (errorCount >= EXECUTOR_POLL_ERROR_LIMIT) {
                // Logged at error level: this is the last message before the process exits
                this.log.error("FATAL: Executor polling error has occurred [{}] times, the limit has been reached", errorCount);
                this.log.error("FATAL: The orchestrator service will now be terminated");
                System.exit(-1);
            }
        }
    }

    /**
     * Builds the next operation for a cache entry from its key, revision and status.
     *
     * @param cacheEntry the cache entry of the job
     * @return the operation to run for the entry's current cache status
     */
    private JobOperation getNextOperation(CacheEntry<JobState> cacheEntry) {
        String jobKey = cacheEntry.key();
        int revision = cacheEntry.revision();
        String cacheStatus = cacheEntry.status();

        return getNextOperation(jobKey, revision, cacheStatus);
    }

    /**
     * Builds the operation that moves a job on from the given cache status.
     *
     * <p>Operations that talk to the executor ({@code launch_job}, {@code fetch_job_result})
     * use the executor ticket duration as their timeout; all others use the cache ticket
     * duration. An operation whose function returns {@code null} causes the job to be deleted
     * from the cache when it is processed.
     *
     * <p>If the revision exceeds {@link #JOB_REVISION_LIMIT} the job is removed without
     * further processing. An unrecognised status yields a {@code handle_processing_failed}
     * operation carrying an internal error.
     *
     * @param str the job key
     * @param i the cache revision the operation applies to
     * @param str2 the current cache status of the job
     * @return the operation to run for this status
     */
    private JobOperation getNextOperation(String str, int i, String str2) {
        JobOperation jobOperation = new JobOperation();
        jobOperation.jobKey = str;
        jobOperation.revision = i;
        jobOperation.cacheStatus = str2;
        // Default timeout; the executor-facing operations below override it
        jobOperation.timeout = this.cacheTicketDuration;

        if (i > JOB_REVISION_LIMIT) {
            // A job that keeps changing revision is stuck in a loop, drop it from the cache
            this.log.error("Internal job state error, key = [{}], revision = [{}], cache state = [{}]", str, i, str2);
            this.log.error("Job revision limit has been hit, job will be removed without further processing");
            jobOperation.operationName = "remove_from_cache";
            jobOperation.operation = jobState -> null;
            return jobOperation;
        }

        switch (str2) {
            case CacheStatus.QUEUED_IN_TRAC:
                jobOperation.operationName = "schedule_launch";
                jobOperation.operation = this.processor::scheduleLaunch;
                break;

            case CacheStatus.LAUNCH_SCHEDULED:
                jobOperation.operationName = "launch_job";
                jobOperation.operation = this.processor::launchJob;
                jobOperation.timeout = this.executorTicketDuration;
                break;

            case CacheStatus.EXECUTOR_COMPLETE:
            case CacheStatus.EXECUTOR_SUCCEEDED:
                jobOperation.operationName = "fetch_job_result";
                jobOperation.operation = this.processor::fetchJobResult;
                jobOperation.timeout = this.executorTicketDuration;
                break;

            case CacheStatus.EXECUTOR_FAILED:
            case CacheStatus.RESULTS_RECEIVED:
            case CacheStatus.RESULTS_INVALID:
                jobOperation.operationName = "save_result_metadata";
                jobOperation.operation = this.processor::saveResultMetadata;
                break;

            case CacheStatus.RESULTS_SAVED:
            case CacheStatus.READY_FOR_CLEANUP:
                jobOperation.operationName = "clean_up_job";
                jobOperation.operation = this.processor::cleanUpJob;
                break;

            case CacheStatus.READY_TO_REMOVE:
                jobOperation.operationName = "schedule_removal";
                jobOperation.operation = this.processor::scheduleRemoval;
                break;

            case CacheStatus.REMOVAL_SCHEDULED:
                jobOperation.operationName = "remove_from_cache";
                jobOperation.operation = jobState -> null;
                break;

            case CacheStatus.PROCESSING_FAILED:
                jobOperation.operationName = "handle_processing_failed";
                jobOperation.operation = jobState ->
                        this.processor.handleProcessingFailed(jobState, jobState.statusMessage, jobState.exception);
                break;

            default:
                this.log.error("Internal job state error, key = [{}], revision = [{}], cache state = [{}]", str, i, str2);
                String errorMessage = "Internal job state error";
                ETracInternal internalError = new ETracInternal(errorMessage);
                jobOperation.operationName = "handle_processing_failed";
                jobOperation.operation = jobState ->
                        this.processor.handleProcessingFailed(jobState, errorMessage, internalError);
                break;
        }

        return jobOperation;
    }

    /**
     * Builds the operation that records a status reported by the executor for a job.
     *
     * <p>If the entry is not in one of the running statuses, the result is a
     * {@code handle_processing_failed} operation carrying an internal error instead.
     *
     * @param cacheEntry the cache entry of the polled job
     * @param executorJobInfo the job information returned by the executor poll
     * @return the operation to run, always using the cache ticket duration as its timeout
     */
    private JobOperation getNextOperation(CacheEntry<JobState> cacheEntry, ExecutorJobInfo executorJobInfo) {
        JobOperation result = new JobOperation();
        result.jobKey = cacheEntry.key();
        result.revision = cacheEntry.revision();
        result.cacheStatus = cacheEntry.status();
        result.timeout = this.cacheTicketDuration;

        boolean isRunning = STATUS_FOR_RUNNING_JOBS.contains(cacheEntry.status());
        if (!isRunning) {
            // Executor updates are only expected for jobs in a running status
            this.log.error("Internal job state error, key = [{}], revision = [{}], cache state = [{}]",
                    cacheEntry.key(), cacheEntry.revision(), cacheEntry.status());
            String errorMessage = "Internal job state error";
            ETracInternal internalError = new ETracInternal(errorMessage);
            result.operationName = "handle_processing_failed";
            result.operation = state -> this.processor.handleProcessingFailed(state, errorMessage, internalError);
            return result;
        }

        result.operationName = "record_job_status";
        result.operation = state -> this.processor.recordJobStatus(state, executorJobInfo);
        return result;
    }

    /**
     * Chooses the cache status a job moves to after an operation fails.
     *
     * <p>A failure while handling {@code PROCESSING_FAILED} moves on to
     * {@code READY_FOR_CLEANUP}, a failure during clean-up moves on to
     * {@code READY_TO_REMOVE}, and a failure in any other status moves to
     * {@code PROCESSING_FAILED}.
     *
     * @param str the cache status the job was in when the operation failed
     * @return the next error status
     */
    private String getNextErrorState(String str) {
        if (str.equals(CacheStatus.PROCESSING_FAILED)) {
            return CacheStatus.READY_FOR_CLEANUP;
        }
        if (str.equals(CacheStatus.READY_FOR_CLEANUP)) {
            return CacheStatus.READY_TO_REMOVE;
        }
        return CacheStatus.PROCESSING_FAILED;
    }

    /**
     * Runs one job operation under a cache ticket and schedules whatever follows from it.
     *
     * <p>A ticket is opened for the job at the operation's revision. If the ticket is
     * superseded nothing is done. Otherwise the operation is applied to the cached job state;
     * a non-null result is written back to the cache and a null result deletes the entry.
     * The ticket is closed on every path, and any failure while talking to the cache is
     * logged rather than propagated.
     *
     * <p>After a successful update, the next operation is submitted immediately when the new
     * status is an update status and the operation was not left pending a retry. When the new
     * status is {@code REMOVAL_SCHEDULED}, removal is scheduled after
     * {@link #SCHEDULED_REMOVAL_DURATION}.
     *
     * @param jobOperation the operation to run
     */
    private void processJobOperation(JobOperation jobOperation) {
        int newRevision = jobOperation.revision;
        String newStatus = null;
        boolean retryPending = false;

        // try-with-resources releases the ticket even if a cache call or the update fails
        try (CacheTicket ticket = this.cache.openTicket(jobOperation.jobKey, jobOperation.revision, jobOperation.timeout)) {
            if (ticket.superseded()) {
                // The job has moved on since this operation was created
                return;
            }

            CacheEntry<JobState> cacheEntry = this.cache.readEntry(ticket);
            JobState newState = processRetryOrFail(jobOperation, cacheEntry.value());

            if (newState != null) {
                newRevision = this.cache.updateEntry(ticket, newState.cacheStatus, newState);
                newStatus = newState.cacheStatus;
                retryPending = newState.retries > 0;
            } else {
                this.cache.deleteEntry(ticket);
            }
        } catch (Exception e) {
            this.log.warn("There was a problem talking to the job cache: " + e.getMessage(), e);
            this.log.warn("Processing will continue, the cache may become available at a later time");
        }

        boolean revisionAdvanced = newRevision > jobOperation.revision;

        // Retries are left for the next cache poll instead of being resubmitted immediately
        if (revisionAdvanced && !retryPending && newStatus != null && STATUS_FOR_UPDATE.contains(newStatus)) {
            JobOperation followUp = getNextOperation(jobOperation.jobKey, newRevision, newStatus);
            this.javaExecutor.submit(() -> processJobOperation(followUp));
        }

        if (revisionAdvanced && CacheStatus.REMOVAL_SCHEDULED.equals(newStatus)) {
            JobOperation removal = getNextOperation(jobOperation.jobKey, newRevision, newStatus);
            this.javaExecutor.schedule(
                    () -> processJobOperation(removal),
                    SCHEDULED_REMOVAL_DURATION.getSeconds(), TimeUnit.SECONDS);
        }
    }

    /**
     * Applies a job operation to a job state, turning failures into a retry or an error state.
     *
     * <p>On success the operation's result is returned with its retry count reset (a null
     * result is passed through unchanged). On failure a clone of the input state is returned:
     * <ul>
     *   <li>if the error is retryable and the retry limit has not been reached, the clone
     *       keeps its status and carries the incremented retry count;</li>
     *   <li>otherwise the clone is moved to the next error status, records the error message
     *       and exception, and has its retry count reset.</li>
     * </ul>
     *
     * @param jobOperation the operation to apply
     * @param jobState the current job state read from the cache
     * @return the new job state, or null if the operation returned null
     */
    private JobState processRetryOrFail(JobOperation jobOperation, JobState jobState) {
        try {
            if (jobState.retries != 0) {
                this.log.info("JOB OPERATION {} (attempt {} of {}): [{}]",
                        jobOperation.operationName, jobState.retries + 1, PROCESSING_RETRY_LIMIT, jobOperation.jobKey);
            } else {
                this.log.info("JOB OPERATION {}: [{}]", jobOperation.operationName, jobOperation.jobKey);
            }

            JobState result = jobOperation.operation.apply(jobState);
            if (result == null) {
                return null;
            }
            result.retries = 0;
            return result;
        } catch (Exception error) {
            JobState failedState = jobState.m5clone();
            boolean retryable = errorCanRetry(error);

            if (retryable) {
                failedState.retries += 1;
            }
            if (retryable && failedState.retries < PROCESSING_RETRY_LIMIT) {
                this.log.warn("JOB OPERATION FAILED {}: [{}] {}",
                        jobOperation.operationName, jobState.jobKey, error.getMessage(), error);
                this.log.warn("The operation can be retried (this was attempt {} of {})",
                        failedState.retries, PROCESSING_RETRY_LIMIT);
                return failedState;
            }

            // Either the error is fatal or the retries are used up: move to an error status
            String errorStatus = getNextErrorState(jobState.cacheStatus);
            this.log.error("JOB OPERATION FAILED {}: [{}] {}",
                    jobOperation.operationName, jobState.jobKey, error.getMessage(), error);
            if (!retryable) {
                this.log.error("This error is fatal and cannot be retried");
            } else {
                this.log.error("The retry limit has been reached (this was attempt {} of {})",
                        failedState.retries, PROCESSING_RETRY_LIMIT);
            }
            this.log.error("Job will be updated to the next error state: [{}] {}", jobState.jobKey, errorStatus);

            failedState.cacheStatus = errorStatus;
            failedState.statusMessage = error.getMessage();
            failedState.exception = error;
            failedState.retries = 0;
            return failedState;
        }
    }

    /**
     * Decides whether a failed job operation may be retried.
     *
     * <p>gRPC status exceptions are retryable when their status code is listed in
     * {@code GRPC_CAN_RETRY}; any other exception is retryable only when its exact class is
     * listed in {@code TRAC_CAN_RETRY}.
     *
     * @param exc the exception raised by the operation
     * @return true if the operation can be retried
     */
    private boolean errorCanRetry(Exception exc) {
        if (exc instanceof StatusException) {
            Status.Code code = ((StatusException) exc).getStatus().getCode();
            return GRPC_CAN_RETRY.contains(code);
        }
        if (exc instanceof StatusRuntimeException) {
            Status.Code code = ((StatusRuntimeException) exc).getStatus().getCode();
            return GRPC_CAN_RETRY.contains(code);
        }
        return TRAC_CAN_RETRY.contains(exc.getClass());
    }
}
