package cn.schoolwow.data.thread.flow.work;

import cn.schoolwow.data.thread.domain.DataThreadProgress;
import cn.schoolwow.data.thread.flow.work.map.GetMapFileFlow;
import cn.schoolwow.data.thread.listener.ProgressListener;
import cn.schoolwow.data.thread.work.map.MapFile;
import cn.schoolwow.data.thread.work.map.MapWorkResult;
import cn.schoolwow.data.thread.work.reduce.ReduceDataThreadHandler;
import cn.schoolwow.quickflow.domain.FlowContext;
import cn.schoolwow.quickflow.flow.BusinessFlow;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;

/* loaded from: input_file:cn/schoolwow/data/thread/flow/work/ExecuteReduceWorkFlow.class */
/**
 * Business flow that submits one Reduce task per id of the map result to a thread pool.
 *
 * <p>Each task resolves the map file for its id through {@link GetMapFileFlow}, reads the file
 * as UTF-8, passes it to the configured {@link ReduceDataThreadHandler}, optionally deletes the
 * file, and finally reports progress. Tasks are only submitted here; this flow does not wait
 * for them to finish.
 *
 * @param <T> unused by this class; kept so existing parameterized references still compile
 */
public class ExecuteReduceWorkFlow<T> implements BusinessFlow {
    /**
     * Reads the Reduce configuration from the flow context and schedules one task per id.
     *
     * <p>Context entries read: {@code name}, {@code threadPoolExecutor},
     * {@code reduceDataThreadHandler}, {@code mapWorkResult} and {@code dataThreadProgress}
     * (via {@code checkData}), plus {@code deleteFile} (default {@code true}),
     * {@code progressListener} and {@code threadExceptionMap} (via {@code getData}).
     *
     * @param flowContext the flow context carrying the entries listed above
     * @throws Exception if reading the context or submitting a task fails
     */
    public void executeBusinessFlow(FlowContext flowContext) throws Exception {
        String name = (String) flowContext.checkData("name");
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) flowContext.checkData("threadPoolExecutor");
        ReduceDataThreadHandler reduceDataThreadHandler = (ReduceDataThreadHandler) flowContext.checkData("reduceDataThreadHandler");
        MapWorkResult mapWorkResult = (MapWorkResult) flowContext.checkData("mapWorkResult");
        boolean deleteFile = ((Boolean) flowContext.getData("deleteFile", true)).booleanValue();
        ProgressListener progressListener = (ProgressListener) flowContext.getData("progressListener");
        // NOTE(review): if "threadExceptionMap" is absent the default map created here may not be
        // visible to anyone else, in which case recorded exceptions are lost - confirm getData semantics.
        @SuppressWarnings("unchecked")
        Map<Long, Exception> threadExceptionMap =
                (Map<Long, Exception>) flowContext.getData("threadExceptionMap", new HashMap<Long, Exception>());
        DataThreadProgress dataThreadProgress = (DataThreadProgress) flowContext.checkData("dataThreadProgress");

        final int total = mapWorkResult.idList.size();
        dataThreadProgress.total = total;
        dataThreadProgress.type = "Reduce";
        dataThreadProgress.description = "id列表:" + mapWorkResult.idList;

        // Number of finished tasks. Starts at 0 so that the reported values run 1..total;
        // starting at 1 and incrementing before reporting made progress end at total + 1.
        AtomicInteger finishedCount = new AtomicInteger(0);
        flowContext.putTemporaryData("idList", new ArrayList());

        for (String id : mapWorkResult.idList) {
            threadPoolExecutor.execute(() -> {
                try {
                    File file = (File) flowContext.startFlow(new GetMapFileFlow())
                            .putThreadLocalData("workName", mapWorkResult.name)
                            .putThreadLocalData("id", id)
                            .execute()
                            .checkData("mapFile");
                    MapFile mapFile = new MapFile();
                    mapFile.id = id;
                    mapFile.content = FileUtils.readFileToString(file, "UTF-8");
                    reduceDataThreadHandler.reduce(mapFile, file);
                    if (deleteFile) {
                        FileUtils.forceDelete(file);
                        flowContext.putTemporaryDataIfAbsent("mapDirectory", file.getParent());
                    }
                } catch (Exception e) {
                    // Keyed by worker thread id, so a later failure on the same pooled thread
                    // replaces an earlier one. Hold the lock only for the map update.
                    synchronized (threadExceptionMap) {
                        threadExceptionMap.put(Long.valueOf(Thread.currentThread().getId()), e);
                    }
                } finally {
                    // Runs exactly once per task, on success and on failure alike.
                    // A single incrementAndGet() gives every task its own distinct value.
                    int finished = finishedCount.incrementAndGet();
                    if (null != progressListener) {
                        progressListener.progress(name, finished, total);
                    }
                    dataThreadProgress.current = finished;
                    dataThreadProgress.percent = (dataThreadProgress.current * 100) / dataThreadProgress.total;
                }
            });
        }
    }

    /**
     * Returns the display name of this flow.
     *
     * @return the fixed flow name
     */
    public String name() {
        return "执行Reduce类型任务";
    }
}
