package io.delta.flink.sink.internal;

import io.delta.flink.internal.options.DeltaConnectorConfiguration;
import io.delta.flink.sink.internal.committables.DeltaCommittable;
import io.delta.flink.sink.internal.committables.DeltaCommittableSerializer;
import io.delta.flink.sink.internal.committables.DeltaGlobalCommittable;
import io.delta.flink.sink.internal.committables.DeltaGlobalCommittableSerializer;
import io.delta.flink.sink.internal.committer.DeltaCommitter;
import io.delta.flink.sink.internal.committer.DeltaGlobalCommitter;
import io.delta.flink.sink.internal.writer.DeltaWriter;
import io.delta.flink.sink.internal.writer.DeltaWriterBucketState;
import io.delta.flink.sink.internal.writer.DeltaWriterBucketStateSerializer;
import java.io.IOException;
import java.io.Serializable;
import java.util.UUID;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.utils.SerializableConfiguration;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.DeltaBulkBucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;

/* loaded from: input_file:io/delta/flink/sink/internal/DeltaSinkBuilder.class */
/**
 * Serializable builder that collects everything needed to construct a
 * {@link DeltaSinkInternal} and that also acts as the factory for the sink's writer,
 * committers and serializers.
 *
 * <p>Only {@code mergeSchema} and {@code bucketAssigner} can be changed after construction
 * (via {@link #withMergeSchema(boolean)} and {@link #withBucketAssigner(BucketAssigner)});
 * every other setting is fixed by the constructor.
 *
 * @param <IN> type of the records handed to the sink
 */
public class DeltaSinkBuilder<IN> implements Serializable {
    private static final long serialVersionUID = 7493169281026370228L;

    /**
     * Bucket check interval used when the caller does not supply one.
     * NOTE(review): presumably milliseconds — confirm against {@code DeltaWriter}.
     */
    protected static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60000;

    /** Root path of the table; also used to resolve the {@link FileSystem} for writers. */
    private final Path tableBasePath;

    /** Row type forwarded to the global committer. */
    private final RowType rowType;

    /** Application id exposed through {@link #getAppId()}. */
    private final String appId;

    /** Flag forwarded to the global committer; mutable via {@link #withMergeSchema(boolean)}. */
    private boolean mergeSchema;

    /** Connector configuration supplied by the caller; stored but not read in this class. */
    private final DeltaConnectorConfiguration sinkConfiguration;

    /** Serializable wrapper around the Hadoop configuration passed to the constructor. */
    private final SerializableConfiguration serializableConfiguration;

    /** Interval forwarded to every {@link DeltaWriter} created by this builder. */
    private final long bucketCheckInterval;

    /** Parquet writer factory handed to the bucket writer. */
    private final ParquetWriterFactory<IN> writerFactory;

    /** Bucket assigner forwarded to writers; mutable via {@link #withBucketAssigner}. */
    private BucketAssigner<IN, String> bucketAssigner;

    /** Rolling policy forwarded to every {@link DeltaWriter}. */
    private final CheckpointRollingPolicy<IN, String> rollingPolicy;

    /** Output file naming configuration forwarded to every {@link DeltaWriter}. */
    private final OutputFileConfig outputFileConfig;

    /**
     * Concrete builder that simply exposes the default-valued {@link DeltaSinkBuilder}
     * constructor publicly.
     *
     * @param <IN> type of the records handed to the sink
     */
    public static final class DefaultDeltaFormatBuilder<IN> extends DeltaSinkBuilder<IN> {
        private static final long serialVersionUID = 2818087325120827526L;

        /**
         * Delegates unchanged to the eight-argument {@link DeltaSinkBuilder} constructor.
         *
         * @param basePath          root path of the table; must not be null
         * @param conf              Hadoop configuration; must not be null
         * @param writerFactory     Parquet writer factory
         * @param assigner          bucket assigner; must not be null
         * @param policy            rolling policy; must not be null
         * @param rowType           row type forwarded to the global committer
         * @param mergeSchema       flag forwarded to the global committer
         * @param sinkConfiguration connector configuration
         */
        public DefaultDeltaFormatBuilder(
                Path basePath,
                Configuration conf,
                ParquetWriterFactory<IN> writerFactory,
                BucketAssigner<IN, String> assigner,
                CheckpointRollingPolicy<IN, String> policy,
                RowType rowType,
                boolean mergeSchema,
                DeltaConnectorConfiguration sinkConfiguration) {
            super(
                basePath,
                conf,
                writerFactory,
                assigner,
                policy,
                rowType,
                mergeSchema,
                sinkConfiguration);
        }
    }

    /** Returns a fresh random UUID rendered as a string, used as the default application id. */
    private static String generateNewAppId() {
        return UUID.randomUUID().toString();
    }

    /**
     * Creates a builder with default values for the settings not listed here:
     * {@link #DEFAULT_BUCKET_CHECK_INTERVAL}, an {@link OutputFileConfig} whose part suffix is
     * {@code ".snappy.parquet"}, and a newly generated random application id.
     *
     * @param basePath          root path of the table; must not be null
     * @param conf              Hadoop configuration; must not be null
     * @param writerFactory     Parquet writer factory
     * @param assigner          bucket assigner; must not be null
     * @param policy            rolling policy; must not be null
     * @param rowType           row type forwarded to the global committer
     * @param mergeSchema       flag forwarded to the global committer
     * @param sinkConfiguration connector configuration
     * @throws NullPointerException if any argument documented as non-null is null
     */
    protected DeltaSinkBuilder(
            Path basePath,
            Configuration conf,
            ParquetWriterFactory<IN> writerFactory,
            BucketAssigner<IN, String> assigner,
            CheckpointRollingPolicy<IN, String> policy,
            RowType rowType,
            boolean mergeSchema,
            DeltaConnectorConfiguration sinkConfiguration) {
        this(
            basePath,
            conf,
            DEFAULT_BUCKET_CHECK_INTERVAL,
            writerFactory,
            assigner,
            policy,
            OutputFileConfig.builder().withPartSuffix(".snappy.parquet").build(),
            generateNewAppId(),
            rowType,
            mergeSchema,
            sinkConfiguration);
    }

    /**
     * Creates a builder with every setting given explicitly.
     *
     * <p>The path, configuration, bucket assigner, rolling policy and output file config are
     * null-checked; the remaining arguments are stored as passed.
     *
     * @param basePath            root path of the table; must not be null
     * @param conf                Hadoop configuration; must not be null
     * @param bucketCheckInterval interval forwarded to created writers
     * @param writerFactory       Parquet writer factory
     * @param assigner            bucket assigner; must not be null
     * @param policy              rolling policy; must not be null
     * @param outputFileConfig    output file naming configuration; must not be null
     * @param appId               application id returned by {@link #getAppId()}
     * @param rowType             row type forwarded to the global committer
     * @param mergeSchema         flag forwarded to the global committer
     * @param sinkConfiguration   connector configuration
     * @throws NullPointerException if any argument documented as non-null is null
     */
    protected DeltaSinkBuilder(
            Path basePath,
            Configuration conf,
            long bucketCheckInterval,
            ParquetWriterFactory<IN> writerFactory,
            BucketAssigner<IN, String> assigner,
            CheckpointRollingPolicy<IN, String> policy,
            OutputFileConfig outputFileConfig,
            String appId,
            RowType rowType,
            boolean mergeSchema,
            DeltaConnectorConfiguration sinkConfiguration) {
        // checkNotNull is generic, so no (raw) casts are needed to keep the type arguments.
        this.tableBasePath = Preconditions.checkNotNull(basePath);
        this.serializableConfiguration =
            new SerializableConfiguration(Preconditions.checkNotNull(conf));
        this.bucketCheckInterval = bucketCheckInterval;
        this.writerFactory = writerFactory;
        this.bucketAssigner = Preconditions.checkNotNull(assigner);
        this.rollingPolicy = Preconditions.checkNotNull(policy);
        this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
        this.appId = appId;
        this.rowType = rowType;
        this.mergeSchema = mergeSchema;
        this.sinkConfiguration = sinkConfiguration;
    }

    /**
     * Sets the flag that is later passed to the {@link DeltaGlobalCommitter}.
     *
     * @param mergeSchema new value of the flag
     * @return this builder, for chaining
     */
    public DeltaSinkBuilder<IN> withMergeSchema(boolean mergeSchema) {
        this.mergeSchema = mergeSchema;
        return this;
    }

    /**
     * Creates a {@link DeltaCommitter} backed by a newly created bucket writer.
     *
     * @return a new committer
     * @throws IOException if the bucket writer cannot be created
     */
    public Committer<DeltaCommittable> createCommitter() throws IOException {
        return new DeltaCommitter(createBucketWriter());
    }

    /**
     * Creates a {@link DeltaGlobalCommitter} from the stored Hadoop configuration, table path,
     * row type and current {@code mergeSchema} flag.
     *
     * @return a new global committer
     */
    public GlobalCommitter<DeltaCommittable, DeltaGlobalCommittable> createGlobalCommitter() {
        return new DeltaGlobalCommitter(
            this.serializableConfiguration.conf(),
            this.tableBasePath,
            this.rowType,
            this.mergeSchema);
    }

    /** @return the root path of the table given at construction time */
    protected Path getTableBasePath() {
        return this.tableBasePath;
    }

    /** @return the application id given at, or generated during, construction */
    public String getAppId() {
        return this.appId;
    }

    /** @return the serializable wrapper around the Hadoop configuration */
    protected SerializableConfiguration getSerializableConfiguration() {
        return this.serializableConfiguration;
    }

    /**
     * Replaces the bucket assigner used by writers created after this call.
     *
     * @param assigner new bucket assigner; must not be null
     * @return this builder, for chaining
     * @throws NullPointerException if {@code assigner} is null
     */
    public DeltaSinkBuilder<IN> withBucketAssigner(BucketAssigner<IN, String> assigner) {
        this.bucketAssigner = Preconditions.checkNotNull(assigner);
        return this;
    }

    /**
     * Builds the sink, handing it this builder instance.
     *
     * @return a new {@link DeltaSinkInternal} backed by this builder
     */
    public DeltaSinkInternal<IN> build() {
        return new DeltaSinkInternal<>(this);
    }

    /**
     * Creates a {@link DeltaWriter} from the builder's settings plus the processing-time
     * service and metric group taken from {@code context}.
     *
     * <p>The {@code appId} argument, not the builder's own {@link #getAppId()}, is what gets
     * passed to the writer.
     * NOTE(review): the last two arguments are presumably the application id and the next
     * checkpoint id — confirm against the {@code DeltaWriter} constructor.
     *
     * @param context          sink initialization context
     * @param appId            string forwarded as the writer's second-to-last argument
     * @param nextCheckpointId long forwarded as the writer's last argument
     * @return a new writer
     * @throws IOException if the bucket writer cannot be created
     */
    public DeltaWriter<IN> createWriter(
            Sink.InitContext context,
            String appId,
            long nextCheckpointId) throws IOException {
        return new DeltaWriter<>(
            this.tableBasePath,
            this.bucketAssigner,
            createBucketWriter(),
            this.rollingPolicy,
            this.outputFileConfig,
            context.getProcessingTimeService(),
            context.metricGroup(),
            this.bucketCheckInterval,
            appId,
            nextCheckpointId);
    }

    /**
     * @return a new serializer for writer bucket state
     * @throws IOException declared for callers; not thrown by this implementation
     */
    public SimpleVersionedSerializer<DeltaWriterBucketState> getWriterStateSerializer()
            throws IOException {
        return new DeltaWriterBucketStateSerializer();
    }

    /**
     * Creates the committable serializer, using the pending-file serializer obtained from a
     * newly created bucket writer.
     *
     * @return a new committable serializer
     * @throws IOException if the bucket writer cannot be created
     */
    public SimpleVersionedSerializer<DeltaCommittable> getCommittableSerializer()
            throws IOException {
        return new DeltaCommittableSerializer(
            createBucketWriter().getProperties().getPendingFileRecoverableSerializer());
    }

    /**
     * Creates the global committable serializer, using the pending-file serializer obtained
     * from a newly created bucket writer.
     *
     * @return a new global committable serializer
     * @throws IOException if the bucket writer cannot be created
     */
    public SimpleVersionedSerializer<DeltaGlobalCommittable> getGlobalCommittableSerializer()
            throws IOException {
        return new DeltaGlobalCommittableSerializer(
            createBucketWriter().getProperties().getPendingFileRecoverableSerializer());
    }

    /**
     * Creates a bucket writer on top of a recoverable writer obtained from the
     * {@link FileSystem} resolved for the table path's URI. A new instance is created on
     * every call.
     *
     * @return a new bucket writer
     * @throws IOException if the file system or its recoverable writer cannot be obtained
     */
    private DeltaBulkBucketWriter<IN, String> createBucketWriter() throws IOException {
        return new DeltaBulkBucketWriter<>(
            FileSystem.get(this.tableBasePath.toUri()).createRecoverableWriter(),
            this.writerFactory);
    }
}
