package org.enodeframework.ons.message;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.enodeframework.common.io.Task;
import org.enodeframework.queue.MessageHandler;
import org.enodeframework.queue.MessageHandlerHolder;
import org.enodeframework.queue.QueueMessage;

/* loaded from: input_file:org/enodeframework/ons/message/OnsMessageListener.class */
/**
 * ONS listener that forwards incoming messages, one at a time or in batches,
 * to the {@link MessageHandler} selected by the message's type.
 *
 * <p>Each incoming {@link Message} is converted to a {@link QueueMessage}, handed to the
 * handler chosen by the {@link MessageHandlerHolder}, and the consuming call then waits on a
 * {@link CountDownLatch} that the handler callback counts down.
 */
public class OnsMessageListener implements MessageListener, BatchMessageListener {
    private final MessageHandlerHolder messageHandlerHolder;

    /**
     * Creates a listener backed by the given handler registry.
     *
     * @param messageHandlerHolder used to look up a handler by message type
     */
    public OnsMessageListener(MessageHandlerHolder messageHandlerHolder) {
        this.messageHandlerHolder = messageHandlerHolder;
    }

    /**
     * Handles a single message and waits on the latch before committing.
     *
     * @param message the incoming ONS message
     * @param consumeContext the consume context (not used by this implementation)
     * @return always {@link Action#CommitMessage}
     */
    public Action consume(Message message, ConsumeContext consumeContext) {
        CountDownLatch handled = new CountDownLatch(1);
        dispatch(message, handled);
        Task.await(handled);
        return Action.CommitMessage;
    }

    /**
     * Handles a batch of messages and waits on a latch sized to the batch before committing.
     *
     * @param list the incoming ONS messages
     * @param consumeContext the consume context (not used by this implementation)
     * @return always {@link Action#CommitMessage}
     */
    public Action consume(List<Message> list, ConsumeContext consumeContext) {
        // One count per message: each handler callback releases exactly one count.
        CountDownLatch pending = new CountDownLatch(list.size());
        for (Message message : list) {
            dispatch(message, pending);
        }
        Task.await(pending);
        return Action.CommitMessage;
    }

    /**
     * Converts the message, picks the handler registered for its type and invokes it with a
     * callback that counts the given latch down once.
     */
    private void dispatch(Message message, CountDownLatch latch) {
        QueueMessage converted = toQueueMessage(message);
        MessageHandler handler = this.messageHandlerHolder.chooseMessageHandler(converted.getType());
        handler.handle(converted, done -> latch.countDown());
    }

    /**
     * Copies the fields of an ONS message into a new {@link QueueMessage}.
     * The type is read from the {@code "ETYPE"} user property.
     */
    private QueueMessage toQueueMessage(Message message) {
        QueueMessage result = new QueueMessage();
        result.setType(message.getUserProperties("ETYPE"));
        result.setBody(message.getBody());
        result.setTopic(message.getTopic());
        result.setTag(message.getTag());
        result.setRouteKey(message.getShardingKey());
        result.setKey(message.getKey());
        return result;
    }
}
