package org.jppf.execute.async;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.jppf.JPPFReconnectionNotification;
import org.jppf.execute.ExecutionInfo;
import org.jppf.execute.NodeTaskWrapper;
import org.jppf.execute.ThreadManager;
import org.jppf.node.protocol.AbstractTask;
import org.jppf.node.protocol.BundleWithTasks;
import org.jppf.node.protocol.JPPFExceptionResult;
import org.jppf.node.protocol.Task;
import org.jppf.node.protocol.TaskBundle;
import org.jppf.node.protocol.TaskExecutionDispatcher;
import org.jppf.scheduling.JPPFScheduleHandler;
import org.jppf.utils.JPPFConfiguration;
import org.jppf.utils.LoggingUtils;
import org.jppf.utils.TypedProperties;
import org.jppf.utils.collections.ArrayListHashMap;
import org.jppf.utils.collections.CollectionMap;
import org.jppf.utils.configuration.JPPFProperties;
import org.jppf.utils.configuration.JPPFProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/jppf/execute/async/AbstractAsyncExecutionManager.class */
/**
 * Skeleton implementation of {@link AsyncExecutionManager}: it wraps the tasks of each incoming
 * bundle into {@link NodeTaskWrapper}s, submits them to the executor of a {@link ThreadManager},
 * tracks per-bundle completion and notifies the registered {@link ExecutionManagerListener}s
 * once all the submitted tasks of a bundle have ended.
 *
 * <p><b>Locking discipline.</b> The four bookkeeping structures ({@link #jobEntries},
 * {@link #jobBundleIds}, {@link #pendingEntries}, {@link #pendingBundleIds}) are only accessed
 * while holding the monitor of {@link #jobEntries}. The state of a single bundle is guarded by
 * the monitor of its {@link JobProcessingEntry}. When both monitors are needed they are acquired
 * in the order <i>jobEntries, then entry</i> (see {@link #cancelJob(String, boolean, boolean)});
 * no method of this class acquires {@code jobEntries} while it only holds an entry monitor.
 *
 * <p>Entries are keyed by the job uuid immediately followed by the bundle id.
 */
public abstract class AbstractAsyncExecutionManager implements AsyncExecutionManager {
    /** Logger for this class. */
    private static final Logger log = LoggerFactory.getLogger(AbstractAsyncExecutionManager.class);
    /** Whether debug-level logging is enabled, evaluated once at class initialization. */
    private static final boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
    /** Whether trace-level logging is enabled, evaluated once at class initialization. */
    private static final boolean traceEnabled = log.isTraceEnabled();
    /** Provides the executor service onto which the task wrappers are submitted. */
    protected final ThreadManager threadManager;
    /** Schedule handler handed to every {@link NodeTaskWrapper} created by this manager. */
    protected final JPPFScheduleHandler timeoutHandler = new JPPFScheduleHandler("Task Timeout Timer");
    /** Set when the configuration has changed; starts as {@code true}. */
    protected final AtomicBoolean configChanged = new AtomicBoolean(true);
    /** Holder for a reconnection notification; not read or written by this class itself. */
    protected AtomicReference<JPPFReconnectionNotification> reconnectionNotification = new AtomicReference<>(null);
    /** Bundles currently being processed, keyed by job uuid + bundle id. Also serves as the global lock. */
    protected final Map<String, JobProcessingEntry> jobEntries = new HashMap<>();
    /** Ids of the bundles currently being processed, grouped by job uuid. */
    protected final CollectionMap<String, Long> jobBundleIds = new ArrayListHashMap<>();
    /** Registered listeners; copy-on-write so that notification can iterate without locking. */
    protected final List<ExecutionManagerListener> listeners = new CopyOnWriteArrayList<>();
    /** Bundles announced via {@link #addPendingJobEntry(TaskBundle)} but not yet executed, keyed by job uuid + bundle id. */
    protected final Map<String, JobPendingEntry> pendingEntries = new HashMap<>();
    /** Ids of the pending bundles, grouped by job uuid. */
    protected final CollectionMap<String, Long> pendingBundleIds = new ArrayListHashMap<>();
    /** Dispatches task execution notifications; created with the class loader of the concrete subclass. */
    protected final TaskExecutionDispatcher taskNotificationDispatcher = new TaskExecutionDispatcher(getClass().getClassLoader());

    /**
     * Creates the manager and its {@link ThreadManager}.
     *
     * @param typedProperties the configuration passed on to {@link ThreadManager#newInstance}.
     * @param jPPFProperty the property passed on to {@link ThreadManager#newInstance};
     *        presumably the one holding the number of processing threads (confirm against {@code ThreadManager}).
     */
    public AbstractAsyncExecutionManager(TypedProperties typedProperties, JPPFProperty<Integer> jPPFProperty) {
        this.threadManager = ThreadManager.newInstance(typedProperties, jPPFProperty);
    }

    /**
     * Registers the bundle as being processed and submits its executable tasks to the executor.
     * Does nothing when the bundle carries no task. If the bundle was cancelled while still pending,
     * or if none of its tasks is executable, the job is ended immediately and listeners are notified.
     *
     * @param bundleWithTasks the bundle header along with the tasks to execute.
     * @throws Exception declared by the interface; this implementation does not throw checked exceptions itself.
     */
    @Override // org.jppf.execute.async.AsyncExecutionManager
    public void execute(BundleWithTasks bundleWithTasks) throws Exception {
        List<Task<?>> tasks = bundleWithTasks.getTasks();
        TaskBundle bundle = bundleWithTasks.getBundle();
        if (tasks == null || tasks.isEmpty()) {
            return;
        }
        if (debugEnabled) {
            log.debug("executing {} tasks of bundle {}", tasks.size(), bundle);
        }
        JobProcessingEntry entry = setup(bundleWithTasks);
        entry.executionManager = this;
        String key = bundle.getUuid() + bundle.getBundleId();
        synchronized (this.jobEntries) {
            // promote the bundle from "pending" to "processing", carrying over a cancellation
            // that was requested while the bundle was still pending
            JobPendingEntry pending = this.pendingEntries.remove(key);
            if (pending != null) {
                this.pendingBundleIds.removeValue(bundle.getUuid(), bundle.getBundleId());
                entry.jobCancelled.set(pending.jobCancelled.get());
            }
            this.jobEntries.put(key, entry);
            this.jobBundleIds.putValue(bundle.getUuid(), bundle.getBundleId());
        }
        // jobEnded() acquires the jobEntries monitor and notifies listeners, so it must not be
        // called while the entry monitor is held: cancelJob() locks jobEntries first and the entry
        // second, and calling jobEnded() under the entry monitor would invert that order.
        // The decision is therefore recorded here and acted upon after the monitor is released,
        // exactly as taskEnded() does.
        boolean endJob = false;
        synchronized (entry) {
            if (entry.jobCancelled.get()) {
                if (debugEnabled) {
                    log.debug("bundle was cancelled before its execution started, ending job {}", bundle);
                }
                endJob = true;
            } else {
                if (debugEnabled) {
                    log.debug("wrapping up to {} executable tasks of bundle {}", tasks.size(), bundle);
                }
                for (Task<?> task : tasks) {
                    // tasks that are already exception results are not executed
                    if (task instanceof JPPFExceptionResult) {
                        continue;
                    }
                    if (task instanceof AbstractTask) {
                        ((AbstractTask) task).setExecutionDispatcher(this.taskNotificationDispatcher);
                    }
                    entry.taskWrapperList.add(new NodeTaskWrapper(entry, task, entry.getClassLoader(), this.timeoutHandler));
                    entry.submittedCount++;
                }
                if (entry.taskWrapperList.isEmpty()) {
                    if (debugEnabled) {
                        log.debug("there are no tasks to execute in bundle {}, ending job", bundle);
                    }
                    endJob = true;
                } else {
                    if (debugEnabled) {
                        log.debug("submitting {} executable tasks of bundle {}", entry.taskWrapperList.size(), bundle);
                    }
                    // submission happens under the entry monitor, so taskEnded() for these tasks
                    // cannot complete its bookkeeping before all of them have been submitted
                    for (NodeTaskWrapper wrapper : entry.taskWrapperList) {
                        getExecutor().submit(wrapper, wrapper);
                    }
                    if (debugEnabled) {
                        log.debug("submited {} tasks", entry.taskWrapperList.size());
                    }
                }
            }
        }
        if (endJob) {
            // no task was submitted in this case, so no other thread can end this job concurrently
            jobEnded(entry);
        }
    }

    /**
     * Cancels every pending and processing job known to this manager.
     *
     * @param z whether the cancellation callback of each task should be invoked (logged as {@code callOnCancel}).
     * @param z2 whether the cancelled bundles should be flagged for requeuing (logged as {@code requeue}).
     */
    @Override // org.jppf.execute.async.AsyncExecutionManager
    public void cancelAllTasks(boolean z, boolean z2) {
        if (debugEnabled) {
            log.debug("cancelling all tasks with: callOnCancel={}, requeue={}", z, z2);
        }
        Collection<String> uuids = new HashSet<>();
        synchronized (this.jobEntries) {
            // collect the uuids first: cancelJob() is re-entered under the same monitor
            uuids.addAll(this.pendingBundleIds.keySet());
            uuids.addAll(this.jobBundleIds.keySet());
            for (String uuid : uuids) {
                cancelJob(uuid, z, z2);
            }
        }
    }

    /**
     * Cancels all the bundles of the specified job: pending bundles are marked as cancelled so that
     * {@link #execute(BundleWithTasks)} ends them right away, and the tasks of the bundles being
     * processed are cancelled individually.
     *
     * @param str the uuid of the job to cancel.
     * @param z whether the cancellation callback of each task should be invoked (logged as {@code callOnCancel}).
     * @param z2 if {@code true}, each processing bundle is flagged for requeue and its SLA is set to suspended.
     */
    @Override // org.jppf.execute.async.AsyncExecutionManager
    public void cancelJob(String str, boolean z, boolean z2) {
        if (debugEnabled) {
            log.debug("cancelling all tasks with: callOnCancel={}, requeue={}, jobUuid={}", z, z2, str);
        }
        synchronized (this.jobEntries) {
            Collection<Long> pendingIds = this.pendingBundleIds.getValues(str);
            if (pendingIds != null) {
                // iterate over a snapshot of the ids
                for (long bundleId : new ArrayList<>(pendingIds)) {
                    JobPendingEntry pending = this.pendingEntries.get(str + bundleId);
                    if (pending != null) {
                        if (debugEnabled) {
                            log.debug("setting cancelled status on pending entry with jobUuid={}, bundleId={}", str, bundleId);
                        }
                        pending.jobCancelled.set(true);
                    }
                }
            }
            Collection<Long> processingIds = this.jobBundleIds.getValues(str);
            if (debugEnabled) {
                log.debug("cancelling {} bundles for jobUuid={}", processingIds == null ? 0 : processingIds.size(), str);
            }
            if (processingIds == null) {
                return;
            }
            // snapshot copy: presumably needed because cancelling the last task of a bundle can end
            // the job on this thread and remove its id from jobBundleIds while we iterate
            for (long bundleId : new ArrayList<>(processingIds)) {
                JobProcessingEntry entry = this.jobEntries.get(str + bundleId);
                if (entry == null) {
                    continue;
                }
                synchronized (entry) {
                    entry.jobCancelled.set(true);
                    if (debugEnabled) {
                        log.debug("cancelling {}", entry.bundle);
                    }
                    if (z2) {
                        entry.bundle.setRequeue(true);
                        entry.bundle.getSLA().setSuspended(true);
                    }
                    if (entry.taskWrapperList != null) {
                        for (NodeTaskWrapper wrapper : entry.taskWrapperList) {
                            cancelTask(wrapper, z);
                        }
                    }
                }
            }
        }
    }

    /**
     * Cancels a single task unless its future is already done. A task that has not started yet is
     * explicitly signalled as ended, since it will never run to completion by itself.
     *
     * @param wrapper the wrapper of the task to cancel.
     * @param callOnCancel passed to {@link NodeTaskWrapper#cancel}.
     */
    private static void cancelTask(NodeTaskWrapper wrapper, boolean callOnCancel) {
        if (debugEnabled) {
            log.debug("cancelling task = {}", wrapper);
        }
        // NOTE(review): assumes getFuture() never returns null for a submitted wrapper - confirm in NodeTaskWrapper
        Future<?> future = wrapper.getFuture();
        if (future.isDone()) {
            return;
        }
        if (debugEnabled) {
            log.debug("calling future.cancel(true) for task = {}", wrapper);
        }
        wrapper.cancel(callOnCancel);
        // the running thread is only interrupted when the task declares itself interruptible
        future.cancel(wrapper.getTask().isInterruptible());
        wrapper.cancelTimeoutAction();
        if (!wrapper.hasStarted()) {
            wrapper.taskEnded();
        }
    }

    /**
     * Shuts down the executor immediately, clears the timeout handler and closes the task
     * notification dispatcher.
     */
    @Override // org.jppf.execute.async.AsyncExecutionManager
    public void shutdown() {
        if (debugEnabled) {
            log.debug("closing {}", this);
        }
        getExecutor().shutdownNow();
        this.timeoutHandler.clear(true);
        this.taskNotificationDispatcher.close();
    }

    /**
     * Prepares the processing entry for a bundle about to be executed.
     *
     * @param bundleWithTasks the bundle and its tasks.
     * @return the entry that tracks the execution of the bundle.
     */
    protected abstract JobProcessingEntry setup(BundleWithTasks bundleWithTasks);

    /**
     * Releases whatever {@link #setup(BundleWithTasks)} acquired for a bundle.
     * Called with the monitor of {@code jobProcessingEntry} held.
     *
     * @param jobProcessingEntry the entry of the bundle that has ended.
     */
    protected abstract void cleanup(JobProcessingEntry jobProcessingEntry);

    /**
     * Records the completion of one task, emits the corresponding task notification and ends the
     * job once as many results as submitted tasks have been counted. Runtime exceptions are
     * logged and not propagated.
     *
     * @param nodeTaskWrapper the wrapper of the task that ended.
     */
    @Override // org.jppf.execute.async.AsyncExecutionManager
    public void taskEnded(NodeTaskWrapper nodeTaskWrapper) {
        try {
            if (traceEnabled) {
                log.trace("task ended: {}", nodeTaskWrapper);
            }
            long elapsed = nodeTaskWrapper.getElapsedTime();
            JobProcessingEntry entry = nodeTaskWrapper.getJobEntry();
            boolean allTasksDone;
            synchronized (entry) {
                TaskBundle bundle = entry.bundle;
                entry.accumulatedElapsed.addAndGet(elapsed);
                int nbResults = entry.resultCount.incrementAndGet();
                int nbSubmitted = entry.submittedCount;
                ExecutionInfo info = nodeTaskWrapper.getExecutionInfo();
                // both times are divided by 1,000,000 - presumably nanoseconds converted to milliseconds
                long cpuTime = (info == null) ? 0L : info.cpuTime / 1000000;
                Task<?> task = nodeTaskWrapper.getTask();
                if (traceEnabled) {
                    log.trace("sending task ended notification for {}, bundle={}", nodeTaskWrapper, bundle);
                }
                this.taskNotificationDispatcher.fireTaskEnded(task, bundle.getUuid(), bundle.getName(), cpuTime, elapsed / 1000000, task.getThrowable() != null);
                if (traceEnabled) {
                    log.trace("resultCount={} for {}", nbResults, nodeTaskWrapper);
                }
                allTasksDone = nbResults >= nbSubmitted;
            }
            // end the job only after releasing the entry monitor
            if (allTasksDone) {
                jobEnded(entry);
            }
        } catch (RuntimeException e) {
            log.error("error in taskEnded() for {}", nodeTaskWrapper, e);
        }
    }

    /**
     * Finalizes a bundle: runs {@link #cleanup(JobProcessingEntry)}, removes the bundle from the
     * bookkeeping maps and notifies the listeners. The entry monitor and the {@link #jobEntries}
     * monitor are acquired one after the other, never nested, by this method itself.
     *
     * @param entry the entry of the bundle that has ended.
     */
    private void jobEnded(JobProcessingEntry entry) {
        TaskBundle bundle;
        List<Task<?>> taskList;
        Throwable throwable;
        synchronized (entry) {
            // capture the results before cleanup() gets a chance to reset the entry
            bundle = entry.bundle;
            taskList = entry.taskList;
            throwable = entry.t;
            cleanup(entry);
        }
        if (debugEnabled) {
            log.debug("processing completion of {} tasks of job {}", taskList.size(), bundle);
        }
        synchronized (this.jobEntries) {
            this.jobEntries.remove(bundle.getUuid() + bundle.getBundleId());
            this.jobBundleIds.removeValue(bundle.getUuid(), bundle.getBundleId());
        }
        fireJobFinished(bundle, taskList, throwable);
    }

    /**
     * @return the executor service of the underlying {@link ThreadManager}.
     */
    @Override // org.jppf.execute.async.AsyncExecutionManager
    public ExecutorService getExecutor() {
        return this.threadManager.getExecutorService();
    }

    /**
     * Atomically reads and resets the "configuration changed" flag.
     *
     * @return {@code true} if the flag was set, in which case it is now cleared.
     */
    @Override // org.jppf.execute.async.AsyncExecutionManager
    public boolean checkConfigChanged() {
        return this.configChanged.compareAndSet(true, false);
    }

    /**
     * Raises the "configuration changed" flag.
     */
    @Override // org.jppf.execute.async.AsyncExecutionManager
    public void triggerConfigChanged() {
        this.configChanged.compareAndSet(false, true);
    }

    /**
     * Changes the size of the thread pool. Values of 0 or less are ignored with a warning.
     * When the effective size changes, the new value is stored in the JPPF configuration and
     * the "configuration changed" flag is raised.
     *
     * @param i the requested number of threads.
     */
    @Override // org.jppf.execute.async.AsyncExecutionManager
    public void setThreadPoolSize(int i) {
        if (i <= 0) {
            log.warn("ignored attempt to set the thread pool size to 0 or less: {}", i);
            return;
        }
        int previousSize = getThreadPoolSize();
        this.threadManager.setPoolSize(i);
        if (previousSize != getThreadPoolSize()) {
            log.info("Node thread pool size changed from {} to {}", previousSize, i);
            JPPFConfiguration.set(JPPFProperties.PROCESSING_THREADS, Integer.valueOf(i));
            triggerConfigChanged();
        }
    }

    /**
     * @return the current pool size reported by the {@link ThreadManager}.
     */
    @Override // org.jppf.execute.async.AsyncExecutionManager
    public int getThreadPoolSize() {
        return this.threadManager.getPoolSize();
    }

    /**
     * @return the thread priority reported by the {@link ThreadManager}.
     */
    @Override // org.jppf.execute.async.AsyncExecutionManager
    public int getThreadsPriority() {
        return this.threadManager.getPriority();
    }

    /**
     * Sets the priority on the {@link ThreadManager}.
     *
     * @param i the new priority.
     */
    @Override // org.jppf.execute.async.AsyncExecutionManager
    public void updateThreadsPriority(int i) {
        this.threadManager.setPriority(i);
    }

    /**
     * @return the thread manager used by this execution manager.
     */
    @Override // org.jppf.execute.async.AsyncExecutionManager
    public ThreadManager getThreadManager() {
        return this.threadManager;
    }

    /**
     * @return the dispatcher used to emit task execution notifications.
     */
    @Override // org.jppf.execute.async.AsyncExecutionManager
    public TaskExecutionDispatcher getTaskNotificationDispatcher() {
        return this.taskNotificationDispatcher;
    }

    /**
     * Counts the bundles of a job that are either being processed or pending.
     *
     * @param str the uuid of the job.
     * @return the number of processing bundles plus the number of pending bundles for that job.
     */
    @Override // org.jppf.execute.async.AsyncExecutionManager
    public int getNbBundles(String str) {
        int count = 0;
        synchronized (this.jobEntries) {
            Collection<Long> processingIds = this.jobBundleIds.getValues(str);
            if (processingIds != null) {
                count += processingIds.size();
            }
            Collection<Long> pendingIds = this.pendingBundleIds.getValues(str);
            if (pendingIds != null) {
                count += pendingIds.size();
            }
        }
        return count;
    }

    /**
     * Registers a listener; {@code null} is ignored.
     *
     * @param executionManagerListener the listener to add.
     */
    @Override // org.jppf.execute.async.AsyncExecutionManager
    public void addExecutionManagerListener(ExecutionManagerListener executionManagerListener) {
        if (executionManagerListener != null) {
            this.listeners.add(executionManagerListener);
        }
    }

    /**
     * Unregisters a listener; {@code null} is ignored.
     *
     * @param executionManagerListener the listener to remove.
     */
    @Override // org.jppf.execute.async.AsyncExecutionManager
    public void removeExecutionManagerListener(ExecutionManagerListener executionManagerListener) {
        if (executionManagerListener != null) {
            this.listeners.remove(executionManagerListener);
        }
    }

    /**
     * Notifies every registered listener that a bundle has finished executing.
     *
     * @param taskBundle the bundle header.
     * @param list the tasks of the bundle.
     * @param th the throwable recorded on the processing entry, possibly {@code null}.
     */
    protected void fireJobFinished(TaskBundle taskBundle, List<Task<?>> list, Throwable th) {
        if (debugEnabled) {
            log.debug("sending notification to listeners for completion of {} tasks of job {}", list.size(), taskBundle);
        }
        for (ExecutionManagerListener listener : this.listeners) {
            if (listener != null) {
                listener.bundleExecuted(taskBundle, list, th);
            }
        }
    }

    /**
     * Records a bundle as pending, so that a cancellation requested before
     * {@link #execute(BundleWithTasks)} is invoked for it is not lost.
     *
     * @param taskBundle the header of the bundle expected to be executed later.
     */
    @Override // org.jppf.execute.async.AsyncExecutionManager
    public void addPendingJobEntry(TaskBundle taskBundle) {
        if (debugEnabled) {
            log.debug("adding pending entry for {}", taskBundle);
        }
        synchronized (this.jobEntries) {
            JobPendingEntry pending = new JobPendingEntry();
            pending.bundle = taskBundle;
            this.pendingEntries.put(taskBundle.getUuid() + taskBundle.getBundleId(), pending);
            this.pendingBundleIds.putValue(taskBundle.getUuid(), taskBundle.getBundleId());
        }
    }
}
