package io.polaris.framework.toolkit.elasticjob.base;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import io.polaris.framework.toolkit.elasticjob.base.PooledJobExecutor;
import io.polaris.framework.toolkit.elasticjob.err.JobException;
import io.polaris.framework.toolkit.elasticjob.err.JobSkipException;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/polaris/framework/toolkit/elasticjob/base/BasePooledJob.class */
/**
 * Template base class for Elastic-Job {@link SimpleJob}s that fetch work items in
 * batches and hand each item to a {@link PooledJobExecutor}.
 *
 * <p>{@link #execute(ShardingContext)} drives the lifecycle:
 * {@code doBefore} → repeated {@code doFetchData} / {@code doProcessData} → {@code doAfter}.
 * Subclasses customise the job by overriding those hooks.
 *
 * @param <T> type of a single work item returned by {@code doFetchData}
 */
public abstract class BasePooledJob<T> implements SimpleJob {
    private static final Logger log = LoggerFactory.getLogger(BasePooledJob.class);

    /**
     * Hook invoked once at the start of {@link #execute(ShardingContext)}, before the
     * executor is initialised. The default sets the executor interval to {@code 2000L}
     * (presumably milliseconds — confirm against {@code PooledJobExecutor}).
     *
     * <p>Decompiler note: this method was reported as originally {@code protected}.
     *
     * @param pooledJobExecutor executor created for this run
     * @param shardingContext   sharding context of this run
     */
    public void doBefore(PooledJobExecutor pooledJobExecutor, ShardingContext shardingContext) {
        pooledJobExecutor.setInterval(2000L);
    }

    /**
     * Fetches the next batch of work items. The default returns {@code null}, which
     * {@link #execute(ShardingContext)} treats the same as an empty list: no more data.
     *
     * <p>{@code execute} keeps calling this until it gets {@code null} or an empty list,
     * so an override must eventually stop returning items or the run never ends.
     *
     * @param shardingContext sharding context of this run
     * @return the next batch, or {@code null}/empty when there is nothing left
     */
    protected List<T> doFetchData(ShardingContext shardingContext) {
        return null;
    }

    /**
     * Executor-aware variant of {@link #doFetchData(ShardingContext)}; this is the overload
     * {@link #execute(ShardingContext)} actually calls. The default simply delegates to the
     * single-argument overload.
     *
     * <p>Decompiler note: this method was reported as originally {@code protected}.
     *
     * @param pooledJobExecutor executor created for this run
     * @param shardingContext   sharding context of this run
     * @return the next batch, or {@code null}/empty when there is nothing left
     */
    public List<T> doFetchData(PooledJobExecutor pooledJobExecutor, ShardingContext shardingContext) {
        return doFetchData(shardingContext);
    }

    /**
     * Processes a single work item. The default does nothing.
     *
     * @param shardingContext sharding context of this run
     * @param t               the item to process
     */
    protected void doProcessData(ShardingContext shardingContext, T t) {
    }

    /**
     * Executor-aware variant of {@link #doProcessData(ShardingContext, Object)}; this is the
     * overload submitted to the executor by {@link #execute(ShardingContext)}. The default
     * delegates to the two-argument overload.
     *
     * <p>Decompiler note: this method was reported as originally {@code protected}.
     *
     * @param pooledJobExecutor executor created for this run
     * @param shardingContext   sharding context of this run
     * @param t                 the item to process
     */
    public void doProcessData(PooledJobExecutor pooledJobExecutor, ShardingContext shardingContext, T t) {
        doProcessData(shardingContext, t);
    }

    /**
     * Hook invoked exactly once at the end of {@link #execute(ShardingContext)}, whether the
     * run completed, was skipped, or failed. The default closes the executor.
     *
     * @param pooledJobExecutor executor created for this run
     * @param shardingContext   sharding context of this run
     */
    protected void doAfter(PooledJobExecutor pooledJobExecutor, ShardingContext shardingContext) {
        pooledJobExecutor.close();
    }

    /**
     * Callback registered on the executor via {@code setStatisticsPersistedDao} in
     * {@link #execute(ShardingContext)}. The default only logs the statistics at INFO level.
     *
     * <p>Decompiler note: this method was reported as originally {@code protected}.
     *
     * @param statisticsInfo statistics handed over by the executor
     */
    public void saveJobStatistics(PooledJobExecutor.StatisticsInfo statisticsInfo) {
        log.info("保存作业统计信息: {}", statisticsInfo);
    }

    /**
     * Runs one job execution for the given shard (implements {@code SimpleJob#execute}).
     *
     * <p>Creates a {@link PooledJobExecutor}, calls {@code doBefore}, registers
     * {@link #saveJobStatistics} as the statistics callback, initialises the executor, then
     * repeatedly fetches batches and submits every item to the executor until a fetch returns
     * {@code null} or an empty list. {@code doAfter} always runs once, in a {@code finally}.
     *
     * @param shardingContext sharding context supplied by the scheduler
     * @throws JobException wrapping any failure other than {@link JobSkipException};
     *                      a {@code JobSkipException} is logged as a warning and swallowed
     */
    public void execute(ShardingContext shardingContext) {
        PooledJobExecutor pooledJobExecutor = new PooledJobExecutor(shardingContext);
        try {
            doBefore(pooledJobExecutor, shardingContext);
            pooledJobExecutor.setStatisticsPersistedDao(this::saveJobStatistics);
            pooledJobExecutor.doInit();

            List<T> batch = doFetchData(pooledJobExecutor, shardingContext);
            while (batch != null && !batch.isEmpty()) {
                pooledJobExecutor.notifyFetched(batch.size());
                for (T item : batch) {
                    pooledJobExecutor.execute(() -> {
                        doProcessData(pooledJobExecutor, shardingContext, item);
                    });
                }
                batch = doFetchData(pooledJobExecutor, shardingContext);
            }

            log.info("[作业:{}, 分片:{}] 待处理数据全部处理完成.", shardingContext.getJobName(), shardingContext.getShardingItem());
            // A zero-sized fetch notification is sent once fetching is over.
            pooledJobExecutor.notifyFetched(0);
        } catch (JobSkipException e) {
            // A skip is not an error: log it and finish normally.
            log.warn("[作业:{}, 分片:{}] {}", shardingContext.getJobName(), shardingContext.getShardingItem(), e.getMessage());
        } catch (Throwable th) {
            String message = "[作业:" + shardingContext.getJobName() + ", 分片:" + shardingContext.getShardingItem() + "] 运行异常: " + th.getMessage();
            log.error(message, th);
            throw new JobException(message, th);
        } finally {
            // Kept outside the try/catch bodies so a failing doAfter is neither reported as a
            // job failure nor invoked a second time.
            doAfter(pooledJobExecutor, shardingContext);
        }
    }
}
