package io.debezium.connector.mysql;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/mysql/TransactionMetadataIT.class */
/**
 * Tests for the transaction metadata emitted by {@link MySqlConnector} when
 * {@code MySqlConnectorConfig.PROVIDE_TRANSACTION_METADATA} is enabled.
 *
 * <p>Each test starts the connector, commits a single transaction containing four inserts
 * (two customers, one product, one order) and then inspects the records the connector produced.
 */
public class TransactionMetadataIT extends AbstractConnectorTest {
    private static final String PRODUCT_INSERT_STMT = "INSERT INTO products (name, description, weight) VALUES ('robot', 'Toy robot', 1.304);";
    private static final String CUSTOMER_INSERT_STMT_1 = "INSERT INTO customers (first_name, last_name, email) VALUES ('Nitin', 'Agarwal', 'test1@abc.com' ); ";
    private static final String CUSTOMER_INSERT_STMT_2 = "INSERT INTO customers (first_name, last_name, email) VALUES ('Rajesh', 'Agarwal', 'test2@abc.com' ); ";
    private static final String ORDER_INSERT_STMT = "INSERT INTO orders (order_date, purchaser, quantity, product_id) VALUES ('2016-01-16', 1001, 1, 1); ";
    private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-tm.txt").toAbsolutePath();
    private static final String SERVER_NAME = "tm_test";

    /** Number of insert statements executed inside the test transaction. */
    private static final int EVENTS_PER_TRANSACTION = 4;
    /** Records expected for one transaction: BEGIN + data events + END. */
    private static final int RECORDS_PER_TRANSACTION = 1 + EVENTS_PER_TRANSACTION + 1;
    /** Transaction metadata records expected per transaction: BEGIN and END. */
    private static final int METADATA_RECORDS_PER_TRANSACTION = 2;
    /** Upper bound on the number of consume attempts while looking for the transaction id. */
    private static final int MAX_POLL_ATTEMPTS = 50;
    /** Number of records requested per consume attempt while looking for the transaction id. */
    private static final int POLL_BATCH_SIZE = 100;

    private final UniqueDatabase DATABASE = new UniqueDatabase(SERVER_NAME, "transaction_metadata_test").withDbHistoryPath(DB_HISTORY_PATH);
    private Configuration config;

    @Before
    public void beforeEach() {
        stopConnector();
        this.DATABASE.createAndInitialize();
        initializeConnectorTestFramework();
        Testing.Files.delete(DB_HISTORY_PATH);
    }

    @After
    public void afterEach() {
        try {
            stopConnector();
        }
        finally {
            // Always remove the history file, even when stopping the connector fails.
            Testing.Files.delete(DB_HISTORY_PATH);
        }
    }

    /**
     * Commits one transaction and checks that its data events are framed by a BEGIN and an END
     * transaction record carrying the same transaction id and the expected per-table counts.
     */
    @Test
    public void transactionMetadataEnabled() throws InterruptedException, SQLException {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(MySqlConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
                .with("internal.implementation", "new")
                .build();
        start(MySqlConnector.class, this.config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted("mysql", this.DATABASE.getServerName());

        executeTestTransaction();

        // Keep consuming until the record for the inserted product shows up; its transaction
        // block carries the id of the transaction under test.
        String txId = null;
        final List<SourceRecord> records = new ArrayList<>();
        for (int attempt = 0; txId == null && attempt < MAX_POLL_ATTEMPTS; attempt++) {
            final List<SourceRecord> batch = consumeRecordsByTopic(POLL_BATCH_SIZE).allRecordsInOrder();
            txId = getTxId(batch);
            records.addAll(batch);
        }
        Assert.assertNotNull("Failed to find the transaction", txId);

        final int beginIndex = findFirstEvent(records, txId);
        Assert.assertTrue("No transaction record found for transaction " + txId, beginIndex >= 0);

        // The END record may not have been consumed yet; fetch more records if needed.
        if (records.size() < beginIndex + RECORDS_PER_TRANSACTION) {
            records.addAll(consumeRecordsByTopic(RECORDS_PER_TRANSACTION).allRecordsInOrder());
        }
        Assert.assertTrue("Not enough records consumed for transaction " + txId,
                records.size() >= beginIndex + RECORDS_PER_TRANSACTION);

        final List<SourceRecord> txRecords = records.subList(beginIndex, beginIndex + RECORDS_PER_TRANSACTION);
        Assert.assertFalse(txRecords.isEmpty());
        Assert.assertEquals(RECORDS_PER_TRANSACTION, txRecords.size());

        final String databaseName = this.DATABASE.getDatabaseName();
        Assert.assertEquals(txId, assertBeginTransaction(txRecords.get(0)));
        assertEndTransaction(txRecords.get(RECORDS_PER_TRANSACTION - 1), txId, EVENTS_PER_TRANSACTION,
                Collect.hashMapOf(databaseName + ".products", 1, databaseName + ".customers", 2, databaseName + ".orders", 1));
    }

    /**
     * Checks that the BEGIN and END records are published to the configured transaction topic
     * when the topic name contains the {@code ${database.server.name}} placeholder.
     */
    @Test
    @FixFor({"DBZ-4077"})
    public void shouldUseConfiguredTransactionTopicName() throws InterruptedException, SQLException {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(MySqlConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
                .with("internal.implementation", "new")
                .with(MySqlConnectorConfig.TRANSACTION_TOPIC, "tx.of.${database.server.name}")
                .build();
        start(MySqlConnector.class, this.config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted("mysql", this.DATABASE.getServerName());

        executeTestTransaction();

        Assertions.assertThat(consumeRecordsByTopic(RECORDS_PER_TRANSACTION)
                .recordsForTopic("tx.of." + this.DATABASE.getServerName()))
                .hasSize(METADATA_RECORDS_PER_TRANSACTION);
    }

    /**
     * Checks that the BEGIN and END records are published to the configured transaction topic
     * when the topic name is a plain literal without any placeholder.
     */
    @Test
    @FixFor({"DBZ-4077"})
    public void shouldUseConfiguredTransactionTopicNameWithoutServerName() throws InterruptedException, SQLException {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(MySqlConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
                .with("internal.implementation", "new")
                .with(MySqlConnectorConfig.TRANSACTION_TOPIC, "mytransactions")
                .build();
        start(MySqlConnector.class, this.config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted("mysql", this.DATABASE.getServerName());

        executeTestTransaction();

        Assertions.assertThat(consumeRecordsByTopic(RECORDS_PER_TRANSACTION).recordsForTopic("mytransactions"))
                .hasSize(METADATA_RECORDS_PER_TRANSACTION);
    }

    /**
     * Executes the four insert statements in a single transaction against the test database and
     * commits it. Both connection handles are closed on every path, including failures.
     *
     * @throws SQLException if connecting, executing, committing or closing fails
     */
    private void executeTestTransaction() throws SQLException {
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                JdbcConnection connection = db.connect()) {
            // Auto-commit is disabled so that all statements belong to one transaction.
            connection.setAutoCommit(false);
            connection.execute(CUSTOMER_INSERT_STMT_1, PRODUCT_INSERT_STMT, ORDER_INSERT_STMT, CUSTOMER_INSERT_STMT_2);
            connection.commit();
        }
    }

    /**
     * Finds the transaction id attached to the change event of the inserted product.
     *
     * @param list the records to search
     * @return the value of {@code transaction.id} of the first record whose source table is
     *         {@code products} and whose {@code after.description} is {@code Toy robot},
     *         or {@code null} if there is no such record
     */
    private String getTxId(List<SourceRecord> list) {
        return list.stream()
                .map(record -> (Struct) record.value())
                // Only values with a "source" field are inspected further.
                .filter(value -> value.schema().field("source") != null)
                .filter(value -> "products".equals(value.getStruct("source").getString("table")))
                .filter(value -> "Toy robot".equals(value.getStruct("after").getString("description")))
                .findFirst()
                .map(value -> (String) value.getStruct("transaction").get("id"))
                .orElse(null);
    }

    /**
     * Locates the first record whose value has a top-level {@code id} field equal to the given
     * transaction id. Values without an {@code id} field are skipped.
     *
     * @param list the records to search
     * @param str the transaction id to look for; must not be {@code null}
     * @return the index of the first matching record, or {@code -1} if none matches
     */
    private int findFirstEvent(List<SourceRecord> list, String str) {
        for (int index = 0; index < list.size(); index++) {
            final Struct value = (Struct) list.get(index).value();
            if (value != null && value.schema().field("id") != null && str.equals(value.getString("id"))) {
                return index;
            }
        }
        return -1;
    }
}
