package org.shoulder.batch.service.impl;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import org.shoulder.batch.constant.BatchConstants;
import org.shoulder.batch.enums.BatchDetailResultStatusEnum;
import org.shoulder.batch.log.ShoulderBatchLoggers;
import org.shoulder.batch.model.BatchData;
import org.shoulder.batch.model.BatchDataSlice;
import org.shoulder.batch.model.BatchRecord;
import org.shoulder.batch.model.BatchRecordDetail;
import org.shoulder.batch.progress.BatchProgressRecord;
import org.shoulder.batch.progress.ProgressAble;
import org.shoulder.batch.repository.BatchRecordDetailPersistentService;
import org.shoulder.batch.repository.BatchRecordPersistentService;
import org.shoulder.batch.spi.DefaultTaskSplitHandler;
import org.shoulder.batch.spi.TaskSplitHandler;
import org.shoulder.core.context.AppContext;
import org.shoulder.core.exception.CommonErrorCodeEnum;
import org.shoulder.core.log.Logger;
import org.shoulder.core.util.AssertUtils;
import org.shoulder.core.util.ContextUtils;
import org.shoulder.log.operation.context.OpLogContextHolder;
import org.shoulder.log.operation.enums.OperationResult;
import org.shoulder.validate.exception.ParamErrorCodeEnum;

/* loaded from: input_file:org/shoulder/batch/service/impl/BatchManager.class */
public class BatchManager implements Runnable, ProgressAble {
    protected static final Logger log = ShoulderBatchLoggers.DEFAULT;
    private static final int MAX_WORKER_SIZE = 4;
    protected ExecutorService threadPool = (ExecutorService) ContextUtils.getBean(BatchConstants.BATCH_THREAD_POOL_NAME);
    protected BatchRecordPersistentService batchRecordPersistentService = (BatchRecordPersistentService) ContextUtils.getBean(BatchRecordPersistentService.class);
    protected BatchRecordDetailPersistentService batchRecordDetailPersistentService = (BatchRecordDetailPersistentService) ContextUtils.getBean(BatchRecordDetailPersistentService.class);
    protected Long userId;
    protected String languageId;
    protected BatchData batchData;
    protected BatchProgressRecord progress;
    protected BatchRecord result;
    protected BlockingQueue<BatchDataSlice> jobQueue;
    protected BlockingQueue<BatchRecordDetail> resultQueue;

    public BatchManager(BatchData batchData) {
        batchData.setBatchId(generateBatchId());
        AssertUtils.notNull(batchData, CommonErrorCodeEnum.ILLEGAL_PARAM, new Object[0]);
        AssertUtils.notNull(batchData.getDataType(), CommonErrorCodeEnum.ILLEGAL_PARAM, new Object[0]);
        AssertUtils.notNull(batchData.getOperation(), CommonErrorCodeEnum.ILLEGAL_PARAM, new Object[0]);
        AssertUtils.notEmpty(batchData.getDataList(), CommonErrorCodeEnum.ILLEGAL_PARAM, new Object[0]);
        String userId = AppContext.getUserId();
        this.userId = Long.valueOf(userId == null ? 0L : Long.parseLong(userId));
        this.languageId = AppContext.getLocaleOrDefault().toString();
        this.batchData = batchData;
        this.progress = new BatchProgressRecord();
        this.progress.setId(batchData.getBatchId());
        this.progress.setTotal(batchData.getDataList().size());
    }

    private static String generateBatchId() {
        return UUID.randomUUID().toString();
    }

    @Override // java.lang.Runnable
    public void run() {
        log.debug("batch task start, dataType={}, operation={}", this.batchData.getDataType(), this.batchData.getOperation());
        List<BatchDataSlice> splitTask = splitTask(this.batchData);
        int size = splitTask.size();
        AssertUtils.isTrue(size > 0, ParamErrorCodeEnum.PARAM_ILLEGAL, new Object[]{"after splitTask, jobSize can't be 0"});
        this.jobQueue = new LinkedBlockingQueue(size);
        this.jobQueue.addAll(splitTask);
        int intValue = ((Integer) splitTask.stream().map((v0) -> {
            return v0.calculateDataSize();
        }).reduce((v0, v1) -> {
            return Integer.sum(v0, v1);
        }).orElse(0)).intValue();
        int min = Integer.min(MAX_WORKER_SIZE, size);
        this.resultQueue = new LinkedBlockingQueue(intValue);
        if (intValue < 2) {
            log.warn("batch task split and Total only={}! subJobNum={}, workNum={}", new Object[]{Integer.valueOf(intValue), Integer.valueOf(size), Integer.valueOf(min)});
        } else {
            log.debug("batch task split, total={}, subJobNum={}, workNum={}", new Object[]{Integer.valueOf(intValue), Integer.valueOf(size), Integer.valueOf(min)});
        }
        this.progress.setTotal(intValue);
        compositeBatchRecords(intValue);
        this.progress.start();
        int i = 0;
        while (true) {
            if (i >= min - 1) {
                break;
            }
            if (!canEmployWorker(new BatchProcessor(this.batchData.getBatchId(), this.jobQueue, this.resultQueue))) {
                log.warnWithErrorCode(CommonErrorCodeEnum.SERVER_BUSY.getCode(), "employ workers fail, fail back to execute by current, it may cost more time.");
                break;
            }
            i++;
        }
        new BatchProcessor(this.batchData.getBatchId(), this.jobQueue, this.resultQueue).run();
        handleResult(intValue);
        this.progress.finish();
        this.result.setSuccessNum(this.progress.getSuccessNum());
        this.result.setFailNum(this.progress.getFailNum());
        persistentImportRecord();
        fillOperationLog();
        log.info("batch task finished.");
    }

    private void compositeBatchRecords(int i) {
        this.result = BatchRecord.builder().id(this.batchData.getBatchId()).dataType(this.batchData.getDataType()).operation(this.batchData.getOperation()).totalNum(i).createTime(new Date()).creator(this.userId).build();
        this.result.setDetailList(new ArrayList(i));
    }

    protected List<BatchDataSlice> splitTask(BatchData batchData) {
        List<BatchDataSlice> list = (List) ContextUtils.getBeansOfType(TaskSplitHandler.class).values().stream().filter(taskSplitHandler -> {
            return !(taskSplitHandler instanceof DefaultTaskSplitHandler);
        }).filter(taskSplitHandler2 -> {
            return taskSplitHandler2.support(batchData);
        }).findFirst().map(taskSplitHandler3 -> {
            return taskSplitHandler3.splitTask(batchData);
        }).orElse(null);
        return list != null ? list : ((TaskSplitHandler) ContextUtils.getBean(DefaultTaskSplitHandler.class)).splitTask(batchData);
    }

    private boolean canEmployWorker(BatchProcessor batchProcessor) {
        try {
            this.threadPool.execute(batchProcessor);
            return true;
        } catch (Exception e) {
            log.warn(CommonErrorCodeEnum.SERVER_BUSY, e);
            return false;
        }
    }

    private void handleResult(int i) {
        for (int i2 = 0; i2 < i; i2++) {
            BatchRecordDetail batchRecordDetail = (BatchRecordDetail) takeUnExceptInterrupted(this.resultQueue);
            if (BatchDetailResultStatusEnum.SUCCESS.getCode().intValue() == batchRecordDetail.getStatus()) {
                this.progress.addSuccess(1);
            } else {
                this.progress.addFail(1);
            }
            batchRecordDetail.setOperation(this.batchData.getOperation());
            batchRecordDetail.setRecordId(this.batchData.getBatchId());
            this.result.getDetailList().add(batchRecordDetail);
        }
        if (this.jobQueue.isEmpty()) {
            return;
        }
        log.errorWithErrorCode(CommonErrorCodeEnum.CODING.getCode(), "jobQueue not empty!");
        this.jobQueue.clear();
    }

    private <T> T takeUnExceptInterrupted(BlockingQueue<T> blockingQueue) {
        try {
            return blockingQueue.take();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    protected void persistentImportRecord() {
        if (this.batchData.isPersistentRecord()) {
            AssertUtils.isFalse(this.result.getDetailList().stream().map((v0) -> {
                return v0.getSource();
            }).anyMatch((v0) -> {
                return Objects.isNull(v0);
            }), CommonErrorCodeEnum.CODING, new Object[]{"impl need invoke setSource()."});
            try {
                this.batchRecordPersistentService.insert(this.result);
                this.batchRecordDetailPersistentService.batchSave(this.result.getId(), (List) this.result.getDetailList().stream().sorted(Comparator.comparingInt((v0) -> {
                    return v0.getIndex();
                })).collect(Collectors.toList()));
            } catch (Exception e) {
                log.warnWithErrorCode(CommonErrorCodeEnum.DATA_STORAGE_FAIL.getCode(), "persistentImportRecord fail", e);
                throw CommonErrorCodeEnum.DATA_STORAGE_FAIL.toException(e, new Object[0]);
            }
        }
    }

    private void fillOperationLog() {
        OperationResult of = OperationResult.of(this.progress.getSuccessNum() > 0, this.progress.getFailNum() > 0);
        if (OpLogContextHolder.getContext() == null || OpLogContextHolder.getLog() == null) {
            return;
        }
        OpLogContextHolder.getLog().setResult(of).addDetailItem(String.valueOf(this.progress.getSuccessNum())).addDetailItem(String.valueOf(this.progress.getFailNum())).setObjectId(this.batchData.getBatchId()).setObjectType(this.batchData.getDataType());
        OpLogContextHolder.enableAutoLog();
    }

    public String getBatchId() {
        return this.batchData.getBatchId();
    }

    @Override // org.shoulder.batch.progress.ProgressAble
    public BatchProgressRecord getBatchProgress() {
        return this.progress;
    }
}
