package org.joyqueue.broker.mqtt.cluster;

import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.joyqueue.broker.BrokerContext;
import org.joyqueue.broker.mqtt.session.MqttSession;
import org.joyqueue.broker.mqtt.subscriptions.MqttSubscription;
import org.joyqueue.broker.mqtt.subscriptions.TopicFilter;
import org.joyqueue.domain.Subscription;
import org.joyqueue.domain.TopicName;
import org.joyqueue.nsr.NameService;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/mqtt/cluster/MqttSessionManager.class */
public class MqttSessionManager extends Service {
    private static final Logger LOG = LoggerFactory.getLogger(MqttSessionManager.class);
    private MqttConnectionManager connectionManager;
    private final ConcurrentMap<String, MqttSession> sessions = new ConcurrentHashMap();
    private NameService nameService;

    public MqttSessionManager(BrokerContext brokerContext, MqttConnectionManager mqttConnectionManager) {
        this.nameService = brokerContext.getNameService();
        this.connectionManager = mqttConnectionManager;
    }

    public MqttSession getSession(String str) {
        MqttSession mqttSession = this.sessions.get(str);
        if (mqttSession != null) {
            return mqttSession;
        }
        LOG.error("Can't find the session for client: <{}>", str);
        throw new RuntimeException("Can't find the session for client <" + str + ">");
    }

    public void addSession(String str, boolean z) {
        if (this.sessions.containsKey(str)) {
            MqttSession mqttSession = this.sessions.get(str);
            if (mqttSession.isCleanSession() != z) {
                mqttSession.setCleanSession(z);
            }
        } else {
            this.sessions.put(str, new MqttSession(str, z));
        }
        if (z) {
            return;
        }
        MqttSession mqttSession2 = this.sessions.get(str);
        String clientGroupName = this.connectionManager.getConnection(str).getClientGroupName();
        Set topics = this.nameService.getTopics(clientGroupName, Subscription.Type.CONSUMPTION);
        if (topics == null || topics.size() <= 0) {
            return;
        }
        Iterator it = topics.iterator();
        while (it.hasNext()) {
            mqttSession2.addSubscription(new MqttSubscription(str, new TopicFilter((String) it.next()), MqttQoS.AT_LEAST_ONCE));
        }
        LOG.info("Persistent client group: <{}>, recovery session: {}", clientGroupName, mqttSession2);
    }

    public void removeSession(String str) {
        MqttSession mqttSession = this.sessions.get(str);
        if (mqttSession != null) {
            if (mqttSession.isCleanSession()) {
                String clientGroupName = this.connectionManager.getConnection(str).getClientGroupName();
                Set<MqttSubscription> listSubsciptions = mqttSession.listSubsciptions();
                ArrayList arrayList = new ArrayList(listSubsciptions.size());
                Iterator<MqttSubscription> it = listSubsciptions.iterator();
                while (it.hasNext()) {
                    arrayList.add(new Subscription(TopicName.parse(it.next().getTopicFilter().toString()), clientGroupName, Subscription.Type.CONSUMPTION));
                }
                this.nameService.unSubscribe(arrayList);
            }
            this.sessions.remove(str);
        }
    }

    public boolean contains(String str) {
        return this.sessions.containsKey(str);
    }

    public Map<String, MqttSession> listSessions() {
        return this.sessions;
    }
}
