package org.enodeframework.rocketmq.message;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.enodeframework.common.io.Task;
import org.enodeframework.queue.MessageHandler;
import org.enodeframework.queue.QueueMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/enodeframework/rocketmq/message/RocketMQMessageListener.class */
public class RocketMQMessageListener implements MessageListenerConcurrently {
    private static final Logger logger = LoggerFactory.getLogger(RocketMQMessageListener.class);
    private final Map<Character, MessageHandler> messageHandlerMap;

    public RocketMQMessageListener(Map<Character, MessageHandler> map) {
        this.messageHandlerMap = map;
    }

    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        CountDownLatch countDownLatch = new CountDownLatch(list.size());
        list.forEach(messageExt -> {
            QueueMessage covertToQueueMessage = covertToQueueMessage(messageExt);
            MessageHandler messageHandler = this.messageHandlerMap.get(Character.valueOf(covertToQueueMessage.getType()));
            if (messageHandler != null) {
                messageHandler.handle(covertToQueueMessage, queueMessage -> {
                    countDownLatch.countDown();
                });
            } else {
                logger.error("No messageHandler for message: {}.", covertToQueueMessage);
                countDownLatch.countDown();
            }
        });
        Task.await(countDownLatch);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    private QueueMessage covertToQueueMessage(MessageExt messageExt) {
        QueueMessage queueMessage = new QueueMessage();
        String str = new String(messageExt.getBody(), StandardCharsets.UTF_8);
        int length = str.length();
        queueMessage.setBody(str.substring(0, length - 2));
        queueMessage.setType(str.charAt(length - 1));
        queueMessage.setTopic(messageExt.getTopic());
        queueMessage.setTag(messageExt.getTags());
        queueMessage.setKey(messageExt.getKeys());
        return queueMessage;
    }
}
