package net.dreamlu.iot.mqtt.core.server.cluster;

import java.util.Iterator;
import java.util.Set;
import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.enums.MessageType;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.server.ServerChannelContext;
import org.tio.utils.hutool.StrUtil;

/* loaded from: input_file:net/dreamlu/iot/mqtt/core/server/cluster/MqttClusterMessageListener.class */
/**
 * Applies messages received from other cluster nodes to the local {@link MqttServer}.
 *
 * <p>{@link #onMessage(Message)} dispatches on the message's {@link MessageType}:
 * <ul>
 *   <li>{@code CONNECT}: closes the local channel bound to the client id when the
 *       message originates from a different node;</li>
 *   <li>{@code SUBSCRIBE} / {@code UNSUBSCRIBE}: updates the session manager when the
 *       originating client has a channel on this server;</li>
 *   <li>{@code UP_STREAM} / {@code DOWN_STREAM}: forwards the message via
 *       {@code MqttServer#sendToClient};</li>
 *   <li>{@code HTTP_API}: publishes the payload via {@code MqttServer#publishAll} and
 *       then invokes the configured {@link IMqttMessageListener};</li>
 *   <li>{@code DISCONNECT}: closes the channel(s) matching the client id or, when the
 *       client id is blank, the username.</li>
 * </ul>
 * Any other message type is ignored.
 */
public class MqttClusterMessageListener {
    private static final Logger logger = LoggerFactory.getLogger(MqttClusterMessageListener.class);
    private final String nodeName;
    private final IMqttMessageListener messageListener;
    private final IMqttSessionManager sessionManager;
    private final MqttServer mqttServer;

    /**
     * Creates a listener bound to the given server.
     *
     * @param mqttServer server whose creator supplies the node name, message listener
     *                   and session manager used by this listener
     */
    public MqttClusterMessageListener(MqttServer mqttServer) {
        this.nodeName = mqttServer.getServerCreator().getNodeName();
        this.messageListener = mqttServer.getServerCreator().getMessageListener();
        this.sessionManager = mqttServer.getServerCreator().getSessionManager();
        this.mqttServer = mqttServer;
    }

    /**
     * Handles one cluster message by dispatching on its {@link MessageType}.
     *
     * @param message the cluster message to apply locally
     */
    public void onMessage(Message message) {
        MessageType messageType = message.getMessageType();
        String topic = message.getTopic();
        if (MessageType.CONNECT == messageType) {
            onConnect(message);
        } else if (MessageType.SUBSCRIBE == messageType) {
            onSubscribe(topic, message);
        } else if (MessageType.UNSUBSCRIBE == messageType) {
            onUnsubscribe(topic, message);
        } else if (MessageType.UP_STREAM == messageType || MessageType.DOWN_STREAM == messageType) {
            // Both directions are delivered to local clients the same way.
            this.mqttServer.sendToClient(topic, message);
        } else if (MessageType.HTTP_API == messageType) {
            onHttpApi(topic, message);
        } else if (MessageType.DISCONNECT == messageType) {
            onDisconnect(message);
        }
    }

    /** Closes the local channel for the message's client id when the message comes from another node. */
    private void onConnect(Message message) {
        String node = message.getNode();
        if (this.nodeName.equals(node)) {
            // The message originated from this node; nothing to do.
            return;
        }
        String clientId = message.getClientId();
        ChannelContext context = Tio.getByBsId(this.mqttServer.getServerConfig(), clientId);
        if (context != null) {
            Tio.remove(context, "clientId:[" + clientId + "] now bind on mqtt node:" + node);
        }
    }

    /** Registers the subscription when the originating client has a channel on this server. */
    private void onSubscribe(String topic, Message message) {
        String fromClientId = message.getFromClientId();
        if (this.mqttServer.getChannelContext(fromClientId) != null) {
            this.sessionManager.addSubscribe(topic, fromClientId, message.getQos());
        }
    }

    /** Removes the subscription when the originating client has a channel on this server. */
    private void onUnsubscribe(String topic, Message message) {
        String fromClientId = message.getFromClientId();
        if (this.mqttServer.getChannelContext(fromClientId) != null) {
            this.sessionManager.removeSubscribe(topic, fromClientId);
        }
    }

    /** Publishes an HTTP API message to all subscribers, then notifies the message listener. */
    private void onHttpApi(String topic, Message message) {
        MqttQoS qos = MqttQoS.valueOf(message.getQos());
        this.mqttServer.publishAll(topic, message.getPayload(), qos, message.isRetain());
        try {
            onHttpApiMessage(topic, qos, message);
        } catch (Throwable th) {
            // A listener failure is logged and not propagated to the caller.
            logger.error("Mqtt cluster HTTP_API message listener error, topic:{}", topic, th);
        }
    }

    /**
     * Closes the channel bound to the message's client id or, when the client id is
     * blank, every channel returned by {@code Tio.getByUserid} for the message's username.
     */
    private void onDisconnect(Message message) {
        String clientId = message.getClientId();
        if (StrUtil.isNotBlank(clientId)) {
            ChannelContext context = this.mqttServer.getChannelContext(clientId);
            if (context != null) {
                Tio.remove(context, "Mqtt server kick out clients:" + clientId);
            }
            return;
        }
        String username = message.getUsername();
        if (!StrUtil.isNotBlank(username)) {
            return;
        }
        Set<ChannelContext> contexts = Tio.getByUserid(this.mqttServer.getServerConfig(), username);
        if (contexts == null || contexts.isEmpty()) {
            return;
        }
        // Iterate over a snapshot so the loop does not depend on the returned set staying
        // unmodified. NOTE(review): Tio.remove may unbind the context from this set - confirm.
        for (ChannelContext context : contexts.toArray(new ChannelContext[0])) {
            // The client id is blank on this path, so the remark reports the username.
            Tio.remove(context, "Mqtt server kick out clients by username:" + username);
        }
    }

    /**
     * Invokes the configured message listener for an HTTP API publish.
     *
     * <p>A new {@link ServerChannelContext} is created for the call: its server node is
     * the MQTT server's node, its client node is the web server's node, its bsId is the
     * message's client id and its user id is {@code MessageType.HTTP_API.name()}.
     *
     * @param topic   topic of the published message
     * @param mqttQoS QoS of the published message
     * @param message the cluster message carrying client id, retain flag and payload
     */
    private void onHttpApiMessage(String topic, MqttQoS mqttQoS, Message message) {
        String clientId = message.getClientId();
        ChannelContext context = new ServerChannelContext(this.mqttServer.getServerConfig());
        context.setServerNode(this.mqttServer.getTioServer().getServerNode());
        context.setClientNode(this.mqttServer.getWebServer().getTioServer().getServerNode());
        context.setBsId(clientId);
        context.setUserId(MessageType.HTTP_API.name());
        this.messageListener.onMessage(
            context,
            clientId,
            topic,
            mqttQoS,
            MqttMessageBuilders.publish()
                .topicName(topic)
                .qos(mqttQoS)
                .retained(message.isRetain())
                .payload(message.getPayload())
                .build(),
            message);
    }
}
