Debezium 抽取sqlserver数据
1、环境介绍操作系统:centos 7.9jdk版本:11.0.12kafka版本:2.8.0Debezium版本:1.6(debezium-connector-sqlserver-1.6.1.Final-plugin.tar.gz)sqlserver版本:sqlserver 20162、sqlserver 2016安装https://blog.csdn.net/zyj81092211/artic
1、环境介绍
操作系统:centos 7.9
jdk版本:11.0.12
kafka版本:2.8.0
Debezium版本:1.6(debezium-connector-sqlserver-1.6.1.Final-plugin.tar.gz)
sqlserver版本:sqlserver 2016
kafka connect 分布式部署
https://blog.csdn.net/zyj81092211/article/details/119647591
kafka connector 配置 Debezium
https://blog.csdn.net/zyj81092211/article/details/119840744
2、sqlserver 2016安装
https://blog.csdn.net/zyj81092211/article/details/119914398
3、创建测试数据库MyDB
4、sqlserver开启CDC
(1)数据库开启CDC
官方文档参考:
https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/enable-and-disable-change-data-capture-sql-server?view=sql-server-ver15
视图->模板资源管理器->sqlserver 模板->change data capture->Enable Database for CDC
(2)表开启CDC
示例解释:
新建表
添加MyDB_CT文件组
启动sqlserver代理
视图->模板资源管理器->sqlserver 模板->change data capture->Enable a Table Specifying Filegroup Option
双击打开模板,并修改
USE MyDB
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'MyTable',
@role_name = N'MyRole',
@filegroup_name = N'MyDB_CT',
@supports_net_changes = 0
GO
(4)测试
找到sys.sp_cdc_help_change_data_capture,这个存储过程可以验证
新建查询,运行存储过程
USE MyDB;
GO
EXEC sys.sp_cdc_help_change_data_capture
GO
这里可以启动停止作业和日志查看
(5)其他操作
检查哪些表开启了cdc
SELECT s.name AS Schema_Name, tb.name AS Table_Name
, tb.object_id, tb.type, tb.type_desc, tb.is_tracked_by_cdc
FROM sys.tables tb
INNER JOIN sys.schemas s on s.schema_id = tb.schema_id
WHERE tb.is_tracked_by_cdc = 1
检查数据库开启cdc和运行
SELECT *
FROM sys.change_tracking_databases
WHERE database_id=DB_ID('MyDatabase')
EXECUTE sys.sp_cdc_enable_db;
GO
运行cdc任务
EXEC sys.sp_cdc_start_job;
GO
5、创建连接器
官方解释:
curl -H "Content-Type: application/json" -X POST -d '{
"name": "source-sqlserver-05",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.hostname": "10.99.99.201",
"database.port": "1433",
"database.user": "sa",
"database.password": "Smtgbk_123",
"database.dbname": "MyDB",
"database.server.name": "sqlserver05",
"table.include.list": "dbo.MyTable",
"database.history.kafka.bootstrap.servers": "kafkac01.wtown.com:9092,kafkac02.wtown.com:9092,kafkac03.wtown.com:9092",
"database.history.kafka.topic": "dbhistory.sqlserver05"
}
}' http://kafkac01.wtown.com:8083/connectors/
查看运行状态
查看创建了三个topic
消费sqlserver05.dbo.MyTable
kafka-console-consumer.sh --bootstrap-server kafkac01.wtown.com:9092,kafkac02.wtown.com:9092,kafkac03.wtown.com:9092 --topic sqlserver05.dbo.MyTable --from-beginning
sqlserver插入数据
insert into MyTable values(4,'哈哈哈','三班');
发现数据已进入topic
报错
1、
这种错误重新命名topic和连接器名字 从新建立解决;删除从新建相同名字还是失败
2、
No maximum LSN recorded in the database; please ensure that the SQL Server Agent is running
[io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]
报这个错误,使用4-(5)检查sqlserver配置
日志路径:/data/kafka-connect/logs/connect.log
更多推荐
所有评论(0)