package com.google.cloud.flink.bigquery.source.split.reader;

import com.google.cloud.bigquery.storage.v1.AvroRows;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.flink.bigquery.services.BigQueryServices;
import com.google.cloud.flink.bigquery.services.BigQueryServicesFactory;
import com.google.cloud.flink.bigquery.source.config.BigQueryReadOptions;
import com.google.cloud.flink.bigquery.source.reader.BigQuerySourceReaderContext;
import com.google.cloud.flink.bigquery.source.split.BigQuerySourceSplit;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:com/google/cloud/flink/bigquery/source/split/reader/BigQuerySourceSplitReader.class */
public class BigQuerySourceSplitReader implements SplitReader<GenericRecord, BigQuerySourceSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(BigQuerySourceSplitReader.class);
    private final BigQueryReadOptions readOptions;
    private final BigQuerySourceReaderContext readerContext;
    private final Configuration configuration;
    private Long splitStartFetch;
    private final Queue<BigQuerySourceSplit> assignedSplits = new ArrayDeque();
    private Boolean closed = false;
    private Schema avroSchema = null;
    private Long readSoFar = 0L;
    private Iterator<ReadRowsResponse> readStreamIterator = null;

    /* loaded from: input_file:com/google/cloud/flink/bigquery/source/split/reader/BigQuerySourceSplitReader$GenericRecordReader.class */
    static class GenericRecordReader {
        private final Schema schema;

        private GenericRecordReader(Schema schema) {
            Preconditions.checkNotNull(schema, "The provided avro schema reference is null.");
            this.schema = schema;
        }

        public static GenericRecordReader create(Schema schema) {
            return new GenericRecordReader(schema);
        }

        public List<GenericRecord> processRows(AvroRows avroRows) throws IOException {
            BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(avroRows.getSerializedBinaryRows().toByteArray(), (BinaryDecoder) null);
            GenericDatumReader genericDatumReader = new GenericDatumReader(this.schema);
            ArrayList arrayList = new ArrayList();
            while (!binaryDecoder.isEnd()) {
                arrayList.add((GenericRecord) genericDatumReader.read((Object) null, binaryDecoder));
            }
            return arrayList;
        }
    }

    public BigQuerySourceSplitReader(BigQueryReadOptions bigQueryReadOptions, BigQuerySourceReaderContext bigQuerySourceReaderContext) {
        this.readOptions = bigQueryReadOptions;
        this.readerContext = bigQuerySourceReaderContext;
        this.configuration = bigQuerySourceReaderContext.getConfiguration();
    }

    Long offsetToFetch(BigQuerySourceSplit bigQuerySourceSplit) {
        if (bigQuerySourceSplit.getOffset().longValue() > 0) {
            this.readSoFar = bigQuerySourceSplit.getOffset();
            this.splitStartFetch = Long.valueOf(System.currentTimeMillis());
        } else if (this.readSoFar.longValue() == 0) {
            this.splitStartFetch = Long.valueOf(System.currentTimeMillis());
        }
        LOG.debug("[subtask #{}] Offset to fetch from {} for stream {}.", new Object[]{Integer.valueOf(this.readerContext.getIndexOfSubtask()), this.readSoFar, bigQuerySourceSplit.getStreamName()});
        return this.readSoFar;
    }

    BigQueryServices.BigQueryServerStream<ReadRowsResponse> retrieveReadStream(BigQuerySourceSplit bigQuerySourceSplit) throws IOException {
        try {
            BigQueryServices.StorageReadClient storageRead = BigQueryServicesFactory.instance(this.readOptions.getBigQueryConnectOptions()).storageRead();
            Throwable th = null;
            try {
                BigQueryServices.BigQueryServerStream<ReadRowsResponse> readRows = storageRead.readRows(ReadRowsRequest.newBuilder().setReadStream(bigQuerySourceSplit.getStreamName()).setOffset(offsetToFetch(bigQuerySourceSplit).longValue()).build());
                if (storageRead != null) {
                    if (0 != 0) {
                        try {
                            storageRead.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        storageRead.close();
                    }
                }
                return readRows;
            } finally {
            }
        } catch (Exception e) {
            throw new IOException(String.format("[subtask #%d][hostname %s] Problems while opening the stream %s from BigQuery with connection info %s. Current split offset %d, reader offset %d.", Integer.valueOf(this.readerContext.getIndexOfSubtask()), this.readerContext.getLocalHostName(), Optional.ofNullable(bigQuerySourceSplit.getStreamName()).orElse("NA"), this.readOptions.toString(), bigQuerySourceSplit.getOffset(), this.readSoFar), e);
        }
    }

    public RecordsWithSplitIds<GenericRecord> fetch() throws IOException {
        if (this.closed.booleanValue()) {
            throw new IllegalStateException("Can't fetch records from a closed split reader.");
        }
        RecordsBySplits.Builder builder = new RecordsBySplits.Builder();
        if (this.assignedSplits.isEmpty()) {
            return builder.build();
        }
        if (this.readerContext.willExceedLimit(0)) {
            LOG.info("Completing reading because we are over limit (context reader count {}).", this.readerContext.currentReadCount());
            builder.addFinishedSplits((Collection) this.assignedSplits.stream().map(bigQuerySourceSplit -> {
                return bigQuerySourceSplit.splitId();
            }).collect(Collectors.toList()));
            this.assignedSplits.clear();
            return builder.build();
        }
        BigQuerySourceSplit peek = this.assignedSplits.peek();
        int intValue = this.readOptions.getMaxRecordsPerSplitFetch().intValue();
        int i = 0;
        Long valueOf = Long.valueOf(System.currentTimeMillis());
        Boolean bool = false;
        try {
            if (this.readStreamIterator == null) {
                this.readStreamIterator = retrieveReadStream(peek).iterator();
            }
            Long valueOf2 = Long.valueOf(System.currentTimeMillis());
            while (true) {
                if (!this.readStreamIterator.hasNext()) {
                    break;
                }
                ReadRowsResponse next = this.readStreamIterator.next();
                if (!next.hasAvroRows()) {
                    LOG.info("[subtask #{}][hostname {}] The response contained no avro records for stream {}.", new Object[]{Integer.valueOf(this.readerContext.getIndexOfSubtask()), this.readerContext.getLocalHostName(), peek.getStreamName()});
                }
                if (this.avroSchema == null) {
                    if (!next.hasAvroSchema()) {
                        throw new IllegalArgumentException("Avro schema not initialized and not available in the response.");
                    }
                    this.avroSchema = new Schema.Parser().parse(next.getAvroSchema().getSchema());
                }
                Long valueOf3 = Long.valueOf(System.currentTimeMillis());
                List<GenericRecord> processRows = GenericRecordReader.create(this.avroSchema).processRows(next.getAvroRows());
                LOG.debug("[subtask #{}][hostname %s] Iteration decoded records in {}ms from stream {}.", new Object[]{Integer.valueOf(this.readerContext.getIndexOfSubtask()), Long.valueOf(System.currentTimeMillis() - valueOf3.longValue()), peek.getStreamName()});
                Iterator<GenericRecord> it = processRows.iterator();
                while (it.hasNext()) {
                    builder.add(peek, it.next());
                    i++;
                    if (this.readerContext.willExceedLimit(i)) {
                        break;
                    }
                }
                if (this.readerContext.willExceedLimit(i)) {
                    break;
                }
                LOG.debug("[subtask #{}][hostname {}] Completed reading iteration in {}ms, so far read {} from stream {}.", new Object[]{Integer.valueOf(this.readerContext.getIndexOfSubtask()), this.readerContext.getLocalHostName(), Long.valueOf(System.currentTimeMillis() - valueOf2.longValue()), Long.valueOf(this.readSoFar.longValue() + i), peek.getStreamName()});
                valueOf2 = Long.valueOf(System.currentTimeMillis());
                if (i + processRows.size() > intValue) {
                    bool = true;
                    break;
                }
            }
            this.readSoFar = Long.valueOf(this.readSoFar.longValue() + i);
            if (bool.booleanValue()) {
                LOG.debug("[subtask #{}][hostname {}] Completed a partial fetch in {}ms, so far read {} from stream {}.", new Object[]{Integer.valueOf(this.readerContext.getIndexOfSubtask()), this.readerContext.getLocalHostName(), Long.valueOf(System.currentTimeMillis() - valueOf.longValue()), this.readSoFar, peek.getStreamName()});
            } else {
                this.readerContext.updateReadCount(this.readSoFar);
                LOG.info("[subtask #{}][hostname {}] Completed reading split, {} records in {}ms on stream {}.", new Object[]{Integer.valueOf(this.readerContext.getIndexOfSubtask()), this.readerContext.getLocalHostName(), this.readSoFar, Long.valueOf(System.currentTimeMillis() - this.splitStartFetch.longValue()), peek.splitId()});
                this.readSoFar = 0L;
                this.assignedSplits.poll();
                this.readStreamIterator = null;
                builder.addFinishedSplit(peek.splitId());
            }
            return builder.build();
        } catch (Exception e) {
            LOG.error(String.format("[subtask #%d][hostname %s] Problems while reading stream %s from BigQuery with connection info %s. Current split offset %d, reader offset %d. Flink options %s.", Integer.valueOf(this.readerContext.getIndexOfSubtask()), Optional.ofNullable(this.readerContext.getLocalHostName()).orElse("NA"), Optional.ofNullable(peek.getStreamName()).orElse("NA"), this.readOptions.toString(), peek.getOffset(), this.readSoFar, this.configuration.toString()), e);
            this.readStreamIterator = null;
            return new RecordsBySplits.Builder().build();
        }
    }

    public void handleSplitsChanges(SplitsChange<BigQuerySourceSplit> splitsChange) {
        LOG.debug("Handle split changes {}.", splitsChange);
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChange.getClass()));
        }
        this.assignedSplits.addAll(splitsChange.splits());
    }

    public void wakeUp() {
        LOG.debug("[subtask #{}][hostname %{}] Wake up called.", Integer.valueOf(this.readerContext.getIndexOfSubtask()), this.readerContext.getLocalHostName());
    }

    public void close() throws Exception {
        LOG.debug("[subtask #{}][hostname {}] Close called, assigned splits {}.", new Object[]{Integer.valueOf(this.readerContext.getIndexOfSubtask()), this.readerContext.getLocalHostName(), this.assignedSplits.toString()});
        if (this.closed.booleanValue()) {
            return;
        }
        this.closed = true;
        this.readSoFar = 0L;
        this.readStreamIterator = null;
    }
}
