package org.joyqueue.broker.mqtt.cluster;

import io.netty.util.internal.ConcurrentSet;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.joyqueue.broker.BrokerContext;
import org.joyqueue.broker.mqtt.subscriptions.MqttSubscription;
import org.joyqueue.broker.mqtt.subscriptions.TopicFilter;
import org.joyqueue.domain.ClientType;
import org.joyqueue.domain.Subscription;
import org.joyqueue.domain.TopicName;
import org.joyqueue.nsr.NameService;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/mqtt/cluster/MqttSubscriptionManager.class */
public class MqttSubscriptionManager extends Service {
    private static Logger LOG = LoggerFactory.getLogger(MqttSubscriptionManager.class);
    private Set<String> topics = new ConcurrentSet();
    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private NameService nameService;

    /* loaded from: input_file:org/joyqueue/broker/mqtt/cluster/MqttSubscriptionManager$ScheduledTopicsUpdater.class */
    private class ScheduledTopicsUpdater implements Runnable {
        private ScheduledTopicsUpdater() {
        }

        @Override // java.lang.Runnable
        public void run() {
            Set allTopicCodes = MqttSubscriptionManager.this.nameService.getAllTopicCodes();
            if (allTopicCodes == null || allTopicCodes.size() <= 0) {
                MqttSubscriptionManager.LOG.info("Topic updater data empty.");
                return;
            }
            MqttSubscriptionManager.LOG.info("Topic updater data size: {}", Integer.valueOf(allTopicCodes.size()));
            Iterator it = allTopicCodes.iterator();
            while (it.hasNext()) {
                MqttSubscriptionManager.this.topics.add((String) it.next());
            }
        }
    }

    public MqttSubscriptionManager(BrokerContext brokerContext) {
        this.nameService = brokerContext.getNameService();
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.scheduler.scheduleWithFixedDelay(new ScheduledTopicsUpdater(), 0L, 60L, TimeUnit.SECONDS);
        LOG.info("subscribe service is started.");
    }

    protected void doStop() {
        super.doStop();
    }

    public Set<MqttSubscription> subscribes(String str, List<MqttSubscription> list) throws Exception {
        HashSet hashSet = new HashSet();
        if (list == null || list.isEmpty()) {
            LOG.info("Subscribe topic list empty, please check topicFilters: {}", list);
            return hashSet;
        }
        filterTopics(hashSet, list);
        if (hashSet.isEmpty()) {
            LOG.info("Subscribe topic list empty, please check topicFilters: {}", list);
            return hashSet;
        }
        ArrayList arrayList = new ArrayList(hashSet.size());
        Iterator<MqttSubscription> it = hashSet.iterator();
        while (it.hasNext()) {
            arrayList.add(new Subscription(TopicName.parse(it.next().getTopicFilter().toString()), str, Subscription.Type.CONSUMPTION));
        }
        this.nameService.subscribe(arrayList, ClientType.MQTT);
        return hashSet;
    }

    public void unSubscribe(String str, Set<MqttSubscription> set) throws Exception {
        if (set == null || set.isEmpty()) {
            LOG.info("UnSubscribe topic list empty, please check topicFilters: {}", set);
        }
        ArrayList arrayList = new ArrayList(set.size());
        Iterator<MqttSubscription> it = set.iterator();
        while (it.hasNext()) {
            arrayList.add(new Subscription(TopicName.parse(it.next().getTopicFilter().toString()), str, Subscription.Type.CONSUMPTION));
        }
        this.nameService.unSubscribe(arrayList);
    }

    private void filterTopics(Set<MqttSubscription> set, List<MqttSubscription> list) {
        if (this.topics.size() == 0) {
            set.addAll(list);
            return;
        }
        for (MqttSubscription mqttSubscription : list) {
            for (String str : this.topics) {
                TopicFilter topicFilter = new TopicFilter(str);
                try {
                    if (topicFilter.match(mqttSubscription.getTopicFilter())) {
                        set.add(new MqttSubscription(mqttSubscription.getClientId(), topicFilter, mqttSubscription.getRequestedQos()));
                    }
                } catch (Exception e) {
                    LOG.error("Topic meta data <{}> filter match subscription <{}> filter error: {}", new Object[]{str, mqttSubscription.getTopicFilter(), e});
                }
            }
        }
    }
}
