package io.debezium.connector.jdbc.integration.postgres;

import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.integration.AbstractJdbcSinkInsertModeTest;
import io.debezium.connector.jdbc.junit.PostgresExtensionUtils;
import io.debezium.connector.jdbc.junit.TestHelper;
import io.debezium.connector.jdbc.junit.jupiter.PostgresSinkDatabaseContextProvider;
import io.debezium.connector.jdbc.junit.jupiter.Sink;
import io.debezium.connector.jdbc.junit.jupiter.SinkRecordFactoryArgumentsProvider;
import io.debezium.connector.jdbc.junit.jupiter.WithPostgresExtension;
import io.debezium.connector.jdbc.util.SinkRecordFactory;
import io.debezium.data.SchemaAndValueField;
import io.debezium.data.geometry.Geometry;
import io.debezium.doc.FixFor;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.assertj.db.api.TableAssert;
import org.assertj.db.type.DataSourceWithLetterCase;
import org.assertj.db.type.ValueType;
import org.assertj.db.type.lettercase.CaseComparisons;
import org.assertj.db.type.lettercase.CaseConversions;
import org.assertj.db.type.lettercase.LetterCase;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.postgresql.geometric.PGpoint;
import org.postgresql.util.PGobject;

@Tags({@Tag("all"), @Tag("it"), @Tag("it-postgresql")})
@ExtendWith({PostgresSinkDatabaseContextProvider.class})
/* loaded from: input_file:io/debezium/connector/jdbc/integration/postgres/JdbcSinkInsertModeIT.class */
public class JdbcSinkInsertModeIT extends AbstractJdbcSinkInsertModeTest {
    public static final LetterCase LOWER_CASE_STRICT = LetterCase.getLetterCase(CaseConversions.LOWER, CaseComparisons.STRICT);
    public static final LetterCase UPPER_CASE_STRICT = LetterCase.getLetterCase(CaseConversions.UPPER, CaseComparisons.STRICT);

    public JdbcSinkInsertModeIT(Sink sink) {
        super(sink);
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @WithPostgresExtension(PostgresExtensionUtils.EXTENSION_POSTGIS)
    @FixFor({"DBZ-6637"})
    @ParameterizedTest
    public void testInsertModeInsertWithPrimaryKeyModeComplexRecordValue(SinkRecordFactory sinkRecordFactory) throws SQLException {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_VALUE.getValue());
        defaultSinkConfig.put("primary.key.fields", "id");
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.INSERT.getValue());
        defaultSinkConfig.put("dialect.postgres.postgis.schema", PostgresExtensionUtils.EXTENSION_POSTGIS);
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        String str = topicName("server1", "schema", randomTableName());
        Schema buildGeoTypeSchema = buildGeoTypeSchema("Geometry");
        Struct put = new Struct(buildGeoTypeSchema).put("wkb", Base64.getDecoder().decode("AQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAFEAAAAAAAAAAQAAAAAAAABRAAAAAAAAAAEAAAAAAAAAcQAAAAAAAAAAAAAAAAAAAHEAAAAAAAAAAAAAAAAAAABRA".getBytes()));
        Schema buildGeoTypeSchema2 = buildGeoTypeSchema("Point");
        Struct put2 = new Struct(buildGeoTypeSchema2).put("x", Double.valueOf(1.0d)).put("y", Double.valueOf(1.0d)).put("wkb", Base64.getDecoder().decode("AQEAAAAAAAAAAADwPwAAAAAAAPA/".getBytes())).put("srid", 3187);
        Schema buildGeoTypeSchema3 = buildGeoTypeSchema("Geography");
        SinkRecord createRecordWithSchemaValue = sinkRecordFactory.createRecordWithSchemaValue(str, (byte) 1, List.of("geometry", "point", "geography", "p"), List.of(buildGeoTypeSchema, buildGeoTypeSchema2, buildGeoTypeSchema3, buildGeoTypeSchema2), Arrays.asList(put, put2, new Struct(buildGeoTypeSchema3).put("wkb", Base64.getDecoder().decode("AQUAACDmEAAAAQAAAAECAAAAAgAAAKd5xyk6JGVAC0YldQJaRsDGbTSAt/xkQMPTK2UZUkbA".getBytes())).put("srid", 4326)));
        consume(createRecordWithSchemaValue);
        TableAssert assertTable = TestHelper.assertTable(dataSource(), destinationTableName(createRecordWithSchemaValue));
        assertTable.exists().hasNumberOfRows(1).hasNumberOfColumns(5);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER, (byte) 1);
        PGobject pGobject = new PGobject();
        pGobject.setType("\"postgis\".\"geometry\"");
        pGobject.setValue("01030000000100000005000000000000000000000000000000000014400000000000000040000000000000144000000000000000400000000000001C4000000000000000000000000000001C4000000000000000000000000000001440");
        getSink().assertColumnType(assertTable, "geometry", PGobject.class, pGobject);
        getSink().assertColumnType(assertTable, "point", PGobject.class, new PGpoint(1.0d, 1.0d));
        PGobject pGobject2 = new PGobject();
        pGobject2.setType("\"postgis\".\"geography\"");
        pGobject2.setValue("0105000020E610000001000000010200000002000000A779C7293A2465400B462575025A46C0C66D3480B7FC6440C3D32B65195246C0");
        getSink().assertColumnType(assertTable, "geography", PGobject.class, pGobject2);
        getSink().assertColumnHasNullValue(assertTable, "p");
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @WithPostgresExtension(PostgresExtensionUtils.EXTENSION_POSTGIS)
    @FixFor({"DBZ-8221"})
    @ParameterizedTest
    public void testBatchWithDifferingSqlParameterBindings(SinkRecordFactory sinkRecordFactory) throws SQLException {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_KEY.getValue());
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.UPSERT.getValue());
        defaultSinkConfig.put("dialect.postgres.postgis.schema", PostgresExtensionUtils.EXTENSION_POSTGIS);
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        String str = topicName("server1", "schema", randomTableName());
        SinkRecord createInsertSchemaAndValue = sinkRecordFactory.createInsertSchemaAndValue(str, List.of(new SchemaAndValueField("id", Schema.STRING_SCHEMA, "12345")), List.of(new SchemaAndValueField("gis_area", Geometry.schema(), Geometry.createValue(Geometry.schema(), Base64.getDecoder().decode("AQEAACARDWAAuooeV7P4V0EWN+bdvgBVQO==".getBytes()), 3857)), new SchemaAndValueField("__deleted", Schema.BOOLEAN_SCHEMA, false)), 0);
        consume(List.of(createInsertSchemaAndValue, sinkRecordFactory.createInsertSchemaAndValue(str, List.of(new SchemaAndValueField("id", Schema.STRING_SCHEMA, "23456")), List.of(new SchemaAndValueField("gis_area", Geometry.schema(), (Object) null), new SchemaAndValueField("__deleted", Schema.BOOLEAN_SCHEMA, false)), 1), sinkRecordFactory.createInsertSchemaAndValue(str, List.of(new SchemaAndValueField("id", Schema.STRING_SCHEMA, "23456")), List.of(new SchemaAndValueField("gis_area", Geometry.schema(), Geometry.createValue(Geometry.schema(), Base64.getDecoder().decode("AQEAACARDWAAuooeV7P4V0EWN+bdvgBVQO==".getBytes()), 3857)), new SchemaAndValueField("__deleted", Schema.BOOLEAN_SCHEMA, false)), 0)));
        TestHelper.assertTable(dataSource(), destinationTableName(createInsertSchemaAndValue)).hasNumberOfRows(2).hasNumberOfColumns(3);
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @FixFor({"DBZ-6682"})
    @ParameterizedTest
    public void testInsertModeInsertWithPrimaryKeyModeUpperCaseColumnNameWithQuotedIdentifiers(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_VALUE.getValue());
        defaultSinkConfig.put("primary.key.fields", "ID");
        defaultSinkConfig.put("quote.identifiers", "true");
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.INSERT.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        String str = topicName("server1", "schema", randomTableName());
        SinkRecord createRecord = sinkRecordFactory.createRecord(str, (byte) 1, (v0) -> {
            return v0.toUpperCase();
        });
        SinkRecord createRecord2 = sinkRecordFactory.createRecord(str, (byte) 2, (v0) -> {
            return v0.toUpperCase();
        });
        consume(createRecord);
        consume(createRecord2);
        TableAssert assertTable = TestHelper.assertTable((DataSource) new DataSourceWithLetterCase(dataSource(), LetterCase.TABLE_DEFAULT, UPPER_CASE_STRICT, UPPER_CASE_STRICT), destinationTableName(createRecord));
        assertTable.exists().hasNumberOfRows(2).hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "ID", ValueType.NUMBER, (byte) 1, (byte) 2);
        getSink().assertColumnType(assertTable, "NAME", ValueType.TEXT, "John Doe", "John Doe");
        getSink().assertColumnType(assertTable, "NICK_NAME$", ValueType.TEXT, "John Doe$", "John Doe$");
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @FixFor({"DBZ-6682"})
    @ParameterizedTest
    public void testInsertModeInsertWithPrimaryKeyModeUpperCaseColumnNameWithoutQuotedIdentifiers(SinkRecordFactory sinkRecordFactory) {
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_VALUE.getValue());
        defaultSinkConfig.put("primary.key.fields", "ID");
        defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.INSERT.getValue());
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        String str = topicName("server1", "schema", randomTableName());
        SinkRecord createRecord = sinkRecordFactory.createRecord(str, (byte) 1, (v0) -> {
            return v0.toUpperCase();
        });
        SinkRecord createRecord2 = sinkRecordFactory.createRecord(str, (byte) 2, (v0) -> {
            return v0.toUpperCase();
        });
        consume(createRecord);
        consume(createRecord2);
        TableAssert assertTable = TestHelper.assertTable(new DataSourceWithLetterCase(dataSource(), LetterCase.TABLE_DEFAULT, LOWER_CASE_STRICT, LOWER_CASE_STRICT), destinationTableName(createRecord), null, null);
        assertTable.exists().hasNumberOfRows(2).hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "id", ValueType.NUMBER, (byte) 1, (byte) 2);
        getSink().assertColumnType(assertTable, "name", ValueType.TEXT, "John Doe", "John Doe");
        getSink().assertColumnType(assertTable, "nick_name$", ValueType.TEXT, "John Doe$", "John Doe$");
    }

    private static Schema buildGeoTypeSchema(String str) {
        SchemaBuilder optional = SchemaBuilder.struct().name("io.debezium.data.geometry." + str).field("wkb", Schema.BYTES_SCHEMA).field("srid", Schema.OPTIONAL_INT32_SCHEMA).optional();
        if ("Point".equals(str)) {
            optional.field("x", Schema.FLOAT64_SCHEMA).field("y", Schema.FLOAT64_SCHEMA);
        }
        return optional.build();
    }
}
