package com.ning.metrics.collector.processing.db;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Objects;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.inject.Inject;
import com.ning.metrics.collector.binder.config.CollectorConfig;
import com.ning.metrics.collector.processing.EventSpoolProcessor;
import com.ning.metrics.collector.processing.SerializationType;
import com.ning.metrics.collector.processing.db.model.CounterEvent;
import com.ning.metrics.collector.processing.db.model.CounterEventData;
import com.ning.metrics.collector.processing.db.model.CounterSubscription;
import com.ning.metrics.collector.processing.quartz.CounterEventCleanUpJob;
import com.ning.metrics.collector.processing.quartz.CounterEventScannerJob;
import com.ning.metrics.serialization.event.Event;
import com.ning.metrics.serialization.event.EventDeserializer;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.TriggerBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ning/metrics/collector/processing/db/CounterEventSpoolProcessor.class */
public class CounterEventSpoolProcessor implements EventSpoolProcessor {
    private static final Logger log = LoggerFactory.getLogger(CounterEventSpoolProcessor.class);
    private final CollectorConfig config;
    private final CounterStorage counterStorage;
    private final ObjectMapper mapper;
    private static final String PROCESSOR_NAME = "CounterEventDBWriter";
    private final CounterEventCacheProcessor counterEventCacheProcessor;
    private final Scheduler quartzScheduler;
    private final AtomicBoolean isCronJobScheduled = new AtomicBoolean(false);
    private final AtomicBoolean isCleanupCronJobScheduled = new AtomicBoolean(false);

    @Inject
    public CounterEventSpoolProcessor(CollectorConfig collectorConfig, CounterStorage counterStorage, Scheduler scheduler, CounterEventCacheProcessor counterEventCacheProcessor, ObjectMapper objectMapper) throws SchedulerException {
        this.config = collectorConfig;
        this.counterStorage = counterStorage;
        this.counterEventCacheProcessor = counterEventCacheProcessor;
        this.mapper = objectMapper;
        this.quartzScheduler = scheduler;
        if (!Splitter.on(collectorConfig.getFilters()).omitEmptyStrings().splitToList(collectorConfig.getFiltersEventType()).contains(DBStorageTypes.COUNTER_EVENT.getDbStorageType()) || scheduler.isStarted()) {
            return;
        }
        scheduler.start();
        scheduleCounterEventRollUpCronJob();
        scheduleRollupEventCleanupCronJob();
    }

    @Override // com.ning.metrics.collector.processing.EventSpoolProcessor
    public void processEventFile(String str, SerializationType serializationType, File file, String str2) throws IOException {
        EventDeserializer deSerializer = serializationType.getDeSerializer(new FileInputStream(file));
        boolean z = false;
        while (deSerializer.hasNextEvent()) {
            Event nextEvent = deSerializer.getNextEvent();
            log.debug(String.format("Recieved DB Event to store with name as %s ", nextEvent.getName()));
            if (DBStorageTypes.COUNTER_EVENT.getDbStorageType().equalsIgnoreCase(nextEvent.getName())) {
                log.debug(String.format("DB Event body to store is %s", nextEvent.getData()));
                CounterEvent counterEvent = (CounterEvent) this.mapper.readValue(nextEvent.getData().toString(), CounterEvent.class);
                if (!Strings.isNullOrEmpty(counterEvent.getAppId()) && !Objects.equal((Object) null, counterEvent.getCounterEvents()) && !counterEvent.getCounterEvents().isEmpty()) {
                    CounterSubscription loadCounterSubscription = this.counterStorage.loadCounterSubscription(counterEvent.getAppId());
                    if (!Objects.equal((Object) null, loadCounterSubscription)) {
                        Iterator<CounterEventData> it = counterEvent.getCounterEvents().iterator();
                        while (it.hasNext()) {
                            this.counterEventCacheProcessor.addCounterEventData(loadCounterSubscription.getId(), it.next());
                        }
                        z = true;
                    }
                }
            }
        }
        if (z) {
            this.counterEventCacheProcessor.processRemainingCounters();
            if (this.isCronJobScheduled.get()) {
                return;
            }
            try {
                scheduleCounterEventRollUpCronJob();
            } catch (SchedulerException e) {
                log.error("Exception occurred while scheduling cron job for counter roll ups.", e);
            }
        }
    }

    @Override // com.ning.metrics.collector.processing.EventSpoolProcessor
    public void close() {
        this.counterEventCacheProcessor.cleanUp();
        this.counterStorage.cleanUp();
        log.info("Shutting Down Quartz Scheduler");
        try {
            if (!this.quartzScheduler.isShutdown()) {
                this.quartzScheduler.shutdown(true);
            }
        } catch (SchedulerException e) {
            log.error("Unexpected error while shutting down Quartz Scheduler!", e);
        }
        log.info("Quartz Scheduler shutdown success");
    }

    private void scheduleCounterEventRollUpCronJob() throws SchedulerException {
        if (!this.quartzScheduler.isStarted() || this.isCronJobScheduled.get()) {
            return;
        }
        JobKey jobKey = new JobKey("counterProcessorCronJob", "counterProcessorCronJobGroup");
        if (!this.quartzScheduler.checkExists(jobKey)) {
            this.quartzScheduler.scheduleJob(JobBuilder.newJob(CounterEventScannerJob.class).withIdentity(jobKey).build(), TriggerBuilder.newTrigger().withIdentity("counterProcessorCronTrigger", "counterProcessorCronTriggerGroup").withSchedule(CronScheduleBuilder.cronSchedule(this.config.getCounterRollUpProcessorCronExpression()).withMisfireHandlingInstructionDoNothing()).build());
        }
        this.isCronJobScheduled.set(true);
    }

    private void scheduleRollupEventCleanupCronJob() throws SchedulerException {
        if (!this.quartzScheduler.isStarted() || this.isCleanupCronJobScheduled.get()) {
            return;
        }
        JobKey jobKey = new JobKey("rolledCountersCleanupCronJob", "rolledCountersCleanupCronJobGroup");
        if (!this.quartzScheduler.checkExists(jobKey)) {
            this.quartzScheduler.scheduleJob(JobBuilder.newJob(CounterEventCleanUpJob.class).withIdentity(jobKey).build(), TriggerBuilder.newTrigger().withIdentity("rolledCountersCleanupCronTrigger", "rolledCountersCleanupCronTriggerGroup").withSchedule(CronScheduleBuilder.cronSchedule(this.config.getRolledUpCounterCleanupCronExpression()).withMisfireHandlingInstructionDoNothing()).build());
        }
        this.isCleanupCronJobScheduled.set(true);
    }

    @Override // com.ning.metrics.collector.processing.EventSpoolProcessor
    public String getProcessorName() {
        return PROCESSOR_NAME;
    }
}
