package com.google.cloud.genomics.dataflow.functions;

import com.google.api.services.genomics.model.Read;
import com.google.api.services.storage.Storage;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.util.GcsUtil;
import com.google.cloud.dataflow.sdk.util.Transport;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.genomics.dataflow.pipelines.ShardedBAMWriting;
import com.google.cloud.genomics.dataflow.utils.GCSOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsDatasetOptions;
import com.google.cloud.genomics.dataflow.utils.TruncatedOutputStream;
import com.google.cloud.genomics.utils.Contig;
import com.google.cloud.genomics.utils.ReadUtils;
import htsjdk.samtools.BAMBlockWriter;
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.util.BlockCompressedStreamConstants;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.util.Iterator;
import java.util.logging.Logger;

/* loaded from: input_file:com/google/cloud/genomics/dataflow/functions/WriteShardFn.class */
public class WriteShardFn extends DoFn<KV<Contig, Iterable<Read>>, String> {
    private static final int MAX_RETRIES_FOR_WRITING_A_SHARD = 4;
    private static final String BAM_INDEX_FILE_MIME_TYPE = "application/octet-stream";
    private static final Logger LOG = Logger.getLogger(WriteShardFn.class.getName());
    final PCollectionView<ShardedBAMWriting.HeaderInfo> headerView;
    Storage.Objects storage;
    Aggregator<Integer, Integer> readCountAggregator = createAggregator("Written reads", new Sum.SumIntegerFn());
    Aggregator<Integer, Integer> unmappedReadCountAggregator = createAggregator("Written unmapped reads", new Sum.SumIntegerFn());

    /* loaded from: input_file:com/google/cloud/genomics/dataflow/functions/WriteShardFn$Options.class */
    public interface Options extends GenomicsDatasetOptions {
    }

    public WriteShardFn(PCollectionView<ShardedBAMWriting.HeaderInfo> pCollectionView) {
        this.headerView = pCollectionView;
    }

    public void startBundle(DoFn<KV<Contig, Iterable<Read>>, String>.Context context) throws IOException {
        this.storage = Transport.newStorageClient(context.getPipelineOptions().as(GCSOptions.class)).build().objects();
    }

    public void processElement(DoFn<KV<Contig, Iterable<Read>>, String>.ProcessContext processContext) throws Exception {
        ShardedBAMWriting.HeaderInfo headerInfo = (ShardedBAMWriting.HeaderInfo) processContext.sideInput(this.headerView);
        KV kv = (KV) processContext.element();
        Contig contig = (Contig) kv.getKey();
        Iterable<Read> iterable = (Iterable) kv.getValue();
        boolean equals = contig.equals(headerInfo.firstShard);
        int i = MAX_RETRIES_FOR_WRITING_A_SHARD;
        boolean z = false;
        do {
            try {
                processContext.output(writeShard(headerInfo.header, contig, iterable, (Options) processContext.getPipelineOptions().as(Options.class), equals));
                z = true;
            } catch (IOException e) {
                Logger logger = LOG;
                String valueOf = String.valueOf(contig);
                String valueOf2 = String.valueOf(e.getMessage());
                logger.warning(new StringBuilder(25 + String.valueOf(valueOf).length() + String.valueOf(valueOf2).length()).append("Write shard failed for ").append(valueOf).append(": ").append(valueOf2).toString());
                i--;
                if (i <= 0) {
                    Logger logger2 = LOG;
                    String valueOf3 = String.valueOf(contig);
                    logger2.warning(new StringBuilder(39 + String.valueOf(valueOf3).length()).append("No more retries - failing the task for ").append(valueOf3).toString());
                    throw e;
                }
            }
        } while (!z);
        Logger logger3 = LOG;
        String valueOf4 = String.valueOf(contig);
        logger3.info(new StringBuilder(17 + String.valueOf(valueOf4).length()).append("Finished writing ").append(valueOf4).toString());
    }

    String writeShard(SAMFileHeader sAMFileHeader, Contig contig, Iterable<Read> iterable, Options options, boolean z) throws IOException {
        String str;
        String str2;
        String output = options.getOutput();
        String valueOf = String.valueOf(contig.referenceName);
        String valueOf2 = String.valueOf(String.format("%012d", Long.valueOf(contig.start)));
        String valueOf3 = String.valueOf(String.format("%012d", Long.valueOf(contig.end)));
        String sb = new StringBuilder(3 + String.valueOf(output).length() + String.valueOf(valueOf).length() + String.valueOf(valueOf2).length() + String.valueOf(valueOf3).length()).append(output).append("-").append(valueOf).append(":").append(valueOf2).append("-").append(valueOf3).toString();
        Logger logger = LOG;
        String valueOf4 = String.valueOf(sb);
        if (valueOf4.length() != 0) {
            str = "Writing shard file ".concat(valueOf4);
        } else {
            str = r2;
            String str3 = new String("Writing shard file ");
        }
        logger.info(str);
        int i = 0;
        int i2 = 0;
        BAMBlockWriter bAMBlockWriter = new BAMBlockWriter(new TruncatedOutputStream(Channels.newOutputStream(new GcsUtil.GcsUtilFactory().create(options).create(GcsPath.fromUri(sb), BAM_INDEX_FILE_MIME_TYPE)), BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length), null);
        bAMBlockWriter.setSortOrder(sAMFileHeader.getSortOrder(), sAMFileHeader.getSortOrder() == SAMFileHeader.SortOrder.unsorted);
        bAMBlockWriter.setHeader(sAMFileHeader);
        if (z) {
            Logger logger2 = LOG;
            String valueOf5 = String.valueOf(sb);
            if (valueOf5.length() != 0) {
                str2 = "First shard - writing header to ".concat(valueOf5);
            } else {
                str2 = r2;
                String str4 = new String("First shard - writing header to ");
            }
            logger2.info(str2);
            bAMBlockWriter.writeHeader(sAMFileHeader);
        }
        Iterator<Read> it = iterable.iterator();
        while (it.hasNext()) {
            SAMRecord makeSAMRecord = ReadUtils.makeSAMRecord(it.next(), sAMFileHeader);
            if (makeSAMRecord.getReadUnmappedFlag()) {
                if (!makeSAMRecord.getMateUnmappedFlag()) {
                    makeSAMRecord.setReferenceName(makeSAMRecord.getMateReferenceName());
                    makeSAMRecord.setAlignmentStart(makeSAMRecord.getMateAlignmentStart());
                }
                i2++;
            }
            bAMBlockWriter.addAlignment(makeSAMRecord);
            i++;
        }
        bAMBlockWriter.close();
        LOG.info(new StringBuilder(51 + String.valueOf(sb).length()).append("Wrote ").append(i).append(" reads, ").append(i2).append(" umapped, into ").append(sb).toString());
        this.readCountAggregator.addValue(Integer.valueOf(i));
        this.unmappedReadCountAggregator.addValue(Integer.valueOf(i2));
        return sb;
    }
}
