package org.usergrid.batch.service;

import com.hazelcast.impl.TransactionImpl;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.Assert;
import org.usergrid.batch.JobExecution;
import org.usergrid.batch.JobRuntime;
import org.usergrid.batch.JobRuntimeException;
import org.usergrid.batch.repository.JobAccessor;
import org.usergrid.batch.repository.JobDescriptor;
import org.usergrid.mq.Message;
import org.usergrid.mq.QueueManager;
import org.usergrid.mq.QueueManagerFactory;
import org.usergrid.mq.QueueQuery;
import org.usergrid.mq.QueueResults;
import org.usergrid.persistence.EntityManager;
import org.usergrid.persistence.EntityManagerFactory;
import org.usergrid.persistence.Query;
import org.usergrid.persistence.Results;
import org.usergrid.persistence.SimpleEntityRef;
import org.usergrid.persistence.cassandra.CassandraService;
import org.usergrid.persistence.entities.JobData;
import org.usergrid.persistence.entities.JobStat;
import org.usergrid.persistence.exceptions.TransactionNotFoundException;

/* loaded from: input_file:usergrid-scheduler-0.0.27.1.jar:org/usergrid/batch/service/SchedulerServiceImpl.class */
/**
 * Scheduler service that stores job data and job statistics as entities and uses a queue to
 * hand scheduled jobs out for execution.
 *
 * <p>The queue manager and entity manager are both obtained for
 * {@link CassandraService#MANAGEMENT_APPLICATION_ID} in {@link #init()}; the factories they come
 * from are injected through {@link #setQmf} and {@link #setEmf}.
 */
public class SchedulerServiceImpl implements SchedulerService, JobAccessor, JobRuntimeService {
    /** Message property holding the UUID of the {@link JobStat} entity. */
    private static final String STATS_ID = "statsId";
    /** Message property (and stats query field) holding the UUID of the {@link JobData} entity. */
    private static final String JOB_ID = "jobId";
    /** Message property (and stats query field) holding the job name. */
    private static final String JOB_NAME = "jobName";
    private static final Logger logger = LoggerFactory.getLogger(SchedulerServiceImpl.class);
    /** Queue path used unless {@link #setJobQueueName(String)} overrides it. */
    private static final String DEFAULT_QUEUE_NAME = "/jobs";
    private QueueManagerFactory qmf;
    private EntityManagerFactory emf;
    private QueueManager qm;
    private EntityManager em;
    private String jobQueueName = DEFAULT_QUEUE_NAME;
    // NOTE(review): the default is taken from a Hazelcast implementation class, which looks like
    // an incidental dependency rather than an intended one - confirm the intended value and
    // consider replacing it with a constant local to this class.
    private long jobTimeout = TransactionImpl.DEFAULT_TXN_TIMEOUT;

    /**
     * Persists the job data together with a new {@link JobStat} and posts a message for the job
     * to the job queue.
     *
     * @param str the job name; must not be {@code null}
     * @param j the fire time used as the queue message timestamp; must be greater than -1
     * @param jobData the job data to persist; must not be {@code null}
     * @return the persisted job data
     * @throws IllegalArgumentException if {@code str} or {@code jobData} is {@code null}
     * @throws JobRuntimeException if the fire time is invalid or persisting/scheduling fails
     */
    @Override // org.usergrid.batch.service.SchedulerService
    public JobData createJob(String str, long j, JobData jobData) {
        Assert.notNull(str, "jobName is required");
        Assert.notNull(jobData, "jobData is required");
        try {
            // Checked before anything is persisted so that an invalid fire time cannot leave
            // orphaned JobData/JobStat entities behind. Kept inside the try so the failure is
            // still reported as a JobRuntimeException, exactly as when scheduleJob rejected it.
            Assert.isTrue(j > -1, "fireTime must be positive");
            jobData.setJobName(str);
            JobData persistedData = (JobData) this.em.create(jobData);
            JobStat persistedStat = (JobStat) this.em.create(new JobStat(str, persistedData.getUuid()));
            scheduleJob(str, j, persistedData.getUuid(), persistedStat.getUuid());
            return persistedData;
        } catch (Exception e) {
            throw new JobRuntimeException(e);
        }
    }

    /**
     * Posts a message describing one job execution to the job queue.
     *
     * @param jobName the job name, stored under the {@code jobName} property
     * @param fireTime the message timestamp; values below zero are rejected
     * @param jobDataId the UUID of the job data entity, stored under {@code jobId}
     * @param jobStatId the UUID of the job stats entity, stored under {@code statsId}
     * @throws IllegalArgumentException if any argument fails validation
     */
    private void scheduleJob(String jobName, long fireTime, UUID jobDataId, UUID jobStatId) {
        Assert.notNull(jobName, "jobName is required");
        Assert.isTrue(fireTime > -1, "fireTime must be positive");
        Assert.notNull(jobDataId, "jobDataId is required");
        Assert.notNull(jobStatId, "jobStatId is required");
        Message message = new Message();
        message.setTimestamp(fireTime);
        message.setStringProperty(JOB_NAME, jobName);
        message.setProperty(JOB_ID, jobDataId);
        message.setProperty(STATS_ID, jobStatId);
        this.qm.postToQueue(this.jobQueueName, message);
    }

    /**
     * Deletes the {@code jobData} entity with the given id. Only this entity delete is issued
     * here; no queue or stats call is made by this method.
     *
     * @param uuid the id of the job data entity to delete
     * @throws JobRuntimeException if the delete fails
     */
    @Override // org.usergrid.batch.service.SchedulerService
    public void deleteJob(UUID uuid) {
        try {
            this.em.delete(new SimpleEntityRef("jobData", uuid));
        } catch (Exception e) {
            throw new JobRuntimeException(e);
        }
    }

    /**
     * Reads up to {@code i} messages from the job queue and resolves each one to a
     * {@link JobDescriptor}.
     *
     * <p>A message whose job data or job stats entity cannot be found is dropped: its queue
     * transaction is deleted along with whichever of the two entities does exist. A message that
     * cannot be parsed or whose entities cannot be loaded is logged and skipped without touching
     * its transaction, and the remaining messages are still processed.
     *
     * @param i the maximum number of messages to request from the queue
     * @return descriptors for the messages that resolved to both job data and job stats
     */
    @Override // org.usergrid.batch.repository.JobAccessor
    public List<JobDescriptor> getJobs(int i) {
        QueueQuery queueQuery = new QueueQuery();
        queueQuery.setTimeout(this.jobTimeout);
        queueQuery.setLimit(i);
        QueueResults queued = this.qm.getFromQueue(this.jobQueueName, queueQuery);
        List<JobDescriptor> descriptors = new ArrayList<JobDescriptor>(queued.size());
        for (Message message : queued.getMessages()) {
            // Declared outside the try so the error log can report whatever was parsed so far.
            String jobName = null;
            UUID jobDataId = null;
            UUID jobStatId = null;
            try {
                // Parsing happens inside the try: one malformed message must not abort the whole
                // batch, since the other messages have already been taken from the queue.
                jobName = message.getStringProperty(JOB_NAME);
                jobDataId = UUID.fromString(message.getStringProperty(JOB_ID));
                jobStatId = UUID.fromString(message.getStringProperty(STATS_ID));
                JobData jobData = (JobData) this.em.get(jobDataId, JobData.class);
                JobStat jobStat = (JobStat) this.em.get(jobStatId, JobStat.class);
                if (jobData != null && jobStat != null) {
                    descriptors.add(new JobDescriptor(jobName, message.getUuid(), message.getTransaction(), jobData, jobStat, this));
                    continue;
                }
                logger.info("Received job with data id '{}' from the queue, but no data was found.  Dropping job", jobDataId);
                this.qm.deleteTransaction(this.jobQueueName, message.getTransaction(), null);
                // Remove whichever half of the pair is still present.
                if (jobData != null) {
                    this.em.delete(jobData);
                }
                if (jobStat != null) {
                    this.em.delete(jobStat);
                }
            } catch (Exception e) {
                // The exception is passed as the trailing array element alongside the three
                // placeholder arguments.
                logger.error("Unable to retrieve job data for jobname {}, job id {}, stats id {}.  Skipping to avoid job loss", new Object[]{jobName, jobDataId, jobStatId, e});
            }
        }
        return descriptors;
    }

    /**
     * Renews the queue transaction of a running job with the given timeout and stores the
     * returned transaction id on the runtime.
     *
     * @param jobRuntime the runtime whose transaction is renewed
     * @param j the timeout passed to the renewal query
     * @throws JobRuntimeException if the transaction cannot be found
     */
    @Override // org.usergrid.batch.service.JobRuntimeService
    public void heartbeat(JobRuntime jobRuntime, long j) {
        try {
            QueueQuery renewal = new QueueQuery().withTimeout(j);
            UUID renewedId = this.qm.renewTransaction(this.jobQueueName, jobRuntime.getTransactionId(), renewal);
            jobRuntime.setTransactionId(renewedId);
        } catch (TransactionNotFoundException e) {
            logger.error("Could not renew transaction", (Throwable) e);
            throw new JobRuntimeException("Could not renew transaction during heartbeat", e);
        }
    }

    /**
     * Renews the queue transaction of a running job using the configured job timeout.
     *
     * @param jobRuntime the runtime whose transaction is renewed
     */
    @Override // org.usergrid.batch.service.JobRuntimeService
    public void heartbeat(JobRuntime jobRuntime) {
        heartbeat(jobRuntime, this.jobTimeout);
    }

    /**
     * Delays the runtime's execution by the delay the runtime reports; delegates to
     * {@link #delayRetry(JobExecution, long)}.
     *
     * @param jobRuntime the runtime supplying the execution and the delay
     */
    @Override // org.usergrid.batch.service.JobRuntimeService
    public void delay(JobRuntime jobRuntime) {
        delayRetry(jobRuntime.getExecution(), jobRuntime.getDelay());
    }

    /**
     * Persists the outcome of a job execution.
     *
     * <p>For {@code COMPLETED} the queue transaction is deleted and the job data entity is
     * deleted. For {@code DEAD} the queue transaction is deleted and the job data is updated. For
     * any other status only the job data is updated. The job stats are updated in every case.
     *
     * @param jobExecution the execution to persist
     * @throws JobRuntimeException if any queue or entity operation fails
     */
    @Override // org.usergrid.batch.repository.JobAccessor
    public void save(JobExecution jobExecution) {
        JobData jobData = jobExecution.getJobData();
        JobStat jobStats = jobExecution.getJobStats();
        JobExecution.Status status = jobExecution.getStatus();
        try {
            if (status == JobExecution.Status.COMPLETED) {
                logger.info("Job {} is complete", jobData.getJobName());
                this.qm.deleteTransaction(this.jobQueueName, jobExecution.getTransactionId(), null);
                this.em.delete(jobData);
            } else if (status == JobExecution.Status.DEAD) {
                logger.warn("Job {} is dead.  Removing", jobData.getJobName());
                this.qm.deleteTransaction(this.jobQueueName, jobExecution.getTransactionId(), null);
                this.em.update(jobData);
            } else {
                this.em.update(jobData);
            }
            logger.info("Updating stats for job {}", jobData.getJobName());
            this.em.update(jobStats);
        } catch (Exception e) {
            // The failing call may be an update or a queue transaction delete, not only an
            // entity delete, so the message names the overall operation.
            throw new JobRuntimeException(String.format("Unable to save job data with id %s", jobData.getUuid()), e);
        }
    }

    /**
     * Searches the {@code job_data} collection of the application.
     *
     * @param query the query to run; {@code null} is treated as a default {@link Query}
     * @return the search results
     * @throws Exception if the search fails
     */
    @Override // org.usergrid.batch.service.SchedulerService
    public Results queryJobData(Query query) throws Exception {
        Query effectiveQuery = query != null ? query : new Query();
        return this.em.searchCollection(this.em.getApplicationRef(), "job_data", effectiveQuery);
    }

    /**
     * Re-queues a job execution to fire {@code j} milliseconds from now, unless the execution is
     * {@code DEAD}, in which case nothing new is queued. In both cases the current queue
     * transaction is deleted and the job data and job stats are updated.
     *
     * @param jobExecution the execution to delay
     * @param j the delay, added to {@link System#currentTimeMillis()} to form the new fire time
     * @throws JobRuntimeException if rescheduling or any queue or entity operation fails
     */
    @Override // org.usergrid.batch.repository.JobAccessor
    public void delayRetry(JobExecution jobExecution, long j) {
        JobData jobData = jobExecution.getJobData();
        JobStat jobStats = jobExecution.getJobStats();
        try {
            // The replacement message is posted before the current transaction is deleted.
            if (jobExecution.getStatus() != JobExecution.Status.DEAD) {
                scheduleJob(jobExecution.getJobName(), System.currentTimeMillis() + j, jobData.getUuid(), jobStats.getUuid());
            }
            this.qm.deleteTransaction(this.jobQueueName, jobExecution.getTransactionId(), null);
            this.em.update(jobData);
            this.em.update(jobStats);
        } catch (Exception e) {
            throw new JobRuntimeException(String.format("Unable to reschedule job data with id %s", jobData.getUuid()), e);
        }
    }

    /**
     * Looks up the statistics of a job in the {@code job_stats} collection by job name and job id.
     *
     * @param str the job name to match
     * @param uuid the job data id to match
     * @return the matching stats when exactly one result is found, otherwise {@code null}
     * @throws Exception if the search fails
     */
    @Override // org.usergrid.batch.service.SchedulerService
    public JobStat getStatsForJob(String str, UUID uuid) throws Exception {
        EntityManager entityManager = this.emf.getEntityManager(CassandraService.MANAGEMENT_APPLICATION_ID);
        Query query = new Query();
        query.addEqualityFilter(JOB_NAME, str);
        query.addEqualityFilter(JOB_ID, uuid);
        Results matches = entityManager.searchCollection(entityManager.getApplicationRef(), "job_stats", query);
        if (matches.size() != 1) {
            return null;
        }
        return (JobStat) matches.getEntity();
    }

    /**
     * Obtains the queue manager and entity manager for the management application from the
     * injected factories.
     */
    @PostConstruct
    public void init() {
        this.qm = this.qmf.getQueueManager(CassandraService.MANAGEMENT_APPLICATION_ID);
        this.em = this.emf.getEntityManager(CassandraService.MANAGEMENT_APPLICATION_ID);
    }

    /**
     * @param queueManagerFactory the factory used by {@link #init()} to obtain the queue manager
     */
    @Autowired
    public void setQmf(QueueManagerFactory queueManagerFactory) {
        this.qmf = queueManagerFactory;
    }

    /**
     * @param entityManagerFactory the factory used to obtain entity managers
     */
    @Autowired
    public void setEmf(EntityManagerFactory entityManagerFactory) {
        this.emf = entityManagerFactory;
    }

    /**
     * @param str the queue path that jobs are posted to and read from
     */
    public void setJobQueueName(String str) {
        this.jobQueueName = str;
    }

    /**
     * @param j the timeout applied when reading jobs from the queue and by the default heartbeat
     */
    public void setJobTimeout(long j) {
        this.jobTimeout = j;
    }
}
