package com.google.cloud.genomics.dataflow.readers;

import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Max;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.genomics.utils.GenomicsFactory;
import com.google.cloud.genomics.utils.ShardBoundary;
import com.google.cloud.genomics.utils.grpc.ReadStreamIterator;
import com.google.common.base.Stopwatch;
import com.google.genomics.v1.Read;
import com.google.genomics.v1.StreamReadsRequest;
import com.google.genomics.v1.StreamReadsResponse;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Transform that turns a collection of {@link StreamReadsRequest}s into a collection of
 * individual {@link Read}s.
 *
 * <p>The work is done in two chained {@code ParDo} steps: {@link RetrieveReads} streams the
 * responses for each request and emits one list of reads per response, and
 * {@link ConvergeReadsList} flattens those lists into single reads.
 */
public class ReadStreamer extends PTransform<PCollection<StreamReadsRequest>, PCollection<Read>> {
    /** Credentials handed to the read stream iterator. */
    protected final GenomicsFactory.OfflineAuth auth;
    /** Shard boundary requirement handed to the read stream iterator. */
    protected final ShardBoundary.Requirement shardBoundary;
    /** Field selection string handed to the read stream iterator. */
    protected final String fields;

    /**
     * Flattens each incoming list of reads into individual {@link Read} outputs and counts
     * every read emitted.
     *
     * <p>NOTE(review): decompiler metadata indicates this class was originally declared
     * private; it is public here only as a decompilation artifact.
     */
    public class ConvergeReadsList extends DoFn<List<Read>, Read> {
        /** Running total of reads emitted by this step. */
        protected Aggregator<Long, Long> itemCount = createAggregator("Number of reads", new Sum.SumLongFn());

        public ConvergeReadsList() {
        }

        /**
         * Emits every read of the input list, in list order, bumping the read counter once
         * per emitted read.
         *
         * @param processContext context supplying the input list and receiving the outputs
         */
        public void processElement(DoFn<List<Read>, Read>.ProcessContext processContext) {
            for (Read read : processContext.element()) {
                processContext.output(read);
                this.itemCount.addValue(1L);
            }
        }
    }

    /**
     * Streams the responses for one {@link StreamReadsRequest} and emits the alignments list
     * of each response, while tracking shard counts and the per-shard processing time.
     *
     * <p>NOTE(review): decompiler metadata indicates this class was originally declared
     * private; it is public here only as a decompilation artifact.
     */
    public class RetrieveReads extends DoFn<StreamReadsRequest, List<Read>> {
        /** Number of requests for which processing has started. */
        protected Aggregator<Integer, Integer> initializedShardCount = createAggregator("Initialized Shard Count", new Sum.SumIntegerFn());
        /** Number of requests whose stream was fully drained. */
        protected Aggregator<Integer, Integer> finishedShardCount = createAggregator("Finished Shard Count", new Sum.SumIntegerFn());
        /** Largest observed per-request processing time, in whole seconds. */
        protected Aggregator<Long, Long> shardTimeMaxSec = createAggregator("Maximum Shard Processing Time (sec)", new Max.MaxLongFn());

        public RetrieveReads() {
        }

        /**
         * Processes one request: opens a shard-bounded read stream for it and outputs the
         * alignments list of every response the stream yields.
         *
         * <p>The elapsed time and the finished-shard counter are only recorded when the
         * stream is drained without an exception escaping this method.
         *
         * @param processContext context supplying the request and receiving the read lists
         * @throws IOException declared by this method; thrown by the streaming calls
         * @throws GeneralSecurityException declared by this method; thrown by the streaming calls
         */
        public void processElement(DoFn<StreamReadsRequest, List<Read>>.ProcessContext processContext) throws IOException, GeneralSecurityException {
            this.initializedShardCount.addValue(1);
            // Presumably seeds the max aggregator so it is reported before any shard
            // finishes -- TODO confirm.
            this.shardTimeMaxSec.addValue(0L);

            Stopwatch shardTimer = Stopwatch.createStarted();
            StreamReadsRequest request = (StreamReadsRequest) processContext.element();
            ReadStreamIterator responses = ReadStreamIterator.enforceShardBoundary(
                    ReadStreamer.this.auth,
                    request,
                    ReadStreamer.this.shardBoundary,
                    ReadStreamer.this.fields);
            while (responses.hasNext()) {
                StreamReadsResponse response = (StreamReadsResponse) responses.next();
                processContext.output(response.getAlignmentsList());
            }
            shardTimer.stop();

            long elapsedSeconds = shardTimer.elapsed(TimeUnit.SECONDS);
            this.shardTimeMaxSec.addValue(elapsedSeconds);
            this.finishedShardCount.addValue(1);
        }
    }

    /**
     * Creates the transform with the settings forwarded to every read stream it opens.
     *
     * @param offlineAuth credentials stored in {@link #auth}
     * @param requirement shard boundary requirement stored in {@link #shardBoundary}
     * @param str field selection string stored in {@link #fields}
     */
    public ReadStreamer(GenomicsFactory.OfflineAuth offlineAuth, ShardBoundary.Requirement requirement, String str) {
        this.auth = offlineAuth;
        this.shardBoundary = requirement;
        this.fields = str;
    }

    /**
     * Applies {@link RetrieveReads} followed by {@link ConvergeReadsList} to the requests.
     *
     * @param pCollection the stream-reads requests to execute
     * @return the individual reads produced by all requests
     */
    public PCollection<Read> apply(PCollection<StreamReadsRequest> pCollection) {
        PCollection<List<Read>> readBatches = pCollection.apply(ParDo.of(new RetrieveReads()));
        return readBatches.apply(ParDo.of(new ConvergeReadsList()));
    }
}
