package org.enode.pulsar.message;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
import org.enodeframework.common.exception.IORuntimeException;
import org.enodeframework.queue.MessageHandler;
import org.enodeframework.queue.QueueMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/enode/pulsar/message/PulsarMessageListener.class */
public class PulsarMessageListener implements MessageListener<byte[]> {
    private static final Logger logger = LoggerFactory.getLogger(PulsarMessageListener.class);
    private final Map<Character, MessageHandler> messageHandlerMap;

    public PulsarMessageListener(Map<Character, MessageHandler> map) {
        this.messageHandlerMap = map;
    }

    public void received(Consumer<byte[]> consumer, Message<byte[]> message) {
        QueueMessage queueMessage = toQueueMessage(message);
        MessageHandler messageHandler = this.messageHandlerMap.get(queueMessage.getType());
        if (messageHandler == null) {
            logger.error("No messageHandler for message: {}.", queueMessage);
        } else {
            messageHandler.handle(queueMessage, queueMessage2 -> {
                try {
                    consumer.acknowledge(message);
                } catch (PulsarClientException e) {
                    logger.error("Acknowledge message fail: {}.", queueMessage, e);
                    throw new IORuntimeException(e);
                }
            });
        }
    }

    private QueueMessage toQueueMessage(Message<byte[]> message) {
        QueueMessage queueMessage = new QueueMessage();
        String str = new String((byte[]) message.getValue(), StandardCharsets.UTF_8);
        int length = str.length();
        queueMessage.setBody(str.substring(0, length - 2));
        queueMessage.setType(Character.valueOf(str.charAt(length - 1)));
        queueMessage.setTopic(message.getTopicName());
        queueMessage.setRouteKey(message.getKey());
        queueMessage.setKey(new String(message.getOrderingKey(), StandardCharsets.UTF_8));
        return queueMessage;
    }
}
