package io.polaris.framework.toolkit.elasticjob.context;

import com.alibaba.fastjson2.JSON;
import com.dangdang.ddframe.job.api.ElasticJob;
import com.dangdang.ddframe.job.api.JobType;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.config.script.ScriptJobConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.executor.handler.JobProperties;
import com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import com.dangdang.ddframe.job.lite.api.strategy.JobShardingStrategy;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.internal.schedule.JobRegistry;
import com.dangdang.ddframe.job.lite.internal.schedule.JobScheduleController;
import com.dangdang.ddframe.job.lite.lifecycle.domain.JobSettings;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.google.common.base.Optional;
import io.polaris.core.annotation.Internal;
import io.polaris.core.err.Exceptions;
import io.polaris.core.lang.Objs;
import io.polaris.core.os.OS;
import io.polaris.core.reflect.Reflects;
import io.polaris.core.string.Strings;
import io.polaris.core.time.Dates;
import io.polaris.framework.core.context.AppCtx;
import io.polaris.framework.core.gray.GrayCapable;
import io.polaris.framework.toolkit.elasticjob.base.QuartzJobDelegate;
import io.polaris.framework.toolkit.elasticjob.entity.JobCfgEntity;
import io.polaris.framework.toolkit.elasticjob.entity.JobEnvEntity;
import io.polaris.framework.toolkit.elasticjob.entity.JobRuntimeEntity;
import io.polaris.framework.toolkit.elasticjob.err.JobException;
import io.polaris.framework.toolkit.elasticjob.ext.JobEventRdbConfiguration;
import io.polaris.framework.toolkit.elasticjob.ext.RotateServerNodeJobShardingStrategy;
import io.polaris.framework.toolkit.elasticjob.handler.DynamicExecutorServiceHandler;
import io.polaris.framework.toolkit.elasticjob.properties.DataflowJobProperties;
import io.polaris.framework.toolkit.elasticjob.properties.ElasticJobProperties;
import io.polaris.framework.toolkit.elasticjob.properties.JobCoreProperties;
import io.polaris.framework.toolkit.elasticjob.properties.JobsProperties;
import io.polaris.framework.toolkit.elasticjob.properties.ScriptJobProperties;
import io.polaris.framework.toolkit.elasticjob.properties.SimpleJobProperties;
import io.polaris.framework.toolkit.elasticjob.repository.JobRepository;
import io.polaris.framework.toolkit.elasticjob.repository.JobRuntimeRepository;
import io.polaris.framework.toolkit.elasticjob.repository.NoopJobRepository;
import io.polaris.framework.toolkit.elasticjob.repository.NoopJobRuntimeRepository;
import io.polaris.framework.toolkit.elasticjob.repository.RdbJobRepository;
import io.polaris.framework.toolkit.elasticjob.repository.RdbJobRuntimeRepository;
import java.text.ParseException;
import java.time.Instant;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.sql.DataSource;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.zookeeper.CreateMode;
import org.quartz.CronExpression;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.TriggerBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ConfigurableApplicationContext;

/* loaded from: input_file:io/polaris/framework/toolkit/elasticjob/context/JobCtx.class */
public class JobCtx {
    private static final Logger log = LoggerFactory.getLogger(JobCtx.class);
    /** Sharding strategy class applied when a job runtime does not specify one of its own. */
    public static final Class<? extends JobShardingStrategy> DEFAULT_JOB_SHARDING_STRATEGY = RotateServerNodeJobShardingStrategy.class;
    /** Zookeeper registration path used when no instance registration path is configured. */
    public static final String DEFAULT_INSTANCE_REG_PATH = "@instances";
    private final ElasticJobProperties properties;
    private final JobEventConfiguration jobEventConfig;
    private final ConfigurableApplicationContext context;
    private final boolean enabledRdbEvent;
    private final DataSource dataSource;
    private final JobRepository jobRepository;
    private final JobRuntimeRepository jobRuntimeRepository;
    private final String profile;
    private final String sysId;
    /** Identifier of this process, built as {@code <ip>#<pid>} in the constructor. */
    private final String vmProcessUid;
    private final long vmStartTime;
    // The three fields below are lazily initialised behind an unsynchronised null check
    // followed by a synchronised re-check, and are also read without holding the lock.
    // They are volatile so that a reader which skips the lock never observes a stale
    // reference or a partially constructed object.
    private volatile JobEnvEntity jobEnv;
    private volatile DefaultSchedulerHolder defaultInstance;
    private volatile ElasticSchedulerHolder elasticInstance;
    /** Set to {@code true} once startup completes and back to {@code false} on shutdown. */
    private final AtomicBoolean started = new AtomicBoolean(false);
    /** Job runtimes keyed by job name, kept in insertion order. */
    private Map<String, JobRuntimeEntity> jobs = Collections.synchronizedMap(new LinkedHashMap<>());

    /**
     * Builds the job context.
     *
     * <p>Selects the repositories (a supplied {@code jobRepository} wins; otherwise the RDB or
     * no-op implementation is chosen by the "enabled RDB event" flag), derives the process
     * identifier {@code <ip>#<pid>}, and registers a JVM shutdown hook that stops the context
     * if it is still started.
     *
     * @param configurableApplicationContext application context kept for later use
     * @param dataSource data source handed to the RDB-backed repositories
     * @param jobRepository optional job repository; may be {@code null}
     * @param elasticJobProperties elastic-job configuration properties
     */
    public JobCtx(ConfigurableApplicationContext configurableApplicationContext, DataSource dataSource, JobRepository jobRepository, ElasticJobProperties elasticJobProperties) {
        this.context = configurableApplicationContext;
        this.dataSource = dataSource;
        this.properties = elasticJobProperties;
        this.enabledRdbEvent = elasticJobProperties.isEnabledRdbEvent();

        if (jobRepository != null) {
            this.jobRepository = jobRepository;
        } else if (this.enabledRdbEvent) {
            this.jobRepository = new RdbJobRepository(dataSource);
        } else {
            this.jobRepository = new NoopJobRepository();
        }
        if (this.enabledRdbEvent) {
            this.jobRuntimeRepository = new RdbJobRuntimeRepository(dataSource);
        } else {
            this.jobRuntimeRepository = new NoopJobRuntimeRepository();
        }

        this.jobEventConfig = new JobEventRdbConfiguration(this);
        this.profile = AppCtx.getActiveProfile();
        this.sysId = AppCtx.getPlatformProperties().getAppId();

        // Prefer an IP matching the configured host patterns; otherwise take the first IP.
        List<String> hostPatterns = elasticJobProperties.getZookeeper().getInstanceHostPatterns();
        this.vmProcessUid = (hostPatterns != null
                ? OS.getPriorOrFirstIp(hostPatterns.toArray(new String[0]))
                : OS.getFirstIp()) + "#" + OS.getPid();
        this.vmStartTime = OS.getVmStartTime();

        Thread shutdownHook = new Thread(() -> {
            log.warn("运行JVM虚拟机停止钩子: 停止Job上下文");
            if (!this.started.get()) {
                return;
            }
            shutdown();
        });
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    /**
     * Binds an externally created registry center to this context.
     *
     * <p>A {@code null} argument is ignored. The job environment is rebuilt from the registry
     * center's Zookeeper configuration (read reflectively through {@code getZkConfig}) and an
     * elastic scheduler holder wrapping the given registry center is installed.
     *
     * @param zookeeperRegistryCenter registry center to bind; may be {@code null}
     * @throws JobException if an elastic scheduler holder is already present
     */
    @Internal
    public synchronized void bindZookeeperRegistryCenter(ZookeeperRegistryCenter zookeeperRegistryCenter) {
        if (zookeeperRegistryCenter == null) {
            return;
        }
        if (this.elasticInstance != null) {
            throw new JobException("已存在 ZookeeperRegistryCenter 实例");
        }
        ZookeeperConfiguration zkConfig = (ZookeeperConfiguration) Reflects.invokeQuietly(zookeeperRegistryCenter, Reflects.getMethod(ZookeeperRegistryCenter.class, "getZkConfig", new Class[0]), new Object[0]);

        // Publish the new environment first, then populate it through the local reference.
        JobEnvEntity env = new JobEnvEntity();
        this.jobEnv = env;
        env.setProfile(this.profile);
        env.setSysId(this.sysId);
        env.setZkJobNamespace(zkConfig.getNamespace());
        env.setZkAddress(zkConfig.getServerLists());

        // Registration path: fall back to the default when blank, otherwise trim slashes.
        env.setZkRegPath(this.properties.getZookeeper().getInstanceRegPath());
        String regPath = env.getZkRegPath();
        if (Strings.isBlank(regPath)) {
            env.setZkRegPath(DEFAULT_INSTANCE_REG_PATH);
        } else {
            env.setZkRegPath(regPath.replaceFirst("^/+", "").replaceFirst("/+$", ""));
        }

        env.setBaseSleepTime(zkConfig.getBaseSleepTimeMilliseconds());
        env.setMaxSleepTime(zkConfig.getMaxSleepTimeMilliseconds());
        env.setMaxRetries(zkConfig.getMaxRetries());
        env.setSessionTimeout(zkConfig.getSessionTimeoutMilliseconds());
        env.setConnectionTimeout(zkConfig.getConnectionTimeoutMilliseconds());
        env.setDigest(zkConfig.getDigest());

        this.elasticInstance = new ElasticSchedulerHolder(zookeeperRegistryCenter, zkConfig);
    }

    /**
     * Returns the registry center of the elastic scheduler holder, creating the holder first
     * when none exists yet.
     *
     * <p>NOTE(review): the holder is built from {@code jobEnv}, which may still be {@code null}
     * if neither startup nor binding has run - confirm callers only use this afterwards.
     *
     * @return the Zookeeper registry center
     */
    public ZookeeperRegistryCenter getZookeeperRegistryCenter() {
        // initElasticInstance() is a no-op when the holder already exists.
        initElasticInstance();
        ElasticSchedulerHolder holder = this.elasticInstance;
        return holder.getRegCenter();
    }

    /**
     * Starts job scheduling once.
     *
     * <p>If the context is not yet started, initialises the environment, registers this
     * instance node, loads, checks and schedules the jobs, then marks the context as started.
     * Any exception raised on the way is logged and followed by {@link #shutdown()}.
     */
    public void startup() {
        log.info("启动作业调度...");
        try {
            if (this.started.get()) {
                return;
            }
            synchronized (this) {
                // Re-check under the lock: another thread may have completed startup meanwhile.
                if (this.started.get()) {
                    return;
                }
                initEnv();
                registerElasticInstanceNode();
                initElasticJob();
                checkElasticJob();
                startElasticJob();
                this.started.set(true);
            }
        } catch (Exception e) {
            log.error("启动过程发生异常", e);
            shutdown();
        }
    }

    /**
     * Looks up a job runtime by name and returns a clone of it.
     *
     * @param str job name
     * @return a clone of the registered runtime, or {@code null} when the name is unknown
     */
    public JobRuntimeEntity getJobRuntime(String str) {
        JobRuntimeEntity registered = this.jobs.get(str);
        return registered == null ? null : registered.m111clone();
    }

    /**
     * Reloads the configuration of every job.
     *
     * <p>Each currently registered job is reloaded with its freshly built runtime (or with
     * {@code null} when it is no longer configured); afterwards every configured job that was
     * not registered before is loaded.
     */
    public synchronized void reload() {
        log.info("重新加载作业配置");
        Map<String, JobRuntimeEntity> pending = initializeAllJobEntities();
        // Snapshot the names: reloading modifies the registry while we walk over it.
        String[] registeredNames = this.jobs.keySet().toArray(new String[0]);
        for (String name : registeredNames) {
            // remove() yields the fresh runtime (or null) and leaves only new jobs in 'pending'.
            reload(name, pending.remove(name));
        }
        for (Map.Entry<String, JobRuntimeEntity> added : pending.entrySet()) {
            reload(added.getKey(), added.getValue());
        }
    }

    /**
     * Reloads a single job from its current configuration.
     *
     * @param str job name
     */
    public synchronized void reload(String str) {
        JobRuntimeEntity rebuilt = initializeJobEntity(str);
        reload(str, rebuilt);
    }

    /**
     * Replaces the runtime of one job and schedules it again.
     *
     * <p>An already registered job is shut down first. When no new runtime is supplied the
     * method only logs a warning. Otherwise the runtime is registered (re-using the previous
     * id and issuing an update when the job was already known, inserting it otherwise) and the
     * job is scheduled; a scheduling failure is recorded as an error state.
     *
     * @param str job name
     * @param jobRuntimeEntity new runtime for the job, or {@code null} if it is not configured
     */
    public synchronized void reload(String str, JobRuntimeEntity jobRuntimeEntity) {
        log.info("重新加载作业配置: {}", str);
        JobRuntimeEntity previous = this.jobs.get(str);
        boolean alreadyRegistered = previous != null;
        if (alreadyRegistered) {
            shutdownJob(str);
        }
        if (jobRuntimeEntity == null) {
            log.warn("作业配置不存在: {}", str);
            return;
        }

        if (alreadyRegistered) {
            // Keep the identity of the runtime record that is being replaced.
            jobRuntimeEntity.setId(previous.getId());
            jobRuntimeEntity.setMessage("重新加载作业配置");
        }
        this.jobs.put(str, jobRuntimeEntity);
        if (alreadyRegistered) {
            this.jobRuntimeRepository.updateJobRuntime(jobRuntimeEntity);
        } else {
            this.jobRuntimeRepository.insertJobRuntimeList(Collections.singletonList(jobRuntimeEntity));
        }

        try {
            scheduleJob(jobRuntimeEntity);
            doUpdateJobState(jobRuntimeEntity, JobState.INITIAL, "生成调度作业");
        } catch (Exception e) {
            log.error("生成调度作业失败", e);
            doUpdateJobState(jobRuntimeEntity, "生成调度作业失败: " + e.getMessage(), e);
        }
    }

    /**
     * Stops the context: removes locally scheduled jobs, marks job runtimes as terminated,
     * clears the job registry and closes both scheduler holders.
     *
     * <p>Cleanup always runs. A failure to delete one job no longer aborts the shutdown:
     * the remaining jobs are still processed, the started flag is reset, the registry is
     * cleared and the holders are closed. The first deletion failure is rethrown at the end,
     * with later ones attached as suppressed exceptions. Non-sharded jobs are skipped without
     * error when no local scheduler holder exists.
     *
     * @throws JobException if deleting at least one locally scheduled job failed
     */
    public synchronized void shutdown() {
        log.info("停止上下文");
        JobException failure = null;
        try {
            for (JobRuntimeEntity jobRuntimeEntity : this.jobs.values()) {
                // Only non-sharded jobs live in the local scheduler; guard against a holder
                // that was never created (jobs can be registered by reload without startup).
                if (!Boolean.TRUE.equals(jobRuntimeEntity.getShardingEnabled()) && this.defaultInstance != null) {
                    try {
                        this.defaultInstance.getScheduler().deleteJob(this.defaultInstance.jobKey(jobRuntimeEntity.getJobName()));
                    } catch (SchedulerException e) {
                        JobException wrapped = new JobException("作业调度停止失败: " + jobRuntimeEntity.getJobName(), e);
                        if (failure == null) {
                            failure = wrapped;
                        } else {
                            failure.addSuppressed(wrapped);
                        }
                        // As before, a job that could not be removed is not marked terminated.
                        continue;
                    }
                }
                doUpdateJobState(jobRuntimeEntity, JobState.TERMINATED, "关闭作业调度");
            }
        } finally {
            this.started.set(false);
            this.jobs.clear();
            try {
                if (this.elasticInstance != null) {
                    this.elasticInstance.close();
                }
            } finally {
                this.elasticInstance = null;
                try {
                    if (this.defaultInstance != null) {
                        this.defaultInstance.close();
                    }
                } finally {
                    this.defaultInstance = null;
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Reports whether the context is currently started.
     *
     * @return the current value of the started flag
     */
    public boolean isStarted() {
        boolean running = this.started.get();
        return running;
    }

    /**
     * Prepares everything scheduling depends on: the job environment, the elastic scheduler
     * holder and the local scheduler holder. Each step does nothing if its target already exists.
     */
    private void initEnv() {
        // Must run first: the elastic holder is constructed from the job environment.
        initJobEnv();
        initElasticInstance();
        initDefaultInstance();
    }

    /**
     * Builds the job environment once.
     *
     * <p>Starts from the environment stored in the job repository for this profile and system
     * id (or a new one when none is stored) and overrides it with every Zookeeper property
     * that is configured. Defaults: namespace {@code elastic-job-<sysId>}, registration path
     * {@link #DEFAULT_INSTANCE_REG_PATH}. In gray mode the namespace is prefixed with
     * {@code gray-<env>-}.
     *
     * @throws IllegalArgumentException if no Zookeeper address is available
     */
    private void initJobEnv() {
        if (this.jobEnv != null) {
            return;
        }
        synchronized (this) {
            if (this.jobEnv != null) {
                return;
            }
            JobEnvEntity env = this.jobRepository.getJobEnv(this.profile, this.sysId);
            if (env == null) {
                env = new JobEnvEntity();
                env.setProfile(this.profile);
                env.setSysId(this.sysId);
            }

            // --- Zookeeper address and digest: configured values replace stored ones ---
            String configuredServers = this.properties.getZookeeper().getServerLists();
            String configuredDigest = this.properties.getZookeeper().getDigest();
            if (Strings.isNotBlank(configuredServers)) {
                env.setZkAddress(configuredServers);
                env.setDigest(configuredDigest);
            }
            if (Strings.isBlank(env.getZkAddress())) {
                throw new IllegalArgumentException("Zookeeper服务地址未配置");
            }

            // --- namespace: configured > stored > default, without leading slashes ---
            String configuredNamespace = this.properties.getZookeeper().getNamespace();
            if (Strings.isNotBlank(configuredNamespace)) {
                env.setZkJobNamespace(configuredNamespace);
            }
            if (Strings.isBlank(env.getZkJobNamespace())) {
                env.setZkJobNamespace("elastic-job-" + this.sysId);
            }
            if (Strings.isNotBlank(env.getZkJobNamespace())) {
                String trimmedNamespace = env.getZkJobNamespace().replaceFirst("^/+", "");
                env.setZkJobNamespace(trimmedNamespace);
            }
            if (AppCtx.getPlatformProperties().getGray().isEnabled()) {
                env.setZkJobNamespace("gray-" + AppCtx.getPlatformProperties().getGray().getEnv() + "-" + env.getZkJobNamespace());
            }

            // --- connection tuning: only configured (non-null) values are applied ---
            Integer baseSleep = this.properties.getZookeeper().getBaseSleepTimeMilliseconds();
            if (baseSleep != null) {
                env.setBaseSleepTime(baseSleep);
            }
            Integer maxSleep = this.properties.getZookeeper().getMaxSleepTimeMilliseconds();
            if (maxSleep != null) {
                env.setMaxSleepTime(maxSleep);
            }
            Integer retries = this.properties.getZookeeper().getMaxRetries();
            if (retries != null) {
                env.setMaxRetries(retries);
            }
            Integer sessionTimeout = this.properties.getZookeeper().getSessionTimeoutMilliseconds();
            if (sessionTimeout != null) {
                env.setSessionTimeout(sessionTimeout);
            }
            Integer connectionTimeout = this.properties.getZookeeper().getConnectionTimeoutMilliseconds();
            if (connectionTimeout != null) {
                env.setConnectionTimeout(connectionTimeout);
            }

            // --- registration path: configured > stored > default, slashes trimmed ---
            String configuredRegPath = this.properties.getZookeeper().getInstanceRegPath();
            if (Strings.isNotBlank(configuredRegPath)) {
                env.setZkRegPath(configuredRegPath.replaceFirst("^/+", "").replaceFirst("/+$", ""));
            }
            String effectiveRegPath = env.getZkRegPath();
            env.setZkRegPath(Strings.isBlank(effectiveRegPath)
                    ? DEFAULT_INSTANCE_REG_PATH
                    : effectiveRegPath.replaceFirst("^/+", "").replaceFirst("/+$", ""));

            this.jobEnv = env;
        }
    }

    /**
     * Creates the local scheduler holder if it does not exist yet; the existence check is
     * repeated while holding this object's monitor.
     */
    private void initDefaultInstance() {
        if (this.defaultInstance == null) {
            synchronized (this) {
                if (this.defaultInstance == null) {
                    this.defaultInstance = new DefaultSchedulerHolder();
                }
            }
        }
    }

    /**
     * Creates the elastic scheduler holder from the job environment and the data source if it
     * does not exist yet; the existence check is repeated while holding this object's monitor.
     */
    private void initElasticInstance() {
        if (this.elasticInstance == null) {
            synchronized (this) {
                if (this.elasticInstance == null) {
                    this.elasticInstance = new ElasticSchedulerHolder(this.jobEnv, this.dataSource);
                }
            }
        }
    }

    /**
     * Registers this process as an ephemeral Zookeeper node at
     * {@code /<registration path>/<vmProcessUid>}, trying up to three times.
     *
     * <p>The node payload ({@code <vmProcessUid>@<timestamp>}, UTF-8 encoded) is written by
     * the create call itself. Previously the node was created empty and filled by a separate
     * {@code setData} call; if that second call failed, the node already existed and every
     * retry of the create step failed too, so registration was reported as failed.
     *
     * @throws JobException if all attempts fail; the cause is the last failure
     */
    private void registerElasticInstanceNode() {
        CuratorFramework client = this.elasticInstance.getRegCenter().getClient();
        String str = "/" + this.jobEnv.getZkRegPath().replaceFirst("^/+", "").replaceFirst("/+$", "") + "/" + this.vmProcessUid;
        Exception exc = null;
        for (int i = 0; i < 3; i++) {
            try {
                log.info("注册Zookeeper作业执行端节点,namespace:{},path:{},state:{}", new Object[]{client.getNamespace(), str, client.getState()});
                // Explicit charset: the payload must not depend on the platform default encoding.
                byte[] payload = (this.vmProcessUid + "@" + Dates.YYYY_MM_DD_HH_MM_SS_SSS.format(Instant.now())).getBytes(java.nio.charset.StandardCharsets.UTF_8);
                // Create the node and write its payload in a single call.
                ((ACLBackgroundPathAndBytesable) client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)).forPath(str, payload);
                return;
            } catch (Exception e) {
                log.debug("注册节点失败", e);
                exc = e;
            }
        }
        throw new JobException("注册Zookeeper作业执行端节点失败", exc);
    }

    /**
     * Lists the child nodes found under the instance registration path.
     *
     * @return an unmodifiable list of the child node names
     * @throws Exception if the Zookeeper query fails
     */
    private List<String> getAllRegisterInstanceNodes() throws Exception {
        String registrationRoot = "/" + this.jobEnv.getZkRegPath().replaceFirst("^/+", "").replaceFirst("/+$", "");
        CuratorFramework client = this.elasticInstance.getRegCenter().getClient();
        List<String> children = client.getChildren().forPath(registrationRoot);
        return Collections.unmodifiableList(children);
    }

    /**
     * Builds a runtime entity for every usable job definition.
     *
     * <p>Jobs come from the configured properties first (simple, dataflow, then script jobs)
     * and from the job repository afterwards. Disabled property jobs and names that were
     * already collected are skipped with a warning; repository entries are skipped silently
     * when flagged as deleted or when the name is already taken.
     *
     * @return job runtimes keyed by job name, in the order they were collected
     */
    private Map<String, JobRuntimeEntity> initializeAllJobEntities() {
        Map<String, JobRuntimeEntity> collected = new LinkedHashMap<>();
        JobsProperties jobs = this.properties.getJobs();
        if (jobs != null) {
            if (jobs.getSimpleJobs() != null) {
                jobs.getSimpleJobs().forEach((name, props) -> {
                    if (!props.isEnabled()) {
                        log.warn("作业调度{}未启用，请确认配置是否正确", name);
                        return;
                    }
                    if (collected.containsKey(name)) {
                        log.warn("作业调度{}命名重复，请确认配置是否正确", name);
                        return;
                    }
                    collected.putIfAbsent(name, newJobRuntime(props, name));
                });
            }
            if (jobs.getDataflowJobs() != null) {
                jobs.getDataflowJobs().forEach((name, props) -> {
                    if (!props.isEnabled()) {
                        log.warn("作业调度{}未启用，请确认配置是否正确", name);
                        return;
                    }
                    if (collected.containsKey(name)) {
                        log.warn("作业调度{}命名重复，请确认配置是否正确", name);
                        return;
                    }
                    collected.putIfAbsent(name, newJobRuntime(props, name));
                });
            }
            if (jobs.getScriptJobs() != null) {
                jobs.getScriptJobs().forEach((name, props) -> {
                    if (!props.isEnabled()) {
                        log.warn("作业调度{}未启用，请确认配置是否正确", name);
                        return;
                    }
                    if (collected.containsKey(name)) {
                        log.warn("作业调度{}命名重复，请确认配置是否正确", name);
                        return;
                    }
                    collected.putIfAbsent(name, newJobRuntime(props, name));
                });
            }
        }

        // Repository-defined jobs only fill in names the properties did not provide.
        List<JobCfgEntity> storedConfigs = this.jobRepository.getAllJobInfoCfg(this.jobEnv.getProfile(), this.jobEnv.getSysId());
        if (storedConfigs != null) {
            for (JobCfgEntity stored : storedConfigs) {
                boolean deleted = Boolean.TRUE.equals(stored.getDeleted());
                if (deleted || collected.containsKey(stored.getJobName())) {
                    continue;
                }
                JobRuntimeEntity runtime = newJobRuntime(stored);
                collected.put(runtime.getJobName(), runtime);
            }
        }
        return collected;
    }

    /**
     * Builds the runtime entity for a single job.
     *
     * <p>The configured properties are searched first (simple, dataflow, then script jobs);
     * the first enabled definition with that name wins. If none is found, the job repository
     * is consulted.
     *
     * <p>A name missing from one of the property maps is now skipped. Previously the lookup
     * result was dereferenced without a null check, so any name absent from the simple-job
     * map raised a {@link NullPointerException} before the other sources were tried. The
     * duplicated simple-job lookup has been removed as well.
     *
     * @param str job name
     * @return the new runtime entity, or {@code null} when the job is neither configured as an
     *         enabled property job nor stored as a non-deleted repository job
     */
    private JobRuntimeEntity initializeJobEntity(String str) {
        JobsProperties jobs = this.properties.getJobs();
        if (jobs != null) {
            if (jobs.getSimpleJobs() != null) {
                SimpleJobProperties simpleJobProperties = jobs.getSimpleJobs().get(str);
                if (simpleJobProperties != null && simpleJobProperties.isEnabled()) {
                    return newJobRuntime(simpleJobProperties, str);
                }
            }
            if (jobs.getDataflowJobs() != null) {
                DataflowJobProperties dataflowJobProperties = jobs.getDataflowJobs().get(str);
                if (dataflowJobProperties != null && dataflowJobProperties.isEnabled()) {
                    return newJobRuntime(dataflowJobProperties, str);
                }
            }
            if (jobs.getScriptJobs() != null) {
                ScriptJobProperties scriptJobProperties = jobs.getScriptJobs().get(str);
                if (scriptJobProperties != null && scriptJobProperties.isEnabled()) {
                    return newJobRuntime(scriptJobProperties, str);
                }
            }
        }
        JobCfgEntity jobInfoCfg = this.jobRepository.getJobInfoCfg(this.jobEnv.getProfile(), this.jobEnv.getSysId(), str);
        if (jobInfoCfg == null || Boolean.TRUE.equals(jobInfoCfg.getDeleted())) {
            return null;
        }
        return newJobRuntime(jobInfoCfg);
    }

    /**
     * Loads all job runtimes into the registry, removes stale runtime records and persists
     * the loaded runtimes; only a warning is logged when no job is configured.
     */
    private void initElasticJob() {
        Map<String, JobRuntimeEntity> loaded = initializeAllJobEntities();
        this.jobs.putAll(loaded);
        doDeleteOfflineJobRuntime();
        if (!this.jobs.isEmpty()) {
            doPersistJobRuntime();
            return;
        }
        log.warn("无作业配置信息");
    }

    /**
     * Validates every sharding-enabled job before it is scheduled.
     *
     * <p>Simple and dataflow jobs must have an executor class implementing the matching job
     * interface and, when gray mode is enabled, {@code GrayCapable}. Any other job must be a
     * script job with a non-blank executor command. Jobs without sharding are not checked.
     *
     * @throws JobException on the first job that violates one of these rules
     */
    private void checkElasticJob() {
        boolean grayMode = AppCtx.getPlatformProperties().getGray().isEnabled();
        for (JobRuntimeEntity job : this.jobs.values()) {
            if (!Boolean.TRUE.equals(job.getShardingEnabled())) {
                continue;
            }
            boolean dataflow = job.getJobType() == JobType.DATAFLOW;
            boolean simple = job.getJobType() == JobType.SIMPLE;

            if (!dataflow && !simple) {
                // Anything that is not class-based has to be a script job with a command.
                if (job.getJobType() != JobType.SCRIPT) {
                    throw new JobException("作业类型不支持！jobName=" + job.getJobName() + ",jobType=" + job.getJobType());
                }
                if (Strings.isBlank(job.getJobExecutor())) {
                    throw new JobException("作业脚本命令不能为空！jobName=" + job.getJobName());
                }
                continue;
            }

            Class<?> executorClass = job.getJobExecutorClass();
            boolean simpleMismatch = simple && !SimpleJob.class.isAssignableFrom(executorClass);
            if (simpleMismatch || (dataflow && !DataflowJob.class.isAssignableFrom(executorClass))) {
                throw new JobException("作业执行器类型不匹配！jobName=" + job.getJobName() + ",jobType=" + job.getJobType() + ",jobExecutor:" + executorClass.getCanonicalName());
            }
            if (grayMode && !GrayCapable.class.isAssignableFrom(executorClass)) {
                throw new JobException("启用灰度模式，作业执行器必须实现灰度功能，请继承对应类型的框架类！jobName=" + job.getJobName() + ",jobExecutor:" + executorClass.getCanonicalName());
            }
        }
    }

    /**
     * Schedules every registered job. A job that cannot be scheduled is logged and recorded
     * with an error state; the remaining jobs are still processed.
     */
    private void startElasticJob() {
        Iterator<JobRuntimeEntity> cursor = this.jobs.values().iterator();
        while (cursor.hasNext()) {
            JobRuntimeEntity job = cursor.next();
            try {
                scheduleJob(job);
                doUpdateJobState(job, JobState.INITIAL, "生成调度作业");
            } catch (Exception e) {
                log.error("生成调度作业失败", e);
                doUpdateJobState(job, "生成调度作业失败: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Creates a runtime entity for a stored job configuration, using a random UUID with the
     * dashes removed as its id.
     *
     * @param jobCfgEntity stored job configuration
     * @return the new runtime entity
     */
    private JobRuntimeEntity newJobRuntime(JobCfgEntity jobCfgEntity) {
        String generatedId = UUID.randomUUID().toString().replace("-", "");
        return newJobRuntime(jobCfgEntity, generatedId);
    }

    /**
     * Creates a runtime entity for a stored job configuration with the given id and fills in
     * the environment and process details.
     *
     * @param jobCfgEntity stored job configuration; also supplies the job name
     * @param str id assigned to the runtime entity
     * @return the new runtime entity
     */
    private JobRuntimeEntity newJobRuntime(JobCfgEntity jobCfgEntity, String str) {
        JobRuntimeEntity runtime = JobRuntimeEntity.newEntity(jobCfgEntity);
        String jobName = jobCfgEntity.getJobName();
        runtime.setJobName(jobName);
        runtime.setId(str);
        fillJobRuntimeWithEnv(runtime);
        return runtime;
    }

    /**
     * Stamps a runtime entity with the environment (profile, system id), fresh create/update
     * dates, the initial state, and the identity of this process and of the calling thread.
     *
     * @param jobRuntimeEntity entity to fill in
     */
    private void fillJobRuntimeWithEnv(JobRuntimeEntity jobRuntimeEntity) {
        // environment
        jobRuntimeEntity.setProfile(this.jobEnv.getProfile());
        jobRuntimeEntity.setSysId(this.jobEnv.getSysId());

        // bookkeeping: each date gets its own instance
        jobRuntimeEntity.setCreateDate(new Date());
        jobRuntimeEntity.setUpdateDate(new Date());
        jobRuntimeEntity.setDeleted(false);
        jobRuntimeEntity.setJobState(JobState.INITIAL);

        // process and thread identity
        jobRuntimeEntity.setVmProcessUid(this.vmProcessUid);
        jobRuntimeEntity.setVmStartTime(new Date(this.vmStartTime));
        Thread caller = Thread.currentThread();
        long callerId = caller.getId();
        jobRuntimeEntity.setVmThreadId(Long.toString(callerId));
        jobRuntimeEntity.setVmThreadName(caller.getName());
    }

    /**
     * Creates a runtime entity for a property-defined job, using a random UUID with the
     * dashes removed as its id.
     *
     * @param jobCoreProperties job properties
     * @param str job name
     * @return the new runtime entity
     */
    private JobRuntimeEntity newJobRuntime(JobCoreProperties jobCoreProperties, String str) {
        String generatedId = UUID.randomUUID().toString().replace("-", "");
        return newJobRuntime(jobCoreProperties, str, generatedId);
    }

    /**
     * Creates a runtime entity for a property-defined job with the given name and id and
     * fills in the environment and process details.
     *
     * @param jobCoreProperties job properties
     * @param str job name
     * @param str2 id assigned to the runtime entity
     * @return the new runtime entity
     */
    private JobRuntimeEntity newJobRuntime(JobCoreProperties jobCoreProperties, String str, String str2) {
        JobRuntimeEntity runtime = JobRuntimeEntity.newEntity(jobCoreProperties);
        runtime.setJobName(str);
        runtime.setId(str2);
        fillJobRuntimeWithEnv(runtime);
        return runtime;
    }

    /**
     * Asks the runtime repository to delete job runtime records for this profile and system
     * id, passing the set of registered instance nodes other than this process as the
     * exception list.
     *
     * @throws JobException if listing the nodes or the repository call fails
     */
    private void doDeleteOfflineJobRuntime() {
        try {
            List<String> onlineNodes = getAllRegisterInstanceNodes();
            log.info("删除本执行端以及所有离线执行端的作业运行信息. 当前在线: {}", onlineNodes);
            // Every online node except this one is passed as the exception set.
            HashSet<String> otherOnlineNodes = new HashSet<>(onlineNodes);
            otherOnlineNodes.remove(this.vmProcessUid);
            this.jobRuntimeRepository.deleteJobRuntimeExcept(this.jobEnv.getProfile(), this.jobEnv.getSysId(), otherOnlineNodes);
        } catch (Exception e) {
            throw new JobException("获取执行端注册节点信息失败", e);
        }
    }

    /** Inserts all registered job runtimes through the runtime repository. */
    private void doPersistJobRuntime() {
        JobRuntimeRepository repository = this.jobRuntimeRepository;
        repository.insertJobRuntimeList(this.jobs.values());
    }

    /**
     * Applies a new state and message to a job runtime, refreshes its update date and saves
     * the state through the runtime repository.
     *
     * @param jobRuntimeEntity runtime to update
     * @param jobState new state
     * @param str message stored with the state
     */
    private void doUpdateJobState(JobRuntimeEntity jobRuntimeEntity, JobState jobState, String str) {
        final JobRuntimeEntity target = jobRuntimeEntity;
        target.setJobState(jobState);
        target.setMessage(str);
        target.setUpdateDate(new Date());
        this.jobRuntimeRepository.updateJobState(target);
    }

    /**
     * Marks a job runtime as failed and saves the error state through the runtime repository.
     *
     * <p>The stack trace is cut to at most 2000 characters and the message to at most 500.
     * The stored stack trace is left untouched when no exception is given.
     *
     * @param jobRuntimeEntity runtime to update
     * @param str error message; may be {@code null}
     * @param exc exception whose stack trace is recorded; may be {@code null}
     */
    private void doUpdateJobState(JobRuntimeEntity jobRuntimeEntity, String str, Exception exc) {
        jobRuntimeEntity.setJobState(JobState.ERROR);
        if (exc != null) {
            String trace = Exceptions.getStackTrace(exc);
            if (trace != null && trace.length() > 2000) {
                trace = trace.substring(0, 2000);
            }
            jobRuntimeEntity.setStackTrace(trace);
        }
        String message = str;
        if (message != null && message.length() > 500) {
            message = message.substring(0, 500);
        }
        jobRuntimeEntity.setMessage(message);
        jobRuntimeEntity.setUpdateDate(new Date());
        this.jobRuntimeRepository.updateJobErrorState(jobRuntimeEntity);
    }

    private void scheduleJob(JobRuntimeEntity jobRuntimeEntity) {
        SimpleJobConfiguration scriptJobConfiguration;
        JobScheduler jobScheduler;
        try {
            CronExpression.validateExpression(jobRuntimeEntity.getJobCron());
            if (!Boolean.TRUE.equals(jobRuntimeEntity.getShardingEnabled())) {
                log.info("生成本地调度作业: {}", jobRuntimeEntity.getJobName());
                try {
                    JobKey jobKey = this.defaultInstance.jobKey(jobRuntimeEntity.getJobName());
                    JobBuilder storeDurably = JobBuilder.newJob().ofType(QuartzJobDelegate.class).withIdentity(jobKey).withDescription(jobRuntimeEntity.getDescription()).storeDurably();
                    if (Strings.isNotBlank(jobRuntimeEntity.getJobAttrs())) {
                        try {
                            JSON.parseObject(jobRuntimeEntity.getJobAttrs()).forEach((str, obj) -> {
                                storeDurably.usingJobData(str, Objects.toString(obj, null));
                            });
                        } catch (Exception e) {
                            log.warn(e.getMessage(), e);
                        }
                    }
                    this.defaultInstance.getScheduler().scheduleJob(storeDurably.usingJobData(QuartzJobDelegate.DELEGATE_CLASSNAME, jobRuntimeEntity.getJobExecutor()).usingJobData(QuartzJobDelegate.JOB_RUNTIME_ID, jobRuntimeEntity.getId()).build(), TriggerBuilder.newTrigger().forJob(jobKey).withSchedule(CronScheduleBuilder.cronSchedule(jobRuntimeEntity.getJobCron())).build());
                    return;
                } catch (JobException e2) {
                    throw e2;
                } catch (Exception e3) {
                    throw new JobException(e3.getMessage(), e3);
                }
            }
            log.info("生成分片调度作业: {}", jobRuntimeEntity.getJobName());
            try {
                ElasticJobListenerImpl elasticJobListenerImpl = new ElasticJobListenerImpl(this.jobRuntimeRepository, jobRuntimeEntity);
                JobCoreConfiguration.Builder jobProperties = JobCoreConfiguration.newBuilder(jobRuntimeEntity.getJobName(), jobRuntimeEntity.getJobCron(), (jobRuntimeEntity.getShardingTotalCount() == null || jobRuntimeEntity.getShardingTotalCount().intValue() < 1) ? 1 : jobRuntimeEntity.getShardingTotalCount().intValue()).shardingItemParameters(jobRuntimeEntity.getShardingItemParameters()).jobParameter(jobRuntimeEntity.getJobParameter()).failover(((Boolean) Objs.defaultIfNull(jobRuntimeEntity.getFailover(), false)).booleanValue()).misfire(((Boolean) Objs.defaultIfNull(jobRuntimeEntity.getMisfire(), true)).booleanValue()).description(jobRuntimeEntity.getDescription()).jobProperties(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), ((Class) Objs.defaultIfNull(jobRuntimeEntity.getExecutorServiceHandlerClass(), DynamicExecutorServiceHandler.class)).getCanonicalName()).jobProperties(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), ((Class) Objs.defaultIfNull(jobRuntimeEntity.getJobExceptionHandlerClass(), DefaultJobExceptionHandler.class)).getCanonicalName());
                if (Strings.isNotBlank(jobRuntimeEntity.getJobAttrs())) {
                    try {
                        JSON.parseObject(jobRuntimeEntity.getJobAttrs()).forEach((str2, obj2) -> {
                            jobProperties.jobProperties(str2, Objects.toString(obj2, null));
                        });
                    } catch (Exception e4) {
                        log.warn(e4.getMessage(), e4);
                    }
                }
                JobCoreConfiguration build = jobProperties.build();
                Class<?> cls = null;
                if (JobType.SIMPLE.equals(jobRuntimeEntity.getJobType())) {
                    cls = jobRuntimeEntity.getJobExecutorClass();
                    scriptJobConfiguration = new SimpleJobConfiguration(build, jobRuntimeEntity.getJobExecutor());
                } else if (JobType.DATAFLOW.equals(jobRuntimeEntity.getJobType())) {
                    cls = jobRuntimeEntity.getJobExecutorClass();
                    scriptJobConfiguration = new DataflowJobConfiguration(build, jobRuntimeEntity.getJobExecutor(), Boolean.TRUE.equals(jobRuntimeEntity.getStreamingProcess()));
                } else {
                    if (!JobType.SCRIPT.equals(jobRuntimeEntity.getJobType())) {
                        throw new JobException("作业类型配置有误");
                    }
                    scriptJobConfiguration = new ScriptJobConfiguration(build, jobRuntimeEntity.getJobExecutor());
                }
                LiteJobConfiguration build2 = LiteJobConfiguration.newBuilder(scriptJobConfiguration).disabled(((Boolean) Objs.defaultIfNull(jobRuntimeEntity.getDisabled(), false)).booleanValue()).overwrite(((Boolean) Objs.defaultIfNull(jobRuntimeEntity.getOverwrite(), true)).booleanValue()).jobShardingStrategyClass(((Class) Objs.defaultIfNull(jobRuntimeEntity.getJobShardingStrategyClass(), DEFAULT_JOB_SHARDING_STRATEGY)).getCanonicalName()).monitorExecution(((Boolean) Objs.defaultIfNull(jobRuntimeEntity.getMonitorExecution(), true)).booleanValue()).maxTimeDiffSeconds(((Integer) Objs.defaultIfNull(jobRuntimeEntity.getMaxTimeDiffSeconds(), -1)).intValue()).monitorPort(((Integer) Objs.defaultIfNull(jobRuntimeEntity.getMonitorPort(), -1)).intValue()).reconcileIntervalMinutes(((Integer) Objs.defaultIfNull(jobRuntimeEntity.getReconcileIntervalMinutes(), 10)).intValue()).build();
                if (cls != null) {
                    try {
                        jobScheduler = new SpringJobScheduler((ElasticJob) this.context.getBean(cls), this.elasticInstance.getRegCenter(), build2, this.jobEventConfig, new ElasticJobListener[]{elasticJobListenerImpl});
                    } catch (Throwable th) {
                        log.debug(th.getMessage(), th);
                        jobScheduler = new JobScheduler(this.elasticInstance.getRegCenter(), build2, this.jobEventConfig, new ElasticJobListener[]{elasticJobListenerImpl});
                    }
                } else {
                    jobScheduler = new JobScheduler(this.elasticInstance.getRegCenter(), build2, this.jobEventConfig, new ElasticJobListener[]{elasticJobListenerImpl});
                }
                try {
                    jobScheduler.init();
                } catch (Exception e5) {
                    log.warn("添加作业调度失败.尝试重新配置.错误信息:{}", e5.getMessage());
                    JobSettings jobSettings = new JobSettings();
                    jobSettings.setJobName(build2.getJobName());
                    jobSettings.setJobType(build2.getTypeConfig().getJobType().name());
                    jobSettings.setJobClass(build2.getTypeConfig().getJobClass());
                    jobSettings.setCron(build.getCron());
                    jobSettings.setShardingTotalCount(build.getShardingTotalCount());
                    jobSettings.setShardingItemParameters(build.getShardingItemParameters());
                    jobSettings.setJobParameter(build.getJobParameter());
                    jobSettings.setMonitorExecution(build2.isMonitorExecution());
                    if (scriptJobConfiguration instanceof DataflowJobConfiguration) {
                        jobSettings.setStreamingProcess(((DataflowJobConfiguration) scriptJobConfiguration).isStreamingProcess());
                    } else {
                        jobSettings.setStreamingProcess(false);
                    }
                    jobSettings.setMaxTimeDiffSeconds(build2.getMaxTimeDiffSeconds());
                    jobSettings.setMonitorPort(build2.getMonitorPort());
                    jobSettings.setFailover(build.isFailover());
                    jobSettings.setMisfire(build.isMisfire());
                    jobSettings.setJobShardingStrategyClass(build2.getJobShardingStrategyClass());
                    jobSettings.setDescription(build.getDescription());
                    for (JobProperties.JobPropertiesEnum jobPropertiesEnum : JobProperties.JobPropertiesEnum.values()) {
                        String str3 = build.getJobProperties().get(jobPropertiesEnum);
                        if (str3 != null) {
                            jobSettings.getJobProperties().put(jobPropertiesEnum.getKey(), str3);
                        }
                    }
                    if (scriptJobConfiguration instanceof ScriptJobConfiguration) {
                        jobSettings.setScriptCommandLine(((ScriptJobConfiguration) scriptJobConfiguration).getScriptCommandLine());
                    }
                    jobSettings.setReconcileIntervalMinutes(build2.getReconcileIntervalMinutes());
                    this.elasticInstance.getJobApi().getJobSettingsApi().updateJobSettings(jobSettings);
                    jobScheduler.init();
                }
            } catch (JobException e6) {
                throw e6;
            } catch (Exception e7) {
                throw new JobException(e7.getMessage(), e7);
            }
        } catch (ParseException e8) {
            throw new JobException("触发器时间表达式有误! " + e8.getMessage(), e8);
        }
    }

    /**
     * Shuts down the schedule of the job registered under the given name.
     *
     * <p>The job is looked up in the {@code jobs} map. When its
     * {@code shardingEnabled} flag is {@code TRUE}, the shutdown is delegated
     * to the job operator API; otherwise the job is deleted from the default
     * scheduler. In both cases the job state is then updated to
     * {@link JobState#TERMINATED}.
     *
     * @param str name of the job to shut down
     * @throws JobException if no job is registered under {@code str}, or if
     *         deleting it from the default scheduler fails
     */
    public void shutdownJob(String str) {
        final JobRuntimeEntity runtime = this.jobs.get(str);
        if (null == runtime) {
            throw new JobException("作业调度不存在: " + str);
        }

        final boolean sharded = Boolean.TRUE.equals(runtime.getShardingEnabled());
        if (!sharded) {
            // Non-sharded jobs live in the default scheduler; remove them there.
            try {
                this.defaultInstance.getScheduler().deleteJob(this.defaultInstance.jobKey(str));
            } catch (SchedulerException cause) {
                throw new JobException("作业调度停止失败: " + str, cause);
            }
        } else {
            // Second argument is left absent, exactly as in the original call.
            getJobApi().getJobOperatorApi().shutdown(Optional.of(str), Optional.absent());
        }

        doUpdateJobState(runtime, JobState.TERMINATED, "关闭作业调度");
    }

    /**
     * Triggers an immediate execution of the job registered under the given name.
     *
     * <p>The job is looked up in the {@code jobs} map. When its
     * {@code shardingEnabled} flag is {@code TRUE}, the trigger is delegated
     * to the job operator API; otherwise the job is triggered on the default
     * scheduler. In both cases the job state is then updated to
     * {@link JobState#RUNNING}.
     *
     * @param str name of the job to trigger
     * @throws JobException if no job is registered under {@code str}, or if
     *         triggering it on the default scheduler fails
     */
    public void triggerJob(String str) {
        JobRuntimeEntity jobRuntimeEntity = this.jobs.get(str);
        if (jobRuntimeEntity == null) {
            throw new JobException("作业调度不存在: " + str);
        }
        if (Boolean.TRUE.equals(jobRuntimeEntity.getShardingEnabled())) {
            getJobApi().getJobOperatorApi().trigger(Optional.of(str), Optional.absent());
        } else {
            try {
                this.defaultInstance.getScheduler().triggerJob(this.defaultInstance.jobKey(str));
            } catch (SchedulerException e) {
                // Report a trigger failure; the previous text claimed a "stop" failure,
                // which was copied from the shutdown path and misled diagnostics.
                throw new JobException("作业调度触发失败: " + str, e);
            }
        }
        doUpdateJobState(jobRuntimeEntity, JobState.RUNNING, "触发作业立刻执行");
    }

    /**
     * Disables the job registered under the given name.
     *
     * <p>The job is looked up in the {@code jobs} map. When its
     * {@code shardingEnabled} flag is {@code TRUE}, the disable request is
     * delegated to the job operator API; otherwise the job is deleted from
     * the default scheduler. In both cases the job state is then updated to
     * {@link JobState#TERMINATED}.
     *
     * @param str name of the job to disable
     * @throws JobException if no job is registered under {@code str}, or if
     *         deleting it from the default scheduler fails
     */
    public void disableJob(String str) {
        final JobRuntimeEntity runtime = this.jobs.get(str);
        if (null == runtime) {
            throw new JobException("作业调度不存在: " + str);
        }

        final boolean sharded = Boolean.TRUE.equals(runtime.getShardingEnabled());
        if (!sharded) {
            // For non-sharded jobs, disabling means removing the job from the default scheduler.
            try {
                this.defaultInstance.getScheduler().deleteJob(this.defaultInstance.jobKey(str));
            } catch (SchedulerException cause) {
                throw new JobException("作业调度停止失败: " + str, cause);
            }
        } else {
            getJobApi().getJobOperatorApi().disable(Optional.of(str), Optional.absent());
        }

        doUpdateJobState(runtime, JobState.TERMINATED, "禁用作业");
    }

    /**
     * Enables the job registered under the given name.
     *
     * <p>The job is looked up in the {@code jobs} map. When its
     * {@code shardingEnabled} flag is {@code TRUE}, the enable request is
     * delegated to the job operator API; otherwise the job is scheduled again
     * through {@code scheduleJob}. Afterwards the job state is updated: a job
     * whose state is {@link JobState#TERMINATED} becomes
     * {@link JobState#RUNNING}, any other state is written back unchanged.
     *
     * @param str name of the job to enable
     * @throws JobException if no job is registered under {@code str}
     */
    public void enableJob(String str) {
        final JobRuntimeEntity runtime = this.jobs.get(str);
        if (null == runtime) {
            throw new JobException("作业调度不存在: " + str);
        }

        final boolean sharded = Boolean.TRUE.equals(runtime.getShardingEnabled());
        if (!sharded) {
            scheduleJob(runtime);
        } else {
            getJobApi().getJobOperatorApi().enable(Optional.of(str), Optional.absent());
        }

        // Read the state only after the enable/reschedule step, as the original did.
        final JobState current = runtime.getJobState();
        final JobState target = (current == JobState.TERMINATED) ? JobState.RUNNING : current;
        doUpdateJobState(runtime, target, "启用作业");
    }

    /**
     * Looks up the schedule controller for the given job name in the shared
     * {@link JobRegistry} instance.
     *
     * @param str job name to look up
     * @return whatever {@link JobRegistry#getJobScheduleController(String)} returns for {@code str}
     */
    public JobScheduleController getJobScheduleController(String str) {
        final JobRegistry registry = JobRegistry.getInstance();
        return registry.getJobScheduleController(str);
    }

    /**
     * Returns the job API exposed by the {@code elasticInstance} of this object.
     *
     * @return the result of {@code elasticInstance.getJobApi()}
     */
    public JobApi getJobApi() {
        final JobApi api = this.elasticInstance.getJobApi();
        return api;
    }

    /**
     * Returns the scheduler held by the {@code defaultInstance} of this object.
     *
     * @return the result of {@code defaultInstance.getScheduler()}
     */
    public Scheduler getDefaultScheduler() {
        final Scheduler scheduler = this.defaultInstance.getScheduler();
        return scheduler;
    }

    /**
     * Returns the {@code enabledRdbEvent} flag of this instance.
     *
     * @return current value of {@code enabledRdbEvent}
     */
    public boolean isEnabledRdbEvent() {
        return enabledRdbEvent;
    }

    /**
     * Returns the {@code dataSource} held by this instance.
     *
     * @return current value of {@code dataSource}
     */
    public DataSource getDataSource() {
        return dataSource;
    }

    /**
     * Returns the {@code jobRepository} held by this instance.
     *
     * @return current value of {@code jobRepository}
     */
    public JobRepository getJobRepository() {
        return jobRepository;
    }

    /**
     * Returns the {@code jobRuntimeRepository} held by this instance.
     *
     * @return current value of {@code jobRuntimeRepository}
     */
    public JobRuntimeRepository getJobRuntimeRepository() {
        return jobRuntimeRepository;
    }

    /**
     * Returns the {@code profile} value held by this instance.
     *
     * @return current value of {@code profile}
     */
    public String getProfile() {
        return profile;
    }

    /**
     * Returns the {@code sysId} value held by this instance.
     *
     * @return current value of {@code sysId}
     */
    public String getSysId() {
        return sysId;
    }

    /**
     * Returns the {@code vmProcessUid} value held by this instance.
     *
     * @return current value of {@code vmProcessUid}
     */
    public String getVmProcessUid() {
        return vmProcessUid;
    }

    /**
     * Returns the {@code vmStartTime} value held by this instance.
     *
     * <p>NOTE(review): presumably a JVM start timestamp; the unit and epoch
     * are not visible from this method — confirm where the field is assigned.
     *
     * @return current value of {@code vmStartTime}
     */
    public long getVmStartTime() {
        return vmStartTime;
    }
}
