Flink connector Oracle CDC 实时同步数据到MySQL(Oracle19c)
注意不要使用Oracle的SYS和SYSTEM用户做为CDC用户。因为这两个用户能够捕获到大量Oracle数据库内部的变更信息,对于业务数据来说是不必要的。Debezium会过滤掉这两个用户捕获到的变更内容。'/opt/oracle/oradata/recovery_area'这个路径如果不存在的话需要自己手动去创建(Oracle用户下创建)FlinkconnectorOracleCDC实时数据同
准备工作
在这一步需要配置Oracle。主要包含。
- 开启Archive log
- 开启数据库和数据表的supplemental log
- 创建CDC用户并赋予权限
注意:不要使用Oracle的SYS和SYSTEM用户做为CDC用户。因为这两个用户能够捕获到大量Oracle数据库内部的变更信息,对于业务数据来说是不必要的。Debezium会过滤掉这两个用户捕获到的变更内容。
下面开始配置步骤。在安装Oracle的机器上执行:
su - oracle
sqlplus / as sysdba
进入Sqlplus。然后开启Archive log。
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
# 检查Archive log是否成功开启
archive log list;
注意:'/opt/oracle/oradata/recovery_area' 这个路径如果不存在的话需要自己手动去创建(Oracle用户下创建)
- 本步骤需要重启数据库,请选择在合适的时间操作。
- 例子中的
/opt/oracle/oradata/recovery_area
目录oracle用户需要有读写权限。 - 如果执行
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
的时候报ORA-32001: write to SPFILE requested but no SPFILE is in use。需要检查spfile文件。
show parameter spfile;
# 如果输出value为空,说明没有创建spfile,执行下面SQL创建
create spfile from pfile;
# 关闭并重启
shutdown immediate;
startup;
# 检查spfile是否成功创建
show parameter spfile;
接下来进行相关重要的配置:
-- 检查日志归档是否开启
archive log list;
-- 为捕获的数据库启用补充日志记录,以便数据更改捕获更改的数据库行之前的状态,下面说明了如何在数据库级别进行配置。
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
- 创建表空间
CREATE TABLESPACE logminer_tbs DATAFILE '/home/oracle/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
-- 创建用户family绑定表空间LOGMINER_TBS
CREATE USER C##family IDENTIFIED BY zyhcdc DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
-- 授予family用户dba的权限
grant connect,resource,dba to C##family;
- 并授予权限
GRANT CREATE SESSION TO C##family;
GRANT SELECT ON V_$DATABASE to C##family;
GRANT FLASHBACK ANY TABLE TO C##family;
GRANT SELECT ANY TABLE TO C##family;
GRANT SELECT_CATALOG_ROLE TO C##family;
GRANT EXECUTE_CATALOG_ROLE TO C##family;
GRANT SELECT ANY TRANSACTION TO C##family;
GRANT EXECUTE ON SYS.DBMS_LOGMNR TO C##family;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO C##family;
GRANT CREATE TABLE TO C##family;
GRANT LOCK ANY TABLE TO C##family;
GRANT ALTER ANY TABLE TO C##family;
GRANT CREATE SEQUENCE TO C##family;
GRANT EXECUTE ON DBMS_LOGMNR TO C##family;
GRANT EXECUTE ON DBMS_LOGMNR_D TO C##family;
GRANT SELECT ON V_$LOG TO C##family;
GRANT SELECT ON V_$LOG_HISTORY TO C##family;
GRANT SELECT ON V_$LOGMNR_LOGS TO C##family;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO C##family;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO C##family;
GRANT SELECT ON V_$LOGFILE TO C##family;
GRANT SELECT ON V_$ARCHIVED_LOG TO C##family;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO C##family;
本地用Navicate连接Oracle:
-- 创建 STUDENT_INFO 表
create table student_info (
sid number(10) constraint pk_sid primary key,
sname varchar2(10),
sex varchar2(2)
);
-- 修改STUDENT_INFO表让其支持增量日志,先在Oracle里创建STUDENT_INFO表再执行下面的语句
ALTER TABLE C##FAMILY.STUDENT_INFO ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
至此,Oracle配置完成;
Flink CDC 配置:
这里用Flink SQL CLI 做演示:
使用以下命令切换到 Flink 目录
cd flink-1.14.0
使用以下命令启动 Flink SQL CLI:
./bin/sql-client.sh
我们应该看到 CLI 客户端的欢迎屏幕。
在 Flink SQL CLI 中使用 Flink DDL 创建表
创建从相应数据库表中捕获更改数据的表。
Oracle CDC 表可以定义如下:
-- register an Oracle table 'student_info' in Flink SQL
Flink SQL> CREATE TABLE student_info (
SID INT NOT NULL,
SNAME STRING,
SEX STRING,
PRIMARY KEY(SID) NOT ENFORCED
) WITH (
'connector' = 'oracle-cdc',
'hostname' = 'localhost',
'port' = '1521',
'username' = 'C##family',
'password' = 'zyhcdc',
'database-name' = 'ORCLCDB',
'schema-name' = 'C##FAMILY',
'table-name' = 'STUDENT_INFO');
-- read snapshot and binlogs from products table
Flink SQL> SELECT * FROM student_info;
在定义一个数据流出到MySQL的表:
-- register an Oracle table 'mysql_user' in Flink SQL
Flink SQL> CREATE TABLE mysql_user (
SID INT ,
SNAME STRING,
SEX STRING,
PRIMARY KEY(SID) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/test_cdc',
'username' = 'root',
'password' = 'root123',
'table-name' = 'user'
);
Flink SQL> insert into mysql_user select SID,SNAME,SEX from student_info;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 1966fdd63bb36c14908fe8e31408db58
完成以上配置;Flink connector Oracle CDC实时数据同步到MySQL的操作就完成了;后续可以自行测试一下。
如果有主键的话会自动进行merge。
更多文章请扫码关注公众号,有问题的小伙伴也可以在公众号上提出。
更多推荐
所有评论(0)