package io.debezium.connector.mysql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotWithSchemaChangesSupportTest;
import io.debezium.relational.TableId;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.util.Testing;
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/mysql/IncrementalSnapshotIT.class */
public class IncrementalSnapshotIT extends AbstractIncrementalSnapshotWithSchemaChangesSupportTest<MySqlConnector> {
    protected static final String SERVER_NAME = "is_test";
    protected final UniqueDatabase DATABASE = new UniqueDatabase(SERVER_NAME, "incremental_snapshot-test").withDbHistoryPath(SCHEMA_HISTORY_PATH);

    @Before
    public void before() throws SQLException {
        stopConnector();
        this.DATABASE.createAndInitialize();
        initializeConnectorTestFramework();
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
    }

    @After
    public void after() {
        try {
            stopConnector();
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
        } catch (Throwable th) {
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Configuration.Builder config() {
        return this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true).with(MySqlConnectorConfig.USER, "mysqluser").with(MySqlConnectorConfig.PASSWORD, "mysqlpw").with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY.getValue()).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, this.DATABASE.qualifiedTableName("debezium_signal")).with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 1).with(MySqlConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10).with(MySqlConnectorConfig.INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES, true).with(CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, CommonConnectorConfig.SchemaNameAdjustmentMode.AVRO);
    }

    protected Configuration.Builder mutableConfig(boolean z, boolean z2) {
        return this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true).with(MySqlConnectorConfig.USER, "mysqluser").with(MySqlConnectorConfig.PASSWORD, "mysqlpw").with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL.getValue()).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, this.DATABASE.qualifiedTableName("debezium_signal")).with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, z ? this.DATABASE.qualifiedTableName("c") : this.DATABASE.qualifiedTableName("a") + ", " + this.DATABASE.qualifiedTableName("c")).with(MySqlConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10).with(MySqlConnectorConfig.INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES, true).with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, z2).with(CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, CommonConnectorConfig.SchemaNameAdjustmentMode.AVRO);
    }

    protected Class<MySqlConnector> connectorClass() {
        return MySqlConnector.class;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public JdbcConnection databaseConnection() {
        return MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String topicName() {
        return this.DATABASE.topicForTable("a");
    }

    protected List<String> topicNames() {
        return List.of(this.DATABASE.topicForTable("a"), this.DATABASE.topicForTable("c"));
    }

    protected String tableName() {
        return tableNameId().toQuotedString('`');
    }

    protected List<String> tableNames() {
        return List.of(TableId.parse(this.DATABASE.qualifiedTableName("a")).toQuotedString('`'), TableId.parse(this.DATABASE.qualifiedTableName("c")).toQuotedString('`'));
    }

    protected String signalTableName() {
        return tableNameId("debezium_signal").toQuotedString('`');
    }

    protected String signalTableNameSanitized() {
        return this.DATABASE.qualifiedTableName("debezium_signal");
    }

    protected String tableName(String str) {
        return tableNameId(str).toQuotedString('`');
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String tableDataCollectionId() {
        return tableNameId().toString();
    }

    protected List<String> tableDataCollectionIds() {
        return List.of(tableNameId().toString(), tableNameId("c").toString());
    }

    private String dataCollectionName(String str) {
        return tableNameId(str).toString();
    }

    private TableId tableNameId() {
        return tableNameId("a");
    }

    private TableId tableNameId(String str) {
        return TableId.parse(this.DATABASE.qualifiedTableName(str));
    }

    protected String alterColumnStatement(String str, String str2, String str3) {
        return String.format("ALTER TABLE %s MODIFY COLUMN %s %s", str, str2, str3);
    }

    protected String alterColumnSetNotNullStatement(String str, String str2, String str3) {
        return String.format("ALTER TABLE %s MODIFY COLUMN %s %s NOT NULL", str, str2, str3);
    }

    protected String alterColumnDropNotNullStatement(String str, String str2, String str3) {
        return String.format("ALTER TABLE %s MODIFY COLUMN %s %s NULL", str, str2, str3);
    }

    protected String alterColumnSetDefaultStatement(String str, String str2, String str3, String str4) {
        return String.format("ALTER TABLE %s MODIFY COLUMN %s %s DEFAULT %s", str, str2, str3, str4);
    }

    protected String alterColumnDropDefaultStatement(String str, String str2, String str3) {
        return String.format("ALTER TABLE %s MODIFY COLUMN %s %s", str, str2, str3);
    }

    protected void executeRenameTable(JdbcConnection jdbcConnection, String str) throws SQLException {
        jdbcConnection.setAutoCommit(false);
        String format = String.format("RENAME TABLE %s to %s, %s to %s", tableName(), "old_table", str, tableName());
        this.logger.info(format);
        jdbcConnection.executeWithoutCommitting(new String[]{format});
        jdbcConnection.commit();
    }

    protected String createTableStatement(String str, String str2) {
        return String.format("CREATE TABLE %s LIKE %s", str, str2);
    }

    @Test
    public void updates() throws Exception {
        populateTable();
        startConnector();
        sendAdHocSnapshotSignal();
        JdbcConnection databaseConnection = databaseConnection();
        try {
            databaseConnection.setAutoCommit(false);
            databaseConnection.execute(new String[]{"SET binlog_rows_query_log_events=ON"});
            for (int i = 0; i < 1000; i++) {
                databaseConnection.executeWithoutCommitting(new String[]{String.format("UPDATE %s SET aa = aa + 2000 WHERE pk > %s AND pk <= %s", tableName(), Integer.valueOf(i * 10), Integer.valueOf((i + 1) * 10))});
                databaseConnection.commit();
            }
            if (databaseConnection != null) {
                databaseConnection.close();
            }
            Map consumeRecordsMixedWithIncrementalSnapshot = consumeRecordsMixedWithIncrementalSnapshot(1000, entry -> {
                return ((Struct) ((SourceRecord) entry.getValue()).value()).getStruct("after").getInt32(valueFieldName()).intValue() >= 2000;
            }, null);
            for (int i2 = 0; i2 < 1000; i2++) {
                SourceRecord sourceRecord = (SourceRecord) consumeRecordsMixedWithIncrementalSnapshot.get(Integer.valueOf(i2 + 1));
                Assert.assertEquals(i2 + 2000, ((Struct) sourceRecord.value()).getStruct("after").getInt32(valueFieldName()).intValue());
                Object obj = ((Struct) sourceRecord.value()).getStruct("source").get("query");
                String obj2 = ((Struct) sourceRecord.value()).getStruct("source").get("snapshot").toString();
                if (obj2.equals("false")) {
                    Assert.assertNotNull(obj);
                } else {
                    Assert.assertNull(obj);
                    Assert.assertEquals("incremental", obj2);
                }
            }
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-4939"})
    public void tableWithDatetime() throws Exception {
        Testing.Print.enable();
        JdbcConnection databaseConnection = databaseConnection();
        try {
            databaseConnection.setAutoCommit(false);
            for (int i = 0; i < 10; i++) {
                databaseConnection.executeWithoutCommitting(new String[]{String.format("INSERT INTO a_dt (pk, dt, d, t) VALUES (%s, TIMESTAMP('%s-05-01'), '%s-05-01', '%s:00:00')", Integer.valueOf(i + 1), Integer.valueOf(i + 2000), Integer.valueOf(i + 2000), Integer.valueOf(i))});
            }
            databaseConnection.commit();
            if (databaseConnection != null) {
                databaseConnection.close();
            }
            start(connectorClass(), config().with(MySqlConnectorConfig.SNAPSHOT_FETCH_SIZE, 5).build(), loggingCompletion());
            waitForConnectorToStart();
            waitForAvailableRecords(5L, TimeUnit.SECONDS);
            sendAdHocSnapshotSignal(new String[]{dataCollectionName("a_dt")});
            Map consumeMixedWithIncrementalSnapshot = consumeMixedWithIncrementalSnapshot(10, entry -> {
                return true;
            }, struct -> {
                return struct.getInt32(pkFieldName());
            }, sourceRecord -> {
                long longValue = ((Struct) sourceRecord.value()).getStruct("after").getInt64("dt").longValue();
                return List.of(LocalDateTime.ofEpochSecond(longValue / 1000, (int) TimeUnit.MILLISECONDS.toNanos(longValue % 1000), ZoneOffset.UTC).toLocalDate(), LocalDate.ofEpochDay(((Struct) sourceRecord.value()).getStruct("after").getInt32("d").intValue()), LocalTime.ofSecondOfDay(((Struct) sourceRecord.value()).getStruct("after").getInt64("t").longValue() / 1000000));
            }, this.DATABASE.topicForTable("a_dt"), null);
            for (int i2 = 0; i2 < 10; i2++) {
                Assertions.assertThat(consumeMixedWithIncrementalSnapshot).contains(new Map.Entry[]{Assertions.entry(Integer.valueOf(i2 + 1), List.of(LocalDateTime.parse(String.format("%s-05-01T00:00:00", Integer.valueOf(2000 + i2))).toLocalDate(), LocalDate.parse(String.format("%s-05-01", Integer.valueOf(2000 + i2))), LocalTime.parse(String.format("0%s:00:00", Integer.valueOf(i2)))))});
            }
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-5099"})
    public void tableWithZeroDate() throws Exception {
        Testing.Print.enable();
        LogInterceptor logInterceptor = new LogInterceptor(MySqlBinaryProtocolFieldReader.class);
        JdbcConnection databaseConnection = databaseConnection();
        try {
            databaseConnection.setAutoCommit(false);
            databaseConnection.executeWithoutCommitting(new String[]{"INSERT INTO a_date (pk) VALUES (1)"});
            databaseConnection.commit();
            if (databaseConnection != null) {
                databaseConnection.close();
            }
            start(connectorClass(), config().with(MySqlConnectorConfig.SNAPSHOT_FETCH_SIZE, 5).build(), loggingCompletion());
            waitForConnectorToStart();
            waitForAvailableRecords(5L, TimeUnit.SECONDS);
            sendAdHocSnapshotSignal(new String[]{dataCollectionName("a_date")});
            Assertions.assertThat(consumeMixedWithIncrementalSnapshot(1, entry -> {
                return true;
            }, struct -> {
                return struct.getInt32(pkFieldName());
            }, sourceRecord -> {
                return Arrays.asList(((Struct) sourceRecord.value()).getStruct("after").getInt32("d"), ((Struct) sourceRecord.value()).getStruct("after").getInt32("d_opt"));
            }, this.DATABASE.topicForTable("a_date"), null)).contains(new Map.Entry[]{Assertions.entry(1, Arrays.asList(0, null))});
            Assert.assertFalse(logInterceptor.containsWarnMessage("Invalid length when read MySQL DATE value. BIN_LEN_DATE is 0."));
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
