package org.jppf.execute;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.jppf.JPPFReconnectionNotification;
import org.jppf.execute.ThreadManager;
import org.jppf.node.protocol.AbstractTask;
import org.jppf.node.protocol.DataProvider;
import org.jppf.node.protocol.JPPFExceptionResult;
import org.jppf.node.protocol.Task;
import org.jppf.node.protocol.TaskBundle;
import org.jppf.node.protocol.TaskExecutionDispatcher;
import org.jppf.scheduling.JPPFScheduleHandler;
import org.jppf.utils.JPPFConfiguration;
import org.jppf.utils.LoggingUtils;
import org.jppf.utils.ReflectionHelper;
import org.jppf.utils.TypedProperties;
import org.jppf.utils.configuration.JPPFProperties;
import org.jppf.utils.configuration.JPPFProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/jppf/execute/AbstractExecutionManager.class */
public abstract class AbstractExecutionManager implements ExecutionManager {
    /** Logger for this class. */
    private static final Logger log = LoggerFactory.getLogger(AbstractExecutionManager.class);
    /** Debug-level flag, evaluated once at class initialization and used to guard construction of debug messages. */
    private static final boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
    /** Thread manager created by the constructor; supplies the executor service, the pool size and the thread priority. */
    protected final ThreadManager threadManager;
    /** Schedule handler passed to every {@code NodeTaskWrapper} and cleared on shutdown; its name suggests it drives task timeouts (TODO confirm). */
    protected final JPPFScheduleHandler timeoutHandler = new JPPFScheduleHandler("Task Timeout Timer");
    /** Bundle of the job currently handled, or {@code null}; provides the job uuid and name reported in task notifications. */
    protected TaskBundle bundle = null;
    /** Not accessed in this class; presumably maintained by subclasses in setup()/cleanup() (TODO confirm). */
    protected List<Task<?>> taskList = null;
    /** Not accessed in this class; presumably maintained by subclasses in setup()/cleanup() (TODO confirm). */
    protected List<String> uuidList = null;
    /** Wrappers of the tasks submitted by execute(); this class only accesses it while synchronized on the list itself. */
    protected List<NodeTaskWrapper> taskWrapperList = null;
    /** Configuration-changed flag, initially true; consumed by checkConfigChanged() and raised by triggerConfigChanged(). */
    protected final AtomicBoolean configChanged = new AtomicBoolean(true);
    /** Not accessed in this class; presumably holds a pending reconnection request for subclasses (TODO confirm). */
    protected AtomicReference<JPPFReconnectionNotification> reconnectionNotification = new AtomicReference<>(null);
    /** Cancellation flag of the current job; when set, execute() submits no task after setup(). */
    protected AtomicBoolean jobCancelled = new AtomicBoolean(false);
    /** Holder of the class loader handed to each {@code NodeTaskWrapper}; must be non-null when execute() submits tasks. */
    protected ThreadManager.UsedClassLoader usedClassLoader = null;
    /** Not accessed in this class; presumably the data provider of the current job (TODO confirm). */
    protected DataProvider dataProvider = null;
    /** Running sum of the elapsed times reported by the task wrappers passed to taskEnded(). */
    protected final AtomicLong accumulatedElapsed = new AtomicLong(0);
    /** Dispatcher used to fire task-ended notifications; created with the class loader of the concrete manager class and closed on shutdown. */
    protected final TaskExecutionDispatcher taskNotificationDispatcher = new TaskExecutionDispatcher(getClass().getClassLoader());

    public AbstractExecutionManager(JPPFProperty<Integer> jPPFProperty) {
        int intValue = ((Integer) JPPFConfiguration.get(jPPFProperty)).intValue();
        intValue = intValue <= 0 ? Runtime.getRuntime().availableProcessors() : intValue;
        JPPFConfiguration.set(jPPFProperty, Integer.valueOf(intValue));
        log.info("running " + intValue + " processing thread" + (intValue > 1 ? "s" : ""));
        this.threadManager = createThreadManager(intValue);
    }

    /**
     * Creates the thread manager, honoring a custom implementation class set in the configuration.
     * <p>The custom class is looked up under {@code JPPFProperties.THREAD_MANAGER_CLASS} and is
     * instantiated reflectively through a constructor taking a single {@code int}. When the property
     * is unset, equal to "default" (case-insensitive) or names {@link ThreadManagerThreadPool},
     * or when the custom class cannot be turned into a {@link ThreadManager}, a
     * {@link ThreadManagerThreadPool} is returned instead.
     * <p>Only when a custom manager is used are the processing threads property and the
     * "cpuTimeSupported" property updated from it.
     *
     * @param i the requested number of processing threads.
     * @return the thread manager to use, never {@code null}.
     */
    protected static ThreadManager createThreadManager(int i) {
        ThreadManager customManager = null;
        TypedProperties properties = JPPFConfiguration.getProperties();
        String className = (String) properties.get((JPPFProperty) JPPFProperties.THREAD_MANAGER_CLASS);
        boolean customRequested = className != null
            && !"default".equalsIgnoreCase(className)
            && !ThreadManagerThreadPool.class.getName().equals(className);
        if (customRequested) {
            try {
                Object instance = ReflectionHelper.invokeConstructor(Class.forName(className), new Class[]{Integer.TYPE}, Integer.valueOf(i));
                if (instance instanceof ThreadManager) {
                    customManager = (ThreadManager) instance;
                    log.info("Using custom thread manager: " + className);
                } else {
                    // make the fallback visible instead of silently ignoring a misconfigured class
                    log.warn("Ignoring custom thread manager class '" + className + "': could not obtain a "
                        + ThreadManager.class.getName() + " instance from it (got "
                        + (instance == null ? "null" : instance.getClass().getName()) + ")");
                }
            } catch (Exception e) {
                log.error("Failed to instantiate custom thread manager '" + className + "': " + e.getMessage(), e);
            }
        }
        if (customManager == null) {
            log.info("Using default thread manager");
            return new ThreadManagerThreadPool(i);
        }
        properties.set(JPPFProperties.PROCESSING_THREADS, Integer.valueOf(customManager.getPoolSize()));
        log.info("Node running " + i + " processing thread" + (i > 1 ? "s" : ""));
        boolean isCpuTimeEnabled = customManager.isCpuTimeEnabled();
        properties.setBoolean("cpuTimeSupported", isCpuTimeEnabled);
        log.info("Thread CPU time measurement is " + (isCpuTimeEnabled ? "" : "not ") + "supported");
        return customManager;
    }

    /**
     * Executes the specified tasks and waits until each submitted task has completed.
     * <p>After {@code setup()}, and unless the job is already cancelled, every task that is not a
     * {@link JPPFExceptionResult} is wrapped in a {@code NodeTaskWrapper} and submitted to the
     * executor. Completed wrappers are then collected one by one and reported through
     * {@code taskEnded()}. {@code cleanup()} always runs, whatever the outcome.
     * <p>If a completed wrapper carries a reconnection notification, all tasks are cancelled
     * (with {@code callOnCancel = true}, {@code requeue = false}) and the notification is thrown.
     * NOTE(review): the notification only escapes this method if its type is not an
     * {@code Exception} subclass (e.g. an {@code Error}), since the surrounding catch handles
     * {@code Exception} - confirm against {@code JPPFReconnectionNotification}.
     *
     * @param taskBundle the bundle describing the job the tasks belong to.
     * @param list the tasks to execute; nothing happens when {@code null} or empty.
     * @throws Exception declared by the interface; failures while collecting an individual
     * task result are logged at debug level and not rethrown.
     */
    @Override // org.jppf.execute.ExecutionManager
    public void execute(TaskBundle taskBundle, List<Task<?>> list) throws Exception {
        if (list == null || list.isEmpty()) {
            return;
        }
        if (debugEnabled) {
            log.debug("executing " + list.size() + " tasks");
        }
        try {
            setup(taskBundle, list);
            if (!isJobCancelled()) {
                int submitted = 0;
                ExecutorCompletionService<NodeTaskWrapper> completionService = new ExecutorCompletionService<>(getExecutor());
                // same lock as cancelAllTasks(), so a cancellation sees either none or all of the wrappers
                synchronized (this.taskWrapperList) {
                    for (Task<?> task : list) {
                        if (task instanceof JPPFExceptionResult) {
                            continue;
                        }
                        if (task instanceof AbstractTask) {
                            ((AbstractTask) task).setExecutionDispatcher(this.taskNotificationDispatcher);
                        }
                        NodeTaskWrapper wrapper = new NodeTaskWrapper(task, this.usedClassLoader.getClassLoader(), this.timeoutHandler, this.threadManager.isCpuTimeEnabled());
                        this.taskWrapperList.add(wrapper);
                        // the wrapper is its own result, so the completed future hands it back
                        completionService.submit(wrapper, wrapper);
                        submitted++;
                    }
                }
                for (int n = 0; n < submitted; n++) {
                    try {
                        Future<NodeTaskWrapper> completed = completionService.take();
                        if (!completed.isCancelled()) {
                            NodeTaskWrapper wrapper = completed.get();
                            JPPFReconnectionNotification notification = wrapper.getReconnectionNotification();
                            if (notification != null) {
                                cancelAllTasks(true, false);
                                throw notification;
                            }
                            taskEnded(wrapper);
                        }
                    } catch (Exception e) {
                        // a failure on one task must not prevent collecting the remaining ones
                        log.debug("Exception when executing task", e);
                    }
                }
            }
        } finally {
            cleanup();
        }
    }

    /**
     * Cancels all the tasks currently registered in the task wrapper list.
     * <p>When a requeue is requested and a bundle is set, the bundle is first flagged for
     * requeue and its SLA is marked as suspended.
     * <p>The {@code bundle} and {@code taskWrapperList} fields are each read once into a local
     * variable: they are mutable and may be reassigned or set to {@code null} by another thread
     * between the null check and their use.
     *
     * @param z whether the cancellation callback of each task should be invoked (callOnCancel).
     * @param z2 whether the job should be requeued (requeue).
     */
    @Override // org.jppf.execute.ExecutionManager
    public void cancelAllTasks(boolean z, boolean z2) {
        if (debugEnabled) {
            log.debug("cancelling all tasks with: callOnCancel=" + z + ", requeue=" + z2);
        }
        final TaskBundle currentBundle = this.bundle;
        if (z2 && currentBundle != null) {
            synchronized (currentBundle) {
                currentBundle.setRequeue(true);
                currentBundle.getSLA().setSuspended(true);
            }
        }
        final List<NodeTaskWrapper> wrappers = this.taskWrapperList;
        if (wrappers != null) {
            // same lock as the submission loop in execute()
            synchronized (wrappers) {
                for (NodeTaskWrapper wrapper : wrappers) {
                    cancelTask(wrapper, z);
                }
            }
        }
    }

    /**
     * Cancels a single task, unless its future reports that it is already done.
     * <p>The wrapper is cancelled first, then its future (interrupting the thread only when
     * the task declares itself interruptible), then its timeout action; finally the task is
     * reported as ended.
     *
     * @param nodeTaskWrapper the wrapper of the task to cancel.
     * @param z whether the cancellation callback of the task should be invoked.
     */
    private void cancelTask(NodeTaskWrapper nodeTaskWrapper, boolean z) {
        if (debugEnabled) {
            log.debug("cancelling task = " + nodeTaskWrapper);
        }
        final Future<?> pending = nodeTaskWrapper.getFuture();
        if (!pending.isDone()) {
            if (debugEnabled) {
                log.debug("calling future.cancel(true) for task = " + nodeTaskWrapper);
            }
            nodeTaskWrapper.cancel(z);
            final boolean mayInterrupt = nodeTaskWrapper.getTask().isInterruptible();
            pending.cancel(mayInterrupt);
            nodeTaskWrapper.cancelTimeoutAction();
            taskEnded(nodeTaskWrapper);
        }
    }

    /**
     * Shuts down this execution manager: stops the executor immediately, clears the
     * timeout handler and closes the task notification dispatcher, in that order.
     */
    @Override // org.jppf.execute.ExecutionManager
    public void shutdown() {
        final ExecutorService executor = getExecutor();
        executor.shutdownNow();
        timeoutHandler.clear(true);
        taskNotificationDispatcher.close();
    }

    /**
     * Prepares the execution of a set of tasks; invoked by {@code execute()} before any task is submitted.
     * <p>Unless the job is cancelled, {@code execute()} dereferences {@code taskWrapperList} and
     * {@code usedClassLoader} right after this call, so implementations are expected to leave
     * both non-null.
     *
     * @param taskBundle the bundle passed to {@code execute()}.
     * @param list the non-empty list of tasks passed to {@code execute()}.
     */
    protected abstract void setup(TaskBundle taskBundle, List<Task<?>> list);

    /**
     * Releases what {@code setup()} prepared; invoked from the {@code finally} clause of
     * {@code execute()}, hence also when {@code setup()} or the execution itself failed.
     */
    protected abstract void cleanup();

    /**
     * Records the end of a task and notifies the task notification dispatcher.
     * <p>The elapsed time reported by the wrapper is added to {@code accumulatedElapsed} as is.
     * Both the cpu time and the elapsed time are divided by 1,000,000 before being passed to
     * the dispatcher (presumably nanoseconds to milliseconds - TODO confirm). The cpu time is
     * reported as 0 when the wrapper has no execution info.
     *
     * @param nodeTaskWrapper the wrapper of the task that ended.
     */
    protected void taskEnded(NodeTaskWrapper nodeTaskWrapper) {
        final long elapsed = nodeTaskWrapper.getElapsedTime();
        accumulatedElapsed.addAndGet(elapsed);
        final ExecutionInfo info = nodeTaskWrapper.getExecutionInfo();
        long cpu = 0L;
        if (info != null) {
            cpu = info.cpuTime / 1000000;
        }
        final Task<?> endedTask = nodeTaskWrapper.getTask();
        final String jobId = getCurrentJobId();
        final String jobName = getCurrentJobName();
        final boolean inError = endedTask.getThrowable() != null;
        taskNotificationDispatcher.fireTaskEnded(endedTask, jobId, jobName, cpu, elapsed / 1000000, inError);
    }

    /**
     * Returns the uuid of the job currently handled.
     *
     * @return the uuid of the current bundle, or {@code null} when no bundle is set.
     */
    @Override // org.jppf.execute.ExecutionManager
    public String getCurrentJobId() {
        if (bundle == null) {
            return null;
        }
        return bundle.getUuid();
    }

    /**
     * Returns the name of the job currently handled.
     *
     * @return the name of the current bundle, or {@code null} when no bundle is set.
     */
    public String getCurrentJobName() {
        if (bundle == null) {
            return null;
        }
        return bundle.getName();
    }

    /**
     * Returns the executor service on which the tasks are run.
     *
     * @return the executor service provided by the thread manager.
     */
    @Override // org.jppf.execute.ExecutionManager
    public ExecutorService getExecutor() {
        final ExecutorService executor = threadManager.getExecutorService();
        return executor;
    }

    /**
     * Tests whether the configuration changed since the last check, and resets the flag.
     *
     * @return {@code true} if the flag was set (it is then atomically cleared), {@code false} otherwise.
     */
    @Override // org.jppf.execute.ExecutionManager
    public boolean checkConfigChanged() {
        final boolean wasChanged = configChanged.compareAndSet(true, false);
        return wasChanged;
    }

    /**
     * Raises the configuration-changed flag; has no effect when the flag is already raised.
     */
    @Override // org.jppf.execute.ExecutionManager
    public void triggerConfigChanged() {
        final boolean expected = false;
        final boolean raised = true;
        configChanged.compareAndSet(expected, raised);
    }

    /**
     * Changes the size of the thread pool.
     * <p>Requests of zero or less are ignored with a warning. When the size reported by the
     * thread manager differs after the change, the new size is logged, stored in the
     * configuration and the configuration-changed flag is raised.
     * <p>The size that is logged and stored is the one the thread manager reports after the
     * change, not the requested one, so the configuration reflects the pool actually in use
     * (as {@code createThreadManager()} does when it stores {@code getPoolSize()}).
     *
     * @param i the requested pool size; must be greater than zero.
     */
    @Override // org.jppf.execute.ExecutionManager
    public void setThreadPoolSize(int i) {
        if (i <= 0) {
            log.warn("ignored attempt to set the thread pool size to 0 or less: " + i);
            return;
        }
        final int oldSize = getThreadPoolSize();
        this.threadManager.setPoolSize(i);
        final int newSize = getThreadPoolSize();
        if (oldSize != newSize) {
            log.info("Node thread pool size changed from " + oldSize + " to " + newSize);
            JPPFConfiguration.set(JPPFProperties.PROCESSING_THREADS, Integer.valueOf(newSize));
            triggerConfigChanged();
        }
    }

    /**
     * Returns the current size of the thread pool.
     *
     * @return the pool size reported by the thread manager.
     */
    @Override // org.jppf.execute.ExecutionManager
    public int getThreadPoolSize() {
        final int poolSize = threadManager.getPoolSize();
        return poolSize;
    }

    /**
     * Returns the priority of the processing threads.
     *
     * @return the priority reported by the thread manager.
     */
    @Override // org.jppf.execute.ExecutionManager
    public int getThreadsPriority() {
        final int priority = threadManager.getPriority();
        return priority;
    }

    /**
     * Changes the priority of the processing threads by delegating to the thread manager.
     *
     * @param i the new priority, passed unchanged to the thread manager.
     */
    @Override // org.jppf.execute.ExecutionManager
    public void updateThreadsPriority(int i) {
        final ThreadManager manager = threadManager;
        manager.setPriority(i);
    }

    /**
     * Returns the thread manager created when this execution manager was constructed.
     *
     * @return the thread manager, never reassigned after construction.
     */
    @Override // org.jppf.execute.ExecutionManager
    public ThreadManager getThreadManager() {
        return threadManager;
    }

    /**
     * Tells whether the current job is flagged as cancelled.
     *
     * @return the current value of the cancellation flag.
     */
    @Override // org.jppf.execute.ExecutionManager
    public boolean isJobCancelled() {
        final boolean cancelled = jobCancelled.get();
        return cancelled;
    }

    /**
     * Sets the cancellation flag of the current job.
     *
     * @param z {@code true} to flag the job as cancelled, {@code false} to clear the flag.
     */
    @Override // org.jppf.execute.ExecutionManager
    public void setJobCancelled(boolean z) {
        jobCancelled.set(z);
    }

    /**
     * Returns the bundle of the job currently handled.
     *
     * @return the current bundle, possibly {@code null}.
     */
    @Override // org.jppf.execute.ExecutionManager
    public TaskBundle getBundle() {
        return bundle;
    }

    /**
     * Sets the bundle of the job currently handled.
     *
     * @param taskBundle the new current bundle; stored as is, {@code null} included.
     */
    @Override // org.jppf.execute.ExecutionManager
    public void setBundle(TaskBundle taskBundle) {
        bundle = taskBundle;
    }

    /**
     * Returns the dispatcher through which task notifications are fired.
     *
     * @return the task notification dispatcher, never reassigned after construction.
     */
    @Override // org.jppf.execute.ExecutionManager
    public TaskExecutionDispatcher getTaskNotificationDispatcher() {
        return taskNotificationDispatcher;
    }
}
