package org.joyqueue.broker.mqtt.handler;

import com.google.common.base.Strings;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.handler.timeout.IdleStateHandler;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.joyqueue.broker.BrokerContext;
import org.joyqueue.broker.mqtt.cluster.MqttConnectionManager;
import org.joyqueue.broker.mqtt.cluster.MqttConsumerManager;
import org.joyqueue.broker.mqtt.cluster.MqttProducerManager;
import org.joyqueue.broker.mqtt.cluster.MqttSessionManager;
import org.joyqueue.broker.mqtt.cluster.MqttSubscriptionManager;
import org.joyqueue.broker.mqtt.connection.MqttConnection;
import org.joyqueue.broker.mqtt.message.WillMessage;
import org.joyqueue.broker.mqtt.publish.MessagePublisher;
import org.joyqueue.broker.mqtt.session.MqttSession;
import org.joyqueue.broker.mqtt.subscriptions.MqttSubscription;
import org.joyqueue.broker.mqtt.subscriptions.TopicFilter;
import org.joyqueue.broker.mqtt.util.NettyAttrManager;
import org.joyqueue.domain.AppToken;
import org.joyqueue.network.session.Producer;
import org.joyqueue.nsr.NameService;
import org.joyqueue.toolkit.network.IpUtil;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/mqtt/handler/MqttProtocolHandler.class */
public class MqttProtocolHandler extends Service {
    private static final Logger LOG = LoggerFactory.getLogger(MqttProtocolHandler.class);
    private MqttSessionManager sessionManager;
    private MqttConsumerManager consumerManager;
    private MqttSubscriptionManager subscriptionManager;
    private MessagePublisher messagePublisher;
    private NameService nameService;
    private ConcurrentMap<String, WillMessage> willStore = new ConcurrentHashMap();
    private MqttConnectionManager connectionManager = new MqttConnectionManager();
    private MqttProducerManager producerManager = new MqttProducerManager(this.connectionManager);

    public MqttProtocolHandler(BrokerContext brokerContext) {
        this.messagePublisher = new MessagePublisher(brokerContext, this.connectionManager);
        this.sessionManager = new MqttSessionManager(brokerContext, this.connectionManager);
        this.consumerManager = new MqttConsumerManager(brokerContext, this.connectionManager, this.sessionManager, this.messagePublisher);
        this.subscriptionManager = new MqttSubscriptionManager(brokerContext);
        this.nameService = brokerContext.getNameService();
    }

    public MqttConnectionManager getConnectionManager() {
        return this.connectionManager;
    }

    public MqttSessionManager getSessionManager() {
        return this.sessionManager;
    }

    public MqttConsumerManager getConsumerManager() {
        return this.consumerManager;
    }

    public void start() throws Exception {
        super.start();
        this.connectionManager.start();
        this.sessionManager.start();
        this.producerManager.start();
        this.consumerManager.start();
        this.subscriptionManager.start();
    }

    public void stop() {
        super.stop();
        this.connectionManager.stop();
        this.sessionManager.stop();
        this.producerManager.stop();
        this.consumerManager.stop();
        this.subscriptionManager.stop();
    }

    public void processConnect(Channel channel, MqttConnectMessage mqttConnectMessage) {
        String clientIdentifier = mqttConnectMessage.payload().clientIdentifier();
        boolean isCleanSession = mqttConnectMessage.variableHeader().isCleanSession();
        if (!mqttConnectMessage.variableHeader().name().equals("MQTT") || mqttConnectMessage.variableHeader().version() != MqttVersion.MQTT_3_1_1.protocolLevel()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("CONN clientID: <{}>, 版本不对断开连接: <{}>", clientIdentifier, mqttConnectMessage.toString());
            }
            sendAckToClient(channel, mqttConnectMessage, MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false);
            return;
        }
        MqttConnectReturnCode checkAuth = checkAuth(mqttConnectMessage);
        if (checkAuth != MqttConnectReturnCode.CONNECTION_ACCEPTED || Strings.isNullOrEmpty(clientIdentifier)) {
            sendAckToClient(channel, mqttConnectMessage, checkAuth, false);
            return;
        }
        addConnection(channel, mqttConnectMessage, clientIdentifier);
        initializeKeepAliveTimeout(channel, mqttConnectMessage, clientIdentifier);
        storeWillMessage(clientIdentifier, mqttConnectMessage);
        this.sessionManager.addSession(clientIdentifier, isCleanSession);
        MqttConnAckMessage sendAckToClient = sendAckToClient(channel, mqttConnectMessage, MqttConnectReturnCode.CONNECTION_ACCEPTED, !isCleanSession);
        if (sendAckToClient.variableHeader().connectReturnCode().byteValue() != MqttConnectReturnCode.CONNECTION_ACCEPTED.byteValue()) {
            LOG.info("CONNECT-none-accepted clientID: <{}>, ConnectionStatus: <{}>, client-address: <{}>, server-address: <{}>", new Object[]{clientIdentifier, Byte.valueOf(sendAckToClient.variableHeader().connectReturnCode().byteValue()), channel.remoteAddress(), channel.localAddress()});
        }
        this.consumerManager.fireConsume(clientIdentifier);
        LOG.info("CONNECT successful, clientID: {}, client-address: <{}>, server-address: <{}>", new Object[]{clientIdentifier, channel.remoteAddress(), channel.localAddress()});
    }

    private MqttConnAckMessage sendAckToClient(Channel channel, MqttConnectMessage mqttConnectMessage, MqttConnectReturnCode mqttConnectReturnCode, boolean z) {
        MqttConnAckMessage newMessage = MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.CONNACK, false, mqttConnectMessage.fixedHeader().qosLevel(), false, 0), new MqttConnAckVariableHeader(mqttConnectReturnCode, z), (Object) null);
        channel.writeAndFlush(newMessage);
        return newMessage;
    }

    private boolean auth(String str, String str2) {
        if (Strings.isNullOrEmpty(str) || Strings.isNullOrEmpty(str2)) {
            return false;
        }
        Date time = Calendar.getInstance().getTime();
        AppToken appToken = this.nameService.getAppToken(str, str2);
        return null != appToken && appToken.getEffectiveTime().before(time) && appToken.getExpirationTime().after(time);
    }

    private MqttConnectReturnCode checkAuth(MqttConnectMessage mqttConnectMessage) {
        boolean isCleanSession = mqttConnectMessage.variableHeader().isCleanSession();
        if (Strings.isNullOrEmpty(mqttConnectMessage.payload().clientIdentifier())) {
            LOG.debug("NULL clientID, cleanSession: {}", Boolean.valueOf(isCleanSession));
            return MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("hasUserName: {}", Boolean.valueOf(mqttConnectMessage.variableHeader().hasUserName()));
            LOG.debug("hasPassword: {}", Boolean.valueOf(mqttConnectMessage.variableHeader().hasPassword()));
        }
        if (!mqttConnectMessage.variableHeader().hasUserName() || !mqttConnectMessage.variableHeader().hasPassword()) {
            return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
        }
        String userName = mqttConnectMessage.payload().userName();
        String password = mqttConnectMessage.payload().password();
        if (LOG.isDebugEnabled()) {
            LOG.debug("CONN username: {}, password: {}", userName, password);
        }
        return auth(userName, password) ? MqttConnectReturnCode.CONNECTION_ACCEPTED : MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
    }

    private void initializeKeepAliveTimeout(Channel channel, MqttConnectMessage mqttConnectMessage, String str) {
        int keepAliveTimeSeconds = mqttConnectMessage.variableHeader().keepAliveTimeSeconds();
        NettyAttrManager.setAttrKeepAlive(channel, keepAliveTimeSeconds);
        NettyAttrManager.setAttrClientId(channel, str);
        NettyAttrManager.setAttrCleanSession(channel, Boolean.valueOf(mqttConnectMessage.variableHeader().isCleanSession()));
        int round = Math.round(keepAliveTimeSeconds * 1.5f);
        if (channel.pipeline().names().contains("idleStateHandler")) {
            channel.pipeline().remove("idleStateHandler");
        }
        channel.pipeline().addFirst("idleStateHandler", new IdleStateHandler(round, 0, 0));
    }

    private void storeWillMessage(String str, MqttConnectMessage mqttConnectMessage) {
        if (mqttConnectMessage.variableHeader().isWillFlag()) {
            MqttQoS valueOf = MqttQoS.valueOf(mqttConnectMessage.variableHeader().willQos());
            byte[] willMessageInBytes = mqttConnectMessage.payload().willMessageInBytes();
            this.willStore.put(str, new WillMessage(mqttConnectMessage.payload().willTopic(), (ByteBuffer) ByteBuffer.allocate(willMessageInBytes.length).put(willMessageInBytes).flip(), mqttConnectMessage.variableHeader().isWillRetain(), valueOf));
            LOG.info("Latest will message stored for client: <{}>", str);
        }
    }

    private void addConnection(Channel channel, MqttConnectMessage mqttConnectMessage, String str) {
        String str2 = "";
        String str3 = "";
        if (mqttConnectMessage.variableHeader().hasUserName() && mqttConnectMessage.variableHeader().hasPassword()) {
            str2 = mqttConnectMessage.payload().userName();
            str3 = mqttConnectMessage.payload().password();
        }
        MqttConnection mqttConnection = new MqttConnection(str, str2, str3, mqttConnectMessage.variableHeader().isCleanSession(), mqttConnectMessage.variableHeader().version(), mqttConnectMessage.variableHeader().isWillRetain(), mqttConnectMessage.variableHeader().willQos(), mqttConnectMessage.variableHeader().isWillFlag(), mqttConnectMessage.variableHeader().keepAliveTimeSeconds(), channel);
        mqttConnection.setAddress(IpUtil.toByte((InetSocketAddress) channel.remoteAddress()));
        mqttConnection.setServerAddress(IpUtil.toByte((InetSocketAddress) channel.localAddress()));
        MqttConnection addConnection = this.connectionManager.addConnection(mqttConnection);
        if (addConnection != null) {
            LOG.warn("重复clientID的connection连接: <{}>, 需要断开或者重置. 新建的client连接: <{}>", addConnection, mqttConnection);
            addConnection.getChannel().close().addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            this.connectionManager.removeConnection(addConnection);
            this.connectionManager.addConnection(mqttConnection);
        }
    }

    public void processDisconnect(Channel channel) {
        String attrClientId = NettyAttrManager.getAttrClientId(channel);
        cleanWillMessage(attrClientId);
        channel.flush();
        channel.close().addListener(ChannelFutureListener.CLOSE);
        LOG.info("Disconnect successful, clientID: {}", attrClientId);
    }

    private void cleanWillMessage(String str) {
        this.willStore.remove(str);
    }

    public void processPublish(Channel channel, MqttPublishMessage mqttPublishMessage) {
        String attrClientId = NettyAttrManager.getAttrClientId(channel);
        if (Strings.isNullOrEmpty(attrClientId)) {
            LOG.error("ClientID is null or empty for publish, aborting... publishEvent message trace: <{}>", mqttPublishMessage.toString());
            channel.close().addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            return;
        }
        try {
            String str = mqttPublishMessage.variableHeader().topicName();
            MqttConnection connection = this.connectionManager.getConnection(attrClientId);
            if (connection == null) {
                LOG.error("Client connection is null for publish, clientID: {}", attrClientId);
                channel.close().addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                return;
            }
            String application = connection.getApplication();
            if (Strings.isNullOrEmpty(application)) {
                LOG.error("Client application is null for publish, clientID: {}", attrClientId);
                channel.close().addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                Producer producer = this.producerManager.getProducer(attrClientId, application, str);
                if (producer == null) {
                    throw new Exception("MessageProducer instance null, please check producer create & start...");
                }
                this.messagePublisher.publishMessage(producer, channel, mqttPublishMessage);
            }
        } catch (Throwable th) {
            LOG.error("process Public Message Error!", th);
            channel.close().addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    }

    public void processPubAck(Channel channel, MqttPubAckMessage mqttPubAckMessage) {
        short messageId = (short) mqttPubAckMessage.variableHeader().messageId();
        this.consumerManager.acknowledge(NettyAttrManager.getAttrClientId(channel), messageId);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received PubAck packageID: {}" + ((int) messageId));
        }
    }

    public void processSubscribe(Channel channel, MqttSubscribeMessage mqttSubscribeMessage) {
        ArrayList arrayList = new ArrayList();
        String attrClientId = NettyAttrManager.getAttrClientId(channel);
        if (this.connectionManager.isConnected(attrClientId)) {
            MqttConnection connection = this.connectionManager.getConnection(attrClientId);
            if (LOG.isDebugEnabled()) {
                LOG.debug("处理subscribe数据包, clientID: {}, cleanSession: {}", attrClientId, Boolean.valueOf(connection.isCleanSession()));
            }
            List<MqttTopicSubscription> list = mqttSubscribeMessage.payload().topicSubscriptions();
            LOG.info("Subscribe topics: {}, clientID: {}", list, attrClientId);
            try {
                if (null != list) {
                    Set<MqttSubscription> subscribe = subscribe(list, attrClientId, connection.getClientGroupName(), arrayList);
                    MqttSession session = this.sessionManager.getSession(attrClientId);
                    if (session != null) {
                        Iterator<MqttSubscription> it = subscribe.iterator();
                        while (it.hasNext()) {
                            session.addSubscription(it.next());
                        }
                    }
                } else {
                    this.consumerManager.stopConsume(attrClientId);
                    this.sessionManager.removeSession(attrClientId);
                    connection.getChannel().close().addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    this.connectionManager.removeConnection(connection);
                    channel.close().addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                }
            } catch (Exception e) {
                LOG.error("subscribe is error!");
                if (arrayList.size() < list.size()) {
                    for (int size = list.size() - arrayList.size(); size > 0; size--) {
                        arrayList.add(Integer.valueOf(MqttQoS.FAILURE.value()));
                    }
                }
            }
        }
        MqttSubAckMessage newMessage = MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0), MqttMessageIdVariableHeader.from(mqttSubscribeMessage.variableHeader().messageId()), new MqttSubAckPayload(arrayList));
        LOG.info("SUBSCRIBE successful, subscribe result: {}", arrayList);
        channel.writeAndFlush(newMessage);
    }

    private Set<MqttSubscription> subscribe(List<MqttTopicSubscription> list, String str, String str2, List<Integer> list2) throws Exception {
        ArrayList arrayList = new ArrayList();
        for (MqttTopicSubscription mqttTopicSubscription : list) {
            if (new TopicFilter(mqttTopicSubscription.topicName()).isValid()) {
                arrayList.add(new MqttSubscription(str, new TopicFilter(mqttTopicSubscription.topicName()), mqttTopicSubscription.qualityOfService()));
                list2.add(Integer.valueOf(MqttQoS.AT_LEAST_ONCE.value()));
            } else {
                list2.add(Integer.valueOf(MqttQoS.FAILURE.value()));
                LOG.warn("topic filter[{}] of clientID[{}] is invalid", mqttTopicSubscription.topicName(), str);
            }
        }
        LOG.info("Do subscribe topics: {}, clientGroup: {}", arrayList, str2);
        return this.subscriptionManager.subscribes(str2, arrayList);
    }

    public void processUnsubscribe(Channel channel, MqttUnsubscribeMessage mqttUnsubscribeMessage) {
        String attrClientId = NettyAttrManager.getAttrClientId(channel);
        int messageId = mqttUnsubscribeMessage.variableHeader().messageId();
        MqttQoS qosLevel = mqttUnsubscribeMessage.fixedHeader().qosLevel();
        if (LOG.isDebugEnabled()) {
            LOG.debug("处理unSubscribe数据包, clientID: {}, packageId: {}, Qos: {}", new Object[]{attrClientId, Integer.valueOf(messageId), qosLevel});
        }
        if (this.connectionManager.isConnected(attrClientId)) {
            MqttConnection connection = this.connectionManager.getConnection(attrClientId);
            List<String> list = mqttUnsubscribeMessage.payload().topics();
            LOG.info("UnSubscribe topics: {}", list);
            try {
                if (list != null) {
                    Set<MqttSubscription> unSubscribe = unSubscribe(list, attrClientId, connection.getClientGroupName());
                    MqttSession session = this.sessionManager.getSession(attrClientId);
                    if (session != null) {
                        Iterator<MqttSubscription> it = unSubscribe.iterator();
                        while (it.hasNext()) {
                            session.removeSubscription(it.next());
                        }
                    }
                } else {
                    this.consumerManager.stopConsume(attrClientId);
                    this.sessionManager.removeSession(attrClientId);
                    connection.getChannel().close().addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    this.connectionManager.removeConnection(connection);
                    channel.close().addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                }
            } catch (Exception e) {
                LOG.error("unSubscribe is error!");
            }
        }
        sendUnSubAck(channel, messageId, qosLevel);
    }

    private void sendUnSubAck(Channel channel, int i, MqttQoS mqttQoS) {
        MqttMessage newMessage = MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.UNSUBACK, false, mqttQoS, false, 0), MqttMessageIdVariableHeader.from(i), (Object) null);
        LOG.info("UNSUBSCRIBE successful, packageID: {}", Integer.valueOf(i));
        channel.writeAndFlush(newMessage);
    }

    private Set<MqttSubscription> unSubscribe(List<String> list, String str, String str2) throws Exception {
        HashSet hashSet = new HashSet(list.size());
        for (String str3 : list) {
            if (TopicFilter.isValid(str3)) {
                MqttSession session = this.sessionManager.getSession(str);
                if (session != null) {
                    for (MqttSubscription mqttSubscription : session.listSubsciptions()) {
                        if (mqttSubscription.getTopicFilter().toString().equals(str3) || mqttSubscription.getTopicFilter().match(new TopicFilter(str3))) {
                            hashSet.add(mqttSubscription);
                        }
                    }
                }
            } else {
                LOG.warn("topic filter[{}] of clientID[{}] is invalid", str3, str);
            }
        }
        if (hashSet.isEmpty()) {
            LOG.warn("topic filter for client: <{}> may be null, the topicFilters is <{}>", str, list);
            return new HashSet();
        }
        this.subscriptionManager.unSubscribe(str2, hashSet);
        return hashSet;
    }
}
