package cn.wizzer.iot.mqtt.server.broker.protocol;

import cn.hutool.core.util.StrUtil;
import cn.wizzer.iot.mqtt.server.common.message.IMessageIdService;
import cn.wizzer.iot.mqtt.server.common.message.IRetainMessageStoreService;
import cn.wizzer.iot.mqtt.server.common.subscribe.ISubscribeStoreService;
import cn.wizzer.iot.mqtt.server.common.subscribe.SubscribeStore;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.util.AttributeKey;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/wizzer/iot/mqtt/server/broker/protocol/Subscribe.class */
/**
 * Handles MQTT SUBSCRIBE packets.
 *
 * <p>For an accepted SUBSCRIBE the handler stores one {@link SubscribeStore} per requested topic
 * filter, answers with a SUBACK that echoes the requested QoS levels, and then delivers every
 * retained message returned by the retain store for each of the subscribed filters.
 */
public class Subscribe {
    private static final Logger LOGGER = LoggerFactory.getLogger(Subscribe.class);

    /** Name of the channel attribute from which the client identifier is read. */
    private static final String CLIENT_ID_ATTR = "clientId";

    private final ISubscribeStoreService subscribeStoreService;
    private final IMessageIdService messageIdService;
    private final IRetainMessageStoreService retainMessageStoreService;

    /**
     * Creates the handler.
     *
     * @param iSubscribeStoreService     store that receives the accepted subscriptions
     * @param iMessageIdService          source of packet identifiers for QoS 1/2 retained publishes
     * @param iRetainMessageStoreService store searched for retained messages matching a topic filter
     */
    public Subscribe(ISubscribeStoreService iSubscribeStoreService, IMessageIdService iMessageIdService, IRetainMessageStoreService iRetainMessageStoreService) {
        this.subscribeStoreService = iSubscribeStoreService;
        this.messageIdService = iMessageIdService;
        this.retainMessageStoreService = iRetainMessageStoreService;
    }

    /**
     * Processes one SUBSCRIBE packet received on {@code channel}.
     *
     * <p>If any topic filter fails {@link #validTopicFilter(List)} the channel is closed and nothing
     * is stored or acknowledged. Otherwise every subscription is stored, a SUBACK carrying the
     * requested QoS values (in request order) is written, and retained messages are sent afterwards.
     *
     * @param channel              channel the SUBSCRIBE arrived on; replies are written to it
     * @param mqttSubscribeMessage the decoded SUBSCRIBE packet
     */
    public void processSubscribe(Channel channel, MqttSubscribeMessage mqttSubscribeMessage) {
        List<MqttTopicSubscription> subscriptions = mqttSubscribeMessage.payload().topicSubscriptions();
        if (!validTopicFilter(subscriptions)) {
            channel.close();
            return;
        }

        String clientId = clientIdOf(channel);
        List<Integer> grantedQoS = new ArrayList<>(subscriptions.size());
        for (MqttTopicSubscription subscription : subscriptions) {
            String topicFilter = subscription.topicName();
            int qos = subscription.qualityOfService().value();
            this.subscribeStoreService.put(topicFilter, new SubscribeStore(clientId, topicFilter, qos));
            grantedQoS.add(qos);
            LOGGER.debug("SUBSCRIBE - clientId: {}, topFilter: {}, QoS: {}", clientId, topicFilter, qos);
        }

        // The SUBACK is written before any retained message so the client sees the acknowledgement first.
        channel.writeAndFlush(MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                MqttMessageIdVariableHeader.from(mqttSubscribeMessage.variableHeader().messageId()),
                new MqttSubAckPayload(grantedQoS)));

        for (MqttTopicSubscription subscription : subscriptions) {
            sendRetainMessage(channel, subscription.topicName(), subscription.qualityOfService());
        }
    }

    /**
     * Checks every requested topic filter against this broker's rules.
     *
     * <p>A filter is rejected when it starts with {@code '+'}, ends with {@code '/'}, contains more
     * than one {@code '#'}, or contains a {@code '+'} that is not directly preceded by {@code '/'}.
     *
     * <p>NOTE(review): rejecting a leading {@code '+'} and a trailing {@code '/'} is stricter than
     * the MQTT 3.1.1 topic-filter rules; this looks deliberate, so it is kept unchanged.
     *
     * @param list subscriptions taken from the SUBSCRIBE payload
     * @return {@code true} if all filters are accepted, {@code false} as soon as one is rejected
     */
    private boolean validTopicFilter(List<MqttTopicSubscription> list) {
        for (MqttTopicSubscription subscription : list) {
            String topicFilter = subscription.topicName();
            if (StrUtil.startWith(topicFilter, '+') || StrUtil.endWith(topicFilter, '/')) {
                return false;
            }
            if (StrUtil.contains(topicFilter, '#') && StrUtil.count(topicFilter, '#') > 1) {
                return false;
            }
            // Every '+' must appear as "/+"; a differing count means some '+' lacks the leading '/'.
            if (StrUtil.contains(topicFilter, '+') && StrUtil.count(topicFilter, '+') != StrUtil.count(topicFilter, "/+")) {
                return false;
            }
        }
        return true;
    }

    /**
     * Sends every retained message found for {@code str} to {@code channel}.
     *
     * <p>Each message is published at the lower of its stored QoS and the subscription QoS. QoS 0
     * publishes use packet identifier 0; QoS 1 and QoS 2 publishes obtain a fresh identifier from
     * the message-id service. The RETAIN flag is set on every publish, as MQTT 3.1.1 requires for
     * messages delivered because of a new subscription.
     *
     * @param channel channel of the subscribing client
     * @param str     topic filter that was just subscribed
     * @param mqttQoS QoS requested for that subscription
     */
    private void sendRetainMessage(Channel channel, String str, MqttQoS mqttQoS) {
        String clientId = clientIdOf(channel);
        this.retainMessageStoreService.search(str).forEach(retained -> {
            MqttQoS effectiveQoS = retained.getMqttQoS() > mqttQoS.value()
                    ? mqttQoS
                    : MqttQoS.valueOf(retained.getMqttQoS());

            boolean needsMessageId = effectiveQoS == MqttQoS.AT_LEAST_ONCE || effectiveQoS == MqttQoS.EXACTLY_ONCE;
            if (effectiveQoS != MqttQoS.AT_MOST_ONCE && !needsMessageId) {
                // Not one of the three deliverable QoS levels: nothing is sent.
                return;
            }
            int messageId = needsMessageId ? this.messageIdService.getNextMessageId() : 0;

            MqttPublishMessage publish = (MqttPublishMessage) MqttMessageFactory.newMessage(
                    // isRetain = true: this publish is the result of a new subscription.
                    new MqttFixedHeader(MqttMessageType.PUBLISH, false, effectiveQoS, true, 0),
                    new MqttPublishVariableHeader(retained.getTopic(), messageId),
                    Unpooled.buffer().writeBytes(retained.getMessageBytes()));

            if (needsMessageId) {
                LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}, messageId: {}", clientId, retained.getTopic(), effectiveQoS.value(), messageId);
            } else {
                LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}", clientId, retained.getTopic(), effectiveQoS.value());
            }
            channel.writeAndFlush(publish);
        });
    }

    /** Reads the client identifier stored in the channel's {@code "clientId"} attribute. */
    private static String clientIdOf(Channel channel) {
        return (String) channel.attr(AttributeKey.valueOf(CLIENT_ID_ATTR)).get();
    }
}
