准备工作

在这一步需要配置Oracle。主要包含。

  1. 开启Archive log
  2. 开启数据库和数据表的supplemental log
  3. 创建CDC用户并赋予权限

注意:不要使用Oracle的SYS和SYSTEM用户做为CDC用户。因为这两个用户能够捕获到大量Oracle数据库内部的变更信息,对于业务数据来说是不必要的。Debezium会过滤掉这两个用户捕获到的变更内容。

下面开始配置步骤。在安装Oracle的机器上执行:

su - oracle
sqlplus / as sysdba

进入Sqlplus。然后开启Archive log。

alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
# 检查Archive log是否成功开启
archive log list;

注意:'/opt/oracle/oradata/recovery_area' 这个路径如果不存在的话需要自己手动去创建(Oracle用户下创建)

  1. 本步骤需要重启数据库,请选择在合适的时间操作。
  2. 例子中的/opt/oracle/oradata/recovery_area目录oracle用户需要有读写权限。
  3. 如果执行alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;的时候报ORA-32001: write to SPFILE requested but no SPFILE is in use。需要检查spfile文件。
show parameter spfile;
# 如果输出value为空,说明没有创建spfile,执行下面SQL创建
create spfile from pfile;
# 关闭并重启
shutdown immediate;
startup;
# 检查spfile是否成功创建
show parameter spfile;

接下来进行相关重要的配置:

-- 检查日志归档是否开启
archive log list;

-- 为捕获的数据库启用补充日志记录,以便数据更改捕获更改的数据库行之前的状态,下面说明了如何在数据库级别进行配置。
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

- 创建表空间
CREATE TABLESPACE logminer_tbs DATAFILE '/home/oracle/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

-- 创建用户family绑定表空间LOGMINER_TBS
CREATE USER C##family IDENTIFIED BY zyhcdc DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;

-- 授予family用户dba的权限
 grant connect,resource,dba to C##family;

- 并授予权限
GRANT CREATE SESSION TO C##family;
GRANT SELECT ON V_$DATABASE to C##family;
GRANT FLASHBACK ANY TABLE TO C##family;
GRANT SELECT ANY TABLE TO C##family;
GRANT SELECT_CATALOG_ROLE TO C##family;
GRANT EXECUTE_CATALOG_ROLE TO C##family;
GRANT SELECT ANY TRANSACTION TO C##family;
GRANT EXECUTE ON SYS.DBMS_LOGMNR TO C##family;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO C##family;
GRANT CREATE TABLE TO C##family;
GRANT LOCK ANY TABLE TO C##family;
GRANT ALTER ANY TABLE TO C##family;
GRANT CREATE SEQUENCE TO C##family;

GRANT EXECUTE ON DBMS_LOGMNR TO C##family;
GRANT EXECUTE ON DBMS_LOGMNR_D TO C##family;

GRANT SELECT ON V_$LOG TO C##family;
GRANT SELECT ON V_$LOG_HISTORY TO C##family;
GRANT SELECT ON V_$LOGMNR_LOGS TO C##family;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO C##family;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO C##family;
GRANT SELECT ON V_$LOGFILE TO C##family;
GRANT SELECT ON V_$ARCHIVED_LOG TO C##family;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO C##family;

本地用Navicate连接Oracle:

-- 创建 STUDENT_INFO 表
create table student_info (
  sid         number(10) constraint pk_sid primary key,
  sname       varchar2(10),
  sex         varchar2(2)
);

-- 修改STUDENT_INFO表让其支持增量日志,先在Oracle里创建STUDENT_INFO表再执行下面的语句
ALTER TABLE C##FAMILY.STUDENT_INFO ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

至此,Oracle配置完成;

Flink CDC 配置:

这里用Flink SQL CLI 做演示:

使用以下命令切换到 Flink 目录

cd flink-1.14.0

 使用以下命令启动 Flink SQL CLI:

./bin/sql-client.sh

我们应该看到 CLI 客户端的欢迎屏幕。

在 Flink SQL CLI 中使用 Flink DDL 创建表

创建从相应数据库表中捕获更改数据的表。

Oracle CDC 表可以定义如下:

-- register an Oracle table 'student_info' in Flink SQL
Flink SQL> CREATE TABLE student_info (
     SID INT NOT NULL,
     SNAME STRING,
     SEX STRING,
PRIMARY KEY(SID) NOT ENFORCED
     ) WITH (
     'connector' = 'oracle-cdc',
     'hostname' = 'localhost',
     'port' = '1521',
     'username' = 'C##family',
     'password' = 'zyhcdc',
     'database-name' = 'ORCLCDB',
     'schema-name' = 'C##FAMILY',
     'table-name' = 'STUDENT_INFO');
  
-- read snapshot and binlogs from products table
Flink SQL> SELECT * FROM student_info;

在定义一个数据流出到MySQL的表:

-- register an Oracle table 'mysql_user' in Flink SQL
Flink SQL> CREATE TABLE mysql_user (
     SID INT ,
     SNAME STRING,
     SEX STRING,
PRIMARY KEY(SID) NOT ENFORCED
    ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:mysql://localhost:3306/test_cdc',
     'username' = 'root',
     'password' = 'root123',
     'table-name' = 'user'
    );

Flink SQL> insert into mysql_user select SID,SNAME,SEX from student_info;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 1966fdd63bb36c14908fe8e31408db58

完成以上配置;Flink connector Oracle CDC实时数据同步到MySQL的操作就完成了;后续可以自行测试一下。

如果有主键的话会自动进行merge。 

更多文章请扫码关注公众号,有问题的小伙伴也可以在公众号上提出。

 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐