package io.delta.flink.sink.internal;

import io.delta.flink.sink.internal.committables.DeltaCommittable;
import io.delta.flink.sink.internal.committables.DeltaGlobalCommittable;
import io.delta.flink.sink.internal.writer.DeltaWriter;
import io.delta.flink.sink.internal.writer.DeltaWriterBucketState;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/delta/flink/sink/internal/DeltaSinkInternal.class */
/**
 * {@link Sink} implementation that delegates the creation of its writer, committer, global
 * committer and all related serializers to a {@link DeltaSinkBuilder}.
 *
 * <p>The sink emits {@link DeltaCommittable} committables, keeps {@link DeltaWriterBucketState}
 * as writer state and aggregates into {@link DeltaGlobalCommittable} global committables.
 *
 * @param <IN> type of the records accepted by the sink
 */
public class DeltaSinkInternal<IN> implements Sink<IN, DeltaCommittable, DeltaWriterBucketState, DeltaGlobalCommittable> {
    private static final Logger LOG = LoggerFactory.getLogger(DeltaSinkInternal.class);

    /** Builder that every factory method of this sink delegates to; never {@code null}. */
    private final DeltaSinkBuilder<IN> sinkBuilder;

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a sink backed by the given builder.
     *
     * @param deltaSinkBuilder builder used to create writers, committers and serializers
     * @throws NullPointerException if {@code deltaSinkBuilder} is {@code null}
     */
    public DeltaSinkInternal(DeltaSinkBuilder<IN> deltaSinkBuilder) {
        // checkNotNull is generic, so no (raw) cast is needed to keep the type argument.
        this.sinkBuilder = Preconditions.checkNotNull(deltaSinkBuilder);
    }

    /**
     * Creates a writer and initializes it with the supplied writer states.
     *
     * <p>The application id is taken from the first supplied state when there is one, otherwise
     * from the builder. The checkpoint id handed to the builder is the restored checkpoint id
     * reported by {@code initContext}, or {@code 1} when none is reported.
     *
     * @param initContext runtime context provided for writer initialization
     * @param list writer states to restore from; empty when there is nothing to restore
     * @return the initialized writer
     * @throws IOException declared by the sink contract and propagated from the builder
     */
    public SinkWriter<IN, DeltaCommittable, DeltaWriterBucketState> createWriter(Sink.InitContext initContext, List<DeltaWriterBucketState> list) throws IOException {
        String appId = restoreOrCreateAppId(list);
        long nextCheckpointId = initContext.getRestoredCheckpointId().orElse(1L);
        DeltaWriter<IN> writer = this.sinkBuilder.createWriter(initContext, appId, nextCheckpointId);
        writer.initializeState(list);
        LOG.info("Created new writer for: appId={} checkpointId={}", appId, nextCheckpointId);
        return writer;
    }

    /**
     * Resolves the application id for a new writer.
     *
     * @param list writer states passed to {@link #createWriter}
     * @return the app id stored in the first state, or the builder's app id when {@code list}
     *     is empty
     */
    private String restoreOrCreateAppId(List<DeltaWriterBucketState> list) {
        if (list.isEmpty()) {
            return this.sinkBuilder.getAppId();
        }
        return list.get(0).getAppId();
    }

    /**
     * Returns the serializer for the writer state, obtained from the builder.
     *
     * @return a non-empty {@link Optional} holding the writer state serializer
     * @throws FlinkRuntimeException if the builder fails with an {@link IOException}
     */
    public Optional<SimpleVersionedSerializer<DeltaWriterBucketState>> getWriterStateSerializer() {
        try {
            return Optional.of(this.sinkBuilder.getWriterStateSerializer());
        } catch (IOException e) {
            // This method declares no checked exception, so the failure is rethrown unchecked.
            throw new FlinkRuntimeException("Could not create writer state serializer.", e);
        }
    }

    /**
     * Creates the committer, obtained from the builder.
     *
     * @return a non-empty {@link Optional} holding the committer
     * @throws IOException declared by the sink contract and propagated from the builder
     */
    public Optional<Committer<DeltaCommittable>> createCommitter() throws IOException {
        return Optional.of(this.sinkBuilder.createCommitter());
    }

    /**
     * Returns the serializer for {@link DeltaCommittable}, obtained from the builder.
     *
     * @return a non-empty {@link Optional} holding the committable serializer
     * @throws FlinkRuntimeException if the builder fails with an {@link IOException}
     */
    public Optional<SimpleVersionedSerializer<DeltaCommittable>> getCommittableSerializer() {
        try {
            return Optional.of(this.sinkBuilder.getCommittableSerializer());
        } catch (IOException e) {
            // This method declares no checked exception, so the failure is rethrown unchecked.
            throw new FlinkRuntimeException("Could not create committable serializer.", e);
        }
    }

    /**
     * Creates the global committer, obtained from the builder.
     *
     * @return a non-empty {@link Optional} holding the global committer
     */
    public Optional<GlobalCommitter<DeltaCommittable, DeltaGlobalCommittable>> createGlobalCommitter() {
        return Optional.of(this.sinkBuilder.createGlobalCommitter());
    }

    /**
     * Returns the serializer for {@link DeltaGlobalCommittable}, obtained from the builder.
     *
     * @return a non-empty {@link Optional} holding the global committable serializer
     * @throws FlinkRuntimeException if the builder fails with an {@link IOException}
     */
    public Optional<SimpleVersionedSerializer<DeltaGlobalCommittable>> getGlobalCommittableSerializer() {
        try {
            return Optional.of(this.sinkBuilder.getGlobalCommittableSerializer());
        } catch (IOException e) {
            // Message names the *global* serializer so this failure is distinguishable from
            // the one raised by getCommittableSerializer().
            throw new FlinkRuntimeException("Could not create global committable serializer.", e);
        }
    }
}
