package arp.message.rocketmq;

import arp.process.publish.Message;
import arp.process.publish.ProcessMessageReceiver;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

/* loaded from: input_file:arp/message/rocketmq/RocketmgMessageReceiver.class */
public class RocketmgMessageReceiver implements ProcessMessageReceiver {
    private DefaultMQPushConsumer consumer;
    private ConcurrentLinkedQueue<Message> messageQueue;
    private String topic;
    private RocketmqMessageDeserializationStrategy deserializationStrategy;

    public RocketmgMessageReceiver(String str, String str2) throws MQClientException {
        this(str, str2, new FSTDeserializationStrategy());
    }

    public RocketmgMessageReceiver(String str, String str2, RocketmqMessageDeserializationStrategy rocketmqMessageDeserializationStrategy) throws MQClientException {
        this.topic = "arp_process_message";
        this.deserializationStrategy = rocketmqMessageDeserializationStrategy;
        this.messageQueue = new ConcurrentLinkedQueue<>();
        this.consumer = new DefaultMQPushConsumer(str);
        this.consumer.setNamesrvAddr(str2);
        this.consumer.subscribe(this.topic, "*");
        this.consumer.registerMessageListener(getMessageListener());
        this.consumer.start();
    }

    private MessageListenerConcurrently getMessageListener() {
        return new MessageListenerConcurrently() { // from class: arp.message.rocketmq.RocketmgMessageReceiver.1
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                if (list != null) {
                    Iterator<MessageExt> it = list.iterator();
                    while (it.hasNext()) {
                        try {
                            RocketmgMessageReceiver.this.messageQueue.offer(RocketmgMessageReceiver.this.deserializationStrategy.deserialize(it.next().getBody()));
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        };
    }

    public RocketmgMessageReceiver(String str, String str2, String str3, String str4, String str5, String str6, RocketmqMessageDeserializationStrategy rocketmqMessageDeserializationStrategy) throws MQClientException {
        this.topic = "arp_process_message";
        this.deserializationStrategy = rocketmqMessageDeserializationStrategy;
        this.topic = str6;
        this.messageQueue = new ConcurrentLinkedQueue<>();
        this.consumer = new DefaultMQPushConsumer(str2, str, new AclClientRPCHook(new SessionCredentials(str4, str5)));
        this.consumer.setNamesrvAddr(str3);
        this.consumer.subscribe(this.topic, "*");
        this.consumer.registerMessageListener(getMessageListener());
        this.consumer.start();
    }

    public RocketmgMessageReceiver(String str, String str2, String str3, String str4, String str5, String str6) throws MQClientException {
        this(str, str2, str3, str4, str5, str6, new FSTDeserializationStrategy());
    }

    public List<Message> receive() throws Exception {
        ArrayList arrayList = new ArrayList();
        int size = this.messageQueue.size();
        for (int i = 0; i < size; i++) {
            arrayList.add(this.messageQueue.poll());
        }
        return arrayList;
    }
}
