package io.debezium.connector.jdbc.e2e;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.bindings.kafka.KafkaDebeziumSinkRecord;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.junit.TestHelper;
import io.debezium.connector.jdbc.junit.jupiter.Sink;
import io.debezium.connector.jdbc.junit.jupiter.SinkType;
import io.debezium.connector.jdbc.junit.jupiter.WithPostgresExtension;
import io.debezium.connector.jdbc.junit.jupiter.e2e.ForSource;
import io.debezium.connector.jdbc.junit.jupiter.e2e.SkipExtractNewRecordState;
import io.debezium.connector.jdbc.junit.jupiter.e2e.SkipWhenSink;
import io.debezium.connector.jdbc.junit.jupiter.e2e.SkipWhenSinks;
import io.debezium.connector.jdbc.junit.jupiter.e2e.SkipWhenSource;
import io.debezium.connector.jdbc.junit.jupiter.e2e.SkipWhenSources;
import io.debezium.connector.jdbc.junit.jupiter.e2e.WithTemporalPrecisionMode;
import io.debezium.connector.jdbc.junit.jupiter.e2e.source.Source;
import io.debezium.connector.jdbc.junit.jupiter.e2e.source.SourceConnectorOptions;
import io.debezium.connector.jdbc.junit.jupiter.e2e.source.SourcePipelineInvocationContextProvider;
import io.debezium.connector.jdbc.junit.jupiter.e2e.source.SourceType;
import io.debezium.connector.jdbc.junit.jupiter.e2e.source.ValueBinder;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.sink.SinkConnectorConfig;
import io.debezium.sink.naming.CollectionNamingStrategy;
import io.debezium.sink.naming.DefaultCollectionNamingStrategy;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import io.debezium.time.MicroDuration;
import io.debezium.util.HexConverter;
import io.debezium.util.Strings;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Date;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoField;
import java.time.temporal.TemporalField;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.TimeZone;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.connect.sink.SinkRecord;
import org.fest.assertions.Assertions;
import org.fest.assertions.IntAssert;
import org.fest.assertions.ObjectAssert;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({SourcePipelineInvocationContextProvider.class})
@SkipExtractNewRecordState
/* loaded from: input_file:io/debezium/connector/jdbc/e2e/AbstractJdbcSinkPipelineIT.class */
public abstract class AbstractJdbcSinkPipelineIT extends AbstractJdbcSinkIT {
    private final CollectionNamingStrategy collectionNamingStrategy = new DefaultCollectionNamingStrategy();
    private static final ZoneId SOURCE_ZONE_ID = TimeZone.getTimeZone(TestHelper.getSourceTimeZone()).toZoneId();
    private static final ZoneId SINK_ZONE_ID = TimeZone.getTimeZone(TestHelper.getSinkTimeZone()).toZoneId();

    /* JADX INFO: Access modifiers changed from: protected */
    @FunctionalInterface
    /* loaded from: input_file:io/debezium/connector/jdbc/e2e/AbstractJdbcSinkPipelineIT$ColumnReader.class */
    public interface ColumnReader<T> {
        T read(ResultSet resultSet, int i) throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @FunctionalInterface
    /* loaded from: input_file:io/debezium/connector/jdbc/e2e/AbstractJdbcSinkPipelineIT$ConfigurationAdjuster.class */
    public interface ConfigurationAdjuster {
        void adjust(ConnectorConfiguration connectorConfiguration);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @FunctionalInterface
    /* loaded from: input_file:io/debezium/connector/jdbc/e2e/AbstractJdbcSinkPipelineIT$DataTypeColumnAssert.class */
    public interface DataTypeColumnAssert {
        void assertColumn(SinkRecord sinkRecord);
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.ORACLE}, reason = "No BIT data type support")
    public void testBitDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "bit", bitValues(source, "1", "0"), isBitCoercedToBoolean() ? List.of(true, false) : List.of(1, 0), sinkRecord -> {
            SourceConnectorOptions options = source.getOptions();
            switch (sink.getType()) {
                case ORACLE:
                    assertColumn(sink, sinkRecord, "id", getBooleanType(), 1);
                    assertColumn(sink, sinkRecord, "data", getBooleanType(), 1);
                    return;
                case POSTGRES:
                    assertColumn(sink, sinkRecord, "id", getBooleanType());
                    assertColumn(sink, sinkRecord, "data", options.isColumnTypePropagated() ? "BIT" : getBooleanType());
                    return;
                default:
                    assertColumn(sink, sinkRecord, "id", getBooleanType());
                    assertColumn(sink, sinkRecord, "data", getBooleanType());
                    return;
            }
        }, (resultSet, i) -> {
            return isBitCoercedToBoolean() ? Boolean.valueOf(resultSet.getBoolean(i)) : Integer.valueOf(resultSet.getInt(i));
        });
    }

    @SkipWhenSink(value = {SinkType.ORACLE, SinkType.DB2}, reason = "BIT(n) is sent as bytes, BLOB is not permitted in primary keys")
    @TestTemplate
    @SkipWhenSource(value = {SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No BIT(n) data type support")
    public void testBitWithSizeDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "bit(2)", bitValues(source, "10", "01"), List.of(2, 1), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getBitsDataType(), 2);
            assertColumn(sink, sinkRecord, "data", getBitsDataType(), 2);
        }, (resultSet, i) -> {
            switch (sink.getType()) {
                case SQLSERVER:
                    return Integer.valueOf(new BigInteger(resultSet.getBytes(i)).intValue());
                case POSTGRES:
                    return Integer.valueOf(Integer.parseInt(resultSet.getString(i), 2));
                default:
                    return Integer.valueOf(resultSet.getInt(i));
            }
        });
    }

    @SkipWhenSink(value = {SinkType.MYSQL, SinkType.POSTGRES, SinkType.SQLSERVER}, reason = "BIT(n) is only applicable to non-key columns")
    @TestTemplate
    @SkipWhenSource(value = {SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No BIT(n) data type support")
    public void testBitWithSizeDataTypeNotInKey(Source source, Sink sink) throws Exception {
        String randomTableName = source.randomTableName();
        registerSourceConnector(source, randomTableName);
        source.execute(String.format("CREATE TABLE %s (data bit(2))", randomTableName));
        source.streamTable(randomTableName);
        source.execute(String.format("INSERT INTO %s VALUES (%s)", randomTableName, Strings.join(",", bitValues(source, "01"))));
        Properties defaultSinkConfig = getDefaultSinkConfig(sink);
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.NONE.getValue());
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.INSERT.getValue());
        startSink(source, defaultSinkConfig, randomTableName);
        SinkRecord consumeSinkRecord = consumeSinkRecord();
        assertColumn(sink, consumeSinkRecord, "data", getBitsDataType());
        sink.assertRows(getSinkTable(consumeSinkRecord, sink), resultSet -> {
            Blob blob = resultSet.getBlob(1);
            Assertions.assertThat(blob.getBytes(1L, (int) blob.length())).isEqualTo(ByteBuffer.allocate(1).put((byte) 1).array());
            return null;
        });
    }

    @SkipWhenSink(value = {SinkType.ORACLE, SinkType.DB2}, reason = "BIT VARYING(n) is sent as bytes, BLOB is not permitted in primary keys")
    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No BIT VARYING(n) data type support")
    public void testBitVaryingDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "bit varying(2)", bitValues(source, "10", "01"), List.of(2, 1), sinkRecord -> {
            SourceConnectorOptions options = source.getOptions();
            assertColumn(sink, sinkRecord, "id", getBitsDataType(), 2);
            if (options.isColumnTypePropagated() && sink.getType() == SinkType.POSTGRES) {
                assertColumn(sink, sinkRecord, "data", "VARBIT", 2);
            } else {
                assertColumn(sink, sinkRecord, "data", getBitsDataType(), 2);
            }
        }, (resultSet, i) -> {
            switch (sink.getType()) {
                case SQLSERVER:
                    return Integer.valueOf(new BigInteger(resultSet.getBytes(i)).intValue());
                case POSTGRES:
                    return Integer.valueOf(Integer.parseInt(resultSet.getString(i), 2));
                default:
                    return Integer.valueOf(resultSet.getInt(i));
            }
        });
    }

    @SkipWhenSink(value = {SinkType.MYSQL, SinkType.POSTGRES, SinkType.SQLSERVER}, reason = "BIT VARYING(n) is only applicable to non-key columns")
    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No BIT VARYING(n) data type support")
    public void testBitVaryingDataTypeNotInKey(Source source, Sink sink) throws Exception {
        String randomTableName = source.randomTableName();
        registerSourceConnector(source, randomTableName);
        source.execute(String.format("CREATE TABLE %s (data bit varying(2))", randomTableName));
        source.streamTable(randomTableName);
        source.execute(String.format("INSERT INTO %s VALUES (%s)", randomTableName, Strings.join(",", bitValues(source, "01"))));
        Properties defaultSinkConfig = getDefaultSinkConfig(sink);
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.NONE.getValue());
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.INSERT.getValue());
        startSink(source, defaultSinkConfig, randomTableName);
        SinkRecord consumeSinkRecord = consumeSinkRecord();
        assertColumn(sink, consumeSinkRecord, "data", getBitsDataType());
        sink.assertRows(getSinkTable(consumeSinkRecord, sink), resultSet -> {
            Blob blob = resultSet.getBlob(1);
            Assertions.assertThat(blob.getBytes(1L, (int) blob.length())).isEqualTo(ByteBuffer.allocate(1).put((byte) 1).array());
            return null;
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No BOOLEAN data type support")
    public void testBooleanDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "boolean", List.of("true", "false"), List.of(1, 0), connectorConfiguration -> {
            applyJdbcSourceConverter(source, connectorConfiguration, ".*id|.*data", null, null);
        }, sinkRecord -> {
            if (!source.getType().is(SourceType.MYSQL)) {
                assertColumn(sink, sinkRecord, "id", getBooleanType());
                assertColumn(sink, sinkRecord, "data", getBooleanType());
                return;
            }
            assertColumn(sink, sinkRecord, "id", getInt16Type());
            if (sink.getType().is(SinkType.MYSQL) && source.getOptions().isColumnTypePropagated()) {
                assertColumn(sink, sinkRecord, "data", getBooleanType());
            } else {
                assertColumn(sink, sinkRecord, "data", getInt16Type());
            }
        }, (resultSet, i) -> {
            switch (sink.getType()) {
                case DB2:
                case POSTGRES:
                    return Integer.valueOf(resultSet.getBoolean(i) ? 1 : 0);
                default:
                    return Integer.valueOf(resultSet.getInt(i));
            }
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE}, reason = "No TINYINT data type support")
    public void testTinyIntDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "tinyint", List.of(10, 12), sinkRecord -> {
            boolean isColumnTypePropagated = source.getOptions().isColumnTypePropagated();
            assertColumn(sink, sinkRecord, "id", getInt16Type());
            assertColumn(sink, sinkRecord, "data", isColumnTypePropagated ? getInt8Type() : getInt16Type());
        }, (v0, v1) -> {
            return v0.getInt(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No TINYINT(n) data type support")
    public void testTinyIntWithSizeDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "tinyint(2)", List.of(10, 12), sinkRecord -> {
            boolean z = SinkType.MYSQL.is(sink.getType()) && source.getOptions().isColumnTypePropagated();
            assertColumn(sink, sinkRecord, "id", getInt16Type());
            assertColumn(sink, sinkRecord, "data", z ? getInt8Type() : getInt16Type());
        }, (v0, v1) -> {
            return v0.getInt(v1);
        });
    }

    @TestTemplate
    public void testSmallIntDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "smallint", List.of(10, 12), sinkRecord -> {
            if (source.getType().is(SourceType.ORACLE)) {
                assertColumn(sink, sinkRecord, "id", getDecimalType(), getMaxDecimalPrecision(), 0);
                assertColumn(sink, sinkRecord, "data", getDecimalType(), getMaxDecimalPrecision(), 0);
            } else {
                assertColumn(sink, sinkRecord, "id", getInt16Type());
                assertColumn(sink, sinkRecord, "data", getInt16Type());
            }
        }, (v0, v1) -> {
            return v0.getInt(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No SMALLINT(n) data type support")
    public void testSmallIntWithSizeDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "smallint(2)", List.of(10, 12), sinkRecord -> {
            SourceConnectorOptions options = source.getOptions();
            assertColumn(sink, sinkRecord, "id", getInt16Type());
            if (sink.getType().is(SinkType.ORACLE) && options.isColumnTypePropagated()) {
                assertColumn(sink, sinkRecord, "data", getInt16Type(), 2);
            } else {
                assertColumn(sink, sinkRecord, "data", getInt16Type());
            }
        }, (v0, v1) -> {
            return v0.getInt(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No SMALLSERIAL data type support")
    public void testSmallSerialDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "smallserial", List.of(10, 12), sinkRecord -> {
            SourceConnectorOptions options = source.getOptions();
            assertColumn(sink, sinkRecord, "id", getInt16Type());
            if (sink.getType().is(SinkType.POSTGRES) && options.isColumnTypePropagated()) {
                assertColumn(sink, sinkRecord, "data", "SMALLSERIAL");
            } else {
                assertColumn(sink, sinkRecord, "data", getInt16Type());
            }
        }, (v0, v1) -> {
            return v0.getInt(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No SERIAL data type support")
    public void testSerialDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "serial", List.of(10, 12), sinkRecord -> {
            SourceConnectorOptions options = source.getOptions();
            assertColumn(sink, sinkRecord, "id", getInt32Type());
            if (sink.getType().is(SinkType.POSTGRES) && options.isColumnTypePropagated()) {
                assertColumn(sink, sinkRecord, "data", "SERIAL");
            } else {
                assertColumn(sink, sinkRecord, "data", getInt32Type());
            }
        }, (v0, v1) -> {
            return v0.getInt(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No BIGSERIAL data type support")
    public void testBigSerialDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "bigserial", List.of(10, 12), sinkRecord -> {
            SourceConnectorOptions options = source.getOptions();
            assertColumn(sink, sinkRecord, "id", getInt64Type());
            if (sink.getType().is(SinkType.POSTGRES) && options.isColumnTypePropagated()) {
                assertColumn(sink, sinkRecord, "data", "BIGSERIAL");
            } else {
                assertColumn(sink, sinkRecord, "data", getInt64Type());
            }
        }, (v0, v1) -> {
            return v0.getInt(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No MEDIUMINT data type support")
    public void testMediumIntDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "mediumint", List.of(10, 12), sinkRecord -> {
            SourceConnectorOptions options = source.getOptions();
            assertColumn(sink, sinkRecord, "id", getInt32Type());
            if (sink.getType().is(SinkType.MYSQL) && options.isColumnTypePropagated()) {
                assertColumn(sink, sinkRecord, "data", "MEDIUMINT");
            } else {
                assertColumn(sink, sinkRecord, "data", getInt32Type());
            }
        }, (v0, v1) -> {
            return v0.getInt(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No MEDIUMINT(n) data type support")
    public void testMediumIntWithSizeDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "mediumint(2)", List.of(10, 12), sinkRecord -> {
            SourceConnectorOptions options = source.getOptions();
            assertColumn(sink, sinkRecord, "id", getInt32Type());
            if (sink.getType().is(SinkType.MYSQL) && options.isColumnTypePropagated()) {
                assertColumn(sink, sinkRecord, "data", "MEDIUMINT");
            } else {
                assertColumn(sink, sinkRecord, "data", getInt32Type());
            }
        }, (v0, v1) -> {
            return v0.getInt(v1);
        });
    }

    @TestTemplate
    public void testIntDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "int", List.of(10, 12), sinkRecord -> {
            if (source.getType().is(SourceType.ORACLE)) {
                assertColumn(sink, sinkRecord, "id", getDecimalType(), getMaxDecimalPrecision(), 0);
                assertColumn(sink, sinkRecord, "data", getDecimalType(), getMaxDecimalPrecision(), 0);
            } else {
                assertColumn(sink, sinkRecord, "id", getInt32Type());
                assertColumn(sink, sinkRecord, "data", getInt32Type());
            }
        }, (v0, v1) -> {
            return v0.getInt(v1);
        });
    }

    @TestTemplate
    public void testIntegerDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "integer", List.of(10, 12), sinkRecord -> {
            if (source.getType().is(SourceType.ORACLE)) {
                assertColumn(sink, sinkRecord, "id", getDecimalType(), getMaxDecimalPrecision(), 0);
                assertColumn(sink, sinkRecord, "data", getDecimalType(), getMaxDecimalPrecision(), 0);
            } else {
                assertColumn(sink, sinkRecord, "id", getInt32Type());
                assertColumn(sink, sinkRecord, "data", getInt32Type());
            }
        }, (v0, v1) -> {
            return v0.getInt(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No INTEGER(n) data type support")
    public void testIntegerWithSizeDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "integer(2)", List.of(10, 12), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getInt32Type());
            assertColumn(sink, sinkRecord, "data", getInt32Type());
        }, (v0, v1) -> {
            return v0.getInt(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.ORACLE}, reason = "No BIGINT data type support")
    public void testBigIntDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "bigint", List.of(10, 12), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getInt64Type());
            assertColumn(sink, sinkRecord, "data", getInt64Type());
        }, (v0, v1) -> {
            return v0.getInt(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No BIGINT(n) data type support")
    public void testBigIntWithSizeDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "bigint(2)", List.of(10, 12), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getInt64Type());
            assertColumn(sink, sinkRecord, "data", getInt64Type());
        }, (v0, v1) -> {
            return v0.getInt(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason = "No NUMBER data type support")
    public void testNumberDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "number", List.of(10, 12), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getVariableScaleDecimalType());
            assertColumn(sink, sinkRecord, "data", getVariableScaleDecimalType());
        }, (v0, v1) -> {
            return v0.getInt(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason = "No NUMBER(n) data type support")
    public void testNumberWithPrecisionDataType(Source source, Sink sink) throws Exception {
        assertDataTypes(source, sink, List.of("number(2)", "number(3)", "number(8)", "number(18)", "number(24)"), List.of(10, 12, 14, 16, 18), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id0", getInt8Type());
            assertColumn(sink, sinkRecord, "id1", getInt16Type());
            assertColumn(sink, sinkRecord, "id2", getInt32Type());
            assertColumn(sink, sinkRecord, "id3", getInt64Type());
            assertColumn(sink, sinkRecord, "id4", getDecimalType(), 24, 0);
            assertColumn(sink, sinkRecord, "data0", getInt8Type());
            assertColumn(sink, sinkRecord, "data1", getInt16Type());
            assertColumn(sink, sinkRecord, "data2", getInt32Type());
            assertColumn(sink, sinkRecord, "data3", getInt64Type());
            assertColumn(sink, sinkRecord, "data4", getDecimalType(), 24, 0);
        }, (v0, v1) -> {
            return v0.getInt(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason = "No NUMBER(n,s) data type support")
    public void testNumberWithPrecisionAndScaleDataType(Source source, Sink sink) throws Exception {
        assertDataTypes(source, sink, List.of("number(2,1)", "number(3,1)", "number(8,1)", "number(18,1)", "number(24,1)"), List.of(Double.valueOf(1.0d), Double.valueOf(10.0d), Double.valueOf(11.0d), Double.valueOf(12.0d), Double.valueOf(13.0d)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id0", getDecimalType(), 2, 1);
            assertColumn(sink, sinkRecord, "id1", getDecimalType(), 3, 1);
            assertColumn(sink, sinkRecord, "id2", getDecimalType(), 8, 1);
            assertColumn(sink, sinkRecord, "id3", getDecimalType(), 18, 1);
            assertColumn(sink, sinkRecord, "id4", getDecimalType(), 24, 1);
            assertColumn(sink, sinkRecord, "data0", getDecimalType(), 2, 1);
            assertColumn(sink, sinkRecord, "data1", getDecimalType(), 3, 1);
            assertColumn(sink, sinkRecord, "data2", getDecimalType(), 8, 1);
            assertColumn(sink, sinkRecord, "data3", getDecimalType(), 18, 1);
            assertColumn(sink, sinkRecord, "data4", getDecimalType(), 24, 1);
        }, (v0, v1) -> {
            return v0.getDouble(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason = "No NUMBER(n,s) negative scale data type support")
    public void testNumberWithPrecisionAndNegativeScaleDataType(Source source, Sink sink) throws Exception {
        assertDataTypes(source, sink, List.of("number(2,-1)", "number(3,-1)", "number(8,-1)", "number(18,-1)", "number(24,-3)"), List.of(1L, 111L, 11111111L, 111111111111111111L, 111111111111111111L), List.of(0L, 110L, 11111110L, 111111111111111110L, 111111111111111000L), sinkRecord -> {
            boolean is = sink.getType().is(SinkType.MYSQL);
            assertColumn(sink, sinkRecord, "id0", is ? getInt16Type() : getInt8Type());
            assertColumn(sink, sinkRecord, "id1", getInt16Type());
            assertColumn(sink, sinkRecord, "id2", getInt32Type());
            assertColumn(sink, sinkRecord, "data0", is ? getInt16Type() : getInt8Type());
            assertColumn(sink, sinkRecord, "data1", getInt16Type());
            assertColumn(sink, sinkRecord, "data2", getInt32Type());
            if (SinkType.ORACLE.is(sink.getType())) {
                assertColumn(sink, sinkRecord, "id3", getDecimalType(), 18, -1);
                assertColumn(sink, sinkRecord, "id4", getDecimalType(), 24, -3);
                assertColumn(sink, sinkRecord, "data3", getDecimalType(), 18, -1);
                assertColumn(sink, sinkRecord, "data4", getDecimalType(), 24, -3);
                return;
            }
            assertColumn(sink, sinkRecord, "id3", getDecimalType(), 18, 0);
            assertColumn(sink, sinkRecord, "id4", getDecimalType(), 24, 0);
            assertColumn(sink, sinkRecord, "data3", getDecimalType(), 18, 0);
            assertColumn(sink, sinkRecord, "data4", getDecimalType(), 24, 0);
        }, (v0, v1) -> {
            return v0.getLong(v1);
        });
    }

    @TestTemplate
    public void testNumericDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "numeric", List.of(10, 12), sinkRecord -> {
            if (source.getType().is(SourceType.POSTGRES)) {
                assertColumn(sink, sinkRecord, "id", getVariableScaleDecimalType());
                assertColumn(sink, sinkRecord, "data", getVariableScaleDecimalType());
            } else {
                assertColumn(sink, sinkRecord, "id", getDecimalType());
                assertColumn(sink, sinkRecord, "data", getDecimalType());
            }
        }, (v0, v1) -> {
            return v0.getInt(v1);
        });
    }

    @TestTemplate
    public void testNumericWithPrecisionDataType(Source source, Sink sink) throws Exception {
        assertDataTypes(source, sink, List.of("numeric(2)", "numeric(3)", "numeric(8)", "numeric(18)", "numeric(24)"), List.of(10L, 11L, 12L, 13L, 14L), sinkRecord -> {
            if (SourceType.ORACLE.is(source.getType())) {
                assertColumn(sink, sinkRecord, "id0", getInt8Type());
                assertColumn(sink, sinkRecord, "id1", getInt16Type());
                assertColumn(sink, sinkRecord, "id2", getInt32Type());
                assertColumn(sink, sinkRecord, "id3", getInt64Type());
                assertColumn(sink, sinkRecord, "id4", getDecimalType(), 24, 0);
                assertColumn(sink, sinkRecord, "data0", getInt8Type());
                assertColumn(sink, sinkRecord, "data1", getInt16Type());
                assertColumn(sink, sinkRecord, "data2", getInt32Type());
                assertColumn(sink, sinkRecord, "data3", getInt64Type());
                assertColumn(sink, sinkRecord, "data4", getDecimalType(), 24, 0);
                return;
            }
            assertColumn(sink, sinkRecord, "id0", getDecimalType(), 2, 0);
            assertColumn(sink, sinkRecord, "id1", getDecimalType(), 3, 0);
            assertColumn(sink, sinkRecord, "id2", getDecimalType(), 8, 0);
            assertColumn(sink, sinkRecord, "id3", getDecimalType(), 18, 0);
            assertColumn(sink, sinkRecord, "id4", getDecimalType(), 24, 0);
            assertColumn(sink, sinkRecord, "data0", getDecimalType(), 2, 0);
            assertColumn(sink, sinkRecord, "data1", getDecimalType(), 3, 0);
            assertColumn(sink, sinkRecord, "data2", getDecimalType(), 8, 0);
            assertColumn(sink, sinkRecord, "data3", getDecimalType(), 18, 0);
            assertColumn(sink, sinkRecord, "data4", getDecimalType(), 24, 0);
        }, (v0, v1) -> {
            return v0.getLong(v1);
        });
    }

    @TestTemplate
    public void testNumericWithPrecisionAndScaleDataType(Source source, Sink sink) throws Exception {
        assertDataTypes(source, sink, List.of("numeric(2,1)", "numeric(3,1)", "numeric(8,1)", "numeric(18,1)", "numeric(24,1)"), List.of(Double.valueOf(1.0d), Double.valueOf(10.0d), Double.valueOf(11.0d), Double.valueOf(12.0d), Double.valueOf(13.0d)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id0", getDecimalType(), 2, 1);
            assertColumn(sink, sinkRecord, "id1", getDecimalType(), 3, 1);
            assertColumn(sink, sinkRecord, "id2", getDecimalType(), 8, 1);
            assertColumn(sink, sinkRecord, "id3", getDecimalType(), 18, 1);
            assertColumn(sink, sinkRecord, "id4", getDecimalType(), 24, 1);
            assertColumn(sink, sinkRecord, "data0", getDecimalType(), 2, 1);
            assertColumn(sink, sinkRecord, "data1", getDecimalType(), 3, 1);
            assertColumn(sink, sinkRecord, "data2", getDecimalType(), 8, 1);
            assertColumn(sink, sinkRecord, "data3", getDecimalType(), 18, 1);
            assertColumn(sink, sinkRecord, "data4", getDecimalType(), 24, 1);
        }, (v0, v1) -> {
            return v0.getDouble(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason = "No NUMERIC(n,s) negative scale data type support")
    public void testNumericWithPrecisionAndNegativeScaleDataType(Source source, Sink sink) throws Exception {
        assertDataTypes(source, sink, List.of("numeric(2,-1)", "numeric(3,-1)", "numeric(8,-1)", "numeric(18,-1)", "numeric(24,-3)"), List.of(1L, 111L, 11111111L, 111111111111111111L, 111111111111111111L), List.of(0L, 110L, 11111110L, 111111111111111110L, 111111111111111000L), sinkRecord -> {
            boolean is = sink.getType().is(SinkType.ORACLE);
            boolean is2 = sink.getType().is(SinkType.MYSQL);
            assertColumn(sink, sinkRecord, "id0", is2 ? getInt16Type() : getInt8Type());
            assertColumn(sink, sinkRecord, "id1", getInt16Type());
            assertColumn(sink, sinkRecord, "id2", getInt32Type());
            assertColumn(sink, sinkRecord, "id3", getDecimalType(), 18, is ? -1 : 0);
            assertColumn(sink, sinkRecord, "id4", getDecimalType(), 24, is ? -3 : 0);
            assertColumn(sink, sinkRecord, "data0", is2 ? getInt16Type() : getInt8Type());
            assertColumn(sink, sinkRecord, "data1", getInt16Type());
            assertColumn(sink, sinkRecord, "data2", getInt32Type());
            assertColumn(sink, sinkRecord, "data3", getDecimalType(), 18, is ? -1 : 0);
            assertColumn(sink, sinkRecord, "data4", getDecimalType(), 24, is ? -3 : 0);
        }, (v0, v1) -> {
            return v0.getLong(v1);
        });
    }

    @TestTemplate
    public void testDecimalDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "decimal", List.of(10, 12), sinkRecord -> {
            if (source.getType().is(SourceType.POSTGRES)) {
                assertColumn(sink, sinkRecord, "id", getVariableScaleDecimalType());
                assertColumn(sink, sinkRecord, "data", getVariableScaleDecimalType());
            } else {
                assertColumn(sink, sinkRecord, "id", getDecimalType());
                assertColumn(sink, sinkRecord, "data", getDecimalType());
            }
        }, (v0, v1) -> {
            return v0.getInt(v1);
        });
    }

    @TestTemplate
    public void testDecimalWithPrecisionDataType(Source source, Sink sink) throws Exception {
        assertDataTypes(source, sink, List.of("decimal(2)", "decimal(3)", "decimal(8)", "decimal(18)", "decimal(24)"), List.of(10L, 11L, 12L, 13L, 14L), sinkRecord -> {
            if (SourceType.ORACLE.is(source.getType())) {
                assertColumn(sink, sinkRecord, "id0", getInt8Type());
                assertColumn(sink, sinkRecord, "id1", getInt16Type());
                assertColumn(sink, sinkRecord, "id2", getInt32Type());
                assertColumn(sink, sinkRecord, "id3", getInt64Type());
                assertColumn(sink, sinkRecord, "id4", getDecimalType(), 24, 0);
                assertColumn(sink, sinkRecord, "data0", getInt8Type());
                assertColumn(sink, sinkRecord, "data1", getInt16Type());
                assertColumn(sink, sinkRecord, "data2", getInt32Type());
                assertColumn(sink, sinkRecord, "data3", getInt64Type());
                assertColumn(sink, sinkRecord, "data4", getDecimalType(), 24, 0);
                return;
            }
            assertColumn(sink, sinkRecord, "id0", getDecimalType(), 2, 0);
            assertColumn(sink, sinkRecord, "id1", getDecimalType(), 3, 0);
            assertColumn(sink, sinkRecord, "id2", getDecimalType(), 8, 0);
            assertColumn(sink, sinkRecord, "id3", getDecimalType(), 18, 0);
            assertColumn(sink, sinkRecord, "id4", getDecimalType(), 24, 0);
            assertColumn(sink, sinkRecord, "data0", getDecimalType(), 2, 0);
            assertColumn(sink, sinkRecord, "data1", getDecimalType(), 3, 0);
            assertColumn(sink, sinkRecord, "data2", getDecimalType(), 8, 0);
            assertColumn(sink, sinkRecord, "data3", getDecimalType(), 18, 0);
            assertColumn(sink, sinkRecord, "data4", getDecimalType(), 24, 0);
        }, (v0, v1) -> {
            return v0.getLong(v1);
        });
    }

    @TestTemplate
    public void testDecimalWithPrecisionAndScaleDataType(Source source, Sink sink) throws Exception {
        assertDataTypes(source, sink, List.of("decimal(2,1)", "decimal(3,1)", "decimal(8,1)", "decimal(18,1)", "decimal(24,1)"), List.of(Double.valueOf(1.0d), Double.valueOf(10.0d), Double.valueOf(11.0d), Double.valueOf(12.0d), Double.valueOf(13.0d)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id0", getDecimalType(), 2, 1);
            assertColumn(sink, sinkRecord, "id1", getDecimalType(), 3, 1);
            assertColumn(sink, sinkRecord, "id2", getDecimalType(), 8, 1);
            assertColumn(sink, sinkRecord, "id3", getDecimalType(), 18, 1);
            assertColumn(sink, sinkRecord, "id4", getDecimalType(), 24, 1);
            assertColumn(sink, sinkRecord, "data0", getDecimalType(), 2, 1);
            assertColumn(sink, sinkRecord, "data1", getDecimalType(), 3, 1);
            assertColumn(sink, sinkRecord, "data2", getDecimalType(), 8, 1);
            assertColumn(sink, sinkRecord, "data3", getDecimalType(), 18, 1);
            assertColumn(sink, sinkRecord, "data4", getDecimalType(), 24, 1);
        }, (v0, v1) -> {
            return v0.getDouble(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason = "No DECIMAL(n,s) negative scale data type support")
    public void testDecimalWithPrecisionAndNegativeScaleDataType(Source source, Sink sink) throws Exception {
        assertDataTypes(source, sink, List.of("decimal(2,-1)", "decimal(3,-1)", "decimal(8,-1)", "decimal(18,-1)", "decimal(24,-3)"), List.of(1L, 111L, 11111111L, 111111111111111111L, 111111111111111111L), List.of(0L, 110L, 11111110L, 111111111111111110L, 111111111111111000L), sinkRecord -> {
            boolean is = sink.getType().is(SinkType.MYSQL);
            assertColumn(sink, sinkRecord, "id0", is ? getInt16Type() : getInt8Type());
            assertColumn(sink, sinkRecord, "id1", getInt16Type());
            assertColumn(sink, sinkRecord, "id2", getInt32Type());
            assertColumn(sink, sinkRecord, "data0", is ? getInt16Type() : getInt8Type());
            assertColumn(sink, sinkRecord, "data1", getInt16Type());
            assertColumn(sink, sinkRecord, "data2", getInt32Type());
            if (SinkType.ORACLE.is(sink.getType())) {
                assertColumn(sink, sinkRecord, "id3", getDecimalType(), 18, -1);
                assertColumn(sink, sinkRecord, "id4", getDecimalType(), 24, -3);
                assertColumn(sink, sinkRecord, "data3", getDecimalType(), 18, -1);
                assertColumn(sink, sinkRecord, "data4", getDecimalType(), 24, -3);
                return;
            }
            assertColumn(sink, sinkRecord, "id3", getDecimalType(), 18, 0);
            assertColumn(sink, sinkRecord, "id4", getDecimalType(), 24, 0);
            assertColumn(sink, sinkRecord, "data3", getDecimalType(), 18, 0);
            assertColumn(sink, sinkRecord, "data4", getDecimalType(), 24, 0);
        }, (v0, v1) -> {
            return v0.getLong(v1);
        });
    }

    @TestTemplate
    public void testRealDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "real", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), connectorConfiguration -> {
            applyJdbcSourceConverter(source, connectorConfiguration, null, ".*id|.*data", null);
        }, sinkRecord -> {
            String float64Type = source.getType().is(SourceType.ORACLE, SourceType.MYSQL) ? getFloat64Type() : getFloat32Type();
            assertColumn(sink, sinkRecord, "id", float64Type);
            assertColumn(sink, sinkRecord, "data", float64Type);
        }, (v0, v1) -> {
            return v0.getFloat(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "Applies to MySQL JDBC custom converter")
    public void testRealDataTypeTreatAsFloat(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "real", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), connectorConfiguration -> {
            applyJdbcSourceConverter(source, connectorConfiguration, null, ".*id|.*data", null);
            connectorConfiguration.with("jdbc-sink.treat.real.as.double", "false");
        }, sinkRecord -> {
            String float64Type = source.getType().is(SourceType.ORACLE) ? getFloat64Type() : getFloat32Type();
            assertColumn(sink, sinkRecord, "id", float64Type);
            assertColumn(sink, sinkRecord, "data", float64Type);
        }, (v0, v1) -> {
            return v0.getFloat(v1);
        });
    }

    @Disabled("Not supported by any of our current source connectors")
    @TestTemplate
    public void testRealWithPrecisionDataType(Source source, Sink sink) throws Exception {
        throw new IllegalStateException("Not yet implemented");
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No REAL(p,s) data type support")
    public void testRealWithPrecisionAndScaleDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "real(4, 2)", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), connectorConfiguration -> {
            applyJdbcSourceConverter(source, connectorConfiguration, null, ".*id|.*data", null);
        }, sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getFloat64Type());
            assertColumn(sink, sinkRecord, "data", getFloat64Type());
        }, (v0, v1) -> {
            return v0.getFloat(v1);
        });
    }

    @TestTemplate
    public void testFloatDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "float", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), sinkRecord -> {
            String float64Type = !source.getType().is(SourceType.MYSQL) ? getFloat64Type() : getFloat32Type();
            assertColumn(sink, sinkRecord, "id", float64Type);
            assertColumn(sink, sinkRecord, "data", float64Type);
        }, (v0, v1) -> {
            return v0.getFloat(v1);
        });
    }

    @TestTemplate
    public void testFloatWithPrecisionDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "float(8)", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), sinkRecord -> {
            String float64Type = source.getType().is(SourceType.ORACLE) ? getFloat64Type() : getFloat32Type();
            assertColumn(sink, sinkRecord, "id", float64Type);
            assertColumn(sink, sinkRecord, "data", float64Type);
        }, (v0, v1) -> {
            return v0.getFloat(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No FLOAT(p,s) data type support")
    public void testFloatWithPrecisionAndScaleDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "float(4, 2)", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getFloat64Type());
            assertColumn(sink, sinkRecord, "data", getFloat64Type());
        }, (v0, v1) -> {
            return v0.getFloat(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No DOUBLE data type support")
    public void testDoubleDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "double", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getFloat64Type());
            assertColumn(sink, sinkRecord, "data", getFloat64Type());
        }, (v0, v1) -> {
            return v0.getFloat(v1);
        });
    }

    @Disabled("Not supported by any of our currently supported source databases")
    @TestTemplate
    public void testDoubleWithPrecisionDataType(Source source, Sink sink) throws Exception {
        throw new IllegalStateException("Not yet implemented");
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No DOUBLE(p,s) data type support")
    public void testDoubleWithPrecisionAndScaleDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "double(4, 2)", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getFloat64Type());
            assertColumn(sink, sinkRecord, "data", getFloat64Type());
        }, (v0, v1) -> {
            return v0.getFloat(v1);
        });
    }

    @TestTemplate
    public void testDoublePrecisionDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "double precision", List.of(Double.valueOf(3.14d), Double.valueOf(3.14d)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getFloat64Type());
            assertColumn(sink, sinkRecord, "data", getFloat64Type());
        }, (v0, v1) -> {
            return v0.getDouble(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason = "No BINARY_FLOAT data type support")
    public void testBinaryFloatDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "binary_float", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getFloat32Type());
            assertColumn(sink, sinkRecord, "data", getFloat32Type());
        }, (v0, v1) -> {
            return v0.getFloat(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason = "No BINARY_DOUBLE data type support")
    public void testBinaryDoubleDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "binary_double", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getFloat64Type());
            assertColumn(sink, sinkRecord, "data", getFloat64Type());
        }, (v0, v1) -> {
            return v0.getFloat(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.ORACLE}, reason = "No SMALLMONEY data type support")
    public void testSmallMoneyDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "smallmoney", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getDecimalType(), 10, 4);
            assertColumn(sink, sinkRecord, "data", getDecimalType(), 10, 4);
        }, (v0, v1) -> {
            return v0.getFloat(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.ORACLE}, reason = "No MONEY data type support")
    public void testMoneyDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "money", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getDecimalType(), 19, 4);
            assertColumn(sink, sinkRecord, "data", getDecimalType(), 19, 4);
        }, (v0, v1) -> {
            return v0.getFloat(v1);
        });
    }

    @TestTemplate
    public void testDecimalHandlingModeDouble(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "decimal", List.of(Float.valueOf(10.0f), Float.valueOf(12.0f)), connectorConfiguration -> {
            connectorConfiguration.with("decimal.handling.mode", RelationalDatabaseConnectorConfig.DecimalHandlingMode.DOUBLE.getValue());
        }, sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getFloat64Type());
            assertColumn(sink, sinkRecord, "data", getFloat64Type());
        }, (v0, v1) -> {
            return v0.getFloat(v1);
        });
    }

    @TestTemplate
    public void testDecimalHandlingModeString(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "decimal", List.of(Float.valueOf(10.0f), Float.valueOf(12.0f)), source.getType() == SourceType.POSTGRES ? List.of("10.0", "12.0") : List.of("10", "12"), connectorConfiguration -> {
            connectorConfiguration.with("decimal.handling.mode", RelationalDatabaseConnectorConfig.DecimalHandlingMode.STRING.getValue());
        }, sinkRecord -> {
            switch (sink.getType()) {
                case MYSQL:
                    assertColumn(sink, sinkRecord, "id", "VARCHAR", 255);
                    assertColumn(sink, sinkRecord, "data", "LONGTEXT", Integer.MAX_VALUE);
                    return;
                case DB2:
                    assertColumn(sink, sinkRecord, "id", "VARCHAR", 512);
                    assertColumn(sink, sinkRecord, "data", "CLOB");
                    return;
                case ORACLE:
                    assertColumn(sink, sinkRecord, "id", "VARCHAR2", 4000);
                    assertColumn(sink, sinkRecord, "data", "CLOB");
                    return;
                case SQLSERVER:
                    assertColumn(sink, sinkRecord, "id", "VARCHAR", 900);
                    assertColumn(sink, sinkRecord, "data", "VARCHAR", Integer.MAX_VALUE);
                    return;
                case POSTGRES:
                    assertColumn(sink, sinkRecord, "id", "TEXT");
                    assertColumn(sink, sinkRecord, "data", "TEXT");
                    return;
                default:
                    return;
            }
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @TestTemplate
    public void testCharDataType(Source source, Sink sink) throws Exception {
        assertCharDataType(source, sink, "char", false);
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.ORACLE}, reason = "Awaiting the merging of DBZ-6221 upstream")
    public void testCharacterDataType(Source source, Sink sink) throws Exception {
        assertCharDataType(source, sink, "character", false);
    }

    @TestTemplate
    public void testCharWithLengthDataType(Source source, Sink sink) throws Exception {
        assertCharWithLengthDataType(source, sink, "char(5)", 5, false);
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.ORACLE}, reason = "Awaiting the merging of DBZ-6221 upstream")
    public void testCharacterWithLengthDataType(Source source, Sink sink) throws Exception {
        assertCharWithLengthDataType(source, sink, "character(5)", 5, false);
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES}, reason = "NCHAR is treated as CHAR as PostgreSQL does not use nationalized types")
    public void testNationalizedCharDataType(Source source, Sink sink) throws Exception {
        assertCharDataType(source, sink, "nchar", true);
    }

    @TestTemplate
    @SkipWhenSources({@SkipWhenSource(value = {SourceType.POSTGRES}, reason = "NCHARACTER is treated as CHAR as PostgreSQL does not use nationalized types"), @SkipWhenSource(value = {SourceType.MYSQL, SourceType.ORACLE}, reason = "NCHARACTER not a supported data type")})
    public void testNationalizedCharacterDataType(Source source, Sink sink) throws Exception {
        assertCharDataType(source, sink, "ncharacter", true);
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES}, reason = "NCHAR(n) is treated as CHAR(n) as PostgreSQL does not use nationalized types")
    public void testNationalizedCharWithLengthDataType(Source source, Sink sink) throws Exception {
        assertCharWithLengthDataType(source, sink, "nchar(5)", 5, true);
    }

    @TestTemplate
    @SkipWhenSources({@SkipWhenSource(value = {SourceType.POSTGRES}, reason = "NCHAR(n) is treated as CHAR(n) as PostgreSQL does not use nationalized types"), @SkipWhenSource(value = {SourceType.MYSQL, SourceType.ORACLE}, reason = "NCHARACTER(n) not a supported data type")})
    public void testNationalizedCharacterWithLengthDataType(Source source, Sink sink) throws Exception {
        assertCharWithLengthDataType(source, sink, "ncharacter(5)", 5, true);
    }

    @TestTemplate
    public void testVarcharDataType(Source source, Sink sink) throws Exception {
        assertVarcharDataType(source, sink, "varchar(25)", 25, false);
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason = "No VARCHAR2(n) data type support")
    public void testVarchar2DataType(Source source, Sink sink) throws Exception {
        assertVarcharDataType(source, sink, "varchar2(25)", 25, false);
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE}, reason = "No NVARCHAR(n) data type support")
    public void testNVarcharDataType(Source source, Sink sink) throws Exception {
        assertVarcharDataType(source, sink, "nvarchar(25)", 25, true);
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason = "No NVARCHAR2(n) data type support")
    public void testNVarchar2DataType(Source source, Sink sink) throws Exception {
        assertVarcharDataType(source, sink, "nvarchar2(25)", 25, true);
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.ORACLE}, reason = "Awaiting the merging of DBZ-6221 upstream")
    public void testCharacterVaryingDataType(Source source, Sink sink) throws Exception {
        assertVarcharDataType(source, sink, "character varying(25)", 25, false);
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.ORACLE}, reason = "No NCHARACTER VARYING(n) data type support")
    public void testNCharacterVaryingDataType(Source source, Sink sink) throws Exception {
        assertVarcharDataType(source, sink, "ncharacter varying(25)", 25, true);
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No TINYTEXT data type support")
    public void testTinyTextDataType(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "tinytext", List.of("'hello world'"), List.of("hello world"), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getTextType());
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No MEDIUMTEXT data type support")
    public void testMediumTextDataType(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "mediumtext", List.of("'hello world'"), List.of("hello world"), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getTextType());
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No MEDIUMTEXT data type support")
    public void testLongTextDataType(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "longtext", List.of("'hello world'"), List.of("hello world"), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getTextType());
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.ORACLE}, reason = "No TEXT data type support")
    public void testTextDataType(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "text", List.of("'hello world'"), List.of("hello world"), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getTextType());
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.ORACLE}, reason = "No NTEXT data type support")
    public void testNTextDataType(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "ntext", List.of("'hello world'"), List.of("hello world"), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getTextType());
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason = "No CLOB data type support")
    public void testClobDataType(Source source, Sink sink) throws Exception {
        String randomAlphanumeric = RandomStringUtils.randomAlphanumeric(65536);
        assertDataTypeNonKeyOnly(source, sink, "clob", (preparedStatement, i) -> {
            Clob createClob = preparedStatement.getConnection().createClob();
            createClob.setString(1L, randomAlphanumeric);
            preparedStatement.setClob(i, createClob);
        }, List.of(randomAlphanumeric), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getTextType());
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason = "No CLOB data type support")
    public void testClobDataTypeWithUpsert(Source source, Sink sink) throws Exception {
        String randomAlphanumeric = RandomStringUtils.randomAlphanumeric(65536);
        assertDataTypeNonKeyOnly(source, sink, "clob", (preparedStatement, i) -> {
            Clob createClob = preparedStatement.getConnection().createClob();
            createClob.setString(1L, randomAlphanumeric);
            preparedStatement.setClob(i, createClob);
        }, List.of(randomAlphanumeric), connectorConfiguration -> {
            connectorConfiguration.with("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.RECORD_KEY.getValue());
            connectorConfiguration.with("insert.mode", JdbcSinkConnectorConfig.InsertMode.UPSERT.getValue());
        }, sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getTextType());
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason = "No NCLOB data type support")
    public void testNClobDataType(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "nclob", List.of("'hello world'"), List.of("hello world"), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getTextType());
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE}, reason = "No BINARY(n) data type support")
    public void testBinaryDataType(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "binary(15)", List.of(binaryValue(source, "binary(15)", "'hello world'")), List.of(byteArrayPadded("hello world", 15)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getBinaryType(source, "binary"));
        }, (v0, v1) -> {
            return v0.getBytes(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE}, reason = "No VARBINARY(n) data type support")
    public void testVarBinaryDataType(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "varbinary(15)", List.of(binaryValue(source, "varbinary(15)", "'hello world'")), List.of("hello world".getBytes(StandardCharsets.UTF_8)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getBinaryType(source, "varbinary"));
        }, (v0, v1) -> {
            return v0.getBytes(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No TINYBLOB data type support")
    public void testTinyBlobDataType(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "tinyblob", List.of("'hello world'"), List.of("hello world".getBytes(StandardCharsets.UTF_8)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getBinaryType(source, "tinyblob"));
        }, (v0, v1) -> {
            return v0.getBytes(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No MEDIUMBLOB data type support")
    public void testMediumBlobDataType(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "mediumblob", List.of("'hello world'"), List.of("hello world".getBytes(StandardCharsets.UTF_8)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getBinaryType(source, "mediumblob"));
        }, (v0, v1) -> {
            return v0.getBytes(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No LONGBLOB data type support")
    public void testLongBlobDataType(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "longblob", List.of("'hello world'"), List.of("hello world".getBytes(StandardCharsets.UTF_8)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getBinaryType(source, "longblob"));
        }, (v0, v1) -> {
            return v0.getBytes(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.SQLSERVER}, reason = "No BLOB data type support")
    public void testBlobDataType(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "blob", List.of(source.getType().is(SourceType.ORACLE) ? "UTL_RAW.CAST_TO_RAW('hello world')" : "'hello world'"), List.of("hello world".getBytes(StandardCharsets.UTF_8)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getBinaryType(source, "blob"));
        }, (v0, v1) -> {
            return v0.getBytes(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "Only a single source is needed")
    public void testBinaryHandlingModeBase64(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "varbinary(35)", List.of(binaryValue(source, "varbinary(35)", "'hello world'")), List.of(Base64.getEncoder().encodeToString("hello world".getBytes(StandardCharsets.UTF_8))), connectorConfiguration -> {
            connectorConfiguration.with("binary.handling.mode", CommonConnectorConfig.BinaryHandlingMode.BASE64.getValue());
        }, sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getStringType(source, false, false, true));
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "Only a single source is needed")
    public void testBinaryHandlingModeBase64UrlSafe(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "varbinary(35)", List.of(binaryValue(source, "varbinary(35)", "'hello world'")), List.of(Base64.getUrlEncoder().encodeToString("hello world".getBytes(StandardCharsets.UTF_8))), connectorConfiguration -> {
            connectorConfiguration.with("binary.handling.mode", CommonConnectorConfig.BinaryHandlingMode.BASE64_URL_SAFE.getValue());
        }, sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getStringType(source, false, false, true));
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "Only a single source is needed")
    public void testBinaryHandlingModeHex(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "varbinary(35)", List.of(binaryValue(source, "varbinary(35)", "'hello world'")), List.of(HexConverter.convertToHexString("hello world".getBytes(StandardCharsets.UTF_8))), connectorConfiguration -> {
            connectorConfiguration.with("binary.handling.mode", CommonConnectorConfig.BinaryHandlingMode.HEX.getValue());
        }, sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getStringType(source, false, false, true));
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No JSON data type support")
    public void testJsonDataType(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "json", List.of(String.format("'%s'", "{\"key\": \"value\"}")), List.of(new ObjectMapper().readTree("{\"key\": \"value\"}")), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getJsonType(source));
        }, (resultSet, i) -> {
            return new ObjectMapper().readTree(resultSet.getString(i));
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No JSONB data type support")
    public void testJsonbDataType(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "jsonb", List.of(String.format("'%s'", "{\"key\": \"value\"}")), List.of(new ObjectMapper().readTree("{\"key\": \"value\"}")), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getJsonbType(source));
        }, (resultSet, i) -> {
            return new ObjectMapper().readTree(resultSet.getString(i));
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.ORACLE}, reason = "No XML data type support")
    public void testXmlDataType(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "xml", List.of("'<doc>abc</doc>'"), List.of("<doc>abc</doc>"), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getXmlType(source));
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No UUID data type support")
    public void testUuidDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "uuid", List.of("'77412aae-c023-11ed-afa1-0242ac120002'", "'ed338923-f8ac-404c-87e7-e1ba5a122a12'"), List.of("77412aae-c023-11ed-afa1-0242ac120002", "ed338923-f8ac-404c-87e7-e1ba5a122a12"), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getUuidType(source, true));
            assertColumn(sink, sinkRecord, "data", getUuidType(source, false));
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No ENUM data type support")
    public void testEnumDataType(Source source, Sink sink) throws Exception {
        String str;
        if (SourceType.POSTGRES.is(source.getType())) {
            str = source.randomObjectName();
            source.execute(String.format("CREATE TYPE %s as ENUM ('apples', 'oranges')", str));
        } else {
            str = "enum('apples', 'oranges')";
        }
        assertDataType(source, sink, str, List.of("'apples'", "'oranges'"), List.of("apples", "oranges"), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getEnumType(source, true));
            assertColumn(sink, sinkRecord, "data", getEnumType(source, false));
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No SET data type support")
    public void testSetDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "set('apples','oranges')", List.of("'apples'", "'oranges'"), List.of("apples", "oranges"), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getSetType(source, true));
            assertColumn(sink, sinkRecord, "data", getSetType(source, false));
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No SET data type support")
    public void testYearDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "year", List.of(1969, 2023), sinkRecord -> {
            if (SinkType.MYSQL.is(sink.getType())) {
                assertColumn(sink, sinkRecord, "id", getYearType(), 4);
                assertColumn(sink, sinkRecord, "data", getYearType(), 4);
            } else {
                assertColumn(sink, sinkRecord, "id", getYearType());
                assertColumn(sink, sinkRecord, "data", getYearType());
            }
        }, (v0, v1) -> {
            return v0.getInt(v1);
        });
    }

    @TestTemplate
    @WithTemporalPrecisionMode
    public void testDateDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "date", List.of(dateValue(source, 3, 1, 2023), dateValue(source, 5, 10, 2021)), List.of(Date.valueOf("2023-03-01"), Date.valueOf("2021-05-10")), sinkRecord -> {
            if (SourceType.ORACLE.is(source.getType())) {
                assertColumn(sink, sinkRecord, "id", getTimestampType(source, true, 6));
                assertColumn(sink, sinkRecord, "data", getTimestampType(source, false, 6));
            } else {
                assertColumn(sink, sinkRecord, "id", getDateType());
                assertColumn(sink, sinkRecord, "data", getDateType());
            }
        }, (v0, v1) -> {
            return v0.getDate(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.ORACLE}, reason = "No TIME data type support")
    @WithTemporalPrecisionMode
    public void testTimeDataType(Source source, Sink sink) throws Exception {
        boolean z = source.getOptions().getTemporalPrecisionMode() == TemporalPrecisionMode.CONNECT;
        int i = z ? 123000000 : 123456000;
        switch (source.getType()) {
            case MYSQL:
                i = 0;
                break;
        }
        switch (sink.getType()) {
            case MYSQL:
                if (!source.getType().is(SourceType.POSTGRES)) {
                    if (!source.getType().is(SourceType.SQLSERVER)) {
                        i = 0;
                        break;
                    }
                } else {
                    i = z ? 123000000 : 123456000;
                    break;
                }
                break;
            case DB2:
                i = 0;
                break;
        }
        assertDataType(source, sink, "time", List.of("'01:02:03.123456'", "'14:15:16.123456'"), List.of(OffsetTime.of(1, 2, 3, i, getCurrentSinkTimeOffset()), OffsetTime.of(14, 15, 16, i, getCurrentSinkTimeOffset())), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getTimeType(source, true, 6));
            assertColumn(sink, sinkRecord, "data", getTimeType(source, false, 6));
        }, this::getTimeAsOffsetTime);
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.ORACLE}, reason = "No TIME(n) data type support")
    @WithTemporalPrecisionMode
    public void testTimeWithPrecisionDataType(Source source, Sink sink) throws Exception {
        int i = 123000000;
        int i2 = source.getOptions().getTemporalPrecisionMode() == TemporalPrecisionMode.CONNECT ? 456000000 : 456789000;
        if (sink.getType().is(SinkType.DB2)) {
            i = 0;
            i2 = 0;
        }
        List of = List.of(OffsetTime.of(1, 2, 3, i, getCurrentSinkTimeOffset()), OffsetTime.of(14, 15, 16, i2, getCurrentSinkTimeOffset()), OffsetTime.of(1, 2, 3, i, getCurrentSinkTimeOffset()), OffsetTime.of(14, 15, 16, i2, getCurrentSinkTimeOffset()));
        int i3 = (!sink.getType().is(SinkType.ORACLE) || source.getOptions().isColumnTypePropagated()) ? 3 : 6;
        assertDataTypes2(source, sink, List.of("time(3)", "time(6)"), List.of("'01:02:03.123456'", "'14:15:16.456789'"), of, sinkRecord -> {
            assertColumn(sink, sinkRecord, "id0", getTimeType(source, true, i3));
            assertColumn(sink, sinkRecord, "id1", getTimeType(source, true, 6));
            assertColumn(sink, sinkRecord, "data0", getTimeType(source, false, i3));
            assertColumn(sink, sinkRecord, "data1", getTimeType(source, false, 6));
        }, this::getTimeAsOffsetTime);
    }

    @TestTemplate
    @SkipWhenSources({@SkipWhenSource(value = {SourceType.ORACLE}, reason = "No TIME(n) data type support"), @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES}, reason = "Max TIME(n) precision is 6")})
    @WithTemporalPrecisionMode
    public void testNanoTimeDataType(Source source, Sink sink) throws Exception {
        int i = source.getOptions().getTemporalPrecisionMode() == TemporalPrecisionMode.CONNECT ? 456000000 : 456789000;
        if (sink.getType().is(SinkType.DB2)) {
            i = 0;
        }
        assertDataTypeNonKeyOnly(source, sink, "time(7)", List.of("'14:15:16.456789012'"), List.of(OffsetTime.of(14, 15, 16, i, getCurrentSinkTimeOffset())), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getTimeType(source, false, 7));
        }, this::getTimeAsOffsetTime);
    }

    @TestTemplate
    @SkipWhenSources({@SkipWhenSource(value = {SourceType.SQLSERVER}, reason = "TIMESTAMP is an internal type and isn't the same as TIMESTAMP(n)"), @SkipWhenSource(value = {SourceType.MYSQL}, reason = "MySQL emits timestamps as ZonedTimestamp types, tested separately")})
    @WithTemporalPrecisionMode
    public void testTimestampDataType(Source source, Sink sink) throws Exception {
        List of = List.of(ZonedDateTime.of(2023, 5, 10, 16, 17, 18, 123456000, ZoneOffset.UTC), ZonedDateTime.of(2022, 12, 31, 14, 15, 16, 456789000, ZoneOffset.UTC));
        List<String> timestampStrings = toTimestampStrings(source, of);
        ArrayList arrayList = new ArrayList();
        if (TemporalPrecisionMode.CONNECT == source.getOptions().getTemporalPrecisionMode()) {
            arrayList.add(((ZonedDateTime) of.get(0)).with((TemporalField) ChronoField.NANO_OF_SECOND, 123000000L).withZoneSameLocal(SINK_ZONE_ID));
            arrayList.add(((ZonedDateTime) of.get(1)).with((TemporalField) ChronoField.NANO_OF_SECOND, 456000000L).withZoneSameLocal(SINK_ZONE_ID));
        } else {
            arrayList.addAll((Collection) of.stream().map(zonedDateTime -> {
                return zonedDateTime.withZoneSameLocal(SINK_ZONE_ID);
            }).collect(Collectors.toList()));
        }
        assertDataType(source, sink, "timestamp", timestampStrings, arrayList, sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getTimestampType(source, true, 6));
            assertColumn(sink, sinkRecord, "data", getTimestampType(source, false, 6));
        }, this::getTimestampAsZonedDateTime);
    }

    @TestTemplate
    @SkipWhenSources({@SkipWhenSource(value = {SourceType.SQLSERVER}, reason = "No TIMESTAMP(n) data type support"), @SkipWhenSource(value = {SourceType.MYSQL}, reason = "MySQL emits timestamps as ZonedTimestamp types, tested separately")})
    @WithTemporalPrecisionMode
    public void testTimestampWithPrecisionDataType(Source source, Sink sink) throws Exception {
        ZonedDateTime of = ZonedDateTime.of(2022, 12, 31, 14, 15, 16, 456789000, ZoneOffset.UTC);
        String str = toTimestampStrings(source, List.of(of)).get(0);
        ArrayList arrayList = new ArrayList();
        arrayList.add(of.with((TemporalField) ChronoField.NANO_OF_SECOND, 500000000L).withZoneSameLocal(SINK_ZONE_ID));
        arrayList.add(of.with((TemporalField) ChronoField.NANO_OF_SECOND, 460000000L).withZoneSameLocal(SINK_ZONE_ID));
        arrayList.add(of.with((TemporalField) ChronoField.NANO_OF_SECOND, 457000000L).withZoneSameLocal(SINK_ZONE_ID));
        if (TemporalPrecisionMode.CONNECT == source.getOptions().getTemporalPrecisionMode()) {
            arrayList.add(of.with((TemporalField) ChronoField.NANO_OF_SECOND, 456000000L).withZoneSameLocal(SINK_ZONE_ID));
            arrayList.add(of.with((TemporalField) ChronoField.NANO_OF_SECOND, 456000000L).withZoneSameLocal(SINK_ZONE_ID));
            arrayList.add(of.with((TemporalField) ChronoField.NANO_OF_SECOND, 456000000L).withZoneSameLocal(SINK_ZONE_ID));
        } else {
            arrayList.add(of.with((TemporalField) ChronoField.NANO_OF_SECOND, 456800000L).withZoneSameLocal(SINK_ZONE_ID));
            arrayList.add(of.with((TemporalField) ChronoField.NANO_OF_SECOND, 456790000L).withZoneSameLocal(SINK_ZONE_ID));
            arrayList.add(of.with((TemporalField) ChronoField.NANO_OF_SECOND, 456789000L).withZoneSameLocal(SINK_ZONE_ID));
        }
        assertDataTypes(source, sink, List.of("timestamp(1)", "timestamp(2)", "timestamp(3)", "timestamp(4)", "timestamp(5)", "timestamp(6)"), List.of(str, str, str, str, str, str), arrayList, sinkRecord -> {
            assertColumn(sink, sinkRecord, "id0", getTimestampType(source, true, 1));
            assertColumn(sink, sinkRecord, "id1", getTimestampType(source, true, 2));
            assertColumn(sink, sinkRecord, "id2", getTimestampType(source, true, 3));
            assertColumn(sink, sinkRecord, "id3", getTimestampType(source, true, 4));
            assertColumn(sink, sinkRecord, "id4", getTimestampType(source, true, 5));
            assertColumn(sink, sinkRecord, "id5", getTimestampType(source, true, 6));
            assertColumn(sink, sinkRecord, "data0", getTimestampType(source, false, 1));
            assertColumn(sink, sinkRecord, "data1", getTimestampType(source, false, 2));
            assertColumn(sink, sinkRecord, "data2", getTimestampType(source, false, 3));
            assertColumn(sink, sinkRecord, "data3", getTimestampType(source, false, 4));
            assertColumn(sink, sinkRecord, "data4", getTimestampType(source, false, 5));
            assertColumn(sink, sinkRecord, "data5", getTimestampType(source, false, 6));
        }, this::getTimestampAsZonedDateTime);
    }

    @ForSource(value = {SourceType.MYSQL}, reason = "MySQL emits TIMESTAMP(p) as ZonedTimestamp")
    @TestTemplate
    @WithTemporalPrecisionMode
    public void testTimestampDataTypeAsZonedTimestampType(Source source, Sink sink) throws Exception {
        List of = List.of(ZonedDateTime.of(2023, 5, 10, 16, 17, 18, 123456000, ZoneOffset.UTC), ZonedDateTime.of(2022, 12, 31, 14, 15, 16, 456789000, ZoneOffset.UTC));
        assertDataTypesNonKeyOnly(source, sink, List.of("timestamp", "timestamp"), toTimestampStrings(source, (List) of.stream().map(zonedDateTime -> {
            return zonedDateTime.withZoneSameInstant(SOURCE_ZONE_ID);
        }).collect(Collectors.toList())), (List) of.stream().map(zonedDateTime2 -> {
            return zonedDateTime2.with((TemporalField) ChronoField.NANO_OF_SECOND, 0L);
        }).collect(Collectors.toList()), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data0", getTimestampWithTimezoneType(source, false, 6));
            assertColumn(sink, sinkRecord, "data1", getTimestampWithTimezoneType(source, false, 6));
        }, (resultSet, i) -> {
            return resultSet.getTimestamp(i).toInstant().atZone(ZoneOffset.UTC);
        });
    }

    @ForSource(value = {SourceType.MYSQL}, reason = "MySQL emits TIMESTAMP(p) as ZonedTimestamp")
    @TestTemplate
    @WithTemporalPrecisionMode
    public void testTimestampWithPrecisionDataTypeAsZonedTimestampType(Source source, Sink sink) throws Exception {
        ZonedDateTime of = ZonedDateTime.of(2022, 12, 31, 14, 15, 16, 456789000, ZoneOffset.UTC);
        String str = toTimestampStrings(source, List.of(of.withZoneSameInstant(SOURCE_ZONE_ID))).get(0);
        ArrayList arrayList = new ArrayList();
        arrayList.add(of.with((TemporalField) ChronoField.NANO_OF_SECOND, 500000000L));
        arrayList.add(of.with((TemporalField) ChronoField.NANO_OF_SECOND, 460000000L));
        arrayList.add(of.with((TemporalField) ChronoField.NANO_OF_SECOND, 457000000L));
        arrayList.add(of.with((TemporalField) ChronoField.NANO_OF_SECOND, 456800000L));
        arrayList.add(of.with((TemporalField) ChronoField.NANO_OF_SECOND, 456790000L));
        arrayList.add(of.with((TemporalField) ChronoField.NANO_OF_SECOND, 456789000L));
        assertDataTypesNonKeyOnly(source, sink, List.of("timestamp(1)", "timestamp(2)", "timestamp(3)", "timestamp(4)", "timestamp(5)", "timestamp(6)"), List.of(str, str, str, str, str, str), arrayList, sinkRecord -> {
            assertColumn(sink, sinkRecord, "data0", getTimestampWithTimezoneType(source, false, 1));
            assertColumn(sink, sinkRecord, "data1", getTimestampWithTimezoneType(source, false, 2));
            assertColumn(sink, sinkRecord, "data2", getTimestampWithTimezoneType(source, false, 3));
            assertColumn(sink, sinkRecord, "data3", getTimestampWithTimezoneType(source, false, 4));
            assertColumn(sink, sinkRecord, "data4", getTimestampWithTimezoneType(source, false, 5));
            assertColumn(sink, sinkRecord, "data5", getTimestampWithTimezoneType(source, false, 6));
        }, (resultSet, i) -> {
            return resultSet.getTimestamp(i).toInstant().atZone(ZoneOffset.UTC);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No TIMESTAMPTZ data type support")
    @WithTemporalPrecisionMode
    public void testTimestampTzDataType(Source source, Sink sink) throws Exception {
        ZonedDateTime of = ZonedDateTime.of(2022, 12, 31, 14, 15, 16, 456789000, ZoneOffset.UTC);
        assertDataTypeNonKeyOnly(source, sink, "timestamptz", toTimestampWithTimeZoneStrings(source, List.of(of)), List.of(of.withZoneSameInstant(SINK_ZONE_ID)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getTimestampWithTimezoneType(source, false, 6));
        }, this::getTimestampWithTimeZoneAsZonedDateTime);
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.SQLSERVER}, reason = "No TIMESTAMP(n) WITH TIME ZONE data type support")
    @WithTemporalPrecisionMode
    public void testTimestampWithTimeZoneDataType(Source source, Sink sink) throws Exception {
        ZonedDateTime of = ZonedDateTime.of(2022, 12, 31, 14, 15, 16, 456789000, ZoneOffset.UTC);
        assertDataTypeNonKeyOnly(source, sink, "timestamp(6) with time zone", toTimestampWithTimeZoneStrings(source, List.of(of)), List.of(of.withZoneSameInstant(SINK_ZONE_ID)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getTimestampWithTimezoneType(source, false, 6));
        }, this::getTimestampWithTimeZoneAsZonedDateTime);
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason = "No TIMESTAMP(n) WITH LOCAL TIME ZONE data type support")
    @WithTemporalPrecisionMode
    public void testTimestampWithLocalTimeZoneDataType(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "timestamp(6) with local time zone", List.of("TO_TIMESTAMP('2022-12-31 14:15:16.456789', 'YYYY-MM-DD HH24:MI:SS.FF6')"), List.of(OffsetDateTime.of(2022, 12, 31, 14, 15, 16, 456789000, ZoneOffset.UTC).toLocalDateTime()), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getTimestampWithTimezoneType(source, false, 6));
        }, (resultSet, i) -> {
            return getTimestamp(resultSet, i).toLocalDateTime();
        });
    }

    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No TIME(n) WITH TIME ZONE data type support")
    @WithTemporalPrecisionMode
    @SkipWhenSinks({@SkipWhenSink(value = {SinkType.MYSQL}, reason = "MySQL has no support for TIME(n) with TIME ZONE support"), @SkipWhenSink(value = {SinkType.DB2}, reason = "There is an issue with Daylight Savings Time")})
    @TestTemplate
    public void testTimeWithTimeZoneDataType(Source source, Sink sink) throws Exception {
        String str;
        str = "'14:15:16.456789 -00:00'";
        assertDataTypeNonKeyOnly(source, sink, "time(6) with time zone", List.of(SourceType.ORACLE.is(source.getType()) ? String.format("TO_TIMESTAMP_TZ(%s,'HH24:MI:SS.FF6 TZH:TZM')", str) : "'14:15:16.456789 -00:00'"), List.of(OffsetTime.of(14, 15, 16, sink.getType().is(SinkType.DB2) ? 0 : 456789000, ZoneOffset.UTC)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getTimeWithTimezoneType());
        }, (resultSet, i) -> {
            return getTimestampWithTimeZoneAsZonedDateTime(resultSet, i).withZoneSameInstant((ZoneId) ZoneOffset.UTC).toOffsetDateTime().toOffsetTime();
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE}, reason = "No DATETIME data type support")
    @WithTemporalPrecisionMode
    public void testDateTimeDataType(Source source, Sink sink) throws Exception {
        int i = (SourceType.MYSQL.is(source.getType()) && source.getOptions().isColumnTypePropagated()) ? 6 : 3;
        int i2 = source.getType().is(SourceType.MYSQL) ? 0 : 457000000;
        int i3 = i;
        assertDataTypesNonKeyOnly(source, sink, List.of("datetime", "datetime"), List.of("'2023-05-10 16:00:00.456'", "'2023-01-10 16:00:00.456'"), List.of(toZonedDateTimeAtSinkOffset(2023, 5, 10, 16, 0, 0, i2), toZonedDateTimeAtSinkOffset(2023, 1, 10, 16, 0, 0, i2)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data0", getTimestampType(source, false, i3));
            assertColumn(sink, sinkRecord, "data1", getTimestampType(source, false, i3));
        }, this::getTimestampAsZonedDateTime);
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No DATETIME(n) data type support")
    @WithTemporalPrecisionMode
    public void testDateTimeWithPrecisionDataType(Source source, Sink sink) throws Exception {
        List<String> of = List.of("datetime(1)", "datetime(2)", "datetime(3)", "datetime(4)", "datetime(5)", "datetime(6)");
        List of2 = List.of("'2023-03-01 14:15:16.456789'", "'2023-03-01 14:15:16.456789'", "'2023-03-01 14:15:16.456789'", "'2023-03-01 14:15:16.456789'", "'2023-03-01 14:15:16.456789'", "'2023-03-01 14:15:16.456789'");
        boolean equals = TemporalPrecisionMode.CONNECT.equals(source.getOptions().getTemporalPrecisionMode());
        assertDataTypesNonKeyOnly(source, sink, of, of2, List.of(toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, 500000000), toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, 460000000), toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, 457000000), toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, equals ? 456000000 : 456800000), toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, equals ? 456000000 : 456790000), toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, equals ? 456000000 : 456789000)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data0", getTimestampType(source, false, 1));
            assertColumn(sink, sinkRecord, "data1", getTimestampType(source, false, 2));
            assertColumn(sink, sinkRecord, "data2", getTimestampType(source, false, 3));
            assertColumn(sink, sinkRecord, "data3", getTimestampType(source, false, 4));
            assertColumn(sink, sinkRecord, "data4", getTimestampType(source, false, 5));
            assertColumn(sink, sinkRecord, "data5", getTimestampType(source, false, 6));
        }, this::getTimestampAsZonedDateTime);
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.ORACLE}, reason = "No DATETIME2 data type support")
    @WithTemporalPrecisionMode
    public void testDateTime2DataType(Source source, Sink sink) throws Exception {
        int i = 456789000;
        if (source.getOptions().getTemporalPrecisionMode() == TemporalPrecisionMode.CONNECT) {
            i = 456000000;
        }
        assertDataTypeNonKeyOnly(source, sink, "datetime2", List.of("'2023-03-01 14:15:16.456789Z'"), List.of(toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, i)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getTimestampType(source, false, 6));
        }, this::getTimestampAsZonedDateTime);
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.ORACLE}, reason = "No DATETIME2(n) data type support")
    @WithTemporalPrecisionMode
    public void testDateTime2WithPrecisionDataType(Source source, Sink sink) throws Exception {
        List<String> of = List.of("datetime2(1)", "datetime2(2)", "datetime2(3)", "datetime2(4)", "datetime2(5)", "datetime2(6)", "datetime2(7)");
        List of2 = List.of("'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'");
        int i = 456789000;
        if (source.getOptions().isColumnTypePropagated() && SinkType.SQLSERVER.is(sink.getType())) {
            i = 456789100;
        }
        boolean equals = TemporalPrecisionMode.CONNECT.equals(source.getOptions().getTemporalPrecisionMode());
        assertDataTypesNonKeyOnly(source, sink, of, of2, List.of(toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, 500000000), toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, 460000000), toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, 457000000), toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, equals ? 456000000 : 456800000), toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, equals ? 456000000 : 456790000), toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, equals ? 456000000 : 456789000), toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, equals ? 456000000 : i)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data0", getTimestampType(source, false, 1));
            assertColumn(sink, sinkRecord, "data1", getTimestampType(source, false, 2));
            assertColumn(sink, sinkRecord, "data2", getTimestampType(source, false, 3));
            assertColumn(sink, sinkRecord, "data3", getTimestampType(source, false, 4));
            assertColumn(sink, sinkRecord, "data4", getTimestampType(source, false, 5));
            assertColumn(sink, sinkRecord, "data5", getTimestampType(source, false, 6));
            assertColumn(sink, sinkRecord, "data6", getTimestampType(source, false, 6));
        }, this::getTimestampAsZonedDateTime);
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.ORACLE}, reason = "No DATETIMEOFFSET data type support")
    @WithTemporalPrecisionMode
    public void testDateTimeOffsetDataType(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "datetimeoffset", List.of("'2023-03-01 14:15:16.456789Z'"), List.of(OffsetDateTime.of(2023, 3, 1, 14, 15, 16, 456789000, ZoneOffset.UTC)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getTimestampWithTimezoneType(source, false, 6));
        }, (resultSet, i) -> {
            return getTimestamp(resultSet, i).toInstant().atOffset(ZoneOffset.UTC);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.ORACLE}, reason = "No DATETIMEOFFSET(n) data type support")
    @WithTemporalPrecisionMode
    public void testDateTimeOffsetWithPrecisionDataType(Source source, Sink sink) throws Exception {
        List<String> of = List.of("datetimeoffset(1)", "datetimeoffset(2)", "datetimeoffset(3)", "datetimeoffset(4)", "datetimeoffset(5)", "datetimeoffset(6)", "datetimeoffset(7)");
        List of2 = List.of("'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'");
        int i = 456789000;
        if (sink.getType().is(SinkType.SQLSERVER) && source.getOptions().isColumnTypePropagated()) {
            i = 456789100;
        }
        assertDataTypesNonKeyOnly(source, sink, of, of2, List.of(OffsetDateTime.of(2023, 3, 1, 14, 15, 16, 500000000, ZoneOffset.UTC), OffsetDateTime.of(2023, 3, 1, 14, 15, 16, 460000000, ZoneOffset.UTC), OffsetDateTime.of(2023, 3, 1, 14, 15, 16, 457000000, ZoneOffset.UTC), OffsetDateTime.of(2023, 3, 1, 14, 15, 16, 456800000, ZoneOffset.UTC), OffsetDateTime.of(2023, 3, 1, 14, 15, 16, 456790000, ZoneOffset.UTC), OffsetDateTime.of(2023, 3, 1, 14, 15, 16, 456789000, ZoneOffset.UTC), OffsetDateTime.of(2023, 3, 1, 14, 15, 16, i, ZoneOffset.UTC)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data0", getTimestampWithTimezoneType(source, false, 1));
            assertColumn(sink, sinkRecord, "data1", getTimestampWithTimezoneType(source, false, 2));
            assertColumn(sink, sinkRecord, "data2", getTimestampWithTimezoneType(source, false, 3));
            assertColumn(sink, sinkRecord, "data3", getTimestampWithTimezoneType(source, false, 4));
            assertColumn(sink, sinkRecord, "data4", getTimestampWithTimezoneType(source, false, 5));
            assertColumn(sink, sinkRecord, "data5", getTimestampWithTimezoneType(source, false, 6));
            assertColumn(sink, sinkRecord, "data6", getTimestampWithTimezoneType(source, false, 6));
        }, (resultSet, i2) -> {
            return getTimestamp(resultSet, i2).toInstant().atOffset(ZoneOffset.UTC);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.ORACLE}, reason = "No SMALLDATETIME data type support")
    @WithTemporalPrecisionMode
    public void testSmallDateTimeDataType(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "smalldatetime", List.of("'2023-03-01 14:15:16'"), List.of(toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 0, 0)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getTimestampType(source, false, 6));
        }, this::getTimestampAsZonedDateTime);
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No INTERVAL data type support")
    public void testIntervalDataTypeIntervalHandlingModeNumeric(Source source, Sink sink) throws Exception {
        if (sink.getType().is(SinkType.POSTGRES)) {
            assertDataTypeNonKeyOnly(source, sink, "interval", List.of("'P1Y2M3DT4H5M6.78S'::INTERVAL"), List.of("10303:05:06"), connectorConfiguration -> {
                connectorConfiguration.with("interval.handling.mode", "numeric");
            }, sinkRecord -> {
                assertColumn(sink, sinkRecord, "data", getIntervalType(source, true));
            }, (v0, v1) -> {
                return v0.getString(v1);
            });
        } else {
            assertDataTypeNonKeyOnly(source, sink, "interval", List.of("'P1Y2M3DT4H5M6.78S'::INTERVAL"), List.of(Long.valueOf(MicroDuration.durationMicros(1, 2, 3, 4, 5, 6.78d, Double.valueOf(30.4375d)))), connectorConfiguration2 -> {
                connectorConfiguration2.with("interval.handling.mode", "numeric");
            }, sinkRecord2 -> {
                assertColumn(sink, sinkRecord2, "data", getIntervalType(source, true));
            }, (v0, v1) -> {
                return v0.getLong(v1);
            });
        }
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No INTERVAL data type support")
    public void testIntervalDataTypeIntervalHandlingModeString(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "interval", List.of("'P1Y2M3DT4H5M6.78S'::INTERVAL"), List.of("P1Y2M3DT4H5M6.78S"), connectorConfiguration -> {
            connectorConfiguration.with("interval.handling.mode", "string");
        }, sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getTextType());
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason = "No INTERVAL DAY(m) TO SECOND data type support")
    public void testIntervalDayToSecondDataTypeIntervalHandlingModeNumeric(Source source, Sink sink) throws Exception {
        if (sink.getType().is(SinkType.POSTGRES)) {
            assertDataTypeNonKeyOnly(source, sink, "interval day to second", List.of("TO_DSINTERVAL('P10DT50H99M1000.365S')"), List.of("291:55:40"), connectorConfiguration -> {
                connectorConfiguration.with("interval.handling.mode", "numeric");
            }, sinkRecord -> {
                assertColumn(sink, sinkRecord, "data", getIntervalType(source, true));
            }, (v0, v1) -> {
                return v0.getString(v1);
            });
        } else {
            assertDataTypeNonKeyOnly(source, sink, "interval day to second", List.of("TO_DSINTERVAL('P10DT50H99M1000.365S')"), List.of(1050940365000L), connectorConfiguration2 -> {
                connectorConfiguration2.with("interval.handling.mode", "numeric");
            }, sinkRecord2 -> {
                assertColumn(sink, sinkRecord2, "data", getIntervalType(source, true));
            }, (v0, v1) -> {
                return v0.getLong(v1);
            });
        }
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason = "No INTERVAL DAY(m) TO SECOND data type support")
    public void testIntervalDayToSecondDataTypeIntervalHandlingModeString(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "interval day to second", List.of("TO_DSINTERVAL('P10DT50H99M1000.365S')"), List.of("P0Y0M12DT3H55M40.365S"), connectorConfiguration -> {
            connectorConfiguration.with("interval.handling.mode", "string");
        }, sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getTextType());
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason = "No INTERVAL YEAR(m) TO MONTH data type support")
    public void testIntervalYearToMonthDataTypeIntervalHandlingModeNumeric(Source source, Sink sink) throws Exception {
        if (sink.getType().is(SinkType.POSTGRES)) {
            assertDataTypeNonKeyOnly(source, sink, "interval year to month", List.of("INTERVAL '10-2' YEAR TO MONTH"), List.of("89121:00:00"), connectorConfiguration -> {
                connectorConfiguration.with("interval.handling.mode", "numeric");
            }, sinkRecord -> {
                assertColumn(sink, sinkRecord, "data", getIntervalType(source, true));
            }, (v0, v1) -> {
                return v0.getString(v1);
            });
        } else {
            assertDataTypeNonKeyOnly(source, sink, "interval year to month", List.of("INTERVAL '10-2' YEAR TO MONTH"), List.of(320835600000000L), connectorConfiguration2 -> {
                connectorConfiguration2.with("interval.handling.mode", "numeric");
            }, sinkRecord2 -> {
                assertColumn(sink, sinkRecord2, "data", getIntervalType(source, true));
            }, (v0, v1) -> {
                return v0.getLong(v1);
            });
        }
    }

    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason = "No INTERVAL YEAR(m) TO MONTH data type support")
    public void testIntervalYearToMonthDataTypeIntervalHandlingModeString(Source source, Sink sink) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, "interval year to month", List.of("INTERVAL '10-2' YEAR TO MONTH"), List.of("P10Y2M0DT0H0M0S"), connectorConfiguration -> {
            connectorConfiguration.with("interval.handling.mode", "string");
        }, sinkRecord -> {
            assertColumn(sink, sinkRecord, "data", getTextType());
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @SkipWhenSink(value = {SinkType.MYSQL, SinkType.ORACLE, SinkType.DB2}, reason = "These data types are not allowed in the primary keys")
    @TestTemplate
    @SkipWhenSource(value = {SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason = "No BYTEA data type support")
    public void testByteaDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "bytea", List.of("'hello'"), List.of("hello".getBytes(StandardCharsets.UTF_8)), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getBinaryType(source, "bytea"));
            assertColumn(sink, sinkRecord, "data", getBinaryType(source, "bytea"));
        }, (v0, v1) -> {
            return v0.getBytes(v1);
        });
    }

    @ForSource(value = {SourceType.POSTGRES}, reason = "The OID data type only applies to PostgreSQL")
    @TestTemplate
    public void testOidDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "oid", List.of(3802), sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getInt64Type());
            if (source.getOptions().isColumnTypePropagated() && sink.getType().is(SinkType.POSTGRES)) {
                assertColumn(sink, sinkRecord, "data", "OID");
            } else {
                assertColumn(sink, sinkRecord, "data", getInt64Type());
            }
        }, (v0, v1) -> {
            return v0.getInt(v1);
        });
    }

    @ForSource(value = {SourceType.POSTGRES}, reason = "The LTREE data type only applies to PostgreSQL")
    @TestTemplate
    @WithPostgresExtension("ltree")
    public void testLtreeDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "ltree", List.of("'abc.xyz'"), List.of("abc.xyz"), connectorConfiguration -> {
            connectorConfiguration.with("include.unknown.datatypes", true);
        }, sinkRecord -> {
            if (sink.getType().is(SinkType.POSTGRES)) {
                assertColumn(sink, sinkRecord, "id", "LTREE");
                assertColumn(sink, sinkRecord, "data", "LTREE");
            } else {
                assertColumn(sink, sinkRecord, "id", getStringType(source, true, false));
                assertColumn(sink, sinkRecord, "data", getStringType(source, false, false, true));
            }
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @ForSource(value = {SourceType.POSTGRES}, reason = "The CITEXT data type only applies to PostgreSQL")
    @TestTemplate
    @WithPostgresExtension("citext")
    public void testCaseInsensitiveDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "citext", List.of("'AbCd'"), List.of("AbCd"), connectorConfiguration -> {
            connectorConfiguration.with("include.unknown.datatypes", true);
        }, sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getStringType(source, true, false));
            if (sink.getType().is(SinkType.POSTGRES) && source.getOptions().isColumnTypePropagated()) {
                assertColumn(sink, sinkRecord, "data", "CITEXT");
            } else {
                assertColumn(sink, sinkRecord, "data", getStringType(source, false, false, true));
            }
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @ForSource(value = {SourceType.POSTGRES}, reason = "The INET data type only applies to PostgreSQL")
    @TestTemplate
    public void testInetDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "inet", List.of("'192.168.1.0'"), List.of("192.168.1.0"), connectorConfiguration -> {
            connectorConfiguration.with("include.unknown.datatypes", true);
        }, sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getStringType(source, true, false));
            if (sink.getType().is(SinkType.POSTGRES) && source.getOptions().isColumnTypePropagated()) {
                assertColumn(sink, sinkRecord, "data", "INET");
            } else {
                assertColumn(sink, sinkRecord, "data", getStringType(source, false, false, true));
            }
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @ForSource(value = {SourceType.POSTGRES}, reason = "The INT4RANGE data type only applies to PostgreSQL")
    @TestTemplate
    public void testInt4RangeDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "int4range", List.of("'[1000,6000)'"), List.of("[1000,6000)"), connectorConfiguration -> {
            connectorConfiguration.with("include.unknown.datatypes", true);
        }, sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getStringType(source, true, false));
            if (sink.getType().is(SinkType.POSTGRES) && source.getOptions().isColumnTypePropagated()) {
                assertColumn(sink, sinkRecord, "data", "INT4RANGE");
            } else {
                assertColumn(sink, sinkRecord, "data", getStringType(source, false, false, true));
            }
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @ForSource(value = {SourceType.POSTGRES}, reason = "The INT8RANGE data type only applies to PostgreSQL")
    @TestTemplate
    public void testInt8RangeDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "int8range", List.of("'[1000000,6000000)'"), List.of("[1000000,6000000)"), connectorConfiguration -> {
            connectorConfiguration.with("include.unknown.datatypes", true);
        }, sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getStringType(source, true, false));
            if (sink.getType().is(SinkType.POSTGRES) && source.getOptions().isColumnTypePropagated()) {
                assertColumn(sink, sinkRecord, "data", "INT8RANGE");
            } else {
                assertColumn(sink, sinkRecord, "data", getStringType(source, false, false, true));
            }
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @ForSource(value = {SourceType.POSTGRES}, reason = "The NUMRANGE data type only applies to PostgreSQL")
    @TestTemplate
    public void testNumrangeDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "numrange", List.of("'[5.3,6.3)'"), List.of("[5.3,6.3)"), connectorConfiguration -> {
            connectorConfiguration.with("include.unknown.datatypes", true);
        }, sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getStringType(source, true, false));
            if (sink.getType().is(SinkType.POSTGRES) && source.getOptions().isColumnTypePropagated()) {
                assertColumn(sink, sinkRecord, "data", "NUMRANGE");
            } else {
                assertColumn(sink, sinkRecord, "data", getStringType(source, false, false, true));
            }
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @ForSource(value = {SourceType.POSTGRES}, reason = "The TSRANGE data type only applies to PostgreSQL")
    @TestTemplate
    public void testTsrangeDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "tsrange", List.of("'[2019-03-31 15:30:00,infinity)'"), List.of("[\"2019-03-31 15:30:00\",infinity)"), connectorConfiguration -> {
            connectorConfiguration.with("include.unknown.datatypes", true);
        }, sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getStringType(source, true, false));
            if (sink.getType().is(SinkType.POSTGRES) && source.getOptions().isColumnTypePropagated()) {
                assertColumn(sink, sinkRecord, "data", "TSRANGE");
            } else {
                assertColumn(sink, sinkRecord, "data", getStringType(source, false, false, true));
            }
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @ForSource(value = {SourceType.POSTGRES}, reason = "The TSTZRANGE data type only applies to PostgreSQL")
    @TestTemplate
    public void testTstzrangeDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "tstzrange", List.of("'[2017-06-05 11:29:12.549426+00,)'"), List.of("[\"2017-06-05 11:29:12.549426+00\",)"), connectorConfiguration -> {
            connectorConfiguration.with("include.unknown.datatypes", true);
        }, sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getStringType(source, true, false));
            if (sink.getType().is(SinkType.POSTGRES) && source.getOptions().isColumnTypePropagated()) {
                assertColumn(sink, sinkRecord, "data", "TSTZRANGE");
            } else {
                assertColumn(sink, sinkRecord, "data", getStringType(source, false, false, true));
            }
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @ForSource(value = {SourceType.POSTGRES}, reason = "The DATERANGE data type only applies to PostgreSQL")
    @TestTemplate
    public void testDaterangeDataType(Source source, Sink sink) throws Exception {
        assertDataType(source, sink, "daterange", List.of("'[2019-03-31, infinity)'"), List.of("[2019-03-31,infinity)"), connectorConfiguration -> {
            connectorConfiguration.with("include.unknown.datatypes", true);
        }, sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getStringType(source, true, false));
            if (sink.getType().is(SinkType.POSTGRES) && source.getOptions().isColumnTypePropagated()) {
                assertColumn(sink, sinkRecord, "data", "DATERANGE");
            } else {
                assertColumn(sink, sinkRecord, "data", getStringType(source, false, false, true));
            }
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @ForSource(value = {SourceType.POSTGRES}, reason = "The HSTORE data type only applies to PostgreSQL")
    @TestTemplate
    @WithPostgresExtension("hstore")
    public void testHstoreDataType(Source source, Sink sink) throws Exception {
        Object obj = "{\"key\":\"val\"}";
        if (sink.getType().is(SinkType.POSTGRES) && source.getOptions().isColumnTypePropagated()) {
            obj = "\"key\"=>\"val\"";
        } else if (sink.getType().is(SinkType.MYSQL)) {
            obj = "{\"key\": \"val\"}";
        }
        assertDataTypeNonKeyOnly(source, sink, "hstore", List.of("'\"key\" => \"val\"'::hstore"), List.of(obj), connectorConfiguration -> {
            connectorConfiguration.with("include.unknown.datatypes", true);
        }, sinkRecord -> {
            if (sink.getType().is(SinkType.POSTGRES) && source.getOptions().isColumnTypePropagated()) {
                assertColumn(sink, sinkRecord, "data", "HSTORE");
            } else {
                assertColumn(sink, sinkRecord, "data", getJsonbType(source));
            }
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @ForSource(value = {SourceType.POSTGRES}, reason = "The HSTORE data type only applies to PostgreSQL")
    @TestTemplate
    @WithPostgresExtension("hstore")
    public void testHstoreWithMapModeDataType(Source source, Sink sink) throws Exception {
        Object obj = "{\"key\":\"val\"}";
        if (sink.getType().is(SinkType.POSTGRES)) {
            obj = "\"key\"=>\"val\"";
        } else if (sink.getType().is(SinkType.MYSQL)) {
            obj = "{\"key\": \"val\"}";
        }
        assertDataTypeNonKeyOnly(source, sink, "hstore", List.of("'\"key\" => \"val\"'::hstore"), List.of(obj), connectorConfiguration -> {
            connectorConfiguration.with("include.unknown.datatypes", true);
            connectorConfiguration.with("hstore.handling.mode", "map");
        }, sinkRecord -> {
            if (sink.getType().is(SinkType.POSTGRES)) {
                assertColumn(sink, sinkRecord, "data", "HSTORE");
            } else if (sink.getType().is(SinkType.MYSQL)) {
                assertColumn(sink, sinkRecord, "data", getJsonType(source));
            } else {
                assertColumn(sink, sinkRecord, "data", getTextType());
            }
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    @ForSource(value = {SourceType.POSTGRES}, reason = "The infinity value is valid only for PostgreSQL")
    @TestTemplate
    @WithTemporalPrecisionMode
    public void testTimestampWithTimeZoneDataTypeWithInfinityValue(Source source, Sink sink) throws Exception {
        assertDataTypesNonKeyOnly(source, sink, List.of("timestamptz", "timestamptz"), List.of("'-infinity'", "'infinity'"), getExpectedZonedDateTimes(sink), sinkRecord -> {
            assertColumn(sink, sinkRecord, "data0", getTimestampWithTimezoneType(source, false, 6));
            assertColumn(sink, sinkRecord, "data1", getTimestampWithTimezoneType(source, false, 6));
        }, (resultSet, i) -> {
            return resultSet.getTimestamp(i).toInstant().atZone(ZoneOffset.UTC);
        });
    }

    private static List<ZonedDateTime> getExpectedZonedDateTimes(Sink sink) {
        List<ZonedDateTime> of = List.of();
        if (sink.getType().is(SinkType.SQLSERVER) && sink.getType().is(SinkType.DB2)) {
            of = List.of(ZonedDateTime.of(1, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC), ZonedDateTime.of(9999, 12, 31, 23, 59, 59, 0, ZoneOffset.UTC));
        } else if (sink.getType().is(SinkType.MYSQL)) {
            of = List.of(ZonedDateTime.of(1970, 1, 1, 0, 0, 1, 0, ZoneOffset.UTC), ZonedDateTime.of(2038, 1, 19, 3, 14, 7, 0, ZoneOffset.UTC));
        } else if (sink.getType().is(SinkType.ORACLE)) {
            of = List.of(ZonedDateTime.of(-4712, 11, 24, 0, 0, 0, 0, ZoneOffset.UTC), ZonedDateTime.of(9999, 12, 31, 23, 59, 59, 0, ZoneOffset.UTC));
        }
        return of;
    }

    protected int getMaxDecimalPrecision() {
        return 38;
    }

    protected abstract String getBooleanType();

    protected abstract String getBitsDataType();

    protected abstract String getInt8Type();

    protected abstract String getInt16Type();

    protected abstract String getInt32Type();

    protected abstract String getInt64Type();

    protected abstract String getVariableScaleDecimalType();

    protected abstract String getDecimalType();

    protected abstract String getFloat32Type();

    protected abstract String getFloat64Type();

    protected abstract String getCharType(Source source, boolean z, boolean z2);

    /* JADX INFO: Access modifiers changed from: protected */
    public String getStringType(Source source, boolean z, boolean z2) {
        return getStringType(source, z, z2, false);
    }

    protected abstract String getStringType(Source source, boolean z, boolean z2, boolean z3);

    protected abstract String getTextType(boolean z);

    /* JADX INFO: Access modifiers changed from: protected */
    public String getTextType() {
        return getTextType(false);
    }

    protected abstract String getBinaryType(Source source, String str);

    protected abstract String getJsonType(Source source);

    protected String getJsonbType(Source source) {
        return getJsonType(source);
    }

    protected abstract String getXmlType(Source source);

    protected abstract String getUuidType(Source source, boolean z);

    protected abstract String getEnumType(Source source, boolean z);

    protected abstract String getSetType(Source source, boolean z);

    protected abstract String getYearType();

    protected abstract String getDateType();

    protected abstract String getTimeType(Source source, boolean z, int i);

    protected abstract String getTimeWithTimezoneType();

    protected abstract String getTimestampType(Source source, boolean z, int i);

    protected abstract String getTimestampWithTimezoneType(Source source, boolean z, int i);

    protected abstract String getIntervalType(Source source, boolean z);

    protected boolean isBitCoercedToBoolean() {
        return false;
    }

    private static List<String> toTimestampStrings(Source source, List<ZonedDateTime> list) {
        ArrayList arrayList = new ArrayList();
        Iterator<ZonedDateTime> it = list.iterator();
        while (it.hasNext()) {
            String format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS").format(it.next());
            if (source.getType().is(SourceType.ORACLE)) {
                arrayList.add(String.format("TO_TIMESTAMP('%s', 'YYYY-MM-DD HH24:MI:SS.FF6')", format));
            } else {
                arrayList.add(String.format("'%s'", format));
            }
        }
        return arrayList;
    }

    private static List<String> toTimestampWithTimeZoneStrings(Source source, List<ZonedDateTime> list) {
        ArrayList arrayList = new ArrayList();
        Iterator<ZonedDateTime> it = list.iterator();
        while (it.hasNext()) {
            String format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSXXXXX").format(it.next());
            if (source.getType().is(SourceType.ORACLE)) {
                arrayList.add(String.format("TO_TIMESTAMP_TZ('%s', 'YYYY-MM-DD HH24:MI:SS.FF6 TZH:TZM')", format));
            } else {
                arrayList.add(String.format("'%s'", format));
            }
        }
        return arrayList;
    }

    /* JADX WARN: Type inference failed for: r0v3, types: [java.time.ZonedDateTime] */
    protected ZonedDateTime toZonedDateTimeAtSinkOffset(int i, int i2, int i3, int i4, int i5, int i6, int i7) {
        return LocalDate.of(i, i2, i3).atTime(i4, i5, i6, i7).atZone(SINK_ZONE_ID);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Timestamp getTimestamp(ResultSet resultSet, int i) throws SQLException {
        return resultSet.getTimestamp(i);
    }

    /* JADX WARN: Type inference failed for: r0v7, types: [java.time.ZonedDateTime] */
    /* JADX WARN: Type inference failed for: r0v8, types: [java.time.ZonedDateTime] */
    /* JADX WARN: Type inference failed for: r2v10, types: [java.time.ZonedDateTime] */
    protected ZonedDateTime getTimestampWithTimeZoneAsZonedDateTime(ResultSet resultSet, int i) throws SQLException {
        LOGGER.trace("Timestamp from ResultSet " + String.valueOf(getTimestamp(resultSet, i)));
        LOGGER.trace("Timestamp to LocalDateTime " + String.valueOf(getTimestamp(resultSet, i).toLocalDateTime()));
        LOGGER.trace("Timestamp at Zone " + String.valueOf(ZoneOffset.systemDefault()) + " " + String.valueOf(getTimestamp(resultSet, i).toLocalDateTime().atZone(ZoneOffset.systemDefault())));
        LOGGER.trace("Timestamp at Zone " + String.valueOf(SINK_ZONE_ID) + " " + String.valueOf(getTimestamp(resultSet, i).toLocalDateTime().atZone(ZoneOffset.systemDefault()).withZoneSameInstant(SINK_ZONE_ID)));
        return getTimestamp(resultSet, i).toLocalDateTime().atZone(ZoneOffset.systemDefault()).withZoneSameInstant(SINK_ZONE_ID);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Type inference failed for: r0v6, types: [java.time.ZonedDateTime] */
    public ZonedDateTime getTimestampAsZonedDateTime(ResultSet resultSet, int i) throws SQLException {
        LOGGER.trace("Timestamp from ResultSet " + String.valueOf(getTimestamp(resultSet, i)));
        LOGGER.trace("Timestamp to LocalDateTime " + String.valueOf(getTimestamp(resultSet, i).toLocalDateTime()));
        LOGGER.trace("Timestamp at Zone " + String.valueOf(SINK_ZONE_ID) + " " + String.valueOf(getTimestamp(resultSet, i).toLocalDateTime().atZone(SINK_ZONE_ID)));
        return getTimestamp(resultSet, i).toLocalDateTime().atZone(SINK_ZONE_ID);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public OffsetTime getTimeAsOffsetTime(ResultSet resultSet, int i) throws SQLException {
        LOGGER.trace(String.valueOf(getTimestamp(resultSet, i)) + " " + getTimestamp(resultSet, i).getNanos());
        return getTimestamp(resultSet, i).toLocalDateTime().toLocalTime().atOffset(getCurrentSinkTimeOffset());
    }

    protected ZoneOffset getCurrentSinkTimeOffset() {
        return getCurrentSinkTimeOffset(Instant.EPOCH);
    }

    protected ZoneOffset getCurrentSinkTimeOffset(Instant instant) {
        return instant.atZone(SINK_ZONE_ID).getOffset();
    }

    protected List<String> bitValues(Source source, String... strArr) {
        switch (source.getType()) {
            case POSTGRES:
                return (List) Arrays.stream(strArr).map(str -> {
                    return "'" + str + "'::bit" + (str.length() > 1 ? "(" + str.length() + ")" : "");
                }).collect(Collectors.toList());
            case SQLSERVER:
                if (strArr.length >= 1) {
                    ((IntAssert) Assertions.assertThat(strArr[0].length()).as("SQL Server bit type only supports 1 or 0.")).isEqualTo(1);
                }
                return (List) Arrays.stream(strArr).collect(Collectors.toList());
            default:
                return (List) Arrays.stream(strArr).map(str2 -> {
                    return "b'" + str2 + "'";
                }).collect(Collectors.toList());
        }
    }

    protected String charValue(Source source, Sink sink, int i, boolean z, String str) {
        if (SinkType.MYSQL.equals(sink.getType())) {
            if (SourceType.MYSQL.equals(source.getType())) {
                return str;
            }
            if (source.getOptions().isColumnTypePropagated() && !z) {
                return str;
            }
            return Strings.justifyLeft(str, i, ' ');
        }
        if (!SourceType.MYSQL.equals(source.getType())) {
            return Strings.justifyLeft(str, i, ' ');
        }
        if (source.getOptions().isColumnTypePropagated() && !z) {
            return Strings.justifyLeft(str, i, ' ');
        }
        return str;
    }

    protected String binaryValue(Source source, String str, String str2) {
        return SourceType.SQLSERVER.equals(source.getType()) ? String.format("CONVERT(%s, %s)", str, str2) : str2;
    }

    protected byte[] byteArrayPadded(String str, int i) {
        ByteBuffer allocate = ByteBuffer.allocate(i);
        allocate.put(str.getBytes(StandardCharsets.UTF_8));
        return allocate.array();
    }

    protected String dateValue(Source source, int i, int i2, int i3) {
        return SourceType.ORACLE.is(source.getType()) ? String.format("TO_DATE('%04d-%02d-%02d', 'YYYY-MM-DD')", Integer.valueOf(i3), Integer.valueOf(i), Integer.valueOf(i2)) : String.format("'%04d-%02d-%02d'", Integer.valueOf(i3), Integer.valueOf(i), Integer.valueOf(i2));
    }

    protected String pointValue(ResultSet resultSet, int i) throws SQLException {
        String string = resultSet.getString(i);
        if (!Strings.isNullOrEmpty(string) && string.startsWith("(") && string.endsWith(")")) {
            string = string.substring(1, string.length() - 1);
            String[] split = string.split(",");
            if (split.length == 2) {
                string = String.format("(%.6f,%.6f)", Float.valueOf(Float.parseFloat(split[0])), Float.valueOf(Float.parseFloat(split[1])));
            }
        }
        return string;
    }

    protected void registerSourceConnector(Source source, String str) {
        registerSourceConnector(source, null, str, null);
    }

    private String getSinkTable(SinkRecord sinkRecord, Sink sink) {
        String resolveCollectionName = this.collectionNamingStrategy.resolveCollectionName(new KafkaDebeziumSinkRecord(sinkRecord), getCurrentSinkConfig().getCollectionNameFormat());
        return sink.getType().is(SinkType.POSTGRES) ? resolveCollectionName.toLowerCase() : resolveCollectionName;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Properties getDefaultSinkConfig(Sink sink) {
        Properties properties = new Properties();
        properties.put("connection.url", sink.getJdbcUrl());
        properties.put("connection.username", sink.getUsername());
        properties.put("connection.password", sink.getPassword());
        properties.put("use.time.zone", TestHelper.getSinkTimeZone());
        return properties;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void assertColumn(Sink sink, SinkRecord sinkRecord, String str, String str2) {
        sink.assertColumn(getSinkTable(sinkRecord, sink), str, str2);
    }

    protected void assertColumn(Sink sink, SinkRecord sinkRecord, String str, String str2, int i) {
        sink.assertColumn(getSinkTable(sinkRecord, sink), str, str2, i);
    }

    protected void assertColumn(Sink sink, SinkRecord sinkRecord, String str, String str2, int i, int i2) {
        sink.assertColumn(getSinkTable(sinkRecord, sink), str, str2, i, i2);
    }

    protected <T> void assertDataType(Source source, Sink sink, String str, List<T> list, DataTypeColumnAssert dataTypeColumnAssert, ColumnReader<T> columnReader) throws Exception {
        assertDataType(source, sink, str, list, list, null, dataTypeColumnAssert, columnReader);
    }

    protected <T> void assertDataTypes(Source source, Sink sink, List<String> list, List<T> list2, DataTypeColumnAssert dataTypeColumnAssert, ColumnReader<T> columnReader) throws Exception {
        assertDataTypes(source, sink, list, list2, list2, null, dataTypeColumnAssert, columnReader);
    }

    protected <T, U> void assertDataType(Source source, Sink sink, String str, List<T> list, List<U> list2, DataTypeColumnAssert dataTypeColumnAssert, ColumnReader<U> columnReader) throws Exception {
        assertDataType(source, sink, str, list, list2, null, dataTypeColumnAssert, columnReader);
    }

    protected <T, U> void assertDataTypes(Source source, Sink sink, List<String> list, List<T> list2, List<U> list3, DataTypeColumnAssert dataTypeColumnAssert, ColumnReader<U> columnReader) throws Exception {
        assertDataTypes(source, sink, list, list2, list3, null, dataTypeColumnAssert, columnReader);
    }

    protected <T, U> void assertDataTypes2(Source source, Sink sink, List<String> list, List<T> list2, List<U> list3, DataTypeColumnAssert dataTypeColumnAssert, ColumnReader<U> columnReader) throws Exception {
        assertDataTypes2(source, sink, list, list2, list3, null, dataTypeColumnAssert, columnReader);
    }

    protected <T, U> void assertDataTypeNonKeyOnly(Source source, Sink sink, String str, List<T> list, List<U> list2, DataTypeColumnAssert dataTypeColumnAssert, ColumnReader<U> columnReader) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, str, list, list2, (ConfigurationAdjuster) null, dataTypeColumnAssert, columnReader);
    }

    protected <T, U> void assertDataTypeNonKeyOnly(Source source, Sink sink, String str, ValueBinder valueBinder, List<U> list, DataTypeColumnAssert dataTypeColumnAssert, ColumnReader<U> columnReader) throws Exception {
        assertDataTypeNonKeyOnly(source, sink, str, valueBinder, list, (ConfigurationAdjuster) null, dataTypeColumnAssert, columnReader);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <T, U> void assertDataTypesNonKeyOnly(Source source, Sink sink, List<String> list, List<T> list2, List<U> list3, DataTypeColumnAssert dataTypeColumnAssert, ColumnReader<U> columnReader) throws Exception {
        assertDataTypesNonKeyOnly(source, sink, list, list2, list3, null, dataTypeColumnAssert, columnReader);
    }

    protected <T> void assertDataType(Source source, Sink sink, String str, List<T> list, ConfigurationAdjuster configurationAdjuster, DataTypeColumnAssert dataTypeColumnAssert, ColumnReader<T> columnReader) throws Exception {
        assertDataType(source, sink, str, list, list, configurationAdjuster, dataTypeColumnAssert, columnReader);
    }

    protected boolean skipDefaultValues(String str) {
        return Arrays.asList("smallserial", "serial", "bigserial", "json", "tinytext", "mediumtext", "longtext", "text", "tinyblob", "mediumblob", "longblob", "interval year to month").contains(str);
    }

    protected <T, U> void assertDataType(Source source, Sink sink, String str, List<T> list, List<U> list2, ConfigurationAdjuster configurationAdjuster, DataTypeColumnAssert dataTypeColumnAssert, ColumnReader<U> columnReader) throws Exception {
        String randomTableName = source.randomTableName();
        registerSourceConnector(source, Collections.singletonList(str), randomTableName, configurationAdjuster, (!source.getOptions().useDefaultValues() || skipDefaultValues(str)) ? String.format("CREATE TABLE %s (id %s, data %s, primary key(id))", randomTableName, str, str) : String.format("CREATE TABLE %s (id %s, data %s DEFAULT %s NOT NULL, primary key(id))", randomTableName, str, str, list.get(0)), String.format("INSERT INTO %s VALUES (%s)", randomTableName, Strings.join(",", list)));
        Properties defaultSinkConfig = getDefaultSinkConfig(sink);
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.RECORD_KEY.getValue());
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.UPSERT.getValue());
        startSink(source, defaultSinkConfig, randomTableName);
        consumeAndAssert(sink, dataTypeColumnAssert, list2, columnReader);
    }

    protected <T, U> void assertDataTypeNonKeyOnly(Source source, Sink sink, String str, List<T> list, List<U> list2, ConfigurationAdjuster configurationAdjuster, DataTypeColumnAssert dataTypeColumnAssert, ColumnReader<U> columnReader) throws Exception {
        String format;
        String format2;
        String randomTableName = source.randomTableName();
        if (isLobTypeName(str)) {
            format = String.format("CREATE TABLE %s (data %s, id integer, primary key(id))", randomTableName, str);
            format2 = String.format("INSERT INTO %s VALUES (%s, 1)", randomTableName, Strings.join(",", list));
        } else {
            format = (!source.getOptions().useDefaultValues() || skipDefaultValues(str)) ? String.format("CREATE TABLE %s (data %s NOT NULL)", randomTableName, str) : String.format("CREATE TABLE %s (data %s DEFAULT %s NOT NULL)", randomTableName, str, list.get(0));
            format2 = String.format("INSERT INTO %s VALUES (%s)", randomTableName, Strings.join(",", list));
        }
        registerSourceConnector(source, Collections.singletonList(str), randomTableName, configurationAdjuster, format, format2);
        Properties defaultSinkConfig = getDefaultSinkConfig(sink);
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.NONE.getValue());
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.INSERT.getValue());
        startSink(source, defaultSinkConfig, randomTableName);
        consumeAndAssert(sink, dataTypeColumnAssert, list2, columnReader);
    }

    protected <T, U> void assertDataTypeNonKeyOnly(Source source, Sink sink, String str, ValueBinder valueBinder, List<U> list, ConfigurationAdjuster configurationAdjuster, DataTypeColumnAssert dataTypeColumnAssert, ColumnReader<U> columnReader) throws Exception {
        String randomTableName = source.randomTableName();
        String format = String.format("CREATE TABLE %s (data %s NOT NULL, id integer, primary key(id))", randomTableName, str);
        String format2 = String.format("INSERT INTO %s VALUES (?, 1)", randomTableName);
        if (source.getOptions().useSnapshot()) {
            source.execute(format);
            source.streamTable(randomTableName);
            source.execute(format2, valueBinder);
            registerSourceConnector(source, Collections.singletonList(str), randomTableName, configurationAdjuster);
        } else {
            registerSourceConnector(source, Collections.singletonList(str), randomTableName, configurationAdjuster);
            source.execute(format);
            source.streamTable(randomTableName);
            source.execute(format2, valueBinder);
        }
        Properties defaultSinkConfig = getDefaultSinkConfig(sink);
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.NONE.getValue());
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.INSERT.getValue());
        startSink(source, defaultSinkConfig, randomTableName);
        consumeAndAssert(sink, dataTypeColumnAssert, list, columnReader);
    }

    protected <T, U> void assertDataTypesNonKeyOnly(Source source, Sink sink, List<String> list, List<T> list2, List<U> list3, ConfigurationAdjuster configurationAdjuster, DataTypeColumnAssert dataTypeColumnAssert, ColumnReader<U> columnReader) throws Exception {
        String randomTableName = source.randomTableName();
        registerSourceConnector(source, list, randomTableName, configurationAdjuster, createTableFromTypes(source, randomTableName, false, list, list2), String.format("INSERT INTO %s VALUES (%s)", randomTableName, Strings.join(",", list2)));
        Properties defaultSinkConfig = getDefaultSinkConfig(sink);
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.NONE.getValue());
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.INSERT.getValue());
        startSink(source, defaultSinkConfig, randomTableName);
        consumeAndAssert(sink, dataTypeColumnAssert, list3, columnReader);
    }

    protected <T, U> void assertDataTypes(Source source, Sink sink, List<String> list, List<T> list2, List<U> list3, ConfigurationAdjuster configurationAdjuster, DataTypeColumnAssert dataTypeColumnAssert, ColumnReader<U> columnReader) throws Exception {
        String randomTableName = source.randomTableName();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 2; i++) {
            arrayList.addAll(list2);
        }
        registerSourceConnector(source, list, randomTableName, configurationAdjuster, createTableFromTypes(source, randomTableName, true, list, list2), String.format("INSERT INTO %s VALUES (%s)", randomTableName, Strings.join(",", arrayList)));
        Properties defaultSinkConfig = getDefaultSinkConfig(sink);
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.RECORD_KEY.getValue());
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.UPSERT.getValue());
        startSink(source, defaultSinkConfig, randomTableName);
        ArrayList arrayList2 = new ArrayList();
        for (int i2 = 0; i2 < 2; i2++) {
            arrayList2.addAll(list3);
        }
        consumeAndAssert(sink, dataTypeColumnAssert, arrayList2, columnReader);
    }

    protected <T, U> void assertDataTypes2(Source source, Sink sink, List<String> list, List<T> list2, List<U> list3, ConfigurationAdjuster configurationAdjuster, DataTypeColumnAssert dataTypeColumnAssert, ColumnReader<U> columnReader) throws Exception {
        String randomTableName = source.randomTableName();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 2; i++) {
            arrayList.addAll(list2);
        }
        registerSourceConnector(source, list, randomTableName, configurationAdjuster, createTableFromTypes(source, randomTableName, true, list, list2), String.format("INSERT INTO %s VALUES (%s)", randomTableName, Strings.join(",", arrayList)));
        Properties defaultSinkConfig = getDefaultSinkConfig(sink);
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.RECORD_KEY.getValue());
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.UPSERT.getValue());
        startSink(source, defaultSinkConfig, randomTableName);
        consumeAndAssert(sink, dataTypeColumnAssert, list3, columnReader);
    }

    protected boolean isLobTypeName(String str) {
        return str.equalsIgnoreCase("CLOB") || str.equalsIgnoreCase("NCLOB") || str.equalsIgnoreCase("BLOB");
    }

    protected String createTableFromTypes(Source source, String str, boolean z, List<String> list, List<?> list2) {
        StringBuilder append = new StringBuilder("CREATE TABLE ").append(str).append(" (");
        if (z) {
            for (int i = 0; i < list.size(); i++) {
                append.append("id").append(i).append(" ").append(list.get(i)).append(", ");
            }
        }
        for (int i2 = 0; i2 < list.size(); i2++) {
            append.append("data").append(i2).append(" ").append(list.get(i2));
            if (i2 + 1 < list.size()) {
                if (source.getOptions().useDefaultValues()) {
                    append.append(" DEFAULT ").append(list2.get(i2)).append(" NOT NULL");
                }
                append.append(", ");
            }
        }
        if (z) {
            append.append(", primary key (");
            for (int i3 = 0; i3 < list.size(); i3++) {
                append.append("id").append(i3);
                if (i3 + 1 < list.size()) {
                    append.append(", ");
                }
            }
            append.append(")");
        }
        append.append(")");
        return append.toString();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void registerSourceConnector(Source source, List<String> list, String str, ConfigurationAdjuster configurationAdjuster, String str2, String str3) throws Exception {
        if (source.getOptions().useSnapshot()) {
            source.execute(str2);
            source.streamTable(str);
            source.execute(str3);
            registerSourceConnector(source, list, str, configurationAdjuster);
        } else {
            registerSourceConnector(source, list, str, configurationAdjuster);
            source.execute(str2);
            source.streamTable(str);
            source.execute(str3);
        }
        if (TestHelper.shouldQueryDatabaseState()) {
            source.queryContainerTable(str);
        }
    }

    protected void registerSourceConnector(Source source, List<String> list, String str, ConfigurationAdjuster configurationAdjuster) {
        ConnectorConfiguration sourceConnectorConfig = getSourceConnectorConfig(source, str);
        sourceConnectorConfig.with("decimal.handling.mode", RelationalDatabaseConnectorConfig.DecimalHandlingMode.PRECISE.getValue());
        sourceConnectorConfig.with("binary.handling.mode", CommonConnectorConfig.BinaryHandlingMode.BYTES.getValue());
        if (configurationAdjuster != null) {
            configurationAdjuster.adjust(sourceConnectorConfig);
        }
        if (SourceType.ORACLE == source.getType()) {
            sourceConnectorConfig.with("converters", "boolean").with("boolean.type", "io.debezium.connector.oracle.converters.NumberOneToBooleanConverter").with("boolean.selector", ".*");
            if (list != null && list.stream().anyMatch(str2 -> {
                return str2.equalsIgnoreCase("CLOB") || str2.equalsIgnoreCase("NCLOB") || str2.equalsIgnoreCase("BLOB");
            })) {
                sourceConnectorConfig.with("lob.enabled", "true");
            }
        }
        source.registerSourceConnector(sourceConnectorConfig);
    }

    protected void applyJdbcSourceConverter(Source source, ConnectorConfiguration connectorConfiguration, String str, String str2, String str3) {
        if (source.getType().is(SourceType.MYSQL)) {
            connectorConfiguration.with("converters", "jdbc-sink");
            connectorConfiguration.with("jdbc-sink.type", "io.debezium.connector.mysql.converters.JdbcSinkDataTypesConverter");
            if (!Strings.isNullOrEmpty(str)) {
                connectorConfiguration.with("jdbc-sink.selector.boolean", str);
            }
            if (!Strings.isNullOrEmpty(str2)) {
                connectorConfiguration.with("jdbc-sink.selector.real", str2);
            }
            if (Strings.isNullOrEmpty(str3)) {
                return;
            }
            connectorConfiguration.with("jdbc-sink.selector.string", str3);
        }
    }

    protected <U> void consumeAndAssert(Sink sink, DataTypeColumnAssert dataTypeColumnAssert, List<U> list, ColumnReader<U> columnReader) throws Exception {
        SinkRecord consumeSinkRecord = consumeSinkRecord();
        String sinkTable = getSinkTable(consumeSinkRecord, sink);
        if (TestHelper.shouldQueryDatabaseState()) {
            sink.queryContainerTable(sinkTable);
        }
        dataTypeColumnAssert.assertColumn(consumeSinkRecord);
        sink.assertRows(sinkTable, resultSet -> {
            for (int i = 0; i < list.size(); i++) {
                ((ObjectAssert) Assertions.assertThat(columnReader.read(resultSet, i + 1)).as(String.format("Column %s read failed.", resultSet.getMetaData().getColumnName(i + 1)))).isEqualTo(list.get(i));
            }
            return null;
        });
    }

    protected void assertCharDataType(Source source, Sink sink, String str, boolean z) throws Exception {
        assertDataType(source, sink, getDataTypeWithCollation(source, str, z), List.of("'a'", "'b'"), List.of("a", "b"), connectorConfiguration -> {
            applyJdbcSourceConverter(source, connectorConfiguration, null, null, ".*.id|.*.data");
        }, sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getCharType(source, true, z));
            assertColumn(sink, sinkRecord, "data", getCharType(source, false, z));
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    protected void assertCharWithLengthDataType(Source source, Sink sink, String str, int i, boolean z) throws Exception {
        assertDataType(source, sink, getDataTypeWithCollation(source, str, z), List.of("'a'", "'b'"), List.of(charValue(source, sink, i, true, "a"), charValue(source, sink, i, false, "b")), connectorConfiguration -> {
            applyJdbcSourceConverter(source, connectorConfiguration, null, null, ".*.id|.*.data");
        }, sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getCharType(source, true, z));
            if (source.getOptions().isColumnTypePropagated()) {
                assertColumn(sink, sinkRecord, "data", getCharType(source, false, z), i);
            } else {
                assertColumn(sink, sinkRecord, "data", getCharType(source, false, z));
            }
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    protected void assertVarcharDataType(Source source, Sink sink, String str, int i, boolean z) throws Exception {
        assertDataType(source, sink, getDataTypeWithCollation(source, str, z), List.of("'abc'", "'hello world'"), List.of("abc", "hello world"), connectorConfiguration -> {
            applyJdbcSourceConverter(source, connectorConfiguration, null, null, ".*.id|.*.data");
        }, sinkRecord -> {
            assertColumn(sink, sinkRecord, "id", getStringType(source, true, z));
            if (source.getOptions().isColumnTypePropagated()) {
                assertColumn(sink, sinkRecord, "data", getStringType(source, false, z), i);
            } else {
                assertColumn(sink, sinkRecord, "data", getStringType(source, false, z));
            }
        }, (v0, v1) -> {
            return v0.getString(v1);
        });
    }

    protected String getDataTypeWithCollation(Source source, String str, boolean z) {
        return (!source.getType().is(SourceType.MYSQL) || z) ? source.getType().is(SourceType.MYSQL) ? String.format("%s collate utf8mb3_general_ci", str) : str : String.format("%s collate latin1_general_cs", str);
    }
}
