package io.debezium.connector.mysql;

import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.relational.history.KafkaDatabaseHistory;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/mysql/MySqlConnectorIT.class */
public class MySqlConnectorIT extends AbstractConnectorTest {
    /**
     * Absolute path of the testing file {@code file-db-history-connect.txt}. Tests pass it as
     * {@code FileDatabaseHistory.FILE_PATH}, and the file is deleted before and after each test.
     */
    private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-connect.txt").toAbsolutePath();
    /** Configuration most recently built by a test that starts the connector. */
    private Configuration config;

    /**
     * Runs before every test: calls {@code stopConnector()}, re-initializes the connector test
     * framework, and deletes the file at {@link #DB_HISTORY_PATH}.
     */
    @Before
    public void beforeEach() {
        stopConnector();
        initializeConnectorTestFramework();
        // Remove whatever a previous run may have left at the history path.
        Testing.Files.delete(DB_HISTORY_PATH);
    }

    /**
     * Runs after every test: calls {@code stopConnector()} and then deletes the file at
     * {@link #DB_HISTORY_PATH}.
     * <p>
     * The delete happens even when stopping the connector throws; in that case the failure from
     * {@code stopConnector()} still propagates to the test runner once the file has been removed.
     */
    @After
    public void afterEach() {
        try {
            stopConnector();
        } finally {
            // Executed exactly once on both the normal and the exceptional path.
            Testing.Files.delete(DB_HISTORY_PATH);
        }
    }

    /**
     * Starts the connector with a configuration that sets only the database history class and its
     * file path, then checks that the completion callback reports no success together with a
     * non-null error, and that the connector is not running afterwards.
     */
    @Test
    public void shouldNotStartWithInvalidConfiguration() {
        Configuration historyOnly = Configuration.create()
                .with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
                .with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
                .build();
        this.config = historyOnly;

        // Tell whoever reads the log that the errors that follow are expected.
        this.logger.info("Attempting to start the connector with an INVALID configuration, so MULTIPLE error messages & one exceptions will appear in the log");

        start(MySqlConnector.class, this.config, (success, message, error) -> {
            Assertions.assertThat(success).isFalse();
            Assertions.assertThat(error).isNotNull();
        });

        assertConnectorNotRunning();
    }

    /**
     * Validates a configuration that sets only the database history class and its file path, and
     * checks the per-field validation results.
     * <p>
     * Hostname, user, password and server name are each checked with an expected count of 1; the
     * Kafka history bootstrap servers and topic are checked with {@code assertConfigurationErrors};
     * every other field listed here must validate without errors.
     */
    @Test
    public void shouldFailToValidateInvalidConfiguration() {
        MySqlConnector connector = new MySqlConnector();
        Configuration historyOnly = Configuration.create()
                .with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
                .with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
                .build();
        Config result = connector.validate(historyOnly.asMap());

        assertConfigurationErrors(result, MySqlConnectorConfig.HOSTNAME, 1);
        assertNoConfigurationErrors(result, new Field[]{MySqlConnectorConfig.PORT});
        assertConfigurationErrors(result, MySqlConnectorConfig.USER, 1);
        assertConfigurationErrors(result, MySqlConnectorConfig.PASSWORD, 1);
        assertConfigurationErrors(result, MySqlConnectorConfig.SERVER_NAME, 1);

        // Checked one at a time, in this order, so the first failing field is the one reported.
        Field[] errorFreeConnectorFields = {
                MySqlConnectorConfig.SERVER_ID,
                MySqlConnectorConfig.TABLES_IGNORE_BUILTIN,
                MySqlConnectorConfig.DATABASE_WHITELIST,
                MySqlConnectorConfig.DATABASE_BLACKLIST,
                MySqlConnectorConfig.TABLE_WHITELIST,
                MySqlConnectorConfig.TABLE_BLACKLIST,
                MySqlConnectorConfig.COLUMN_BLACKLIST,
                MySqlConnectorConfig.CONNECTION_TIMEOUT_MS,
                MySqlConnectorConfig.KEEP_ALIVE,
                MySqlConnectorConfig.MAX_QUEUE_SIZE,
                MySqlConnectorConfig.MAX_BATCH_SIZE,
                MySqlConnectorConfig.POLL_INTERVAL_MS,
                MySqlConnectorConfig.DATABASE_HISTORY,
                MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES,
                MySqlConnectorConfig.SNAPSHOT_MODE,
                MySqlConnectorConfig.SNAPSHOT_MINIMAL_LOCKING,
                MySqlConnectorConfig.SSL_MODE,
                MySqlConnectorConfig.SSL_KEYSTORE,
                MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD,
                MySqlConnectorConfig.SSL_TRUSTSTORE,
                MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD
        };
        for (Field field : errorFreeConnectorFields) {
            assertNoConfigurationErrors(result, new Field[]{field});
        }

        assertConfigurationErrors(result, KafkaDatabaseHistory.BOOTSTRAP_SERVERS);
        assertConfigurationErrors(result, KafkaDatabaseHistory.TOPIC);
        assertNoConfigurationErrors(result, new Field[]{KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS});
        assertNoConfigurationErrors(result, new Field[]{KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS});
    }

    /**
     * Validates a configuration with connection settings, SSL mode {@code REQUIRED}, key/trust
     * store paths and passwords, and Kafka history settings, then checks the per-field results.
     * <p>
     * The hostname field is checked with the bounds {@code 0, 1} (presumably the minimum and
     * maximum number of errors; confirm against {@code assertConfigurationErrors}). Every other
     * field listed here must validate without errors.
     */
    @Test
    public void shouldValidateValidConfigurationWithSSL() {
        MySqlConnector connector = new MySqlConnector();

        // Lower-case with Locale.ROOT: with the default locale a Turkish JVM would turn
        // "REQUIRED" into "requıred" (dotless i) and the test would depend on the host's locale.
        String sslMode = MySqlConnectorConfig.SecureConnectionMode.REQUIRED.name().toLowerCase(java.util.Locale.ROOT);

        Configuration sslConfig = Configuration.create()
                .with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.hostname"))
                .with(MySqlConnectorConfig.PORT, System.getProperty("database.port"))
                .with(MySqlConnectorConfig.USER, "snapper")
                .with(MySqlConnectorConfig.PASSWORD, "snapperpass")
                .with(MySqlConnectorConfig.SSL_MODE, sslMode)
                .with(MySqlConnectorConfig.SSL_KEYSTORE, "/some/path/to/keystore")
                .with(MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD, "keystore1234")
                .with(MySqlConnectorConfig.SSL_TRUSTSTORE, "/some/path/to/truststore")
                .with(MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD, "truststore1234")
                .with(MySqlConnectorConfig.SERVER_ID, 18765)
                .with(MySqlConnectorConfig.SERVER_NAME, "myServer")
                .with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, "some.host.com")
                .with(KafkaDatabaseHistory.TOPIC, "my.db.history.topic")
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .build();
        Config result = connector.validate(sslConfig.asMap());

        assertConfigurationErrors(result, MySqlConnectorConfig.HOSTNAME, 0, 1);

        // Checked one at a time, in this order, so the first failing field is the one reported.
        Field[] errorFreeFields = {
                MySqlConnectorConfig.PORT,
                MySqlConnectorConfig.USER,
                MySqlConnectorConfig.PASSWORD,
                MySqlConnectorConfig.SERVER_NAME,
                MySqlConnectorConfig.SERVER_ID,
                MySqlConnectorConfig.TABLES_IGNORE_BUILTIN,
                MySqlConnectorConfig.DATABASE_WHITELIST,
                MySqlConnectorConfig.DATABASE_BLACKLIST,
                MySqlConnectorConfig.TABLE_WHITELIST,
                MySqlConnectorConfig.TABLE_BLACKLIST,
                MySqlConnectorConfig.COLUMN_BLACKLIST,
                MySqlConnectorConfig.CONNECTION_TIMEOUT_MS,
                MySqlConnectorConfig.KEEP_ALIVE,
                MySqlConnectorConfig.MAX_QUEUE_SIZE,
                MySqlConnectorConfig.MAX_BATCH_SIZE,
                MySqlConnectorConfig.POLL_INTERVAL_MS,
                MySqlConnectorConfig.DATABASE_HISTORY,
                MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES,
                MySqlConnectorConfig.SNAPSHOT_MODE,
                MySqlConnectorConfig.SNAPSHOT_MINIMAL_LOCKING,
                MySqlConnectorConfig.SSL_MODE,
                MySqlConnectorConfig.SSL_KEYSTORE,
                MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD,
                MySqlConnectorConfig.SSL_TRUSTSTORE,
                MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD,
                KafkaDatabaseHistory.BOOTSTRAP_SERVERS,
                KafkaDatabaseHistory.TOPIC,
                KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS,
                KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS
        };
        for (Field field : errorFreeFields) {
            assertNoConfigurationErrors(result, new Field[]{field});
        }
    }

    /**
     * Validates a configuration with connection settings, SSL mode {@code DISABLED} and Kafka
     * history settings, expects every listed field to validate without errors, and then checks
     * the values offered by the database and table whitelist recommenders.
     * <p>
     * The recommender checks compare against fixed database and table names, first for the
     * configuration as built and then for a copy whose database whitelist is
     * {@code connector_test,connector_test_ro}.
     */
    @Test
    public void shouldValidateAcceptableConfiguration() {
        // Lower-case with Locale.ROOT: with the default locale a Turkish JVM would turn
        // "DISABLED" into "dısabled" (dotless i) and the test would depend on the host's locale.
        String sslMode = MySqlConnectorConfig.SecureConnectionMode.DISABLED.name().toLowerCase(java.util.Locale.ROOT);

        Configuration build = Configuration.create()
                .with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.hostname"))
                .with(MySqlConnectorConfig.PORT, System.getProperty("database.port"))
                .with(MySqlConnectorConfig.USER, "snapper")
                .with(MySqlConnectorConfig.PASSWORD, "snapperpass")
                .with(MySqlConnectorConfig.SSL_MODE, sslMode)
                .with(MySqlConnectorConfig.SERVER_ID, 18765)
                .with(MySqlConnectorConfig.SERVER_NAME, "myServer")
                .with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, "some.host.com")
                .with(KafkaDatabaseHistory.TOPIC, "my.db.history.topic")
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .build();
        Config result = new MySqlConnector().validate(build.asMap());

        // Checked one at a time, in this order, so the first failing field is the one reported.
        Field[] errorFreeFields = {
                MySqlConnectorConfig.HOSTNAME,
                MySqlConnectorConfig.PORT,
                MySqlConnectorConfig.USER,
                MySqlConnectorConfig.PASSWORD,
                MySqlConnectorConfig.SERVER_NAME,
                MySqlConnectorConfig.SERVER_ID,
                MySqlConnectorConfig.TABLES_IGNORE_BUILTIN,
                MySqlConnectorConfig.DATABASE_WHITELIST,
                MySqlConnectorConfig.DATABASE_BLACKLIST,
                MySqlConnectorConfig.TABLE_WHITELIST,
                MySqlConnectorConfig.TABLE_BLACKLIST,
                MySqlConnectorConfig.COLUMN_BLACKLIST,
                MySqlConnectorConfig.CONNECTION_TIMEOUT_MS,
                MySqlConnectorConfig.KEEP_ALIVE,
                MySqlConnectorConfig.MAX_QUEUE_SIZE,
                MySqlConnectorConfig.MAX_BATCH_SIZE,
                MySqlConnectorConfig.POLL_INTERVAL_MS,
                MySqlConnectorConfig.DATABASE_HISTORY,
                MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES,
                MySqlConnectorConfig.SNAPSHOT_MODE,
                MySqlConnectorConfig.SNAPSHOT_MINIMAL_LOCKING,
                MySqlConnectorConfig.SSL_MODE,
                MySqlConnectorConfig.SSL_KEYSTORE,
                MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD,
                MySqlConnectorConfig.SSL_TRUSTSTORE,
                MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD,
                KafkaDatabaseHistory.BOOTSTRAP_SERVERS,
                KafkaDatabaseHistory.TOPIC,
                KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS,
                KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS
        };
        for (Field field : errorFreeFields) {
            assertNoConfigurationErrors(result, new Field[]{field});
        }

        // Database names offered for the database whitelist.
        List<?> dbNames = MySqlConnectorConfig.DATABASE_WHITELIST.recommender().validValues(MySqlConnectorConfig.DATABASE_WHITELIST, build);
        Testing.debug("List of dbNames: " + dbNames);
        Assertions.assertThat(dbNames).containsOnly(new Object[]{"connector_test", "readbinlog_test", "regression_test", "json_test", "connector_test_ro", "emptydb"});

        // Table names offered for the table whitelist when no database whitelist is set.
        Field.Recommender tableRecommender = MySqlConnectorConfig.TABLE_WHITELIST.recommender();
        List<?> tableNames = tableRecommender.validValues(MySqlConnectorConfig.TABLE_WHITELIST, build);
        Testing.debug("List of tableNames: " + tableNames);
        Assertions.assertThat(tableNames).containsOnly(new Object[]{"readbinlog_test.product", "readbinlog_test.purchased", "readbinlog_test.person", "connector_test.customers", "connector_test.orders", "connector_test.products", "connector_test.products_on_hand", "connector_test_ro.customers", "connector_test_ro.orders", "connector_test_ro.products", "connector_test_ro.products_on_hand", "regression_test.t1464075356413_testtable6", "regression_test.dbz_85_fractest", "regression_test.dbz84_integer_types_table", "regression_test.dbz_100_enumsettest", "regression_test.dbz_102_charsettest", "regression_test.dbz_114_zerovaluetest", "regression_test.dbz_123_bitvaluetest", "regression_test.dbz_104_customers", "json_test.dbz_126_jsontable"});

        // Same query with the database whitelist restricted to two databases.
        Configuration twoDatabases = build.edit()
                .with(MySqlConnectorConfig.DATABASE_WHITELIST, "connector_test,connector_test_ro")
                .build();
        List<?> restrictedTableNames = tableRecommender.validValues(MySqlConnectorConfig.TABLE_WHITELIST, twoDatabases);
        // Print before asserting, like the two checks above, so the values are visible on failure.
        Testing.debug("List of tableNames: " + restrictedTableNames);
        Assertions.assertThat(restrictedTableNames).containsOnly(new Object[]{"connector_test.customers", "connector_test.orders", "connector_test.products", "connector_test.products_on_hand", "connector_test_ro.customers", "connector_test_ro.orders", "connector_test_ro.products", "connector_test_ro.products_on_hand"});
    }

    @Test
    public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLException, InterruptedException {
        MySQLConnection forTestDatabase;
        Throwable th;
        if (!System.getProperty("database.port").equals(System.getProperty("database.replica.port"))) {
            Thread.sleep(5000L);
        }
        this.config = Configuration.create().with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname")).with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port")).with(MySqlConnectorConfig.USER, "snapper").with(MySqlConnectorConfig.PASSWORD, "snapperpass").with(MySqlConnectorConfig.SERVER_ID, 18765).with(MySqlConnectorConfig.SERVER_NAME, "myServer").with(MySqlConnectorConfig.SSL_MODE, MySqlConnectorConfig.SecureConnectionMode.DISABLED.name().toLowerCase()).with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10).with(MySqlConnectorConfig.DATABASE_WHITELIST, "connector_test").with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH).build();
        start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(39);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic("myServer").size()).isEqualTo(12);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic("myServer.connector_test.products").size()).isEqualTo(9);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic("myServer.connector_test.products_on_hand").size()).isEqualTo(9);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic("myServer.connector_test.customers").size()).isEqualTo(4);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic("myServer.connector_test.orders").size()).isEqualTo(5);
        Assertions.assertThat(consumeRecordsByTopic.topics().size()).isEqualTo(5);
        Assertions.assertThat(consumeRecordsByTopic.databaseNames().size()).isEqualTo(2);
        Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase("connector_test").size()).isEqualTo(11);
        Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase("readbinlog_test")).isNull();
        Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase("").size()).isEqualTo(1);
        consumeRecordsByTopic.ddlRecordsForDatabase("connector_test").forEach(this::print);
        consumeRecordsByTopic.forEach(this::validate);
        List allRecordsInOrder = consumeRecordsByTopic.allRecordsInOrder();
        SourceRecord sourceRecord = (SourceRecord) allRecordsInOrder.get(allRecordsInOrder.size() - 1);
        SourceRecord sourceRecord2 = (SourceRecord) allRecordsInOrder.get(allRecordsInOrder.size() - 2);
        Assertions.assertThat(sourceRecord2.sourceOffset().containsKey("snapshot")).isTrue();
        Assertions.assertThat(sourceRecord.sourceOffset().containsKey("snapshot")).isFalse();
        Assertions.assertThat(((Struct) sourceRecord2.value()).getStruct("source").getBoolean("snapshot")).isTrue();
        Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("source").getBoolean("snapshot")).isTrue();
        waitForAvailableRecords(3L, TimeUnit.SECONDS);
        System.out.println("TOTAL CONSUMED = " + consumeAvailableRecords(this::print));
        stopConnector();
        MySQLConnection forTestDatabase2 = MySQLConnection.forTestDatabase("connector_test");
        Throwable th2 = null;
        try {
            JdbcConnection connect = forTestDatabase2.connect();
            Throwable th3 = null;
            try {
                connect.query("SELECT * FROM products", resultSet -> {
                    if (Testing.Print.isEnabled()) {
                        connect.print(resultSet);
                    }
                });
                connect.execute(new String[]{"INSERT INTO products VALUES (default,'robot','Toy robot',1.304);"});
                connect.query("SELECT * FROM products", resultSet2 -> {
                    if (Testing.Print.isEnabled()) {
                        connect.print(resultSet2);
                    }
                });
                if (connect != null) {
                    if (0 != 0) {
                        try {
                            connect.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    } else {
                        connect.close();
                    }
                }
                Testing.print("*** Restarting connector after inserts were made");
                start(MySqlConnector.class, this.config);
                AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(1);
                Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic("myServer.connector_test.products").size()).isEqualTo(1);
                Assertions.assertThat(consumeRecordsByTopic2.topics().size()).isEqualTo(1);
                assertInsert((SourceRecord) consumeRecordsByTopic2.recordsForTopic("myServer.connector_test.products").get(0), "id", 110);
                Testing.print("*** Done with inserts and restart");
                Testing.print("*** Stopping connector");
                stopConnector();
                Testing.print("*** Restarting connector");
                start(MySqlConnector.class, this.config);
                MySQLConnection forTestDatabase3 = MySQLConnection.forTestDatabase("connector_test");
                Throwable th5 = null;
                try {
                    JdbcConnection connect2 = forTestDatabase3.connect();
                    Throwable th6 = null;
                    try {
                        try {
                            connect2.execute(new String[]{"INSERT INTO products VALUES (1001,'roy','old robot',1234.56);"});
                            connect2.query("SELECT * FROM products", resultSet3 -> {
                                if (Testing.Print.isEnabled()) {
                                    connect2.print(resultSet3);
                                }
                            });
                            if (connect2 != null) {
                                if (0 != 0) {
                                    try {
                                        connect2.close();
                                    } catch (Throwable th7) {
                                        th6.addSuppressed(th7);
                                    }
                                } else {
                                    connect2.close();
                                }
                            }
                            AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(1);
                            Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic("myServer.connector_test.products").size()).isEqualTo(1);
                            Assertions.assertThat(consumeRecordsByTopic3.topics().size()).isEqualTo(1);
                            assertInsert((SourceRecord) consumeRecordsByTopic3.recordsForTopic("myServer.connector_test.products").get(0), "id", 1001);
                            Testing.print("*** Done with simple insert");
                            forTestDatabase = MySQLConnection.forTestDatabase("connector_test");
                            th = null;
                        } finally {
                        }
                        try {
                            JdbcConnection connect3 = forTestDatabase.connect();
                            Throwable th8 = null;
                            try {
                                connect3.execute(new String[]{"UPDATE products SET id=2001, description='really old robot' WHERE id=1001"});
                                connect3.query("SELECT * FROM products", resultSet4 -> {
                                    if (Testing.Print.isEnabled()) {
                                        connect3.print(resultSet4);
                                    }
                                });
                                if (connect3 != null) {
                                    if (0 != 0) {
                                        try {
                                            connect3.close();
                                        } catch (Throwable th9) {
                                            th8.addSuppressed(th9);
                                        }
                                    } else {
                                        connect3.close();
                                    }
                                }
                                List recordsForTopic = consumeRecordsByTopic(3).recordsForTopic("myServer.connector_test.products");
                                Assertions.assertThat(recordsForTopic.size()).isEqualTo(3);
                                assertInsert((SourceRecord) recordsForTopic.get(0), "id", 2001);
                                assertDelete((SourceRecord) recordsForTopic.get(1), "id", 1001);
                                assertTombstone((SourceRecord) recordsForTopic.get(2), "id", 1001);
                                Testing.print("*** Done with PK change");
                                MySQLConnection forTestDatabase4 = MySQLConnection.forTestDatabase("connector_test");
                                Throwable th10 = null;
                                try {
                                    JdbcConnection connect4 = forTestDatabase4.connect();
                                    Throwable th11 = null;
                                    try {
                                        connect4.execute(new String[]{"UPDATE products SET weight=1345.67 WHERE id=2001"});
                                        connect4.query("SELECT * FROM products", resultSet5 -> {
                                            if (Testing.Print.isEnabled()) {
                                                connect4.print(resultSet5);
                                            }
                                        });
                                        if (connect4 != null) {
                                            if (0 != 0) {
                                                try {
                                                    connect4.close();
                                                } catch (Throwable th12) {
                                                    th11.addSuppressed(th12);
                                                }
                                            } else {
                                                connect4.close();
                                            }
                                        }
                                        AbstractConnectorTest.SourceRecords consumeRecordsByTopic4 = consumeRecordsByTopic(1);
                                        Assertions.assertThat(consumeRecordsByTopic4.topics().size()).isEqualTo(1);
                                        List recordsForTopic2 = consumeRecordsByTopic4.recordsForTopic("myServer.connector_test.products");
                                        Assertions.assertThat(recordsForTopic2.size()).isEqualTo(1);
                                        assertUpdate((SourceRecord) recordsForTopic2.get(0), "id", 2001);
                                        recordsForTopic2.forEach(this::validate);
                                        Testing.print("*** Done with simple update");
                                        MySQLConnection forTestDatabase5 = MySQLConnection.forTestDatabase("connector_test");
                                        Throwable th13 = null;
                                        try {
                                            JdbcConnection connect5 = forTestDatabase5.connect();
                                            Throwable th14 = null;
                                            try {
                                                connect5.execute(new String[]{"ALTER TABLE connector_test.products ADD COLUMN volume FLOAT NOT NULL, ADD COLUMN alias VARCHAR(30) NOT NULL AFTER description"});
                                                connect5.execute(new String[]{"UPDATE products SET volume=13.5 WHERE id=2001"});
                                                connect5.query("SELECT * FROM products", resultSet6 -> {
                                                    if (Testing.Print.isEnabled()) {
                                                        connect5.print(resultSet6);
                                                    }
                                                });
                                                if (connect5 != null) {
                                                    if (0 != 0) {
                                                        try {
                                                            connect5.close();
                                                        } catch (Throwable th15) {
                                                            th14.addSuppressed(th15);
                                                        }
                                                    } else {
                                                        connect5.close();
                                                    }
                                                }
                                                AbstractConnectorTest.SourceRecords consumeRecordsByTopic5 = consumeRecordsByTopic(2);
                                                Assertions.assertThat(consumeRecordsByTopic5.topics().size()).isEqualTo(2);
                                                Assertions.assertThat(consumeRecordsByTopic5.recordsForTopic("myServer").size()).isEqualTo(1);
                                                List recordsForTopic3 = consumeRecordsByTopic5.recordsForTopic("myServer.connector_test.products");
                                                Assertions.assertThat(recordsForTopic3.size()).isEqualTo(1);
                                                assertUpdate((SourceRecord) recordsForTopic3.get(0), "id", 2001);
                                                recordsForTopic3.forEach(this::validate);
                                                Testing.print("*** Done with schema change (same db and fully-qualified name)");
                                                MySQLConnection forTestDatabase6 = MySQLConnection.forTestDatabase("emptydb");
                                                Throwable th16 = null;
                                                try {
                                                    JdbcConnection connect6 = forTestDatabase6.connect();
                                                    Throwable th17 = null;
                                                    try {
                                                        connect6.execute(new String[]{"CREATE TABLE connector_test.stores ( id INT(11) PRIMARY KEY NOT NULL AUTO_INCREMENT, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL );"});
                                                        if (connect6 != null) {
                                                            if (0 != 0) {
                                                                try {
                                                                    connect6.close();
                                                                } catch (Throwable th18) {
                                                                    th17.addSuppressed(th18);
                                                                }
                                                            } else {
                                                                connect6.close();
                                                            }
                                                        }
                                                        AbstractConnectorTest.SourceRecords consumeRecordsByTopic6 = consumeRecordsByTopic(1);
                                                        Assertions.assertThat(consumeRecordsByTopic6.topics().size()).isEqualTo(1);
                                                        Assertions.assertThat(consumeRecordsByTopic6.recordsForTopic("myServer").size()).isEqualTo(1);
                                                        consumeRecordsByTopic6.recordsForTopic("myServer").forEach(this::validate);
                                                        Testing.print("*** Done with PK change (different db and fully-qualified name)");
                                                        MySQLConnection forTestDatabase7 = MySQLConnection.forTestDatabase("connector_test");
                                                        Throwable th19 = null;
                                                        try {
                                                            connect2 = forTestDatabase7.connect();
                                                            Throwable th20 = null;
                                                            try {
                                                                try {
                                                                    connect2.execute(new String[]{"UPDATE products_on_hand SET quantity=20 WHERE product_id=109"});
                                                                    connect2.query("SELECT * FROM products_on_hand", resultSet7 -> {
                                                                        if (Testing.Print.isEnabled()) {
                                                                            connect2.print(resultSet7);
                                                                        }
                                                                    });
                                                                    if (connect2 != null) {
                                                                        if (0 != 0) {
                                                                            try {
                                                                                connect2.close();
                                                                            } catch (Throwable th21) {
                                                                                th20.addSuppressed(th21);
                                                                            }
                                                                        } else {
                                                                            connect2.close();
                                                                        }
                                                                    }
                                                                    AbstractConnectorTest.SourceRecords consumeRecordsByTopic7 = consumeRecordsByTopic(1);
                                                                    Assertions.assertThat(consumeRecordsByTopic7.topics().size()).isEqualTo(1);
                                                                    List recordsForTopic4 = consumeRecordsByTopic7.recordsForTopic("myServer.connector_test.products_on_hand");
                                                                    Assertions.assertThat(recordsForTopic4.size()).isEqualTo(1);
                                                                    assertUpdate((SourceRecord) recordsForTopic4.get(0), "product_id", 109);
                                                                    recordsForTopic4.forEach(this::validate);
                                                                    Testing.print("*** Done with verifying no additional events");
                                                                    stopConnector();
                                                                } finally {
                                                                }
                                                            } finally {
                                                            }
                                                        } finally {
                                                            if (forTestDatabase7 != null) {
                                                                if (0 != 0) {
                                                                    try {
                                                                        forTestDatabase7.close();
                                                                    } catch (Throwable th22) {
                                                                        th19.addSuppressed(th22);
                                                                    }
                                                                } else {
                                                                    forTestDatabase7.close();
                                                                }
                                                            }
                                                        }
                                                    } catch (Throwable th23) {
                                                        if (connect6 != null) {
                                                            if (0 != 0) {
                                                                try {
                                                                    connect6.close();
                                                                } catch (Throwable th24) {
                                                                    th17.addSuppressed(th24);
                                                                }
                                                            } else {
                                                                connect6.close();
                                                            }
                                                        }
                                                        throw th23;
                                                    }
                                                } finally {
                                                    if (forTestDatabase6 != null) {
                                                        if (0 != 0) {
                                                            try {
                                                                forTestDatabase6.close();
                                                            } catch (Throwable th25) {
                                                                th16.addSuppressed(th25);
                                                            }
                                                        } else {
                                                            forTestDatabase6.close();
                                                        }
                                                    }
                                                }
                                            } catch (Throwable th26) {
                                                if (connect5 != null) {
                                                    if (0 != 0) {
                                                        try {
                                                            connect5.close();
                                                        } catch (Throwable th27) {
                                                            th14.addSuppressed(th27);
                                                        }
                                                    } else {
                                                        connect5.close();
                                                    }
                                                }
                                                throw th26;
                                            }
                                        } finally {
                                            if (forTestDatabase5 != null) {
                                                if (0 != 0) {
                                                    try {
                                                        forTestDatabase5.close();
                                                    } catch (Throwable th28) {
                                                        th13.addSuppressed(th28);
                                                    }
                                                } else {
                                                    forTestDatabase5.close();
                                                }
                                            }
                                        }
                                    } catch (Throwable th29) {
                                        if (connect4 != null) {
                                            if (0 != 0) {
                                                try {
                                                    connect4.close();
                                                } catch (Throwable th30) {
                                                    th11.addSuppressed(th30);
                                                }
                                            } else {
                                                connect4.close();
                                            }
                                        }
                                        throw th29;
                                    }
                                } finally {
                                    if (forTestDatabase4 != null) {
                                        if (0 != 0) {
                                            try {
                                                forTestDatabase4.close();
                                            } catch (Throwable th31) {
                                                th10.addSuppressed(th31);
                                            }
                                        } else {
                                            forTestDatabase4.close();
                                        }
                                    }
                                }
                            } catch (Throwable th32) {
                                if (connect3 != null) {
                                    if (0 != 0) {
                                        try {
                                            connect3.close();
                                        } catch (Throwable th33) {
                                            th8.addSuppressed(th33);
                                        }
                                    } else {
                                        connect3.close();
                                    }
                                }
                                throw th32;
                            }
                        } finally {
                            if (forTestDatabase != null) {
                                if (0 != 0) {
                                    try {
                                        forTestDatabase.close();
                                    } catch (Throwable th34) {
                                        th.addSuppressed(th34);
                                    }
                                } else {
                                    forTestDatabase.close();
                                }
                            }
                        }
                    } finally {
                    }
                } finally {
                    if (forTestDatabase3 != null) {
                        if (0 != 0) {
                            try {
                                forTestDatabase3.close();
                            } catch (Throwable th35) {
                                th5.addSuppressed(th35);
                            }
                        } else {
                            forTestDatabase3.close();
                        }
                    }
                }
            } catch (Throwable th36) {
                if (connect != null) {
                    if (0 != 0) {
                        try {
                            connect.close();
                        } catch (Throwable th37) {
                            th3.addSuppressed(th37);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th36;
            }
        } finally {
            if (forTestDatabase2 != null) {
                if (0 != 0) {
                    try {
                        forTestDatabase2.close();
                    } catch (Throwable th38) {
                        th2.addSuppressed(th38);
                    }
                } else {
                    forTestDatabase2.close();
                }
            }
        }
    }

    /**
     * Starts the connector against the {@code connector_test_ro} database with the snapshot mode
     * set to {@code never} and schema-change events enabled, then checks the number of records
     * received per topic.
     *
     * <p>33 records are requested in total: 9 + 9 + 4 + 5 data-change records on the four table
     * topics plus 6 DDL records for the database. Five distinct topics are expected; the fifth is
     * presumably the schema-change topic (not named in this test).
     */
    @Test
    public void shouldConsumeEventsWithNoSnapshot() throws SQLException, InterruptedException {
        Testing.Files.delete(DB_HISTORY_PATH);

        // Topics this test inspects, one per captured table.
        final String productsTopic = "myServer1.connector_test_ro.products";
        final String productsOnHandTopic = "myServer1.connector_test_ro.products_on_hand";
        final String customersTopic = "myServer1.connector_test_ro.customers";
        final String ordersTopic = "myServer1.connector_test_ro.orders";

        // Build the connector configuration from the database system properties.
        config = Configuration.create()
                .with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.hostname"))
                .with(MySqlConnectorConfig.PORT, System.getProperty("database.port"))
                .with(MySqlConnectorConfig.USER, "snapper")
                .with(MySqlConnectorConfig.PASSWORD, "snapperpass")
                .with(MySqlConnectorConfig.SSL_MODE, MySqlConnectorConfig.SecureConnectionMode.DISABLED.name().toLowerCase())
                .with(MySqlConnectorConfig.SERVER_ID, 18780)
                .with(MySqlConnectorConfig.SERVER_NAME, "myServer1")
                .with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
                .with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
                .with(MySqlConnectorConfig.DATABASE_WHITELIST, "connector_test_ro")
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER.name().toLowerCase())
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
                .build();

        start(MySqlConnector.class, config);

        // Consume everything the connector is expected to emit.
        AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(33);

        // Per-topic record counts.
        Assertions.assertThat(records.recordsForTopic(productsTopic).size()).isEqualTo(9);
        Assertions.assertThat(records.recordsForTopic(productsOnHandTopic).size()).isEqualTo(9);
        Assertions.assertThat(records.recordsForTopic(customersTopic).size()).isEqualTo(4);
        Assertions.assertThat(records.recordsForTopic(ordersTopic).size()).isEqualTo(5);
        Assertions.assertThat(records.topics().size()).isEqualTo(5);
        Assertions.assertThat(records.ddlRecordsForDatabase("connector_test_ro").size()).isEqualTo(6);

        // Run the shared validation on every consumed record.
        records.forEach(this::validate);

        stopConnector();

        // Print the order and customer records after the connector has stopped.
        records.recordsForTopic(ordersTopic).forEach(orderRecord -> print(orderRecord));
        records.recordsForTopic(customersTopic).forEach(customerRecord -> print(customerRecord));
    }

    /**
     * Starts the connector against the {@code connector_test_ro} database with
     * {@code orders.order_number} blacklisted and {@code customers.email} masked via
     * {@code MASK_COLUMN(12)}, with schema-change events disabled.
     *
     * <p>27 records (9 + 9 + 4 + 5) are expected across exactly four table topics. The test then
     * checks that reading {@code order_number} from an order record's value raises a
     * {@link DataException}, and that the {@code email} field in any non-null {@code before} or
     * {@code after} struct of a customer record equals the asterisk mask.
     */
    @Test
    public void shouldConsumeEventsWithMaskedAndBlacklistedColumns() throws SQLException, InterruptedException {
        Testing.Files.delete(DB_HISTORY_PATH);

        // Topics this test inspects, one per captured table.
        final String productsTopic = "myServer2.connector_test_ro.products";
        final String productsOnHandTopic = "myServer2.connector_test_ro.products_on_hand";
        final String customersTopic = "myServer2.connector_test_ro.customers";
        final String ordersTopic = "myServer2.connector_test_ro.orders";

        // Build the connector configuration from the database system properties.
        config = Configuration.create()
                .with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.hostname"))
                .with(MySqlConnectorConfig.PORT, System.getProperty("database.port"))
                .with(MySqlConnectorConfig.USER, "snapper")
                .with(MySqlConnectorConfig.PASSWORD, "snapperpass")
                .with(MySqlConnectorConfig.SSL_MODE, MySqlConnectorConfig.SecureConnectionMode.DISABLED.name().toLowerCase())
                .with(MySqlConnectorConfig.SERVER_ID, 18780)
                .with(MySqlConnectorConfig.SERVER_NAME, "myServer2")
                .with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
                .with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
                .with(MySqlConnectorConfig.DATABASE_WHITELIST, "connector_test_ro")
                .with(MySqlConnectorConfig.COLUMN_BLACKLIST, "connector_test_ro.orders.order_number")
                .with(MySqlConnectorConfig.MASK_COLUMN(12), "connector_test_ro.customers.email")
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
                .build();

        start(MySqlConnector.class, config);

        // Consume everything the connector is expected to emit.
        AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(27);

        // Per-topic record counts; only the four table topics are expected.
        Assertions.assertThat(records.recordsForTopic(productsTopic).size()).isEqualTo(9);
        Assertions.assertThat(records.recordsForTopic(productsOnHandTopic).size()).isEqualTo(9);
        Assertions.assertThat(records.recordsForTopic(customersTopic).size()).isEqualTo(4);
        Assertions.assertThat(records.recordsForTopic(ordersTopic).size()).isEqualTo(5);
        Assertions.assertThat(records.topics().size()).isEqualTo(4);

        // Run the shared validation on every consumed record.
        records.forEach(this::validate);

        stopConnector();

        // The blacklisted column must be absent from every order record.
        records.recordsForTopic(ordersTopic).forEach(orderRecord -> {
            print(orderRecord);
            try {
                Struct orderValue = (Struct) orderRecord.value();
                orderValue.get("order_number");
                Assert.fail("The 'order_number' field was found but should not exist");
            } catch (DataException expected) {
                // Intentionally ignored: the lookup is supposed to fail for the blacklisted column.
            }
        });

        // The masked column must contain only the mask in both row images, when present.
        records.recordsForTopic(customersTopic).forEach(customerRecord -> {
            Struct envelope = (Struct) customerRecord.value();

            Struct after = envelope.getStruct("after");
            if (after != null) {
                Assertions.assertThat(after.getString("email")).isEqualTo("************");
            }

            Struct before = envelope.getStruct("before");
            if (before != null) {
                Assertions.assertThat(before.getString("email")).isEqualTo("************");
            }

            print(customerRecord);
        });
    }
}
