package org.copperengine.core.persistent.hybrid;

import com.google.common.util.concurrent.ListenableFuture;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import org.copperengine.core.Acknowledge;
import org.copperengine.core.CopperRuntimeException;
import org.copperengine.core.DuplicateIdException;
import org.copperengine.core.ProcessingState;
import org.copperengine.core.Response;
import org.copperengine.core.WaitMode;
import org.copperengine.core.Workflow;
import org.copperengine.core.common.WorkflowRepository;
import org.copperengine.core.internal.WorkflowAccessor;
import org.copperengine.core.persistent.RegisterCall;
import org.copperengine.core.persistent.ScottyDBStorageInterface;
import org.copperengine.core.persistent.Serializer;
import org.copperengine.core.util.Blocker;
import org.copperengine.management.model.WorkflowInstanceFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/copperengine/core/persistent/hybrid/HybridDBStorage.class */
public class HybridDBStorage implements ScottyDBStorageInterface {
    // Class-wide SLF4J logger; assigned in the static initializer at the end of the class.
    private static final Logger logger;
    // Shared best-effort acknowledge handed to internal calls when no caller acknowledge applies;
    // assigned in the static initializer.
    private static final Acknowledge.BestEffortAcknowledge ACK;
    // Executor on which the listeners of the asynchronous storage futures are run.
    private final Executor executor;
    // Used to register/unregister the timeout callbacks of waiting workflow instances.
    private final TimeoutManager timeoutManager;
    // Converts workflows and responses to and from their stored representation.
    private final Serializer serializer;
    // Passed to the serializer when deserializing workflows.
    private final WorkflowRepository wfRepo;
    // The underlying storage all workflow instances and early responses are read from / written to.
    private final Storage storage;
    // Compiler-generated assertion switch exposed by the decompiler; assigned in the static initializer.
    static final /* synthetic */ boolean $assertionsDisabled;
    // Gate that the public operations pass before doing work; released by startup().
    // Constructed with 'true' - presumably "initially blocking"; confirm against Blocker.
    private final Blocker startupBlocker = new Blocker(true);
    // In-memory mapping between correlation ids and the workflow ids waiting for them.
    private final CorrelationIdMap correlationIdMap = new CorrelationIdMap();
    // Lock stripes: findMutex() maps a workflow id onto one of these monitors (filled in the constructor).
    private final Object[] mutexArray = new Object[2003];
    // Correlation ids whose early response is currently being written asynchronously.
    // The set itself is also the monitor used for the wait()/notifyAll() hand-shake.
    private final Set<String> currentlyProcessingEarlyResponses = new HashSet();
    // Set once startup() completed successfully; only accessed inside the synchronized startup().
    private boolean started = false;
    // Per processor pool id: ordered set of queue elements that are ready to be dequeued.
    private final Map<String, ConcurrentSkipListSet<QueueElement>> ppoolId2queueMap = new ConcurrentHashMap();

    public HybridDBStorage(Serializer serializer, WorkflowRepository workflowRepository, Storage storage, TimeoutManager timeoutManager, Executor executor) {
        this.serializer = serializer;
        this.wfRepo = workflowRepository;
        this.storage = storage;
        this.timeoutManager = timeoutManager;
        this.executor = executor;
        for (int i = 0; i < this.mutexArray.length; i++) {
            this.mutexArray[i] = new Object();
        }
    }

    /**
     * Persists a new workflow instance in state ENQUEUED and puts it on the in-memory queue of its
     * processor pool.
     *
     * @param workflow    the workflow to insert; must not be null
     * @param acknowledge notified via {@code onSuccess()} after the workflow was enqueued; may be null
     * @throws NullPointerException if {@code workflow} is null
     * @throws Exception            whatever the startup gate, the serializer or the storage throws
     */
    @Override // org.copperengine.core.persistent.ScottyDBStorageInterface
    public void insert(Workflow<?> workflow, Acknowledge acknowledge) throws DuplicateIdException, Exception {
        Objects.requireNonNull(workflow);
        logger.debug("insert({})", workflow.getId());
        this.startupBlocker.pass();

        final WorkflowInstance record = new WorkflowInstance();
        record.id = workflow.getId();
        record.serializedWorkflow = this.serializer.serializeWorkflow(workflow);
        record.ppoolId = workflow.getProcessorPoolId();
        record.prio = workflow.getPriority();
        record.creationTS = workflow.getCreationTS();
        record.state = ProcessingState.ENQUEUED;
        record.classname = workflow.getClass().getName();
        this.storage.safeWorkflowInstance(record, true);

        _enqueue(workflow.getId(), workflow.getProcessorPoolId(), workflow.getPriority());
        if (acknowledge == null) {
            return;
        }
        acknowledge.onSuccess();
    }

    /**
     * Inserts every workflow of the list, one after the other, and acknowledges once at the end.
     * <p>
     * If one insert throws, the exception propagates, the remaining workflows are not inserted and
     * {@code acknowledge} is not notified.
     *
     * @param list        workflows to insert
     * @param acknowledge notified via {@code onSuccess()} after all workflows were inserted; may be
     *                    null, consistent with the single-workflow {@code insert}
     * @throws Exception  whatever the single-workflow insert throws
     */
    @Override // org.copperengine.core.persistent.ScottyDBStorageInterface
    public void insert(List<Workflow<?>> list, Acknowledge acknowledge) throws DuplicateIdException, Exception {
        for (Workflow<?> workflow : list) {
            insert(workflow, ACK);
        }
        // Guard against a null acknowledge: without it an NPE would be raised after all
        // workflows have already been persisted and enqueued.
        if (acknowledge != null) {
            acknowledge.onSuccess();
        }
    }

    /**
     * Inserts a single workflow by delegating to {@code insert(Workflow, Acknowledge)} with the
     * shared {@code ACK} instance. The {@code connection} argument is not used by this implementation.
     */
    @Override // org.copperengine.core.persistent.ScottyDBStorageInterface
    public void insert(Workflow<?> workflow, Connection connection) throws DuplicateIdException, Exception {
        insert(workflow, ACK);
    }

    /**
     * Inserts all workflows of the list, in iteration order, using the shared {@code ACK} instance.
     * The {@code connection} argument is not used by this implementation.
     */
    @Override // org.copperengine.core.persistent.ScottyDBStorageInterface
    public void insert(List<Workflow<?>> list, Connection connection) throws DuplicateIdException, Exception {
        for (Workflow<?> workflow : list) {
            insert(workflow, ACK);
        }
    }

    /**
     * Removes a finished workflow: drops its correlation ids from the in-memory map and deletes
     * the instance from the storage asynchronously.
     * <p>
     * The acknowledge is notified from a listener running on the executor once the delete future
     * completed. Failures never propagate to the caller; they are logged and reported through
     * {@code onException}.
     *
     * @param workflow    the finished workflow
     * @param acknowledge receives the outcome; when null the shared {@code ACK} is used instead
     */
    @Override // org.copperengine.core.persistent.ScottyDBStorageInterface
    public void finish(Workflow<?> workflow, Acknowledge acknowledge) {
        logger.debug("finish({})", workflow.getId());
        final Acknowledge ack = acknowledge != null ? acknowledge : ACK;
        try {
            this.startupBlocker.pass();
            final String wfId = workflow.getId();
            this.correlationIdMap.removeAll4Workflow(wfId);
            final ListenableFuture<Void> deletion = this.storage.deleteWorkflowInstance(workflow.getId());
            deletion.addListener(new Runnable() {
                @Override
                public void run() {
                    try {
                        deletion.get();
                        ack.onSuccess();
                    } catch (InterruptedException e) {
                        HybridDBStorage.logger.error("finish(" + wfId + ") failed", e);
                        ack.onException(e);
                        // Do not swallow the interruption: hand it back to the executor thread.
                        Thread.currentThread().interrupt();
                    } catch (ExecutionException e) {
                        HybridDBStorage.logger.error("finish(" + wfId + ") failed", e);
                        ack.onException(e);
                    }
                }
            }, this.executor);
        } catch (Exception e) {
            logger.error("finish failed", e);
            ack.onException(e);
            if (e instanceof InterruptedException) {
                // Interrupted while waiting at the startup gate - keep the flag for the caller.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Fetches up to {@code i} ready workflows of the given processor pool.
     * <p>
     * The first element is obtained with the blocking {@code _take}; further elements are only
     * polled, so the call returns as soon as the queue runs empty. Each element is processed
     * while holding the lock stripe of its workflow id. Instances that cannot be read are skipped
     * with a warning; instances that cannot be deserialized are marked INVALID in the storage.
     * Any other failure for a single element is logged and the loop continues.
     *
     * @param str processor pool id
     * @param i   maximum number of workflows to return
     * @return the dequeued workflows, possibly fewer than {@code i}
     * @throws Exception if passing the startup gate or the blocking take fails
     */
    @Override // org.copperengine.core.persistent.ScottyDBStorageInterface
    public List<Workflow<?>> dequeue(String str, int i) throws Exception {
        logger.debug("dequeue({},{})", str, Integer.valueOf(i));
        final long startedAt = System.currentTimeMillis();
        this.startupBlocker.pass();
        final List<Workflow<?>> batch = new ArrayList<>(i);
        while (batch.size() < i) {
            final QueueElement head;
            if (batch.isEmpty()) {
                // Nothing collected yet: wait for the first element.
                head = _take(str);
            } else {
                head = _poll(str);
            }
            if (head == null) {
                break;
            }
            synchronized (findMutex(head.wfId)) {
                try {
                    this.correlationIdMap.removeAll4Workflow(head.wfId);
                    final WorkflowInstance stored = this.storage.readWorkflowInstance(head.wfId);
                    if (stored == null) {
                        logger.warn("No workflow instance with id {} found in database", head.wfId);
                        continue;
                    }
                    Workflow<?> restored = null;
                    try {
                        restored = convert2workflow(stored);
                    } catch (Exception conversionFailure) {
                        logger.error("Unable to deserialize workflow instance " + head.wfId + " - setting state to INVALID", conversionFailure);
                        this.storage.updateWorkflowInstanceState(head.wfId, ProcessingState.INVALID);
                    }
                    if (restored == null) {
                        continue;
                    }
                    this.timeoutManager.unregisterTimeout(stored.timeout, stored.id);
                    batch.add(restored);
                } catch (Exception failure) {
                    logger.error("Fatal error: dequeue failed for workflow instance " + head.wfId, failure);
                }
            }
        }
        final long elapsedMillis = System.currentTimeMillis() - startedAt;
        logger.debug("dequeue({},{}) finished, returning {} elements in {} msec", str, Integer.valueOf(i), Integer.valueOf(batch.size()), Long.valueOf(elapsedMillis));
        return batch;
    }

    /**
     * Rebuilds a workflow object from its stored instance record.
     * <p>
     * Id, processor pool id, priority and both timestamps are copied from the record onto the
     * deserialized workflow. For every entry of the record's response map a response is attached:
     * the deserialized stored response, or a new {@code Response} built from the correlation id
     * alone when no response has been stored for it.
     *
     * @param workflowInstance the stored record, may be null
     * @return the workflow, or null if {@code workflowInstance} is null
     * @throws Exception if deserialization fails
     */
    private Workflow<?> convert2workflow(WorkflowInstance workflowInstance) throws Exception {
        if (workflowInstance == null) {
            return null;
        }
        final Workflow<?> restored = this.serializer.deserializeWorkflow(workflowInstance.serializedWorkflow, this.wfRepo);
        restored.setId(workflowInstance.id);
        restored.setProcessorPoolId(workflowInstance.ppoolId);
        restored.setPriority(workflowInstance.prio);
        WorkflowAccessor.setCreationTS(restored, workflowInstance.creationTS);
        WorkflowAccessor.setLastActivityTS(restored, workflowInstance.lastModTS);
        if (workflowInstance.cid2ResponseMap == null) {
            return restored;
        }
        for (Map.Entry<String, String> slot : workflowInstance.cid2ResponseMap.entrySet()) {
            if (slot.getValue() == null) {
                // No stored response for this correlation id.
                restored.putResponse(new Response<>(slot.getKey()));
            } else {
                restored.putResponse(this.serializer.deserializeResponse(slot.getValue()));
            }
        }
        return restored;
    }

    /**
     * Puts a workflow into state WAITING for the correlation ids of the given register call.
     * <p>
     * Steps, in order: persist the instance as WAITING with an empty response slot per correlation
     * id; publish the correlation ids in the in-memory map; wait until no early response for any
     * of these ids is still being written; deliver already stored early responses through
     * {@code notifyInternal}; finally register the timeout callback, unless one of the early
     * responses already caused the workflow to be enqueued.
     *
     * @param registerCall the workflow, its correlation ids, wait mode and optional timeout
     * @param acknowledge  notified via {@code onSuccess()} at the end
     *                     NOTE(review): dereferenced unconditionally - a null value causes an NPE here
     * @throws Exception   whatever the startup gate, the serializer or the storage throws
     */
    @Override // org.copperengine.core.persistent.ScottyDBStorageInterface
    public void registerCallback(RegisterCall registerCall, Acknowledge acknowledge) throws Exception {
        boolean z;
        logger.debug("registerCallback({})", registerCall);
        this.startupBlocker.pass();
        final String id = registerCall.workflow.getId();
        WorkflowInstance workflowInstance = new WorkflowInstance();
        workflowInstance.id = id;
        workflowInstance.state = ProcessingState.WAITING;
        workflowInstance.prio = registerCall.workflow.getPriority();
        workflowInstance.creationTS = registerCall.workflow.getCreationTS();
        workflowInstance.serializedWorkflow = this.serializer.serializeWorkflow(registerCall.workflow);
        workflowInstance.waitMode = registerCall.waitMode;
        // A null or non-positive timeout means "no timeout"; otherwise it is relative to now.
        workflowInstance.timeout = (registerCall.timeout == null || registerCall.timeout.longValue() <= 0) ? null : new Date(System.currentTimeMillis() + registerCall.timeout.longValue());
        workflowInstance.ppoolId = registerCall.workflow.getProcessorPoolId();
        // One slot per correlation id; a null value marks "no response received yet".
        workflowInstance.cid2ResponseMap = new HashMap();
        for (String str : registerCall.correlationIds) {
            workflowInstance.cid2ResponseMap.put(str, null);
        }
        workflowInstance.classname = registerCall.workflow.getClass().getName();
        this.storage.safeWorkflowInstance(workflowInstance, false);
        // From here on notifyInternal() can route responses for these ids to this workflow.
        this.correlationIdMap.addCorrelationIds(id, registerCall.correlationIds);
        // Wait until no early response for one of our correlation ids is in flight any more;
        // handleEarlyResponse() removes the id and calls notifyAll() when its write completed.
        // After each wait() the whole id list is re-checked.
        synchronized (this.currentlyProcessingEarlyResponses) {
            do {
                z = false;
                for (String str2 : registerCall.correlationIds) {
                    if (this.currentlyProcessingEarlyResponses.contains(str2)) {
                        this.currentlyProcessingEarlyResponses.wait();
                        z = true;
                    }
                }
            } while (z);
        }
        // z2 becomes true as soon as an early response caused the workflow to be enqueued.
        boolean z2 = false;
        for (String str3 : registerCall.correlationIds) {
            Response<?> deserializeResponse = this.serializer.deserializeResponse(this.storage.readEarlyResponse(str3));
            if (deserializeResponse != null) {
                logger.debug("found early response with correlationId {} for workflow {} - doing notify...", str3, id);
                if (notifyInternal(deserializeResponse, ACK)) {
                    z2 = true;
                }
                this.storage.deleteEarlyResponse(str3);
            }
        }
        // No timeout callback is needed if the workflow has already been enqueued above.
        if (workflowInstance.timeout != null && !z2) {
            this.timeoutManager.registerTimeout(workflowInstance.timeout, id, new Runnable() { // from class: org.copperengine.core.persistent.hybrid.HybridDBStorage.2
                @Override // java.lang.Runnable
                public void run() {
                    HybridDBStorage.this.onTimeout(id);
                }
            });
        }
        acknowledge.onSuccess();
    }

    /**
     * Delivers a single response: waits at the startup gate and then delegates to
     * {@code notifyInternal}, whose boolean result is ignored here.
     *
     * @param response    the response to deliver
     * @param acknowledge passed through to {@code notifyInternal}
     * @throws Exception  whatever the startup gate or {@code notifyInternal} throws
     */
    @Override // org.copperengine.core.persistent.ScottyDBStorageInterface
    public void notify(Response<?> response, Acknowledge acknowledge) throws Exception {
        logger.debug("notify({})", response);
        this.startupBlocker.pass();
        notifyInternal(response, acknowledge);
    }

    /**
     * Routes a response either to the workflow waiting for its correlation id or, if no workflow
     * is registered for that id, to the early-response handling.
     * <p>
     * When a workflow is registered, its stored instance is updated under the lock stripe of the
     * workflow id: the serialized response is stored in the matching slot and, if the instance is
     * WAITING and its wait condition is satisfied, it is switched to ENQUEUED, saved and put on
     * the queue of its processor pool.
     *
     * @param response    the response to deliver
     * @param acknowledge notified via {@code onSuccess()} after the instance was saved, or handed
     *                    to the early-response handling
     * @return true if the response caused the workflow to be enqueued, false otherwise
     * @throws Exception  whatever the serializer or the storage throws
     */
    private boolean notifyInternal(Response<?> response, Acknowledge acknowledge) throws Exception {
        logger.debug("notifyInternal({})", response);
        final String correlationId = response.getCorrelationId();
        final String workflowId = this.correlationIdMap.getWorkflowId(correlationId);
        if (workflowId != null) {
            synchronized (findMutex(workflowId)) {
                // Re-check under the lock: the registration may have been removed in the meantime.
                if (this.correlationIdMap.getWorkflowId(correlationId) != null) {
                    // NOTE(review): a null result from the storage is not handled here and would
                    // surface as a NullPointerException - confirm whether that can happen.
                    final WorkflowInstance instance = this.storage.readWorkflowInstance(workflowId);
                    if (instance.cid2ResponseMap.containsKey(correlationId)) {
                        instance.cid2ResponseMap.put(correlationId, this.serializer.serializeResponse(response));
                    }
                    final boolean resume = instance.state == ProcessingState.WAITING && isWaitSatisfied(instance);
                    if (resume) {
                        instance.state = ProcessingState.ENQUEUED;
                    }
                    this.storage.safeWorkflowInstance(instance, false);
                    if (resume) {
                        _enqueue(instance.id, instance.ppoolId, instance.prio);
                    }
                    acknowledge.onSuccess();
                    return resume;
                }
            }
        }
        handleEarlyResponse(response, acknowledge);
        return false;
    }

    /**
     * Tells whether a waiting instance may continue: its timeout has elapsed, it waits for the
     * FIRST response, or it waits for ALL responses and either has a single slot or every slot
     * is filled.
     */
    private boolean isWaitSatisfied(WorkflowInstance instance) {
        // The clock is read once; the timeout counts as elapsed when it is not in the future.
        if (instance.timeout != null && instance.timeout.getTime() <= System.currentTimeMillis()) {
            return true;
        }
        if (instance.waitMode == WaitMode.FIRST) {
            return true;
        }
        return instance.waitMode == WaitMode.ALL
                && (instance.cid2ResponseMap.size() == 1 || allResponsesAvailable(instance));
    }

    /**
     * Timeout callback for a waiting workflow. If the workflow is still registered in the
     * correlation id map and its stored state is WAITING, it is switched to ENQUEUED, saved and
     * enqueued. Runs under the lock stripe of the workflow id; failures are logged, never thrown.
     *
     * @param str id of the workflow whose timeout fired
     */
    public void onTimeout(String str) {
        logger.debug("onTimeout(wfId={})", str);
        try {
            synchronized (findMutex(str)) {
                if (!this.correlationIdMap.containsWorkflowId(str)) {
                    return;
                }
                final WorkflowInstance expired = this.storage.readWorkflowInstance(str);
                logger.debug("workflow instance={}", expired);
                if (expired.state != ProcessingState.WAITING) {
                    return;
                }
                expired.state = ProcessingState.ENQUEUED;
                this.storage.safeWorkflowInstance(expired, false);
                _enqueue(expired.id, expired.ppoolId, expired.prio);
            }
        } catch (Exception failure) {
            logger.error("onTimeout failed for wfId " + str, failure);
        }
    }

    /**
     * Stores a response for which no workflow is waiting yet ("early response").
     * <p>
     * The correlation id is marked as in flight before the asynchronous write starts and is
     * released again (with a {@code notifyAll()}) once the write has completed, successfully or
     * not, so that {@code registerCallback} can wait for pending writes. The acknowledge is
     * notified from the listener running on the executor.
     *
     * @param response    the early response
     * @param acknowledge receives {@code onSuccess()} or {@code onException()} when the write finished
     * @throws Exception  if serializing the response or submitting the write fails; the in-flight
     *                    marker is released before the exception propagates
     */
    private void handleEarlyResponse(final Response<?> response, final Acknowledge acknowledge) throws Exception {
        final String correlationId = response.getCorrelationId();
        synchronized (this.currentlyProcessingEarlyResponses) {
            this.currentlyProcessingEarlyResponses.add(correlationId);
        }
        final ListenableFuture<Void> pendingWrite;
        try {
            pendingWrite = this.storage.safeEarlyResponse(correlationId, this.serializer.serializeResponse(response));
        } catch (Exception e) {
            // No listener will ever run for this id: release the marker here, otherwise a
            // registerCallback() for the same correlation id would wait forever.
            releaseEarlyResponse(correlationId);
            throw e;
        }
        pendingWrite.addListener(new Runnable() {
            @Override
            public void run() {
                try {
                    pendingWrite.get();
                    acknowledge.onSuccess();
                } catch (Exception e) {
                    HybridDBStorage.logger.error("safeEarlyResponse failed", e);
                    acknowledge.onException(e);
                } finally {
                    HybridDBStorage.this.releaseEarlyResponse(correlationId);
                }
            }
        }, this.executor);
    }

    /** Removes the in-flight marker of a correlation id and wakes up all threads waiting on the set. */
    private void releaseEarlyResponse(String correlationId) {
        synchronized (this.currentlyProcessingEarlyResponses) {
            this.currentlyProcessingEarlyResponses.remove(correlationId);
            this.currentlyProcessingEarlyResponses.notifyAll();
        }
    }

    /**
     * Delivers all responses of the list, in iteration order, each with the shared {@code ACK},
     * and then signals success on the given acknowledge.
     *
     * @param list        responses to deliver
     * @param acknowledge notified via {@code onSuccess()} after the last response was delivered
     * @throws Exception  whatever the single-response {@code notify} throws
     */
    @Override // org.copperengine.core.persistent.ScottyDBStorageInterface
    public void notify(List<Response<?>> list, Acknowledge acknowledge) throws Exception {
        for (Response<?> response : list) {
            notify(response, ACK);
        }
        acknowledge.onSuccess();
    }

    /**
     * Delivers all responses of the list, in iteration order, each with the shared {@code ACK}.
     * The {@code connection} argument is not used by this implementation.
     */
    @Override // org.copperengine.core.persistent.ScottyDBStorageInterface
    public void notify(List<Response<?>> list, Connection connection) throws Exception {
        for (Response<?> response : list) {
            notify(response, ACK);
        }
    }

    /**
     * Initializes the underlying storage and opens the startup gate. Calling it again after a
     * successful startup is a no-op.
     * <p>
     * The storage receives an accessor through which it can register correlation ids and enqueue
     * workflows on this instance, plus the number of available processors.
     *
     * @throws CopperRuntimeException wrapping any checked exception raised during initialization;
     *                                runtime exceptions are logged and rethrown unchanged
     */
    @Override // org.copperengine.core.persistent.ScottyDBStorageInterface
    public synchronized void startup() {
        if (this.started) {
            return;
        }
        logger.info("Starting up...");
        final HybridDBStorageAccessor callbacks = new HybridDBStorageAccessor() {
            @Override // org.copperengine.core.persistent.hybrid.HybridDBStorageAccessor
            public void registerCorrelationId(String str, String str2) {
                HybridDBStorage.this._registerCorrelationId(str, str2);
            }

            @Override // org.copperengine.core.persistent.hybrid.HybridDBStorageAccessor
            public void enqueue(String str, String str2, int i) {
                HybridDBStorage.this._enqueue(str, str2, i);
            }
        };
        try {
            final int processors = Runtime.getRuntime().availableProcessors();
            this.storage.initialize(callbacks, processors);
            this.started = true;
            this.startupBlocker.unblock();
            logger.info("Startup finished!");
        } catch (RuntimeException unchecked) {
            logger.error("startup failed", unchecked);
            throw unchecked;
        } catch (Exception checked) {
            logger.error("startup failed", checked);
            throw new CopperRuntimeException("startup failed", checked);
        }
    }

    /** Intentionally empty: this implementation performs no work on shutdown. */
    @Override // org.copperengine.core.persistent.ScottyDBStorageInterface
    public void shutdown() {
    }

    /**
     * Marks a workflow as failed: removes its correlation ids from the in-memory map and sets its
     * stored state to ERROR. Failures are logged and reported through the acknowledge, never thrown.
     *
     * @param workflow    the failed workflow
     * @param th          the cause of the failure; not used by this implementation
     * @param acknowledge receives the outcome; may be null
     */
    @Override // org.copperengine.core.persistent.ScottyDBStorageInterface
    public void error(Workflow<?> workflow, Throwable th, Acknowledge acknowledge) {
        final boolean reportOutcome = acknowledge != null;
        try {
            this.startupBlocker.pass();
            this.correlationIdMap.removeAll4Workflow(workflow.getId());
            this.storage.updateWorkflowInstanceState(workflow.getId(), ProcessingState.ERROR);
            if (reportOutcome) {
                acknowledge.onSuccess();
            }
        } catch (Exception failure) {
            logger.error("error failed", failure);
            if (reportOutcome) {
                acknowledge.onException(failure);
            }
        }
    }

    /**
     * Puts a workflow that is stored in state ERROR back on the queue of its processor pool.
     * The stored state itself is not modified by this method.
     *
     * @param str id of the workflow instance to restart
     * @throws CopperRuntimeException if no such instance exists or it is not in state ERROR
     * @throws Exception              whatever the startup gate or the storage throws
     */
    @Override // org.copperengine.core.persistent.ScottyDBStorageInterface
    public void restart(String str) throws Exception {
        this.startupBlocker.pass();
        final WorkflowInstance broken = this.storage.readWorkflowInstance(str);
        if (broken == null) {
            throw new CopperRuntimeException("No workflow found with id " + str);
        }
        final boolean inErrorState = broken.state == ProcessingState.ERROR;
        if (!inErrorState) {
            throw new CopperRuntimeException("Workflow found with id " + str + " is not in state ERROR");
        }
        _enqueue(broken.id, broken.ppoolId, broken.prio);
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.copperengine.core.persistent.ScottyDBStorageInterface
    public void setRemoveWhenFinished(boolean z) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.copperengine.core.persistent.ScottyDBStorageInterface
    public void restartAll() throws Exception {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.copperengine.core.persistent.ScottyDBStorageInterface
    public void deleteBroken(String str) throws Exception {
        throw new UnsupportedOperationException();
    }

    /**
     * Reads a workflow instance from the storage and converts it into a workflow object.
     * Unlike most other operations this one does not wait at the startup gate.
     *
     * @param str id of the workflow instance
     * @return the workflow, or null if the storage returned no instance for the id
     * @throws Exception if reading or deserializing fails
     */
    @Override // org.copperengine.core.persistent.ScottyDBStorageInterface
    public Workflow<?> read(String str) throws Exception {
        final WorkflowInstance stored = this.storage.readWorkflowInstance(str);
        return convert2workflow(stored);
    }

    /**
     * Adds a workflow to the in-memory queue of a processor pool and wakes up one thread waiting
     * on that queue.
     *
     * @param str  workflow id
     * @param str2 processor pool id
     * @param i    priority
     * @throws AssertionError if assertions are enabled and the queue already held this element
     */
    void _enqueue(String str, String str2, int i) {
        logger.trace("enqueue(wfId={}, ppoolId={}, prio={})", str, str2, Integer.valueOf(i));
        final ConcurrentSkipListSet<QueueElement> queue = _findQueue(str2);
        // The queue doubles as the monitor that _take() waits on.
        synchronized (queue) {
            final boolean inserted = queue.add(new QueueElement(str, i));
            if (!inserted && !$assertionsDisabled) {
                throw new AssertionError("queue already contains workflow id " + str);
            }
            queue.notify();
        }
    }

    /**
     * Non-blocking removal of the first element of a processor pool's queue.
     *
     * @param str processor pool id
     * @return the first queue element, or null if the queue is empty
     */
    QueueElement _poll(String str) {
        logger.trace("_poll({})", str);
        final QueueElement head = _findQueue(str).pollFirst();
        if (head == null) {
            return null;
        }
        logger.debug("dequeued for ppoolId={}: wfId={}", str, head.wfId);
        return head;
    }

    /**
     * Blocking removal of the first element of a processor pool's queue.
     * <p>
     * While the queue is empty the calling thread waits on the queue's monitor in slices of
     * 10 ms; {@code _enqueue} notifies that monitor when it adds an element.
     *
     * @param str processor pool id
     * @return the first queue element, never null
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    QueueElement _take(String str) throws InterruptedException {
        logger.trace("_take({})", str);
        final ConcurrentSkipListSet<QueueElement> queue = _findQueue(str);
        synchronized (queue) {
            while (true) {
                final QueueElement head = queue.pollFirst();
                if (head != null) {
                    logger.debug("dequeued for ppoolId={}: wfId={}", str, head.wfId);
                    // Leave the loop as soon as an element was obtained.
                    return head;
                }
                queue.wait(10L);
            }
        }
    }

    /**
     * Returns the queue of a processor pool, creating and registering it on first use.
     * Creation happens while holding the monitor of the queue map, after a second lookup.
     *
     * @param str processor pool id
     * @return the queue for that pool, never null
     */
    private ConcurrentSkipListSet<QueueElement> _findQueue(String str) {
        ConcurrentSkipListSet<QueueElement> queue = this.ppoolId2queueMap.get(str);
        if (queue == null) {
            synchronized (this.ppoolId2queueMap) {
                // Look again: another thread may have created the queue in the meantime.
                queue = this.ppoolId2queueMap.get(str);
                if (queue == null) {
                    queue = new ConcurrentSkipListSet<>(new QueueElementComparator());
                    this.ppoolId2queueMap.put(str, queue);
                }
            }
        }
        return queue;
    }

    /**
     * Adds a single entry to the in-memory correlation id map. Called through the accessor that
     * startup() hands to the storage.
     * <p>
     * Note that the arguments are passed on in swapped order.
     * NOTE(review): presumably {@code str} is the correlation id and {@code str2} the workflow id
     * (by analogy with {@code addCorrelationIds(workflowId, correlationIds)}) - confirm against
     * CorrelationIdMap.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void _registerCorrelationId(String str, String str2) {
        this.correlationIdMap.addCorrelationId(str2, str);
    }

    /**
     * Checks whether every correlation id slot of the instance holds a response.
     *
     * @param workflowInstance the instance to inspect; its response map must not be null
     * @return false as soon as one slot holds null, true otherwise (also for an empty map)
     */
    private boolean allResponsesAvailable(WorkflowInstance workflowInstance) {
        for (Map.Entry<String, String> slot : workflowInstance.cid2ResponseMap.entrySet()) {
            if (slot.getValue() == null) {
                return false;
            }
        }
        return true;
    }

    /**
     * Maps a workflow id onto one of the lock stripes in {@code mutexArray}.
     * <p>
     * The sign bit of the hash code is masked out instead of using {@code Math.abs}:
     * {@code Math.abs(Integer.MIN_VALUE)} is still negative and would produce a negative array
     * index for ids whose hash code happens to be {@code Integer.MIN_VALUE}.
     *
     * @param str workflow id, must not be null
     * @return the monitor object guarding that id; equal ids always map to the same monitor
     */
    private Object findMutex(String str) {
        final int nonNegativeHash = str.hashCode() & Integer.MAX_VALUE;
        return this.mutexArray[nonNegativeHash % this.mutexArray.length];
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.copperengine.core.persistent.ScottyDBStorageInterface
    public List<Workflow<?>> queryAllActive(String str, int i) throws Exception {
        throw new UnsupportedOperationException();
    }

    /**
     * Reports how many elements are currently queued for a processor pool. A pool for which no
     * queue has been created yet counts as empty; no queue is created by this call.
     *
     * @param str processor pool id, must not be null
     * @return the current queue size, 0 if the pool has no queue
     * @throws NullPointerException if {@code str} is null
     */
    @Override // org.copperengine.core.persistent.ScottyDBStorageInterface
    public int queryQueueSize(String str) throws Exception {
        final String ppoolId = Objects.requireNonNull(str);
        final ConcurrentSkipListSet<QueueElement> queue = this.ppoolId2queueMap.get(ppoolId);
        return queue == null ? 0 : queue.size();
    }

    /**
     * Queries the storage with the given filter and converts every hit into a workflow object.
     * Instances that fail to convert are logged and left out of the result.
     *
     * @param workflowInstanceFilter filter passed through to the storage
     * @return the converted workflows, in the order delivered by the storage
     * @throws Exception if the storage query itself fails
     */
    @Override // org.copperengine.core.persistent.ScottyDBStorageInterface
    public List<Workflow<?>> queryWorkflowInstances(WorkflowInstanceFilter workflowInstanceFilter) throws Exception {
        final List<Workflow<?>> matches = new ArrayList<>();
        for (WorkflowInstance stored : this.storage.queryWorkflowInstances(workflowInstanceFilter)) {
            try {
                final Workflow<?> converted = convert2workflow(stored);
                matches.add(converted);
            } catch (Exception conversionFailure) {
                logger.error("Failed to convert workflow instance " + stored.id, conversionFailure);
            }
        }
        return matches;
    }

    // Static initializer as emitted by the decompiler: sets the assertion switch from the
    // class's desired assertion status, then creates the logger and the shared acknowledge.
    static {
        $assertionsDisabled = !HybridDBStorage.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger((Class<?>) HybridDBStorage.class);
        ACK = new Acknowledge.BestEffortAcknowledge();
    }
}
