package org.dkpro.lab.engine.impl;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.dkpro.lab.engine.ExecutionException;
import org.dkpro.lab.engine.LifeCycleException;
import org.dkpro.lab.engine.TaskContext;
import org.dkpro.lab.engine.TaskExecutionEngine;
import org.dkpro.lab.engine.TaskExecutionService;
import org.dkpro.lab.engine.impl.BatchTaskEngine;
import org.dkpro.lab.storage.UnresolvedImportException;
import org.dkpro.lab.task.BatchTask;
import org.dkpro.lab.task.Task;
import org.dkpro.lab.task.TaskContextMetadata;
import org.springframework.beans.factory.annotation.Value;

/* loaded from: input_file:org/dkpro/lab/engine/impl/MultiThreadBatchTaskEngine.class */
/**
 * Batch task engine that runs the subtasks of a {@link BatchTask} concurrently on a fixed-size
 * thread pool.
 *
 * <p>Subtasks are executed in rounds. In every round all queued subtasks that have no existing
 * execution are submitted to a fresh pool; subtasks that fail are queued again for the next
 * round. Rounds repeat until the set of failed subtasks is the same in two consecutive rounds,
 * at which point either everything succeeded or the remaining failures are reported.
 */
public class MultiThreadBatchTaskEngine extends BatchTaskEngine {
    /** Name of the property from which the maximum number of worker threads is injected. */
    public static final String PROP_THREADS = "engine.batch.maxThreads";

    private final Log log = LogFactory.getLog(getClass());

    /**
     * Size of the thread pool created for each round. Defaults to one less than the number of
     * available processors, but never less than one: a fixed thread pool cannot be created with
     * zero threads, which the plain "processors - 1" default would request on a single-processor
     * host.
     */
    @Value("#{ @Properties['engine.batch.maxThreads'] }")
    private int maxThreads = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);

    /**
     * Unit of work that executes a single subtask with its own engine and remembers the metadata
     * of the resulting task context.
     *
     * <p>Although this class extends {@link Thread}, the enclosing engine never starts it as a
     * thread; instances are submitted to an {@link ExecutorService} and only {@link #run()} is
     * invoked by a pool worker.
     */
    protected class ExecutionThread extends Thread {
        private final TaskContext aContext;
        private final Task task;
        private final Map<String, Object> aConfig;
        private final Set<String> scope;

        // Written by the pool worker in run(); the enclosing engine reads it only after
        // Future.get() has returned for this unit of work.
        private TaskContextMetadata taskContextMetadata;

        /**
         * @param taskContext context of the enclosing batch task; provides the execution and
         *                    storage services
         * @param task        the subtask to execute
         * @param map         configuration handed to the scoped context factory
         * @param set         scope (context ids) handed to the scoped context factory
         */
        public ExecutionThread(TaskContext taskContext, Task task, Map<String, Object> map, Set<String> set) {
            this.aContext = taskContext;
            this.task = task;
            this.aConfig = map;
            this.scope = set;
        }

        /**
         * Creates an engine for the subtask, runs it and stores the metadata of the produced
         * context.
         *
         * @throws RuntimeException wrapping any {@link ExecutionException} or
         *                          {@link LifeCycleException} raised by the engine
         */
        @Override
        public void run() {
            TaskExecutionService executionService = this.aContext.getExecutionService();
            TaskExecutionEngine engine = executionService.createEngine(this.task);
            engine.setContextFactory(new BatchTaskEngine.ScopedTaskContextFactory(executionService.getContextFactory(), this.aConfig, this.scope));
            try {
                String contextId = engine.run(this.task);
                this.taskContextMetadata = this.aContext.getStorageService().getContext(contextId);
            } catch (ExecutionException | LifeCycleException e) {
                // run() cannot declare checked exceptions; the cause is preserved for the caller.
                throw new RuntimeException(e);
            }
        }

        /**
         * @return metadata of the context produced by the subtask, or {@code null} if
         *         {@link #run()} has not completed successfully
         */
        public TaskContextMetadata getTaskContextMetadata() {
            return this.taskContextMetadata;
        }
    }

    /** Creates an engine using the default (or injected) number of worker threads. */
    public MultiThreadBatchTaskEngine() {
    }

    /**
     * Creates an engine with an explicit pool size.
     *
     * @param i maximum number of worker threads per round
     */
    public MultiThreadBatchTaskEngine(int i) {
        setMaxThreads(i);
    }

    /**
     * Sets the pool size used for subsequent executions.
     *
     * @param i maximum number of worker threads per round; a non-positive value is stored as
     *          given and rejected with an {@link IllegalArgumentException} when the thread pool
     *          is created during execution
     */
    public void setMaxThreads(int i) {
        this.maxThreads = i;
    }

    /**
     * Executes all subtasks of the batch task for one configuration, running independent
     * subtasks in parallel and retrying failed ones in later rounds.
     *
     * @param batchTask   the batch task whose subtasks are executed
     * @param taskContext context of the batch task
     * @param map         the configuration to execute
     * @param set         ids of subtask contexts already executed; ids of reused and newly
     *                    produced contexts are added to it
     * @throws ExecutionException declared by the overridden method
     * @throws LifeCycleException if configuring a subtask fails
     * @throws RuntimeException   if the calling thread is interrupted while waiting, if a subtask
     *                            produced no context without a recorded failure, or if subtasks
     *                            still fail once retrying makes no further progress
     */
    @Override
    protected void executeConfiguration(BatchTask batchTask, TaskContext taskContext, Map<String, Object> map, Set<String> set) throws ExecutionException, LifeCycleException {
        if (this.log.isTraceEnabled()) {
            for (String executedId : set) {
                this.log.trace("-- Already executed: " + executedId);
            }
        }

        // Start from the inherited scope and extend it with every context used or produced here.
        Set<String> scope = new HashSet<>();
        if (batchTask.getScope() != null) {
            scope.addAll(batchTask.getScope());
        }

        for (Task subtask : batchTask.getTasks()) {
            taskContext.getLifeCycleManager().configure(taskContext, subtask, map);
        }

        LinkedList<Task> queue = new LinkedList<>(batchTask.getTasks());
        Map<Task, ExecutionThread> workers = new HashMap<>();
        Map<Future<?>, Task> pending = new HashMap<>();
        Map<Task, Throwable> failures = new ConcurrentHashMap<>();
        Set<Task> failedLastRound;
        int rounds = 0;
        do {
            rounds++;
            workers.clear();
            pending.clear();
            ExecutorService executor = Executors.newFixedThreadPool(this.maxThreads);
            failedLastRound = new HashSet<>(failures.keySet());
            failures.clear();
            try {
                while (!queue.isEmpty()) {
                    Task task = queue.poll();
                    TaskContextMetadata existing = getExistingExecution(batchTask, taskContext, task, map, set);
                    if (existing != null) {
                        this.log.debug("Using existing execution [" + existing.getId() + "]");
                        set.add(existing.getId());
                        scope.add(existing.getId());
                        continue;
                    }
                    this.log.info("Executing task [" + task.getType() + "]");
                    if (task instanceof BatchTask) {
                        ((BatchTask) task).setScope(scope);
                    }
                    ExecutionThread worker = new ExecutionThread(taskContext, task, map, set);
                    workers.put(task, worker);
                    pending.put(executor.submit(worker), task);
                }

                for (Map.Entry<Future<?>, Task> submission : pending.entrySet()) {
                    try {
                        submission.getKey().get();
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so callers further up can still observe it.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    } catch (java.util.concurrent.ExecutionException e) {
                        Task failed = submission.getValue();
                        this.log.info("Task exec failed for [" + failed.getType() + "]");
                        failures.put(failed, e);
                    }
                }
                this.log.debug("Calling shutdown");
            } finally {
                // Always release the pool, including when this round is aborted by an exception;
                // otherwise its idle worker threads would never terminate.
                executor.shutdown();
            }
            this.log.debug("All threads finished");

            for (Map.Entry<Task, ExecutionThread> finished : workers.entrySet()) {
                Task task = finished.getKey();
                TaskContextMetadata produced = finished.getValue().getTaskContextMetadata();
                if (produced != null) {
                    set.add(produced.getId());
                    scope.add(produced.getId());
                    continue;
                }
                // No context was produced: retry only if a failure was recorded for this task.
                Throwable cause = failures.get(task);
                if (!(cause instanceof UnresolvedImportException) && !(cause instanceof java.util.concurrent.ExecutionException)) {
                    throw new RuntimeException(cause);
                }
                queue.add(task);
            }
            // Stop once a round fails on exactly the same tasks as the previous one; this is
            // also the case when a round completes without any failure after a clean start.
        } while (!failures.keySet().equals(failedLastRound));

        if (failures.isEmpty()) {
            this.log.info("MultiThreadBatchTask completed successfully. Total number of outer loop runs: " + rounds);
            return;
        }

        StringBuilder summary = new StringBuilder();
        for (Throwable failure : failures.values()) {
            summary.append("\n -");
            summary.append(failure.getMessage());
        }
        Throwable first = failures.values().iterator().next();
        if (first instanceof RuntimeException) {
            throw (RuntimeException) first;
        }
        throw new RuntimeException(summary.toString(), first);
    }
}
