package com.netflix.genie.server.services.impl;

import com.netflix.client.ClientFactory;
import com.netflix.config.ConfigurationManager;
import com.netflix.genie.common.exceptions.CloudServiceException;
import com.netflix.genie.common.messages.BaseRequest;
import com.netflix.genie.common.messages.BaseResponse;
import com.netflix.genie.common.messages.JobInfoRequest;
import com.netflix.genie.common.messages.JobInfoResponse;
import com.netflix.genie.common.messages.JobStatusResponse;
import com.netflix.genie.common.model.JobInfoElement;
import com.netflix.genie.common.model.Types;
import com.netflix.genie.server.jobmanager.JobManagerFactory;
import com.netflix.genie.server.metrics.GenieNodeStatistics;
import com.netflix.genie.server.metrics.JobCountManager;
import com.netflix.genie.server.persistence.ClauseBuilder;
import com.netflix.genie.server.persistence.PersistenceManager;
import com.netflix.genie.server.persistence.QueryBuilder;
import com.netflix.genie.server.services.ExecutionService;
import com.netflix.genie.server.util.NetUtil;
import com.netflix.niws.client.http.HttpClientRequest;
import com.netflix.niws.client.http.HttpClientResponse;
import com.netflix.niws.client.http.RestClient;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import java.net.URI;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.persistence.EntityExistsException;
import javax.persistence.RollbackException;
import org.apache.commons.configuration.AbstractConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netflix/genie/server/services/impl/GenieExecutionServiceImpl.class */
public class GenieExecutionServiceImpl implements ExecutionService {
    /** Class-wide SLF4J logger; never reassigned, hence final. */
    private static final Logger logger = LoggerFactory.getLogger(GenieExecutionServiceImpl.class);

    /**
     * Configuration instance obtained once from {@link ConfigurationManager}. It must be declared
     * before the static initializer of this class, which reads from it.
     */
    private static final AbstractConfiguration conf = ConfigurationManager.getConfigInstance();

    // The three values below are assigned by the static initializer of this class (built-in
    // default, optionally overridden from configuration), so they cannot be declared final here.

    /** Port used when building this instance's endpoint and the forwarding URI. */
    private static int serverPort;

    /** Path segment placed between the endpoint and the job ID in a job's output URI. */
    private static String jobDirPrefix;

    /** Path segment placed between the endpoint and the job ID in a job's kill/submit URI. */
    private static String jobResourcePrefix;

    /** Persistence access for {@link JobInfoElement} records. */
    private final PersistenceManager<JobInfoElement> pm = new PersistenceManager<>();

    /** Node-level counters (submitted, forwarded, failed and killed jobs). */
    private final GenieNodeStatistics stats = GenieNodeStatistics.getInstance();

    /**
     * Handles a job submission. The job is validated, then either forwarded to another instance,
     * rejected because this instance is at its running-job limit, or persisted and launched here.
     *
     * @param jobInfoRequest request wrapping the job to submit
     * @return the launched job, the response of the instance the job was forwarded to, or an
     *         error response (400 validation, 409 duplicate ID, 503 over capacity, 500 otherwise)
     */
    @Override // com.netflix.genie.server.services.ExecutionService
    public JobInfoResponse submitJob(JobInfoRequest jobInfoRequest) {
        logger.info("called");
        JobInfoElement job = jobInfoRequest.getJobInfo();
        try {
            validateJobParams(job);

            // Throttling settings; each one falls back to 0 when not configured.
            final int maxRunningJobs = conf.getInt("netflix.genie.server.max.running.jobs", 0);
            final int forwardThreshold = conf.getInt("netflix.genie.server.forward.jobs.threshold", 0);
            final int maxIdleHostThreshold = conf.getInt("netflix.genie.server.max.idle.host.threshold", 0);
            final int idleHostThresholdDelta = conf.getInt("netflix.genie.server.idle.host.threshold.delta", 0);

            // Only one thread per service instance runs the capacity check and launch at a time.
            synchronized (this) {
                try {
                    final int runningJobs = JobCountManager.getNumInstanceJobs();
                    logger.info("Number of running jobs: " + runningJobs);

                    // Look for a host running at least "delta" fewer jobs than this one, capped
                    // at the configured maximum (the cap also applies once we are at the limit).
                    int idleHostThreshold = runningJobs - idleHostThresholdDelta;
                    if (idleHostThreshold > maxIdleHostThreshold || runningJobs >= maxRunningJobs) {
                        idleHostThreshold = maxIdleHostThreshold;
                    }

                    // A job that has already been forwarded once is never forwarded again.
                    final boolean tryForward = runningJobs >= forwardThreshold && !job.isForwarded();
                    if (tryForward) {
                        logger.info("Number of running jobs greater than forwarding threshold - trying to auto-forward");
                        final String idleHost = JobCountManager.getIdleInstance(idleHostThreshold);
                        if (!idleHost.equals(NetUtil.getHostName())) {
                            job.setForwarded(true);
                            this.stats.incrGenieForwardedJobs();
                            final String targetUri = "http://" + idleHost + ":" + serverPort + "/" + jobResourcePrefix;
                            return forwardJobRequest(targetUri, jobInfoRequest);
                        }
                    }

                    if (runningJobs >= maxRunningJobs) {
                        final String overloaded = "Number of running jobs greater than system limit (" + maxRunningJobs + ") - try another instance or try again later";
                        return new JobInfoResponse(new CloudServiceException(503, overloaded));
                    }

                    buildJobURIs(job);
                    try {
                        this.pm.createEntity(job);
                        this.stats.incrGenieJobSubmissions();
                        try {
                            JobManagerFactory.getJobManager(job.getJobType()).launch(job);
                            job.setUpdateTime(Long.valueOf(System.currentTimeMillis()));
                            this.pm.updateEntity(job);
                            // Re-read the record so the response carries the persisted state.
                            job = this.pm.getEntity(job.getJobID(), JobInfoElement.class);
                            final JobInfoResponse launched = new JobInfoResponse();
                            launched.setMessage("Successfully launched job: " + job.getJobID());
                            launched.setJob(job);
                            return launched;
                        } catch (Exception launchFailure) {
                            logger.error("Failed to submit job: ", launchFailure);
                            // Record the failure on the already-created entity.
                            job.setJobStatus(Types.JobStatus.FAILED, launchFailure.getMessage());
                            job.setUpdateTime(Long.valueOf(System.currentTimeMillis()));
                            this.pm.updateEntity(job);
                            this.stats.incrGenieFailedJobs();
                            if (launchFailure instanceof CloudServiceException) {
                                return new JobInfoResponse((CloudServiceException) launchFailure);
                            }
                            return new JobInfoResponse(new CloudServiceException(500, launchFailure.getMessage()));
                        }
                    } catch (RollbackException rollback) {
                        logger.error("Can't create entity in the database", rollback);
                        if (rollback.getCause() instanceof EntityExistsException) {
                            logger.error(rollback.getCause().getMessage());
                            return new JobInfoResponse(new CloudServiceException(409, "Job already exists for id: " + job.getJobID()));
                        }
                        return new JobInfoResponse(new CloudServiceException(500, "Received exception: " + rollback.getCause()));
                    }
                } catch (CloudServiceException serviceError) {
                    final JobInfoResponse failed = new JobInfoResponse(serviceError);
                    logger.error(failed.getErrorMsg(), serviceError);
                    return failed;
                }
            }
        } catch (CloudServiceException invalidRequest) {
            // Validation failures are returned to the caller as-is.
            return new JobInfoResponse(invalidRequest);
        }
    }

    /**
     * Looks up a single job by its ID.
     *
     * @param str ID of the job to fetch
     * @return response holding the job, a 404 error response when no such job exists, or a 500
     *         error response when the lookup throws
     */
    @Override // com.netflix.genie.server.services.ExecutionService
    public JobInfoResponse getJobInfo(String str) {
        logger.info("called for jobId: " + str);
        try {
            final JobInfoElement job = this.pm.getEntity(str, JobInfoElement.class);
            if (job != null) {
                final JobInfoResponse found = new JobInfoResponse();
                found.setJob(job);
                found.setMessage("Returning job information for: " + job.getJobID());
                return found;
            }
            final String notFound = "Job not found: " + str;
            logger.error(notFound);
            return new JobInfoResponse(new CloudServiceException(404, notFound));
        } catch (Exception lookupFailure) {
            logger.error("Failed to get job info: ", lookupFailure);
            return new JobInfoResponse(new CloudServiceException(500, lookupFailure.getMessage()));
        }
    }

    /**
     * Searches for jobs matching every supplied criterion; null or empty criteria are ignored.
     *
     * <p>The free-text criteria (job ID, job name, user name) are embedded in quoted literals of
     * the query clause, so any single quote they contain is doubled first. Without that, a value
     * containing {@code '} would end the literal early and be interpreted as query text.
     *
     * @param str  job ID pattern, matched with {@code like}
     * @param str2 job name pattern, matched with {@code like}
     * @param str3 exact user name
     * @param str4 job type; must be accepted by {@code Types.JobType.parse}
     * @param str5 job status; must be accepted by {@code Types.JobStatus.parse}
     * @param num  value handed to {@code QueryBuilder.limit}
     * @param num2 value handed to {@code QueryBuilder.page}
     * @return response holding the matching jobs, or an error response: 400 for an unknown type
     *         or status, 404 when nothing matches, 500 when the query throws
     */
    @Override // com.netflix.genie.server.services.ExecutionService
    public JobInfoResponse getJobs(String str, String str2, String str3, String str4, String str5, Integer num, Integer num2) {
        logger.info("called");
        final String table = JobInfoElement.class.getSimpleName();
        try {
            final ClauseBuilder criteria = new ClauseBuilder(ClauseBuilder.AND);
            if (hasText(str)) {
                criteria.append("jobID like '" + escapeQuotes(str) + "'");
            }
            if (hasText(str2)) {
                criteria.append("jobName like '" + escapeQuotes(str2) + "'");
            }
            if (hasText(str3)) {
                criteria.append("userName='" + escapeQuotes(str3) + "'");
            }
            // Type and status are only embedded after they have been accepted by the parsers.
            if (hasText(str4)) {
                if (Types.JobType.parse(str4) == null) {
                    throw new CloudServiceException(400, "Job type: " + str4 + " can only be HADOOP, HIVE or PIG");
                }
                criteria.append("jobType='" + str4.toUpperCase() + "'");
            }
            if (hasText(str5)) {
                if (Types.JobStatus.parse(str5) == null) {
                    throw new CloudServiceException(400, "Unknown job status: " + str5);
                }
                criteria.append("status='" + str5.toUpperCase() + "'");
            }
            try {
                final JobInfoElement[] results = this.pm.query(new QueryBuilder().table(table).clause(criteria.toString()).limit(num).page(num2));
                if (results.length == 0) {
                    return new JobInfoResponse(new CloudServiceException(404, "No jobs found for specified criteria"));
                }
                // Element-by-element copy kept from the original so the response always carries a
                // freshly allocated JobInfoElement[] rather than the array returned by the query.
                final JobInfoElement[] jobs = new JobInfoElement[results.length];
                for (int i = 0; i < results.length; i++) {
                    jobs[i] = results[i];
                }
                final JobInfoResponse found = new JobInfoResponse();
                found.setJobs(jobs);
                found.setMessage("Returning job information for specified criteria");
                return found;
            } catch (Exception queryFailure) {
                logger.error("Failed to get job results from database: ", queryFailure);
                return new JobInfoResponse(new CloudServiceException(500, queryFailure.getMessage()));
            }
        } catch (CloudServiceException badCriteria) {
            logger.error(badCriteria.getMessage(), badCriteria);
            return new JobInfoResponse(badCriteria);
        }
    }

    /** Returns true when the search criterion was actually supplied (neither null nor empty). */
    private static boolean hasText(String value) {
        return value != null && !value.isEmpty();
    }

    /** Doubles single quotes so the value cannot terminate the quoted literal it is placed in. */
    private static String escapeQuotes(String value) {
        return value.replace("'", "''");
    }

    /**
     * Reports the current status of a job.
     *
     * @param str ID of the job whose status is requested
     * @return response holding the status, a 404 error response when no such job exists, or a
     *         500 error response when the lookup throws
     */
    @Override // com.netflix.genie.server.services.ExecutionService
    public JobStatusResponse getJobStatus(String str) {
        logger.info("called for jobId: " + str);
        try {
            final JobInfoElement job = this.pm.getEntity(str, JobInfoElement.class);
            if (job != null) {
                final JobStatusResponse found = new JobStatusResponse();
                found.setMessage("Returning status for job: " + str);
                found.setStatus(job.getStatus());
                return found;
            }
            final String notFound = "Job not found: " + str;
            logger.error(notFound);
            return new JobStatusResponse(new CloudServiceException(404, notFound));
        } catch (Exception lookupFailure) {
            logger.error("Failed to get job results from database: ", lookupFailure);
            return new JobStatusResponse(new CloudServiceException(500, lookupFailure.getMessage()));
        }
    }

    /**
     * Kills a job. When the job's kill URI does not point at this instance the request is
     * forwarded to that URI; otherwise the job is killed here and its record is updated.
     *
     * @param str ID of the job to kill
     * @return response holding the resulting status, or an error response (404 when the job is
     *         unknown, 500 for a missing kill URI or any failure while killing or persisting)
     */
    @Override // com.netflix.genie.server.services.ExecutionService
    public JobStatusResponse killJob(String str) {
        logger.info("called for jobId: " + str);
        try {
            final JobInfoElement job = this.pm.getEntity(str, JobInfoElement.class);
            if (job == null) {
                final String notFound = "Job not found: " + str;
                logger.error(notFound);
                return new JobStatusResponse(new CloudServiceException(404, notFound));
            }
            final String killUri = job.getKillURI();
            if (killUri == null) {
                final String noKillUri = "Failed to get killURI for jobID: " + str;
                logger.error(noKillUri);
                return new JobStatusResponse(new CloudServiceException(500, noKillUri));
            }
            try {
                // The job belongs to this instance only if its kill URI equals our own.
                final String localKillUri = getEndPoint() + "/" + jobResourcePrefix + "/" + str;
                if (!killUri.equals(localKillUri)) {
                    logger.debug("forwarding kill request to: " + killUri);
                    return forwardJobKill(killUri);
                }
                return killLocalJob(job, str);
            } catch (CloudServiceException hostError) {
                logger.error("Error while retrieving local hostname: " + hostError.getMessage(), hostError);
                return new JobStatusResponse(hostError);
            }
        } catch (Exception lookupFailure) {
            logger.error("Failed to get job results from database: ", lookupFailure);
            return new JobStatusResponse(new CloudServiceException(500, lookupFailure.getMessage()));
        }
    }

    /**
     * Kills a job owned by this instance and persists the KILLED state.
     *
     * @param job   record of the job to kill
     * @param jobId ID of that job, used in messages and for the archive location
     * @return response describing the outcome; jobs that already failed or finished are reported
     *         with their current status and left untouched
     * @throws CloudServiceException declared so accessor calls on the job record keep compiling
     *         should any of them declare it; the caller handles it
     */
    private JobStatusResponse killLocalJob(JobInfoElement job, String jobId) throws CloudServiceException {
        logger.debug("killing job on same instance: " + jobId);
        if (job.getProcessHandle() == -1) {
            final JobStatusResponse alreadyFailed = new JobStatusResponse();
            alreadyFailed.setStatus(job.getStatus());
            alreadyFailed.setMessage("Job " + jobId + " has already failed");
            return alreadyFailed;
        }
        final String status = job.getStatus();
        if (status.equalsIgnoreCase("SUCCEEDED") || status.equalsIgnoreCase("KILLED") || status.equalsIgnoreCase("FAILED")) {
            final JobStatusResponse alreadyDone = new JobStatusResponse();
            alreadyDone.setStatus(job.getStatus());
            alreadyDone.setMessage("Job " + jobId + " is already done");
            return alreadyDone;
        }
        try {
            JobManagerFactory.getJobManager(job.getJobType()).kill(job);
            job.setJobStatus(Types.JobStatus.KILLED, "Job killed on user request");
            job.setExitCode(Integer.valueOf(Types.SubprocessStatus.JOB_KILLED.code()));
            this.stats.incrGenieKilledJobs();

            final ReentrantReadWriteLock.WriteLock writeLock = PersistenceManager.getDbLock().writeLock();
            logger.debug("updating job status to KILLED for: " + jobId);
            writeLock.lock();
            try {
                job.setUpdateTime(Long.valueOf(System.currentTimeMillis()));
                job.setArchiveLocation(NetUtil.getArchiveURI(jobId));
                this.pm.updateEntity(job);
            } catch (Exception updateFailure) {
                logger.error("Failed to update job status in database: ", updateFailure);
                return new JobStatusResponse(new CloudServiceException(500, updateFailure.getMessage()));
            } finally {
                // Released on every exit path, including Errors that the catch above cannot see.
                writeLock.unlock();
            }

            final JobStatusResponse killed = new JobStatusResponse();
            killed.setStatus(job.getStatus());
            killed.setMessage("Successfully killed job: " + jobId);
            return killed;
        } catch (Exception killFailure) {
            logger.error("Failed to kill job: ", killFailure);
            // Fall back to the exception itself so the message never reads "...: null".
            final Throwable reason = killFailure.getCause() != null ? killFailure.getCause() : killFailure;
            return new JobStatusResponse(new CloudServiceException(500, "Failed to kill job: " + reason));
        }
    }

    /**
     * Checks the mandatory fields of a submitted job and prepares it for launch: a job without
     * an ID receives a random UUID, and the status is set to INIT.
     *
     * @param jobInfoElement job to validate; modified in place
     * @throws CloudServiceException when a field fails validation
     */
    private void validateJobParams(JobInfoElement jobInfoElement) throws CloudServiceException {
        logger.debug("called");

        validateNameValuePair("userName", jobInfoElement.getUserName());
        validateNameValuePair("cmdArgs", jobInfoElement.getCmdArgs());
        validateNameValuePair("jobType", jobInfoElement.getJobType());

        // Hive jobs get an additional check on their command-line arguments.
        final boolean hiveJob = Types.JobType.HIVE == Types.JobType.parse(jobInfoElement.getJobType());
        if (hiveJob) {
            validateNameValuePair("hiveArgs", jobInfoElement.getCmdArgs());
        }

        validateNameValuePair("schedule", jobInfoElement.getSchedule());

        // The configuration field is validated for every job type except Hadoop.
        final boolean hadoopJob = Types.JobType.HADOOP == Types.JobType.parse(jobInfoElement.getJobType());
        if (!hadoopJob) {
            validateNameValuePair("configuration", jobInfoElement.getConfiguration());
        }

        final boolean missingId = jobInfoElement.getJobID() == null || jobInfoElement.getJobID().isEmpty();
        if (missingId) {
            jobInfoElement.setJobID(UUID.randomUUID().toString());
        }

        jobInfoElement.setJobStatus(Types.JobStatus.INIT, "Initializing job");
    }

    /**
     * Validates one named job parameter. Every value must be non-null and non-empty; the names
     * {@code jobType}, {@code hiveArgs}, {@code schedule} and {@code configuration} carry an
     * additional name-specific check.
     *
     * @param str  name of the parameter, used to pick the check and to build error messages
     * @param str2 value to validate
     * @throws CloudServiceException with code 400 when the value is rejected
     */
    private void validateNameValuePair(String str, String str2) throws CloudServiceException {
        logger.debug("called");

        if (str2 == null || str2.isEmpty()) {
            final String missing = "Invalid " + str + " parameter, can't be null or empty";
            logger.error(missing);
            throw new CloudServiceException(400, missing);
        }

        switch (str) {
            case "jobType":
                if (Types.JobType.parse(str2) == null) {
                    final String badType = "Invalid " + str + ", Valid types are hadoop or hive or pig. Wrong value received: " + str2;
                    logger.error(badType);
                    throw new CloudServiceException(400, badType);
                }
                break;
            case "hiveArgs":
                // Plain substring match: "-f" or "-e" anywhere in the argument string passes.
                if (!str2.contains("-f") && !str2.contains("-e")) {
                    logger.error("Hive arguments must include either the -e or -f flag");
                    throw new CloudServiceException(400, "Hive arguments must include either the -e or -f flag");
                }
                break;
            case "schedule":
                if (Types.Schedule.parse(str2) == null) {
                    final String badSchedule = "Invalid " + str + " type, Valid values are adhoc, sla or bonus. Wrong value received: " + str2;
                    logger.error(badSchedule);
                    throw new CloudServiceException(400, badSchedule);
                }
                break;
            case "configuration":
                if (Types.Configuration.parse(str2) == null) {
                    final String badConfiguration = "Invalid " + str + " type, Valid values are prod or test or unittest. Wrong value received: " + str2;
                    logger.error(badConfiguration);
                    throw new CloudServiceException(400, badConfiguration);
                }
                break;
            default:
                // Any other parameter only needs to be non-empty.
                break;
        }
    }

    /**
     * Stamps the job with this host's name and with the output and kill URIs derived from this
     * instance's endpoint and the job ID.
     *
     * @param jobInfoElement job to update in place
     * @throws CloudServiceException when the local host name or endpoint cannot be determined
     */
    private void buildJobURIs(JobInfoElement jobInfoElement) throws CloudServiceException {
        final String hostName = NetUtil.getHostName();
        jobInfoElement.setHostName(hostName);

        final String outputUri = getEndPoint() + "/" + jobDirPrefix + "/" + jobInfoElement.getJobID();
        jobInfoElement.setOutputURI(outputUri);

        final String killUri = getEndPoint() + "/" + jobResourcePrefix + "/" + jobInfoElement.getJobID();
        jobInfoElement.setKillURI(killUri);
    }

    /**
     * Builds the base HTTP URL of this instance from the local host name and the server port.
     *
     * @return URL of the form {@code http://<host>:<port>}
     * @throws CloudServiceException propagated from the host name lookup
     */
    private String getEndPoint() throws CloudServiceException {
        final StringBuilder endPoint = new StringBuilder("http://");
        endPoint.append(NetUtil.getHostName());
        endPoint.append(":");
        endPoint.append(serverPort);
        return endPoint.toString();
    }

    /**
     * Sends a DELETE request to the given kill URI and returns that instance's answer.
     *
     * @param str kill URI to call
     * @return the remote response, or an error response wrapping the failure of the call
     */
    private JobStatusResponse forwardJobKill(String str) {
        JobStatusResponse result;
        try {
            result = executeRequest(HttpClientRequest.Verb.DELETE, str, null, JobStatusResponse.class);
        } catch (CloudServiceException forwardFailure) {
            result = new JobStatusResponse(forwardFailure);
        }
        return result;
    }

    /**
     * POSTs a job submission to the given URI and returns that instance's answer.
     *
     * @param str            URI of the jobs resource to post to
     * @param jobInfoRequest submission to send as the request entity
     * @return the remote response, or an error response wrapping the failure of the call
     */
    private JobInfoResponse forwardJobRequest(String str, JobInfoRequest jobInfoRequest) {
        JobInfoResponse result;
        try {
            result = executeRequest(HttpClientRequest.Verb.POST, str, jobInfoRequest, JobInfoResponse.class);
        } catch (CloudServiceException forwardFailure) {
            result = new JobInfoResponse(forwardFailure);
        }
        return result;
    }

    /**
     * Executes an HTTP request through the "genie" named client, asking for JSON, and converts
     * the response entity to the requested type.
     *
     * <p>The response is released in a {@code finally} block, so its resources are freed even
     * when reading the entity fails. A {@link CloudServiceException} raised in here (the
     * null-response case) is rethrown unchanged instead of being wrapped and logged a second
     * time by the generic handler.
     *
     * @param verb        HTTP verb to use
     * @param str         target URI
     * @param baseRequest request entity; callers pass null when there is none
     * @param cls         class the response entity is converted to
     * @param <T>         response type
     * @return the response entity
     * @throws CloudServiceException with code 500 when the response is null or the call fails
     */
    private <T extends BaseResponse> T executeRequest(HttpClientRequest.Verb verb, String str, BaseRequest baseRequest, Class<T> cls) throws CloudServiceException {
        HttpClientResponse response = null;
        try {
            final MultivaluedMapImpl headers = new MultivaluedMapImpl();
            headers.add("Accept", "application/json");
            final RestClient client = (RestClient) ClientFactory.getNamedClient("genie");
            response = client.execute(HttpClientRequest.newBuilder().setVerb(verb).setHeaders(headers).setUri(new URI(str)).setEntity(baseRequest).build());
            if (response == null) {
                final String nullResponse = "Received null response while auto-forwarding request to Genie instance";
                logger.error(nullResponse);
                throw new CloudServiceException(500, nullResponse);
            }
            logger.info("Response Status:" + response.getStatus());
            return (T) response.getEntity(cls);
        } catch (CloudServiceException serviceError) {
            // Already logged and carrying the right code and message; pass it through untouched.
            throw serviceError;
        } catch (Exception callFailure) {
            final String failed = "Error while trying to auto-forward request: " + callFailure.getMessage();
            logger.error(failed, callFailure);
            throw new CloudServiceException(500, failed);
        } finally {
            if (response != null) {
                response.releaseResources();
            }
        }
    }

    // Resolves the server port and the two URI path prefixes from configuration; the second
    // argument of each lookup is the built-in default.
    static {
        serverPort = conf.getInt("netflix.appinfo.port", 7001);
        jobDirPrefix = conf.getString("netflix.genie.server.job.dir.prefix", "genie-jobs");
        jobResourcePrefix = conf.getString("netflix.genie.server.job.resource.prefix", "genie/v0/jobs");
    }
}
