写在前面

这里记录一下CDC和Kafka的协同工作过程。

  • CDC(Change Data Capture:变更数据捕获)是数据库的一项功能,能够监控数据库表的变化。
  • Kafka是一种分布式消息系统。
  • 这里协同的目的是让CDC监控数据库表的更新,然后将更新发布到Kafka,最后让消费者响应这个更新。
  • 另外还用到了一个CDC和Kafka之间的连接器,叫Debezium

一、配置数据库CDC

这里是针对SQL Server2019进行配置。

一些要点如下:

  • 在SSMS的工具栏中点击新建查询,打开查询脚本;
  • 执行下面语句,打开数据库的CDC功能(默认关闭);
exec sys.sp_cdc_enable_db 
EXEC sys.sp_cdc_enable_table
        @source_schema = 'dbo', -- source_schema
        @source_name = 'Test_Table', -- table_name
        @capture_instance = 'dbo_Test_Table', -- capture_instance
        @supports_net_changes = 1, -- supports_net_changes
        @role_name = NULL, -- role_name
        @index_name = NULL, -- index_name
        @captured_column_list = NULL -- captured_column_list
  • 注意检查SQL Server代理是否开启,如果不开启是不能启用CDC的。
  • SQL Server Configuration Manager->SQL Server服务中也可以开启SQL Server代理。

SQL Server代理

  • 成功开启CDC后,在系统表里面会有6个以cdc开头的表;
  • 执行下列语句应该有返回值。
# select * from cdc.dbo_xxx
select * from cdc.dbo_Test_Table
  • 此时对数据库表执行INSERTUPDATEDELETE都会触发CDC产生记录。
  • 注意:如果更改了开启CDC的数据表的结构或者某些属性的设置,CDC功能就会停用,需要重新开启。
  • 关闭表的CDC功能:
EXEC sys.sp_cdc_disable_table
		@source_schema = 'dbo', -- source_schema
        @source_name = 'Test_Table', -- table_name
		@capture_instance = 'dbo_Test_Table', -- capture_instance
  • 另外,如果要新增表,则需要关掉数据库的CDC功能之后再添加,否则在新表上的查询可能会无法生效。
exec sys.sp_cdc_disable_db 

二、配置Kafka

Kafka是一个高可用的消息系统,适用于数据批处理和流式数据。这里主要介绍如何在Windows上配置Kafka。

1.安装Zookeeper

下载

  • 用cmd或者powershell来解压:
tar -zxvf .\apache-zookeeper-xxx.tar.gz
  • 解压之后将解压的文件夹整个移动到常用的软件安装目录即可。
  • conf文件夹中复制一份zoo.cfg配置文件,然后配置data文件夹和log文件夹的地址。

zoo.cfg配置文件

  • 进入bin文件夹,启动Zookeeper

点击zkServer.cmd启动服务端
点击zkCli.cmd启动客户端

注意:

# 端口号任意
admin.serverPort=17900
  • 记得如果不在管理员的账号下,要用管理员身份运行两个cmd文件,否则权限会不足。
  • 如果点击cmd文件时闪退,可以在cmd文件的倒数第二行加上pause来暂停窗口查看报错。

在cmd中添加pause暂停窗口

2.安装Kafka

下载

  • 用cmd或者powershell来解压:
tar -zxvf .\kafka_2.13-3.2.1.tgz
  • 解压之后将解压的文件夹整个移动到常用的软件安装目录即可。
  • config文件夹下配置server.properties文件。
  • 注意,运行Kafka之前一定要先启动Zookeeper
  • 回到根目录,下面是一些常用的命令:
# 启动Kafka
.\bin\windows\kafka-server-start.bat .\config\server.properties
# 创建主题
.\bin\windows\kafka-topics.bat --create --topic test --bootstrap-server localhost:9092
# 查看所有主题
.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
# 创建生产者进程
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
# 创建消费者进程
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

三、配置Kafka Connector

Kafka Connect用来连接Kafka和别的系统,是一种通用的连接框架标准。

Kafka Connector的部署有两种方式:分布式(distributed)和标准(standalone),这里主要是介绍使用standalone模式部署。

1.配置配置文件
  • 打开配置文件conf/connect-standalone.properties
  • 检查一下 bootstrap.servers是否为Kafka的地址和端口
  • 重点是设置plugin.path,之后自行下载的connector都要放在该路径下才能生效。
  • 检查offset.storage.file.filename文件是否存在,如果不存在要手动新建一个offsets文件,否则运行会出错。

配置

2.配置Connector测试
  • Kafka自带了FileStreamSinkConnectorFileStreamSourceConnector这两个Connector,可以用于测试。

  • conf/connect-file-source.properties中配置:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=./test.txt  # 监控输入的文件路径
topic=connect-test  # Kafka主题名称
  • connect-file-sink.properties 中配置:
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=./test.sink.txt
topics=connect-test
  • 用命令行启动这两个connector
# 用standalone形式启动connector进程
.\bin\windows\connect-standalone.bat config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
  • 在Kafka根目录下新建两个文件test.sink.txttest.txt。在test.txt输入内容,test.sink.txt会自动输出相同的内容。
3.配置Debezium Connector

部署过程

  • 下载连接器存档debezium-connector-sqlserver-1.9.5.Final-plugin.tar.gz,解压后把文件夹放到上面配置的plugin.path路径下即可。

  • 启动Debezium connector需要使用REST API格式。REST API是一种JSON格式,用于HTTP协议的数据传输,可以用Postman软件进行管理和配置。

  • Postman的一些使用教程:API测试之Postman使用完全指南(Postman教程,这篇文章就够了)

  • 官方给的REST API配置样例如下:

{
    "name": "inventory-connector", 
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", 
        "database.hostname": "192.168.99.100", 
        "database.port": "1433", 
        "database.user": "sa", 
        "database.password": "Password!", 
        "database.dbname": "testDB", 
        "database.server.name": "fullfillment", 
        "table.include.list": "dbo.customers", 
        "database.history.kafka.bootstrap.servers": "kafka:9092", 
        "database.history.kafka.topic": "dbhistory.fullfillment" 
    }
}
  • 属性的说明如下:

属性说明

  • 使用Postman发送配置信息即可启动Debezium Connector
  • 常用的一些REST API接口:
GET /connectors:返回所有正在运行的 connector 名
POST /connectors:新建一个 connector;请求体必须是 json 格式并且需要包含 name 字段和 config 字段,name 是 connector 的名字,config 是 json 格式,必须包含你的 connector 的配置信息。
GET /connectors/{name}:获取指定 connetor 的信息
GET /connectors/{name}/config:获取指定 connector 的配置信息
PUT /connectors/{name}/config:更新指定 connector 的配置信息
GET /connectors/{name}/status:获取指定 connector 的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
GET /connectors/{name}/tasks:获取指定 connector 正在运行的 task。
GET /connectors/{name}/tasks/{taskid}/status:获取指定 connector 的 task 的状态信息
PUT /connectors/{name}/pause:暂停 connector 和它的 task,停止数据处理知道它被恢复。
PUT /connectors/{name}/resume:恢复一个被暂停的 connector
POST /connectors/{name}/restart:重启一个 connector,尤其是在一个 connector 运行失败的情况下比较常用
POST /connectors/{name}/tasks/{taskId}/restart:重启一个 task,一般是因为它运行失败才这样做。
DELETE /connectors/{name}:删除一个 connector,停止它的所有 task 并删除配置。

例如:

# 建立新的connector进程
POST http://192.168.1.133:8083/connectors
# 查看所有的connector进程
GET http://192.168.1.133:8083/connectors
# 查看已安装的connector插件
GET http://192.168.1.133:8083/connector-plugins
# 查看connector进程状态
GET http://192.168.1.133:8083/connectors/test-sqlserver-connector/status
# 删除某个connector进程
DELETE http://192.168.1.133:8083/connectors/test-sqlserver-connector
4.一些注意
  • 由于Debezium Connector不是由配置文件从控制台用.bat启动的,所以在使用REST API发送POST配置之前,一定要确保已经启动了Connector服务。
  • 但由于Connector服务启动时一定要加上某个特定Connector的配置文件作为参数,所以这时候用默认自带的FileStreamSinkConnectorFileStreamSourceConnector作为参数就很合适,尽管这两个Connector我们实际中并不需要。
  • 或许Debezium Connector也可以用配置文件+.bat在控制台启动,但暂时还不清楚如何配置,官方推荐也是用REST API启动的,而且强调了在启动前必须要先启动Connector服务。

四、创建JAVA的消费者

因为生产者已经由上面的CDC充当,所以为了响应CDC的消息,这里用JAVA来实现生产者进程。

1.首先是导入Maven依赖包:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>3.1.1</version>
</dependency>
2.配置fastjson库
3.Debezium Connector的CDC消息
  • 使用poll可以从订阅的主题中获取消息,它的参数的含义是如果没有消息等待多久返回empty。
  • 注意,poll不是每次只拉取一条消息的,而是可以一次性拉取很多条,默认最大是500。
  • 关于poll的详细介绍可以参考博文:Apache Kafka(九)- Kafka Consumer 消费行为
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
  • 如果不commit消息,那么Kafka会认为这条消息还没有被处理成功,下次重启消费者进程后仍然会poll到这些消息。
  • commit消息的方式有两种:自动commit和手动commit,可以参考博文:Kafka消费消息自动提交与手动提交。。
  • 手动commit设置如下:
// disable auto commit of offsets
 properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// 一次性提交所有已经poll过的消息,即poll方法返回的最大偏移量
consumer.commitSync();
4.一个消费者进程demo
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class Kafka_Test {
    public static void main(String[] args) {
        Properties prop=new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.133:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000");
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        // latest收最新的数据 none会报错 earliest最早的数据
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG,"G1");
        // 每次poll只拉取2条消息
        prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
        // 创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("fullfillment.dbo.Test_RDF_extractor"));//订阅

        while (true){
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : poll) {
                System.out.println(record.offset()+"\t"+record.key()+"\t"+record.value());

                // 解析JSON值
                JSONObject json_record_value = JSON.parseObject(record.value());
                JSONObject json_record_value_after = json_record_value.getJSONObject("after");
                String ext_code = json_record_value_after.getString("ext_code");
                System.out.println(ext_code);

                // 细粒度提交消息Offset
                // 构建提交参数,包括partition和offset的信息
                Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
                // 使用下一个偏移量作为提交的值,下一次就从这里开始拉取消息
                offsetMap.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1, "no metadata"));
                // 提交指定的offset
                consumer.commitSync(offsetMap);
            }
        }
    }
}

五、一些错误

错误1:io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.\r\n\tat io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:59)\r\n\tat io.debezium.connector.sqlserver.SqlServerConnectorTask.start(SqlServerConnectorTask.java:87)\r\n\tat io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:130)\r\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.initializeAndStart(WorkerSourceTask.java:226)\r\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)\r\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)\r\n\tat java.util.concurrent.Executors

  • 情景:使用REST API调用Debezium connector的时候启动失败。

  • 原因:一般在手动删除了Kafka里面的dbhistory.xxx主题后就会出现这种情况,可能是Debezium本身的校准机制阻止了启动。

  • 解决方法

  • 其实可以首先尝试修改connector的name

    • 这样不需要重装Kafka和Zookeeper;
    • topic里面的数据也不会丢失;
    • 简直完美;
  • 如果不行,再建议删掉Zookeeper和Kafka重装,直接停止所有进程,删除它们的文件夹,然后重新配置即可。

    • 如果是一个一个删除,可以参考博客:Windows环境下Kafka删除主题遇到的大坑!!!,需要删除的内容如下:
    • (1) Zookeeper需要用客户端删除brokers里面的对应主题,然后删除data和log文件夹
    • (2) Kafka需要删除kafka-log文件夹
    • 但不知道是什么原因,我删完之后重启Kafka还是会有偏移的记录,可能还要删掉Kafka的connect.offsets文件才行。
    • 建议还是直接删掉重装一遍比较省事,毕竟Kafka和Zookeeper都无关业务数据和操作。

错误2:kafka-logs__consumer_offsets-7\00000000000000000000.timeindex.swap: 另一个程序正在使用此文件,进程无法访问。

  • 情景:启动Kafka时出现,进程无法启动;
  • 原因:可能是log文件正在被占用;
  • 解决方法:删除kafka-logs下面的文件,重新启动即可。

错误3:正常启动所有kafka相关程序后,消费者程序不能实时接收kafka的消费者信息

  • 情景
    • zookeeper、kafka、debezium connector均正常运行未报错;
    • 数据库的CDC功能已开启;
  • 原因:可能是数据库的SQL SERVER代理没有开启;
  • 解决方法:开启数据库的SQL SERVER代理即可;
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐