package io.debezium.connector.binlog;

import io.debezium.config.Configuration;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.converters.JdbcSinkDataTypesConverter;
import io.debezium.connector.binlog.util.BinlogTestConnection;
import io.debezium.connector.binlog.util.TestHelper;
import io.debezium.connector.binlog.util.UniqueDatabase;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/binlog/BinlogJdbcSinkDataTypeConverterIT.class */
public abstract class BinlogJdbcSinkDataTypeConverterIT<C extends SourceConnector> extends AbstractBinlogConnectorIT<C> {
    private static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-jdbc-sink.text").toAbsolutePath();
    private Configuration config;

    @Before
    public void beforeEach() {
        stopConnector();
        initializeConnectorTestFramework();
    }

    @After
    public void afterEach() {
        try {
            stopConnector();
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
        } catch (Throwable th) {
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-6225"})
    public void testBooleanDataTypeMapping() throws Exception {
        if (VerifyRecord.isApucurioAvailable()) {
            skipAvroValidation();
        }
        UniqueDatabase withDbHistoryPath = TestHelper.getUniqueDatabase("booleanit", "boolean_test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
        withDbHistoryPath.createAndInitialize();
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
        this.config = withDbHistoryPath.defaultConfig().with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.INITIAL).with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, withDbHistoryPath.qualifiedTableName("BOOLEAN_TEST") + "," + withDbHistoryPath.qualifiedTableName("BOOLEAN_TEST2")).with(BinlogConnectorConfig.PROPAGATE_COLUMN_SOURCE_TYPE, ".*").with(BinlogConnectorConfig.CUSTOM_CONVERTERS, "jdbc-sink").with("jdbc-sink.type", JdbcSinkDataTypesConverter.class.getName()).with("jdbc-sink.selector.boolean", ".*BOOLEAN_TEST.b.*|.*BOOLEAN_TEST2.b.*").build();
        start(getConnectorClass(), this.config);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(7);
        Assertions.assertThat(consumeRecordsByTopic).isNotNull();
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(withDbHistoryPath.topicForTable("BOOLEAN_TEST"));
        Assertions.assertThat(recordsForTopic).hasSize(1);
        SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
        System.out.println(sourceRecord);
        Struct struct = ((Struct) sourceRecord.value()).getStruct("after");
        Schema schema = sourceRecord.valueSchema().field("after").schema();
        Assertions.assertThat(schema.field("b1").schema().type()).isEqualTo(Schema.Type.INT16);
        Assertions.assertThat((String) schema.field("b1").schema().parameters().get("__debezium.source.column.type")).isEqualTo("TINYINT");
        Assertions.assertThat((String) schema.field("b1").schema().parameters().get("__debezium.source.column.length")).isEqualTo("1");
        Assertions.assertThat(struct.get("b1")).isEqualTo((short) 0);
        Assertions.assertThat(schema.field("b2").schema().type()).isEqualTo(Schema.Type.INT16);
        Assertions.assertThat((String) schema.field("b2").schema().parameters().get("__debezium.source.column.type")).isEqualTo("TINYINT");
        Assertions.assertThat((String) schema.field("b2").schema().parameters().get("__debezium.source.column.length")).isEqualTo("1");
        Assertions.assertThat(struct.get("b2")).isEqualTo((short) 1);
        BinlogTestConnection testDatabaseConnection = getTestDatabaseConnection(withDbHistoryPath.getDatabaseName());
        try {
            JdbcConnection connect = testDatabaseConnection.connect();
            try {
                connect.execute(new String[]{"CREATE TABLE BOOLEAN_TEST2 (`id` INT NOT NULL AUTO_INCREMENT, `b1` boolean default true, `b2` boolean default false, primary key (`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;"});
                connect.execute(new String[]{"INSERT INTO BOOLEAN_TEST2 (b1,b2) VALUES (true, false)"});
                if (connect != null) {
                    connect.close();
                }
                if (testDatabaseConnection != null) {
                    testDatabaseConnection.close();
                }
                AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(2);
                Assertions.assertThat(consumeRecordsByTopic2).isNotNull();
                List recordsForTopic2 = consumeRecordsByTopic2.recordsForTopic(withDbHistoryPath.topicForTable("BOOLEAN_TEST2"));
                Assertions.assertThat(recordsForTopic2).hasSize(1);
                SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.get(0);
                Struct struct2 = ((Struct) sourceRecord2.value()).getStruct("after");
                Schema schema2 = sourceRecord2.valueSchema().field("after").schema();
                Assertions.assertThat(schema2.field("b1").schema().type()).isEqualTo(Schema.Type.INT16);
                Assertions.assertThat((String) schema2.field("b1").schema().parameters().get("__debezium.source.column.type")).isEqualTo("BOOLEAN");
                Assertions.assertThat((String) schema2.field("b1").schema().parameters().get("__debezium.source.column.length")).isNull();
                Assertions.assertThat(struct2.get("b1")).isEqualTo((short) 1);
                Assertions.assertThat(schema2.field("b2").schema().type()).isEqualTo(Schema.Type.INT16);
                Assertions.assertThat((String) schema2.field("b2").schema().parameters().get("__debezium.source.column.type")).isEqualTo("BOOLEAN");
                Assertions.assertThat((String) schema2.field("b2").schema().parameters().get("__debezium.source.column.length")).isNull();
                Assertions.assertThat(struct2.get("b2")).isEqualTo((short) 0);
                stopConnector();
            } finally {
            }
        } catch (Throwable th) {
            if (testDatabaseConnection != null) {
                try {
                    testDatabaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-6226"})
    public void testRealDataTypeMapping() throws Exception {
        UniqueDatabase withDbHistoryPath = TestHelper.getUniqueDatabase("realit", "real_test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
        withDbHistoryPath.createAndInitialize();
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
        this.config = withDbHistoryPath.defaultConfig().with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.INITIAL).with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, withDbHistoryPath.qualifiedTableName("REAL_TEST") + "," + withDbHistoryPath.qualifiedTableName("REAL_TEST2")).with(BinlogConnectorConfig.PROPAGATE_COLUMN_SOURCE_TYPE, ".*").with(BinlogConnectorConfig.CUSTOM_CONVERTERS, "jdbc-sink").with("jdbc-sink.type", JdbcSinkDataTypesConverter.class.getName()).with("jdbc-sink.selector.real", ".*REAL_TEST.r.*|.*REAL_TEST2.r.*").build();
        start(getConnectorClass(), this.config);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(7);
        Assertions.assertThat(consumeRecordsByTopic).isNotNull();
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(withDbHistoryPath.topicForTable("REAL_TEST"));
        Assertions.assertThat(recordsForTopic).hasSize(1);
        SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
        Struct struct = ((Struct) sourceRecord.value()).getStruct("after");
        Schema schema = sourceRecord.valueSchema().field("after").schema();
        Assertions.assertThat(schema.field("r1").schema().type()).isEqualTo(Schema.Type.FLOAT64);
        Assertions.assertThat((String) schema.field("r1").schema().parameters().get("__debezium.source.column.type")).isEqualTo("DOUBLE");
        Assertions.assertThat(struct.get("r1")).isEqualTo(Double.valueOf(2.36d));
        BinlogTestConnection testDatabaseConnection = getTestDatabaseConnection(withDbHistoryPath.getDatabaseName());
        try {
            JdbcConnection connect = testDatabaseConnection.connect();
            try {
                connect.execute(new String[]{"CREATE TABLE REAL_TEST2 (`id` INT NOT NULL AUTO_INCREMENT, `r1` real default 3.14, primary key (`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;"});
                connect.execute(new String[]{"INSERT INTO REAL_TEST2 (r1) VALUES (9.78)"});
                if (connect != null) {
                    connect.close();
                }
                if (testDatabaseConnection != null) {
                    testDatabaseConnection.close();
                }
                AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(2);
                Assertions.assertThat(consumeRecordsByTopic2).isNotNull();
                List recordsForTopic2 = consumeRecordsByTopic2.recordsForTopic(withDbHistoryPath.topicForTable("REAL_TEST2"));
                Assertions.assertThat(recordsForTopic2).hasSize(1);
                SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.get(0);
                Struct struct2 = ((Struct) sourceRecord2.value()).getStruct("after");
                Schema schema2 = sourceRecord2.valueSchema().field("after").schema();
                Assertions.assertThat(schema2.field("r1").schema().type()).isEqualTo(Schema.Type.FLOAT64);
                Assertions.assertThat((String) schema2.field("r1").schema().parameters().get("__debezium.source.column.type")).isEqualTo("REAL");
                Assertions.assertThat(struct2.get("r1")).isEqualTo(Double.valueOf(9.78d));
                stopConnector();
            } finally {
            }
        } catch (Throwable th) {
            if (testDatabaseConnection != null) {
                try {
                    testDatabaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-6231"})
    public void testNationalizedCharacterDataTypeMappings() throws Exception {
        UniqueDatabase withDbHistoryPath = TestHelper.getUniqueDatabase("nctestit", "nationalized_character_test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
        withDbHistoryPath.createAndInitialize();
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
        this.config = withDbHistoryPath.defaultConfig().with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.INITIAL).with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, withDbHistoryPath.qualifiedTableName("NC_TEST") + "," + withDbHistoryPath.qualifiedTableName("NC_TEST2")).with(BinlogConnectorConfig.PROPAGATE_COLUMN_SOURCE_TYPE, ".*").with(BinlogConnectorConfig.CUSTOM_CONVERTERS, "jdbc-sink").with("jdbc-sink.type", JdbcSinkDataTypesConverter.class.getName()).with("jdbc-sink.selector.string", ".*NC_TEST.nc.*|.*NC_TEST2.nc.*").build();
        start(getConnectorClass(), this.config);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(7);
        Assertions.assertThat(consumeRecordsByTopic).isNotNull();
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(withDbHistoryPath.topicForTable("NC_TEST"));
        Assertions.assertThat(recordsForTopic).hasSize(1);
        SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
        Struct struct = ((Struct) sourceRecord.value()).getStruct("after");
        Schema schema = sourceRecord.valueSchema().field("after").schema();
        Assertions.assertThat(schema.field("nc1").schema().type()).isEqualTo(Schema.Type.STRING);
        Assertions.assertThat(struct.get("nc1")).isEqualTo("a");
        Assertions.assertThat((String) schema.field("nc1").schema().parameters().get("__debezium.source.column.character_set")).startsWith("utf8");
        Assertions.assertThat((String) schema.field("nc1").schema().parameters().get("__debezium.source.column.type")).isEqualTo("CHAR");
        Assertions.assertThat((String) schema.field("nc1").schema().parameters().get("__debezium.source.column.length")).isEqualTo("1");
        Assertions.assertThat(schema.field("nc2").schema().type()).isEqualTo(Schema.Type.STRING);
        Assertions.assertThat(struct.get("nc2")).isEqualTo("123");
        Assertions.assertThat((String) schema.field("nc2").schema().parameters().get("__debezium.source.column.character_set")).startsWith("utf8");
        Assertions.assertThat((String) schema.field("nc2").schema().parameters().get("__debezium.source.column.type")).isEqualTo("CHAR");
        Assertions.assertThat((String) schema.field("nc2").schema().parameters().get("__debezium.source.column.length")).isEqualTo("5");
        Assertions.assertThat(schema.field("nc3").schema().type()).isEqualTo(Schema.Type.STRING);
        Assertions.assertThat(struct.get("nc3")).isEqualTo("hello");
        Assertions.assertThat((String) schema.field("nc3").schema().parameters().get("__debezium.source.column.character_set")).startsWith("utf8");
        Assertions.assertThat((String) schema.field("nc3").schema().parameters().get("__debezium.source.column.type")).isEqualTo("VARCHAR");
        Assertions.assertThat((String) schema.field("nc3").schema().parameters().get("__debezium.source.column.length")).isEqualTo("25");
        BinlogTestConnection testDatabaseConnection = getTestDatabaseConnection(withDbHistoryPath.getDatabaseName());
        try {
            JdbcConnection connect = testDatabaseConnection.connect();
            try {
                connect.execute(new String[]{"CREATE TABLE NC_TEST2 (`id` INT NOT NULL AUTO_INCREMENT, `nc1` nchar, `nc2` nchar(5), `nc3` nvarchar(25), primary key (`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;"});
                connect.execute(new String[]{"INSERT INTO NC_TEST2 (nc1,nc2,nc3) VALUES ('b', '456', 'world')"});
                if (connect != null) {
                    connect.close();
                }
                if (testDatabaseConnection != null) {
                    testDatabaseConnection.close();
                }
                AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(2);
                Assertions.assertThat(consumeRecordsByTopic2).isNotNull();
                List recordsForTopic2 = consumeRecordsByTopic2.recordsForTopic(withDbHistoryPath.topicForTable("NC_TEST2"));
                Assertions.assertThat(recordsForTopic2).hasSize(1);
                SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.get(0);
                Struct struct2 = ((Struct) sourceRecord2.value()).getStruct("after");
                Schema schema2 = sourceRecord2.valueSchema().field("after").schema();
                Assertions.assertThat(schema2.field("nc1").schema().type()).isEqualTo(Schema.Type.STRING);
                Assertions.assertThat(struct2.get("nc1")).isEqualTo(BinlogReadOnlyIncrementalSnapshotIT.EXCLUDED_TABLE);
                Assertions.assertThat((String) schema2.field("nc1").schema().parameters().get("__debezium.source.column.character_set")).startsWith("utf8");
                Assertions.assertThat((String) schema2.field("nc1").schema().parameters().get("__debezium.source.column.type")).isEqualTo("NCHAR");
                Assertions.assertThat((String) schema2.field("nc1").schema().parameters().get("__debezium.source.column.length")).isEqualTo("1");
                Assertions.assertThat(schema2.field("nc2").schema().type()).isEqualTo(Schema.Type.STRING);
                Assertions.assertThat(struct2.get("nc2")).isEqualTo("456");
                Assertions.assertThat((String) schema2.field("nc2").schema().parameters().get("__debezium.source.column.character_set")).startsWith("utf8");
                Assertions.assertThat((String) schema2.field("nc2").schema().parameters().get("__debezium.source.column.type")).isEqualTo("NCHAR");
                Assertions.assertThat((String) schema2.field("nc2").schema().parameters().get("__debezium.source.column.length")).isEqualTo("5");
                Assertions.assertThat(schema2.field("nc3").schema().type()).isEqualTo(Schema.Type.STRING);
                Assertions.assertThat(struct2.get("nc3")).isEqualTo("world");
                Assertions.assertThat((String) schema2.field("nc3").schema().parameters().get("__debezium.source.column.character_set")).startsWith("utf8");
                Assertions.assertThat((String) schema2.field("nc3").schema().parameters().get("__debezium.source.column.type")).isEqualTo("NVARCHAR");
                Assertions.assertThat((String) schema2.field("nc3").schema().parameters().get("__debezium.source.column.length")).isEqualTo("25");
                stopConnector();
            } finally {
            }
        } catch (Throwable th) {
            if (testDatabaseConnection != null) {
                try {
                    testDatabaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
