package com.reger.l2cache.pipeline.core;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisConnectionUtils;
import org.springframework.util.Assert;

/* loaded from: input_file:com/reger/l2cache/pipeline/core/RedisPipelineCore.class */
public class RedisPipelineCore implements PipelineCore {
    private static final int PIP_SIZE = 1000;
    private static final int WARN_PIP_SIZE = 990;
    private static final int INFO_PIP_SIZE = 900;
    private final LinkedBlockingQueue<QueueNode<?>> queue = new LinkedBlockingQueue<>();
    private final Lock LOCK = new ReentrantLock();
    private RedisConnectionFactory connectionFactory;
    private static final Logger log = LoggerFactory.getLogger(RedisPipelineCore.class);
    private static final Callback<?> DEFAULT_CALLBACK = obj -> {
        return obj;
    };
    private static final int THREAD_NUM = Math.min(Runtime.getRuntime().availableProcessors(), 8);
    private static final Map<RedisConnectionFactory, RedisPipelineCore> MAP_REDIS_PIPELINE_CORE = new HashMap();
    private static final ExecutorService executorService = Executors.newCachedThreadPool(runnable -> {
        Thread thread = new Thread(runnable);
        thread.setName("Redis-Pipeline-Work-" + thread.getId());
        thread.setPriority(10);
        return thread;
    });

    private RedisPipelineCore() {
    }

    private LinkedList<QueueNode<?>> getQueueNodes() {
        try {
            this.LOCK.lockInterruptibly();
            QueueNode<?> queueNode = null;
            try {
                try {
                    queueNode = this.queue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.debug("任务线程被中断", e);
                }
                if (queueNode == null) {
                    return null;
                }
                LinkedList<QueueNode<?>> linkedList = new LinkedList<>();
                linkedList.add(queueNode);
                while (true) {
                    QueueNode<?> poll = this.queue.poll();
                    if (poll == null || linkedList.size() >= PIP_SIZE) {
                        break;
                    }
                    linkedList.add(poll);
                }
                this.LOCK.unlock();
                return linkedList;
            } finally {
                this.LOCK.unlock();
            }
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            log.debug("任务线程被中断", e2);
            return null;
        }
    }

    private void handle() {
        LinkedList<QueueNode<?>> queueNodes = getQueueNodes();
        if (queueNodes == null || queueNodes.isEmpty()) {
            return;
        }
        if (log.isInfoEnabled() && queueNodes.size() > INFO_PIP_SIZE && queueNodes.size() < WARN_PIP_SIZE) {
            log.info("请检查redis服务器是否运行较慢或者存在redis慢查询,本次共需要执行 {}个", Integer.valueOf(queueNodes.size()));
        }
        if (queueNodes.size() >= WARN_PIP_SIZE) {
            log.warn("请检查redis服务器运行较慢或者存在redis慢查询,本次共需要执行 {}个", Integer.valueOf(queueNodes.size()));
        }
        operation(queueNodes);
    }

    private void operation(List<QueueNode<?>> list) {
        RedisConnection redisConnection = null;
        try {
            try {
                try {
                    RedisConnection connection = RedisConnectionUtils.getConnection(this.connectionFactory);
                    redisConnection = connection;
                    connection.openPipeline();
                    list.forEach(queueNode -> {
                        queueNode.operation(connection);
                    });
                    List closePipeline = connection.closePipeline();
                    for (int i = 0; i < list.size(); i++) {
                        list.get(i).callback(closePipeline.get(i));
                    }
                    RedisConnectionUtils.releaseConnection(redisConnection, this.connectionFactory);
                } catch (Exception e) {
                    list.forEach(queueNode2 -> {
                        queueNode2.callback(e);
                    });
                    RedisConnectionUtils.releaseConnection(redisConnection, this.connectionFactory);
                }
            } catch (Throwable th) {
                RedisConnectionUtils.releaseConnection(redisConnection, this.connectionFactory);
                throw th;
            }
        } finally {
            list.forEach(queueNode3 -> {
                queueNode3.aNotify();
            });
        }
    }

    @Override // com.reger.l2cache.pipeline.core.PipelineCore
    public final <T> T execute(Operations operations, Callback<T> callback) {
        Assert.notNull(this.connectionFactory, "需要先开启RedisPipelineCore");
        QueueNode<?> queueNode = new QueueNode<>(operations, callback);
        this.queue.add(queueNode);
        return queueNode.await().relust();
    }

    @Override // com.reger.l2cache.pipeline.core.PipelineCore
    public final <T> T execute(Operations operations) {
        return (T) execute(operations, DEFAULT_CALLBACK);
    }

    public static final synchronized RedisPipelineCore start(RedisConnectionFactory redisConnectionFactory) {
        Assert.notNull(redisConnectionFactory, "redis链接工厂不可以为空");
        if (MAP_REDIS_PIPELINE_CORE.containsKey(redisConnectionFactory)) {
            return MAP_REDIS_PIPELINE_CORE.get(redisConnectionFactory);
        }
        RedisPipelineCore redisPipelineCore = new RedisPipelineCore();
        redisPipelineCore.connectionFactory = redisConnectionFactory;
        MAP_REDIS_PIPELINE_CORE.put(redisConnectionFactory, redisPipelineCore);
        for (int i = 0; i < THREAD_NUM; i++) {
            executorService.submit(() -> {
                while (!Thread.interrupted()) {
                    try {
                        redisPipelineCore.handle();
                    } catch (Exception e) {
                        log.warn("存在异常", e);
                    }
                }
                log.info("线程{}结束", Thread.currentThread().getName());
            });
        }
        log.debug("准备了{}个线程用于处理任务", Integer.valueOf(THREAD_NUM));
        return redisPipelineCore;
    }

    public final void stop() {
        Assert.notNull(this.connectionFactory, "没有开启 RedisPipelineCore 不可以关闭");
        this.connectionFactory = null;
        if (executorService.isShutdown()) {
            return;
        }
        executorService.shutdownNow();
    }
}
