package com.netflix.dyno.queues.redis;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.annotations.VisibleForTesting;
import com.netflix.dyno.connectionpool.exception.DynoException;
import com.netflix.dyno.queues.DynoQueue;
import com.netflix.dyno.queues.Message;
import com.netflix.servo.monitor.Stopwatch;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCommands;
import redis.clients.jedis.Tuple;

/* loaded from: input_file:com/netflix/dyno/queues/redis/RedisDynoQueue.class */
public class RedisDynoQueue implements DynoQueue {
    private String queueName;
    private List<String> allShards;
    private String shardName;
    private String redisKeyPrefix;
    private String messageStoreKey;
    private String myQueueShard;
    private QueueMonitor monitor;
    private ObjectMapper om;
    private ExecutorService executorService;
    private JedisCommands quorumConn;
    private JedisCommands nonQuorumConn;
    private LinkedBlockingQueue<String> prefetchedIds;
    private final Logger logger = LoggerFactory.getLogger(RedisDynoQueue.class);
    private int unackTime = 60;
    private int unackScheduleInMS = 60000;
    private int prefetchCount = 10000;
    private int retryCount = 2;
    private AtomicBoolean prefetch = new AtomicBoolean(false);
    private AtomicInteger nextShardIndex = new AtomicInteger(0);

    public RedisDynoQueue(String str, String str2, Set<String> set, String str3, ExecutorService executorService) {
        this.redisKeyPrefix = str;
        this.queueName = str2;
        this.allShards = (List) set.stream().collect(Collectors.toList());
        this.shardName = str3;
        this.messageStoreKey = str + ".MESSAGE." + str2;
        this.myQueueShard = getQueueShardKey(str2, str3);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        objectMapper.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false);
        objectMapper.configure(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, false);
        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
        objectMapper.disable(SerializationFeature.INDENT_OUTPUT);
        this.om = objectMapper;
        this.monitor = new QueueMonitor(str2, str3);
        this.prefetchedIds = new LinkedBlockingQueue<>();
        this.executorService = executorService;
        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> {
            processUnacks();
        }, this.unackScheduleInMS, this.unackScheduleInMS, TimeUnit.MILLISECONDS);
        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> {
            prefetchIds();
        }, 0L, 20L, TimeUnit.MILLISECONDS);
        this.logger.info(RedisDynoQueue.class.getName() + " is ready to serve " + str2);
    }

    public RedisDynoQueue withQuorumConn(JedisCommands jedisCommands) {
        this.quorumConn = jedisCommands;
        return this;
    }

    public RedisDynoQueue withNonQuorumConn(JedisCommands jedisCommands) {
        this.nonQuorumConn = jedisCommands;
        return this;
    }

    public RedisDynoQueue withUnackTime(int i) {
        this.unackTime = i;
        return this;
    }

    public RedisDynoQueue withUnackSchedulerTime(int i) {
        this.unackScheduleInMS = i;
        return this;
    }

    public String getName() {
        return this.queueName;
    }

    public int getUnackTime() {
        return this.unackTime;
    }

    public List<String> push(List<Message> list) {
        Stopwatch start = this.monitor.start(this.monitor.push, list.size());
        try {
            execute(() -> {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    Message message = (Message) it.next();
                    this.quorumConn.hset(this.messageStoreKey, message.getId(), this.om.writeValueAsString(message));
                    this.quorumConn.zadd(getQueueShardKey(this.queueName, getNextShard()), Long.valueOf(System.currentTimeMillis() + message.getTimeout()).doubleValue() + (message.getPriority() / 100), message.getId());
                }
                return list;
            });
            List<String> list2 = (List) list.stream().map(message -> {
                return message.getId();
            }).collect(Collectors.toList());
            start.stop();
            return list2;
        } catch (Throwable th) {
            start.stop();
            throw th;
        }
    }

    public List<Message> peek(int i) {
        Stopwatch start = this.monitor.peek.start();
        try {
            Set<String> peekIds = peekIds(0, i);
            if (peekIds == null) {
                List<Message> emptyList = Collections.emptyList();
                start.stop();
                return emptyList;
            }
            List<Message> list = (List) execute(() -> {
                LinkedList linkedList = new LinkedList();
                Iterator it = peekIds.iterator();
                while (it.hasNext()) {
                    linkedList.add((Message) this.om.readValue(this.nonQuorumConn.hget(this.messageStoreKey, (String) it.next()), Message.class));
                }
                return linkedList;
            });
            start.stop();
            return list;
        } catch (Throwable th) {
            start.stop();
            throw th;
        }
    }

    public List<Message> pop(int i, int i2, TimeUnit timeUnit) {
        if (i < 1) {
            return Collections.emptyList();
        }
        Stopwatch start = this.monitor.start(this.monitor.pop, i);
        try {
            List<Message> list = (List) execute(() -> {
                HashSet hashSet = new HashSet();
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("{} prefetchedIds.size={}", this.queueName, Integer.valueOf(this.prefetchedIds.size()));
                }
                if (this.prefetchedIds.size() < i) {
                    this.prefetch.set(true);
                    String poll = this.prefetchedIds.poll(i2, timeUnit);
                    if (poll != null) {
                        hashSet.add(poll);
                    }
                }
                this.prefetchedIds.drainTo(hashSet, i);
                if (hashSet.size() < i) {
                    this.prefetch.set(true);
                }
                return hashSet.isEmpty() ? Collections.emptyList() : _pop(hashSet, i);
            });
            start.stop();
            return list;
        } catch (Throwable th) {
            start.stop();
            throw th;
        }
    }

    @VisibleForTesting
    void prefetch() {
        this.prefetch.set(true);
        prefetchIds();
    }

    private void prefetchIds() {
        if (this.prefetch.get()) {
            Stopwatch start = this.monitor.start(this.monitor.prefetch, this.prefetchCount);
            try {
                execute(() -> {
                    this.prefetchedIds.addAll(peekIds(0, this.prefetchCount));
                    this.prefetch.set(false);
                    return null;
                });
            } finally {
                start.stop();
            }
        }
    }

    private List<Message> _pop(Set<String> set, int i) throws Exception {
        double doubleValue = Long.valueOf(System.currentTimeMillis() + this.unackTime).doubleValue();
        String unackKey = getUnackKey(this.queueName, this.shardName);
        LinkedList linkedList = new LinkedList();
        for (String str : set) {
            if (this.quorumConn.zadd(unackKey, doubleValue, str).longValue() == 0) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("cannot add to the unack shard " + str);
                }
                this.monitor.misses.increment();
            } else if (this.quorumConn.zrem(this.myQueueShard, new String[]{str}).longValue() == 0) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("cannot remove from the queue shard " + str);
                }
                this.monitor.misses.increment();
            } else {
                String hget = this.quorumConn.hget(this.messageStoreKey, str);
                if (hget == null) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Cannot get the message payload " + str);
                    }
                    this.monitor.misses.increment();
                } else {
                    linkedList.add((Message) this.om.readValue(hget, Message.class));
                    if (linkedList.size() == i) {
                        return linkedList;
                    }
                }
            }
        }
        return linkedList;
    }

    public boolean ack(String str) {
        Stopwatch start = this.monitor.ack.start();
        try {
            boolean booleanValue = ((Boolean) execute(() -> {
                Iterator<String> it = this.allShards.iterator();
                while (it.hasNext()) {
                    if (this.quorumConn.zrem(getUnackKey(this.queueName, it.next()), new String[]{str}).longValue() > 0) {
                        this.quorumConn.hdel(this.messageStoreKey, new String[]{str});
                        return true;
                    }
                }
                return false;
            })).booleanValue();
            start.stop();
            return booleanValue;
        } catch (Throwable th) {
            start.stop();
            throw th;
        }
    }

    public boolean setUnackTimeout(String str, long j) {
        Stopwatch start = this.monitor.ack.start();
        try {
            boolean booleanValue = ((Boolean) execute(() -> {
                Iterator<String> it = this.allShards.iterator();
                while (it.hasNext()) {
                    String unackKey = getUnackKey(this.queueName, it.next());
                    if (this.quorumConn.zrem(unackKey, new String[]{str}).longValue() > 0) {
                        this.quorumConn.zadd(unackKey, Long.valueOf(System.currentTimeMillis() + j).doubleValue(), str);
                        return true;
                    }
                }
                return false;
            })).booleanValue();
            start.stop();
            return booleanValue;
        } catch (Throwable th) {
            start.stop();
            throw th;
        }
    }

    public boolean remove(String str) {
        Stopwatch start = this.monitor.remove.start();
        try {
            boolean booleanValue = ((Boolean) execute(() -> {
                for (String str2 : this.allShards) {
                    this.quorumConn.zrem(getUnackKey(this.queueName, str2), new String[]{str});
                    Long zrem = this.quorumConn.zrem(getQueueShardKey(this.queueName, str2), new String[]{str});
                    Long hdel = this.quorumConn.hdel(this.messageStoreKey, new String[]{str});
                    if (zrem.longValue() > 0 && hdel.longValue() > 0) {
                        return true;
                    }
                }
                return false;
            })).booleanValue();
            start.stop();
            return booleanValue;
        } catch (Throwable th) {
            start.stop();
            throw th;
        }
    }

    public Message get(String str) {
        Stopwatch start = this.monitor.get.start();
        try {
            Message message = (Message) execute(() -> {
                String hget = this.quorumConn.hget(this.messageStoreKey, str);
                if (hget != null) {
                    return (Message) this.om.readValue(hget, Message.class);
                }
                if (!this.logger.isDebugEnabled()) {
                    return null;
                }
                this.logger.debug("Cannot get the message payload " + str);
                return null;
            });
            start.stop();
            return message;
        } catch (Throwable th) {
            start.stop();
            throw th;
        }
    }

    public long size() {
        Stopwatch start = this.monitor.size.start();
        try {
            long longValue = ((Long) execute(() -> {
                long j = 0;
                Iterator<String> it = this.allShards.iterator();
                while (it.hasNext()) {
                    j += this.nonQuorumConn.zcard(getQueueShardKey(this.queueName, it.next())).longValue();
                }
                return Long.valueOf(j);
            })).longValue();
            start.stop();
            return longValue;
        } catch (Throwable th) {
            start.stop();
            throw th;
        }
    }

    public Map<String, Map<String, Long>> shardSizes() {
        Stopwatch start = this.monitor.size.start();
        HashMap hashMap = new HashMap();
        try {
            Map<String, Map<String, Long>> map = (Map) execute(() -> {
                for (String str : this.allShards) {
                    long longValue = this.nonQuorumConn.zcard(getQueueShardKey(this.queueName, str)).longValue();
                    long longValue2 = this.nonQuorumConn.zcard(getUnackKey(this.queueName, str)).longValue();
                    HashMap hashMap2 = new HashMap();
                    hashMap2.put("size", Long.valueOf(longValue));
                    hashMap2.put("uacked", Long.valueOf(longValue2));
                    hashMap.put(str, hashMap2);
                }
                return hashMap;
            });
            start.stop();
            return map;
        } catch (Throwable th) {
            start.stop();
            throw th;
        }
    }

    public void clear() {
        execute(() -> {
            for (String str : this.allShards) {
                String queueShardKey = getQueueShardKey(this.queueName, str);
                String unackKey = getUnackKey(this.queueName, str);
                this.quorumConn.del(queueShardKey);
                this.quorumConn.del(unackKey);
            }
            this.quorumConn.del(this.messageStoreKey);
            return null;
        });
    }

    private Set<String> peekIds(int i, int i2) {
        return (Set) execute(() -> {
            return this.quorumConn.zrangeByScore(this.myQueueShard, 0.0d, Long.valueOf(System.currentTimeMillis() + 1).doubleValue(), i, i2);
        });
    }

    public void processUnacks() {
        Stopwatch start = this.monitor.processUnack.start();
        try {
            this.monitor.queueDepth.record(size());
            execute(() -> {
                String unackKey = getUnackKey(this.queueName, this.shardName);
                Set<Tuple> zrangeByScoreWithScores = this.quorumConn.zrangeByScoreWithScores(unackKey, 0.0d, Long.valueOf(System.currentTimeMillis()).doubleValue(), 0, 1000);
                if (zrangeByScoreWithScores.size() > 0) {
                    this.logger.debug("Adding " + zrangeByScoreWithScores.size() + " messages back to the queue for " + this.queueName);
                }
                for (Tuple tuple : zrangeByScoreWithScores) {
                    double score = tuple.getScore();
                    String element = tuple.getElement();
                    if (this.quorumConn.hget(this.messageStoreKey, element) == null) {
                        this.quorumConn.zrem(unackKey, new String[]{element});
                    } else {
                        this.quorumConn.zadd(this.myQueueShard, score, element);
                        this.quorumConn.zrem(unackKey, new String[]{element});
                    }
                }
                prefetchIds();
                return null;
            });
            start.stop();
        } catch (Throwable th) {
            start.stop();
            throw th;
        }
    }

    private String getNextShard() {
        int incrementAndGet = this.nextShardIndex.incrementAndGet();
        if (incrementAndGet >= this.allShards.size()) {
            this.nextShardIndex.set(0);
            incrementAndGet = 0;
        }
        return this.allShards.get(incrementAndGet);
    }

    private String getQueueShardKey(String str, String str2) {
        return this.redisKeyPrefix + ".QUEUE." + str + "." + str2;
    }

    private String getUnackKey(String str, String str2) {
        return this.redisKeyPrefix + ".UNACK." + str + "." + str2;
    }

    private <R> R execute(Callable<R> callable) {
        return (R) executeWithRetry(this.executorService, callable, 0);
    }

    private <R> R executeWithRetry(ExecutorService executorService, Callable<R> callable, int i) {
        try {
            return (R) executorService.submit(callable).get(1000L, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof DynoException) || i >= this.retryCount) {
                throw new RuntimeException(e.getCause());
            }
            return (R) executeWithRetry(executorService, callable, i + 1);
        } catch (Exception e2) {
            throw new RuntimeException(e2);
        }
    }
}
