package io.debezium.connector.jdbc.integration;

import io.debezium.bindings.kafka.KafkaDebeziumSinkRecord;
import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.junit.TestHelper;
import io.debezium.connector.jdbc.junit.jupiter.Sink;
import io.debezium.connector.jdbc.junit.jupiter.SinkRecordFactoryArgumentsProvider;
import io.debezium.connector.jdbc.util.SinkRecordFactory;
import io.debezium.sink.SinkConnectorConfig;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import org.assertj.db.api.TableAssert;
import org.assertj.db.type.ValueType;
import org.fest.assertions.Assertions;
import org.fest.assertions.Index;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

/* loaded from: input_file:io/debezium/connector/jdbc/integration/AbstractJdbcSinkPrimaryKeyModeTest.class */
public abstract class AbstractJdbcSinkPrimaryKeyModeTest extends AbstractJdbcSinkTest {
    public AbstractJdbcSinkPrimaryKeyModeTest(Sink sink) {
        super(sink);
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testRecordWithNoPrimaryKeyColumnsWithPrimaryKeyModeNone(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.NONE.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        KafkaDebeziumSinkRecord createRecordNoKey = sinkRecordFactory.createRecordNoKey(topicName("server1", "schema", randomTableName()));
        consume(createRecordNoKey);
        String destinationTableName = destinationTableName(createRecordNoKey);
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName);
        assertTable.exists().hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER, (byte) 1);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT, "John Doe");
        getSink().assertColumnType(assertTable, "nick_name$", ValueType.TEXT, "John Doe$");
        assertHasPrimaryKeyColumns(destinationTableName, new String[0]);
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testRecordWithNoPrimaryKeyColumnsWithPrimaryKeyModeKafka(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.KAFKA.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        String str = topicName("server1", "schema", randomTableName());
        KafkaDebeziumSinkRecord createRecordNoKey = sinkRecordFactory.createRecordNoKey(str);
        consume(createRecordNoKey);
        String destinationTableName = destinationTableName(createRecordNoKey);
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName);
        assertTable.exists().hasNumberOfColumns(6);
        getSink().assertColumnType(assertTable, "__connect_topic", ValueType.TEXT, str);
        getSink().assertColumnType(assertTable, "__connect_partition", ValueType.NUMBER, 0);
        getSink().assertColumnType(assertTable, "__connect_offset", ValueType.NUMBER, 0);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER, (byte) 1);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT, "John Doe");
        getSink().assertColumnType(assertTable, "nick_name$", ValueType.TEXT, "John Doe$");
        assertHasPrimaryKeyColumns(destinationTableName, "__connect_topic", "__connect_partition", "__connect_offset");
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testRecordWithPrimaryKeyColumnWithPrimaryKeyModeKafka(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.KAFKA.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        String str = topicName("server1", "schema", randomTableName());
        KafkaDebeziumSinkRecord createRecord = sinkRecordFactory.createRecord(str);
        consume(createRecord);
        String destinationTableName = destinationTableName(createRecord);
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName);
        assertTable.exists().hasNumberOfColumns(6);
        getSink().assertColumnType(assertTable, "__connect_topic", ValueType.TEXT, str);
        getSink().assertColumnType(assertTable, "__connect_partition", ValueType.NUMBER, 0);
        getSink().assertColumnType(assertTable, "__connect_offset", ValueType.NUMBER, 1L);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER, (byte) 1);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT, "John Doe");
        getSink().assertColumnType(assertTable, "nick_name$", ValueType.TEXT, "John Doe$");
        assertHasPrimaryKeyColumns(destinationTableName, "__connect_topic", "__connect_partition", "__connect_offset");
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testRecordWithPrimaryKeyColumnWithPrimaryKeyModeRecordKey(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.RECORD_KEY.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        KafkaDebeziumSinkRecord createRecord = sinkRecordFactory.createRecord(topicName("server1", "schema", randomTableName()));
        consume(createRecord);
        String destinationTableName = destinationTableName(createRecord);
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName);
        assertTable.exists().hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER, (byte) 1);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT, "John Doe");
        getSink().assertColumnType(assertTable, "nick_name$", ValueType.TEXT, "John Doe$");
        TestHelper.assertTable(dataSource(), destinationTableName).exists().hasNumberOfColumns(3).column("id").isNumber(false).hasValues(new Number[]{(byte) 1}).column("name").isText(false).hasValues(new String[]{"John Doe"}).column("nick_name$").isText(false).hasValues(new String[]{"John Doe$"});
        assertHasPrimaryKeyColumns(destinationTableName, "id");
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testRecordWithPrimaryKeyColumnsWithPrimaryKeyModeRecordKey(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.RECORD_KEY.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        KafkaDebeziumSinkRecord createRecordMultipleKeyColumns = sinkRecordFactory.createRecordMultipleKeyColumns(topicName("server1", "schema", randomTableName()));
        consume(createRecordMultipleKeyColumns);
        String destinationTableName = destinationTableName(createRecordMultipleKeyColumns);
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName);
        assertTable.exists().hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "id1", ValueType.NUMBER, (byte) 1);
        getSink().assertColumnType(assertTable, "id2", ValueType.NUMBER, 10);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT, "John Doe");
        assertHasPrimaryKeyColumns(destinationTableName, "id1", "id2");
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testRecordWithPrimaryKeyColumnWithPrimaryKeyModeRecordHeader(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.RECORD_HEADER.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        KafkaDebeziumSinkRecord createRecord = sinkRecordFactory.createRecord(topicName("server1", "schema", randomTableName()));
        createRecord.getOriginalKafkaRecord().headers().addInt("id", 1);
        consume(createRecord);
        String destinationTableName = destinationTableName(createRecord);
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName);
        assertTable.exists().hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER, (byte) 1);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT, "John Doe");
        getSink().assertColumnType(assertTable, "nick_name$", ValueType.TEXT, "John Doe$");
        TestHelper.assertTable(dataSource(), destinationTableName).exists().hasNumberOfColumns(3).column("id").isNumber(false).hasValues(new Number[]{(byte) 1}).column("name").isText(false).hasValues(new String[]{"John Doe"}).column("nick_name$").isText(false).hasValues(new String[]{"John Doe$"});
        assertHasPrimaryKeyColumns(destinationTableName, "id");
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testRecordWithPrimaryKeyColumnsWithPrimaryKeyModeRecordHeader(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.RECORD_HEADER.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        KafkaDebeziumSinkRecord createRecordMultipleKeyColumns = sinkRecordFactory.createRecordMultipleKeyColumns(topicName("server1", "schema", randomTableName()));
        SinkRecord sinkRecord = new SinkRecord(createRecordMultipleKeyColumns.topicName(), createRecordMultipleKeyColumns.partition().intValue(), (Schema) null, (Object) null, createRecordMultipleKeyColumns.valueSchema(), createRecordMultipleKeyColumns.value(), createRecordMultipleKeyColumns.offset());
        sinkRecord.headers().addInt("id1", 1);
        sinkRecord.headers().addInt("id2", 10);
        KafkaDebeziumSinkRecord kafkaDebeziumSinkRecord = new KafkaDebeziumSinkRecord(sinkRecord);
        consume(kafkaDebeziumSinkRecord);
        String destinationTableName = destinationTableName(kafkaDebeziumSinkRecord);
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName);
        assertTable.exists().hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "id1", ValueType.NUMBER, (byte) 1);
        getSink().assertColumnType(assertTable, "id2", ValueType.NUMBER, 10);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT, "John Doe");
        assertHasPrimaryKeyColumns(destinationTableName, "id1", "id2");
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testRecordWithNoPrimaryKeyColumnsWithPrimaryKeyModeRecordValue(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.RECORD_VALUE.getValue());
        defaultSinkConfig.put("primary.key.fields", "id,name");
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        KafkaDebeziumSinkRecord createRecordNoKey = sinkRecordFactory.createRecordNoKey(topicName("server1", "schema", randomTableName()));
        consume(createRecordNoKey);
        String destinationTableName = destinationTableName(createRecordNoKey);
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName);
        assertTable.exists().hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER, (byte) 1);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT, "John Doe");
        getSink().assertColumnType(assertTable, "nick_name$", ValueType.TEXT, "John Doe$");
        assertHasPrimaryKeyColumns(destinationTableName, "id", "name");
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testRecordWithPrimaryKeyColumnWithPrimaryKeyModeRecordValueWithNoFieldsSpecified(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.RECORD_VALUE.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        KafkaDebeziumSinkRecord createRecordNoKey = sinkRecordFactory.createRecordNoKey(topicName("server1", "schema", randomTableName()));
        consume(createRecordNoKey);
        String destinationTableName = destinationTableName(createRecordNoKey);
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName);
        assertTable.exists().hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER, (byte) 1);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT, "John Doe");
        getSink().assertColumnType(assertTable, "nick_name$", ValueType.TEXT, "John Doe$");
        assertHasPrimaryKeyColumns(destinationTableName, "id", "name", "nick_name$");
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testRecordWithPrimaryKeyColumnWithPrimaryKeyModeRecordValue(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.RECORD_VALUE.getValue());
        defaultSinkConfig.put("primary.key.fields", "id,name");
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        KafkaDebeziumSinkRecord createRecord = sinkRecordFactory.createRecord(topicName("server1", "schema", randomTableName()));
        consume(createRecord);
        String destinationTableName = destinationTableName(createRecord);
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName);
        assertTable.exists().hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER, (byte) 1);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT, "John Doe");
        getSink().assertColumnType(assertTable, "nick_name$", ValueType.TEXT, "John Doe$");
        assertHasPrimaryKeyColumns(destinationTableName, "id", "name");
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testRecordWithPrimaryKeyColumnsWithPrimaryKeyModeRecordValue(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.RECORD_VALUE.getValue());
        defaultSinkConfig.put("primary.key.fields", "id1,id2,name");
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        KafkaDebeziumSinkRecord createRecordMultipleKeyColumns = sinkRecordFactory.createRecordMultipleKeyColumns(topicName("server1", "schema", randomTableName()));
        consume(createRecordMultipleKeyColumns);
        String destinationTableName = destinationTableName(createRecordMultipleKeyColumns);
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName);
        assertTable.exists().hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "id1", ValueType.NUMBER, (byte) 1);
        getSink().assertColumnType(assertTable, "id2", ValueType.NUMBER, 10);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT, "John Doe");
        assertHasPrimaryKeyColumns(destinationTableName, "id1", "id2", "name");
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testRecordWithPrimaryKeyColumnsWithPrimaryKeyModeRecordValueWithSubsetOfFields(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.RECORD_VALUE.getValue());
        defaultSinkConfig.put("primary.key.fields", "id1,name");
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        KafkaDebeziumSinkRecord createRecordMultipleKeyColumns = sinkRecordFactory.createRecordMultipleKeyColumns(topicName("server1", "schema", randomTableName()));
        consume(createRecordMultipleKeyColumns);
        String destinationTableName = destinationTableName(createRecordMultipleKeyColumns);
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName);
        assertTable.exists().hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "id1", ValueType.NUMBER, (byte) 1);
        getSink().assertColumnType(assertTable, "id2", ValueType.NUMBER, 10);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT, "John Doe");
        assertHasPrimaryKeyColumns(destinationTableName, "id1", "name");
    }

    protected void assertHasPrimaryKeyColumns(String str, String... strArr) {
        assertHasPrimaryKeyColumns(str, true, strArr);
    }

    protected void assertHasPrimaryKeyColumns(String str, boolean z, String... strArr) {
        List<String> primaryKeyColumnNames = TestHelper.getPrimaryKeyColumnNames(dataSource(), str);
        if (strArr.length == 0) {
            Assertions.assertThat(primaryKeyColumnNames).isEmpty();
            return;
        }
        if (!z) {
            Assertions.assertThat(primaryKeyColumnNames).containsExactly(strArr);
            return;
        }
        List list = (List) primaryKeyColumnNames.stream().map((v0) -> {
            return v0.toLowerCase();
        }).collect(Collectors.toList());
        Assertions.assertThat(list.size()).isEqualTo(strArr.length);
        for (int i = 0; i < strArr.length; i++) {
            Assertions.assertThat(list).contains(strArr[i].toLowerCase(), Index.atIndex(i));
        }
    }
}
