package io.debezium.connector.mysql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.junit.SkipTestDependingOnGtidModeRule;
import io.debezium.connector.mysql.junit.SkipWhenGtidModeIs;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.kafka.KafkaCluster;
import io.debezium.pipeline.signal.channels.FileSignalChannel;
import io.debezium.pipeline.signal.channels.KafkaSignalChannel;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

@SkipWhenGtidModeIs(value = SkipWhenGtidModeIs.GtidMode.OFF, reason = "Read only connection requires GTID_MODE to be ON")
/* loaded from: input_file:io/debezium/connector/mysql/ReadOnlyIncrementalSnapshotIT.class */
public class ReadOnlyIncrementalSnapshotIT extends IncrementalSnapshotIT {
    private static KafkaCluster kafka;
    private static final int PARTITION_NO = 0;
    public static final String EXCLUDED_TABLE = "b";

    @Rule
    public TestRule skipTest = new SkipTestDependingOnGtidModeRule();
    private final Path signalsFile = Paths.get("src", "test", "resources").resolve("debezium_signaling_file.txt");

    @Override // io.debezium.connector.mysql.IncrementalSnapshotIT
    @Before
    public void before() throws SQLException {
        super.before();
        kafka.createTopic(getSignalsTopic(), 1, 1);
    }

    @BeforeClass
    public static void startKafka() throws Exception {
        File createTestingDirectory = Testing.Files.createTestingDirectory("signal_cluster");
        Testing.Files.delete(createTestingDirectory);
        kafka = new KafkaCluster().usingDirectory(createTestingDirectory).deleteDataPriorToStartup(true).deleteDataUponShutdown(true).addBrokers(1).withKafkaConfiguration(Collect.propertiesOf("auto.create.topics.enable", "false", "zookeeper.session.timeout.ms", "20000")).startup();
    }

    @AfterClass
    public static void stopKafka() {
        if (kafka != null) {
            kafka.shutdown();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.debezium.connector.mysql.IncrementalSnapshotIT
    public Configuration.Builder config() {
        return super.config().with(MySqlConnectorConfig.TABLE_EXCLUDE_LIST, this.DATABASE.getDatabaseName() + ".b").with(MySqlConnectorConfig.READ_ONLY_CONNECTION, true).with(KafkaSignalChannel.SIGNAL_TOPIC, getSignalsTopic()).with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()).with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka").with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true).with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, String.format("%s:%s", this.DATABASE.qualifiedTableName("a42"), "pk1,pk2,pk3,pk4"));
    }

    protected String getSignalsTopic() {
        return this.DATABASE.getDatabaseName() + "signals_topic";
    }

    protected void sendExecuteSnapshotKafkaSignal() throws ExecutionException, InterruptedException {
        sendExecuteSnapshotKafkaSignal(tableDataCollectionId());
    }

    protected void sendExecuteSnapshotKafkaSignal(String str) throws ExecutionException, InterruptedException {
        sendKafkaSignal(String.format("{\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"INCREMENTAL\"}}", str));
    }

    protected void sendStopSnapshotKafkaSignal() throws ExecutionException, InterruptedException {
        sendStopSnapshotKafkaSignal(tableDataCollectionId());
    }

    protected void sendStopSnapshotKafkaSignal(String str) throws ExecutionException, InterruptedException {
        sendKafkaSignal(String.format("{\"type\":\"stop-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"INCREMENTAL\"}}", str));
    }

    protected void sendPauseSnapshotKafkaSignal() throws ExecutionException, InterruptedException {
        sendKafkaSignal("{\"type\":\"pause-snapshot\",\"data\": {\"type\": \"INCREMENTAL\"}}");
    }

    protected void sendResumeSnapshotKafkaSignal() throws ExecutionException, InterruptedException {
        sendKafkaSignal("{\"type\":\"resume-snapshot\",\"data\": {\"type\": \"INCREMENTAL\"}}");
    }

    protected void sendKafkaSignal(String str) throws ExecutionException, InterruptedException {
        ProducerRecord producerRecord = new ProducerRecord(getSignalsTopic(), Integer.valueOf(PARTITION_NO), "is_test", str);
        KafkaProducer kafkaProducer = new KafkaProducer(Configuration.create().withDefault("bootstrap.servers", kafka.brokerList()).withDefault("client.id", "signals").withDefault("key.serializer", StringSerializer.class).withDefault("value.serializer", StringSerializer.class).build().asProperties());
        try {
            kafkaProducer.send(producerRecord).get();
            kafkaProducer.close();
        } catch (Throwable th) {
            try {
                kafkaProducer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void emptyHighWatermark() throws Exception {
        populateTable();
        startConnector();
        sendAdHocSnapshotSignal();
        Map consumeMixedWithIncrementalSnapshot = consumeMixedWithIncrementalSnapshot(1000);
        for (int i = PARTITION_NO; i < 1000; i++) {
            Assertions.assertThat(consumeMixedWithIncrementalSnapshot).contains(new Map.Entry[]{Assertions.entry(Integer.valueOf(i + 1), Integer.valueOf(i))});
        }
    }

    @Test
    public void filteredEvents() throws Exception {
        populateTable();
        startConnector();
        sendExecuteSnapshotKafkaSignal();
        Thread thread = new Thread(() -> {
            try {
                JdbcConnection databaseConnection = databaseConnection();
                try {
                    databaseConnection.setAutoCommit(false);
                    int i = PARTITION_NO;
                    while (!Thread.interrupted()) {
                        databaseConnection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (pk, aa) VALUES (%s, %s)", EXCLUDED_TABLE, Integer.valueOf(i + 1000 + 1), Integer.valueOf(i + 1000))});
                        databaseConnection.commit();
                        i++;
                    }
                    if (databaseConnection != null) {
                        databaseConnection.close();
                    }
                } finally {
                }
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        });
        thread.setDaemon(true);
        thread.setName("filtered-binlog-events-thread");
        try {
            thread.start();
            Map consumeMixedWithIncrementalSnapshot = consumeMixedWithIncrementalSnapshot(1000);
            for (int i = PARTITION_NO; i < 1000; i++) {
                Assertions.assertThat(consumeMixedWithIncrementalSnapshot).contains(new Map.Entry[]{Assertions.entry(Integer.valueOf(i + 1), Integer.valueOf(i))});
            }
        } finally {
            thread.interrupt();
        }
    }

    @Test
    public void inserts4Pks() throws Exception {
        populate4PkTable();
        startConnector();
        sendExecuteSnapshotKafkaSignal(this.DATABASE.qualifiedTableName("a4"));
        Map consumeMixedWithIncrementalSnapshot = consumeMixedWithIncrementalSnapshot(1000, entry -> {
            return true;
        }, struct -> {
            return Integer.valueOf((struct.getInt32("pk1").intValue() * 1000) + (struct.getInt32("pk2").intValue() * 100) + (struct.getInt32("pk3").intValue() * 10) + struct.getInt32("pk4").intValue());
        }, sourceRecord -> {
            return ((Struct) sourceRecord.value()).getStruct("after").getInt32(valueFieldName());
        }, this.DATABASE.topicForTable("a4"), null);
        for (int i = PARTITION_NO; i < 1000; i++) {
            Assertions.assertThat(consumeMixedWithIncrementalSnapshot).contains(new Map.Entry[]{Assertions.entry(Integer.valueOf(i + 1), Integer.valueOf(i))});
        }
    }

    @Test
    public void inserts4PksWithSignalFile() throws Exception {
        populate4PkTable();
        startConnector(builder -> {
            return builder.with(FileSignalChannel.SIGNAL_FILE, this.signalsFile.toString()).with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "file");
        });
        sendExecuteSnapshotFileSignal(this.DATABASE.qualifiedTableName("a4"));
        Map consumeMixedWithIncrementalSnapshot = consumeMixedWithIncrementalSnapshot(1000, entry -> {
            return true;
        }, struct -> {
            return Integer.valueOf((struct.getInt32("pk1").intValue() * 1000) + (struct.getInt32("pk2").intValue() * 100) + (struct.getInt32("pk3").intValue() * 10) + struct.getInt32("pk4").intValue());
        }, sourceRecord -> {
            return ((Struct) sourceRecord.value()).getStruct("after").getInt32(valueFieldName());
        }, this.DATABASE.topicForTable("a4"), null);
        for (int i = PARTITION_NO; i < 1000; i++) {
            Assertions.assertThat(consumeMixedWithIncrementalSnapshot).contains(new Map.Entry[]{Assertions.entry(Integer.valueOf(i + 1), Integer.valueOf(i))});
        }
    }

    @Test
    public void insertsWithoutPks() throws Exception {
        populate4WithoutPkTable();
        startConnector();
        sendExecuteSnapshotKafkaSignal(this.DATABASE.qualifiedTableName("a42"));
        Map consumeMixedWithIncrementalSnapshot = consumeMixedWithIncrementalSnapshot(1000, entry -> {
            return true;
        }, struct -> {
            return Integer.valueOf((struct.getInt32("pk1").intValue() * 1000) + (struct.getInt32("pk2").intValue() * 100) + (struct.getInt32("pk3").intValue() * 10) + struct.getInt32("pk4").intValue());
        }, sourceRecord -> {
            return ((Struct) sourceRecord.value()).getStruct("after").getInt32(valueFieldName());
        }, this.DATABASE.topicForTable("a42"), null);
        for (int i = PARTITION_NO; i < 1000; i++) {
            Assertions.assertThat(consumeMixedWithIncrementalSnapshot).contains(new Map.Entry[]{Assertions.entry(Integer.valueOf(i + 1), Integer.valueOf(i))});
        }
    }

    @Test(expected = ConnectException.class)
    @SkipWhenGtidModeIs(value = SkipWhenGtidModeIs.GtidMode.ON, reason = "Read only connection requires GTID_MODE to be ON")
    public void shouldFailIfGtidModeIsOff() throws Exception {
        populateTable();
        AtomicReference atomicReference = new AtomicReference();
        startConnector((z, str, th) -> {
            atomicReference.set(th);
        });
        waitForConnectorShutdown("mysql", this.DATABASE.getServerName());
        stopConnector();
        Throwable th2 = (Throwable) atomicReference.get();
        if (th2 != null) {
            throw ((RuntimeException) th2);
        }
    }

    @Test
    @FixFor({"DBZ-5453"})
    public void testStopSnapshotKafkaSignal() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor(AbstractIncrementalSnapshotChangeEventSource.class);
        populateTable();
        startConnector(builder -> {
            return builder.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1);
        });
        waitForConnectorToStart();
        waitForAvailableRecords(1L, TimeUnit.SECONDS);
        assertNoRecordsToConsume();
        sendExecuteSnapshotKafkaSignal();
        consumeMixedWithIncrementalSnapshot(1);
        sendStopSnapshotKafkaSignal();
        ArrayList arrayList = new ArrayList();
        String str = topicName();
        String format = String.format("Removed '%s' from incremental snapshot collection list.", tableDataCollectionId());
        Awaitility.await().atMost(Duration.ofMinutes(2L)).until(() -> {
            consumeAvailableRecords(sourceRecord -> {
                if (str.equalsIgnoreCase(sourceRecord.topic())) {
                    arrayList.add(sourceRecord);
                }
            });
            return Boolean.valueOf(logInterceptor.containsMessage(format));
        });
        stopConnector();
    }

    @Test
    public void testPauseDuringSnapshotKafkaSignal() throws Exception {
        populateTable();
        startConnector(builder -> {
            return builder.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1);
        });
        waitForConnectorToStart();
        waitForAvailableRecords(1L, TimeUnit.SECONDS);
        assertNoRecordsToConsume();
        sendExecuteSnapshotKafkaSignal();
        ArrayList arrayList = new ArrayList();
        String str = topicName();
        Map consumeMixedWithIncrementalSnapshot = consumeMixedWithIncrementalSnapshot(100);
        sendPauseSnapshotKafkaSignal();
        consumeAvailableRecords(sourceRecord -> {
            if (str.equalsIgnoreCase(sourceRecord.topic())) {
                arrayList.add(sourceRecord);
            }
        });
        int size = arrayList.size() + consumeMixedWithIncrementalSnapshot.size();
        sendResumeSnapshotKafkaSignal();
        Map consumeMixedWithIncrementalSnapshot2 = consumeMixedWithIncrementalSnapshot(1000 - size);
        for (int i = size + 1; i < 1000; i++) {
            Assertions.assertThat(consumeMixedWithIncrementalSnapshot2).contains(new Map.Entry[]{Assertions.entry(Integer.valueOf(i + 1), Integer.valueOf(i))});
        }
    }

    private void sendExecuteSnapshotFileSignal(String str) throws IOException {
        Files.write(this.signalsFile, String.format("{\"id\":\"12345\",\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"INCREMENTAL\"}}", str).getBytes(), new OpenOption[PARTITION_NO]);
    }

    protected void populate4PkTable() throws SQLException {
        JdbcConnection databaseConnection = databaseConnection();
        try {
            populate4PkTable(databaseConnection, "a4");
            if (databaseConnection != null) {
                databaseConnection.close();
            }
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    protected void populate4WithoutPkTable() throws SQLException {
        JdbcConnection databaseConnection = databaseConnection();
        try {
            populate4PkTable(databaseConnection, "a42");
            if (databaseConnection != null) {
                databaseConnection.close();
            }
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
