package org.atmosphere.plugin.jms;

import ch.qos.logback.core.CoreConstants;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.InitialContext;
import org.atmosphere.cpr.AtmosphereConfig;
import org.atmosphere.cpr.BroadcastFilter;
import org.atmosphere.cpr.Broadcaster;
import org.atmosphere.cpr.ClusterBroadcastFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/atmosphere-jms-1.1.0.beta2.jar:org/atmosphere/plugin/jms/JMSFilter.class */
public class JMSFilter implements ClusterBroadcastFilter {
    private static final String JMS_TOPIC = JMSBroadcaster.class.getName() + ".topic";
    private static final String JNDI_NAMESPACE = JMSBroadcaster.class.getName() + ".JNDINamespace";
    private static final String JNDI_FACTORY_NAME = JMSBroadcaster.class.getName() + ".JNDIConnectionFactoryName";
    private static final String JNDI_TOPIC = JMSBroadcaster.class.getName() + ".JNDITopic";
    private static final Logger logger = LoggerFactory.getLogger(JMSFilter.class);
    private Connection connection;
    private Session session;
    private MessageConsumer consumer;
    private MessageProducer publisher;
    private String topicId;
    private String factoryName;
    private String namespace;
    private String clusterName;
    private Broadcaster bc;
    private final ConcurrentLinkedQueue<String> receivedMessages;

    public JMSFilter() {
        this(null);
    }

    public JMSFilter(Broadcaster broadcaster) {
        this(broadcaster, "atmosphere-framework");
    }

    public JMSFilter(Broadcaster broadcaster, String str) {
        this.topicId = "atmosphere";
        this.factoryName = "atmosphereFactory";
        this.namespace = "jms/";
        this.bc = null;
        this.receivedMessages = new ConcurrentLinkedQueue<>();
        this.bc = broadcaster;
        this.topicId = str;
    }

    @Override // org.atmosphere.cpr.ClusterBroadcastFilter
    public void setUri(String str) {
        this.clusterName = str;
    }

    @Override // org.atmosphere.cpr.BroadcastFilterLifecycle
    public void init(AtmosphereConfig atmosphereConfig) {
        if (atmosphereConfig != null) {
            try {
                if (atmosphereConfig.getInitParameter(JMS_TOPIC) != null) {
                    this.topicId = atmosphereConfig.getInitParameter(JMS_TOPIC);
                }
                if (atmosphereConfig.getInitParameter(JNDI_NAMESPACE) != null) {
                    this.namespace = atmosphereConfig.getInitParameter(JNDI_NAMESPACE);
                }
                if (atmosphereConfig.getInitParameter(JNDI_FACTORY_NAME) != null) {
                    this.factoryName = atmosphereConfig.getInitParameter(JNDI_FACTORY_NAME);
                }
                if (atmosphereConfig.getInitParameter(JNDI_TOPIC) != null) {
                    this.topicId = atmosphereConfig.getInitParameter(JNDI_TOPIC);
                }
            } catch (Throwable th) {
                throw new IllegalStateException("Unable to initialize JMSBroadcaster", th);
            }
        }
        String id = this.bc.getID();
        if (id.startsWith("/*")) {
            id = "atmosphere";
        }
        logger.info(String.format("Looking up Connection Factory %s", this.namespace + this.factoryName));
        InitialContext initialContext = new InitialContext();
        ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup(this.namespace + this.factoryName);
        logger.info(String.format("Looking up topic: %s", this.topicId));
        Topic topic = (Topic) initialContext.lookup(this.namespace + this.topicId);
        this.connection = connectionFactory.createConnection();
        this.session = this.connection.createSession(false, 1);
        logger.info(String.format("Create customer: %s", id));
        String format = String.format("BroadcasterId = '%s'", id);
        this.consumer = this.session.createConsumer(topic, format);
        this.consumer.setMessageListener(new MessageListener() { // from class: org.atmosphere.plugin.jms.JMSFilter.1
            public void onMessage(Message message) {
                try {
                    String text = ((TextMessage) message).getText();
                    if (text != null && JMSFilter.this.bc != null) {
                        JMSFilter.this.receivedMessages.offer(text);
                        JMSFilter.this.bc.broadcast(text);
                    }
                } catch (JMSException e) {
                    JMSFilter.logger.warn(CoreConstants.EMPTY_STRING, e);
                }
            }
        });
        this.publisher = this.session.createProducer(topic);
        this.connection.start();
        logger.info(String.format("JMS created for topic %s, with filter %s", this.topicId, format));
    }

    @Override // org.atmosphere.cpr.BroadcastFilterLifecycle
    public void destroy() {
    }

    @Override // org.atmosphere.cpr.BroadcastFilter
    public BroadcastFilter.BroadcastAction filter(Object obj, Object obj2) {
        if (!(obj2 instanceof String)) {
            return new BroadcastFilter.BroadcastAction(obj2);
        }
        String str = (String) obj2;
        if (!this.receivedMessages.remove(str)) {
            try {
                String id = this.bc.getID();
                if (id.startsWith("/*")) {
                    id = "atmosphere";
                }
                TextMessage createTextMessage = this.session.createTextMessage(str.toString());
                createTextMessage.setStringProperty("BroadcasterId", id);
                this.publisher.send(createTextMessage);
            } catch (JMSException e) {
                logger.warn("failed to publish message", e);
            }
        }
        return new BroadcastFilter.BroadcastAction(str);
    }

    @Override // org.atmosphere.cpr.ClusterBroadcastFilter
    public Broadcaster getBroadcaster() {
        return this.bc;
    }

    @Override // org.atmosphere.cpr.ClusterBroadcastFilter
    public void setBroadcaster(Broadcaster broadcaster) {
        this.bc = broadcaster;
    }
}
