package io.reactivex.mantis.network.push;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.functions.Func1;

/* loaded from: input_file:io/reactivex/mantis/network/push/ConsistentHashingRouter.class */
/**
 * Router that picks the destination connection for each key/value pair by consistent hashing.
 *
 * <p>Every connection is placed on a hash ring at {@link #CONNECTION_REPETITION_ON_RING}
 * positions derived from its slot id. A pair is sent to the connection that owns the first
 * ring position at or after the pair's hashed key, wrapping around to the lowest position when
 * the key hashes past the end of the ring. The ring is cached and rebuilt when the number of
 * connections changes or the cached copy is older than {@link #VALID_CACHE_AGE_MSEC}.
 *
 * @param <K> key type of the routed pairs
 * @param <V> value type of the routed pairs
 */
public class ConsistentHashingRouter<K, V> extends Router<KeyValuePair<K, V>> {
    private static final Logger logger = LoggerFactory.getLogger(ConsistentHashingRouter.class);

    /** Number of positions (virtual nodes) each connection occupies on the ring. */
    private static final int CONNECTION_REPETITION_ON_RING = 1000;

    /** Age in milliseconds after which a cached ring is rebuilt. */
    private static final long VALID_CACHE_AGE_MSEC = 5000;

    private final HashFunction hashFunction;
    private final AtomicReference<SnapshotCache<SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>>>> cachedRingRef =
            new AtomicReference<>();

    /**
     * Creates a router whose name is {@code "ConsistentHashingRouter_" + str}.
     *
     * @param str          suffix appended to the router name
     * @param func1        encoder turning a pair into the bytes written to a connection
     * @param hashFunction hash used to place connection slots on the ring
     */
    public ConsistentHashingRouter(String str, Func1<KeyValuePair<K, V>, byte[]> func1, HashFunction hashFunction) {
        super("ConsistentHashingRouter_" + str, func1);
        this.hashFunction = hashFunction;
    }

    /**
     * Groups the given pairs by their ring-assigned connection and writes each group in one call.
     *
     * <p>A pair is skipped when its target connection has a predicate that rejects it; it is not
     * re-routed to another connection. Nothing happens when either argument is {@code null} or
     * empty.
     *
     * @param set  connections currently available for routing
     * @param list pairs to route
     * @throws IllegalStateException if the ring must be rebuilt and a connection has no slot id
     */
    @Override // io.reactivex.mantis.network.push.Router
    public void route(Set<AsyncConnection<KeyValuePair<K, V>>> set, List<KeyValuePair<K, V>> list) {
        if (set == null || set.isEmpty() || list == null || list.isEmpty()) {
            return;
        }
        int connectionCount = set.size();
        // Initial per-connection buffer size, assuming pairs spread evenly over the connections.
        int bufferCapacity = (list.size() / connectionCount) + 1;
        Map<AsyncConnection<KeyValuePair<K, V>>, List<byte[]>> pendingWrites = new HashMap<>(connectionCount);
        SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>> ring = hashConnections(set);

        for (KeyValuePair<K, V> pair : list) {
            AsyncConnection<KeyValuePair<K, V>> target = lookupConnection(pair.getKeyBytesHashed(), ring);
            Func1<KeyValuePair<K, V>, Boolean> predicate = target.getPredicate();
            if (predicate != null && !predicate.call(pair)) {
                continue;
            }
            List<byte[]> buffer = pendingWrites.get(target);
            if (buffer == null) {
                buffer = new ArrayList<>(bufferCapacity);
                pendingWrites.put(target, buffer);
            }
            buffer.add(this.encoder.call(pair));
        }

        for (Map.Entry<AsyncConnection<KeyValuePair<K, V>>, List<byte[]>> entry : pendingWrites.entrySet()) {
            List<byte[]> payloads = entry.getValue();
            entry.getKey().write(payloads);
            this.numEventsRouted.increment(payloads.size());
        }
    }

    /**
     * Returns the connection owning the first ring position at or after {@code hash}, wrapping
     * to the lowest position when no such position exists.
     *
     * @throws java.util.NoSuchElementException if {@code ring} is empty
     */
    private AsyncConnection<KeyValuePair<K, V>> lookupConnection(
            long hash, SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>> ring) {
        // tailMap is inclusive of its lower bound, so an exact hit on a ring position is covered.
        SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>> atOrAfter = ring.tailMap(hash);
        Long position = atOrAfter.isEmpty() ? ring.firstKey() : atOrAfter.firstKey();
        return ring.get(position);
    }

    /**
     * Builds a fresh ring for the given connections, stores it as the cached snapshot and
     * returns it.
     *
     * @throws IllegalStateException if any connection has a {@code null} slot id
     */
    private SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>> computeRing(
            Set<AsyncConnection<KeyValuePair<K, V>>> connections) {
        SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>> ring = new TreeMap<>();
        for (AsyncConnection<KeyValuePair<K, V>> connection : connections) {
            String slotId = connection.getSlotId();
            if (slotId == null) {
                throw new IllegalStateException("Connection must specify an id for consistent hashing");
            }
            for (int i = 0; i < CONNECTION_REPETITION_ON_RING; i++) {
                // Fixed charset so ring positions do not depend on the JVM's default encoding.
                byte[] virtualNode = (slotId + "-" + i).getBytes(StandardCharsets.UTF_8);
                ring.put(this.hashFunction.computeHash(virtualNode), connection);
            }
        }
        SnapshotCache<SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>>> snapshot = new SnapshotCache<>(ring);
        this.cachedRingRef.set(snapshot);
        return ring;
    }

    /**
     * Returns the ring to use for the given connections, rebuilding it when there is no cached
     * ring, when the cached ring's size does not match the connection count, or when the cached
     * snapshot is older than {@link #VALID_CACHE_AGE_MSEC}.
     */
    private SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>> hashConnections(
            Set<AsyncConnection<KeyValuePair<K, V>>> connections) {
        SnapshotCache<SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>>> snapshot = this.cachedRingRef.get();
        if (snapshot == null) {
            logger.info("Recomputing ring due null reference");
            return computeRing(connections);
        }

        SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>> cachedRing = snapshot.getCache();
        // NOTE(review): two virtual nodes hashing to the same value overwrite each other in the
        // ring, leaving it smaller than expected; this check then rebuilds the ring on every call.
        long expectedSize = (long) connections.size() * CONNECTION_REPETITION_ON_RING;
        if (cachedRing.size() != expectedSize) {
            logger.info("Recomputing ring due to difference in number of connections versus cache");
            return computeRing(connections);
        }

        // Same size: still refresh periodically, since membership may have changed without the
        // count changing. (Assumes getTimestamp() is the snapshot's creation time - TODO confirm.)
        if (System.currentTimeMillis() - snapshot.getTimestamp() > VALID_CACHE_AGE_MSEC) {
            return computeRing(connections);
        }
        return cachedRing;
    }
}
