Debezium-Oracle in Practice with Debezium 1.6 and Oracle 11g
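I. Use Cases
- 1. Event triggering
Database triggers can provide some of the event triggering we need, but usually only in combination with scheduled jobs.
- 2. Data synchronization
Typical approaches are:
• Dual writes
• Change data capture (CDC)
CDC solves both the event-triggering problem and the data-synchronization problem well.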
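II. Debezium
Debezium is an open-source change data capture component from Red Hat. It is built on Kafka Connect, so it inherits Kafka Connect's high availability and its ready-to-use REST interface for managing connectors. Debezium's snapshot mode can be set to initial, which first imports all existing rows of the captured tables into Kafka, or to schema_only, which captures only the table structure and then delivers incremental changes. Snapshot records and incremental change records have the same format, so both can be processed in one place.
Supported databases currently include MySQL, MongoDB, PostgreSQL, Oracle, SQL Server and Db2, among others.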
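Because snapshot rows and streamed changes share one envelope, a single handler can apply both. Below is a minimal sketch, assuming each Kafka message value has already been parsed from JSON (with schemas disabled) into a dict. The op codes r (snapshot read), c (create), u (update) and d (delete) are Debezium's; the apply_change name, the in-memory dict standing in for a real sink, and the choice of key column are illustrative assumptions.

```python
from typing import Any, Dict, Optional

Row = Dict[str, Any]


def apply_change(table: Dict[Any, Row], event: Dict[str, Any], key_column: str) -> None:
    """Apply one Debezium change event (schemas disabled) to an in-memory table.

    Snapshot reads ('r') and inserts ('c') take the same path, which is why
    full and incremental data can be handled by one consumer.
    """
    op = event["op"]
    before: Optional[Row] = event.get("before")
    after: Optional[Row] = event.get("after")

    if op in ("r", "c"):
        # Snapshot read or insert: the new row image is in 'after'.
        table[after[key_column]] = after
    elif op == "u":
        # Update: the key itself may have changed, so drop the old entry first.
        if before is not None:
            table.pop(before[key_column], None)
        table[after[key_column]] = after
    elif op == "d":
        # Delete: only 'before' carries the row image.
        table.pop(before[key_column], None)
    else:
        raise ValueError(f"unexpected op: {op!r}")
```

By default Debezium follows each delete event with a tombstone (a message whose value is null), so a consumer would skip those before calling apply_change.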
III. Debezium-Oracle
With the Oracle database running in archive log mode, the connector can parse the logs with either LogMiner or XStream; LogMiner is the default.
Official site: https://debezium.io/
docker-compose startup reference: https://github.com/debezium/debezium-examples/tree/master/tutorial
- Download Oracle Instant Client
Link: https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html
- docker-compose.yaml
Customize docker-compose.yaml based on the official docker-compose-oracle.yaml:
version: '3.7'
services:
  connect:
    image: debezium/connect-with-oracle-jdbc:1.6
    build:
      context: debezium-with-oracle-jdbc
      args:
        DEBEZIUM_VERSION: "1.6"
    ports:
      - "8083:8083"
      - "5005:5005"
    environment:
      - BOOTSTRAP_SERVERS=192.168.51.43:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=debezium_connect_configs
      - OFFSET_STORAGE_TOPIC=debezium_connect_offsets
      - STATUS_STORAGE_TOPIC=debezium_connect_statuses
      - LD_LIBRARY_PATH=/instant_client
    volumes:
      - "/opt/dockerfile/debezium/config:/kafka/config"
- In the same directory as docker-compose.yaml, create a folder named debezium-with-oracle-jdbc, and inside it a folder named oracle_instantclient.
- Unzip the downloaded Oracle Instant Client into debezium-with-oracle-jdbc/oracle_instantclient.
- Create a Dockerfile in debezium-with-oracle-jdbc:
FROM debezium/connect:1.6
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc
# Same directory that LD_LIBRARY_PATH points to in docker-compose.yaml
ENV INSTANT_CLIENT_DIR=/instant_client/
USER root
# libaio is a native dependency of the Oracle Instant Client libraries
RUN yum -y install libaio && yum clean all
USER kafka
# Deploy Oracle client and drivers
COPY oracle_instantclient/* $INSTANT_CLIENT_DIR
COPY oracle_instantclient/xstreams.jar /kafka/libs
COPY oracle_instantclient/ojdbc8.jar /kafka/libs
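The Dockerfile copies ojdbc8.jar and xstreams.jar straight from oracle_instantclient/, but the Instant Client zip typically unpacks into a versioned folder such as instantclient_21_1/. A minimal sketch of the unzip step that flattens such a single top-level folder; the function name extract_instant_client is hypothetical. Python's zipfile module does not recreate symbolic links on its own, so the sketch rebuilds entries that the archive records as Unix symlinks.

```python
import os
import stat
import zipfile
from pathlib import Path
from typing import List


def extract_instant_client(zip_path: str, target_dir: str) -> List[str]:
    """Unzip an Oracle Instant Client archive into target_dir.

    If every entry sits under one top-level folder (e.g. 'instantclient_21_1/'),
    that folder is stripped so the jars land directly in target_dir, where the
    Dockerfile's COPY lines expect them. Returns the sorted entry names there.
    """
    target = Path(target_dir)
    target.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path) as zf:
        members = [m for m in zf.infolist() if not m.is_dir()]
        tops = {m.filename.split("/", 1)[0] for m in members}
        strip = len(tops) == 1 and all("/" in m.filename for m in members)
        for m in members:
            rel = m.filename.split("/", 1)[1] if strip else m.filename
            if Path(rel).is_absolute() or ".." in Path(rel).parts:
                raise ValueError(f"unsafe path in archive: {m.filename}")
            dest = target / rel
            dest.parent.mkdir(parents=True, exist_ok=True)
            mode = m.external_attr >> 16  # Unix mode bits, 0 if not recorded
            data = zf.read(m)
            if dest.is_symlink() or dest.is_file():
                dest.unlink()
            if stat.S_ISLNK(mode):
                # For symlink entries the stored content is the link target,
                # e.g. libclntsh.so -> libclntsh.so.21.1.
                os.symlink(data.decode("utf-8"), dest)
            else:
                dest.write_bytes(data)
                if stat.S_IMODE(mode):
                    # Keep the permission bits recorded in the archive.
                    os.chmod(dest, stat.S_IMODE(mode))
    return sorted(p.name for p in target.iterdir())
```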
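If Debezium cannot create the Kafka topics automatically, check the Kafka configuration or create the topics manually with the following command (replace TOPIC_NAME and ZK_HOST:ZK_PORT):
./kafka-topics.sh --create --topic TOPIC_NAME --replication-factor 1 --partitions 1 --zookeeper ZK_HOST:ZK_PORT --config cleanup.policy=compact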
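Kafka Connect expects its three internal topics (configs, offsets, statuses) to be compacted, and the config topic to have a single partition. A minimal sketch that builds the command above for each internal topic named in docker-compose.yaml; the helper name create_topic_args and the ZK_HOST:2181 address are placeholders. Kafka 3.0 and later no longer accept --zookeeper, so there the flag would become --bootstrap-server with a broker address.

```python
import shlex
from typing import List

# Internal topic names from the docker-compose.yaml environment above.
INTERNAL_TOPICS = [
    "debezium_connect_configs",
    "debezium_connect_offsets",
    "debezium_connect_statuses",
]


def create_topic_args(topic: str, zookeeper: str, replication_factor: int = 1,
                      partitions: int = 1, script: str = "./kafka-topics.sh") -> List[str]:
    """Return the argv that creates one compacted topic with kafka-topics.sh."""
    return [
        script, "--create",
        "--topic", topic,
        "--replication-factor", str(replication_factor),
        "--partitions", str(partitions),  # the config topic must stay at 1
        "--zookeeper", zookeeper,
        "--config", "cleanup.policy=compact",
    ]


if __name__ == "__main__":
    # Placeholder ZooKeeper address; each argv can also go to subprocess.run(args, check=True).
    for name in INTERNAL_TOPICS:
        print(shlex.join(create_topic_args(name, "ZK_HOST:2181")))
```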
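- Create a dedicated (mostly read-only) user
This setup uses Oracle 11g, and the user-creation and grant statements in the official documentation only work on 12c and later. After some experimentation, the following privileges turned out to work on 11g:
-- the password starts with a digit, so Oracle requires it to be double-quoted
create user debezium_readuser identified by "123456";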
grant create session to debezium_readuser;
grant flashback any table to debezium_readuser;
grant select any table to debezium_readuser;
grant select_catalog_role to debezium_readuser;
grant execute_catalog_role to debezium_readuser;
grant select any transaction to debezium_readuser;
grant create table to debezium_readuser;
grant lock any table to debezium_readuser;
grant alter any table to debezium_readuser;
grant create sequence to debezium_readuser;
grant resource to debezium_readuser;
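To confirm that the grants above took effect, the connector user can list its directly granted system privileges and roles from the data dictionary views USER_SYS_PRIVS and USER_ROLE_PRIVS. A minimal sketch of the comparison step, assuming the two query results have already been fetched (for example with an Oracle Python driver) into plain lists of strings; missing_grants is a hypothetical helper name.

```python
from typing import Iterable, List

# System privileges and roles granted by the SQL above.
REQUIRED_SYS_PRIVS = {
    "CREATE SESSION", "FLASHBACK ANY TABLE", "SELECT ANY TABLE",
    "SELECT ANY TRANSACTION", "CREATE TABLE", "LOCK ANY TABLE",
    "ALTER ANY TABLE", "CREATE SEQUENCE",
}
REQUIRED_ROLES = {"SELECT_CATALOG_ROLE", "EXECUTE_CATALOG_ROLE", "RESOURCE"}

# Run these as debezium_readuser; each returns one column of names.
SYS_PRIVS_SQL = "SELECT privilege FROM user_sys_privs"
ROLES_SQL = "SELECT granted_role FROM user_role_privs"


def missing_grants(sys_privs: Iterable[str], roles: Iterable[str]) -> List[str]:
    """Return the required privileges and roles absent from the query results."""
    have_privs = {p.upper() for p in sys_privs}
    have_roles = {r.upper() for r in roles}
    missing = (REQUIRED_SYS_PRIVS - have_privs) | (REQUIRED_ROLES - have_roles)
    return sorted(missing)
```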
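- Serialization
By default, Debezium uses the JSON converter provided by Kafka Connect to serialize record keys and values as JSON documents. Also by default, the JSON converter embeds the record's message schema (the table structure with every field's name, type and optionality), which consumers usually do not need. This makes every record very verbose and hard to parse, as in the following update event: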
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "CUST_TAX_CODE"
},
{
"type": "string",
"optional": true,
"field": "CUST_NAME"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "scale"
},
{
"type": "bytes",
"optional": false,
"field": "value"
}
],
"optional": true,
"name": "io.debezium.data.VariableScaleDecimal",
"version": 1,
"doc": "Variable scaled decimal",
"field": "BILLING_MACHINE_NO"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.Timestamp",
"version": 1,
"field": "CHARGE_END_DATE"
},
{
"type": "string",
"optional": true,
"field": "TAX_AUTHORITY_NAME"
},
{
"type": "string",
"optional": true,
"field": "DEPARTMENT_NAME"
},
{
"type": "string",
"optional": true,
"field": "ADDR"
},
{
"type": "string",
"optional": true,
"field": "CONTACT"
},
{
"type": "string",
"optional": true,
"field": "MOBILE"
},
{
"type": "string",
"optional": true,
"field": "TEL"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "scale"
},
{
"type": "bytes",
"optional": false,
"field": "value"
}
],
"optional": true,
"name": "io.debezium.data.VariableScaleDecimal",
"version": 1,
"doc": "Variable scaled decimal",
"field": "IDX"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.Timestamp",
"version": 1,
"field": "INVOICE_DATE"
},
{
"type": "int32",
"optional": true,
"field": "AMMONT"
}
],
"optional": true,
"name": "CRM.CRM.TEMP.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "CUST_TAX_CODE"
},
{
"type": "string",
"optional": true,
"field": "CUST_NAME"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "scale"
},
{
"type": "bytes",
"optional": false,
"field": "value"
}
],
"optional": true,
"name": "io.debezium.data.VariableScaleDecimal",
"version": 1,
"doc": "Variable scaled decimal",
"field": "BILLING_MACHINE_NO"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.Timestamp",
"version": 1,
"field": "CHARGE_END_DATE"
},
{
"type": "string",
"optional": true,
"field": "TAX_AUTHORITY_NAME"
},
{
"type": "string",
"optional": true,
"field": "DEPARTMENT_NAME"
},
{
"type": "string",
"optional": true,
"field": "ADDR"
},
{
"type": "string",
"optional": true,
"field": "CONTACT"
},
{
"type": "string",
"optional": true,
"field": "MOBILE"
},
{
"type": "string",
"optional": true,
"field": "TEL"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "scale"
},
{
"type": "bytes",
"optional": false,
"field": "value"
}
],
"optional": true,
"name": "io.debezium.data.VariableScaleDecimal",
"version": 1,
"doc": "Variable scaled decimal",
"field": "IDX"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.Timestamp",
"version": 1,
"field": "INVOICE_DATE"
},
{
"type": "int32",
"optional": true,
"field": "AMMONT"
}
],
"optional": true,
"name": "CRM.CRM.TEMP.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "string",
"optional": true,
"field": "txId"
},
{
"type": "string",
"optional": true,
"field": "scn"
},
{
"type": "string",
"optional": true,
"field": "commit_scn"
},
{
"type": "string",
"optional": true,
"field": "lcr_position"
}
],
"optional": false,
"name": "io.debezium.connector.oracle.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"field": "transaction"
}
],
"optional": false,
"name": "CRM.CRM.TEMP.Envelope"
},
"payload": {
"before": {
"CUST_TAX_CODE": "1",
"CUST_NAME": "1111safdadsa",
"BILLING_MACHINE_NO": null,
"CHARGE_END_DATE": null,
"TAX_AUTHORITY_NAME": null,
"DEPARTMENT_NAME": null,
"ADDR": null,
"CONTACT": null,
"MOBILE": null,
"TEL": null,
"IDX": null,
"INVOICE_DATE": null,
"AMMONT": null
},
"after": {
"CUST_TAX_CODE": "11",
"CUST_NAME": "1111safdadsa",
"BILLING_MACHINE_NO": null,
"CHARGE_END_DATE": null,
"TAX_AUTHORITY_NAME": null,
"DEPARTMENT_NAME": null,
"ADDR": null,
"CONTACT": null,
"MOBILE": null,
"TEL": null,
"IDX": null,
"INVOICE_DATE": null,
"AMMONT": null
},
"source": {
"version": "1.6.1.Final",
"connector": "oracle",
"name": "CRM",
"ts_ms": 1630709790000,
"snapshot": "false",
"db": "CRM",
"sequence": null,
"schema": "CRM",
"table": "TEMP",
"txId": "05000d00b12d0200",
"scn": "94815695",
"commit_scn": "94827647",
"lcr_position": null
},
"op": "u",
"ts_ms": 1630681011434,
"transaction": null
}
}
In that case, set the following in Debezium's configuration file connect-distributed.properties:
key.converter.schemas.enable=false
value.converter.schemas.enable=false
The complete connect-distributed.properties file:
# Serialize record keys and values as JSON without the embedded schema
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# Replication factor 1 only suits a single-broker test cluster
offset.storage.replication.factor=1
#offset.storage.partitions=25
offset.flush.interval.ms=60000
plugin.path=/kafka/connect
task.shutdown.graceful.timeout.ms=10000
offset.flush.timeout.ms=5000
# Converters for Kafka Connect's internal topics (deprecated in newer Kafka versions)
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
Official serialization documentation: https://debezium.io/documentation/reference/1.6/configuration/avro.html
After the change, an update event looks like this:
{
"before": {
"CUST_TAX_CODE": "112",
"CUST_NAME": "1111safdadsa",
"BILLING_MACHINE_NO": null,
"CHARGE_END_DATE": null,
"TAX_AUTHORITY_NAME": null,
"DEPARTMENT_NAME": null,
"ADDR": null,
"CONTACT": null,
"MOBILE": null,
"TEL": null,
"IDX": null,
"INVOICE_DATE": null,
"AMMONT": 1
},
"after": {
"CUST_TAX_CODE": "1123",
"CUST_NAME": "1111safdadsa",
"BILLING_MACHINE_NO": null,
"CHARGE_END_DATE": null,
"TAX_AUTHORITY_NAME": null,
"DEPARTMENT_NAME": null,
"ADDR": null,
"CONTACT": null,
"MOBILE": null,
"TEL": null,
"IDX": null,
"INVOICE_DATE": null,
"AMMONT": 1
},
"source": {
"version": "1.6.1.Final",
"connector": "oracle",
"name": "CRM",
"ts_ms": 1630710669000,
"snapshot": "false",
"db": "CRM",
"sequence": null,
"schema": "CRM",
"table": "TEMP",
"txId": "060006005cf70100",
"scn": "94909708",
"commit_scn": "94909733",
"lcr_position": null
},
"op": "u",
"ts_ms": 1630681870103,
"transaction": null
}
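A consumer may meet both shapes shown above, for example when messages produced before the converter change are still in the topic. A minimal sketch that normalizes them, assuming the raw message value is UTF-8 JSON bytes: with schemas.enable=true the event sits under "payload", otherwise it is the top-level object. The function name event_from_message is hypothetical.

```python
import json
from typing import Any, Dict, Optional


def event_from_message(raw: Optional[bytes]) -> Optional[Dict[str, Any]]:
    """Return the Debezium change event from a JsonConverter message value.

    Handles schemas.enable=true ({"schema": ..., "payload": ...}) and
    schemas.enable=false (the event itself). Null values (tombstones) give None.
    """
    if raw is None:
        return None
    doc = json.loads(raw)
    if isinstance(doc, dict) and set(doc) == {"schema", "payload"}:
        return doc["payload"]
    return doc
```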
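Each captured table gets its own Kafka topic named <database.server.name>.<schema>.<table>, for example CRM.CRM.TEMP for the events above.
How the debezium-oracle connector maps numeric types:
https://debezium.io/documentation/reference/1.6/connectors/oracle.html#oracle-numeric-types
How to decode the mapped numeric types from their binary representation:
https://debezium.io/documentation/faq/#how_to_retrieve_decimal_field_from_binary_representation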
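In the schema above, columns such as BILLING_MACHINE_NO and IDX arrive as io.debezium.data.VariableScaleDecimal structs: a scale plus a value holding the unscaled integer as big-endian two's-complement bytes, which the JSON converter writes as base64. A minimal Python sketch of that decoding; the function names are hypothetical. Alternatively, the connector option decimal.handling.mode (precise, double or string) can make Debezium emit numbers in a form that needs no decoding.

```python
import base64
from decimal import Decimal
from typing import Any, Dict, Optional


def decode_decimal_bytes(encoded: str, scale: int) -> Decimal:
    """Turn base64 big-endian two's-complement bytes into a Decimal with the given scale."""
    unscaled = int.from_bytes(base64.b64decode(encoded), byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)


def decode_variable_scale_decimal(field: Optional[Dict[str, Any]]) -> Optional[Decimal]:
    """Decode a VariableScaleDecimal struct {'scale': int, 'value': base64 str}.

    A SQL NULL column arrives as None and stays None.
    """
    if field is None:
        return None
    return decode_decimal_bytes(field["value"], field["scale"])
```

For example, {"scale": 2, "value": "MDk="} carries the bytes 0x30 0x39 (12345) and decodes to Decimal("123.45").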
RESTful API
- Create a connector (for the configuration options see https://debezium.io/documentation/reference/1.6/connectors/oracle.html)
POST http://192.168.51.72:8083/connectors/
{
"name": "test",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"tasks.max" : "1",
"database.server.name" : "CRM",
"database.hostname" : "db_host",
"database.port" : "db_port",
"database.user" : "db_user",
"database.password" : "db_password",
"database.dbname" : "db_name",
"database.history.kafka.bootstrap.servers" : "kafka_ip:kafka_port",
"database.history.kafka.topic": "schema-changes.inventory",
"schema.include.list": "CRM",
"table.include.list": "CRM.TABLE1",
"snapshot.mode": "schema_only",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false"
}
}
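The same request can be sent from code. A minimal sketch using only Python's standard library (urllib.request), assuming the Kafka Connect REST endpoint shown above; connector_request and create_connector are hypothetical names, and the db_* and kafka_* values are the same placeholders as in the JSON. Kafka Connect answers 201 Created with the stored configuration; urlopen raises HTTPError for error statuses such as 409 when a connector with that name already exists.

```python
import json
import urllib.request
from typing import Any, Dict

CONNECT_URL = "http://192.168.51.72:8083"  # Kafka Connect REST endpoint from above

# Connector settings from the JSON above (placeholders left as-is).
ORACLE_CONFIG: Dict[str, str] = {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.server.name": "CRM",
    "database.hostname": "db_host",
    "database.port": "db_port",
    "database.user": "db_user",
    "database.password": "db_password",
    "database.dbname": "db_name",
    "database.history.kafka.bootstrap.servers": "kafka_ip:kafka_port",
    "database.history.kafka.topic": "schema-changes.inventory",
    "schema.include.list": "CRM",
    "table.include.list": "CRM.TABLE1",
    "snapshot.mode": "schema_only",
}


def connector_request(base_url: str, name: str, config: Dict[str, Any]) -> urllib.request.Request:
    """Build the POST /connectors request that registers a connector."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        base_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


def create_connector(base_url: str, name: str, config: Dict[str, Any]) -> Dict[str, Any]:
    """Send the request and return Kafka Connect's JSON response."""
    with urllib.request.urlopen(connector_request(base_url, name, config)) as resp:
        return json.load(resp)


if __name__ == "__main__":
    print(create_connector(CONNECT_URL, "test", ORACLE_CONFIG))
```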
- List all connectors
GET http://192.168.51.72:8083/connectors/
- Delete a connector
DELETE http://192.168.51.72:8083/connectors/test
- Check a connector's status
GET http://192.168.51.72:8083/connectors/test/status
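The status endpoint can report the connector as RUNNING while one of its tasks has FAILED, as in the troubleshooting example that follows, so the task list needs its own check. A minimal sketch, assuming the response of GET /connectors/<name>/status is parsed into a dict with the connector and tasks fields; get_status and failed_tasks are hypothetical helper names. A failed task can then be restarted with POST /connectors/<name>/tasks/<id>/restart from the Kafka Connect REST API.

```python
import json
import urllib.request
from typing import Any, Dict, List


def get_status(base_url: str, name: str) -> Dict[str, Any]:
    """GET /connectors/<name>/status from the Kafka Connect REST API."""
    url = f"{base_url.rstrip('/')}/connectors/{name}/status"
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)


def failed_tasks(status: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the id and the first trace line of every FAILED task."""
    result = []
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            trace = task.get("trace", "")
            result.append({"id": task["id"], "error": trace.split("\n", 1)[0]})
    return result
```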
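Troubleshooting:
Exception: io.debezium.text.ParsingException: DDL statement couldn't be parsed. Please open a Jira issue with the statement 'alter table SYSTEM.LOGMNR_ATTRCOL$ exchange partition P106 with table SYS.LOGMNRLT_106_ATTRCOL$ excluding indexes without validation;'\nmismatched input 'with' expecting {'DISABLE', 'ENABLE', ';'}
The statement is Oracle's own maintenance DDL on LogMiner's internal tables, not a change to any captured table. The connector status shows the connector RUNNING but its task FAILED: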
{
"name": "test",
"connector": {
"state": "RUNNING",
"worker_id": "175.66.31.2:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "175.66.31.2:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.\n\tat io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)\n\tat io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:211)\n\tat io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:63)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:159)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:122)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: io.debezium.text.ParsingException: DDL statement couldn't be parsed. Please open a Jira issue with the statement 'alter table SYSTEM.LOGMNR_ATTRCOL$ exchange partition P106 with table SYS.LOGMNRLT_106_ATTRCOL$ excluding indexes without validation;'\nmismatched input 'with' expecting {'DISABLE', 'ENABLE', ';'}\n\tat io.debezium.antlr.ParsingErrorListener.syntaxError(ParsingErrorListener.java:43)\n\tat org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41)\n\tat org.antlr.v4.runtime.Parser.notifyErrorListeners(Parser.java:544)\n\tat org.antlr.v4.runtime.DefaultErrorStrategy.reportInputMismatch(DefaultErrorStrategy.java:327)\n\tat org.antlr.v4.runtime.DefaultErrorStrategy.reportError(DefaultErrorStrategy.java:139)\n\tat io.debezium.ddl.parser.oracle.generated.PlSqlParser.alter_table(PlSqlParser.java:64113)\n\tat io.debezium.ddl.parser.oracle.generated.PlSqlParser.unit_statement(PlSqlParser.java:2302)\n\tat io.debezium.connector.oracle.antlr.OracleDdlParser.parseTree(OracleDdlParser.java:68)\n\tat io.debezium.connector.oracle.antlr.OracleDdlParser.parseTree(OracleDdlParser.java:32)\n\tat io.debezium.antlr.AntlrDdlParser.parse(AntlrDdlParser.java:82)\n\tat io.debezium.connector.oracle.antlr.OracleDdlParser.parse(OracleDdlParser.java:63)\n\tat io.debezium.connector.oracle.OracleSchemaChangeEventEmitter.emitSchemaChangeEvent(OracleSchemaChangeEventEmitter.java:81)\n\tat io.debezium.pipeline.EventDispatcher.dispatchSchemaChangeEvent(EventDispatcher.java:297)\n\tat io.debezium.connector.oracle.logminer.LogMinerQueryResultProcessor.processResult(LogMinerQueryResultProcessor.java:169)\n\tat io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:187)\n\t... 8 more\nCaused by: org.antlr.v4.runtime.InputMismatchException\n\tat org.antlr.v4.runtime.DefaultErrorStrategy.sync(DefaultErrorStrategy.java:270)\n\tat io.debezium.ddl.parser.oracle.generated.PlSqlParser.alter_table(PlSqlParser.java:64052)\n\t... 17 more\n"
}
],
"type": "source"
}
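Solution: delete the Kafka topics configured by CONFIG_STORAGE_TOPIC, OFFSET_STORAGE_TOPIC and STATUS_STORAGE_TOPIC (debezium_connect_configs, debezium_connect_offsets and debezium_connect_statuses in the docker-compose.yaml above), then restart Kafka Connect. Because the config topic stores the registered connectors, the connector then has to be created again with the POST request above.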
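When this happens, the offending statement is embedded in the task trace. A small sketch that pulls it out of the trace string, for logging or for the Jira report the message asks for, and flags whether it targets the SYS or SYSTEM schema. It assumes the message format shown above ("... with the statement '<ddl>'"); the regular expressions and function names are hypothetical and may need adjusting for other Debezium versions or for DDL that itself contains single quotes.

```python
import re
from typing import Optional

# Text that Debezium 1.6 puts into the ParsingException message shown above.
_DDL_RE = re.compile(r"Please open a Jira issue with the statement '(.*?)'", re.DOTALL)
_SYS_RE = re.compile(r"\b(?:SYS|SYSTEM)\.", re.IGNORECASE)


def unparsed_ddl(trace: str) -> Optional[str]:
    """Return the first DDL statement Debezium reported as unparseable, if any."""
    m = _DDL_RE.search(trace)
    return m.group(1) if m else None


def touches_sys_schema(ddl: str) -> bool:
    """True if the statement references objects owned by SYS or SYSTEM."""
    return _SYS_RE.search(ddl) is not None
```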