package com.ning.metrics.collector.processing;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.mogwee.executors.FailsafeScheduledExecutor;
import com.mogwee.executors.LoggingExecutor;
import com.mogwee.executors.NamedThreadFactory;
import com.ning.arecibo.jmx.Monitored;
import com.ning.metrics.collector.binder.config.CollectorConfig;
import com.ning.metrics.serialization.writer.CallbackHandler;
import com.ning.metrics.serialization.writer.DiskSpoolEventWriter;
import com.ning.metrics.serialization.writer.EventHandler;
import com.ning.metrics.serialization.writer.EventWriter;
import com.ning.metrics.serialization.writer.SyncType;
import com.ning.metrics.serialization.writer.ThresholdEventWriter;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.skife.config.ConfigurationObjectFactory;
import org.skife.config.TimeSpan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.weakref.jmx.Managed;

/* loaded from: input_file:com/ning/metrics/collector/processing/EventSpoolWriterFactory.class */
/**
 * {@link PersistentWriterFactory} that creates disk-spooling event writers and hands
 * the spooled files to a set of {@link EventSpoolProcessor}s.
 *
 * <p>Each spooled file is submitted to every applicable processor through a shared
 * executor. Processors registered for a specific event name take precedence over the
 * default processor set. Flushing can be toggled at runtime through the managed
 * {@link #enableFlush()} / {@link #disableFlush()} operations.
 */
public class EventSpoolWriterFactory implements PersistentWriterFactory {
    private static final Logger log = LoggerFactory.getLogger(EventSpoolWriterFactory.class);

    /** Initial cutoff handed to {@code LocalSpoolManager.findOldSpoolDirectories} (presumably milliseconds, i.e. two hours - TODO confirm). */
    private static final long DEFAULT_CUTOFF_TIME = 7200000L;
    /** Maximum number of sleep/re-check rounds performed by {@link #close()} while local files remain. */
    private static final int MAX_FLUSH_WAIT_ATTEMPTS = 10;
    /** Pause between two re-checks of the spool directory in {@link #close()}, in milliseconds. */
    private static final long FLUSH_WAIT_SLEEP_MILLIS = 5000L;

    private final CollectorConfig config;
    private final AtomicBoolean flushEnabled;
    private final Set<EventSpoolProcessor> defaultSpoolProcessorSet;
    private final Map<String, Set<EventSpoolProcessor>> perEventSpoolProcessors;
    // Written through the managed setter and read while processing files, hence volatile.
    private volatile long cutoffTime;
    private final TimeSpan executorShutdownTimeOut;
    private final ExecutorService executorService;
    private final ConfigurationObjectFactory configFactory;

    /**
     * Creates a factory without any per-event processors: every event is handled by
     * the default processor set.
     *
     * @param set                        default processors applied to every event
     * @param collectorConfig            global collector configuration
     * @param configurationObjectFactory factory used to build per-event configurations
     */
    public EventSpoolWriterFactory(Set<EventSpoolProcessor> set, CollectorConfig collectorConfig, ConfigurationObjectFactory configurationObjectFactory) {
        this(set, Maps.<String, Set<EventSpoolProcessor>>newHashMap(), collectorConfig, configurationObjectFactory);
    }

    /**
     * Creates a factory with both default and per-event processors.
     *
     * @param set                        default processors, used when no per-event set is registered
     * @param map                        processors keyed by event name
     * @param collectorConfig            global collector configuration
     * @param configurationObjectFactory factory used to build per-event configurations
     */
    @Inject
    public EventSpoolWriterFactory(Set<EventSpoolProcessor> set, Map<String, Set<EventSpoolProcessor>> map, CollectorConfig collectorConfig, ConfigurationObjectFactory configurationObjectFactory) {
        this.cutoffTime = DEFAULT_CUTOFF_TIME;
        this.defaultSpoolProcessorSet = set;
        this.perEventSpoolProcessors = map;
        this.config = collectorConfig;
        this.configFactory = configurationObjectFactory;
        this.flushEnabled = new AtomicBoolean(collectorConfig.isFlushEnabled());
        this.executorShutdownTimeOut = collectorConfig.getSpoolWriterExecutorShutdownTime();
        // SynchronousQueue + CallerRunsPolicy: a task is either handed straight to a pool
        // thread or, once the pool is saturated, executed by the submitting thread.
        this.executorService = new LoggingExecutor(
                0,
                collectorConfig.getFileProcessorThreadCount(),
                60L,
                TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new NamedThreadFactory("EventSpool-Processor-Threads"),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /** Converts the configured event flush time to whole seconds. */
    private long getFlushTimeForEventInSeconds(CollectorConfig collectorConfig) {
        return TimeUnit.SECONDS.convert(collectorConfig.getEventFlushTime().getPeriod(), collectorConfig.getEventFlushTime().getUnit());
    }

    /**
     * Returns the event-specific max uncommitted period, falling back to the general
     * max uncommitted period when the event-specific value is not set.
     */
    private int getMaxUncommittedTimeForEventInSeconds(CollectorConfig collectorConfig) {
        Integer maxUncommittedSeconds = collectorConfig.getEventMaxUncommittedPeriodInSeconds();
        if (maxUncommittedSeconds == null) {
            maxUncommittedSeconds = Integer.valueOf(collectorConfig.getMaxUncommittedPeriodInSeconds());
        }
        return maxUncommittedSeconds.intValue();
    }

    /** Returns the processors registered for {@code eventName}, or the default set if there are none. */
    private Set<EventSpoolProcessor> getSpoolProcessors(String eventName) {
        final Set<EventSpoolProcessor> perEvent = this.perEventSpoolProcessors.get(eventName);
        return perEvent != null ? perEvent : this.defaultSpoolProcessorSet;
    }

    /**
     * Builds a threshold writer wrapping a disk spool writer for the given event.
     *
     * <p>Whenever the disk spool writer hands a file to the flush handler, and flushing
     * is enabled, the file is run through the event's spool processors and the outcome
     * is reported back through the supplied {@link CallbackHandler}.
     *
     * @param writerStats       statistics holder notified after each successful flush
     * @param serializationType serialization used for the spooled events
     * @param eventName         event name; selects the per-event configuration and processors
     * @param writerLabel       value passed on to {@code LocalSpoolManager} and used as the
     *                          prefix of the spool writer's scheduler thread name
     * @return the writer to use for this event
     */
    @Override // com.ning.metrics.collector.processing.PersistentWriterFactory
    public EventWriter createPersistentWriter(final WriterStats writerStats, SerializationType serializationType, String eventName, String writerLabel) {
        final LocalSpoolManager localSpoolManager = new LocalSpoolManager(this.config, eventName, serializationType, writerLabel);
        final CollectorConfig eventConfig = (CollectorConfig) this.configFactory.buildWithReplacements(CollectorConfig.class, ImmutableMap.of("eventName", eventName));
        final Set<EventSpoolProcessor> spoolProcessors = getSpoolProcessors(eventName);

        final EventHandler flushHandler = new EventHandler() {
            // Number of files handled so far; fed into the output path of each flush.
            private int flushCount = 0;

            @Override
            public void handle(final File file, final CallbackHandler callbackHandler) {
                // NOTE(review): when flushing is disabled neither callback is invoked.
                if (!EventSpoolWriterFactory.this.flushEnabled.get()) {
                    return;
                }

                final String outputPath = localSpoolManager.toHadoopPath(this.flushCount);
                if (EventSpoolWriterFactory.this.executeSpoolProcessors(spoolProcessors, localSpoolManager, file, outputPath)) {
                    EventSpoolWriterFactory.log.debug(String.format("Calling Handler Success ... deleting the file %s!", file.getAbsolutePath()));
                    callbackHandler.onSuccess(file);
                    writerStats.registerHdfsFlush();
                } else {
                    callbackHandler.onError(new RuntimeException("Execution Failed!"), file);
                }
                this.flushCount++;
            }
        };

        final DiskSpoolEventWriter spoolWriter = new DiskSpoolEventWriter(
                flushHandler,
                localSpoolManager.getSpoolDirectoryPath(),
                this.config.isFlushEnabled(),
                getFlushTimeForEventInSeconds(eventConfig),
                new FailsafeScheduledExecutor(1, writerLabel + "-EventSpool-writer"),
                SyncType.valueOf(this.config.getSyncType()),
                this.config.getSyncBatchSize(),
                this.config.getCompressionCodec(),
                serializationType.getSerializer());

        return new ThresholdEventWriter(
                spoolWriter,
                this.config.getMaxUncommittedWriteCount(),
                getMaxUncommittedTimeForEventInSeconds(eventConfig));
    }

    /**
     * Runs the spool processors over every file found in the old spool directories
     * (as selected by {@link #getCutoffTime()}), deletes each file afterwards and
     * finally cleans up the directories themselves.
     *
     * @throws IOException declared by the {@link PersistentWriterFactory} contract
     */
    @Override // com.ning.metrics.collector.processing.PersistentWriterFactory
    @Managed(description = "Process all local files files")
    public void processLeftBelowFiles() throws IOException {
        log.info(String.format("Processing files left below %s", this.config.getSpoolDirectoryName()));
        final Collection<File> oldDirectories = LocalSpoolManager.findOldSpoolDirectories(this.config.getSpoolDirectoryName(), getCutoffTime());
        final Map<String, Integer> flushCountsPerEvent = new HashMap<>();

        for (final File directory : oldDirectories) {
            log.info(String.format("Processing the directory %s", directory.getAbsolutePath()));
            for (final File spoolFile : LocalSpoolManager.findFilesInSpoolDirectory(directory)) {
                log.info(String.format("Processing file %s in the directory the directory %s", spoolFile.getAbsolutePath(), directory.getAbsolutePath()));
                try {
                    final LocalSpoolManager spoolManager = new LocalSpoolManager(this.config, directory);
                    final int flushCount = incrementFlushCount(flushCountsPerEvent, spoolManager.getEventName());
                    final Set<EventSpoolProcessor> processors = getSpoolProcessors(spoolManager.getEventName());
                    final String outputPath = spoolManager.toHadoopPath(flushCount);

                    if (!executeSpoolProcessors(processors, spoolManager, spoolFile, outputPath)) {
                        log.warn(String.format("Exception cleaning up left below file: %s. We might have DUPS!", spoolFile.toString()));
                    }
                    // NOTE(review): the file is deleted even when processing failed above;
                    // confirm this is intended, as the data is not retried afterwards.
                    if (!spoolFile.delete()) {
                        log.warn(String.format("Exception cleaning up left below file: %s. We might have DUPS!", spoolFile.toString()));
                    }
                } catch (IllegalArgumentException e) {
                    log.warn(String.format("Skipping invalid local directory: %s", spoolFile.getAbsolutePath()));
                }
            }
        }
        LocalSpoolManager.cleanupOldSpoolDirectories(oldDirectories);
    }

    /**
     * Submits {@code file} to every processor in {@code set} on the shared executor and
     * waits for the results.
     *
     * <p>A processor that throws an {@link IOException} counts as failed. Waiting stops
     * at the first {@link InterruptedException} or {@link ExecutionException}; in the
     * interrupted case the thread's interrupt status is restored before returning.
     *
     * @param set               processors to run
     * @param localSpoolManager source of the event name and serialization type
     * @param file              spooled file to process
     * @param outputPath        output path handed to each processor
     * @return {@code true} only if every processor completed successfully
     */
    public boolean executeSpoolProcessors(Set<EventSpoolProcessor> set, final LocalSpoolManager localSpoolManager, final File file, final String outputPath) {
        final Collection<Future<Boolean>> pendingResults = new ArrayList<>();
        log.info("Starting Spool Process");

        for (final EventSpoolProcessor processor : set) {
            log.info("Submitting task for " + processor.getProcessorName());
            pendingResults.add(this.executorService.submit(new Callable<Boolean>() {
                @Override // java.util.concurrent.Callable
                public Boolean call() throws Exception {
                    try {
                        log.info(String.format("Processing Event %s via spooler %s at path %s ", localSpoolManager.getEventName(), processor.getProcessorName(), outputPath));
                        processor.processEventFile(localSpoolManager.getEventName(), localSpoolManager.getSerializationType(), file, outputPath);
                        log.info(String.format("Completed Processing Event  %s via spooler %s", localSpoolManager.getEventName(), processor.getProcessorName()));
                        return Boolean.TRUE;
                    } catch (IOException e) {
                        log.error("Exception occurred while processing event " + localSpoolManager.getEventName() + " for spooler " + processor.getProcessorName(), e);
                        return Boolean.FALSE;
                    }
                }
            }));
        }

        log.debug("Spool Process Completed, now waiting for the parallel task to complete.");
        boolean allSucceeded = true;
        try {
            for (final Future<Boolean> pending : pendingResults) {
                final Boolean result = pending.get();
                log.debug("Execution Result is " + result);
                if (!result.booleanValue()) {
                    allSucceeded = false;
                }
            }
        } catch (InterruptedException e) {
            log.error("InterruptedException while checking the result of the apoolers", e);
            // Keep the interruption visible to callers further up the stack.
            Thread.currentThread().interrupt();
            allSucceeded = false;
        } catch (ExecutionException e) {
            log.error("ExecutionException while checking the result of the apoolers", e);
            allSucceeded = false;
        }
        log.debug("Parallel Spool Execution Completed with result as " + allSucceeded);
        return allSucceeded;
    }

    /**
     * Processes left-over files, waits a bounded time for the spool directory to drain,
     * closes the default processors and shuts the executor down.
     *
     * <p>The executor is shut down even if an earlier step throws.
     */
    @Override // com.ning.metrics.collector.processing.PersistentWriterFactory
    public void close() {
        try {
            log.info("Processing old files and quarantine directories");
            try {
                processLeftBelowFiles();
            } catch (IOException e) {
                log.warn("Got IOException trying to process left below files: " + e.getLocalizedMessage(), e);
            }

            waitForLocalFilesToFlush();

            // NOTE(review): only the default processors are closed here; processors that
            // appear solely in the per-event map are not - confirm they are closed elsewhere.
            for (final EventSpoolProcessor processor : this.defaultSpoolProcessorSet) {
                processor.close();
            }
        } finally {
            shutdownExecutor();
        }
    }

    /**
     * Polls the spool directory until it is empty, the attempt budget is exhausted or
     * the thread is interrupted, then logs the outcome.
     */
    private void waitForLocalFilesToFlush() {
        final File spoolDirectory = new File(this.config.getSpoolDirectoryName());
        int remaining = LocalSpoolManager.findFilesInSpoolDirectory(spoolDirectory).size();

        for (int attempt = 0; remaining > 0 && attempt < MAX_FLUSH_WAIT_ATTEMPTS; attempt++) {
            log.info(String.format("%d more files are left to be flushed, sleeping to give them a chance in [%s]", Integer.valueOf(remaining), spoolDirectory));
            try {
                Thread.sleep(FLUSH_WAIT_SLEEP_MILLIS);
            } catch (InterruptedException e) {
                log.warn(String.format("Interrupted while waiting for files to be flushed to HDFS. This means that [%s] still contains data!", this.config.getSpoolDirectoryName()));
                // Restore the flag and stop waiting: further sleeps would fail immediately.
                Thread.currentThread().interrupt();
                break;
            }
            remaining = LocalSpoolManager.findFilesInSpoolDirectory(spoolDirectory).size();
        }

        if (remaining > 0) {
            log.warn(String.format("Giving up while waiting for files to be flushed to HDFS. Files not flushed: %s", LocalSpoolManager.findFilesInSpoolDirectory(spoolDirectory)));
        } else {
            log.info("All local files have been flushed");
        }
    }

    /**
     * Requests an orderly executor shutdown, waits up to the configured timeout and
     * then forces termination of whatever is still running.
     */
    private void shutdownExecutor() {
        log.info("Shutting Down Executor Service");
        this.executorService.shutdown();
        try {
            this.executorService.awaitTermination(this.executorShutdownTimeOut.getPeriod(), this.executorShutdownTimeOut.getUnit());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        this.executorService.shutdownNow();
    }

    /**
     * Increments the flush counter kept for {@code eventName}, starting at 1 for an
     * event seen for the first time.
     *
     * @return the counter value after the increment
     */
    private int incrementFlushCount(Map<String, Integer> flushCounts, String eventName) {
        final Integer current = flushCounts.get(eventName);
        final int next = (current == null) ? 1 : current.intValue() + 1;
        flushCounts.put(eventName, Integer.valueOf(next));
        return next;
    }

    /** Returns the cutoff used to select old spool directories in {@link #processLeftBelowFiles()}. */
    @Monitored(description = "Cutoff time for files to be sent to Spool Processors")
    public long getCutoffTime() {
        return this.cutoffTime;
    }

    /** Replaces the cutoff used to select old spool directories. */
    @Managed(description = "Set the cutoff time")
    public void setCutoffTime(long j) {
        this.cutoffTime = j;
    }

    /** Returns the live flag that controls whether handed-over files are processed. */
    @Managed(description = "Whether files should be flushed")
    public AtomicBoolean getFlushEnabled() {
        return this.flushEnabled;
    }

    /** Turns processing of handed-over files on. */
    @Managed(description = "Enable flush")
    public void enableFlush() {
        this.flushEnabled.set(true);
    }

    /** Turns processing of handed-over files off. */
    @Managed(description = "Disable Flush")
    public void disableFlush() {
        this.flushEnabled.set(false);
    }

    /** Returns the number of files currently found in the configured spool directory. */
    @Monitored(description = "Number of local files not yet pushed to Spool Processors")
    public int nbLocalFiles() {
        return LocalSpoolManager.findFilesInSpoolDirectory(new File(this.config.getSpoolDirectoryName())).size();
    }
}
