package org.enodeframework.rocketmq.message;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.enodeframework.common.io.Task;
import org.enodeframework.queue.MessageHandler;
import org.enodeframework.queue.QueueMessage;

/* loaded from: input_file:org/enodeframework/rocketmq/message/RocketMQMessageOrderListener.class */
public class RocketMQMessageOrderListener implements MessageListenerOrderly {
    private final MessageHandler messageHandler;

    public RocketMQMessageOrderListener(MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
        CountDownLatch countDownLatch = new CountDownLatch(list.size());
        list.forEach(messageExt -> {
            this.messageHandler.handle(covertToQueueMessage(messageExt), queueMessage -> {
                countDownLatch.countDown();
            });
        });
        Task.await(countDownLatch);
        return ConsumeOrderlyStatus.SUCCESS;
    }

    private QueueMessage covertToQueueMessage(MessageExt messageExt) {
        QueueMessage queueMessage = new QueueMessage();
        queueMessage.setBody(new String(messageExt.getBody(), StandardCharsets.UTF_8));
        queueMessage.setTopic(messageExt.getTopic());
        queueMessage.setTag(messageExt.getTags());
        queueMessage.setKey(messageExt.getKeys());
        return queueMessage;
    }
}
