package com.ning.metrics.collector.events.hadoop.writer;

import com.google.inject.Inject;
import com.ning.metrics.collector.binder.config.CollectorConfig;
import com.ning.metrics.serialization.event.Event;
import com.ning.metrics.serialization.thrift.hadoop.TBooleanWritable;
import com.ning.metrics.serialization.writer.EventWriter;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.perf4j.aop.Profiled;
import org.weakref.jmx.Managed;

/* loaded from: input_file:com/ning/metrics/collector/events/hadoop/writer/HadoopFileEventWriter.class */
public class HadoopFileEventWriter implements EventWriter {
    private final String sessionId;
    private final String baseDirectory;
    private final String tmpDirectory;
    private final FileSystemAccess fsAccess;
    private int maxOutputStreams;
    private final Collection<FileError> fileErrorList;
    private volatile DateTime lastFlushed;
    private final Collection<HadoopOutputChunk> closedChunkList;
    private final Map<String, HadoopOutputChunk> outputChunks;
    private static final Logger log = Logger.getLogger(HadoopFileEventWriter.class);
    private static final TBooleanWritable BOOL_WRITABLE = new TBooleanWritable(true);

    @Inject
    public HadoopFileEventWriter(FileSystemAccess fileSystemAccess, CollectorConfig collectorConfig) {
        this(collectorConfig.getEventOutputDirectory(), collectorConfig.getTemporaryEventOutputDirectory(), fileSystemAccess, collectorConfig.getMaxHadoopWriters(), collectorConfig.getLocalIp(), collectorConfig.getLocalPort());
    }

    public HadoopFileEventWriter(String str, String str2, FileSystemAccess fileSystemAccess, int i, String str3, int i2) {
        this.maxOutputStreams = 64;
        this.fileErrorList = new ArrayDeque();
        this.lastFlushed = new DateTime();
        this.closedChunkList = new ArrayList();
        this.outputChunks = new LinkedHashMap<String, HadoopOutputChunk>() { // from class: com.ning.metrics.collector.events.hadoop.writer.HadoopFileEventWriter.1
            @Override // java.util.LinkedHashMap
            protected boolean removeEldestEntry(Map.Entry<String, HadoopOutputChunk> entry) {
                if (size() <= HadoopFileEventWriter.this.maxOutputStreams) {
                    return false;
                }
                HadoopOutputChunk value = entry.getValue();
                try {
                    value.close();
                    HadoopFileEventWriter.this.closedChunkList.add(value);
                    return true;
                } catch (IOException e) {
                    HadoopFileEventWriter.this.fileErrorList.add(new FileError(entry.getKey(), e));
                    return true;
                }
            }
        };
        this.baseDirectory = str;
        Object[] objArr = new Object[3];
        objArr[0] = str2;
        objArr[1] = str.startsWith("/") ? "" : "/";
        objArr[2] = str;
        this.tmpDirectory = String.format("%s%s%s", objArr);
        this.fsAccess = fileSystemAccess;
        this.maxOutputStreams = i;
        this.sessionId = String.format("%s-%d", str3, Integer.valueOf(i2));
    }

    @Profiled(tag = "jmx", message = "Write event to HDFS")
    public synchronized void write(Event event) throws IOException {
        for (FileError fileError : this.fileErrorList) {
            log.error(String.format("Error flushing & closing file %s", fileError.getFilename()), fileError.getException());
        }
        this.fileErrorList.clear();
        writeEventToHDFS(event, event.getOutputDir(this.baseDirectory), event.getOutputDir(this.tmpDirectory));
    }

    private void writeEventToHDFS(Event event, String str, String str2) throws IOException {
        Object data = event.getData();
        HadoopOutputChunk chunk = getChunk(event, str, str2, data, data.getClass());
        if (chunk != null) {
            chunk.getWriter().append(BOOL_WRITABLE, data);
        }
    }

    private HadoopOutputChunk getChunk(Event event, String str, String str2, Object obj, Class<?> cls) throws IOException {
        if (obj == null) {
            log.warn("Deserialized event contains no data: " + event);
            return null;
        }
        HadoopOutputChunk hadoopOutputChunk = this.outputChunks.get(str);
        if (hadoopOutputChunk == null) {
            String replace = String.format("%s-%s", new DateTime(), this.sessionId).replace(":", ".");
            Path path = new Path(str, replace);
            Path path2 = new Path(str2, replace);
            int i = 0;
            while (this.fsAccess.get().exists(path2)) {
                path = new Path(str, String.format("%s-%d", replace, Integer.valueOf(i)));
                path2 = new Path(str2, String.format("%s-%d", replace, Integer.valueOf(i)));
                i++;
            }
            log.info(String.format("OutputPath (tmp): %s (%s)", path.toUri().getPath(), path2.toUri().getPath()));
            hadoopOutputChunk = new HadoopOutputChunk(path2, path, SequenceFile.createWriter(this.fsAccess.get(), this.fsAccess.get().getConf(), path2, TBooleanWritable.class, cls, SequenceFile.CompressionType.BLOCK));
            this.outputChunks.put(str, hadoopOutputChunk);
        }
        return hadoopOutputChunk;
    }

    private List<HadoopOutputChunk> getAllChunks() {
        ArrayList arrayList = new ArrayList(this.outputChunks.values());
        arrayList.addAll(this.closedChunkList);
        return arrayList;
    }

    private void clearAllChunks() {
        this.outputChunks.clear();
        this.closedChunkList.clear();
    }

    public synchronized void commit() throws IOException {
        forceCommit();
    }

    public synchronized void flush() throws IOException {
        commit();
    }

    public synchronized void forceCommit() throws IOException {
        List<HadoopOutputChunk> allChunks = getAllChunks();
        Iterator<HadoopOutputChunk> it = allChunks.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        try {
            Iterator<HadoopOutputChunk> it2 = allChunks.iterator();
            while (it2.hasNext()) {
                it2.next().commit(this.fsAccess.get());
            }
            clearAllChunks();
            this.lastFlushed = new DateTime();
        } catch (IOException e) {
            log.error(String.format("Unable to commit all chunks (serialization rename failed). Chunks: %s", allChunks), e);
            throw e;
        } catch (RuntimeException e2) {
            log.error(String.format("Unable to commit all chunks (serialization rename failed). Chunks: %s", allChunks), e2);
            throw e2;
        }
    }

    public synchronized void rollback() throws IOException {
        List<HadoopOutputChunk> allChunks = getAllChunks();
        Iterator<HadoopOutputChunk> it = allChunks.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        try {
            Iterator<HadoopOutputChunk> it2 = allChunks.iterator();
            while (it2.hasNext()) {
                it2.next().rollback(this.fsAccess.get());
            }
            clearAllChunks();
            this.lastFlushed = new DateTime();
        } catch (IOException e) {
            log.error(String.format("unable to rollback all chunks (serialization rename failed). Chunks: %s", allChunks), e);
            throw e;
        } catch (RuntimeException e2) {
            log.error(String.format("Unable to rollback all chunks (serialization rename failed). Chunks: %s", allChunks), e2);
            throw e2;
        }
    }

    @Managed(description = "seconds since last commit of events to hdfs")
    public long getSecondsSinceLastUpdate() {
        return (new DateTime().getMillis() - this.lastFlushed.getMillis()) / 1000;
    }

    public String toString() {
        return String.format("HDFS File Writer [%s] [%s]", this.fsAccess.get().getUri(), this.baseDirectory);
    }
}
