package org.shoulder.batch.service.impl;

import jakarta.annotation.Nonnull;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.shoulder.batch.enums.BatchDetailResultStatusEnum;
import org.shoulder.batch.enums.BatchErrorCodeEnum;
import org.shoulder.batch.log.ShoulderBatchLoggers;
import org.shoulder.batch.model.BatchDataSlice;
import org.shoulder.batch.model.BatchRecordDetail;
import org.shoulder.batch.spi.BatchTaskSliceHandler;
import org.shoulder.core.exception.CommonErrorCodeEnum;
import org.shoulder.core.i18.Translator;
import org.shoulder.core.log.Logger;
import org.shoulder.core.util.ContextUtils;

/* loaded from: input_file:org/shoulder/batch/service/impl/BatchProcessor.class */
public class BatchProcessor implements Runnable {
    private static final Logger log = ShoulderBatchLoggers.DEFAULT;
    private final BlockingQueue<BatchDataSlice> taskQueue;
    private final BlockingQueue<BatchRecordDetail> resultQueue;
    private final Collection<BatchTaskSliceHandler> batchTaskSliceHandlers = ContextUtils.getBeansOfType(BatchTaskSliceHandler.class).values();
    protected Translator translator = (Translator) ContextUtils.getBean(Translator.class);
    protected String batchId;

    public BatchProcessor(String str, BlockingQueue<BatchDataSlice> blockingQueue, BlockingQueue<BatchRecordDetail> blockingQueue2) {
        this.batchId = str;
        this.taskQueue = blockingQueue;
        this.resultQueue = blockingQueue2;
    }

    @Override // java.lang.Runnable
    public void run() {
        int i = 0;
        while (true) {
            BatchDataSlice poll = this.taskQueue.poll();
            if (poll == null) {
                log.info("{} stop, processed {}", getBatchId(), Integer.valueOf(i));
                return;
            } else {
                putResult(doWork(poll));
                i++;
            }
        }
    }

    private void putResult(List<BatchRecordDetail> list) {
        for (int i = 0; i < list.size(); i++) {
            try {
                this.resultQueue.put(list.get(i));
            } catch (InterruptedException e) {
                log.error("put result into queue FAIL, size=" + list.size() + " put=" + i, e);
                return;
            }
        }
    }

    public String getBatchId() {
        return this.batchId;
    }

    public List<BatchRecordDetail> doWork(@Nonnull BatchDataSlice batchDataSlice) {
        log.info("task start. {}", batchDataSlice);
        if (CollectionUtils.isEmpty(batchDataSlice.getBatchList())) {
            return Collections.emptyList();
        }
        String dataType = batchDataSlice.getDataType();
        String operationType = batchDataSlice.getOperationType();
        BatchTaskSliceHandler orElseThrow = this.batchTaskSliceHandlers.stream().filter(batchTaskSliceHandler -> {
            return batchTaskSliceHandler.support(dataType, operationType);
        }).findFirst().orElseThrow(() -> {
            return BatchErrorCodeEnum.DATA_TYPE_OR_OPERATION_NOT_SUPPORT.toException(new Object[]{dataType, operationType});
        });
        List<BatchRecordDetail> list = null;
        try {
            log.debug("begin_handle dataType=" + dataType + "operation=" + operationType + ",batchId=" + getBatchId() + ", slice=" + batchDataSlice.getSequence() + ", handler=" + orElseThrow.getClass().getName());
            list = orElseThrow.handle(batchDataSlice);
        } catch (Exception e) {
            log.error("Batch Process FAIL! dataType=" + dataType + "operation=" + operationType + ",batchId=" + getBatchId() + ", slice=" + batchDataSlice.getSequence() + ", handler=" + orElseThrow.getClass().getName(), e);
        }
        log.info("task {}-{} finished", batchDataSlice.getBatchId(), Integer.valueOf(batchDataSlice.getSequence()));
        return polluteUnknownIfMissingResult(batchDataSlice, ListUtils.emptyIfNull(list));
    }

    private List<BatchRecordDetail> polluteUnknownIfMissingResult(@Nonnull BatchDataSlice batchDataSlice, @Nonnull List<BatchRecordDetail> list) {
        int intValue = batchDataSlice.calculateDataSize().intValue();
        int size = list.size();
        if (intValue == size) {
            return list;
        }
        log.warnWithErrorCode(BatchErrorCodeEnum.TASK_SLICE_RESULT_INVALID.getCode(), BatchErrorCodeEnum.TASK_SLICE_RESULT_INVALID.getMessage(), Integer.valueOf(intValue), Integer.valueOf(size));
        Map map = (Map) list.stream().collect(Collectors.toMap((v0) -> {
            return v0.getIndex();
        }, batchRecordDetail -> {
            return batchRecordDetail;
        }, (batchRecordDetail2, batchRecordDetail3) -> {
            return batchRecordDetail3;
        }));
        Stream<R> map2 = batchDataSlice.dataIndexStream().map(num -> {
            return (BatchRecordDetail) Optional.ofNullable((BatchRecordDetail) map.get(num)).orElse(new BatchRecordDetail(num.intValue(), "UNKNOWN", BatchDetailResultStatusEnum.FAILED.getCode().intValue(), CommonErrorCodeEnum.UNKNOWN.getCode(), new String[0]));
        });
        Objects.requireNonNull(list);
        map2.forEach((v1) -> {
            r1.add(v1);
        });
        return list;
    }
}
