package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iceberg/flink/sink/FlinkSink.class */
public class FlinkSink {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
    private static final String ICEBERG_STREAM_WRITER_NAME = IcebergStreamWriter.class.getSimpleName();
    private static final String ICEBERG_FILES_COMMITTER_NAME = IcebergFilesCommitter.class.getSimpleName();

    /* loaded from: input_file:org/apache/iceberg/flink/sink/FlinkSink$Builder.class */
    public static class Builder {
        private Function<String, DataStream<RowData>> inputCreator;
        private TableLoader tableLoader;
        private Table table;
        private TableSchema tableSchema;
        private Integer writeParallelism;
        private List<String> equalityFieldColumns;
        private String uidPrefix;
        private ReadableConfig readableConfig;
        private final Map<String, String> writeOptions;
        private FlinkWriteConf flinkWriteConf;

        /**
         * Creates a builder with no input, no table and no write options configured.
         *
         * <p>Only the two fields that need a non-null starting value are assigned; every other
         * reference field keeps its implicit {@code null} default until a setter is called.
         */
        private Builder() {
            this.writeOptions = Maps.newHashMap();
            this.readableConfig = new Configuration();
        }

        /**
         * Uses an existing {@link RowData} stream as the sink input without adding any operator.
         *
         * <p>Decompiler note: JADX reports the access modifier was changed from {@code private}.
         *
         * @param dataStream the stream whose records are written to the table
         * @return this builder
         */
        public Builder forRowData(DataStream<RowData> dataStream) {
            // The uid prefix handed to the creator is not needed: no extra operator is created here.
            this.inputCreator = ignoredUidPrefix -> dataStream;
            return this;
        }

        /**
         * Uses a stream of arbitrary records as the sink input by mapping each record to
         * {@link RowData} first.
         *
         * <p>The map operator is created lazily, when the operator chain is built, and gets the same
         * parallelism as {@code dataStream} (presumably so it stays chained to its input and keeps
         * record order; confirm against Flink chaining rules). If a uid prefix is supplied at that
         * point, the mapper is named via {@code operatorName} and gets the uid
         * {@code <prefix>-mapper}.
         *
         * <p>Decompiler note: JADX reports the access modifier was changed from {@code private}.
         *
         * @param dataStream the source stream
         * @param mapFunction converts each source record to {@link RowData}
         * @param typeInformation type information of the mapper output
         * @param <T> the record type of the source stream
         * @return this builder
         */
        public <T> Builder forMapperOutputType(DataStream<T> dataStream, MapFunction<T, RowData> mapFunction, TypeInformation<RowData> typeInformation) {
            this.inputCreator = prefix -> {
                SingleOutputStreamOperator<RowData> mapped = dataStream.map(mapFunction, typeInformation);
                mapped = mapped.setParallelism(dataStream.getParallelism());
                if (prefix == null) {
                    return mapped;
                }
                mapped.name(operatorName(prefix)).uid(prefix + "-mapper");
                return mapped;
            };
            return this;
        }

        /**
         * Sets the Iceberg table to write to.
         *
         * <p>When no table is set, the operator chain builder loads it through the configured
         * {@link TableLoader} instead.
         *
         * @param table the target table
         * @return this builder
         */
        public Builder table(Table table) {
            this.table = table;
            return this;
        }

        /**
         * Sets the loader for the target table.
         *
         * <p>A loader is mandatory: building the sink fails with "Table loader shouldn't be null"
         * without one. It is used to load the table when none was set explicitly, and it is also
         * handed to the files committer operator.
         *
         * @param tableLoader loader for the target table
         * @return this builder
         */
        public Builder tableLoader(TableLoader tableLoader) {
            this.tableLoader = tableLoader;
            return this;
        }

        /**
         * Adds one write option; an existing value for the same key is replaced.
         *
         * <p>The collected options are passed to {@link FlinkWriteConf} when the sink is built.
         *
         * @param str the option key
         * @param str2 the option value
         * @return this builder
         */
        public Builder set(String str, String str2) {
            this.writeOptions.put(str, str2);
            return this;
        }

        /**
         * Adds all given write options; existing values for the same keys are replaced.
         *
         * @param map option keys and values to add
         * @return this builder
         */
        public Builder setAll(Map<String, String> map) {
            this.writeOptions.putAll(map);
            return this;
        }

        /**
         * Sets the Flink schema of the incoming records.
         *
         * <p>When set, the schema is validated against the Iceberg table schema and the writer's
         * row type is derived from it; when left unset, the row type is converted from the Iceberg
         * table schema (see {@code toFlinkRowType}).
         *
         * @param tableSchema the Flink table schema of the input
         * @return this builder
         */
        public Builder tableSchema(TableSchema tableSchema) {
            this.tableSchema = tableSchema;
            return this;
        }

        /**
         * Enables or disables overwrite mode by storing the flag as a write option.
         *
         * @param z {@code true} to enable overwrite mode
         * @return this builder
         */
        public Builder overwrite(boolean z) {
            String optionKey = FlinkWriteOptions.OVERWRITE_MODE.key();
            String optionValue = String.valueOf(z);
            this.writeOptions.put(optionKey, optionValue);
            return this;
        }

        /**
         * Sets the Flink configuration passed to {@link FlinkWriteConf} when the sink is built.
         *
         * <p>A builder starts with an empty {@link Configuration}.
         *
         * @param readableConfig the Flink configuration to read write settings from
         * @return this builder
         */
        public Builder flinkConf(ReadableConfig readableConfig) {
            this.readableConfig = readableConfig;
            return this;
        }

        /**
         * Sets the write distribution mode by storing it as a write option.
         *
         * <p>A {@code null} mode is accepted and leaves the options untouched.
         *
         * @param distributionMode the requested mode, or {@code null} for no change
         * @return this builder
         * @throws IllegalArgumentException if the mode is {@link DistributionMode#RANGE}
         */
        public Builder distributionMode(DistributionMode distributionMode) {
            boolean rangeRequested = DistributionMode.RANGE.equals(distributionMode);
            Preconditions.checkArgument(!rangeRequested, "Flink does not support 'range' write distribution mode now.");
            if (distributionMode == null) {
                return this;
            }
            this.writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), distributionMode.modeName());
            return this;
        }

        /**
         * Sets the parallelism of the writer operator.
         *
         * <p>When never called, the writer uses the parallelism of its input stream.
         *
         * @param i the writer parallelism
         * @return this builder
         */
        public Builder writeParallelism(int i) {
            this.writeParallelism = i;
            return this;
        }

        /**
         * Enables or disables upsert mode by storing the flag as a write option.
         *
         * @param z {@code true} to enable upsert mode
         * @return this builder
         */
        public Builder upsert(boolean z) {
            String optionKey = FlinkWriteOptions.WRITE_UPSERT_ENABLED.key();
            String optionValue = String.valueOf(z);
            this.writeOptions.put(optionKey, optionValue);
            return this;
        }

        /**
         * Sets the names of the columns used as equality fields.
         *
         * <p>The names are resolved against the table schema when the sink is built. When the list
         * is {@code null} or empty, the schema's identifier fields are used instead.
         *
         * @param list the equality column names
         * @return this builder
         */
        public Builder equalityFieldColumns(List<String> list) {
            this.equalityFieldColumns = list;
            return this;
        }

        /**
         * Sets the prefix used for operator names and operator uids.
         *
         * <p>With a prefix set, operator names become {@code <prefix>-<name>} and the operators get
         * the uids {@code <prefix>-mapper}, {@code <prefix>-writer}, {@code <prefix>-committer} and
         * {@code <prefix>-dummysink}. Without one, no uids are assigned.
         *
         * @param str the prefix, or {@code null} for none
         * @return this builder
         */
        public Builder uidPrefix(String str) {
            this.uidPrefix = str;
            return this;
        }

        /**
         * Builds the full operator chain: input, optional distribution (keyBy), writer, committer
         * and the terminating dummy sink.
         *
         * <p>When no table was set on the builder, it is loaded through the table loader. The loader
         * is opened for that purpose and closed again afterwards, including when loading fails.
         *
         * @param <T> element type of the returned sink, chosen by the caller
         * @return the terminating sink of the chain
         * @throws IllegalArgumentException if no input stream was configured
         * @throws NullPointerException if no table loader was configured
         * @throws UncheckedIOException if loading the table or closing the loader fails with an
         *     {@link IOException}
         */
        private <T> DataStreamSink<T> chainIcebergOperators() {
            Preconditions.checkArgument(this.inputCreator != null, "Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
            Preconditions.checkNotNull(this.tableLoader, "Table loader shouldn't be null");
            DataStream<RowData> rowDataInput = this.inputCreator.apply(this.uidPrefix);

            if (this.table == null) {
                this.tableLoader.open();
                // try-with-resources guarantees the loader is closed even when loadTable() throws.
                try (TableLoader loader = this.tableLoader) {
                    this.table = loader.loadTable();
                } catch (IOException e) {
                    throw new UncheckedIOException("Failed to load iceberg table from table loader: " + this.tableLoader, e);
                }
            }

            this.flinkWriteConf = new FlinkWriteConf(this.table, this.writeOptions, this.readableConfig);
            List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds();
            RowType flinkRowType = FlinkSink.toFlinkRowType(this.table.schema(), this.tableSchema);

            DataStream<RowData> distributedStream = distributeDataStream(rowDataInput, equalityFieldIds, this.table.spec(), this.table.schema(), flinkRowType);
            SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(distributedStream, flinkRowType, equalityFieldIds);
            SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);
            return appendDummySink(committerStream);
        }

        /**
         * Appends the Iceberg sink operators to the configured input stream.
         *
         * @return the terminating sink returned by {@code chainIcebergOperators()}
         */
        public DataStreamSink<Void> append() {
            return chainIcebergOperators();
        }

        /**
         * Builds an operator display name, prepending {@code <uidPrefix>-} when a uid prefix is set.
         *
         * @param str the base operator name
         * @return the prefixed name, or {@code str} unchanged when no prefix is configured
         */
        private String operatorName(String str) {
            if (this.uidPrefix == null) {
                return str;
            }
            return this.uidPrefix + "-" + str;
        }

        /**
         * Resolves the field ids used as equality fields.
         *
         * <p>The default is the table schema's identifier field ids. If equality columns were
         * configured on the builder, they take precedence: each name is resolved to its field id,
         * and a warning is logged when the resulting set differs from the identifier field ids.
         *
         * @return the equality field ids; empty when neither source provides any
         * @throws NullPointerException if a configured column does not exist in the table schema
         */
        @VisibleForTesting
        List<Integer> checkAndGetEqualityFieldIds() {
            List<Integer> equalityFieldIds = Lists.newArrayList(this.table.schema().identifierFieldIds());
            if (this.equalityFieldColumns != null && !this.equalityFieldColumns.isEmpty()) {
                HashSet<Integer> configuredFieldIds = Sets.newHashSetWithExpectedSize(this.equalityFieldColumns.size());
                for (String column : this.equalityFieldColumns) {
                    Types.NestedField field = this.table.schema().findField(column);
                    Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s", column, this.table.schema());
                    configuredFieldIds.add(field.fieldId());
                }
                if (!configuredFieldIds.equals(this.table.schema().identifierFieldIds())) {
                    FlinkSink.LOG.warn("The configured equality field column IDs {} are not matched with the schema identifier field IDs {}, use job specified equality field columns as the equality fields by default.", configuredFieldIds, this.table.schema().identifierFieldIds());
                }
                // Job-level configuration wins over the schema's identifier fields.
                equalityFieldIds = Lists.newArrayList(configuredFieldIds);
            }
            return equalityFieldIds;
        }

        /**
         * Terminates the committer stream with a discarding sink of parallelism 1.
         *
         * <p>The sink is named {@code IcebergSink <table name>} (with the uid prefix applied) and,
         * when a uid prefix is set, gets the uid {@code <prefix>-dummysink}.
         *
         * @param singleOutputStreamOperator the committer output stream
         * @param <T> element type of the returned sink, chosen by the caller
         * @return the terminating sink
         */
        private <T> DataStreamSink<T> appendDummySink(SingleOutputStreamOperator<Void> singleOutputStreamOperator) {
            // The raw DiscardingSink is deliberate: it makes addSink(...) return a raw
            // DataStreamSink, which is what allows the (unchecked) assignment to DataStreamSink<T>.
            DataStreamSink<T> sinkStream = singleOutputStreamOperator.addSink(new DiscardingSink());
            String sinkName = operatorName(String.format("IcebergSink %s", this.table.name()));
            sinkStream = sinkStream.name(sinkName).setParallelism(1);
            if (this.uidPrefix == null) {
                return sinkStream;
            }
            return sinkStream.uid(this.uidPrefix + "-dummysink");
        }

        /**
         * Appends the files committer operator after the writer.
         *
         * <p>The committer is created with the table loader and the overwrite flag, and runs with
         * parallelism and max parallelism both fixed to 1. When a uid prefix is set, it gets the
         * uid {@code <prefix>-committer}.
         *
         * @param singleOutputStreamOperator the writer output stream
         * @return the committer output stream
         */
        private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> singleOutputStreamOperator) {
            String committerName = operatorName(FlinkSink.ICEBERG_FILES_COMMITTER_NAME);
            IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(this.tableLoader, this.flinkWriteConf.overwriteMode());
            // Fully qualified on purpose: the simple name Types is taken by org.apache.iceberg.types.Types.
            SingleOutputStreamOperator<Void> committerStream = singleOutputStreamOperator
                    .transform(committerName, org.apache.flink.api.common.typeinfo.Types.VOID, filesCommitter)
                    .setParallelism(1)
                    .setMaxParallelism(1);
            if (this.uidPrefix == null) {
                return committerStream;
            }
            return committerStream.uid(this.uidPrefix + "-committer");
        }

        /**
         * Validates the upsert configuration and appends the stream writer operator.
         *
         * <p>In upsert mode, overwrite mode must be off, equality fields must be present, and every
         * partition source field must be one of the equality fields. The writer runs with the
         * configured write parallelism, or with the parallelism of {@code dataStream} when none was
         * configured. When a uid prefix is set, it gets the uid {@code <prefix>-writer}.
         *
         * @param dataStream the (possibly keyed) input stream
         * @param rowType the Flink row type of the input records
         * @param list the equality field ids
         * @return the writer output stream
         * @throws IllegalStateException if the upsert configuration is invalid
         */
        private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> dataStream, RowType rowType, List<Integer> list) {
            boolean upsertRequested = this.flinkWriteConf.upsertMode();
            if (upsertRequested) {
                Preconditions.checkState(!this.flinkWriteConf.overwriteMode(), "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
                Preconditions.checkState(!list.isEmpty(), "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
                if (!this.table.spec().isUnpartitioned()) {
                    for (PartitionField partitionField : this.table.spec().fields()) {
                        boolean coveredByEqualityFields = list.contains(Integer.valueOf(partitionField.sourceId()));
                        Preconditions.checkState(coveredByEqualityFields, "In UPSERT mode, partition field '%s' should be included in equality fields: '%s'", partitionField, this.equalityFieldColumns);
                    }
                }
            }

            String writerName = operatorName(FlinkSink.ICEBERG_STREAM_WRITER_NAME);
            TypeInformation<WriteResult> resultType = TypeInformation.of(WriteResult.class);
            IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(this.table, this.flinkWriteConf, rowType, list);
            SingleOutputStreamOperator<WriteResult> writerStream = dataStream.transform(writerName, resultType, streamWriter);

            int parallelism;
            if (this.writeParallelism != null) {
                parallelism = this.writeParallelism.intValue();
            } else {
                parallelism = dataStream.getParallelism();
            }
            writerStream = writerStream.setParallelism(parallelism);

            if (this.uidPrefix == null) {
                return writerStream;
            }
            return writerStream.uid(this.uidPrefix + "-writer");
        }

        /**
         * Applies the configured write distribution mode to the input stream.
         *
         * <ul>
         *   <li>{@code NONE}: key by equality fields if any are set, otherwise leave the stream as is.
         *   <li>{@code HASH}: key by partition for partitioned tables; for unpartitioned tables key by
         *       equality fields if any are set, otherwise leave the stream as is. When both
         *       partition fields and equality fields exist, every partition source field must be
         *       one of the equality fields.
         *   <li>{@code RANGE}: not supported; behaves like {@code NONE}.
         * </ul>
         *
         * @param dataStream the input stream
         * @param list the equality field ids
         * @param partitionSpec the table's partition spec
         * @param schema the table schema
         * @param rowType the Flink row type of the input records
         * @return the input stream, keyed where the mode requires it
         * @throws IllegalStateException in {@code HASH} mode if a partition source field is not an
         *     equality field
         * @throws RuntimeException if the distribution mode is not recognized
         */
        private DataStream<RowData> distributeDataStream(DataStream<RowData> dataStream, List<Integer> list, PartitionSpec partitionSpec, Schema schema, RowType rowType) {
            DistributionMode distributionMode = this.flinkWriteConf.distributionMode();
            FlinkSink.LOG.info("Write distribution mode is '{}'", distributionMode.modeName());
            switch (distributionMode) {
                case NONE:
                    if (list.isEmpty()) {
                        return dataStream;
                    }
                    FlinkSink.LOG.info("Distribute rows by equality fields, because there are equality fields set");
                    return dataStream.keyBy(new EqualityFieldKeySelector(schema, rowType, list));
                case HASH:
                    if (list.isEmpty()) {
                        if (!partitionSpec.isUnpartitioned()) {
                            return dataStream.keyBy(new PartitionKeySelector(partitionSpec, schema, rowType));
                        }
                        FlinkSink.LOG.warn("Fallback to use 'none' distribution mode, because there are no equality fields set and table is unpartitioned");
                        return dataStream;
                    }
                    if (partitionSpec.isUnpartitioned()) {
                        FlinkSink.LOG.info("Distribute rows by equality fields, because there are equality fields set and table is unpartitioned");
                        return dataStream.keyBy(new EqualityFieldKeySelector(schema, rowType, list));
                    }
                    for (PartitionField partitionField : partitionSpec.fields()) {
                        Preconditions.checkState(list.contains(Integer.valueOf(partitionField.sourceId())), "In 'hash' distribution mode with equality fields set, partition field '%s' should be included in equality fields: '%s'", partitionField, this.equalityFieldColumns);
                    }
                    return dataStream.keyBy(new PartitionKeySelector(partitionSpec, schema, rowType));
                case RANGE:
                    if (list.isEmpty()) {
                        FlinkSink.LOG.warn("Fallback to use 'none' distribution mode, because there are no equality fields set and {}=range is not supported yet in flink", TableProperties.WRITE_DISTRIBUTION_MODE);
                        return dataStream;
                    }
                    // Message fixed: the original was missing the space between "and" and the placeholder.
                    FlinkSink.LOG.info("Distribute rows by equality fields, because there are equality fields set and {}=range is not supported yet in flink", TableProperties.WRITE_DISTRIBUTION_MODE);
                    return dataStream.keyBy(new EqualityFieldKeySelector(schema, rowType, list));
                default:
                    throw new RuntimeException("Unrecognized write.distribution-mode: " + distributionMode);
            }
        }
    }

    /** Not instantiable: sinks are configured through the static factory methods and the {@link Builder}. */
    private FlinkSink() {
    }

    /**
     * Creates a sink builder for a stream of arbitrary records that are mapped to {@link RowData}.
     *
     * @param dataStream the source stream
     * @param mapFunction converts each source record to {@link RowData}
     * @param typeInformation type information of the mapper output
     * @param <T> the record type of the source stream
     * @return a new builder using the mapped stream as input
     */
    public static <T> Builder builderFor(DataStream<T> dataStream, MapFunction<T, RowData> mapFunction, TypeInformation<RowData> typeInformation) {
        Builder builder = new Builder();
        return builder.forMapperOutputType(dataStream, mapFunction, typeInformation);
    }

    /**
     * Creates a sink builder for a stream of {@link Row} records.
     *
     * <p>Rows are converted to {@link RowData} with a {@link DataFormatConverters.RowConverter}
     * built from the field data types of {@code tableSchema}; the same schema is also set on the
     * returned builder.
     *
     * @param dataStream the source stream of rows
     * @param tableSchema the Flink schema describing the rows
     * @return a new builder using the converted stream as input
     */
    public static Builder forRow(DataStream<Row> dataStream, TableSchema tableSchema) {
        // getLogicalType() is declared to return LogicalType, so the cast to RowType is required.
        RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
        DataFormatConverters.RowConverter rowConverter = new DataFormatConverters.RowConverter(tableSchema.getFieldDataTypes());
        return builderFor(dataStream, rowConverter::toInternal, FlinkCompatibilityUtil.toTypeInfo(rowType)).tableSchema(tableSchema);
    }

    /**
     * Creates a sink builder for a stream that already carries {@link RowData} records.
     *
     * @param dataStream the stream whose records are written to the table
     * @return a new builder using the stream as input
     */
    public static Builder forRowData(DataStream<RowData> dataStream) {
        Builder builder = new Builder();
        return builder.forRowData(dataStream);
    }

    /**
     * Determines the Flink row type of the records to write.
     *
     * <p>Without a requested Flink schema, the row type is converted from the Iceberg schema.
     * Otherwise the requested schema is converted to an Iceberg schema, its ids are reassigned
     * from the table schema, it is validated as a write schema, and the row type is taken from the
     * requested schema itself.
     *
     * @param schema the Iceberg table schema
     * @param tableSchema the requested Flink schema, or {@code null}
     * @return the row type the writer should use
     */
    static RowType toFlinkRowType(Schema schema, TableSchema tableSchema) {
        if (tableSchema == null) {
            return FlinkSchemaUtil.convert(schema);
        }
        Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(tableSchema), schema);
        TypeUtil.validateWriteSchema(schema, writeSchema, true, true);
        // getLogicalType() is declared to return LogicalType, so the cast to RowType is required.
        return (RowType) tableSchema.toRowDataType().getLogicalType();
    }

    /**
     * Creates the stream writer operator for the given table.
     *
     * <p>The task writer factory receives a serializable copy of the table, the row type, the
     * target data file size and data file format from the write configuration, the equality field
     * ids and the upsert flag.
     *
     * @param table the target table; must not be {@code null}
     * @param flinkWriteConf the resolved write configuration
     * @param rowType the Flink row type of the input records
     * @param list the equality field ids
     * @return the writer operator, created with the table's name
     * @throws IllegalArgumentException if {@code table} is {@code null}
     */
    static IcebergStreamWriter<RowData> createStreamWriter(Table table, FlinkWriteConf flinkWriteConf, RowType rowType, List<Integer> list) {
        Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");
        String tableName = table.name();
        RowDataTaskWriterFactory writerFactory = new RowDataTaskWriterFactory(
                SerializableTable.copyOf(table),
                rowType,
                flinkWriteConf.targetDataFileSize(),
                flinkWriteConf.dataFileFormat(),
                list,
                flinkWriteConf.upsertMode());
        return new IcebergStreamWriter<>(tableName, writerFactory);
    }

    /*
     * Lambda deserialization hook, marked synthetic by the decompiler.
     *
     * Recognizes one serialized lambda: implementation method "toInternal" on
     * DataFormatConverters$DataFormatConverter, exposed as MapFunction.map. For that lambda it
     * rebuilds the function from captured argument 0 (the RowConverter); anything else is rejected
     * with IllegalArgumentException.
     *
     * NOTE(review): this looks like decompiler output of a compiler-generated method, and it does
     * not compile as written: `boolean z = -1` is a type error and `r0` is undefined. Presumably
     * the method should be dropped when building from source, since javac emits its own hook for
     * serializable lambdas. TODO confirm before removing.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Decompiled string switch, step 1: map the method name to a case index (-1 = no match).
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 550342232:
                if (implMethodName.equals("toInternal")) {
                    z = false;
                    break;
                }
                break;
        }
        // Step 2: dispatch on the case index computed above.
        switch (z) {
            case false:
                // Every part of the serialized form must match before the lambda is re-created.
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/table/data/util/DataFormatConverters$DataFormatConverter") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                    DataFormatConverters.RowConverter rowConverter = (DataFormatConverters.RowConverter) serializedLambda.getCapturedArg(0);
                    // NOTE(review): `r0` presumably stands for `rowConverter` above; verify.
                    return (v1) -> {
                        return r0.toInternal(v1);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
