package cn.tenmg.cdc.log.connectors.base;

import cn.tenmg.cdc.log.connectors.base.experimental.MySqlSourceBuilder;
import cn.tenmg.cdc.log.connectors.base.testutils.MySqlContainer;
import cn.tenmg.cdc.log.connectors.base.testutils.MySqlVersion;
import cn.tenmg.cdc.log.connectors.base.testutils.UniqueDatabase;
import cn.tenmg.cdc.log.debezium.JsonDebeziumDeserializationSchema;
import java.util.stream.Stream;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;

/* loaded from: input_file:cn/tenmg/cdc/log/connectors/base/MySqlChangeEventSourceExampleTest.class */
public class MySqlChangeEventSourceExampleTest {
    private static final int DEFAULT_PARALLELISM = 4;

    @Rule
    public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM).setRpcServiceSharing(RpcServiceSharing.DEDICATED).withHaLeadershipControl().build());
    private final UniqueDatabase inventoryDatabase = new UniqueDatabase(MYSQL_CONTAINER, "inventory", "mysqluser", "mysqlpw");
    private static final Logger LOG = LoggerFactory.getLogger(MySqlChangeEventSourceExampleTest.class);
    private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V5_7);

    @BeforeClass
    public static void startContainers() {
        LOG.info("Starting containers...");
        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
        LOG.info("Containers are started.");
    }

    @Test
    @Ignore("Test ignored because it won't stop and is used for manual test")
    public void testConsumingAllEvents() throws Exception {
        this.inventoryDatabase.createAndInitialize();
        MySqlSourceBuilder.MySqlIncrementalSource build = new MySqlSourceBuilder().hostname(MYSQL_CONTAINER.getHost()).port(MYSQL_CONTAINER.getDatabasePort()).databaseList(this.inventoryDatabase.getDatabaseName()).tableList(this.inventoryDatabase.getDatabaseName() + ".products").username(this.inventoryDatabase.getUsername()).password(this.inventoryDatabase.getPassword()).serverId("5401-5404").deserializer(new JsonDebeziumDeserializationSchema()).includeSchemaChanges(true).build();
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(3000L);
        executionEnvironment.fromSource(build, WatermarkStrategy.noWatermarks(), "MySqlParallelSource").setParallelism(DEFAULT_PARALLELISM).print().setParallelism(1);
        executionEnvironment.execute("Print MySQL Snapshot + Binlog");
    }

    private static MySqlContainer createMySqlContainer(MySqlVersion mySqlVersion) {
        return new MySqlContainer(mySqlVersion).withConfigurationOverride("docker/server-gtids/my.cnf").withSetupSQL("docker/setup.sql").m20withDatabaseName("flink-test").m22withUsername("flinkuser").m21withPassword("flinkpw").withLogConsumer(new Slf4jLogConsumer(LOG));
    }
}
