package io.debezium.connector.mysql;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.data.KeyValueStore;
import io.debezium.data.SchemaChangeHistory;
import io.debezium.data.VerifyRecord;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/mysql/SnapshotReaderIT.class */
public class SnapshotReaderIT {
    private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-snapshot.txt").toAbsolutePath();
    private static final String DB_NAME = "connector_test_ro";
    private static final String LOGICAL_NAME = "logical_server_name";
    private Configuration config;
    private MySqlTaskContext context;
    private SnapshotReader reader;
    private CountDownLatch completed;

    @Before
    public void beforeEach() {
        Testing.Files.delete(DB_HISTORY_PATH);
        this.completed = new CountDownLatch(1);
    }

    @After
    public void afterEach() {
        if (this.reader != null) {
            try {
                this.reader.stop();
            } finally {
                this.reader = null;
            }
        }
        if (this.context != null) {
            try {
                this.context.shutdown();
            } finally {
                this.context = null;
                Testing.Files.delete(DB_HISTORY_PATH);
            }
        }
    }

    protected Configuration.Builder simpleConfig() {
        String property = System.getProperty("database.hostname");
        String property2 = System.getProperty("database.port");
        Assertions.assertThat(property).isNotNull();
        Assertions.assertThat(property2).isNotNull();
        return Configuration.create().with(MySqlConnectorConfig.HOSTNAME, property).with(MySqlConnectorConfig.PORT, property2).with(MySqlConnectorConfig.USER, "snapper").with(MySqlConnectorConfig.PASSWORD, "snapperpass").with(MySqlConnectorConfig.SSL_MODE, MySqlConnectorConfig.SecureConnectionMode.DISABLED.toString().toLowerCase()).with(MySqlConnectorConfig.SERVER_ID, 18911).with(MySqlConnectorConfig.SERVER_NAME, LOGICAL_NAME).with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).with(MySqlConnectorConfig.DATABASE_WHITELIST, DB_NAME).with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class).with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH);
    }

    @Test
    public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
        List poll;
        this.config = simpleConfig().build();
        this.context = new MySqlTaskContext(this.config);
        this.context.start();
        this.reader = new SnapshotReader("snapshot", this.context);
        SnapshotReader snapshotReader = this.reader;
        CountDownLatch countDownLatch = this.completed;
        countDownLatch.getClass();
        snapshotReader.uponCompletion(countDownLatch::countDown);
        this.reader.generateInsertEvents();
        this.reader.useMinimalBlocking(true);
        this.reader.start();
        KeyValueStore createForTopicsBeginningWith = KeyValueStore.createForTopicsBeginningWith("logical_server_name.");
        SchemaChangeHistory schemaChangeHistory = new SchemaChangeHistory(LOGICAL_NAME);
        while (true) {
            poll = this.reader.poll();
            if (poll == null) {
                break;
            } else {
                poll.forEach(sourceRecord -> {
                    VerifyRecord.isValid(sourceRecord);
                    createForTopicsBeginningWith.add(sourceRecord);
                    schemaChangeHistory.add(sourceRecord);
                });
            }
        }
        Assertions.assertThat(poll).isNull();
        Assertions.assertThat(schemaChangeHistory.recordCount()).isEqualTo(0);
        Assertions.assertThat(createForTopicsBeginningWith.collectionCount()).isEqualTo(4);
        KeyValueStore.Collection collection = createForTopicsBeginningWith.collection(DB_NAME, "products");
        Assertions.assertThat(collection.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat(collection.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection2 = createForTopicsBeginningWith.collection(DB_NAME, "products_on_hand");
        Assertions.assertThat(collection2.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat(collection2.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection2.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection3 = createForTopicsBeginningWith.collection(DB_NAME, "customers");
        Assertions.assertThat(collection3.numberOfCreates()).isEqualTo(4L);
        Assertions.assertThat(collection3.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection3.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection4 = createForTopicsBeginningWith.collection(DB_NAME, "orders");
        Assertions.assertThat(collection4.numberOfCreates()).isEqualTo(5L);
        Assertions.assertThat(collection4.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection4.numberOfValueSchemaChanges()).isEqualTo(1L);
        if (this.completed.await(10L, TimeUnit.SECONDS)) {
            Testing.print("completed the snapshot");
        } else {
            Assert.fail("failed to complete the snapshot within 10 seconds");
        }
    }

    @Test
    public void shouldCreateSnapshotOfSingleDatabaseUsingReadEvents() throws Exception {
        List poll;
        this.config = simpleConfig().with(MySqlConnectorConfig.DATABASE_WHITELIST, "connector_(.*)").build();
        this.context = new MySqlTaskContext(this.config);
        this.context.start();
        this.reader = new SnapshotReader("snapshot", this.context);
        SnapshotReader snapshotReader = this.reader;
        CountDownLatch countDownLatch = this.completed;
        countDownLatch.getClass();
        snapshotReader.uponCompletion(countDownLatch::countDown);
        this.reader.generateReadEvents();
        this.reader.useMinimalBlocking(true);
        this.reader.start();
        KeyValueStore createForTopicsBeginningWith = KeyValueStore.createForTopicsBeginningWith("logical_server_name.");
        SchemaChangeHistory schemaChangeHistory = new SchemaChangeHistory(LOGICAL_NAME);
        while (true) {
            poll = this.reader.poll();
            if (poll == null) {
                break;
            } else {
                poll.forEach(sourceRecord -> {
                    VerifyRecord.isValid(sourceRecord);
                    createForTopicsBeginningWith.add(sourceRecord);
                    schemaChangeHistory.add(sourceRecord);
                });
            }
        }
        Assertions.assertThat(poll).isNull();
        Assertions.assertThat(schemaChangeHistory.recordCount()).isEqualTo(0);
        Assertions.assertThat(createForTopicsBeginningWith.databases()).containsOnly(new Object[]{DB_NAME, "connector_test"});
        Assertions.assertThat(createForTopicsBeginningWith.collectionCount()).isEqualTo(8);
        KeyValueStore.Collection collection = createForTopicsBeginningWith.collection(DB_NAME, "products");
        Assertions.assertThat(collection.numberOfCreates()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfReads()).isEqualTo(9L);
        Assertions.assertThat(collection.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection2 = createForTopicsBeginningWith.collection(DB_NAME, "products_on_hand");
        Assertions.assertThat(collection2.numberOfCreates()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfReads()).isEqualTo(9L);
        Assertions.assertThat(collection2.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection2.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection3 = createForTopicsBeginningWith.collection(DB_NAME, "customers");
        Assertions.assertThat(collection3.numberOfCreates()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfReads()).isEqualTo(4L);
        Assertions.assertThat(collection3.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection3.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection4 = createForTopicsBeginningWith.collection(DB_NAME, "orders");
        Assertions.assertThat(collection4.numberOfCreates()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfReads()).isEqualTo(5L);
        Assertions.assertThat(collection4.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection4.numberOfValueSchemaChanges()).isEqualTo(1L);
        if (this.completed.await(10L, TimeUnit.SECONDS)) {
            Testing.print("completed the snapshot");
        } else {
            Assert.fail("failed to complete the snapshot within 10 seconds");
        }
    }

    @Test
    public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Exception {
        List poll;
        this.config = simpleConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build();
        this.context = new MySqlTaskContext(this.config);
        this.context.start();
        this.reader = new SnapshotReader("snapshot", this.context);
        SnapshotReader snapshotReader = this.reader;
        CountDownLatch countDownLatch = this.completed;
        countDownLatch.getClass();
        snapshotReader.uponCompletion(countDownLatch::countDown);
        this.reader.generateInsertEvents();
        this.reader.useMinimalBlocking(true);
        this.reader.start();
        KeyValueStore createForTopicsBeginningWith = KeyValueStore.createForTopicsBeginningWith("logical_server_name.");
        SchemaChangeHistory schemaChangeHistory = new SchemaChangeHistory(LOGICAL_NAME);
        while (true) {
            poll = this.reader.poll();
            if (poll == null) {
                break;
            } else {
                poll.forEach(sourceRecord -> {
                    VerifyRecord.isValid(sourceRecord);
                    createForTopicsBeginningWith.add(sourceRecord);
                    schemaChangeHistory.add(sourceRecord);
                });
            }
        }
        Assertions.assertThat(poll).isNull();
        Assertions.assertThat(schemaChangeHistory.recordCount()).isEqualTo(12);
        Assertions.assertThat(schemaChangeHistory.databaseCount()).isEqualTo(2);
        Assertions.assertThat(schemaChangeHistory.databases()).containsOnly(new Object[]{DB_NAME, ""});
        Assertions.assertThat(createForTopicsBeginningWith.collectionCount()).isEqualTo(4);
        KeyValueStore.Collection collection = createForTopicsBeginningWith.collection(DB_NAME, "products");
        Assertions.assertThat(collection.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat(collection.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection2 = createForTopicsBeginningWith.collection(DB_NAME, "products_on_hand");
        Assertions.assertThat(collection2.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat(collection2.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection2.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection3 = createForTopicsBeginningWith.collection(DB_NAME, "customers");
        Assertions.assertThat(collection3.numberOfCreates()).isEqualTo(4L);
        Assertions.assertThat(collection3.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection3.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection4 = createForTopicsBeginningWith.collection(DB_NAME, "orders");
        Assertions.assertThat(collection4.numberOfCreates()).isEqualTo(5L);
        Assertions.assertThat(collection4.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection4.numberOfValueSchemaChanges()).isEqualTo(1L);
        if (this.completed.await(10L, TimeUnit.SECONDS)) {
            Testing.print("completed the snapshot");
        } else {
            Assert.fail("failed to complete the snapshot within 10 seconds");
        }
    }

    @Test
    public void shouldCreateSnapshotSchemaOnly() throws Exception {
        List poll;
        this.config = simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY).build();
        this.context = new MySqlTaskContext(this.config);
        this.context.start();
        this.reader = new SnapshotReader("snapshot", this.context);
        SnapshotReader snapshotReader = this.reader;
        CountDownLatch countDownLatch = this.completed;
        countDownLatch.getClass();
        snapshotReader.uponCompletion(countDownLatch::countDown);
        this.reader.generateInsertEvents();
        this.reader.useMinimalBlocking(true);
        this.reader.start();
        KeyValueStore createForTopicsBeginningWith = KeyValueStore.createForTopicsBeginningWith("logical_server_name.");
        SchemaChangeHistory schemaChangeHistory = new SchemaChangeHistory(LOGICAL_NAME);
        while (true) {
            poll = this.reader.poll();
            if (poll == null) {
                break;
            } else {
                poll.forEach(sourceRecord -> {
                    VerifyRecord.isValid(sourceRecord);
                    createForTopicsBeginningWith.add(sourceRecord);
                    schemaChangeHistory.add(sourceRecord);
                });
            }
        }
        Assertions.assertThat(poll).isNull();
        Assertions.assertThat(schemaChangeHistory.recordCount()).isEqualTo(0);
        Assertions.assertThat(createForTopicsBeginningWith.collectionCount()).isEqualTo(0);
        if (this.completed.await(10L, TimeUnit.SECONDS)) {
            Testing.print("completed the snapshot");
        } else {
            Assert.fail("failed to complete the snapshot within 10 seconds");
        }
    }
}
