/*
 * Decompiled with CFR 0.152.
 */
package com.feingto.iot.server.service;

import com.feingto.iot.common.model.mqtt.SendMessage;
import com.feingto.iot.common.model.mqtt.SubscribeMessage;
import com.feingto.iot.common.service.mqtt.MessageRequest;
import com.feingto.iot.server.cache.SessionCache;
import com.feingto.iot.server.cache.SubscribeCache;
import io.netty.channel.Channel;
import java.io.Serializable;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteMessaging;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class PushService {
    private static final Logger log = LoggerFactory.getLogger(PushService.class);
    private static final String LISTEN_TOPIC = "internal-mqtt-topic";
    @Resource(name="igniteSubscribe")
    protected IgniteCache<String, ConcurrentHashMap<String, SubscribeMessage>> igniteSubscribe;
    @Resource(name="igniteMessaging")
    private IgniteMessaging igniteMessaging;

    @PostConstruct
    public void initListen() {
        this.igniteMessaging.localListen((Object)LISTEN_TOPIC, (IgniteBiPredicate & Serializable)(nodeId, msg) -> {
            log.debug(">>> listen from cluster group nodeId: {}", nodeId);
            this.push((SendMessage)msg);
            return true;
        });
    }

    private void push(SendMessage mqttMessage) {
        SubscribeCache.getInstance((IgniteCache)this.igniteSubscribe).get(mqttMessage.topic()).forEach(subscribe -> Optional.ofNullable(SessionCache.getInstance().get(subscribe.clientId())).ifPresent(sess -> MessageRequest.publish((Channel)sess.channel(), (SendMessage)mqttMessage)));
    }

    public void internalSend(SendMessage mqttMessage) {
        if (CollectionUtils.isNotEmpty((Collection)this.igniteMessaging.clusterGroup().nodes())) {
            log.debug(">>> send to cluster group");
            this.igniteMessaging.send((Object)LISTEN_TOPIC, (Object)mqttMessage);
        } else {
            this.push(mqttMessage);
        }
    }
}

