package io.odpf.depot.bigquery;

import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import io.odpf.depot.OdpfSink;
import io.odpf.depot.OdpfSinkResponse;
import io.odpf.depot.bigquery.client.BigQueryClient;
import io.odpf.depot.bigquery.client.BigQueryResponseParser;
import io.odpf.depot.bigquery.client.BigQueryRow;
import io.odpf.depot.bigquery.converter.MessageRecordConverterCache;
import io.odpf.depot.bigquery.handler.ErrorHandler;
import io.odpf.depot.bigquery.models.Record;
import io.odpf.depot.bigquery.models.Records;
import io.odpf.depot.error.ErrorInfo;
import io.odpf.depot.message.OdpfMessage;
import io.odpf.depot.metrics.BigQueryMetrics;
import io.odpf.depot.metrics.Instrumentation;
import java.io.IOException;
import java.util.List;
import java.util.Map;

/* loaded from: input_file:io/odpf/depot/bigquery/BigQuerySink.class */
public class BigQuerySink implements OdpfSink {
    private final BigQueryClient bigQueryClient;
    private final BigQueryRow rowCreator;
    private final MessageRecordConverterCache messageRecordConverterCache;
    private final Instrumentation instrumentation;
    private final BigQueryMetrics bigQueryMetrics;
    private final ErrorHandler errorHandler;

    public BigQuerySink(BigQueryClient bigQueryClient, MessageRecordConverterCache messageRecordConverterCache, BigQueryRow bigQueryRow, BigQueryMetrics bigQueryMetrics, Instrumentation instrumentation, ErrorHandler errorHandler) {
        this.bigQueryClient = bigQueryClient;
        this.messageRecordConverterCache = messageRecordConverterCache;
        this.rowCreator = bigQueryRow;
        this.instrumentation = instrumentation;
        this.bigQueryMetrics = bigQueryMetrics;
        this.errorHandler = errorHandler;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
    }

    private InsertAllResponse insertIntoBQ(List<Record> list) {
        InsertAllRequest.Builder newBuilder = InsertAllRequest.newBuilder(this.bigQueryClient.getTableID());
        list.forEach(record -> {
            newBuilder.addRow(this.rowCreator.of(record));
        });
        return this.bigQueryClient.insertAll(newBuilder.build());
    }

    @Override // io.odpf.depot.OdpfSink
    public OdpfSinkResponse pushToSink(List<OdpfMessage> list) {
        Records convert = this.messageRecordConverterCache.getMessageRecordConverter().convert(list);
        OdpfSinkResponse odpfSinkResponse = new OdpfSinkResponse();
        convert.getInvalidRecords().forEach(record -> {
            odpfSinkResponse.addErrors(record.getIndex(), record.getErrorInfo());
        });
        if (convert.getValidRecords().size() > 0) {
            InsertAllResponse insertIntoBQ = insertIntoBQ(convert.getValidRecords());
            Instrumentation instrumentation = this.instrumentation;
            Object[] objArr = new Object[2];
            objArr[0] = Integer.valueOf(convert.getValidRecords().size());
            objArr[1] = Boolean.valueOf(!insertIntoBQ.hasErrors());
            instrumentation.logInfo("Pushed a batch of {} records to BQ. Insert success?: {}", objArr);
            if (insertIntoBQ.hasErrors()) {
                Map<Long, ErrorInfo> errorsFromBQResponse = BigQueryResponseParser.getErrorsFromBQResponse(convert.getValidRecords(), insertIntoBQ, this.bigQueryMetrics, this.instrumentation);
                odpfSinkResponse.getClass();
                errorsFromBQResponse.forEach((v1, v2) -> {
                    r1.addErrors(v1, v2);
                });
                this.errorHandler.handle(insertIntoBQ.getInsertErrors(), convert.getValidRecords());
            }
        }
        return odpfSinkResponse;
    }
}
