1. 需要准备的软件环境

jdk1.8
zookeeper3.4.14
kafka2.12-2.2.0
debezium-connector-sqlserver
confluent-kafka-connect-jdbc 

2. 开启sqlserver数据库CDC功能

开启数据库CDC功能

USE CDCTEST; 
GO
EXEC sys.sp_cdc_enable_db

查询哪些数据库启用了CDC功能

select * from sys.databases where is_cdc_enabled = 1

开启数据库表CDC功能

EXEC sys.sp_cdc_enable_table 
@source_schema = 'dbo',
@source_name = 'testUser',
@supports_net_changes = 1, 
@role_name = null;

查看表是否启用了CDC

select name, is_tracked_by_cdc from sys.tables where object_id = OBJECT_ID('dbo.testUser');

关闭表CDC功能

EXEC sys.sp_cdc_disable_table 
@source_schema = 'dbo', 
@source_name = 'testUser', 
@capture_instance = 'all';

关闭数据库CDC功能

EXEC sys.sp_cdc_disable_db;

CDC功能开启后,数据库会生成多张系统表,其中dbo_testUser_CT会记录所有数据库变动信息。
operation=2,表示新增
operation=3或者4,表示更新,3表示旧值,4表示新值
operation=1,表示删除

3. kafka connetor source 和sink配置

下载debezium connector 相关 sqlserver的 jar 包,上传解压后的文件debezium-connector-sqlserver到/usr/local/kafka/plugins,如果没有该目录则手动创建。

下载confluent-kafka-connect-jdbc jar 包,上传解压后的文件debezium-connector-sqlserver到/usr/local/kafka/plugins。

配置kafka中的 connector config

    vi  /usr/local/kafka/config/connect-standalone.properties
	
	bootstrap.servers=192.168.63.101:9092
	plugin.path=/usr/local/kafka/plugins

在kafka根目录下创建文件source.properties

name=sourceConnector
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
#debezium监控的数据库地址
database.hostname=192.168.xx.xxx
database.port=1433
#数据库连接账号
database.user=sa
database.password=*********
#数据库名称
database.dbname=CDCTestDB
#SQL Server实例/群集的逻辑名称,Kafka topic前缀
database.server.name=cdctest
#Debezium应该捕获其更改的所有表的列表
table.whitelist=dbo.TestUser
#kafka地址
database.history.kafka.bootstrap.servers=192.168.63.101:9092
#存储数据库架构历史记录的Kafka topic
database.history.kafka.topic=history.cdctest

在kafka根目录下创建文件sink.properties

name=sinkConnector
#消费的topic,对应source.properties中的database.server.name+table.whitelist
topics=cdctest.dbo.TestUser
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#JDBC连接URL
connection.url=jdbc:jtds:sqlserver://192.168.xx.xxx:1433/CDCTestDB
#数据库账号信息
connection.user=sa
connection.password=**********
#debezium获取的json信息转换为kafka-connect-jdbc能够消费的信息
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
#使用的插入模式
insert.mode=upsert
#将null记录值视为删除,要求pk.mode是record_key
delete.enabled=true
#如果目标表不存在,自动创建目标表
auto.create=true
#同步表结构更新
auto.evolve=true
#主键模式
pk.mode=record_key
#主键
pk.fields=userId
#目标表
table.name.format=TestUserSink

4.启动kafka connector并测试数据同步功能

启动kafka

/usr/local/kafka/bin/kafka-server-start.sh   -daemon /usr/local/kafka/config/server.properties

启动kafka connector

cd /usr/local/kafka

./bin/connect-standalone.sh  config/connect-standalone.properties  source.properties sink.properties

配置正确并启动成功后,目前库会直接自动生成与TestUser相同结构的表TestUserSink,并读取CDC功能开启后的所有变动生成相应数据。对数据库表TestUser进行insert、update、delete,查看TestUserSink数据是否同步变更。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐