package com.reger.l2cache.pipeline.core;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisConnectionUtils;
import org.springframework.util.Assert;

/* loaded from: input_file:com/reger/l2cache/pipeline/core/RedisPipelineCore.class */
public class RedisPipelineCore implements PipelineCore {
    private static final int MAX_PIP_SIZE = 500;
    private static final int WARN_TIME_MILLIS = 50;
    private final LinkedBlockingQueue<QueueNode<?>> queue = new LinkedBlockingQueue<>();
    private volatile boolean pipeEnabled = true;
    private final Lock LOCK = new ReentrantLock();
    private RedisConnectionFactory connectionFactory;
    private static final Logger log = LoggerFactory.getLogger(RedisPipelineCore.class);
    private static final int THREAD_NUM = 2 * Runtime.getRuntime().availableProcessors();
    private static final Map<RedisConnectionFactory, RedisPipelineCore> MAP_REDIS_PIPELINE_CORE = new HashMap();
    private static final ExecutorService executorService = Executors.newCachedThreadPool(runnable -> {
        Thread thread = new Thread(runnable);
        thread.setName("Redis-Pipeline-Work-" + thread.getId());
        thread.setPriority(10);
        return thread;
    });

    private RedisPipelineCore() {
    }

    private LinkedList<QueueNode<?>> getQueueNodes() {
        QueueNode<?> poll;
        try {
            this.LOCK.lockInterruptibly();
            LinkedList<QueueNode<?>> linkedList = new LinkedList<>();
            QueueNode<?> queueNode = null;
            try {
                try {
                    queueNode = this.queue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.debug("任务线程被中断", e);
                }
                if (queueNode == null) {
                    return null;
                }
                linkedList.add(queueNode);
                while (linkedList.size() < MAX_PIP_SIZE && (poll = this.queue.poll()) != null) {
                    linkedList.add(poll);
                }
                this.LOCK.unlock();
                return linkedList;
            } finally {
                this.LOCK.unlock();
            }
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            log.debug("任务线程被中断", e2);
            return null;
        }
    }

    private void handle() {
        LinkedList<QueueNode<?>> queueNodes = getQueueNodes();
        if (queueNodes == null || queueNodes.isEmpty()) {
            return;
        }
        operation(queueNodes);
    }

    private void operation(List<QueueNode<?>> list) {
        RedisConnection redisConnection = null;
        try {
            long currentTimeMillis = System.currentTimeMillis();
            try {
                try {
                    RedisConnection connection = RedisConnectionUtils.getConnection(this.connectionFactory);
                    redisConnection = connection;
                    connection.openPipeline();
                    list.forEach(queueNode -> {
                        queueNode.operation(connection);
                    });
                    List closePipeline = connection.closePipeline();
                    for (int i = 0; i < list.size(); i++) {
                        list.get(i).callback(closePipeline.get(i));
                    }
                    long currentTimeMillis2 = System.currentTimeMillis();
                    if (currentTimeMillis2 - currentTimeMillis > 50) {
                        log.info("注意是否有redis慢查询，本次执行耗时{}毫秒,{}条指令一次执行", Long.valueOf(currentTimeMillis2 - currentTimeMillis), Integer.valueOf(list.size()));
                    }
                    RedisConnectionUtils.releaseConnection(redisConnection, this.connectionFactory);
                } catch (Exception e) {
                    list.forEach(queueNode2 -> {
                        queueNode2.callback(e);
                    });
                    long currentTimeMillis3 = System.currentTimeMillis();
                    if (currentTimeMillis3 - currentTimeMillis > 50) {
                        log.info("注意是否有redis慢查询，本次执行耗时{}毫秒,{}条指令一次执行", Long.valueOf(currentTimeMillis3 - currentTimeMillis), Integer.valueOf(list.size()));
                    }
                    RedisConnectionUtils.releaseConnection(redisConnection, this.connectionFactory);
                }
            } catch (Throwable th) {
                long currentTimeMillis4 = System.currentTimeMillis();
                if (currentTimeMillis4 - currentTimeMillis > 50) {
                    log.info("注意是否有redis慢查询，本次执行耗时{}毫秒,{}条指令一次执行", Long.valueOf(currentTimeMillis4 - currentTimeMillis), Integer.valueOf(list.size()));
                }
                RedisConnectionUtils.releaseConnection(redisConnection, this.connectionFactory);
                throw th;
            }
        } finally {
            list.forEach(queueNode3 -> {
                queueNode3.aNotify();
            });
        }
    }

    private <T> T asyncExecute(Operations operations, Callback<T> callback) {
        QueueNode queueNode = new QueueNode(operations, callback);
        RedisConnection redisConnection = null;
        try {
            redisConnection = RedisConnectionUtils.getConnection(this.connectionFactory);
            T relust = queueNode.operation(redisConnection).relust();
            RedisConnectionUtils.releaseConnection(redisConnection, this.connectionFactory);
            return relust;
        } catch (Throwable th) {
            RedisConnectionUtils.releaseConnection(redisConnection, this.connectionFactory);
            throw th;
        }
    }

    private <T> T syncExecute(Operations operations, Callback<T> callback) {
        QueueNode<?> queueNode = new QueueNode<>(operations, callback);
        try {
            this.queue.put(queueNode);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("加入元素时，线程中断", queueNode);
        }
        return queueNode.await().relust();
    }

    @Override // com.reger.l2cache.pipeline.core.PipelineCore
    public <T> T execute(Operations operations, Callback<T> callback, boolean z) {
        if (this.connectionFactory == null || !this.pipeEnabled) {
            log.debug("未启用RedisPipeline");
            return (T) asyncExecute(operations, callback);
        }
        if (z) {
            return (T) syncExecute(operations, callback);
        }
        log.debug("操作可能没有返回值值暂不支持在管道中执行");
        return (T) asyncExecute(operations, callback);
    }

    public static final synchronized RedisPipelineCore start(RedisConnectionFactory redisConnectionFactory, Boolean bool, Integer num) {
        Assert.notNull(redisConnectionFactory, "redis链接工厂不可以为空");
        if (MAP_REDIS_PIPELINE_CORE.containsKey(redisConnectionFactory)) {
            return MAP_REDIS_PIPELINE_CORE.get(redisConnectionFactory);
        }
        RedisPipelineCore redisPipelineCore = new RedisPipelineCore();
        redisPipelineCore.connectionFactory = redisConnectionFactory;
        redisPipelineCore.pipeEnabled = bool.booleanValue();
        MAP_REDIS_PIPELINE_CORE.put(redisConnectionFactory, redisPipelineCore);
        if (num == null || num.intValue() == 0) {
            num = Integer.valueOf(THREAD_NUM);
        }
        for (int i = 0; i < num.intValue(); i++) {
            executorService.submit(() -> {
                while (!Thread.interrupted()) {
                    try {
                        redisPipelineCore.handle();
                    } catch (Exception e) {
                        log.warn("存在异常", e);
                    }
                }
                log.info("线程{}结束", Thread.currentThread().getName());
            });
        }
        log.debug("准备了{}个线程用于处理任务", num);
        return redisPipelineCore;
    }

    public final void stop() {
        Assert.notNull(this.connectionFactory, "没有开启 RedisPipelineCore 不可以关闭");
        this.connectionFactory = null;
        if (executorService.isShutdown()) {
            return;
        }
        executorService.shutdownNow();
    }
}
