package io.debezium.connector.mysql;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.function.Consumer;
import org.apache.kafka.connect.data.Struct;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/mysql/MySqlConnectorJsonIT.class */
public class MySqlConnectorJsonIT extends AbstractConnectorTest {
    private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-json.txt").toAbsolutePath();
    private Configuration config;

    @Before
    public void beforeEach() {
        stopConnector();
        initializeConnectorTestFramework();
        Testing.Files.delete(DB_HISTORY_PATH);
    }

    @After
    public void afterEach() {
        try {
            stopConnector();
            Testing.Files.delete(DB_HISTORY_PATH);
        } catch (Throwable th) {
            Testing.Files.delete(DB_HISTORY_PATH);
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-126"})
    public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws SQLException, InterruptedException {
        this.config = Configuration.create().with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.hostname")).with(MySqlConnectorConfig.PORT, System.getProperty("database.port")).with(MySqlConnectorConfig.USER, "snapper").with(MySqlConnectorConfig.PASSWORD, "snapperpass").with(MySqlConnectorConfig.SSL_MODE, MySqlConnectorConfig.SecureConnectionMode.DISABLED.name().toLowerCase()).with(MySqlConnectorConfig.SERVER_ID, 18765).with(MySqlConnectorConfig.SERVER_NAME, "jsonit_binlog").with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10).with(MySqlConnectorConfig.DATABASE_WHITELIST, "json_test").with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class).with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER.toString()).with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH).build();
        start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1 + 1 + 1);
        stopConnector();
        Assertions.assertThat(consumeRecordsByTopic).isNotNull();
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic("jsonit_binlog").size()).isEqualTo(1 + 1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic("jsonit_binlog.json_test.dbz_126_jsontable").size()).isEqualTo(1);
        Assertions.assertThat(consumeRecordsByTopic.topics().size()).isEqualTo(1 + 1);
        Assertions.assertThat(consumeRecordsByTopic.databaseNames().size()).isEqualTo(1);
        Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase("json_test").size()).isEqualTo(1 + 1);
        Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase("regression_test")).isNull();
        Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase("connector_test")).isNull();
        Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase("readbinlog_test")).isNull();
        consumeRecordsByTopic.ddlRecordsForDatabase("json_test").forEach(this::print);
        consumeRecordsByTopic.forEach(this::validate);
        ArrayList arrayList = new ArrayList();
        consumeRecordsByTopic.forEach(sourceRecord -> {
            Struct struct = (Struct) sourceRecord.value();
            if (sourceRecord.topic().endsWith("dbz_126_jsontable")) {
                Struct struct2 = struct.getStruct("after");
                Assertions.assertThat(struct2.getInt32("id")).isNotNull();
                String string = struct2.getString("json");
                String string2 = struct2.getString("expectedBinlogStr");
                arrayList.getClass();
                check(string, string2, (v1) -> {
                    r3.add(v1);
                });
            }
        });
        if (arrayList.isEmpty()) {
            return;
        }
        Assert.fail("" + arrayList.size() + " errors with JSON records..." + System.lineSeparator() + String.join(System.lineSeparator(), arrayList));
    }

    @Test
    public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLException, InterruptedException {
        this.config = Configuration.create().with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.hostname")).with(MySqlConnectorConfig.PORT, System.getProperty("database.port")).with(MySqlConnectorConfig.USER, "snapper").with(MySqlConnectorConfig.PASSWORD, "snapperpass").with(MySqlConnectorConfig.SSL_MODE, MySqlConnectorConfig.SecureConnectionMode.DISABLED.name().toLowerCase()).with(MySqlConnectorConfig.SERVER_ID, 18765).with(MySqlConnectorConfig.SERVER_NAME, "jsonit_snap").with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10).with(MySqlConnectorConfig.DATABASE_WHITELIST, "json_test").with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class).with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH).build();
        start(MySqlConnector.class, this.config);
        int i = (1 * 2) + 3;
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(i + 1 + 1);
        stopConnector();
        Assertions.assertThat(consumeRecordsByTopic).isNotNull();
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic("jsonit_snap").size()).isEqualTo(i + 1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic("jsonit_snap.json_test.dbz_126_jsontable").size()).isEqualTo(1);
        Assertions.assertThat(consumeRecordsByTopic.topics().size()).isEqualTo(1 + 1);
        Assertions.assertThat(consumeRecordsByTopic.databaseNames().size()).isEqualTo(2);
        Assertions.assertThat(consumeRecordsByTopic.databaseNames()).containsOnly(new Object[]{"json_test", ""});
        Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase("json_test").size()).isEqualTo(i);
        Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase("regression_test")).isNull();
        Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase("connector_test")).isNull();
        Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase("readbinlog_test")).isNull();
        Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase("").size()).isEqualTo(1);
        consumeRecordsByTopic.ddlRecordsForDatabase("json_test").forEach(this::print);
        consumeRecordsByTopic.forEach(this::validate);
        ArrayList arrayList = new ArrayList();
        consumeRecordsByTopic.forEach(sourceRecord -> {
            Struct struct = (Struct) sourceRecord.value();
            if (sourceRecord.topic().endsWith("dbz_126_jsontable")) {
                Struct struct2 = struct.getStruct("after");
                Assertions.assertThat(struct2.getInt32("id")).isNotNull();
                String string = struct2.getString("json");
                String string2 = struct2.getString("expectedJdbcStr");
                arrayList.getClass();
                check(string, string2, (v1) -> {
                    r3.add(v1);
                });
            }
        });
        if (arrayList.isEmpty()) {
            return;
        }
        Assert.fail("" + arrayList.size() + " errors with JSON records..." + System.lineSeparator() + String.join(System.lineSeparator(), arrayList));
    }

    protected void check(String str, String str2, Consumer<String> consumer) {
        if ((str != null || str2 == null) && (str == null || str.equals(str2))) {
            Assertions.assertThat(str).isEqualTo(str2);
        } else {
            consumer.accept("JSON was:     " + str + System.lineSeparator() + "but expected: " + str2);
        }
    }
}
