package gate.cloud.io.file;

import gate.Gate;
import gate.cloud.batch.DocumentID;
import gate.cloud.io.IOConstants;
import gate.util.GateException;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.AutoCloseable;
import java.lang.ProcessBuilder;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.ToIntFunction;
import java.util.zip.GZIPOutputStream;
import org.apache.log4j.Logger;

/* loaded from: input_file:gate/cloud/io/file/StreamingFileOutputHelper.class */
/**
 * Helper that streams items handed to {@link #sendItem(Object)} into a
 * sequence of "chunk" files on a dedicated writer thread.
 *
 * <p>Items are queued on a bounded {@link BlockingQueue}; a single
 * {@link StreamOutputter} thread takes them off the queue and writes them to
 * the current chunk file. Roughly every mebibyte of written items (as
 * reported by the {@code itemSize} function) the on-disk length of the
 * current file is compared against {@link #chunkSize}; when it is exceeded
 * the chunk is closed and the next item opens a new one.
 *
 * <p>Output may be written uncompressed, through a {@link GZIPOutputStream},
 * or piped through an external command whose standard output is redirected
 * to the chunk file.
 *
 * @param <TItem> type of the items being written
 * @param <TWriter> type of the writer object wrapped around each chunk's
 *          output stream
 */
public class StreamingFileOutputHelper<TItem, TWriter extends AutoCloseable> {
    private static final Logger logger = Logger.getLogger(StreamingFileOutputHelper.class);

    /** Chunk size (in bytes) used when none, or an unparseable one, is configured. */
    private static final long DEFAULT_CHUNK_SIZE = 99000000L;

    /** Number of item bytes written between two checks of the chunk file length. */
    private static final int SIZE_CHECK_INTERVAL_BYTES = 1048576;

    /** {@link String#format} pattern that receives the chunk number. */
    protected String pattern;

    /** Compression mode: one of the {@link IOConstants} values, or an external command line. */
    protected String compression;

    /** Parent directory of the batch file, if a batch file location was configured. */
    protected File batchDir;

    /** Strategy that maps a formatted chunk name to a file. */
    protected NamingStrategy namingStrategy;

    /** Sentinel item (compared by identity) that tells the writer thread to stop. */
    protected TItem endOfData;

    /** Factory that wraps a chunk's output stream in a writer. */
    protected WriterCreator<TWriter> openWriter;

    /** Operation that writes one item to a writer. */
    protected WriteOperation<TWriter, TItem> writeItem;

    /** Function giving the size of an item, used to decide when to check the file length. */
    protected ToIntFunction<TItem> itemSize;

    /** File length above which the current chunk is closed; set by {@link #config(Map)}. */
    protected long chunkSize = -1;

    /** Bounded hand-off queue between producers and the writer thread. */
    protected BlockingQueue<TItem> results = new ArrayBlockingQueue<>(100);

    /** Executor whose tasks wait for finished external compression processes. */
    protected ExecutorService processWaiter = Executors.newCachedThreadPool();

    /**
     * Consumer side of the queue: takes items until the end-of-data sentinel
     * is seen (or the thread is interrupted) and writes them to chunk files.
     */
    protected class StreamOutputter implements Runnable {
        private File currentFile;
        private int currentChunk = -1;
        private TWriter currentOutput;
        private Process currentProcess;

        protected StreamOutputter() {
        }

        /**
         * Drains the queue until the end-of-data sentinel arrives. Whatever
         * the reason for leaving the loop, the open chunk is closed and the
         * process-waiter executor is shut down.
         */
        @Override
        public void run() {
            try {
                int bytesSinceLastCheck = 0;
                while (true) {
                    TItem item = results.take();
                    // identity comparison on purpose: endOfData is a sentinel object
                    if (item == endOfData) {
                        return;
                    }
                    if (currentOutput == null) {
                        try {
                            openNextChunk();
                        } catch (Exception e) {
                            logger.error("Failed to open output file " + currentFile, e);
                            // There is no writer to hand the item to; drop it
                            // explicitly instead of calling writeItem with null.
                            logger.warn("Dropping item because no output file could be opened");
                            continue;
                        }
                    }
                    try {
                        writeItem.writeItem(currentOutput, item);
                    } catch (Exception e) {
                        logger.warn("Error writing to file " + currentFile, e);
                    }
                    bytesSinceLastCheck += itemSize.applyAsInt(item);
                    if (bytesSinceLastCheck > SIZE_CHECK_INTERVAL_BYTES) {
                        // Only stat the file once per interval rather than per item.
                        if (currentFile.length() > chunkSize) {
                            closeChunk();
                        }
                        bytesSinceLastCheck = 0;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                closeChunk();
                processWaiter.shutdown();
            }
        }

        /**
         * Closes the current writer (if any), hands any external compression
         * process to the waiter executor, and forgets the current file.
         */
        private void closeChunk() {
            if (currentOutput != null) {
                try {
                    currentOutput.close();
                } catch (Exception e) {
                    logger.warn("Error closing file " + currentFile.getAbsolutePath(), e);
                }
                currentOutput = null;
            }
            reapCurrentProcess();
            currentFile = null;
        }

        /**
         * If an external compression process is attached to the current
         * chunk, submits a task that waits for it to exit and logs a
         * non-zero exit status. Does nothing when there is no process.
         */
        private void reapCurrentProcess() {
            if (currentProcess == null) {
                return;
            }
            final Process process = currentProcess;
            final File file = currentFile;
            currentProcess = null;
            processWaiter.execute(() -> {
                try {
                    int exitCode = process.waitFor();
                    if (exitCode != 0) {
                        logger.warn("Compression process for " + file + " exited with status " + exitCode);
                    }
                } catch (InterruptedException e) {
                    logger.warn("Interrupted while waiting for process", e);
                    Thread.currentThread().interrupt();
                }
            });
        }

        /**
         * Picks the next chunk number whose file does not exist yet, opens an
         * output stream for it and wraps that stream with {@code openWriter}.
         * If wrapping fails, the stream is closed (and any compression
         * process reaped) before the exception is rethrown.
         *
         * @throws Exception if the file, process or writer cannot be created
         */
        private void openNextChunk() throws Exception {
            do {
                currentChunk++;
                currentFile = namingStrategy.toFile(new DocumentID(String.format(pattern, currentChunk)));
            } while (currentFile.exists());

            OutputStream rawStream = openRawStream();
            try {
                currentOutput = openWriter.create(rawStream);
            } catch (Exception e) {
                closeAfterFailure(rawStream, e);
                reapCurrentProcess();
                throw e;
            }
        }

        /**
         * Opens the byte stream for {@code currentFile} according to the
         * configured compression mode. For an external command this starts
         * the process and records it in {@code currentProcess}.
         */
        private OutputStream openRawStream() throws IOException {
            if (IOConstants.VALUE_COMPRESSION_GZIP.equals(compression)) {
                FileOutputStream fileStream = new FileOutputStream(currentFile);
                try {
                    return new GZIPOutputStream(fileStream);
                } catch (IOException e) {
                    closeAfterFailure(fileStream, e);
                    throw e;
                }
            }
            if (compression == null || IOConstants.VALUE_COMPRESSION_NONE.equals(compression)) {
                return new FileOutputStream(currentFile);
            }
            // Anything else is treated as a whitespace-separated command line.
            ProcessBuilder builder = new ProcessBuilder(compression.trim().split("\\s+"));
            // Run the command in the batch directory; a null batchDir means
            // the working directory of this JVM (the ProcessBuilder default).
            builder.directory(batchDir);
            builder.redirectInput(ProcessBuilder.Redirect.PIPE);
            builder.redirectOutput(currentFile);
            builder.redirectError(ProcessBuilder.Redirect.INHERIT);
            currentProcess = builder.start();
            return currentProcess.getOutputStream();
        }

        /** Closes {@code stream}, attaching any close failure to {@code cause} as suppressed. */
        private void closeAfterFailure(OutputStream stream, Exception cause) {
            try {
                stream.close();
            } catch (IOException closeError) {
                cause.addSuppressed(closeError);
            }
        }
    }

    /**
     * Writes a single item to a writer.
     *
     * @param <W> writer type
     * @param <I> item type
     */
    @FunctionalInterface
    public interface WriteOperation<W, I> {
        void writeItem(W w, I i) throws Exception;
    }

    /**
     * Creates a writer on top of a raw output stream.
     *
     * @param <W> writer type
     */
    @FunctionalInterface
    public interface WriterCreator<W> {
        W create(OutputStream outputStream) throws Exception;
    }

    /**
     * @param titem sentinel object marking the end of the data; it is
     *          recognised by identity, so it must never be sent as a real item
     * @param writerCreator wraps each chunk's output stream in a writer
     * @param writeOperation writes one item to a writer
     * @param toIntFunction reports the size of an item
     */
    public StreamingFileOutputHelper(TItem titem, WriterCreator<TWriter> writerCreator, WriteOperation<TWriter, TItem> writeOperation, ToIntFunction<TItem> toIntFunction) {
        this.endOfData = titem;
        this.openWriter = writerCreator;
        this.writeItem = writeOperation;
        this.itemSize = toIntFunction;
    }

    /**
     * Reads the batch directory, naming strategy, file name pattern, chunk
     * size and compression mode from the given configuration map, applying
     * defaults for any that are missing.
     *
     * @param map configuration parameters keyed by the {@link IOConstants} names
     * @throws GateException if the naming strategy cannot be instantiated or configured
     * @throws IOException declared for callers; not thrown directly by this method
     */
    public void config(Map<String, String> map) throws IOException, GateException {
        String batchFileLocation = map.get(IOConstants.PARAM_BATCH_FILE_LOCATION);
        if (batchFileLocation != null) {
            this.batchDir = new File(batchFileLocation).getParentFile();
        }

        String strategyClassName = map.get(IOConstants.PARAM_NAMING_STRATEGY);
        if (strategyClassName == null || strategyClassName.isEmpty()) {
            strategyClassName = SimpleNamingStrategy.class.getName();
        }
        try {
            this.namingStrategy = Class.forName(strategyClassName, true, Gate.getClassLoader())
                    .asSubclass(NamingStrategy.class)
                    .getDeclaredConstructor()
                    .newInstance();
            this.namingStrategy.config(true, map);
        } catch (Exception e) {
            throw new GateException("Could not instantiate specified naming strategy", e);
        }

        String configuredPattern = map.get(IOConstants.PARAM_PATTERN);
        this.pattern = configuredPattern != null ? configuredPattern : "part-%03d";

        this.chunkSize = parseChunkSize(map.get(IOConstants.PARAM_CHUNK_SIZE));

        String configuredCompression = map.get(IOConstants.PARAM_COMPRESSION);
        this.compression = configuredCompression != null ? configuredCompression : IOConstants.VALUE_COMPRESSION_NONE;
    }

    /** Parses the configured chunk size, falling back to the default when absent or invalid. */
    private static long parseChunkSize(String value) {
        if (value != null) {
            try {
                return Long.parseLong(value);
            } catch (NumberFormatException e) {
                logger.warn("Invalid chunk size \"" + value + "\"");
            }
        }
        logger.info("Using default chunk size");
        return DEFAULT_CHUNK_SIZE;
    }

    /**
     * Starts the writer thread. {@link #config(Map)} must have been called first.
     */
    public void init() throws IOException, GateException {
        new Thread(new StreamOutputter()).start();
    }

    /**
     * Queues an item for writing, blocking while the queue is full. If the
     * calling thread is interrupted while waiting, the item is not queued
     * and the thread's interrupt flag is restored.
     *
     * @param titem the item to write
     */
    public void sendItem(TItem titem) {
        try {
            this.results.put(titem);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Queues the end-of-data sentinel so the writer thread finishes once it
     * has written everything queued before it. This method does not wait
     * for the writer thread to terminate.
     */
    public void close() throws IOException, GateException {
        try {
            this.results.put(this.endOfData);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
