/*
 * Decompiled with CFR 0.152.
 */
package org.fisco.bcos.sdk.service;

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.fisco.bcos.sdk.amop.Amop;
import org.fisco.bcos.sdk.channel.Channel;
import org.fisco.bcos.sdk.channel.PeerSelectRule;
import org.fisco.bcos.sdk.channel.ResponseCallback;
import org.fisco.bcos.sdk.channel.model.ChannelMessageError;
import org.fisco.bcos.sdk.channel.model.EnumChannelProtocolVersion;
import org.fisco.bcos.sdk.channel.model.Options;
import org.fisco.bcos.sdk.client.Client;
import org.fisco.bcos.sdk.client.exceptions.ClientException;
import org.fisco.bcos.sdk.client.handler.BlockNumberNotifyHandler;
import org.fisco.bcos.sdk.client.handler.GetNodeVersionHandler;
import org.fisco.bcos.sdk.client.handler.OnReceiveBlockNotifyFunc;
import org.fisco.bcos.sdk.client.handler.TransactionNotifyHandler;
import org.fisco.bcos.sdk.client.protocol.response.BlockNumber;
import org.fisco.bcos.sdk.client.protocol.response.GroupList;
import org.fisco.bcos.sdk.config.ConfigOption;
import org.fisco.bcos.sdk.model.Message;
import org.fisco.bcos.sdk.model.MsgType;
import org.fisco.bcos.sdk.model.NodeVersion;
import org.fisco.bcos.sdk.model.Response;
import org.fisco.bcos.sdk.model.TransactionReceipt;
import org.fisco.bcos.sdk.model.callback.TransactionCallback;
import org.fisco.bcos.sdk.network.ConnectionInfo;
import org.fisco.bcos.sdk.service.GroupManagerService;
import org.fisco.bcos.sdk.service.GroupService;
import org.fisco.bcos.sdk.service.GroupServiceFactory;
import org.fisco.bcos.sdk.service.callback.BlockNumberNotifyCallback;
import org.fisco.bcos.sdk.service.model.BlockNumberMessageDecoder;
import org.fisco.bcos.sdk.service.model.BlockNumberNotification;
import org.fisco.bcos.sdk.utils.ChannelUtils;
import org.fisco.bcos.sdk.utils.ObjectMapperFactory;
import org.fisco.bcos.sdk.utils.ThreadPoolService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GroupManagerServiceImpl
implements GroupManagerService {
    /** Marker searched for in a node's version string by {@link #getCryptoType(String)}. */
    public static final String SM_CRYPTO_STR = "gm";
    private static final Logger logger = LoggerFactory.getLogger(GroupManagerServiceImpl.class);
    /** Channel used for all peer communication and handler registration. */
    private final Channel channel;
    /** Decodes block-number notification messages received from peers. */
    private final BlockNumberMessageDecoder blockNumberMessageDecoder;
    /** Optional AMOP module; when null, block-notify subscriptions are not updated. */
    private Amop amop;
    private final GroupServiceFactory groupServiceFactory;
    /** Group id -> per-group service tracking the group's nodes and block numbers. */
    private ConcurrentHashMap<Integer, GroupService> groupIdToService = new ConcurrentHashMap<>();
    /** Peer endpoint -> group ids (as strings) most recently stored for that peer. */
    private ConcurrentHashMap<String, List<String>> nodeToGroupIDList = new ConcurrentHashMap<>();
    /** Peer endpoint -> node version most recently fetched from that peer. */
    private ConcurrentHashMap<String, NodeVersion> nodeToNodeVersion = new ConcurrentHashMap<>();
    /** Register id -> block-number notification callback. */
    private ConcurrentHashMap<String, BlockNumberNotifyCallback> registerIdToBlockNotifyCallback = new ConcurrentHashMap<>();
    /** Message seq -> transaction callback still waiting for its receipt notification. */
    private ConcurrentHashMap<String, TransactionCallback> seq2TransactionCallback = new ConcurrentHashMap<>();
    /** Timer that fires transaction-callback timeouts. */
    private final Timer timeoutHandler = new HashedWheelTimer();
    /** Client used to query peers for node version, group list and block number. */
    private Client groupInfoGetter;
    /** Period, in milliseconds, of the scheduled group-list refresh. */
    private long fetchGroupListIntervalMs = 60000L;
    /** Single-threaded scheduler that runs the periodic group-list refresh. */
    private ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
    /** Worker pool on which channel handler callbacks are processed. */
    private final ThreadPoolService threadPool;
    /** True between {@code start()} and {@code stop()}. */
    AtomicBoolean running = new AtomicBoolean(false);
    private final ConfigOption config;

    /**
     * Creates the group manager on top of the given channel.
     *
     * <p>Construction registers the connection-established, block-notify/disconnect and
     * transaction-notify handlers on the channel, performs an initial group-list and
     * node-version fetch against the currently available peers, and finally starts the
     * periodic group-list refresh.
     *
     * @param channel the channel used to talk to peers
     * @param configOption configuration; its thread-pool settings size the worker pool
     */
    public GroupManagerServiceImpl(Channel channel, ConfigOption configOption) {
        this.channel = channel;
        this.config = configOption;
        this.threadPool = new ThreadPoolService("GroupManagerServiceImpl", configOption.getThreadPoolConfig().getReceiptProcessorThreadSize(), configOption.getThreadPoolConfig().getMaxBlockingQueueSize());
        this.blockNumberMessageDecoder = new BlockNumberMessageDecoder();
        this.groupServiceFactory = new GroupServiceFactory();
        this.groupInfoGetter = Client.build(channel);
        // Handlers are registered before the first fetch.
        this.registerGetNodeVersionHandler();
        this.registerBlockNumberNotifyHandler();
        this.registerTransactionNotifyHandler();
        // Synchronous initial population of group and version information.
        this.fetchGroupList();
        this.updateNodeVersion();
        // Schedules the periodic group-list refresh.
        this.start();
    }

    /** Returns the configuration object this service was constructed with. */
    @Override
    public ConfigOption getConfig() {
        return config;
    }

    /**
     * Derives a crypto-type code from the cached version of a peer.
     *
     * @param peerInfo the peer endpoint
     * @return {@code 1} if the cached version string contains {@link #SM_CRYPTO_STR},
     *     {@code 0} if it does not, or {@code null} when no version is cached for the peer
     */
    @Override
    public Integer getCryptoType(String peerInfo) {
        // Single lookup: a containsKey()/get() pair could see the entry vanish in between
        // (the disconnect handler removes it concurrently) and dereference null.
        NodeVersion nodeVersion = this.nodeToNodeVersion.get(peerInfo);
        if (nodeVersion == null) {
            return null;
        }
        // NOTE(review): presumably 1 means SM crypto and 0 the standard suite - confirm.
        return nodeVersion.getNodeVersion().getVersion().contains(SM_CRYPTO_STR) ? 1 : 0;
    }

    /**
     * Looks up the cached node version of a peer.
     *
     * @param peerInfo the peer endpoint
     * @return the cached version, or {@code null} when none is cached
     */
    @Override
    public NodeVersion getNodeVersion(String peerInfo) {
        // A missing key already yields null, so no separate existence check is needed.
        return nodeToNodeVersion.get(peerInfo);
    }

    /** Refreshes the cached node version of every peer the channel reports as available. */
    @Override
    public void updateNodeVersion() {
        channel.getAvailablePeer().forEach(this::updateNodeVersion);
    }

    /**
     * Fetches the node version from one peer and caches it; failures are logged and swallowed.
     *
     * @param peerIpAndPort the peer endpoint to query
     */
    private void updateNodeVersion(String peerIpAndPort) {
        try {
            nodeToNodeVersion.put(peerIpAndPort, groupInfoGetter.getNodeVersion(peerIpAndPort));
        } catch (Exception e) {
            logger.error("updateNodeVersion for {} failed, error message: {}", peerIpAndPort, e.getMessage());
        }
    }

    /**
     * Registers an establish handler on the channel. For each endpoint passed to the handler's
     * consumer, a worker-pool task fetches that peer's group list and then its node version.
     */
    public void registerGetNodeVersionHandler() {
        Consumer<String> onEstablished = peerIpAndPort -> {
            Runnable refreshPeer = () -> {
                try {
                    fetchGroupList(peerIpAndPort);
                    updateNodeVersion(peerIpAndPort);
                } catch (Exception e) {
                    logger.warn("GetNodeVersionHandler exception, error message: {}", e.getMessage(), e);
                }
            };
            threadPool.getThreadPool().execute(refreshPeer);
        };
        channel.addEstablishHandler(new GetNodeVersionHandler(onEstablished));
    }

    /**
     * Cleans up all state kept for a disconnected peer: its cached node version, its
     * membership in every group service it belonged to (dropping group services that end up
     * with no nodes), and finally its group list. Failures are logged and swallowed.
     *
     * @param peerIpAndPort the endpoint of the peer that disconnected
     */
    private void onDisconnect(String peerIpAndPort) {
        try {
            this.nodeToNodeVersion.remove(peerIpAndPort);
            // One lookup instead of containsKey()+get(): the entry can be replaced or removed
            // concurrently, and the loop below must not iterate a null list.
            List<String> groupList = this.nodeToGroupIDList.get(peerIpAndPort);
            if (groupList == null) {
                return;
            }
            for (String group : groupList) {
                Integer groupId = Integer.valueOf(group);
                GroupService groupService = this.groupIdToService.get(groupId);
                if (groupService == null) {
                    continue;
                }
                if (groupService.removeNode(peerIpAndPort)) {
                    this.updateBlockNotify(peerIpAndPort, groupList);
                }
                // Drop the group service once its last node is gone.
                if (groupService.getGroupNodesInfo().isEmpty()) {
                    this.groupIdToService.remove(groupId);
                }
            }
            this.nodeToGroupIDList.remove(peerIpAndPort);
        } catch (Exception e) {
            logger.warn("onDisconnect to {} failed, error message: {}", peerIpAndPort, e.getMessage());
        }
    }

    /**
     * Registers one handler on the channel for both {@code BLOCK_NOTIFY} messages and
     * disconnect events. Both kinds of event are processed on the worker pool.
     */
    public void registerBlockNumberNotifyHandler() {
        OnReceiveBlockNotifyFunc onReceiveBlockNotifyFunc = (version, peerIpAndPort, blockNumberNotifyMessage) -> {
            Runnable handleNotify = () -> {
                try {
                    onReceiveBlockNotifyImpl(version, peerIpAndPort, blockNumberNotifyMessage);
                } catch (Exception e) {
                    logger.warn("registerBlockNumberNotifyHandler exception, e: {}", e.getMessage(), e);
                }
            };
            threadPool.getThreadPool().execute(handleNotify);
        };
        Consumer<String> onPeerDisconnected = disconnectedEndpoint -> {
            Runnable handleDisconnect = () -> {
                try {
                    onDisconnect(disconnectedEndpoint);
                } catch (Exception e) {
                    logger.warn("BlockNumberNotifyHandler exception, e: {}", e.getMessage(), e);
                }
            };
            threadPool.getThreadPool().execute(handleDisconnect);
        };
        BlockNumberNotifyHandler handler = new BlockNumberNotifyHandler(onReceiveBlockNotifyFunc, onPeerDisconnected);
        channel.addMessageHandler(MsgType.BLOCK_NOTIFY, handler);
        channel.addDisconnectHandler(handler);
        logger.info("registerBlockNumberNotifyHandler");
    }

    /**
     * Registers the {@code TRANSACTION_NOTIFY} message handler. Each received message is
     * handed to {@link #onReceiveTransactionNotify(Message)} on the worker pool.
     */
    public void registerTransactionNotifyHandler() {
        Consumer<Message> onNotify = message -> {
            Runnable deliver = () -> onReceiveTransactionNotify(message);
            threadPool.getThreadPool().execute(deliver);
        };
        channel.addMessageHandler(MsgType.TRANSACTION_NOTIFY, new TransactionNotifyHandler(onNotify));
        logger.info("registerTransactionNotifyHandler");
    }

    /**
     * Handles one block-number notification: decodes it, records the peer's block number for
     * the group, and then fans the notification out to every registered callback on the
     * worker pool.
     *
     * @param version channel protocol version used to decode the message
     * @param peerIpAndPort endpoint of the peer that sent the notification
     * @param blockNumberNotifyMessage the raw notification message
     */
    protected void onReceiveBlockNotifyImpl(EnumChannelProtocolVersion version, final String peerIpAndPort, Message blockNumberNotifyMessage) {
        try {
            final BlockNumberNotification blockNumberInfo = this.blockNumberMessageDecoder.decode(version, blockNumberNotifyMessage);
            if (blockNumberInfo == null) {
                return;
            }
            this.updateBlockNumberInfo(Integer.valueOf(blockNumberInfo.getGroupId()), peerIpAndPort, new BigInteger(blockNumberInfo.getBlockNumber()));
            this.threadPool.getThreadPool().execute(() -> {
                // Iterate values() rather than keySet()+get(): a callback erased concurrently
                // would otherwise come back as null and abort the whole fan-out.
                for (BlockNumberNotifyCallback callback : this.registerIdToBlockNotifyCallback.values()) {
                    try {
                        callback.onReceiveBlockNumberInfo(peerIpAndPort, blockNumberInfo);
                    } catch (Exception e) {
                        // Isolate failures so one faulty callback cannot starve the others.
                        logger.warn("Calls BlockNumberNotifyCallback failed, error info: {}", e.getMessage());
                    }
                }
            });
        } catch (Exception e) {
            logger.error("onReceiveBlockNotify failed, error message: {}", e.getMessage());
        }
    }

    /**
     * Registers a block-number notification callback under a freshly generated id.
     *
     * @param callback the callback to invoke for each block-number notification
     * @return the id to pass to {@link #eraseBlockNotifyCallback(String)} to unregister it
     */
    @Override
    public String registerBlockNotifyCallback(BlockNumberNotifyCallback callback) {
        final String newId = ChannelUtils.newSeq();
        registerIdToBlockNotifyCallback.put(newId, callback);
        logger.debug("register BlockNumberNotifyCallback, registerId: {}", newId);
        return newId;
    }

    /**
     * Unregisters a block-number notification callback; unknown ids are ignored.
     *
     * @param registerId id returned by {@link #registerBlockNotifyCallback}
     */
    @Override
    public void eraseBlockNotifyCallback(String registerId) {
        // remove() is already a no-op for an absent key.
        registerIdToBlockNotifyCallback.remove(registerId);
    }

    /**
     * Delivers a transaction-receipt notification to the callback registered for the
     * message's seq. The callback is unregistered and its timeout cancelled before delivery.
     * If the receipt payload cannot be decoded, the callback receives a synthetic receipt
     * carrying the {@code MESSAGE_DECODE_ERROR} status instead.
     *
     * @param message the transaction notification; ignored when its seq is null
     */
    protected void onReceiveTransactionNotify(Message message) {
        String seq = message.getSeq();
        if (seq == null) {
            return;
        }
        // Atomically claim the callback so a duplicate notification cannot deliver twice.
        TransactionCallback callback = this.seq2TransactionCallback.remove(seq);
        if (callback == null) {
            logger.error("transaction callback is null, seq: {}", seq);
            return;
        }
        callback.cancelTimeout();
        TransactionReceipt receipt;
        try {
            receipt = (TransactionReceipt) ObjectMapperFactory.getObjectMapper().readValue(message.getData(), TransactionReceipt.class);
        } catch (Exception e) {
            receipt = new TransactionReceipt();
            receipt.setStatus(String.valueOf(ChannelMessageError.MESSAGE_DECODE_ERROR.getError()));
            receipt.setMessage("Decode receipt error, seq: " + seq + ", reason: " + e.getLocalizedMessage());
        }
        // Deliver outside the try block: an exception thrown by the callback itself must not
        // be mistaken for a decode failure and trigger a second onResponse call.
        callback.onResponse(receipt);
    }

    /**
     * Sends a transaction to a group and registers {@code callback} to receive the receipt
     * notification for the message's seq. When the callback has a positive timeout, a timer
     * is armed that unregisters the callback and invokes {@code callback.onTimeout()}.
     *
     * @param groupId the target group
     * @param transactionMessage the transaction message; its seq keys the pending callback
     * @param callback receives the receipt or the timeout
     * @param responseCallback receives the channel-level response to the send
     */
    @Override
    public void asyncSendTransaction(Integer groupId, final Message transactionMessage, final TransactionCallback callback, ResponseCallback responseCallback) {
        final String seq = transactionMessage.getSeq();
        // Register before arming the timer: otherwise a timeout firing first would remove
        // nothing and the later put() would leave a stale entry behind.
        this.seq2TransactionCallback.put(seq, callback);
        if (callback.getTimeout() > 0) {
            try {
                callback.setTimeoutHandler(this.timeoutHandler.newTimeout(new TimerTask() {

                    @Override
                    public void run(Timeout timeout) throws Exception {
                        // Unregister first so a throwing onTimeout() cannot leak the entry.
                        GroupManagerServiceImpl.this.seq2TransactionCallback.remove(seq);
                        logger.info("Transaction timeout: {}", seq);
                        callback.onTimeout();
                    }
                }, (long) callback.getTimeout().intValue(), TimeUnit.MILLISECONDS));
            } catch (RuntimeException e) {
                // Arming failed (e.g. the timer was stopped): undo the registration.
                this.seq2TransactionCallback.remove(seq);
                throw e;
            }
        }
        this.asyncSendMessageToGroup(groupId, transactionMessage, responseCallback);
    }

    /**
     * Drops the pending transaction callback registered for {@code seq}, if any.
     *
     * @param seq the message seq; {@code null} is ignored
     */
    @Override
    public void eraseTransactionSeq(String seq) {
        if (seq == null) {
            return;
        }
        seq2TransactionCallback.remove(seq);
    }

    /** Returns the channel this service communicates over. */
    @Override
    public Channel getChannel() {
        return channel;
    }

    /**
     * Stops the timeout timer, the periodic group-list scheduler and the worker pool.
     * Calling it when the service is not running only logs a warning.
     */
    @Override
    public void stop() {
        // compareAndSet makes the running -> stopped transition atomic, so two concurrent
        // stop() calls cannot both proceed to shut the resources down.
        if (!this.running.compareAndSet(true, false)) {
            logger.warn("GroupManagerService has already been stopped!");
            return;
        }
        logger.debug("stop GroupManagerService...");
        this.timeoutHandler.stop();
        ThreadPoolService.stopThreadPool(this.scheduledExecutorService);
        this.threadPool.stop();
        logger.debug("stop GroupManagerService succ...");
    }

    /**
     * Marks the service as running and schedules the periodic group-list refresh (first run
     * immediately, then every {@code fetchGroupListIntervalMs} milliseconds). Calling it
     * while already running only logs a warning.
     */
    protected void start() {
        // Atomic stopped -> running transition; a plain get()/set() pair could let two
        // callers both schedule the refresh task.
        if (!this.running.compareAndSet(false, true)) {
            logger.warn("GroupManagerService has already been started!");
            return;
        }
        logger.debug("start GroupManagerService...");
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            // scheduleAtFixedRate suppresses all further executions once a run throws, so
            // any failure must be contained here to keep the refresh alive.
            try {
                this.fetchGroupList();
            } catch (Exception e) {
                logger.warn("periodic fetchGroupList failed, error message: {}", e.getMessage(), e);
            }
        }, 0L, this.fetchGroupListIntervalMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Replaces the group list recorded for a peer and reconciles the group services with it:
     * the peer is removed from groups it no longer reports, and for each reported group a
     * group service is created if missing or the peer is inserted into the existing one.
     *
     * @param peerIpAndPort the peer endpoint
     * @param groupList the group ids (numeric strings) the peer currently reports
     */
    @Override
    public void updateGroupInfo(String peerIpAndPort, List<String> groupList) {
        List<String> orgGroupList = this.nodeToGroupIDList.get(peerIpAndPort);
        if (orgGroupList != null) {
            for (String orgGroupIdStr : orgGroupList) {
                Integer groupId = Integer.valueOf(orgGroupIdStr);
                if (groupList.contains(orgGroupIdStr)) {
                    continue;
                }
                // Single lookup: the service may be removed concurrently by the disconnect
                // handler, so containsKey() followed by get() could dereference null.
                GroupService staleService = this.groupIdToService.get(groupId);
                if (staleService == null) {
                    continue;
                }
                if (staleService.removeNode(peerIpAndPort)) {
                    // NOTE(review): at this point the map still holds the previous group
                    // list, so that is what gets passed on - confirm this is intended.
                    this.updateBlockNotify(peerIpAndPort, this.nodeToGroupIDList.get(peerIpAndPort));
                }
                logger.info("remove group {} from {}", orgGroupIdStr, peerIpAndPort);
            }
        }
        this.nodeToGroupIDList.put(peerIpAndPort, groupList);
        for (String groupIdStr : groupList) {
            // Integer.valueOf never returns null, so no null check is needed here.
            Integer groupId = Integer.valueOf(groupIdStr);
            if (this.tryToCreateGroupService(peerIpAndPort, groupId)) {
                this.getAndUpdateBlockNumberForAllPeers(groupId);
                continue;
            }
            GroupService groupService = this.groupIdToService.get(groupId);
            // A null service means it was removed concurrently; the next refresh recreates it.
            if (groupService == null || !groupService.insertNode(peerIpAndPort)) {
                continue;
            }
            this.fetchAndUpdateBlockNumberInfo(groupId, peerIpAndPort);
            this.updateBlockNotify(peerIpAndPort, this.nodeToGroupIDList.get(peerIpAndPort));
        }
        logger.trace("update groupInfo for {}, groupList: {}", peerIpAndPort, groupList);
    }

    /**
     * Records the current block number of a peer within a group, creating the group service
     * first when the group is not known yet.
     *
     * @param groupId the group id
     * @param peerInfo the peer endpoint
     * @param currentBlockNumber the block number reported by the peer
     */
    @Override
    public void updateBlockNumberInfo(Integer groupId, String peerInfo, BigInteger currentBlockNumber) {
        this.tryToCreateGroupService(peerInfo, groupId);
        GroupService groupService = this.groupIdToService.get(groupId);
        if (groupService == null) {
            // The service was removed concurrently (e.g. by the disconnect handler) between
            // creation and lookup; skip the update instead of throwing an NPE.
            logger.debug("skip block number update, group {} has been removed, peer: {}", groupId, peerInfo);
            return;
        }
        groupService.updatePeersBlockNumberInfo(peerInfo, currentBlockNumber);
    }

    /**
     * Creates and registers a group service for {@code groupId} if none exists yet.
     *
     * @param peerIpAndPort the peer handed to the factory for the new group service
     * @param groupId the group id
     * @return {@code true} if this call registered a new group service, {@code false} if one
     *     already existed (including one registered concurrently by another thread)
     */
    private boolean tryToCreateGroupService(String peerIpAndPort, Integer groupId) {
        if (this.groupIdToService.containsKey(groupId)) {
            return false;
        }
        GroupService created = this.groupServiceFactory.createGroupSerivce(groupId, peerIpAndPort);
        // putIfAbsent keeps the first registration: a plain put() would let a racing thread
        // overwrite a service that may already hold node and block-number state.
        if (this.groupIdToService.putIfAbsent(groupId, created) != null) {
            return false;
        }
        this.updateBlockNotify(peerIpAndPort, this.nodeToGroupIDList.get(peerIpAndPort));
        return true;
    }

    /**
     * Computes the block limit for a group as its latest known block number plus
     * {@code BLOCK_LIMIT}.
     *
     * @param groupId the group id
     * @return latest block number of the group plus {@code BLOCK_LIMIT}
     */
    @Override
    public BigInteger getBlockLimitByGroup(Integer groupId) {
        BigInteger latestBlockNumber = getLatestBlockNumberByGroup(groupId);
        return latestBlockNumber.add(BLOCK_LIMIT);
    }

    /**
     * Returns the latest block number known for a group. When the recorded value is still
     * zero, the block number is first re-fetched from the group's available peers.
     *
     * @param groupId the group id
     * @return the latest block number, or {@code BigInteger.ZERO} when the group is unknown
     */
    @Override
    public BigInteger getLatestBlockNumberByGroup(Integer groupId) {
        // Look the service up once per step; repeated containsKey()/get() pairs can hit a
        // concurrent removal and dereference null.
        GroupService groupService = this.groupIdToService.get(groupId);
        if (groupService == null) {
            return BigInteger.ZERO;
        }
        if (groupService.getLatestBlockNumber().equals(BigInteger.ZERO)) {
            this.getAndUpdateBlockNumberForAllPeers(groupId);
            // Re-read: the refresh may have replaced or removed the service.
            groupService = this.groupIdToService.get(groupId);
            if (groupService == null) {
                return BigInteger.ZERO;
            }
        }
        return groupService.getLatestBlockNumber();
    }

    /**
     * Queries every available peer of the group for its block number and records the result.
     *
     * @param groupId the group id
     */
    private void getAndUpdateBlockNumberForAllPeers(Integer groupId) {
        final List<String> groupPeers = getGroupAvailablePeers(groupId);
        logger.debug("g: {}, getAndUpdateBlockNumberForAllPeers, group availablePeers:{}", groupId, groupPeers.toString());
        groupPeers.forEach(peer -> fetchAndUpdateBlockNumberInfo(groupId, peer));
    }

    /**
     * Fetches the block number of a group from one peer and records it. A
     * {@code ClientException} from the query is logged and swallowed.
     *
     * @param groupId the group id
     * @param peer the peer endpoint to query
     */
    private void fetchAndUpdateBlockNumberInfo(Integer groupId, String peer) {
        try {
            BlockNumber response = groupInfoGetter.getBlockNumber(groupId, peer);
            BigInteger fetched = response.getBlockNumber();
            updateBlockNumberInfo(groupId, peer, fetched);
            logger.debug("fetch and update the blockNumber information, groupId: {}, peer:{}, blockNumber: {}", groupId, peer, fetched);
        } catch (ClientException e) {
            logger.error("GetBlockNumber from {} failed, error information:{}", peer, e.getMessage());
        }
    }

    /**
     * Returns the node set of a group.
     *
     * @param groupId the group id
     * @return the group's nodes as reported by its group service, or a new empty set when the
     *     group is unknown
     */
    @Override
    public Set<String> getGroupNodeList(Integer groupId) {
        // Single lookup avoids an NPE when the group is removed between check and use.
        GroupService groupService = this.groupIdToService.get(groupId);
        if (groupService == null) {
            return new HashSet<String>();
        }
        return groupService.getGroupNodesInfo();
    }

    /**
     * Returns the group ids recorded for a node.
     *
     * @param nodeAddress the node endpoint
     * @return the recorded group list, or a new empty list when nothing is recorded; never
     *     {@code null}
     */
    @Override
    public List<String> getGroupInfoByNodeInfo(String nodeAddress) {
        // Single lookup: with containsKey()+get() a concurrent removal could make this
        // method return null, which callers do not expect.
        List<String> groupList = this.nodeToGroupIDList.get(nodeAddress);
        if (groupList == null) {
            return new ArrayList<String>();
        }
        return groupList;
    }

    /**
     * Checks whether a group service exists for the group, logging a warning when it does not.
     *
     * @param groupId the group id
     * @return {@code true} if the group is known
     */
    private boolean checkGroupStatus(Integer groupId) {
        final boolean known = groupIdToService.containsKey(groupId);
        if (!known) {
            logger.warn("checkGroupStatus failed for group {} doesn't exist", groupId);
        }
        return known;
    }

    /**
     * Synchronously sends a message to the node of the group that the group service reports
     * as having the latest block number.
     *
     * @param groupId the target group
     * @param message the message to send
     * @return the peer's response, or {@code null} when the group is unknown or no node could
     *     be selected
     */
    @Override
    public Response sendMessageToGroup(Integer groupId, Message message) {
        if (!checkGroupStatus(groupId)) {
            return null;
        }
        final String targetNode = groupIdToService.get(groupId).getNodeWithTheLatestBlockNumber();
        if (targetNode != null) {
            logger.trace("g:{}, sendMessageToGroup, selectedPeer: {}, message type: {}, seq: {}, length:{}", groupId, targetNode, message.getType(), message.getSeq(), message.getLength());
            return channel.sendToPeer(message, targetNode);
        }
        logger.error("sendMessageToGroup message failed for get the node with the latest block number failed, groupId: {}, seq: {}, type: {}", groupId, message.getSeq(), message.getType());
        return null;
    }

    /**
     * Asynchronously sends a message to the node of the group that the group service reports
     * as having the latest block number.
     *
     * @param groupId the target group
     * @param message the message to send
     * @param callback receives the response; when the group is unknown its {@code onError} is
     *     invoked (if non-null) and nothing is sent
     * @throws ClientException when the group exists but no target node could be selected
     */
    @Override
    public void asyncSendMessageToGroup(Integer groupId, Message message, ResponseCallback callback) {
        if (!checkGroupStatus(groupId)) {
            if (callback == null) {
                return;
            }
            callback.onError("asyncSendMessageToGroup to group " + groupId + " failed, please check the connections.");
            return;
        }
        final String targetNode = groupIdToService.get(groupId).getNodeWithTheLatestBlockNumber();
        if (targetNode != null) {
            logger.trace("g:{}, asyncSendMessageToGroup, selectedPeer:{}, message type: {}, seq: {}, length:{}", groupId, targetNode, message.getType(), message.getSeq(), message.getLength());
            channel.asyncSendToPeer(message, targetNode, callback, new Options());
            return;
        }
        logger.warn("g:{}, asyncSendMessageToGroup, selectedPeer failed, seq: {}, type: {}", groupId, message.getSeq(), message.getType());
        throw new ClientException("asyncSendMessageToGroup to " + groupId + " failed for selectPeer failed, messageSeq: " + message.getSeq());
    }

    /**
     * Synchronously sends a message to the peer of the group chosen by {@code rule}.
     *
     * @param groupId the target group
     * @param message the message to send
     * @param rule selects one peer among the group's connections
     * @return the peer's response, or {@code null} when no peer was selected
     */
    @Override
    public Response sendMessageToGroupByRule(Integer groupId, Message message, PeerSelectRule rule) {
        final String chosenPeer = selectGroupPeersByRule(groupId, rule);
        if (chosenPeer != null) {
            logger.debug("g:{}, sendMessageToGroupByRule, send message to {}, selectedPeer: {}, message type: {}, seq: {}, length:{}", groupId, chosenPeer, chosenPeer, message.getType(), message.getSeq(), message.getLength());
            return channel.sendToPeer(message, chosenPeer);
        }
        logger.warn("g:{}, sendMessageToGroupByRule, no peer is selected by the rule, message type: {}, seq: {}, length:{}", groupId, message.getType(), message.getSeq(), message.getLength());
        return null;
    }

    /**
     * Applies a selection rule to the connections of a group.
     *
     * @param groupId the group id
     * @param rule the selection rule
     * @return the peer chosen by the rule, or {@code null} when the group is unknown
     */
    private String selectGroupPeersByRule(Integer groupId, PeerSelectRule rule) {
        return checkGroupStatus(groupId) ? rule.select(getGroupConnectionInfo(groupId)) : null;
    }

    /**
     * Returns the channel connections whose endpoints belong to the group.
     *
     * @param groupId the group id
     * @return the matching connections, or a new empty list when the group is unknown
     */
    @Override
    public List<ConnectionInfo> getGroupConnectionInfo(Integer groupId) {
        if (checkGroupStatus(groupId)) {
            return getGroupConnectionInfo(groupIdToService.get(groupId));
        }
        return new ArrayList<ConnectionInfo>();
    }

    /**
     * Filters the channel's connections down to those whose endpoint the group service
     * recognises as one of its peers.
     *
     * @param groupService the group service to filter by
     * @return a new list with the matching connections
     */
    private List<ConnectionInfo> getGroupConnectionInfo(GroupService groupService) {
        ArrayList<ConnectionInfo> matched = new ArrayList<ConnectionInfo>();
        for (ConnectionInfo candidate : channel.getConnectionInfo()) {
            if (groupService.existPeer(candidate.getEndPoint())) {
                matched.add(candidate);
            }
        }
        return matched;
    }

    /**
     * Returns the available peers that belong to the group.
     *
     * @param groupId the group id
     * @return the matching peer endpoints, or a new empty list when the group is unknown
     */
    @Override
    public List<String> getGroupAvailablePeers(Integer groupId) {
        if (checkGroupStatus(groupId)) {
            return getGroupAvailablePeers(groupIdToService.get(groupId));
        }
        return new ArrayList<String>();
    }

    /**
     * Filters the channel's available peers down to those the group service recognises.
     *
     * @param groupService the group service to filter by
     * @return a new list with the matching peer endpoints
     */
    private List<String> getGroupAvailablePeers(GroupService groupService) {
        ArrayList<String> matched = new ArrayList<String>();
        for (String candidate : channel.getAvailablePeer()) {
            if (groupService.existPeer(candidate)) {
                matched.add(candidate);
            }
        }
        return matched;
    }

    /**
     * Asynchronously sends a message to the peer of the group chosen by {@code rule}.
     * Failures are reported through {@code callback.onError} exactly once; nothing is sent
     * when the group is unknown or the rule selects no peer.
     *
     * @param groupId the target group
     * @param message the message to send
     * @param rule selects one peer among the group's connections
     * @param callback receives the response or the error; may be {@code null}
     */
    @Override
    public void asyncSendMessageToGroupByRule(Integer groupId, Message message, PeerSelectRule rule, ResponseCallback callback) {
        if (!this.checkGroupStatus(groupId)) {
            if (callback != null) {
                callback.onError("asyncSendMessageToGroupByRule to " + groupId + " failed for the group doesn't exit, message seq: " + message.getSeq());
            }
            // Stop here: falling through would select no peer and report a second error
            // to the same callback.
            return;
        }
        String selectedPeer = this.selectGroupPeersByRule(groupId, rule);
        if (selectedPeer == null) {
            logger.warn("g:{}, asyncSendMessageToGroup, no peer is selected by the rule, message type: {}, seq: {}, length:{}", groupId, message.getType(), message.getSeq(), message.getLength());
            if (callback != null) {
                callback.onError("asyncSendMessageToGroupByRule to " + groupId + " failed for no peer is selected by the rule");
            }
            return;
        }
        logger.trace("g:{}, asyncSendMessageToGroupByRule, selectedPeer: {}, message type: {}, seq: {}, length:{}", groupId, selectedPeer, message.getType(), message.getSeq(), message.getLength());
        this.channel.asyncSendToPeer(message, selectedPeer, callback, new Options());
    }

    /**
     * Sends a message, without a response callback, to every connected peer of the group.
     * When the group has no connected peers a warning is logged and nothing is sent.
     *
     * @param groupId the target group
     * @param message the message to broadcast
     */
    @Override
    public void broadcastMessageToGroup(Integer groupId, Message message) {
        List<ConnectionInfo> groupConnnectionInfos = this.getGroupConnectionInfo(groupId);
        // getGroupConnectionInfo returns an empty list rather than null, so the "no peers"
        // case must be detected with isEmpty() for the warning to be reachable.
        if (groupConnnectionInfos == null || groupConnnectionInfos.isEmpty()) {
            logger.warn("g:{}, broadcastMessageToGroup,  broadcast message failed for the group has no connected peers, message type: {}, seq: {}, length:{}", groupId, message.getType(), message.getSeq(), message.getLength());
            return;
        }
        for (ConnectionInfo connectionInfo : groupConnnectionInfos) {
            this.channel.asyncSendToPeer(message, connectionInfo.getEndPoint(), null, new Options());
        }
    }

    /** Fetches the group list from every peer the channel reports as available. */
    @Override
    public void fetchGroupList() {
        channel.getAvailablePeer().forEach(this::fetchGroupList);
    }

    /**
     * Fetches the group list from one peer and applies it via {@link #updateGroupInfo}.
     * A {@code ClientException} is logged and swallowed.
     *
     * @param peerEndPoint the peer endpoint to query
     */
    private void fetchGroupList(String peerEndPoint) {
        try {
            GroupList response = groupInfoGetter.getGroupList(peerEndPoint);
            updateGroupInfo(peerEndPoint, response.getGroupList());
        } catch (ClientException e) {
            logger.warn("fetchGroupList from {} failed, error info: {}", peerEndPoint, e.getMessage());
        }
    }

    /**
     * Refreshes the group lists from all available peers and returns the known group ids.
     *
     * @return the key set of the group-service map
     */
    @Override
    public Set<Integer> getGroupList() {
        fetchGroupList();
        return groupIdToService.keySet();
    }

    /**
     * Passes the peer's group list to the AMOP topic manager and re-sends the subscription.
     * Does nothing while no AMOP module has been set.
     *
     * @param peer the peer endpoint
     * @param groupList the group ids to hand to the topic manager for that peer
     */
    protected void updateBlockNotify(String peer, List<String> groupList) {
        if (amop != null) {
            amop.getTopicManager().updateBlockNotify(peer, groupList);
            amop.sendSubscribe();
        }
    }

    /**
     * Sets the AMOP module and updates block notification for every available peer that
     * has a non-empty group list recorded.
     *
     * @param amop the AMOP module to use for block-notify subscriptions
     */
    @Override
    public void setAmop(Amop amop) {
        this.amop = amop;
        for (String peer : this.channel.getAvailablePeer()) {
            List<String> groupList = this.getGroupInfoByNodeInfo(peer);
            // Pass the list itself: the logger renders it lazily, and it is not dereferenced
            // before the null check below.
            logger.debug("register blockNotify for {}, groupList: {}", peer, groupList);
            if (groupList == null || groupList.isEmpty()) {
                continue;
            }
            this.updateBlockNotify(peer, groupList);
        }
    }
}

