虽然是公有云的鼻祖,AWS在某些产品的实现却太不给力;可能是习惯了阿里云喂到嘴边的感觉,AWS很多方案需要自己折腾,蛋疼!比如这里要讲的mysql数据同步方案。

阿里云产品DTS,点几下就OK了,AWS,只能通过插件安装、配置、调试、蹚坑……,具体就是用msk的debezium插件连接器。没见识过的一脸懵逼,下面说下配置步骤和一些注意的坑。

1.前提

安装好rds和msk

rds:binlog_format=row,打开log_bin

msk:auto.create.topics.enable=true

2.安装debezium插件

(1)下载安装包,并将其上传到S3的某个bucket;

https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.7.Final/debezium-connector-mysql-1.9.7.Final-plugin.zip

(2)通过浏览S3或直接输入文件URI:

注意坑:AWS控制台的这个功能需要有梯子才行,否则:“浏览S3”找不到你的桶,直接输入文件URI自动检查文件也提示失败:Network Failure。

(3)安装后就有了:

3.安装msk连接器
3.1.选择自定义插件
3.2.连接器配置

说明:

connector.class=io.debezium.connector.mysql.MySqlConnector  #固定写法
database.hostname   # db IP或域名
database.user       # db username
database.password   # db 密码
database.port=3306  # db 端口
database.server.id=1001  # 编号,db范围唯一
tasks.max=1              # 任务并发数
database.server.name=omp-user   # DDL语句的topic,程序消费用
include.schema.changes=true     # 包含DDL
database.include.list=yunkc_db  # db范围
table.include.list=yunkc_db.t1,yunkc_db.t2  # 表范围
database.history.kafka.topic=cdc1           # DDL语句,内部用,消费者不要消费该topic
database.history.kafka.bootstrap.servers    # kafka bootstrap server地址
3.3.为mskconnect创建角色
3.4.日志配置

注意:一定要配置,否则报错无法排查

4.消息
捕获数据变化
./kafka-console-consumer.sh --bootstrap-server xxxxxx --topic omp-user.yunkc_db.t1
Struct{before=Struct{id=1},after=Struct{id=2},source=Struct{version=1.9.7.Final,connector=mysql,name=omp-user,ts_ms=1678348957000,db=yunkc_db,table=t1,server_id=351097349,file=mysql-bin-changelog.000001,pos=344,row=0,thread=16},op=u,ts_ms=1678348957646}
Struct{before=Struct{id=2},after=Struct{id=3},source=Struct{version=1.9.7.Final,connector=mysql,name=omp-user,ts_ms=1678349029000,db=yunkc_db,table=t1,server_id=351097349,file=mysql-bin-changelog.000001,pos=611,row=0,thread=16},op=u,ts_ms=1678349029762}
Struct{before=Struct{id=3},after=Struct{id=4},source=Struct{version=1.9.7.Final,connector=mysql,name=omp-user,ts_ms=1678349036000,db=yunkc_db,table=t1,server_id=351097349,file=mysql-bin-changelog.000001,pos=878,row=0,thread=16},op=u,ts_ms=1678349036808}

捕获结构变化
./kafka-console-consumer.sh --bootstrap-server xxxxxx --topic omp-user
Struct{source=Struct{version=1.9.7.Final,connector=mysql,name=omp-user,ts_ms=1678349943178,db=yunkc_db,table=t2,server_id=351097349,file=mysql-bin-changelog.000001,pos=1287,row=0},databaseName=yunkc_db,ddl=create table t2(id int),
tableChanges=[Struct{type=CREATE,id="yunkc_db"."t2",table=Struct{defaultCharsetName=utf8,primaryKeyColumnNames=[],columns=[Struct{name=id,jdbcType=4,typeName=INT,typeExpression=INT,position=1,optional=true,autoIncremented=false,generated=false}]}}]}
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐