package io.debezium.connector.mysql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlTestConnection;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.engine.DebeziumEngine;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.storage.file.history.FileSchemaHistory;
import io.debezium.storage.kafka.history.KafkaSchemaHistory;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import junit.framework.TestCase;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

@SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = MySqlConnectorIT.ORDERS_TABLE_EVENT_COUNT, minor = 6, reason = "DDL uses fractional second data types, not supported until MySQL 5.6")
/* loaded from: input_file:io/debezium/connector/mysql/MySqlConnectorIT.class */
public class MySqlConnectorIT extends AbstractConnectorTest {
    // Absolute path of the file-based schema history shared by both test databases below;
    // it is deleted in beforeEach() and afterEach().
    private static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-connect.txt").toAbsolutePath();
    // Primary test database, created per test instance with server name "myServer1".
    private final UniqueDatabase DATABASE = new UniqueDatabase("myServer1", "connector_test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
    // Second test database ("myServer2"), constructed with DATABASE as its third argument.
    // NOTE(review): presumably a read-only sibling sharing DATABASE's unique suffix - confirm in UniqueDatabase.
    private final UniqueDatabase RO_DATABASE = new UniqueDatabase("myServer2", "connector_test_ro", this.DATABASE).withDbHistoryPath(SCHEMA_HISTORY_PATH);
    // Number of snapshot records asserted for the "products" and "products_on_hand" topics.
    private static final int PRODUCTS_TABLE_EVENT_COUNT = 9;
    // Number of snapshot records asserted for the "orders" topic.
    // NOTE(review): the decompiled code also substitutes this constant where an unrelated literal 5 was
    // meant (e.g. the @SkipWhenDatabaseVersion major version on this class) - do not change its value
    // without checking those usages.
    private static final int ORDERS_TABLE_EVENT_COUNT = 5;
    // NOTE(review): not referenced in the visible part of this file; presumably the total snapshot
    // event count - confirm before relying on it.
    private static final int INITIAL_EVENT_COUNT = 33;
    // Connector configuration built by the individual tests before starting the connector.
    private Configuration config;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/debezium/connector/mysql/MySqlConnectorIT$BinlogPosition.class */
    public static class BinlogPosition {
        private String binlogFilename;
        private long binlogPosition;
        private String gtidSet;

        protected BinlogPosition() {
        }

        public void readFromDatabase(ResultSet resultSet) throws SQLException {
            if (resultSet.next()) {
                this.binlogFilename = resultSet.getString(1);
                this.binlogPosition = resultSet.getLong(2);
                if (resultSet.getMetaData().getColumnCount() > 4) {
                    this.gtidSet = resultSet.getString(MySqlConnectorIT.ORDERS_TABLE_EVENT_COUNT);
                }
            }
        }

        public String binlogFilename() {
            return this.binlogFilename;
        }

        public long binlogPosition() {
            return this.binlogPosition;
        }

        public String gtidSet() {
            return this.gtidSet;
        }

        public boolean hasGtids() {
            return this.gtidSet != null;
        }

        public String toString() {
            String str = this.binlogFilename;
            long j = this.binlogPosition;
            if (this.gtidSet != null) {
                String str2 = this.gtidSet;
            }
            return "file=" + str + ", pos=" + j + ", gtids=" + str;
        }
    }

    /* loaded from: input_file:io/debezium/connector/mysql/MySqlConnectorIT$NoTombStonesHandler.class */
    private static class NoTombStonesHandler implements DebeziumEngine.ChangeConsumer<SourceRecord> {
        protected BlockingQueue<SourceRecord> recordQueue;

        NoTombStonesHandler(BlockingQueue<SourceRecord> blockingQueue) {
            this.recordQueue = blockingQueue;
        }

        public void handleBatch(List<SourceRecord> list, DebeziumEngine.RecordCommitter<SourceRecord> recordCommitter) throws InterruptedException {
            for (SourceRecord sourceRecord : list) {
                this.recordQueue.offer(sourceRecord);
                recordCommitter.markProcessed(sourceRecord);
            }
        }

        public boolean supportsTombstoneEvents() {
            return false;
        }
    }

    /**
     * Per-test setup: stops any running connector, (re)creates both test databases, resets the
     * connector test framework and removes the schema history file.
     */
    @Before
    public void beforeEach() {
        // Stop first so no connector is running while the databases are re-created.
        stopConnector();

        DATABASE.createAndInitialize();
        RO_DATABASE.createAndInitialize();

        initializeConnectorTestFramework();

        // Start every test without a previously recorded schema history.
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
    }

    /**
     * Per-test teardown: stops the connector and removes the schema history file. The file is
     * removed exactly once, whether or not stopping the connector throws.
     */
    @After
    public void afterEach() {
        try {
            stopConnector();
        }
        finally {
            // Runs on both the normal and the exceptional path; any exception from
            // stopConnector() still propagates to the test runner.
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
        }
    }

    /**
     * Starts the connector with a configuration that sets only the topic prefix and schema history
     * options (no connection settings) and expects the start-up callback to report a failure with
     * an error, leaving the connector not running.
     */
    @Test
    public void shouldNotStartWithInvalidConfiguration() {
        config = Configuration.create()
                .with(CommonConnectorConfig.TOPIC_PREFIX, "myserver")
                .with(KafkaSchemaHistory.TOPIC, "myserver")
                .with(MySqlConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class)
                .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
                .build();

        logger.info("Attempting to start the connector with an INVALID configuration, so MULTIPLE error messages and exceptions will appear in the log");

        start(MySqlConnector.class, config, (success, message, error) -> {
            Assertions.assertThat(success).isFalse();
            Assertions.assertThat(error).isNotNull();
        });

        assertConnectorNotRunning();
    }

    /**
     * Validates a configuration that sets only the schema history options and checks which fields
     * are reported with errors (hostname, user, topic prefix, server id) and which are error-free.
     */
    @Test
    public void shouldFailToValidateInvalidConfiguration() {
        Configuration incomplete = Configuration.create()
                .with(MySqlConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class)
                .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
                .build();
        Config result = new MySqlConnector().validate(incomplete.asMap());

        // Fields expected to be flagged, interleaved with the port check in the original order.
        assertConfigurationErrors(result, MySqlConnectorConfig.HOSTNAME, 1);
        assertNoConfigurationErrors(result, MySqlConnectorConfig.PORT);
        assertConfigurationErrors(result, MySqlConnectorConfig.USER, 1);
        assertConfigurationErrors(result, CommonConnectorConfig.TOPIC_PREFIX, 1);
        assertConfigurationErrors(result, MySqlConnectorConfig.SERVER_ID);

        // Remaining fields must validate cleanly; they are checked one at a time, in this order.
        Field[] errorFreeFields = {
                MySqlConnectorConfig.TABLES_IGNORE_BUILTIN,
                MySqlConnectorConfig.DATABASE_INCLUDE_LIST,
                MySqlConnectorConfig.DATABASE_EXCLUDE_LIST,
                MySqlConnectorConfig.TABLE_INCLUDE_LIST,
                MySqlConnectorConfig.TABLE_EXCLUDE_LIST,
                MySqlConnectorConfig.COLUMN_EXCLUDE_LIST,
                MySqlConnectorConfig.COLUMN_INCLUDE_LIST,
                MySqlConnectorConfig.CONNECTION_TIMEOUT_MS,
                MySqlConnectorConfig.KEEP_ALIVE,
                MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS,
                MySqlConnectorConfig.MAX_QUEUE_SIZE,
                MySqlConnectorConfig.MAX_BATCH_SIZE,
                MySqlConnectorConfig.POLL_INTERVAL_MS,
                MySqlConnectorConfig.SCHEMA_HISTORY,
                MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES,
                MySqlConnectorConfig.SNAPSHOT_MODE,
                MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE,
                MySqlConnectorConfig.SNAPSHOT_NEW_TABLES,
                MySqlConnectorConfig.SSL_MODE,
                MySqlConnectorConfig.SSL_KEYSTORE,
                MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD,
                MySqlConnectorConfig.SSL_TRUSTSTORE,
                MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD,
                MySqlConnectorConfig.DECIMAL_HANDLING_MODE,
                MySqlConnectorConfig.TIME_PRECISION_MODE,
        };
        for (Field errorFree : errorFreeFields) {
            assertNoConfigurationErrors(result, errorFree);
        }
    }

    /**
     * Validates an empty configuration: the four required fields (hostname, user, server id, topic
     * prefix) must each report one error, and every other checked field must be error-free and
     * resolve to the expected value (falling back to the field default, see
     * {@code validateConfigField}).
     */
    @Test
    public void shouldValidateAcceptableConfiguration() {
        Configuration empty = Configuration.create().build();
        Config result = new MySqlConnector().validate(empty.asMap());

        // Required fields without a value.
        assertConfigurationErrors(result, MySqlConnectorConfig.HOSTNAME, 1);
        assertConfigurationErrors(result, MySqlConnectorConfig.USER, 1);
        assertConfigurationErrors(result, MySqlConnectorConfig.SERVER_ID, 1);
        assertConfigurationErrors(result, CommonConnectorConfig.TOPIC_PREFIX, 1);

        // Connection settings.
        validateConfigField(result, MySqlConnectorConfig.PORT, 3306);
        for (Field unset : new Field[]{ MySqlConnectorConfig.PASSWORD, MySqlConnectorConfig.ON_CONNECT_STATEMENTS }) {
            validateConfigField(result, unset, null);
        }

        // Filters: built-in tables are ignored, no include/exclude lists or key columns are set.
        validateConfigField(result, MySqlConnectorConfig.TABLES_IGNORE_BUILTIN, Boolean.TRUE);
        Field[] unsetFilters = {
                MySqlConnectorConfig.DATABASE_INCLUDE_LIST,
                MySqlConnectorConfig.DATABASE_EXCLUDE_LIST,
                MySqlConnectorConfig.TABLE_INCLUDE_LIST,
                MySqlConnectorConfig.TABLE_EXCLUDE_LIST,
                MySqlConnectorConfig.COLUMN_EXCLUDE_LIST,
                MySqlConnectorConfig.COLUMN_INCLUDE_LIST,
                MySqlConnectorConfig.MSG_KEY_COLUMNS,
        };
        for (Field unset : unsetFilters) {
            validateConfigField(result, unset, null);
        }

        // Timing and sizing.
        validateConfigField(result, MySqlConnectorConfig.CONNECTION_TIMEOUT_MS, 30000);
        validateConfigField(result, MySqlConnectorConfig.KEEP_ALIVE, Boolean.TRUE);
        validateConfigField(result, MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS, 60000L);
        validateConfigField(result, MySqlConnectorConfig.MAX_QUEUE_SIZE, 8192);
        validateConfigField(result, MySqlConnectorConfig.MAX_BATCH_SIZE, 2048);
        validateConfigField(result, MySqlConnectorConfig.POLL_INTERVAL_MS, 500L);

        // Schema history and snapshot behaviour.
        validateConfigField(result, MySqlConnectorConfig.SCHEMA_HISTORY, "io.debezium.storage.kafka.history.KafkaSchemaHistory");
        validateConfigField(result, MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, Boolean.TRUE);
        validateConfigField(result, MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL);
        validateConfigField(result, MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE, MySqlConnectorConfig.SnapshotLockingMode.MINIMAL);
        validateConfigField(result, MySqlConnectorConfig.SNAPSHOT_NEW_TABLES, MySqlConnectorConfig.SnapshotNewTables.OFF);

        // SSL.
        validateConfigField(result, MySqlConnectorConfig.SSL_MODE, MySqlConnectorConfig.SecureConnectionMode.PREFERRED);
        Field[] unsetSslStores = {
                MySqlConnectorConfig.SSL_KEYSTORE,
                MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD,
                MySqlConnectorConfig.SSL_TRUSTSTORE,
                MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD,
        };
        for (Field unset : unsetSslStores) {
            validateConfigField(result, unset, null);
        }

        // Value handling modes.
        validateConfigField(result, MySqlConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.PRECISE);
        validateConfigField(result, MySqlConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS);
    }

    /**
     * Asserts that {@code field} validated without errors and that its effective value matches
     * the expectation.
     *
     * <p>The effective value is the validated value, or the field's default when the validated
     * value is {@code null}. A {@code null} expectation requires a {@code null} effective value;
     * an {@link EnumeratedValue} expectation is compared through its {@code getValue()} against the
     * effective value's string form; anything else is compared with plain equality.
     *
     * @param config the validation result to inspect
     * @param field the field under test
     * @param t the expected value, possibly {@code null}
     */
    private <T> void validateConfigField(Config config, Field field, T t) {
        assertNoConfigurationErrors(config, field);

        Object validated = configValue(config, field.name()).value();
        Object effective = validated != null ? validated : field.defaultValue();

        if (t == null) {
            Assertions.assertThat(effective).isNull();
            return;
        }
        if (t instanceof EnumeratedValue) {
            Assertions.assertThat(((EnumeratedValue) t).getValue()).isEqualTo(effective.toString());
            return;
        }
        Assertions.assertThat(t).isEqualTo(effective);
    }

    /**
     * For every {@code SnapshotMode} value, builds a configuration with snapshot locking mode
     * {@code NONE} and checks that the locking-mode field validates without errors and that the
     * parsed connector configuration reports {@code NONE}.
     */
    @Test
    @FixFor({"DBZ-639"})
    public void shouldValidateLockingModeNoneWithValidSnapshotModeConfiguration() {
        List<String> snapshotModes = Arrays.stream(MySqlConnectorConfig.SnapshotMode.values())
                .map(MySqlConnectorConfig.SnapshotMode::getValue)
                .collect(Collectors.toList());

        for (String snapshotMode : snapshotModes) {
            Configuration candidate = DATABASE.defaultJdbcConfigBuilder()
                    .with(MySqlConnectorConfig.SERVER_ID, 18765)
                    .with(CommonConnectorConfig.TOPIC_PREFIX, "myServer")
                    .with(KafkaSchemaHistory.BOOTSTRAP_SERVERS, "some.host.com")
                    .with(KafkaSchemaHistory.TOPIC, "my.db.history.topic")
                    .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                    .with(MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE, MySqlConnectorConfig.SnapshotLockingMode.NONE.getValue())
                    .with(MySqlConnectorConfig.SNAPSHOT_MODE, snapshotMode)
                    .build();

            Config result = new MySqlConnector().validate(candidate.asMap());
            assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE);

            MySqlConnectorConfig.SnapshotLockingMode parsed = new MySqlConnectorConfig(candidate).getSnapshotLockingMode();
            Assertions.assertThat(parsed).isEqualTo(MySqlConnectorConfig.SnapshotLockingMode.NONE);
        }
    }

    /**
     * Looks up the {@code __debezium.newkey} header of the given record.
     *
     * @return the first header with that key, or an empty optional if there is none
     */
    private Optional<Header> getPKUpdateNewKeyHeader(SourceRecord sourceRecord) {
        final String newKeyHeaderName = "__debezium.newkey";
        return getHeaderField(sourceRecord, newKeyHeaderName);
    }

    /**
     * Looks up the {@code __debezium.oldkey} header of the given record.
     *
     * @return the first header with that key, or an empty optional if there is none
     */
    private Optional<Header> getPKUpdateOldKeyHeader(SourceRecord sourceRecord) {
        final String oldKeyHeaderName = "__debezium.oldkey";
        return getHeaderField(sourceRecord, oldKeyHeaderName);
    }

    /**
     * Returns the first header of the record whose key equals {@code str}.
     *
     * @param sourceRecord the record whose headers are searched, in iteration order
     * @param str the header key to look for
     * @return the matching header, or an empty optional if no header has that key
     */
    private Optional<Header> getHeaderField(SourceRecord sourceRecord, String str) {
        for (Header candidate : sourceRecord.headers()) {
            if (str.equals(candidate.key())) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /**
     * Runs the shared snapshot-and-streaming scenario, selecting the test database through
     * {@code DATABASE_INCLUDE_LIST} and using server id 18765.
     */
    @Test
    public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLException, InterruptedException {
        final Field databaseFilter = MySqlConnectorConfig.DATABASE_INCLUDE_LIST;
        final int serverId = 18765;
        shouldConsumeAllEventsFromDatabaseUsingSnapshotByField(databaseFilter, serverId);
    }

    /**
     * Runs the shared snapshot-and-streaming scenario, selecting the test database through
     * {@code DATABASE_INCLUDE_LIST} and using server id 18775.
     *
     * <p>NOTE(review): apart from the server id this passes the same field as
     * {@code shouldConsumeAllEventsFromDatabaseUsingSnapshot}; presumably it once exercised a
     * legacy filter option - confirm whether the test is still needed.
     */
    @Test
    public void shouldConsumeAllEventsFromDatabaseUsingSnapshotOld() throws SQLException, InterruptedException {
        final Field databaseFilter = MySqlConnectorConfig.DATABASE_INCLUDE_LIST;
        final int serverId = 18775;
        shouldConsumeAllEventsFromDatabaseUsingSnapshotByField(databaseFilter, serverId);
    }

    private void shouldConsumeAllEventsFromDatabaseUsingSnapshotByField(Field field, int i) throws SQLException, InterruptedException {
        boolean equals = System.getProperty("database.port", "3306").equals(System.getProperty("database.replica.port", "3306"));
        if (!equals) {
            Thread.sleep(5000L);
        }
        this.config = this.DATABASE.defaultJdbcConfigBuilder().with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost")).with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306")).with(MySqlConnectorConfig.SERVER_ID, i).with(CommonConnectorConfig.TOPIC_PREFIX, this.DATABASE.getServerName()).with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10).with(field, this.DATABASE.getDatabaseName()).with(MySqlConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH).build();
        start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(39);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.getServerName()).size()).isEqualTo(12);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products")).size()).isEqualTo(PRODUCTS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products_on_hand")).size()).isEqualTo(PRODUCTS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("customers")).size()).isEqualTo(4);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("orders")).size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumeRecordsByTopic.topics().size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumeRecordsByTopic.databaseNames().size()).isEqualTo(2);
        Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase(this.DATABASE.getDatabaseName()).size()).isEqualTo(11);
        Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase("readbinlog_test")).isNull();
        Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase("").size()).isEqualTo(1);
        consumeRecordsByTopic.ddlRecordsForDatabase(this.DATABASE.getDatabaseName()).forEach(sourceRecord -> {
            this.print(sourceRecord);
        });
        consumeRecordsByTopic.forEach(sourceRecord2 -> {
            this.validate(sourceRecord2);
        });
        List allRecordsInOrder = consumeRecordsByTopic.allRecordsInOrder();
        SourceRecord sourceRecord3 = (SourceRecord) allRecordsInOrder.get(allRecordsInOrder.size() - 1);
        SourceRecord sourceRecord4 = (SourceRecord) allRecordsInOrder.get(allRecordsInOrder.size() - 2);
        Assertions.assertThat(sourceRecord4.sourceOffset().containsKey("snapshot")).isTrue();
        Assertions.assertThat(sourceRecord3.sourceOffset().containsKey("snapshot")).isFalse();
        Assertions.assertThat(((Struct) sourceRecord4.value()).getStruct("source").getString("snapshot")).isEqualTo("true");
        Assertions.assertThat(((Struct) sourceRecord3.value()).getStruct("source").getString("snapshot")).isEqualTo("last");
        waitForAvailableRecords(3L, TimeUnit.SECONDS);
        System.out.println("TOTAL CONSUMED = " + consumeAvailableRecords(sourceRecord5 -> {
            this.print(sourceRecord5);
        }));
        stopConnector();
        MySqlTestConnection forTestDatabase = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        try {
            JdbcConnection connect = forTestDatabase.connect();
            try {
                connect.query("SELECT * FROM products", resultSet -> {
                    if (Testing.Print.isEnabled()) {
                        connect.print(resultSet);
                    }
                });
                connect.execute(new String[]{"INSERT INTO products VALUES (default,'robot','Toy robot',1.304);"});
                connect.query("SELECT * FROM products", resultSet2 -> {
                    if (Testing.Print.isEnabled()) {
                        connect.print(resultSet2);
                    }
                });
                if (connect != null) {
                    connect.close();
                }
                if (forTestDatabase != null) {
                    forTestDatabase.close();
                }
                Testing.print("*** Restarting connector after inserts were made");
                start(MySqlConnector.class, this.config);
                AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(1);
                Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic(this.DATABASE.topicForTable("products")).size()).isEqualTo(1);
                Assertions.assertThat(consumeRecordsByTopic2.topics().size()).isEqualTo(1);
                assertInsert((SourceRecord) consumeRecordsByTopic2.recordsForTopic(this.DATABASE.topicForTable("products")).get(0), "id", 110);
                Testing.print("*** Done with inserts and restart");
                Testing.print("*** Stopping connector");
                stopConnector();
                Testing.print("*** Restarting connector");
                start(MySqlConnector.class, this.config);
                MySqlTestConnection forTestDatabase2 = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                try {
                    JdbcConnection connect2 = forTestDatabase2.connect();
                    try {
                        connect2.execute(new String[]{"INSERT INTO products VALUES (1001,'roy','old robot',1234.56);"});
                        connect2.query("SELECT * FROM products", resultSet3 -> {
                            if (Testing.Print.isEnabled()) {
                                connect2.print(resultSet3);
                            }
                        });
                        if (connect2 != null) {
                            connect2.close();
                        }
                        if (forTestDatabase2 != null) {
                            forTestDatabase2.close();
                        }
                        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(1);
                        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(this.DATABASE.topicForTable("products")).size()).isEqualTo(1);
                        Assertions.assertThat(consumeRecordsByTopic3.topics().size()).isEqualTo(1);
                        assertInsert((SourceRecord) consumeRecordsByTopic3.recordsForTopic(this.DATABASE.topicForTable("products")).get(0), "id", 1001);
                        Testing.print("*** Done with simple insert");
                        MySqlTestConnection forTestDatabase3 = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                        try {
                            JdbcConnection connect3 = forTestDatabase3.connect();
                            try {
                                connect3.execute(new String[]{"UPDATE products SET id=2001, description='really old robot' WHERE id=1001"});
                                connect3.query("SELECT * FROM products", resultSet4 -> {
                                    if (Testing.Print.isEnabled()) {
                                        connect3.print(resultSet4);
                                    }
                                });
                                if (connect3 != null) {
                                    connect3.close();
                                }
                                if (forTestDatabase3 != null) {
                                    forTestDatabase3.close();
                                }
                                List recordsForTopic = consumeRecordsByTopic(3).recordsForTopic(this.DATABASE.topicForTable("products"));
                                Assertions.assertThat(recordsForTopic.size()).isEqualTo(3);
                                SourceRecord sourceRecord6 = (SourceRecord) recordsForTopic.get(0);
                                assertDelete(sourceRecord6, "id", 1001);
                                TestCase.assertEquals(2001, ((Struct) getPKUpdateNewKeyHeader(sourceRecord6).get().value()).getInt32("id"));
                                assertTombstone((SourceRecord) recordsForTopic.get(1), "id", 1001);
                                SourceRecord sourceRecord7 = (SourceRecord) recordsForTopic.get(2);
                                assertInsert(sourceRecord7, "id", 2001);
                                TestCase.assertEquals(1001, ((Struct) getPKUpdateOldKeyHeader(sourceRecord7).get().value()).getInt32("id"));
                                Testing.print("*** Done with PK change");
                                MySqlTestConnection forTestDatabase4 = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                                try {
                                    JdbcConnection connect4 = forTestDatabase4.connect();
                                    try {
                                        connect4.execute(new String[]{"UPDATE products SET weight=1345.67 WHERE id=2001"});
                                        connect4.query("SELECT * FROM products", resultSet5 -> {
                                            if (Testing.Print.isEnabled()) {
                                                connect4.print(resultSet5);
                                            }
                                        });
                                        if (connect4 != null) {
                                            connect4.close();
                                        }
                                        if (forTestDatabase4 != null) {
                                            forTestDatabase4.close();
                                        }
                                        AbstractConnectorTest.SourceRecords consumeRecordsByTopic4 = consumeRecordsByTopic(1);
                                        Assertions.assertThat(consumeRecordsByTopic4.topics().size()).isEqualTo(1);
                                        List recordsForTopic2 = consumeRecordsByTopic4.recordsForTopic(this.DATABASE.topicForTable("products"));
                                        Assertions.assertThat(recordsForTopic2.size()).isEqualTo(1);
                                        assertUpdate((SourceRecord) recordsForTopic2.get(0), "id", 2001);
                                        recordsForTopic2.forEach(sourceRecord8 -> {
                                            this.validate(sourceRecord8);
                                        });
                                        Testing.print("*** Done with simple update");
                                        forTestDatabase2 = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                                        try {
                                            connect = forTestDatabase2.connect();
                                            try {
                                                connect.execute(new String[]{String.format("ALTER TABLE %s.products ADD COLUMN volume FLOAT, ADD COLUMN alias VARCHAR(30) NULL AFTER description", this.DATABASE.getDatabaseName())});
                                                connect.execute(new String[]{"UPDATE products SET volume=13.5 WHERE id=2001"});
                                                connect.query("SELECT * FROM products", resultSet6 -> {
                                                    if (Testing.Print.isEnabled()) {
                                                        connect.print(resultSet6);
                                                    }
                                                });
                                                if (connect != null) {
                                                    connect.close();
                                                }
                                                if (forTestDatabase2 != null) {
                                                    forTestDatabase2.close();
                                                }
                                                AbstractConnectorTest.SourceRecords consumeRecordsByTopic5 = consumeRecordsByTopic(2);
                                                Assertions.assertThat(consumeRecordsByTopic5.topics().size()).isEqualTo(2);
                                                Assertions.assertThat(consumeRecordsByTopic5.recordsForTopic(this.DATABASE.getServerName()).size()).isEqualTo(1);
                                                List recordsForTopic3 = consumeRecordsByTopic5.recordsForTopic(this.DATABASE.topicForTable("products"));
                                                Assertions.assertThat(recordsForTopic3.size()).isEqualTo(1);
                                                assertUpdate((SourceRecord) recordsForTopic3.get(0), "id", 2001);
                                                recordsForTopic3.forEach(sourceRecord9 -> {
                                                    this.validate(sourceRecord9);
                                                });
                                                Testing.print("*** Done with schema change (same db and fully-qualified name)");
                                                forTestDatabase = MySqlTestConnection.forTestDatabase("emptydb");
                                                try {
                                                    JdbcConnection connect5 = forTestDatabase.connect();
                                                    try {
                                                        connect5.execute(new String[]{String.format("CREATE TABLE %s.stores ( id INT(11) PRIMARY KEY NOT NULL AUTO_INCREMENT, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL );", this.DATABASE.getDatabaseName())});
                                                        if (connect5 != null) {
                                                            connect5.close();
                                                        }
                                                        if (forTestDatabase != null) {
                                                            forTestDatabase.close();
                                                        }
                                                        AbstractConnectorTest.SourceRecords consumeRecordsByTopic6 = consumeRecordsByTopic(1);
                                                        Assertions.assertThat(consumeRecordsByTopic6.topics().size()).isEqualTo(1);
                                                        Assertions.assertThat(consumeRecordsByTopic6.recordsForTopic(this.DATABASE.getServerName()).size()).isEqualTo(1);
                                                        consumeRecordsByTopic6.recordsForTopic(this.DATABASE.getServerName()).forEach(sourceRecord10 -> {
                                                            this.validate(sourceRecord10);
                                                        });
                                                        Testing.print("*** Done with PK change (different db and fully-qualified name)");
                                                        MySqlTestConnection forTestDatabase5 = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                                                        try {
                                                            JdbcConnection connect6 = forTestDatabase5.connect();
                                                            try {
                                                                connect6.execute(new String[]{"UPDATE products_on_hand SET quantity=20 WHERE product_id=109"});
                                                                connect6.query("SELECT * FROM products_on_hand", resultSet7 -> {
                                                                    if (Testing.Print.isEnabled()) {
                                                                        connect6.print(resultSet7);
                                                                    }
                                                                });
                                                                if (connect6 != null) {
                                                                    connect6.close();
                                                                }
                                                                if (forTestDatabase5 != null) {
                                                                    forTestDatabase5.close();
                                                                }
                                                                AbstractConnectorTest.SourceRecords consumeRecordsByTopic7 = consumeRecordsByTopic(1);
                                                                Assertions.assertThat(consumeRecordsByTopic7.topics().size()).isEqualTo(1);
                                                                List recordsForTopic4 = consumeRecordsByTopic7.recordsForTopic(this.DATABASE.topicForTable("products_on_hand"));
                                                                Assertions.assertThat(recordsForTopic4.size()).isEqualTo(1);
                                                                assertUpdate((SourceRecord) recordsForTopic4.get(0), "product_id", 109);
                                                                recordsForTopic4.forEach(sourceRecord11 -> {
                                                                    this.validate(sourceRecord11);
                                                                });
                                                                Testing.print("*** Done with verifying no additional events");
                                                                stopConnector();
                                                                Testing.print("*** Restarting connector");
                                                                EmbeddedEngine.CompletionResult completionResult = new EmbeddedEngine.CompletionResult();
                                                                start(MySqlConnector.class, this.config, completionResult, sourceRecord12 -> {
                                                                    return ((Number) ((Struct) sourceRecord12.key()).get("id")).intValue() == 3003;
                                                                });
                                                                BinlogPosition binlogPosition = new BinlogPosition();
                                                                BinlogPosition binlogPosition2 = new BinlogPosition();
                                                                BinlogPosition binlogPosition3 = new BinlogPosition();
                                                                MySqlTestConnection forTestDatabase6 = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                                                                try {
                                                                    JdbcConnection connect7 = forTestDatabase6.connect();
                                                                    try {
                                                                        Objects.requireNonNull(binlogPosition);
                                                                        connect7.query("SHOW MASTER STATUS", binlogPosition::readFromDatabase);
                                                                        connect7.execute(new String[]{"INSERT INTO products(id,name,description,weight,volume,alias) VALUES (3001,'ashley','super robot',34.56,0.00,'ashbot'), (3002,'arthur','motorcycle',87.65,0.00,'arcycle'), (3003,'oak','tree',987.65,0.00,'oak');"});
                                                                        connect7.query("SELECT * FROM products", resultSet8 -> {
                                                                            if (Testing.Print.isEnabled()) {
                                                                                connect7.print(resultSet8);
                                                                            }
                                                                        });
                                                                        Objects.requireNonNull(binlogPosition2);
                                                                        connect7.query("SHOW MASTER STATUS", binlogPosition2::readFromDatabase);
                                                                        connect7.execute(new String[]{"UPDATE products_on_hand SET quantity=40 WHERE product_id=109"});
                                                                        connect7.query("SELECT * FROM products_on_hand", resultSet9 -> {
                                                                            if (Testing.Print.isEnabled()) {
                                                                                connect7.print(resultSet9);
                                                                            }
                                                                        });
                                                                        Objects.requireNonNull(binlogPosition3);
                                                                        connect7.query("SHOW MASTER STATUS", binlogPosition3::readFromDatabase);
                                                                        if (connect7 != null) {
                                                                            connect7.close();
                                                                        }
                                                                        if (forTestDatabase6 != null) {
                                                                            forTestDatabase6.close();
                                                                        }
                                                                        AbstractConnectorTest.SourceRecords consumeRecordsByTopic8 = consumeRecordsByTopic(2);
                                                                        Assertions.assertThat(consumeRecordsByTopic8.recordsForTopic(this.DATABASE.topicForTable("products")).size()).isEqualTo(2);
                                                                        Assertions.assertThat(consumeRecordsByTopic8.topics().size()).isEqualTo(1);
                                                                        List recordsForTopic5 = consumeRecordsByTopic8.recordsForTopic(this.DATABASE.topicForTable("products"));
                                                                        assertInsert((SourceRecord) recordsForTopic5.get(0), "id", 3001);
                                                                        assertInsert((SourceRecord) recordsForTopic5.get(1), "id", 3002);
                                                                        completionResult.await(10L, TimeUnit.SECONDS);
                                                                        Assertions.assertThat(completionResult.hasCompleted()).isTrue();
                                                                        Assertions.assertThat(completionResult.hasError()).isTrue();
                                                                        Assertions.assertThat(completionResult.success()).isFalse();
                                                                        assertNoRecordsToConsume();
                                                                        assertConnectorNotRunning();
                                                                        stopConnector();
                                                                        String string = this.config.getString(CommonConnectorConfig.TOPIC_PREFIX);
                                                                        MySqlOffsetContext.Loader loader = new MySqlOffsetContext.Loader(new MySqlConnectorConfig(Configuration.create().with(CommonConnectorConfig.TOPIC_PREFIX, string).build()));
                                                                        Map readLastCommittedOffset = readLastCommittedOffset(this.config, new MySqlPartition(string, this.DATABASE.getDatabaseName()).getSourcePartition());
                                                                        MySqlOffsetContext load = loader.load(readLastCommittedOffset);
                                                                        SourceInfo source = load.getSource();
                                                                        Testing.print("Position before inserts: " + binlogPosition);
                                                                        Testing.print("Position after inserts:  " + binlogPosition2);
                                                                        Testing.print("Offset: " + readLastCommittedOffset);
                                                                        Testing.print("Position after update:  " + binlogPosition3);
                                                                        if (equals) {
                                                                            Assertions.assertThat(source.binlogFilename()).isEqualTo(binlogPosition.binlogFilename());
                                                                            Assertions.assertThat(source.binlogFilename()).isEqualTo(binlogPosition2.binlogFilename());
                                                                            MySqlTestConnection.MySqlVersion mySqlVersion = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName()).getMySqlVersion();
                                                                            if (mySqlVersion == MySqlTestConnection.MySqlVersion.MYSQL_5_5 || mySqlVersion == MySqlTestConnection.MySqlVersion.MYSQL_5_6) {
                                                                                Assertions.assertThat(source.binlogPosition()).isGreaterThanOrEqualTo(binlogPosition.binlogPosition());
                                                                            } else {
                                                                                Assertions.assertThat(source.binlogPosition()).isGreaterThan(binlogPosition.binlogPosition());
                                                                            }
                                                                            Assertions.assertThat(source.binlogPosition()).isLessThan(binlogPosition2.binlogPosition());
                                                                        }
                                                                        Assertions.assertThat(load.eventsToSkipUponRestart()).isEqualTo(2L);
                                                                        Testing.print("*** Restarting connector, and should begin with inserting 3003 (not 109!)");
                                                                        start(MySqlConnector.class, this.config);
                                                                        AbstractConnectorTest.SourceRecords consumeRecordsByTopic9 = consumeRecordsByTopic(1);
                                                                        Assertions.assertThat(consumeRecordsByTopic9.topics().size()).isEqualTo(1);
                                                                        List recordsForTopic6 = consumeRecordsByTopic9.recordsForTopic(this.DATABASE.topicForTable("products"));
                                                                        if (recordsForTopic6 == null && consumeRecordsByTopic9.recordsForTopic(this.DATABASE.topicForTable("products_on_hand")) != null) {
                                                                            Assert.fail("Restarted connector and missed the insert of product id=3003!");
                                                                        }
                                                                        SourceRecord sourceRecord13 = (SourceRecord) recordsForTopic6.get(0);
                                                                        assertInsert(sourceRecord13, "id", 3003);
                                                                        assertOffset(sourceRecord13, "file", readLastCommittedOffset.get("file"));
                                                                        assertOffset(sourceRecord13, "pos", readLastCommittedOffset.get("pos"));
                                                                        assertOffset(sourceRecord13, "row", 3);
                                                                        assertOffset(sourceRecord13, "event", readLastCommittedOffset.get("event"));
                                                                        assertValueField(sourceRecord13, "after/id", 3003);
                                                                        assertValueField(sourceRecord13, "after/name", "oak");
                                                                        assertValueField(sourceRecord13, "after/description", "tree");
                                                                        assertValueField(sourceRecord13, "after/weight", Double.valueOf(987.65d));
                                                                        assertValueField(sourceRecord13, "after/volume", Double.valueOf(0.0d));
                                                                        assertValueField(sourceRecord13, "after/alias", "oak");
                                                                        AbstractConnectorTest.SourceRecords consumeRecordsByTopic10 = consumeRecordsByTopic(1);
                                                                        Assertions.assertThat(consumeRecordsByTopic10.topics().size()).isEqualTo(1);
                                                                        List recordsForTopic7 = consumeRecordsByTopic10.recordsForTopic(this.DATABASE.topicForTable("products_on_hand"));
                                                                        Assertions.assertThat(recordsForTopic7.size()).isEqualTo(1);
                                                                        assertUpdate((SourceRecord) recordsForTopic7.get(0), "product_id", 109);
                                                                        recordsForTopic7.forEach(sourceRecord14 -> {
                                                                            this.validate(sourceRecord14);
                                                                        });
                                                                        Testing.print("*** Done with simple insert");
                                                                    } finally {
                                                                        if (connect7 != null) {
                                                                            try {
                                                                                connect7.close();
                                                                            } catch (Throwable th) {
                                                                                th.addSuppressed(th);
                                                                            }
                                                                        }
                                                                    }
                                                                } finally {
                                                                    if (forTestDatabase6 != null) {
                                                                        try {
                                                                            forTestDatabase6.close();
                                                                        } catch (Throwable th2) {
                                                                            th.addSuppressed(th2);
                                                                        }
                                                                    }
                                                                }
                                                            } finally {
                                                                if (connect6 != null) {
                                                                    try {
                                                                        connect6.close();
                                                                    } catch (Throwable th3) {
                                                                        th.addSuppressed(th3);
                                                                    }
                                                                }
                                                            }
                                                        } finally {
                                                            if (forTestDatabase5 != null) {
                                                                try {
                                                                    forTestDatabase5.close();
                                                                } catch (Throwable th4) {
                                                                    th.addSuppressed(th4);
                                                                }
                                                            }
                                                        }
                                                    } finally {
                                                        if (connect5 != null) {
                                                            try {
                                                                connect5.close();
                                                            } catch (Throwable th5) {
                                                                th.addSuppressed(th5);
                                                            }
                                                        }
                                                    }
                                                } finally {
                                                    if (forTestDatabase != null) {
                                                        try {
                                                            forTestDatabase.close();
                                                        } catch (Throwable th6) {
                                                            th.addSuppressed(th6);
                                                        }
                                                    }
                                                }
                                            } finally {
                                                if (connect != null) {
                                                    try {
                                                        connect.close();
                                                    } catch (Throwable th7) {
                                                        th.addSuppressed(th7);
                                                    }
                                                }
                                            }
                                        } finally {
                                            if (forTestDatabase2 != null) {
                                                try {
                                                    forTestDatabase2.close();
                                                } catch (Throwable th8) {
                                                    th.addSuppressed(th8);
                                                }
                                            }
                                        }
                                    } finally {
                                        if (connect4 != null) {
                                            try {
                                                connect4.close();
                                            } catch (Throwable th9) {
                                                th.addSuppressed(th9);
                                            }
                                        }
                                    }
                                } finally {
                                    if (forTestDatabase4 != null) {
                                        try {
                                            forTestDatabase4.close();
                                        } catch (Throwable th10) {
                                            th.addSuppressed(th10);
                                        }
                                    }
                                }
                            } finally {
                                if (connect3 != null) {
                                    try {
                                        connect3.close();
                                    } catch (Throwable th11) {
                                        th.addSuppressed(th11);
                                    }
                                }
                            }
                        } finally {
                            if (forTestDatabase3 != null) {
                                try {
                                    forTestDatabase3.close();
                                } catch (Throwable th12) {
                                    th.addSuppressed(th12);
                                }
                            }
                        }
                    } finally {
                        if (connect2 != null) {
                            try {
                                connect2.close();
                            } catch (Throwable th13) {
                                th.addSuppressed(th13);
                            }
                        }
                    }
                } finally {
                }
            } finally {
            }
        } finally {
        }
    }

    /**
     * Starts the connector with a snapshot SELECT override for the {@code products} table
     * ({@code id>=108}) and checks that, out of the 8 consumed records, 6 arrive on the
     * server-name topic and 2 on the {@code products} topic, the latter keyed 108 and 109.
     *
     * @throws SQLException if thrown by the connector test harness
     * @throws InterruptedException if the initial wait or record consumption is interrupted
     */
    @Test
    public void shouldUseOverriddenSelectStatementDuringSnapshotting() throws SQLException, InterruptedException {
        final String primaryPort = System.getProperty("database.port", "3306");
        final String replicaPort = System.getProperty("database.replica.port", "3306");
        if (!primaryPort.equals(replicaPort)) {
            // NOTE(review): presumably gives a separate replica time to catch up — confirm.
            Thread.sleep(5000L);
        }

        final String databaseName = this.DATABASE.getDatabaseName();
        final String productsTable = databaseName + ".products";
        final String productsQuery = String.format("SELECT * from %s.products where id>=108 order by id", databaseName);

        this.config = this.DATABASE.defaultJdbcConfigBuilder()
                .with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost"))
                .with(MySqlConnectorConfig.PORT, replicaPort)
                .with(MySqlConnectorConfig.SERVER_ID, 28765)
                .with(CommonConnectorConfig.TOPIC_PREFIX, this.DATABASE.getServerName())
                .with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
                .with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, databaseName)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, productsTable)
                .with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, productsTable)
                .with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + productsTable, productsQuery)
                .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                .with(MySqlConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
                .build();
        start(MySqlConnector.class, this.config);

        final AbstractConnectorTest.SourceRecords snapshot = consumeRecordsByTopic(8);

        // Records on the topic named after the server.
        final List<SourceRecord> serverTopicRecords = snapshot.recordsForTopic(this.DATABASE.getServerName());
        Assertions.assertThat(serverTopicRecords.size()).isEqualTo(6);

        // Only the rows selected by the override query should have been snapshotted.
        final List<SourceRecord> productRecords = snapshot.recordsForTopic(this.DATABASE.topicForTable("products"));
        Assertions.assertThat(productRecords.size()).isEqualTo(2);
        Assertions.assertThat(((Struct) productRecords.get(0).key()).getInt32("id")).isEqualTo(108);
        Assertions.assertThat(((Struct) productRecords.get(1).key()).getInt32("id")).isEqualTo(109);

        snapshot.forEach(this::validate);
    }

    /**
     * Starts the connector with snapshot SELECT overrides for both {@code products}
     * ({@code id>=108}) and {@code products_on_hand} ({@code product_id>=108}) and checks that,
     * out of the 12 consumed records, 8 arrive on the server-name topic and 2 on each table
     * topic, keyed 108 and 109.
     *
     * @throws SQLException if thrown by the connector test harness
     * @throws InterruptedException if the initial wait or record consumption is interrupted
     */
    @Test
    public void shouldUseMultipleOverriddenSelectStatementsDuringSnapshotting() throws SQLException, InterruptedException {
        final String primaryPort = System.getProperty("database.port", "3306");
        final String replicaPort = System.getProperty("database.replica.port", "3306");
        if (!primaryPort.equals(replicaPort)) {
            // NOTE(review): presumably gives a separate replica time to catch up — confirm.
            Thread.sleep(5000L);
        }

        final String databaseName = this.DATABASE.getDatabaseName();
        final String capturedTables = String.format("%s.products,%s.products_on_hand", databaseName, databaseName);
        final String overrideKeyPrefix = MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + databaseName;
        final String productsQuery = String.format("SELECT * from %s.products where id>=108 order by id", databaseName);
        final String onHandQuery = String.format("SELECT * from %s.products_on_hand where product_id>=108 order by product_id", databaseName);

        this.config = this.DATABASE.defaultJdbcConfigBuilder()
                .with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost"))
                .with(MySqlConnectorConfig.PORT, replicaPort)
                .with(MySqlConnectorConfig.SERVER_ID, 28765)
                .with(CommonConnectorConfig.TOPIC_PREFIX, this.DATABASE.getServerName())
                .with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
                .with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, databaseName)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, capturedTables)
                .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                .with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, capturedTables)
                .with(overrideKeyPrefix + ".products", productsQuery)
                .with(overrideKeyPrefix + ".products_on_hand", onHandQuery)
                .with(MySqlConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
                .build();
        start(MySqlConnector.class, this.config);

        final AbstractConnectorTest.SourceRecords snapshot = consumeRecordsByTopic(12);

        // Records on the topic named after the server.
        final List<SourceRecord> serverTopicRecords = snapshot.recordsForTopic(this.DATABASE.getServerName());
        Assertions.assertThat(serverTopicRecords.size()).isEqualTo(8);

        // Each table topic should hold only the rows selected by its override query.
        final List<SourceRecord> productRecords = snapshot.recordsForTopic(this.DATABASE.topicForTable("products"));
        Assertions.assertThat(productRecords.size()).isEqualTo(2);
        final List<SourceRecord> onHandRecords = snapshot.recordsForTopic(this.DATABASE.topicForTable("products_on_hand"));
        Assertions.assertThat(onHandRecords.size()).isEqualTo(2);

        Assertions.assertThat(((Struct) productRecords.get(0).key()).getInt32("id")).isEqualTo(108);
        Assertions.assertThat(((Struct) productRecords.get(1).key()).getInt32("id")).isEqualTo(109);
        Assertions.assertThat(((Struct) onHandRecords.get(0).key()).getInt32("product_id")).isEqualTo(108);
        Assertions.assertThat(((Struct) onHandRecords.get(1).key()).getInt32("product_id")).isEqualTo(109);

        snapshot.forEach(this::validate);
    }

    /**
     * Captures only the {@code customers} table in {@code SCHEMA_ONLY} snapshot mode with
     * {@code STORE_ONLY_CAPTURED_TABLES_DDL} enabled, then alters both {@code orders} (not in
     * the include list) and {@code customers} and inserts one customer row. Of the two records
     * consumed afterwards, one must be the {@code customers} change event and one a DDL record
     * for the database.
     *
     * @throws SQLException if a statement fails or a connection cannot be closed
     * @throws InterruptedException if record consumption is interrupted
     */
    @Test
    @FixFor({"DBZ-977"})
    public void shouldIgnoreAlterTableForNonCapturedTablesNotStoredInHistory() throws SQLException, InterruptedException {
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, String.format("%s.customers", this.DATABASE.getDatabaseName()))
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                .build();
        start(MySqlConnector.class, this.config);

        // Drain the initial batch of records before touching the schema.
        Assertions.assertThat(consumeRecordsByTopic(6).ddlRecordsForDatabase(this.DATABASE.getDatabaseName()).size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);

        // try-with-resources closes both handles exactly once, including when a statement fails.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("ALTER TABLE orders ADD COLUMN (newcol INT)");
            connection.execute("ALTER TABLE customers ADD COLUMN (newcol INT)");
            connection.execute("INSERT INTO customers VALUES (default,'name','surname','email',1);");
        }

        // Assertions run after the connections are released, as on the original success path.
        AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(2);
        Assertions.assertThat(records.recordsForTopic(this.DATABASE.topicForTable("customers")).size()).isEqualTo(1);
        Assertions.assertThat(records.ddlRecordsForDatabase(this.DATABASE.getDatabaseName()).size()).isEqualTo(1);
        stopConnector();
    }

    /**
     * Starts the connector in {@code SCHEMA_ONLY} snapshot mode with the database include list
     * set to the test database name prefixed with {@code no_} and
     * {@code STORE_ONLY_CAPTURED_TABLES_DDL} enabled, then checks that the single consumed
     * record is a DDL record reported for the empty database name.
     *
     * @throws SQLException if thrown by the connector test harness
     * @throws InterruptedException if record consumption is interrupted
     */
    @Test
    @FixFor("DBZ-1201")
    public void shouldSaveSetCharacterSetWhenStoringOnlyCapturededTables() throws SQLException, InterruptedException {
        Testing.Files.delete(SCHEMA_HISTORY_PATH);

        final String includeList = "no_" + this.DATABASE.getDatabaseName();
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, includeList)
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                .build();
        start(MySqlConnector.class, this.config);

        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(1);
        final int ddlRecordCount = consumed.ddlRecordsForDatabase("").size();
        Assertions.assertThat(ddlRecordCount).isEqualTo(1);

        stopConnector();
    }

    /**
     * Exercises unique-index handling on the {@code migration_test} table.
     * <p>
     * Phase one creates the table with a nullable {@code mgb_no} column, adds a unique index on it
     * and inserts a row; the resulting change event is expected to have a {@code null} key.
     * Phase two makes {@code mgb_no} {@code not null}, recreates the unique index and inserts a
     * second row; that event is expected to be keyed by {@code mgb_no}.
     *
     * @throws SQLException if a statement against the test database fails
     * @throws InterruptedException if waiting for records is interrupted
     */
    @Test
    @FixFor({"DBZ-1246"})
    public void shouldProcessCreateUniqueIndex() throws SQLException, InterruptedException {
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, String.format("%s.migration_test", this.DATABASE.getDatabaseName()))
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .build();
        start(MySqlConnector.class, this.config);
        waitForStreamingRunning(this.DATABASE.getServerName());

        // try-with-resources closes each connection exactly once, on success and on failure.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute(
                    "create table migration_test (id varchar(20) null,mgb_no varchar(20) null)",
                    "create unique index migration_test_mgb_no_uindex on migration_test (mgb_no)",
                    "insert into migration_test values(1,'2')");
        }

        AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(15);
        List<SourceRecord> migrationRecords = records.recordsForTopic(this.DATABASE.topicForTable("migration_test"));
        Assertions.assertThat(migrationRecords.size()).isEqualTo(1);
        // Inserted while mgb_no is still nullable: the event must not carry a key.
        Assertions.assertThat(migrationRecords.get(0).key()).isNull();
        Assertions.assertThat(records.ddlRecordsForDatabase(this.DATABASE.getDatabaseName()).size()).isEqualTo(13);

        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute(
                    "alter table migration_test change column mgb_no mgb_no varchar(20) not null",
                    "alter table migration_test drop index migration_test_mgb_no_uindex",
                    "create unique index migration_test_mgb_no_uindex on migration_test (mgb_no)",
                    "insert into migration_test values(2,'3')");
        }

        migrationRecords = consumeRecordsByTopic(4).recordsForTopic(this.DATABASE.topicForTable("migration_test"));
        Assertions.assertThat(migrationRecords.size()).isEqualTo(1);
        // Inserted after the column became not-null and the index was recreated: keyed by mgb_no.
        Assertions.assertThat(((Struct) migrationRecords.get(0).key()).getString("mgb_no")).isEqualTo("3");
        stopConnector();
    }

    /**
     * With only the {@code customers} table in the include list, alters both {@code orders} and
     * {@code customers}, inserts a {@code customers} row, and checks that the next three records
     * consist of one {@code customers} change event and two DDL records for the test database.
     *
     * @throws SQLException if a statement against the test database fails
     * @throws InterruptedException if waiting for records is interrupted
     */
    @Test
    @FixFor({"DBZ-977"})
    public void shouldIgnoreAlterTableForNonCapturedTablesStoredInHistory() throws SQLException, InterruptedException {
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, String.format("%s.customers", this.DATABASE.getDatabaseName()))
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .build();
        dropDatabases();
        start(MySqlConnector.class, this.config);
        Assertions.assertThat(consumeRecordsByTopic(12).ddlRecordsForDatabase(this.DATABASE.getDatabaseName()).size()).isEqualTo(11);

        // try-with-resources closes both handles exactly once, even when a statement fails.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("ALTER TABLE orders ADD COLUMN (newcol INT)");
            connection.execute("ALTER TABLE customers ADD COLUMN (newcol INT)");
            connection.execute("INSERT INTO customers VALUES (default,'name','surname','email',1);");
        }

        AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(3);
        Assertions.assertThat(records.recordsForTopic(this.DATABASE.topicForTable("customers")).size()).isEqualTo(1);
        Assertions.assertThat(records.ddlRecordsForDatabase(this.DATABASE.getDatabaseName()).size()).isEqualTo(2);
        stopConnector();
    }

    /**
     * Creates a table {@code nonmon} that is outside the include list before the connector starts
     * (with {@link SchemaHistory#STORE_ONLY_CAPTURED_TABLES_DDL} enabled), then creates a unique
     * index on it while streaming and inserts into {@code customers}. The next consumed record is
     * expected to belong to the {@code customers} topic.
     *
     * @throws SQLException if a statement against the test database fails
     * @throws InterruptedException if waiting for records is interrupted
     */
    @Test
    @FixFor({"DBZ-1264"})
    public void shouldIgnoreCreateIndexForNonCapturedTablesNotStoredInHistory() throws SQLException, InterruptedException {
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, String.format("%s.customers", this.DATABASE.getDatabaseName()))
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                .build();

        // try-with-resources closes each connection exactly once, on success and on failure.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("CREATE TABLE nonmon (id INT)");
        }

        start(MySqlConnector.class, this.config);
        // NOTE(review): the expected DDL count is expressed through ORDERS_TABLE_EVENT_COUNT;
        // presumably only the numeric value matters here - confirm.
        Assertions.assertThat(consumeRecordsByTopic(6).ddlRecordsForDatabase(this.DATABASE.getDatabaseName()).size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);

        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute(
                    "CREATE UNIQUE INDEX pk ON nonmon(id)",
                    "INSERT INTO customers VALUES (default,'name','surname','email');");
        }

        Assertions.assertThat(consumeRecord().topic()).isEqualTo(this.DATABASE.topicForTable("customers"));
    }

    /**
     * Creates an extra database {@code non_wh} with a table {@code t1} before starting the
     * connector with a table include list of {@code customers} and {@code orders} and a database
     * include list of {@code .*}. Of the 17 records consumed, 11 are expected to be DDL records
     * for the test database.
     *
     * @throws SQLException if a statement against the database fails
     * @throws InterruptedException if waiting for records is interrupted
     */
    @Test
    @FixFor({"DBZ-683"})
    public void shouldReceiveSchemaForNonWhitelistedTablesAndDatabases() throws SQLException, InterruptedException {
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, String.format("%s.customers,%s.orders", this.DATABASE.getDatabaseName(), this.DATABASE.getDatabaseName()))
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, ".*")
                .build();
        dropDatabases();

        // try-with-resources closes both handles exactly once, even when a statement fails.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase("mysql");
                JdbcConnection connection = db.connect()) {
            connection.execute(
                    "CREATE DATABASE non_wh",
                    "USE non_wh",
                    "CREATE TABLE t1 (ID INT PRIMARY KEY)");
        }

        start(MySqlConnector.class, this.config);
        Assertions.assertThat(consumeRecordsByTopic(17).ddlRecordsForDatabase(this.DATABASE.getDatabaseName()).size()).isEqualTo(11);
        stopConnector();
    }

    /**
     * Runs the connector with a table include list of {@code customers} and {@code orders}
     * (separated by a comma followed by a space) and a database include list of {@code .*}, and
     * expects 11 DDL records for the test database among the 17 records consumed.
     */
    @Test
    @FixFor({"DBZ-1546"})
    public void shouldHandleIncludeListTables() throws SQLException, InterruptedException {
        Testing.Files.delete(SCHEMA_HISTORY_PATH);

        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST,
                        String.format("%s.customers, %s.orders", this.DATABASE.getDatabaseName(), this.DATABASE.getDatabaseName()))
                .with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, ".*")
                .build();

        dropDatabases();
        start(MySqlConnector.class, this.config);

        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(17);
        final int ddlCount = consumed.ddlRecordsForDatabase(this.DATABASE.getDatabaseName()).size();
        Assertions.assertThat(ddlCount).isEqualTo(11);

        stopConnector();
    }

    /**
     * Starts the connector restricted to the {@code customers} and {@code orders} tables of the
     * test database (database include list {@code .*}) after dropping other databases, then
     * checks that 17 consumed records contain 11 DDL records for the test database.
     */
    @Test
    public void shouldHandleIncludedTables() throws SQLException, InterruptedException {
        Testing.Files.delete(SCHEMA_HISTORY_PATH);

        this.config = this.DATABASE.defaultConfig()
                .with(
                        MySqlConnectorConfig.TABLE_INCLUDE_LIST,
                        String.format("%s.customers, %s.orders", this.DATABASE.getDatabaseName(), this.DATABASE.getDatabaseName()))
                .with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, ".*")
                .build();

        dropDatabases();
        start(MySqlConnector.class, this.config);

        final List<SourceRecord> ddlRecords = consumeRecordsByTopic(17).ddlRecordsForDatabase(this.DATABASE.getDatabaseName());
        Assertions.assertThat(ddlRecords.size()).isEqualTo(11);

        stopConnector();
    }

    /**
     * Drops every database reported by {@code SHOW DATABASES} that is neither a built-in database
     * (per {@link MySqlConnectorConfig#isBuiltInDatabase}) nor the database backing
     * {@code this.DATABASE}.
     *
     * @throws SQLException if listing or dropping a database fails
     */
    private void dropDatabases() throws SQLException {
        // try-with-resources guarantees both handles are closed exactly once, including when the
        // query or one of the DROP statements throws.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase("mysql");
                JdbcConnection connection = db.connect()) {
            connection.query("SHOW DATABASES", resultSet -> {
                while (resultSet.next()) {
                    String databaseName = resultSet.getString(1);
                    boolean droppable = !MySqlConnectorConfig.isBuiltInDatabase(databaseName)
                            && !databaseName.equals(this.DATABASE.getDatabaseName());
                    if (droppable) {
                        connection.execute("DROP DATABASE IF EXISTS `" + databaseName + "`");
                    }
                }
            });
        }
    }

    /**
     * Extracts the {@code "after"} field from the value of the given record.
     *
     * @param sourceRecord record whose value is cast to a {@link Struct}
     * @return the content of the value's {@code "after"} field, cast to {@link Struct}
     */
    private Struct getAfter(SourceRecord sourceRecord) {
        final Struct envelope = (Struct) sourceRecord.value();
        final Object afterState = envelope.get("after");
        return (Struct) afterState;
    }

    /**
     * Starts the connector against the read-only database with snapshot mode {@code NEVER} and
     * schema changes enabled, then checks the per-topic record counts, the number of DDL records
     * and that the {@code Products} record named {@code hammer2} has a weight of {@code 0.875f}.
     *
     * @throws SQLException declared by the test signature
     * @throws InterruptedException if waiting for records is interrupted
     */
    @Test
    public void shouldConsumeEventsWithNoSnapshot() throws SQLException, InterruptedException {
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
        this.config = this.RO_DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .build();
        start(MySqlConnector.class, this.config);

        AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(INITIAL_EVENT_COUNT);
        Assertions.assertThat(recordsForTopicForRoProductsTable(records).size()).isEqualTo(PRODUCTS_TABLE_EVENT_COUNT);
        Assertions.assertThat(records.recordsForTopic(this.RO_DATABASE.topicForTable("products_on_hand")).size()).isEqualTo(PRODUCTS_TABLE_EVENT_COUNT);
        Assertions.assertThat(records.recordsForTopic(this.RO_DATABASE.topicForTable("customers")).size()).isEqualTo(4);
        Assertions.assertThat(records.recordsForTopic(this.RO_DATABASE.topicForTable("orders")).size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);
        Assertions.assertThat(records.recordsForTopic(this.RO_DATABASE.topicForTable("Products")).size()).isEqualTo(PRODUCTS_TABLE_EVENT_COUNT);
        // NOTE(review): the topic count is compared against ORDERS_TABLE_EVENT_COUNT; presumably
        // only the numeric value is intended here - confirm.
        Assertions.assertThat(records.topics().size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);
        Assertions.assertThat(records.ddlRecordsForDatabase(this.RO_DATABASE.getDatabaseName()).size()).isEqualTo(6);

        Optional<SourceRecord> hammer2 = records.recordsForTopic(this.RO_DATABASE.topicForTable("Products")).stream()
                .filter(record -> "hammer2".equals(getAfter(record).get("name")))
                .findFirst();
        // assertThat(boolean) on its own verifies nothing; isTrue() makes this a real check.
        Assertions.assertThat(hammer2.isPresent()).isTrue();
        Assertions.assertThat(getAfter(hammer2.get()).get("weight")).isEqualTo(Float.valueOf(0.875f));

        records.forEach(this::validate);
        stopConnector();

        // Print the orders and customers events after the connector has stopped.
        records.recordsForTopic(this.RO_DATABASE.topicForTable("orders")).forEach(record -> print(record));
        records.recordsForTopic(this.RO_DATABASE.topicForTable("customers")).forEach(record -> print(record));
    }

    /**
     * Configures a column include list containing only {@code order_number} of the read-only
     * {@code orders} table and verifies, for every {@code orders} event, that the {@code after}
     * struct exposes {@code order_number} while looking up {@code order_date} raises a
     * {@link DataException}.
     */
    @Test
    @FixFor({"DBZ-1962"})
    public void shouldConsumeEventsWithIncludedColumns() throws SQLException, InterruptedException {
        Testing.Files.delete(SCHEMA_HISTORY_PATH);

        this.config = this.RO_DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.COLUMN_INCLUDE_LIST, this.RO_DATABASE.qualifiedTableName("orders") + ".order_number")
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .build();
        start(MySqlConnector.class, this.config);

        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(28);
        final int orderEventCount = consumed.recordsForTopic(this.RO_DATABASE.topicForTable("orders")).size();
        Assertions.assertThat(orderEventCount).isEqualTo(ORDERS_TABLE_EVENT_COUNT);
        final int topicCount = consumed.topics().size();
        Assertions.assertThat(topicCount).isEqualTo(ORDERS_TABLE_EVENT_COUNT);
        consumed.forEach(this::validate);
        stopConnector();

        for (SourceRecord orderEvent : consumed.recordsForTopic(this.RO_DATABASE.topicForTable("orders"))) {
            print(orderEvent);
            final Struct afterState = ((Struct) orderEvent.value()).getStruct("after");

            // The included column must be retrievable.
            try {
                afterState.get("order_number");
            } catch (DataException missing) {
                Assert.fail("The 'order_number' field was not found but should exist");
            }

            // Any other column must be absent, which surfaces as a DataException on lookup.
            try {
                afterState.get("order_date");
                Assert.fail("The 'order_date' field was found but should be filtered");
            } catch (DataException expected) {
                // expected: the column is not part of the include list
            }
        }
    }

    /**
     * Creates a table named {@code order} with a column named {@code select} in the read-only
     * database, inserts one row, and starts the connector with that table and only the
     * {@code select} column included. The single resulting event must expose {@code select} in
     * its {@code after} struct, while looking up {@code not_included} must raise a
     * {@link DataException}.
     *
     * @throws SQLException if a statement against the test database fails
     * @throws InterruptedException if waiting for records is interrupted
     */
    @Test
    @FixFor({"DBZ-2525"})
    public void shouldConsumeEventsWithIncludedColumnsForKeywordNamedTable() throws SQLException, InterruptedException {
        Testing.Files.delete(SCHEMA_HISTORY_PATH);

        // try-with-resources closes both handles exactly once, even when a statement fails.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.RO_DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute(String.format("CREATE TABLE %s.`order` ( id INT(11) PRIMARY KEY NOT NULL AUTO_INCREMENT, `select` VARCHAR(255) NOT NULL, not_included VARCHAR(255) NOT NULL);", this.RO_DATABASE.getDatabaseName()));
            connection.execute(String.format("INSERT INTO %s.`order` VALUES (100001,'included','not included');", this.RO_DATABASE.getDatabaseName()));
        }

        this.config = this.RO_DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.RO_DATABASE.qualifiedTableName("order"))
                .with(MySqlConnectorConfig.COLUMN_INCLUDE_LIST, this.RO_DATABASE.qualifiedTableName("order") + ".select")
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .build();
        start(MySqlConnector.class, this.config);

        AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.RO_DATABASE.topicForTable("order")).size()).isEqualTo(1);
        Assertions.assertThat(records.topics().size()).isEqualTo(1);
        records.forEach(this::validate);
        stopConnector();

        records.recordsForTopic(this.RO_DATABASE.topicForTable("order")).forEach(record -> {
            print(record);
            Struct after = ((Struct) record.value()).getStruct("after");
            try {
                after.get("select");
            } catch (DataException e) {
                Assert.fail("The 'select' field was not found but should exist");
            }
            try {
                after.get("not_included");
                Assert.fail("The 'not_included' field was found but should be filtered");
            } catch (DataException expected) {
                // expected: the column is not part of the include list
            }
        });
    }

    /**
     * Excludes {@code orders.order_number} and masks {@code customers.email} with twelve
     * asterisks, then verifies the record counts per topic, that looking up {@code order_number}
     * on every {@code orders} event raises a {@link DataException}, and that every non-null
     * {@code before}/{@code after} state of a {@code customers} event carries the masked email.
     */
    @Test
    public void shouldConsumeEventsWithMaskedAndBlacklistedColumns() throws SQLException, InterruptedException {
        Testing.Files.delete(SCHEMA_HISTORY_PATH);

        this.config = this.RO_DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.COLUMN_EXCLUDE_LIST, this.RO_DATABASE.qualifiedTableName("orders") + ".order_number")
                .with("column.mask.with.12.chars", this.RO_DATABASE.qualifiedTableName("customers") + ".email")
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .build();
        start(MySqlConnector.class, this.config);

        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(28);
        Assertions.assertThat(recordsForTopicForRoProductsTable(consumed).size()).isEqualTo(PRODUCTS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumed.recordsForTopic(this.RO_DATABASE.topicForTable("products_on_hand")).size()).isEqualTo(PRODUCTS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumed.recordsForTopic(this.RO_DATABASE.topicForTable("customers")).size()).isEqualTo(4);
        Assertions.assertThat(consumed.recordsForTopic(this.RO_DATABASE.topicForTable("orders")).size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumed.topics().size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);
        consumed.forEach(this::validate);
        stopConnector();

        // The excluded column must not be reachable on any orders event.
        for (SourceRecord orderEvent : consumed.recordsForTopic(this.RO_DATABASE.topicForTable("orders"))) {
            print(orderEvent);
            try {
                ((Struct) orderEvent.value()).getStruct("after").get("order_number");
                Assert.fail("The 'order_number' field was found but should not exist");
            } catch (DataException expected) {
                // expected: the column was excluded
            }
        }

        // The masked column must show the mask in whichever row states are present.
        for (SourceRecord customerEvent : consumed.recordsForTopic(this.RO_DATABASE.topicForTable("customers"))) {
            final Struct envelope = (Struct) customerEvent.value();
            if (envelope.getStruct("after") != null) {
                Assertions.assertThat(envelope.getStruct("after").getString("email")).isEqualTo("************");
            }
            if (envelope.getStruct("before") != null) {
                Assertions.assertThat(envelope.getStruct("before").getString("email")).isEqualTo("************");
            }
            print(customerEvent);
        }
    }

    /**
     * Configures {@code customers.email} to be masked via
     * {@code column.mask.hash.SHA-256.with.salt.CzQMA0cB5K} and compares the email of the first
     * four {@code customers} events (where an {@code after} state exists) with fixed hash strings.
     */
    @Test
    @FixFor({"DBZ-1692"})
    public void shouldConsumeEventsWithMaskedHashedColumns() throws InterruptedException {
        Testing.Files.delete(SCHEMA_HISTORY_PATH);

        this.config = this.RO_DATABASE.defaultConfig()
                .with("column.mask.hash.SHA-256.with.salt.CzQMA0cB5K", this.RO_DATABASE.qualifiedTableName("customers") + ".email")
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .build();
        start(MySqlConnector.class, this.config);

        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(28);
        Assertions.assertThat(recordsForTopicForRoProductsTable(consumed)).hasSize(PRODUCTS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumed.recordsForTopic(this.RO_DATABASE.topicForTable("products_on_hand"))).hasSize(PRODUCTS_TABLE_EVENT_COUNT);
        final List<SourceRecord> customerEvents = consumed.recordsForTopic(this.RO_DATABASE.topicForTable("customers"));
        Assertions.assertThat(customerEvents).hasSize(4);
        Assertions.assertThat(consumed.recordsForTopic(this.RO_DATABASE.topicForTable("orders"))).hasSize(ORDERS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumed.topics()).hasSize(ORDERS_TABLE_EVENT_COUNT);
        consumed.forEach(this::validate);
        stopConnector();

        // Expected masked values, in the order the customer events were received.
        final String[] expectedHashes = {
                "d540e71abf15be8b51c7967397ba359db27d6f6ae85a297fe8d0d7005ffd0e82",
                "b1f1a1a63559c1d3a98bd7bb5c363d7e21a37463a7266bc2ff341eaef7ac8ef3",
                "bbe1de7b1068bc8f86bbb19f432ce1d44fbd461339916f42544b3f7ebff674d6",
                "ff21be44fb224e57d822ea9a51d343d77e4c49ac3dedd3d144024ac2012af0a1"
        };
        for (int index = 0; index < expectedHashes.length; index++) {
            final Struct envelope = (Struct) customerEvents.get(index).value();
            // Events without an "after" state are skipped rather than failed.
            if (envelope.getStruct("after") != null) {
                Assertions.assertThat(envelope.getStruct("after").getString("email")).isEqualTo(expectedHashes[index]);
            }
        }
    }

    /**
     * Configures {@code customers.email} under {@code column.truncate.to.7.chars} and compares
     * the email of the first four {@code customers} events (where an {@code after} state exists)
     * with fixed seven-character strings.
     */
    @Test
    @FixFor({"DBZ-1972"})
    public void shouldConsumeEventsWithTruncatedColumns() throws InterruptedException {
        Testing.Files.delete(SCHEMA_HISTORY_PATH);

        this.config = this.RO_DATABASE.defaultConfig()
                .with("column.truncate.to.7.chars", this.RO_DATABASE.qualifiedTableName("customers") + ".email")
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .build();
        start(MySqlConnector.class, this.config);

        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(28);
        Assertions.assertThat(recordsForTopicForRoProductsTable(consumed)).hasSize(PRODUCTS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumed.recordsForTopic(this.RO_DATABASE.topicForTable("products_on_hand"))).hasSize(PRODUCTS_TABLE_EVENT_COUNT);
        final List<SourceRecord> customerEvents = consumed.recordsForTopic(this.RO_DATABASE.topicForTable("customers"));
        Assertions.assertThat(customerEvents).hasSize(4);
        Assertions.assertThat(consumed.recordsForTopic(this.RO_DATABASE.topicForTable("orders"))).hasSize(ORDERS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumed.topics()).hasSize(ORDERS_TABLE_EVENT_COUNT);
        consumed.forEach(this::validate);
        stopConnector();

        // Expected truncated values, in the order the customer events were received.
        final String[] expectedEmails = {"sally.t", "gbailey", "ed@walk", "annek@n"};
        for (int index = 0; index < expectedEmails.length; index++) {
            final Struct envelope = (Struct) customerEvents.get(index).value();
            // Events without an "after" state are skipped rather than failed.
            if (envelope.getStruct("after") != null) {
                Assertions.assertThat(envelope.getStruct("after").getString("email")).isEqualTo(expectedEmails[index]);
            }
        }
    }

    /**
     * With default tombstone settings, changes the {@code order_number} of an order from 10001
     * to 10101 and expects a delete, a tombstone and an insert on the {@code orders} topic; then
     * deletes order 10101 and expects a delete followed by a tombstone.
     *
     * @throws Exception if a database statement fails or waiting for records is interrupted
     */
    @Test
    @FixFor({"DBZ-582"})
    public void shouldEmitTombstoneOnDeleteByDefault() throws Exception {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER)
                .build();
        start(MySqlConnector.class, this.config);
        Assertions.assertThat(consumeRecordsByTopic(INITIAL_EVENT_COUNT).recordsForTopic(this.DATABASE.topicForTable("orders")).size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);

        // try-with-resources closes each connection exactly once, on success and on failure.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("UPDATE orders SET order_number=10101 WHERE order_number=10001");
        }

        // The update of order_number is expected as delete + tombstone for the old value and an
        // insert for the new one.
        List<SourceRecord> orderRecords = consumeRecordsByTopic(3).recordsForTopic(this.DATABASE.topicForTable("orders"));
        Assertions.assertThat(orderRecords.size()).isEqualTo(3);
        assertDelete(orderRecords.get(0), "order_number", 10001);
        assertTombstone(orderRecords.get(1), "order_number", 10001);
        assertInsert(orderRecords.get(2), "order_number", 10101);

        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("DELETE FROM orders WHERE order_number=10101");
        }

        orderRecords = consumeRecordsByTopic(2).recordsForTopic(this.DATABASE.topicForTable("orders"));
        Assertions.assertThat(orderRecords.size()).isEqualTo(2);
        assertDelete(orderRecords.get(0), "order_number", 10101);
        assertTombstone(orderRecords.get(1), "order_number", 10101);
        stopConnector();
    }

    /**
     * With {@link CommonConnectorConfig#TOMBSTONES_ON_DELETE} set to {@code false}, changes the
     * {@code order_number} of an order from 10001 to 10101 and expects only a delete and an
     * insert on the {@code orders} topic; then deletes orders 10101 and 10002 and expects exactly
     * two delete events with no tombstones in between.
     *
     * @throws Exception if a database statement fails or waiting for records is interrupted
     */
    @Test
    @FixFor({"DBZ-582"})
    public void shouldEmitNoTombstoneOnDelete() throws Exception {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .build();
        start(MySqlConnector.class, this.config);
        Assertions.assertThat(consumeRecordsByTopic(INITIAL_EVENT_COUNT).recordsForTopic(this.DATABASE.topicForTable("orders")).size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);

        // try-with-resources closes each connection exactly once, on success and on failure.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("UPDATE orders SET order_number=10101 WHERE order_number=10001");
        }

        List<SourceRecord> orderRecords = consumeRecordsByTopic(2).recordsForTopic(this.DATABASE.topicForTable("orders"));
        Assertions.assertThat(orderRecords.size()).isEqualTo(2);
        assertDelete(orderRecords.get(0), "order_number", 10001);
        assertInsert(orderRecords.get(1), "order_number", 10101);

        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("DELETE FROM orders WHERE order_number = 10101;");
            connection.execute("DELETE FROM orders WHERE order_number = 10002;");
        }

        orderRecords = consumeRecordsByTopic(2).recordsForTopic(this.DATABASE.topicForTable("orders"));
        Assertions.assertThat(orderRecords.size()).isEqualTo(2);
        assertDelete(orderRecords.get(0), "order_number", 10101);
        assertDelete(orderRecords.get(1), "order_number", 10002);
        stopConnector();
    }

    /**
     * Deletes two orders inside a single transaction with a {@code SavePoint} statement issued between
     * them, then checks that the two delete events are emitted and that no schema change (DDL) records
     * are produced for the test database.
     *
     * @throws Exception if starting the connector, running the JDBC statements, or consuming records fails
     */
    @Test
    @FixFor({"DBZ-794"})
    public void shouldEmitNoSavepoints() throws Exception {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .build();
        start(MySqlConnector.class, this.config);

        Assertions.assertThat(consumeRecordsByTopic(INITIAL_EVENT_COUNT).recordsForTopic(this.DATABASE.topicForTable("orders")).size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);
        waitForStreamingRunning(this.DATABASE.getServerName());

        // try-with-resources closes the statement and both connections exactly once, even when a
        // statement throws; resources are released in reverse declaration order.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            final Connection jdbc = connection.connection();
            // Auto-commit is disabled so that both deletes and the savepoint share one transaction.
            connection.setAutoCommit(false);
            try (Statement statement = jdbc.createStatement()) {
                statement.executeUpdate("DELETE FROM orders WHERE order_number = 10001");
                statement.executeUpdate("SavePoint sp2");
                statement.executeUpdate("DELETE FROM orders WHERE order_number = 10002");
                jdbc.commit();
            }
        }

        // Assertions run after the connections are released, so a failing assertion cannot trigger
        // a second close attempt.
        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(2);
        Assertions.assertThat(records.ddlRecordsForDatabase(this.DATABASE.getDatabaseName())).isNullOrEmpty();
        final List<SourceRecord> orders = records.recordsForTopic(this.DATABASE.topicForTable("orders"));
        assertDelete(orders.get(0), "order_number", 10001);
        assertDelete(orders.get(1), "order_number", 10002);
        stopConnector();
    }

    /**
     * Runs an insert on a connection where {@code binlog_rows_query_log_events} was set to OFF while the
     * connector is configured with {@code INCLUDE_SQL_QUERY} enabled, and checks that the emitted
     * insert event has no source query attached.
     *
     * @throws Exception if starting the connector, running the SQL, or consuming records fails
     */
    @Test
    @FixFor({"DBZ-706"})
    public void shouldNotParseQueryIfServerOptionDisabled() throws Exception {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("products"))
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                .build();
        start(MySqlConnector.class, this.config);
        waitForStreamingRunning(this.DATABASE.getServerName());

        // Consume the first PRODUCTS_TABLE_EVENT_COUNT records so that only the new change remains.
        consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null);

        // try-with-resources closes both connections exactly once, even when a statement throws.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("SET binlog_rows_query_log_events=OFF");
            connection.execute("INSERT INTO products VALUES (default,'robot','Toy robot',1.304)");
        }

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
        final List<SourceRecord> products = records.recordsForTopic(this.DATABASE.topicForTable("products"));
        Assertions.assertThat(products.size()).isEqualTo(1);

        final SourceRecord inserted = products.get(0);
        validate(inserted);
        assertInsert(inserted, "id", 110);
        assertHasNoSourceQuery(inserted);
    }

    /**
     * Runs an insert on a connection where {@code binlog_rows_query_log_events} was set to ON while the
     * connector is configured with {@code INCLUDE_SQL_QUERY} disabled, and checks that the emitted
     * insert event has no source query attached.
     *
     * @throws Exception if starting the connector, running the SQL, or consuming records fails
     */
    @Test
    @FixFor({"DBZ-706"})
    public void shouldNotParseQueryIfConnectorNotConfiguredTo() throws Exception {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("products"))
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, false)
                .build();
        start(MySqlConnector.class, this.config);

        // Consume the first PRODUCTS_TABLE_EVENT_COUNT records so that only the new change remains.
        consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null);

        // try-with-resources closes both connections exactly once, even when a statement throws.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("SET binlog_rows_query_log_events=ON");
            connection.execute("INSERT INTO products VALUES (default,'robot','Toy robot',1.304)");
        }

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
        final List<SourceRecord> products = records.recordsForTopic(this.DATABASE.topicForTable("products"));
        Assertions.assertThat(products.size()).isEqualTo(1);

        final SourceRecord inserted = products.get(0);
        this.logger.info("Record: {}", inserted);
        validate(inserted);
        assertInsert(inserted, "id", 110);
        assertHasNoSourceQuery(inserted);
    }

    /**
     * Runs an insert on a connection where {@code binlog_rows_query_log_events} was set to ON while the
     * connector is configured with {@code INCLUDE_SQL_QUERY} enabled, and checks that the emitted
     * insert event carries the exact SQL text that was executed.
     *
     * @throws Exception if starting the connector, running the SQL, or consuming records fails
     */
    @Test
    @FixFor({"DBZ-706"})
    public void shouldParseQueryIfAvailableAndConnectorOptionEnabled() throws Exception {
        final String insertSql = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)";

        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("products"))
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                .build();
        start(MySqlConnector.class, this.config);

        // Consume the first PRODUCTS_TABLE_EVENT_COUNT records so that only the new change remains.
        consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null);

        // try-with-resources closes both connections exactly once, even when a statement throws.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("SET binlog_rows_query_log_events=ON");
            connection.execute(insertSql);
        }

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
        final List<SourceRecord> products = records.recordsForTopic(this.DATABASE.topicForTable("products"));
        Assertions.assertThat(products.size()).isEqualTo(1);

        final SourceRecord inserted = products.get(0);
        this.logger.info("Record: {}", inserted);
        validate(inserted);
        assertInsert(inserted, "id", 110);
        assertSourceQuery(inserted, insertSql);
    }

    /**
     * Executes two separate single-row inserts with {@code binlog_rows_query_log_events} set to ON and
     * checks that each emitted insert event carries the SQL text of its own statement.
     *
     * @throws Exception if starting the connector, running the SQL, or consuming records fails
     */
    @Test
    @FixFor({"DBZ-706"})
    public void parseMultipleInsertStatements() throws Exception {
        final String robotSql = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)";
        final String toasterSql = "INSERT INTO products VALUES (default,'toaster','Toaster',3.33)";

        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("products"))
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                .build();
        start(MySqlConnector.class, this.config);

        // Consume the first PRODUCTS_TABLE_EVENT_COUNT records so that only the new changes remain.
        consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null);
        this.logger.warn(this.DATABASE.getDatabaseName());

        // try-with-resources closes both connections exactly once, even when a statement throws.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("SET binlog_rows_query_log_events=ON");
            connection.execute(robotSql);
            connection.execute(toasterSql);
        }

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(2);
        final List<SourceRecord> products = records.recordsForTopic(this.DATABASE.topicForTable("products"));
        Assertions.assertThat(products.size()).isEqualTo(2);

        final SourceRecord firstInsert = products.get(0);
        validate(firstInsert);
        assertInsert(firstInsert, "id", 110);
        assertSourceQuery(firstInsert, robotSql);

        final SourceRecord secondInsert = products.get(1);
        validate(secondInsert);
        assertInsert(secondInsert, "id", 111);
        assertSourceQuery(secondInsert, toasterSql);
    }

    /**
     * Executes one multi-row insert with {@code binlog_rows_query_log_events} set to ON and checks that
     * both emitted insert events carry the same, complete SQL text of that statement.
     *
     * @throws Exception if starting the connector, running the SQL, or consuming records fails
     */
    @Test
    @FixFor({"DBZ-706"})
    public void parseMultipleRowInsertStatement() throws Exception {
        final String insertSql = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304), (default,'toaster','Toaster',3.33)";

        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("products"))
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                .build();
        start(MySqlConnector.class, this.config);

        // Consume the first PRODUCTS_TABLE_EVENT_COUNT records so that only the new changes remain.
        consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null);
        this.logger.warn(this.DATABASE.getDatabaseName());

        // try-with-resources closes both connections exactly once, even when a statement throws.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("SET binlog_rows_query_log_events=ON");
            connection.execute(insertSql);
        }

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(2);
        final List<SourceRecord> products = records.recordsForTopic(this.DATABASE.topicForTable("products"));
        Assertions.assertThat(products.size()).isEqualTo(2);

        final SourceRecord firstInsert = products.get(0);
        validate(firstInsert);
        assertInsert(firstInsert, "id", 110);
        assertSourceQuery(firstInsert, insertSql);

        final SourceRecord secondInsert = products.get(1);
        validate(secondInsert);
        assertInsert(secondInsert, "id", 111);
        assertSourceQuery(secondInsert, insertSql);
    }

    /**
     * Executes a single-row delete on the orders table with {@code binlog_rows_query_log_events} set to
     * ON and checks that the emitted delete event carries the SQL text of that statement.
     *
     * @throws Exception if starting the connector, running the SQL, or consuming records fails
     */
    @Test
    @FixFor({"DBZ-706"})
    public void parseDeleteQuery() throws Exception {
        final String deleteSql = "DELETE FROM orders WHERE order_number=10001 LIMIT 1";

        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("orders"))
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                .build();
        start(MySqlConnector.class, this.config);

        // Consume the first ORDERS_TABLE_EVENT_COUNT records so that only the new change remains.
        consumeRecords(ORDERS_TABLE_EVENT_COUNT, null);

        // try-with-resources closes both connections exactly once, even when a statement throws.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("SET binlog_rows_query_log_events=ON");
            connection.execute(deleteSql);
        }

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
        final List<SourceRecord> orders = records.recordsForTopic(this.DATABASE.topicForTable("orders"));
        Assertions.assertThat(orders.size()).isEqualTo(1);

        final SourceRecord deleted = orders.get(0);
        validate(deleted);
        assertDelete(deleted, "order_number", 10001);
        assertSourceQuery(deleted, deleteSql);
    }

    /**
     * Executes one delete statement that removes two orders with
     * {@code binlog_rows_query_log_events} set to ON and checks that both emitted delete events carry
     * the SQL text of that statement.
     *
     * @throws Exception if starting the connector, running the SQL, or consuming records fails
     */
    @Test
    @FixFor({"DBZ-706"})
    public void parseMultiRowDeleteQuery() throws Exception {
        final String deleteSql = "DELETE FROM orders WHERE purchaser=1002";

        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("orders"))
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                .build();
        start(MySqlConnector.class, this.config);

        // Consume the first ORDERS_TABLE_EVENT_COUNT records so that only the new changes remain.
        consumeRecords(ORDERS_TABLE_EVENT_COUNT, null);

        // try-with-resources closes both connections exactly once, even when a statement throws.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("SET binlog_rows_query_log_events=ON");
            connection.execute(deleteSql);
        }

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(2);
        final List<SourceRecord> orders = records.recordsForTopic(this.DATABASE.topicForTable("orders"));
        Assertions.assertThat(orders.size()).isEqualTo(2);

        final SourceRecord firstDelete = orders.get(0);
        validate(firstDelete);
        assertDelete(firstDelete, "order_number", 10002);
        assertSourceQuery(firstDelete, deleteSql);

        final SourceRecord secondDelete = orders.get(1);
        validate(secondDelete);
        assertDelete(secondDelete, "order_number", 10004);
        assertSourceQuery(secondDelete, deleteSql);
    }

    /**
     * Executes a single-row update on the products table with {@code binlog_rows_query_log_events} set
     * to ON and checks that the emitted update event carries the SQL text of that statement.
     *
     * @throws Exception if starting the connector, running the SQL, or consuming records fails
     */
    @Test
    @FixFor({"DBZ-706"})
    public void parseUpdateQuery() throws Exception {
        final String updateSql = "UPDATE products set name='toaster' where id=109 LIMIT 1";

        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("products"))
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                .build();
        start(MySqlConnector.class, this.config);

        // Consume the first PRODUCTS_TABLE_EVENT_COUNT records so that only the new change remains.
        consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null);

        // try-with-resources closes both connections exactly once, even when a statement throws.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("SET binlog_rows_query_log_events=ON");
            connection.execute(updateSql);
        }

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
        final List<SourceRecord> products = records.recordsForTopic(this.DATABASE.topicForTable("products"));
        Assertions.assertThat(products.size()).isEqualTo(1);

        final SourceRecord updated = products.get(0);
        validate(updated);
        assertUpdate(updated, "id", 109);
        assertSourceQuery(updated, updateSql);
    }

    /**
     * Executes one update statement that touches two orders with
     * {@code binlog_rows_query_log_events} set to ON and checks that both emitted update events carry
     * the SQL text of that statement.
     *
     * @throws Exception if starting the connector, running the SQL, or consuming records fails
     */
    @Test
    @FixFor({"DBZ-706"})
    public void parseMultiRowUpdateQuery() throws Exception {
        final String updateSql = "UPDATE orders set quantity=0 where order_number in (10001, 10004)";

        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("orders"))
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                .build();
        start(MySqlConnector.class, this.config);

        // Consume the first ORDERS_TABLE_EVENT_COUNT records so that only the new changes remain.
        consumeRecords(ORDERS_TABLE_EVENT_COUNT, null);

        // try-with-resources closes both connections exactly once, even when a statement throws.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("SET binlog_rows_query_log_events=ON");
            connection.execute(updateSql);
        }

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(2);
        final List<SourceRecord> orders = records.recordsForTopic(this.DATABASE.topicForTable("orders"));
        Assertions.assertThat(orders.size()).isEqualTo(2);

        final SourceRecord firstUpdate = orders.get(0);
        validate(firstUpdate);
        assertUpdate(firstUpdate, "order_number", 10001);
        assertSourceQuery(firstUpdate, updateSql);

        final SourceRecord secondUpdate = orders.get(1);
        validate(secondUpdate);
        assertUpdate(secondUpdate, "order_number", 10004);
        assertSourceQuery(secondUpdate, updateSql);
    }

    /**
     * Builds a configuration that sets {@code TIME_PRECISION_MODE} to {@code ADAPTIVE} and checks that
     * connector validation reports a configuration error on that field.
     */
    @Test
    @FixFor({"DBZ-1234"})
    public void shouldFailToValidateAdaptivePrecisionMode() throws InterruptedException {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER)
                .with(MySqlConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.ADAPTIVE)
                .build();

        final MySqlConnector connector = new MySqlConnector();
        final Config validationResult = connector.validate(this.config.asMap());
        assertConfigurationErrors(validationResult, MySqlConnectorConfig.TIME_PRECISION_MODE);
    }

    /**
     * Starts the connector with a database include list of {@code my_database} and checks, while
     * stopping the connector, that the "no changes will be captured" warning was logged.
     */
    @Test
    @FixFor({"DBZ-1242"})
    public void testEmptySchemaLogWarningWithDatabaseWhitelist() throws Exception {
        final String expectedWarning = "After applying the include/exclude list filters, no changes will be captured. Please check your configuration!";
        final LogInterceptor interceptor = new LogInterceptor(RelationalDatabaseSchema.class);

        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, "my_database")
                .build();
        start(MySqlConnector.class, this.config);

        consumeRecordsByTopic(12);
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);

        stopConnector(running -> {
            Assertions.assertThat(interceptor.containsWarnMessage(expectedWarning)).isTrue();
        });
    }

    /**
     * Starts the connector with the default configuration plus an initial snapshot and checks, while
     * stopping the connector, that the "no changes will be captured" warning was not logged.
     */
    @Test
    @FixFor({"DBZ-1242"})
    public void testNoEmptySchemaLogWarningWithDatabaseWhitelist() throws Exception {
        final String unexpectedWarning = "After applying the include/exclude list filters, no changes will be captured. Please check your configuration!";
        final LogInterceptor interceptor = new LogInterceptor(RelationalDatabaseSchema.class);

        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .build();
        start(MySqlConnector.class, this.config);

        consumeRecordsByTopic(12);
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);

        stopConnector(running -> {
            Assertions.assertThat(interceptor.containsWarnMessage(unexpectedWarning)).isFalse();
        });
    }

    /**
     * Starts the connector with a table include list naming {@code my_products}, waits for the snapshot
     * to complete, and checks, while stopping the connector, that the "no changes will be captured"
     * warning was logged.
     */
    @Test
    @FixFor({"DBZ-1242"})
    public void testEmptySchemaWarningWithTableWhitelist() throws Exception {
        final String expectedWarning = "After applying the include/exclude list filters, no changes will be captured. Please check your configuration!";
        final LogInterceptor interceptor = new LogInterceptor(RelationalDatabaseSchema.class);

        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("my_products"))
                .build();
        start(MySqlConnector.class, this.config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted("mysql", this.DATABASE.getServerName());

        consumeRecordsByTopic(12);
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);

        stopConnector(running -> {
            Assertions.assertThat(interceptor.containsWarnMessage(expectedWarning)).isTrue();
        });
    }

    /**
     * Starts the connector with the default configuration plus an initial snapshot, asserts it is
     * running, and checks, while stopping the connector, that the "no changes will be captured"
     * warning was not logged.
     */
    @Test
    @FixFor({"DBZ-1242"})
    public void testNoEmptySchemaWarningWithTableWhitelist() throws Exception {
        final String unexpectedWarning = "After applying the include/exclude list filters, no changes will be captured. Please check your configuration!";
        final LogInterceptor interceptor = new LogInterceptor(RelationalDatabaseSchema.class);

        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .build();
        start(MySqlConnector.class, this.config);
        assertConnectorIsRunning();

        consumeRecordsByTopic(12);
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);

        stopConnector(running -> {
            Assertions.assertThat(interceptor.containsWarnMessage(unexpectedWarning)).isFalse();
        });
    }

    /**
     * Configures {@code MSG_KEY_COLUMNS} as {@code (.*).products:id,name} and checks that the key of
     * every consumed products record has non-null {@code id} and {@code name} fields.
     */
    @Test
    @FixFor({"DBZ-1015"})
    public void shouldRewriteIdentityKey() throws InterruptedException, SQLException {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("products"))
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                .with(MySqlConnectorConfig.MSG_KEY_COLUMNS, "(.*).products:id,name")
                .build();
        start(MySqlConnector.class, this.config);

        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(PRODUCTS_TABLE_EVENT_COUNT);
        final List<SourceRecord> productRecords = consumed.recordsForTopic(this.DATABASE.topicForTable("products"));
        for (SourceRecord productRecord : productRecords) {
            final Struct key = (Struct) productRecord.key();
            Assertions.assertThat(key.get("id")).isNotNull();
            Assertions.assertThat(key.get("name")).isNotNull();
        }
    }

    /**
     * Configures {@code MSG_KEY_COLUMNS} with leading and trailing whitespace around the mapping and
     * checks that the key of every consumed products record still has non-null {@code id} and
     * {@code name} fields.
     */
    @Test
    @FixFor({"DBZ-2957"})
    public void shouldRewriteIdentityKeyWithWhitespace() throws InterruptedException, SQLException {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("products"))
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                .with(MySqlConnectorConfig.MSG_KEY_COLUMNS, "   (.*).products:id,name   ")
                .build();
        start(MySqlConnector.class, this.config);

        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(PRODUCTS_TABLE_EVENT_COUNT);
        final List<SourceRecord> productRecords = consumed.recordsForTopic(this.DATABASE.topicForTable("products"));
        for (SourceRecord productRecord : productRecords) {
            final Struct key = (Struct) productRecord.key();
            Assertions.assertThat(key.get("id")).isNotNull();
            Assertions.assertThat(key.get("name")).isNotNull();
        }
    }

    /**
     * Configures {@code MSG_KEY_COLUMNS} with a trailing {@code ;} after the mapping and checks that
     * the key of every consumed products record has non-null {@code id} and {@code name} fields.
     */
    @Test
    @FixFor({"DBZ-2957"})
    public void shouldRewriteIdentityKeyWithMsgKeyColumnsFieldRegexValidation() throws InterruptedException, SQLException {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("products"))
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                .with(MySqlConnectorConfig.MSG_KEY_COLUMNS, "(.*).products:id,name;")
                .build();
        start(MySqlConnector.class, this.config);

        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(PRODUCTS_TABLE_EVENT_COUNT);
        final List<SourceRecord> productRecords = consumed.recordsForTopic(this.DATABASE.topicForTable("products"));
        for (SourceRecord productRecord : productRecords) {
            final Struct key = (Struct) productRecord.key();
            Assertions.assertThat(key.get("id")).isNotNull();
            Assertions.assertThat(key.get("name")).isNotNull();
        }
    }

    /**
     * Deletes the schema history file, starts the connector restricted to the products table, and
     * passes every consumed products record through the three {@link CloudEventsConverterTest}
     * conversion checks (JSON, JSON with Avro data, and Avro).
     */
    @Test
    @FixFor({"DBZ-1292"})
    public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
        Testing.Files.delete(SCHEMA_HISTORY_PATH);

        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("products"))
                .build();
        start(MySqlConnector.class, this.config);

        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(PRODUCTS_TABLE_EVENT_COUNT);
        final List<SourceRecord> productRecords = consumed.recordsForTopic(this.DATABASE.topicForTable("products"));
        for (SourceRecord productRecord : productRecords) {
            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(productRecord, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(productRecord, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(productRecord, "mysql", "myServer1", false);
        }
    }

    /**
     * Convenience overload that delegates to the three-argument {@code waitForStreamingRunning} with the
     * connector type fixed to {@code "mysql"} and the namespace taken from
     * {@code getStreamingNamespace()}.
     *
     * @param serverName value forwarded as the second argument; callers in this class pass
     *                   {@code DATABASE.getServerName()}
     * @throws InterruptedException propagated from the delegated call
     */
    private void waitForStreamingRunning(String serverName) throws InterruptedException {
        waitForStreamingRunning("mysql", serverName, getStreamingNamespace());
    }

    /**
     * Looks up the records for the read-only database's products table, trying the capitalized table
     * name {@code Products} first and falling back to the lower-case {@code products} when the first
     * lookup returns {@code null}.
     *
     * @param sourceRecords the consumed records to search
     * @return the records found under the {@code Products} topic, otherwise whatever the lookup for
     *         the {@code products} topic returns
     */
    private List<SourceRecord> recordsForTopicForRoProductsTable(AbstractConnectorTest.SourceRecords sourceRecords) {
        final List<SourceRecord> capitalizedMatch = sourceRecords.recordsForTopic(this.RO_DATABASE.topicForTable("Products"));
        if (capitalizedMatch == null) {
            return sourceRecords.recordsForTopic(this.RO_DATABASE.topicForTable("products"));
        }
        return capitalizedMatch;
    }

    /**
     * Verifies the headers attached to change events produced by updates on the {@code orders}
     * table (DBZ-1531).
     * <p>
     * Rewriting {@code order_number} from 10003 to 10303 is expected to yield three events on the
     * orders topic: the first must carry the header read by {@code getPKUpdateNewKeyHeader} with
     * the value 10303, the third the header read by {@code getPKUpdateOldKeyHeader} with the value
     * 10003. A subsequent update of the {@code quantity} column must yield a single event that
     * has no new-key header.
     *
     * @throws Exception if starting the connector or talking to the database fails
     */
    @Test
    @FixFor({"DBZ-1531"})
    public void shouldEmitHeadersOnPrimaryKeyUpdate() throws Exception {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER)
                .build();
        start(MySqlConnector.class, this.config);

        // Consume the first INITIAL_EVENT_COUNT events and check the share of the orders topic.
        List<SourceRecord> initialOrders = consumeRecordsByTopic(INITIAL_EVENT_COUNT)
                .recordsForTopic(this.DATABASE.topicForTable("orders"));
        Assertions.assertThat(initialOrders.size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);

        // try-with-resources closes both handles exactly once, also when the statement fails.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("UPDATE orders SET order_number=10303 WHERE order_number=10003");
        }

        List<SourceRecord> keyUpdates = consumeRecordsByTopic(3)
                .recordsForTopic(this.DATABASE.topicForTable("orders"));
        Assertions.assertThat(keyUpdates.size()).isEqualTo(3);
        // Integer.valueOf keeps the call on assertEquals(Object, Object); an int literal paired
        // with the Integer returned by getInt32 would match several overloads.
        TestCase.assertEquals(Integer.valueOf(10303),
                ((Struct) getPKUpdateNewKeyHeader(keyUpdates.get(0)).get().value()).getInt32("order_number"));
        TestCase.assertEquals(Integer.valueOf(10003),
                ((Struct) getPKUpdateOldKeyHeader(keyUpdates.get(2)).get().value()).getInt32("order_number"));

        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("UPDATE orders SET quantity=5 WHERE order_number=10004");
        }

        List<SourceRecord> plainUpdates = consumeRecordsByTopic(1)
                .recordsForTopic(this.DATABASE.topicForTable("orders"));
        Assertions.assertThat(plainUpdates.size()).isEqualTo(1);
        Assertions.assertThat(getPKUpdateNewKeyHeader(plainUpdates.get(0)).isPresent()).isFalse();

        stopConnector();
    }

    /**
     * Verifies that no events are emitted for inserts when {@code SKIPPED_OPERATIONS} is set to
     * {@code "c"} (DBZ-1895).
     * <p>
     * The test inserts three products, updates one and deletes all three. Exactly seven events are
     * expected on the products topic: the update of id 201, followed by a delete and a tombstone
     * for each of the ids 201, 202 and 203.
     *
     * @throws Exception if starting the connector or talking to the database fails
     */
    @Test
    @FixFor({"DBZ-1895"})
    public void shouldEmitNoEventsForSkippedCreateOperations() throws Exception {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "c")
                .build();
        start(MySqlConnector.class, this.config);
        waitForSnapshotToBeCompleted("mysql", this.DATABASE.getServerName());

        // try-with-resources closes both handles exactly once, also when a statement fails.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("INSERT INTO products VALUES (201,'rubberduck','Rubber Duck',2.12);");
            connection.execute("UPDATE products SET weight=3.13 WHERE name = 'rubberduck'");
            connection.execute("INSERT INTO products VALUES (202,'rubbercrocodile','Rubber Crocodile',4.14);");
            connection.execute("DELETE FROM products WHERE name = 'rubberduck'");
            connection.execute("INSERT INTO products VALUES (203,'rubberfish','Rubber Fish',5.15);");
            connection.execute("DELETE FROM products WHERE name = 'rubbercrocodile'");
            connection.execute("DELETE FROM products WHERE name = 'rubberfish'");
        }

        List<SourceRecord> changeEvents = consumeRecordsByTopic(7)
                .recordsForTopic(this.DATABASE.topicForTable("products"));
        // Check the count first so that a shortfall is reported as an assertion failure rather
        // than as an IndexOutOfBoundsException from the indexed checks below.
        Assertions.assertThat(changeEvents.size()).isEqualTo(7);
        assertUpdate(changeEvents.get(0), "id", 201);
        assertDelete(changeEvents.get(1), "id", 201);
        assertTombstone(changeEvents.get(2), "id", 201);
        assertDelete(changeEvents.get(3), "id", 202);
        assertTombstone(changeEvents.get(4), "id", 202);
        // Position 5 is written literally: a list index has nothing to do with the
        // ORDERS_TABLE_EVENT_COUNT constant that previously stood here.
        assertDelete(changeEvents.get(5), "id", 203);
        assertTombstone(changeEvents.get(6), "id", 203);

        stopConnector();
    }

    /**
     * Verifies that no events are emitted for updates and deletes when
     * {@code SKIPPED_OPERATIONS} is set to {@code "u,d"} and {@code TOMBSTONES_ON_DELETE} is
     * disabled (DBZ-1895).
     * <p>
     * The test inserts three products, updates one and deletes one. Exactly three events are
     * expected on the products topic: the inserts of the ids 204, 205 and 206.
     *
     * @throws Exception if starting the connector or talking to the database fails
     */
    @Test
    @FixFor({"DBZ-1895"})
    public void shouldEmitNoEventsForSkippedUpdateAndDeleteOperations() throws Exception {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(MySqlConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "u,d")
                .build();
        start(MySqlConnector.class, this.config);
        waitForSnapshotToBeCompleted("mysql", this.DATABASE.getServerName());

        // try-with-resources closes both handles exactly once, also when a statement fails.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("INSERT INTO products VALUES (204,'rubberduck','Rubber Duck',2.12);");
            connection.execute("UPDATE products SET weight=3.13 WHERE name = 'rubberduck'");
            connection.execute("INSERT INTO products VALUES (205,'rubbercrocodile','Rubber Crocodile',4.14);");
            connection.execute("DELETE FROM products WHERE name = 'rubberduck'");
            connection.execute("INSERT INTO products VALUES (206,'rubberfish','Rubber Fish',5.15);");
        }

        List<SourceRecord> changeEvents = consumeRecordsByTopic(3)
                .recordsForTopic(this.DATABASE.topicForTable("products"));
        // Check the count first so that a shortfall is reported as an assertion failure rather
        // than as an IndexOutOfBoundsException from the indexed checks below.
        Assertions.assertThat(changeEvents.size()).isEqualTo(3);
        assertInsert(changeEvents.get(0), "id", 204);
        assertInsert(changeEvents.get(1), "id", 205);
        assertInsert(changeEvents.get(2), "id", 206);

        stopConnector();
    }

    /**
     * Checks that {@code RelationalDatabaseSchema} does not log the "no changes will be captured"
     * warning when the connector runs with snapshot mode {@code NEVER} and a database include
     * list of {@code "my_database"} (DBZ-1344).
     * <p>
     * The log check is performed inside the callback handed to {@code stopConnector}.
     *
     * @throws Exception if starting, consuming from or stopping the connector fails
     */
    @Test
    @FixFor({"DBZ-1344"})
    public void testNoEmptySchemaLogWarningWithSnapshotNever() throws Exception {
        // The interceptor is created before the connector starts so that it is already in
        // place when the connector begins logging.
        final LogInterceptor schemaLog = new LogInterceptor(RelationalDatabaseSchema.class);
        final String unexpectedWarning =
                "After applying the include/exclude list filters, no changes will be captured. Please check your configuration!";

        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER)
                .with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, "my_database")
                .build();
        start(MySqlConnector.class, this.config);

        consumeRecordsByTopic(12);
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);

        stopConnector(unused -> {
            boolean warned = schemaLog.containsWarnMessage(unexpectedWarning);
            Assertions.assertThat(warned).isFalse();
        });
    }

    /**
     * Verifies that the connector keeps running without an engine error when the server-wide
     * {@code binlog_format} is switched to {@code STATEMENT} while it is streaming (DBZ-3949).
     * <p>
     * Steps: insert three products, set {@code binlog_format} to {@code 'STATEMENT'}, update the
     * three products, then expect exactly three records on the products topic and
     * {@code completionResult.hasError()} to be {@code false}. The connector is configured with
     * {@code EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE} = {@code FAIL}.
     * NOTE(review): the three records are presumably the inserts - confirm against the connector's
     * handling of statement-based events.
     * <p>
     * {@code binlog_format} is set back to {@code 'ROW'} in a {@code finally} block, so the global
     * setting is restored even when one of the assertions fails.
     *
     * @throws Exception if starting the connector or talking to the database fails
     */
    @Test
    @FixFor({"DBZ-3949"})
    public void testDmlInChangeEvents() throws Exception {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("products"))
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(MySqlConnectorConfig.EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE, CommonConnectorConfig.EventProcessingFailureHandlingMode.FAIL)
                .build();
        EmbeddedEngine.CompletionResult completionResult = new EmbeddedEngine.CompletionResult();
        start(MySqlConnector.class, this.config, completionResult);
        waitForStreamingRunning(this.DATABASE.getServerName());

        // Each database interaction uses its own short-lived connection; try-with-resources
        // closes both handles exactly once, also when a statement fails.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("INSERT INTO products VALUES (204,'rubberduck','Rubber Duck',2.12);");
            connection.execute("INSERT INTO products VALUES (205,'rubbercrocodile','Rubber Crocodile',4.14);");
            connection.execute("INSERT INTO products VALUES (206,'rubberfish','Rubber Fish',5.15);");
        }

        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            // Plain literal: the statement contains no format specifier, so String.format
            // added nothing.
            connection.execute("SET GLOBAL binlog_format = 'STATEMENT'");
        }

        try {
            // A fresh connection is opened after the global switch above.
            try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                    JdbcConnection connection = db.connect()) {
                connection.execute("UPDATE products SET weight=2.22 WHERE id=204;");
                connection.execute("UPDATE products SET weight=4.44 WHERE id=205;");
                connection.execute("UPDATE products SET weight=5.55 WHERE id=206;");
            }

            List<SourceRecord> changeEvents = consumeRecordsByTopic(3)
                    .recordsForTopic(this.DATABASE.topicForTable("products"));
            Assertions.assertThat(changeEvents.size()).isEqualTo(3);
            Assertions.assertThat(completionResult.hasError()).isFalse();
        }
        finally {
            // The setting is server-wide; always put it back so a failure here does not leave
            // the server in STATEMENT mode.
            try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                    JdbcConnection connection = db.connect()) {
                connection.execute("SET GLOBAL binlog_format = 'ROW'");
            }
        }

        stopConnector();
    }

    /**
     * Verifies that no tombstone records are delivered when the connector is started with a
     * {@code NoTombStonesHandler} (DBZ-5052).
     * <p>
     * With {@code SKIPPED_OPERATIONS} set to {@code "c"}, the test inserts and deletes product 201
     * twice. Exactly two events are expected on the products topic, both deletes of id 201.
     *
     * @throws Exception if starting the connector or talking to the database fails
     */
    @Test
    @FixFor({"DBZ-5052"})
    public void shouldNotSendTombstonesWhenNotSupportedByHandler() throws Exception {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "c")
                .build();
        start(MySqlConnector.class, this.config, new NoTombStonesHandler(this.consumedLines));
        waitForSnapshotToBeCompleted("mysql", this.DATABASE.getServerName());

        // try-with-resources closes both handles exactly once, also when a statement fails.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("INSERT INTO products VALUES (201,'rubberduck','Rubber Duck',2.12);");
            connection.execute("DELETE FROM products WHERE name = 'rubberduck'");
            connection.execute("INSERT INTO products VALUES (201,'rubberduck','Rubber Duck',2.12);");
            connection.execute("DELETE FROM products WHERE name = 'rubberduck'");
        }

        List<SourceRecord> changeEvents = consumeRecordsByTopic(2)
                .recordsForTopic(this.DATABASE.topicForTable("products"));
        // Check the count first so that a shortfall is reported as an assertion failure rather
        // than as an IndexOutOfBoundsException from the indexed checks below.
        Assertions.assertThat(changeEvents.size()).isEqualTo(2);
        assertDelete(changeEvents.get(0), "id", 201);
        assertDelete(changeEvents.get(1), "id", 201);

        stopConnector();
    }

    /**
     * Verifies that truncating a table produces a change event (DBZ-5610).
     * <p>
     * With {@code SKIPPED_OPERATIONS} set to {@code "none"}, the test inserts one row into
     * {@code orders} and then truncates the table. Exactly two events are expected on the orders
     * topic: the insert of order 1000, followed by an event whose {@code before} and
     * {@code after} fields are {@code null} and whose {@code op} field equals {@code "t"}.
     *
     * @throws Exception if starting the connector or talking to the database fails
     */
    @Test
    @FixFor({"DBZ-5610"})
    public void shouldEmitTruncateOperation() throws Exception {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE, MySqlConnectorConfig.SnapshotLockingMode.NONE)
                .with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "none")
                .build();
        start(MySqlConnector.class, this.config);
        waitForSnapshotToBeCompleted("mysql", this.DATABASE.getServerName());

        // try-with-resources closes both handles exactly once, also when a statement fails.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            connection.execute("insert into orders values(1000, '2022-10-09', 1002, 90, 106)");
            connection.execute("truncate table orders;");
        }

        List<SourceRecord> changeEvents = consumeRecordsByTopic(2)
                .recordsForTopic(this.DATABASE.topicForTable("orders"));
        // Check the count first so that a shortfall is reported as an assertion failure rather
        // than as an IndexOutOfBoundsException from the indexed accesses below.
        Assertions.assertThat(changeEvents.size()).isEqualTo(2);
        assertInsert(changeEvents.get(0), "order_number", 1000);

        Struct truncateValue = (Struct) changeEvents.get(1).value();
        Assertions.assertThat(truncateValue.get("before")).isNull();
        Assertions.assertThat(truncateValue.get("after")).isNull();
        Assertions.assertThat(truncateValue.get("op")).isEqualTo("t");

        stopConnector();
    }
}
