1、环境介绍
操作系统:centos 7.9
jdk版本:11.0.12
kafka版本:2.8.0
Debezium版本:1.6(debezium-connector-sqlserver-1.6.1.Final-plugin.tar.gz)
sqlserver版本:sqlserver 2016

kafka connect 分布式部署
https://blog.csdn.net/zyj81092211/article/details/119647591
kafka connector 配置 Debezium
https://blog.csdn.net/zyj81092211/article/details/119840744

2、sqlserver 2016安装
https://blog.csdn.net/zyj81092211/article/details/119914398

3、创建测试数据库MyDB
在这里插入图片描述
在这里插入图片描述
4、sqlserver开启CDC
(1)数据库开启CDC
官方文档参考:
https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/enable-and-disable-change-data-capture-sql-server?view=sql-server-ver15
视图->模板资源管理器->sqlserver 模板->change data capture->Enable Database for CDC
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
(2)表开启CDC
示例解释:
在这里插入图片描述

新建表
在这里插入图片描述
在这里插入图片描述
添加MyDB_CT文件组
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

启动sqlserver代理
在这里插入图片描述

视图->模板资源管理器->sqlserver 模板->change data capture->Enable a Table Specifying Filegroup Option
在这里插入图片描述
双击打开模板,并修改

USE MyDB
GO

EXEC sys.sp_cdc_enable_table
	@source_schema = N'dbo',
	@source_name   = N'MyTable', 
	@role_name     = N'MyRole',  
	@filegroup_name = N'MyDB_CT',
	@supports_net_changes = 0
GO

在这里插入图片描述

(4)测试

在这里插入图片描述
找到sys.sp_cdc_help_change_data_capture,这个存储过程可以验证
在这里插入图片描述
新建查询,运行存储过程

USE MyDB;
GO
EXEC sys.sp_cdc_help_change_data_capture
GO

在这里插入图片描述
这里可以启动停止作业和日志查看
在这里插入图片描述

(5)其他操作
检查哪些表开启了cdc

SELECT s.name AS Schema_Name, tb.name AS Table_Name
, tb.object_id, tb.type, tb.type_desc, tb.is_tracked_by_cdc
FROM sys.tables tb
INNER JOIN sys.schemas s on s.schema_id = tb.schema_id
WHERE tb.is_tracked_by_cdc = 1

检查数据库开启cdc和运行

SELECT * 
FROM sys.change_tracking_databases 
WHERE database_id=DB_ID('MyDatabase')
EXECUTE sys.sp_cdc_enable_db;  
GO  

运行cdc任务

EXEC sys.sp_cdc_start_job;  
GO  

5、创建连接器
官方解释:
在这里插入图片描述
在这里插入图片描述

curl -H "Content-Type: application/json" -X POST -d  '{
    "name": "source-sqlserver-05", 
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", 
        "database.hostname": "10.99.99.201", 
        "database.port": "1433", 
        "database.user": "sa", 
        "database.password": "Smtgbk_123", 
        "database.dbname": "MyDB", 
        "database.server.name": "sqlserver05", 
        "table.include.list": "dbo.MyTable", 
        "database.history.kafka.bootstrap.servers": "kafkac01.wtown.com:9092,kafkac02.wtown.com:9092,kafkac03.wtown.com:9092", 
        "database.history.kafka.topic": "dbhistory.sqlserver05" 
    }
}' http://kafkac01.wtown.com:8083/connectors/

查看运行状态
在这里插入图片描述

查看创建了三个topic
在这里插入图片描述

消费sqlserver05.dbo.MyTable

kafka-console-consumer.sh --bootstrap-server kafkac01.wtown.com:9092,kafkac02.wtown.com:9092,kafkac03.wtown.com:9092 --topic  sqlserver05.dbo.MyTable --from-beginning

sqlserver插入数据

insert into MyTable values(4,'哈哈哈','三班');

发现数据已进入topic
在这里插入图片描述

报错
1、
在这里插入图片描述
这种错误重新命名topic和连接器名字 从新建立解决;删除从新建相同名字还是失败

2、
No maximum LSN recorded in the database; please ensure that the SQL Server Agent is running
[io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]
报这个错误,使用4-(5)检查sqlserver配置
日志路径:/data/kafka-connect/logs/connect.log

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐