package com.netflix.dyno.queues.redis;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Uninterruptibles;
import com.netflix.dyno.queues.DynoQueue;
import com.netflix.dyno.queues.Message;
import com.netflix.dyno.queues.redis.sharding.ShardingStrategy;
import com.netflix.servo.monitor.Stopwatch;
import java.io.IOException;
import java.time.Clock;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCommands;
import redis.clients.jedis.Tuple;
import redis.clients.jedis.params.sortedset.ZAddParams;

/* loaded from: input_file:com/netflix/dyno/queues/redis/RedisDynoQueue.class */
public class RedisDynoQueue implements DynoQueue {
    /** Logger used for lifecycle and debug messages of this queue. */
    private final Logger logger;
    /** Time source for sorted-set scores and for the pop wait deadline. */
    private final Clock clock;
    /** Logical queue name; part of every Redis key built by this class. */
    private final String queueName;
    /** Immutable snapshot of all shard names the queue is spread over. */
    private final List<String> allShards;
    /** Shard this instance pops from and whose unack set it processes. */
    private final String shardName;
    /** Prefix prepended to every Redis key built by this class. */
    private final String redisKeyPrefix;
    /** Redis hash key ({@code <prefix>.MESSAGE.<queue>}) storing message payloads by message id. */
    private final String messageStoreKey;
    /** Redis sorted-set key of this instance's own queue shard. */
    private final String myQueueShard;
    /** Milliseconds added to the current clock time to form the unack score when a message is popped. */
    private volatile int unackTime;
    /** Metrics holder; supplies the stopwatches and counters used by the queue operations. */
    private final QueueMonitor monitor;
    /** JSON mapper used to serialize and deserialize {@code Message} payloads. */
    private final ObjectMapper om;
    /** Connection used for all writes and most reads; assigned through {@code withQuorumConn}. */
    private volatile JedisCommands quorumConn;
    /** Connection used by peek, setTimeout (payload lookup), size and shardSizes; assigned through {@code withNonQuorumConn}. */
    private volatile JedisCommands nonQuorumConn;
    /** Message ids read ahead by {@code prefetchIds} and later claimed by {@code _pop}. */
    private final ConcurrentLinkedQueue<String> prefetchedIds;
    /** Single-thread scheduler that periodically runs {@code processUnacks}. */
    private final ScheduledExecutorService schedulerForUnacksProcessing;
    /** Not read anywhere in the visible code. */
    private final int retryCount = 2;
    /** Strategy that picks the destination shard for each pushed message. */
    private final ShardingStrategy shardingStrategy;

    /** Number of message ids that pop callers have requested and that still have to be prefetched. */
    @VisibleForTesting
    AtomicInteger prefetch;

    /**
     * Creates a queue that schedules unack processing with an initial delay and period of 60000 ms.
     *
     * @param str prefix for every Redis key used by this queue
     * @param str2 queue name
     * @param set names of all shards of the queue
     * @param str3 name of the shard owned by this instance
     * @param shardingStrategy strategy used to pick the shard for pushed messages
     */
    public RedisDynoQueue(String str, String str2, Set<String> set, String str3, ShardingStrategy shardingStrategy) {
        this(str, str2, set, str3, 60000, shardingStrategy);
    }

    /**
     * Creates a queue that uses {@link Clock#systemDefaultZone()} as its time source.
     *
     * @param str prefix for every Redis key used by this queue
     * @param str2 queue name
     * @param set names of all shards of the queue
     * @param str3 name of the shard owned by this instance
     * @param i initial delay and period, in milliseconds, of the scheduled unack processing
     * @param shardingStrategy strategy used to pick the shard for pushed messages
     */
    public RedisDynoQueue(String str, String str2, Set<String> set, String str3, int i, ShardingStrategy shardingStrategy) {
        this(Clock.systemDefaultZone(), str, str2, set, str3, i, shardingStrategy);
    }

    /**
     * Creates the queue, derives its Redis keys and starts the periodic unack processing.
     *
     * <p>The Redis connections are not set here; they must be supplied through
     * {@code withQuorumConn} and {@code withNonQuorumConn} before the queue is used.
     *
     * @param clock time source for scores and wait deadlines
     * @param str prefix for every Redis key used by this queue
     * @param str2 queue name
     * @param set names of all shards of the queue; copied into an immutable list
     * @param str3 name of the shard owned by this instance
     * @param i initial delay and period, in milliseconds, of the scheduled unack processing
     * @param shardingStrategy strategy used to pick the shard for pushed messages
     */
    public RedisDynoQueue(Clock clock, String str, String str2, Set<String> set, String str3, int i, ShardingStrategy shardingStrategy) {
        this.logger = LoggerFactory.getLogger(RedisDynoQueue.class);
        this.unackTime = 60;
        // retryCount is initialized at its declaration; it is final and must not be assigned again here.
        this.prefetch = new AtomicInteger(0);
        this.clock = clock;
        this.redisKeyPrefix = str;
        this.queueName = str2;
        this.allShards = ImmutableList.copyOf(set);
        this.shardName = str3;
        this.messageStoreKey = str + ".MESSAGE." + str2;
        this.myQueueShard = getQueueShardKey(str2, str3);
        this.shardingStrategy = shardingStrategy;
        this.om = QueueUtils.constructObjectMapper();
        this.monitor = new QueueMonitor(str2, str3);
        this.prefetchedIds = new ConcurrentLinkedQueue<>();
        this.schedulerForUnacksProcessing = Executors.newScheduledThreadPool(1);
        this.schedulerForUnacksProcessing.scheduleAtFixedRate(() -> {
            // An exception escaping a fixed-rate task suppresses every later run, so a single
            // failure (for example a run before the connections are set) must not propagate.
            try {
                processUnacks();
            } catch (Exception e) {
                this.logger.error("processUnacks failed for queue {}", str2, e);
            }
        }, i, i, TimeUnit.MILLISECONDS);
        this.logger.info(RedisDynoQueue.class.getName() + " is ready to serve " + str2);
    }

    /**
     * Sets the connection used for all writes and for the reads done by pop, ack, get,
     * ensure and unack processing.
     *
     * @param jedisCommands connection to use
     * @return this queue, for call chaining
     */
    public RedisDynoQueue withQuorumConn(JedisCommands jedisCommands) {
        quorumConn = jedisCommands;
        return this;
    }

    /**
     * Sets the connection used by peek, by the payload lookup in setTimeout, and by the
     * size queries.
     *
     * @param jedisCommands connection to use
     * @return this queue, for call chaining
     */
    public RedisDynoQueue withNonQuorumConn(JedisCommands jedisCommands) {
        nonQuorumConn = jedisCommands;
        return this;
    }

    /**
     * Sets the unack time, i.e. the number of milliseconds added to the current clock time
     * to form the unack score of a popped message.
     *
     * @param i unack time in milliseconds
     * @return this queue, for call chaining
     */
    public RedisDynoQueue withUnackTime(int i) {
        unackTime = i;
        return this;
    }

    /**
     * @return the name of this queue
     */
    public String getName() {
        return queueName;
    }

    /**
     * @return the currently configured unack time in milliseconds
     */
    public int getUnackTime() {
        return unackTime;
    }

    /**
     * Stores every message payload in the message hash and adds its id to the queue shard
     * chosen by the sharding strategy.
     *
     * <p>The sorted-set score is the current time plus the message timeout, plus
     * {@code priority / 100.0} as a fractional part.
     *
     * @param list messages to push
     * @return the ids of the pushed messages, in input order
     */
    public List<String> push(List<Message> list) {
        Stopwatch timer = monitor.start(monitor.push, list.size());
        try {
            QueueUtils.execute("push", "(a shard in) " + queueName, () -> {
                for (Message msg : list) {
                    String payload = om.writeValueAsString(msg);
                    quorumConn.hset(messageStoreKey, msg.getId(), payload);

                    String targetShard = shardingStrategy.getNextShard(allShards, msg);
                    double dueAt = (double) (clock.millis() + msg.getTimeout());
                    double score = dueAt + (msg.getPriority() / 100.0d);
                    quorumConn.zadd(getQueueShardKey(queueName, targetShard), score, msg.getId());
                }
                return list;
            });
            return list.stream().map(Message::getId).collect(Collectors.toList());
        } finally {
            timer.stop();
        }
    }

    /**
     * Returns up to {@code i} due messages from this instance's queue shard without removing them.
     *
     * <p>Ids whose payload cannot be found in the message hash are skipped instead of being
     * handed to the JSON mapper as {@code null}, which would fail the whole call.
     *
     * @param i maximum number of messages to return
     * @return the peeked messages; empty if no ids are available
     */
    public List<Message> peek(int i) {
        Stopwatch timer = monitor.peek.start();
        try {
            Set<String> ids = peekIds(0, i);
            if (ids == null) {
                return Collections.emptyList();
            }
            return (List<Message>) QueueUtils.execute("peek", messageStoreKey, () -> {
                List<Message> messages = new LinkedList<>();
                for (String id : ids) {
                    String json = nonQuorumConn.hget(messageStoreKey, id);
                    if (json == null) {
                        // The id is queued but its payload is not visible on this connection.
                        if (logger.isDebugEnabled()) {
                            logger.debug("Cannot get the message payload for {}", id);
                        }
                        continue;
                    }
                    messages.add(om.readValue(json, Message.class));
                }
                return messages;
            });
        } finally {
            timer.stop();
        }
    }

    /**
     * Pops up to {@code i} messages, waiting up to the given time for enough ids to be prefetched.
     *
     * <p>While fewer than {@code i} ids are prefetched and the wait has not elapsed, the method
     * sleeps 200 ms and prefetches again. Any exception is rethrown wrapped in a
     * {@link RuntimeException}.
     *
     * @param i number of messages requested; values below 1 yield an empty list
     * @param i2 maximum wait, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code i2}
     * @return the popped messages, possibly fewer than requested
     */
    public List<Message> pop(int i, int i2, TimeUnit timeUnit) {
        if (i < 1) {
            return Collections.emptyList();
        }
        Stopwatch timer = monitor.start(monitor.pop, i);
        try {
            long startedAt = clock.millis();
            long maxWaitMillis = timeUnit.toMillis(i2);
            prefetch.addAndGet(i);
            prefetchIds();
            while (prefetchedIds.size() < i && clock.millis() - startedAt < maxWaitMillis) {
                Uninterruptibles.sleepUninterruptibly(200L, TimeUnit.MILLISECONDS);
                prefetchIds();
            }
            return _pop(i);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            timer.stop();
        }
    }

    /**
     * Reads as many due ids from this shard as are currently requested and appends them to
     * the prefetched-id queue.
     *
     * <p>The outstanding request counter is reduced by the number of ids read and reset to
     * zero when it drops below zero or when nothing was read.
     */
    private void prefetchIds() {
        if (prefetch.get() < 1) {
            return;
        }
        int wanted = prefetch.get();
        Stopwatch timer = monitor.start(monitor.prefetch, wanted);
        try {
            Set<String> ids = peekIds(0, wanted);
            prefetchedIds.addAll(ids);
            prefetch.addAndGet(-ids.size());
            boolean overshoot = prefetch.get() < 0;
            if (overshoot || ids.isEmpty()) {
                prefetch.set(0);
            }
        } finally {
            timer.stop();
        }
    }

    /**
     * Claims prefetched ids and returns their messages, up to {@code i} of them.
     *
     * <p>For each id the method adds it to this shard's unack set (only if absent), removes it
     * from this shard's queue and loads its payload. A failure at any of the three steps counts
     * as a miss and the id is skipped.
     *
     * @param i maximum number of messages to return
     * @return the messages successfully claimed; may be fewer than {@code i}
     * @throws Exception if a payload cannot be deserialized or a Redis call fails
     */
    private List<Message> _pop(int i) throws Exception {
        double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();
        String unackKey = getUnackKey(queueName, shardName);
        List<Message> popped = new LinkedList<>();
        ZAddParams onlyIfAbsent = ZAddParams.zAddParams().nx();

        String msgId;
        while (popped.size() != i && (msgId = prefetchedIds.poll()) != null) {
            // NX add returns 0 when the id is already in the unack set, i.e. someone else holds it.
            if (quorumConn.zadd(unackKey, unackScore, msgId, onlyIfAbsent).longValue() == 0) {
                if (logger.isDebugEnabled()) {
                    logger.debug("cannot add {} to the unack shard {}", msgId, unackKey);
                }
                monitor.misses.increment();
                continue;
            }

            if (quorumConn.zrem(myQueueShard, msgId).longValue() == 0) {
                if (logger.isDebugEnabled()) {
                    logger.debug("cannot remove {} from the queue shard {}", msgId, myQueueShard);
                }
                monitor.misses.increment();
                continue;
            }

            String json = quorumConn.hget(messageStoreKey, msgId);
            if (json == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Cannot get the message payload for {}", msgId);
                }
                monitor.misses.increment();
                continue;
            }

            popped.add(om.readValue(json, Message.class));
        }
        return popped;
    }

    /**
     * Acknowledges a message: removes its id from the first unack set that contains it and
     * deletes its payload.
     *
     * @param str id of the message to acknowledge
     * @return {@code true} if the id was found in an unack set, {@code false} otherwise
     */
    public boolean ack(String str) {
        Stopwatch timer = monitor.ack.start();
        try {
            return ((Boolean) QueueUtils.execute("ack", "(a shard in) " + queueName, () -> {
                for (String shard : allShards) {
                    Long removed = quorumConn.zrem(getUnackKey(queueName, shard), str);
                    if (removed.longValue() > 0) {
                        quorumConn.hdel(messageStoreKey, str);
                        return true;
                    }
                }
                return false;
            })).booleanValue();
        } finally {
            timer.stop();
        }
    }

    /**
     * Acknowledges every message in the list, one at a time, by id.
     *
     * @param list messages to acknowledge
     */
    public void ack(List<Message> list) {
        for (Message msg : list) {
            ack(msg.getId());
        }
    }

    /**
     * Re-scores a message in the first unack set that contains it to the current time plus
     * {@code j} milliseconds.
     *
     * @param str id of the message
     * @param j new unack timeout in milliseconds, relative to now
     * @return {@code true} if the id was found in an unack set, {@code false} otherwise
     */
    public boolean setUnackTimeout(String str, long j) {
        Stopwatch timer = monitor.ack.start();
        try {
            return ((Boolean) QueueUtils.execute("setUnackTimeout", "(a shard in) " + queueName, () -> {
                double newScore = (double) (clock.millis() + j);
                for (String shard : allShards) {
                    String unackKey = getUnackKey(queueName, shard);
                    boolean present = quorumConn.zscore(unackKey, str) != null;
                    if (present) {
                        quorumConn.zadd(unackKey, newScore, str);
                        return true;
                    }
                }
                return false;
            })).booleanValue();
        } finally {
            timer.stop();
        }
    }

    /**
     * Changes the timeout of a queued message and re-scores it in the queue shard that holds it.
     *
     * <p>The new score is the current time plus {@code j}, plus {@code priority / 100.0} as a
     * fractional part, the same formula {@code push} uses. The division is done in floating
     * point so the priority fraction is not truncated away.
     *
     * @param str id of the message
     * @param j new timeout in milliseconds, relative to now
     * @return {@code true} if the message was found in a queue shard and updated; {@code false}
     *         if its payload or its queue entry could not be found
     */
    public boolean setTimeout(String str, long j) {
        return ((Boolean) QueueUtils.execute("setTimeout", "(a shard in) " + this.queueName, () -> {
            String json = this.nonQuorumConn.hget(this.messageStoreKey, str);
            if (json == null) {
                return false;
            }
            Message message = this.om.readValue(json, Message.class);
            message.setTimeout(j);
            for (String shard : this.allShards) {
                String queueShardKey = getQueueShardKey(this.queueName, shard);
                if (this.quorumConn.zscore(queueShardKey, str) != null) {
                    double priorityFraction = message.getPriority() / 100.0d;
                    double score = Long.valueOf(this.clock.millis() + j).doubleValue() + priorityFraction;
                    // XX: only update an existing member, never re-create one removed concurrently.
                    this.quorumConn.zadd(queueShardKey, score, str, ZAddParams.zAddParams().xx());
                    this.quorumConn.hset(this.messageStoreKey, message.getId(), this.om.writeValueAsString(message));
                    return true;
                }
            }
            return false;
        })).booleanValue();
    }

    /**
     * Removes a message from the unack sets, the queue shards and the message hash.
     *
     * <p>Shards are visited in order until the id is removed from a queue shard. The payload is
     * deleted on the first visited shard, so the outcome of that deletion is remembered across
     * iterations; otherwise a message queued in any later shard would be removed but reported
     * as not removed.
     *
     * @param str id of the message to remove
     * @return {@code true} if the id was removed from a queue shard and its payload was deleted,
     *         {@code false} otherwise
     */
    public boolean remove(String str) {
        Stopwatch timer = monitor.remove.start();
        try {
            return ((Boolean) QueueUtils.execute("remove", "(a shard in) " + queueName, () -> {
                boolean payloadDeleted = false;
                for (String shard : allShards) {
                    quorumConn.zrem(getUnackKey(queueName, shard), str);
                    Long removedFromQueue = quorumConn.zrem(getQueueShardKey(queueName, shard), str);
                    Long deleted = quorumConn.hdel(messageStoreKey, str);
                    payloadDeleted |= deleted.longValue() > 0;
                    if (removedFromQueue.longValue() > 0 && payloadDeleted) {
                        return true;
                    }
                }
                return false;
            })).booleanValue();
        } finally {
            timer.stop();
        }
    }

    /**
     * Pushes the message unless its id is already present in any queue shard or unack set.
     *
     * @param message message to push if absent
     * @return {@code true} if the message was pushed, {@code false} if its id already existed
     */
    public boolean ensure(Message message) {
        return ((Boolean) QueueUtils.execute("ensure", "(a shard in) " + queueName, () -> {
            String messageId = message.getId();
            for (String shard : allShards) {
                boolean queued = quorumConn.zscore(getQueueShardKey(queueName, shard), messageId) != null;
                if (queued) {
                    return false;
                }
                boolean unacked = quorumConn.zscore(getUnackKey(queueName, shard), messageId) != null;
                if (unacked) {
                    return false;
                }
            }
            push(Collections.singletonList(message));
            return true;
        })).booleanValue();
    }

    /**
     * Runs a server-side Lua script that scans the message hash with HSCAN and reports whether
     * any scanned entry matches the predicate via Lua {@code string.find}.
     *
     * @param str predicate passed to the script as {@code ARGV[1]}
     * @return {@code true} if the script returns 1, {@code false} otherwise
     */
    public boolean containsPredicate(String str) {
        return ((Boolean) QueueUtils.execute("containsPredicate", messageStoreKey, () -> {
            String script = "local hkey=KEYS[1]\nlocal predicate=ARGV[1]\nlocal cursor=0\nlocal begin=false\nwhile (cursor ~= 0 or begin==false) do\n  local ret = redis.call('hscan', hkey, cursor)\n  for i, content in ipairs(ret[2]) do\n    if (string.find(content, predicate)) then\n      return 1\n    end\n  end\n  cursor=tonumber(ret[1])\n  begin=true\nend\nreturn 0";
            Object outcome = quorumConn.eval(script, Collections.singletonList(messageStoreKey), Collections.singletonList(str));
            long matched = ((Long) outcome).longValue();
            return Boolean.valueOf(matched == 1);
        })).booleanValue();
    }

    /**
     * Loads a message by id from the message hash.
     *
     * @param str id of the message
     * @return the deserialized message, or {@code null} if no payload is stored for the id
     */
    public Message get(String str) {
        Stopwatch timer = monitor.get.start();
        try {
            return (Message) QueueUtils.execute("get", messageStoreKey, () -> {
                String json = quorumConn.hget(messageStoreKey, str);
                if (json == null) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Cannot get the message payload " + str);
                    }
                    return null;
                }
                return om.readValue(json, Message.class);
            });
        } finally {
            timer.stop();
        }
    }

    /**
     * Sums the cardinality of the queue sorted set of every shard.
     *
     * @return total number of queued message ids across all shards (unack sets are not counted)
     */
    public long size() {
        Stopwatch timer = monitor.size.start();
        try {
            return ((Long) QueueUtils.execute("size", "(a shard in) " + queueName, () -> {
                long total = 0;
                for (String shard : allShards) {
                    total += nonQuorumConn.zcard(getQueueShardKey(queueName, shard)).longValue();
                }
                return Long.valueOf(total);
            })).longValue();
        } finally {
            timer.stop();
        }
    }

    /**
     * Reports, per shard, the cardinality of its queue set and of its unack set.
     *
     * @return map from shard name to a map with keys {@code "size"} (queue set cardinality)
     *         and {@code "uacked"} (unack set cardinality)
     */
    public Map<String, Map<String, Long>> shardSizes() {
        Stopwatch timer = monitor.size.start();
        Map<String, Map<String, Long>> sizesByShard = new HashMap<>();
        try {
            return (Map<String, Map<String, Long>>) QueueUtils.execute("shardSizes", "(a shard in) " + queueName, () -> {
                for (String shard : allShards) {
                    long queued = nonQuorumConn.zcard(getQueueShardKey(queueName, shard)).longValue();
                    long unacked = nonQuorumConn.zcard(getUnackKey(queueName, shard)).longValue();

                    Map<String, Long> counts = new HashMap<>();
                    counts.put("size", Long.valueOf(queued));
                    counts.put("uacked", Long.valueOf(unacked));
                    sizesByShard.put(shard, counts);
                }
                return sizesByShard;
            });
        } finally {
            timer.stop();
        }
    }

    /**
     * Deletes the queue set and the unack set of every shard, then the message hash.
     */
    public void clear() {
        QueueUtils.execute("clear", "(a shard in) " + queueName, () -> {
            for (String shard : allShards) {
                quorumConn.del(getQueueShardKey(queueName, shard));
                quorumConn.del(getUnackKey(queueName, shard));
            }
            quorumConn.del(messageStoreKey);
            return null;
        });
    }

    /**
     * Reads ids from this instance's queue shard whose score lies between 0 and the current
     * time plus one millisecond.
     *
     * @param i offset into the matching range
     * @param i2 maximum number of ids to return
     * @return the matching ids as returned by the range query
     */
    private Set<String> peekIds(int i, int i2) {
        return (Set<String>) QueueUtils.execute("peekIds", myQueueShard, () -> {
            double maxScore = (double) (clock.millis() + 1);
            return quorumConn.zrangeByScore(myQueueShard, 0.0d, maxScore, i, i2);
        });
    }

    /**
     * Records the queue depth, then moves expired entries of this shard's unack set back to
     * this shard's queue.
     *
     * <p>Up to 1000 entries with a score between 0 and the current time are handled per call.
     * An entry whose payload still exists is re-added to the queue with its unack score; in
     * every case the entry is removed from the unack set.
     */
    public void processUnacks() {
        Stopwatch timer = monitor.processUnack.start();
        try {
            monitor.queueDepth.record(size());
            String unackKey = getUnackKey(queueName, shardName);
            QueueUtils.execute("processUnacks", unackKey, () -> {
                double now = (double) clock.millis();
                Set<Tuple> expired = quorumConn.zrangeByScoreWithScores(unackKey, 0.0d, now, 0, 1000);
                if (!expired.isEmpty()) {
                    logger.debug("Adding " + expired.size() + " messages back to the queue for " + queueName);
                }
                for (Tuple entry : expired) {
                    double score = entry.getScore();
                    String messageId = entry.getElement();
                    boolean payloadExists = quorumConn.hget(messageStoreKey, messageId) != null;
                    if (payloadExists) {
                        quorumConn.zadd(myQueueShard, score, messageId);
                    }
                    quorumConn.zrem(unackKey, messageId);
                }
                return null;
            });
        } finally {
            timer.stop();
        }
    }

    /**
     * Builds the Redis key of a queue shard: {@code <prefix>.QUEUE.<queue>.<shard>}.
     *
     * @param str queue name
     * @param str2 shard name
     * @return the sorted-set key of that queue shard
     */
    private String getQueueShardKey(String str, String str2) {
        return new StringBuilder()
                .append(redisKeyPrefix)
                .append(".QUEUE.")
                .append(str)
                .append(".")
                .append(str2)
                .toString();
    }

    /**
     * Builds the Redis key of an unack set: {@code <prefix>.UNACK.<queue>.<shard>}.
     *
     * @param str queue name
     * @param str2 shard name
     * @return the sorted-set key of that shard's unack set
     */
    private String getUnackKey(String str, String str2) {
        return new StringBuilder()
                .append(redisKeyPrefix)
                .append(".UNACK.")
                .append(str)
                .append(".")
                .append(str2)
                .toString();
    }

    /**
     * Shuts down the unack-processing scheduler and closes the monitor.
     *
     * @throws IOException declared by the signature; presumably raised by closing the monitor
     */
    public void close() throws IOException {
        schedulerForUnacksProcessing.shutdown();
        monitor.close();
    }
}
