/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.mantis.network.push;

import io.reactivex.mantis.network.push.AsyncConnection;
import io.reactivex.mantis.network.push.HashFunction;
import io.reactivex.mantis.network.push.KeyValuePair;
import io.reactivex.mantis.network.push.Router;
import io.reactivex.mantis.network.push.SnapshotCache;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.functions.Func1;

public class ConsistentHashingRouter<K, V>
extends Router<KeyValuePair<K, V>> {
    private static final Logger logger = LoggerFactory.getLogger(ConsistentHashingRouter.class);
    private static int connectionRepetitionOnRing = 1000;
    private static long validCacheAgeMSec = 5000L;
    private HashFunction hashFunction;
    private AtomicReference<SnapshotCache<SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>>>> cachedRingRef = new AtomicReference();

    public ConsistentHashingRouter(String name, Func1<KeyValuePair<K, V>, byte[]> dataEncoder, HashFunction hashFunction) {
        super("ConsistentHashingRouter_" + name, dataEncoder);
        this.hashFunction = hashFunction;
    }

    @Override
    public void route(Set<AsyncConnection<KeyValuePair<K, V>>> connections, List<KeyValuePair<K, V>> chunks) {
        if (connections != null && !connections.isEmpty() && chunks != null && !chunks.isEmpty()) {
            int numConnections = connections.size();
            int bufferCapacity = chunks.size() / numConnections + 1;
            HashMap<AsyncConnection<KeyValuePair<K, V>>, ArrayList<Object>> writes = new HashMap<AsyncConnection<KeyValuePair<K, V>>, ArrayList<Object>>(numConnections);
            SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>> ring = this.hashConnections(connections);
            for (KeyValuePair<K, V> keyValuePair : chunks) {
                long hash = keyValuePair.getKeyBytesHashed();
                AsyncConnection<KeyValuePair<K, V>> connection = this.lookupConnection(hash, ring);
                Func1<KeyValuePair<K, V>, Boolean> predicate = connection.getPredicate();
                if (predicate != null && !((Boolean)predicate.call(keyValuePair)).booleanValue()) continue;
                ArrayList<Object> buffer = (ArrayList<Object>)writes.get(connection);
                if (buffer == null) {
                    buffer = new ArrayList<Object>(bufferCapacity);
                    writes.put(connection, buffer);
                }
                buffer.add(this.encoder.call(keyValuePair));
            }
            if (!writes.isEmpty()) {
                for (Map.Entry entry : writes.entrySet()) {
                    AsyncConnection connection = (AsyncConnection)entry.getKey();
                    List toWrite = (List)entry.getValue();
                    connection.write(toWrite);
                    this.numEventsRouted.increment((long)toWrite.size());
                }
            }
        }
    }

    private AsyncConnection<KeyValuePair<K, V>> lookupConnection(long hash, SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>> ring) {
        if (!ring.containsKey(hash)) {
            SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>> tailMap = ring.tailMap(hash);
            hash = tailMap.isEmpty() ? ring.firstKey().longValue() : tailMap.firstKey().longValue();
        }
        return (AsyncConnection)ring.get(hash);
    }

    private void computeRing(Set<AsyncConnection<KeyValuePair<K, V>>> connections) {
        TreeMap<Long, AsyncConnection<KeyValuePair<K, V>>> ring = new TreeMap<Long, AsyncConnection<KeyValuePair<K, V>>>();
        for (AsyncConnection<KeyValuePair<K, V>> connection : connections) {
            for (int i = 0; i < connectionRepetitionOnRing; ++i) {
                String connectionId = connection.getSlotId();
                if (connectionId == null) {
                    throw new IllegalStateException("Connection must specify an id for consistent hashing");
                }
                byte[] connectionBytes = (connectionId + "-" + i).getBytes();
                long hash = this.hashFunction.computeHash(connectionBytes);
                ring.put(hash, connection);
            }
        }
        this.cachedRingRef.set(new SnapshotCache(ring));
    }

    private SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>> hashConnections(Set<AsyncConnection<KeyValuePair<K, V>>> connections) {
        SnapshotCache<SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>>> cache = this.cachedRingRef.get();
        if (cache == null) {
            logger.info("Recomputing ring due null reference");
            this.computeRing(connections);
        } else {
            SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>> cachedRing = cache.getCache();
            if (cachedRing.size() != connections.size() * connectionRepetitionOnRing) {
                logger.info("Recomputing ring due to difference in number of connections versus cache");
                this.computeRing(connections);
            } else {
                long timestamp = cache.getTimestamp();
                if (System.currentTimeMillis() - timestamp > validCacheAgeMSec) {
                    this.computeRing(connections);
                }
            }
        }
        return this.cachedRingRef.get().getCache();
    }
}

