package org.minbox.framework.message.pipe.server;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.minbox.framework.message.pipe.server.config.ServerConfiguration;
import org.minbox.framework.message.pipe.server.distribution.MessageDistributionExecutors;
import org.minbox.framework.message.pipe.server.manager.MessagePipeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

/* loaded from: input_file:org/minbox/framework/message/pipe/server/MessagePipeMonitor.class */
public class MessagePipeMonitor implements InitializingBean {
    public static final String BEAN_NAME = "messagePipeMonitor";
    private ServerConfiguration configuration;
    private MessagePipeManager messagePipeManager;
    private MessageDistributionExecutors messageDistributionExecutors;
    private static final Logger log = LoggerFactory.getLogger(MessagePipeMonitor.class);
    private static ScheduledExecutorService monitorThreadPool = Executors.newScheduledThreadPool(1);

    public MessagePipeMonitor(ServerConfiguration serverConfiguration, MessagePipeManager messagePipeManager, MessageDistributionExecutors messageDistributionExecutors) {
        this.configuration = serverConfiguration;
        this.messagePipeManager = messagePipeManager;
        this.messageDistributionExecutors = messageDistributionExecutors;
    }

    private void startMonitor() {
        try {
            this.messageDistributionExecutors.getExecutors().stream().forEach(messageDistributionExecutor -> {
                String pipeName = messageDistributionExecutor.getPipeName();
                Long valueOf = Long.valueOf(System.currentTimeMillis());
                MessagePipe messagePipe = this.messagePipeManager.getMessagePipe(pipeName);
                long longValue = valueOf.longValue() - messagePipe.getLastProcessTimeMillis().longValue();
                log.debug("MessagePipe：{}，Interval execution mill seconds：{}", pipeName, Long.valueOf(longValue));
                if (messagePipe.size() <= 0 || longValue <= this.configuration.getNotifyIntervalMillSeconds()) {
                    return;
                }
                this.messageDistributionExecutors.notifyExecutor(pipeName);
                log.debug("MessagePipe：{}，MessageDistributionExecutor be awakened.", pipeName);
            });
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    public void afterPropertiesSet() throws Exception {
        monitorThreadPool.scheduleWithFixedDelay(() -> {
            startMonitor();
        }, 10L, this.configuration.getMonitorCheckIntervalSeconds(), TimeUnit.SECONDS);
        log.info("MessagePipe monitor start successfully.");
    }
}
