IDEA03:数据库CDC、Kafka和连接器Debezium配置
这里记录一下CDC和Kafka的协同工作过程。CDC(Change Data Capture:变更数据捕获)是数据库的一项功能,能够监控数据库表的变化。Kafka是一种分布式消息系统。这里协同的目的是让CDC监控数据库表的更新,然后将更新发布到Kafka,最后让消费者响应这个更新。另外还用到了一个CDC和Kafka之间的连接器,叫Debezium。.....................
写在前面
这里记录一下CDC和Kafka的协同工作过程。
- CDC(Change Data Capture:变更数据捕获)是数据库的一项功能,能够监控数据库表的变化。
- Kafka是一种分布式消息系统。
- 这里协同的目的是让CDC监控数据库表的更新,然后将更新发布到Kafka,最后让消费者响应这个更新。
- 另外还用到了一个CDC和Kafka之间的连接器,叫Debezium。
一、配置数据库CDC
这里是针对SQL Server2019进行配置。
- 关于SQL Server的CDC官方文档可以看这个:https://docs.microsoft.com/zh-cn/sql/relational-databases/track-changes/about-change-data-capture-sql-server?view=sql-server-ver16。
- 配置过程主要参考了博文:CDC(Change Data Capture)功能使用及释疑 。
一些要点如下:
- 在SSMS的工具栏中点击新建查询,打开查询脚本;
- 执行下面语句,打开数据库的CDC功能(默认关闭);
exec sys.sp_cdc_enable_db
- 执行下面语句,在需要监控的数据库表上启用CDC功能;
- 各个选项的意义可以参考官方文档:https://docs.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sys-sp-cdc-enable-table-transact-sql?redirectedfrom=MSDN&view=sql-server-ver16。
- 注意一定要写上
@capture_instance
,否则后面无法关闭表的CDC功能。
EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo', -- source_schema
@source_name = 'Test_Table', -- table_name
@capture_instance = 'dbo_Test_Table', -- capture_instance
@supports_net_changes = 1, -- supports_net_changes
@role_name = NULL, -- role_name
@index_name = NULL, -- index_name
@captured_column_list = NULL -- captured_column_list
- 注意检查SQL Server代理是否开启,如果不开启是不能启用CDC的。
- 在SQL Server Configuration Manager->SQL Server服务中也可以开启SQL Server代理。
- 成功开启CDC后,在系统表里面会有6个以
cdc
开头的表; - 执行下列语句应该有返回值。
# select * from cdc.dbo_xxx
select * from cdc.dbo_Test_Table
- 此时对数据库表执行
INSERT
,UPDATE
和DELETE
都会触发CDC产生记录。 - 注意:如果更改了开启CDC的数据表的结构或者某些属性的设置,CDC功能就会停用,需要重新开启。
- 关闭表的CDC功能:
EXEC sys.sp_cdc_disable_table
@source_schema = 'dbo', -- source_schema
@source_name = 'Test_Table', -- table_name
@capture_instance = 'dbo_Test_Table', -- capture_instance
- 另外,如果要新增表,则需要关掉数据库的CDC功能之后再添加,否则在新表上的查询可能会无法生效。
exec sys.sp_cdc_disable_db
二、配置Kafka
Kafka是一个高可用的消息系统,适用于数据批处理和流式数据。这里主要介绍如何在Windows上配置Kafka。
- Kafka的原理介绍可以参考博文:Kafka基本原理详解。
1.安装Zookeeper
- 首先是安装Zookeeper,因为Kafka需要依赖于Zookeeper才能运行。
- 主要参考博文:Windows 安装 Zookeeper 详细步骤。
- 下载的官网地址:https://zookeeper.apache.org/index.html。
- 找最新的稳定版本,下载不带源码的版本。
- 用cmd或者powershell来解压:
tar -zxvf .\apache-zookeeper-xxx.tar.gz
- 解压之后将解压的文件夹整个移动到常用的软件安装目录即可。
- 在
conf
文件夹中复制一份zoo.cfg
配置文件,然后配置data文件夹和log文件夹的地址。
- 进入
bin
文件夹,启动Zookeeper
点击zkServer.cmd启动服务端
点击zkCli.cmd启动客户端
注意:
- 如果同时启动了Tomcat,则由于占用了8080端口而无法启动Zookeeper
- 参考博客:ZooKeeper audit is enabled. Exiting JVM with code 4
- 在
zoo.cfg
中加上下面的修改端口即可:
# 端口号任意
admin.serverPort=17900
- 记得如果不在管理员的账号下,要用管理员身份运行两个
cmd
文件,否则权限会不足。 - 如果点击
cmd
文件时闪退,可以在cmd
文件的倒数第二行加上pause
来暂停窗口查看报错。
2.安装Kafka
- 主要参考博文:windows下安装kafka教程。
- 官网下载地址:https://kafka.apache.org/downloads.html。
- 下载最新二进制版本(Binary downloads),官方推荐是下载Scala 2.13版本。
- 用cmd或者powershell来解压:
tar -zxvf .\kafka_2.13-3.2.1.tgz
- 解压之后将解压的文件夹整个移动到常用的软件安装目录即可。
- 在
config
文件夹下配置server.properties
文件。 - 注意,运行Kafka之前一定要先启动Zookeeper。
- 回到根目录,下面是一些常用的命令:
# 启动Kafka
.\bin\windows\kafka-server-start.bat .\config\server.properties
# 创建主题
.\bin\windows\kafka-topics.bat --create --topic test --bootstrap-server localhost:9092
# 查看所有主题
.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
# 创建生产者进程
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
# 创建消费者进程
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
三、配置Kafka Connector
Kafka Connect用来连接Kafka和别的系统,是一种通用的连接框架标准。
- 部署过程主要参考了博文:Kafka Connect的部署和使用详解1(安装配置、基本用法)。
Kafka Connector的部署有两种方式:分布式(distributed)和标准(standalone),这里主要是介绍使用standalone模式部署。
1.配置配置文件
- 打开配置文件
conf/connect-standalone.properties
。 - 检查一下 bootstrap.servers是否为Kafka的地址和端口
- 重点是设置
plugin.path
,之后自行下载的connector都要放在该路径下才能生效。 - 检查
offset.storage.file.filename
文件是否存在,如果不存在要手动新建一个offsets文件,否则运行会出错。
2.配置Connector测试
-
Kafka自带了FileStreamSinkConnector和FileStreamSourceConnector这两个Connector,可以用于测试。
-
在
conf/connect-file-source.properties
中配置:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=./test.txt # 监控输入的文件路径
topic=connect-test # Kafka主题名称
- 在
connect-file-sink.properties
中配置:
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=./test.sink.txt
topics=connect-test
- 用命令行启动这两个connector
# 用standalone形式启动connector进程
.\bin\windows\connect-standalone.bat config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
- 在Kafka根目录下新建两个文件
test.sink.txt
和test.txt
。在test.txt
输入内容,test.sink.txt
会自动输出相同的内容。
3.配置Debezium Connector
- Debezium是专门用于数据库CDC的连接器,官网文档:https://debezium.io/documentation/reference/1.9/。
- 针对于SQL Server的官方文档:https://debezium.io/documentation/reference/1.9/connectors/sqlserver.html#sqlserver-example-configuration。
-
下载连接器存档
debezium-connector-sqlserver-1.9.5.Final-plugin.tar.gz
,解压后把文件夹放到上面配置的plugin.path
路径下即可。 -
启动Debezium connector需要使用REST API格式。REST API是一种JSON格式,用于HTTP协议的数据传输,可以用Postman软件进行管理和配置。
-
Postman的一些使用教程:API测试之Postman使用完全指南(Postman教程,这篇文章就够了) 。
-
官方给的REST API配置样例如下:
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.hostname": "192.168.99.100",
"database.port": "1433",
"database.user": "sa",
"database.password": "Password!",
"database.dbname": "testDB",
"database.server.name": "fullfillment",
"table.include.list": "dbo.customers",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.fullfillment"
}
}
- 属性的说明如下:
- 使用Postman发送配置信息即可启动Debezium Connector
- 常用的一些REST API接口:
GET /connectors:返回所有正在运行的 connector 名
POST /connectors:新建一个 connector;请求体必须是 json 格式并且需要包含 name 字段和 config 字段,name 是 connector 的名字,config 是 json 格式,必须包含你的 connector 的配置信息。
GET /connectors/{name}:获取指定 connetor 的信息
GET /connectors/{name}/config:获取指定 connector 的配置信息
PUT /connectors/{name}/config:更新指定 connector 的配置信息
GET /connectors/{name}/status:获取指定 connector 的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
GET /connectors/{name}/tasks:获取指定 connector 正在运行的 task。
GET /connectors/{name}/tasks/{taskid}/status:获取指定 connector 的 task 的状态信息
PUT /connectors/{name}/pause:暂停 connector 和它的 task,停止数据处理知道它被恢复。
PUT /connectors/{name}/resume:恢复一个被暂停的 connector
POST /connectors/{name}/restart:重启一个 connector,尤其是在一个 connector 运行失败的情况下比较常用
POST /connectors/{name}/tasks/{taskId}/restart:重启一个 task,一般是因为它运行失败才这样做。
DELETE /connectors/{name}:删除一个 connector,停止它的所有 task 并删除配置。
例如:
# 建立新的connector进程
POST http://192.168.1.133:8083/connectors
# 查看所有的connector进程
GET http://192.168.1.133:8083/connectors
# 查看已安装的connector插件
GET http://192.168.1.133:8083/connector-plugins
# 查看connector进程状态
GET http://192.168.1.133:8083/connectors/test-sqlserver-connector/status
# 删除某个connector进程
DELETE http://192.168.1.133:8083/connectors/test-sqlserver-connector
4.一些注意
- 由于Debezium Connector不是由配置文件从控制台用
.bat
启动的,所以在使用REST API发送POST配置之前,一定要确保已经启动了Connector服务。 - 但由于Connector服务启动时一定要加上某个特定Connector的配置文件作为参数,所以这时候用默认自带的FileStreamSinkConnector和FileStreamSourceConnector作为参数就很合适,尽管这两个Connector我们实际中并不需要。
- 或许Debezium Connector也可以用配置文件+
.bat
在控制台启动,但暂时还不清楚如何配置,官方推荐也是用REST API启动的,而且强调了在启动前必须要先启动Connector服务。
四、创建JAVA的消费者
因为生产者已经由上面的CDC充当,所以为了响应CDC的消息,这里用JAVA来实现生产者进程。
- 主要是参考了博文:三分钟教会你如何使用IDEA操作Kafka创建生产者消费者(有详细案列)。
1.首先是导入Maven依赖包:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.1.1</version>
</dependency>
2.配置fastjson库
- 由于Debezium Connector的CDC信息都是用JSON格式,所以还要下载一个JSON库,这里是用了fastjson。
- 可以参考博文:fastjson的基本使用方法和java 获取JSONObject中key对应的值。
3.Debezium Connector的CDC消息
- 使用poll可以从订阅的主题中获取消息,它的参数的含义是如果没有消息等待多久返回empty。
- 注意,poll不是每次只拉取一条消息的,而是可以一次性拉取很多条,默认最大是500。
- 关于poll的详细介绍可以参考博文:Apache Kafka(九)- Kafka Consumer 消费行为。
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
- 如果不commit消息,那么Kafka会认为这条消息还没有被处理成功,下次重启消费者进程后仍然会poll到这些消息。
- commit消息的方式有两种:自动commit和手动commit,可以参考博文:Kafka消费消息自动提交与手动提交。。
- 手动commit设置如下:
// disable auto commit of offsets
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// 一次性提交所有已经poll过的消息,即poll方法返回的最大偏移量
consumer.commitSync();
- 如果是要在处理完一次poll数据的过程中细粒度地提交,可以参考博客:Kafka系列教程08:Consumer提交消息偏移量。
4.一个消费者进程demo
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class Kafka_Test {
public static void main(String[] args) {
Properties prop=new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.133:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000");
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
// latest收最新的数据 none会报错 earliest最早的数据
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
prop.put(ConsumerConfig.GROUP_ID_CONFIG,"G1");
// 每次poll只拉取2条消息
prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
// 创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton("fullfillment.dbo.Test_RDF_extractor"));//订阅
while (true){
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : poll) {
System.out.println(record.offset()+"\t"+record.key()+"\t"+record.value());
// 解析JSON值
JSONObject json_record_value = JSON.parseObject(record.value());
JSONObject json_record_value_after = json_record_value.getJSONObject("after");
String ext_code = json_record_value_after.getString("ext_code");
System.out.println(ext_code);
// 细粒度提交消息Offset
// 构建提交参数,包括partition和offset的信息
Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
// 使用下一个偏移量作为提交的值,下一次就从这里开始拉取消息
offsetMap.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, "no metadata"));
// 提交指定的offset
consumer.commitSync(offsetMap);
}
}
}
}
五、一些错误
错误1:io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.\r\n\tat io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:59)\r\n\tat io.debezium.connector.sqlserver.SqlServerConnectorTask.start(SqlServerConnectorTask.java:87)\r\n\tat io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:130)\r\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.initializeAndStart(WorkerSourceTask.java:226)\r\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)\r\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)\r\n\tat java.util.concurrent.Executors
-
情景:使用REST API调用Debezium connector的时候启动失败。
-
原因:一般在手动删除了Kafka里面的dbhistory.xxx主题后就会出现这种情况,可能是Debezium本身的校准机制阻止了启动。
-
解决方法:
-
其实可以首先尝试修改connector的
name
;- 这样不需要重装Kafka和Zookeeper;
- topic里面的数据也不会丢失;
- 简直完美;
-
如果不行,再建议删掉Zookeeper和Kafka重装,直接停止所有进程,删除它们的文件夹,然后重新配置即可。
- 如果是一个一个删除,可以参考博客:Windows环境下Kafka删除主题遇到的大坑!!!,需要删除的内容如下:
- (1) Zookeeper需要用客户端删除brokers里面的对应主题,然后删除data和log文件夹
- (2) Kafka需要删除kafka-log文件夹
- 但不知道是什么原因,我删完之后重启Kafka还是会有偏移的记录,可能还要删掉Kafka的
connect.offsets
文件才行。 - 建议还是直接删掉重装一遍比较省事,毕竟Kafka和Zookeeper都无关业务数据和操作。
错误2:kafka-logs__consumer_offsets-7\00000000000000000000.timeindex.swap: 另一个程序正在使用此文件,进程无法访问。
- 情景:启动Kafka时出现,进程无法启动;
- 原因:可能是log文件正在被占用;
- 解决方法:删除kafka-logs下面的文件,重新启动即可。
错误3:正常启动所有kafka相关程序后,消费者程序不能实时接收kafka的消费者信息
- 情景:
- zookeeper、kafka、debezium connector均正常运行未报错;
- 数据库的CDC功能已开启;
- 原因:可能是数据库的SQL SERVER代理没有开启;
- 解决方法:开启数据库的SQL SERVER代理即可;
更多推荐
所有评论(0)