package com.feingto.iot.server.handler;

import com.feingto.iot.common.handler.MessageHandler;
import com.feingto.iot.common.model.mqtt.SendMessage;
import com.feingto.iot.common.model.mqtt.SubscribeMessage;
import com.feingto.iot.server.cache.MessageCache;
import com.feingto.iot.server.cache.SessionCache;
import com.feingto.iot.server.cache.SubscribeCache;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttMessageType;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.IgniteCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/classes/com/feingto/iot/server/handler/BaseMessageHandler.class */
/**
 * Base class for MQTT message handlers that can be chained together as filters.
 *
 * <p>Each handler is bound to a single {@link MqttMessageType}. {@link #proceed} invokes
 * {@code handle(channel, obj)} (inherited from {@link MessageHandler}) when the incoming type
 * matches this handler, and then forwards the message to the first registered filter bound to
 * the same type.
 *
 * <p>The three Ignite caches are optional collaborators assigned through the fluent
 * {@code ignite*} methods. {@link #filter} copies whichever caches are set on this handler at
 * registration time onto the filter being registered; caches assigned to this handler afterwards
 * are not pushed to filters that were already registered.
 *
 * <p>NOTE(review): the filter list is a plain {@link ArrayList} with no synchronization in this
 * class — presumably all filters are registered before messages are dispatched; confirm.
 */
public abstract class BaseMessageHandler implements MessageHandler {
    private static final Logger log = LoggerFactory.getLogger(BaseMessageHandler.class);

    /** Message type this handler reacts to in {@link #proceed}. */
    private final MqttMessageType type;

    /** Handlers registered through {@link #filter}, in registration order; never contains null. */
    private final List<BaseMessageHandler> filters = new ArrayList<>();

    /** Cache handed to {@link SubscribeCache#getInstance} by {@link #cleanSession}; may be null. */
    protected IgniteCache<String, ConcurrentHashMap<String, SubscribeMessage>> igniteSubscribe;

    /** Cache of {@link SendMessage} values keyed by string; not read by this class; may be null. */
    protected IgniteCache<String, SendMessage> igniteRetained;

    /** Cache handed to {@link MessageCache#getInstance} by {@link #cleanSession}; may be null. */
    protected IgniteCache<String, ConcurrentHashMap<Integer, SendMessage>> igniteMessage;

    /**
     * Creates a handler bound to the given message type.
     *
     * @param mqttMessageType the message type this handler reacts to
     */
    public BaseMessageHandler(MqttMessageType mqttMessageType) {
        this.type = mqttMessageType;
    }

    /**
     * Registers a handler as a filter of this handler.
     *
     * <p>Every cache currently set on this handler is assigned to the filter through its own
     * {@code ignite*} method before it is appended to the filter list. A {@code null} handler is
     * ignored, so registration never fails and the list never holds null entries.
     *
     * @param baseMessageHandler the handler to register; ignored when {@code null}
     * @return this handler, for chaining
     */
    public BaseMessageHandler filter(BaseMessageHandler baseMessageHandler) {
        if (baseMessageHandler == null) {
            // Previously a null either threw (when a cache was set) or was stored and then
            // skipped by proceed(); treat it uniformly as a no-op.
            return this;
        }
        if (this.igniteSubscribe != null) {
            baseMessageHandler.igniteSubscribe(this.igniteSubscribe);
        }
        if (this.igniteRetained != null) {
            baseMessageHandler.igniteRetained(this.igniteRetained);
        }
        if (this.igniteMessage != null) {
            baseMessageHandler.igniteMessage(this.igniteMessage);
        }
        this.filters.add(baseMessageHandler);
        return this;
    }

    /**
     * Dispatches a message to this handler and to its filters.
     *
     * <p>If {@code mqttMessageType} equals this handler's type, {@code handle(channel, obj)} is
     * called. Independently of that, the first registered filter whose type equals
     * {@code mqttMessageType} (in registration order) has its own {@code proceed} invoked with
     * the same arguments; later filters of the same type are not consulted.
     *
     * @param mqttMessageType the type of the incoming message
     * @param channel the channel passed through to {@code handle}
     * @param obj the message payload passed through to {@code handle}
     */
    public void proceed(MqttMessageType mqttMessageType, Channel channel, Object obj) {
        if (this.type.equals(mqttMessageType)) {
            log.debug(">>> message type: {}", mqttMessageType);
            handle(channel, obj);
        }
        for (BaseMessageHandler candidate : this.filters) {
            if (mqttMessageType.equals(candidate.type)) {
                candidate.proceed(mqttMessageType, channel, obj);
                // Only the first matching filter receives the message.
                break;
            }
        }
    }

    /**
     * Removes the entries stored under the given key from the session, subscription and
     * message caches.
     *
     * @param str the key to remove — presumably a client/session identifier; confirm with callers
     */
    public void cleanSession(String str) {
        SessionCache.getInstance().remove(str);
        SubscribeCache.getInstance(this.igniteSubscribe).remove(str);
        MessageCache.getInstance(this.igniteMessage).remove(str);
    }

    /**
     * Assigns the subscription cache.
     *
     * @param igniteCache the cache to use; stored as given
     * @return this handler, for chaining
     */
    public BaseMessageHandler igniteSubscribe(IgniteCache<String, ConcurrentHashMap<String, SubscribeMessage>> igniteCache) {
        this.igniteSubscribe = igniteCache;
        return this;
    }

    /**
     * Assigns the retained-message cache.
     *
     * @param igniteCache the cache to use; stored as given
     * @return this handler, for chaining
     */
    public BaseMessageHandler igniteRetained(IgniteCache<String, SendMessage> igniteCache) {
        this.igniteRetained = igniteCache;
        return this;
    }

    /**
     * Assigns the message cache.
     *
     * @param igniteCache the cache to use; stored as given
     * @return this handler, for chaining
     */
    public BaseMessageHandler igniteMessage(IgniteCache<String, ConcurrentHashMap<Integer, SendMessage>> igniteCache) {
        this.igniteMessage = igniteCache;
        return this;
    }
}
