package com.ning.metrics.collector.hadoop.processing;

import com.google.inject.Inject;
import com.mogwee.executors.FailsafeScheduledExecutor;
import com.ning.metrics.collector.binder.config.CollectorConfig;
import com.ning.metrics.serialization.hadoop.FileSystemAccess;
import com.ning.metrics.serialization.writer.CallbackHandler;
import com.ning.metrics.serialization.writer.DiskSpoolEventWriter;
import com.ning.metrics.serialization.writer.EventHandler;
import com.ning.metrics.serialization.writer.EventWriter;
import com.ning.metrics.serialization.writer.SyncType;
import com.ning.metrics.serialization.writer.ThresholdEventWriter;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.weakref.jmx.Managed;

/* loaded from: input_file:com/ning/metrics/collector/hadoop/processing/HadoopWriterFactory.class */
public class HadoopWriterFactory implements PersistentWriterFactory {
    private static final Logger log = LoggerFactory.getLogger(HadoopWriterFactory.class);
    private final CollectorConfig config;
    private final FileSystemAccess hdfsAccess;
    private final AtomicBoolean flushEnabled = new AtomicBoolean(true);
    private long cutoffTime = 7200000;

    @Inject
    public HadoopWriterFactory(FileSystemAccess fileSystemAccess, CollectorConfig collectorConfig) {
        this.hdfsAccess = fileSystemAccess;
        this.config = collectorConfig;
    }

    @Override // com.ning.metrics.collector.hadoop.processing.PersistentWriterFactory
    public EventWriter createPersistentWriter(final WriterStats writerStats, SerializationType serializationType, String str, String str2) {
        final LocalSpoolManager localSpoolManager = new LocalSpoolManager(this.config, str, serializationType, str2);
        return new ThresholdEventWriter(new DiskSpoolEventWriter(new EventHandler() { // from class: com.ning.metrics.collector.hadoop.processing.HadoopWriterFactory.1
            private int flushCount = 0;

            public void handle(File file, CallbackHandler callbackHandler) {
                if (HadoopWriterFactory.this.flushEnabled.get()) {
                    try {
                        HadoopWriterFactory.this.pushFileToHadoop(file, localSpoolManager.toHadoopPath(this.flushCount));
                        callbackHandler.onSuccess(file);
                        writerStats.registerHdfsFlush();
                        this.flushCount++;
                    } catch (IOException e) {
                        callbackHandler.onError(e, file);
                        this.flushCount++;
                    }
                }
            }
        }, localSpoolManager.getSpoolDirectoryPath(), this.config.isFlushEnabled(), this.config.getFlushIntervalInSeconds(), new FailsafeScheduledExecutor(1, str2 + "-HDFS-writer"), SyncType.valueOf(this.config.getSyncType()), this.config.getSyncBatchSize(), this.config.getCompressionCodec(), serializationType.getSerializer()), this.config.getMaxUncommittedWriteCount(), this.config.getMaxUncommittedPeriodInSeconds());
    }

    @Override // com.ning.metrics.collector.hadoop.processing.PersistentWriterFactory
    @Managed(description = "Process all local files files")
    public void processLeftBelowFiles() throws IOException {
        log.info("Processing files left below");
        Collection<File> findOldSpoolDirectories = LocalSpoolManager.findOldSpoolDirectories(this.config.getSpoolDirectoryName(), getCutoffTime());
        HashMap<String, Integer> hashMap = new HashMap<>();
        for (File file : findOldSpoolDirectories) {
            for (File file2 : LocalSpoolManager.findFilesInSpoolDirectory(file)) {
                try {
                    LocalSpoolManager localSpoolManager = new LocalSpoolManager(this.config, file);
                    incrementFlushCount(hashMap, localSpoolManager.getEventName());
                    pushFileToHadoop(file2, localSpoolManager.toHadoopPath(hashMap.get(localSpoolManager.getEventName()).intValue()));
                    if (!file2.delete()) {
                        log.warn(String.format("Exception cleaning up left below file: %s. We might have DUPS in HDFS!", file2.toString()));
                    }
                } catch (IllegalArgumentException e) {
                    log.warn(String.format("Skipping invalid local directory: %s", file2.getAbsolutePath()));
                }
            }
        }
        LocalSpoolManager.cleanupOldSpoolDirectories(findOldSpoolDirectories);
    }

    @Override // com.ning.metrics.collector.hadoop.processing.PersistentWriterFactory
    public void close() {
        log.info("Processing old files and quarantine directories");
        try {
            processLeftBelowFiles();
        } catch (IOException e) {
            log.warn("Got IOException trying to process left below files: " + e.getLocalizedMessage());
        }
        File file = new File(this.config.getSpoolDirectoryName());
        int size = LocalSpoolManager.findFilesInSpoolDirectory(file).size();
        for (int i = 0; size > 0 && i < 10; i++) {
            log.info(String.format("%d more files are left to be flushed, sleeping to give them a chance", Integer.valueOf(size)));
            try {
                Thread.sleep(5000L);
                size = LocalSpoolManager.findFilesInSpoolDirectory(file).size();
            } catch (InterruptedException e2) {
                log.warn(String.format("Interrupted while waiting for files to be flushed to HDFS. This means that [%s] still contains data!", this.config.getSpoolDirectoryName()));
            }
        }
        if (size > 0) {
            log.warn(String.format("Giving up while waiting for files to be flushed to HDFS. Files not flushed: %s", LocalSpoolManager.findFilesInSpoolDirectory(file)));
        } else {
            log.info("All local files have been flushed");
        }
        this.hdfsAccess.close();
    }

    protected void pushFileToHadoop(File file, String str) throws IOException {
        log.info(String.format("Flushing events to HDFS: [%s] -> [%s]", file.getAbsolutePath(), str));
        this.hdfsAccess.get().copyFromLocalFile(new Path(file.getAbsolutePath()), new Path(str));
    }

    private void incrementFlushCount(HashMap<String, Integer> hashMap, String str) {
        if (hashMap.get(str) == null) {
            hashMap.put(str, 0);
        }
        hashMap.put(str, Integer.valueOf(hashMap.get(str).intValue() + 1));
    }

    @Managed(description = "Cutoff time for files to be sent to HDFS")
    public long getCutoffTime() {
        return this.cutoffTime;
    }

    @Managed(description = "Set the cutoff time")
    public void setCutoffTime(long j) {
        this.cutoffTime = j;
    }

    @Managed(description = "Whether files should be flushed to HDFS")
    public AtomicBoolean getFlushEnabled() {
        return this.flushEnabled;
    }

    @Managed(description = "Enable flush to HDFS")
    public void enableFlush() {
        this.flushEnabled.set(true);
    }

    @Managed(description = "Disable flush to HDFS")
    public void disableFlush() {
        this.flushEnabled.set(false);
    }

    @Managed(description = "Number of local files not yet pushed to HDFS")
    public int nbLocalFiles() {
        return LocalSpoolManager.findFilesInSpoolDirectory(new File(this.config.getSpoolDirectoryName())).size();
    }
}
