package org.mitre.caasd.commons.out;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Multiset;
import com.google.common.collect.TreeMultiset;
import com.google.common.io.Files;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.mitre.caasd.commons.Time;
import org.mitre.caasd.commons.fileutil.FileUtils;
import org.mitre.caasd.commons.util.DemotedException;

/* loaded from: input_file:org/mitre/caasd/commons/out/GzFileSink.class */
/**
 * A {@link Consumer} that buffers incoming records and appends each one, as a line of text, to a
 * ".gz" file chosen by a caller-supplied naming function.
 *
 * <p>Records handed to {@link #accept(Object)} are placed on a bounded queue (capacity 5000). A
 * single-threaded scheduler drains that queue once per second and writes every record to the
 * writer registered for its target name, opening a new writer when none is open. While a file is
 * being written its name carries the {@link #IN_PROGRESS_PREFIX}; the prefix is removed (via a
 * file move) when the writer is closed. A second scheduled task closes writers that have not been
 * written to for longer than the configured expiration time.
 *
 * <p>Thread-safety: all access to the open-writer bookkeeping happens while holding this object's
 * monitor, because it is touched both by the scheduler thread and by whichever threads call
 * {@link #flushAndCloseCurrentFiles()}, {@link #close()} or {@link #numOpenWriters()}.
 *
 * <p>Note: the periodic tasks are scheduled with {@code scheduleWithFixedDelay}; if one of them
 * throws, the scheduler stops running that task.
 *
 * @param <T> the type of record accepted by this sink
 */
public class GzFileSink<T> implements Consumer<T>, Closeable {
    /** File-name prefix carried by every output file that still has an open writer. */
    static final String IN_PROGRESS_PREFIX = "UNDER_CONSTRUCTION_";

    private final String outputDir;
    private final Function<T, String> fileNamer;
    private final Function<T, String> toString;
    private final Duration expirationTime;

    /** Written by the closing thread and read by producer threads, hence volatile. */
    private volatile boolean isClosed = false;

    /** Upper bound on the number of simultaneously open writers. */
    private final int maxOpenWriters = 100;

    /** Open writers keyed by target name. Guarded by {@code this}. */
    private final Map<String, TrackedPrintWriter> openWriters = new TreeMap<>();

    /** How many files have been started for each target name. Guarded by {@code this}. */
    private final Multiset<String> targetCounts = TreeMultiset.create();

    private final BlockingQueue<T> queue = new ArrayBlockingQueue<>(5000);
    private final ScheduledExecutorService executor;

    /** Kept so the hook can be unregistered when this sink is closed normally. */
    private final Thread shutdownHook;

    /**
     * JVM shutdown hook that stops the scheduler, writes whatever is still queued and closes the
     * sink so in-progress files are finalized.
     */
    public static class DrainFlushAndShutdown extends Thread {
        final GzFileSink<?> archiver;
        final ExecutorService exec;

        /**
         * @param gzFileSink the sink to drain and close at shutdown (must not be null)
         * @param executorService the executor to stop at shutdown (must not be null)
         */
        public DrainFlushAndShutdown(GzFileSink<?> gzFileSink, ExecutorService executorService) {
            this.archiver = Objects.requireNonNull(gzFileSink);
            this.exec = Objects.requireNonNull(executorService);
        }

        @Override
        public void run() {
            this.exec.shutdownNow();
            this.archiver.drainQueueAndWriteRecords();
            try {
                this.archiver.close();
            } catch (IOException e) {
                // The JVM is going down, so there is nothing to recover; report instead of
                // silently dropping the failure.
                System.err.println("GzFileSink shutdown hook could not close the sink cleanly: " + e);
            }
        }
    }

    /**
     * A writer for one output file that remembers when it was last written to, so idle writers
     * can be detected and closed.
     */
    public static class TrackedPrintWriter implements AutoCloseable {
        private final File targetFile;
        private final PrintWriter writer;
        private Instant timeOfLastWrite = Instant.now();

        /**
         * @param file the file to write to (must not be null)
         * @throws IOException if the underlying writer cannot be created
         */
        TrackedPrintWriter(File file) throws IOException {
            this.targetFile = Objects.requireNonNull(file);
            this.writer = FileUtils.buildGzWriter(file);
        }

        @Override
        public void close() throws IOException {
            this.writer.close();
        }

        /** Writes the text and records the current time as the time of last write. */
        void write(String str) {
            this.timeOfLastWrite = Instant.now();
            this.writer.write(str);
        }

        void flush() {
            this.writer.flush();
        }

        /** @return the duration between now and the most recent write (or construction). */
        Duration timeSinceLastWrite() {
            return Time.durationBtw(Instant.now(), this.timeOfLastWrite);
        }

        /** @return true when the time since the last write is greater than {@code duration}. */
        boolean isStale(Duration duration) {
            return Time.theDuration(timeSinceLastWrite()).isGreaterThan(duration);
        }

        File targetFile() {
            return this.targetFile;
        }
    }

    /**
     * Creates a sink and starts its background tasks.
     *
     * @param outputDir directory in which output files are created
     * @param toString converts a record to the line of text written for it
     * @param fileNamer maps a record to the target name of the file it belongs in
     * @param expirationTime how long a writer may go without a write before it is closed
     * @throws NullPointerException if any argument is null
     */
    public GzFileSink(String outputDir, Function<T, String> toString, Function<T, String> fileNamer, Duration expirationTime) {
        this.outputDir = Objects.requireNonNull(outputDir);
        this.toString = Objects.requireNonNull(toString);
        this.fileNamer = Objects.requireNonNull(fileNamer);
        this.expirationTime = Objects.requireNonNull(expirationTime);

        // Create the executor and register the hook only after the arguments are validated, so a
        // failed construction does not leave a shutdown hook pointing at a half-built sink.
        this.executor = Executors.newScheduledThreadPool(1);
        this.shutdownHook = new DrainFlushAndShutdown(this, this.executor);
        Runtime.getRuntime().addShutdownHook(this.shutdownHook);

        scheduleStreamCloser();
        scheduleDataWriting();
    }

    private void scheduleStreamCloser() {
        this.executor.scheduleWithFixedDelay(this::closeStaleStreamTargets, 0L, 1L, TimeUnit.SECONDS);
    }

    private void scheduleDataWriting() {
        this.executor.scheduleWithFixedDelay(this::drainQueueAndWriteRecords, 0L, 1L, TimeUnit.SECONDS);
    }

    /** Closes (and finalizes the file of) every writer that has been idle for too long. */
    private synchronized void closeStaleStreamTargets() {
        for (String target : staleOutputTargets()) {
            closeAndRemoveWriter(target);
        }
    }

    /** @return the target names whose writers are stale. Caller must hold this object's monitor. */
    private List<String> staleOutputTargets() {
        return this.openWriters.entrySet().stream()
            .filter(entry -> entry.getValue().isStale(this.expirationTime))
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }

    /**
     * Queues a record for writing, blocking while the queue is full.
     *
     * @throws IllegalStateException if this sink has been closed
     */
    @Override
    public void accept(T t) {
        Preconditions.checkState(!this.isClosed, "Cannot add data to a closed GzFileSink");
        putInBuffer(t);
    }

    /** @return the number of writers that are currently open. */
    public synchronized int numOpenWriters() {
        return this.openWriters.size();
    }

    private void putInBuffer(T t) {
        try {
            this.queue.put(t);
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw DemotedException.demote("Thread interrupted while waiting to add item to maxxed out writeQueue", e);
        }
    }

    /** Removes everything currently in the queue and writes each record to its target file. */
    public synchronized void drainQueueAndWriteRecords() {
        List<T> drained = new ArrayList<>();
        this.queue.drainTo(drained);
        for (T item : drained) {
            sendToGzFile(item);
        }
    }

    /** Writes one record (plus a newline) to the writer for its target, then flushes. */
    private void sendToGzFile(T t) {
        String target = this.fileNamer.apply(t);
        TrackedPrintWriter writer = this.openWriters.get(target);
        if (writer == null) {
            writer = newGzStreamFor(target);
        }
        writer.write(this.toString.apply(t) + "\n");
        writer.flush();
    }

    /**
     * Opens and registers a writer for the given target name. The first file for a target is
     * named {@code UNDER_CONSTRUCTION_<target>.gz}; later files for the same target get a
     * {@code _<count>} suffix before the extension.
     *
     * @throws IllegalStateException if the maximum number of writers is already open
     */
    private TrackedPrintWriter newGzStreamFor(String str) {
        // Strictly less-than: with maxOpenWriters writers already open, opening another one would
        // exceed the limit.
        Preconditions.checkState(this.openWriters.size() < this.maxOpenWriters, "Cannot open new gz file because " + this.openWriters.size() + " streams are already open.  Could calling flushAndCloseCurrentFiles() help?");
        FileUtils.makeDirIfMissing(this.outputDir);
        try {
            int priorFiles = this.targetCounts.count(str);
            String fileName = priorFiles > 0
                ? IN_PROGRESS_PREFIX + str + "_" + priorFiles + ".gz"
                : IN_PROGRESS_PREFIX + str + ".gz";
            TrackedPrintWriter created = new TrackedPrintWriter(new File(this.outputDir + File.separator + fileName));
            this.openWriters.put(str, created);
            this.targetCounts.add(str);
            return created;
        } catch (IOException e) {
            throw DemotedException.demote(e);
        }
    }

    /**
     * Marks the sink closed, stops the scheduler, writes any queued records and closes every open
     * writer. Safe to call more than once.
     */
    @Override
    public void close() throws IOException {
        this.isClosed = true;
        unregisterShutdownHook();
        this.executor.shutdownNow();
        synchronized (this) {
            drainQueueAndWriteRecords();
            closeAllWriters();
        }
    }

    /** Removes the shutdown hook so a closed sink is not kept reachable until JVM exit. */
    private void unregisterShutdownHook() {
        try {
            Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
        } catch (IllegalStateException ignored) {
            // The JVM is already shutting down (for example, close() was called by the hook
            // itself); hooks can no longer be removed and there is nothing left to do.
        }
    }

    private synchronized void closeAllWriters() {
        // Iterate over a copy because closeAndRemoveWriter mutates the map.
        for (String target : new ArrayList<>(this.openWriters.keySet())) {
            closeAndRemoveWriter(target);
        }
    }

    /** Writes any queued records, then closes every open writer and finalizes its file. */
    public synchronized void flushAndCloseCurrentFiles() {
        drainQueueAndWriteRecords();
        closeAllWriters();
    }

    /**
     * Closes the writer for the given target and moves its file to the same name without the
     * in-progress prefix. Does nothing if no writer is registered for the target.
     */
    private synchronized void closeAndRemoveWriter(String str) {
        TrackedPrintWriter removed = this.openWriters.remove(str);
        if (removed == null) {
            return;
        }
        try {
            removed.close();
            File inProgress = removed.targetFile();
            String finalName = inProgress.getName().substring(IN_PROGRESS_PREFIX.length());
            Files.move(inProgress, new File(this.outputDir + File.separator + finalName));
        } catch (IOException e) {
            throw DemotedException.demote(e);
        }
    }
}
