package org.visallo.core.model;

import org.json.JSONObject;
import org.visallo.core.config.Configuration;
import org.visallo.core.exception.VisalloException;
import org.visallo.core.ingest.WorkerSpout;
import org.visallo.core.ingest.WorkerTuple;
import org.visallo.core.model.workQueue.WorkQueueRepository;
import org.visallo.core.status.StatusServer;
import org.visallo.core.util.VisalloLogger;
import org.visallo.core.util.VisalloLoggerFactory;

/* loaded from: input_file:org/visallo/core/model/WorkerBase.class */
public abstract class WorkerBase {
    private final boolean statusEnabled;
    private final boolean exitOnNextTupleFailure;
    private WorkQueueRepository workQueueRepository;
    private volatile boolean shouldRun;
    private StatusServer statusServer = null;

    /* JADX INFO: Access modifiers changed from: protected */
    public WorkerBase(WorkQueueRepository workQueueRepository, Configuration configuration) {
        this.workQueueRepository = workQueueRepository;
        this.exitOnNextTupleFailure = configuration.getBoolean(getClass().getName() + ".exitOnNextTupleFailure", true);
        this.statusEnabled = configuration.getBoolean(Configuration.STATUS_ENABLED, true);
    }

    public void run() throws Exception {
        VisalloLogger logger = VisalloLoggerFactory.getLogger(getClass());
        logger.debug("begin runner", new Object[0]);
        WorkerSpout prepareWorkerSpout = prepareWorkerSpout();
        this.shouldRun = true;
        if (this.statusEnabled) {
            this.statusServer = createStatusServer();
        }
        while (this.shouldRun) {
            try {
                WorkerTuple nextTuple = prepareWorkerSpout.nextTuple();
                if (nextTuple == null) {
                    Thread.sleep(100L);
                } else {
                    try {
                        logger.debug("start processing", new Object[0]);
                        long currentTimeMillis = System.currentTimeMillis();
                        process(nextTuple.getMessageId(), nextTuple.getJson());
                        logger.debug("completed processing in (%dms)", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                        prepareWorkerSpout.ack(nextTuple.getMessageId());
                    } catch (Throwable th) {
                        logger.error("Could not process tuple: %s", nextTuple, th);
                        prepareWorkerSpout.fail(nextTuple.getMessageId());
                    }
                }
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e2) {
                handleNextTupleException(logger, e2);
            }
        }
    }

    protected void handleNextTupleException(VisalloLogger visalloLogger, Exception exc) throws InterruptedException {
        if (this.exitOnNextTupleFailure) {
            throw new VisalloException("Failed to get next tuple", exc);
        }
        visalloLogger.error("Failed to get next tuple", exc);
        Thread.sleep(10000L);
    }

    protected abstract StatusServer createStatusServer() throws Exception;

    protected abstract void process(Object obj, JSONObject jSONObject) throws Exception;

    public void stop() {
        this.shouldRun = false;
        if (this.statusServer != null) {
            this.statusServer.shutdown();
        }
    }

    protected WorkerSpout prepareWorkerSpout() {
        WorkerSpout createWorkerSpout = this.workQueueRepository.createWorkerSpout(getQueueName());
        createWorkerSpout.open();
        return createWorkerSpout;
    }

    protected abstract String getQueueName();

    /* JADX INFO: Access modifiers changed from: protected */
    public WorkQueueRepository getWorkQueueRepository() {
        return this.workQueueRepository;
    }

    public boolean shouldRun() {
        return this.shouldRun;
    }
}
