package org.finos.tracdap.svc.orch.service;

import java.io.Serializable;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.finos.tracdap.api.JobRequest;
import org.finos.tracdap.api.JobStatus;
import org.finos.tracdap.api.MetadataWriteRequest;
import org.finos.tracdap.api.TrustedMetadataApiGrpc;
import org.finos.tracdap.common.auth.internal.AuthHelpers;
import org.finos.tracdap.common.auth.internal.InternalAuthProvider;
import org.finos.tracdap.common.cache.CacheEntry;
import org.finos.tracdap.common.config.ConfigFormat;
import org.finos.tracdap.common.config.ConfigParser;
import org.finos.tracdap.common.exception.EConfigParse;
import org.finos.tracdap.common.exception.ETracInternal;
import org.finos.tracdap.common.exception.EValidation;
import org.finos.tracdap.common.exec.ExecutorJobInfo;
import org.finos.tracdap.common.exec.ExecutorJobStatus;
import org.finos.tracdap.common.exec.ExecutorVolumeType;
import org.finos.tracdap.common.exec.IBatchExecutor;
import org.finos.tracdap.common.exec.LaunchArg;
import org.finos.tracdap.common.exec.LaunchCmd;
import org.finos.tracdap.common.metadata.MetadataCodec;
import org.finos.tracdap.common.metadata.MetadataUtil;
import org.finos.tracdap.common.validation.Validator;
import org.finos.tracdap.config.JobResult;
import org.finos.tracdap.metadata.JobStatusCode;
import org.finos.tracdap.metadata.ObjectDefinition;
import org.finos.tracdap.metadata.ObjectType;
import org.finos.tracdap.metadata.TagUpdate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/finos/tracdap/svc/orch/service/JobProcessor.class */
public class JobProcessor {
    private static final Duration DELEGATE_SESSION_TIMEOUT = Duration.of(5, ChronoUnit.MINUTES);
    private final TrustedMetadataApiGrpc.TrustedMetadataApiBlockingStub metaClient;
    private final InternalAuthProvider internalAuth;
    private final IBatchExecutor<?> executor;
    private final JobProcessorHelpers lifecycle;
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final Validator validator = new Validator();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.finos.tracdap.svc.orch.service.JobProcessor$1, reason: invalid class name */
    /* loaded from: input_file:org/finos/tracdap/svc/orch/service/JobProcessor$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$finos$tracdap$common$exec$ExecutorJobStatus = new int[ExecutorJobStatus.values().length];

        static {
            try {
                $SwitchMap$org$finos$tracdap$common$exec$ExecutorJobStatus[ExecutorJobStatus.QUEUED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$finos$tracdap$common$exec$ExecutorJobStatus[ExecutorJobStatus.RUNNING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$finos$tracdap$common$exec$ExecutorJobStatus[ExecutorJobStatus.COMPLETE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$finos$tracdap$common$exec$ExecutorJobStatus[ExecutorJobStatus.SUCCEEDED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$finos$tracdap$common$exec$ExecutorJobStatus[ExecutorJobStatus.FAILED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$finos$tracdap$common$exec$ExecutorJobStatus[ExecutorJobStatus.STATUS_UNKNOWN.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    public JobProcessor(TrustedMetadataApiGrpc.TrustedMetadataApiBlockingStub trustedMetadataApiBlockingStub, InternalAuthProvider internalAuthProvider, IBatchExecutor<?> iBatchExecutor, JobProcessorHelpers jobProcessorHelpers) {
        this.metaClient = trustedMetadataApiBlockingStub;
        this.internalAuth = internalAuthProvider;
        this.executor = iBatchExecutor;
        this.lifecycle = jobProcessorHelpers;
    }

    public JobState newJob(JobRequest jobRequest) {
        JobState jobState = new JobState();
        jobState.tenant = jobRequest.getTenant();
        jobState.owner = AuthHelpers.currentUser();
        jobState.jobType = jobRequest.getJob().getJobType();
        jobState.definition = jobRequest.getJob();
        jobState.jobRequest = jobRequest;
        jobState.tracStatus = JobStatusCode.PREPARING;
        return jobState;
    }

    public JobStatus getStatus(JobState jobState) {
        JobStatus.Builder newBuilder = JobStatus.newBuilder();
        if (jobState.jobId != null) {
            newBuilder.setJobId(jobState.jobId);
        }
        newBuilder.setStatusCode(jobState.tracStatus);
        if (jobState.statusMessage != null) {
            newBuilder.setStatusMessage(jobState.statusMessage);
        }
        if ((jobState.tracStatus == JobStatusCode.SUCCEEDED || jobState.tracStatus == JobStatusCode.FAILED) && (jobState.cacheStatus.startsWith("EXECUTOR_") || jobState.cacheStatus.startsWith("RESULTS_"))) {
            newBuilder.setStatusCode(JobStatusCode.FINISHING);
            newBuilder.clearStatusMessage();
        }
        return newBuilder.build();
    }

    public JobState assembleAndValidate(JobState jobState) {
        JobState m5clone = jobState.m5clone();
        m5clone.credentials = this.internalAuth.createDelegateSession(jobState.owner, DELEGATE_SESSION_TIMEOUT);
        JobState buildJobConfig = this.lifecycle.buildJobConfig(this.lifecycle.allocateResultIds(this.lifecycle.loadResources(this.lifecycle.applyTransform(m5clone))));
        buildJobConfig.tracStatus = JobStatusCode.VALIDATED;
        return buildJobConfig;
    }

    public JobState saveInitialMetadata(JobState jobState) {
        JobState m5clone = jobState.m5clone();
        m5clone.credentials = this.internalAuth.createDelegateSession(jobState.owner, DELEGATE_SESSION_TIMEOUT);
        JobState saveInitialMetadata = this.lifecycle.saveInitialMetadata(m5clone);
        saveInitialMetadata.tracStatus = JobStatusCode.QUEUED;
        saveInitialMetadata.cacheStatus = CacheStatus.QUEUED_IN_TRAC;
        return saveInitialMetadata;
    }

    public JobState scheduleLaunch(JobState jobState) {
        JobState m5clone = jobState.m5clone();
        m5clone.tracStatus = JobStatusCode.PREPARING;
        m5clone.cacheStatus = CacheStatus.LAUNCH_SCHEDULED;
        return updateMetadata(m5clone);
    }

    private JobState updateMetadata(JobState jobState) {
        JobState m5clone = jobState.m5clone();
        m5clone.credentials = this.internalAuth.createDelegateSession(jobState.owner, DELEGATE_SESSION_TIMEOUT);
        m5clone.jobId = this.metaClient.withCallCredentials(m5clone.credentials).updateTag(MetadataWriteRequest.newBuilder().setTenant(jobState.tenant).setObjectType(ObjectType.JOB).setPriorVersion(MetadataUtil.selectorFor(jobState.jobId)).addTagUpdates(TagUpdate.newBuilder().setAttrName("trac_job_status").setValue(MetadataCodec.encodeValue(jobState.tracStatus.name()))).build());
        return m5clone;
    }

    public JobState launchJob(JobState jobState) {
        String str = jobState.jobKey;
        IBatchExecutor stronglyTypedExecutor = stronglyTypedExecutor();
        Serializable createVolume = stronglyTypedExecutor.createVolume(str, stronglyTypedExecutor.createVolume(str, stronglyTypedExecutor.createVolume(str, stronglyTypedExecutor.createVolume(str, stronglyTypedExecutor.createBatch(str), "config", ExecutorVolumeType.CONFIG_DIR), "result", ExecutorVolumeType.RESULT_DIR), "log", ExecutorVolumeType.RESULT_DIR), "scratch", ExecutorVolumeType.SCRATCH_DIR);
        byte[] quoteConfig = ConfigParser.quoteConfig(jobState.jobConfig, ConfigFormat.JSON);
        Serializable startBatch = stronglyTypedExecutor.startBatch(str, stronglyTypedExecutor.writeFile(str, stronglyTypedExecutor.writeFile(str, createVolume, "config", "job_config.json", quoteConfig), "config", "sys_config.json", ConfigParser.quoteConfig(jobState.sysConfig, ConfigFormat.JSON)), LaunchCmd.trac(), List.of(LaunchArg.string("--sys-config"), LaunchArg.path("config", "sys_config.json"), LaunchArg.string("--job-config"), LaunchArg.path("config", "job_config.json"), LaunchArg.string("--job-result-dir"), LaunchArg.path("result", "."), LaunchArg.string("--job-result-format"), LaunchArg.string("json"), LaunchArg.string("--scratch-dir"), LaunchArg.path("scratch", ".")));
        this.log.info("Job has been sent to the executor: [{}]", str);
        JobState m5clone = jobState.m5clone();
        m5clone.tracStatus = JobStatusCode.SUBMITTED;
        m5clone.cacheStatus = CacheStatus.SENT_TO_EXECUTOR;
        m5clone.executorStatus = ExecutorJobStatus.STATUS_UNKNOWN;
        m5clone.executorState = startBatch;
        return m5clone;
    }

    public List<ExecutorJobInfo> pollExecutorJobs(List<CacheEntry<JobState>> list) {
        IBatchExecutor stronglyTypedExecutor = stronglyTypedExecutor();
        return stronglyTypedExecutor.pollBatches((List) list.stream().map((v0) -> {
            return v0.value();
        }).filter(jobState -> {
            return jobState.executorState != null;
        }).map(jobState2 -> {
            return Map.entry(jobState2.jobKey, stronglyTypedState(stronglyTypedExecutor, jobState2.executorState));
        }).collect(Collectors.toList()));
    }

    public JobState recordJobStatus(JobState jobState, ExecutorJobInfo executorJobInfo) {
        JobState m5clone = jobState.m5clone();
        m5clone.executorStatus = executorJobInfo.getStatus();
        this.log.info("Job status received from executor: [{}] {}", m5clone.jobKey, m5clone.executorStatus);
        switch (AnonymousClass1.$SwitchMap$org$finos$tracdap$common$exec$ExecutorJobStatus[executorJobInfo.getStatus().ordinal()]) {
            case 1:
                m5clone.tracStatus = JobStatusCode.SUBMITTED;
                m5clone.cacheStatus = CacheStatus.QUEUED_IN_EXECUTOR;
                return updateMetadata(m5clone);
            case JobManager.PROCESSING_RETRY_LIMIT /* 2 */:
                m5clone.tracStatus = JobStatusCode.RUNNING;
                m5clone.cacheStatus = CacheStatus.RUNNING_IN_EXECUTOR;
                return updateMetadata(m5clone);
            case 3:
                m5clone.tracStatus = JobStatusCode.FINISHING;
                m5clone.cacheStatus = CacheStatus.EXECUTOR_COMPLETE;
                return m5clone;
            case 4:
                m5clone.tracStatus = JobStatusCode.FINISHING;
                m5clone.cacheStatus = CacheStatus.EXECUTOR_SUCCEEDED;
                return m5clone;
            case 5:
                m5clone.tracStatus = JobStatusCode.FAILED;
                m5clone.cacheStatus = CacheStatus.EXECUTOR_FAILED;
                m5clone.statusMessage = executorJobInfo.getStatusMessage();
                m5clone.errorDetail = executorJobInfo.getErrorDetail();
                this.log.error("Execution failed for [{}]: {}", m5clone.jobKey, executorJobInfo.getStatusMessage());
                this.log.error("Error detail for [{}]\n{}", m5clone.jobKey, executorJobInfo.getErrorDetail());
                return m5clone;
            case JobManager.DEFAULT_EXECUTOR_JOB_LIMIT /* 6 */:
            default:
                m5clone.tracStatus = JobStatusCode.FAILED;
                m5clone.cacheStatus = CacheStatus.EXECUTOR_FAILED;
                m5clone.statusMessage = "Job status could not be determined";
                return m5clone;
        }
    }

    public JobState fetchJobResult(JobState jobState) {
        IBatchExecutor stronglyTypedExecutor = stronglyTypedExecutor();
        Serializable stronglyTypedState = stronglyTypedState(stronglyTypedExecutor, jobState.executorState);
        if (jobState.executorState == null) {
            this.log.info("Cannot fetch job result: [{}] Executor state is not available", jobState.jobKey);
            JobState m5clone = jobState.m5clone();
            m5clone.tracStatus = JobStatusCode.FAILED;
            m5clone.cacheStatus = CacheStatus.RESULTS_INVALID;
            m5clone.statusMessage = "executor state is not available";
            return m5clone;
        }
        try {
            JobResult parseConfig = ConfigParser.parseConfig(stronglyTypedExecutor.readFile(jobState.jobKey, stronglyTypedState, "result", String.format("job_result_%s.json", jobState.jobKey)), ConfigFormat.JSON, JobResult.class);
            for (Map.Entry entry : parseConfig.getResultsMap().entrySet()) {
                this.log.info("Validating job result: [{}] item [{}]", jobState.jobKey, entry.getKey());
                this.validator.validateFixedObject((ObjectDefinition) entry.getValue());
            }
            JobState m5clone2 = jobState.m5clone();
            m5clone2.jobResult = parseConfig;
            m5clone2.tracStatus = JobStatusCode.SUCCEEDED;
            m5clone2.cacheStatus = CacheStatus.RESULTS_RECEIVED;
            return m5clone2;
        } catch (EConfigParse | EValidation e) {
            String orElse = e.getMessage().lines().findFirst().orElse("No details available");
            JobState m5clone3 = jobState.m5clone();
            m5clone3.tracStatus = JobStatusCode.FAILED;
            m5clone3.cacheStatus = CacheStatus.RESULTS_INVALID;
            m5clone3.statusMessage = orElse;
            this.log.error("Failed to decode job result: [{}] {}", new Object[]{jobState.jobKey, orElse, e});
            return m5clone3;
        }
    }

    public JobState saveResultMetadata(JobState jobState) {
        JobState m5clone = jobState.m5clone();
        m5clone.credentials = this.internalAuth.createDelegateSession(jobState.owner, DELEGATE_SESSION_TIMEOUT);
        this.lifecycle.processJobResult(m5clone);
        m5clone.cacheStatus = CacheStatus.RESULTS_SAVED;
        return m5clone;
    }

    public JobState cleanUpJob(JobState jobState) {
        if (jobState.executorState == null) {
            JobState m5clone = jobState.m5clone();
            m5clone.cacheStatus = CacheStatus.READY_TO_REMOVE;
            m5clone.executorStatus = ExecutorJobStatus.STATUS_UNKNOWN;
            this.log.warn("Job could not be cleaned up: [{}] Executor state is not available", jobState.jobKey);
            this.log.warn("There may be an orphaned task in the executor");
            return m5clone;
        }
        IBatchExecutor stronglyTypedExecutor = stronglyTypedExecutor();
        stronglyTypedExecutor.destroyBatch(jobState.jobKey, stronglyTypedState(stronglyTypedExecutor, jobState.executorState));
        JobState m5clone2 = jobState.m5clone();
        m5clone2.cacheStatus = CacheStatus.READY_TO_REMOVE;
        m5clone2.executorStatus = ExecutorJobStatus.STATUS_UNKNOWN;
        m5clone2.executorState = null;
        return m5clone2;
    }

    public JobState scheduleRemoval(JobState jobState) {
        JobState m5clone = jobState.m5clone();
        m5clone.cacheStatus = CacheStatus.REMOVAL_SCHEDULED;
        return m5clone;
    }

    public JobState handleProcessingFailed(JobState jobState, String str, Exception exc) {
        JobState m5clone = jobState.m5clone();
        m5clone.tracStatus = JobStatusCode.FAILED;
        m5clone.statusMessage = str;
        m5clone.exception = exc;
        m5clone.credentials = this.internalAuth.createDelegateSession(jobState.owner, DELEGATE_SESSION_TIMEOUT);
        this.lifecycle.processJobResult(m5clone);
        m5clone.cacheStatus = CacheStatus.READY_TO_REMOVE;
        return m5clone;
    }

    private <TState extends Serializable> IBatchExecutor<TState> stronglyTypedExecutor() {
        return (IBatchExecutor<TState>) this.executor;
    }

    private <TState extends Serializable> TState stronglyTypedState(IBatchExecutor<TState> iBatchExecutor, Object obj) {
        Class stateClass = iBatchExecutor.stateClass();
        if (obj == null) {
            throw new ETracInternal("Invalid job state: null");
        }
        if (stateClass.isInstance(obj)) {
            return (TState) obj;
        }
        throw new ETracInternal(String.format("Invalid job state: Expected [%s], got [%s]", stateClass.getName(), obj.getClass().getName()));
    }
}
