package artoria.message;

import artoria.common.AsyncCallback;
import artoria.identifier.IdentifierUtils;
import artoria.lifecycle.LifecycleException;
import artoria.logging.Logger;
import artoria.logging.LoggerFactory;
import artoria.thread.SimpleThreadFactory;
import artoria.util.Assert;
import artoria.util.ShutdownHookUtils;
import artoria.util.StringUtils;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:artoria/message/SimpleMessageProvider.class */
public class SimpleMessageProvider implements MessageProvider {
    private final Map<String, LinkedList<MessageListener>> listenerMap;
    private final Map<String, BlockingQueue<Message>> topicMap;
    private final Map<String, BlockingQueue<Message>> queueMap;
    private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
    private static Logger log = LoggerFactory.getLogger((Class<?>) SimpleMessageProvider.class);
    private static final String THREAD_NAME_PREFIX = "simple-message-provider-executor";
    private static final int DEFAULT_CORE_POOL_SIZE = 2;
    private static final int DEFAULT_PERIOD = 5000;

    public SimpleMessageProvider() {
        this(2, 5000L);
    }

    public SimpleMessageProvider(int i, long j) {
        this(new ScheduledThreadPoolExecutor(i, new SimpleThreadFactory(THREAD_NAME_PREFIX, Boolean.TRUE)), j);
    }

    public SimpleMessageProvider(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, long j) {
        this.listenerMap = new ConcurrentHashMap();
        this.topicMap = new ConcurrentHashMap();
        this.queueMap = new ConcurrentHashMap();
        Assert.notNull(scheduledThreadPoolExecutor, "Parameter \"threadPool\" must not null. ");
        Assert.isTrue(j > 0, "Parameter \"period\" must greater than 0. ");
        scheduledThreadPoolExecutor.scheduleAtFixedRate(new SimpleMessageDispatcher(this.listenerMap, this.topicMap, this.queueMap), j, j, TimeUnit.MILLISECONDS);
        ShutdownHookUtils.addExecutorService(scheduledThreadPoolExecutor);
        this.scheduledThreadPoolExecutor = scheduledThreadPoolExecutor;
    }

    private BlockingQueue<Message> messageQueue(String str) {
        Assert.notBlank(str, "Parameter \"destination\" must not blank. ");
        String upperCase = str.toUpperCase();
        BlockingQueue<Message> blockingQueue = this.topicMap.get(upperCase);
        BlockingQueue<Message> blockingQueue2 = blockingQueue;
        if (blockingQueue == null) {
            BlockingQueue<Message> blockingQueue3 = this.queueMap.get(upperCase);
            blockingQueue2 = blockingQueue3;
            if (blockingQueue3 == null) {
                throw new MessageException("Destination \"" + upperCase + "\" must exist. ");
            }
        }
        return blockingQueue2;
    }

    public void createQueue(String str) {
        Assert.notBlank(str, "Parameter \"destination\" must not blank. ");
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        this.queueMap.put(str.toUpperCase(), linkedBlockingQueue);
    }

    public void createTopic(String str) {
        Assert.notBlank(str, "Parameter \"destination\" must not blank. ");
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        this.topicMap.put(str.toUpperCase(), linkedBlockingQueue);
    }

    @Override // artoria.lifecycle.Initializable
    public void initialize() throws LifecycleException {
    }

    @Override // artoria.message.MessageProvider
    public void listening(String str, Map<String, Object> map, MessageListener messageListener) throws MessageException {
        Assert.notBlank(str, "Parameter \"destination\" must not blank. ");
        Assert.notNull(messageListener, "Parameter \"listener\" must not null. ");
        String upperCase = str.toUpperCase();
        LinkedList<MessageListener> linkedList = this.listenerMap.get(upperCase);
        if (linkedList == null) {
            linkedList = new LinkedList<>();
            this.listenerMap.put(upperCase, linkedList);
        }
        linkedList.add(messageListener);
    }

    @Override // artoria.message.MessageProvider
    public void removeListening(String str, Map<String, Object> map, MessageListener messageListener) throws MessageException {
        Assert.notBlank(str, "Parameter \"destination\" must not blank. ");
        Assert.notNull(messageListener, "Parameter \"listener\" must not null. ");
        LinkedList<MessageListener> linkedList = this.listenerMap.get(str.toUpperCase());
        if (linkedList == null) {
            return;
        }
        linkedList.remove(messageListener);
    }

    @Override // artoria.message.MessageProvider
    public void send(Message message) throws MessageException {
        Assert.notNull(message, "Parameter \"message\" must not null. ");
        String destination = message.getDestination();
        Assert.notBlank(destination, "Parameter \"destination\" must not blank. ");
        if (StringUtils.isBlank(message.getMessageId())) {
            message.setMessageId(IdentifierUtils.nextStringIdentifier());
        }
        messageQueue(destination.toUpperCase()).offer(message);
    }

    @Override // artoria.message.MessageProvider
    public void sendAsync(Message message, AsyncCallback<Object> asyncCallback) throws MessageException {
        Assert.notNull(message, "Parameter \"message\" must not null. ");
        Assert.notNull(asyncCallback, "Parameter \"callback\" must not null. ");
        Assert.notBlank(message.getDestination(), "Parameter \"destination\" must not blank. ");
        this.scheduledThreadPoolExecutor.execute(new SimpleAsyncSender(this, message, asyncCallback));
    }

    @Override // artoria.message.MessageProvider
    public Message receive(String str, Map<String, Object> map) throws MessageException {
        Assert.notBlank(str, "Parameter \"destination\" must not blank. ");
        return messageQueue(str.toUpperCase()).poll();
    }

    @Override // artoria.message.MessageProvider
    public Message receive(String str, Map<String, Object> map, long j) throws MessageException {
        Assert.notBlank(str, "Parameter \"destination\" must not blank. ");
        try {
            return messageQueue(str.toUpperCase()).take();
        } catch (InterruptedException e) {
            return null;
        }
    }

    @Override // artoria.lifecycle.Destroyable
    public void destroy() throws Exception {
        if (this.scheduledThreadPoolExecutor == null || this.scheduledThreadPoolExecutor.isShutdown()) {
            return;
        }
        this.scheduledThreadPoolExecutor.shutdown();
        ShutdownHookUtils.removeExecutorService(this.scheduledThreadPoolExecutor);
    }
}
