debezium+kafka实现sqlserver数据同步
debezium+kafka实现sqlserver数据同步
·
1. 需要准备的软件环境
jdk1.8
zookeeper3.4.14
kafka2.12-2.2.0
debezium-connector-sqlserver
confluent-kafka-connect-jdbc
2. 开启sqlserver数据库CDC功能
开启数据库CDC功能
USE CDCTEST;
GO
EXEC sys.sp_cdc_enable_db
查询哪些数据库启用了CDC功能
select * from sys.databases where is_cdc_enabled = 1
开启数据库表CDC功能
EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo',
@source_name = 'testUser',
@supports_net_changes = 1,
@role_name = null;
查看表是否启用了CDC
select name, is_tracked_by_cdc from sys.tables where object_id = OBJECT_ID('dbo.testUser');
关闭表CDC功能
EXEC sys.sp_cdc_disable_table
@source_schema = 'dbo',
@source_name = 'testUser',
@capture_instance = 'all';
关闭数据库CDC功能
EXEC sys.sp_cdc_disable_db;
CDC功能开启后,数据库会生成多张系统表,其中dbo_testUser_CT会记录所有数据库变动信息。
operation=2,表示新增
operation=3或者4,表示更新,3表示旧值,4表示新值
operation=1,表示删除
3. kafka connetor source 和sink配置
下载debezium connector 相关 sqlserver的 jar 包,上传解压后的文件debezium-connector-sqlserver到/usr/local/kafka/plugins,如果没有该目录则手动创建。
下载confluent-kafka-connect-jdbc jar 包,上传解压后的文件debezium-connector-sqlserver到/usr/local/kafka/plugins。
配置kafka中的 connector config
vi /usr/local/kafka/config/connect-standalone.properties
bootstrap.servers=192.168.63.101:9092
plugin.path=/usr/local/kafka/plugins
在kafka根目录下创建文件source.properties
name=sourceConnector
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
#debezium监控的数据库地址
database.hostname=192.168.xx.xxx
database.port=1433
#数据库连接账号
database.user=sa
database.password=*********
#数据库名称
database.dbname=CDCTestDB
#SQL Server实例/群集的逻辑名称,Kafka topic前缀
database.server.name=cdctest
#Debezium应该捕获其更改的所有表的列表
table.whitelist=dbo.TestUser
#kafka地址
database.history.kafka.bootstrap.servers=192.168.63.101:9092
#存储数据库架构历史记录的Kafka topic
database.history.kafka.topic=history.cdctest
在kafka根目录下创建文件sink.properties
name=sinkConnector
#消费的topic,对应source.properties中的database.server.name+table.whitelist
topics=cdctest.dbo.TestUser
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#JDBC连接URL
connection.url=jdbc:jtds:sqlserver://192.168.xx.xxx:1433/CDCTestDB
#数据库账号信息
connection.user=sa
connection.password=**********
#debezium获取的json信息转换为kafka-connect-jdbc能够消费的信息
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
#使用的插入模式
insert.mode=upsert
#将null记录值视为删除,要求pk.mode是record_key
delete.enabled=true
#如果目标表不存在,自动创建目标表
auto.create=true
#同步表结构更新
auto.evolve=true
#主键模式
pk.mode=record_key
#主键
pk.fields=userId
#目标表
table.name.format=TestUserSink
4.启动kafka connector并测试数据同步功能
启动kafka
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
启动kafka connector
cd /usr/local/kafka
./bin/connect-standalone.sh config/connect-standalone.properties source.properties sink.properties
配置正确并启动成功后,目前库会直接自动生成与TestUser相同结构的表TestUserSink,并读取CDC功能开启后的所有变动生成相应数据。对数据库表TestUser进行insert、update、delete,查看TestUserSink数据是否同步变更。
更多推荐
已为社区贡献4条内容
所有评论(0)