package com.feingto.iot.server.handler.mqtt;

import com.feingto.iot.common.Constants;
import com.feingto.iot.common.model.mqtt.SubscribeMessage;
import com.feingto.iot.common.service.mqtt.MessageResponse;
import com.feingto.iot.server.cache.RetainedCache;
import com.feingto.iot.server.cache.SubscribeCache;
import com.feingto.iot.server.handler.BaseMessageHandler;
import com.feingto.iot.server.handler.MqttHandlerChain;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/classes/com/feingto/iot/server/handler/mqtt/SubscribeHandler.class */
public class SubscribeHandler extends BaseMessageHandler {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) SubscribeHandler.class);

    public SubscribeHandler() {
        super(MqttMessageType.SUBSCRIBE);
    }

    @Override // com.feingto.iot.common.handler.MessageHandler
    public void handle(Channel channel, Object obj) {
        String str = (String) channel.attr(Constants.KEY_CLIENT_ID).get();
        MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) obj;
        mqttSubscribeMessage.payload().topicSubscriptions().forEach(mqttTopicSubscription -> {
            SubscribeCache.getInstance(this.igniteSubscribe).put(mqttTopicSubscription.topicName(), new SubscribeMessage().clientId(str).topicName(mqttTopicSubscription.topicName()).mqttQoS(mqttTopicSubscription.qualityOfService().value()));
        });
        MessageResponse.suback(channel, (Iterable) mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription2 -> {
            return Integer.valueOf(mqttTopicSubscription2.qualityOfService().value());
        }).collect(Collectors.toList()), mqttSubscribeMessage.variableHeader().messageId());
        SubscribeCache.getInstance(this.igniteSubscribe).getByClientId(str).forEach(subscribeMessage -> {
            Optional.ofNullable(RetainedCache.getInstance(this.igniteRetained).get(subscribeMessage.topicName())).ifPresent(sendMessage -> {
                log.debug(">>> publish retained message to {}", str);
                MqttHandlerChain.getInstance().pushService.internalSend(sendMessage);
            });
        });
    }
}
