package com.mware.core.model;

import com.codahale.metrics.Counter;
import com.mware.core.config.Configuration;
import com.mware.core.exception.BcException;
import com.mware.core.ingest.WorkerSpout;
import com.mware.core.ingest.WorkerTuple;
import com.mware.core.ingest.dataworker.WorkerItem;
import com.mware.core.model.workQueue.WebQueueRepository;
import com.mware.core.model.workQueue.WorkQueueRepository;
import com.mware.core.status.MetricsManager;
import com.mware.core.status.StatusServer;
import com.mware.core.util.BcLogger;
import com.mware.core.util.BcLoggerFactory;
import com.mware.ge.traversal.GeTraverser;
import java.util.LinkedList;
import java.util.Queue;

/* loaded from: input_file:com/mware/core/model/WorkerBase.class */
/**
 * Base class for workers that consume tuples from a work queue.
 *
 * <p>{@link #run()} opens a {@link WorkerSpout} for {@link #getQueueName()} and polls it on the
 * calling thread. Each tuple is converted with {@link #tupleDataToWorkerItem(byte[])} and placed
 * in a bounded in-memory queue. A dedicated "-process" thread drains that queue and hands each
 * item to {@link #process(WorkerItem)}.
 *
 * <p>All access to the in-memory queue is guarded by the queue's own monitor; the poller blocks
 * while the queue is full and the process thread blocks while it is empty.
 *
 * @param <TWorkerItem> the work item type produced from raw tuple data
 */
public abstract class WorkerBase<TWorkerItem extends WorkerItem> {
    private final boolean statusEnabled;
    private final boolean exitOnNextTupleFailure;
    private final Counter queueSizeMetric;
    private final MetricsManager metricsManager;
    private final String queueSizeMetricName;
    private WorkQueueRepository workQueueRepository;
    private WebQueueRepository webQueueRepository;
    // Written by run()/stop() and read by both the polling and the process thread.
    private volatile boolean shouldRun;
    private StatusServer statusServer = null;
    // Guarded by its own monitor (synchronized/wait/notifyAll on the queue instance).
    private final Queue<WorkerItemWrapper> tupleQueue = new LinkedList<>();
    // Once the queue holds this many items the poller waits before fetching more.
    private final int tupleQueueSize;
    private Thread processThread;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mware/core/model/WorkerBase$WorkerItemWrapper.class */
    /**
     * Pairs a converted work item with the tuple it came from, so the tuple can be
     * reported back to the spout once the item has been handled.
     */
    public class WorkerItemWrapper {
        private final TWorkerItem workerItem;
        private final WorkerTuple workerTuple;

        /**
         * @param tworkeritem the converted work item
         * @param workerTuple the tuple the item was created from
         */
        public WorkerItemWrapper(TWorkerItem tworkeritem, WorkerTuple workerTuple) {
            this.workerItem = tworkeritem;
            this.workerTuple = workerTuple;
        }

        /** @return the message id of the underlying tuple */
        public Object getMessageId() {
            return this.workerTuple.getMessageId();
        }

        /** @return the tuple this item was created from */
        public WorkerTuple getWorkerTuple() {
            return this.workerTuple;
        }

        /** @return the converted work item */
        public TWorkerItem getWorkerItem() {
            return this.workerItem;
        }

        @Override
        public String toString() {
            return "WorkerItemWrapper{messageId=" + getMessageId() + ", workerItem=" + this.workerItem + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reads the worker settings from {@code configuration} and registers the queue-size counter.
     *
     * <p>Settings are keyed by the concrete class name: {@code <class>.exitOnNextTupleFailure}
     * (default {@code true}) and {@code <class>.tupleQueueSize} (default {@code 10}).
     * {@link Configuration#STATUS_ENABLED} (default {@code true}) controls whether
     * {@link #run()} creates a status server. The counter name includes the id of the thread
     * that constructs this instance.
     *
     * @param workQueueRepository repository used to create the worker spout
     * @param webQueueRepository repository exposed through {@link #getWebQueueRepository()}
     * @param configuration source of the settings described above
     * @param metricsManager used to create, and later remove, the queue-size counter
     */
    public WorkerBase(WorkQueueRepository workQueueRepository, WebQueueRepository webQueueRepository, Configuration configuration, MetricsManager metricsManager) {
        this.workQueueRepository = workQueueRepository;
        this.webQueueRepository = webQueueRepository;
        this.metricsManager = metricsManager;
        this.exitOnNextTupleFailure = configuration.getBoolean(getClass().getName() + ".exitOnNextTupleFailure", true);
        this.tupleQueueSize = configuration.getInt(getClass().getName() + ".tupleQueueSize", 10).intValue();
        this.statusEnabled = configuration.getBoolean(Configuration.STATUS_ENABLED, true);
        this.queueSizeMetricName = metricsManager.getNamePrefix(this) + "queue-size-" + Thread.currentThread().getId();
        this.queueSizeMetric = metricsManager.counter(this.queueSizeMetricName);
    }

    /**
     * Removes the queue-size metric registered by the constructor.
     *
     * @throws Throwable if metric removal or the superclass finalizer fails
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            this.metricsManager.removeMetric(this.queueSizeMetricName);
        } finally {
            // Always chain to the superclass, even when metric removal fails.
            super.finalize();
        }
    }

    /**
     * Runs the worker on the calling thread until {@link #stop()} is called or polling fails.
     *
     * <p>Opens the spout, optionally creates the status server, starts the process thread and
     * then polls the spout in a loop.
     *
     * @throws Exception if the status server cannot be created or polling is aborted
     */
    public void run() throws Exception {
        BcLogger logger = BcLoggerFactory.getLogger(getClass());
        logger.info("begin runner", new Object[0]);
        WorkerSpout spout = prepareWorkerSpout();
        this.shouldRun = true;
        if (this.statusEnabled) {
            this.statusServer = createStatusServer();
        }
        startProcessThread(logger, spout);
        pollWorkerSpout(logger, spout);
        logger.info("end runner", new Object[0]);
    }

    /**
     * Starts the thread that drains the in-memory queue and processes each item.
     *
     * <p>A successfully processed item's tuple is acknowledged on the spout; if processing
     * throws, the error is logged, the tuple is failed on the spout and the thread moves on to
     * the next item.
     *
     * @param bcLogger logger used to report processing failures
     * @param workerSpout spout that receives the ack/fail for every processed tuple
     */
    private void startProcessThread(BcLogger bcLogger, WorkerSpout workerSpout) {
        this.processThread = new Thread(() -> {
            while (this.shouldRun) {
                WorkerItemWrapper item;
                try {
                    item = takeNextItem();
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to whoever observes this thread's state.
                    Thread.currentThread().interrupt();
                    throw new BcException("Could not get next workerItem", e);
                } catch (Exception e) {
                    throw new BcException("Could not get next workerItem", e);
                }
                if (item == null) {
                    // stop() was requested while waiting for work.
                    return;
                }
                try {
                    process(item.getWorkerItem());
                    // NOTE(review): ack(WorkerTuple) is assumed to be the success counterpart
                    // of WorkerSpout.fail(WorkerTuple) — confirm against WorkerSpout.
                    workerSpout.ack(item.getWorkerTuple());
                } catch (Exception e) {
                    bcLogger.error("Could not process tuple: " + item, e);
                    workerSpout.fail(item.getWorkerTuple());
                }
            }
        });
        this.processThread.setName(Thread.currentThread().getName() + "-process");
        this.processThread.start();
    }

    /**
     * Blocks until an item is available in the in-memory queue or the worker is stopped.
     *
     * @return the next queued item, or {@code null} if the worker was stopped
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private WorkerItemWrapper takeNextItem() throws InterruptedException {
        synchronized (this.tupleQueue) {
            while (this.shouldRun && this.tupleQueue.isEmpty()) {
                this.tupleQueue.wait();
            }
            if (!this.shouldRun) {
                return null;
            }
            WorkerItemWrapper next = this.tupleQueue.remove();
            this.queueSizeMetric.dec();
            // Wake the poller in case it is blocked on a full queue.
            this.tupleQueue.notifyAll();
            return next;
        }
    }

    /**
     * Polls the spout while the worker is running, converting each tuple and queueing it for
     * the process thread. Blocks while the queue holds {@code tupleQueueSize} or more items.
     *
     * @param bcLogger logger handed to {@link #handleNextTupleException(BcLogger, Exception)}
     * @param workerSpout spout to poll
     * @throws InterruptedException if the polling thread is interrupted
     */
    private void pollWorkerSpout(BcLogger bcLogger, WorkerSpout workerSpout) throws InterruptedException {
        while (this.shouldRun) {
            WorkerTuple workerTuple = null;
            try {
                workerTuple = workerSpout.nextTuple();
                if (workerTuple == null) {
                    continue;
                }
                WorkerItemWrapper wrapper = new WorkerItemWrapper(tupleDataToWorkerItem(workerTuple.getData()), workerTuple);
                synchronized (this.tupleQueue) {
                    this.tupleQueue.add(wrapper);
                    this.queueSizeMetric.inc();
                    // Wake the process thread in case it is waiting on an empty queue.
                    this.tupleQueue.notifyAll();
                    while (this.shouldRun && this.tupleQueue.size() >= this.tupleQueueSize) {
                        this.tupleQueue.wait();
                    }
                }
            } catch (InterruptedException e) {
                if (workerTuple != null) {
                    workerSpout.fail(workerTuple);
                }
                throw e;
            } catch (Exception e) {
                if (workerTuple != null) {
                    workerSpout.fail(workerTuple);
                }
                handleNextTupleException(bcLogger, e);
            }
        }
    }

    /**
     * Handles a failure raised while fetching or queueing a tuple.
     *
     * <p>When {@code exitOnNextTupleFailure} is set the failure is rethrown as a
     * {@link BcException}; otherwise it is logged and the calling thread sleeps before
     * polling resumes.
     *
     * @param bcLogger logger used to report the failure
     * @param exc the failure to handle
     * @throws InterruptedException if the back-off sleep is interrupted
     */
    protected void handleNextTupleException(BcLogger bcLogger, Exception exc) throws InterruptedException {
        if (this.exitOnNextTupleFailure) {
            throw new BcException("Failed to get next tuple", exc);
        }
        bcLogger.error("Failed to get next tuple", exc);
        // NOTE(review): GeTraverser.DEFAULT_DEGREE is used as a millisecond duration here; it
        // looks like a decompiler-substituted constant — confirm the intended back-off value.
        Thread.sleep(GeTraverser.DEFAULT_DEGREE);
    }

    /**
     * Creates the status server for this worker; called by {@link #run()} when status is enabled.
     *
     * @return the status server to shut down in {@link #stop()}
     * @throws Exception if the server cannot be created
     */
    protected abstract StatusServer createStatusServer() throws Exception;

    /**
     * Processes a single work item.
     *
     * @param tworkeritem the item to process
     * @throws Exception if processing fails
     */
    protected abstract void process(TWorkerItem tworkeritem) throws Exception;

    /**
     * Converts raw tuple data into a work item.
     *
     * @param bArr the data returned by {@code WorkerTuple.getData()}
     * @return the work item to queue
     */
    protected abstract TWorkerItem tupleDataToWorkerItem(byte[] bArr);

    /**
     * Requests shutdown: clears the run flag, shuts down the status server if one was created,
     * wakes any thread waiting on the queue and waits for the process thread to finish.
     *
     * @throws BcException if the calling thread is interrupted while waiting for the process
     *     thread; the interrupt flag is restored before throwing
     */
    public void stop() {
        this.shouldRun = false;
        if (this.statusServer != null) {
            this.statusServer.shutdown();
        }
        synchronized (this.tupleQueue) {
            this.tupleQueue.notifyAll();
        }
        try {
            if (this.processThread != null) {
                // NOTE(review): GeTraverser.DEFAULT_DEGREE is used as the join timeout in
                // milliseconds; likely a decompiler-substituted constant — confirm the value.
                this.processThread.join(GeTraverser.DEFAULT_DEGREE);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new BcException("Could not stop process thread: " + this.processThread.getName(), e);
        }
    }

    /**
     * Creates and opens the spout for {@link #getQueueName()}.
     *
     * @return the opened spout
     */
    protected WorkerSpout prepareWorkerSpout() {
        WorkerSpout spout = this.workQueueRepository.createWorkerSpout(getQueueName());
        spout.open();
        return spout;
    }

    /** @return the name of the queue this worker consumes */
    protected abstract String getQueueName();

    /* JADX INFO: Access modifiers changed from: protected */
    /** @return the work queue repository passed to the constructor */
    public WorkQueueRepository getWorkQueueRepository() {
        return this.workQueueRepository;
    }

    /** @return the web queue repository passed to the constructor */
    public WebQueueRepository getWebQueueRepository() {
        return this.webQueueRepository;
    }

    /** @return {@code true} between the start of {@link #run()} and a call to {@link #stop()} */
    public boolean shouldRun() {
        return this.shouldRun;
    }
}
