package io.debezium.connector.mysql.zzz;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlTestConnection;
import io.debezium.connector.mysql.UniqueDatabase;
import io.debezium.connector.mysql.junit.SkipTestDependingOnDatabaseRule;
import io.debezium.connector.mysql.junit.SkipTestDependingOnGtidModeRule;
import io.debezium.connector.mysql.junit.SkipWhenDatabaseIs;
import io.debezium.connector.mysql.junit.SkipWhenGtidModeIs;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

@SkipWhenGtidModeIs(SkipWhenGtidModeIs.GtidMode.OFF)
@SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 5, minor = 6, reason = "DDL uses fractional second data types, not supported until MySQL 5.6")
@SkipWhenDatabaseIs(value = SkipWhenDatabaseIs.Type.MARIADB, reason = "MariaDB does not support purged GTID sets")
/* loaded from: input_file:io/debezium/connector/mysql/zzz/ZZZGtidSetIT.class */
public class ZZZGtidSetIT extends AbstractConnectorTest {
    private static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-connect.txt").toAbsolutePath();
    private Configuration config;
    private final UniqueDatabase DATABASE = new UniqueDatabase("myServer1", "connector_test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
    private final UniqueDatabase RO_DATABASE = new UniqueDatabase("myServer2", "connector_test_ro", this.DATABASE).withDbHistoryPath(SCHEMA_HISTORY_PATH);

    @Rule
    public TestRule skipTest = new SkipTestDependingOnGtidModeRule();

    @Rule
    public TestRule skipTest2 = new SkipTestDependingOnDatabaseRule();

    @Before
    public void beforeEach() {
        stopConnector();
        this.DATABASE.createAndInitialize();
        this.RO_DATABASE.createAndInitialize();
        initializeConnectorTestFramework();
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
    }

    @After
    public void afterEach() {
        try {
            stopConnector();
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
        } catch (Throwable th) {
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
            throw th;
        }
    }

    private boolean isGtidModeEnabled() throws SQLException {
        MySqlTestConnection forTestDatabase = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        try {
            boolean booleanValue = ((Boolean) forTestDatabase.queryAndMap("SHOW GLOBAL VARIABLES LIKE 'GTID_MODE'", resultSet -> {
                if (resultSet.next()) {
                    return Boolean.valueOf("ON".equalsIgnoreCase(resultSet.getString(2)));
                }
                throw new IllegalStateException("Cannot obtain GTID status");
            })).booleanValue();
            if (forTestDatabase != null) {
                forTestDatabase.close();
            }
            return booleanValue;
        } catch (Throwable th) {
            if (forTestDatabase != null) {
                try {
                    forTestDatabase.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-1184"})
    public void shouldProcessPurgedGtidSet() throws SQLException, InterruptedException {
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
        purgeDatabaseLogs();
        UniqueDatabase withDbHistoryPath = new UniqueDatabase("myServer2", "connector_test_ro", new UniqueDatabase("myServer1", "connector_test").withDbHistoryPath(SCHEMA_HISTORY_PATH)).withDbHistoryPath(SCHEMA_HISTORY_PATH);
        withDbHistoryPath.createAndInitialize();
        this.config = withDbHistoryPath.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, withDbHistoryPath.qualifiedTableName("customers")).build();
        start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(10);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(withDbHistoryPath.topicForTable("customers")).size()).isEqualTo(4);
        Assertions.assertThat(consumeRecordsByTopic.topics().size()).isEqualTo(2);
        Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase(withDbHistoryPath.getDatabaseName()).size()).isEqualTo(6);
        consumeRecordsByTopic.forEach(sourceRecord -> {
            this.validate(sourceRecord);
        });
        consumeRecordsByTopic.recordsForTopic(withDbHistoryPath.topicForTable("customers")).forEach(sourceRecord2 -> {
            Matcher matcher = Pattern.compile(".*:(.*)-.*").matcher((String) sourceRecord2.sourceOffset().get("gtids"));
            matcher.matches();
            Assertions.assertThat(matcher.group(1)).isNotEqualTo("1");
        });
        stopConnector();
    }

    private void purgeDatabaseLogs() throws SQLException {
        MySqlTestConnection forTestDatabase = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        try {
            JdbcConnection connect = forTestDatabase.connect();
            try {
                connect.execute(new String[]{"FLUSH LOGS"});
                List<String> binlogs = getBinlogs(connect);
                connect.execute(new String[]{"PURGE BINARY LOGS TO '" + binlogs.get(binlogs.size() - 1) + "'"});
                Awaitility.await().atMost(Duration.ofSeconds(10L)).until(() -> {
                    List<String> binlogs2 = getBinlogs(connect);
                    if (binlogs2.size() != 1) {
                        Testing.print("Binlogs before purging: " + binlogs);
                        Testing.print("Binlogs after purging: " + binlogs2);
                    }
                    return Boolean.valueOf(binlogs2.size() == 1);
                });
                if (connect != null) {
                    connect.close();
                }
                if (forTestDatabase != null) {
                    forTestDatabase.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (forTestDatabase != null) {
                try {
                    forTestDatabase.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private List<String> getBinlogs(JdbcConnection jdbcConnection) throws SQLException {
        return (List) jdbcConnection.queryAndMap("SHOW BINARY LOGS", resultSet -> {
            ArrayList arrayList = new ArrayList();
            while (resultSet.next()) {
                arrayList.add(resultSet.getString(1));
            }
            return arrayList;
        });
    }

    @Test
    @FixFor({"DBZ-1244"})
    public void shouldProcessPurgedLogsWhenDownAndSnapshotNeeded() throws SQLException, InterruptedException {
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
        purgeDatabaseLogs();
        UniqueDatabase withDbHistoryPath = new UniqueDatabase("myServer1", "connector_test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
        withDbHistoryPath.createAndInitialize();
        this.config = withDbHistoryPath.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.WHEN_NEEDED).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, withDbHistoryPath.qualifiedTableName("customers")).build();
        start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(16);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(withDbHistoryPath.topicForTable("customers")).size()).isEqualTo(4);
        Assertions.assertThat(consumeRecordsByTopic.topics().size()).isEqualTo(2);
        Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase(withDbHistoryPath.getDatabaseName()).size()).isEqualTo(11);
        consumeRecordsByTopic.forEach(sourceRecord -> {
            this.validate(sourceRecord);
        });
        stopConnector();
        MySqlTestConnection forTestDatabase = MySqlTestConnection.forTestDatabase(withDbHistoryPath.getDatabaseName());
        try {
            forTestDatabase.execute(new String[]{"INSERT INTO customers VALUES(default,1,1,1)", "INSERT INTO customers VALUES(default,2,2,2)"});
            if (forTestDatabase != null) {
                forTestDatabase.close();
            }
            start(MySqlConnector.class, this.config);
            consumeRecordsByTopic(2);
            stopConnector();
            MySqlTestConnection forTestDatabase2 = MySqlTestConnection.forTestDatabase(withDbHistoryPath.getDatabaseName());
            try {
                forTestDatabase2.execute(new String[]{"INSERT INTO customers VALUES(default,3,3,3)", "INSERT INTO customers VALUES(default,4,4,4)"});
                if (forTestDatabase2 != null) {
                    forTestDatabase2.close();
                }
                purgeDatabaseLogs();
                start(MySqlConnector.class, this.config);
                AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(20);
                Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic(withDbHistoryPath.topicForTable("customers")).size()).isEqualTo(8);
                Assertions.assertThat(consumeRecordsByTopic2.topics().size()).isEqualTo(2);
                Assertions.assertThat(consumeRecordsByTopic2.ddlRecordsForDatabase(withDbHistoryPath.getDatabaseName()).size()).isEqualTo(11);
                stopConnector();
                forTestDatabase2 = MySqlTestConnection.forTestDatabase(withDbHistoryPath.getDatabaseName());
                try {
                    forTestDatabase2.execute(new String[]{"INSERT INTO customers VALUES(default,5,5,5)", "INSERT INTO customers VALUES(default,6,6,6)"});
                    if (forTestDatabase2 != null) {
                        forTestDatabase2.close();
                    }
                    purgeDatabaseLogs();
                    forTestDatabase = MySqlTestConnection.forTestDatabase(withDbHistoryPath.getDatabaseName());
                    try {
                        forTestDatabase.execute(new String[]{"INSERT INTO customers VALUES(default,7,7,7)", "INSERT INTO customers VALUES(default,8,8,8)"});
                        if (forTestDatabase != null) {
                            forTestDatabase.close();
                        }
                        start(MySqlConnector.class, this.config);
                        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(24);
                        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(withDbHistoryPath.topicForTable("customers")).size()).isEqualTo(12);
                        Assertions.assertThat(consumeRecordsByTopic3.topics().size()).isEqualTo(2);
                        Assertions.assertThat(consumeRecordsByTopic3.ddlRecordsForDatabase(withDbHistoryPath.getDatabaseName()).size()).isEqualTo(11);
                        stopConnector();
                    } finally {
                        if (forTestDatabase != null) {
                            try {
                                forTestDatabase.close();
                            } catch (Throwable th) {
                                th.addSuppressed(th);
                            }
                        }
                    }
                } finally {
                    if (forTestDatabase2 != null) {
                        try {
                            forTestDatabase2.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                }
            } finally {
            }
        } finally {
        }
    }
}
