/*
 * Decompiled with CFR 0.152.
 */
package org.enodeframework.rocketmq.message;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.enodeframework.common.io.Task;
import org.enodeframework.queue.MessageHandler;
import org.enodeframework.queue.QueueMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocketMQMessageListener
implements MessageListenerConcurrently {
    private static final Logger logger = LoggerFactory.getLogger(RocketMQMessageListener.class);
    private final Map<Character, MessageHandler> messageHandlerMap;

    public RocketMQMessageListener(Map<Character, MessageHandler> messageHandlerMap) {
        this.messageHandlerMap = messageHandlerMap;
    }

    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        CountDownLatch latch = new CountDownLatch(msgs.size());
        msgs.forEach(msg -> {
            QueueMessage queueMessage = this.covertToQueueMessage((MessageExt)msg);
            MessageHandler messageHandler = this.messageHandlerMap.get(queueMessage.getType());
            if (messageHandler == null) {
                logger.error("No messageHandler for message: {}.", (Object)queueMessage);
                latch.countDown();
                return;
            }
            messageHandler.handle(queueMessage, message -> latch.countDown());
        });
        Task.await((CountDownLatch)latch);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    private QueueMessage covertToQueueMessage(MessageExt messageExt) {
        QueueMessage queueMessage = new QueueMessage();
        String value = new String(messageExt.getBody(), StandardCharsets.UTF_8);
        int length = value.length();
        queueMessage.setBody(value.substring(0, length - 2));
        queueMessage.setType(Character.valueOf(value.charAt(length - 1)));
        queueMessage.setTopic(messageExt.getTopic());
        queueMessage.setTag(messageExt.getTags());
        queueMessage.setKey(messageExt.getKeys());
        return queueMessage;
    }
}

