package com.google.cloud.genomics.dataflow.pipelines;

import com.google.api.services.genomics.model.Read;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.ComposeRequest;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.DelegateCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.View;
import com.google.cloud.dataflow.sdk.util.GcsUtil;
import com.google.cloud.dataflow.sdk.util.Transport;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.cloud.genomics.dataflow.readers.bam.BAMIO;
import com.google.cloud.genomics.dataflow.readers.bam.ReadBAMTransform;
import com.google.cloud.genomics.dataflow.readers.bam.ReaderOptions;
import com.google.cloud.genomics.dataflow.readers.bam.ShardingPolicy;
import com.google.cloud.genomics.dataflow.utils.DataflowWorkarounds;
import com.google.cloud.genomics.dataflow.utils.GCSOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsDatasetOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsOptions;
import com.google.cloud.genomics.dataflow.utils.TruncatedOutputStream;
import com.google.cloud.genomics.utils.Contig;
import com.google.cloud.genomics.utils.GenomicsFactory;
import com.google.cloud.genomics.utils.ReadUtils;
import com.google.common.collect.Lists;
import htsjdk.samtools.BAMBlockWriter;
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.SAMRecordIterator;
import htsjdk.samtools.SAMTextHeaderCodec;
import htsjdk.samtools.SamReader;
import htsjdk.samtools.ValidationStringency;
import htsjdk.samtools.util.BlockCompressedStreamConstants;
import htsjdk.samtools.util.StringLineReader;
import java.io.IOException;
import java.io.OutputStream;
import java.io.StringWriter;
import java.nio.channels.Channels;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Logger;

/* loaded from: input_file:com/google/cloud/genomics/dataflow/pipelines/ShardedBAMWriting.class */
public class ShardedBAMWriting {
    private static ShardedBAMWritingOptions options;
    private static Pipeline p;
    private static GenomicsFactory.OfflineAuth auth;
    private static Iterable<Contig> contigs;
    static Coder<HeaderInfo> HEADER_INFO_CODER;
    static final long LOCI_PER_SHARD = 10000;
    public static TupleTag<KV<Contig, Iterable<Read>>> SHARDED_READS_TAG;
    public static TupleTag<HeaderInfo> HEADER_TAG;
    private static final Logger LOG = Logger.getLogger(ShardedBAMWriting.class.getName());
    static Coder<Contig> CONTIG_CODER = DelegateCoder.of(StringUtf8Coder.of(), new DelegateCoder.CodingFunction<Contig, String>() { // from class: com.google.cloud.genomics.dataflow.pipelines.ShardedBAMWriting.2
        public String apply(Contig contig) throws Exception {
            return contig.toString();
        }
    }, new DelegateCoder.CodingFunction<String, Contig>() { // from class: com.google.cloud.genomics.dataflow.pipelines.ShardedBAMWriting.3
        public Contig apply(String str) throws Exception {
            return (Contig) Contig.parseContigsFromCommandLine(str).iterator().next();
        }
    });
    static final SAMTextHeaderCodec SAM_HEADER_CODEC = new SAMTextHeaderCodec();

    /* loaded from: input_file:com/google/cloud/genomics/dataflow/pipelines/ShardedBAMWriting$CombineShardsFn.class */
    public static class CombineShardsFn extends DoFn<String, String> {
        final PCollectionView<Iterable<String>> shards;

        public CombineShardsFn(PCollectionView<Iterable<String>> pCollectionView) {
            this.shards = pCollectionView;
        }

        public void processElement(DoFn<String, String>.ProcessContext processContext) throws Exception {
            processContext.output(combineShards(processContext.getPipelineOptions().as(ShardedBAMWritingOptions.class), (String) processContext.element(), (Iterable) processContext.sideInput(this.shards)));
        }

        static String combineShards(ShardedBAMWritingOptions shardedBAMWritingOptions, String str, Iterable<String> iterable) throws IOException {
            String str2;
            String str3;
            Logger logger = ShardedBAMWriting.LOG;
            String valueOf = String.valueOf(str);
            if (valueOf.length() != 0) {
                str2 = "Combining shards into ".concat(valueOf);
            } else {
                str2 = r2;
                String str4 = new String("Combining shards into ");
            }
            logger.info(str2);
            Storage.Objects objects = Transport.newStorageClient(shardedBAMWritingOptions.as(GCSOptions.class)).build().objects();
            GcsPath fromUri = GcsPath.fromUri(str);
            StorageObject contentType = new StorageObject().setContentType("application/octet-stream");
            ArrayList newArrayList = Lists.newArrayList(iterable);
            Collections.sort(newArrayList);
            String concat = String.valueOf(shardedBAMWritingOptions.getOutput()).concat("-EOF");
            OutputStream newOutputStream = Channels.newOutputStream(new GcsUtil.GcsUtilFactory().create(shardedBAMWritingOptions).create(GcsPath.fromUri(concat), "application/octet-stream"));
            newOutputStream.write(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK);
            newOutputStream.close();
            newArrayList.add(concat);
            ArrayList arrayList = new ArrayList();
            Iterator it = newArrayList.iterator();
            while (it.hasNext()) {
                GcsPath fromUri2 = GcsPath.fromUri((String) it.next());
                Logger logger2 = ShardedBAMWriting.LOG;
                String valueOf2 = String.valueOf(String.valueOf(fromUri2));
                logger2.info(new StringBuilder(14 + valueOf2.length()).append("Adding object ").append(valueOf2).toString());
                arrayList.add(new ComposeRequest.SourceObjects().setName(fromUri2.getObject()));
            }
            String storageObject = ((StorageObject) objects.compose(fromUri.getBucket(), fromUri.getObject(), new ComposeRequest().setDestination(contentType).setSourceObjects(arrayList)).execute()).toString();
            Logger logger3 = ShardedBAMWriting.LOG;
            String valueOf3 = String.valueOf(storageObject);
            if (valueOf3.length() != 0) {
                str3 = "Combine result is ".concat(valueOf3);
            } else {
                str3 = r2;
                String str5 = new String("Combine result is ");
            }
            logger3.info(str3);
            return storageObject;
        }
    }

    /* loaded from: input_file:com/google/cloud/genomics/dataflow/pipelines/ShardedBAMWriting$HeaderInfo.class */
    public static class HeaderInfo {
        public SAMFileHeader header;
        public Contig firstShard;

        public HeaderInfo(SAMFileHeader sAMFileHeader, Contig contig) {
            this.header = sAMFileHeader;
            this.firstShard = contig;
        }
    }

    /* loaded from: input_file:com/google/cloud/genomics/dataflow/pipelines/ShardedBAMWriting$KeyReadsFn.class */
    public static class KeyReadsFn extends DoFn<Read, KV<Contig, Read>> {
        public void processElement(DoFn<Read, KV<Contig, Read>>.ProcessContext processContext) throws Exception {
            Read read = (Read) processContext.element();
            processContext.output(KV.of(ShardedBAMWriting.shardKeyForRead(read), read));
        }
    }

    /* loaded from: input_file:com/google/cloud/genomics/dataflow/pipelines/ShardedBAMWriting$ShardReadsTransform.class */
    public static class ShardReadsTransform extends PTransform<PCollection<Read>, PCollection<KV<Contig, Iterable<Read>>>> {
        public PCollection<KV<Contig, Iterable<Read>>> apply(PCollection<Read> pCollection) {
            return pCollection.apply(ParDo.named("KeyReads").of(new KeyReadsFn())).apply(GroupByKey.create());
        }

        public static PCollection<KV<Contig, Iterable<Read>>> shard(PCollection<Read> pCollection) {
            return new ShardReadsTransform().apply(pCollection);
        }
    }

    /* loaded from: input_file:com/google/cloud/genomics/dataflow/pipelines/ShardedBAMWriting$ShardedBAMWritingOptions.class */
    public interface ShardedBAMWritingOptions extends GenomicsDatasetOptions, GCSOptions {
        @Default.String("")
        @Description("The Google Cloud Storage path to the BAM file to get reads data from")
        String getBAMFilePath();

        void setBAMFilePath(String str);
    }

    /* loaded from: input_file:com/google/cloud/genomics/dataflow/pipelines/ShardedBAMWriting$WriteReadsTransform.class */
    public static class WriteReadsTransform extends PTransform<PCollectionTuple, PCollection<String>> {
        public PCollection<String> apply(PCollectionTuple pCollectionTuple) {
            PCollectionView apply = pCollectionTuple.get(ShardedBAMWriting.HEADER_TAG).apply(View.asSingleton());
            PCollectionView apply2 = pCollectionTuple.get(ShardedBAMWriting.SHARDED_READS_TAG).apply(ParDo.named("Write shards").withSideInputs(Arrays.asList(apply)).of(new WriteShardFn(apply))).apply(View.asIterable());
            return ShardedBAMWriting.p.apply(Create.of(new String[]{ShardedBAMWriting.options.getOutput()})).apply(ParDo.named("Combine shards").withSideInputs(new PCollectionView[]{apply2}).of(new CombineShardsFn(apply2)));
        }

        public static PCollection<String> write(PCollection<KV<Contig, Iterable<Read>>> pCollection, HeaderInfo headerInfo) {
            return new WriteReadsTransform().apply(PCollectionTuple.of(ShardedBAMWriting.SHARDED_READS_TAG, pCollection).and(ShardedBAMWriting.HEADER_TAG, ShardedBAMWriting.p.apply(Create.of(new HeaderInfo[]{headerInfo})).setCoder(ShardedBAMWriting.HEADER_INFO_CODER)));
        }
    }

    /* loaded from: input_file:com/google/cloud/genomics/dataflow/pipelines/ShardedBAMWriting$WriteShardFn.class */
    public static class WriteShardFn extends DoFn<KV<Contig, Iterable<Read>>, String> {
        final PCollectionView<HeaderInfo> headerView;
        Storage.Objects storage;

        public WriteShardFn(PCollectionView<HeaderInfo> pCollectionView) {
            this.headerView = pCollectionView;
        }

        public void startBundle(DoFn<KV<Contig, Iterable<Read>>, String>.Context context) throws IOException {
            this.storage = Transport.newStorageClient(context.getPipelineOptions().as(GCSOptions.class)).build().objects();
        }

        public void processElement(DoFn<KV<Contig, Iterable<Read>>, String>.ProcessContext processContext) throws Exception {
            HeaderInfo headerInfo = (HeaderInfo) processContext.sideInput(this.headerView);
            KV kv = (KV) processContext.element();
            Contig contig = (Contig) kv.getKey();
            Iterable<Read> iterable = (Iterable) kv.getValue();
            boolean equals = contig.equals(headerInfo.firstShard);
            if (equals) {
                Logger logger = ShardedBAMWriting.LOG;
                String valueOf = String.valueOf(String.valueOf(contig));
                logger.info(new StringBuilder(20 + valueOf.length()).append("Writing first shard ").append(valueOf).toString());
            } else {
                Logger logger2 = ShardedBAMWriting.LOG;
                String valueOf2 = String.valueOf(String.valueOf(contig));
                logger2.info(new StringBuilder(24 + valueOf2.length()).append("Writing non-first shard ").append(valueOf2).toString());
            }
            processContext.output(writeShard(headerInfo.header, contig, iterable, (ShardedBAMWritingOptions) processContext.getPipelineOptions().as(ShardedBAMWritingOptions.class), equals));
            Logger logger3 = ShardedBAMWriting.LOG;
            String valueOf3 = String.valueOf(String.valueOf(contig));
            logger3.info(new StringBuilder(17 + valueOf3.length()).append("Finished writing ").append(valueOf3).toString());
        }

        String writeShard(SAMFileHeader sAMFileHeader, Contig contig, Iterable<Read> iterable, ShardedBAMWritingOptions shardedBAMWritingOptions, boolean z) throws IOException {
            String str;
            String valueOf = String.valueOf(String.valueOf(shardedBAMWritingOptions.getOutput()));
            String valueOf2 = String.valueOf(String.valueOf(contig));
            String sb = new StringBuilder(1 + valueOf.length() + valueOf2.length()).append(valueOf).append("-").append(valueOf2).toString();
            Logger logger = ShardedBAMWriting.LOG;
            String valueOf3 = String.valueOf(sb);
            if (valueOf3.length() != 0) {
                str = "Writing shard file ".concat(valueOf3);
            } else {
                str = r2;
                String str2 = new String("Writing shard file ");
            }
            logger.info(str);
            int i = 0;
            BAMBlockWriter bAMBlockWriter = new BAMBlockWriter(new TruncatedOutputStream(Channels.newOutputStream(new GcsUtil.GcsUtilFactory().create(shardedBAMWritingOptions).create(GcsPath.fromUri(sb), "application/octet-stream")), BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length), null);
            bAMBlockWriter.setSortOrder(sAMFileHeader.getSortOrder(), true);
            bAMBlockWriter.setHeader(sAMFileHeader);
            if (z) {
                bAMBlockWriter.writeHeader(sAMFileHeader);
            }
            Iterator<Read> it = iterable.iterator();
            while (it.hasNext()) {
                bAMBlockWriter.addAlignment(ReadUtils.makeSAMRecord(it.next(), sAMFileHeader));
                i++;
            }
            bAMBlockWriter.close();
            Logger logger2 = ShardedBAMWriting.LOG;
            String valueOf4 = String.valueOf(String.valueOf(sb));
            logger2.info(new StringBuilder(29 + valueOf4.length()).append("Wrote ").append(i).append(" reads into ").append(valueOf4).toString());
            return sb;
        }
    }

    public static void main(String[] strArr) throws GeneralSecurityException, IOException {
        PipelineOptionsFactory.register(ShardedBAMWritingOptions.class);
        options = PipelineOptionsFactory.fromArgs(strArr).withValidation().as(ShardedBAMWritingOptions.class);
        GenomicsDatasetOptions.Methods.validateOptions(options);
        auth = GenomicsOptions.Methods.getGenomicsAuth(options);
        p = Pipeline.create(options);
        DataflowWorkarounds.registerGenomicsCoders(p);
        DataflowWorkarounds.registerCoder(p, Contig.class, CONTIG_CODER);
        contigs = Contig.parseContigsFromCommandLine(options.getReferences());
        WriteReadsTransform.write(ShardReadsTransform.shard(getReadsFromBAMFile()), getHeader()).apply(TextIO.Write.to(String.valueOf(options.getOutput()).concat("-result")).named("Write Output Result").withoutSharding());
        p.run();
    }

    private static HeaderInfo getHeader() throws IOException {
        String str;
        String str2;
        String str3;
        String str4;
        HeaderInfo headerInfo = null;
        ArrayList newArrayList = Lists.newArrayList(contigs);
        if (newArrayList.size() <= 0) {
            throw new IOException("No contigs specified");
        }
        Collections.sort(newArrayList, new Comparator<Contig>() { // from class: com.google.cloud.genomics.dataflow.pipelines.ShardedBAMWriting.1
            @Override // java.util.Comparator
            public int compare(Contig contig, Contig contig2) {
                int compareTo = contig.referenceName.compareTo(contig2.referenceName);
                return compareTo != 0 ? compareTo : (int) (contig.start - contig2.start);
            }
        });
        Contig contig = (Contig) newArrayList.get(0);
        Storage.Objects objects = Transport.newStorageClient(options.as(GCSOptions.class)).build().objects();
        Logger logger = LOG;
        String valueOf = String.valueOf(options.getBAMFilePath());
        if (valueOf.length() != 0) {
            str = "Reading header from ".concat(valueOf);
        } else {
            str = r2;
            String str5 = new String("Reading header from ");
        }
        logger.info(str);
        SamReader openBAM = BAMIO.openBAM(objects, options.getBAMFilePath(), ValidationStringency.DEFAULT_STRINGENCY);
        SAMFileHeader fileHeader = openBAM.getFileHeader();
        Logger logger2 = LOG;
        String valueOf2 = String.valueOf(options.getBAMFilePath());
        if (valueOf2.length() != 0) {
            str2 = "Reading first chunk of reads from ".concat(valueOf2);
        } else {
            str2 = r2;
            String str6 = new String("Reading first chunk of reads from ");
        }
        logger2.info(str2);
        SAMRecordIterator query = openBAM.query(contig.referenceName, ((int) contig.start) + 1, ((int) contig.end) + 1, false);
        Contig contig2 = null;
        while (query.hasNext()) {
            int alignmentStart = ((SAMRecord) query.next()).getAlignmentStart();
            if (contig2 == null && alignmentStart > contig.start && alignmentStart < contig.end) {
                contig2 = shardFromAlignmentStart(contig.referenceName, alignmentStart);
                Logger logger3 = LOG;
                String valueOf3 = String.valueOf(String.valueOf(contig2));
                logger3.info(new StringBuilder(29 + valueOf3.length()).append("Determined first shard to be ").append(valueOf3).toString());
                headerInfo = new HeaderInfo(fileHeader, contig2);
            }
        }
        query.close();
        openBAM.close();
        if (headerInfo == null) {
            String valueOf4 = String.valueOf(contig.toString());
            if (valueOf4.length() != 0) {
                str4 = "Did not find reads for the first contig ".concat(valueOf4);
            } else {
                str4 = r3;
                String str7 = new String("Did not find reads for the first contig ");
            }
            throw new IOException(str4);
        }
        Logger logger4 = LOG;
        String valueOf5 = String.valueOf(options.getBAMFilePath());
        if (valueOf5.length() != 0) {
            str3 = "Finished header reading from ".concat(valueOf5);
        } else {
            str3 = r2;
            String str8 = new String("Finished header reading from ");
        }
        logger4.info(str3);
        return headerInfo;
    }

    private static PCollection<Read> getReadsFromBAMFile() throws IOException {
        String str;
        Logger logger = LOG;
        String valueOf = String.valueOf(options.getBAMFilePath());
        if (valueOf.length() != 0) {
            str = "Sharded reading of ".concat(valueOf);
        } else {
            str = r2;
            String str2 = new String("Sharded reading of ");
        }
        logger.info(str);
        return ReadBAMTransform.getReadsFromBAMFilesSharded(p, auth, contigs, new ReaderOptions(ValidationStringency.LENIENT, true), options.getBAMFilePath(), ShardingPolicy.BYTE_SIZE_POLICY);
    }

    static Contig shardKeyForRead(Read read) {
        String str = null;
        Long l = null;
        if (read.getAlignment() != null && read.getAlignment().getPosition() != null) {
            str = read.getAlignment().getPosition().getReferenceName();
            l = read.getAlignment().getPosition().getPosition();
        }
        if ((str == null || l == null) && read.getNextMatePosition() != null) {
            str = read.getNextMatePosition().getReferenceName();
            l = read.getNextMatePosition().getPosition();
        }
        if (str == null || l == null) {
            str = "*";
            l = new Long(0L);
        }
        return shardFromAlignmentStart(str, l.longValue());
    }

    static Contig shardFromAlignmentStart(String str, long j) {
        long j2 = (j / LOCI_PER_SHARD) * LOCI_PER_SHARD;
        return new Contig(str, j2, j2 + LOCI_PER_SHARD);
    }

    static {
        SAM_HEADER_CODEC.setValidationStringency(ValidationStringency.SILENT);
        HEADER_INFO_CODER = DelegateCoder.of(StringUtf8Coder.of(), new DelegateCoder.CodingFunction<HeaderInfo, String>() { // from class: com.google.cloud.genomics.dataflow.pipelines.ShardedBAMWriting.4
            public String apply(HeaderInfo headerInfo) throws Exception {
                StringWriter stringWriter = new StringWriter();
                ShardedBAMWriting.SAM_HEADER_CODEC.encode(stringWriter, headerInfo.header);
                String valueOf = String.valueOf(String.valueOf(headerInfo.firstShard.toString()));
                String valueOf2 = String.valueOf(String.valueOf(stringWriter.toString()));
                return new StringBuilder(1 + valueOf.length() + valueOf2.length()).append(valueOf).append("\n").append(valueOf2).toString();
            }
        }, new DelegateCoder.CodingFunction<String, HeaderInfo>() { // from class: com.google.cloud.genomics.dataflow.pipelines.ShardedBAMWriting.5
            public HeaderInfo apply(String str) throws Exception {
                int indexOf = str.indexOf("\n");
                String substring = str.substring(0, indexOf);
                return new HeaderInfo(ShardedBAMWriting.SAM_HEADER_CODEC.decode(new StringLineReader(str.substring(indexOf + 1)), "HEADER_INFO_CODER"), (Contig) Contig.parseContigsFromCommandLine(substring).iterator().next());
            }
        });
        SHARDED_READS_TAG = new TupleTag<>();
        HEADER_TAG = new TupleTag<>();
    }
}
