本文的上半部分介绍了基本原理和基本概念,请参考

本文的上半部分: Debezium connector for MySQL 基本概念

5. 部署

要部署 Debezium MySQL 连接器,您需要安装 Debezium MySQL 连接器插件程序,配置连接器,然后通过将其配置添加到 Kafka Connect 来启动连接器。

先决条件

程序

  1. 下载 Debezium MySQL 连接器插件

  2. 将文件提取到您的 Kafka Connect 环境中。

  3. 将包含 JAR 文件的目录添加到Kafka Connect 的plugin.path.

  4. 配置连接器并将配置添加到您的 Kafka Connect 集群。

  5. 重新启动 Kafka Connect 进程以获取新的 JAR 文件。

如果您正在使用不可变容器,请参阅Debezium 的Apache Zookeeper、Apache Kafka、MySQL 和 Kafka Connect 容器映像,其中 MySQL 连接器已安装并准备好运行。

您还可以在 Kubernetes 和 OpenShift 上运行 Debezium

5.1 MySQL 连接器配置示例

以下是连接器实例的配置样例,该样例从位于 192.168.99.100 的端口 3306 上的 MySQL 服务器捕获数据,我们在逻辑上将其命名为fullfillment. 通常,您通过设置可用于连接器的配置属性,在 JSON 文件中配置 Debezium MySQL 连接器。

您可以选择为数据库中的 某个库和某些表 生成事件。或者,您可以忽略、屏蔽或截断包含敏感数据、大于指定大小或您不需要的列。

{
    "name": "inventory-connector", // (1)
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector", // (2)
        "database.hostname": "192.168.99.100", // (3)
        "database.port": "3306", // (4)
        "database.user": "debezium-user", // (5)
        "database.password": "debezium-user-pw", // (6)
        "database.server.id": "184054", // (7)
        "database.server.name": "fullfillment", // (8)
        "database.include.list": "inventory", // (9)
        "database.history.kafka.bootstrap.servers": "kafka:9092", // (10)
        "database.history.kafka.topic": "dbhistory.fullfillment", // (11)
        "include.schema.changes": "true" // (12)
    }
}
编号描述
1向 Kafka Connect 服务注册时的连接器名称。
2连接器的类名。
3MySQL 服务器地址。
4MySQL 服务器端口号。
5具有适当权限的 MySQL 用户。
6MySQL 用户的密码。
7连接器的唯一 ID。
8MySQL 服务器或集群的逻辑名称。
9指定服务器托管的数据库列表。
10连接器用于将 DDL 语句写入和恢复到数据库历史主题的 Kafka 代理列表。
11数据库历史主题的名称。本主题仅供内部使用,消费者不得使用。
12指定连接器是否应为 DDL 更改生成事件并将它们发送到fulfillment架构更改主题以供使用者使用的标志。

有关可以为 Debezium MySQL 连接器设置的配置属性的完整列表,请参阅MySQL 连接器配置属性

您可以使用命令将此配置发送POST到正在运行的 Kafka Connect 服务。该服务记录配置并启动一个执行以下操作的连接器任务:

  • 连接到 MySQL 数据库。

  • 在捕获模式下读取表的更改数据表。

  • 流将事件记录更改为 Kafka 主题。

5.2 添加连接器配置

要开始运行 MySQL 连接器,请配置连接器配置,并将配置添加到您的 Kafka Connect 集群。

先决条件:

  • MySQL 设置为使用 Debezium 连接器。

  • Debezium MySQL 连接器已安装。

步骤:

  1. 为 MySQL 连接器创建配置。

  2. 使用Kafka Connect REST API将该连接器配置添加到您的 Kafka Connect 集群。

结果:

连接器启动后,它会为连接器配置的 MySQL 数据库执行一致的快照。然后,连接器开始为行级操作生成数据更改事件,并将更改事件记录流式传输到 Kafka 主题。

5.3 连接器属性

Debezium MySQL 连接器有许多配置属性,您可以使用它们来为您的应用程序实现正确的连接器行为。许多属性都有默认值。有关属性的信息组织如下:

除非默认值可用,否则需要以下配置属性。

5.3.1 必需的 Debezium MySQL 连接器配置属性
字段默认描述
name无默认值连接器的唯一名称。尝试使用相同名称再次注册失败。所有 Kafka Connect 连接器都需要此属性。
connector.class无默认值连接器的 Java 类的名称。始终指定 io.debezium.connector.mysql.MySqlConnectorMySQL 连接器。
tasks.max1应为此连接器创建的最大任务数。MySQL 连接器始终使用单个任务,因此不使用此值,因此默认值始终是可以接受的。
database.hostname无默认值MySQL 数据库服务器的 IP 地址或主机名。
database.port3306MySQL 数据库服务器的整数端口号。
database.user无默认值连接到 MySQL 数据库服务器时要使用的 MySQL 用户的名称。
database.password无默认值连接到 MySQL 数据库服务器时使用的密码。
database.server.name无默认值为 Debezium 在其中捕获更改的特定 MySQL 数据库服务器/集群标识并提供命名空间的逻辑名称。逻辑名称在所有其他连接器中应该是唯一的,因为它用作所有接收此连接器发出的事件的 Kafka 主题名称的前缀。数据库服务器逻辑名称中只能使用字母数字字符、连字符、点和下划线。 +不要更改此属性的值。如果您更改名称值,在重新启动后,连接器不会继续向原始主题发出事件,而是向名称基于新值的主题发出后续事件。连接器也无法恢复其数据库历史主题。
database.server.id随机的此数据库客户端的数字 ID,在 MySQL 集群中所有当前运行的数据库进程中必须是唯一的。此连接器作为另一台服务器(具有此唯一 ID)加入 MySQL 数据库集群,因此它可以读取 binlog。默认情况下,会生成一个介于 5400 和 6400 之间的随机数,但建议显式设置一个值。
database.include.list空字符串一个可选的、以逗号分隔的正则表达式列表,与要为其捕获更改的数据库的名称相匹配。连接器不会捕获名称不在的任何数据库中的更改database.include.list。默认情况下,连接器会捕获所有数据库中的更改。不要同时设置database.exclude.list连接器配置属性。
database.exclude.list空字符串一个可选的、以逗号分隔的正则表达式列表,与您不想捕获其更改的数据库的名称相匹配。连接器捕获名称不在database.exclude.list. 不要同时设置database.include.list连接器配置属性。
table.include.list空字符串一个可选的、以逗号分隔的正则表达式列表,与您要捕获其更改的表的完全限定表标识符匹配。连接器不会捕获任何未包含在table.include.list. 每个标识符的格式为databaseName表名。默认情况下,连接器会捕获每个数据库中每个非系统表中的更改,这些表的更改正在被捕获。不要同时指定table.exclude.list连接器配置属性。
table.exclude.list空字符串一个可选的、以逗号分隔的正则表达式列表,匹配您不想捕获其更改的表的完全限定表标识符。连接器捕获未包含在任何表中的更改table.exclude.list。每个标识符的格式为databaseName表名。不要同时指定table.include.list连接器配置属性。
column.exclude.list空字符串一个可选的、以逗号分隔的正则表达式列表,与要从更改事件记录值中排除的列的完全限定名称匹配。列的完全限定名称的格式为databaseName表名列名
column.include.list空字符串一个可选的、以逗号分隔的正则表达式列表,与要包含在更改事件记录值中的列的完全限定名称匹配。列的完全限定名称的格式为databaseName表名列名
column.truncate.to._length_.chars不适用如果字段值长于指定的字符数,则以逗号分隔的可选正则表达式列表匹配基于字符的列的完全限定名称,这些列的值应在更改事件记录值中被截断。您可以在单个配置中配置具有不同长度的多个属性。长度必须是正整数。列的完全限定名称的格式为databaseName表名列名
column.mask.with._length_.chars不适用一个可选的、以逗号分隔的正则表达式列表,匹配基于字符的列的完全限定名称,其值应在更改事件消息值中替换为由指定数量的星号 ( *) 字符组成的字段值。您可以在单个配置中配置具有不同长度的多个属性。每个长度必须是正整数或零。列的完全限定名称的格式为databaseName表名列名
column.mask.hash.*hashAlgorithm*.with.salt.*salt*; column.mask.hash.v2.*hashAlgorithm*.with.salt.*salt*不适用一个可选的、以逗号分隔的正则表达式列表,匹配基于字符的列的完全限定名称。列的完全限定名称采用. 在生成的更改事件记录中,指定列的值将替换为假名。*<databaseName>*.*<tableName>*.*<columnName>* 假名由应用指定的hashAlgorithmsalt产生的散列值组成。基于所使用的散列函数,参照完整性得以保持,而列值被替换为假名。Java Cryptography Architecture Standard Algorithm Name Documentation的MessageDigest 部分描述了支持的散列函数。 在以下示例中,CzQMA0cB5K是随机选择的盐。 column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName如有必要,假名会自动缩短为列的长度。连接器配置可以包括指定不同哈希算法和盐的多个属性。 根据使用的hashAlgorithm、选择的salt和实际数据集,生成的数据集可能不会被完全屏蔽。 如果值在不同的地方或系统中进行散列,则应使用散列策略版本 2 来确保保真度。
column.propagate.source.type不适用一个可选的,以逗号分隔的正则表达式列表,匹配列的完全限定名称,其原始类型和长度应作为参数添加到发出的更改事件记录中的相应字段模式。这些架构参数:__Debezium.source.column.type``__Debezium.source.column.length``__Debezium.source.column.scale分别用于传播可变宽度类型的原始类型名称和长度。这对于正确调整接收器数据库中相应列的大小很有用。列的完全限定名称是以下形式之一:数据库名称表名列名**数据库名称架构名称表名列名
datatype.propagate.source.type不适用一个可选的,以逗号分隔的正则表达式列表,它与列的特定于数据库的数据类型名称相匹配,其原始类型和长度应作为参数添加到发出的更改事件记录中的相应字段模式中。这些架构参数:__debezium.source.column.type``__debezium.source.column.length``__debezium.source.column.scale分别用于传播可变宽度类型的原始类型名称和长度。这对于正确调整接收器数据库中相应列的大小很有用。完全限定的数据类型名称是以下形式之一:数据库名称表名类型名称**数据库名称架构名称表名类型名称查看MySQL 连接器如何映射数据类型以获取 MySQL 特定的数据类型名称列表。
time.precision.modeadaptive_time_microseconds时间、日期和时间戳可以用不同类型的精度表示,包括:( adaptive_time_microseconds默认)根据数据库列的类型使用毫秒、微秒或纳秒精度值精确地捕获数据库中的日期、日期时间和时间戳值, TIME 类型字段除外,它们始终以微秒为单位捕获。 adaptive(已弃用)根据数据库列的类型,使用毫秒、微秒或纳秒精度值精确捕获数据库中的时间和时间戳值。 connect始终使用 Kafka Connect 的时间、日期和时间戳的内置表示来表示时间和时间戳值,无论数据库列的精度如何,它们都使用毫秒精度。
decimal.handling.modeprecise指定连接器应如何处理DECIMALNUMERICprecise的值:(默认)使用java.math.BigDecimal二进制形式的更改事件中表示的值精确地表示它们。 double使用double值表示它们,这可能会导致精度损失,但更易于使用。 string将值编码为格式化字符串,这很容易使用,但有关真实类型的语义信息会丢失。
bigint.unsigned.handling.modelong指定 BIGINT UNSIGNED 列应如何在更改事件中表示。可能的设置是: long使用 Java 表示值long,这可能无法提供精度,但在消费者中易于使用。long通常是首选设置。 precise用于表示值,这些值使用二进制表示和 Kafka Connect 的类型java.math.BigDecimal编码在更改事件中。org.apache.kafka.connect.data.Decimal处理大于 2^63 的值时使用此设置,因为这些值无法通过使用long.
include.schema.changestrue布尔值,指定连接器是否应将数据库架构中的更改发布到与数据库服务器 ID 同名的 Kafka 主题。通过使用包含数据库名称并且其值包含 DDL 语句的键来记录每个架构更改。这与连接器内部记录数据库历史的方式无关。
include.schema.commentsfalse布尔值,指定连接器是否应解析和发布元数据对象的表和列注释。启用此选项将对内存使用产生影响。逻辑模式对象的数量和大小在很大程度上影响了 Debezium 连接器消耗的内存量,并且向它们中的每一个添加潜在的大字符串数据可能会非常昂贵。
include.queryfalse布尔值,指定连接器是否应包含生成更改事件的原始 SQL 查询。 如果您将此选项设置为,那么您还必须使用设置为的选项true配置 MySQL 。当是时,对于快照过程生成的事件不存在查询。 设置为可能会公开通过在更改事件中包含原始 SQL 语句而显式排除或屏蔽的表或字段。因此,默认设置为。binlog_rows_query_log_events``ON``include.query``true include.query``true``false
event.deserialization.failure.handling.modefail指定连接器在反序列化二进制日志事件期间应如何对异常做出反应。 fail传播异常,该异常指示有问题的事件及其二进制日志偏移量,并导致连接器停止。 warn记录有问题的事件及其二进制日志偏移量,然后跳过该事件。 ignore跳过有问题的事件并且不记录任何内容。
inconsistent.schema.handling.modefail指定连接器应如何对与内部模式表示中不存在的表相关的二进制日志事件作出反应。即内部表示与数据库不一致。 fail抛出一个异常,指示有问题的事件及其二进制日志偏移量,并导致连接器停止。 warn记录有问题的事件及其二进制日志偏移量并跳过该事件。 skip跳过有问题的事件并且不记录任何内容。
max.batch.size2048正整数值,指定在此连接器的每次迭代期间应处理的每批事件的最大大小。默认为 2048。
max.queue.size8192正整数值,指定阻塞队列可以保存的最大记录数。当 Debezium 读取从数据库流式传输的事件时,它会将事件放入阻塞队列中,然后再将它们写入 Kafka。在连接器接收消息的速度快于将消息写入 Kafka 的速度或 Kafka 不可用时,阻塞队列可以为从数据库读取更改事件提供背压。当连接器定期记录偏移量时,将忽略队列中保存的事件。始终将 的值设置max.queue.size为大于 的值max.batch.size
max.queue.size.in.bytes0一个长整数值,指定阻塞队列的最大容量(以字节为单位)。默认情况下,没有为阻塞队列指定卷限制。要指定队列可以使用的字节数,请将此属性设置为正长值。 如果max.queue.size也设置了,当队列的大小达到任一属性指定的限制时,写入队列将被阻止。例如,如果设置max.queue.size=1000, 和max.queue.size.in.bytes=5000,则在队列包含 1000 条记录或队列中的记录量达到 5000 字节后阻止写入队列。
poll.interval.ms1000正整数值,指定连接器在开始处理一批事件之前应等待新更改事件出现的毫秒数。默认为 1000 毫秒或 1 秒。
connect.timeout.ms30000一个正整数值,指定此连接器在尝试连接到 MySQL 数据库服务器后在超时之前应等待的最长时间(以毫秒为单位)。默认为 30 秒。
gtid.source.includes无默认值与 GTID 集中的源 UUID 匹配的正则表达式的逗号分隔列表,用于查找 MySQL 服务器中的 binlog 位置。仅使用具有与其中一种包含模式匹配的源的 GTID 范围。不要同时指定gtid.source.excludes.
gtid.source.excludes无默认值与 GTID 集中的源 UUID 匹配的正则表达式的逗号分隔列表,用于查找 MySQL 服务器中的 binlog 位置。仅使用具有不匹配任何这些排除模式的源的 GTID 范围。不要同时为 指定值gtid.source.includes
gtid.new.channel.position 已弃用并计划移除earliest当设置为latest时,当连接器看到一个新的 GTID 通道时,它会从该 GTID 通道中最后执行的事务开始消费。如果设置为earliest(默认),则连接器从第一个可用(未清除)GTID 位置开始读取该通道。earliest当您使用 Debezium 连接到主服务器的主动-被动 MySQL 设置时非常有用。在这种情况下,在故障转移期间,具有新 UUID(和 GTID 通道)的副本在连接 Debezium 之前开始接收写入。使用latest.
tombstones.on.deletetrue控制删除事件后是否有墓碑事件。 true- 删除操作由删除事件和随后的墓碑事件表示。 false- 只发出一个删除事件。 删除源记录后,发出 tombstone 事件(默认行为)允许 Kafka 完全删除与已删除行的键相关的所有事件,以防主题启用日志压缩。
message.key.columns不适用一个表达式列表,指定连接器用于形成自定义消息键的列,以形成它发布到指定表的 Kafka 主题的更改事件记录。默认情况下,Debezium 使用表的主键列作为它发出的记录的消息键。代替默认值,或为缺少主键的表指定键,您可以基于一个或多个列配置自定义消息键。 要为表建立自定义消息键,请列出表,然后列出要用作消息键的列。每个列表条目采用以下格式: *<fully-qualified_tableName>*:_<keyColumn>_,*<keyColumn>* 要基于多个列名的表键,在列名之间插入逗号。每个完全限定的表名都是以下格式的正则表达式: 该属性可以包含多个表的条目。使用分号分隔列表中的表条目。 下面的示例设置表和的消息键:对于 表,列和指定为消息键。对于任何数据库中的表,列和服务器作为消息键。 *<databaseName>*.*<tableName>* inventory.customers``purchase.orders inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4 inventory.customer``pk1``pk2``purchaseorders``pk3``pk4用于创建自定义消息键的列数没有限制。但是,最好使用指定唯一键所需的最小数量。
binary.handling.mode字节指定如何在更改事件中表示二进制列,例如 、、blob。可能的设置:将二进制数据表示为字节数组。将二进制数据表示为 base64 编码的字符串。将二进制数据表示为十六进制编码 (base16) 字符串。binary``varbinary bytes base64 hex
schema.name.adjustment.modeavro指定应如何调整架构名称以与连接器使用的消息转换器兼容。可能的设置: avro用下划线替换不能在 Avro 类型名称中使用的字符。 none不应用任何调整。
5.3.2 高级 MySQL 连接器配置属性

下表描述了高级 MySQL 连接器属性。这些属性的默认值很少需要更改。因此,您无需在连接器配置中指定它们。

表 23. MySQL 连接器高级配置属性的描述

字段默认描述
connect.keep.alivetrue一个布尔值,指定是否应使用单独的线程来确保与 MySQL 服务器/集群的连接保持活动状态。
table.ignore.builtintrue一个布尔值,指定是否应忽略内置系统表。无论表包含和排除列表如何,这都适用。默认情况下,系统表不会被捕获其更改,并且在对任何系统表进行更改时不会生成任何事件。
database.ssl.modedisabled指定是否使用加密连接。可能的设置有: disabled指定使用未加密的连接。 preferred如果服务器支持安全连接,则建立加密连接。如果服务器不支持安全连接,则回退到未加密的连接。 required建立加密连接,如果因任何原因无法建立连接,则失败。 verify_ca行为类似required,但另外它会根据配置的证书颁发机构 (CA) 证书验证服务器 TLS 证书,如果服务器 TLS 证书与任何有效的 CA 证书不匹配,则会失败。 verify_identity行为类似verify_ca,但另外验证服务器证书是否与远程连接的主机匹配。
binlog.buffer.size0二进制日志阅读器使用的前瞻缓冲区的大小。默认设置0禁用缓冲。 在特定情况下,MySQL binlog 中可能包含ROLLBACK语句完成的未提交数据。典型示例是在单个事务中使用保存点或混合临时和常规表更改。因此它可以确定是否从事务中流式传输更改。binlog 缓冲区的大小定义了 Debezium 在搜索事务边界时可以缓冲的事务中的最大更改数。如果事务的大小大于缓冲区,则 Debezium 必须在流式传输时回退并重新读取未放入缓冲区的事件。 当检测到事务开始时,Debezium 会尝试前滚 binlog 位置并找到COMMITROLLBACK 注意:此功能正在孵化。鼓励反馈。预计此功能尚未完全完善。
snapshot.modeinitial指定连接器启动时运行快照的条件。可能的设置有: initial- 仅当没有为逻辑服务器名称记录偏移时,连接器才运行快照。 initial_only- 连接器仅在没有记录逻辑服务器名称的偏移量时运行快照,然后停止;即它不会从 binlog 中读取更改事件。 when_needed- 连接器在它认为有必要时在启动时运行快照。也就是说,当没有可用的偏移量时,或者当先前记录的偏移量指定了服务器中不可用的 binlog 位置或 GTID 时。 never- 连接器从不使用快照。首次使用逻辑服务器名称启动时,连接器从 binlog 的开头读取。谨慎配置此行为。只有当 binlog 保证包含数据库的全部历史时才有效。 schema_only- 连接器运行模式而不是数据的快照。当您不需要主题包含数据的一致快照但需要它们仅具有自连接器启动以来的更改时,此设置很有用。 schema_only_recovery- 这是已捕获更改的连接器的恢复设置。当您重新启动连接器时,此设置可以恢复损坏或丢失的数据库历史主题。您可以定期设置它来“清理”一个意外增长的数据库历史主题。数据库历史主题需要无限保留。
snapshot.locking.modeminimal控制连接器是否持有全局 MySQL 读锁以及持有多长时间,这会在连接器执行快照时阻止对数据库的任何更新。可能的设置有: minimal- 连接器仅对快照的初始部分持有全局读锁,在此期间连接器读取数据库模式和其他元数据。快照中的剩余工作涉及从每个表中选择所有行。连接器可以通过使用 REPEATABLE READ 事务以一致的方式执行此操作。即使不再持有全局读锁并且其他 MySQL 客户端正在更新数据库,情况也是如此。 minimal_percona- 连接器保持全局备份锁仅用于连接器读取数据库模式和其他元数据的快照的初始部分。快照中的剩余工作涉及从每个表中选择所有行。连接器可以通过使用 REPEATABLE READ 事务以一致的方式执行此操作。即使不再持有全局备份锁并且其他 MySQL 客户端正在更新数据库,情况也是如此。此模式不会将表刷新到磁盘,不会被长时间运行的读取阻塞,并且仅在 Percona Server 中可用。 extended- 在快照期间阻止所有写入。如果有客户端正在提交 MySQL 从 REPEATABLE READ 语义中排除的操作,请使用此设置。 none- 防止连接器在快照期间获取任何表锁。虽然所有快照模式都允许使用此设置,但当且当在快照运行时没有发生架构更改时使用它是安全的。对于使用 MyISAM 引擎定义的表,尽管在 MyISAM 获取表锁时设置了此属性,但表仍将被锁定。这种行为与获取行级锁的 InnoDB 引擎不同。
snapshot.include.collection.list中指定的所有表table.include.list一个可选的、以逗号分隔的正则表达式列表,与*<databaseName>.<tableName>*要包含在快照中的表的完全限定名称 ( ) 匹配。指定的项目必须在连接器的table.include.list属性中命名。snapshot.mode仅当连接器的属性设置为 以外的值时,此属性才会生效never。 此属性不影响增量快照的行为。
snapshot.select.statement.overrides无默认值指定要包含在快照中的表行。如果您希望快照仅包含表中行的子集,请使用该属性。此属性仅影响快照。它不适用于连接器从日志中读取的事件。该属性包含格式为 的完全限定表名的逗号分隔列表*<databaseName>.<tableName>*。例如, "snapshot.select.statement.overrides": "inventory.products,customers.orders" 对于列表中的每个表,添加一个进一步的配置属性,该属性指定SELECT连接器在拍摄快照时要在表上运行的语句。指定的SELECT语句确定要包含在快照中的表行子集。使用以下格式指定此SELECT语句属性的名称:. 例如, 。 例子: snapshot.select.statement.overrides.*<databaseName>*.*<tableName>*``snapshot.select.statement.overrides.customers.orders 如果您希望快照仅包含未软删除的记录,请从customers.orders包含软删除列的表中添加以下属性:delete_flag``"snapshot.select.statement.overrides": "customer.orders", "snapshot.select.statement.overrides.customer.orders": "SELECT * FROM [customers].[orders] WHERE delete_flag = 0 ORDER BY id DESC"在生成的快照中,连接器仅包含delete_flag = 0.
min.row.count.to.stream.results1000在快照期间,连接器会查询连接器配置为捕获更改的每个表。连接器使用每个查询结果来生成包含该表中所有行的数据的读取事件。此属性确定 MySQL 连接器是将表的结果放入内存(速度快但需要大量内存)还是流式传输结果(可能较慢但适用于非常大的表)。此属性的设置指定在连接器流式传输结果之前表必须包含的最小行数。 要跳过所有表大小检查并始终在快照期间流式传输所有结果,请将此属性设置为0
heartbeat.interval.ms0控制连接器向 Kafka 主题发送心跳消息的频率。默认行为是连接器不发送心跳消息。 心跳消息对于监视连接器是否从数据库接收更改事件很有用。心跳消息可能有助于减少连接器重新启动时需要重新发送的更改事件的数量。要发送心跳消息,请将此属性设置为正整数,表示心跳消息之间的毫秒数。
heartbeat.topics.prefix__debezium-heartbeat控制连接器向其发送心跳消息的主题的名称。主题名称具有以下模式: heartbeat.topics.prefixserver.name 例如,如果数据库服务器名称为fulfillment,则默认主题名称为__debezium-heartbeat.fulfillment
heartbeat.action.query无默认值指定连接器发送心跳消息时连接器在源数据库上执行的查询。 例如,这可用于定期捕获源数据库中设置的已执行 GTID 的状态。 INSERT INTO gtid_history_table (select * from mysql.gtid_executed)
database.initial.statements无默认值建立与数据库的 JDBC 连接(而不是读取事务日志的连接)时要执行的 SQL 语句的分号分隔列表。要将分号指定为 SQL 语句中的字符而不是分隔符,请使用两个分号 ( ;;)。 连接器可能会自行决定建立 JDBC 连接,因此该属性仅用于配置会话参数。它不适用于执行 DML 语句。
snapshot.delay.ms无默认值连接器启动时执行快照之前连接器应等待的时间间隔(以毫秒为单位)。如果您在集群中启动多个连接器,此属性对于避免快照中断很有用,这可能会导致连接器重新平衡。
snapshot.fetch.size无默认值在快照期间,连接器分批读取表内容。此属性指定批处理中的最大行数。
snapshot.lock.timeout.ms10000正整数,指定执行快照时等待获取表锁的最长时间(以毫秒为单位)。如果连接器在此时间间隔内无法获取表锁,则快照失败。了解MySQL 连接器如何执行数据库快照
enable.time.adjustertrue指示连接器是否将 2 位数年份规范转换为 4 位数的布尔值。设置为何false时将转换完全委托给数据库。 MySQL 允许用户插入 2 位或 4 位的年份值。对于 2 位值,该值将映射到 1970 - 2069 范围内的年份。默认行为是连接器进行转换。
source.struct.versionv2sourceDebezium 事件中块的架构版本。Debezium 0.10 对块的结构进行了一些重大更改,source以统一所有连接器的暴露结构。 通过将此选项设置为v1,可以生成早期版本中使用的结构。但是,不建议使用此设置,并计划在未来的 Debezium 版本中删除。
sanitize.field.namestrue如果连接器配置将key.converterorvalue.converter属性设置为 Avro 转换器。 false如果不。指示字段名称是否经过清理以符合Avro 命名要求
skipped.operations无默认值流式传输期间要跳过的操作类型的逗号分隔列表。以下值是可能的:c用于插入/创建、u用于更新、d用于删除。默认情况下,不会跳过任何操作。
signal.data.collection无默认值用于向连接器发送信号的数据集合的完全限定名称。 使用以下格式指定集合名称: *<databaseName>*.*<tableName>*
incremental.snapshot.allow.schema.changesfalse在增量快照期间允许架构更改。启用后,连接器将在增量快照期间检测架构更改并重新选择当前块以避免锁定 DDL。 请注意,不支持对主键的更改,如果在增量快照期间执行,可能会导致不正确的结果。另一个限制是,如果架构更改仅影响列的默认值,则在从 binlog 流处理 DDL 之前不会检测到更改。这不会影响快照事件的值,但快照事件的架构可能具有过时的默认值。
incremental.snapshot.chunk.size1024连接器在增量快照块期间获取并读入内存的最大行数。增加块大小提供了更高的效率,因为快照运行的快照查询更少,但更大的大小。然而,更大的块大小也需要更多的内存来缓冲快照数据。将块大小调整为在您的环境中提供最佳性能的值。
read.onlyfalse切换到替代增量快照水印实现以避免写入信号数据收集
provide.transaction.metadatafalse确定连接器是否生成具有事务边界的事件并使用事务元数据丰富更改事件信封。指定true是否希望连接器执行此操作。有关详细信息,请参阅事务元数据。
transaction.topic${database.server.name}.transaction控制连接器向其发送事务元数据消息的主题的名称。占位符${database.server.name}可用于引用连接器的逻辑名称;默认为${database.server.name}.transaction,例如dbserver1.transaction
5.3.3 Debezium 连接器数据库历史配置属性

Debezium 提供了一组database.history.*属性来控制连接器如何与模式历史主题(schema history topic)交互。

下表描述了database.history用于配置 Debezium 连接器的属性。

表 24. 连接器数据库历史配置属性

字段默认描述
database.history.kafka.topic连接器存储数据库架构历史的 Kafka 主题的全名。
database.history.kafka.bootstrap.servers连接器用于建立与 Kafka 集群的初始连接的主机/端口对列表。此连接用于检索连接器先前存储的数据库模式历史记录,并用于编写从源数据库读取的每个 DDL 语句。每对都应该指向 Kafka Connect 进程使用的同一个 Kafka 集群。
database.history.kafka.recovery.poll.interval.ms100一个整数值,指定连接器在启动/恢复期间轮询持久数据时应等待的最大毫秒数。默认值为 100 毫秒。
database.history.kafka.query.timeout.ms3000一个整数值,指定连接器在使用 Kafka 管理客户端获取集群信息时应等待的最大毫秒数。
database.history.kafka.recovery.attempts4在连接器恢复失败并出现错误之前连接器应尝试读取持久历史数据的最大次数。没有收到数据后等待的最长时间为recovery.attemptsx recovery.poll.interval.ms
database.history.skip.unparseable.ddlfalse一个布尔值,指定连接器是否应忽略格式错误或未知的数据库语句或停止处理,以便人们可以解决问题。安全的默认值为false. 跳过应该小心使用,因为它会在处理 binlog 时导致数据丢失或损坏。
database.history.store.only.monitored.tables.ddl


已弃用并计划在未来版本中删除;改为使用database.history.store.only.captured.tables.ddl
false一个布尔值,指定连接器是否应记录所有 DDL 语句


true仅记录那些与 Debezium 正在捕获其更改的表相关的 DDL 语句。true请谨慎设置,因为如果您更改哪些表已捕获其更改,则可能需要丢失数据。


安全的默认值为false.
database.history.store.only.captured.tables.ddlfalse一个布尔值,指定连接器是否应记录所有 DDL 语句


true仅记录那些与 Debezium 正在捕获其更改的表相关的 DDL 语句。true请谨慎设置,因为如果您更改哪些表已捕获其更改,则可能需要丢失数据。


安全的默认值为false.

用于配置生产者和消费者客户端的直通数据库历史记录属性

Debezium 依赖 Kafka 生产者将模式更改写入数据库历史主题。同样,当连接器启动时,它依赖于 Kafka 消费者从数据库历史主题中读取。database.history.producer.*您可以通过将值分配给以和database.history.consumer.*前缀开头的一组传递配置属性来定义 Kafka 生产者和消费者客户端的配置。传递的生产者和消费者数据库历史属性控制一系列行为,例如这些客户端如何保护与 Kafka 代理的连接,如以下示例所示:

database.history.producer.security.protocol=SSL
database.history.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
database.history.producer.ssl.keystore.password=test1234
database.history.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
database.history.producer.ssl.truststore.password=test1234
database.history.producer.ssl.key.password=test1234

database.history.consumer.security.protocol=SSL
database.history.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
database.history.consumer.ssl.keystore.password=test1234
database.history.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
database.history.consumer.ssl.truststore.password=test1234
database.history.consumer.ssl.key.password=test1234

Debezium 在将属性传递给 Kafka 客户端之前从属性名称中去除前缀​​。

有关Kafka 生产者配置属性Kafka 消费者配置属性的更多详细信息,请参阅 Kafka 文档。

5.3.4 Debezium 连接器 Kafka 信号配置属性

当 MySQL 连接器配置为只读时,信号表的替代方案是信号 Kafka 主题。

Debezium 提供了一组signal.*属性来控制连接器如何与 Kafka 信号主题交互。

下表描述了这些signal属性。

表 25. Kafka 信号配置属性

字段默认描述
signal.kafka.topic连接器监视即席信号的 Kafka 主题的名称。
signal.kafka.bootstrap.servers连接器用于建立与 Kafka 集群的初始连接的主机/端口对列表。每对都应该指向 Kafka Connect 进程使用的同一个 Kafka 集群。
signal.kafka.poll.timeout.ms100一个整数值,指定连接器在轮询信号时应等待的最大毫秒数。默认值为 100 毫秒。
5.3.5 Debezium 连接器传递信号 Kafka 消费者客户端配置属性

Debezium 连接器提供信号 Kafka 消费者的直通配置。直通信号属性以前缀 开头signals.consumer.*。例如,连接器将属性传递signal.consumer.security.protocol=SSL给 Kafka 消费者。

数据库历史客户端的传递属性一样,Debezium 在将它们传递给 Kafka 信号消费者之前从属性中去除前缀​​。

5.3.6 Debezium 连接器直通数据库驱动程序配置属性

Debezium 连接器提供数据库驱动程序的直通配置。直通数据库属性以前缀 开头database.*。例如,连接器将属性传递database.foobar=false给 JDBC URL。

数据库历史客户端的传递属性一样,Debezium 在将它们传递给数据库驱动程序之前从属性中去除前缀​​。

6. 监控

除了 Zookeeper、Kafka 和 Kafka Connect 提供的对 JMX 指标的内置支持之外,Debezium MySQL 连接器还提供三种类型的指标。

Debezium 监控文档提供了有关如何使用 JMX 公开这些指标的详细信息。

6.1 快照指标

MBean

debezium.mysql:type=connector-metrics,context=snapshot,server=<mysql.server.name>

除非快照操作处于活动状态,或者自上次连接器启动以来已发生快照,否则不会公开快照指标。

下表列出了可用的快照指标。

属性类型描述
LastEventstring连接器读取的最后一个快照事件。
MilliSecondsSinceLastEventlong自连接器读取并处理最新事件以来的毫秒数。
TotalNumberOfEventsSeenlong自上次启动或重置以来,此连接器已看到的事件总数。
NumberOfEventsFilteredlong已被连接器上配置的包含/排除列表过滤规则过滤的事件数。
CapturedTablesstring[]连接器捕获的表列表。
QueueTotalCapacityint用于在快照程序和主 Kafka Connect 循环之间传递事件的队列长度。
QueueRemainingCapacityint用于在快照程序和主 Kafka Connect 循环之间传递事件的队列的可用容量。
TotalTableCountint快照中包含的表总数。
RemainingTableCountint快照尚未复制的表数。
SnapshotRunningboolean快照是否已启动。
SnapshotAbortedboolean快照是否已中止。
SnapshotCompletedboolean快照是否完成。
SnapshotDurationInSecondslong到目前为止,快照已拍摄的总秒数,即使未完成也是如此。
RowsScannedMap<String, Long>包含为快照中的每个表扫描的行数的映射。在处理过程中,表格会逐渐添加到地图中。每扫描 10,000 行并在完成表时更新。
MaxQueueSizeInByteslong队列的最大缓冲区(以字节为单位)。max.queue.size.in.bytes如果设置为正长值,则此指标可用。
CurrentQueueSizeInByteslong队列中的当前记录量(以字节为单位)。

执行增量快照时,连接器还提供以下附加快照指标:

属性类型描述
ChunkIdstring当前快照块的标识符。
ChunkFromstring定义当前块的主键集的下限。
ChunkTostring定义当前块的主键集的上限。
TableFromstring当前快照表的主键集的下限。
TableTostring当前快照表的主键集的上限。

Debezium MySQL 连接器还提供HoldingGlobalLock自定义快照指标。该指标设置为一个布尔值,指示连接器当前是否持有全局或表写锁。

6.2 stream 指标

仅当启用 binlog 事件缓冲时,事务相关属性才可用。有关更多详细信息,请参阅binlog.buffer.size高级连接器配置属性。

MBean

debezium.mysql:type=connector-metrics,context=streaming,server=<mysql.server.name>

下表列出了可用的流式指标。

属性类型描述
LastEventstring连接器读取的最后一个流事件。
MilliSecondsSinceLastEventlong自连接器读取并处理最新事件以来的毫秒数。
TotalNumberOfEventsSeenlong自上次启动或指标重置以来,此连接器已看到的事件总数。
TotalNumberOfCreateEventsSeenlong自上次启动或指标重置以来,此连接器已看到的创建事件总数。
TotalNumberOfUpdateEventsSeenlong自上次启动或指标重置以来,此连接器已看到的更新事件总数。
TotalNumberOfDeleteEventsSeenlong自上次启动或指标重置以来,此连接器已看到的删除事件总数。
NumberOfEventsFilteredlong已被连接器上配置的包含/排除列表过滤规则过滤的事件数。
CapturedTablesstring[]连接器捕获的表列表。
QueueTotalCapacityint用于在流媒体和主 Kafka Connect 循环之间传递事件的队列长度。
QueueRemainingCapacityint队列的空闲容量,用于在流媒体和 Kafka Connect 主循环之间传递事件。
Connectedboolean表示连接器当前是否连接到数据库服务器的标志。
MilliSecondsBehindSourcelong上次更改事件的时间戳与处理它的连接器之间的毫秒数。这些值将包含运行数据库服务器和连接器的机器上的时钟之间的任何差异。
NumberOfCommittedTransactionslong已提交的已处理事务数。
SourceEventPositionMap<String, String>最后接收到的事件的坐标。
LastTransactionIdstring最后处理的事务的事务标识符。
MaxQueueSizeInByteslong队列的最大缓冲区(以字节为单位)。max.queue.size.in.bytes如果设置为正长值,则此指标可用。
CurrentQueueSizeInByteslong队列中的当前记录量(以字节为单位)。

Debezium MySQL 连接器还提供以下额外的流媒体指标:

表 26. 其他流媒体指标的描述

属性类型描述
BinlogFilenamestring连接器最近读取的 binlog 文件的名称。
BinlogPositionlong连接器已读取的 binlog 中的最新位置(以字节为单位)。
IsGtidModeEnabledboolean表示连接器当前是否正在跟踪来自 MySQL 服务器的 GTID 的标志。
GtidSetstring连接器在读取 binlog 时处理的最新 GTID 集的字符串表示形式。
NumberOfSkippedEventslongMySQL 连接器已跳过的事件数。通常,由于 MySQL 二进制日志中的格式错误或不可解析的事件,事件会被跳过。
NumberOfDisconnectslongMySQL 连接器断开连接的次数。
NumberOfRolledBackTransactionslong已回滚且未流式传输的已处理事务的数量。
NumberOfNotWellFormedTransactionslong不符合BEGIN+ COMMIT/预期协议的事务数ROLLBACK。这个值应该0在正常情况下。
NumberOfLargeTransactionslong未放入预读缓冲区的事务数。为获得最佳性能,该值应明显小于NumberOfCommittedTransactionsNumberOfRolledBackTransactions

6.3 schema history 指标

MBean

debezium.mysql:type=connector-metrics,context=schema-history,server=<mysql.server.name>

下表列出了可用的架构历史指标。

属性类型描述
Statusstring之一STOPPEDRECOVERING存储中恢复历史),RUNNING描述数据库历史的状态。
RecoveryStartTimelong恢复开始的时间(以纪元秒为单位)。
ChangesRecoveredlong在恢复阶段读取的更改数。
ChangesAppliedlong在恢复和运行时应用的架构更改总数。
MilliSecondsSinceLast​RecoveredChangelong自上次更改从历史存储中恢复以来经过的毫秒数。
MilliSecondsSinceLast​AppliedChangelong自应用上次更改以来经过的毫秒数。
LastRecoveredChangestring从历史存储中恢复的最后一次更改的字符串表示形式。
LastAppliedChangestring最后应用更改的字符串表示形式。

7. 会出现问题时的情形

Debezium 是一个分布式系统,可以捕获多个上游数据库中的所有更改;它永远不会错过或丢失事件。当系统正常运行或被仔细管理时,Debezium 提供每个更改事件记录的一次交付。

如果确实发生故障,则系统不会丢失任何事件。但是,当它从故障中恢复时,它可能会重复一些更改事件。在这些异常情况下,Debezium 和 Kafka 一样,至少提供一次更改事件的传递。

本节的其余部分描述了 Debezium 如何处理各种故障和问题。

7.1 配置和启动错误

在以下情况下,连接器尝试启动失败,在日志中报告错误或异常,并停止运行:

  • 连接器的配置无效。

  • 连接器无法使用指定的连接参数成功连接到 MySQL 服务器。

  • 连接器尝试在 binlog 中 MySQL 不再具有可用历史记录的位置重新启动。

在这些情况下,错误消息包含有关问题的详细信息以及可能的建议解决方法。更正配置或解决 MySQL 问题后,重新启动连接器。

7.2 MySQL不可用

如果您的 MySQL 服务器不可用,Debezium MySQL 连接器将失败并出现错误并且连接器停止。当服务器再次可用时,重新启动连接器。

但是,如果为高可用性 MySQL 集群启用了 GTID,您可以立即重新启动连接器。它将连接到集群中的不同 MySQL 服务器,在服务器的 binlog 中找到代表最后一个事务的位置,并开始从该特定位置读取新服务器的 binlog。

如果未启用 GTID,则连接器仅记录它所连接的 MySQL 服务器的 binlog 位置。要从正确的 binlog 位置重新启动,您必须重新连接到该特定服务器。

7.3 Kafka Connect 优雅停止

当 Kafka Connect 正常停止时,在 Debezium MySQL 连接器任务停止并在新的 Kafka Connect 进程上重新启动时会有短暂的延迟。

7.4 Kafka Connect 进程崩溃

如果 Kafka Connect 崩溃,进程会停止并且任何 Debezium MySQL 连接器任务都会终止,而不会记录它们最近处理的偏移量。在分布式模式下,Kafka Connect 会重新启动其他进程上的连接器任务。但是,MySQL 连接器从早期进程记录的最后一个偏移量恢复。这意味着替换任务可能会生成一些在崩溃之前处理的相同事件,从而创建重复事件。

每个更改事件消息都包含特定于源的信息,您可以使用这些信息来识别重复事件,例如:

  • 事件起源

  • MySQL 服务器的事件时间

  • binlog 文件名和位置

  • GTID(如果使用)

7.5 Kafka 不可用

Kafka Connect 框架使用 Kafka 生产者 API 记录 Kafka 中的 Debezium 更改事件。如果 Kafka 代理变得不可用,则 Debezium MySQL 连接器会暂停,直到连接重新建立并且连接器从中断的地方恢复。

7.6 MySQL 清除 binlog 文件

如果 Debezium MySQL 连接器停止的时间过长,MySQL 服务器会清除旧的 binlog 文件,连接器的最后位置可能会丢失。当连接器重新启动时,MySQL 服务器不再具有起始点,连接器执行另一个初始快照。如果禁用快照,则连接器将失败并出现错误。

有关 MySQL 连接器如何执行初始快照的详细信息,请参阅快照。

点击查看原文: Debezium connector for MySQL :: Debezium Documentation

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐