package com.google.cloud.genomics.dataflow.writers.bam;

import com.google.api.services.storage.Storage;
import com.google.cloud.genomics.dataflow.functions.KeyReadsFn;
import com.google.cloud.genomics.dataflow.readers.bam.BAMIO;
import com.google.cloud.genomics.dataflow.readers.bam.HeaderInfo;
import com.google.cloud.genomics.dataflow.utils.GCSOptions;
import com.google.cloud.genomics.dataflow.utils.GCSOutputOptions;
import com.google.cloud.genomics.dataflow.utils.TruncatedOutputStream;
import com.google.cloud.genomics.utils.Contig;
import com.google.cloud.genomics.utils.grpc.ReadUtils;
import com.google.common.base.Stopwatch;
import com.google.genomics.v1.Read;
import htsjdk.samtools.BAMBlockWriter;
import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.util.BlockCompressedStreamConstants;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.Transport;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;

/* loaded from: input_file:com/google/cloud/genomics/dataflow/writers/bam/WriteBAMFn.class */
/**
 * Writes the reads of one bundle into a single BAM shard file and emits the shard's name
 * (main output) plus a (sequence index, bytes written) pair on {@link #SEQUENCE_SHARD_SIZES_TAG}.
 *
 * <p>The shard file is opened lazily on the first element of a bundle; its name is derived from
 * the configured output prefix, the reference sequence index and the shard contig of that first
 * read. All per-shard state is reset in {@link #startBundle} so that a reused instance never
 * carries a previous bundle's writer into the next bundle.
 *
 * <p>NOTE(review): this class appears to assume that every bundle holds the reads of exactly one
 * shard, already ordered by alignment start - confirm against the upstream grouping transform.
 */
public class WriteBAMFn extends DoFn<Read, String> {
    private static final Logger LOG = Logger.getLogger(WriteBAMFn.class.getName());

    /** Tag for the output carrying the names of the written shard files. */
    public static TupleTag<String> WRITTEN_BAM_NAMES_TAG = new TupleTag<String>() {
    };

    /** Tag for the output carrying (sequence index, bytes written) for each written shard. */
    public static TupleTag<KV<Integer, Long>> SEQUENCE_SHARD_SIZES_TAG = new TupleTag<KV<Integer, Long>>() {
    };

    final PCollectionView<HeaderInfo> headerView;
    Storage.Objects storage;
    Stopwatch stopWatch;
    // Counts every element seen after the shard was opened, including dropped out-of-order reads.
    int readCount;
    int unmappedReadCount;
    String shardName;
    TruncatedOutputStream ts;
    BAMBlockWriter bw;
    Contig shardContig;
    Options options;
    HeaderInfo headerInfo;
    int sequenceIndex;
    SAMRecord prevRead = null;
    long minAlignment = Long.MAX_VALUE;
    long maxAlignment = Long.MIN_VALUE;
    boolean hadOutOfOrder = false;
    // Window of the most recently processed element; used to timestamp outputs in finishBundle.
    private BoundedWindow window;

    /** Pipeline options required by this function. */
    public interface Options extends GCSOutputOptions {
    }

    /**
     * @param pCollectionView side input view providing the BAM header information
     */
    public WriteBAMFn(PCollectionView<HeaderInfo> pCollectionView) {
        this.headerView = pCollectionView;
    }

    /**
     * Prepares the storage client and resets all per-bundle and per-shard state.
     *
     * @param startBundleContext context giving access to the pipeline options
     * @throws IOException declared for the storage client setup
     */
    @DoFn.StartBundle
    public void startBundle(DoFn<Read, String>.StartBundleContext startBundleContext) throws IOException {
        LOG.info("Starting bundle ");
        this.storage = Transport.newStorageClient(startBundleContext.getPipelineOptions().as(GCSOptions.class)).build().objects();
        Metrics.counter(WriteBAMFn.class, "Initialized Write Shard Count").inc();
        this.stopWatch = Stopwatch.createStarted();
        this.options = startBundleContext.getPipelineOptions().as(Options.class);
        this.readCount = 0;
        this.unmappedReadCount = 0;
        this.headerInfo = null;
        this.prevRead = null;
        this.minAlignment = Long.MAX_VALUE;
        this.maxAlignment = Long.MIN_VALUE;
        this.hadOutOfOrder = false;
        // Drop any shard state left over from a previous bundle handled by this instance, so
        // finishBundle can tell whether the current bundle actually opened a shard.
        this.bw = null;
        this.ts = null;
        this.shardName = null;
        this.shardContig = null;
        this.window = null;
    }

    /**
     * Closes the shard writer, records metrics and emits the shard name and its size.
     * Does nothing beyond logging when the bundle contained no elements.
     *
     * @param finishBundleContext context used to emit the outputs
     * @throws IOException declared for closing the underlying stream
     */
    @DoFn.FinishBundle
    public void finishBundle(DoFn<Read, String>.FinishBundleContext finishBundleContext) throws IOException {
        if (this.bw == null) {
            // Empty bundle: processElement never ran, so there is no writer, stream or window.
            LOG.info("Finished empty bundle, no shard was written");
            return;
        }
        this.bw.close();
        Metrics.distribution(WriteBAMFn.class, "Maximum Write Shard Processing Time (sec)").update(this.stopWatch.elapsed(TimeUnit.SECONDS));
        LOG.info("Finished writing " + this.shardContig);
        Metrics.counter(WriteBAMFn.class, "Finished Write Shard Count").inc();
        long bytesWritten = this.ts.getBytesWrittenExceptingTruncation();
        LOG.info("Wrote " + this.readCount + " reads, " + this.unmappedReadCount + " unmapped, into " + this.shardName
                + (this.hadOutOfOrder ? ", ignored out of order" : "") + ", wrote " + bytesWritten + " bytes");
        Metrics.counter(WriteBAMFn.class, "Written reads").inc(this.readCount);
        Metrics.counter(WriteBAMFn.class, "Written unmapped reads").inc(this.unmappedReadCount);
        // NOTE(review): readCount already includes unmapped reads, so this sum counts them twice;
        // kept as-is because existing dashboards may depend on the value - confirm.
        Metrics.distribution(WriteBAMFn.class, "Maximum Reads Per Shard").update(this.readCount + this.unmappedReadCount);
        finishBundleContext.output(this.shardName, this.window.maxTimestamp(), this.window);
        finishBundleContext.output(SEQUENCE_SHARD_SIZES_TAG, KV.of(this.sequenceIndex, bytesWritten), this.window.maxTimestamp(), this.window);
    }

    /**
     * Converts the incoming read to a {@link SAMRecord} and appends it to the current shard,
     * opening the shard on the first element of the bundle. A read whose alignment start is
     * smaller than that of the previously written read is logged, counted and dropped.
     *
     * @param processContext context providing the element and the header side input
     * @param boundedWindow window of the current element
     * @throws Exception if opening the shard or writing the record fails
     */
    @DoFn.ProcessElement
    public void processElement(DoFn<Read, String>.ProcessContext processContext, BoundedWindow boundedWindow) throws Exception {
        this.window = boundedWindow;
        if (this.headerInfo == null) {
            this.headerInfo = processContext.sideInput(this.headerView);
        }
        Read read = processContext.element();
        if (this.readCount == 0) {
            openShard(read);
        }
        SAMRecord samRecord = ReadUtils.makeSAMRecord(read, this.headerInfo.header);
        if (this.prevRead != null && this.prevRead.getAlignmentStart() > samRecord.getAlignmentStart()) {
            LOG.info("Out of order read " + this.prevRead.getAlignmentStart() + " " + samRecord.getAlignmentStart() + " during writing of shard " + this.shardName + " after processing " + this.readCount + " reads, min seen alignment is " + this.minAlignment + " and max is " + this.maxAlignment + ", this read is " + (samRecord.getReadUnmappedFlag() ? "unmapped" : "mapped") + " and its mate is " + (samRecord.getMateUnmappedFlag() ? "unmapped" : "mapped"));
            Metrics.counter(WriteBAMFn.class, "Out of order reads").inc();
            // The dropped read is still counted in readCount (existing behavior).
            this.readCount++;
            this.hadOutOfOrder = true;
            return;
        }
        this.minAlignment = Math.min(this.minAlignment, samRecord.getAlignmentStart());
        this.maxAlignment = Math.max(this.maxAlignment, samRecord.getAlignmentStart());
        this.prevRead = samRecord;
        if (samRecord.getReadUnmappedFlag()) {
            if (!samRecord.getMateUnmappedFlag()) {
                // An unmapped read with a mapped mate is placed at the mate's position.
                samRecord.setReferenceName(samRecord.getMateReferenceName());
                samRecord.setAlignmentStart(samRecord.getMateAlignmentStart());
            }
            this.unmappedReadCount++;
        }
        this.bw.addAlignment(samRecord);
        this.readCount++;
    }

    /**
     * Derives the shard identity from the first read of the bundle, opens the output stream and
     * BAM writer for it, and writes the header if this shard holds the first read.
     */
    private void openShard(Read firstRead) throws IOException {
        this.shardContig = KeyReadsFn.shardKeyForRead(firstRead, 1L);
        this.sequenceIndex = this.headerInfo.header.getSequenceIndex(this.shardContig.referenceName);
        boolean shardHasFirstRead = this.headerInfo.shardHasFirstRead(this.shardContig);
        this.shardName = this.options.getOutput()
                + "-" + String.format("%012d", this.sequenceIndex)
                + "-" + this.shardContig.referenceName
                + ":" + String.format("%012d", this.shardContig.start);
        LOG.info("Writing shard file " + this.shardName);
        GcsUtil gcsUtil = new GcsUtil.GcsUtilFactory().create(this.options);
        // NOTE(review): presumably the truncation drops the trailing empty gzip block so that
        // shards can later be concatenated - confirm against TruncatedOutputStream.
        this.ts = new TruncatedOutputStream(
                Channels.newOutputStream(gcsUtil.create(GcsPath.fromUri(this.shardName), BAMIO.BAM_INDEX_FILE_MIME_TYPE)),
                BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length);
        this.bw = new BAMBlockWriter(this.ts, null);
        this.bw.setSortOrder(this.headerInfo.header.getSortOrder(), true);
        this.bw.setHeader(this.headerInfo.header);
        if (shardHasFirstRead) {
            LOG.info("First shard - writing header to " + this.shardName);
            this.bw.writeHeader(this.headerInfo.header);
        }
    }
}
