package io.gitee.jaychang.rocketmq.config;

import io.gitee.jaychang.rocketmq.annotation.MQConsumer;
import io.gitee.jaychang.rocketmq.base.AbstractMQPushConsumer;
import io.gitee.jaychang.rocketmq.base.MessageExtConst;
import io.gitee.jaychang.rocketmq.trace.common.OnsTraceConstants;
import io.gitee.jaychang.rocketmq.trace.dispatch.impl.AsyncTraceAppender;
import io.gitee.jaychang.rocketmq.trace.dispatch.impl.AsyncTraceDispatcher;
import io.gitee.jaychang.rocketmq.trace.tracehook.OnsConsumeMessageHookImpl;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/**
 * Auto-configuration that looks up every bean annotated with {@link MQConsumer} and starts a
 * {@link DefaultMQPushConsumer} for it.
 *
 * <p>The application context and the MQ properties used here are inherited from
 * {@link MQBaseAutoConfiguration}. When tracing is enabled in the properties, a single
 * {@link AsyncTraceDispatcher} is created and shared by the consume hooks of all consumers.
 */
@Configuration
@ConditionalOnBean({MQBaseAutoConfiguration.class})
public class MQConsumerAutoConfiguration extends MQBaseAutoConfiguration {
    private static final Logger log = LoggerFactory.getLogger(MQConsumerAutoConfiguration.class);

    /** Subscription expression used when the annotation declares no tag. */
    private static final String ALL_TAGS = "*";

    /** Separator placed between tags when the annotation declares several of them. */
    private static final String TAG_SEPARATOR = "||";

    /** Dispatcher handed to the consume hooks; created by {@link #initAsyncAppender()}. */
    private AsyncTraceDispatcher asyncTraceDispatcher;

    /**
     * Consumer group -> "topic-tags" already registered. Only populated while {@link #init()}
     * runs, to reject a consumer group that is declared twice.
     */
    private Map<String, String> validConsumerMap;

    /**
     * Starts one push consumer per {@link MQConsumer}-annotated bean.
     *
     * @throws Exception if any consumer is mis-configured or fails to start
     */
    @PostConstruct
    public void init() throws Exception {
        Map<String, Object> consumerBeans = this.applicationContext.getBeansWithAnnotation(MQConsumer.class);
        // The dispatcher is only needed when there is at least one consumer to trace.
        if (!CollectionUtils.isEmpty(consumerBeans) && isTraceEnabled()) {
            initAsyncAppender();
        }
        this.validConsumerMap = new HashMap<>();
        try {
            for (Map.Entry<String, Object> entry : consumerBeans.entrySet()) {
                publishConsumer(entry.getKey(), entry.getValue());
            }
        } finally {
            // The duplicate-group bookkeeping is only needed during start-up.
            this.validConsumerMap = null;
        }
    }

    /**
     * Creates and starts the trace dispatcher on first use and returns the cached instance on
     * later calls.
     *
     * @return the dispatcher field; a start-up failure is logged rather than rethrown, so the
     *     value may be {@code null} or refer to a dispatcher whose start did not complete
     */
    private AsyncTraceDispatcher initAsyncAppender() {
        if (this.asyncTraceDispatcher != null) {
            return this.asyncTraceDispatcher;
        }
        try {
            Properties properties = new Properties();
            properties.put(OnsTraceConstants.MaxMsgSize, "128000");
            properties.put(OnsTraceConstants.AsyncBufferSize, "2048");
            properties.put(OnsTraceConstants.MaxBatchNum, "1");
            properties.put(OnsTraceConstants.WakeUpNum, "1");
            properties.put(OnsTraceConstants.NAMESRV_ADDR, this.mqProperties.getNameServerAddress());
            properties.put(OnsTraceConstants.InstanceName, UUID.randomUUID().toString());
            AsyncTraceAppender asyncTraceAppender = new AsyncTraceAppender(properties);
            this.asyncTraceDispatcher = new AsyncTraceDispatcher(properties);
            this.asyncTraceDispatcher.start(asyncTraceAppender, "DEFAULT_WORKER_NAME");
        } catch (MQClientException e) {
            // Tracing is best-effort: report the failure but let the consumers start.
            log.error("failed to start the message trace dispatcher", e);
        }
        return this.asyncTraceDispatcher;
    }

    /**
     * Validates the {@link MQConsumer} declaration of one bean, then builds, wires and starts its
     * push consumer.
     *
     * @param beanName name of the annotated bean in the application context
     * @param bean the bean instance; must extend {@link AbstractMQPushConsumer}
     * @throws RuntimeException if the name server address is missing, the bean has the wrong
     *     type, the consumer group was already registered, or the consume mode is unknown
     * @throws Exception if subscribing or starting the consumer fails
     */
    private void publishConsumer(String beanName, Object bean) throws Exception {
        MQConsumer mqConsumer = this.applicationContext.findAnnotationOnBean(beanName, MQConsumer.class);
        if (StringUtils.isEmpty(this.mqProperties.getNameServerAddress())) {
            throw new RuntimeException("name server address must be defined");
        }
        Assert.notNull(mqConsumer.consumerGroup(), "consumer's consumerGroup must be defined");
        Assert.notNull(mqConsumer.topic(), "consumer's topic must be defined");
        if (!(bean instanceof AbstractMQPushConsumer)) {
            throw new RuntimeException(bean.getClass().getName() + " - consumer未实现Consumer抽象类");
        }
        AbstractMQPushConsumer handler = (AbstractMQPushConsumer) bean;

        ConfigurableEnvironment environment = this.applicationContext.getEnvironment();
        String consumerGroup = environment.resolvePlaceholders(mqConsumer.consumerGroup()) + this.mqProperties.getSuffix();
        String topic = environment.resolvePlaceholders(mqConsumer.topic()) + this.mqProperties.getSuffix();
        String tagExpression = resolveTagExpression(mqConsumer.tag(), environment);

        String existingSubscription = this.validConsumerMap.get(consumerGroup);
        if (existingSubscription != null) {
            throw new RuntimeException("消费组重复订阅，请新增消费组用于新的topic和tag组合: " + consumerGroup + "已经订阅了" + existingSubscription);
        }
        this.validConsumerMap.put(consumerGroup, topic + "-" + tagExpression);

        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup);
        pushConsumer.setNamesrvAddr(this.mqProperties.getNameServerAddress());
        pushConsumer.setMessageModel(MessageModel.valueOf(mqConsumer.messageMode()));
        pushConsumer.subscribe(topic, tagExpression);
        pushConsumer.setInstanceName(UUID.randomUUID().toString());
        pushConsumer.setVipChannelEnabled(this.mqProperties.getVipChannelEnabled().booleanValue());
        registerListener(pushConsumer, handler, mqConsumer);
        handler.setConsumer(pushConsumer);
        if (isTraceEnabled()) {
            registerTraceHook(pushConsumer);
        }
        pushConsumer.start();
        log.info("{} is ready to subscribe message", bean.getClass().getName());
    }

    /**
     * Builds the tag subscription expression from the annotation's tag array.
     *
     * @param tags tags declared on the annotation
     * @param environment environment used to resolve property placeholders in each tag
     * @return {@code "*"} when no tag is declared, otherwise the resolved tags joined by
     *     {@code "||"}
     */
    private String resolveTagExpression(String[] tags, ConfigurableEnvironment environment) {
        if (tags.length == 0) {
            return ALL_TAGS;
        }
        // Resolve placeholders for every tag, not just when exactly one tag is declared.
        String[] resolvedTags = new String[tags.length];
        for (int i = 0; i < tags.length; i++) {
            resolvedTags[i] = environment.resolvePlaceholders(tags[i]);
        }
        return StringUtils.join(resolvedTags, TAG_SEPARATOR);
    }

    /**
     * Registers the listener matching the annotation's consume mode, delegating every batch to
     * the handler's {@code dealMessage}.
     *
     * @throws RuntimeException if the consume mode is neither CONCURRENTLY nor ORDERLY
     */
    private void registerListener(DefaultMQPushConsumer pushConsumer, AbstractMQPushConsumer handler, MQConsumer mqConsumer) {
        // registerMessageListener is overloaded per listener type, so each lambda needs an
        // explicit target type to select the intended overload.
        if (MessageExtConst.CONSUME_MODE_CONCURRENTLY.equals(mqConsumer.consumeMode())) {
            pushConsumer.registerMessageListener(
                    (MessageListenerConcurrently) (messages, context) -> handler.dealMessage(messages, context));
        } else if (MessageExtConst.CONSUME_MODE_ORDERLY.equals(mqConsumer.consumeMode())) {
            pushConsumer.registerMessageListener(
                    (MessageListenerOrderly) (messages, context) -> handler.dealMessage(messages, context));
        } else {
            throw new RuntimeException("unknown consume mode ! only support CONCURRENTLY and ORDERLY");
        }
    }

    /**
     * Attaches the trace hook to the consumer. A failure is logged and otherwise ignored so the
     * consumer can still be started.
     */
    private void registerTraceHook(DefaultMQPushConsumer pushConsumer) {
        try {
            pushConsumer.getDefaultMQPushConsumerImpl()
                    .registerConsumeMessageHook(new OnsConsumeMessageHookImpl(this.asyncTraceDispatcher));
        } catch (Throwable th) {
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data", th);
        }
    }

    /** Returns whether tracing is switched on; an unset property counts as disabled. */
    private boolean isTraceEnabled() {
        return Boolean.TRUE.equals(this.mqProperties.getTraceEnabled());
    }
}
