package com.ning.metrics.collector.realtime;

import com.google.inject.Inject;
import com.mogwee.executors.FailsafeScheduledExecutor;
import com.ning.metrics.collector.binder.config.CollectorConfig;
import com.ning.metrics.serialization.event.Event;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.skife.config.ConfigurationObjectFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.weakref.jmx.Managed;

/* loaded from: input_file:com/ning/metrics/collector/realtime/EventQueueProcessorImpl.class */
/**
 * {@link EventQueueProcessor} that forwards selected events to a message queue.
 *
 * <p>Events are filtered by name against a configurable set of types to collect. For each
 * accepted event name a {@link LocalQueueAndWorkers} is lazily created (backed by a session
 * obtained from the shared {@link EventQueueConnection}) and the formatted event is offered
 * to it.
 *
 * <p>Thread-safety: {@link #send(Event)} may be called concurrently with the JMX-managed
 * operations. The per-category queue map is a concurrent map whose structural changes
 * (creation in {@code send}, teardown in {@code stop}) are serialized on
 * {@code queueMapMonitor}; the set of types to collect is an immutable-by-convention
 * snapshot that is replaced, never mutated in place.
 */
public class EventQueueProcessorImpl implements EventQueueProcessor {
    private static final Logger log = LoggerFactory.getLogger(EventQueueProcessorImpl.class);

    private final ConfigurationObjectFactory configFactory;
    /** May be null (see the guard in {@link #start()}); every use must tolerate that. */
    private final EventQueueConnection connection;
    private final AtomicBoolean enabled = new AtomicBoolean(false);
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    /** Concurrent map so the lock-free fast-path lookup in {@link #send(Event)} is safe. */
    private final Map<String, LocalQueueAndWorkers> queuesPerCategory = new ConcurrentHashMap<>();
    /** Serializes creation and teardown of entries in {@link #queuesPerCategory}. */
    private final Object queueMapMonitor = new Object();
    /** Current snapshot of event names to forward; replaced wholesale on every change. */
    private final AtomicReference<Set<String>> typesToCollect = new AtomicReference<>();
    private final GlobalEventQueueStats stats;
    private final EventFormatter eventFormatter;

    /**
     * Builds the processor from configuration and kicks off {@link #start()} on a
     * background executor so construction does not wait on the queue connection.
     *
     * @param configurationObjectFactory factory used to build per-category configuration
     * @param collectorConfig            global configuration (enabled flag, event types, formatting)
     * @param eventQueueConnectionFactory factory for the queue connection
     * @param globalEventQueueStats      statistics sink shared by all per-category queues
     */
    @Inject
    public EventQueueProcessorImpl(ConfigurationObjectFactory configurationObjectFactory, CollectorConfig collectorConfig, EventQueueConnectionFactory eventQueueConnectionFactory, GlobalEventQueueStats globalEventQueueStats) {
        this.configFactory = configurationObjectFactory;
        this.stats = globalEventQueueStats;
        this.enabled.set(collectorConfig.isActiveMQEnabled());
        this.typesToCollect.set(parseTypesToCollect(collectorConfig.getActiveMQEventsToCollect()));
        this.eventFormatter = new EventFormatter(collectorConfig);
        this.connection = eventQueueConnectionFactory.createConnection();

        // NOTE(review): this executor is never shut down, so its worker thread presumably
        // outlives the start-up task - confirm whether that is intended.
        new FailsafeScheduledExecutor(1, "EventQueueProcessorImpl").execute(new Runnable() {
            @Override
            public void run() {
                EventQueueProcessorImpl.this.start();
            }
        });
    }

    /**
     * Parses a comma-separated list of event names into a set, trimming surrounding
     * whitespace and skipping empty entries.
     *
     * @param commaSeparatedTypes raw configuration value, may be null
     * @return a new mutable set of event names; empty when the input is null or blank
     */
    private static Set<String> parseTypesToCollect(String commaSeparatedTypes) {
        Set<String> types = new HashSet<>();
        if (commaSeparatedTypes != null) {
            for (String rawType : commaSeparatedTypes.split(",")) {
                String type = rawType.trim();
                if (!type.isEmpty()) {
                    types.add(type);
                }
            }
        }
        return types;
    }

    /**
     * Connects to the queue and marks the processor as running. Does nothing when
     * forwarding is disabled or no connection is available.
     */
    void start() {
        if (!this.enabled.get() || this.connection == null) {
            return;
        }
        this.connection.reconnect();
        this.isRunning.set(true);
    }

    /**
     * @return true once {@link #start()} has connected and until {@link #stop()} is called
     */
    @Override
    public boolean isRunning() {
        return this.isRunning.get();
    }

    /**
     * Stops forwarding: closes every per-category queue, forgets them, and closes the
     * connection. A no-op when the processor is not running.
     */
    @Override
    public void stop() {
        if (!this.isRunning.getAndSet(false)) {
            return;
        }
        // Hold the monitor so a concurrent send() cannot register a new queue while
        // the existing ones are being closed and the map is being emptied.
        synchronized (this.queueMapMonitor) {
            for (LocalQueueAndWorkers queue : this.queuesPerCategory.values()) {
                queue.close();
            }
            this.queuesPerCategory.clear();
        }
        // isRunning can only have been true if start() saw a non-null connection.
        this.connection.close();
    }

    /**
     * Formats the event and offers it to the queue for its category, creating that queue
     * on first use. The event is counted as ignored when it is null, has no name, is not
     * among the types to collect, or the processor is not running.
     *
     * @param event event to forward, may be null
     */
    @Override
    public void send(Event event) {
        if (event == null || !this.isRunning.get()) {
            this.stats.registerEventIgnored();
            return;
        }
        String name = event.getName();
        // Explicit null check: the concurrent map below rejects null keys.
        if (name == null || !this.typesToCollect.get().contains(name)) {
            this.stats.registerEventIgnored();
            return;
        }

        LocalQueueAndWorkers localQueueAndWorkers = this.queuesPerCategory.get(name);
        if (localQueueAndWorkers == null) {
            Map<String, String> replacements = new HashMap<>();
            replacements.put("category", name);
            CollectorConfig collectorConfig = (CollectorConfig) this.configFactory.buildWithReplacements(CollectorConfig.class, replacements);
            synchronized (this.queueMapMonitor) {
                // stop() may have run since the check above; do not open a session
                // on a connection that is being (or has been) closed.
                if (!this.isRunning.get()) {
                    this.stats.registerEventIgnored();
                    return;
                }
                // Re-check under the lock: another thread may have created the queue.
                localQueueAndWorkers = this.queuesPerCategory.get(name);
                if (localQueueAndWorkers == null) {
                    localQueueAndWorkers = new LocalQueueAndWorkers(name, this.connection.getSessionFor(name, collectorConfig), this.stats);
                    this.queuesPerCategory.put(name, localQueueAndWorkers);
                }
            }
        }
        localQueueAndWorkers.offer(this.eventFormatter.getFormattedEvent(event));
    }

    /**
     * @return whether forwarding has been requested (independent of whether it is running)
     */
    @Managed(description = "whether forwarding events to the queue is enabled")
    public boolean isEnabled() {
        return this.enabled.get();
    }

    /**
     * Enables forwarding and starts the processor, unless it was already enabled.
     */
    @Managed(description = "forwards events to the queue")
    public void enable() {
        if (this.enabled.getAndSet(true)) {
            return;
        }
        start();
    }

    /**
     * Disables forwarding and stops the processor, unless it was already disabled.
     */
    @Managed(description = "disable forwarding of events to the queue")
    public void disable() {
        if (this.enabled.getAndSet(false)) {
            stop();
        }
    }

    /**
     * Switches the connection to BytesMessage mode. Logs a warning and does nothing
     * when there is no connection.
     */
    @Managed(description = "Enables sending of events as ByteMessage (instead of TextMessage) for all topics")
    public void sendUsingBytesMessage() {
        if (this.connection == null) {
            log.warn("No event queue connection available, cannot enable 'setUseBytesMessage'");
            return;
        }
        log.info("Enabling 'setUseBytesMessage' (send all events as BytesMessage)");
        this.connection.setUseBytesMessage(true);
    }

    /**
     * Switches the connection to TextMessage mode. Logs a warning and does nothing
     * when there is no connection.
     */
    @Managed(description = "Enables sending of events as TextMessage (instead of ByteMessage) for all topics")
    public void sendUsingTextMessage() {
        if (this.connection == null) {
            log.warn("No event queue connection available, cannot disable 'setUseBytesMessage'");
            return;
        }
        log.info("Disabling 'setUseBytesMessage' (send all events as TextMessage)");
        this.connection.setUseBytesMessage(false);
    }

    /**
     * Adds an event name to the set of types to forward. Null or blank names are ignored;
     * surrounding whitespace is trimmed.
     *
     * @param str event name to start collecting
     */
    @Managed(description = "add event type to collect")
    public void addTypeToCollect(String str) {
        if (str == null) {
            return;
        }
        String type = str.trim();
        if (type.isEmpty()) {
            return;
        }
        // Copy-on-write: readers in send() keep iterating a snapshot that is never mutated.
        while (true) {
            Set<String> current = this.typesToCollect.get();
            if (current.contains(type)) {
                return;
            }
            Set<String> updated = new HashSet<>(current);
            updated.add(type);
            if (this.typesToCollect.compareAndSet(current, updated)) {
                log.info("Added event type '{}' to list of events to send to the queue", type);
                return;
            }
        }
    }

    /**
     * Removes an event name from the set of types to forward. Does nothing when the
     * name is not currently collected.
     *
     * @param str event name to stop collecting
     */
    @Managed(description = "remove event type to collect")
    public void removeTypeToCollect(String str) {
        // Copy-on-write, retried if another update won the race.
        while (true) {
            Set<String> current = this.typesToCollect.get();
            if (!current.contains(str)) {
                return;
            }
            Set<String> updated = new HashSet<>(current);
            updated.remove(str);
            if (this.typesToCollect.compareAndSet(current, updated)) {
                log.info("Removed event type '{}' from list of events to send to the queue", str);
                return;
            }
        }
    }

    /**
     * @return string rendering of the current set of event names being collected
     */
    @Managed(description = "list event types allowed to collect")
    public String getTypesToCollect() {
        return this.typesToCollect.get().toString();
    }
}
