package io.debezium.connector.jdbc.integration.postgres;

import io.debezium.bindings.kafka.KafkaDebeziumSinkRecord;
import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.integration.AbstractJdbcSinkTest;
import io.debezium.connector.jdbc.junit.TestHelper;
import io.debezium.connector.jdbc.junit.jupiter.PostgresSinkDatabaseContextProvider;
import io.debezium.connector.jdbc.junit.jupiter.Sink;
import io.debezium.connector.jdbc.junit.jupiter.SinkRecordFactoryArgumentsProvider;
import io.debezium.connector.jdbc.util.SinkRecordFactory;
import io.debezium.doc.FixFor;
import io.debezium.sink.SinkConnectorConfig;
import java.util.Map;
import org.assertj.db.api.TableAssert;
import org.assertj.db.type.ValueType;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

@Tags({@Tag("all"), @Tag("it"), @Tag("it-postgresql")})
@ExtendWith({PostgresSinkDatabaseContextProvider.class})
/* loaded from: input_file:io/debezium/connector/jdbc/integration/postgres/JdbcSinkFieldFilterIT.class */
public class JdbcSinkFieldFilterIT extends AbstractJdbcSinkTest {
    public JdbcSinkFieldFilterIT(Sink sink) {
        super(sink);
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @FixFor({"DBZ-6636"})
    @ParameterizedTest
    public void testFieldIncludeListWithInsertMode(SinkRecordFactory sinkRecordFactory) throws Exception {
        String str = topicName("server1", "schema", randomTableName());
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.INSERT.getValue());
        defaultSinkConfig.put("field.include.list", str + ":name," + str + ":id");
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        KafkaDebeziumSinkRecord createRecordNoKey = sinkRecordFactory.createRecordNoKey(str);
        consume(createRecordNoKey);
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName(createRecordNoKey));
        assertTable.exists().column("id").value().isEqualTo(1);
        assertTable.exists().column("name").value().isEqualTo("John Doe");
        assertTable.exists().hasNumberOfColumns(2);
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @FixFor({"DBZ-6636"})
    @ParameterizedTest
    public void testFieldExcludeListWithInsertMode(SinkRecordFactory sinkRecordFactory) throws Exception {
        String str = topicName("server1", "schema", randomTableName());
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.INSERT.getValue());
        defaultSinkConfig.put("field.exclude.list", str + ":name");
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        KafkaDebeziumSinkRecord createRecordNoKey = sinkRecordFactory.createRecordNoKey(str);
        consume(createRecordNoKey);
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName(createRecordNoKey));
        assertTable.exists().hasNumberOfColumns(2);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER, 1);
        getSink().assertColumnType(assertTable, "nick_name$", ValueType.TEXT, "John Doe$");
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @FixFor({"DBZ-6636"})
    @ParameterizedTest
    public void testFieldIncludeListWithUpsertMode(SinkRecordFactory sinkRecordFactory) throws Exception {
        String str = topicName("server1", "schema", randomTableName());
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.RECORD_VALUE.getValue());
        defaultSinkConfig.put("primary.key.fields", "id");
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.UPSERT.getValue());
        defaultSinkConfig.put("field.include.list", str + ":name," + str + ":id");
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        KafkaDebeziumSinkRecord createRecord = sinkRecordFactory.createRecord(str, (byte) 1);
        consume(createRecord);
        consume(sinkRecordFactory.createRecord(str, (byte) 1));
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName(createRecord));
        assertTable.exists().hasNumberOfRows(1).hasNumberOfColumns(2);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER, (byte) 1);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT, "John Doe");
        KafkaDebeziumSinkRecord updateRecord = sinkRecordFactory.updateRecord(str);
        consume(updateRecord);
        TableAssert assertTable2 = TestHelper.assertTable(dataSource(), destinationTableName(updateRecord));
        assertTable2.exists().hasNumberOfRows(1).hasNumberOfColumns(2);
        getSink().assertColumnType(assertTable2, "id", ValueType.NUMBER, (byte) 1);
        getSink().assertColumnType(assertTable2, "name", ValueType.TEXT, "Jane Doe");
    }
}
