package eu.unicore.xnjs.ems;

import eu.unicore.util.Log;
import eu.unicore.xnjs.XNJS;
import eu.unicore.xnjs.ems.event.ContinueProcessingEvent;
import eu.unicore.xnjs.ems.event.XnjsEvent;
import eu.unicore.xnjs.persistence.IActionStore;
import eu.unicore.xnjs.util.LogUtil;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:eu/unicore/xnjs/ems/JobRunner.class */
/**
 * Worker thread that processes actions handed to it one at a time.
 *
 * <p>An action ID is passed in through {@link #process(String)} and parked in a
 * single-slot queue. The {@link #run()} loop takes the ID, loads the action from the
 * "JOBS" action store for update and then either runs the matching {@link Processor}
 * or, when the action's "not before" time has not passed yet, schedules a
 * {@link ContinueProcessingEvent} for later. After each handled ID the worker reports
 * back to its {@link Dispatcher} via {@code notifyAvailable}.
 */
public class JobRunner extends Thread {
    private static final Logger logger = LogUtil.getLogger(LogUtil.XNJS, JobRunner.class);

    /** Number of runners created and not yet interrupted; used to number the thread names. */
    private static final AtomicInteger count = new AtomicInteger(0);

    private final XNJS xnjs;
    private final InternalManager mgr;

    /** May be null (it is null-checked before use in {@link #checkNotify}). */
    private final ActionStateChangeListener changeListener;

    private final IActionStore jobs;
    private final Dispatcher dispatcher;

    /** Hand-over slot for the ID of the next action to process; holds at most one ID. */
    private final BlockingQueue<String> transfer = new ArrayBlockingQueue<>(1);

    /** Set once by {@link #interrupt()}; makes the {@link #run()} loop exit. */
    volatile boolean isInterrupted = false;

    /**
     * Creates a new runner, looks up its collaborators from the XNJS instance and
     * assigns a numbered thread name. The thread is not started here.
     *
     * @param xnjs the XNJS instance providing the manager, state change listener,
     *             "JOBS" action store and processors
     * @param dispatcher the dispatcher that is notified when this runner is free again
     * @throws Exception if one of the lookups on {@code xnjs} fails
     */
    public JobRunner(XNJS xnjs, Dispatcher dispatcher) throws Exception {
        this.xnjs = xnjs;
        this.dispatcher = dispatcher;
        this.mgr = (InternalManager) xnjs.get(InternalManager.class);
        // NOTE(review): the 'true' flag presumably marks the listener as optional - confirm
        this.changeListener = (ActionStateChangeListener) xnjs.get(ActionStateChangeListener.class, true);
        this.jobs = xnjs.getActionStore("JOBS");
        int runnerNumber = count.incrementAndGet();
        setName("XNJS-" + xnjs.getID() + "-JobRunner-" + runnerNumber);
        logger.debug("Job runner thread {} starting", runnerNumber);
    }

    /**
     * Requests this worker to stop. Only the first call has an effect: it sets the
     * stop flag, decrements the runner counter and interrupts the thread.
     */
    @Override
    public synchronized void interrupt() {
        if (this.isInterrupted) {
            return;
        }
        this.isInterrupted = true;
        logger.debug("{} stopping", getName());
        count.decrementAndGet();
        super.interrupt();
    }

    /**
     * Hands an action ID to this worker without blocking.
     *
     * <p>The hand-over slot holds a single ID. If it is still occupied the ID is not
     * accepted; this is logged so that the rejected action does not vanish silently.
     *
     * @param str the ID of the action to process
     */
    public void process(String str) {
        if (!this.transfer.offer(str)) {
            logger.warn("{} is busy, action {} was not accepted for processing", getName(), str);
        }
    }

    /**
     * Main loop: waits while the manager is paused, polls the hand-over slot and
     * processes (or postpones) the action whose ID was received. Runs until
     * {@link #interrupt()} is called or an exception escapes the loop body.
     */
    @Override
    public void run() {
        logger.info("Worker {} starting.", getName());
        while (!this.isInterrupted) {
            try {
                while (this.mgr.isPaused()) {
                    Thread.sleep(300L);
                }
                // short timeout so that the stop flag and the paused state are re-checked regularly
                String actionID = this.transfer.poll(100L, TimeUnit.MILLISECONDS);
                if (actionID == null) {
                    continue;
                }
                logger.debug("Processing {}", actionID);
                Action action = null;
                try {
                    action = this.jobs.getForUpdate(actionID);
                } catch (Exception e) {
                    logger.warn("Can't get action for processing", e);
                }
                if (action != null) {
                    if (check(action)) {
                        process(action);
                    } else {
                        sendToSleep(action);
                    }
                }
                // report back even if the action could not be loaded
                this.dispatcher.notifyAvailable(this);
            } catch (InterruptedException e) {
                // regular shutdown path, triggered via interrupt()
                logger.info("Worker {} stopped.", getName());
                return;
            } catch (Exception e) {
                // the worker still terminates, but the cause is no longer hidden
                logger.error("Worker " + getName() + " stopped due to an unexpected error", e);
                return;
            }
        }
    }

    /**
     * Checks whether the action may be processed now.
     *
     * @param action the action to check
     * @return true if the action's "not before" time lies strictly in the past
     */
    private boolean check(Action action) {
        return action.getNotBefore() - System.currentTimeMillis() < 0;
    }

    /**
     * Marks the action as waiting, schedules a {@link ContinueProcessingEvent} for it
     * and tells the manager that processing is done for now.
     *
     * <p>The delay is the time remaining until the action's "not before" time, or
     * 5000 ms if that time has been reached in the meantime.
     *
     * @param action the action to postpone
     */
    private void sendToSleep(Action action) {
        action.setWaiting(true);
        long delayMillis = action.getNotBefore() - System.currentTimeMillis();
        if (delayMillis <= 0) {
            delayMillis = 5000;
        }
        // The scheduler takes an int: clamp instead of blindly narrowing, otherwise a
        // delay above Integer.MAX_VALUE ms (about 24.8 days) wraps to a bogus value.
        // After a clamped delay the action is simply checked and postponed again.
        int schedulerDelay = (int) Math.min(delayMillis, (long) Integer.MAX_VALUE);
        this.mgr.scheduleEvent(new ContinueProcessingEvent(action.getUUID()), schedulerDelay, TimeUnit.MILLISECONDS);
        this.mgr.doneProcessing(action);
    }

    /**
     * Runs the processor matching the action's type and reports the outcome to the
     * manager. Any failure is recorded on the action and reported through
     * {@code errorProcessing}. The log context is cleared in all cases.
     *
     * @param action the action to process
     */
    private void process(Action action) {
        final int previousStatus = action.getStatus();
        try {
            LogUtil.fillLogContext(action);
            Processor processor = this.xnjs.createProcessor(action.getType());
            logger.trace("Processing Action <{}> in status {}", action.getUUID(), ActionStatus.toString(action.getStatus()));
            processor.process(action);
            logger.trace("New status for Action <{}>: {}", action.getUUID(), ActionStatus.toString(action.getStatus()));
            XnjsEvent followUp = checkNotify(action, previousStatus);
            this.mgr.doneProcessing(action);
            if (followUp != null) {
                this.mgr.handleEvent(followUp);
            }
        } catch (Throwable th) {
            try {
                // NOTE(review): 7 and 2 are raw codes; presumably the terminal ("done") action
                // status and the "not successful" result code - confirm against their declarations
                action.setStatus(7);
                action.getResult().setStatusCode(2);
                action.getResult().setErrorMessage(Log.createFaultMessage("Processing failed", th));
                XnjsEvent followUp = checkNotify(action, previousStatus);
                this.mgr.errorProcessing(action, th);
                if (followUp != null) {
                    this.mgr.handleEvent(followUp);
                }
            } catch (Exception e) {
                logger.error("Error during error reporting for action <" + action.getUUID() + ">", e);
                try {
                    this.mgr.errorProcessing(action, e);
                } catch (Exception e2) {
                    // best effort only: nothing more can be done here, but leave a trace
                    logger.warn("Could not report error for action <" + action.getUUID() + ">", e2);
                }
            }
        } finally {
            LogUtil.clearLogContext();
        }
    }

    /**
     * Notifies the state change listener (if present) when the action's status differs
     * from {@code previousStatus}, and creates the follow-up event for the parent action
     * where needed.
     *
     * @param action the action that was just processed
     * @param previousStatus the action's status before processing
     * @return a {@link ContinueProcessingEvent} addressed to the parent action if the status
     *         changed to 7 and the action has a parent; null otherwise
     */
    private XnjsEvent checkNotify(Action action, int previousStatus) {
        int currentStatus = action.getStatus();
        if (currentStatus == previousStatus) {
            return null;
        }
        try {
            if (this.changeListener != null) {
                this.changeListener.stateChanged(action);
            }
        } catch (Exception e) {
            logger.warn("Internal error during state change notification.", e);
        }
        ContinueProcessingEvent parentEvent = null;
        // NOTE(review): 7 is presumably the terminal ("done") status - confirm against ActionStatus
        if (currentStatus == 7 && action.getParentActionID() != null) {
            try {
                parentEvent = new ContinueProcessingEvent(action.getParentActionID());
            } catch (Exception e) {
                logger.error("Error sending notification", e);
            }
        }
        return parentEvent;
    }
}
