package org.minbox.framework.message.pipe.server.manager;

import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.minbox.framework.message.pipe.core.exception.MessagePipeException;
import org.minbox.framework.message.pipe.server.MessagePipe;
import org.minbox.framework.message.pipe.server.config.MessagePipeConfiguration;
import org.minbox.framework.message.pipe.server.config.ServerConfiguration;
import org.minbox.framework.message.pipe.server.service.discovery.ServiceDiscovery;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.ObjectUtils;

/* loaded from: input_file:org/minbox/framework/message/pipe/server/manager/AbstractMessagePipeManager.class */
public abstract class AbstractMessagePipeManager implements MessagePipeManager, InitializingBean, DisposableBean, BeanFactoryAware {
    private static final Logger log = LoggerFactory.getLogger(AbstractMessagePipeManager.class);
    private static final ConcurrentMap<String, MessagePipe> MESSAGE_PIPE_MAP = new ConcurrentHashMap();
    private static final int CLEANUP_EXPIRED_CORE_THREADS = 1;
    private MessagePipeConfiguration sharedConfiguration;
    private BeanFactory beanFactory;
    private static ScheduledExecutorService CLEANUP_EXPIRED_SERVICE;
    private ServerConfiguration serverConfiguration;
    private MessagePipeFactoryBean messagePipeFactoryBean;
    private ServiceDiscovery serviceDiscovery;
    private RedissonClient redissonClient;

    public AbstractMessagePipeManager(MessagePipeConfiguration messagePipeConfiguration) {
        this.sharedConfiguration = messagePipeConfiguration;
    }

    public AbstractMessagePipeManager(Map<String, MessagePipeConfiguration> map) {
        useInitConfigurationsToCreateMessagePipe(map);
    }

    @Override // org.minbox.framework.message.pipe.server.manager.MessagePipeManager
    public MessagePipe createMessagePipe(String str) {
        synchronized (MESSAGE_PIPE_MAP) {
            if (checkIsExclude(str) || MESSAGE_PIPE_MAP.containsKey(str)) {
                return MESSAGE_PIPE_MAP.get(str);
            }
            log.info("Create new message pipe {},current number of cached is {}, max limit is {}.", new Object[]{str, Integer.valueOf(MESSAGE_PIPE_MAP.size()), Integer.valueOf(this.serverConfiguration.getMaxMessagePipeCount())});
            if (MESSAGE_PIPE_MAP.size() >= this.serverConfiguration.getMaxMessagePipeCount()) {
                throw new MessagePipeException("The number of message pipes reaches the upper limit, and the message pipe cannot be created.");
            }
            MessagePipe createMessagePipe = this.messagePipeFactoryBean.createMessagePipe(str, getConfiguration());
            MESSAGE_PIPE_MAP.put(str, createMessagePipe);
            log.info("MessagePipe：{}，created successfully and cached.", str);
            MessagePipeDistributor messagePipeDistributor = new MessagePipeDistributor(createMessagePipe, this.serviceDiscovery);
            log.info("MessagePipe：{}，distributor create successfully.", str);
            new MessagePipeMonitor(createMessagePipe, messagePipeDistributor).startup();
            log.info("MessagePipe：{}，monitor create successfully.", str);
            new MessagePipeScheduler(createMessagePipe, messagePipeDistributor).startup();
            log.info("MessagePipe：{}，scheduler create successfully.", str);
            return createMessagePipe;
        }
    }

    @Override // org.minbox.framework.message.pipe.server.manager.MessagePipeManager
    public MessagePipe getMessagePipe(String str) {
        MessagePipe messagePipe;
        synchronized (MESSAGE_PIPE_MAP) {
            createMessagePipe(str);
            messagePipe = MESSAGE_PIPE_MAP.get(str);
        }
        return messagePipe;
    }

    private void useInitConfigurationsToCreateMessagePipe(Map<String, MessagePipeConfiguration> map) {
        if (map == null || map.size() == 0) {
            log.warn("The provided initial MessagePipeConfiguration list is empty, no creation is performed.");
        } else {
            map.keySet().stream().forEach(str -> {
                createMessagePipe(str);
            });
        }
    }

    private MessagePipeConfiguration getConfiguration() {
        return this.sharedConfiguration == null ? MessagePipeConfiguration.defaultConfiguration() : this.sharedConfiguration;
    }

    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }

    public void afterPropertiesSet() throws Exception {
        this.redissonClient = (RedissonClient) this.beanFactory.getBean(RedissonClient.class);
        this.serverConfiguration = (ServerConfiguration) this.beanFactory.getBean(ServerConfiguration.class);
        this.messagePipeFactoryBean = (MessagePipeFactoryBean) this.beanFactory.getBean(MessagePipeFactoryBean.class);
        this.serviceDiscovery = (ServiceDiscovery) this.beanFactory.getBean(ServiceDiscovery.class);
        CLEANUP_EXPIRED_SERVICE = Executors.newScheduledThreadPool(CLEANUP_EXPIRED_CORE_THREADS);
        startCleanupExpiredThread();
        log.info("The MessagePipeManager startup successfully，maximum number of message pipes：{}.", Integer.valueOf(this.serverConfiguration.getMaxMessagePipeCount()));
    }

    private void startCleanupExpiredThread() {
        CLEANUP_EXPIRED_SERVICE.scheduleAtFixedRate(() -> {
            try {
                log.debug("Clean up expired message pipes thread is start working...");
                List list = (List) MESSAGE_PIPE_MAP.values().stream().filter(messagePipe -> {
                    return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - messagePipe.getLastProcessTimeMillis().longValue()) > this.serverConfiguration.getCleanupExpiredMessagePipeThresholdSeconds();
                }).collect(Collectors.toList());
                if (!ObjectUtils.isEmpty(list)) {
                    list.stream().forEach(messagePipe2 -> {
                        try {
                            messagePipe2.setStopSchedulerThread(true);
                            messagePipe2.setStopMonitorThread(true);
                            MESSAGE_PIPE_MAP.remove(messagePipe2.getName(), messagePipe2);
                            log.warn("The MessagePipe：{} is expired, threshold：{}, last process time is {}.", new Object[]{messagePipe2.getName(), Long.valueOf(this.serverConfiguration.getCleanupExpiredMessagePipeThresholdSeconds()), new Date(messagePipe2.getLastProcessTimeMillis().longValue())});
                        } catch (Exception e) {
                            log.error(e.getMessage(), e);
                        }
                    });
                    log.warn("The cleanup of expired message pipes thread is completed, this cleanup: {}.", list.stream().map((v0) -> {
                        return v0.getName();
                    }).collect(Collectors.toList()));
                }
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }, 1L, this.serverConfiguration.getCleanupExpiredMessagePipeIntervalSeconds(), TimeUnit.SECONDS);
    }

    public void destroy() throws Exception {
        this.redissonClient.shutdown();
        log.info("The MessagePipeManager shutdown successfully.");
    }

    private boolean checkIsExclude(String str) {
        String[] excludePipeNamePatterns = this.serverConfiguration.getExcludePipeNamePatterns();
        if (ObjectUtils.isEmpty(excludePipeNamePatterns)) {
            return false;
        }
        boolean z = false;
        int length = excludePipeNamePatterns.length;
        int i = 0;
        while (true) {
            if (i >= length) {
                break;
            }
            if (Pattern.compile(excludePipeNamePatterns[i]).matcher(str).find()) {
                z = CLEANUP_EXPIRED_CORE_THREADS;
                log.warn("Message pipeline: {}, exclude creation.", str);
                break;
            }
            i += CLEANUP_EXPIRED_CORE_THREADS;
        }
        return z;
    }
}
