package com.ning.metrics.collector.processing.db;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.mogwee.executors.FailsafeScheduledExecutor;
import com.mogwee.executors.LoggingExecutor;
import com.mogwee.executors.NamedThreadFactory;
import com.ning.arecibo.jmx.Monitored;
import com.ning.arecibo.jmx.MonitoringType;
import com.ning.metrics.collector.binder.config.CollectorConfig;
import com.ning.metrics.collector.processing.EventSpoolProcessor;
import com.ning.metrics.collector.processing.SerializationType;
import com.ning.metrics.collector.processing.db.model.FeedEvent;
import com.ning.metrics.collector.processing.db.model.FeedEventData;
import com.ning.metrics.collector.processing.db.model.Subscription;
import com.ning.metrics.collector.processing.quartz.FeedUpdateQuartzJob;
import com.ning.metrics.serialization.event.Event;
import com.ning.metrics.serialization.event.EventDeserializer;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.SimpleTrigger;
import org.quartz.TriggerBuilder;
import org.skife.config.TimeSpan;
import org.skife.jdbi.v2.IDBI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ning/metrics/collector/processing/db/DBSpoolProcessor.class */
/**
 * {@link EventSpoolProcessor} that persists feed events through {@link FeedEventStorage}.
 *
 * <p>Events read from a spool file whose name matches {@link DBStorageTypes#FEED_EVENT} are
 * expanded into one {@link FeedEvent} per matching {@link Subscription} and placed on a bounded
 * in-memory buffer. A self-resubmitting background task ({@link FeedEventInserter}) drains that
 * buffer in batches, inserts each batch and then schedules a {@link FeedUpdateQuartzJob} carrying
 * the ids returned by the insert.
 */
public class DBSpoolProcessor implements EventSpoolProcessor {
    private static final Logger log = LoggerFactory.getLogger(DBSpoolProcessor.class);
    private static final ObjectMapper mapper = new ObjectMapper();
    private static final String PROCESSOR_NAME = "DBWriter";

    /** Capacity of the in-memory buffer; producers block in {@link #addToBuffer} once it is full. */
    private static final int EVENT_BUFFER_CAPACITY = 1000;
    /** Upper bound on the number of events handed to a single {@code insert} call. */
    private static final int MAX_BATCH_SIZE = 1000;
    /** How long a flush pass sleeps when it found nothing to insert. */
    private static final long IDLE_SLEEP_MILLIS = 5000L;
    /** Delay before the old-feed-event cleaner runs. */
    private static final long CLEANER_DELAY_MINUTES = 15L;

    private final IDBI dbi;
    private final CollectorConfig config;
    private final SubscriptionStorage subscriptionStorage;
    private final FeedEventStorage feedEventStorage;
    private final BlockingQueue<FeedEvent> eventStorageBuffer = new ArrayBlockingQueue<FeedEvent>(EVENT_BUFFER_CAPACITY, false);
    // NOTE(review): arguments presumably mirror ThreadPoolExecutor (core, max, keep-alive, unit,
    // queue, thread factory, rejection policy) - confirm against LoggingExecutor.
    private final ExecutorService executorService = new LoggingExecutor(1, 1, Long.MAX_VALUE, TimeUnit.DAYS, new ArrayBlockingQueue<Runnable>(2), new NamedThreadFactory("FeedEvents-Storage-Threads"), new ThreadPoolExecutor.CallerRunsPolicy());
    private final ScheduledExecutorService scheduledExecutorService;
    private final TimeSpan executorShutdownTimeOut;
    private final Scheduler quartzScheduler;

    /**
     * Background task that performs one flush pass and then submits itself again to the same
     * executor, which keeps the flush loop running for as long as that executor accepts tasks.
     */
    private static class FeedEventInserter implements Runnable {
        private final ExecutorService es;
        private final DBSpoolProcessor dbSpoolProcessor;

        public FeedEventInserter(ExecutorService executorService, DBSpoolProcessor dBSpoolProcessor) {
            this.es = executorService;
            this.dbSpoolProcessor = dBSpoolProcessor;
        }

        @Override
        public void run() {
            this.dbSpoolProcessor.flushFeedEventsToDB();
            this.es.submit(this);
        }
    }

    /** Task that asks the feed event storage to remove old feed events. */
    private class FeedEventScheduledCleaner implements Runnable {
        @Override
        public void run() {
            DBSpoolProcessor.this.feedEventStorage.cleanOldFeedEvents();
        }
    }

    /**
     * Wires the processor, starts the Quartz scheduler if it is not already running, and only then
     * starts the background cleaner and inserter tasks.
     *
     * @param idbi                database handle (stored, not used directly by this class)
     * @param collectorConfig     collector configuration; supplies the executor shutdown timeout
     * @param subscriptionStorage lookup of subscriptions by topic
     * @param feedEventStorage    storage the buffered feed events are written to
     * @param scheduler           Quartz scheduler used to schedule feed update jobs
     * @throws SchedulerException if the Quartz scheduler cannot be queried or started
     */
    @Inject
    public DBSpoolProcessor(IDBI idbi, CollectorConfig collectorConfig, SubscriptionStorage subscriptionStorage, FeedEventStorage feedEventStorage, Scheduler scheduler) throws SchedulerException {
        this.dbi = idbi;
        this.config = collectorConfig;
        this.subscriptionStorage = subscriptionStorage;
        this.feedEventStorage = feedEventStorage;
        this.executorShutdownTimeOut = collectorConfig.getSpoolWriterExecutorShutdownTime();
        this.quartzScheduler = scheduler;

        // Start Quartz before any background work so that a failure here does not leave
        // already-running tasks behind on a processor that was never fully constructed.
        if (!scheduler.isStarted()) {
            scheduler.start();
        }

        this.scheduledExecutorService = new FailsafeScheduledExecutor(1, "FeedEvents-Cleaner-Threads");
        // NOTE(review): schedule(...) is one-shot, so the cleaner runs a single time 15 minutes
        // after start-up. If periodic cleaning is intended, scheduleWithFixedDelay is the likely
        // fix - confirm the intent before changing.
        this.scheduledExecutorService.schedule(new FeedEventScheduledCleaner(), CLEANER_DELAY_MINUTES, TimeUnit.MINUTES);

        // Submitted last: the inserter captures `this`, so every field it may read
        // (including quartzScheduler) must be assigned before it can run.
        this.executorService.submit(new FeedEventInserter(this.executorService, this));
    }

    /**
     * Reads every event from the given spool file and buffers the feed events for insertion.
     * Events whose name does not match {@link DBStorageTypes#FEED_EVENT} (case-insensitively) are
     * logged and otherwise ignored.
     *
     * @param str               not used by this processor
     * @param serializationType provides the deserializer for the file contents
     * @param file              spool file to read
     * @param str2              not used by this processor
     * @throws IOException if the file cannot be opened, read, parsed or closed
     */
    @Override
    public void processEventFile(String str, SerializationType serializationType, File file, String str2) throws IOException {
        // try-with-resources guarantees the file handle is released even when parsing fails.
        try (FileInputStream spoolStream = new FileInputStream(file)) {
            EventDeserializer deSerializer = serializationType.getDeSerializer(spoolStream);
            while (deSerializer.hasNextEvent()) {
                Event event = deSerializer.getNextEvent();
                log.info("Recieved DB Event to store with name as {} ", event.getName());
                log.info("DB Event body to store is {}", event.getData());
                if (event.getName().equalsIgnoreCase(DBStorageTypes.FEED_EVENT.getDbStorageType())) {
                    bufferFeedEvent(event);
                }
            }
        }
    }

    /** Parses the event payload as {@link FeedEventData} and buffers one {@link FeedEvent} per matching subscription. */
    private void bufferFeedEvent(Event event) throws IOException {
        FeedEventData feedEventData = mapper.readValue(event.getData().toString(), FeedEventData.class);
        for (Subscription subscription : findSubscriptions(feedEventData)) {
            addToBuffer(event.getName(), new FeedEvent(feedEventData, subscription.getChannel(), subscription.getId(), subscription.getMetadata()));
        }
    }

    /**
     * Collects the distinct subscriptions for all topics of the event. Suppress-type events use
     * the starts-with topic lookup, every other event type uses the plain topic lookup.
     */
    private HashSet<Subscription> findSubscriptions(FeedEventData feedEventData) {
        boolean suppress = Objects.equal(FeedEventData.EVENT_TYPE_SUPPRESS, feedEventData.getEventType());
        HashSet<Subscription> subscriptions = new HashSet<Subscription>();
        for (String topic : feedEventData.getTopics()) {
            subscriptions.addAll(suppress ? this.subscriptionStorage.loadByStartsWithTopic(topic) : this.subscriptionStorage.loadByTopic(topic));
        }
        return subscriptions;
    }

    /**
     * Puts the feed event on the buffer, blocking while the buffer is full. If the thread is
     * interrupted while waiting, the event is dropped with a warning and the interrupt flag is
     * restored so callers can observe it.
     */
    private void addToBuffer(String str, FeedEvent feedEvent) {
        try {
            this.eventStorageBuffer.put(feedEvent);
        } catch (InterruptedException e) {
            log.warn(String.format("Could not add event %s to the buffer", str), e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Drains the buffer in batches of at most {@value #MAX_BATCH_SIZE} events, inserting each
     * batch and scheduling a feed update job for the ids returned by the insert. When the buffer
     * was empty on entry the calling thread sleeps for {@value #IDLE_SLEEP_MILLIS} ms so the
     * background loop does not spin. Any exception is logged and swallowed; events of a batch
     * that failed to insert are not put back on the buffer.
     */
    public void flushFeedEventsToDB() {
        try {
            ArrayList<FeedEvent> batch = Lists.newArrayListWithCapacity(this.eventStorageBuffer.size());
            boolean flushedAny = false;
            int drained;
            while ((drained = this.eventStorageBuffer.drainTo(batch, MAX_BATCH_SIZE)) > 0) {
                flushedAny = true;
                List<String> insertedIds = this.feedEventStorage.insert(batch);
                log.info("Inserted {} events successfully!", Integer.valueOf(drained));
                batch.clear();
                scheduleFeedCollectionJob(insertedIds);
            }
            if (!flushedAny) {
                try {
                    Thread.sleep(IDLE_SLEEP_MILLIS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        } catch (Exception e) {
            log.warn("unexpected exception trying to insert events!", e);
        }
    }

    /**
     * Schedules a {@link FeedUpdateQuartzJob} whose job data carries the given ids under the key
     * {@code feedEventIdList}. Does nothing when the Quartz scheduler is not started; scheduling
     * failures are logged and swallowed.
     */
    private void scheduleFeedCollectionJob(List<String> list) {
        try {
            if (this.quartzScheduler.isStarted()) {
                // NOTE(review): job and trigger identities are fixed. If a previous job/trigger
                // with the same key still exists, scheduleJob presumably fails and this batch's
                // ids are only logged below - verify against Quartz and FeedUpdateQuartzJob.
                SimpleTrigger trigger = TriggerBuilder.newTrigger().withIdentity("feedUpdateTrigger", "feedUpdateGroup").withSchedule(SimpleScheduleBuilder.simpleSchedule().withMisfireHandlingInstructionFireNow()).build();
                JobDataMap jobDataMap = new JobDataMap();
                jobDataMap.put("feedEventIdList", list);
                this.quartzScheduler.scheduleJob(JobBuilder.newJob(FeedUpdateQuartzJob.class).withIdentity("feedUpdateJob", "feedUpdateJobGroup").usingJobData(jobDataMap).build(), trigger);
            }
        } catch (SchedulerException e) {
            log.warn("unexpected exception trying to schedule Quartz job for feed preparation of the inserted events!", e);
        }
    }

    /**
     * Releases storage resources and then, whether or not that succeeded, stops the inserter,
     * flushes whatever is still buffered, stops the cleaner and shuts down the Quartz scheduler.
     * An exception thrown by the storage clean-up still propagates after the shutdown sequence.
     */
    @Override
    public void close() {
        try {
            this.feedEventStorage.cleanUp();
            this.subscriptionStorage.cleanUp();
        } finally {
            log.info("Shutting Down Executor Service for Feed Event Storage");
            shutdownAndAwait(this.executorService);
            log.info("Executor Service for Feed Event Storage Shut Down success!");

            if (!this.eventStorageBuffer.isEmpty()) {
                log.info("Flushing remaining events to database");
                flushFeedEventsToDB();
            }

            log.info("Shutting Down Feed Event Cleaner Executor Service");
            shutdownAndAwait(this.scheduledExecutorService);
            log.info("Feed Event Cleaner Executor Service shutdown success!");

            log.info("Shutting Down Quartz Scheduler");
            try {
                this.quartzScheduler.shutdown(true);
                // Only report success when shutdown actually returned normally.
                log.info("Quartz Scheduler shutdown success");
            } catch (SchedulerException e) {
                log.error("Unexpected error while shutting down Quartz Scheduler!", e);
            }
        }
    }

    /**
     * Requests an orderly shutdown, waits up to the configured timeout for running tasks, then
     * forces termination. An interrupt while waiting is recorded on the current thread.
     */
    private void shutdownAndAwait(ExecutorService executor) {
        executor.shutdown();
        try {
            executor.awaitTermination(this.executorShutdownTimeOut.getPeriod(), this.executorShutdownTimeOut.getUnit());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdownNow();
    }

    /** @return the fixed name of this processor, {@code "DBWriter"} */
    @Override
    public String getProcessorName() {
        return PROCESSOR_NAME;
    }

    /** @return the number of feed events currently waiting in the in-memory buffer */
    @Monitored(description = "Number of events in buffer", monitoringType = {MonitoringType.VALUE, MonitoringType.RATE})
    public long getEventsInBuffer() {
        return this.eventStorageBuffer.size();
    }
}
