/*
 * Decompiled with CFR 0.152.
 */
package org.cache2k.jcache.provider.event;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.event.EventType;
import org.cache2k.jcache.provider.event.EntryEvent;
import org.cache2k.jcache.provider.event.EventHandling;
import org.cache2k.jcache.provider.event.Listener;

/**
 * Hands cache entry events to asynchronous listeners through an {@link Executor}.
 *
 * <p>Events for the same key are processed one after another: while the listeners of
 * one event for a key are still running, further events for that key are parked in a
 * per-key queue and are only started once every listener of the preceding event has
 * finished. The listeners of a single event are each submitted to the executor as a
 * separate task.
 *
 * <p>Access to the per-key queues is guarded by a fixed, JVM-wide set of striped lock
 * objects selected by the key's hash code.
 */
public class AsyncDispatcher<K, V> {
    /**
     * Bit mask used to select a lock stripe from a key hash. The value is one less than
     * the smallest power of two that is strictly greater than the number of available
     * processors.
     *
     * <p>The parentheses are essential: additive operators bind tighter than shift
     * operators, so without them the trailing {@code - 1} would shrink the shift
     * distance and produce a single set bit instead of a contiguous mask, leaving only
     * two of the lock stripes ever in use.
     */
    private static final int KEY_LOCKS_MASK = (2 << (31 - Integer.numberOfLeadingZeros(Runtime.getRuntime().availableProcessors()))) - 1;

    /** Lock stripes shared by all dispatcher instances; indexed by {@code hash & KEY_LOCKS_MASK}. */
    private static final Object[] KEY_LOCKS = new Object[KEY_LOCKS_MASK + 1];

    static {
        for (int i = 0; i < KEY_LOCKS.length; ++i) {
            KEY_LOCKS[i] = new Object();
        }
    }

    /** Executor that runs the listener tasks. */
    private final Executor executor;

    /**
     * Pending events per key. A mapping exists exactly while an event for that key is
     * being processed; the queue holds the events that arrived in the meantime.
     * Modified only while holding the lock stripe of the key.
     */
    private final Map<K, Queue<EntryEvent<K, V>>> keyQueue = new ConcurrentHashMap<K, Queue<EntryEvent<K, V>>>();

    /**
     * Registered asynchronous listeners by event type. The map is fully populated in
     * the constructor and never structurally modified afterwards; the lists are
     * copy-on-write so they can be read without locking.
     */
    private final Map<EventType, List<Listener<K, V>>> asyncListenerByType = new HashMap<EventType, List<Listener<K, V>>>();

    /**
     * Returns the lock stripe responsible for the given key.
     *
     * @param key the cache key, must not be {@code null}
     * @return the lock object to synchronize on for this key
     */
    static Object getLockObject(Object key) {
        return KEY_LOCKS[key.hashCode() & KEY_LOCKS_MASK];
    }

    /**
     * Creates a dispatcher with an empty listener list for every event type.
     *
     * @param _executor executor used to run the listener tasks
     */
    public AsyncDispatcher(Executor _executor) {
        for (EventType t : EventType.values()) {
            this.asyncListenerByType.put(t, new CopyOnWriteArrayList<Listener<K, V>>());
        }
        this.executor = _executor;
    }

    /**
     * Registers a listener under the event type it reports.
     *
     * @param l the listener to add
     */
    void addAsyncListener(Listener<K, V> l) {
        this.asyncListenerByType.get(l.getEventType()).add(l);
    }

    /**
     * Removes the listeners matching the given configuration from every event type.
     *
     * @param cfg the listener configuration to look for
     * @return {@code true} if a match was reported for at least one event type
     */
    boolean removeAsyncListener(CacheEntryListenerConfiguration<K, V> cfg) {
        boolean _found = false;
        for (EventType t : EventType.values()) {
            _found |= EventHandling.removeCfgMatch(cfg, this.asyncListenerByType.get(t));
        }
        return _found;
    }

    /**
     * Adds all registered asynchronous listeners, of every event type, to the given
     * collection.
     *
     * @param l the collection receiving the listeners
     */
    void collectListeners(Collection<Listener<K, V>> l) {
        for (EventType t : EventType.values()) {
            l.addAll(this.asyncListenerByType.get(t));
        }
    }

    /**
     * Delivers an event to the asynchronous listeners registered for its type.
     *
     * <p>If no listener is registered the event is dropped. If another event for the
     * same key is still being processed, the event is queued behind it; otherwise
     * processing starts immediately.
     *
     * @param _event the event to deliver
     */
    void deliverAsyncEvent(EntryEvent<K, V> _event) {
        List<Listener<K, V>> _registered = this.asyncListenerByType.get(_event.getEventType());
        // Cheap pre-check that avoids the copy below in the common no-listener case.
        if (_registered.isEmpty()) {
            return;
        }
        // Snapshot, so a concurrent add/remove cannot change the number of tasks
        // between sizing the count down and submitting them.
        List<Listener<K, V>> _listeners = new ArrayList<Listener<K, V>>(_registered);
        if (_listeners.isEmpty()) {
            return;
        }
        K key = _event.getKey();
        synchronized (AsyncDispatcher.getLockObject(key)) {
            Queue<EntryEvent<K, V>> q = this.keyQueue.get(key);
            if (q != null) {
                // An earlier event for this key is in flight; keep the order.
                q.add(_event);
                return;
            }
            // Mark the key as busy; the empty queue collects followers.
            this.keyQueue.put(key, new LinkedList<EntryEvent<K, V>>());
        }
        this.runAllListenersInParallel(_event, _listeners);
    }

    /**
     * Submits one task per listener to the executor. The task that finishes last
     * continues with the next queued event for the same key.
     *
     * <p>NOTE(review): if {@code executor.execute} throws (for example because the
     * executor rejects the task), the count down never reaches zero and the key stays
     * marked as busy; the exception propagates to the caller.
     *
     * @param _event the event to fire
     * @param _listeners the listeners to notify, must not be empty
     */
    void runAllListenersInParallel(final EntryEvent<K, V> _event, List<Listener<K, V>> _listeners) {
        final AtomicInteger _countDown = new AtomicInteger(_listeners.size());
        for (final Listener<K, V> l : _listeners) {
            Runnable r = new Runnable(){

                @Override
                public void run() {
                    try {
                        l.fire(_event);
                    }
                    catch (Throwable t) {
                        // A failing listener must not prevent the count down below.
                        t.printStackTrace();
                    }
                    int _done = _countDown.decrementAndGet();
                    if (_done == 0) {
                        AsyncDispatcher.this.runMoreOnKeyQueueOrStop(_event.getKey());
                    }
                }
            };
            this.executor.execute(r);
        }
    }

    /**
     * Starts processing of the next queued event for the key, or removes the key's
     * queue if nothing is pending.
     *
     * <p>Queued events whose type has no listeners left are skipped. This is done in a
     * loop rather than by recursion so that a long backlog cannot exhaust the stack.
     * A queue for the key must exist when this method is called.
     *
     * @param key the key whose previous event has just completed
     */
    void runMoreOnKeyQueueOrStop(K key) {
        Object lock = AsyncDispatcher.getLockObject(key);
        while (true) {
            EntryEvent<K, V> _event;
            synchronized (lock) {
                Queue<EntryEvent<K, V>> q = this.keyQueue.get(key);
                if (q.isEmpty()) {
                    // Nothing pending: mark the key as idle again.
                    this.keyQueue.remove(key);
                    return;
                }
                _event = q.remove();
            }
            List<Listener<K, V>> _listeners = new ArrayList<Listener<K, V>>(this.asyncListenerByType.get(_event.getEventType()));
            if (!_listeners.isEmpty()) {
                this.runAllListenersInParallel(_event, _listeners);
                return;
            }
            // All listeners for this event type were removed meanwhile; try the next event.
        }
    }
}

