package org.carewebframework.amqp.rabbitmq;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.carewebframework.api.event.AbstractGlobalEventDispatcher;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

/* loaded from: input_file:org/carewebframework/amqp/rabbitmq/GlobalEventDispatcher.class */
/**
 * Global event dispatcher backed by an AMQP {@link Broker}.
 * <p>
 * Outbound events are handed to the broker via {@code sendEvent}. For inbound events, one
 * {@link Subscriber} is created per subscribed event name; each subscriber is configured with this
 * instance as its message listener, and received messages are converted and passed to
 * {@code localEventDelivery}.
 * <p>
 * A broker must be supplied through {@link #setBroker(Broker)} before any method that touches the
 * broker is used; no null check is performed here.
 */
public class GlobalEventDispatcher extends AbstractGlobalEventDispatcher implements MessageListener {
    private static final Log log = LogFactory.getLog(GlobalEventDispatcher.class);

    /**
     * Active subscribers keyed by event name. The map is a synchronized wrapper, so single
     * operations are atomic; compound operations and iteration must lock on the map itself.
     */
    private final Map<String, Subscriber> subscribers = Collections
            .synchronizedMap(new HashMap<String, Subscriber>());

    private Broker broker;

    /**
     * Returns the node identifier, which is the name of the broker's exchange.
     *
     * @return The exchange name reported by the broker.
     */
    protected String getNodeId() {
        return this.broker.getExchange().getName();
    }

    /**
     * Runs the superclass teardown and then stops and discards every subscriber. Subscribers are
     * released even if the superclass teardown throws.
     */
    public void destroy() {
        try {
            super.destroy();
        } finally {
            removeSubscriptions();
        }
    }

    /**
     * Stops all registered subscribers and empties the registry. A failure to stop one subscriber
     * is logged at debug level and does not prevent the remaining subscribers from being stopped.
     */
    private void removeSubscriptions() {
        Subscriber[] snapshot;

        // Iterating a synchronized map's view requires holding the map's monitor; take a snapshot
        // and clear atomically, then stop the subscribers without holding the lock.
        synchronized (this.subscribers) {
            snapshot = this.subscribers.values().toArray(new Subscriber[0]);
            this.subscribers.clear();
        }

        for (Subscriber subscriber : snapshot) {
            try {
                subscriber.stop();
            } catch (Throwable th) {
                // Best effort: keep going so the other subscribers are still stopped.
                log.debug("Error closing subscriber", th);
            }
        }
    }

    /**
     * Subscribes to or unsubscribes from a remote event. AMQP failures are logged, not propagated.
     *
     * @param str Name of the event.
     * @param z If true, subscribe; if false, unsubscribe.
     */
    public void subscribeRemoteEvent(String str, boolean z) {
        try {
            if (z) {
                doHostSubscribe(str);
            } else {
                doHostUnsubscribe(str);
            }
        } catch (AmqpException e) {
            // Pass the exception as the cause so the stack trace is retained in the log.
            log.error(String.format("Error %s Topic[%s]", z ? "subscribing to" : "unsubscribing from", str), e);
        }
    }

    /**
     * Declares the event queue on the broker, then creates, starts and registers a subscriber for
     * it. Does nothing if a subscriber is already registered under the same name.
     * <p>
     * NOTE(review): the existence check and the later {@code put} are not atomic, so two concurrent
     * calls for the same name could each start a subscriber - confirm whether callers serialize
     * subscription requests.
     *
     * @param str Name of the event, also used as the queue name.
     * @throws AmqpException If a broker or subscriber operation fails.
     */
    private void doHostSubscribe(String str) throws AmqpException {
        if (this.subscribers.get(str) != null) {
            if (log.isDebugEnabled()) {
                log.debug(String.format("Already subscribed to Topic[%s]", str));
            }
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug(String.format("Subscribing to Topic[%s]", str));
        }

        this.broker.declareEventQueue(str);
        Subscriber subscriber = new Subscriber(this.publisherInfo);
        subscriber.setMessageListener(this);
        subscriber.setConnectionFactory(this.broker.getRabbitTemplate().getConnectionFactory());
        subscriber.setQueueNames(new String[] { str });
        subscriber.start();
        this.subscribers.put(str, subscriber);
    }

    /**
     * Removes the subscriber registered for the event, if any, and stops it.
     *
     * @param str Name of the event.
     * @throws AmqpException If stopping the subscriber fails.
     */
    private void doHostUnsubscribe(String str) throws AmqpException {
        Subscriber removed = this.subscribers.remove(str);

        if (removed == null) {
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug(String.format("Unsubscribing Subscriber[%s] for Topic [%s].", removed, str));
        }

        removed.stop();
    }

    /**
     * Sends an event through the broker. AMQP failures are logged, not propagated.
     *
     * @param str Name of the event.
     * @param serializable Event payload.
     * @param str2 Passed through to the broker as the final argument of {@code sendEvent}.
     */
    public void fireRemoteEvent(String str, Serializable serializable, String str2) {
        try {
            doFireRemoteEvent(str, serializable, str2);
        } catch (AmqpException e) {
            log.error("Error firing remote event.", e);
        }
    }

    /**
     * Delegates to the broker's {@code sendEvent}, supplying this dispatcher's endpoint id (taken
     * from {@code publisherInfo}) as the sender.
     *
     * @param str Name of the event.
     * @param obj Event payload.
     * @param str2 Passed through to the broker as the final argument of {@code sendEvent}.
     * @throws AmqpException If the broker fails to send the event.
     */
    private void doFireRemoteEvent(String str, Object obj, String str2) throws AmqpException {
        this.broker.sendEvent(str, obj, this.publisherInfo.getEndpointId(), str2);
    }

    /**
     * Listener callback invoked for each received message. Logs the message at debug level and
     * forwards it to {@link #processMessage(Message)}.
     *
     * @param message The received message.
     */
    public void onMessage(Message message) {
        if (log.isDebugEnabled()) {
            log.debug("Message received: " + message);
        }
        processMessage(message);
    }

    /**
     * Converts the message body with the broker template's message converter and delivers it
     * locally, using the received routing key as the event name. Any exception is logged and
     * swallowed so that it does not propagate to the caller.
     *
     * @param message The received message.
     */
    protected void processMessage(Message message) {
        try {
            String eventName = message.getMessageProperties().getReceivedRoutingKey();
            Object eventData = this.broker.getRabbitTemplate().getMessageConverter().fromMessage(message);
            localEventDelivery(eventName, eventData);
        } catch (Exception e) {
            log.error("Error during local dispatch of global event.", e);
        }
    }

    /**
     * Hook that always returns true in this implementation. It is not invoked from within this
     * class.
     *
     * @return Always true.
     */
    protected boolean beginMessageProcessing() {
        return true;
    }

    /**
     * Hook with no behavior in this implementation. It is not invoked from within this class.
     */
    protected void endMessageProcessing() {
    }

    /**
     * Sets the broker used for queue declaration, event sending and message conversion.
     *
     * @param broker The broker.
     */
    public void setBroker(Broker broker) {
        this.broker = broker;
    }
}
