/*
 * Decompiled with CFR 0.152.
 */
package com.feingto.iot.server.handler;

import com.feingto.iot.common.handler.MessageHandler;
import com.feingto.iot.common.model.mqtt.SendMessage;
import com.feingto.iot.common.model.mqtt.SubscribeMessage;
import com.feingto.iot.server.cache.MessageCache;
import com.feingto.iot.server.cache.SessionCache;
import com.feingto.iot.server.cache.SubscribeCache;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttMessageType;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.IgniteCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseMessageHandler
implements MessageHandler {
    private static final Logger log = LoggerFactory.getLogger(BaseMessageHandler.class);
    private MqttMessageType type;
    private List<BaseMessageHandler> filters = new ArrayList();
    protected IgniteCache<String, ConcurrentHashMap<String, SubscribeMessage>> igniteSubscribe;
    protected IgniteCache<String, SendMessage> igniteRetained;
    protected IgniteCache<String, ConcurrentHashMap<Integer, SendMessage>> igniteMessage;

    public BaseMessageHandler(MqttMessageType type) {
        this.type = type;
    }

    public BaseMessageHandler filter(BaseMessageHandler filter) {
        if (this.igniteSubscribe != null) {
            filter.igniteSubscribe(this.igniteSubscribe);
        }
        if (this.igniteRetained != null) {
            filter.igniteRetained(this.igniteRetained);
        }
        if (this.igniteMessage != null) {
            filter.igniteMessage(this.igniteMessage);
        }
        this.filters.add(filter);
        return this;
    }

    public void proceed(MqttMessageType type, Channel channel, Object object) {
        if (this.type.equals((Object)type)) {
            log.debug(">>> message type: {}", (Object)type);
            this.handle(channel, object);
        }
        this.filters.stream().filter(Objects::nonNull).filter((? super T filter) -> type.equals((Object)filter.type)).findFirst().ifPresent(filter -> filter.proceed(type, channel, object));
    }

    protected void cleanSession(String clientId) {
        SessionCache.getInstance().remove(clientId);
        SubscribeCache.getInstance((IgniteCache)this.igniteSubscribe).remove(clientId);
        MessageCache.getInstance((IgniteCache)this.igniteMessage).remove(clientId);
    }

    public BaseMessageHandler igniteSubscribe(IgniteCache<String, ConcurrentHashMap<String, SubscribeMessage>> igniteSubscribe) {
        this.igniteSubscribe = igniteSubscribe;
        return this;
    }

    public BaseMessageHandler igniteRetained(IgniteCache<String, SendMessage> igniteRetained) {
        this.igniteRetained = igniteRetained;
        return this;
    }

    public BaseMessageHandler igniteMessage(IgniteCache<String, ConcurrentHashMap<Integer, SendMessage>> igniteMessage) {
        this.igniteMessage = igniteMessage;
        return this;
    }
}

