package gu.simplemq.stomp;

import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import gu.simplemq.IAdvisor;
import gu.simplemq.json.BaseJsonEncoder;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.projectodd.stilts.stomp.StompException;
import org.projectodd.stilts.stomp.StompMessage;
import org.projectodd.stilts.stomp.Subscription;
import org.projectodd.stilts.stomp.client.ClientSubscription;
import org.projectodd.stilts.stomp.client.MessageHandler;
import org.projectodd.stilts.stomp.client.StompClient;
import org.projectodd.stilts.stomp.client.SubscriptionBuilder;

/* loaded from: input_file:gu/simplemq/stomp/AdvisoryMessageManager.class */
public class AdvisoryMessageManager implements AutoCloseable, IAdvisor, StompConstants, MessageHandler {
    private static final String FIELD_ConsumerInfo = "ConsumerInfo";
    private static final String FIELD_RemoveInfo = "RemoveInfo";
    private static final String TOPIC_PREIFX = "/topic/ActiveMQ.Advisory.Consumer.Topic.";
    private static final String QUEUE_PREIFX = "/topic/ActiveMQ.Advisory.Consumer.Queue.";
    private static final String ADVISORY_CONSUMER_TOPIC_PATTERN = "/topic/ActiveMQ.Advisory.Consumer.Topic.*";
    private static final String ADVISORY_CONSUMER_QUEUE_PATTERN = "/topic/ActiveMQ.Advisory.Consumer.Queue.*";
    private final ConcurrentMap<String, Set<String>> advisoryConsumers;
    private static final LoadingCache<StompPoolLazy, AdvisoryMessageManager> CACHE = CacheBuilder.newBuilder().build(new CacheLoader<StompPoolLazy, AdvisoryMessageManager>() { // from class: gu.simplemq.stomp.AdvisoryMessageManager.1
        @Override // com.google.common.cache.CacheLoader
        public AdvisoryMessageManager load(StompPoolLazy stompPoolLazy) throws Exception {
            return new AdvisoryMessageManager(stompPoolLazy);
        }
    });
    private final StompClient stompClient;
    private final StompPoolLazy pool;
    private final ClientSubscription topicSub;
    private final ClientSubscription queueSub;

    private AdvisoryMessageManager(StompPoolLazy stompPoolLazy) {
        this.advisoryConsumers = Maps.newConcurrentMap();
        this.pool = (StompPoolLazy) Preconditions.checkNotNull(stompPoolLazy, "pool is null");
        this.stompClient = this.pool.borrow();
        this.topicSub = sub(ADVISORY_CONSUMER_TOPIC_PATTERN);
        this.queueSub = sub(ADVISORY_CONSUMER_QUEUE_PATTERN);
    }

    ClientSubscription sub(String str) {
        SubscriptionBuilder withAckMode = this.stompClient.subscribe(str).withMessageHandler(this).withAckMode(Subscription.AckMode.CLIENT_INDIVIDUAL);
        for (Map.Entry<String, String> entry : this.pool.getHeaders().entrySet()) {
            withAckMode.withHeader(entry.getKey(), entry.getValue());
        }
        try {
            return withAckMode.start();
        } catch (StompException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        synchronized (this) {
            try {
                if (this.stompClient.isConnected()) {
                    if (this.topicSub.isActive()) {
                        this.topicSub.unsubscribe();
                    }
                    if (this.queueSub.isActive()) {
                        this.queueSub.unsubscribe();
                    }
                    this.pool.release(this.stompClient);
                }
            } catch (StompException e) {
                e.printStackTrace();
            }
        }
    }

    @Override // gu.simplemq.IAdvisor
    public int consumerCountOf(String str) {
        Set<String> set = this.advisoryConsumers.get("/topic/ActiveMQ.Advisory.Consumer.Topic.Queue." + str);
        if (set == null) {
            return 0;
        }
        return set.size();
    }

    @Override // gu.simplemq.IAdvisor
    public int subscriberCountOf(String str) {
        Set<String> set = this.advisoryConsumers.get(TOPIC_PREIFX + str);
        if (set == null) {
            return 0;
        }
        return set.size();
    }

    public static AdvisoryMessageManager instanceOf(StompPoolLazy stompPoolLazy) {
        return CACHE.getUnchecked(stompPoolLazy);
    }

    public static synchronized void closeAll() {
        Iterator<AdvisoryMessageManager> it = CACHE.asMap().values().iterator();
        while (it.hasNext()) {
            AdvisoryMessageManager next = it.next();
            it.remove();
            next.close();
        }
    }

    @Override // org.projectodd.stilts.stomp.client.MessageHandler
    public void handle(StompMessage stompMessage) {
        JSONObject jSONObject = (JSONObject) BaseJsonEncoder.getEncoder().fromJson(stompMessage.getContentAsString(), JSONObject.class);
        if (jSONObject.containsKey(FIELD_ConsumerInfo)) {
            String string = ((JSONObject) ((JSONObject) jSONObject.getObject(FIELD_ConsumerInfo, JSONObject.class)).getObject("consumerId", JSONObject.class)).getString("connectionId");
            this.advisoryConsumers.putIfAbsent(stompMessage.getDestination(), Sets.newHashSet());
            this.advisoryConsumers.get(stompMessage.getDestination()).add(string);
        } else if (jSONObject.containsKey(FIELD_RemoveInfo)) {
            String string2 = ((JSONObject) ((JSONObject) jSONObject.getObject(FIELD_RemoveInfo, JSONObject.class)).getObject("objectId", JSONObject.class)).getString("connectionId");
            this.advisoryConsumers.putIfAbsent(stompMessage.getDestination(), Sets.newHashSet());
            this.advisoryConsumers.get(stompMessage.getDestination()).remove(string2);
        }
        synchronized (this) {
            notifyAll();
        }
    }
}
