MySQL 的 Debezium 连接器

MySQL 有一个二进制日志(binlog),它按照提交到数据库的顺序记录所有操作。这包括对表模式的更改以及对表中数据的更改。MySQL 使用 binlog 进行复制和恢复。

Debezium MySQL 连接器读取 binlog,为行级INSERT、、UPDATEDELETE操作生成更改事件,并将更改事件发送到 Kafka 主题。客户端应用程序读取这些 Kafka 主题。

由于 MySQL 通常设置为在指定时间段后清除 binlog,因此 MySQL 连接器会对您的每个数据库执行初始一致快照。MySQL 连接器从创建快照的位置读取 binlog。

有关与此连接器兼容的 MySQL 数据库版本的信息,请参阅Debezium 版本概述

连接器的工作原理

连接器支持的 MySQL 拓扑的概述对于规划您的应用程序很有用。为了优化配置和运行 Debezium MySQL 连接器,了解连接器如何跟踪表结构、公开模式更改、执行快照以及确定 Kafka 主题名称会很有帮助。

Debezium MySQL 连接器尚未在 MariaDB 上进行测试,但来自社区的多份报告表明该连接器已成功用于该数据库。计划在未来的 Debezium 版本中提供对 MariaDB 的官方支持。

支持的 MySQL 拓扑

Debezium MySQL 连接器支持以下 MySQL 拓扑:

  • 独立

    当使用单个 MySQL 服务器时,服务器必须启用 binlog(并且可选地启用 GTID),以便 Debezium MySQL 连接器可以监控服务器。这通常是可以接受的,因为二进制日志也可以用作增量备份。在这种情况下,MySQL 连接器始终连接并跟随这个独立的 MySQL 服务器实例。

  • 主副本和副本

    Debezium MySQL 连接器可以跟随主服务器之一或副本之一(如果该副本启用了其 binlog),但连接器仅看到该服务器可见的集群中的更改。通常,除了多主拓扑之外,这不是问题。连接器将其位置记录在服务器的 binlog 中,这在集群中的每台服务器上都是不同的。因此,连接器必须只跟随一个 MySQL 服务器实例。如果该服务器出现故障,则必须重新启动或恢复该服务器,然后连接器才能继续。

  • 高可用集群

    MySQL 存在多种高可用性解决方案,它们使容忍问题和故障并几乎立即从问题和故障中恢复变得更加容易。大多数 HA MySQL 集群使用 GTID,以便副本能够跟踪任何主服务器上的所有更改。

  • 多主

    网络数据库 (NDB) 集群复制使用一个或多个 MySQL 副本节点,每个节点从多个主服务器复制。这是聚合多个 MySQL 集群的复制的强大方法。此拓扑需要使用 GTID。Debezium MySQL 连接器可以使用这些多主 MySQL 副本作为源,并且只要新副本赶上旧副本,就可以故障转移到不同的多主 MySQL 副本。也就是说,新副本具有在第一个副本上看到的所有事务。即使连接器仅使用数据库和/或表的一个子集,这也有效,因为可以将连接器配置为在尝试重新连接到新的多主 MySQL 副本并在二进制日志。

  • 托管

    支持 Debezium MySQL 连接器以使用托管选项,例如 Amazon RDS 和 Amazon Aurora。因为这些托管选项不允许全局读锁,所以使用表级锁来创建一致快照

架构历史主题

当数据库客户端查询数据库时,客户端使用数据库的当前模式。但是,数据库架构可以随时更改,这意味着连接器必须能够识别每次插入、更新或删除操作被记录时的架构。此外,连接器不能只使用当前模式,因为连接器可能正在处理在更改表模式之前记录的相对较旧的事件。

为了确保正确处理架构更改后发生的更改,MySQL 在 binlog 中不仅包括对数据的行级更改,还包括应用于数据库的 DDL 语句。当连接器读取 binlog 并遇到这些 DDL 语句时,它会解析它们并更新每个表模式的内存表示。连接器使用此模式表示来识别每次插入、更新或删除操作时的表结构,并产生适当的更改事件。在单独的数据库历史 Kafka 主题中,连接器记录所有 DDL 语句以及每个 DDL 语句出现在 binlog 中的位置。

当连接器在崩溃或优雅停止后重新启动时,连接器会从特定位置,即从特定时间点开始读取 binlog。连接器通过读取数据库历史 Kafka 主题并解析所有 DDL 语句,直到连接器启动的二进制日志中的点,来重建此时存在的表结构。

此数据库历史主题仅供连接器使用。连接器可以选择将模式更改事件发送到针对消费者应用程序的不同主题

当 MySQL 连接器捕获应用了架构更改工具(例如gh-ost或)的表中的更改pt-online-schema-change时,会在迁移过程中创建辅助表。需要配置连接器以捕获对这些帮助表的更改。如果消费者不需要为帮助表生成的记录,则可以应用单个消息转换将它们过滤掉。

查看接收 Debezium 事件记录的主题的默认名称。

架构更改主题

您可以配置 Debezium MySQL 连接器以生成模式更改事件,这些事件描述应用于数据库中捕获的表的模式更改。连接器将架构更改事件写入名为 的 Kafka 主题*<serverName>*,其中是连接器配置属性*serverName*中指定的逻辑服务器名称。database.server.name连接器发送到架构更改主题的消息包含有效负载,并且(可选)还包含更改事件消息的架构。

架构更改事件消息的有效负载包括以下元素:

  • ddl

    提供导致架构更改的 SQL CREATEALTER或语句。DROP

  • databaseName

    应用 DDL 语句的数据库的名称。的值databaseName用作消息键。

  • pos

    语句出现在 binlog 中的位置。

  • tableChanges

    架构更改后整个表架构的结构化表示。该tableChanges字段包含一个数组,其中包含表中每一列的条目。由于结构化表示以 JSON 或 Avro 格式呈现数据,因此消费者可以轻松读取消息,而无需先通过 DDL 解析器对其进行处理。

对于处于捕获模式的表,连接器不仅将模式更改的历史记录存储在模式更改主题中,还会存储在内部数据库历史记录主题中。内部数据库历史主题仅供连接器使用,不适合消费应用程序直接使用。确保需要有关架构更改通知的应用程序仅使用来自架构更改主题的信息。
永远不要对数据库历史主题进行分区。要使数据库历史主题正确运行,它必须保持连接器向其发出的事件记录的一致的全局顺序。为确保主题不会在分区之间拆分,请使用以下方法之一设置主题的分区计数:如果您手动创建数据库历史主题,请将分区计数指定为1.如果您使用 Apache Kafka 代理自动创建数据库历史主题,则会创建主题,请将Kafkanum.partitions配置选项的值设置为1.
连接器向其模式更改主题发出的消息格式处于孵化状态,如有更改,恕不另行通知。

示例:发送到 MySQL 连接器架构更改主题的消息

以下示例显示了 JSON 格式的典型架构更改消息。该消息包含表模式的逻辑表示。

{
  "schema": {
  ...
  },
  "payload": {
        "source": {  // (1)
        "version": "1.9.5.Final",
        "connector": "mysql",
        "name": "dbserver1",
        "ts_ms": 0,
        "snapshot": "false",
        "db": "inventory",
        "sequence": null,
        "table": "customers",
        "server_id": 0,
        "gtid": null,
        "file": "mysql-bin.000003",
        "pos": 219,
        "row": 0,
        "thread": null,
        "query": null
    },
    "databaseName": "inventory", // (2)
    "schemaName": null,
    "ddl": "ALTER TABLE customers ADD COLUMN middle_name VARCHAR(2000)", // (3)
    "tableChanges": [ // (4)
        {
        "type": "ALTER", // (5)
        "id": "\"inventory\".\"customers\"",  // (6)
        "table": { // (7)
            "defaultCharsetName": "latin1",
            "primaryKeyColumnNames": [  // (8)
                "id"
            ],
            "columns": [ // (9)
                {
                "name": "id",
                "jdbcType": 4,
                "nativeType": null,
                "typeName": "INT",
                "typeExpression": "INT",
                "charsetName": null,
                "length": 11,
                "scale": null,
                "position": 1,
                "optional": false,
                "autoIncremented": true,
                "generated": true
            },
            {
                "name": "first_name",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "latin1",
                "length": 255,
                "scale": null,
                "position": 2,
                "optional": false,
                "autoIncremented": false,
                "generated": false
            },                        {
                "name": "last_name",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "latin1",
                "length": 255,
                "scale": null,
                "position": 3,
                "optional": false,
                "autoIncremented": false,
                "generated": false
            },
            {
                "name": "email",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "latin1",
                "length": 255,
                "scale": null,
                "position": 4,
                "optional": false,
                "autoIncremented": false,
                "generated": false
            },
            {
                "name": "middle_name",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "latin1",
                "length": 2000,
                "scale": null,
                "position": 5,
                "optional": true,
                "autoIncremented": false,
                "generated": false
            }
          ]
        }
      }
    ]
  },
  "payload": {
    "databaseName": "inventory",
    "ddl": "CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512), weight FLOAT ); ALTER TABLE products AUTO_INCREMENT = 101;",
    "source" : {
      "version": "1.9.5.Final",
      "name": "mysql-server-1",
      "server_id": 0,
      "ts_ms": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "snapshot": true,
      "thread": null,
      "db": null,
      "table": null,
      "query": null
    }
  }
}
物品字段名称描述
1sourcesource字段的结构与连接器写入特定于表的主题的标准数据更改事件完全相同。此字段可用于关联不同主题的事件。
2databaseName schemaName标识包含更改的数据库和架构。该databaseName字段的值用作记录的消息键。
3ddl此字段包含负责架构更改的 DDL。该ddl字段可以包含多个 DDL 语句。每个语句都适用于数据库中的databaseName字段。多个 DDL 语句按照它们应用于数据库的顺序出现。 客户端可以提交多个适用于多个数据库的 DDL 语句。如果 MySQL 以原子方式应用它们,则连接器按顺序获取 DDL 语句,按数据库对它们进行分组,并为每个组创建一个模式更改事件。如果 MySQL 单独应用它们,连接器会为每个语句创建一个单独的模式更改事件。
4tableChanges包含由 DDL 命令生成的架构更改的一个或多个项目的数组。
5type描述变化的种类。该值为以下之一:CREATE表已创建。ALTER表已修改。DROP表已删除。
6id创建、更改或删除的表的完整标识符。在表重命名的情况下,此标识符是表名的串联。*<old>*,*<new>*
7table表示应用更改后的表元数据。
8primaryKeyColumnNames组成表的主键的列的列表。
9columns已更改表中每一列的元数据。

另请参阅:模式历史主题

快照

首次启动 Debezium MySQL 连接器时,它会执行数据库的初始一致快照。以下流程描述了连接器如何创建此快照。此流程适用于默认快照模式,即initial. 有关其他快照模式的信息,请参阅MySQL 连接器snapshot.mode配置属性

行动
1获取阻止其他数据库客户端写入的全局读锁。 快照本身不会阻止其他客户端应用可能会干扰连接器尝试读取 binlog 位置和表模式的 DDL。连接器在读取 binlog 位置时保持全局读锁,并如后面的步骤所述释放锁。
2启动具有可重复读取语义的事务,以确保事务中的所有后续读取都针对一致的快照完成。
3读取当前的 binlog 位置。
4读取连接器配置为捕获更改的数据库和表的架构。
5释放全局读锁。其他数据库客户端现在可以写入数据库。
6如果适用,将 DDL 更改写入架构更改主题,包括所有必要DROP…CREATE…DDL 语句。
7扫描数据库表。对于每一行,连接器将CREATE事件发送到相关的特定于表的 Kafka 主题。
8提交事务。
9在连接器偏移中记录完成的快照。
  • 连接器重新启动

    如果连接器在执行初始快照时发生故障、停止或重新平衡,则在连接器重新启动后,它会执行新的快照。在初始快照完成后,Debezium MySQL 连接器从 binlog 中的相同位置重新启动,因此它不会错过任何更新。如果连接器停止的时间足够长,MySQL 可能会清除旧的二进制日志文件,连接器的位置就会丢失。如果位置丢失,连接器将恢复为其起始位置的*初始快照。*有关对 Debezium MySQL 连接器进行故障排除的更多提示,请参阅出现问题时的行为

  • 不允许全局读锁

    某些环境不允许全局读锁。如果 Debezium MySQL 连接器检测到不允许全局读锁,则连接器使用表级锁代替并使用此方法执行快照。这要求 Debezium 连接器的数据库用户具有LOCK TABLES权限。表 3. 使用表级锁执行初始快照的工作流程步行动1获取表级锁。2启动具有可重复读取语义的事务,以确保事务中的所有后续读取都针对一致的快照完成。3读取和过滤数据库和表的名称。4读取当前的 binlog 位置。5读取连接器配置为捕获更改的数据库和表的架构。6如果适用,将 DDL 更改写入架构更改主题,包括所有必要DROP…CREATE…DDL 语句。7扫描数据库表。对于每一行,连接器将CREATE事件发送到相关的特定于表的 Kafka 主题。8提交事务。9释放表级锁。10在连接器偏移中记录完成的快照。

即席快照

默认情况下,连接器仅在首次启动后才运行初始快照操作。在这个初始快照之后,在正常情况下,连接器不会重复快照过程。连接器捕获的任何未来更改事件数据仅通过流式处理进入。

但是,在某些情况下,连接器在初始快照期间获得的数据可能会变得陈旧、丢失或不完整。为了提供一种重新捕获表数据的机制,Debezium 包含一个执行临时快照的选项。数据库中的以下更改可能会导致执行临时快照:

  • 修改连接器配置以捕获一组不同的表。
  • Kafka 主题被删除,必须重建。
  • 由于配置错误或其他问题而发生数据损坏。

您可以通过启动所谓的ad-hoc 快照为之前捕获快照的表重新运行快照。即席快照需要使用信令表。您可以通过向 Debezium 信号表发送信号请求来启动临时快照。

当您启动现有表的临时快照时,连接器会将内容附加到表已存在的主题中。如果删除了以前存在的主题,如果启用了自动主题创建,Debezium 可以自动创建主题。

即席快照信号指定要包含在快照中的表。快照可以捕获数据库的全部内容,或仅捕获数据库中表的子集。

execute-snapshot您可以通过向信令表发送消息来指定要捕获的表。将execute-snapshot信号的类型设置为incremental,并提供要包含在快照中的表的名称,如下表所述:

场地默认价值
typeincremental指定要运行的快照类型。 设置类型是可选的。目前,您只能请求incremental快照。
data-collections不适用一个数组,其中包含要生成快照的表的完全限定名称。名称的格式与配置选项 的格式相同。signal.data.collection

触发临时快照

execute-snapshot您可以通过将具有信号类型的条目添加到信令表来启动临时快照。连接器处理完消息后,将开始快照操作。快照进程读取第一个和最后一个主键值,并将这些值用作每个表的起点和终点。根据表中的条目数和配置的块大小,Debezium 将表划分为块,并继续对每个块进行快照,一次一个。

目前,execute-snapshot操作类型仅触发增量快照。有关详细信息,请参阅增量快照

增量快照

为了提供管理快照的灵活性,Debezium 包含一个补充快照机制,称为增量快照。增量快照依靠 Debezium 机制向 Debezium 连接器发送信号。增量快照基于DDD-3设计文档。

在增量快照中,Debezium 不是像在初始快照中那样一次捕获数据库的完整状态,而是在一系列可配置的块中分阶段捕获每个表。您可以指定您希望快照捕获的表和每个块的大小。块大小决定了快照在数据库上的每次提取操作期间收集的行数。增量快照的默认块大小为 1 KB。

随着增量快照的进行,Debezium 使用水印来跟踪其进度,维护它捕获的每个表行的记录。与标准初始快照过程相比,这种分阶段捕获数据的方法具有以下优势:

  • 您可以在流式数据捕获的同时运行增量快照,而不是将流式传输推迟到快照完成。连接器在整个快照过程中继续从更改日志中捕获近乎实时的事件,并且两个操作都不会阻塞另一个操作。
  • 如果增量快照的进度中断,您可以恢复它而不会丢失任何数据。进程恢复后,快照从它停止的点开始,而不是从头重新捕获表。
  • 您可以随时按需运行增量快照,并根据需要重复该过程以适应数据库更新。例如,您可以在修改连接器配置以将表添加到其table.include.list属性后重新运行快照。

增量快照过程

当您运行增量快照时,Debezium 按主键对每个表进行排序,然后根据配置的块大小将表拆分为块。逐块工作,然后捕获块中的每个表行。对于它捕获的每一行,快照都会发出一个READ事件。该事件表示块的快照开始时行的值。

随着快照的进行,其他进程可能会继续访问数据库,可能会修改表记录。为反映此类更改,INSERTUPDATEDELETE操作将照常提交到事务日志。同样,正在进行的 Debezium 流式处理继续检测这些更改事件并将相应的更改事件记录发送到 Kafka。

Debezium 如何解决具有相同主键的记录之间的冲突

在某些情况下,流式处理发出的UPDATEDELETE事件被乱序接收。也就是说,流式处理可能会在快照捕获包含该行的READ事件的块之前发出一个修改表行的事件。当快照最终为该行发出相应的READ事件时,它的值已经被取代。为了确保以正确的逻辑顺序处理乱序到达的增量快照事件,Debezium 采用了一种缓冲方案来解决冲突。只有在解决了快照事件和流事件之间的冲突后,Debezium 才会向 Kafka 发出事件记录。

快照窗口

为了帮助解决延迟到达事件和修改同一表行的流事件之间的冲突READ,Debezium 采用了所谓的快照窗口。快照窗口划分了增量快照捕获指定表块数据的时间间隔。在一个块的快照窗口打开之前,Debezium 遵循其通常的行为并从事务日志直接向下游发送事件到目标 Kafka 主题。但是从特定块的快照打开的那一刻起,直到它关闭,Debezium 执行重复数据删除步骤以解决具有相同主键的事件之间的冲突。

对于每个数据集合,Debezium 发出两种类型的事件,并将它们的记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为READ操作发出。同时,随着用户不断更新数据集合中的记录,事务日志也更新以反映每次提交,Debezium 会针对每次更改发出UPDATE或操作。DELETE

当快照窗口打开时,Debezium 开始处理快照块,它将快照记录传递到内存缓冲区。在快照窗口期间,READ缓冲区中事件的主键与传入流事件的主键进行比较。如果未找到匹配项,则将流式事件记录直接发送到 Kafka。如果 Debezium 检测到匹配,它会丢弃缓冲的READ事件,并将流式记录写入目标主题,因为流式事件在逻辑上取代了静态快照事件。块的快照窗口关闭后,缓冲区仅包含READ不存在相关事务日志事件的事件。Debezium 将这些剩余READ事件发送到表的 Kafka 主题。

连接器对每个快照块重复该过程。

触发增量快照

目前,启动增量快照的唯一方法是将临时快照信号发送到源数据库上的信令表。INSERT您将信号作为 SQL查询提交给表。Debezium 检测到信号表中的变化后,它会读取信号,并运行请求的快照操作。

您提交的查询指定要包含在快照中的表,并且可以选择指定快照操作的类型。目前,快照操作的唯一有效选项是默认值incremental.

要指定要包含在快照中的表,请提供一个data-collections列出这些表的数组,例如,
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

增量快照信号的data-collections数组没有默认值。如果data-collections数组为空,Debezium 检测到不需要任何操作并且不执行快照。

如果要包含在快照中的表.的名称在数据库、模式或表的名称中包含点 (),则要将表添加到data-collections数组中,您必须用双引号对名称的每个部分进行转义. 例如,要包含存在于**public**架构中且名称为 的表**My.Table**,请使用以下格式:**"public"."My.Table"**.

先决条件

程序

  1. 发送 SQL 查询以将临时增量快照请求添加到信令表:

    INSERT INTO _<signalTable>_ (id, type, data) VALUES (_'<id>'_, _'<snapshotType>'_, '{"data-collections": ["_<tableName>_","_<tableName>_"],"type":"_<snapshotType>_"}');
    

    例如,

    INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.table1", "schema2.table2"],"type":"incremental"}');
    

    id命令中的、type和参数的值data对应于信令表的字段

    下表描述了这些参数:

    价值描述
    myschema.debezium_signal指定源数据库上的信令表的完全限定名称
    ad-hoc-1id参数指定一个任意字符串,该字符串被分配为id信号请求的标识符。 使用此字符串将日志消息标识到信令表中的条目。Debezium 不使用此字符串。相反,在快照期间,Debezium 会生成自己的id字符串作为水印信号。
    execute-snapshot指定type参数指定信号要触发的操作。
    data-collections信号字段的必需组件,data它指定要包含在快照中的表名数组。 该数组按表的完全限定名称列出表,使用的格式与您在signal.data.collection配置属性中指定连接器信号表的名称时使用的格式相同。
    incremental信号字段的可选type组件data,指定要运行的快照操作的种类。 目前,唯一有效的选项是默认值incremental. 在您提交给信令表的 SQL 查询中指定一个type值是可选的。 如果您未指定值,则连接器将运行增量快照。

以下示例显示了连接器捕获的增量快照事件的 JSON。

示例:增量快照事件消息

{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 
    },
    "op":"r", 
    "ts_ms":"1620393591654",
    "transaction":null
}
物品字段名称描述
1snapshot指定要运行的快照操作的类型。 目前,唯一有效的选项是默认值incremental. 在您提交给信令表的 SQL 查询中指定一个type值是可选的。 如果您未指定值,则连接器将运行增量快照。
2op指定事件类型。 快照事件的值为r,表示READ操作。
只读增量快照

MySQL 连接器允许使用与数据库的只读连接来运行增量快照。要运行具有只读访问权限的增量快照,连接器使用已执行的全局事务 ID (GTID) 设置为高水位线和低水位线。通过将二进制日志 (binlog) 事件的 GTID 或服务器的心跳与低水位线和高水位线进行比较来更新块窗口的状态。

要切换到只读实现,请将read.only属性的值设置为true

先决条件

  • 启用 MySQL GTID
  • 如果连接器从多线程副本(即,值replica_parallel_workers大于的副本0)读取,则必须设置以下选项之一:
    • replica_preserve_commit_order=ON
    • slave_preserve_commit_order=ON
即席只读增量快照

当 MySQL 连接为只读时,信令表机制还可以通过向signal.kafka.topic属性中指定的 Kafka 主题发送消息来运行快照。

Kafka 消息的键必须与database.server.name连接器配置选项的值匹配。

该值是一个带有typedata字段的 JSON 对象。

信号类型是execute-snapshotdata字段必须有以下字段:

场地默认价值
typeincremental要执行的快照的类型。目前仅incremental支持。 有关详细信息,请参阅下一节。
data-collections不适用要快照的表的限定名称数组。名称的格式与signal.data.collection配置选项 的格式相同。

执行快照 Kafka 消息的示例:

键 = `test_connector`

值 = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

快照事件的操作类型

READMySQL 连接器将快照事件作为操作发出("op" : "r")。如果您希望连接器将快照事件作为CREATE( c) 事件发出,请配置 DebeziumReadToInsertEvent单消息转换 (SMT) 以修改事件类型。

以下示例显示了如何配置 SMT:

示例:使用ReadToInsertEventSMT 更改快照事件的类型

转换=快照插入,...
transforms.snapshotasinsert.type=io.debezium.connector.mysql.transforms.ReadToInsertEvent

主题名称

默认情况下,MySQL 连接器将表中发生的所有 、 和 操作的更改事件写入INSERT特定UPDATEDELETE该表的单个 Apache Kafka 主题。

连接器使用以下约定来命名更改事件主题:

serverName.databaseName.tableName

假设fulfillment是服务器名称,inventory是数据库名称,并且数据库包含名为orderscustomers和的表products。Debezium MySQL 连接器向三个 Kafka 主题发出事件,每个主题对应一个数据库中的表:

履行.库存.订单
履行.库存.客户
履行.库存.产品

以下列表提供了默认名称组件的定义:

  • 服务器名称

    database.server.name由连接器配置属性指定的服务器的逻辑名称。

  • 模式名称

    发生操作的模式的名称。

  • tableName

    发生操作的表的名称。

连接器应用类似的命名约定来标记其内部数据库历史主题、模式更改主题事务元数据主题

如果默认主题名称不符合您的要求,您可以配置自定义主题名称。要配置自定义主题名称,请在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 自定义主题命名的更多信息,请参阅主题路由

交易元数据

Debezium 可以生成表示事务边界和丰富数据更改事件消息的事件。

Debezium 接收交易元数据的时间限制Debezium 仅注册和接收部署连接器后发生的事务的元数据。部署连接器之前发生的事务的元数据不可用。

BEGINDebezium 为每个事务中的和END分隔符生成事务边界事件。事务边界事件包含以下字段:

  • status

    BEGINEND

  • id

    唯一交易标识符的字符串表示形式。

  • event_count(用于END活动)

    事务发出的事件总数。

  • data_collections(用于END活动)

    一对data_collectionevent_count元素的数组。表示连接器针对源自数据集合的更改发出的事件数。

例子

{
  "status": "BEGIN",
  "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10",
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10",
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "s1.a",
      "event_count": 1
    },
    {
      "data_collection": "s2.a",
      "event_count": 1
    }
  ]
}

除非通过transaction.topic选项覆盖,否则连接器会向主题发出事务事件。**.transaction

更改数据事件丰富

启用事务元数据后,数据消息Envelope会增加一个新transaction字段。此字段以字段组合的形式提供有关每个事件的信息:

  • id- 唯一交易标识符的字符串表示
  • total_order- 事件在事务产生的所有事件中的绝对位置
  • data_collection_order- 事件在事务发出的所有事件中的每个数据收集位置

以下是消息的示例:

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "transaction": {
    "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10",
    "total_order": "1",
    "data_collection_order": "1"
  }
}

对于没有启用 GTID 的系统,事务标识符是使用 binlog 文件名和 binlog 位置的组合构建的。例如,如果事务 BEGIN 事件对应的 binlog 文件名和位置分别为 mysql-bin.000002 和 1913,则 Debezium 构造的事务标识符将为file=mysql-bin.000002,pos=1913.

数据更改事件

Debezium MySQL 连接器为每个行级、、和操作生成数据INSERT更改UPDATE事件DELETE。每个事件都包含一个键和一个值。键和值的结构取决于已更改的表。

Debezium 和 Kafka Connect 是围绕连续的事件消息流设计的。但是,这些事件的结构可能会随着时间的推移而发生变化,这对于消费者来说可能难以处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果您使用的是模式注册表,消费者可以使用它从注册表中获取模式的模式 ID。这使得每个事件都是独立的。

以下骨架 JSON 显示了更改事件的基本四个部分。但是,如何配置您选择在应用程序中使用的 Kafka Connect 转换器决定了这四个部分在更改事件中的表示。仅当您将转换器配置为生成该字段时,该schema字段才处于更改事件中。同样,仅当您将转换器配置为生成它时,事件键和事件有效负载才在更改事件中。如果您使用 JSON 转换器并将其配置为生成所有四个基本更改事件部分,则更改事件具有以下结构:

{
 "schema": { 
   ...
  },
 "payload": { 
   ...
 },
 "schema": { 
   ...
 },
 "payload": { 
   ...
 },
}
物品字段名称描述
1schema第一个schema字段是事件键的一部分。它指定了一个 Kafka Connect 模式,该模式描述了事件键payload部分中的内容。换句话说,第一个schema字段描述了主键的结构,如果表没有主键,则描述唯一键的结构,用于已更改的表。 可以通过设置message.key.columns连接器配置属性来覆盖表的主键。在这种情况下,第一个模式字段描述了由该属性标识的键的结构。
2payload第一个payload字段是事件键的一部分。它具有前一个字段描述的结构,schema并且包含已更改行的键。
3schema第二个schema字段是事件值的一部分。它指定了描述事件值payload部分内容的 Kafka Connect 模式。换句话说,第二个schema描述了被改变的行的结构。通常,此模式包含嵌套模式。
4payload第二个payload字段是事件值的一部分。它具有前一个字段描述的结构,schema并且包含已更改行的实际数据。

默认情况下,连接器将事件记录流更改为名称与事件源表相同的主题。请参阅主题名称

MySQL 连接器确保所有 Kafka Connect 模式名称都遵循Avro 模式名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头,即 az、AZ 或 _。逻辑服务器名称中的每个剩余字符以及数据库和表名称中的每个字符都必须是拉丁字母、数字或下划线,即 az、AZ、0-9 或 _。如果存在无效字符,则将其替换为下划线字符。如果逻辑服务器名称、数据库名称或表名称包含无效字符,并且用于区分名称的唯一字符无效并因此替换为下划线,这可能会导致意外冲突。

更改事件键

更改事件的键包含更改表键的模式和更改行的实际键。PRIMARY KEY在连接器创建事件时,模式及其相应的有效负载都包含已更改表(或唯一约束)中每一列的字段。

请考虑下customers表,其后是此表的更改事件键的示例。

CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;

捕获对表的更改的每个更改事件customers都具有相同的事件键架构。只要customers表具有先前的定义,捕获customers表更改的每个更改事件都具有以下关键结构。在 JSON 中,它看起来像这样:

{
 "schema": { 
    "type": "struct",
    "name": "mysql-server-1.inventory.customers.Key", 
    "optional": false, 
    "fields": [ 
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
 "payload": { 
    "id": 1001
  }
}
物品字段名称描述
1schema键的模式部分指定了描述键payload部分内容的 Kafka Connect 模式。
2mysql-server-1.inventory.customers.Key定义密钥有效负载结构的架构名称。此架构描述了已更改表的主键结构。键架构名称的格式为connector-name数据库名称表名Key. 在这个例子中: mysql-server-1是生成此事件的连接器的名称。 inventory是包含已更改表的数据库。 customers是更新的表。
3optionalpayload指示事件键是否必须在其字段中包含值。在此示例中,密钥的有效负载中的值是必需的。当表没有主键时,键的有效负载字段中的值是可选的。
4fields指定 中预期的payload每个字段,包括每个字段的名称、类型以及是否需要。
5payload包含为其生成此更改事件的行的键。在此示例中,键包含一个id值为 的字段1001

更改事件值

更改事件中的值比键复杂一些。与键一样,值也有一个schema部分和一个payload部分。该schema部分包含描述该部分Envelope结构的架构payload,包括其嵌套字段。创建、更新或删除数据的操作的更改事件都有一个带有信封结构的值负载。

考虑用于显示更改事件键示例的相同示例表:

CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;

对此表的更改的更改事件的值部分描述为:

创建事件

以下示例显示了连接器为在customers表中创建数据的操作生成的更改事件的值部分:

{
  "schema": { 
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.customers.Value", 
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source", 
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "mysql-server-1.inventory.customers.Envelope" 
  },
  "payload": { 
    "op": "c", 
    "ts_ms": 1465491411815, 
    "before": null, 
    "after": { 
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": { 
      "version": "1.9.5.Final",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 0,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "thread": 7,
      "query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"
    }
  }
}
物品字段名称描述
1schema值的架构,它描述了值的有效负载的结构。在连接器为特定表生成的每个更改事件中,更改事件的值架构都是相同的。
2name在该schema部分中,每个name字段都为值的有效负载中的字段指定架构。 mysql-server-1.inventory.customers.Value是有效负载beforeafter字段的架构。此架构特定于customers表。 模式名称beforeafter字段的格式为,这确保模式名称在数据库中是唯一的。这意味着当使用Avro 转换器时,每个逻辑源中每个表的生成 Avro 模式都有自己的演变和历史。*logicalName*.*tableName*.Value
3nameio.debezium.connector.mysql.Source是有效负载source字段的架构。此模式特定于 MySQL 连接器。连接器将它用于它生成的所有事件。
4namemysql-server-1.inventory.customers.Envelope是有效负载整体结构的架构,其中mysql-server-1是连接器名称,inventory是数据库,customers是表。
5payload该值的实际数据。这是更改事件提供的信息。 看起来事件的 JSON 表示比它们描述的行大得多。这是因为 JSON 表示必须包括消息的模式和有效负载部分。但是,通过使用Avro 转换器,您可以显着减小连接器流式传输到 Kafka 主题的消息的大小。
6op强制字符串,描述导致连接器生成事件的操作类型。在本例中,c表示操作创建了一行。有效值为:c= 创建u= 更新d= 删除r= 读取(仅适用于快照)
7ts_ms显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 在source对象中,ts_ms指示在数据库中进行更改的时间。通过比较 forpayload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和 Debezium 之间的延迟。
8before一个可选字段,指定事件发生之前行的状态。当该op字段c用于创建时,如本例所示,该before字段是null因为此更改事件用于新内容。
9after一个可选字段,指定事件发生后行的状态。在此示例中,该after字段包含新行的idfirst_namelast_nameemail列的值。
10source描述事件源元数据的必填字段。此字段包含可用于将此事件与其他事件进行比较的信息,包括事件的来源、事件发生的顺序以及事件是否属于同一事务的一部分。源元数据包括:Debezium 版本连接器名称记录事件的 binlog 名称二进制日志位置活动中的行如果事件是快照的一部分包含新行的数据库和表的名称创建事件的 MySQL 线程的 ID(仅限非快照)MySQL 服务器 ID(如果可用)在数据库中进行更改时的时间戳如果启用了binlog_rows_query_log_eventsMySQL 配置选项并启用了连接器配置include.query属性,则该source字段还提供该query字段,该字段包含导致更改事件的原始 SQL 语句。

更新事件

示例表中更新的更改事件的值与该表的创建customers事件具有相同的模式。同样,事件值的有效负载具有相同的结构。但是,事件值有效负载在更新事件中包含不同的值。以下是连接器为表中的更新生成的事件中的更改事件值示例:customers

{
  "schema": { ... },
  "payload": {
    "before": { 
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": { 
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": { 
      "version": "1.9.5.Final",
      "name": "mysql-server-1",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 1465581029100,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 484,
      "row": 0,
      "thread": 7,
      "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
    },
    "op": "u", 
    "ts_ms": 1465581029523 
  }
}
物品字段名称描述
1before一个可选字段,指定事件发生之前行的状态。在更新事件值中,该before字段包含每个表列的字段以及数据库提交之前该列中的值。在本例中,first_name值为Anne.
2after一个可选字段,指定事件发生后行的状态。您可以比较beforeafter结构来确定对此行的更新是什么。在示例中,first_name值为 now Anne Marie
3source描述事件源元数据的必填字段。字段结构与创建事件中的source字段相同,但有些值不同,例如示例更新事件来自binlog中的不同位置。源元数据包括:Debezium 版本连接器名称记录事件的 binlog 名称二进制日志位置活动中的行如果事件是快照的一部分包含更新行的数据库和表的名称创建事件的 MySQL 线程的 ID(仅限非快照)MySQL 服务器 ID(如果可用)在数据库中进行更改时的时间戳如果启用了binlog_rows_query_log_eventsMySQL 配置选项并启用了连接器配置include.query属性,则该source字段还提供该query字段,该字段包含导致更改事件的原始 SQL 语句。
4op描述操作类型的强制字符串。在更新事件值中,op字段值为u,表示该行因更新而更改。
5ts_ms显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 在source对象中,ts_ms指示在数据库中进行更改的时间。通过比较 forpayload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和 Debezium 之间的延迟。
更新行的主键/唯一键的列会更改行键的值。当一个键发生变化时,Debezium 输出三个事件:一个DELETE事件和一个墓碑事件,该事件具有该行的旧键,然后是一个具有该行的新键的事件。详细信息在下一节中。

主键更新

更改行的主键字段的UPDATE操作称为主键更改。对于主键更改,代替UPDATE事件记录,连接器会发出DELETE旧键的CREATE事件记录和新(更新的)键的事件记录。这些事件具有通常的结构和内容,此外,每个事件都有一个与主键更改相关的消息头:

  • 事件DELETE记录具有__debezium.newkey消息头。此标头的值是更新行的新主键。
  • 事件CREATE记录具有__debezium.oldkey消息头。此标头的值是更新行所具有的先前(旧)主键。

删除事件

删除更改事件中的值与同一表的创建更新schema事件具有相同的部分。示例表的删除事件中的部分如下所示:payload``customers

{
  "schema": { ... },
  "payload": {
    "before": { 
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": null, 
    "source": { 
      "version": "1.9.5.Final",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 1465581902300,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 805,
      "row": 0,
      "thread": 7,
      "query": "DELETE FROM customers WHERE id=1004"
    },
    "op": "d", 
    "ts_ms": 1465581902461 
  }
}
物品字段名称描述
1before可选字段,指定事件发生前行的状态。在删除事件值中,该before字段包含在使用数据库提交删除之前该行中的值。
2after可选字段,指定事件发生后行的状态。在删除事件值中,after字段为null,表示该行不再存在。
3source描述事件源元数据的必填字段。在删除事件值中,source字段结构与同一表的创建更新事件相同。许多source字段值也相同。在删除事件值中,ts_mspos字段值以及其他值可能已更改。但是删除事件值source中的字段提供了相同的元数据:Debezium 版本连接器名称记录事件的 binlog 名称二进制日志位置活动中的行如果事件是快照的一部分包含更新行的数据库和表的名称创建事件的 MySQL 线程的 ID(仅限非快照)MySQL 服务器 ID(如果可用)在数据库中进行更改时的时间戳如果启用了binlog_rows_query_log_eventsMySQL 配置选项并启用了连接器配置include.query属性,则该source字段还提供该query字段,该字段包含导致更改事件的原始 SQL 语句。
4op描述操作类型的强制字符串。op字段值为d,表示该行已被删除。
5ts_ms显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 在source对象中,ts_ms指示在数据库中进行更改的时间。通过比较 forpayload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和 Debezium 之间的延迟。

删除更改事件记录为消费者提供了处理删除该行所需的信息。包含旧值是因为某些消费者可能需要它们才能正确处理删除。

MySQL 连接器事件旨在与Kafka 日志压缩一起使用。只要至少保留每个键的最新消息,日志压缩就可以删除一些较旧的消息。这让 Kafka 可以回收存储空间,同时确保主题包含完整的数据集并可用于重新加载基于键的状态。

墓碑事件

当删除一行时,删除事件值仍然适用于日志压缩,因为 Kafka 可以删除所有具有相同键的早期消息。但是,要让 Kafka 删除具有相同键的所有消息,消息值必须是null. 为了实现这一点,在 Debezium 的 MySQL 连接器发出删除事件后,连接器会发出一个特殊的墓碑事件,该事件具有相同的键但有一个null值。

数据类型映射

Debezium MySQL 连接器表示对行的更改,事件的结构类似于行所在的表。该事件包含每个列值的字段。该列的 MySQL 数据类型决定了 Debezium 如何表示事件中的值。

存储字符串的列在 MySQL 中使用字符集和排序规则定义。MySQL 连接器在读取 binlog 事件中列值的二进制表示时使用列的字符集。

连接器可以将 MySQL 数据类型映射到文字语义类型。

  • 文字类型:如何使用 Kafka Connect 模式类型表示值。
  • 语义类型:Kafka Connect 模式如何捕获字段(模式名称)的含义。

如果默认数据类型转换不能满足您的需求,您可以为连接器创建自定义转换器。

基本类型

下表显示了连接器如何映射基本 MySQL 数据类型。

MySQL 类型文字类型语义类型
BOOLEAN, BOOLBOOLEAN不适用
BIT(1)BOOLEAN不适用
BIT(>1)BYTESio.debezium.data.Bits lengthschema 参数包含一个表示位数的整数 。byte[]包含little-endian形式的位,并调整大小以包含指定数量的位。例如,n位在哪里: numBytes = n/8 + (n%8== 0 ? 0 : 1)
TINYINTINT16不适用
SMALLINT[(M)]INT16不适用
MEDIUMINT[(M)]INT32不适用
INT, INTEGER[(M)]INT32不适用
BIGINT[(M)]INT64不适用
REAL[(M,D)]FLOAT32不适用
FLOAT[(M,D)]FLOAT64不适用
DOUBLE[(M,D)]FLOAT64不适用
CHAR(M)]STRING不适用
VARCHAR(M)]STRING不适用
BINARY(M)]BYTES或者STRINGn/a 原始字节(默认)、base64 编码的字符串或十六进制编码的字符串,取决于binary.handling.mode连接器配置属性设置。
VARBINARY(M)]BYTES或者STRINGn/a 原始字节(默认)、base64 编码的字符串或十六进制编码的字符串,取决于binary.handling.mode连接器配置属性设置。
TINYBLOBBYTES或者STRINGn/a 原始字节(默认)、base64 编码的字符串或十六进制编码的字符串,取决于binary.handling.mode连接器配置属性设置。
TINYTEXTSTRING不适用
BLOBBYTES或者STRINGn/a 原始字节(默认)、base64 编码的字符串或十六进制编码的字符串,取决于binary.handling.mode连接器配置属性设置。 仅支持最大为 2GB 的值。建议使用声明检查模式将大列值外部化。
TEXTSTRING不适用 仅支持最大为 2GB 的值。建议使用声明检查模式将大列值外部化。
MEDIUMBLOBBYTES或者STRINGn/a 原始字节(默认)、base64 编码的字符串或十六进制编码的字符串,取决于binary.handling.mode连接器配置属性设置。
MEDIUMTEXTSTRING不适用
LONGBLOBBYTES或者STRINGn/a 原始字节(默认)、base64 编码的字符串或十六进制编码的字符串,取决于binary.handling.mode连接器配置属性设置。 仅支持最大为 2GB 的值。建议使用声明检查模式将大列值外部化。
LONGTEXTSTRING不适用 仅支持最大为 2GB 的值。建议使用声明检查模式将大列值外部化。
JSONSTRINGio.debezium.data.Json 包含JSON文档、数组或标量的字符串表示形式。
ENUMSTRINGio.debezium.data.Enum allowedschema 参数包含逗号分隔的允许值列表 。
SETSTRINGio.debezium.data.EnumSet allowedschema 参数包含逗号分隔的允许值列表 。
`YEAR[(24)]`INT32
TIMESTAMP[(M)]STRINGio.debezium.time.ZonedTimestamp 采用ISO 8601格式,精度为微秒。MySQL 允许M0-6.

时间类型

排除TIMESTAMP数据类型,MySQL 时态类型取决于time.precision.mode连接器配置属性的值。对于TIMESTAMP默认值指定为CURRENT_TIMESTAMP或的列NOW,该值1970-01-01 00:00:00用作 Kafka Connect 架构中的默认值。

MySQL 允许、 和列使用零值DATE,因为零值有时优于空值。当列定义允许空值时,MySQL 连接器将零值表示为空值,或者当列不允许空值时,将零值表示为纪元日。DATETIME``TIMESTAMP

没有时区的时间值

DATETIME类型表示本地日期和时间,例如“2018-01-13 09:48:27”。如您所见,没有时区信息。此类列使用 UTC 根据列的精度转换为纪元毫秒或微秒。该TIMESTAMP类型表示没有时区信息的时间戳。MySQL 在写入时将其从服务器(或会话的)当前时区转换为 UTC,在读回值时将其从 UTC 转换为服务器(或会话的)当前时区。例如:

  • DATETIME值为. 2018-06-20 06:37:03_1529476623000
  • TIMESTAMP值为. 2018-06-20 06:37:03_2018-06-20T13:37:03Z

io.debezium.time.ZonedTimestamp根据服务器(或会话的)当前时区,此类列将转换为 UTC 中的等效项。默认从服务器查询时区。如果失败,则必须由数据库connectionTimeZoneMySQL 配置选项明确指定。例如,如果数据库的时区(全局或通过connectionTimeZone选项为连接器配置)是“America/Los_Angeles”,则 TIMESTAMP 值“2018-06-20 06:37:03”ZonedTimestamp由值“2018-06-20T13:37:03Z”。

运行 Kafka Connect 和 Debezium 的 JVM 的时区不会影响这些转换。

有关与时间值相关的属性的更多详细信息,请参见MySQL 连接器配置属性的文档。

  • time.precision.mode=adaptive_time_microseconds(默认)

    MySQL 连接器根据列的数据类型定义确定文字类型和语义类型,以便事件准确表示数据库中的值。所有时间字段都以微秒为单位。只有在 to 范围内的正字段TIME值才能被正确捕获。00:00:00.000000``23:59:59.999999表 13. 映射时time.precision.mode=adaptive_time_microsecondsMySQL 类型文字类型语义类型DATE``INT32``io.debezium.time.Date 表示自纪元以来的天数。TIME[(M)]``INT64``io.debezium.time.MicroTime 以微秒为单位表示时间值,不包括时区信息。MySQL 允许M0-6.DATETIME, DATETIME(0), DATETIME(1), DATETIME(2), DATETIME(3)``INT64``io.debezium.time.Timestamp 表示经过纪元的毫秒数,不包括时区信息。DATETIME(4), DATETIME(5), DATETIME(6)``INT64``io.debezium.time.MicroTimestamp 表示经过纪元的微秒数,不包括时区信息。

  • time.precision.mode=连接

    MySQL 连接器使用定义的 Kafka Connect 逻辑类型。这种方法不如默认方法精确,如果数据库列的小数秒精度值大于3. 00:00:00.000只能处理to范围内的值23:59:59.999time.precision.mode=connect仅当您可以确保TIME表中的值永远不会超过支持的范围时才设置。该connect设置预计将在 Debezium 的未来版本中删除。表 14. 映射时time.precision.mode=connectMySQL 类型文字类型语义类型DATE``INT32``org.apache.kafka.connect.data.Date 表示自纪元以来的天数。TIME[(M)]``INT64``org.apache.kafka.connect.data.Time 表示自午夜以来的时间值(以微秒为单位),不包括时区信息。DATETIME[(M)]``INT64``org.apache.kafka.connect.data.Timestamp 表示自纪元以来的毫秒数,不包括时区信息。

小数类型

Debezium 连接器根据decimal.handling.mode连接器配置属性的设置处理小数。

  • decimal.handling.mode=精确

    表 15. 映射时decimal.handling.mode=preciseMySQL 类型文字类型语义类型NUMERIC[(M[,D])]``BYTES``org.apache.kafka.connect.data.Decimal schema 参数包含一个整数,scale表示小数点移动了多少位。DECIMAL[(M[,D])]``BYTES``org.apache.kafka.connect.data.Decimal schema 参数包含一个整数,scale表示小数点移动了多少位。

  • decimal.handling.mode=double

    表 16. 映射时decimal.handling.mode=doubleMySQL 类型文字类型语义类型NUMERIC[(M[,D])]``FLOAT64不适用DECIMAL[(M[,D])]``FLOAT64不适用

  • 十进制处理模式=字符串

    表 17. 映射时decimal.handling.mode=stringMySQL 类型文字类型语义类型NUMERIC[(M[,D])]``STRING不适用DECIMAL[(M[,D])]``STRING不适用

布尔值

MySQLBOOLEAN以特定方式在内部处理该值。该BOOLEAN列在内部映射到TINYINT(1)数据类型。在流式传输期间创建表时,它使用正确的BOOLEAN映射,因为 Debezium 接收原始 DDL。在快照期间,Debezium 执行以获取为和列SHOW CREATE TABLE返回的表定义。Debezium 然后无法获得原始类型映射,因此映射到.TINYINT(1)``BOOLEAN``TINYINT(1)``TINYINT(1)

为了使您能够将源列转换为布尔数据类型,Debezium 提供了一个TinyIntOneToBooleanConverter 自定义转换器,您可以通过以下方式之一使用它:

  • TINYINT(1)将所有或TINYINT(1) UNSIGNED列映射到BOOLEAN类型。

  • 使用逗号分隔的正则表达式列表枚举列的子集。
    要使用这种类型的转换,您必须converters使用参数设置配置属性,selector如下例所示:

    converters=boolean
    boolean.type=io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter
    boolean.selector=db1.table1.*, db1.table2.column1
    
  • tinyint unsigned注意: MySQL8在执行快照时没有显示类型的长度SHOW CREATE TABLE,这意味着这个转换器不起作用。新选项length.checker可以解决这个问题,默认值为true. 禁用length.checker并指定需要转换为selector属性的列,而不是根据类型转换所有列,如下例所示:

    converters=boolean
    boolean.type=io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter
    boolean.length.checker=false
    boolean.selector=db1.table1.*, db1.table2.column1
    

空间类型

目前,Debezium MySQL 连接器支持以下空间数据类型。

MySQL 类型文字类型语义类型
GEOMETRY,LINESTRING,POLYGON,MULTIPOINT,MULTILINESTRING,MULTIPOLYGON,GEOMETRYCOLLECTIONSTRUCTio.debezium.data.geometry.Geometry 包含具有两个字段的结构:srid (INT32: 定义存储在结构中的几何对象类型的空间参考系统 IDwkb (BYTES): 以 Well-Known-Binary (wkb) 格式编码的几何对象的二进制表示。有关详细信息,请参阅开放地理空间联盟

设置 MySQL

在安装和运行 Debezium 连接器之前,需要执行一些 MySQL 设置任务。

创建用户

Debezium MySQL 连接器需要 MySQL 用户帐户。此 MySQL 用户必须对 Debezium MySQL 连接器捕获更改的所有数据库具有适当的权限。

先决条件

  • 一个 MySQL 服务器。
  • SQL 命令的基本知识。

程序

  1. 创建 MySQL 用户:

    mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
    
  2. 授予用户所需的权限:

    mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
    

    下表描述了权限。

    如果使用不允许全局读取锁定的托管选项(例如 Amazon RDS 或 Amazon Aurora),则使用表级锁定来创建一致快照。在这种情况下,您还需要向LOCK TABLES您创建的用户授予权限。有关更多详细信息,请参阅快照
  3. 最终确定用户的权限:

    mysql> FLUSH PRIVILEGES;
    
关键词描述
SELECT使连接器能够从数据库中的表中选择行。这仅在执行快照时使用。
RELOAD允许连接器使用该FLUSH语句来清除或重新加载内部缓存、刷新表或获取锁。这仅在执行快照时使用。
SHOW DATABASESSHOW DATABASE通过发出语句使连接器能够查看数据库名称。这仅在执行快照时使用。
REPLICATION SLAVE使连接器能够连接并读取 MySQL 服务器 binlog。
REPLICATION CLIENT允许连接器使用以下语句:SHOW MASTER STATUS``SHOW SLAVE STATUS``SHOW BINARY LOGS连接器总是需要这个。
ON标识权限适用的数据库。
TO 'user'指定要授予权限的用户。
IDENTIFIED BY 'password'指定用户的 MySQL 密码。

启用二进制日志

您必须为 MySQL 复制启用二进制日志记录。二进制日志记录复制工具的事务更新以传播更改。

先决条件

  • 一个 MySQL 服务器。
  • 适当的 MySQL 用户权限。

程序

  1. 检查该log-bin选项是否已打开:

    // for MySql 5.x
    mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
    FROM information_schema.global_variables WHERE variable_name='log_bin';
    // for MySql 8.x
    mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
    FROM performance_schema.global_variables WHERE variable_name='log_bin';
    
  2. 如果是OFF,请使用以下属性配置您的 MySQL 服务器配置文件,如下表所述:

    server-id         = 223344
    log_bin           = mysql-bin
    binlog_format     = ROW
    binlog_row_image  = FULL
    expire_logs_days  = 10
    
  3. 通过再次检查 binlog 状态来确认您的更改:

    // for MySql 5.x
    mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
    FROM information_schema.global_variables WHERE variable_name='log_bin';
    // for MySql 8.x
    mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
    FROM performance_schema.global_variables WHERE variable_name='log_bin';
    
财产描述
server-id对于 MySQL 集群中的每个服务器和复制客户端,的值server-id必须是唯一的。在 MySQL 连接器设置期间,Debezium 为连接器分配一个唯一的服务器 ID。
log_bin的值log_bin是二进制日志文件序列的基本名称。
binlog_formatbinlog-format必须设置为ROW或。row
binlog_row_imagebinlog_row_image必须设置为FULL或。full
expire_logs_days这是自动删除 binlog 文件的天数。默认值为0,表示不自动删除。设置值以匹配您的环境需求。请参阅MySQL 清除 binlog 文件

启用 GTID

全局事务标识符 (GTID) 唯一标识集群内服务器上发生的事务。尽管 Debezium MySQL 连接器不需要,但使用 GTID 可以简化复制并使您能够更轻松地确认主服务器和副本服务器是否一致。

GTID 在 MySQL 5.6.5 及更高版本中可用。有关更多详细信息,请参阅MySQL 文档

先决条件

  • 一个 MySQL 服务器。
  • SQL 命令的基本知识。
  • 访问 MySQL 配置文件。

程序

  1. 启用gtid_mode

    mysql> gtid_mode=ON
    
  2. 启用enforce_gtid_consistency

    mysql> enforce_gtid_consistency=ON
    
  3. 确认更改:

    mysql> show global variables like '%GTID%';
    

结果

+--------------------------+-------+
| Variable_name            | Value |
+--------------------------+-------+
| enforce_gtid_consistency | ON    |
| gtid_mode                | ON    |
+--------------------------+-------+
选项描述
gtid_mode布尔值,指定是否启用 MySQL 服务器的 GTID 模式。ON= 启用OFF= 禁用
enforce_gtid_consistency布尔值,指定服务器是否通过允许执行可以以事务安全方式记录的语句来强制执行 GTID 一致性。使用 GTID 时需要。ON= 启用OFF= 禁用

配置会话超时

当为大型数据库制作初始一致快照时,您建立的连接可能会在读取表时超时。您可以通过在 MySQL 配置文件中配置interactive_timeout和来防止这种行为。wait_timeout

先决条件

  • 一个 MySQL 服务器。
  • SQL 命令的基本知识。
  • 访问 MySQL 配置文件。

程序

  1. 配置interactive_timeout

    mysql> interactive_timeout=<duration-in-seconds>
    
  2. 配置wait_timeout

    mysql> wait_timeout=<duration-in-seconds>
    
选项描述
interactive_timeout服务器在关闭交互式连接之前等待其活动的秒数。有关详细信息,请参阅MySQL 的文档。
wait_timeout服务器在关闭非交互式连接之前等待其活动的秒数。有关详细信息,请参阅MySQL 的文档。

启用查询日志事件

您可能希望查看SQL每个 binlog 事件的原始语句。在 MySQL 配置文件中启用该binlog_rows_query_log_events选项允许您执行此操作。

此选项在 MySQL 5.6 及更高版本中可用。

先决条件

  • 一个 MySQL 服务器。
  • SQL 命令的基本知识。
  • 访问 MySQL 配置文件。

程序

  • 启用binlog_rows_query_log_events

    mysql> binlog_rows_query_log_events=ON
    

    binlog_rows_query_log_events设置为启用/禁用对SQL在 binlog 条目中包含原始语句的支持的值。

    • ON= 启用
    • OFF= 禁用

部署

要部署 Debezium MySQL 连接器,您需要安装 Debezium MySQL 连接器存档,配置连接器,然后通过将其配置添加到 Kafka Connect 来启动连接器。

先决条件

程序

  1. 下载 Debezium MySQL 连接器插件
  2. 将文件提取到您的 Kafka Connect 环境中。
  3. 将包含 JAR 文件的目录添加到Kafka Connect 的plugin.path.
  4. 配置连接器并将配置添加到您的 Kafka Connect 集群。
  5. 重新启动 Kafka Connect 进程以获取新的 JAR 文件。

如果您正在使用不可变容器,请参阅Debezium 的Apache Zookeeper、Apache Kafka、MySQL 和 Kafka Connect 容器映像,其中 MySQL 连接器已安装并准备运行。

您还可以在 Kubernetes 和 OpenShift 上运行 Debezium

MySQL 连接器配置示例

以下是连接器实例的配置示例,该实例从位于 192.168.99.100 的端口 3306 上的 MySQL 服务器捕获数据,我们在逻辑上将其命名为fullfillment. 通常,您通过设置可用于连接器的配置属性,在 JSON 文件中配置 Debezium MySQL 连接器。

您可以选择为数据库中的模式和表的子集生成事件。或者,您可以忽略、屏蔽或截断包含敏感数据、大于指定大小或您不需要的列。

{
    "name": "inventory-connector", 
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector", 
        "database.hostname": "192.168.99.100", 
        "database.port": "3306", 
        "database.user": "debezium-user", 
        "database.password": "debezium-user-pw", 
        "database.server.id": "184054", 
        "database.server.name": "fullfillment", 
        "database.include.list": "inventory", 
        "database.history.kafka.bootstrap.servers": "kafka:9092", 
        "database.history.kafka.topic": "dbhistory.fullfillment", 
        "include.schema.changes": "true" 
    }
}
向 Kafka Connect 服务注册时的连接器名称。
连接器的类名。
MySQL 服务器地址。
MySQL 服务器端口号。
具有适当权限的 MySQL 用户。
MySQL 用户的密码。
连接器的唯一 ID。
MySQL 服务器或集群的逻辑名称。
指定服务器托管的数据库列表。
连接器用于将 DDL 语句写入和恢复到数据库历史主题的 Kafka 代理列表。
数据库历史主题的名称。本主题仅供内部使用,消费者不得使用。
指定连接器是否应为 DDL 更改生成事件并将它们发送到fulfillment架构更改主题以供使用者使用的标志。

有关可以为 Debezium MySQL 连接器设置的配置属性的完整列表,请参阅MySQL 连接器配置属性

您可以使用命令将此配置发送POST到正在运行的 Kafka Connect 服务。该服务记录配置并启动一个执行以下操作的连接器任务:

  • 连接到 MySQL 数据库。
  • 在捕获模式下读取表的更改数据表。
  • 流将事件记录更改为 Kafka 主题。

添加连接器配置

要开始运行 MySQL 连接器,请配置连接器配置,并将配置添加到您的 Kafka Connect 集群。

先决条件

程序

  1. 为 MySQL 连接器创建配置。
  2. 使用Kafka Connect REST API将该连接器配置添加到您的 Kafka Connect 集群。

结果

连接器启动后,它会为连接器配置的 MySQL 数据库执行一致的快照。然后,连接器开始为行级操作生成数据更改事件,并将更改事件记录流式传输到 Kafka 主题。

连接器属性

Debezium MySQL 连接器有许多配置属性,您可以使用它们来为您的应用程序实现正确的连接器行为。许多属性都有默认值。有关属性的信息组织如下:

除非默认值可用,否则需要以下配置属性。

必需的 Debezium MySQL 连接器配置属性
财产默认描述
name无默认值连接器的唯一名称。尝试使用相同名称再次注册失败。所有 Kafka Connect 连接器都需要此属性。
connector.class无默认值连接器的 Java 类的名称。始终指定 io.debezium.connector.mysql.MySqlConnectorMySQL 连接器。
tasks.max1应为此连接器创建的最大任务数。MySQL 连接器始终使用单个任务,因此不使用此值,因此默认值始终是可以接受的。
database.hostname无默认值MySQL 数据库服务器的 IP 地址或主机名。
database.port3306MySQL 数据库服务器的整数端口号。
database.user无默认值连接到 MySQL 数据库服务器时要使用的 MySQL 用户的名称。
database.password无默认值连接到 MySQL 数据库服务器时使用的密码。
database.server.name无默认值为 Debezium 在其中捕获更改的特定 MySQL 数据库服务器/集群标识并提供命名空间的逻辑名称。逻辑名称在所有其他连接器中应该是唯一的,因为它用作所有接收此连接器发出的事件的 Kafka 主题名称的前缀。数据库服务器逻辑名称中只能使用字母数字字符、连字符、点和下划线。 +不要更改此属性的值。如果您更改名称值,在重新启动后,连接器不会继续向原始主题发出事件,而是向名称基于新值的主题发出后续事件。连接器也无法恢复其数据库历史主题。
database.server.id随机的此数据库客户端的数字 ID,在 MySQL 集群中所有当前运行的数据库进程中必须是唯一的。此连接器作为另一台服务器(具有此唯一 ID)加入 MySQL 数据库集群,因此它可以读取 binlog。默认情况下,会生成一个介于 5400 和 6400 之间的随机数,但建议显式设置一个值。
database.include.list空字符串一个可选的、以逗号分隔的正则表达式列表,与要为其捕获更改的数据库的名称相匹配。连接器不会捕获名称不在的任何数据库中的更改database.include.list。默认情况下,连接器会捕获所有数据库中的更改。不要同时设置database.exclude.list连接器配置属性。
database.exclude.list空字符串一个可选的、以逗号分隔的正则表达式列表,与您不想捕获其更改的数据库的名称相匹配。连接器捕获名称不在database.exclude.list. 不要同时设置database.include.list连接器配置属性。
table.include.list空字符串一个可选的、以逗号分隔的正则表达式列表,与您要捕获其更改的表的完全限定表标识符匹配。连接器不会捕获任何未包含在table.include.list. 每个标识符的格式为databaseName表名。默认情况下,连接器会捕获每个数据库中每个非系统表中的更改,这些表的更改正在被捕获。不要同时指定table.exclude.list连接器配置属性。
table.exclude.list空字符串一个可选的、以逗号分隔的正则表达式列表,匹配您不想捕获其更改的表的完全限定表标识符。连接器捕获未包含在任何表中的更改table.exclude.list。每个标识符的格式为databaseName表名。不要同时指定table.include.list连接器配置属性。
column.exclude.list空字符串一个可选的、以逗号分隔的正则表达式列表,与要从更改事件记录值中排除的列的完全限定名称匹配。列的完全限定名称的格式为databaseName表名列名
column.include.list空字符串一个可选的、以逗号分隔的正则表达式列表,与要包含在更改事件记录值中的列的完全限定名称匹配。列的完全限定名称的格式为databaseName表名列名
column.truncate.to._length_.chars不适用如果字段值长于指定的字符数,则以逗号分隔的可选正则表达式列表匹配基于字符的列的完全限定名称,这些列的值应在更改事件记录值中被截断。您可以在单个配置中配置具有不同长度的多个属性。长度必须是正整数。列的完全限定名称的格式为databaseName表名列名
column.mask.with._length_.chars不适用一个可选的、以逗号分隔的正则表达式列表,匹配基于字符的列的完全限定名称,其值应在更改事件消息值中替换为由指定数量的星号 ( *) 字符组成的字段值。您可以在单个配置中配置具有不同长度的多个属性。每个长度必须是正整数或零。列的完全限定名称的格式为databaseName表名列名
column.mask.hash.*hashAlgorithm*.with.salt.*salt*; column.mask.hash.v2.*hashAlgorithm*.with.salt.*salt*不适用一个可选的、以逗号分隔的正则表达式列表,匹配基于字符的列的完全限定名称。列的完全限定名称采用. 在生成的更改事件记录中,指定列的值将替换为假名。*<databaseName>*.*<tableName>*.*<columnName>* 假名由应用指定的hashAlgorithmsalt产生的散列值组成。基于所使用的散列函数,参照完整性得以保持,而列值被替换为假名。Java Cryptography Architecture Standard Algorithm Name Documentation的MessageDigest 部分描述了支持的散列函数。 在以下示例中,CzQMA0cB5K是随机选择的盐。 column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName如有必要,假名会自动缩短为列的长度。连接器配置可以包括指定不同哈希算法和盐的多个属性。 根据使用的hashAlgorithm、选择的salt和实际数据集,生成的数据集可能不会被完全屏蔽。 如果值在不同的地方或系统中进行散列,则应使用散列策略版本 2 来确保保真度。
column.propagate.source.type不适用一个可选的,以逗号分隔的正则表达式列表,匹配列的完全限定名称,其原始类型和长度应作为参数添加到发出的更改事件记录中的相应字段模式。这些架构参数:__Debezium.source.column.type``__Debezium.source.column.length``__Debezium.source.column.scale分别用于传播可变宽度类型的原始类型名称和长度。这对于正确调整接收器数据库中相应列的大小很有用。列的完全限定名称是以下形式之一:数据库名称表名列名**数据库名称架构名称表名列名
datatype.propagate.source.type不适用一个可选的、以逗号分隔的正则表达式列表,它与列的特定于数据库的数据类型名称相匹配,其原始类型和长度应作为参数添加到发出的更改事件记录中的相应字段模式中。这些架构参数:__debezium.source.column.type``__debezium.source.column.length``__debezium.source.column.scale分别用于传播可变宽度类型的原始类型名称和长度。这对于正确调整接收器数据库中相应列的大小很有用。完全限定的数据类型名称是以下形式之一:数据库名称表名类型名称**数据库名称架构名称表名类型名称查看MySQL 连接器如何映射数据类型以获取 MySQL 特定的数据类型名称列表。
time.precision.modeadaptive_time_microseconds时间、日期和时间戳可以用不同类型的精度表示,包括:( adaptive_time_microseconds默认)根据数据库列的类型使用毫秒、微秒或纳秒精度值精确地捕获数据库中的日期、日期时间和时间戳值, TIME 类型字段除外,它们始终以微秒为单位捕获。 adaptive(已弃用)根据数据库列的类型,使用毫秒、微秒或纳秒精度值精确捕获数据库中的时间和时间戳值。 connect始终使用 Kafka Connect 的时间、日期和时间戳的内置表示来表示时间和时间戳值,无论数据库列的精度如何,它们都使用毫秒精度。
decimal.handling.modeprecise指定连接器应如何处理DECIMALNUMERICprecise的值:(默认)使用java.math.BigDecimal二进制形式的更改事件中表示的值精确地表示它们。 double使用double值表示它们,这可能会导致精度损失,但更易于使用。 string将值编码为格式化字符串,这很容易使用,但有关真实类型的语义信息会丢失。
bigint.unsigned.handling.modelong指定 BIGINT UNSIGNED 列应如何在更改事件中表示。可能的设置是: long使用 Java 表示值long,这可能无法提供精度,但在消费者中易于使用。long通常是首选设置。 precise用于表示值,这些值使用二进制表示和 Kafka Connect 的类型java.math.BigDecimal编码在更改事件中。org.apache.kafka.connect.data.Decimal处理大于 2^63 的值时使用此设置,因为这些值无法通过使用long.
include.schema.changestrue布尔值,指定连接器是否应将数据库架构中的更改发布到与数据库服务器 ID 同名的 Kafka 主题。通过使用包含数据库名称并且其值包含 DDL 语句的键来记录每个架构更改。这与连接器内部记录数据库历史的方式无关。
include.schema.commentsfalse布尔值,指定连接器是否应解析和发布元数据对象的表和列注释。启用此选项将对内存使用产生影响。逻辑模式对象的数量和大小在很大程度上影响了 Debezium 连接器消耗的内存量,并且向它们中的每一个添加潜在的大字符串数据可能会非常昂贵。
include.queryfalse布尔值,指定连接器是否应包含生成更改事件的原始 SQL 查询。 如果您将此选项设置为,那么您还必须使用设置为的选项true配置 MySQL 。当是时,对于快照过程生成的事件不存在查询。 设置为可能会公开通过在更改事件中包含原始 SQL 语句而显式排除或屏蔽的表或字段。因此,默认设置为。binlog_rows_query_log_events``ON``include.query``true include.query``true``false
event.deserialization.failure.handling.modefail指定连接器在反序列化二进制日志事件期间应如何对异常做出反应。 fail传播异常,该异常指示有问题的事件及其二进制日志偏移量,并导致连接器停止。 warn记录有问题的事件及其二进制日志偏移量,然后跳过该事件。 ignore跳过有问题的事件并且不记录任何内容。
inconsistent.schema.handling.modefail指定连接器应如何对与内部模式表示中不存在的表相关的二进制日志事件作出反应。即内部表示与数据库不一致。 fail抛出一个异常,指示有问题的事件及其二进制日志偏移量,并导致连接器停止。 warn记录有问题的事件及其二进制日志偏移量并跳过该事件。 skip跳过有问题的事件并且不记录任何内容。
max.batch.size2048正整数值,指定在此连接器的每次迭代期间应处理的每批事件的最大大小。默认为 2048。
max.queue.size8192正整数值,指定阻塞队列可以保存的最大记录数。当 Debezium 读取从数据库流式传输的事件时,它会将事件放入阻塞队列中,然后再将它们写入 Kafka。在连接器接收消息的速度快于将消息写入 Kafka 的速度或 Kafka 不可用时,阻塞队列可以为从数据库读取更改事件提供背压。当连接器定期记录偏移量时,将忽略队列中保存的事件。始终将 的值设置max.queue.size为大于 的值max.batch.size
max.queue.size.in.bytes0一个长整数值,指定阻塞队列的最大容量(以字节为单位)。默认情况下,没有为阻塞队列指定卷限制。要指定队列可以使用的字节数,请将此属性设置为正长值。 如果max.queue.size也设置了,当队列的大小达到任一属性指定的限制时,写入队列将被阻止。例如,如果设置max.queue.size=1000, 和max.queue.size.in.bytes=5000,则在队列包含 1000 条记录或队列中的记录量达到 5000 字节后阻止写入队列。
poll.interval.ms1000正整数值,指定连接器在开始处理一批事件之前应等待新更改事件出现的毫秒数。默认为 1000 毫秒或 1 秒。
connect.timeout.ms30000一个正整数值,指定此连接器在尝试连接到 MySQL 数据库服务器后在超时之前应等待的最长时间(以毫秒为单位)。默认为 30 秒。
gtid.source.includes无默认值与 GTID 集中的源 UUID 匹配的正则表达式的逗号分隔列表,用于查找 MySQL 服务器中的 binlog 位置。仅使用具有与其中一种包含模式匹配的源的 GTID 范围。不要同时指定gtid.source.excludes.
gtid.source.excludes无默认值与 GTID 集中的源 UUID 匹配的正则表达式的逗号分隔列表,用于查找 MySQL 服务器中的 binlog 位置。仅使用具有不匹配任何这些排除模式的源的 GTID 范围。不要同时为 指定值gtid.source.includes
gtid.new.channel.position 已弃用并计划移除earliest当设置为latest时,当连接器看到一个新的 GTID 通道时,它会从该 GTID 通道中最后执行的事务开始消费。如果设置为earliest(默认),则连接器从第一个可用(未清除)GTID 位置开始读取该通道。earliest当您使用 Debezium 连接到主服务器的主动-被动 MySQL 设置时非常有用。在这种情况下,在故障转移期间,具有新 UUID(和 GTID 通道)的副本在连接 Debezium 之前开始接收写入。使用latest.
tombstones.on.deletetrue控制删除事件后是否有墓碑事件。 true- 删除操作由删除事件和随后的墓碑事件表示。 false- 只发出一个删除事件。 删除源记录后,发出 tombstone 事件(默认行为)允许 Kafka 完全删除与已删除行的键相关的所有事件,以防主题启用日志压缩。
message.key.columns不适用一个表达式列表,指定连接器用于形成自定义消息键的列,以形成它发布到指定表的 Kafka 主题的更改事件记录。默认情况下,Debezium 使用表的主键列作为它发出的记录的消息键。代替默认值,或为缺少主键的表指定键,您可以基于一个或多个列配置自定义消息键。 要为表建立自定义消息键,请列出表,然后列出用作消息键的列。每个列表条目采用以下格式: 要基于多个列名的表键,请在列名之间插入逗号。 *<fully-qualified_tableName>*:_<keyColumn>_,*<keyColumn>* 每个完全限定的表名都是以下格式的正则表达式: 该属性可以包含多个表的条目。使用分号分隔列表中的表条目。 下面的示例设置表和的消息键:对于 表,列和指定为消息键。对于任何数据库中的表,列和服务器作为消息键。 *<databaseName>*.*<tableName>* inventory.customers``purchase.orders inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4 inventory.customer``pk1``pk2``purchaseorders``pk3``pk4用于创建自定义消息键的列数没有限制。但是,最好使用指定唯一键所需的最小数量。
binary.handling.mode字节指定如何在更改事件中表示二进制列,例如 、、blob。可能的设置:将二进制数据表示为字节数组。将二进制数据表示为 base64 编码的字符串。将二进制数据表示为十六进制编码 (base16) 字符串。binary``varbinary bytes base64 hex
schema.name.adjustment.mode欧元指定应如何调整架构名称以与连接器使用的消息转换器兼容。可能的设置: avro用下划线替换不能在 Avro 类型名称中使用的字符。 none不应用任何调整。
高级 MySQL 连接器配置属性

下表描述了高级 MySQL 连接器属性。这些属性的默认值很少需要更改。因此,您无需在连接器配置中指定它们。

财产默认描述
connect.keep.alivetrue一个布尔值,指定是否应使用单独的线程来确保与 MySQL 服务器/集群的连接保持活动状态。
converters无默认值枚举连接器可以使用的自定义转换器实例的符号名称的逗号分隔列表。 例如,boolean。 此属性是使连接器能够使用自定义转换器所必需的。对于您为连接器配置的每个转换器,您还必须添加一个.type属性,该属性指定实现转换器接口的类的完全限定名称。该.type属性使用以下格式: *<converterSymbolicName>*.type 例如, boolean.type:io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter如果要进一步控制已配置转换器的行为,可以添加一个或多个配置参数以将值传递给转换器。要将这些附加配置参数与转换器相关联,请在参数名称前加上转换器的符号名称。 例如,要定义一个selector指定boolean转换器处理的列子集的参数,请添加以下属性: boolean.selector=db1.table1.*, db1.table2.column1
table.ignore.builtintrue一个布尔值,指定是否应忽略内置系统表。无论表包含和排除列表如何,这都适用。默认情况下,系统表不会被捕获其更改,并且在对任何系统表进行更改时不会生成任何事件。
database.ssl.modedisabled指定是否使用加密连接。可能的设置有: disabled指定使用未加密的连接。 preferred如果服务器支持安全连接,则建立加密连接。如果服务器不支持安全连接,则回退到未加密的连接。 required建立加密连接,如果因任何原因无法建立连接,则失败。 verify_ca行为类似required,但另外它会根据配置的证书颁发机构 (CA) 证书验证服务器 TLS 证书,如果服务器 TLS 证书与任何有效的 CA 证书不匹配,则会失败。 verify_identity行为类似verify_ca,但另外验证服务器证书是否与远程连接的主机匹配。
binlog.buffer.size0二进制日志阅读器使用的前瞻缓冲区的大小。默认设置0禁用缓冲。 在特定情况下,MySQL binlog 中可能包含ROLLBACK语句完成的未提交数据。典型示例是在单个事务中使用保存点或混合临时和常规表更改。 当检测到事务开始时,Debezium 会尝试前滚 binlog 位置并找到COMMITROLLBACK因此它可以确定是否从事务中流式传输更改。binlog 缓冲区的大小定义了 Debezium 在搜索事务边界时可以缓冲的事务中的最大更改数。如果事务的大小大于缓冲区,则 Debezium 必须在流式传输时回退并重新读取未放入缓冲区的事件。 注意:此功能正在孵化。鼓励反馈。预计此功能尚未完全完善。
snapshot.modeinitial指定连接器启动时运行快照的条件。可能的设置有: initial- 仅当没有为逻辑服务器名称记录偏移时,连接器才运行快照。 initial_only- 连接器仅在没有记录逻辑服务器名称的偏移量时运行快照,然后停止;即它不会从 binlog 中读取更改事件。 when_needed- 连接器在它认为有必要时在启动时运行快照。也就是说,当没有可用的偏移量时,或者当先前记录的偏移量指定了服务器中不可用的 binlog 位置或 GTID 时。 never- 连接器从不使用快照。首次使用逻辑服务器名称启动时,连接器从 binlog 的开头读取。谨慎配置此行为。只有当 binlog 保证包含数据库的全部历史时才有效。 schema_only- 连接器运行模式而不是数据的快照。当您不需要主题包含数据的一致快照但需要它们仅具有自连接器启动以来的更改时,此设置很有用。 schema_only_recovery- 这是已捕获更改的连接器的恢复设置。当您重新启动连接器时,此设置可以恢复损坏或丢失的数据库历史主题。您可以定期设置它来“清理”一个意外增长的数据库历史主题。数据库历史主题需要无限保留。
snapshot.locking.modeminimal控制连接器是否持有全局 MySQL 读锁以及持有多长时间,这会在连接器执行快照时阻止对数据库的任何更新。可能的设置有: minimal- 连接器仅对快照的初始部分持有全局读锁,在此期间连接器读取数据库模式和其他元数据。快照中的剩余工作涉及从每个表中选择所有行。连接器可以通过使用 REPEATABLE READ 事务以一致的方式执行此操作。即使不再持有全局读锁并且其他 MySQL 客户端正在更新数据库,情况也是如此。 minimal_percona- 连接器持有全局备份锁仅用于连接器读取数据库模式和其他元数据的快照的初始部分。快照中的剩余工作涉及从每个表中选择所有行。连接器可以通过使用 REPEATABLE READ 事务以一致的方式执行此操作。即使不再持有全局备份锁并且其他 MySQL 客户端正在更新数据库,情况也是如此。此模式不会将表刷新到磁盘,不会被长时间运行的读取阻塞,并且仅在 Percona Server 中可用。 extended- 在快照期间阻止所有写入。如果有客户端正在提交 MySQL 从 REPEATABLE READ 语义中排除的操作,请使用此设置。 none- 防止连接器在快照期间获取任何表锁。虽然所有快照模式都允许使用此设置,但当且当在快照运行时没有发生架构更改时使用它是安全的。对于使用 MyISAM 引擎定义的表,尽管在 MyISAM 获取表锁时设置了此属性,但表仍将被锁定。这种行为与获取行级锁的 InnoDB 引擎不同。
snapshot.include.collection.list中指定的所有表table.include.list一个可选的、以逗号分隔的正则表达式列表,与*<databaseName>.<tableName>*要包含在快照中的表的完全限定名称 ( ) 匹配。指定的项目必须在连接器的table.include.list属性中命名。snapshot.mode仅当连接器的属性设置为 以外的值时,此属性才会生效never。 此属性不影响增量快照的行为。
snapshot.select.statement.overrides无默认值指定要包含在快照中的表行。如果您希望快照仅包含表中行的子集,请使用该属性。此属性仅影响快照。它不适用于连接器从日志中读取的事件。该属性包含格式为 的完全限定表名的逗号分隔列表*<databaseName>.<tableName>*。例如, "snapshot.select.statement.overrides": "inventory.products,customers.orders" 对于列表中的每个表,添加一个进一步的配置属性,该属性指定SELECT连接器在拍摄快照时要在表上运行的语句。指定的SELECT语句确定要包含在快照中的表行子集。使用以下格式指定此SELECT语句属性的名称:. 例如, 。 例子: snapshot.select.statement.overrides.*<databaseName>*.*<tableName>*``snapshot.select.statement.overrides.customers.orders 如果您希望快照仅包含未软删除的记录,请从customers.orders包含软删除列的表中添加以下属性:delete_flag``"snapshot.select.statement.overrides": "customer.orders", "snapshot.select.statement.overrides.customer.orders": "SELECT * FROM [customers].[orders] WHERE delete_flag = 0 ORDER BY id DESC"在生成的快照中,连接器仅包含delete_flag = 0.
min.row.count.to.stream.results1000在快照期间,连接器会查询连接器配置为捕获更改的每个表。连接器使用每个查询结果来生成包含该表中所有行的数据的读取事件。此属性确定 MySQL 连接器是将表的结果放入内存(速度快但需要大量内存)还是流式传输结果(可能较慢但适用于非常大的表)。此属性的设置指定在连接器流式传输结果之前表必须包含的最小行数。 要跳过所有表大小检查并始终在快照期间流式传输所有结果,请将此属性设置为0
heartbeat.interval.ms0控制连接器向 Kafka 主题发送心跳消息的频率。默认行为是连接器不发送心跳消息。 心跳消息对于监视连接器是否从数据库接收更改事件很有用。心跳消息可能有助于减少连接器重新启动时需要重新发送的更改事件的数量。要发送心跳消息,请将此属性设置为正整数,表示心跳消息之间的毫秒数。
heartbeat.topics.prefix__debezium-heartbeat控制连接器向其发送心跳消息的主题的名称。主题名称具有以下模式: heartbeat.topics.prefixserver.name 例如,如果数据库服务器名称为fulfillment,则默认主题名称为__debezium-heartbeat.fulfillment
heartbeat.action.query无默认值指定连接器发送心跳消息时连接器在源数据库上执行的查询。 例如,这可用于定期捕获源数据库中设置的已执行 GTID 的状态。 INSERT INTO gtid_history_table (select * from mysql.gtid_executed)
database.initial.statements无默认值建立与数据库的 JDBC 连接(而不是读取事务日志的连接)时要执行的 SQL 语句的分号分隔列表。要将分号指定为 SQL 语句中的字符而不是分隔符,请使用两个分号 ( ;;)。 连接器可能会自行决定建立 JDBC 连接,因此该属性仅用于配置会话参数。它不适用于执行 DML 语句。
snapshot.delay.ms无默认值连接器启动时执行快照之前连接器应等待的时间间隔(以毫秒为单位)。如果您在集群中启动多个连接器,此属性对于避免快照中断很有用,这可能会导致连接器重新平衡。
snapshot.fetch.size无默认值在快照期间,连接器分批读取表内容。此属性指定批处理中的最大行数。
snapshot.lock.timeout.ms10000正整数,指定执行快照时等待获取表锁的最长时间(以毫秒为单位)。如果连接器在此时间间隔内无法获取表锁,则快照失败。了解MySQL 连接器如何执行数据库快照
enable.time.adjustertrue指示连接器是否将 2 位数年份规范转换为 4 位数的布尔值。设置为何false时将转换完全委托给数据库。 MySQL 允许用户插入 2 位或 4 位的年份值。对于 2 位值,该值将映射到 1970 - 2069 范围内的年份。默认行为是连接器进行转换。
source.struct.versionv2sourceDebezium 事件中块的架构版本。Debezium 0.10 对块的结构进行了一些重大更改,source以统一所有连接器的暴露结构。 通过将此选项设置为v1,可以生成早期版本中使用的结构。但是,不建议使用此设置,并计划在未来的 Debezium 版本中删除。
sanitize.field.namestrue如果连接器配置将key.converterorvalue.converter属性设置为 Avro 转换器。 false如果不。指示字段名称是否经过清理以符合Avro 命名要求
skipped.operations无默认值流式传输期间要跳过的操作类型的逗号分隔列表。以下值是可能的:c用于插入/创建、u用于更新、d用于删除。默认情况下,不会跳过任何操作。
signal.data.collection无默认值用于向连接器发送信号的数据集合的完全限定名称。 使用以下格式指定集合名称: *<databaseName>*.*<tableName>*
incremental.snapshot.allow.schema.changesfalse在增量快照期间允许架构更改。启用后,连接器将在增量快照期间检测架构更改并重新选择当前块以避免锁定 DDL。 请注意,不支持对主键的更改,如果在增量快照期间执行,可能会导致不正确的结果。另一个限制是,如果架构更改仅影响列的默认值,则在从 binlog 流处理 DDL 之前不会检测到更改。这不会影响快照事件的值,但快照事件的架构可能具有过时的默认值。
incremental.snapshot.chunk.size1024连接器在增量快照块期间获取并读入内存的最大行数。增加块大小提供了更高的效率,因为快照运行的快照查询更少,但更大的大小。然而,更大的块大小也需要更多的内存来缓冲快照数据。将块大小调整为在您的环境中提供最佳性能的值。
read.onlyfalse切换到替代增量快照水印实现以避免写入信号数据收集
provide.transaction.metadatafalse确定连接器是否生成具有事务边界的事件并使用事务元数据丰富更改事件信封。指定true是否希望连接器执行此操作。有关详细信息,请参阅事务元数据。
transaction.topic${database.server.name}.transaction控制连接器向其发送事务元数据消息的主题的名称。占位符${database.server.name}可用于引用连接器的逻辑名称;默认为${database.server.name}.transaction,例如dbserver1.transaction
Debezium 连接器数据库历史配置属性

Debezium 提供了一组database.history.*属性来控制连接器如何与模式历史主题交互。

下表描述了database.history用于配置 Debezium 连接器的属性。

财产默认描述
database.history.kafka.topic连接器存储数据库架构历史的 Kafka 主题的全名。
database.history.kafka.bootstrap.servers连接器用于建立与 Kafka 集群的初始连接的主机/端口对列表。此连接用于检索连接器先前存储的数据库模式历史记录,并用于编写从源数据库读取的每个 DDL 语句。每对都应该指向 Kafka Connect 进程使用的同一个 Kafka 集群。
database.history.kafka.recovery.poll.interval.ms100一个整数值,指定连接器在启动/恢复期间轮询持久数据时应等待的最大毫秒数。默认值为 100 毫秒。
database.history.kafka.query.timeout.ms3000一个整数值,指定连接器在使用 Kafka 管理客户端获取集群信息时应等待的最大毫秒数。
database.history.kafka.recovery.attempts4在连接器恢复失败并出现错误之前连接器应尝试读取持久历史数据的最大次数。没有收到数据后等待的最长时间为recovery.attemptsx recovery.poll.interval.ms
database.history.skip.unparseable.ddlfalse一个布尔值,指定连接器是否应忽略格式错误或未知的数据库语句或停止处理,以便人们可以解决问题。安全的默认值为false. 跳过应该小心使用,因为它会在处理 binlog 时导致数据丢失或损坏。
database.history.store.only.monitored.tables.ddl 已弃用并计划在未来版本中删除;改为使用database.history.store.only.captured.tables.ddlfalse一个布尔值,指定连接器是否应记录所有 DDL 语句 true仅记录那些与 Debezium 正在捕获其更改的表相关的 DDL 语句。true请谨慎设置,因为如果您更改哪些表已捕获其更改,则可能需要丢失数据。 安全的默认值为false.
database.history.store.only.captured.tables.ddlfalse一个布尔值,指定连接器是否应记录所有 DDL 语句 true仅记录那些与 Debezium 正在捕获其更改的表相关的 DDL 语句。true请谨慎设置,因为如果您更改哪些表已捕获其更改,则可能需要丢失数据。 安全的默认值为false.

用于配置生产者和消费者客户端的直通数据库历史记录属性

Debezium 依赖 Kafka 生产者将模式更改写入数据库历史主题。同样,当连接器启动时,它依赖于 Kafka 消费者从数据库历史主题中读取。database.history.producer.*您可以通过将值分配给以和database.history.consumer.*前缀开头的一组传递配置属性来定义 Kafka 生产者和消费者客户端的配置。传递的生产者和消费者数据库历史属性控制一系列行为,例如这些客户端如何保护与 Kafka 代理的连接,如以下示例所示:

database.history.producer.security.protocol=SSL
database.history.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
database.history.producer.ssl.keystore.password=test1234
database.history.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
database.history.producer.ssl.truststore.password=test1234
database.history.producer.ssl.key.password=test1234

database.history.consumer.security.protocol=SSL
database.history.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
database.history.consumer.ssl.keystore.password=test1234
database.history.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
database.history.consumer.ssl.truststore.password=test1234
database.history.consumer.ssl.key.password=test1234

Debezium 在将属性传递给 Kafka 客户端之前从属性名称中去除前缀。

有关Kafka 生产者配置属性Kafka 消费者配置属性的更多详细信息,请参阅 Kafka 文档。

Debezium 连接器 Kafka 信号配置属性

当 MySQL 连接器配置为只读时,信号表的替代方案是信号 Kafka 主题。

Debezium 提供了一组signal.*属性来控制连接器如何与 Kafka 信号主题交互。

下表描述了这些signal属性。

财产默认描述
signal.kafka.topic连接器监视即席信号的 Kafka 主题的名称。
signal.kafka.bootstrap.servers连接器用于建立与 Kafka 集群的初始连接的主机/端口对列表。每对都应该指向 Kafka Connect 进程使用的同一个 Kafka 集群。
signal.kafka.poll.timeout.ms100一个整数值,指定连接器在轮询信号时应等待的最大毫秒数。默认值为 100 毫秒。
Debezium 连接器传递信号 Kafka 消费者客户端配置属性

Debezium 连接器提供信号 Kafka 消费者的直通配置。直通信号属性以前缀 开头signals.consumer.*。例如,连接器将属性传递signal.consumer.security.protocol=SSL给 Kafka 消费者。

数据库历史客户端的传递属性一样,Debezium 在将它们传递给 Kafka 信号消费者之前从属性中去除前缀。

Debezium 连接器直通数据库驱动程序配置属性

Debezium 连接器提供数据库驱动程序的直通配置。直通数据库属性以前缀 开头database.*。例如,连接器将属性传递database.foobar=false给 JDBC URL。

数据库历史客户端的传递属性一样,Debezium 在将它们传递给数据库驱动程序之前从属性中去除前缀。

监控

除了 Zookeeper、Kafka 和 Kafka Connect 提供的对 JMX 指标的内置支持之外,Debezium MySQL 连接器还提供三种类型的指标。

Debezium 监控文档提供了有关如何使用 JMX 公开这些指标的详细信息。

快照指标

MBean是. _debezium.mysql:type=connector-metrics,context=snapshot,server=*<mysql.server.name>*

除非快照操作处于活动状态,或者自上次连接器启动以来已发生快照,否则不会公开快照指标。

下表列出了可用的快照指标。

属性类型描述
LastEventstring连接器读取的最后一个快照事件。
MilliSecondsSinceLastEventlong自连接器读取并处理最新事件以来的毫秒数。
TotalNumberOfEventsSeenlong自上次启动或重置以来,此连接器已看到的事件总数。
NumberOfEventsFilteredlong已被连接器上配置的包含/排除列表过滤规则过滤的事件数。
CapturedTablesstring[]连接器捕获的表列表。
QueueTotalCapacityint用于在快照程序和主 Kafka Connect 循环之间传递事件的队列长度。
QueueRemainingCapacityint用于在快照程序和主 Kafka Connect 循环之间传递事件的队列的可用容量。
TotalTableCountint快照中包含的表总数。
RemainingTableCountint快照尚未复制的表数。
SnapshotRunningboolean快照是否已启动。
SnapshotAbortedboolean快照是否已中止。
SnapshotCompletedboolean快照是否完成。
SnapshotDurationInSecondslong到目前为止,快照已拍摄的总秒数,即使未完成也是如此。
RowsScannedMap<String, Long>包含为快照中的每个表扫描的行数的映射。在处理过程中,表格会逐渐添加到地图中。每扫描 10,000 行并在完成表时更新。
MaxQueueSizeInByteslong队列的最大缓冲区(以字节为单位)。max.queue.size.in.bytes如果设置为正长值,则此指标可用。
CurrentQueueSizeInByteslong队列中的当前记录量(以字节为单位)。

执行增量快照时,连接器还提供以下附加快照指标:

属性类型描述
ChunkIdstring当前快照块的标识符。
ChunkFromstring定义当前块的主键集的下限。
ChunkTostring定义当前块的主键集的上限。
TableFromstring当前快照表的主键集的下限。
TableTostring当前快照表的主键集的上限。

Debezium MySQL 连接器还提供HoldingGlobalLock自定义快照指标。该指标设置为一个布尔值,指示连接器当前是否持有全局或表写锁。

流媒体指标

仅当启用 binlog 事件缓冲时,事务相关属性才可用。有关更多详细信息,请参阅binlog.buffer.size高级连接器配置属性。

MBean是. _debezium.mysql:type=connector-metrics,context=streaming,server=*<mysql.server.name>*

下表列出了可用的流式指标。

属性类型描述
LastEventstring连接器读取的最后一个流事件。
MilliSecondsSinceLastEventlong自连接器读取并处理最新事件以来的毫秒数。
TotalNumberOfEventsSeenlong自上次启动或指标重置以来,此连接器已看到的事件总数。
TotalNumberOfCreateEventsSeenlong自上次启动或指标重置以来,此连接器已看到的创建事件总数。
TotalNumberOfUpdateEventsSeenlong自上次启动或指标重置以来,此连接器已看到的更新事件总数。
TotalNumberOfDeleteEventsSeenlong自上次启动或指标重置以来,此连接器已看到的删除事件总数。
NumberOfEventsFilteredlong已被连接器上配置的包含/排除列表过滤规则过滤的事件数。
CapturedTablesstring[]连接器捕获的表列表。
QueueTotalCapacityint用于在流媒体和主 Kafka Connect 循环之间传递事件的队列长度。
QueueRemainingCapacityint队列的空闲容量,用于在流媒体和 Kafka Connect 主循环之间传递事件。
Connectedboolean表示连接器当前是否连接到数据库服务器的标志。
MilliSecondsBehindSourcelong上次更改事件的时间戳与处理它的连接器之间的毫秒数。这些值将包含运行数据库服务器和连接器的机器上的时钟之间的任何差异。
NumberOfCommittedTransactionslong已提交的已处理事务数。
SourceEventPositionMap<String, String>最后接收到的事件的坐标。
LastTransactionIdstring最后处理的事务的事务标识符。
MaxQueueSizeInByteslong队列的最大缓冲区(以字节为单位)。max.queue.size.in.bytes如果设置为正长值,则此指标可用。
CurrentQueueSizeInByteslong队列中的当前记录量(以字节为单位)。

Debezium MySQL 连接器还提供以下额外的流媒体指标:

属性类型描述
BinlogFilenamestring连接器最近读取的 binlog 文件的名称。
BinlogPositionlong连接器已读取的 binlog 中的最新位置(以字节为单位)。
IsGtidModeEnabledboolean表示连接器当前是否正在跟踪来自 MySQL 服务器的 GTID 的标志。
GtidSetstring连接器在读取 binlog 时处理的最新 GTID 集的字符串表示形式。
NumberOfSkippedEventslongMySQL 连接器已跳过的事件数。通常,由于 MySQL 二进制日志中的格式错误或不可解析的事件,事件会被跳过。
NumberOfDisconnectslongMySQL 连接器断开连接的次数。
NumberOfRolledBackTransactionslong已回滚且未流式传输的已处理事务的数量。
NumberOfNotWellFormedTransactionslong不符合BEGIN+ COMMIT/预期协议的事务数ROLLBACK。这个值应该0在正常情况下。
NumberOfLargeTransactionslong未放入预读缓冲区的事务数。为获得最佳性能,该值应明显小于NumberOfCommittedTransactionsNumberOfRolledBackTransactions

架构历史指标

MBean是. _debezium.mysql:type=connector-metrics,context=schema-history,server=*<mysql.server.name>*

下表列出了可用的架构历史指标。

属性类型描述
Statusstring之一STOPPEDRECOVERING存储中恢复历史),RUNNING描述数据库历史的状态。
RecoveryStartTimelong恢复开始的时间(以纪元秒为单位)。
ChangesRecoveredlong在恢复阶段读取的更改数。
ChangesAppliedlong在恢复和运行时应用的架构更改总数。
MilliSecondsSinceLastRecoveredChangelong自上次更改从历史存储中恢复以来经过的毫秒数。
MilliSecondsSinceLastAppliedChangelong自应用上次更改以来经过的毫秒数。
LastRecoveredChangestring从历史存储中恢复的最后一次更改的字符串表示形式。
LastAppliedChangestring最后应用更改的字符串表示形式。

出现问题时的行为

Debezium 是一个分布式系统,可以捕获多个上游数据库中的所有更改;它永远不会错过或丢失事件。当系统正常运行或被仔细管理时,Debezium 提供每个更改事件记录的一次交付。

如果确实发生故障,则系统不会丢失任何事件。但是,当它从故障中恢复时,它可能会重复一些更改事件。在这些异常情况下,Debezium 和 Kafka 一样,至少提供一次更改事件的传递。

本节的其余部分描述了 Debezium 如何处理各种故障和问题。

配置和启动错误

在以下情况下,连接器尝试启动失败,在日志中报告错误或异常,并停止运行:

  • 连接器的配置无效。
  • 连接器无法使用指定的连接参数成功连接到 MySQL 服务器。
  • 连接器尝试在 binlog 中 MySQL 不再具有可用历史记录的位置重新启动。

在这些情况下,错误消息包含有关问题的详细信息以及可能的建议解决方法。更正配置或解决 MySQL 问题后,重新启动连接器。

MySQL变得不可用

如果您的 MySQL 服务器不可用,Debezium MySQL 连接器将失败并出现错误并且连接器停止。当服务器再次可用时,重新启动连接器。

但是,如果为高可用性 MySQL 集群启用了 GTID,您可以立即重新启动连接器。它将连接到集群中的不同 MySQL 服务器,在服务器的 binlog 中找到代表最后一个事务的位置,并开始从该特定位置读取新服务器的 binlog。

如果未启用 GTID,则连接器仅记录它所连接的 MySQL 服务器的 binlog 位置。要从正确的 binlog 位置重新启动,您必须重新连接到该特定服务器。

Kafka Connect 优雅停止

当 Kafka Connect 正常停止时,在 Debezium MySQL 连接器任务停止并在新的 Kafka Connect 进程上重新启动时会有短暂的延迟。

Kafka Connect 进程崩溃

如果 Kafka Connect 崩溃,进程会停止并且任何 Debezium MySQL 连接器任务都会终止,而不会记录它们最近处理的偏移量。在分布式模式下,Kafka Connect 会重新启动其他进程上的连接器任务。但是,MySQL 连接器从早期进程记录的最后一个偏移量恢复。这意味着替换任务可能会生成一些在崩溃之前处理的相同事件,从而创建重复事件。

每个更改事件消息都包含特定于源的信息,您可以使用这些信息来识别重复事件,例如:

  • 事件起源
  • MySQL 服务器的事件时间
  • binlog 文件名和位置
  • GTID(如果使用)

卡夫卡变得不可用

Kafka Connect 框架使用 Kafka 生产者 API 记录 Kafka 中的 Debezium 更改事件。如果 Kafka 代理变得不可用,则 Debezium MySQL 连接器会暂停,直到连接重新建立并且连接器从中断的地方恢复。

MySQL 清除 binlog 文件

如果 Debezium MySQL 连接器停止的时间过长,MySQL 服务器会清除旧的 binlog 文件,连接器的最后位置可能会丢失。当连接器重新启动时,MySQL 服务器不再具有起始点,连接器执行另一个初始快照。如果禁用快照,则连接器将失败并出现错误。

有关 MySQL 连接器如何执行初始快照的详细信息,请参阅快照。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐