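These tests use Flink CDC 2.1.1 (lock-free synchronization; the 1.x versions take a lock during synchronization). It currently supports MySQL 5.7 and above.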

Enable MySQL's binlog in mysql.cnf:

log_bin=mysql-bin
binlog_format=ROW

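Only the ROW format is supported; MIXED and STATEMENT cause errors.

To restrict what the binlog covers, you can use MySQL's binlog-do-db and binlog-ignore-db at the database level, and replicate_wild_do_table at the table level (strictly speaking, the latter is a replica-side replication filter rather than a binlog-writing option). Configuring these filters is not recommended, because in some edge cases the synchronized data can end up incomplete. See the cnblogs (博客园) post by mofy, "mysql中replicate_wild_do_table和replicate_do_db区别".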

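If binlog-do-db is not configured, every database can be synchronized. Once binlog-do-db is configured, only the databases listed there are written to the binlog; no other database can be synchronized.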
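The rules above (binlog on, ROW format, caution with binlog-do-db) can be sketched as a small lint over the text of the config file. This is only an illustration: `BinlogConfigCheck` and its methods are hypothetical names, not part of MySQL or Flink CDC, and the check looks only at what is written in the file, not at server defaults (newer MySQL versions may already default to the desired values).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper: lints my.cnf-style text for the binlog settings discussed above. */
public class BinlogConfigCheck {

    /** Parses "key=value" and bare "key" lines, skipping comments and [section] headers. */
    public static Map<String, String> parse(String cnfText) {
        Map<String, String> settings = new HashMap<>();
        for (String raw : cnfText.split("\\R")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#") || line.startsWith(";") || line.startsWith("[")) {
                continue;
            }
            int eq = line.indexOf('=');
            String key = (eq < 0 ? line : line.substring(0, eq)).trim();
            String value = eq < 0 ? "" : line.substring(eq + 1).trim();
            // MySQL treats '-' and '_' in option names as equivalent, so normalize to '_'.
            settings.put(key.toLowerCase(Locale.ROOT).replace('-', '_'), value);
        }
        return settings;
    }

    /** Returns human-readable findings; an empty list means nothing was flagged. */
    public static List<String> check(String cnfText) {
        Map<String, String> s = parse(cnfText);
        List<String> findings = new ArrayList<>();
        if (!s.containsKey("log_bin")) {
            findings.add("log_bin is not set in this file");
        }
        if (!"ROW".equalsIgnoreCase(s.get("binlog_format"))) {
            findings.add("binlog_format is not set to ROW");
        }
        if (s.containsKey("binlog_do_db")) {
            findings.add("binlog_do_db is set: only the listed databases are written to the binlog");
        }
        return findings;
    }

    public static void main(String[] args) {
        // Sample input made up for this sketch.
        String cnf = "[mysqld]\nlog_bin=mysql-bin\nbinlog_format=MIXED\nbinlog-do-db=cdc_test\n";
        check(cnf).forEach(System.out::println);
    }
}
```

With the sample input in `main`, the MIXED format and the binlog-do-db entry are flagged, while the log_bin line passes.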

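After editing mysql.cnf, restart the database for the change to take effect. After the restart you can query whether the binlog is enabled, which format it uses, and so on: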

-- Create the MySQL user
CREATE USER 'cdc'@'%' IDENTIFIED BY 'cdc';
-- Grant privileges
GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, SHUTDOWN, PROCESS, FILE, REFERENCES, INDEX, ALTER, SHOW DATABASES, SUPER, CREATE TEMPORARY TABLES, LOCK TABLES, EXECUTE, REPLICATION SLAVE, REPLICATION CLIENT, CREATE VIEW, SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER ON *.* TO 'cdc'@'%';

-- Check whether the binlog is enabled
show variables like 'log_bin';
-- View the binlog contents
show binlog events;

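/* Event query command
   IN 'log_name'   : the binlog file to query (defaults to the first binlog file)
   FROM pos        : the position to start from (defaults to the first position in the file)
   LIMIT [offset,] : the offset (defaults to 0)
   row_count       : the number of rows to return (defaults to all rows) */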
show binlog events [IN 'log_name'] [FROM pos] [LIMIT [offset,] row_count];

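-- Set the binlog retention time in days; expired files are purged
-- (MySQL 8.0 deprecates this variable in favor of binlog_expire_logs_seconds)
set global expire_logs_days=31;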

# View the binlog with mysqlbinlog
mysqlbinlog --no-defaults --database=etl_test_1210 --base64-output=decode-rows -v --start-datetime='2021-12-03 09:00:00' --stop-datetime='2021-12-03 10:27:00' mysql-bin.000101 

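Also note that the MySQL user used by Flink CDC must have the RELOAD privilege; otherwise the existing (snapshot) data cannot be read and only incremental changes are captured.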

-- The 'cdc' user was created above, so IDENTIFIED BY is dropped here (MySQL 8.0 rejects it in GRANT)
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc'@'%';
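Because a missing RELOAD privilege only shows up as a silently skipped snapshot, it can help to check the grants before starting the job. The sketch below takes one line of `SHOW GRANTS` output and reports which of the five privileges from the GRANT statement above are missing. `CdcPrivilegeCheck` is a hypothetical helper, and the assumption that the relevant grant is a single global `ON *.*` line is mine, not the original author's.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Locale;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: compares one SHOW GRANTS line with the privileges the CDC user needs. */
public class CdcPrivilegeCheck {

    /** The privilege list taken from the minimal GRANT statement in this article. */
    public static final Set<String> REQUIRED = new LinkedHashSet<>(Arrays.asList(
            "SELECT", "RELOAD", "SHOW DATABASES", "REPLICATION SLAVE", "REPLICATION CLIENT"));

    // Captures the privilege list of a global grant: GRANT <list> ON *.* TO ...
    private static final Pattern GLOBAL_GRANT =
            Pattern.compile("(?i)^\\s*GRANT\\s+(.+?)\\s+ON\\s+\\*\\.\\*\\s+TO\\s");

    /** Returns the required privileges that the given grant line does not contain. */
    public static Set<String> missing(String grantLine) {
        Set<String> missing = new LinkedHashSet<>(REQUIRED);
        Matcher m = GLOBAL_GRANT.matcher(grantLine);
        if (!m.find()) {
            return missing; // not a global grant, so nothing is confirmed
        }
        String list = m.group(1).toUpperCase(Locale.ROOT);
        if (list.equals("ALL PRIVILEGES")) {
            missing.clear();
            return missing;
        }
        for (String privilege : list.split(",")) {
            missing.remove(privilege.trim().replaceAll("\\s+", " "));
        }
        return missing;
    }

    public static void main(String[] args) {
        // Sample grant line made up for this sketch; RELOAD and SHOW DATABASES are absent.
        String grant = "GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc'@'%'";
        System.out.println("Missing privileges: " + missing(grant));
    }
}
```

In a real check the grant line would come from running `SHOW GRANTS FOR 'cdc'@'%'` over JDBC; that part is left out to keep the sketch dependency-free.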

Java code


import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySqlCdcPrint {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("127.0.0.1")
                .port(3307)
                .databaseList("cdc_test") // monitor the cdc_test database
                .tableList("cdc_test.test1") // set captured table
                .username("root")
                .password("123456")

                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpoint
        env.enableCheckpointing(60000); // checkpoint every 60 s; for what a checkpoint requires, see com/ververica/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.snapshotState()

        env
            .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
            // run a single source task
            .setParallelism(1)
            .print("final data ===>").setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("MySqlCdcPrint");
    }
}

The job prints the rows already in the table, and rows inserted afterwards are printed as well.
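To tell the existing rows apart from later inserts in the printed output, one can look at the `op` field of each JSON record. The sketch below assumes the Debezium convention that `JsonDebeziumDeserializationSchema` passes through (`r` for a snapshot read, `c` for an insert, `u` for an update, `d` for a delete). `ChangeEventOp` is a hypothetical helper and the sample records in `main` are made up. A regex keeps the example dependency-free; a real job would more likely parse the record with a JSON library such as the fastjson dependency in the pom.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: classifies a Debezium-style JSON change record by its "op" field. */
public class ChangeEventOp {

    // Matches the first "op":"<letter>" pair in the text. Limitation: a table column
    // that is itself named "op" and appears earlier in the record would be matched instead.
    private static final Pattern OP = Pattern.compile("\"op\"\\s*:\\s*\"([a-z])\"");

    /** Maps the op code to a readable label, or "unknown" if no known code is found. */
    public static String describe(String json) {
        Matcher m = OP.matcher(json);
        if (!m.find()) {
            return "unknown";
        }
        switch (m.group(1)) {
            case "r": return "snapshot read";
            case "c": return "insert";
            case "u": return "update";
            case "d": return "delete";
            default:  return "unknown";
        }
    }

    public static void main(String[] args) {
        // Made-up records shaped like the connector's JSON output.
        String existingRow = "{\"before\":null,\"after\":{\"id\":1},\"op\":\"r\",\"ts_ms\":0}";
        String newRow = "{\"before\":null,\"after\":{\"id\":2},\"op\":\"c\",\"ts_ms\":0}";
        System.out.println(describe(existingRow));
        System.out.println(describe(newRow));
    }
}
```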

Flink CDC MySQL configuration and Java test code (flinkCdc的mysql配置及java测试代码), shy_snow's column, CSDN blog

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flinkCdc</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <flink.version>1.13.5</flink.version>
        <debezium.version>1.5.4.Final</debezium.version>
        <geometry.version>2.2.0</geometry.version>
        <java.version>8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <slf4j.version>1.7.25</slf4j.version>
        <log4j.version>2.16.0</log4j.version>
        <spotless.version>2.4.2</spotless.version>
        <!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
        <flink.forkCount>1</flink.forkCount>
        <flink.reuseForks>true</flink.reuseForks>
        <log4j.configuration>log4j2-test.properties</log4j.configuration>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <!-- add the dependency matching your database -->
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <!-- the dependency is available only for stable releases. -->
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <type>test-jar</type>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>${slf4j.version}</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</project>
