/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.dyno.queues.redis;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Uninterruptibles;
import com.netflix.dyno.jedis.DynoJedisClient;
import com.netflix.dyno.queues.DynoQueue;
import com.netflix.dyno.queues.Message;
import com.netflix.dyno.queues.redis.QueueMonitor;
import com.netflix.dyno.queues.redis.QueueUtils;
import com.netflix.dyno.queues.redis.sharding.ShardingStrategy;
import com.netflix.servo.monitor.Stopwatch;
import java.io.IOException;
import java.time.Clock;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Tuple;
import redis.clients.jedis.commands.JedisCommands;
import redis.clients.jedis.params.ZAddParams;

/**
 * Redis-backed implementation of {@link DynoQueue}.
 *
 * <p>Messages are spread over a set of shards. For every shard there is a sorted set of
 * queued message ids ({@code <prefix>.QUEUE.<queue>.<shard>}) and a sorted set of popped but
 * not yet acknowledged ids ({@code <prefix>.UNACK.<queue>.<shard>}); message payloads live in
 * a single hash ({@code <prefix>.MESSAGE.<queue>}). A scheduled task periodically moves
 * expired entries of this instance's unack set back to its queue shard.
 */
public class RedisDynoQueue
implements DynoQueue {
    private final Logger logger = LoggerFactory.getLogger(RedisDynoQueue.class);
    // Source of "now" for all score computations; injectable for tests.
    private final Clock clock;
    private final String queueName;
    // Names of all shards this queue is spread over.
    private final List<String> allShards;
    // Name of the shard this instance treats as local.
    private final String shardName;
    private final String redisKeyPrefix;
    // Redis hash key holding the JSON payload of every message, keyed by message id.
    private final String messageStoreKey;
    // Redis sorted-set key of the local queue shard.
    private final String localQueueShard;
    // Added to clock.millis() to compute the unack score, i.e. expressed in milliseconds.
    private volatile int unackTime = 60;
    private final QueueMonitor monitor;
    private final ObjectMapper om;
    // Connection used for writes and for reads that feed a write decision.
    private volatile JedisCommands quorumConn;
    // Connection used for peeks, size queries and id prefetching.
    private volatile JedisCommands nonQuorumConn;
    // Ids prefetched from the local shard, consumed by pop().
    private final ConcurrentLinkedQueue<String> prefetchedIds;
    // Ids prefetched per queue shard key, consumed by unsafePopAllShards().
    private final Map<String, ConcurrentLinkedQueue<String>> unsafePrefetchedIdsAllShardsMap;
    // Single-threaded scheduler that periodically runs processUnacks().
    private final ScheduledExecutorService schedulerForUnacksProcessing;
    // NOTE(review): not referenced anywhere in this class as visible here.
    private final int retryCount = 2;
    private final ShardingStrategy shardingStrategy;
    // Number of ids pop() still wants prefetched from the local shard.
    @VisibleForTesting
    AtomicInteger numIdsToPrefetch;
    // Number of ids unsafePopAllShards() still wants prefetched across shards.
    @VisibleForTesting
    AtomicInteger unsafeNumIdsToPrefetchAllShards;

    /**
     * Creates a queue that runs unack processing every 60 seconds and uses the system clock.
     *
     * @param redisKeyPrefix prefix of every Redis key used by this queue
     * @param queueName name of the queue
     * @param allShards names of all shards of the queue
     * @param shardName name of the shard local to this instance
     * @param shardingStrategy strategy that picks the shard for each pushed message
     */
    public RedisDynoQueue(String redisKeyPrefix, String queueName, Set<String> allShards, String shardName, ShardingStrategy shardingStrategy) {
        this(redisKeyPrefix, queueName, allShards, shardName, 60_000, shardingStrategy);
    }

    /**
     * Creates a queue that uses {@link Clock#systemDefaultZone()} as its time source.
     *
     * @param redisKeyPrefix prefix of every Redis key used by this queue
     * @param queueName name of the queue
     * @param allShards names of all shards of the queue
     * @param shardName name of the shard local to this instance
     * @param unackScheduleInMS initial delay and period, in milliseconds, of unack processing
     * @param shardingStrategy strategy that picks the shard for each pushed message
     */
    public RedisDynoQueue(String redisKeyPrefix, String queueName, Set<String> allShards, String shardName, int unackScheduleInMS, ShardingStrategy shardingStrategy) {
        this(Clock.systemDefaultZone(), redisKeyPrefix, queueName, allShards, shardName, unackScheduleInMS, shardingStrategy);
    }

    public RedisDynoQueue(Clock clock, String redisKeyPrefix, String queueName, Set<String> allShards, String shardName, int unackScheduleInMS, ShardingStrategy shardingStrategy) {
        this.clock = clock;
        this.redisKeyPrefix = redisKeyPrefix;
        this.queueName = queueName;
        this.allShards = ImmutableList.copyOf((Collection)allShards.stream().collect(Collectors.toList()));
        this.shardName = shardName;
        this.messageStoreKey = redisKeyPrefix + ".MESSAGE." + queueName;
        this.localQueueShard = this.getQueueShardKey(queueName, shardName);
        this.shardingStrategy = shardingStrategy;
        this.numIdsToPrefetch = new AtomicInteger(0);
        this.unsafeNumIdsToPrefetchAllShards = new AtomicInteger(0);
        this.om = QueueUtils.constructObjectMapper();
        this.monitor = new QueueMonitor(queueName, shardName);
        this.prefetchedIds = new ConcurrentLinkedQueue();
        this.unsafePrefetchedIdsAllShardsMap = new HashMap<String, ConcurrentLinkedQueue<String>>();
        for (String shard : allShards) {
            this.unsafePrefetchedIdsAllShardsMap.put(this.getQueueShardKey(queueName, shard), new ConcurrentLinkedQueue());
        }
        this.schedulerForUnacksProcessing = Executors.newScheduledThreadPool(1);
        this.schedulerForUnacksProcessing.scheduleAtFixedRate(() -> this.processUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
        this.logger.info(RedisDynoQueue.class.getName() + " is ready to serve " + queueName);
    }

    /**
     * Sets the connection used for writes and for reads that feed a write decision.
     *
     * @param quorumConn the connection to use
     * @return this queue, for chaining
     */
    public RedisDynoQueue withQuorumConn(JedisCommands quorumConn) {
        this.quorumConn = quorumConn;
        return this;
    }

    /**
     * Sets the connection used for peeks, size queries and id prefetching.
     *
     * @param nonQuorumConn the connection to use
     * @return this queue, for chaining
     */
    public RedisDynoQueue withNonQuorumConn(JedisCommands nonQuorumConn) {
        this.nonQuorumConn = nonQuorumConn;
        return this;
    }

    /**
     * Sets how long a popped message may stay unacknowledged before it becomes eligible to be
     * moved back to the queue. The value is added to {@code clock.millis()} when a message is
     * popped, so it is interpreted as milliseconds.
     *
     * @param unackTime the unack time
     * @return this queue, for chaining
     */
    public RedisDynoQueue withUnackTime(int unackTime) {
        this.unackTime = unackTime;
        return this;
    }

    /**
     * Returns the total number of ids currently buffered across all per-shard prefetch queues.
     */
    private int unsafeGetNumPrefetchedIds() {
        int total = 0;
        for (ConcurrentLinkedQueue<String> buffered : unsafePrefetchedIdsAllShardsMap.values()) {
            total += buffered.size();
        }
        return total;
    }

    /** Returns the name of this queue. */
    public String getName() {
        return queueName;
    }

    /** Returns the currently configured unack time. */
    public int getUnackTime() {
        return unackTime;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stores each message's JSON payload and enqueues its id on the shard chosen by the
     * sharding strategy.
     *
     * <p>The sorted-set score is {@code now + timeout} (milliseconds) plus
     * {@code priority / 100.0}.
     *
     * @param messages messages to enqueue
     * @return the ids of the given messages, in the same order
     */
    public List<String> push(List<Message> messages) {
        Stopwatch timer = monitor.start(monitor.push, messages.size());
        try {
            QueueUtils.execute("push", "(a shard in) " + queueName, () -> {
                for (Message msg : messages) {
                    String payload = om.writeValueAsString(msg);
                    quorumConn.hset(messageStoreKey, msg.getId(), payload);

                    double priorityFraction = (double) msg.getPriority() / 100.0;
                    double dueAt = (double) (clock.millis() + msg.getTimeout());
                    String targetShard = shardingStrategy.getNextShard(allShards, msg);
                    String targetKey = getQueueShardKey(queueName, targetShard);
                    quorumConn.zadd(targetKey, dueAt + priorityFraction, msg.getId());
                }
                return messages;
            });
            return messages.stream().map(Message::getId).collect(Collectors.toList());
        } finally {
            timer.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns up to {@code messageCount} messages that are due on the local shard without
     * removing them.
     *
     * @param messageCount maximum number of messages to return
     * @return the peeked messages; empty when the id lookup yields {@code null}
     */
    public List<Message> peek(int messageCount) {
        Stopwatch timer = monitor.peek.start();
        try {
            Set<String> dueIds = peekIds(0, messageCount);
            return dueIds == null ? Collections.<Message>emptyList() : doPeekBodyHelper(dueIds);
        } finally {
            timer.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns messages that are due on any shard (local shard first) without removing them.
     *
     * @param messageCount number of messages requested
     * @return the peeked messages; empty when the id lookup yields {@code null}
     */
    public List<Message> unsafePeekAllShards(int messageCount) {
        Stopwatch timer = monitor.peek.start();
        try {
            Set<String> dueIds = peekIdsAllShards(0, messageCount);
            return dueIds == null ? Collections.<Message>emptyList() : doPeekBodyHelper(dueIds);
        } finally {
            timer.stop();
        }
    }

    /**
     * Reads ids from the local shard whose score is at most {@code peekTillTs}.
     *
     * @param offset number of leading entries to skip
     * @param count maximum number of ids to read
     * @param peekTillTs upper score bound; {@code 0.0} means "now + 1 ms"
     * @return the matching ids
     */
    private Set<String> peekIds(int offset, int count, double peekTillTs) {
        return QueueUtils.execute("peekIds", localQueueShard, () -> {
            double upperBound = peekTillTs;
            if (upperBound == 0.0) {
                upperBound = (double) (clock.millis() + 1L);
            }
            return doPeekIdsFromShardHelper(localQueueShard, upperBound, offset, count);
        });
    }

    /**
     * Reads ids that are due now from the local shard.
     *
     * @param offset number of leading entries to skip
     * @param count maximum number of ids to read
     * @return the matching ids
     */
    private Set<String> peekIds(int offset, int count) {
        // A bound of 0.0 tells the three-argument overload to use the current time.
        return peekIds(offset, count, 0.0);
    }

    /**
     * Reads up to {@code count} due ids, taking them from the local shard first and then from
     * the remaining shards until the request is satisfied.
     *
     * <p>Each remote shard is only asked for the number of ids still missing, so the result
     * never holds more than {@code count} ids, and no remote shard is queried once the request
     * has been satisfied.
     *
     * @param offset number of leading entries to skip on each shard
     * @param count maximum number of ids to return
     * @return the collected ids (unordered)
     */
    private Set<String> peekIdsAllShards(int offset, int count) {
        return QueueUtils.execute("peekIdsAllShards", this.localQueueShard, () -> {
            Set<String> scanned = new HashSet<>();
            // One shared upper bound so every shard is evaluated against the same instant.
            double now = Long.valueOf(this.clock.millis() + 1L).doubleValue();
            scanned.addAll(this.peekIds(offset, count, now));
            for (String shard : this.allShards) {
                int remaining = count - scanned.size();
                if (remaining <= 0) {
                    break;
                }
                String queueShardName = this.getQueueShardKey(this.queueName, shard);
                // The local shard was already processed above.
                if (queueShardName.equals(this.localQueueShard)) {
                    continue;
                }
                scanned.addAll(this.doPeekIdsFromShardHelper(queueShardName, now, offset, remaining));
            }
            return scanned;
        });
    }

    /**
     * Reads ids from one queue shard whose score lies in {@code [0, peekTillTs]}.
     *
     * @param queueShardName Redis key of the queue shard
     * @param peekTillTs upper score bound
     * @param offset number of leading entries to skip
     * @param count maximum number of ids to read
     * @return the matching ids
     */
    private Set<String> doPeekIdsFromShardHelper(String queueShardName, double peekTillTs, int offset, int count) {
        Set<String> dueIds = nonQuorumConn.zrangeByScore(queueShardName, 0.0, peekTillTs, offset, count);
        return dueIds;
    }

    /**
     * Loads and deserializes the payloads of the given message ids.
     *
     * <p>An id whose payload is no longer in the message store (for example because the
     * message was acknowledged or removed between the id scan and this read) is skipped
     * instead of failing the whole peek, consistent with how {@code _pop} and {@code get}
     * treat missing payloads.
     *
     * @param message_ids ids to load
     * @return the messages whose payload could be read, in iteration order of the ids
     */
    private List<Message> doPeekBodyHelper(Set<String> message_ids) {
        return QueueUtils.execute("peek", this.messageStoreKey, () -> {
            List<Message> messages = new LinkedList<>();
            for (String id : message_ids) {
                String json = this.nonQuorumConn.hget(this.messageStoreKey, id);
                if (json == null) {
                    this.logger.debug("Cannot get the message payload for {}", id);
                    continue;
                }
                messages.add(this.om.readValue(json, Message.class));
            }
            return messages;
        });
    }

    /**
     * Pops up to {@code messageCount} due messages from the local shard, polling every 200 ms
     * until enough ids have been prefetched or the wait time has elapsed.
     *
     * @param messageCount maximum number of messages to pop; values below 1 return an empty list
     * @param wait maximum time to wait for enough messages
     * @param unit unit of {@code wait}
     * @return the popped messages, possibly fewer than requested
     * @throws RuntimeException wrapping any exception raised while popping
     */
    public List<Message> pop(int messageCount, int wait, TimeUnit unit) {
        if (messageCount < 1) {
            return Collections.emptyList();
        }
        Stopwatch timer = monitor.start(monitor.pop, messageCount);
        try {
            long startedAt = clock.millis();
            long maxWaitMs = unit.toMillis(wait);
            numIdsToPrefetch.addAndGet(messageCount);
            while (true) {
                prefetchIds();
                boolean enoughIds = prefetchedIds.size() >= messageCount;
                if (enoughIds || clock.millis() - startedAt >= maxWaitMs) {
                    break;
                }
                Uninterruptibles.sleepUninterruptibly(200L, TimeUnit.MILLISECONDS);
            }
            return _pop(shardName, messageCount, prefetchedIds);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            timer.stop();
        }
    }

    /**
     * Pops one specific message from the local shard, moving its id to the local unack set.
     *
     * <p>A miss is recorded and {@code null} is returned when the id is not in the local queue
     * shard, its payload is missing, it is already in the unack set, or it could not be
     * removed from the queue shard.
     *
     * @param messageId id of the message to pop
     * @return the popped message, or {@code null} on a miss
     */
    public Message popWithMsgId(String messageId) {
        return QueueUtils.execute("popWithMsgId", this.localQueueShard, () -> {
            double unackScore = Long.valueOf(this.clock.millis() + (long)this.unackTime).doubleValue();
            String unackShardName = this.getUnackKey(this.queueName, this.shardName);
            ZAddParams zParams = ZAddParams.zAddParams().nx();

            // ZRANK yields null for a member that is not in the set; test for that explicitly
            // rather than relying on a NullPointerException from unboxing.
            Long rank = this.nonQuorumConn.zrank(this.localQueueShard, messageId);
            if (rank == null) {
                this.logger.debug("Cannot find {} in the queue shard {}", messageId, this.localQueueShard);
                this.monitor.misses.increment();
                return null;
            }

            String json = this.quorumConn.hget(this.messageStoreKey, messageId);
            if (json == null) {
                this.logger.debug("Cannot get the message payload for {}", messageId);
                this.monitor.misses.increment();
                return null;
            }

            // NX: only succeeds when the id is not already in the unack set.
            long added = this.quorumConn.zadd(unackShardName, unackScore, messageId, zParams);
            if (added == 0L) {
                this.logger.debug("cannot add {} to the unack shard {}", messageId, unackShardName);
                this.monitor.misses.increment();
                return null;
            }

            long removed = this.quorumConn.zrem(this.localQueueShard, new String[]{messageId});
            if (removed == 0L) {
                this.logger.debug("cannot remove {} from the queue shard {}", messageId, this.localQueueShard);
                this.monitor.misses.increment();
                return null;
            }

            return (Message)this.om.readValue(json, Message.class);
        });
    }

    /**
     * Pops up to {@code messageCount} due messages, drawing first from the local prefetch
     * buffer and then from the per-shard prefetch buffers of every shard.
     *
     * <p>Polls every 200 ms until enough ids have been prefetched across all shards or the
     * wait time has elapsed.
     *
     * @param messageCount maximum number of messages to pop; values below 1 return an empty list
     * @param wait maximum time to wait for enough messages
     * @param unit unit of {@code wait}
     * @return the popped messages, possibly fewer than requested
     * @throws RuntimeException wrapping any exception raised while popping
     */
    public List<Message> unsafePopAllShards(int messageCount, int wait, TimeUnit unit) {
        if (messageCount < 1) {
            return Collections.emptyList();
        }
        Stopwatch timer = monitor.start(monitor.pop, messageCount);
        try {
            long startedAt = clock.millis();
            long maxWaitMs = unit.toMillis(wait);
            unsafeNumIdsToPrefetchAllShards.addAndGet(messageCount);
            while (true) {
                prefetchIdsAllShards();
                boolean enoughIds = unsafeGetNumPrefetchedIds() >= messageCount;
                if (enoughIds || clock.millis() - startedAt >= maxWaitMs) {
                    break;
                }
                Uninterruptibles.sleepUninterruptibly(200L, TimeUnit.MILLISECONDS);
            }

            // Start with the ids buffered for the regular local pop path.
            List<Message> popped = _pop(shardName, messageCount, prefetchedIds);
            int stillNeeded = messageCount - popped.size();

            for (String shard : allShards) {
                ConcurrentLinkedQueue<String> shardIds = unsafePrefetchedIdsAllShardsMap.get(getQueueShardKey(queueName, shard));
                List<Message> fromShard = _pop(shard, stillNeeded, shardIds);
                popped.addAll(fromShard);
                stillNeeded -= fromShard.size();
            }
            return popped;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            timer.stop();
        }
    }

    /**
     * Prefetches due ids from the local shard into the local buffer. When nothing was
     * fetched, the outstanding prefetch demand is reset to zero.
     */
    private void prefetchIds() {
        double now = (double) (clock.millis() + 1L);
        boolean nothingFetched = doPrefetchIdsHelper(localQueueShard, numIdsToPrefetch, prefetchedIds, now) == 0;
        if (nothingFetched) {
            numIdsToPrefetch.set(0);
        }
    }

    /**
     * Prefetches due ids into the per-shard buffers: the local shard first, then every other
     * shard as long as there is still outstanding demand after the local fetch.
     */
    private void prefetchIdsAllShards() {
        double now = (double) (clock.millis() + 1L);
        AtomicInteger demand = unsafeNumIdsToPrefetchAllShards;

        doPrefetchIdsHelper(localQueueShard, demand, unsafePrefetchedIdsAllShardsMap.get(localQueueShard), now);
        if (demand.get() >= 1) {
            for (String shard : allShards) {
                String shardKey = getQueueShardKey(queueName, shard);
                if (!shardKey.equals(localQueueShard)) {
                    doPrefetchIdsHelper(shardKey, demand, unsafePrefetchedIdsAllShardsMap.get(shardKey), now);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads up to {@code prefetchCounter} due ids from one queue shard, appends them to the
     * given buffer and lowers the counter by the number of ids read (never below zero).
     *
     * @param queueShardName Redis key of the queue shard to read from
     * @param prefetchCounter outstanding prefetch demand; decremented by the number of ids read
     * @param prefetchedIdQueue buffer receiving the ids
     * @param prefetchFromTs upper score bound for ids to read
     * @return the number of ids read, or 0 when there was no demand
     */
    private int doPrefetchIdsHelper(String queueShardName, AtomicInteger prefetchCounter, ConcurrentLinkedQueue<String> prefetchedIdQueue, double prefetchFromTs) {
        // No outstanding demand: skip the Redis round trip.
        if (prefetchCounter.get() < 1) {
            return 0;
        }
        int numSuccessfullyPrefetched = 0;
        int numToPrefetch = prefetchCounter.get();
        Stopwatch sw = this.monitor.start(this.monitor.prefetch, numToPrefetch);
        try {
            Set<String> ids = this.doPeekIdsFromShardHelper(queueShardName, prefetchFromTs, 0, numToPrefetch);
            prefetchedIdQueue.addAll(ids);
            numSuccessfullyPrefetched = ids.size();
            prefetchCounter.addAndGet(-1 * ids.size());
            // Clamp at zero. NOTE(review): the decrement and the clamp are separate
            // operations on the counter, not one atomic update.
            if (prefetchCounter.get() < 0) {
                prefetchCounter.set(0);
            }
        }
        finally {
            sw.stop();
        }
        return numSuccessfullyPrefetched;
    }

    /**
     * Drains ids from the given prefetch buffer and, for each one, moves it from the shard's
     * queue set to the shard's unack set and loads its payload, until {@code messageCount}
     * messages have been collected or the buffer is empty.
     *
     * <p>An id is counted as a miss and skipped when it is already in the unack set, is no
     * longer in the queue set, or has no payload.
     *
     * @param shard name of the shard the ids belong to
     * @param messageCount maximum number of messages to pop
     * @param prefetchedIdQueue buffer of candidate ids for that shard
     * @return the popped messages
     * @throws Exception if a Redis call or payload deserialization fails
     */
    private List<Message> _pop(String shard, int messageCount, ConcurrentLinkedQueue<String> prefetchedIdQueue) throws Exception {
        String msgId;
        String queueShardName = this.getQueueShardKey(this.queueName, shard);
        String unackShardName = this.getUnackKey(this.queueName, shard);
        // All messages popped by this call share the same unack deadline.
        double unackScore = Long.valueOf(this.clock.millis() + (long)this.unackTime).doubleValue();
        // NX: the add only succeeds when the id is not already in the unack set.
        ZAddParams zParams = ZAddParams.zAddParams().nx();
        LinkedList<Message> popped = new LinkedList<Message>();
        while (popped.size() != messageCount && (msgId = prefetchedIdQueue.poll()) != null) {
            long added = this.quorumConn.zadd(unackShardName, unackScore, msgId, zParams);
            if (added == 0L) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("cannot add {} to the unack shard {}", (Object)msgId, (Object)unackShardName);
                }
                this.monitor.misses.increment();
                continue;
            }
            long removed = this.quorumConn.zrem(queueShardName, new String[]{msgId});
            if (removed == 0L) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("cannot remove {} from the queue shard {}", (Object)msgId, (Object)queueShardName);
                }
                this.monitor.misses.increment();
                continue;
            }
            String json = this.quorumConn.hget(this.messageStoreKey, msgId);
            if (json == null) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Cannot get the message payload for {}", (Object)msgId);
                }
                this.monitor.misses.increment();
                continue;
            }
            Message msg = (Message)this.om.readValue(json, Message.class);
            popped.add(msg);
            // Redundant with the loop condition: stop as soon as the request is satisfied.
            if (popped.size() != messageCount) continue;
            return popped;
        }
        return popped;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Acknowledges a message: removes its id from the first shard's unack set that contains
     * it and deletes its payload.
     *
     * @param messageId id of the message to acknowledge
     * @return {@code true} if the id was found in an unack set, {@code false} otherwise
     */
    public boolean ack(String messageId) {
        Stopwatch timer = monitor.ack.start();
        try {
            return QueueUtils.execute("ack", "(a shard in) " + queueName, () -> {
                for (String shard : allShards) {
                    Long removedCount = quorumConn.zrem(getUnackKey(queueName, shard), messageId);
                    if (removedCount > 0L) {
                        quorumConn.hdel(messageStoreKey, messageId);
                        return true;
                    }
                }
                return false;
            });
        } finally {
            timer.stop();
        }
    }

    /**
     * Acknowledges every message in the list, one at a time.
     *
     * @param messages messages to acknowledge
     */
    public void ack(List<Message> messages) {
        messages.forEach(msg -> ack(msg.getId()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Resets the unack deadline of a popped message to {@code now + timeout}.
     *
     * @param messageId id of the message
     * @param timeout new timeout, added to the current time in milliseconds
     * @return {@code true} if the id was found in an unack set, {@code false} otherwise
     */
    public boolean setUnackTimeout(String messageId, long timeout) {
        Stopwatch timer = monitor.ack.start();
        try {
            return QueueUtils.execute("setUnackTimeout", "(a shard in) " + queueName, () -> {
                double newDeadline = (double) (clock.millis() + timeout);
                for (String shard : allShards) {
                    String unackKey = getUnackKey(queueName, shard);
                    boolean present = quorumConn.zscore(unackKey, messageId) != null;
                    if (present) {
                        quorumConn.zadd(unackKey, newDeadline, messageId);
                        return true;
                    }
                }
                return false;
            });
        } finally {
            timer.stop();
        }
    }

    /**
     * Changes the timeout of a queued message: rescores its id on the shard that holds it and
     * rewrites its stored payload with the new timeout.
     *
     * <p>The new score is {@code now + timeout} (milliseconds) plus {@code priority / 100.0},
     * the same formula {@code push} uses.
     *
     * @param messageId id of the message
     * @param timeout new timeout, added to the current time in milliseconds
     * @return {@code true} if the message was found on a queue shard and updated,
     *         {@code false} if its payload or id could not be found
     */
    public boolean setTimeout(String messageId, long timeout) {
        return QueueUtils.execute("setTimeout", "(a shard in) " + this.queueName, () -> {
            String json = this.nonQuorumConn.hget(this.messageStoreKey, messageId);
            if (json == null) {
                return false;
            }
            Message message = (Message)this.om.readValue(json, Message.class);
            message.setTimeout(timeout);
            for (String shard : this.allShards) {
                String queueShard = this.getQueueShardKey(this.queueName, shard);
                Double score = this.quorumConn.zscore(queueShard, messageId);
                if (score == null) continue;
                // Floating-point division: integer division would discard the priority
                // fraction and make the score disagree with the one push() computes.
                double priorityd = (double)message.getPriority() / 100.0;
                double newScore = Long.valueOf(this.clock.millis() + timeout).doubleValue() + priorityd;
                // XX: only update an existing member, never re-create a popped one.
                ZAddParams params = ZAddParams.zAddParams().xx();
                this.quorumConn.zadd(queueShard, newScore, messageId, params);
                json = this.om.writeValueAsString((Object)message);
                this.quorumConn.hset(this.messageStoreKey, message.getId(), json);
                return true;
            }
            return false;
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes a message: for each shard in turn, drops the id from the unack set and from the
     * queue set; once a queue set actually held the id, the payload is deleted and the scan
     * stops.
     *
     * @param messageId id of the message to remove
     * @return {@code true} if the id was removed from a queue shard, {@code false} otherwise
     */
    public boolean remove(String messageId) {
        Stopwatch timer = monitor.remove.start();
        try {
            return QueueUtils.execute("remove", "(a shard in) " + queueName, () -> {
                for (String shard : allShards) {
                    quorumConn.zrem(getUnackKey(queueName, shard), messageId);
                    Long removedFromQueue = quorumConn.zrem(getQueueShardKey(queueName, shard), messageId);
                    if (removedFromQueue > 0L) {
                        quorumConn.hdel(messageStoreKey, messageId);
                        return true;
                    }
                }
                return false;
            });
        } finally {
            timer.stop();
        }
    }

    /**
     * Pushes the message unless its id is already present in the queue set or unack set of
     * any shard.
     *
     * @param message message to enqueue if absent
     * @return {@code true} if the message was pushed, {@code false} if it was already present
     */
    public boolean ensure(Message message) {
        return QueueUtils.execute("ensure", "(a shard in) " + queueName, () -> {
            String id = message.getId();
            for (String shard : allShards) {
                boolean queued = quorumConn.zscore(getQueueShardKey(queueName, shard), id) != null;
                if (queued) {
                    return false;
                }
                boolean unacked = quorumConn.zscore(getUnackKey(queueName, shard), id) != null;
                if (unacked) {
                    return false;
                }
            }
            push(Collections.singletonList(message));
            return true;
        });
    }

    /**
     * Tells whether any stored message payload matches the given predicate.
     *
     * @param predicate pattern handed to {@link #getMsgWithPredicate(String)}
     * @return {@code true} if a matching message id was found
     */
    public boolean containsPredicate(String predicate) {
        return QueueUtils.execute("containsPredicate", messageStoreKey, () -> {
            String matchingId = getMsgWithPredicate(predicate);
            return matchingId != null;
        });
    }

    /**
     * Finds the id of a stored message whose payload matches the given predicate.
     *
     * <p>The search runs server-side as a Lua script that walks the message-store hash with
     * {@code HSCAN} and applies Lua's {@code string.match(content, predicate)} to each value,
     * returning the field (message id) of the first match.
     *
     * @param predicate Lua pattern to match against each payload
     * @return the id of a matching message, or {@code null} if none matches
     * @throws ClassCastException if the non-quorum connection is not a {@code DynoJedisClient}
     */
    public String getMsgWithPredicate(String predicate) {
        return QueueUtils.execute("getMsgWithPredicate", this.messageStoreKey, () -> {
            // HSCAN replies alternate field and value: odd positions hold the message id,
            // even positions hold the payload tested against the predicate.
            String predicateCheckLuaScript = "local hkey=KEYS[1]\nlocal predicate=ARGV[1]\nlocal cursor=0\nlocal begin=false\nwhile (cursor ~= 0 or begin==false) do\n  local ret = redis.call('hscan', hkey, cursor)\n  local curmsgid\n  for i, content in ipairs(ret[2]) do\n    if (i % 2 ~= 0) then\n      curmsgid = content\n    elseif (string.match(content, predicate)) then\n      return curmsgid\n    end\n  end\n  cursor=tonumber(ret[1])\n  begin=true\nend\nreturn nil";
            // eval is invoked through DynoJedisClient, hence the cast.
            String retval = (String)((DynoJedisClient)this.nonQuorumConn).eval(predicateCheckLuaScript, Collections.singletonList(this.messageStoreKey), Collections.singletonList(predicate));
            return retval;
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Loads a message by id from the message store without changing its queue state.
     *
     * @param messageId id of the message
     * @return the message, or {@code null} if no payload is stored under that id
     */
    public Message get(String messageId) {
        Stopwatch timer = monitor.get.start();
        try {
            return QueueUtils.execute("get", messageStoreKey, () -> {
                String payload = quorumConn.hget(messageStoreKey, messageId);
                if (payload != null) {
                    return om.readValue(payload, Message.class);
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Cannot get the message payload " + messageId);
                }
                return null;
            });
        } finally {
            timer.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the total number of queued (not popped) message ids across all shards.
     */
    public long size() {
        Stopwatch timer = monitor.size.start();
        try {
            return QueueUtils.execute("size", "(a shard in) " + queueName, () ->
                    allShards.stream()
                            .mapToLong(shard -> nonQuorumConn.zcard(getQueueShardKey(queueName, shard)))
                            .sum());
        } finally {
            timer.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reports, per shard, the number of queued ids ({@code "size"}) and the number of
     * unacknowledged ids ({@code "uacked"}).
     *
     * @return a map from shard name to its two counters
     */
    public Map<String, Map<String, Long>> shardSizes() {
        Stopwatch timer = monitor.size.start();
        Map<String, Map<String, Long>> sizesByShard = new HashMap<>();
        try {
            return QueueUtils.execute("shardSizes", "(a shard in) " + queueName, () -> {
                for (String shard : allShards) {
                    long queued = nonQuorumConn.zcard(getQueueShardKey(queueName, shard));
                    long unacked = nonQuorumConn.zcard(getUnackKey(queueName, shard));

                    Map<String, Long> counters = new HashMap<>();
                    counters.put("size", queued);
                    counters.put("uacked", unacked);
                    sizesByShard.put(shard, counters);
                }
                return sizesByShard;
            });
        } finally {
            timer.stop();
        }
    }

    /**
     * Deletes the queue set and unack set of every shard, then the message store.
     */
    public void clear() {
        QueueUtils.execute("clear", "(a shard in) " + queueName, () -> {
            for (String shard : allShards) {
                String queueKey = getQueueShardKey(queueName, shard);
                String unackKey = getUnackKey(queueName, shard);
                quorumConn.del(queueKey);
                quorumConn.del(unackKey);
            }
            quorumConn.del(messageStoreKey);
            return null;
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records the current queue depth, then moves expired entries of the local unack set back
     * to the local queue shard.
     *
     * <p>Up to 1000 unack entries whose score is at most "now" are examined per run. An entry
     * whose payload no longer exists is dropped as stale; every other entry is re-added to the
     * local queue shard with its unack score and removed from the unack set.
     *
     * <p>Failures are logged (with stack trace) and swallowed so that a scheduled run never
     * propagates an exception.
     */
    public void processUnacks() {
        Stopwatch sw = this.monitor.processUnack.start();
        try {
            long queueDepth = this.size();
            this.monitor.queueDepth.record(queueDepth);
            String unackShardName = this.getUnackKey(this.queueName, this.shardName);
            QueueUtils.execute("processUnacks", unackShardName, () -> {
                int batchSize = 1000;
                double now = Long.valueOf(this.clock.millis()).doubleValue();
                int num_moved_back = 0;
                int num_stale = 0;
                Set<Tuple> unacks = this.nonQuorumConn.zrangeByScoreWithScores(unackShardName, 0.0, now, 0, batchSize);
                if (!unacks.isEmpty()) {
                    this.logger.info("processUnacks: Adding " + unacks.size() + " messages back to shard of queue: " + unackShardName);
                }
                for (Tuple unack : unacks) {
                    double score = unack.getScore();
                    String member = unack.getElement();
                    String payload = this.quorumConn.hget(this.messageStoreKey, member);
                    if (payload == null) {
                        // The message no longer exists; just drop the dangling unack entry.
                        this.quorumConn.zrem(unackShardName, new String[]{member});
                        ++num_stale;
                        continue;
                    }
                    // Re-queue first, then remove from unack, so the id is never in neither set.
                    long added_back = this.quorumConn.zadd(this.localQueueShard, score, member);
                    long removed_from_unack = this.quorumConn.zrem(unackShardName, new String[]{member});
                    if (added_back > 0L && removed_from_unack > 0L) {
                        ++num_moved_back;
                    }
                }
                this.logger.info("processUnacks: Moved back " + num_moved_back + " items. Got rid of " + num_stale + " stale items.");
                return null;
            });
        }
        catch (Exception e) {
            // Pass the exception itself so the stack trace is not lost.
            this.logger.error("Error while processing unacks. " + e.getMessage(), e);
        }
        finally {
            sw.stop();
        }
    }

    /**
     * Builds the Redis key of a shard's queue set: {@code <prefix>.QUEUE.<queueName>.<shard>}.
     */
    private String getQueueShardKey(String queueName, String shard) {
        return new StringBuilder()
                .append(redisKeyPrefix)
                .append(".QUEUE.")
                .append(queueName)
                .append(".")
                .append(shard)
                .toString();
    }

    /**
     * Builds the Redis key of a shard's unack set: {@code <prefix>.UNACK.<queueName>.<shard>}.
     */
    private String getUnackKey(String queueName, String shard) {
        return new StringBuilder()
                .append(redisKeyPrefix)
                .append(".UNACK.")
                .append(queueName)
                .append(".")
                .append(shard)
                .toString();
    }

    /**
     * Stops scheduling further unack processing and closes the queue monitor.
     *
     * @throws IOException declared for the close contract
     */
    public void close() throws IOException {
        schedulerForUnacksProcessing.shutdown();
        monitor.close();
    }
}

