/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.dyno.queues.redis;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Uninterruptibles;
import com.netflix.dyno.connectionpool.exception.DynoException;
import com.netflix.dyno.queues.DynoQueue;
import com.netflix.dyno.queues.Message;
import com.netflix.dyno.queues.redis.QueueMonitor;
import com.netflix.servo.monitor.Stopwatch;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCommands;
import redis.clients.jedis.Tuple;
import redis.clients.jedis.params.sortedset.ZAddParams;

public class RedisDynoQueue
implements DynoQueue {
    private final Logger logger = LoggerFactory.getLogger(RedisDynoQueue.class);
    private String queueName;
    private List<String> allShards;
    private String shardName;
    private String redisKeyPrefix;
    private String messageStoreKey;
    private String myQueueShard;
    private int unackTime = 60;
    private QueueMonitor monitor;
    private ObjectMapper om;
    private JedisCommands quorumConn;
    private JedisCommands nonQuorumConn;
    private ConcurrentLinkedQueue<String> prefetchedIds;
    private ScheduledExecutorService schedulerForUnacksProcessing;
    private ScheduledExecutorService schedulerForPrefetchProcessing;
    private int retryCount = 2;
    @VisibleForTesting
    AtomicInteger prefetch = new AtomicInteger(0);
    private AtomicInteger nextShardIndex = new AtomicInteger(0);

    /**
     * Creates a queue whose unack processing is scheduled every 60,000 milliseconds.
     *
     * @param redisKeyPrefix prefix used for every Redis key built by this queue
     * @param queueName name of the queue
     * @param allShards names of all shards the queue is spread over
     * @param shardName name of the shard this instance reads from
     */
    public RedisDynoQueue(String redisKeyPrefix, String queueName, Set<String> allShards, String shardName) {
        this(redisKeyPrefix, queueName, allShards, shardName, 60_000);
    }

    /**
     * Creates a queue and schedules the periodic task that moves expired unacked
     * messages back to this instance's queue shard.
     *
     * <p>The Redis connections are not set here; they are supplied afterwards through
     * {@link #withQuorumConn(JedisCommands)} and {@link #withNonQuorumConn(JedisCommands)}.
     *
     * @param redisKeyPrefix prefix used for every Redis key built by this queue
     * @param queueName name of the queue
     * @param allShards names of all shards the queue is spread over
     * @param shardName name of the shard this instance reads from
     * @param unackScheduleInMS initial delay and period, in milliseconds, of the unack processing task
     */
    public RedisDynoQueue(String redisKeyPrefix, String queueName, Set<String> allShards, String shardName, int unackScheduleInMS) {
        this.redisKeyPrefix = redisKeyPrefix;
        this.queueName = queueName;
        this.allShards = allShards.stream().collect(Collectors.toList());
        this.shardName = shardName;
        this.messageStoreKey = redisKeyPrefix + ".MESSAGE." + queueName;
        this.myQueueShard = this.getQueueShardKey(queueName, shardName);

        ObjectMapper mapper = new ObjectMapper();
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        mapper.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false);
        mapper.configure(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, false);
        // NOTE(review): the second call presumably replaces the first, leaving NON_EMPTY in effect - confirm.
        mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
        mapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
        mapper.disable(SerializationFeature.INDENT_OUTPUT);
        this.om = mapper;

        this.monitor = new QueueMonitor(queueName, shardName);
        this.prefetchedIds = new ConcurrentLinkedQueue<>();
        this.schedulerForUnacksProcessing = Executors.newScheduledThreadPool(1);
        this.schedulerForPrefetchProcessing = Executors.newScheduledThreadPool(1);

        // scheduleAtFixedRate suppresses every later run once a run throws, so a single
        // transient failure must not be allowed to escape the task.
        this.schedulerForUnacksProcessing.scheduleAtFixedRate(() -> {
            try {
                this.processUnacks();
            } catch (Exception e) {
                this.logger.error("Unack processing failed for queue " + queueName + "; it will run again on the next schedule", e);
            }
        }, unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
        this.logger.info(RedisDynoQueue.class.getName() + " is ready to serve " + queueName);
    }

    /**
     * Sets the connection stored in {@code quorumConn}, which this class uses for all of
     * its writes and for the reads done while popping, getting and re-queueing messages.
     *
     * @param quorumConn the Redis connection to use
     * @return this queue, to allow chained configuration
     */
    public RedisDynoQueue withQuorumConn(JedisCommands quorumConn) {
        this.quorumConn = quorumConn;
        return this;
    }

    /**
     * Sets the connection stored in {@code nonQuorumConn}, which this class uses for the
     * payload reads in {@code peek} and {@code setTimeout} and for the size queries.
     *
     * @param nonQuorumConn the Redis connection to use
     * @return this queue, to allow chained configuration
     */
    public RedisDynoQueue withNonQuorumConn(JedisCommands nonQuorumConn) {
        this.nonQuorumConn = nonQuorumConn;
        return this;
    }

    /**
     * Sets the unack time. When a message is popped, this value is added to the current
     * time in milliseconds to form the message's score in the unack set.
     *
     * @param unackTime the unack time to use
     * @return this queue, to allow chained configuration
     */
    public RedisDynoQueue withUnackTime(int unackTime) {
        this.unackTime = unackTime;
        return this;
    }

    /**
     * @return the name of this queue
     */
    public String getName() {
        return queueName;
    }

    /**
     * @return the configured unack time
     */
    public int getUnackTime() {
        return unackTime;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Pushes messages onto the queue.
     *
     * <p>For each message the JSON payload is written to the message hash, then the
     * message id is added to the next shard's sorted set with a score of
     * {@code currentTimeMillis + timeout + priority / 100.0}.
     *
     * @param messages the messages to push
     * @return the ids of the given messages, in the same order
     * @throws RuntimeException if serialization or a Redis call fails
     */
    public List<String> push(List<Message> messages) {
        Stopwatch sw = this.monitor.start(this.monitor.push, messages.size());
        try {
            this.execute(() -> {
                for (Message message : messages) {
                    String json = this.om.writeValueAsString(message);
                    this.quorumConn.hset(this.messageStoreKey, message.getId(), json);
                    // Divide by 100.0, not 100: integer division would truncate the
                    // priority to 0 and messages would never be ordered by it.
                    double priority = message.getPriority() / 100.0;
                    double score = (double) (System.currentTimeMillis() + message.getTimeout()) + priority;
                    String shard = this.getNextShard();
                    String queueShard = this.getQueueShardKey(this.queueName, shard);
                    this.quorumConn.zadd(queueShard, score, message.getId());
                }
                return messages;
            });
            return messages.stream().map(Message::getId).collect(Collectors.toList());
        }
        finally {
            sw.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns up to {@code messageCount} messages that are currently due on this
     * instance's queue shard, without removing them.
     *
     * <p>Ids whose payload is no longer present in the message hash are skipped rather
     * than failing the whole call, matching how popping treats a missing payload.
     *
     * @param messageCount maximum number of messages to return
     * @return the peeked messages; empty if no ids are available
     * @throws RuntimeException if deserialization or a Redis call fails
     */
    public List<Message> peek(int messageCount) {
        Stopwatch sw = this.monitor.peek.start();
        try {
            Set<String> ids = this.peekIds(0, messageCount);
            if (ids == null) {
                return Collections.emptyList();
            }
            return this.execute(() -> {
                List<Message> messages = new LinkedList<>();
                for (String id : ids) {
                    String json = this.nonQuorumConn.hget(this.messageStoreKey, id);
                    if (json == null) {
                        // The payload is gone or not yet visible on this connection.
                        this.logger.debug("Cannot get the message payload for {}", id);
                        continue;
                    }
                    messages.add(this.om.readValue(json, Message.class));
                }
                return messages;
            });
        }
        finally {
            sw.stop();
        }
    }

    /**
     * Pops up to {@code messageCount} messages, waiting up to the given time for enough
     * ids to be prefetched.
     *
     * <p>While fewer than {@code messageCount} ids are prefetched and the wait has not
     * elapsed, the method sleeps 200 ms and prefetches again.
     *
     * @param messageCount maximum number of messages to pop; values below 1 return an empty list
     * @param wait how long to wait for messages
     * @param unit unit of {@code wait}
     * @return the popped messages, possibly fewer than requested
     * @throws RuntimeException wrapping any exception raised while popping
     */
    public List<Message> pop(int messageCount, int wait, TimeUnit unit) {
        if (messageCount < 1) {
            return Collections.emptyList();
        }
        Stopwatch timer = monitor.start(monitor.pop, messageCount);
        try {
            long startedAt = System.currentTimeMillis();
            long maxWaitMillis = unit.toMillis(wait);
            prefetch.addAndGet(messageCount);
            prefetchIds();
            while (true) {
                if (prefetchedIds.size() >= messageCount || System.currentTimeMillis() - startedAt >= maxWaitMillis) {
                    break;
                }
                Uninterruptibles.sleepUninterruptibly(200L, TimeUnit.MILLISECONDS);
                prefetchIds();
            }
            return _pop(messageCount);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        finally {
            timer.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Fetches due message ids from this instance's queue shard into {@code prefetchedIds}.
     *
     * <p>Does nothing when the outstanding prefetch counter is below 1. Otherwise the
     * counter is reduced by the number of ids fetched, and reset to 0 if it goes
     * negative or nothing was fetched.
     */
    private void prefetchIds() {
        if (prefetch.get() < 1) {
            return;
        }
        int wanted = prefetch.get();
        Stopwatch timer = monitor.start(monitor.prefetch, wanted);
        try {
            Set<String> fetched = peekIds(0, wanted);
            prefetchedIds.addAll(fetched);
            prefetch.addAndGet(-fetched.size());
            boolean overshot = prefetch.get() < 0;
            if (overshot || fetched.isEmpty()) {
                prefetch.set(0);
            }
        }
        finally {
            timer.stop();
        }
    }

    /**
     * Claims up to {@code messageCount} of the prefetched ids and returns their messages.
     *
     * <p>For each polled id: the id is added to this shard's unack set (only if absent)
     * with a score of {@code currentTimeMillis + unackTime}, removed from this instance's
     * queue shard, and its payload is read and deserialized. If any of those steps does
     * not succeed, the id is counted as a miss and skipped.
     *
     * @param messageCount maximum number of messages to return
     * @return the claimed messages, possibly fewer than requested
     * @throws Exception if deserialization or a Redis call fails
     */
    private List<Message> _pop(int messageCount) throws Exception {
        double unackScore = (double) (System.currentTimeMillis() + (long) this.unackTime);
        String unackQueueName = this.getUnackKey(this.queueName, this.shardName);
        LinkedList<Message> popped = new LinkedList<>();
        ZAddParams zParams = ZAddParams.zAddParams().nx();
        String msgId;
        while (popped.size() != messageCount && (msgId = this.prefetchedIds.poll()) != null) {
            long added = this.quorumConn.zadd(unackQueueName, unackScore, msgId, zParams);
            if (added == 0L) {
                // The id was already in the unack set.
                this.logger.debug("cannot add {} to the unack shard {}", msgId, unackQueueName);
                this.monitor.misses.increment();
                continue;
            }
            long removed = this.quorumConn.zrem(this.myQueueShard, msgId);
            if (removed == 0L) {
                // The id was no longer in the queue shard; it stays in the unack set.
                this.logger.debug("cannot remove {} from the queue shard {}", msgId, this.myQueueShard);
                this.monitor.misses.increment();
                continue;
            }
            String json = this.quorumConn.hget(this.messageStoreKey, msgId);
            if (json == null) {
                this.logger.debug("Cannot get the message payload for {}", msgId);
                this.monitor.misses.increment();
                continue;
            }
            popped.add(this.om.readValue(json, Message.class));
        }
        return popped;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Acknowledges a message: removes its id from the first shard's unack set that
     * contains it and deletes its payload from the message hash.
     *
     * @param messageId id of the message to acknowledge
     * @return {@code true} if the id was found in an unack set, {@code false} otherwise
     * @throws RuntimeException if a Redis call fails
     */
    public boolean ack(String messageId) {
        Stopwatch timer = monitor.ack.start();
        try {
            return execute(() -> {
                for (String shard : allShards) {
                    Long removed = quorumConn.zrem(getUnackKey(queueName, shard), messageId);
                    if (removed > 0L) {
                        quorumConn.hdel(messageStoreKey, messageId);
                        return true;
                    }
                }
                return false;
            });
        }
        finally {
            timer.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Re-scores an unacked message to {@code currentTimeMillis + timeout} in the first
     * shard's unack set that contains it.
     *
     * @param messageId id of the unacked message
     * @param timeout value added to the current time in milliseconds to form the new score
     * @return {@code true} if the id was found in an unack set, {@code false} otherwise
     * @throws RuntimeException if a Redis call fails
     */
    public boolean setUnackTimeout(String messageId, long timeout) {
        Stopwatch timer = monitor.ack.start();
        try {
            return execute(() -> {
                double newUnackScore = (double) (System.currentTimeMillis() + timeout);
                for (String shard : allShards) {
                    String unackShardKey = getUnackKey(queueName, shard);
                    if (quorumConn.zscore(unackShardKey, messageId) != null) {
                        quorumConn.zadd(unackShardKey, newUnackScore, messageId);
                        return true;
                    }
                }
                return false;
            });
        }
        finally {
            timer.stop();
        }
    }

    /**
     * Changes the timeout of a queued message.
     *
     * <p>The message is re-scored in the first queue shard that contains it to
     * {@code currentTimeMillis + timeout + priority / 100.0}, and its stored payload is
     * rewritten with the new timeout.
     *
     * @param messageId id of the message
     * @param timeout new timeout, added to the current time in milliseconds
     * @return {@code true} if the message was found in a queue shard and updated;
     *         {@code false} if it has no stored payload or is in no queue shard
     * @throws RuntimeException if (de)serialization or a Redis call fails
     */
    public boolean setTimeout(String messageId, long timeout) {
        return this.execute(() -> {
            String json = this.nonQuorumConn.hget(this.messageStoreKey, messageId);
            if (json == null) {
                return false;
            }
            Message message = this.om.readValue(json, Message.class);
            message.setTimeout(timeout);
            for (String shard : this.allShards) {
                String queueShard = this.getQueueShardKey(this.queueName, shard);
                Double score = this.quorumConn.zscore(queueShard, messageId);
                if (score == null) {
                    continue;
                }
                // Divide by 100.0, not 100: integer division would truncate the priority to 0.
                double priority = message.getPriority() / 100.0;
                double newScore = (double) (System.currentTimeMillis() + timeout) + priority;
                // XX only updates existing members and never adds one, so ZADD's reply
                // (the number of members added) is 0 here and must not be used as a
                // success signal. XX still prevents re-adding a concurrently removed id.
                this.quorumConn.zadd(queueShard, newScore, messageId, ZAddParams.zAddParams().xx());
                this.quorumConn.hset(this.messageStoreKey, message.getId(), this.om.writeValueAsString(message));
                return true;
            }
            return false;
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes a message from the queue.
     *
     * <p>Shards are visited in order; the id is removed from each visited shard's unack
     * set and queue set until a queue set actually contained it. The payload is then
     * deleted from the message hash exactly once, so the outcome does not depend on
     * which shard held the message.
     *
     * @param messageId id of the message to remove
     * @return {@code true} if the id was removed from a queue shard and its payload was
     *         deleted, {@code false} otherwise
     * @throws RuntimeException if a Redis call fails
     */
    public boolean remove(String messageId) {
        Stopwatch sw = this.monitor.remove.start();
        try {
            return this.execute(() -> {
                boolean removedFromQueue = false;
                for (String shard : this.allShards) {
                    this.quorumConn.zrem(this.getUnackKey(this.queueName, shard), messageId);
                    Long removed = this.quorumConn.zrem(this.getQueueShardKey(this.queueName, shard), messageId);
                    if (removed != null && removed > 0L) {
                        removedFromQueue = true;
                        break;
                    }
                }
                // Delete the payload after the shard scan: deleting it inside the loop
                // makes every later iteration see 0 fields removed.
                Long payloadRemoved = this.quorumConn.hdel(this.messageStoreKey, messageId);
                return removedFromQueue && payloadRemoved != null && payloadRemoved > 0L;
            });
        }
        finally {
            sw.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads a message's payload from the message hash and deserializes it.
     *
     * @param messageId id of the message
     * @return the message, or {@code null} if no payload is stored for the id
     * @throws RuntimeException if deserialization or a Redis call fails
     */
    public Message get(String messageId) {
        Stopwatch timer = monitor.get.start();
        try {
            return execute(() -> {
                String json = quorumConn.hget(messageStoreKey, messageId);
                if (json != null) {
                    return om.readValue(json, Message.class);
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Cannot get the message payload " + messageId);
                }
                return null;
            });
        }
        finally {
            timer.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the total number of ids across the queue sets of all shards.
     *
     * @return the sum of the cardinalities of every shard's queue set
     * @throws RuntimeException if a Redis call fails
     */
    public long size() {
        Stopwatch timer = monitor.size.start();
        try {
            return execute(() -> allShards.stream()
                    .mapToLong(shard -> nonQuorumConn.zcard(getQueueShardKey(queueName, shard)))
                    .sum());
        }
        finally {
            timer.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reports, per shard, the cardinality of its queue set and of its unack set.
     *
     * @return a map from shard name to a map with the keys {@code "size"} (queue set
     *         cardinality) and {@code "uacked"} (unack set cardinality)
     * @throws RuntimeException if a Redis call fails
     */
    public Map<String, Map<String, Long>> shardSizes() {
        Stopwatch timer = monitor.size.start();
        try {
            return execute(() -> {
                Map<String, Map<String, Long>> sizesByShard = new HashMap<>();
                for (String shard : allShards) {
                    long queued = nonQuorumConn.zcard(getQueueShardKey(queueName, shard));
                    long unacked = nonQuorumConn.zcard(getUnackKey(queueName, shard));
                    Map<String, Long> details = new HashMap<>();
                    details.put("size", queued);
                    details.put("uacked", unacked);
                    sizesByShard.put(shard, details);
                }
                return sizesByShard;
            });
        }
        finally {
            timer.stop();
        }
    }

    /**
     * Deletes every shard's queue set and unack set, then the message hash.
     *
     * @throws RuntimeException if a Redis call fails
     */
    public void clear() {
        execute(() -> {
            allShards.forEach(shard -> {
                quorumConn.del(getQueueShardKey(queueName, shard));
                quorumConn.del(getUnackKey(queueName, shard));
            });
            quorumConn.del(messageStoreKey);
            return null;
        });
    }

    /**
     * Reads ids from this instance's queue shard whose score lies between 0 and
     * {@code currentTimeMillis + 1}.
     *
     * @param offset number of matching ids to skip
     * @param count maximum number of ids to return
     * @return the matching ids
     * @throws RuntimeException if the Redis call fails
     */
    private Set<String> peekIds(int offset, int count) {
        return execute(() -> {
            double maxScore = (double) (System.currentTimeMillis() + 1L);
            return quorumConn.zrangeByScore(myQueueShard, 0.0, maxScore, offset, count);
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records the current queue depth, then moves expired entries of this shard's unack
     * set back to this instance's queue shard.
     *
     * <p>Up to 1000 entries with a score between 0 and the current time are processed per
     * call. An entry whose payload still exists is re-added to the queue shard with its
     * unack score; in every case the entry is removed from the unack set.
     *
     * @throws RuntimeException if a Redis call fails
     */
    public void processUnacks() {
        Stopwatch timer = monitor.processUnack.start();
        try {
            long depth = size();
            monitor.queueDepth.record(depth);
            execute(() -> {
                final int batchSize = 1000;
                String unackQueueName = getUnackKey(queueName, shardName);
                double now = (double) System.currentTimeMillis();
                Set<Tuple> expired = quorumConn.zrangeByScoreWithScores(unackQueueName, 0.0, now, 0, batchSize);
                if (!expired.isEmpty()) {
                    logger.debug("Adding " + expired.size() + " messages back to the queue for " + queueName);
                }
                for (Tuple entry : expired) {
                    double score = entry.getScore();
                    String member = entry.getElement();
                    boolean payloadExists = quorumConn.hget(messageStoreKey, member) != null;
                    if (payloadExists) {
                        quorumConn.zadd(myQueueShard, score, member);
                    }
                    quorumConn.zrem(unackQueueName, member);
                }
                return null;
            });
        }
        finally {
            timer.stop();
        }
    }

    /**
     * Picks the shard for the next pushed message by advancing a shared index and
     * wrapping it back to 0 once it reaches the number of shards.
     *
     * @return the name of the selected shard
     */
    private String getNextShard() {
        int next = nextShardIndex.incrementAndGet();
        if (next < allShards.size()) {
            return allShards.get(next);
        }
        // Ran past the last shard: wrap around to the first one.
        nextShardIndex.set(0);
        return allShards.get(0);
    }

    /**
     * Builds the Redis key of a shard's queue set.
     *
     * @param queueName name of the queue
     * @param shard name of the shard
     * @return {@code <prefix>.QUEUE.<queueName>.<shard>}
     */
    private String getQueueShardKey(String queueName, String shard) {
        return new StringBuilder()
                .append(redisKeyPrefix)
                .append(".QUEUE.")
                .append(queueName)
                .append(".")
                .append(shard)
                .toString();
    }

    /**
     * Builds the Redis key of a shard's unack set.
     *
     * @param queueName name of the queue
     * @param shard name of the shard
     * @return {@code <prefix>.UNACK.<queueName>.<shard>}
     */
    private String getUnackKey(String queueName, String shard) {
        return new StringBuilder()
                .append(redisKeyPrefix)
                .append(".UNACK.")
                .append(queueName)
                .append(".")
                .append(shard)
                .toString();
    }

    /**
     * Runs the given operation through {@code executeWithRetry}, starting at attempt 0.
     *
     * @param r the operation to run
     * @param <R> the operation's result type
     * @return the operation's result
     * @throws RuntimeException wrapping any exception the operation raises
     */
    private <R> R execute(Callable<R> r) {
        final int firstAttempt = 0;
        return executeWithRetry(r, firstAttempt);
    }

    /**
     * Runs the operation, retrying while it fails with an {@link ExecutionException}
     * caused by a {@code DynoException} and the attempt counter is below the configured
     * retry count.
     *
     * <p>NOTE(review): a {@code DynoException} thrown directly by the operation (not
     * wrapped in an {@code ExecutionException}) lands in the generic handler and is
     * rethrown without a retry - confirm this is intended.
     *
     * @param r the operation to run
     * @param retryCount number of retries already performed
     * @param <R> the operation's result type
     * @return the operation's result
     * @throws RuntimeException wrapping the cause of a non-retriable
     *         {@code ExecutionException}, or wrapping any other exception
     */
    private <R> R executeWithRetry(Callable<R> r, int retryCount) {
        int attempt = retryCount;
        while (true) {
            try {
                return r.call();
            }
            catch (ExecutionException e) {
                boolean retriable = e.getCause() instanceof DynoException && attempt < this.retryCount;
                if (!retriable) {
                    throw new RuntimeException(e.getCause());
                }
                attempt++;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Shuts down both schedulers and closes the monitor.
     *
     * @throws IOException if closing the monitor fails
     */
    public void close() throws IOException {
        schedulerForUnacksProcessing.shutdown();
        schedulerForPrefetchProcessing.shutdown();
        monitor.close();
    }
}

