package com.zrt.mq.handler;

import com.zrt.convention.exception.ServiceException;
import com.zrt.mq.dto.BaseMessage;
import com.zrt.mq.template.MQEnhanceTemplate;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/zrt/mq/handler/EnhanceMessageHandler.class */
/**
 * Template base class for message consumers that adds filtering, timing logs and
 * application-level retry around the subclass-provided business logic.
 *
 * <p>Flow of {@link #dispatchMessage(BaseMessage)}:
 * <ol>
 *   <li>messages for which {@link #filter(BaseMessage)} returns {@code true} are skipped;</li>
 *   <li>messages whose retry counter is greater than {@link #getMaxRetryTimes()} are handed to
 *       {@link #handleMaxRetryExceeded(BaseMessage)};</li>
 *   <li>otherwise {@link #handleMessage(BaseMessage)} is invoked; on failure the message is either
 *       rethrown as a {@code ServiceException} ({@link #throwException()}) or re-sent through the
 *       template ({@link #isRetry()}).</li>
 * </ol>
 *
 * @param <T> concrete message type handled by the subclass
 */
public abstract class EnhanceMessageHandler<T extends BaseMessage> {
    private static final Logger log = LoggerFactory.getLogger(EnhanceMessageHandler.class);

    /** Default upper bound for the retry counter; see {@link #getMaxRetryTimes()}. */
    private static final int MAX_RETRY_TIMES = 3;

    /** Prefix put in front of the message source the first time the message is re-sent. */
    private static final String RETRY_PREFIX = "retry_";

    /** Default delay level handed to the template when re-sending; see {@link #getDelayLevel()}. */
    private static final int DELAY_LEVEL = 5;

    /** Timeout handed to the template when re-sending (presumably milliseconds - TODO confirm). */
    private static final long TIMEOUT = 3000;

    // NOTE(review): nothing in this class assigns this field. It is presumably injected by the
    // container (an injection annotation may have been lost in decompilation) - confirm.
    private MQEnhanceTemplate mqEnhanceTemplate;

    /**
     * Business logic for a single message. Any exception thrown here is caught by
     * {@link #dispatchMessage(BaseMessage)}.
     *
     * @param t message to process
     */
    protected abstract void handleMessage(T t);

    /**
     * Called instead of {@link #handleMessage(BaseMessage)} when the message's retry counter is
     * greater than {@link #getMaxRetryTimes()}.
     *
     * @param t message that ran out of retries
     */
    protected abstract void handleMaxRetryExceeded(T t);

    /**
     * @return {@code true} if a failed message should be re-sent through the template
     */
    protected abstract boolean isRetry();

    /**
     * @return {@code true} if a failure should be rethrown to the caller as a
     *         {@code ServiceException}; this takes precedence over {@link #isRetry()}
     */
    protected abstract boolean throwException();

    /**
     * @return the retry-counter value above which a message is no longer processed
     */
    protected int getMaxRetryTimes() {
        return MAX_RETRY_TIMES;
    }

    /**
     * @return the delay level handed to the template when a message is re-sent
     */
    protected int getDelayLevel() {
        return DELAY_LEVEL;
    }

    /**
     * Hook for skipping messages before they are processed.
     *
     * @param t incoming message
     * @return {@code true} to skip the message; the default implementation never skips
     */
    protected boolean filter(T t) {
        return false;
    }

    /**
     * Entry point: filters, checks the retry budget, runs the business logic and applies the
     * configured failure strategy.
     *
     * @param t incoming message
     * @throws ServiceException if processing fails and {@link #throwException()} is {@code true},
     *                          or if re-sending the message for retry fails
     */
    public void dispatchMessage(T t) {
        log.info("消费者接收消息: {}", t);
        if (filter(t)) {
            log.info("消息id: {} 不满足消费条件，已过滤", t.getKey());
            return;
        }
        if (currentRetryTimes(t) > getMaxRetryTimes()) {
            handleMaxRetryExceeded(t);
            return;
        }
        try {
            long startMillis = System.currentTimeMillis();
            handleMessage(t);
            log.info("消息{}消费成功，耗时[{}ms]", t.getKey(), System.currentTimeMillis() - startMillis);
        } catch (Exception e) {
            log.error("消息 {} 消费异常", t.getKey(), e);
            if (throwException()) {
                // NOTE(review): the cause is only concatenated into the message, so its stack
                // trace is not chained. Switch to a (String, Throwable) constructor if
                // ServiceException offers one.
                throw new ServiceException("消息消费失败" + e);
            }
            if (isRetry()) {
                handleRetry(t);
            }
        }
    }

    /**
     * Reads the retry counter of a message, treating a missing (null) counter as zero so that a
     * message published without the field does not fail with a NullPointerException.
     */
    private int currentRetryTimes(T t) {
        return t.getRetryTimes() == null ? 0 : t.getRetryTimes().intValue();
    }

    /**
     * Re-sends the message to the topic/selector declared on the concrete handler's
     * {@link RocketMQMessageListener} annotation, after marking its source with
     * {@link #RETRY_PREFIX} and incrementing its retry counter.
     *
     * @param t message whose processing failed
     * @throws ServiceException if the send throws or reports a status other than
     *                          {@link SendStatus#SEND_OK}
     */
    private void handleRetry(T t) {
        RocketMQMessageListener listener = getClass().getAnnotation(RocketMQMessageListener.class);
        if (listener == null) {
            // Without the annotation there is no topic to re-send to; make the drop visible.
            log.warn("消息 {} 无法重试: {} 未标注 @RocketMQMessageListener", t.getKey(), getClass().getName());
            return;
        }

        // Prefix the source only once; blank or already-prefixed sources are written back as-is.
        String source = t.getSource();
        boolean needsPrefix = StringUtils.isNotBlank(source) && !source.startsWith(RETRY_PREFIX);
        t.setSource(needsPrefix ? RETRY_PREFIX + source : source);
        t.setRetryTimes(Integer.valueOf(currentRetryTimes(t) + 1));

        SendResult result;
        try {
            // Explicit boxing is kept so the same template overload is selected as before.
            result = this.mqEnhanceTemplate.send(
                    listener.topic(),
                    listener.selectorExpression(),
                    t,
                    Long.valueOf(TIMEOUT),
                    Integer.valueOf(getDelayLevel()));
        } catch (Exception e) {
            log.error("消息 {} 消费异常", t.getKey(), e);
            throw new ServiceException("重试 " + t.getRetryTimes() + ", 消息消费失败" + e);
        }

        // Checked outside the try so this failure is not caught and re-wrapped by the handler
        // above. A null result is not treated as a failure.
        if (result != null && result.getSendStatus() != SendStatus.SEND_OK) {
            log.error("消息 {} 重试发送失败, 发送状态: {}", t.getKey(), result.getSendStatus());
            throw new ServiceException("重试 " + t.getRetryTimes() + ", 消息消费失败");
        }
    }
}
