package com.feingto.iot.server.handler.mqtt;

import com.feingto.iot.common.Constants;
import com.feingto.iot.common.model.mqtt.SendMessage;
import com.feingto.iot.common.model.mqtt.SessionStore;
import com.feingto.iot.common.model.mqtt.SubscribeMessage;
import com.feingto.iot.common.service.IAuth;
import com.feingto.iot.common.service.mqtt.MessageRequest;
import com.feingto.iot.common.service.mqtt.MessageResponse;
import com.feingto.iot.server.cache.MessageCache;
import com.feingto.iot.server.cache.SessionCache;
import com.feingto.iot.server.cache.SubscribeCache;
import com.feingto.iot.server.handler.BaseMessageHandler;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Resource;
import org.apache.ignite.IgniteCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * Handles MQTT CONNECT packets.
 *
 * <p>For each CONNECT the handler validates the client identifier, optionally checks the
 * supplied credentials, takes over any session already registered under the same client
 * identifier, registers the new session (including the will message and keep-alive idle
 * detection), answers with a CONNACK and finally re-sends stored PUBLISH / PUBREL messages
 * when the client asked for a persistent session.
 */
@Component
public class ConnectHandler extends BaseMessageHandler {
    private static final Logger log = LoggerFactory.getLogger(ConnectHandler.class);

    @Resource(name = "igniteSubscribe")
    private IgniteCache<String, ConcurrentHashMap<String, SubscribeMessage>> igniteSubscribe;

    @Resource(name = "igniteMessage")
    private IgniteCache<String, ConcurrentHashMap<Integer, SendMessage>> igniteMessage;

    @Resource(name = "authService")
    private IAuth authService;

    public ConnectHandler() {
        super(MqttMessageType.CONNECT);
    }

    /**
     * Processes a CONNECT packet received on {@code channel}.
     *
     * @param channel the channel the packet arrived on
     * @param obj     the decoded packet; cast to {@link MqttConnectMessage}
     */
    @Override
    public void handle(Channel channel, Object obj) {
        MqttConnectMessage connectMessage = (MqttConnectMessage) obj;
        MqttConnectPayload payload = connectMessage.payload();
        MqttConnectVariableHeader variableHeader = connectMessage.variableHeader();
        String clientIdentifier = payload.clientIdentifier();

        if (!StringUtils.hasLength(clientIdentifier)) {
            refuse(channel, MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
            return;
        }

        // NOTE(review): credentials are only verified when BOTH user name and password are
        // present, so a client that sends none is accepted without authentication.
        // Confirm that anonymous access is intended.
        if (variableHeader.hasUserName() && variableHeader.hasPassword()
                && !this.authService.authorized(payload.userName(), payload.passwordInBytes())) {
            refuse(channel, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
            return;
        }

        boolean cleanSessionRequested = variableHeader.isCleanSession();
        takeOverPreviousSession(clientIdentifier, cleanSessionRequested);

        SessionStore session = new SessionStore().cleanSession(cleanSessionRequested);
        if (variableHeader.isWillFlag()) {
            log.debug(">>> save will message of the {}", clientIdentifier);
            session.willMessage(buildWillMessage(variableHeader, payload));
        }

        if (variableHeader.keepAliveTimeSeconds() > 0) {
            // Treat the connection as idle after one and a half times the keep-alive interval.
            int idleSeconds = Math.round(variableHeader.keepAliveTimeSeconds() * 1.5f);
            channel.pipeline().addFirst(new IdleStateHandler(0, 0, idleSeconds));
        }

        channel.attr(Constants.KEY_CLIENT_ID).set(clientIdentifier);
        SessionCache.getInstance().put(clientIdentifier, session.channel(channel));
        MessageResponse.connack(channel, MqttConnectReturnCode.CONNECTION_ACCEPTED);

        if (!cleanSessionRequested) {
            resendStoredMessages(channel, clientIdentifier);
        }
    }

    /** Sends a CONNACK carrying the given refusal code and closes the channel. */
    private void refuse(Channel channel, MqttConnectReturnCode returnCode) {
        MessageResponse.connack(channel, returnCode);
        channel.close();
    }

    /**
     * Discards stale state of, and disconnects, a session already registered for the client.
     *
     * <p>Stored session, subscription and message entries are removed when the previous
     * session was a clean session or when the new connection requests a clean session, so a
     * clean-session client never inherits state left behind by an earlier persistent session.
     */
    private void takeOverPreviousSession(String clientIdentifier, boolean cleanSessionRequested) {
        SessionStore previous = SessionCache.getInstance().get(clientIdentifier);
        boolean previousWasClean = previous != null && previous.cleanSession();

        if (cleanSessionRequested || previousWasClean) {
            log.debug(">>> clean the previous session of the {}", clientIdentifier);
            if (previous != null) {
                SessionCache.getInstance().remove(clientIdentifier);
            }
            SubscribeCache.getInstance(this.igniteSubscribe).remove(clientIdentifier);
            MessageCache.getInstance(this.igniteMessage).remove(clientIdentifier);
        }

        if (previous != null) {
            Channel previousChannel = previous.channel();
            // Guard against a stored session that has no channel attached.
            if (previousChannel != null) {
                previousChannel.close();
            }
        }
    }

    /** Builds the PUBLISH message stored as the client's will from the CONNECT fields. */
    private MqttPublishMessage buildWillMessage(MqttConnectVariableHeader variableHeader,
                                                MqttConnectPayload payload) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(
                MqttMessageType.PUBLISH,
                false,
                MqttQoS.valueOf(variableHeader.willQos()),
                variableHeader.isWillRetain(),
                0);
        MqttPublishVariableHeader publishHeader = new MqttPublishVariableHeader(payload.willTopic(), 0);
        // Exact-size copy of the will payload instead of a default-capacity buffer.
        return new MqttPublishMessage(fixedHeader, publishHeader,
                Unpooled.copiedBuffer(payload.willMessageInBytes()));
    }

    /** Re-sends the stored PUBLISH and PUBREL messages of the client over the new channel. */
    private void resendStoredMessages(Channel channel, String clientIdentifier) {
        MessageCache.getInstance(this.igniteMessage).findBylientId(clientIdentifier).stream()
                .filter(message -> MqttMessageType.PUBLISH.equals(message.type())
                        || MqttMessageType.PUBREL.equals(message.type()))
                .forEach(message -> MessageRequest.publish(channel, message));
    }
}
