package io.choerodon.message.impl.redis;

import io.choerodon.message.IQueueMessageListener;
import io.choerodon.message.annotation.QueueMonitor;
import io.choerodon.message.impl.MethodReflectUtils;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;
import redis.clients.jedis.Jedis;

/* loaded from: input_file:io/choerodon/message/impl/redis/QueueListenerContainer.class */
/**
 * Container that drives {@link IQueueMessageListener}s from Redis lists.
 *
 * <p>During {@link #afterPropertiesSet()} the container collects the explicitly configured
 * listeners plus one adapter per bean annotated with {@link QueueMonitor}, then starts one
 * long-lived {@link MonitorTask} per listener on a fixed-size thread pool. Each task repeatedly
 * performs a blocking left-pop on the listener's queue and hands every message to the listener.
 *
 * <p>{@link #stop()} and {@link #destroy()} stop the tasks and shut the pool down. The workers are
 * only created in {@link #afterPropertiesSet()}, so calling {@link #start()} after a stop does not
 * restart them.
 */
public class QueueListenerContainer implements InitializingBean, DisposableBean, SmartLifecycle {

    private static final int PHASE = 9999;
    /** Lower bound (ms) enforced by {@link #setRecoveryInterval(long)}. */
    private static final long MIN_RECOVERY_INTERVAL = 2000;
    /** Recovery interval (ms) used when none is configured. */
    private static final long DEFAULT_RECOVERY_INTERVAL = 5000;
    /** Pause (ms) after a pop that produced no message. */
    private static final long IDLE_SLEEP_TIME = 100;

    private final Logger logger = LoggerFactory.getLogger(QueueListenerContainer.class);
    private final List<MonitorTask<?>> monitorTaskList = new ArrayList<>();

    @Autowired
    private ApplicationContext applicationContext;
    private RedisConnectionFactory connectionFactory;
    private ExecutorService executorService;
    private List<IQueueMessageListener<?>> listeners;
    private RedisSerializer<String> stringRedisSerializer;
    private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
    private volatile boolean running = false;

    /**
     * Worker that polls a single queue and forwards each message to its listener.
     *
     * @param <T> message type produced by the listener's serializer
     */
    public class MonitorTask<T> implements SchedulingAwareRunnable {
        private final IQueueMessageListener<T> receiver;
        /** Written by the worker thread, read and cleared by the stopping thread. */
        private volatile RedisConnection connection;
        /** One-way flag: once set by {@link #stop()} the task never resumes. */
        private volatile boolean stopped = false;

        MonitorTask(IQueueMessageListener<T> iQueueMessageListener) {
            Assert.notNull(iQueueMessageListener, "receiver is null.");
            Assert.hasText(iQueueMessageListener.getQueue(), "queue is not valid");
            this.receiver = iQueueMessageListener;
        }

        /**
         * Requests termination and force-closes the current connection so that a worker
         * blocked inside the pop is released.
         */
        public void stop() {
            this.stopped = true;
            safeClose(true);
        }

        /**
         * Poll loop: pop, dispatch, and on failure drop the connection and retry after the
         * container's recovery interval. Ends when {@link #stop()} was called or the worker
         * thread was interrupted.
         */
        public void run() {
            while (isActive()) {
                try {
                    RedisConnection activeConnection = obtainConnection();
                    // Re-check after the connection is published: narrows the window in which
                    // stop() could miss the connection this iteration is about to block on.
                    if (this.stopped) {
                        break;
                    }
                    T message = fetchMessage(activeConnection, this.receiver.getQueue());
                    if (message == null) {
                        sleep_(IDLE_SLEEP_TIME);
                    } else {
                        deliver(message);
                    }
                } catch (Throwable failure) {
                    if (!isActive()) {
                        // Failure caused by (or coinciding with) shutdown: leave quietly.
                        break;
                    }
                    safeClose();
                    if (QueueListenerContainer.this.logger.isErrorEnabled()) {
                        QueueListenerContainer.this.logger.error("exception occurred while get message from queue [" + this.receiver.getQueue() + "]", failure);
                    }
                    if (QueueListenerContainer.this.logger.isDebugEnabled()) {
                        QueueListenerContainer.this.logger.debug("try recovery after {}ms", Long.valueOf(QueueListenerContainer.this.getRecoveryInterval()));
                    }
                    sleep_(QueueListenerContainer.this.getRecoveryInterval());
                }
            }
            if (QueueListenerContainer.this.logger.isDebugEnabled()) {
                QueueListenerContainer.this.logger.debug("stop monitor:" + this);
            }
            safeClose();
        }

        /** True while neither {@link #stop()} nor a thread interrupt asked the worker to end. */
        private boolean isActive() {
            return !this.stopped && !Thread.currentThread().isInterrupted();
        }

        /** Returns the current connection, opening one from the container's factory if absent. */
        private RedisConnection obtainConnection() {
            RedisConnection current = this.connection;
            if (current == null) {
                current = QueueListenerContainer.this.connectionFactory.getConnection();
                this.connection = current;
            }
            return current;
        }

        /** Hands a message to the listener; listener failures are logged and never stop the loop. */
        private void deliver(T message) {
            try {
                this.receiver.onQueueMessage(message, this.receiver.getQueue());
            } catch (Throwable th) {
                if (QueueListenerContainer.this.logger.isWarnEnabled()) {
                    QueueListenerContainer.this.logger.warn("exception occurred while receiver consume message.", th);
                }
            }
        }

        /**
         * Pops one element from the given queue (blocking left-pop, timeout 0) and deserializes it
         * with the listener's serializer.
         *
         * @param redisConnection connection to issue the pop on
         * @param str             queue name; serialized with the container's string serializer
         * @return the deserialized message, or {@code null} when the reply is absent or has no
         *         value element
         */
        @SuppressWarnings("unchecked")
        T fetchMessage(RedisConnection redisConnection, String str) {
            byte[] queueKey = QueueListenerContainer.this.stringRedisSerializer.serialize(str);
            List<byte[]> reply = redisConnection.bLPop(0, queueKey);
            // The value is read from index 1, so anything shorter than two elements is unusable.
            if (reply == null || reply.size() < 2) {
                return null;
            }
            return (T) this.receiver.getRedisSerializer().deserialize(reply.get(1));
        }

        /**
         * Best-effort release of the current connection; never throws.
         *
         * @param zArr when the first element is {@code true} and the native connection is a
         *             {@link Jedis} instance, it is disconnected before the connection is closed
         */
        void safeClose(boolean... zArr) {
            // Snapshot and clear first: the field is shared between worker and stopping thread.
            RedisConnection current = this.connection;
            this.connection = null;
            if (current == null) {
                return;
            }
            boolean forceDisconnect = zArr != null && zArr.length > 0 && zArr[0];
            if (forceDisconnect) {
                try {
                    Object nativeConnection = current.getNativeConnection();
                    if (nativeConnection instanceof Jedis) {
                        ((Jedis) nativeConnection).disconnect();
                    }
                } catch (Exception ignored) {
                    // Best effort: a failed disconnect must not prevent close() below.
                    QueueListenerContainer.this.logger.debug("ignored failure while disconnecting native connection", ignored);
                }
            }
            try {
                current.close();
            } catch (Exception ignored) {
                // Best effort: the connection is being discarded anyway.
                QueueListenerContainer.this.logger.debug("ignored failure while closing connection", ignored);
            }
        }

        /** Sleeps for {@code j} ms; an interrupt ends the sleep and is re-asserted on the thread. */
        void sleep_(long j) {
            try {
                Thread.sleep(j);
            } catch (InterruptedException interrupted) {
                // Keep the interrupt visible so the poll loop can terminate.
                Thread.currentThread().interrupt();
            }
        }

        public boolean isLongLived() {
            return true;
        }
    }

    /**
     * Adapter that exposes a reflectively invoked two-argument bean method
     * {@code (message, queue)} as an {@link IQueueMessageListener}.
     */
    private static class SimpleQueueListener implements IQueueMessageListener {
        private final String queue;
        private final Object target;
        private final Method method;
        private final RedisSerializer redisSerializer;
        private final Logger logger;

        SimpleQueueListener(String str, Object obj, Method method) {
            this.queue = str;
            this.target = obj;
            this.method = method;
            // The serializer is chosen from the type of the method's first (message) parameter.
            this.redisSerializer = MethodReflectUtils.getProperRedisSerializer(method.getParameterTypes()[0]);
            this.logger = LoggerFactory.getLogger(obj.getClass());
        }

        @Override // io.choerodon.message.IQueueMessageListener
        public String getQueue() {
            return this.queue;
        }

        @Override // io.choerodon.message.IQueueMessageListener
        public RedisSerializer getRedisSerializer() {
            return this.redisSerializer;
        }

        /**
         * Invokes the target method with {@code (obj, str)}. Any exception is unwrapped to its
         * root cause and logged on the target's logger instead of being propagated.
         */
        @Override // io.choerodon.message.IQueueMessageListener
        public void onQueueMessage(Object obj, String str) {
            try {
                this.method.invoke(this.target, obj, str);
            } catch (Exception e) {
                Throwable rootCause = e;
                while (rootCause.getCause() != null) {
                    rootCause = rootCause.getCause();
                }
                if (this.logger.isErrorEnabled()) {
                    this.logger.error(rootCause.getMessage(), rootCause);
                }
            }
        }
    }

    public RedisConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    public void setConnectionFactory(RedisConnectionFactory redisConnectionFactory) {
        this.connectionFactory = redisConnectionFactory;
    }

    public long getRecoveryInterval() {
        return this.recoveryInterval;
    }

    /**
     * Sets the pause between a failed poll and the next attempt.
     *
     * @param j interval in milliseconds; values below the minimum (2000) are raised to it
     */
    public void setRecoveryInterval(long j) {
        if (j < MIN_RECOVERY_INTERVAL) {
            if (this.logger.isWarnEnabled()) {
                this.logger.warn("minimum for recoveryInterval is {}", Long.valueOf(MIN_RECOVERY_INTERVAL));
            }
            this.recoveryInterval = MIN_RECOVERY_INTERVAL;
        } else {
            this.recoveryInterval = j;
        }
    }

    public List<IQueueMessageListener<?>> getListeners() {
        return this.listeners;
    }

    public void setListeners(List<IQueueMessageListener<?>> list) {
        this.listeners = list;
    }

    public RedisSerializer<String> getStringRedisSerializer() {
        return this.stringRedisSerializer;
    }

    @Autowired
    public void setStringRedisSerializer(RedisSerializer<String> redisSerializer) {
        this.stringRedisSerializer = redisSerializer;
    }

    /**
     * Stops the container and always shuts the workers down, even when {@link #start()} was
     * never called (the workers are started in {@link #afterPropertiesSet()}).
     */
    public void destroy() throws Exception {
        stop();
        // stop() is a no-op unless running; repeat the (idempotent) shutdown unconditionally.
        shutdownWorkers();
    }

    public boolean isAutoStartup() {
        return true;
    }

    /** Stops the container and then invokes the callback, even if stopping failed. */
    public void stop(Runnable runnable) {
        try {
            stop();
        } finally {
            runnable.run();
        }
    }

    /** Marks the container as running; the workers themselves are started in {@link #afterPropertiesSet()}. */
    public void start() {
        if (this.running) {
            return;
        }
        this.running = true;
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("startup success");
        }
    }

    /** Stops every monitor task and shuts the thread pool down; does nothing when not running. */
    public void stop() {
        if (!isRunning()) {
            return;
        }
        this.running = false;
        shutdownWorkers();
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("shutdown complete");
        }
    }

    /** Idempotent: signals all tasks to stop and interrupts the pool's threads, if a pool exists. */
    private void shutdownWorkers() {
        for (MonitorTask<?> task : this.monitorTaskList) {
            task.stop();
        }
        // No pool is created when there are no listeners.
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
    }

    public boolean isRunning() {
        return this.running;
    }

    public int getPhase() {
        return PHASE;
    }

    /**
     * Registers an adapter for every {@link QueueMonitor}-annotated bean in the application
     * context and starts one monitor task per listener. When there are no listeners, no thread
     * pool is created.
     */
    public void afterPropertiesSet() throws Exception {
        if (this.listeners == null) {
            this.listeners = new ArrayList<>();
        }
        this.applicationContext.getBeansWithAnnotation(QueueMonitor.class)
                .forEach((beanName, bean) -> registerAnnotatedBean(bean));
        if (this.listeners.isEmpty()) {
            // Executors.newFixedThreadPool rejects a size of 0, so skip pool creation entirely.
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("no queue listeners registered");
            }
            return;
        }
        this.executorService = Executors.newFixedThreadPool(this.listeners.size());
        for (IQueueMessageListener<?> listener : this.listeners) {
            MonitorTask<?> monitorTask = new MonitorTask<>(listener);
            this.monitorTaskList.add(monitorTask);
            this.executorService.execute(monitorTask);
        }
    }

    /**
     * Resolves the handler method named by the bean's {@link QueueMonitor} annotation and, when
     * found, adds a {@link SimpleQueueListener} for it to the listener list.
     */
    private void registerAnnotatedBean(Object bean) {
        Class<?> targetClass = AopUtils.getTargetClass(bean);
        QueueMonitor queueMonitor = targetClass.getAnnotation(QueueMonitor.class);
        if (queueMonitor == null) {
            // The annotation is not readable directly from the target class; nothing to configure from.
            if (this.logger.isWarnEnabled()) {
                this.logger.warn("no @QueueMonitor found directly on {} for bean {}", targetClass, bean);
            }
            return;
        }
        String queue = queueMonitor.queue();
        String queueMethodName = MethodReflectUtils.getQueueMethodName(queueMonitor.method(), bean);
        List<Method> candidates = MethodReflectUtils.findMethod(targetClass, new MethodReflectUtils.FindDesc(queueMethodName, 2));
        if (!candidates.isEmpty()) {
            this.listeners.add(new SimpleQueueListener(queue, bean, candidates.get(0)));
        } else if (this.logger.isErrorEnabled()) {
            this.logger.error("can not find proper method of name '{}' for bean {}", queueMethodName, bean);
        }
    }
}
