最近在研究 ETL 相关工具,其中有一种是通过 kafka-connect 实现数据实时同步的方法,网上也有一些介绍这方面的资料,个人觉得都写的不清楚,不完整,这里重新梳理下,供大家参考:

1. 安装 conflunent 相关服务

下载 confluent 社区版本(开发版和企业版都有限制):

https://packages.confluent.io/archive/7.0/?_ga=2.111476231.227451655.1638238221-425725271.1638238221

解压缩:

bin:一些服务的启动目录

etc:服务的配置文件和 kafka-connect-jdbc 的配置文件目录

logs:日志文件目录

share:存放一些第三方的依赖包

启动相关服务 zookeeper、kafka、schema-register:

先进入到 bin 目录下

启动 zookeeper:这里使用默认配置文件,没有做任何修改,默认配置文件会绑定本地 hostname 对应的 ip(确保 hostname 正确,ping [hostname] 能对应到本机 ip),端口默认 2181

./zookeeper-server-start -daemon /home/confluent-7.0.0/etc/kafka/zookeeper.properties

启动 kafka 服务:kafka 默认连接本地的 zookeeper,端口默认 9092

./kafka-server-start -daemon /home/confluent-7.0.0/etc/kafka/server.properties

启动 schema-register 服务:默认连接本地的 9092kafka 服务

./schema-registry-start -daemon /home/confluent-7.0.0/etc/schema-registry/schema-registry.properties

2. 安装 kafka-web 工具

使用开源的 eagle

https://www.kafka-eagle.org

https://github.com/smartloli/EFAK

下载最新的压缩版本:

http://download.kafka-eagle.org

上传到 linux 服务器解压缩:

修改配置文件:

conf/system-config.properties

设置 zk 的地址:

efak.zk.cluster.alias=cluster1

cluster1.zk.list=192.168.1.200:2181

设置本地 sqllite 地址:

efak.driver=org.sqlite.JDBC

efak.url=jdbc:sqlite:/home/kafka-eagle-bin-2.0.8/efak-web-2.0.8/db/ke.db

efak.username=root

efak.password=www.kafka-eagle.org

修改系统参数,增加 efak 的 home 目录:

vi /etc/profile

export KE_HOME=/home/kafka-eagle-bin-2.0.8/efak-web-2.0.8/

运行 source /etc/profile 让其生效。

启动服务:

bin/ke.sh

登录后台管理:

http://192.168.1.200:8048

默认账号密码:admin/123456

3. 使用 kafka-connect-jdbc 实现 CPC 同步

kafka-connect 应用模型:

confluent 的社区版是不带 kafka-connect-jdbc 包的,需要自己手动去下载:

下载 kafka-connect-jdbc 依赖包:

https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

上传到目录 [CONFLUENT_HOME]/share/java/kafka 目录下:

下载 mysql 驱动 jar 包,也上传到 kafka 目录下:

这里使用了 timstamp 方式来实现增量同步:

配置 kafka-connect-jdbc source 数据读取配置:

配置文件放到 etc/kafka-connect-jdbc/source.jdbc.properties

name=mysql-whitelist-timestamp-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.user=root
connection.password=root
connection.url=jdbc:mysql://192.168.1.201:3306/source?characterEncoding=utf8&useSSL=true

#数据表白名单

table.whitelist=t_prjsaas_sync_event


mode=timestamp
timestamp.column.name=create_time
#incrementing.column.name=id

validate.non.null=false

#topic的前缀,confulent平台会为每张表创建一个topic,topic的名称为前缀+表名
topic.prefix=mysql-test-

配置 kafka-connect-jdbc sink 数据写入配置:

配置文件:etc/kafka-connect-jdbc/dbtest.jdbc.properties

name=mysql-whitelist-timestamp-sink4
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=10

connection.user=root
connection.password=root
connection.url=jdbc:mysql://192.168.1.201:3306/dbtest?characterEncoding=utf8&useSSL=true

 
insert.mode=upsert
table.name.format=${topic}
pk.fields=id
pk.mode=record_value

测试的读取数据库表:

CREATE TABLE `t_prjsaas_sync_event` (
  `id` bigint(20) NOT NULL,
  `biz_id` varchar(255) DEFAULT NULL,
  `biz_type` int(11) NOT NULL,
  `content` text,
  `cost_time` bigint(20) NOT NULL,
  `create_time` datetime DEFAULT NULL,
  `error_msg` varchar(255) DEFAULT NULL,
  `event_time` datetime DEFAULT NULL,
  `event_type` varchar(255) DEFAULT NULL,
  `event_uri` varchar(255) DEFAULT NULL,
  `source` varchar(255) DEFAULT NULL,
  `statues` int(11) NOT NULL,
  PRIMARY KEY (`id`) USING BTREE
)

目标库创建的表(由于我们用了 mysql-test- 作为前缀,因此表名需要修改为 mysql-test-t_prjsaas_sync_event)

CREATE TABLE `mysql-test-t_prjsaas_sync_event` (
  `id` bigint(20) NOT NULL,
  `biz_id` varchar(255) DEFAULT NULL,
  `biz_type` int(11) NOT NULL,
  `content` text,
  `cost_time` bigint(20) NOT NULL,
  `create_time` datetime DEFAULT NULL,
  `error_msg` varchar(255) DEFAULT NULL,
  `event_time` datetime DEFAULT NULL,
  `event_type` varchar(255) DEFAULT NULL,
  `event_uri` varchar(255) DEFAULT NULL,
  `source` varchar(255) DEFAULT NULL,
  `statues` int(11) NOT NULL,
  PRIMARY KEY (`id`) USING BTREE
)

启动数据读取服务 (producer):

./connect-standalone /home/confluent-7.0.0/etc/kafka/connect-standalone.properties   /home/confluent-7.0.0/etc/kafka-connect-jdbc/source.jdbc.properties

kafka-connect 启动默认会监听 8083 端口,同一个机器如果启动另外一个 consumer 会报端口冲突,为了方便测试我们需要修改 connect 的默认端口。

拷贝一份 connect-standalone2.properties,修改端口:listeners=http://:18083 (新版的 kafka 不支持 rest.port 端口,改用 listeners)

启动消费端:

./connect-standalone /home/confluent-7.0.0/etc/kafka/connect-standalone2.properties  /home/confluent-7.0.0/etc/kafka-connect-jdbc/dbtest.jdbc.properties

数据库中修改一条记录的数据:

登录 kafka-web 端口,查看 topic,可以看到一条数据日志:

查看目标库,数据有插入进去,测试完成。

 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐