/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.modelfarm;

import io.deephaven.base.verify.Assert;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.modelfarm.Model;
import io.deephaven.modelfarm.ModelFarmBase;
import io.deephaven.modelfarm.RDMModelFarm;
import io.deephaven.modelfarm.RowDataManager;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

public class ModelFarmTick<KEYTYPE, DATATYPE, ROWDATAMANAGERTYPE extends RowDataManager<KEYTYPE, DATATYPE>>
extends RDMModelFarm<KEYTYPE, DATATYPE, ROWDATAMANAGERTYPE> {
    private static final Logger log = LoggerFactory.getLogger(ModelFarmTick.class);
    private static final boolean LOG_PERF = Configuration.getInstance().getBooleanWithDefault("ModelFarm.logModelFarmTickPerformance", false);
    private final int maxQueueSize;
    private final Map<KEYTYPE, UQueue<DATATYPE>> umap = new ConcurrentHashMap<KEYTYPE, UQueue<DATATYPE>>();
    private final Queue<UQueue<DATATYPE>> queue;
    private final ModelFarmBase.MostRecentDataGetter<KEYTYPE, DATATYPE> mostRecentDataGetter;

    public ModelFarmTick(int nThreads, Model<DATATYPE> model, ROWDATAMANAGERTYPE dataManager, int maxQueueSize) {
        super(nThreads, model, dataManager);
        this.maxQueueSize = maxQueueSize;
        this.queue = new ArrayDeque<UQueue<DATATYPE>>(this.maxQueueSize);
        this.mostRecentDataGetter = this.getMostRecentDataFactory(ModelFarmBase.GetDataLockType.UGP_LOCK_ALREADY_HELD);
    }

    @Override
    protected void onDataUpdate(RowSet added, RowSet removed, RowSet modified) {
        Object key;
        long i;
        HashSet keys = new HashSet();
        RowSet.Iterator it = added.iterator();
        while (it.hasNext()) {
            i = it.nextLong();
            key = this.dataManager.uniqueIdCurrent(i);
            keys.add(key);
        }
        it = modified.iterator();
        while (it.hasNext()) {
            i = it.nextLong();
            key = this.dataManager.uniqueIdCurrent(i);
            keys.add(key);
        }
        this.updateQueue(keys);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateQueue(Set<KEYTYPE> keys) {
        for (KEYTYPE key : keys) {
            DATATYPE data = this.mostRecentDataGetter.get(key);
            ModelFarmTick modelFarmTick = this;
            synchronized (modelFarmTick) {
                boolean isNotShutdown;
                ModelFarmBase.State state = this.getState();
                boolean bl = isNotShutdown = state == ModelFarmBase.State.WAITING || state == ModelFarmBase.State.RUNNING;
                if (data != null && isNotShutdown) {
                    UQueue uqueue = this.umap.computeIfAbsent(key, u -> new UQueue());
                    uqueue.queue.add(data);
                    while (this.queue.size() >= this.maxQueueSize || !this.queue.offer(uqueue)) {
                        if (this.getState() == ModelFarmBase.State.WAITING) {
                            throw new IllegalStateException("Queue is full, but model farm is not started! Possible deadlock.  Consider increasing maxQueueSize: maxQueueSize=" + this.maxQueueSize);
                        }
                        try {
                            this.wait();
                        }
                        catch (InterruptedException e) {
                            log.warn((Throwable)e).append((CharSequence)"Interruption").endl();
                            throw new RuntimeException(e);
                        }
                    }
                    this.notifyAll();
                }
            }
        }
    }

    @Override
    protected synchronized boolean isQueueEmpty() {
        return this.queue.isEmpty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void execute() throws InterruptedException {
        long t6;
        long t5;
        long t4;
        long t3;
        long t2;
        UQueue<DATATYPE> uqueue;
        long t0 = System.nanoTime();
        ModelFarmTick modelFarmTick = this;
        synchronized (modelFarmTick) {
            while ((uqueue = this.queue.poll()) == null) {
                if (this.getState() != ModelFarmBase.State.RUNNING) {
                    return;
                }
                this.wait();
            }
            this.notifyAll();
        }
        long t1 = System.nanoTime();
        try {
            uqueue.lock.lock();
            t2 = System.nanoTime();
            Object data = uqueue.queue.poll();
            t3 = System.nanoTime();
            Assert.neqNull(data, (String)"data");
            this.model.exec(data);
            t4 = System.nanoTime();
        }
        finally {
            t5 = System.nanoTime();
            uqueue.lock.unlock();
            t6 = System.nanoTime();
        }
        long t7 = System.nanoTime();
        if (LOG_PERF) {
            log.warn().append((CharSequence)"ModelFarmTick.execute PERFORMANCE: all=").append((t7 - t0) / 1000L).append((CharSequence)" take=").append((t1 - t0) / 1000L).append((CharSequence)(" lock=" + (t2 - t1) / 1000L)).append((CharSequence)" poll=").append((t3 - t2) / 1000L).append((CharSequence)" exec=").append((t4 - t3) / 1000L).append((CharSequence)" unlock=").append((t6 - t5) / 1000L).endl();
        }
    }

    private static class UQueue<T> {
        private final ReentrantLock lock = new ReentrantLock();
        private final Queue<T> queue = new ConcurrentLinkedQueue<T>();

        private UQueue() {
        }
    }
}

