package org.broadinstitute.hellbender.tools.examples;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.broadinstitute.barclay.argparser.Argument;
import org.broadinstitute.barclay.argparser.CommandLineProgramProperties;
import org.broadinstitute.hellbender.cmdline.programgroups.ExampleProgramGroup;
import org.broadinstitute.hellbender.engine.FeatureContext;
import org.broadinstitute.hellbender.engine.ReadWalker;
import org.broadinstitute.hellbender.engine.ReferenceContext;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.utils.python.StreamingPythonScriptExecutor;
import org.broadinstitute.hellbender.utils.read.GATKRead;
import org.broadinstitute.hellbender.utils.runtime.AsynchronousStreamWriterService;

@CommandLineProgramProperties(summary = "Example/toy program that uses a Python script.", oneLineSummary = "Example/toy program that uses a Python script.", programGroup = ExampleProgramGroup.class, omitFromCommandLine = true)
/* loaded from: input_file:org/broadinstitute/hellbender/tools/examples/ExampleStreamingPythonExecutor.class */
public class ExampleStreamingPythonExecutor extends ReadWalker {
    private static final String NL = System.lineSeparator();

    @Argument(fullName = "output", shortName = "O", doc = "Output file")
    private File outputFile;
    private FileOutputStream fifoWriter;

    @Argument(fullName = "batchSize", doc = "Size of a batch for writing")
    private int batchSize = 1000;
    final StreamingPythonScriptExecutor pythonExecutor = new StreamingPythonScriptExecutor(true);
    private AsynchronousStreamWriterService<String> asyncWriter = null;
    private List<String> batchList = new ArrayList(this.batchSize);
    private int batchCount = 0;

    @Override // org.broadinstitute.hellbender.engine.GATKTool
    public void onTraversalStart() {
        this.pythonExecutor.start(Collections.emptyList());
        File fIFOForWrite = this.pythonExecutor.getFIFOForWrite();
        this.pythonExecutor.sendAsynchronousCommand(String.format("fifoFile = open('%s', 'r')" + NL, fIFOForWrite.getAbsolutePath()));
        try {
            this.fifoWriter = new FileOutputStream(fIFOForWrite);
            this.asyncWriter = this.pythonExecutor.getAsynchronousStreamWriterService(this.fifoWriter, AsynchronousStreamWriterService.stringSerializer);
            this.pythonExecutor.getAccumulatedOutput();
            this.pythonExecutor.sendSynchronousCommand(String.format("tempFile = open('%s', 'w')" + NL, this.outputFile.getAbsolutePath()));
        } catch (IOException e) {
            throw new GATKException("Failure opening FIFO for writing", e);
        }
    }

    @Override // org.broadinstitute.hellbender.engine.ReadWalker
    public void apply(GATKRead gATKRead, ReferenceContext referenceContext, FeatureContext featureContext) {
        if (this.batchCount == this.batchSize) {
            startAsynchronousBatchWrite();
        }
        this.batchList.add(String.format("Read at %s:%d-%d:\n%s\n", gATKRead.getContig(), Integer.valueOf(gATKRead.getStart()), Integer.valueOf(gATKRead.getEnd()), gATKRead.getBasesString()));
        this.batchCount++;
    }

    @Override // org.broadinstitute.hellbender.engine.GATKTool
    public Object onTraversalSuccess() {
        if (this.batchCount != 0) {
            startAsynchronousBatchWrite();
        }
        this.asyncWriter.waitForPreviousBatchCompletion(1000L, TimeUnit.MILLISECONDS);
        this.pythonExecutor.getAccumulatedOutput();
        this.pythonExecutor.sendSynchronousCommand("tempFile.close()" + NL);
        this.pythonExecutor.sendSynchronousCommand("fifoFile.close()" + NL);
        return true;
    }

    private void startAsynchronousBatchWrite() {
        this.asyncWriter.waitForPreviousBatchCompletion(1000L, TimeUnit.MILLISECONDS);
        this.pythonExecutor.sendAsynchronousCommand(String.format("for i in range(%s):\n    tempFile.write(fifoFile.readline())" + NL + NL, Integer.valueOf(this.batchCount)));
        this.asyncWriter.startAsynchronousBatchWrite(this.batchList);
        this.batchList = new ArrayList(this.batchSize);
        this.batchCount = 0;
    }

    @Override // org.broadinstitute.hellbender.engine.GATKTool
    public void closeTool() {
        if (this.asyncWriter != null) {
            this.asyncWriter.terminate();
        }
        try {
            this.fifoWriter.close();
            this.pythonExecutor.terminate();
        } catch (IOException e) {
            throw new GATKException("IOException closing fifo writer", e);
        }
    }
}
