package com.xiaomi.mone.log.manager.service.consumer;

import com.google.gson.Gson;
import com.xiaomi.mone.app.model.vo.HeraEnvIpVo;
import com.xiaomi.mone.log.manager.service.impl.LogTailServiceImpl;
import com.xiaomi.youpin.docean.anno.Service;
import com.xiaomi.youpin.docean.plugin.config.anno.Value;
import javax.annotation.Resource;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
/* loaded from: input_file:com/xiaomi/mone/log/manager/service/consumer/MioneRocketMqConsumer.class */
public class MioneRocketMqConsumer extends RocketMqConsumer {
    private static final Logger log = LoggerFactory.getLogger(MioneRocketMqConsumer.class);

    @Value("$rocketmq_consumer_topic")
    private String consumeTopic;

    @Resource
    private DefaultMQPushConsumer consumer;

    @Resource
    private LogTailServiceImpl logTailService;

    public void init() {
        log.info("consumer mq service init");
        try {
            this.consumer.subscribe(this.consumeTopic, "");
        } catch (MQClientException e) {
            log.error("Subscription IP address changed Mq consumption abnormality", e);
        }
        this.consumer.registerMessageListener((list, consumeOrderlyContext) -> {
            list.forEach(this::ipChangeConsumeMessage);
            return ConsumeOrderlyStatus.SUCCESS;
        });
        try {
            this.consumer.start();
        } catch (Exception e2) {
            log.error("The Rocket Mq client starts unexpectedly when the subscription creates a project", e2);
        }
    }

    private void ipChangeConsumeMessage(MessageExt messageExt) {
        try {
            HeraEnvIpVo heraEnvIpVo = (HeraEnvIpVo) new Gson().fromJson(new String(messageExt.getBody()), HeraEnvIpVo.class);
            log.info("【Dynamic expansion】The message data consumed by Rocket Mq is converted into objects: {}", heraEnvIpVo.toString());
            this.logTailService.machineIpChange(heraEnvIpVo);
            log.info("【Dynamic Expansion】The news consumption of Rocket Mq consumption ends");
        } catch (Throwable th) {
            log.error("【Dynamic Scaling】Rocket Mq consumption of messages is abnormal:" + th.getMessage(), th);
        }
    }
}
