package org.mongoflink.table;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.Preconditions;
import org.mongoflink.config.MongoConnectorOptions;
import org.mongoflink.config.MongoOptions;

/* loaded from: input_file:org/mongoflink/table/MongoDynamicTableFactory.class */
public class MongoDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    private static final ConfigOption<String> CONNECT_STRING = ConfigOptions.key("connect_string").stringType().noDefaultValue().withDescription("the mongo connect string");
    private static final ConfigOption<String> DATABASE = ConfigOptions.key("database").stringType().noDefaultValue().withDescription("the mongo database name");
    private static final ConfigOption<String> COLLECTION = ConfigOptions.key("collection").stringType().noDefaultValue().withDescription("the mongo collection name");
    private static final ConfigOption<Boolean> TRANSACTION_ENABLE = ConfigOptions.key(MongoOptions.SINK_TRANSACTION_ENABLED).booleanType().defaultValue(false).withDescription("whether enable transaction");
    private static final ConfigOption<Boolean> FLUSH_ON_CHECKPOINT = ConfigOptions.key(MongoOptions.SINK_FLUSH_ON_CHECKPOINT).booleanType().defaultValue(false).withDescription("flush on checkpoint");
    private static final ConfigOption<Integer> FLUSH_SIZE = ConfigOptions.key(MongoOptions.SINK_FLUSH_SIZE).intType().defaultValue(1000).withDescription("flush size");
    private static final ConfigOption<Duration> FLUSH_INTERVAL = ConfigOptions.key(MongoOptions.SINK_FLUSH_INTERVAL).durationType().defaultValue(Duration.of(30000, ChronoUnit.MILLIS)).withDescription("flush interval");
    private static final String IDENTIFIER = "mongo";

    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper createTableFactoryHelper = FactoryUtil.createTableFactoryHelper(this, context);
        ReadableConfig options = createTableFactoryHelper.getOptions();
        createTableFactoryHelper.validate();
        return new MongoDynamicTableSource((String) options.get(CONNECT_STRING), (String) options.get(DATABASE), (String) options.get(COLLECTION), TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()));
    }

    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper createTableFactoryHelper = FactoryUtil.createTableFactoryHelper(this, context);
        ReadableConfig options = createTableFactoryHelper.getOptions();
        createTableFactoryHelper.validate();
        validate(options);
        ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema();
        return new MongoDynamicTableSink(getMongoSinkOptions(options, resolvedSchema.getPrimaryKey()), resolvedSchema);
    }

    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    public Set<ConfigOption<?>> requiredOptions() {
        HashSet hashSet = new HashSet();
        hashSet.add(CONNECT_STRING);
        hashSet.add(DATABASE);
        hashSet.add(COLLECTION);
        return hashSet;
    }

    public Set<ConfigOption<?>> optionalOptions() {
        HashSet hashSet = new HashSet();
        hashSet.add(TRANSACTION_ENABLE);
        hashSet.add(FLUSH_ON_CHECKPOINT);
        hashSet.add(FLUSH_SIZE);
        hashSet.add(FLUSH_INTERVAL);
        return hashSet;
    }

    private void validate(ReadableConfig readableConfig) {
        if (((Boolean) readableConfig.get(TRANSACTION_ENABLE)).booleanValue()) {
            Preconditions.checkArgument(((Boolean) readableConfig.get(FLUSH_ON_CHECKPOINT)).booleanValue(), "`%s` must be true when the transactional sink is enabled", new Object[]{FLUSH_ON_CHECKPOINT.key()});
        }
        Preconditions.checkArgument(((Integer) readableConfig.get(FLUSH_SIZE)).intValue() > 0, "`%s` must be greater than 0", new Object[]{FLUSH_SIZE.key()});
    }

    private MongoConnectorOptions getMongoSinkOptions(ReadableConfig readableConfig, Optional<UniqueConstraint> optional) {
        MongoConnectorOptions.Builder withUpsertKey = MongoConnectorOptions.builder().withConnectString((String) readableConfig.get(CONNECT_STRING)).withDatabase((String) readableConfig.get(DATABASE)).withCollection((String) readableConfig.get(COLLECTION)).withTransactionEnable(((Boolean) readableConfig.get(TRANSACTION_ENABLE)).booleanValue()).withFlushOnCheckpoint(((Boolean) readableConfig.get(FLUSH_ON_CHECKPOINT)).booleanValue()).withFlushSize(((Integer) readableConfig.get(FLUSH_SIZE)).intValue()).withFlushInterval((Duration) readableConfig.get(FLUSH_INTERVAL)).withUpsertEnable(false).withUpsertKey(new String[0]);
        if (optional.isPresent()) {
            List columns = optional.get().getColumns();
            withUpsertKey.withUpsertEnable(true);
            withUpsertKey.withUpsertKey((String[]) columns.toArray(new String[0]));
        }
        return withUpsertKey.build();
    }
}
