package io.debezium.connector.mysql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.storage.file.history.FileSchemaHistory;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Savepoint;
import java.sql.Statement;
import java.util.Iterator;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

@SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 5, minor = 6, reason = "DDL uses fractional second data types, not supported until MySQL 5.6")
/* loaded from: input_file:io/debezium/connector/mysql/BinlogReaderBufferIT.class */
public class BinlogReaderBufferIT extends AbstractConnectorTest {
    private static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-connect.txt").toAbsolutePath();
    private final UniqueDatabase DATABASE = new UniqueDatabase("myServer1", "connector_test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
    private final UniqueDatabase RO_DATABASE = new UniqueDatabase("myServer2", "connector_test_ro", this.DATABASE).withDbHistoryPath(SCHEMA_HISTORY_PATH);
    private Configuration config;

    @Before
    public void beforeEach() {
        stopConnector();
        this.DATABASE.createAndInitialize();
        this.RO_DATABASE.createAndInitialize();
        initializeConnectorTestFramework();
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
    }

    @After
    public void afterEach() {
        try {
            stopConnector();
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
        } catch (Throwable th) {
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
            throw th;
        }
    }

    @Test
    public void shouldCorrectlyManageRollback() throws SQLException, InterruptedException {
        boolean equals = System.getProperty("database.port", "3306").equals(System.getProperty("database.replica.port", "3306"));
        if (!equals) {
            Thread.sleep(5000L);
        }
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost")).with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306")).with(MySqlConnectorConfig.USER, "snapper").with(MySqlConnectorConfig.PASSWORD, "snapperpass").with(MySqlConnectorConfig.SERVER_ID, 18765).with(CommonConnectorConfig.TOPIC_PREFIX, this.DATABASE.getServerName()).with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10).with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, this.DATABASE.getDatabaseName()).with(MySqlConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER, 10000).with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH).build();
        start(MySqlConnector.class, this.config);
        consumeRecordsByTopic(39);
        if (equals) {
            MySqlTestConnection forTestDatabase = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
            try {
                JdbcConnection connect = forTestDatabase.connect();
                try {
                    Connection connection = connect.connection();
                    connect.setAutoCommit(false);
                    Statement createStatement = connection.createStatement();
                    createStatement.executeUpdate("CREATE TEMPORARY TABLE tmp_ids (a int)");
                    createStatement.executeUpdate("INSERT INTO tmp_ids VALUES(5)");
                    connection.commit();
                    createStatement.executeUpdate("DROP TEMPORARY TABLE tmp_ids");
                    createStatement.executeUpdate("UPDATE products SET weight=100.12 WHERE id=109");
                    connection.rollback();
                    connect.query("SELECT * FROM products", resultSet -> {
                        if (Testing.Print.isEnabled()) {
                            connect.print(resultSet);
                        }
                    });
                    connect.setAutoCommit(true);
                    if (connect != null) {
                        connect.close();
                    }
                    if (forTestDatabase != null) {
                        forTestDatabase.close();
                    }
                    Thread.sleep(5000L);
                    assertNoRecordsToConsume();
                    assertEngineIsRunning();
                    Testing.print("*** Done with rollback TX");
                } finally {
                }
            } catch (Throwable th) {
                if (forTestDatabase != null) {
                    try {
                        forTestDatabase.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    @Test
    public void shouldProcessSavepoint() throws SQLException, InterruptedException {
        if (!System.getProperty("database.port", "3306").equals(System.getProperty("database.replica.port", "3306"))) {
            Thread.sleep(5000L);
        }
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost")).with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306")).with(MySqlConnectorConfig.USER, "snapper").with(MySqlConnectorConfig.PASSWORD, "snapperpass").with(MySqlConnectorConfig.SERVER_ID, 18765).with(CommonConnectorConfig.TOPIC_PREFIX, this.DATABASE.getServerName()).with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10).with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, this.DATABASE.getDatabaseName()).with(MySqlConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH).build();
        start(MySqlConnector.class, this.config);
        consumeRecordsByTopic(39);
        MySqlTestConnection forTestDatabase = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        try {
            JdbcConnection connect = forTestDatabase.connect();
            try {
                Connection connection = connect.connection();
                connect.setAutoCommit(false);
                Statement createStatement = connection.createStatement();
                createStatement.executeUpdate("INSERT INTO customers VALUES(default, 'first', 'first', 'first')");
                connection.setSavepoint();
                createStatement.executeUpdate("INSERT INTO customers VALUES(default, 'second', 'second', 'second')");
                connection.commit();
                connect.query("SELECT * FROM customers", resultSet -> {
                    if (Testing.Print.isEnabled()) {
                        connect.print(resultSet);
                    }
                });
                connect.setAutoCommit(true);
                if (connect != null) {
                    connect.close();
                }
                if (forTestDatabase != null) {
                    forTestDatabase.close();
                }
                AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
                Assertions.assertThat(consumeRecordsByTopic.topics().size()).isEqualTo(1);
                Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("customers"))).hasSize(2);
                Assertions.assertThat(consumeRecordsByTopic.allRecordsInOrder()).hasSize(2);
                Testing.print("*** Done with savepoint TX");
            } finally {
            }
        } catch (Throwable th) {
            if (forTestDatabase != null) {
                try {
                    forTestDatabase.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void shouldProcessLargeTransaction() throws SQLException, InterruptedException {
        if (!System.getProperty("database.port", "3306").equals(System.getProperty("database.replica.port", "3306"))) {
            Thread.sleep(5000L);
        }
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost")).with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306")).with(MySqlConnectorConfig.USER, "snapper").with(MySqlConnectorConfig.PASSWORD, "snapperpass").with(MySqlConnectorConfig.SERVER_ID, 18765).with(CommonConnectorConfig.TOPIC_PREFIX, this.DATABASE.getServerName()).with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10).with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, this.DATABASE.getDatabaseName()).with(MySqlConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER, 9).with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH).build();
        start(MySqlConnector.class, this.config);
        consumeRecordsByTopic(39);
        MySqlTestConnection forTestDatabase = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        try {
            JdbcConnection connect = forTestDatabase.connect();
            try {
                Connection connection = connect.connection();
                connect.setAutoCommit(false);
                Statement createStatement = connection.createStatement();
                for (int i = 0; i < 40; i++) {
                    createStatement.executeUpdate(String.format("INSERT INTO customers\nVALUES (default,\"%s\",\"%s\",\"%s\")", Integer.valueOf(i), Integer.valueOf(i), Integer.valueOf(i)));
                }
                connection.commit();
                connect.query("SELECT * FROM customers", resultSet -> {
                    if (Testing.Print.isEnabled()) {
                        connect.print(resultSet);
                    }
                });
                connect.setAutoCommit(true);
                if (connect != null) {
                    connect.close();
                }
                AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(40);
                int i2 = 0;
                Iterator it = consumeRecordsByTopic.allRecordsInOrder().iterator();
                while (it.hasNext()) {
                    Struct struct = (Struct) ((SourceRecord) it.next()).value();
                    Assertions.assertThat(struct.getString("op")).isEqualTo("c");
                    int i3 = i2;
                    i2++;
                    Assertions.assertThat(struct.getStruct("after").getString("email")).isEqualTo(Integer.toString(i3));
                }
                Assertions.assertThat(consumeRecordsByTopic.topics().size()).isEqualTo(1);
                Testing.print("*** Done with large TX");
                if (forTestDatabase != null) {
                    forTestDatabase.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (forTestDatabase != null) {
                try {
                    forTestDatabase.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-411"})
    public void shouldProcessRolledBackSavepoint() throws SQLException, InterruptedException {
        int i;
        int i2;
        int i3;
        boolean equals = System.getProperty("database.port", "3306").equals(System.getProperty("database.replica.port", "3306"));
        if (!equals) {
            Thread.sleep(5000L);
        }
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost")).with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306")).with(MySqlConnectorConfig.USER, "snapper").with(MySqlConnectorConfig.PASSWORD, "snapperpass").with(MySqlConnectorConfig.SERVER_ID, 18765).with(CommonConnectorConfig.TOPIC_PREFIX, this.DATABASE.getServerName()).with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10).with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, this.DATABASE.getDatabaseName()).with(MySqlConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH).build();
        start(MySqlConnector.class, this.config);
        consumeRecordsByTopic(39);
        if (equals) {
            MySqlTestConnection forTestDatabase = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
            try {
                JdbcConnection connect = forTestDatabase.connect();
                try {
                    Connection connection = connect.connection();
                    connect.setAutoCommit(false);
                    Statement createStatement = connection.createStatement();
                    createStatement.executeUpdate("CREATE TEMPORARY TABLE tmp_ids (a int)");
                    createStatement.executeUpdate("INSERT INTO tmp_ids VALUES(5)");
                    connection.commit();
                    createStatement.executeUpdate("DROP TEMPORARY TABLE tmp_ids");
                    createStatement.executeUpdate("INSERT INTO customers VALUES(default, 'first', 'first', 'first')");
                    Savepoint savepoint = connection.setSavepoint();
                    createStatement.executeUpdate("INSERT INTO customers VALUES(default, 'second', 'second', 'second')");
                    connection.rollback(savepoint);
                    connection.commit();
                    connect.query("SELECT * FROM customers", resultSet -> {
                        if (Testing.Print.isEnabled()) {
                            connect.print(resultSet);
                        }
                    });
                    connect.setAutoCommit(true);
                    if (connect != null) {
                        connect.close();
                    }
                    if (forTestDatabase != null) {
                        forTestDatabase.close();
                    }
                    if (!MySqlTestConnection.isMySQL5() || MySqlTestConnection.isPerconaServer()) {
                        i = 1;
                        i2 = 1;
                        i3 = 1;
                    } else {
                        i = 3;
                        i2 = 2;
                        i3 = 2;
                    }
                    AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(i);
                    Assertions.assertThat(consumeRecordsByTopic.topics().size()).isEqualTo(i3);
                    Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("customers"))).hasSize(i2);
                    Assertions.assertThat(consumeRecordsByTopic.allRecordsInOrder()).hasSize(i);
                    Testing.print("*** Done with savepoint TX");
                } finally {
                }
            } catch (Throwable th) {
                if (forTestDatabase != null) {
                    try {
                        forTestDatabase.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }
}
