package io.micrc.core.message;

import com.schibsted.spt.data.jslt.Parser;
import io.micrc.core.annotations.message.MessageAdapter;
import io.micrc.core.rpc.Result;
import io.micrc.lib.JsonUtil;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.camel.EndpointInject;
import org.apache.camel.ProducerTemplate;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.env.Environment;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.util.StringUtils;

/**
 * Aspect around methods annotated with {@code @MessageExecution}.
 *
 * <p>For each intercepted call it locates the {@link ConsumerRecord} and
 * {@link Acknowledgment} arguments, copies the record headers into a message map, decides
 * whether the record is addressed to this application and adapter, transforms the record
 * value with the JSLT expression found in the {@code mappingMap} header, and then runs the
 * idempotency check and the actual consumption inside one programmatic transaction.
 */
@Aspect
@Configuration
public class MessageConsumeRouterExecution implements Ordered {
    private static final Logger log = LoggerFactory.getLogger(MessageConsumeRouterExecution.class);

    /** Sleep passed to {@link Acknowledgment#nack(Duration)} when consumption must be retried. */
    private static final Duration NACK_DELAY = Duration.ofMillis(5000L);

    @Autowired
    private Environment environment;
    // NOTE(review): not referenced anywhere in this class; kept in case it is accessed reflectively.
    private final ConcurrentHashMap<Long, Integer> map = new ConcurrentHashMap<>();

    @EndpointInject
    private ProducerTemplate template;

    @Autowired
    private PlatformTransactionManager platformTransactionManager;

    @Autowired
    private TransactionDefinition transactionDefinition;

    /** Matches every method annotated with {@code @MessageExecution}. */
    @Pointcut("@annotation(io.micrc.core.annotations.message.MessageExecution)")
    public void annotationPointCut() {
    }

    /**
     * Routes one consumed record.
     *
     * @param proceedingJoinPoint the intercepted call; its arguments must contain a
     *     {@link ConsumerRecord} and an {@link Acknowledgment}
     * @return the result of the intercepted method when the adapter is marked {@code custom}
     *     and the record was consumed; {@code null} in every other non-exceptional case
     * @throws IllegalArgumentException if the record or the acknowledgment argument is missing
     * @throws IllegalStateException if the target does not implement exactly one interface, if
     *     that interface lacks {@code @MessageAdapter}, or if the message route answered with a
     *     result code other than {@code "200"}
     * @throws Throwable any other failure raised while consuming, after the transaction has
     *     been rolled back
     */
    @Around("annotationPointCut()")
    public Object around(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        ConsumerRecord<?, ?> consumerRecord = null;
        Acknowledgment acknowledgment = null;
        for (Object arg : proceedingJoinPoint.getArgs()) {
            if (arg instanceof ConsumerRecord) {
                consumerRecord = (ConsumerRecord<?, ?>) arg;
            }
            if (arg instanceof Acknowledgment) {
                acknowledgment = (Acknowledgment) arg;
            }
        }
        if (null == consumerRecord || null == acknowledgment) {
            throw new IllegalArgumentException("sys args error");
        }

        Class<?>[] interfaces = proceedingJoinPoint.getTarget().getClass().getInterfaces();
        if (interfaces.length != 1) {
            throw new IllegalStateException("businesses service implementation class must only implement it's interface. ");
        }
        Class<?> adapterInterface = interfaces[0];
        String adapterName = adapterInterface.getSimpleName();
        MessageAdapter annotation = adapterInterface.getAnnotation(MessageAdapter.class);
        if (annotation == null) {
            // Fail with a descriptive error instead of a NullPointerException on the next line.
            throw new IllegalStateException(
                    "interface " + adapterInterface.getName() + " is not annotated with @MessageAdapter");
        }
        // The service name is the last dot-separated segment of the command service path.
        String[] pathSegments = annotation.commandServicePath().split("\\.");
        String serviceName = pathSegments[pathSegments.length - 1];
        String eventName = annotation.eventName();
        boolean custom = annotation.custom();

        HashMap<String, String> message = new HashMap<>();
        message.put("serviceName", serviceName);
        transMessageHeaders(consumerRecord, message);

        // A non-empty groupId header that differs from this application's name means the
        // record is not addressed to this application: acknowledge and skip it.
        String groupId = message.get("groupId");
        if (null != groupId && !"".equals(groupId)
                && !groupId.equals(this.environment.getProperty("spring.application.name"))) {
            acknowledgment.acknowledge();
            log.info("接收成功（无关死信）: " + message.get("messageId"));
            return null;
        }

        // The mappingMap header is JSON keyed by service name; the entry for this service
        // carries the JSLT expression under "mappingPath".
        HashMap<?, ?> mappings =
                (HashMap<?, ?>) JsonUtil.writeValueAsObject(message.get("mappingMap"), HashMap.class);
        Object serviceMapping = mappings == null ? null : mappings.get(serviceName);
        HashMap<?, ?> mapping = (HashMap<?, ?>) JsonUtil.writeObjectAsObject(serviceMapping, HashMap.class);
        String mappingPath = mapping == null ? null : (String) mapping.get("mappingPath");
        if (null == mappingPath || null == eventName || !eventName.equals(message.get("event"))) {
            // No mapping for this service, or the event is not the one this adapter handles.
            acknowledgment.acknowledge();
            log.info("接收成功（无需消费）: " + message.get("messageId"));
            return null;
        }
        message.put("content", JsonUtil.writeValueAsStringRetainNull(
                Parser.compileString(mappingPath).apply(JsonUtil.readTree(consumerRecord.value()))));

        TransactionStatus transaction = this.platformTransactionManager.getTransaction(this.transactionDefinition);
        try {
            Boolean alreadyConsumed = this.template.requestBody("subscribe://idempotent-check", message, Boolean.class);
            if (alreadyConsumed.booleanValue()) {
                this.platformTransactionManager.commit(transaction);
                acknowledgment.acknowledge();
                log.warn("接收失败（重复消费）: " + message.get("messageId"));
                return null;
            }
            if (custom) {
                // Custom adapters consume the record themselves in the intercepted method.
                Object proceed = proceedingJoinPoint.proceed(proceedingJoinPoint.getArgs());
                this.platformTransactionManager.commit(transaction);
                acknowledgment.acknowledge();
                log.info("接收成功: " + message.get("messageId"));
                return proceed;
            }
            Object response = this.template.requestBody("message://" + adapterName, message.get("content"));
            // Any other response type leaves the empty default, which counts as success below.
            Result result = new Result();
            if (response instanceof String) {
                result = (Result) JsonUtil.writeValueAsObjectRetainNull((String) response, Result.class);
            } else if (response instanceof Result) {
                result = (Result) response;
            }
            boolean failed = StringUtils.hasText(result.getCode()) && !"200".equals(result.getCode());
            if (!failed) {
                this.platformTransactionManager.commit(transaction);
                acknowledgment.acknowledge();
                log.info("接收成功: " + message.get("messageId"));
                return null;
            }
            this.platformTransactionManager.rollback(transaction);
            log.error("接收失败: " + message.get("messageId"));
            // Falls through to the throw after the try, so the handler below neither swallows
            // this failure nor rolls the already-completed transaction back a second time.
        } catch (IllegalStateException e) {
            // Logged as "waiting for startup": undo the work and ask for the record again later.
            rollbackIfActive(transaction);
            acknowledgment.nack(NACK_DELAY);
            log.warn("接收失败（等待启动）: " + message.get("messageId"));
            return null;
        } catch (Throwable failure) {
            // Never leave the transaction open when consumption fails in any other way.
            try {
                rollbackIfActive(transaction);
            } catch (RuntimeException rollbackFailure) {
                failure.addSuppressed(rollbackFailure);
            }
            throw failure;
        }
        throw new IllegalStateException("sys execute error");
    }

    /** Rolls the transaction back unless it has already been committed or rolled back. */
    private void rollbackIfActive(TransactionStatus transaction) {
        if (!transaction.isCompleted()) {
            this.platformTransactionManager.rollback(transaction);
        }
    }

    /**
     * Copies every record header into {@code hashMap}, keyed by header name.
     *
     * <p>Values are decoded as UTF-8 (assumes producers write textual headers in UTF-8 —
     * TODO confirm); a header with a {@code null} value is stored as {@code null}.
     */
    private void transMessageHeaders(ConsumerRecord<?, ?> consumerRecord, HashMap<String, String> hashMap) {
        for (Header header : consumerRecord.headers()) {
            byte[] value = header.value();
            hashMap.put(header.key(), value == null ? null : new String(value, StandardCharsets.UTF_8));
        }
    }

    /** Returns {@link Ordered#LOWEST_PRECEDENCE} ({@code Integer.MAX_VALUE}). */
    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }
}
