package org.minbox.framework.message.pipe.server.distribution;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.minbox.framework.message.pipe.core.exception.MessagePipeException;
import org.minbox.framework.message.pipe.server.MessagePipe;
import org.minbox.framework.message.pipe.server.config.ServerConfiguration;
import org.minbox.framework.message.pipe.server.service.discovery.ServiceDiscovery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.ObjectUtils;

/* loaded from: input_file:org/minbox/framework/message/pipe/server/distribution/MessageDistributionExecutors.class */
public class MessageDistributionExecutors implements InitializingBean {
    public static final String BEAN_NAME = "messageDistributionExecutors";
    private static ExecutorService executorThreadPool;
    private ServerConfiguration serverConfiguration;
    private ServiceDiscovery serviceDiscovery;
    private static final Logger log = LoggerFactory.getLogger(MessageDistributionExecutors.class);
    private static final ConcurrentMap<String, MessageDistributionExecutor> EXECUTORS = new ConcurrentHashMap();

    public MessageDistributionExecutors(ServerConfiguration serverConfiguration, ServiceDiscovery serviceDiscovery) {
        this.serverConfiguration = serverConfiguration;
        this.serviceDiscovery = serviceDiscovery;
    }

    public void startExecutor(MessagePipe messagePipe) {
        String name = messagePipe.getName();
        if (EXECUTORS.containsKey(name)) {
            return;
        }
        MessageDistributionExecutor messageDistributionExecutor = new MessageDistributionExecutor(messagePipe, this.serviceDiscovery);
        EXECUTORS.put(name, messageDistributionExecutor);
        executorThreadPool.submit(() -> {
            messageDistributionExecutor.waitProcessing();
        });
        log.info("MessagePipe：{}，distribution executors start successfully.", name);
    }

    public void notifyExecutor(String str) {
        MessageDistributionExecutor messageDistributionExecutor = EXECUTORS.get(str);
        if (ObjectUtils.isEmpty(messageDistributionExecutor)) {
            throw new MessagePipeException("Message pipeline: " + str + ", MessageDistributionExecutor does not exist.");
        }
        synchronized (messageDistributionExecutor) {
            messageDistributionExecutor.notifyAll();
            log.debug("Message Pipe：{}，MessageDistributionExecutor notify successfully.", str);
        }
    }

    public List<MessageDistributionExecutor> getExecutors() {
        return (List) EXECUTORS.values().stream().collect(Collectors.toList());
    }

    public void afterPropertiesSet() throws Exception {
        executorThreadPool = Executors.newFixedThreadPool(this.serverConfiguration.getMessageDistributionExecutorPoolSize());
    }
}
