package io.polaris.framework.toolkit.elasticjob.base;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.dangdang.ddframe.job.lite.internal.schedule.JobRegistry;
import com.dangdang.ddframe.job.lite.internal.sharding.ShardingService;
import io.polaris.framework.toolkit.elasticjob.context.JobCtx;
import io.polaris.framework.toolkit.elasticjob.context.JobCtxHolder;
import io.polaris.framework.toolkit.elasticjob.err.JobSkipException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/polaris/framework/toolkit/elasticjob/base/BaseDataFlowPooledJob.class */
public abstract class BaseDataFlowPooledJob<T> extends BasePooledJob<T> implements DataflowJob {
    private static final Logger log = LoggerFactory.getLogger(BaseDataFlowPooledJob.class);
    private Map<String, PooledJobExecutor> jobExecutors = new ConcurrentHashMap();

    private String toKey(ShardingContext shardingContext) {
        return shardingContext.getJobName() + "/" + shardingContext.getShardingTotalCount() + "/" + shardingContext.getShardingItem();
    }

    private PooledJobExecutor getOrInit(ShardingContext shardingContext) {
        String key = toKey(shardingContext);
        PooledJobExecutor pooledJobExecutor = this.jobExecutors.get(key);
        if (pooledJobExecutor == null) {
            pooledJobExecutor = new PooledJobExecutor(shardingContext);
            try {
                doBefore(pooledJobExecutor, shardingContext);
                pooledJobExecutor.setStatisticsPersistedDao(this::saveJobStatistics);
                pooledJobExecutor.doInit();
                this.jobExecutors.put(key, pooledJobExecutor);
            } catch (RuntimeException e) {
                log.error("[作业:" + shardingContext.getJobName() + ", 分片:" + shardingContext.getShardingItem() + "] 初始化作业资源时发生异常: " + e.getMessage(), e);
                doAfter(pooledJobExecutor, shardingContext);
                throw e;
            }
        }
        return pooledJobExecutor;
    }

    @Override // io.polaris.framework.toolkit.elasticjob.base.BasePooledJob
    protected void doAfter(PooledJobExecutor pooledJobExecutor, ShardingContext shardingContext) {
        pooledJobExecutor.close();
        this.jobExecutors.remove(toKey(pooledJobExecutor.getShardingContext()));
    }

    public final List<T> fetchData(ShardingContext shardingContext) {
        PooledJobExecutor orInit = getOrInit(shardingContext);
        List<T> list = null;
        try {
            list = doFetchData(orInit, shardingContext);
        } catch (JobSkipException e) {
            log.warn("[作业:{}, 分片:{}] {}", new Object[]{shardingContext.getJobName(), Integer.valueOf(shardingContext.getShardingItem()), e.getMessage()});
        } catch (RuntimeException e2) {
            log.warn("[作业:{}, 分片:{}] 获取待处理数据时发生异常.", shardingContext.getJobName(), Integer.valueOf(shardingContext.getShardingItem()));
            doAfter(orInit, shardingContext);
            throw e2;
        }
        if (list == null || list.isEmpty()) {
            log.info("[作业:{}, 分片:{}] 待处理数据全部处理完成.", shardingContext.getJobName(), Integer.valueOf(shardingContext.getShardingItem()));
            orInit.notifyFetched(0);
            doAfter(orInit, shardingContext);
        }
        return list;
    }

    public final void processData(ShardingContext shardingContext, List list) {
        PooledJobExecutor orInit = getOrInit(shardingContext);
        if (list != null) {
            try {
                if (!list.isEmpty()) {
                    orInit.notifyFetched(list.size());
                    for (Object obj : list) {
                        orInit.execute(() -> {
                            doProcessData(orInit, shardingContext, obj);
                        });
                    }
                    if (!isEligibleForJobRunning(shardingContext.getJobName())) {
                        log.info("[作业:{}, 分片:{}] 不满足取数条件, 不再继续取数.", shardingContext.getJobName(), Integer.valueOf(shardingContext.getShardingItem()));
                        orInit.notifyFetched(0);
                        doAfter(orInit, shardingContext);
                    }
                }
            } catch (JobSkipException e) {
                log.warn("[作业:{}, 分片:{}] {}", new Object[]{shardingContext.getJobName(), Integer.valueOf(shardingContext.getShardingItem()), e.getMessage()});
                orInit.notifyFetched(0);
                doAfter(orInit, shardingContext);
                return;
            } catch (RuntimeException e2) {
                log.error("[作业:" + shardingContext.getJobName() + ", 分片:" + shardingContext.getShardingItem() + "] 处理数据时发生异常: " + e2.getMessage(), e2);
                doAfter(orInit, shardingContext);
                throw e2;
            }
        }
        log.info("[作业:{}, 分片:{}] 待处理数据全部处理完成.", shardingContext.getJobName(), Integer.valueOf(shardingContext.getShardingItem()));
        orInit.notifyFetched(0);
        doAfter(orInit, shardingContext);
    }

    protected boolean isEligibleForJobRunning(String str) {
        boolean z = true;
        JobCtx jobCtx = JobCtxHolder.get();
        if (jobCtx != null) {
            if (!jobCtx.getJobApi().getJobSettingsApi().getJobSettings(str).isStreamingProcess()) {
                z = false;
            } else if (new ShardingService(JobRegistry.getInstance().getRegCenter(str), str).isNeedSharding()) {
                z = false;
            }
        }
        return z;
    }
}
