package com.google.cloud.spark.bigquery.v2;

import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Clustering;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobConfiguration;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.bigquery.connector.common.BigQueryClient;
import com.google.cloud.bigquery.connector.common.BigQueryUtil;
import com.google.cloud.spark.bigquery.AvroSchemaConverter;
import com.google.cloud.spark.bigquery.SchemaConverters;
import com.google.cloud.spark.bigquery.SparkBigQueryConfig;
import com.google.cloud.spark.bigquery.SparkBigQueryUtil;
import com.google.cloud.spark.bigquery.SupportedCustomDataType;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/cloud/spark/bigquery/v2/BigQueryIndirectDataSourceWriter.class */
/**
 * A {@link DataSourceWriter} that writes to BigQuery indirectly: the per-task writers produced by
 * {@link #createWriterFactory()} are pointed at a GCS path, and {@link #commit} then loads the
 * resulting files into the configured table with a BigQuery load job.
 *
 * <p>The optional {@link IntermediateDataCleaner} is invoked after both {@link #commit} and
 * {@link #abort}, whether or not they succeed.
 */
public class BigQueryIndirectDataSourceWriter implements DataSourceWriter {
    private static final Logger logger = LoggerFactory.getLogger(BigQueryIndirectDataSourceWriter.class);
    private final BigQueryClient bigQueryClient;
    private final SparkBigQueryConfig config;
    private final Configuration hadoopConfiguration;
    private final StructType sparkSchema;
    private final String writeUUID;
    private final SaveMode saveMode;
    private final Path gcsPath;
    private final Optional<IntermediateDataCleaner> intermediateDataCleaner;

    /**
     * Creates the writer.
     *
     * @param bigQueryClient client used to run the load job and to read/update the table metadata
     * @param sparkBigQueryConfig connector configuration (target table, partitioning, clustering, ...)
     * @param configuration Hadoop configuration handed to the per-task writer factory
     * @param structType Spark schema of the data being written
     * @param str identifier of this write; only used in the abort log message
     * @param saveMode Spark save mode, translated to a BigQuery write disposition
     * @param path GCS location handed to the per-task writer factory
     * @param optional cleaner for the intermediate data, if cleanup is wanted
     */
    public BigQueryIndirectDataSourceWriter(BigQueryClient bigQueryClient, SparkBigQueryConfig sparkBigQueryConfig, Configuration configuration, StructType structType, String str, SaveMode saveMode, Path path, Optional<IntermediateDataCleaner> optional) {
        this.bigQueryClient = bigQueryClient;
        this.config = sparkBigQueryConfig;
        this.hadoopConfiguration = configuration;
        this.sparkSchema = structType;
        this.writeUUID = str;
        this.saveMode = saveMode;
        this.gcsPath = path;
        this.intermediateDataCleaner = optional;
    }

    /**
     * Adapts a Hadoop {@link RemoteIterator} to an {@link Iterable}, rethrowing any
     * {@link IOException} as {@link UncheckedIOException}.
     *
     * <p>Every iterator obtained from the returned {@code Iterable} delegates to the same
     * {@code remoteIterator}, so the elements can effectively be traversed only once.
     *
     * @param remoteIterator the iterator to adapt
     * @param <T> element type
     * @return an {@code Iterable} view over {@code remoteIterator}
     */
    static <T> Iterable<T> wrap(RemoteIterator<T> remoteIterator) {
        return () -> new Iterator<T>() {
            @Override
            public boolean hasNext() {
                try {
                    return remoteIterator.hasNext();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }

            @Override
            public T next() {
                try {
                    return remoteIterator.next();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        };
    }

    /**
     * Builds the factory for the per-task writers, passing it the Hadoop configuration, the GCS
     * path, the Spark schema and the Avro schema (as a string) derived from the Spark schema.
     */
    @Override
    public DataWriterFactory<InternalRow> createWriterFactory() {
        String avroSchemaJson = AvroSchemaConverter.sparkSchemaToAvroSchema(this.sparkSchema).toString();
        return new BigQueryIndirectDataWriterFactory(
                new SerializableConfiguration(this.hadoopConfiguration),
                this.gcsPath.toString(),
                this.sparkSchema,
                avroSchemaJson);
    }

    /**
     * Loads the files referenced by the commit messages into BigQuery and then updates the table
     * metadata if needed. The intermediate data cleaner runs exactly once afterwards, on success
     * and on failure alike.
     *
     * @param writerCommitMessageArr commit messages; each must be a
     *     {@code BigQueryIndirectWriterCommitMessage}
     * @throws UncheckedIOException if loading fails with an {@link IOException}
     */
    @Override
    public void commit(WriterCommitMessage[] writerCommitMessageArr) {
        logger.info("Data has been successfully written to GCS. Going to load {} files to BigQuery", writerCommitMessageArr.length);
        try {
            List<String> sourceUris = Stream.of(writerCommitMessageArr)
                    .map(message -> ((BigQueryIndirectWriterCommitMessage) message).getUri())
                    .collect(Collectors.toList());
            loadDataToBigQuery(sourceUris);
            updateMetadataIfNeeded();
            logger.info("Data has been successfully loaded to BigQuery");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            cleanTemporaryGcsPathIfNeeded();
        }
    }

    /**
     * Logs the aborted write and runs the intermediate data cleaner.
     *
     * @param writerCommitMessageArr commit messages of the aborted write; not inspected
     */
    @Override
    public void abort(WriterCommitMessage[] writerCommitMessageArr) {
        try {
            logger.warn("Aborting write {} for table {}", this.writeUUID, BigQueryUtil.friendlyTableName(this.config.getTableId()));
        } finally {
            // Clean up even if building the log message fails.
            cleanTemporaryGcsPathIfNeeded();
        }
    }

    /**
     * Runs a BigQuery load job that loads the given URIs into the configured table and waits for
     * it to finish.
     *
     * <p>The job uses schema auto-detection. The create disposition defaults to
     * {@code CREATE_IF_NEEDED} unless the configuration supplies one; time partitioning,
     * clustering and schema update options are applied when configured.
     *
     * @param list URIs of the files to load
     * @throws IOException declared for callers; see {@link #commit}
     * @throws BigQueryException if the finished job reports an error
     */
    void loadDataToBigQuery(List<String> list) throws IOException {
        LoadJobConfiguration.Builder jobConfiguration = LoadJobConfiguration.newBuilder(
                        this.config.getTableId(),
                        SparkBigQueryUtil.optimizeLoadUriListForSpark(list),
                        this.config.getIntermediateFormat().getFormatOptions())
                .setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
                .setWriteDisposition(saveModeToWriteDisposition(this.saveMode))
                .setAutodetect(true);

        // An explicitly configured create disposition overrides the default set above.
        this.config.getCreateDisposition().ifPresent(jobConfiguration::setCreateDisposition);

        if (this.config.getPartitionField().isPresent() || this.config.getPartitionType().isPresent()) {
            jobConfiguration.setTimePartitioning(buildTimePartitioning());
        }

        this.config.getClusteredFields().ifPresent(clusteredFields ->
                jobConfiguration.setClustering(Clustering.newBuilder().setFields(clusteredFields).build()));

        if (!this.config.getLoadSchemaUpdateOptions().isEmpty()) {
            jobConfiguration.setSchemaUpdateOptions(this.config.getLoadSchemaUpdateOptions());
        }

        Job finishedJob = this.bigQueryClient.createAndWaitFor(jobConfiguration);
        if (finishedJob.getStatus().getError() != null) {
            throw new BigQueryException(
                    0,
                    String.format(
                            "Failed to load to %s in job %s. BigQuery error was '%s'",
                            BigQueryUtil.friendlyTableName(this.config.getTableId()),
                            finishedJob.getJobId(),
                            finishedJob.getStatus().getError().getMessage()),
                    finishedJob.getStatus().getError());
        }
        logger.info("Done loading to {}. jobId: {}", BigQueryUtil.friendlyTableName(this.config.getTableId()), finishedJob.getJobId());
    }

    /** Builds the time partitioning settings from the partition-related configuration options. */
    private TimePartitioning buildTimePartitioning() {
        TimePartitioning.Builder builder = TimePartitioning.newBuilder(this.config.getPartitionTypeOrDefault());
        this.config.getPartitionExpirationMs().ifPresent(builder::setExpirationMs);
        this.config.getPartitionRequireFilter().ifPresent(builder::setRequirePartitionFilter);
        this.config.getPartitionField().ifPresent(builder::setField);
        return builder.build();
    }

    /**
     * Translates a Spark {@link SaveMode} into the BigQuery write disposition of the load job.
     *
     * @param saveMode the Spark save mode
     * @return {@code WRITE_EMPTY} for {@code ErrorIfExists}, {@code WRITE_APPEND} for
     *     {@code Append} and {@code Ignore}, {@code WRITE_TRUNCATE} for {@code Overwrite}
     * @throws UnsupportedOperationException for any other value, including {@code null}
     */
    JobInfo.WriteDisposition saveModeToWriteDisposition(SaveMode saveMode) {
        if (saveMode == SaveMode.ErrorIfExists) {
            return JobInfo.WriteDisposition.WRITE_EMPTY;
        }
        // NOTE(review): Ignore is treated like Append here; presumably the "table already
        // exists" case for Ignore is handled before this writer is used - confirm.
        if (saveMode == SaveMode.Append || saveMode == SaveMode.Ignore) {
            return JobInfo.WriteDisposition.WRITE_APPEND;
        }
        if (saveMode == SaveMode.Overwrite) {
            return JobInfo.WriteDisposition.WRITE_TRUNCATE;
        }
        throw new UnsupportedOperationException("SaveMode " + saveMode + " is currently not supported.");
    }

    /**
     * Updates the descriptions of the BigQuery table fields whose Spark counterpart either has a
     * supported custom data type or carries a description/comment. Does nothing when no Spark
     * field qualifies.
     */
    void updateMetadataIfNeeded() {
        Map<String, StructField> fieldsToUpdate = Stream.of(this.sparkSchema.fields())
                .filter(sparkField -> SupportedCustomDataType.of(sparkField.dataType()).isPresent()
                        || SchemaConverters.getDescriptionOrCommentOfField(sparkField).isPresent())
                .collect(Collectors.toMap(StructField::name, Function.identity()));
        if (fieldsToUpdate.isEmpty()) {
            return;
        }
        logger.debug("updating schema, found fields to update: {}", fieldsToUpdate.keySet());
        TableInfo table = this.bigQueryClient.getTable(this.config.getTableIdWithoutThePartition());
        TableDefinition definition = table.getDefinition();
        // Fields are matched by name; table fields without a matching Spark field are kept as-is.
        List<Field> updatedFields = definition.getSchema().getFields().stream()
                .map(field -> Optional.ofNullable(fieldsToUpdate.get(field.getName()))
                        .map(sparkField -> updatedField(field, sparkField))
                        .orElse(field))
                .collect(Collectors.toList());
        this.bigQueryClient.update(
                table.toBuilder()
                        .setDefinition(definition.toBuilder().setSchema(Schema.of(updatedFields)).build())
                        .build());
    }

    /**
     * Returns a copy of {@code field} with its description adjusted for {@code structField}.
     *
     * <p>If the Spark field has a description/comment, it replaces the BigQuery description.
     * Otherwise, if the Spark type is a supported custom data type, its type marker becomes the
     * description, or is appended (space separated) to an existing description that does not
     * already end with it. If neither applies, the field is returned with no changes.
     *
     * @param field the existing BigQuery field
     * @param structField the Spark field with the same name
     * @return the (possibly) updated BigQuery field
     */
    Field updatedField(Field field, StructField structField) {
        Field.Builder builder = field.toBuilder();
        Optional<String> sparkDescription = SchemaConverters.getDescriptionOrCommentOfField(structField);
        if (sparkDescription.isPresent()) {
            builder.setDescription(sparkDescription.get());
            return builder.build();
        }
        SupportedCustomDataType.of(structField.dataType()).ifPresent(customType -> {
            String typeMarker = customType.getTypeMarker();
            String currentDescription = field.getDescription();
            if (currentDescription == null) {
                builder.setDescription(typeMarker);
            } else if (!currentDescription.endsWith(typeMarker)) {
                builder.setDescription(currentDescription + " " + typeMarker);
            }
        });
        return builder.build();
    }

    /** Deletes the intermediate data through the cleaner, if one was provided. */
    void cleanTemporaryGcsPathIfNeeded() {
        this.intermediateDataCleaner.ifPresent(cleaner -> cleaner.deletePath());
    }
}
