package org.mongoflink.sink;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.mongoflink.config.MongoConnectorOptions;
import org.mongoflink.config.SinkConfiguration;
import org.mongoflink.config.SinkConfigurationFactory;
import org.mongoflink.internal.connection.MongoClientProvider;
import org.mongoflink.internal.connection.MongoColloctionProviders;
import org.mongoflink.serde.DocumentSerializer;

/* loaded from: input_file:org/mongoflink/sink/MongoSink.class */
/**
 * {@link Sink} implementation whose writer is a {@code MongoBulkWriter} and whose committables and
 * writer state are both {@code DocumentBulk} instances. No global committer is used.
 *
 * <p>A committer and the {@code DocumentBulk} serializers are only exposed when
 * {@link MongoConnectorOptions#isTransactionEnable()} returns {@code true}.
 *
 * @param <IN> type of the records accepted by the sink
 */
public class MongoSink<IN> implements Sink<IN, DocumentBulk, DocumentBulk, Void> {
    private final MongoClientProvider clientProvider;
    private final DocumentSerializer<IN> serializer;
    private final MongoConnectorOptions options;

    /**
     * Creates a sink from raw connection coordinates and a {@link Properties} based configuration.
     *
     * @param str connection string handed to the client provider and the connector options
     * @param str2 database name
     * @param str3 collection name
     * @param documentSerializer serializer passed on to the writer
     * @param properties properties parsed by {@link SinkConfigurationFactory#fromProperties}
     * @deprecated use {@link #MongoSink(DocumentSerializer, MongoConnectorOptions)} instead.
     */
    @Deprecated
    public MongoSink(String str, String str2, String str3, DocumentSerializer<IN> documentSerializer, Properties properties) {
        SinkConfiguration sinkConfiguration = SinkConfigurationFactory.fromProperties(properties);
        this.serializer = documentSerializer;
        this.clientProvider = buildClientProvider(str, str2, str3);
        // Translate the legacy properties-based configuration into connector options.
        // The bulk flush size is narrowed to int and the flush interval is read as milliseconds.
        this.options = MongoConnectorOptions.builder()
                .withDatabase(str2)
                .withCollection(str3)
                .withConnectString(str)
                .withTransactionEnable(sinkConfiguration.isTransactional())
                .withFlushOnCheckpoint(sinkConfiguration.isFlushOnCheckpoint())
                .withFlushSize((int) sinkConfiguration.getBulkFlushSize())
                .withFlushInterval(Duration.ofMillis(sinkConfiguration.getBulkFlushInterval()))
                .build();
    }

    /**
     * Creates a sink from pre-built connector options.
     *
     * @param documentSerializer serializer passed on to the writer
     * @param mongoConnectorOptions options supplying the connection string, database and collection
     */
    public MongoSink(DocumentSerializer<IN> documentSerializer, MongoConnectorOptions mongoConnectorOptions) {
        this.options = mongoConnectorOptions;
        this.serializer = documentSerializer;
        this.clientProvider = buildClientProvider(
                mongoConnectorOptions.getConnectString(),
                mongoConnectorOptions.getDatabase(),
                mongoConnectorOptions.getCollection());
    }

    /** Builds the client provider for the given connection string, database and collection. */
    private static MongoClientProvider buildClientProvider(String connectionString, String database, String collection) {
        return MongoColloctionProviders.getBuilder()
                .connectionString(connectionString)
                .database(database)
                .collection(collection)
                .build();
    }

    /**
     * Creates a new {@code MongoBulkWriter} and initializes it with the supplied state.
     *
     * @param initContext sink initialization context (not used here)
     * @param list state handed to the writer's {@code initializeState}
     * @return the initialized writer
     */
    public SinkWriter<IN, DocumentBulk, DocumentBulk> createWriter(Sink.InitContext initContext, List<DocumentBulk> list) throws IOException {
        MongoBulkWriter writer = new MongoBulkWriter(this.clientProvider, this.serializer, this.options);
        writer.initializeState(list);
        return writer;
    }

    /**
     * Creates the committer when transactions are enabled.
     *
     * @return a {@code MongoCommitter}, configured with the upsert flag and key when upsert is
     *     enabled, or an empty optional when transactions are disabled
     */
    public Optional<Committer<DocumentBulk>> createCommitter() throws IOException {
        if (this.options.isTransactionEnable()) {
            final Committer<DocumentBulk> committer;
            if (this.options.isUpsertEnable()) {
                committer = new MongoCommitter(this.clientProvider, this.options.isUpsertEnable(), this.options.getUpsertKey());
            } else {
                committer = new MongoCommitter(this.clientProvider);
            }
            return Optional.of(committer);
        }
        return Optional.empty();
    }

    /** Always returns an empty optional: this sink has no global committer. */
    public Optional<GlobalCommitter<DocumentBulk, Void>> createGlobalCommitter() throws IOException {
        return Optional.empty();
    }

    /** Returns the {@code DocumentBulk} serializer when transactions are enabled, otherwise empty. */
    public Optional<SimpleVersionedSerializer<DocumentBulk>> getCommittableSerializer() {
        return documentBulkSerializerIfTransactional();
    }

    /** Always returns an empty optional: there are no global committables to serialize. */
    public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
        return Optional.empty();
    }

    /** Returns the {@code DocumentBulk} serializer when transactions are enabled, otherwise empty. */
    public Optional<SimpleVersionedSerializer<DocumentBulk>> getWriterStateSerializer() {
        return documentBulkSerializerIfTransactional();
    }

    /** Shared lookup for the committable and writer-state serializers, gated on the transaction flag. */
    private Optional<SimpleVersionedSerializer<DocumentBulk>> documentBulkSerializerIfTransactional() {
        if (this.options.isTransactionEnable()) {
            return Optional.of(DocumentBulkSerializer.INSTANCE);
        }
        return Optional.empty();
    }
}
