package top.zenyoung.redis.service.impl;

import com.google.common.collect.Maps;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import org.redisson.api.RQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.util.Assert;
import top.zenyoung.redis.service.QueueService;

/* loaded from: input_file:top/zenyoung/redis/service/impl/RedisQueueServiceImpl.class */
public class RedisQueueServiceImpl implements QueueService {
    private static final Logger log = LoggerFactory.getLogger(RedisQueueServiceImpl.class);
    private static final Map<String, Object> LOCKS = Maps.newConcurrentMap();
    private final RedissonClient redissonClient;

    @Nonnull
    protected String getRedisQueueKey(@Nonnull String str) {
        return "zy-queue:" + str;
    }

    /* JADX WARN: Finally extract failed */
    @Override // top.zenyoung.redis.service.QueueService
    public <T extends Serializable> void pushQueue(@Nonnull String str, @Nonnull T t) {
        RuntimeException runtimeException;
        log.debug("pushQueue(key: {},data: {})...", str, t);
        Assert.hasText(str, "'key'不能为空!");
        String redisQueueKey = getRedisQueueKey(str);
        Assert.hasText(redisQueueKey, "'queueKey'不能为空!");
        String str2 = redisQueueKey + "-push";
        synchronized (LOCKS.computeIfAbsent(str2, str3 -> {
            return new Object();
        })) {
            try {
                try {
                    this.redissonClient.getQueue(redisQueueKey).add(t);
                    LOCKS.remove(str2);
                } finally {
                }
            } catch (Throwable th) {
                LOCKS.remove(str2);
                throw th;
            }
        }
    }

    /* JADX WARN: Finally extract failed */
    @Override // top.zenyoung.redis.service.QueueService
    public <T extends Serializable> int popQueue(@Nonnull String str, @Nonnull Class<T> cls, @Nonnull Consumer<T> consumer) {
        log.debug("popQueue(key: {},dataClass: {},consumer: {})...", new Object[]{str, cls, consumer});
        Assert.hasText(str, "'key'不能为空!");
        String redisQueueKey = getRedisQueueKey(str);
        Assert.hasText(redisQueueKey, "'queueKey'不能为空!");
        AtomicInteger atomicInteger = new AtomicInteger(0);
        String str2 = redisQueueKey + "-pop";
        synchronized (LOCKS.computeIfAbsent(str2, str3 -> {
            return new Object();
        })) {
            try {
                try {
                    RQueue queue = this.redissonClient.getQueue(redisQueueKey);
                    while (true) {
                        Serializable serializable = (Serializable) queue.poll();
                        if (!Objects.nonNull(serializable)) {
                            break;
                        }
                        atomicInteger.incrementAndGet();
                        consumer.accept(serializable);
                    }
                    LOCKS.remove(str2);
                } catch (Throwable th) {
                    LOCKS.remove(str2);
                    throw th;
                }
            } catch (QueryTimeoutException e) {
                log.debug("popQueue(key: {},dataClass: {},consumer: {})-exp: {}", new Object[]{str, cls, consumer, e.getMessage()});
                LOCKS.remove(str2);
            } catch (Throwable th2) {
                log.warn("popQueue(key: {},dataClass: {},consumer: {})-exp: {}", new Object[]{str, cls, consumer, th2.getMessage()});
                throw new RuntimeException(th2);
            }
        }
        return atomicInteger.get();
    }

    public RedisQueueServiceImpl(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }
}
