package com.google.cloud.flink.bigquery.sink.writer;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.sink.TwoPhaseCommittingStatefulSink;
import com.google.cloud.flink.bigquery.sink.committer.BigQueryCommittable;
import com.google.cloud.flink.bigquery.sink.exceptions.BigQueryConnectorException;
import com.google.cloud.flink.bigquery.sink.exceptions.BigQuerySerializationException;
import com.google.cloud.flink.bigquery.sink.serializer.BigQueryProtoSerializer;
import com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProvider;
import com.google.cloud.flink.bigquery.sink.throttle.Throttler;
import com.google.cloud.flink.bigquery.sink.throttle.WriteStreamCreationThrottler;
import com.google.cloud.flink.bigquery.sink.writer.BaseWriter;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.StringUtils;

/* loaded from: input_file:com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriter.class */
public class BigQueryBufferedWriter<IN> extends BaseWriter<IN> implements TwoPhaseCommittingStatefulSink.PrecommittingStatefulSinkWriter<IN, BigQueryWriterState, BigQueryCommittable> {
    private final Throttler writeStreamCreationThrottler;
    private String streamNameInState;
    private long streamOffset;
    private long streamOffsetInState;
    private long appendRequestRowCount;
    Counter numberOfRecordsBufferedByBigQuerySinceCheckpoint;
    long totalRecordsCommitted;
    private boolean isFirstWriteAfterCheckpoint;

    public BigQueryBufferedWriter(String str, BigQueryConnectOptions bigQueryConnectOptions, BigQuerySchemaProvider bigQuerySchemaProvider, BigQueryProtoSerializer bigQueryProtoSerializer, Sink.InitContext initContext) {
        this("", 0L, str, 0L, 0L, 0L, bigQueryConnectOptions, bigQuerySchemaProvider, bigQueryProtoSerializer, initContext);
    }

    public BigQueryBufferedWriter(String str, long j, String str2, long j2, long j3, long j4, BigQueryConnectOptions bigQueryConnectOptions, BigQuerySchemaProvider bigQuerySchemaProvider, BigQueryProtoSerializer bigQueryProtoSerializer, Sink.InitContext initContext) {
        super(initContext.getSubtaskId(), str2, bigQueryConnectOptions, bigQuerySchemaProvider, bigQueryProtoSerializer);
        this.streamNameInState = StringUtils.isNullOrWhitespaceOnly(str) ? "" : str;
        this.streamName = this.streamNameInState;
        this.streamOffsetInState = j;
        this.streamOffset = j;
        this.totalRecordsSeen = j2;
        this.totalRecordsWritten = j3;
        this.totalRecordsCommitted = j4;
        this.writeStreamCreationThrottler = new WriteStreamCreationThrottler(this.subtaskId);
        this.appendRequestRowCount = 0L;
        this.isFirstWriteAfterCheckpoint = true;
        initializeExactlyOnceMetrics(initContext);
    }

    public void write(IN in, SinkWriter.Context context) {
        if (this.isFirstWriteAfterCheckpoint) {
            preWriteOpsAfterCommit();
        }
        this.totalRecordsSeen++;
        this.numberOfRecordsSeenByWriter.inc();
        this.numberOfRecordsSeenByWriterSinceCheckpoint.inc();
        try {
            ByteString protoRow = getProtoRow(in);
            if (!fitsInAppendRequest(protoRow)) {
                validateAppendResponses(false);
                append();
            }
            addToAppendRequest(protoRow);
            this.appendRequestRowCount++;
        } catch (BigQuerySerializationException e) {
            this.logger.error(String.format("Unable to serialize record %s. Dropping it!", in), e);
        }
    }

    private void preWriteOpsAfterCommit() {
        this.isFirstWriteAfterCheckpoint = false;
        long j = this.totalRecordsWritten - this.totalRecordsCommitted;
        this.totalRecordsCommitted = this.totalRecordsWritten;
        this.numberOfRecordsWrittenToBigQuery.inc(j);
        this.numberOfRecordsBufferedByBigQuerySinceCheckpoint.dec(this.numberOfRecordsBufferedByBigQuerySinceCheckpoint.getCount());
        this.numberOfRecordsSeenByWriterSinceCheckpoint.dec(this.numberOfRecordsSeenByWriterSinceCheckpoint.getCount());
    }

    @Override // com.google.cloud.flink.bigquery.sink.writer.BaseWriter
    void sendAppendRequest(ProtoRows protoRows) {
        long serializedRowsCount = protoRows.getSerializedRowsCount();
        if (this.streamOffset == this.streamOffsetInState && this.streamName.equals(this.streamNameInState) && !StringUtils.isNullOrWhitespaceOnly(this.streamName)) {
            performFirstAppendOnRestoredStream(protoRows, serializedRowsCount);
            return;
        }
        if (StringUtils.isNullOrWhitespaceOnly(this.streamName)) {
            this.logger.info("Throttling creation of BigQuery write stream in subtask {}", Integer.valueOf(this.subtaskId));
            this.writeStreamCreationThrottler.throttle();
            createWriteStream(WriteStream.Type.BUFFERED);
            createStreamWriter(false);
        }
        postAppendOps(this.streamWriter.append(protoRows, this.streamOffset), serializedRowsCount);
    }

    @Override // com.google.cloud.flink.bigquery.sink.writer.BaseWriter
    void validateAppendResponse(BaseWriter.AppendInfo appendInfo) {
        ApiFuture<AppendRowsResponse> future = appendInfo.getFuture();
        long expectedOffset = appendInfo.getExpectedOffset();
        long recordsAppended = appendInfo.getRecordsAppended();
        try {
            AppendRowsResponse appendRowsResponse = (AppendRowsResponse) future.get();
            if (appendRowsResponse.hasError()) {
                logAndThrowFatalException(appendRowsResponse.getError().getMessage());
            }
            long value = appendRowsResponse.getAppendResult().getOffset().getValue();
            if (value != expectedOffset) {
                logAndThrowFatalException(String.format("Inconsistent offset in BigQuery API response. Found %d, expected %d", Long.valueOf(value), Long.valueOf(expectedOffset)));
            }
            this.totalRecordsWritten += recordsAppended;
            this.numberOfRecordsBufferedByBigQuerySinceCheckpoint.inc(recordsAppended);
        } catch (InterruptedException | ExecutionException e) {
            if (e.getCause().getClass() == Exceptions.OffsetAlreadyExists.class) {
                this.logger.info("Ignoring OffsetAlreadyExists error in subtask {} as this can be due to faulty retries", Integer.valueOf(this.subtaskId));
            } else {
                logAndThrowFatalException(e);
            }
        }
    }

    public Collection<BigQueryCommittable> prepareCommit() throws IOException, InterruptedException {
        this.logger.info("Preparing commit in subtask {}", Integer.valueOf(this.subtaskId));
        if (this.streamOffset != 0 && (!this.streamNameInState.equals(this.streamName) || this.streamOffset != this.streamOffsetInState)) {
            return Collections.singletonList(new BigQueryCommittable(this.subtaskId, this.streamName, this.streamOffset - 1));
        }
        this.logger.info("No new data appended in subtask {}. Nothing to commit.", Integer.valueOf(this.subtaskId));
        return Collections.EMPTY_LIST;
    }

    public List<BigQueryWriterState> snapshotState(long j) {
        this.logger.info("Snapshotting state in subtask {} for checkpoint {}", Integer.valueOf(this.subtaskId), Long.valueOf(j));
        this.isFirstWriteAfterCheckpoint = true;
        this.streamNameInState = this.streamName;
        this.streamOffsetInState = this.streamOffset;
        return Collections.singletonList(new BigQueryWriterState(this.streamName, this.streamOffset, this.totalRecordsSeen, this.totalRecordsWritten, this.totalRecordsCommitted, j));
    }

    @Override // com.google.cloud.flink.bigquery.sink.writer.BaseWriter
    public void close() {
        if (!this.streamNameInState.equals(this.streamName) || this.streamOffsetInState != this.streamOffset) {
            finalizeStream();
        }
        super.close();
    }

    private void performFirstAppendOnRestoredStream(ProtoRows protoRows, long j) {
        try {
            createStreamWriter(false);
            try {
                postAppendOps(ApiFutures.immediateFuture((AppendRowsResponse) this.streamWriter.append(protoRows, this.streamOffset).get()), j);
            } catch (InterruptedException | ExecutionException e) {
                if (e.getCause().getClass() == Exceptions.OffsetAlreadyExists.class || e.getCause().getClass() == Exceptions.OffsetOutOfRange.class || e.getCause().getClass() == Exceptions.StreamFinalizedException.class || e.getCause().getClass() == Exceptions.StreamNotFound.class) {
                    discardStreamAndResendAppendRequest(e, protoRows);
                } else {
                    logAndThrowFatalException(e);
                }
            }
        } catch (BigQueryConnectorException e2) {
            discardStreamAndResendAppendRequest(e2, protoRows);
        }
    }

    private void discardStreamAndResendAppendRequest(Exception exc, ProtoRows protoRows) {
        discardStream(exc);
        sendAppendRequest(protoRows);
    }

    private void discardStream(Exception exc) {
        this.logger.info(String.format("Writer %d cannot use stream %s. Discarding this stream.", Integer.valueOf(this.subtaskId), this.streamName), exc);
        finalizeStream();
        this.streamName = "";
        this.streamOffset = 0L;
    }

    private void finalizeStream() {
        this.logger.debug("Finalizing write stream {} in subtask {}", this.streamName, Integer.valueOf(this.subtaskId));
        try {
            this.writeClient.finalizeWriteStream(this.streamName);
        } catch (Exception e) {
            this.logger.debug(String.format("Failed while finalizing write stream %s in subtask %d", this.streamName, Integer.valueOf(this.subtaskId)), e);
        }
    }

    private void postAppendOps(ApiFuture<AppendRowsResponse> apiFuture, long j) {
        this.appendResponseFuturesQueue.add(new BaseWriter.AppendInfo(apiFuture, this.streamOffset, j));
        this.streamOffset += this.appendRequestRowCount;
        this.appendRequestRowCount = 0L;
    }

    private void initializeExactlyOnceMetrics(Sink.InitContext initContext) {
        SinkWriterMetricGroup metricGroup = initContext.metricGroup();
        initializeMetrics(metricGroup);
        this.numberOfRecordsBufferedByBigQuerySinceCheckpoint = metricGroup.counter("numberOfRecordsBufferedByBigQuerySinceCheckpoint");
        this.numberOfRecordsSeenByWriter.inc(this.totalRecordsSeen);
        this.numberOfRecordsWrittenToBigQuery.inc(this.totalRecordsCommitted);
    }

    @Internal
    long getStreamOffset() {
        return this.streamOffset;
    }

    @Internal
    long getStreamOffsetInState() {
        return this.streamOffsetInState;
    }

    @Internal
    String getStreamNameInState() {
        return this.streamNameInState;
    }

    @Override // com.google.cloud.flink.bigquery.sink.writer.BaseWriter
    public /* bridge */ /* synthetic */ void flush(boolean z) {
        super.flush(z);
    }
}
