package com.feingto.iot.server.handler.mqtt;

import com.feingto.iot.common.model.mqtt.SendMessage;
import com.feingto.iot.common.service.mqtt.MessageResponse;
import com.feingto.iot.server.cache.MessageCache;
import com.feingto.iot.server.cache.RetainedCache;
import com.feingto.iot.server.cache.SubscribeCache;
import com.feingto.iot.server.handler.BaseMessageHandler;
import com.feingto.iot.server.service.PushService;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/classes/com/feingto/iot/server/handler/mqtt/PublishHandler.class */
public class PublishHandler extends BaseMessageHandler {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PublishHandler.class);
    private final PushService pushService;

    public PublishHandler(PushService pushService) {
        super(MqttMessageType.PUBLISH);
        this.pushService = pushService;
    }

    @Override // com.feingto.iot.common.handler.MessageHandler
    public void handle(Channel channel, Object obj) {
        MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) obj;
        mqttPublishMessage.fixedHeader();
        MqttPublishVariableHeader variableHeader = mqttPublishMessage.variableHeader();
        SendMessage newInstance = SendMessage.newInstance(mqttPublishMessage);
        if (newInstance.retain()) {
            log.debug(">>> save retain message on {}", newInstance.topic());
            RetainedCache.getInstance(this.igniteRetained).put(newInstance.topic(), newInstance);
        }
        newInstance.retain(false);
        switch (r0.qosLevel()) {
            case AT_MOST_ONCE:
                this.pushService.internalSend(newInstance);
                return;
            case AT_LEAST_ONCE:
                storeMessage(newInstance, MqttMessageType.PUBLISH);
                this.pushService.internalSend(newInstance);
                MessageResponse.puback(channel, MqttMessageType.PUBACK, MqttQoS.AT_MOST_ONCE, variableHeader.packetId());
                return;
            case EXACTLY_ONCE:
                storeMessage(newInstance, MqttMessageType.PUBREC);
                MessageResponse.puback(channel, MqttMessageType.PUBREC, MqttQoS.AT_LEAST_ONCE, variableHeader.packetId());
                return;
            case FAILURE:
            default:
                return;
        }
    }

    private void storeMessage(SendMessage sendMessage, MqttMessageType mqttMessageType) {
        SubscribeCache.getInstance(this.igniteSubscribe).get(sendMessage.topic()).forEach(subscribeMessage -> {
            MessageCache.getInstance(this.igniteMessage).put(subscribeMessage.clientId(), sendMessage.type(mqttMessageType));
        });
    }
}
