package org.visallo.model.queue.inmemory;

import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.json.JSONObject;
import org.vertexium.Graph;
import org.visallo.core.config.Configuration;
import org.visallo.core.ingest.WorkerSpout;
import org.visallo.core.ingest.WorkerTuple;
import org.visallo.core.model.FlushFlag;
import org.visallo.core.model.WorkQueueNames;
import org.visallo.core.model.workQueue.Priority;
import org.visallo.core.model.workQueue.WorkQueueRepository;
import org.visallo.core.status.model.QueueStatus;
import org.visallo.core.status.model.Status;

/* loaded from: input_file:org/visallo/model/queue/inmemory/InMemoryWorkQueueRepository.class */
public class InMemoryWorkQueueRepository extends WorkQueueRepository {
    private static Map<String, List<JSONObject>> queues = new HashMap();
    private List<WorkQueueRepository.BroadcastConsumer> broadcastConsumers;

    @Inject
    public InMemoryWorkQueueRepository(Graph graph, WorkQueueNames workQueueNames, Configuration configuration) {
        super(graph, workQueueNames, configuration);
        this.broadcastConsumers = new ArrayList();
    }

    protected void broadcastJson(JSONObject jSONObject) {
        Iterator<WorkQueueRepository.BroadcastConsumer> it = this.broadcastConsumers.iterator();
        while (it.hasNext()) {
            it.next().broadcastReceived(jSONObject);
        }
    }

    public void pushOnQueue(String str, FlushFlag flushFlag, JSONObject jSONObject, Priority priority) {
        LOGGER.debug("push on queue: %s: %s", new Object[]{str, jSONObject});
        addToQueue(str, jSONObject, priority);
    }

    public void addToQueue(String str, JSONObject jSONObject, Priority priority) {
        List<JSONObject> queue = getQueue(str);
        synchronized (queue) {
            if (priority == Priority.HIGH) {
                queue.add(0, jSONObject);
            } else {
                queue.add(jSONObject);
            }
            queue.notifyAll();
        }
    }

    public void flush() {
    }

    public void format() {
        clearQueue();
    }

    public void subscribeToBroadcastMessages(WorkQueueRepository.BroadcastConsumer broadcastConsumer) {
        this.broadcastConsumers.add(broadcastConsumer);
    }

    public WorkerSpout createWorkerSpout(String str) {
        final List<JSONObject> queue = getQueue(str);
        return new WorkerSpout() { // from class: org.visallo.model.queue.inmemory.InMemoryWorkQueueRepository.1
            public WorkerTuple nextTuple() throws Exception {
                synchronized (queue) {
                    if (queue.size() == 0) {
                        return null;
                    }
                    JSONObject jSONObject = (JSONObject) queue.remove(0);
                    if (jSONObject == null) {
                        return null;
                    }
                    return new WorkerTuple("", jSONObject);
                }
            }
        };
    }

    public Map<String, Status> getQueuesStatus() {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, List<JSONObject>> entry : queues.entrySet()) {
            hashMap.put(entry.getKey(), new QueueStatus(entry.getValue().size()));
        }
        return hashMap;
    }

    public static void clearQueue() {
        queues.clear();
    }

    protected void deleteQueue(String str) {
        queues.remove(str);
    }

    public static List<JSONObject> getQueue(String str) {
        List<JSONObject> list = queues.get(str);
        if (list == null) {
            list = new LinkedList();
            queues.put(str, list);
        }
        return list;
    }
}
