package io.debezium.connector.jdbc.integration;

import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.junit.TestHelper;
import io.debezium.connector.jdbc.junit.jupiter.Sink;
import io.debezium.connector.jdbc.junit.jupiter.SinkRecordFactoryArgumentsProvider;
import io.debezium.connector.jdbc.util.SinkRecordFactory;
import io.debezium.doc.FixFor;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.sink.SinkRecord;
import org.assertj.db.api.TableAssert;
import org.assertj.db.type.ValueType;
import org.fest.assertions.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

/* loaded from: input_file:io/debezium/connector/jdbc/integration/AbstractJdbcSinkInsertModeTest.class */
public abstract class AbstractJdbcSinkInsertModeTest extends AbstractJdbcSinkTest {
    public AbstractJdbcSinkInsertModeTest(Sink sink) {
        super(sink);
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testInsertModeInsertWithNoPrimaryKey(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.NONE.getValue());
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.INSERT.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        String str = topicName("server1", "schema", randomTableName());
        SinkRecord createRecordNoKey = sinkRecordFactory.createRecordNoKey(str);
        consume(createRecordNoKey);
        consume(sinkRecordFactory.createRecordNoKey(str));
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName(createRecordNoKey));
        assertTable.exists().hasNumberOfRows(2).hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER, (byte) 1, (byte) 1);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT, "John Doe", "John Doe");
        getSink().assertColumnType(assertTable, "nick_name$", ValueType.TEXT, "John Doe$", "John Doe$");
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testInsertModeInsertWithPrimaryKeyModeKafka(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.KAFKA.getValue());
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.INSERT.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        String str = topicName("server1", "schema", randomTableName());
        SinkRecord createRecordNoKey = sinkRecordFactory.createRecordNoKey(str);
        consume(createRecordNoKey);
        consume(sinkRecordFactory.createRecord(str));
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName(createRecordNoKey));
        assertTable.exists().hasNumberOfRows(2).hasNumberOfColumns(6);
        getSink().assertColumnType(assertTable, "__connect_topic", ValueType.TEXT, str, str);
        getSink().assertColumnType(assertTable, "__connect_partition", ValueType.NUMBER, 0, 0);
        getSink().assertColumnType(assertTable, "__connect_offset", ValueType.NUMBER, 0, 1);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER, (byte) 1, (byte) 1);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT, "John Doe", "John Doe");
        getSink().assertColumnType(assertTable, "nick_name$", ValueType.TEXT, "John Doe$", "John Doe$");
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testInsertModeInsertWithPrimaryKeyModeRecordKey(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_KEY.getValue());
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.INSERT.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        String str = topicName("server1", "schema", randomTableName());
        SinkRecord createRecord = sinkRecordFactory.createRecord(str, (byte) 1);
        consume(createRecord);
        consume(sinkRecordFactory.createRecord(str, (byte) 2));
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName(createRecord));
        assertTable.exists().hasNumberOfRows(2).hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER, (byte) 1, (byte) 2);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT, "John Doe", "John Doe");
        getSink().assertColumnType(assertTable, "nick_name$", ValueType.TEXT, "John Doe$", "John Doe$");
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testInsertModeInsertWithPrimaryKeyModeRecordValue(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_VALUE.getValue());
        defaultSinkConfig.put("primary.key.fields", "id");
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.INSERT.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        String str = topicName("server1", "schema", randomTableName());
        SinkRecord createRecord = sinkRecordFactory.createRecord(str, (byte) 1);
        consume(createRecord);
        consume(sinkRecordFactory.createRecord(str, (byte) 2));
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName(createRecord));
        assertTable.exists().hasNumberOfRows(2).hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER, (byte) 1, (byte) 2);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT, "John Doe", "John Doe");
        getSink().assertColumnType(assertTable, "nick_name$", ValueType.TEXT, "John Doe$", "John Doe$");
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testInsertModeUpsertWithNoPrimaryKey(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.NONE.getValue());
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.UPSERT.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        try {
            consume(sinkRecordFactory.createRecordNoKey(topicName("server1", "schema", randomTableName())));
            stopSinkConnector();
        } catch (Exception e) {
            Assertions.assertThat(e.getCause().getCause().getMessage()).matches("Cannot write to table [a-zA-Z0-9_]* with no key fields defined\\.");
        }
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testInsertModeUpsertWithPrimaryKeyModeKafka(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.KAFKA.getValue());
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.UPSERT.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        String str = topicName("server1", "schema", randomTableName());
        SinkRecord createRecord = sinkRecordFactory.createRecord(str, (byte) 1);
        consume(createRecord);
        consume(sinkRecordFactory.createRecord(str, (byte) 1));
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName(createRecord));
        assertTable.exists().hasNumberOfRows(1).hasNumberOfColumns(6);
        getSink().assertColumnType(assertTable, "__connect_topic", ValueType.TEXT, str);
        getSink().assertColumnType(assertTable, "__connect_partition", ValueType.NUMBER, 0);
        getSink().assertColumnType(assertTable, "__connect_offset", ValueType.NUMBER, 1L);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER, (byte) 1);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT, "John Doe");
        getSink().assertColumnType(assertTable, "nick_name$", ValueType.TEXT, "John Doe$");
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testInsertModeUpsertWithPrimaryKeyModeRecordKey(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_KEY.getValue());
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.UPSERT.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        String str = topicName("server1", "schema", randomTableName());
        SinkRecord createRecord = sinkRecordFactory.createRecord(str, (byte) 1);
        consume(createRecord);
        consume(sinkRecordFactory.createRecord(str, (byte) 1));
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName(createRecord));
        assertTable.exists().hasNumberOfRows(1).hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER, (byte) 1);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT, "John Doe");
        getSink().assertColumnType(assertTable, "nick_name$", ValueType.TEXT, "John Doe$");
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testInsertModeUpsertWithPrimaryKeyModeRecordValue(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_VALUE.getValue());
        defaultSinkConfig.put("primary.key.fields", "id");
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.UPSERT.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        String str = topicName("server1", "schema", randomTableName());
        SinkRecord createRecord = sinkRecordFactory.createRecord(str, (byte) 1);
        consume(createRecord);
        consume(sinkRecordFactory.createRecord(str, (byte) 1));
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName(createRecord));
        assertTable.exists().hasNumberOfRows(1).hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER, (byte) 1);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT, "John Doe");
        getSink().assertColumnType(assertTable, "nick_name$", ValueType.TEXT, "John Doe$");
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testInsertModeUpdateWithNoPrimaryKey(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.NONE.getValue());
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.UPDATE.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        SinkRecord createRecordNoKey = sinkRecordFactory.createRecordNoKey(topicName("server1", "schema", randomTableName()));
        consume(createRecordNoKey);
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName(createRecordNoKey));
        assertTable.exists().hasNumberOfRows(0).hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT);
        getSink().assertColumnType(assertTable, "nick_name$", ValueType.TEXT);
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testInsertModeUpdateWithPrimaryKeyModeKafka(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.KAFKA.getValue());
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.UPDATE.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        SinkRecord createRecord = sinkRecordFactory.createRecord(topicName("server1", "schema", randomTableName()));
        consume(createRecord);
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName(createRecord));
        assertTable.exists().hasNumberOfRows(0).hasNumberOfColumns(6);
        getSink().assertColumnType(assertTable, "__connect_topic", ValueType.TEXT);
        getSink().assertColumnType(assertTable, "__connect_partition", ValueType.NUMBER);
        getSink().assertColumnType(assertTable, "__connect_offset", ValueType.NUMBER);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT);
        getSink().assertColumnType(assertTable, "nick_name$", ValueType.TEXT);
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testInsertModeUpdateWithPrimaryKeyModeRecordKey(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_KEY.getValue());
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.UPDATE.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        SinkRecord createRecord = sinkRecordFactory.createRecord(topicName("server1", "schema", randomTableName()));
        consume(createRecord);
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName(createRecord));
        assertTable.exists().hasNumberOfRows(0).hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT);
        getSink().assertColumnType(assertTable, "nick_name$", ValueType.TEXT);
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testInsertModeUpdateWithPrimaryKeyModeRecordValue(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_VALUE.getValue());
        defaultSinkConfig.put("primary.key.fields", "id");
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.UPDATE.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        SinkRecord createRecord = sinkRecordFactory.createRecord(topicName("server1", "schema", randomTableName()));
        consume(createRecord);
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName(createRecord));
        assertTable.exists().hasNumberOfRows(0).hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT);
        getSink().assertColumnType(assertTable, "nick_name$", ValueType.TEXT);
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @FixFor({"DBZ-7191"})
    @ParameterizedTest
    public void testRecordDefaultValueUsedOnlyWithRequiredFieldWithNullValue(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_KEY.getValue());
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.INSERT.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        SinkRecord createRecordWithSchemaValue = sinkRecordFactory.createRecordWithSchemaValue(topicName("server1", "schema", randomTableName()), (byte) 1, List.of("optional_with_default_null_value"), List.of(SchemaBuilder.string().defaultValue("default").optional().build()), Arrays.asList(null));
        consume(createRecordWithSchemaValue);
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName(createRecordWithSchemaValue));
        assertTable.exists().hasNumberOfRows(1).hasNumberOfColumns(2);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER, (byte) 1);
        getSink().assertColumnHasNullValue(assertTable, "optional_with_default_null_value");
    }
}
