package io.debezium.connector.mysql;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.util.Testing;
import java.nio.file.Path;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/mysql/MySqlRestartIT.class */
public class MySqlRestartIT extends AbstractConnectorTest {
    private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-restart.txt").toAbsolutePath();
    private final UniqueDatabase DATABASE = new UniqueDatabase("restart", "connector_test").withDbHistoryPath(DB_HISTORY_PATH);
    private Configuration config;

    @Before
    public void beforeEach() {
        stopConnector();
        this.DATABASE.createAndInitialize();
        initializeConnectorTestFramework();
        Testing.Files.delete(DB_HISTORY_PATH);
    }

    @After
    public void afterEach() {
        try {
            stopConnector();
            Testing.Files.delete(DB_HISTORY_PATH);
        } catch (Throwable th) {
            Testing.Files.delete(DB_HISTORY_PATH);
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-1276"})
    public void shouldNotDuplicateEventsAfterRestart() throws Exception {
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("restart_table")).build();
        MySqlTestConnection forTestDatabase = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        try {
            JdbcConnection connect = forTestDatabase.connect();
            try {
                connect.execute(new String[]{"CREATE TABLE restart_table (id INT PRIMARY KEY, val INT)", "INSERT INTO restart_table VALUES(1, 10)"});
                if (connect != null) {
                    connect.close();
                }
                if (forTestDatabase != null) {
                    forTestDatabase.close();
                }
                start(MySqlConnector.class, this.config, sourceRecord -> {
                    return sourceRecord.valueSchema().field("after") != null && ((Struct) sourceRecord.value()).getStruct("after").getInt32("id").equals(5);
                });
                Assertions.assertThat(consumeRecordsByTopic(15).recordsForTopic(this.DATABASE.topicForTable("restart_table")).size()).isEqualTo(1);
                forTestDatabase = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                try {
                    connect = forTestDatabase.connect();
                    try {
                        connect.connect().setAutoCommit(false);
                        connect.execute(new String[]{"INSERT INTO restart_table VALUES(2,12)", "INSERT INTO restart_table VALUES(3,13)", "INSERT INTO restart_table VALUES(4,14)", "INSERT INTO restart_table VALUES(5,15)", "INSERT INTO restart_table VALUES(6,16)"});
                        if (connect != null) {
                            connect.close();
                        }
                        if (forTestDatabase != null) {
                            forTestDatabase.close();
                        }
                        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(3);
                        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("restart_table")).size()).isEqualTo(3);
                        Assertions.assertThat(((Struct) ((SourceRecord) consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("restart_table")).get(0)).value()).getStruct("after").getInt32("id")).isEqualTo(2);
                        stopConnector();
                        start(MySqlConnector.class, this.config);
                        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(2);
                        Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic(this.DATABASE.topicForTable("restart_table")).size()).isEqualTo(2);
                        Assertions.assertThat(((Struct) ((SourceRecord) consumeRecordsByTopic2.recordsForTopic(this.DATABASE.topicForTable("restart_table")).get(0)).value()).getStruct("after").getInt32("id")).isEqualTo(5);
                        stopConnector();
                    } finally {
                        if (connect != null) {
                            try {
                                connect.close();
                            } catch (Throwable th) {
                                th.addSuppressed(th);
                            }
                        }
                    }
                } finally {
                }
            } finally {
            }
        } finally {
        }
    }
}
