package com.netflix.dyno.queues.redis.v2;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.dyno.connectionpool.HashPartitioner;
import com.netflix.dyno.connectionpool.impl.hash.Murmur3HashPartitioner;
import com.netflix.dyno.queues.DynoQueue;
import com.netflix.dyno.queues.Message;
import com.netflix.dyno.queues.redis.QueueMonitor;
import com.netflix.dyno.queues.redis.QueueUtils;
import com.netflix.dyno.queues.redis.conn.Pipe;
import com.netflix.dyno.queues.redis.conn.RedisConnection;
import com.netflix.servo.monitor.Stopwatch;
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Response;
import redis.clients.jedis.Tuple;
import redis.clients.jedis.params.ZAddParams;

/* loaded from: input_file:com/netflix/dyno/queues/redis/v2/RedisPipelineQueue.class */
public class RedisPipelineQueue implements DynoQueue {
    private final Logger logger;
    private final Clock clock;
    private final String queueName;
    private final String shardName;
    private final String messageStoreKeyPrefix;
    private final String myQueueShard;
    private final String unackShardKeyPrefix;
    private final int unackTime;
    private final QueueMonitor monitor;
    private final ObjectMapper om;
    private final RedisConnection connPool;
    private volatile RedisConnection nonQuorumPool;
    private final ScheduledExecutorService schedulerForUnacksProcessing;
    private final HashPartitioner partitioner;
    private final int maxHashBuckets = 32;
    private final int longPollWaitIntervalInMillis = 10;

    public RedisPipelineQueue(String str, String str2, String str3, int i, int i2, RedisConnection redisConnection) {
        this(Clock.systemDefaultZone(), str, str2, str3, i, i2, redisConnection);
    }

    public RedisPipelineQueue(Clock clock, String str, String str2, String str3, int i, int i2, RedisConnection redisConnection) {
        this.logger = LoggerFactory.getLogger(RedisPipelineQueue.class);
        this.partitioner = new Murmur3HashPartitioner();
        this.maxHashBuckets = 32;
        this.longPollWaitIntervalInMillis = 10;
        this.clock = clock;
        this.queueName = str2;
        String str4 = "{" + str2 + "." + str3 + "}";
        this.shardName = str3;
        this.messageStoreKeyPrefix = str + ".MSG." + str4;
        this.myQueueShard = str + ".QUEUE." + str4;
        this.unackShardKeyPrefix = str + ".UNACK." + str4 + ".";
        this.unackTime = i2;
        this.connPool = redisConnection;
        this.nonQuorumPool = redisConnection;
        this.om = QueueUtils.constructObjectMapper();
        this.monitor = new QueueMonitor(str4, str3);
        this.schedulerForUnacksProcessing = Executors.newScheduledThreadPool(1);
        this.schedulerForUnacksProcessing.scheduleAtFixedRate(() -> {
            processUnacks();
        }, i, i, TimeUnit.MILLISECONDS);
        this.logger.info(RedisPipelineQueue.class.getName() + " is ready to serve " + str4 + ", shard=" + str3);
    }

    public void setNonQuorumPool(RedisConnection redisConnection) {
        this.nonQuorumPool = redisConnection;
    }

    public String getName() {
        return this.queueName;
    }

    public int getUnackTime() {
        return this.unackTime;
    }

    public List<String> push(List<Message> list) {
        Stopwatch start = this.monitor.start(this.monitor.push, list.size());
        RedisConnection resource = this.connPool.getResource();
        try {
            try {
                Pipe pipelined = resource.pipelined();
                for (Message message : list) {
                    pipelined.hset(messageStoreKey(message.getId()), message.getId(), this.om.writeValueAsString(message));
                    pipelined.zadd(this.myQueueShard, Long.valueOf(this.clock.millis() + message.getTimeout()).doubleValue() + (message.getPriority() / 100.0d), message.getId());
                }
                pipelined.sync();
                pipelined.close();
                List<String> list2 = (List) list.stream().map(message2 -> {
                    return message2.getId();
                }).collect(Collectors.toList());
                resource.close();
                start.stop();
                return list2;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            resource.close();
            start.stop();
            throw th;
        }
    }

    private String messageStoreKey(String str) {
        return this.messageStoreKeyPrefix + "." + (this.partitioner.hash(str).longValue() % 32);
    }

    private String unackShardKey(String str) {
        return this.unackShardKeyPrefix + (this.partitioner.hash(str).longValue() % 32);
    }

    public List<Message> peek(int i) {
        Stopwatch start = this.monitor.peek.start();
        RedisConnection resource = this.connPool.getResource();
        try {
            try {
                Set<String> peekIds = peekIds(0, i);
                if (peekIds == null) {
                    List<Message> emptyList = Collections.emptyList();
                    resource.close();
                    start.stop();
                    return emptyList;
                }
                LinkedList linkedList = new LinkedList();
                for (String str : peekIds) {
                    linkedList.add((Message) this.om.readValue(resource.hget(messageStoreKey(str), str), Message.class));
                }
                return linkedList;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } finally {
            resource.close();
            start.stop();
        }
    }

    public synchronized List<Message> pop(int i, int i2, TimeUnit timeUnit) {
        if (i < 1) {
            return Collections.emptyList();
        }
        Stopwatch start = this.monitor.start(this.monitor.pop, i);
        List<Message> linkedList = new LinkedList();
        int i3 = i;
        long millis = this.clock.millis() + timeUnit.toMillis(i2);
        while (true) {
            try {
                try {
                    List<Message> _pop = _pop((List) peekIds(0, i3).stream().collect(Collectors.toList()));
                    int size = _pop.size();
                    if (size == i) {
                        linkedList = _pop;
                        break;
                    }
                    linkedList.addAll(_pop);
                    i3 -= size;
                    if (this.clock.millis() > millis) {
                        break;
                    }
                    try {
                        Thread.sleep(10L);
                    } catch (InterruptedException e) {
                        this.logger.error(e.getMessage(), e);
                    }
                    if (i3 <= 0) {
                        break;
                    }
                } catch (Exception e2) {
                    throw new RuntimeException(e2);
                }
            } finally {
                start.stop();
            }
        }
        return linkedList;
    }

    public Message popWithMsgId(String str) {
        throw new UnsupportedOperationException();
    }

    public Message unsafePopWithMsgIdAllShards(String str) {
        throw new UnsupportedOperationException();
    }

    private List<Message> _pop(List<String> list) throws Exception {
        String str;
        double doubleValue = Long.valueOf(this.clock.millis() + this.unackTime).doubleValue();
        LinkedList linkedList = new LinkedList();
        ZAddParams nx = ZAddParams.zAddParams().nx();
        RedisConnection resource = this.connPool.getResource();
        try {
            Pipe pipelined = resource.pipelined();
            ArrayList arrayList = new ArrayList(list.size());
            for (int i = 0; i < list.size() && (str = list.get(i)) != null; i++) {
                arrayList.add(pipelined.zadd(unackShardKey(str), doubleValue, str, nx));
            }
            pipelined.sync();
            Pipe pipelined2 = resource.pipelined();
            int size = arrayList.size();
            ArrayList arrayList2 = new ArrayList(size);
            LinkedList linkedList2 = new LinkedList();
            for (int i2 = 0; i2 < size; i2++) {
                if (((Long) ((Response) arrayList.get(i2)).get()).longValue() == 0) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Cannot add {} to unack queue shard", list.get(i2));
                    }
                    this.monitor.misses.increment();
                } else {
                    String str2 = list.get(i2);
                    arrayList2.add(str2);
                    linkedList2.add(pipelined2.zrem(this.myQueueShard, str2));
                }
            }
            pipelined2.sync();
            Pipe pipelined3 = resource.pipelined();
            ArrayList arrayList3 = new ArrayList(size);
            for (int i3 = 0; i3 < linkedList2.size(); i3++) {
                if (((Long) ((Response) linkedList2.get(i3)).get()).longValue() == 0) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Cannot remove {} from queue shard", arrayList2.get(i3));
                    }
                    this.monitor.misses.increment();
                } else {
                    arrayList3.add(pipelined3.hget(messageStoreKey((String) arrayList2.get(i3)), (String) arrayList2.get(i3)));
                }
            }
            pipelined3.sync();
            for (int i4 = 0; i4 < arrayList3.size(); i4++) {
                String str3 = (String) ((Response) arrayList3.get(i4)).get();
                if (str3 == null) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Cannot read payload for {}", arrayList2.get(i4));
                    }
                    this.monitor.misses.increment();
                } else {
                    Message message = (Message) this.om.readValue(str3, Message.class);
                    message.setShard(this.shardName);
                    linkedList.add(message);
                }
            }
            return linkedList;
        } finally {
            resource.close();
        }
    }

    public boolean ack(String str) {
        Stopwatch start = this.monitor.ack.start();
        RedisConnection resource = this.connPool.getResource();
        try {
            if (resource.zrem(unackShardKey(str), str).longValue() <= 0) {
                return false;
            }
            resource.hdel(messageStoreKey(str), str);
            resource.close();
            start.stop();
            return true;
        } finally {
            resource.close();
            start.stop();
        }
    }

    public void ack(List<Message> list) {
        Stopwatch start = this.monitor.ack.start();
        RedisConnection resource = this.connPool.getResource();
        Pipe pipelined = resource.pipelined();
        LinkedList linkedList = new LinkedList();
        try {
            try {
                for (Message message : list) {
                    linkedList.add(pipelined.zrem(unackShardKey(message.getId()), message.getId()));
                }
                pipelined.sync();
                Pipe pipelined2 = resource.pipelined();
                LinkedList linkedList2 = new LinkedList();
                for (int i = 0; i < list.size(); i++) {
                    if (((Long) ((Response) linkedList.get(i)).get()).longValue() > 0) {
                        linkedList2.add(pipelined2.hdel(messageStoreKey(list.get(i).getId()), list.get(i).getId()));
                    }
                }
                pipelined2.sync();
                resource.close();
                start.stop();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            resource.close();
            start.stop();
            throw th;
        }
    }

    public boolean setUnackTimeout(String str, long j) {
        Stopwatch start = this.monitor.ack.start();
        RedisConnection resource = this.connPool.getResource();
        try {
            double doubleValue = Long.valueOf(this.clock.millis() + j).doubleValue();
            if (resource.zscore(unackShardKey(str), str) == null) {
                return false;
            }
            resource.zadd(unackShardKey(str), doubleValue, str);
            resource.close();
            start.stop();
            return true;
        } finally {
            resource.close();
            start.stop();
        }
    }

    public boolean setTimeout(String str, long j) {
        RedisConnection resource = this.connPool.getResource();
        try {
            try {
                String hget = resource.hget(messageStoreKey(str), str);
                if (hget == null) {
                    return false;
                }
                Message message = (Message) this.om.readValue(hget, Message.class);
                message.setTimeout(j);
                if (resource.zscore(this.myQueueShard, str) == null) {
                    resource.close();
                    return false;
                }
                resource.zadd(this.myQueueShard, Long.valueOf(this.clock.millis() + j).doubleValue() + (message.getPriority() / 100.0d), str);
                resource.hset(messageStoreKey(message.getId()), message.getId(), this.om.writeValueAsString(message));
                resource.close();
                return true;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } finally {
            resource.close();
        }
    }

    public boolean remove(String str) {
        Stopwatch start = this.monitor.remove.start();
        RedisConnection resource = this.connPool.getResource();
        try {
            resource.zrem(unackShardKey(str), str);
            Long zrem = resource.zrem(this.myQueueShard, str);
            Long hdel = resource.hdel(messageStoreKey(str), str);
            if (zrem.longValue() > 0) {
                if (hdel.longValue() > 0) {
                    return true;
                }
            }
            resource.close();
            start.stop();
            return false;
        } finally {
            resource.close();
            start.stop();
        }
    }

    public boolean ensure(Message message) {
        throw new UnsupportedOperationException();
    }

    public boolean containsPredicate(String str) {
        return containsPredicate(str, false);
    }

    public String getMsgWithPredicate(String str) {
        return getMsgWithPredicate(str, false);
    }

    public boolean containsPredicate(String str, boolean z) {
        throw new UnsupportedOperationException();
    }

    public String getMsgWithPredicate(String str, boolean z) {
        throw new UnsupportedOperationException();
    }

    public Message popMsgWithPredicate(String str, boolean z) {
        throw new UnsupportedOperationException();
    }

    public List<Message> bulkPop(int i, int i2, TimeUnit timeUnit) {
        throw new UnsupportedOperationException();
    }

    public List<Message> unsafeBulkPop(int i, int i2, TimeUnit timeUnit) {
        throw new UnsupportedOperationException();
    }

    public Message get(String str) {
        Stopwatch start = this.monitor.get.start();
        RedisConnection resource = this.connPool.getResource();
        try {
            try {
                String hget = resource.hget(messageStoreKey(str), str);
                if (hget == null) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Cannot get the message payload " + str);
                    }
                    return null;
                }
                Message message = (Message) this.om.readValue(hget, Message.class);
                resource.close();
                start.stop();
                return message;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } finally {
            resource.close();
            start.stop();
        }
    }

    public Message localGet(String str) {
        throw new UnsupportedOperationException();
    }

    public long size() {
        Stopwatch start = this.monitor.size.start();
        RedisConnection resource = this.nonQuorumPool.getResource();
        try {
            long zcard = resource.zcard(this.myQueueShard);
            resource.close();
            start.stop();
            return zcard;
        } catch (Throwable th) {
            resource.close();
            start.stop();
            throw th;
        }
    }

    public Map<String, Map<String, Long>> shardSizes() {
        Stopwatch start = this.monitor.size.start();
        HashMap hashMap = new HashMap();
        RedisConnection resource = this.nonQuorumPool.getResource();
        try {
            long zcard = resource.zcard(this.myQueueShard);
            long j = 0;
            for (int i = 0; i < 32; i++) {
                j += resource.zcard(this.unackShardKeyPrefix + i);
            }
            HashMap hashMap2 = new HashMap();
            hashMap2.put("size", Long.valueOf(zcard));
            hashMap2.put("uacked", Long.valueOf(j));
            hashMap.put(this.shardName, hashMap2);
            resource.close();
            start.stop();
            return hashMap;
        } catch (Throwable th) {
            resource.close();
            start.stop();
            throw th;
        }
    }

    public void clear() {
        RedisConnection resource = this.connPool.getResource();
        try {
            resource.del(this.myQueueShard);
            for (int i = 0; i < 32; i++) {
                resource.del(this.unackShardKeyPrefix + i);
                resource.del(this.messageStoreKeyPrefix + "." + i);
            }
        } finally {
            resource.close();
        }
    }

    private Set<String> peekIds(int i, int i2) {
        RedisConnection resource = this.connPool.getResource();
        try {
            Set<String> zrangeByScore = resource.zrangeByScore(this.myQueueShard, 0, Long.valueOf(this.clock.millis() + 1).doubleValue(), i, i2);
            resource.close();
            return zrangeByScore;
        } catch (Throwable th) {
            resource.close();
            throw th;
        }
    }

    public void processUnacks() {
        for (int i = 0; i < 32; i++) {
            processUnacks(this.unackShardKeyPrefix + i);
        }
    }

    private void processUnacks(String str) {
        Stopwatch start = this.monitor.processUnack.start();
        RedisConnection resource = this.connPool.getResource();
        while (true) {
            try {
                this.monitor.queueDepth.record(size());
                Set<Tuple> zrangeByScoreWithScores = resource.zrangeByScoreWithScores(str, 0, Long.valueOf(this.clock.millis()).doubleValue(), 0, 1000);
                if (zrangeByScoreWithScores.size() <= 0) {
                    return;
                }
                this.logger.debug("Adding " + zrangeByScoreWithScores.size() + " messages back to the queue for " + this.queueName);
                LinkedList<Tuple> linkedList = new LinkedList();
                for (Tuple tuple : zrangeByScoreWithScores) {
                    tuple.getScore();
                    String element = tuple.getElement();
                    if (resource.hget(messageStoreKey(element), element) == null) {
                        resource.zrem(unackShardKey(element), element);
                    } else {
                        linkedList.add(tuple);
                    }
                }
                Pipe pipelined = resource.pipelined();
                for (Tuple tuple2 : linkedList) {
                    double score = tuple2.getScore();
                    String element2 = tuple2.getElement();
                    pipelined.zadd(this.myQueueShard, score, element2);
                    pipelined.zrem(unackShardKey(element2), element2);
                }
                pipelined.sync();
            } finally {
                resource.close();
                start.stop();
            }
        }
    }

    public List<Message> getAllMessages() {
        throw new UnsupportedOperationException();
    }

    public void atomicProcessUnacks() {
        throw new UnsupportedOperationException();
    }

    public List<Message> findStaleMessages() {
        throw new UnsupportedOperationException();
    }

    public boolean atomicRemove(String str) {
        throw new UnsupportedOperationException();
    }

    public void close() throws IOException {
        this.schedulerForUnacksProcessing.shutdown();
        this.monitor.close();
    }

    public List<Message> unsafePeekAllShards(int i) {
        throw new UnsupportedOperationException();
    }

    public List<Message> unsafePopAllShards(int i, int i2, TimeUnit timeUnit) {
        throw new UnsupportedOperationException();
    }
}
