package sviolet.slate.common.helper.rocketmq.consumer.manager;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import sviolet.slate.common.helper.rocketmq.consumer.ConsumeWay;
import sviolet.slate.common.helper.rocketmq.consumer.RocketMqConcurrentConsumer;
import sviolet.slate.common.helper.rocketmq.consumer.RocketMqCustomConsumer;
import sviolet.slate.common.helper.rocketmq.consumer.RocketMqHelperException;
import sviolet.slate.common.helper.rocketmq.consumer.RocketMqOrderedConsumer;

/* loaded from: input_file:sviolet/slate/common/helper/rocketmq/consumer/manager/RmqConsumerManagerImpl.class */
public class RmqConsumerManagerImpl implements RmqConsumerManager, ApplicationContextAware {
    private static final Logger logger = LoggerFactory.getLogger(RmqConsumerManagerImpl.class);

    @Value("${slate.common.rocketmq.namesrv:}")
    private String nameServerFromProperties;

    @Value("${slate.common.rocketmq.print-message-when-exception:true}")
    private boolean printMessageWhenException;

    @Value("${slate.common.rocketmq.print-message-when-reconsume:true}")
    private boolean printMessageWhenReconsume;

    @Value("${slate.common.rocketmq.default-charset:UTF-8}")
    private String defaultCharset;

    @Autowired
    private RmqConsumerMethodInvokerFactory invokerFactory;
    private ApplicationContext applicationContext;
    private final ConcurrentHashMap<String, MQPushConsumer> consumers = new ConcurrentHashMap<>();
    private final Map<String, Object> enableAnnotationAttributes;

    /* loaded from: input_file:sviolet/slate/common/helper/rocketmq/consumer/manager/RmqConsumerManagerImpl$ConcurrentListener.class */
    private class ConcurrentListener implements MessageListenerConcurrently {
        private RmqConsumerMethodInvoker invoker;
        private boolean reconsumeWhenException;

        private ConcurrentListener(RmqConsumerMethodInvoker rmqConsumerMethodInvoker, boolean z) {
            this.invoker = rmqConsumerMethodInvoker;
            this.reconsumeWhenException = z;
        }

        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            if (RmqConsumerManagerImpl.logger.isDebugEnabled()) {
                RmqConsumerManagerImpl.logger.debug("Consume Concurrent Messages: " + list);
            }
            consumeConcurrentlyContext.setAckIndex(-1);
            Iterator<MessageExt> it = list.iterator();
            while (it.hasNext()) {
                if (!RmqConsumerManagerImpl.this.consumeOneMessage(this.invoker, it.next(), this.reconsumeWhenException)) {
                    break;
                }
                consumeConcurrentlyContext.setAckIndex(consumeConcurrentlyContext.getAckIndex() + 1);
            }
            if (RmqConsumerManagerImpl.logger.isInfoEnabled() && RmqConsumerManagerImpl.this.printMessageWhenReconsume) {
                for (int ackIndex = consumeConcurrentlyContext.getAckIndex() + 1; ackIndex < list.size(); ackIndex++) {
                    RmqConsumerManagerImpl.logger.info("RECONSUME_LATER: " + list.get(ackIndex));
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }

    /* loaded from: input_file:sviolet/slate/common/helper/rocketmq/consumer/manager/RmqConsumerManagerImpl$OrderedListener.class */
    private class OrderedListener implements MessageListenerOrderly {
        private RmqConsumerMethodInvoker invoker;
        private boolean reconsumeWhenException;

        private OrderedListener(RmqConsumerMethodInvoker rmqConsumerMethodInvoker, boolean z) {
            this.invoker = rmqConsumerMethodInvoker;
            this.reconsumeWhenException = z;
        }

        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
            if (RmqConsumerManagerImpl.logger.isDebugEnabled()) {
                RmqConsumerManagerImpl.logger.debug("Consume Ordered Messages: " + list);
            }
            Iterator<MessageExt> it = list.iterator();
            while (it.hasNext()) {
                if (!RmqConsumerManagerImpl.this.consumeOneMessage(this.invoker, it.next(), this.reconsumeWhenException)) {
                    if (RmqConsumerManagerImpl.logger.isInfoEnabled() && RmqConsumerManagerImpl.this.printMessageWhenReconsume) {
                        RmqConsumerManagerImpl.logger.info("SUSPEND_CURRENT_QUEUE_A_MOMENT: " + list);
                    }
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    }

    public RmqConsumerManagerImpl(Map<String, Object> map) {
        this.enableAnnotationAttributes = map == null ? new HashMap() : map;
    }

    @Override // sviolet.slate.common.helper.rocketmq.consumer.manager.RmqConsumerManager
    public void registerMethod(Object obj, String str, Method method, RocketMqCustomConsumer rocketMqCustomConsumer) throws Exception {
        if (logger.isInfoEnabled()) {
            logger.info("Register RocketMQ custom consumer to " + obj.getClass().getName() + "#" + method.getName() + ", annotation:" + rocketMqCustomConsumer);
        }
        RmqConsumerMethodInvoker createInvoker = createInvoker(obj, method, getCharset(rocketMqCustomConsumer.charset()));
        DefaultMQPushConsumer defaultMQPushConsumer = (DefaultMQPushConsumer) this.applicationContext.getBean(rocketMqCustomConsumer.consumerBeanName(), DefaultMQPushConsumer.class);
        if (defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getServiceState() != ServiceState.CREATE_JUST) {
            throw new RocketMqHelperException("DO NOT start the DefaultMQPushConsumer manually if you want to register it on the method annotated by @RocketMqCustomConsumer");
        }
        if (rocketMqCustomConsumer.isOrdered()) {
            defaultMQPushConsumer.registerMessageListener(new OrderedListener(createInvoker, rocketMqCustomConsumer.reconsumeWhenException()));
        } else {
            defaultMQPushConsumer.registerMessageListener(new ConcurrentListener(createInvoker, rocketMqCustomConsumer.reconsumeWhenException()));
        }
        defaultMQPushConsumer.start();
        cacheConsumer(defaultMQPushConsumer.getConsumerGroup(), defaultMQPushConsumer);
        if (logger.isInfoEnabled()) {
            logger.info("New RocketMQ Consumer: " + defaultMQPushConsumer);
        }
    }

    @Override // sviolet.slate.common.helper.rocketmq.consumer.manager.RmqConsumerManager
    public void registerMethod(Object obj, String str, Method method, RocketMqConcurrentConsumer rocketMqConcurrentConsumer) throws Exception {
        if (logger.isInfoEnabled()) {
            logger.info("Register RocketMQ concurrent consumer to " + obj.getClass().getName() + "#" + method.getName() + ", annotation:" + rocketMqConcurrentConsumer);
        }
        RmqConsumerMethodInvoker createInvoker = createInvoker(obj, method, getCharset(rocketMqConcurrentConsumer.charset()));
        String consumerGroup = getConsumerGroup(obj, method, rocketMqConcurrentConsumer.consumerGroup());
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup);
        defaultMQPushConsumer.setNamesrvAddr(getNameServer(rocketMqConcurrentConsumer.nameServer()));
        if ("".equals(rocketMqConcurrentConsumer.sqlExpression())) {
            defaultMQPushConsumer.subscribe(rocketMqConcurrentConsumer.topic(), rocketMqConcurrentConsumer.subExpression());
        } else {
            defaultMQPushConsumer.subscribe(rocketMqConcurrentConsumer.topic(), MessageSelector.bySql(rocketMqConcurrentConsumer.sqlExpression()));
        }
        if (rocketMqConcurrentConsumer.isBroadcast()) {
            defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
        }
        defaultMQPushConsumer.setConsumeFromWhere(parseConsumeFromWhere(rocketMqConcurrentConsumer.consumeFromWhere()));
        defaultMQPushConsumer.setConsumeThreadMin(rocketMqConcurrentConsumer.threadMin());
        defaultMQPushConsumer.setConsumeThreadMax(rocketMqConcurrentConsumer.threadMax());
        defaultMQPushConsumer.registerMessageListener(new ConcurrentListener(createInvoker, rocketMqConcurrentConsumer.reconsumeWhenException()));
        defaultMQPushConsumer.start();
        cacheConsumer(consumerGroup, defaultMQPushConsumer);
        if (logger.isInfoEnabled()) {
            logger.info("New RocketMQ Consumer: " + defaultMQPushConsumer);
        }
    }

    @Override // sviolet.slate.common.helper.rocketmq.consumer.manager.RmqConsumerManager
    public void registerMethod(Object obj, String str, Method method, RocketMqOrderedConsumer rocketMqOrderedConsumer) throws Exception {
        if (logger.isInfoEnabled()) {
            logger.info("Register RocketMQ ordered consumer to " + obj.getClass().getName() + "#" + method.getName() + ", annotation:" + rocketMqOrderedConsumer);
        }
        RmqConsumerMethodInvoker createInvoker = createInvoker(obj, method, getCharset(rocketMqOrderedConsumer.charset()));
        String consumerGroup = getConsumerGroup(obj, method, rocketMqOrderedConsumer.consumerGroup());
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup);
        defaultMQPushConsumer.setNamesrvAddr(getNameServer(rocketMqOrderedConsumer.nameServer()));
        defaultMQPushConsumer.subscribe(rocketMqOrderedConsumer.topic(), rocketMqOrderedConsumer.subExpression());
        defaultMQPushConsumer.setConsumeFromWhere(parseConsumeFromWhere(rocketMqOrderedConsumer.consumeFromWhere()));
        defaultMQPushConsumer.registerMessageListener(new OrderedListener(createInvoker, rocketMqOrderedConsumer.reconsumeWhenException()));
        defaultMQPushConsumer.start();
        cacheConsumer(consumerGroup, defaultMQPushConsumer);
        if (logger.isInfoEnabled()) {
            logger.info("New RocketMQ Consumer: " + defaultMQPushConsumer);
        }
    }

    public void destroy() throws Exception {
        Iterator<Map.Entry<String, MQPushConsumer>> it = this.consumers.entrySet().iterator();
        while (it.hasNext()) {
            try {
                it.next().getValue().shutdown();
            } catch (Exception e) {
            }
        }
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    protected boolean consumeOneMessage(RmqConsumerMethodInvoker rmqConsumerMethodInvoker, MessageExt messageExt, boolean z) {
        try {
            return !Boolean.FALSE.equals(rmqConsumerMethodInvoker.invoke(messageExt));
        } catch (Throwable th) {
            if (this.printMessageWhenException) {
                logger.error("Uncaught exception while consuming message:" + messageExt, th);
            } else {
                logger.error("Uncaught exception while consuming message", th);
            }
            return !z;
        }
    }

    private void cacheConsumer(String str, DefaultMQPushConsumer defaultMQPushConsumer) throws Exception {
        if (this.consumers.putIfAbsent(str, defaultMQPushConsumer) != null) {
            throw new RocketMqHelperException("The consumer group[" + str + "] has been created before, specify another name please.\nSee http://rocketmq.apache.org/docs/faq/ for further details.");
        }
    }

    protected String getCharset(String str) {
        return "".equals(str) ? this.defaultCharset : str;
    }

    protected String getConsumerGroup(Object obj, Method method, String str) {
        return str;
    }

    protected RmqConsumerMethodInvoker createInvoker(Object obj, Method method, String str) throws Exception {
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (parameterTypes.length != 1) {
            throw new RocketMqHelperException("The method " + obj.getClass().getName() + "#" + method.getName() + " can only have one parameter, because it's a RocketMQ consumer method");
        }
        RmqConsumerMethodInvoker newInvoker = this.invokerFactory.newInvoker(parameterTypes[0], obj, method, str);
        if (newInvoker == null) {
            throw new RocketMqHelperException("Unable to find an invoker suitable for the RocketMQ consumer method " + obj.getClass().getName() + "#" + method.getName() + " (Invoker for parameter type " + parameterTypes[0].getName() + ")");
        }
        return newInvoker;
    }

    protected String getNameServer(String str) {
        if (!"".equals(str)) {
            return str;
        }
        if (!"".equals(this.nameServerFromProperties)) {
            return this.nameServerFromProperties;
        }
        Object obj = this.enableAnnotationAttributes.get("defaultNameServer");
        return (!(obj instanceof String) || "".equals(obj)) ? "localhost:9876" : (String) obj;
    }

    protected ConsumeFromWhere parseConsumeFromWhere(ConsumeWay consumeWay) throws Exception {
        switch (consumeWay) {
            case CONSUME_FROM_LAST_OFFSET:
                return ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
            case CONSUME_FROM_FIRST_OFFSET:
                return ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET;
            case CONSUME_FROM_TIMESTAMP:
                return ConsumeFromWhere.CONSUME_FROM_TIMESTAMP;
            default:
                throw new RocketMqHelperException("Illegal ConsumeWay " + consumeWay);
        }
    }
}
