package com.google.cloud.genomics.dataflow.writers.bam;

import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Max;
import com.google.cloud.dataflow.sdk.transforms.Min;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.util.GcsUtil;
import com.google.cloud.dataflow.sdk.util.Transport;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.cloud.genomics.dataflow.readers.bam.BAMIO;
import com.google.cloud.genomics.dataflow.readers.bam.HeaderInfo;
import com.google.cloud.genomics.dataflow.utils.GCSOptions;
import com.google.cloud.genomics.dataflow.utils.GCSOutputOptions;
import com.google.common.base.Stopwatch;
import htsjdk.samtools.BAMShardIndexer;
import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.SAMRecordIterator;
import htsjdk.samtools.SamReader;
import htsjdk.samtools.ValidationStringency;
import java.nio.channels.Channels;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/* loaded from: input_file:com/google/cloud/genomics/dataflow/writers/bam/WriteBAIFn.class */
/**
 * Builds the BAI index shard for a single reference sequence of an already written BAM file.
 *
 * <p>The input element is a reference sequence name. The BAM file path, the BAM header and the
 * per-sequence shard sizes (in bytes) are read from side inputs. The main output is the path of
 * the generated index shard; the count returned by {@link BAMShardIndexer#finish()} (reported
 * here as "no coordinate reads") is emitted on {@link #NO_COORD_READS_COUNT_TAG}.
 */
public class WriteBAIFn extends DoFn<String, String> {
    private static final Logger LOG = Logger.getLogger(WriteBAIFn.class.getName());

    /** Side-output tag carrying the no-coordinate read count produced for each index shard. */
    public static TupleTag<Long> NO_COORD_READS_COUNT_TAG = new TupleTag<Long>() {
    };

    /** Tag for the names of the written BAI shards (not referenced inside this class). */
    public static TupleTag<String> WRITTEN_BAI_NAMES_TAG = new TupleTag<String>() {
    };

    /** Side input: path of the BAM file that is being indexed. */
    PCollectionView<String> writtenBAMFilerView;

    /** Side input: header information of the BAM file, used to resolve the sequence index. */
    PCollectionView<HeaderInfo> headerView;

    /** Side input: (sequence index, size in bytes) pairs for the shards making up the BAM file. */
    PCollectionView<Iterable<KV<Integer, Long>>> sequenceShardSizesView;

    Aggregator<Long, Long> readCountAggregator = createAggregator("Indexed reads", new Sum.SumLongFn());
    Aggregator<Long, Long> noCoordReadCountAggregator = createAggregator("Indexed no coordinate reads", new Sum.SumLongFn());
    Aggregator<Integer, Integer> initializedShardCount = createAggregator("Initialized Indexing Shard Count", new Sum.SumIntegerFn());
    Aggregator<Integer, Integer> finishedShardCount = createAggregator("Finished Indexing Shard Count", new Sum.SumIntegerFn());
    Aggregator<Long, Long> shardTimeMaxSec = createAggregator("Maximum Indexing Shard Processing Time (sec)", new Max.MaxLongFn());
    Aggregator<Long, Long> shardTimeMinSec = createAggregator("Minimum Indexing Shard Processing Time (sec)", new Min.MinLongFn());
    Aggregator<Long, Long> shardReadCountMax = createAggregator("Maximum Reads Per Indexing Shard", new Max.MaxLongFn());
    Aggregator<Long, Long> shardReadCountMin = createAggregator("Minimum Reads Per Indexing Shard", new Min.MinLongFn());

    /** Pipeline options view used by this function; adds nothing on top of {@link GCSOutputOptions}. */
    public interface Options extends GCSOutputOptions {
    }

    /**
     * Creates the function from the side-input views it reads while processing elements.
     *
     * @param headerView view of the BAM header information
     * @param writtenBAMFilerView view of the path of the written BAM file
     * @param sequenceShardSizesView view of the (sequence index, shard size in bytes) pairs
     */
    public WriteBAIFn(PCollectionView<HeaderInfo> headerView, PCollectionView<String> writtenBAMFilerView, PCollectionView<Iterable<KV<Integer, Long>>> sequenceShardSizesView) {
        this.writtenBAMFilerView = writtenBAMFilerView;
        this.headerView = headerView;
        this.sequenceShardSizesView = sequenceShardSizesView;
    }

    /**
     * Writes the BAI index shard for the reference sequence named by the current element.
     *
     * <p>The BAM file is opened at the byte offset obtained by summing the sizes of all shards
     * whose sequence index is lower than the requested one. Records are then read until the first
     * record of a different reference is seen after at least one matching record was indexed.
     * If the shard size for the requested sequence is zero (or absent), no records are read and
     * only the indexer's {@code finish()} output is written.
     *
     * @param processContext context supplying the element, the side inputs and the outputs
     * @throws Exception if reading the BAM file or writing the index fails
     */
    public void processElement(DoFn<String, String>.ProcessContext processContext) throws Exception {
        this.initializedShardCount.addValue(1);
        Stopwatch stopwatch = Stopwatch.createStarted();

        HeaderInfo headerInfo = processContext.sideInput(this.headerView);
        String bamFilePath = processContext.sideInput(this.writtenBAMFilerView);
        Iterable<KV<Integer, Long>> sequenceShardSizes = processContext.sideInput(this.sequenceShardSizesView);

        String sequenceName = processContext.element();
        int sequenceIndex = headerInfo.header.getSequence(sequenceName).getSequenceIndex();
        String baiFilePath = bamFilePath + "-" + String.format("%02d", sequenceIndex) + "-" + sequenceName + ".bai";

        // Sum the sizes of all preceding shards to find where this sequence starts in the BAM file.
        long offset = 0;
        int skippedReferences = 0;
        long bytesToProcess = 0;
        for (KV<Integer, Long> shardSize : sequenceShardSizes) {
            int shardSequenceIndex = shardSize.getKey();
            if (shardSequenceIndex < sequenceIndex) {
                offset += shardSize.getValue();
                skippedReferences++;
            } else if (shardSequenceIndex == sequenceIndex) {
                bytesToProcess = shardSize.getValue();
            }
        }
        LOG.info("Generating BAI index: " + baiFilePath);
        LOG.info("Reading BAM file: " + bamFilePath + " for reference " + sequenceName + ", skipping " + skippedReferences + " references at offset " + offset + ", expecting to process " + bytesToProcess + " bytes");

        Options options = processContext.getPipelineOptions().as(Options.class);

        long processedReads = 0;
        long skippedReads = 0;
        long noCoordinateReads;
        // try-with-resources guarantees the BAM reader is released even when indexing fails.
        // The index output stream is intentionally not closed on failure: closing it could
        // commit a truncated index object.
        try (SamReader reader = BAMIO.openBAM(Transport.newStorageClient(options.as(GCSOptions.class)).build().objects(), bamFilePath, ValidationStringency.SILENT, true, offset)) {
            BAMShardIndexer indexer = new BAMShardIndexer(Channels.newOutputStream(new GcsUtil.GcsUtilFactory().create(options).create(GcsPath.fromUri(baiFilePath), BAMIO.BAM_INDEX_FILE_MIME_TYPE)), reader.getFileHeader(), sequenceIndex);
            if (bytesToProcess > 0) {
                try (SAMRecordIterator records = reader.iterator()) {
                    boolean foundRecords = false;
                    while (records.hasNext()) {
                        SAMRecord record = records.next();
                        if (record.getReferenceName().equals(sequenceName)) {
                            if (!foundRecords) {
                                LOG.info("Found records for refrence " + sequenceName + " after skipping " + skippedReads);
                                foundRecords = true;
                            }
                            indexer.processAlignment(record);
                            processedReads++;
                        } else if (foundRecords) {
                            // First record of another reference after ours: the shard is complete.
                            LOG.info("Finishing index building for " + sequenceName + " after processing " + processedReads);
                            break;
                        } else {
                            skippedReads++;
                        }
                    }
                }
            } else {
                LOG.info("No records for refrence " + sequenceName + ": writing empty index ");
            }
            noCoordinateReads = indexer.finish();
        }

        processContext.output(baiFilePath);
        processContext.sideOutput(NO_COORD_READS_COUNT_TAG, noCoordinateReads);
        LOG.info("Generated " + baiFilePath + ", " + processedReads + " reads, " + noCoordinateReads + " no coordinate reads, " + skippedReads + ", skipped reads");

        stopwatch.stop();
        long elapsedSeconds = stopwatch.elapsed(TimeUnit.SECONDS);
        this.shardTimeMaxSec.addValue(elapsedSeconds);
        this.shardTimeMinSec.addValue(elapsedSeconds);
        this.finishedShardCount.addValue(1);
        this.readCountAggregator.addValue(processedReads);
        this.noCoordReadCountAggregator.addValue(noCoordinateReads);
        this.shardReadCountMax.addValue(processedReads);
        this.shardReadCountMin.addValue(processedReads);
    }
}
