package com.google.cloud.genomics.dataflow.functions;

import com.google.api.services.storage.Storage;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Max;
import com.google.cloud.dataflow.sdk.transforms.Min;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.util.GcsUtil;
import com.google.cloud.dataflow.sdk.util.Transport;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.cloud.genomics.dataflow.readers.bam.BAMIO;
import com.google.cloud.genomics.dataflow.readers.bam.HeaderInfo;
import com.google.cloud.genomics.dataflow.utils.GCSOptions;
import com.google.cloud.genomics.dataflow.utils.GCSOutputOptions;
import com.google.cloud.genomics.dataflow.utils.TruncatedOutputStream;
import com.google.cloud.genomics.utils.Contig;
import com.google.cloud.genomics.utils.grpc.ReadUtils;
import com.google.common.base.Stopwatch;
import com.google.genomics.v1.Read;
import htsjdk.samtools.BAMBlockWriter;
import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.util.BlockCompressedStreamConstants;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/* loaded from: input_file:com/google/cloud/genomics/dataflow/functions/WriteBAMFn.class */
public class WriteBAMFn extends DoFn<Read, String> {
    private static final Logger LOG = Logger.getLogger(WriteBAMFn.class.getName());
    public static TupleTag<String> WRITTEN_BAM_NAMES_TAG = new TupleTag<String>() { // from class: com.google.cloud.genomics.dataflow.functions.WriteBAMFn.1
    };
    public static TupleTag<KV<Integer, Long>> SEQUENCE_SHARD_SIZES_TAG = new TupleTag<KV<Integer, Long>>() { // from class: com.google.cloud.genomics.dataflow.functions.WriteBAMFn.2
    };
    final PCollectionView<HeaderInfo> headerView;
    Storage.Objects storage;
    Stopwatch stopWatch;
    int readCount;
    int unmappedReadCount;
    String shardName;
    TruncatedOutputStream ts;
    BAMBlockWriter bw;
    Contig shardContig;
    Options options;
    HeaderInfo headerInfo;
    int sequenceIndex;
    SAMRecord prevRead = null;
    long minAlignment = Long.MAX_VALUE;
    long maxAlignment = Long.MIN_VALUE;
    boolean hadOutOfOrder = false;
    Aggregator<Integer, Integer> readCountAggregator = createAggregator("Written reads", new Sum.SumIntegerFn());
    Aggregator<Integer, Integer> unmappedReadCountAggregator = createAggregator("Written unmapped reads", new Sum.SumIntegerFn());
    Aggregator<Integer, Integer> initializedShardCount = createAggregator("Initialized Write Shard Count", new Sum.SumIntegerFn());
    Aggregator<Integer, Integer> finishedShardCount = createAggregator("Finished Write Shard Count", new Sum.SumIntegerFn());
    Aggregator<Long, Long> shardTimeMaxSec = createAggregator("Maximum Write Shard Processing Time (sec)", new Max.MaxLongFn());
    Aggregator<Long, Long> shardReadCountMax = createAggregator("Maximum Reads Per Shard", new Max.MaxLongFn());
    Aggregator<Long, Long> shardReadCountMin = createAggregator("Minimum Reads Per Shard", new Min.MinLongFn());
    Aggregator<Integer, Integer> outOfOrderCount = createAggregator("Out of order reads", new Sum.SumIntegerFn());

    /* loaded from: input_file:com/google/cloud/genomics/dataflow/functions/WriteBAMFn$Options.class */
    public interface Options extends GCSOutputOptions {
    }

    public WriteBAMFn(PCollectionView<HeaderInfo> pCollectionView) {
        this.headerView = pCollectionView;
    }

    public void startBundle(DoFn<Read, String>.Context context) throws IOException {
        LOG.info("Starting bundle ");
        this.storage = Transport.newStorageClient(context.getPipelineOptions().as(GCSOptions.class)).build().objects();
        this.initializedShardCount.addValue(1);
        this.stopWatch = Stopwatch.createStarted();
        this.options = context.getPipelineOptions().as(Options.class);
        this.readCount = 0;
        this.unmappedReadCount = 0;
        this.headerInfo = null;
        this.prevRead = null;
        this.minAlignment = Long.MAX_VALUE;
        this.maxAlignment = Long.MIN_VALUE;
        this.hadOutOfOrder = false;
    }

    public void finishBundle(DoFn<Read, String>.Context context) throws IOException {
        this.bw.close();
        this.shardTimeMaxSec.addValue(Long.valueOf(this.stopWatch.elapsed(TimeUnit.SECONDS)));
        Logger logger = LOG;
        String valueOf = String.valueOf(this.shardContig);
        logger.info(new StringBuilder(17 + String.valueOf(valueOf).length()).append("Finished writing ").append(valueOf).toString());
        this.finishedShardCount.addValue(1);
        long bytesWrittenExceptingTruncation = this.ts.getBytesWrittenExceptingTruncation();
        Logger logger2 = LOG;
        int i = this.readCount;
        int i2 = this.unmappedReadCount;
        String str = this.shardName;
        String str2 = this.hadOutOfOrder ? "ignored out of order" : "";
        logger2.info(new StringBuilder(86 + String.valueOf(str).length() + String.valueOf(str2).length()).append("Wrote ").append(i).append(" reads, ").append(i2).append(" unmapped, into ").append(str).append(str2).append(", wrote ").append(bytesWrittenExceptingTruncation).append(" bytes").toString());
        this.readCountAggregator.addValue(Integer.valueOf(this.readCount));
        this.unmappedReadCountAggregator.addValue(Integer.valueOf(this.unmappedReadCount));
        long j = this.readCount + this.unmappedReadCount;
        this.shardReadCountMax.addValue(Long.valueOf(j));
        this.shardReadCountMin.addValue(Long.valueOf(j));
        context.output(this.shardName);
        context.sideOutput(SEQUENCE_SHARD_SIZES_TAG, KV.of(Integer.valueOf(this.sequenceIndex), Long.valueOf(bytesWrittenExceptingTruncation)));
    }

    public void processElement(DoFn<Read, String>.ProcessContext processContext) throws Exception {
        String str;
        String str2;
        if (this.headerInfo == null) {
            this.headerInfo = (HeaderInfo) processContext.sideInput(this.headerView);
        }
        Read read = (Read) processContext.element();
        if (this.readCount == 0) {
            this.shardContig = KeyReadsFn.shardKeyForRead(read, 1L);
            this.sequenceIndex = this.headerInfo.header.getSequenceIndex(this.shardContig.referenceName);
            boolean shardHasFirstRead = this.headerInfo.shardHasFirstRead(this.shardContig);
            String output = this.options.getOutput();
            String valueOf = String.valueOf(String.format("%012d", Integer.valueOf(this.sequenceIndex)));
            String valueOf2 = String.valueOf(this.shardContig.referenceName);
            String valueOf3 = String.valueOf(String.format("%012d", Long.valueOf(this.shardContig.start)));
            this.shardName = new StringBuilder(3 + String.valueOf(output).length() + String.valueOf(valueOf).length() + String.valueOf(valueOf2).length() + String.valueOf(valueOf3).length()).append(output).append("-").append(valueOf).append("-").append(valueOf2).append(":").append(valueOf3).toString();
            Logger logger = LOG;
            String valueOf4 = String.valueOf(this.shardName);
            if (valueOf4.length() != 0) {
                str = "Writing shard file ".concat(valueOf4);
            } else {
                str = r2;
                String str3 = new String("Writing shard file ");
            }
            logger.info(str);
            this.ts = new TruncatedOutputStream(Channels.newOutputStream(new GcsUtil.GcsUtilFactory().create(this.options).create(GcsPath.fromUri(this.shardName), BAMIO.BAM_INDEX_FILE_MIME_TYPE)), BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length);
            this.bw = new BAMBlockWriter(this.ts, null);
            this.bw.setSortOrder(this.headerInfo.header.getSortOrder(), true);
            this.bw.setHeader(this.headerInfo.header);
            if (shardHasFirstRead) {
                Logger logger2 = LOG;
                String valueOf5 = String.valueOf(this.shardName);
                if (valueOf5.length() != 0) {
                    str2 = "First shard - writing header to ".concat(valueOf5);
                } else {
                    str2 = r2;
                    String str4 = new String("First shard - writing header to ");
                }
                logger2.info(str2);
                this.bw.writeHeader(this.headerInfo.header);
            }
        }
        SAMRecord makeSAMRecord = ReadUtils.makeSAMRecord(read, this.headerInfo.header);
        if (this.prevRead == null || this.prevRead.getAlignmentStart() <= makeSAMRecord.getAlignmentStart()) {
            this.minAlignment = Math.min(this.minAlignment, makeSAMRecord.getAlignmentStart());
            this.maxAlignment = Math.max(this.maxAlignment, makeSAMRecord.getAlignmentStart());
            this.prevRead = makeSAMRecord;
            if (makeSAMRecord.getReadUnmappedFlag()) {
                if (!makeSAMRecord.getMateUnmappedFlag()) {
                    makeSAMRecord.setReferenceName(makeSAMRecord.getMateReferenceName());
                    makeSAMRecord.setAlignmentStart(makeSAMRecord.getMateAlignmentStart());
                }
                this.unmappedReadCount++;
            }
            this.bw.addAlignment(makeSAMRecord);
            this.readCount++;
            return;
        }
        Logger logger3 = LOG;
        int alignmentStart = this.prevRead.getAlignmentStart();
        int alignmentStart2 = makeSAMRecord.getAlignmentStart();
        String str5 = this.shardName;
        int i = this.readCount;
        long j = this.minAlignment;
        long j2 = this.maxAlignment;
        String str6 = makeSAMRecord.getReadUnmappedFlag() ? "unmapped" : "mapped";
        String str7 = makeSAMRecord.getMateUnmappedFlag() ? "unmapped" : "mapped";
        logger3.info(new StringBuilder(209 + String.valueOf(str5).length() + String.valueOf(str6).length() + String.valueOf(str7).length()).append("Out of order read ").append(alignmentStart).append(" ").append(alignmentStart2).append(" during writing of shard ").append(str5).append(" after processing ").append(i).append(" reads, min seen alignment is ").append(j).append(" and max is ").append(j2).append(", this read is ").append(str6).append(" and its mate is ").append(str7).toString());
        this.outOfOrderCount.addValue(1);
        this.readCount++;
        this.hadOutOfOrder = true;
    }
}
