一、应用场景

  • 1、事件触发
    数据库的触发器(trigger)能够一定程度上实现我们需要的事件触发,但通常需要配合定时任务才能实现。
  • 2、数据同步
    典型的解决方法有
    • 双写
    • 变更数据抓取(change data capture, CDC)
    其中变更数据抓取既能解决事件触发的问题也可以很好的解决数据同步的问题

二、Debezium

Debezium 是 Redhat 开源的数据变更抓取组件,其利用了 Kafka Connect 所以拥有高可用与开箱即用的调度接口,Debezium的Snapshot Mode 可以 initial 设置为将表中的现有数据全部导入 Kafka也可以设置为schema_only只获取增量数据,并且全量数据与增量数据形式一致,可以统一处理。
目前支持MySQL、MongoDB、PostgreSQL、Oracle、SQL Server、Db2等数据库。

三、Debezium-Oracle

基于Oracle 数据库的日志模式下可使用LogMiner和XStream进行日志解析,默认使用LogMiner。
官网:https://debezium.io/
docker-compose 启动参考: https://github.com/debezium/debezium-examples/tree/master/tutorial

  1. 下载 Oracle Instant Client
    链接: https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html
  2. docker-compose.yaml
    可根据官方docker-compose-oracle.yaml自定义docker-compose.yaml
version: '3.7'
services:
  connect:
    image: debezium/connect-with-oracle-jdbc:1.6
    build:
      context: debezium-with-oracle-jdbc
      args:
        DEBEZIUM_VERSION: 1.6
    ports:
     - 8083:8083
     - 5005:5005
    environment:
     - BOOTSTRAP_SERVERS=192.168.51.43:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=debezium_connect_configs
     - OFFSET_STORAGE_TOPIC=debezium_connect_offsets
     - STATUS_STORAGE_TOPIC=debezium_connect_statuses
     - LD_LIBRARY_PATH=/instant_client
    volumes:
     - "/opt/dockerfile/debezium/config:/kafka/config"
  1. docker-compose.yaml同级目录下创建debezium-with-oracle-jdbc文件夹,并在debezium-with-oracle-jdbc下创建oracle_instantclient文件夹
    在这里插入图片描述
    在这里插入图片描述

  2. 解压下载的Oracle Instant Client到 debezium-with-oracle-jdbc/oracle_instantclient
    在这里插入图片描述

  3. 在 debezium-with-oracle-jdbc下创建 Dockerfile

FROM debezium/connect:1.6
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc
ENV INSTANT_CLIENT_DIR=/instant_client/

USER root
RUN yum -y install libaio && yum clean all

USER kafka
# Deploy Oracle client and drivers

COPY oracle_instantclient/* $INSTANT_CLIENT_DIR
COPY oracle_instantclient/xstreams.jar /kafka/libs
COPY oracle_instantclient/ojdbc8.jar /kafka/libs

如debezium无法完成Kafka topic的自动创建,请检查kafka的配置或使用以下命令创建TOPIC

./kafka-topics.sh --create --topic `topic` --replication-factor 1 --partitions 1 --zookeeper `zk_ip:zk_port` --config cleanup.policy=compact
  1. 创建专用只读用户
    因为这里oracle用的是11g,官方文档的创建用户并附权仅支持12c及以上的版本,经过一段时间的摸索11g的用户权限如下
create user debezium_readuser identified by 123456;

grant create session to debezium_readuser;
grant flashback any table to debezium_readuser;
grant select any table to debezium_readuser;
grant select_catalog_role to debezium_readuser;
grant execute_catalog_role to debezium_readuser;
grant select any transaction to debezium_readuser;
grant create table to debezium_readuser;
grant lock any table to debezium_readuser;
grant alter any table to debezium_readuser;
grant create sequence to debezium_readuser;
grant resource to debezium_readuser;
  1. 序列化
    Debezium默认使用Kafka Connect 提供的一个 JSON 转换器可将记录键和值序列化为 JSON 文档。默认行为是 JSON 转换器包含记录的消息模式、表结构、创建表语句、字段类型等非必要信息,这使得每条记录都非常冗长变得难以解析。
{
   "schema": {
      "type": "struct",
      "fields": [
         {
            "type": "struct",
            "fields": [
               {
                  "type": "string",
                  "optional": true,
                  "field": "CUST_TAX_CODE"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "CUST_NAME"
               },
               {
                  "type": "struct",
                  "fields": [
                     {
                        "type": "int32",
                        "optional": false,
                        "field": "scale"
                     },
                     {
                        "type": "bytes",
                        "optional": false,
                        "field": "value"
                     }
                  ],
                  "optional": true,
                  "name": "io.debezium.data.VariableScaleDecimal",
                  "version": 1,
                  "doc": "Variable scaled decimal",
                  "field": "BILLING_MACHINE_NO"
               },
               {
                  "type": "int64",
                  "optional": true,
                  "name": "io.debezium.time.Timestamp",
                  "version": 1,
                  "field": "CHARGE_END_DATE"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "TAX_AUTHORITY_NAME"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "DEPARTMENT_NAME"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "ADDR"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "CONTACT"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "MOBILE"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "TEL"
               },
               {
                  "type": "struct",
                  "fields": [
                     {
                        "type": "int32",
                        "optional": false,
                        "field": "scale"
                     },
                     {
                        "type": "bytes",
                        "optional": false,
                        "field": "value"
                     }
                  ],
                  "optional": true,
                  "name": "io.debezium.data.VariableScaleDecimal",
                  "version": 1,
                  "doc": "Variable scaled decimal",
                  "field": "IDX"
               },
               {
                  "type": "int64",
                  "optional": true,
                  "name": "io.debezium.time.Timestamp",
                  "version": 1,
                  "field": "INVOICE_DATE"
               },
               {
                  "type": "int32",
                  "optional": true,
                  "field": "AMMONT"
               }
            ],
            "optional": true,
            "name": "CRM.CRM.TEMP.Value",
            "field": "before"
         },
         {
            "type": "struct",
            "fields": [
               {
                  "type": "string",
                  "optional": true,
                  "field": "CUST_TAX_CODE"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "CUST_NAME"
               },
               {
                  "type": "struct",
                  "fields": [
                     {
                        "type": "int32",
                        "optional": false,
                        "field": "scale"
                     },
                     {
                        "type": "bytes",
                        "optional": false,
                        "field": "value"
                     }
                  ],
                  "optional": true,
                  "name": "io.debezium.data.VariableScaleDecimal",
                  "version": 1,
                  "doc": "Variable scaled decimal",
                  "field": "BILLING_MACHINE_NO"
               },
               {
                  "type": "int64",
                  "optional": true,
                  "name": "io.debezium.time.Timestamp",
                  "version": 1,
                  "field": "CHARGE_END_DATE"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "TAX_AUTHORITY_NAME"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "DEPARTMENT_NAME"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "ADDR"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "CONTACT"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "MOBILE"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "TEL"
               },
               {
                  "type": "struct",
                  "fields": [
                     {
                        "type": "int32",
                        "optional": false,
                        "field": "scale"
                     },
                     {
                        "type": "bytes",
                        "optional": false,
                        "field": "value"
                     }
                  ],
                  "optional": true,
                  "name": "io.debezium.data.VariableScaleDecimal",
                  "version": 1,
                  "doc": "Variable scaled decimal",
                  "field": "IDX"
               },
               {
                  "type": "int64",
                  "optional": true,
                  "name": "io.debezium.time.Timestamp",
                  "version": 1,
                  "field": "INVOICE_DATE"
               },
               {
                  "type": "int32",
                  "optional": true,
                  "field": "AMMONT"
               }
            ],
            "optional": true,
            "name": "CRM.CRM.TEMP.Value",
            "field": "after"
         },
         {
            "type": "struct",
            "fields": [
               {
                  "type": "string",
                  "optional": false,
                  "field": "version"
               },
               {
                  "type": "string",
                  "optional": false,
                  "field": "connector"
               },
               {
                  "type": "string",
                  "optional": false,
                  "field": "name"
               },
               {
                  "type": "int64",
                  "optional": false,
                  "field": "ts_ms"
               },
               {
                  "type": "string",
                  "optional": true,
                  "name": "io.debezium.data.Enum",
                  "version": 1,
                  "parameters": {
                     "allowed": "true,last,false"
                  },
                  "default": "false",
                  "field": "snapshot"
               },
               {
                  "type": "string",
                  "optional": false,
                  "field": "db"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "sequence"
               },
               {
                  "type": "string",
                  "optional": false,
                  "field": "schema"
               },
               {
                  "type": "string",
                  "optional": false,
                  "field": "table"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "txId"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "scn"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "commit_scn"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "lcr_position"
               }
            ],
            "optional": false,
            "name": "io.debezium.connector.oracle.Source",
            "field": "source"
         },
         {
            "type": "string",
            "optional": false,
            "field": "op"
         },
         {
            "type": "int64",
            "optional": true,
            "field": "ts_ms"
         },
         {
            "type": "struct",
            "fields": [
               {
                  "type": "string",
                  "optional": false,
                  "field": "id"
               },
               {
                  "type": "int64",
                  "optional": false,
                  "field": "total_order"
               },
               {
                  "type": "int64",
                  "optional": false,
                  "field": "data_collection_order"
               }
            ],
            "optional": true,
            "field": "transaction"
         }
      ],
      "optional": false,
      "name": "CRM.CRM.TEMP.Envelope"
   },
   "payload": {
      "before": {
         "CUST_TAX_CODE": "1",
         "CUST_NAME": "1111safdadsa",
         "BILLING_MACHINE_NO": null,
         "CHARGE_END_DATE": null,
         "TAX_AUTHORITY_NAME": null,
         "DEPARTMENT_NAME": null,
         "ADDR": null,
         "CONTACT": null,
         "MOBILE": null,
         "TEL": null,
         "IDX": null,
         "INVOICE_DATE": null,
         "AMMONT": null
      },
      "after": {
         "CUST_TAX_CODE": "11",
         "CUST_NAME": "1111safdadsa",
         "BILLING_MACHINE_NO": null,
         "CHARGE_END_DATE": null,
         "TAX_AUTHORITY_NAME": null,
         "DEPARTMENT_NAME": null,
         "ADDR": null,
         "CONTACT": null,
         "MOBILE": null,
         "TEL": null,
         "IDX": null,
         "INVOICE_DATE": null,
         "AMMONT": null
      },
      "source": {
         "version": "1.6.1.Final",
         "connector": "oracle",
         "name": "CRM",
         "ts_ms": 1630709790000,
         "snapshot": "false",
         "db": "CRM",
         "sequence": null,
         "schema": "CRM",
         "table": "TEMP",
         "txId": "05000d00b12d0200",
         "scn": "94815695",
         "commit_scn": "94827647",
         "lcr_position": null
      },
      "op": "u",
      "ts_ms": 1630681011434,
      "transaction": null
   }
}

这时可以在debezium的配置文件 connect-distributed.properties 中设置

key.converter.schemas.enable=false
value.converter.schemas.enable=false

完整的connect-distributed.properties配置文件

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.replication.factor=1
#offset.storage.partitions=25

offset.flush.interval.ms=60000

plugin.path=/kafka/connect
task.shutdown.graceful.timeout.ms=10000
offset.flush.timeout.ms=5000
value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

官方序列化文档:https://debezium.io/documentation/reference/1.6/configuration/avro.html
修改配置之后

{
   "before": {
      "CUST_TAX_CODE": "112",
      "CUST_NAME": "1111safdadsa",
      "BILLING_MACHINE_NO": null,
      "CHARGE_END_DATE": null,
      "TAX_AUTHORITY_NAME": null,
      "DEPARTMENT_NAME": null,
      "ADDR": null,
      "CONTACT": null,
      "MOBILE": null,
      "TEL": null,
      "IDX": null,
      "INVOICE_DATE": null,
      "AMMONT": 1
   },
   "after": {
      "CUST_TAX_CODE": "1123",
      "CUST_NAME": "1111safdadsa",
      "BILLING_MACHINE_NO": null,
      "CHARGE_END_DATE": null,
      "TAX_AUTHORITY_NAME": null,
      "DEPARTMENT_NAME": null,
      "ADDR": null,
      "CONTACT": null,
      "MOBILE": null,
      "TEL": null,
      "IDX": null,
      "INVOICE_DATE": null,
      "AMMONT": 1
   },
   "source": {
      "version": "1.6.1.Final",
      "connector": "oracle",
      "name": "CRM",
      "ts_ms": 1630710669000,
      "snapshot": "false",
      "db": "CRM",
      "sequence": null,
      "schema": "CRM",
      "table": "TEMP",
      "txId": "060006005cf70100",
      "scn": "94909708",
      "commit_scn": "94909733",
      "lcr_position": null
   },
   "op": "u",
   "ts_ms": 1630681870103,
   "transaction": null
}

Kafka 的topic格式为database.server.name
debezium-oracle连接器映射数字类型
https://debezium.io/documentation/reference/1.6/connectors/oracle.html#oracle-numeric-types
解析映射的数字类型
https://debezium.io/documentation/faq/#how_to_retrieve_decimal_field_from_binary_representation

RESTFul

创建Connector 配置项可参考 https://debezium.io/documentation/reference/1.6/connectors/oracle.html

POST http://192.168.51.72:8083/connectors/
{
  "name": "test",
  "config": {
    "connector.class" : "io.debezium.connector.oracle.OracleConnector",
    "tasks.max" : "1",
    "database.server.name" : "CRM",
    "database.hostname" : "db_host",
    "database.port" : "db_port",
    "database.user" : "db_user",
    "database.password" : "db_password",
    "database.dbname" : "db_name",
    "database.history.kafka.bootstrap.servers" : "kafka_ip:kafka_port",
    "database.history.kafka.topic": "schema-changes.inventory",
    "schema.include.list": "CRM",
    "table.include.list": "CRM.TABLE1",
    "snapshot.mode": "schema_only",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false
  }
}
  • 查询所有Connector
GET http://192.168.51.72:8083/connectors/
  • 删除指定Connector
DELETE http://192.168.51.72:8083/connectors/test
  • 查询Connector状态
GET http://192.168.51.72:8083/connectors/test/status

异常处理:

出现异常 io.debezium.text.ParsingException: DDL statement couldn't be parsed. Please open a Jira issue with the statement 'alter table SYSTEM.LOGMNR_ATTRCOL$ exchange partition P106 with table SYS.LOGMNRLT_106_ATTRCOL$ excluding indexes without validation;'\nmismatched input 'with' expecting {'DISABLE', 'ENABLE', ';'}

{
    "name": "test",
    "connector": {
        "state": "RUNNING",
        "worker_id": "175.66.31.2:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "worker_id": "175.66.31.2:8083",
            "trace": "org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.\n\tat io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)\n\tat io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:211)\n\tat io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:63)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:159)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:122)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: io.debezium.text.ParsingException: DDL statement couldn't be parsed. Please open a Jira issue with the statement 'alter table SYSTEM.LOGMNR_ATTRCOL$ exchange partition P106 with table SYS.LOGMNRLT_106_ATTRCOL$ excluding indexes without validation;'\nmismatched input 'with' expecting {'DISABLE', 'ENABLE', ';'}\n\tat io.debezium.antlr.ParsingErrorListener.syntaxError(ParsingErrorListener.java:43)\n\tat org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41)\n\tat org.antlr.v4.runtime.Parser.notifyErrorListeners(Parser.java:544)\n\tat org.antlr.v4.runtime.DefaultErrorStrategy.reportInputMismatch(DefaultErrorStrategy.java:327)\n\tat org.antlr.v4.runtime.DefaultErrorStrategy.reportError(DefaultErrorStrategy.java:139)\n\tat io.debezium.ddl.parser.oracle.generated.PlSqlParser.alter_table(PlSqlParser.java:64113)\n\tat io.debezium.ddl.parser.oracle.generated.PlSqlParser.unit_statement(PlSqlParser.java:2302)\n\tat io.debezium.connector.oracle.antlr.OracleDdlParser.parseTree(OracleDdlParser.java:68)\n\tat io.debezium.connector.oracle.antlr.OracleDdlParser.parseTree(OracleDdlParser.java:32)\n\tat io.debezium.antlr.AntlrDdlParser.parse(AntlrDdlParser.java:82)\n\tat io.debezium.connector.oracle.antlr.OracleDdlParser.parse(OracleDdlParser.java:63)\n\tat io.debezium.connector.oracle.OracleSchemaChangeEventEmitter.emitSchemaChangeEvent(OracleSchemaChangeEventEmitter.java:81)\n\tat io.debezium.pipeline.EventDispatcher.dispatchSchemaChangeEvent(EventDispatcher.java:297)\n\tat io.debezium.connector.oracle.logminer.LogMinerQueryResultProcessor.processResult(LogMinerQueryResultProcessor.java:169)\n\tat io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:187)\n\t... 8 more\nCaused by: org.antlr.v4.runtime.InputMismatchException\n\tat org.antlr.v4.runtime.DefaultErrorStrategy.sync(DefaultErrorStrategy.java:270)\n\tat io.debezium.ddl.parser.oracle.generated.PlSqlParser.alter_table(PlSqlParser.java:64052)\n\t... 17 more\n"
        }
    ],
    "type": "source"
}

解决办法:删除debezium设置的CONFIG_STORAGE_TOPIC、OFFSET_STORAGE_TOPIC、STATUS_STORAGE_TOPIC 对应Kafka的topic,重新启动即可

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐