/*
 * Decompiled with CFR 0.152.
 */
package gu.simplemq.jms;

import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
import gu.simplemq.IAdvisor;
import gu.simplemq.jms.AutoReconnectAdapter;
import gu.simplemq.jms.ExceptionListenerContainer;
import gu.simplemq.jms.JMSReconnectCallback;
import gu.simplemq.jms.JmsConstants;
import gu.simplemq.jms.JmsPoolLazy;
import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;

public class AdvisoryMessageManager
implements AutoCloseable,
IAdvisor,
JmsConstants,
MessageListener,
JMSReconnectCallback {
    private final ConcurrentMap<String, Message> advisoryMessages = Maps.newConcurrentMap();
    public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.Consumer.Queue.";
    public static final String TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.Consumer.Topic.";
    private static final LoadingCache<JmsPoolLazy, AdvisoryMessageManager> CACHE = CacheBuilder.newBuilder().build(new CacheLoader<JmsPoolLazy, AdvisoryMessageManager>(){

        @Override
        public AdvisoryMessageManager load(JmsPoolLazy key) throws Exception {
            return new AdvisoryMessageManager(key);
        }
    });
    private volatile Connection advisoryConnection;
    private volatile Session advisorySession;
    private final JmsPoolLazy poolLazy;
    private final AutoReconnectAdapter autoReconnectAdapter = new AutoReconnectAdapter(this);

    private AdvisoryMessageManager(JmsPoolLazy poolLazy) {
        this.poolLazy = Preconditions.checkNotNull(poolLazy, "poolLazy is null");
        this.init();
    }

    private void init() {
        try {
            this.advisoryConnection = this.poolLazy.borrow();
            new ExceptionListenerContainer(this.autoReconnectAdapter).bind(this.advisoryConnection);
            this.advisoryConnection.start();
            this.advisorySession = this.advisoryConnection.createSession(Boolean.FALSE, 1);
            AdvisoryMessageManager.sub(this.advisorySession, "ActiveMQ.Advisory.Consumer.Topic.*", this);
            AdvisoryMessageManager.sub(this.advisorySession, "ActiveMQ.Advisory.Consumer.Queue.*", this);
        }
        catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void uninit() {
        AdvisoryMessageManager advisoryMessageManager = this;
        synchronized (advisoryMessageManager) {
            if (null != this.advisoryConnection) {
                this.poolLazy.release(this.advisoryConnection);
                this.advisoryConnection = null;
            }
        }
    }

    private static void sub(Session session, String topic, MessageListener listener) throws JMSException {
        Topic t = session.createTopic(topic);
        MessageConsumer consumer = session.createConsumer(t);
        consumer.setMessageListener(listener);
    }

    private static String getAdvisoryTopic(String destination, String prefix) {
        return prefix + destination.replaceAll(",", "&sbquo;");
    }

    Message advisoryMessageOfTopic(String name) {
        String advisoryTopic = AdvisoryMessageManager.getAdvisoryTopic(name, TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX);
        return (Message)this.advisoryMessages.get(advisoryTopic);
    }

    Message advisoryMessageOfQueue(String name) {
        String advisoryTopic = AdvisoryMessageManager.getAdvisoryTopic(name, QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX);
        return (Message)this.advisoryMessages.get(advisoryTopic);
    }

    @Override
    public void close() {
        this.uninit();
        CACHE.asMap().remove(this.poolLazy);
    }

    @Override
    public int consumerCountOf(String channelName) {
        try {
            Message message = this.advisoryMessageOfQueue(channelName);
            return message == null ? 0 : message.getIntProperty("consumerCount");
        }
        catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public int subscriberCountOf(String channelName) {
        try {
            Message message = this.advisoryMessageOfTopic(channelName);
            return message == null ? 0 : message.getIntProperty("consumerCount");
        }
        catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void onMessage(Message message) {
        try {
            String dest = this.poolLazy.runtimeContext.destNameOf(message);
            this.advisoryMessages.put(dest, message);
        }
        catch (JMSException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onConnectionLost() throws Exception {
        this.uninit();
    }

    @Override
    public void tryReconnecting() throws Exception {
        this.init();
    }

    @Override
    public String ownerName() {
        return this.getClass().getSimpleName();
    }

    public static AdvisoryMessageManager instanceOf(JmsPoolLazy pool) {
        return CACHE.getUnchecked(pool);
    }

    public static synchronized void closeAll() {
        Iterator itor = CACHE.asMap().values().iterator();
        while (itor.hasNext()) {
            AdvisoryMessageManager p = (AdvisoryMessageManager)itor.next();
            itor.remove();
            p.close();
        }
    }
}

