ETL between Oracle and MySQL with Flink CDC 2.2.1

This article shows how to quickly build streaming ETL for MySQL and Oracle with Flink CDC 2.2.1. The demo is written in Java and built with Maven.

1. Maven Dependencies

	<properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.sql.connector.cdc.version>2.2.1</flink.sql.connector.cdc.version>
        <flink.version>1.13.6</flink.version>
        <scala.version>2.12</scala.version>
        <oracle.jdbc.version>12.2.0.1</oracle.jdbc.version>
        <mysql.jdbc.version>5.1.49</mysql.jdbc.version>
    </properties>

    <dependencies>

        <!-- jdbc -->
        <dependency>
            <groupId>com.oracle.database.jdbc</groupId>
            <artifactId>ojdbc8</artifactId>
            <scope>runtime</scope>
            <version>${oracle.jdbc.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
            <version>${mysql.jdbc.version}</version>
        </dependency>
        <!-- end jdbc -->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- flink connector cdc  -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-oracle-cdc</artifactId>
            <version>${flink.sql.connector.cdc.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>${flink.sql.connector.cdc.version}</version>
        </dependency>
        <!-- end flink connector cdc  -->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
    </dependencies>

2. MySQL CDC

2.1 What's new in MySQL CDC 2.0

The connector captures changes by listening to the MySQL binlog. Building on Flink CDC 1.x, the MySQL CDC 2.x connector adds advanced features such as a lock-free snapshot algorithm, parallel reading, and checkpoint-based resume.

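To actually benefit from the lock-free, parallel snapshot, the incremental snapshot feature (on by default in CDC 2.x) is combined with a job parallelism greater than 1 and a server-id range that covers every source subtask. A minimal Flink SQL sketch follows; the connection values and the server-id range are placeholders to adapt:

-- Flink SQL (sketch; placeholder connection values)
CREATE TABLE products_parallel_cdc (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.123.58',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'products',
    -- incremental snapshot (lock-free, parallel, resumable) is the 2.x default
    'scan.incremental.snapshot.enabled' = 'true',
    -- one server id per source subtask, e.g. for parallelism 5
    'server-id' = '5400-5404'
);
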
2.2 Using MySQL CDC

2.2.1 Preparing the MySQL environment

MySQL versions currently supported by Flink CDC: 5.7.x and 8.0.x.

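A quick way to confirm the server version before continuing:

-- MySQL
SELECT VERSION();
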
2.2.1.1 Enabling binlog

MySQL must already be installed before enabling binlog; installation is omitted here.

2.2.1.1.1 Edit the MySQL configuration file

Add the following to /etc/my.cnf (CentOS 7 is used as the base environment here), then restart the MySQL service so the settings take effect:

server-id=1
log_bin=mysql-bin
binlog_format=ROW
binlog_row_image=full
expire_logs_days=10
binlog_do_db=mydb

Notes:

  • server_id: this option is required when enabling binlog on MySQL 5.7 and later. In a MySQL cluster, each node must use a different server_id.
  • log_bin: sets the binlog file base name and storage location. If no path is given, the default location is /var/lib/mysql/.
  • binlog_format: the binlog format. Three values are available:
    • ROW: records which rows were changed and the data after the change; produces a large volume of logs.
    • STATEMENT: records the SQL statements that changed the data; smaller log volume.
    • MIXED: a mix of the two modes above. CDC requires this to be set to ROW.
  • binlog_row_image: accepts three valid values:
    • full: the binlog records complete before and after images for every row, whether or not the table has a primary key or unique constraint;
    • minimal: if the table has a primary key or unique index, the before image keeps only the key columns and the after image keeps only the modified columns; if it has neither, the before image is kept in full and the after image keeps only the modified columns;
    • noblob:
      • if the table has a primary key or unique index and the modified column is a text/blob column, the before image omits text/blob columns and the after image contains the modified text/blob column;
      • if the table has a primary key or unique index and the modified column is not a text/blob column, both images omit text/blob columns. If the table has no primary key or unique index and the modified column is a text/blob column, both images are kept in full;
      • if the table has no primary key or unique index and the modified column is not a text/blob column, the before image is kept in full and the after image omits text/blob columns.
  • expire_logs_days: binlog retention in days; logs older than this are deleted automatically.
  • binlog_do_db: which databases the binlog records. To cover several databases, add one binlog_do_db line per database; never use a comma-separated list.
2.2.1.1.2 Verify that binlog is enabled

Run the following SQL to check:

show variables like 'log_bin';
image-mysql-binlog-state
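
Besides log_bin, it is worth confirming that binlog_format and binlog_row_image took effect:

-- MySQL
show variables like 'binlog_format';    -- expected: ROW
show variables like 'binlog_row_image'; -- expected: FULL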

2.2.1.2 Create the database and tables, and insert data
   -- MySQL
   CREATE DATABASE mydb;
   USE mydb;
   CREATE TABLE products (
     id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
     name VARCHAR(255) NOT NULL,
     description VARCHAR(512)
   );
   ALTER TABLE products AUTO_INCREMENT = 101;
   
   CREATE TABLE products_sink (
     id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
     NAME VARCHAR(255) NOT NULL,
     description VARCHAR(512)
   );
   
   INSERT INTO products
   VALUES (default,"scooter","Small 2-wheel scooter"),
          (default,"car battery","12V car battery"),
          (default,"hammer","12oz carpenter's hammer"),
          (default,"rocks","box of assorted rocks"),
          (default,"spare tire","24 inch spare tire");

2.2.2 Code: capture changes from a MySQL table

package com.dis.fan;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.TableResult;

/**
 * Flink CDC 2.2.1: capture changes from a MySQL table
 *
 * @author 闻武
 * @since 2020-05-31
 */
public class CdcMySQL {

    public static void main(String[] args) throws Exception {
        /**
         * 1> Entry point
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.disableOperatorChaining();

        /**
         * 2> Input arguments, for example: -hostname 192.168.123.58 -port 3306 -username root -password 123456 -databaseName mydb
         * (In IDEA: Run -> Edit Configurations to open Run/Debug Configurations, then enter them under Program arguments for this project)
         */
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hostname = parameterTool.get("hostname");
        int port = parameterTool.getInt("port");
        String username = parameterTool.get("username");
        String password = parameterTool.get("password");
        String databaseName = parameterTool.get("databaseName");

        /**
         * 3> Table environment and CDC source table
         */
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        String strSql = "  CREATE TABLE products_mys_cdc (\n" +
                "    id INT,\n" +
                "    name STRING,\n" +
                "    description STRING,\n" +
                "    PRIMARY KEY (id) NOT ENFORCED\n" +
                "  ) WITH (\n" +
                "    'connector' = 'mysql-cdc',\n" +
                "    'hostname' = '" + hostname + "',\n" +
                "    'port' = '" + port + "',\n" +
                "    'username' = '" + username + "',\n" +
                "    'password' = '" + password + "',\n" +
                "    'database-name' = '" + databaseName + "',\n" +
                "    'table-name' = 'products',\n" +
                "    'debezium.log.mining.continuous.mine'='true',\n"+
                "    'debezium.log.mining.strategy'='online_catalog',\n" +
                "    'debezium.database.tablename.case.insensitive'='false',\n"+
                "    'scan.startup.mode' = 'initial')";

        tableEnv.executeSql(strSql);

        /**
         * 4> Print the results
         */
        TableResult tableResult = tableEnv.executeSql("select * from products_mys_cdc");
        tableResult.print();
    }
}

The output after starting the program looks like this:
image-log-cdc-mysql

Inserts, updates, and deletes against the products table are all reflected in products_mys_cdc:

2022-06-01 10:30:11.379 INFO  [Threads.java: 287] - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
+----+-------------+--------------------------------+--------------------------------+
| op |          id |                           name |                    description |
+----+-------------+--------------------------------+--------------------------------+
| +I |         105 |                     spare tire |             24 inch spare tire |
| +I |         104 |                          rocks |          box of assorted rocks |
| +I |         101 |                        scooter |          Small 2-wheel scooter |
| +I |         103 |                         hammer |        12oz carpenter's hammer |
| +I |         102 |                    car battery |                12V car battery |
2022-06-01 10:30:11.480 INFO  [MySqlStreamingChangeEventSource.java: 916] - Keepalive thread is running
| -D |         102 |                    car battery |                12V car battery |
| -D |         104 |                          rocks |          box of assorted rocks |
| -U |         105 |                     spare tire |             24 inch spare tire |
| +U |         105 |                           leon |             24 inch spare tire |
| +I |         106 |                      bruce lee |                         gongfu |
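
For reference, the change rows above correspond to DML of roughly this shape against the products table (values taken from the output above; shown here only to illustrate the op types):

-- MySQL
DELETE FROM products WHERE id IN (102, 104);                  -- emits -D rows
UPDATE products SET name = 'leon' WHERE id = 105;             -- emits a -U/+U pair
INSERT INTO products VALUES (default, 'bruce lee', 'gongfu'); -- emits a +I row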

2.2.3 Code: capture changes from MySQL and write them to MySQL

package com.dis.fan;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink CDC 2.2.1: capture changes from a MySQL table and write them to MySQL
 *
 * @author 闻武
 * @since 2020-05-31
 */
public class CdcMySQL2MySQL {
    public static void main(String[] args) throws Exception {
        /**
         * 1> Entry point
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.disableOperatorChaining();

        /**
         * 2> Input arguments, for example: -hostname 192.168.123.58 -port 3306 -username root -password 123456 -databaseName mydb
         * (In IDEA: Run -> Edit Configurations to open Run/Debug Configurations, then enter them under Program arguments for this project)
         */
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hostname = parameterTool.get("hostname");
        int port = parameterTool.getInt("port");
        String username = parameterTool.get("username");
        String password = parameterTool.get("password");
        String databaseName = parameterTool.get("databaseName");

        /**
         * 3> Table environment, source table and sink table
         */
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        String strSourceSql = "CREATE TABLE IF NOT EXISTS products_mys_cdc (\n" +
                "    id INT,\n" +
                "    name STRING,\n" +
                "    description STRING,\n" +
                "    PRIMARY KEY (id) NOT ENFORCED\n" +
                "  ) WITH (\n" +
                "    'connector' = 'mysql-cdc',\n" +
                "    'hostname' = '" + hostname + "',\n" +
                "    'port' = '" + port + "',\n" +
                "    'username' = '" + username + "',\n" +
                "    'password' = '" + password + "',\n" +
                "    'database-name' = '" + databaseName + "',\n" +
                "    'table-name' = 'products',\n" +
                "    'debezium.log.mining.continuous.mine'='true',\n"+
                "    'debezium.log.mining.strategy'='online_catalog',\n" +
                "    'debezium.database.tablename.case.insensitive'='false',\n"+
                "    'scan.startup.mode' = 'initial')";

        String strSinkSql = " CREATE TABLE IF NOT EXISTS products_mys_sink (" +
                "    id INT,\n" +
                "    name STRING,\n" +
                "    description STRING,\n" +
                "    PRIMARY KEY (id) NOT ENFORCED\n" +
                "  ) WITH (\n" +
                "    'connector' = 'jdbc'," +
                "    'url' = 'jdbc:mysql://" + hostname + ":" + port + "/" + databaseName + "'," +
                "    'table-name' = 'products_sink'," +
                "    'username' = '" + username + "'," +
                "    'password' = '" + password + "' " +
                " )";

        tableEnv.executeSql(strSourceSql);
        tableEnv.executeSql(strSinkSql);
        tableEnv.executeSql("insert into products_mys_sink select * from products_mys_cdc ");
    }
}

The output after starting the program looks like this:
image-log-cdc-mysql2mysql

You can monitor changes to the products_sink table through the Flink SQL CLI; setting up the Flink environment itself is skipped here.

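If you just want a quick sanity check without the CLI, you can also query the sink directly in a MySQL client once the job has taken its initial snapshot:

-- MySQL
SELECT * FROM products_sink ORDER BY id;
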
2.2.3.1 Start the Flink cluster and the Flink SQL CLI
  1. Change into the Flink directory:

    cd flink
    
  2. Start the Flink cluster:

    ./bin/start-cluster.sh
    
  3. Start the Flink SQL CLI:

    ./bin/sql-client.sh
    
2.2.3.2 Create tables with Flink DDL in the Flink SQL CLI

First, enable checkpointing with a 3-second interval:

-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s;

Then, for the products_sink table in the database, create a corresponding table in the Flink SQL CLI:

-- Flink SQL
Flink SQL> CREATE TABLE products_sink (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'products_sink'
  );
2.2.3.3 Query the table with Flink DML in the Flink SQL CLI

To follow the data in products_sink, run:

-- Flink SQL                   
Flink SQL> select * from products_sink;

The Flink SQL CLI view of the table shows:
image-flinkcdc-cli-before-update-products
Now run the following update statement in a MySQL client:

-- MySQL SQL
UPDATE products SET description='First Rank' WHERE id=103;

The MySQL client reports that the update succeeded:
image-mysql-client-update-products-result

The Flink SQL CLI view of the table now shows:
image-flinkcdc-cli-after-update-products

Other inserts and deletes against the products table are likewise reflected in the corresponding tables promptly; they are not shown here.

3. Oracle CDC

3.1 How Oracle change capture works

The connector can capture and record row-level changes that occur on an Oracle database server. It obtains the change data from Oracle either through the LogMiner tool that Oracle provides or through the native XStream API.

LogMiner is an analysis tool shipped with the Oracle database. It parses Oracle redo log files and turns the database's change log into change events. With the LogMiner approach, the Oracle server places strict resource limits on the log-parsing process, so parsing can be slow for very large tables; the advantage is that LogMiner is free.

The XStream API is an internal interface that the Oracle database provides for Oracle GoldenGate (OGG). Clients can obtain change events efficiently through the XStream API: the change data is read directly from a memory area inside the Oracle server rather than from redo log files, which avoids the cost of writing data to log files and parsing them, so it is faster. However, it requires an Oracle GoldenGate (OGG) license.
image-oracle-cdc-2.0-arch

3.2 Using Oracle CDC

3.2.1 Preparing the Oracle environment

Oracle versions currently supported by Flink CDC: 11, 12, and 19.

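A quick way to confirm the Oracle server version:

-- Oracle
select banner from v$version;
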
3.2.1.1 Enabling LogMiner

Oracle must already be installed before enabling LogMiner; installation is omitted here.

3.2.1.1.1 Enable archive logging
3.2.1.1.1.1 Connect to the database as DBA

sqlplus / AS SYSDBA

3.2.1.1.1.2 Turn on archive logging
-- Adjust the archive log destination size and directory
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/data/oracle/db_recovery_file_dest' scope=spfile;

-- Restart the database instance and open it with archive logging enabled
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;

-- Check whether archive logging is enabled
archive log list;
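
Besides archive log list, you can confirm the archiving mode with a query; it should return ARCHIVELOG:

-- Oracle (as SYSDBA)
select log_mode from v$database;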
3.2.1.1.1.3 Enable supplemental logging
-- Enable supplemental logging for a single table
ALTER TABLE schema.table ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

-- Enable database-wide (minimal) supplemental logging
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

-- ALL-columns supplemental logging
-- Enable ALL-columns supplemental logging (recommended)
alter database add supplemental log data (all) columns;
-- Check whether it is enabled
select supplemental_log_data_all from v$database;
-- Drop ALL-columns supplemental logging
alter database drop supplemental log data (all) columns;
3.2.1.1.2 Create an Oracle user and grant privileges
3.2.1.1.2.1 Create a tablespace
CREATE TABLESPACE logminer_tbs DATAFILE '/data/oracle/oradata/orcl/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
3.2.1.1.2.2 Create the user flinkcdc on tablespace LOGMINER_TBS
CREATE USER flinkcdc IDENTIFIED BY flinkcdc DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
3.2.1.1.2.3 Grant the DBA role to flinkcdc
GRANT CONNECT,RESOURCE,DBA TO flinkcdc;
3.2.1.1.2.4 Grant the remaining privileges to flinkcdc
GRANT CREATE SESSION TO flinkcdc;
GRANT SELECT ON V_$DATABASE to flinkcdc;
GRANT FLASHBACK ANY TABLE TO flinkcdc;
GRANT SELECT ANY TABLE TO flinkcdc;
GRANT SELECT_CATALOG_ROLE TO flinkcdc;
GRANT EXECUTE_CATALOG_ROLE TO flinkcdc;
GRANT SELECT ANY TRANSACTION TO flinkcdc;
GRANT EXECUTE ON SYS.DBMS_LOGMNR TO flinkcdc;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkcdc;
GRANT CREATE TABLE TO flinkcdc;
GRANT LOCK ANY TABLE TO flinkcdc;
GRANT ALTER ANY TABLE TO flinkcdc;
GRANT CREATE SEQUENCE TO flinkcdc;

GRANT EXECUTE ON DBMS_LOGMNR TO flinkcdc;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkcdc;

GRANT SELECT ON V_$LOG TO flinkcdc;
GRANT SELECT ON V_$LOG_HISTORY TO flinkcdc;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkcdc;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkcdc;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkcdc;
GRANT SELECT ON V_$LOGFILE TO flinkcdc;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkcdc;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkcdc;
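
To spot-check that the grants took effect, you can connect as flinkcdc and list the active privileges and roles (a quick sanity check; session_privs shows the privileges in effect for the current session):

-- Oracle (connected as flinkcdc)
select * from session_privs order by privilege;
select * from user_role_privs;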
3.2.1.2 Create the tables and insert data
	-- Oracle	
    CREATE TABLE products (
      id         number(10) constraint pk_id primary key,
      name       varchar2(255),
      description   varchar2(512)
    );

    -- Enable supplemental logging on the products table; run this after the table has been created in Oracle
    ALTER TABLE flinkcdc.products ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
 
    CREATE TABLE products_sink (
      id         number(10) constraint pk_sink_id primary key,
      name       varchar2(255),
      description   varchar2(512)
    );

    -- Enable supplemental logging on the products_sink table; run this after the table has been created in Oracle
    ALTER TABLE flinkcdc.products_sink ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

    -- Insert data into the products table
    insert into products (id, name, description) values('101','scooter','Small 2-wheel scooter');
    insert into products (id, name, description) values('102','car battery','12V car battery');
    insert into products (id, name, description) values('103','hammer','12oz carpenter''s hammer');
    insert into products (id, name, description) values('104','rocks','box of assorted rocks');
    insert into products (id, name, description) values('105','spare tire','24 inch spare tire');
    commit;
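
To confirm that the ALL-columns supplemental log groups exist for both tables, a query along these lines can be used (a sketch; the owning schema is assumed to be FLINKCDC):

    -- Oracle
    select owner, table_name, log_group_type
      from all_log_groups
     where owner = 'FLINKCDC';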

3.2.2 Code: capture changes from an Oracle table

package com.dis.fan;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink CDC 2.2.1: capture changes from an Oracle table
 *
 * @author 闻武
 * @since 2020-05-31
 */
public class CdcOracle {

    public static void main(String[] args) throws Exception {
        /**
         * 1> Entry point
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.disableOperatorChaining();

        /**
         * 2> Input arguments, for example: -hostname 192.168.1.197 -port 1521 -username flinkcdc -password flinkcdc -databaseName orcl -schemaName flinkcdc
         * (In IDEA: Run -> Edit Configurations to open Run/Debug Configurations, then enter them under Program arguments for this project)
         */
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hostname = parameterTool.get("hostname");
        int port = parameterTool.getInt("port");
        String username = parameterTool.get("username");
        String password = parameterTool.get("password");
        String databaseName = parameterTool.get("databaseName");
        String schemaName = parameterTool.get("schemaName");

        /**
         * 3> Table environment and CDC source table
         */
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        String strSql =
                "  CREATE TABLE products_ora_cdc (\n" +
                        "    ID INT,\n" +
                        "    NAME STRING,\n" +
                        "    DESCRIPTION STRING,\n" +
                        "    PRIMARY KEY (ID) NOT ENFORCED\n" +
                        "  ) WITH (\n" +
                        "    'connector' = 'oracle-cdc',\n" +
                        "    'hostname' = '" + hostname + "',\n" +
                        "    'port' = '" + port + "',\n" +
                        "    'username' = '" + username + "',\n" +
                        "    'password' = '" + password + "',\n" +
                        "    'database-name' = '" + databaseName + "',\n" +
                        "    'schema-name' = '" + schemaName + "',  \n" +
                        "    'table-name' = 'products',\n" +
                        "    'debezium.log.mining.continuous.mine'='true',\n" +
                        "    'debezium.log.mining.strategy'='online_catalog',\n" +
                        "    'debezium.database.tablename.case.insensitive'='false',\n" +
                        "    'scan.startup.mode' = 'initial')";

        tableEnv.executeSql(strSql);

        /**
         * 4> Print the results
         */
        TableResult tableResult = tableEnv.executeSql("select * from products_ora_cdc");
        tableResult.print();
    }
}

The output after starting the program looks like this:

image-log-cdc-oracle

3.2.3 Code: capture changes from Oracle and write them to MySQL

package com.dis.fan;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink CDC 2.2.1: capture changes from an Oracle table and write them to MySQL
 *
 * @author 闻武
 * @since 2020-05-31
 */
public class CdcOracle2MySQL {
    public static void main(String[] args) throws Exception {
        /**
         * 1> Entry point
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.disableOperatorChaining();

        /**
         * 2> Input arguments, for example: -hostname 192.168.1.197 -port 1521 -username flinkcdc -password flinkcdc -databaseName orcl -schemaName flinkcdc
         * (In IDEA: Run -> Edit Configurations to open Run/Debug Configurations, then enter them under Program arguments for this project)
         */
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hostname = parameterTool.get("hostname");
        int port = parameterTool.getInt("port");
        String username = parameterTool.get("username");
        String password = parameterTool.get("password");
        String databaseName = parameterTool.get("databaseName");
        String schemaName = parameterTool.get("schemaName");

        /**
         * 3> Table environment, source table and sink table
         */
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        String strSourceSql = "  CREATE TABLE products_ora_cdc (\n" +
                "    ID INT,\n" +
                "    NAME STRING,\n" +
                "    DESCRIPTION STRING,\n" +
                "    PRIMARY KEY (ID) NOT ENFORCED\n" +
                "  ) WITH (\n" +
                "    'connector' = 'oracle-cdc',\n" +
                "    'hostname' = '" + hostname + "',\n" +
                "    'port' = '" + port + "',\n" +
                "    'username' = '" + username + "',\n" +
                "    'password' = '" + password + "',\n" +
                "    'database-name' = '" + databaseName + "',\n" +
                "    'schema-name' = '" + schemaName + "',  \n" +
                "    'table-name' = 'products',\n" +
                "    'debezium.log.mining.continuous.mine'='true',\n" +
                "    'debezium.log.mining.strategy'='online_catalog',\n" +
                "    'debezium.database.tablename.case.insensitive'='false',\n" +
                "    'scan.startup.mode' = 'initial')";

        String strSinkSql = " CREATE TABLE IF NOT EXISTS products_mys_sink (" +
                "    id INT,\n" +
                "    name STRING,\n" +
                "    description STRING,\n" +
                "    PRIMARY KEY (id) NOT ENFORCED\n" +
                "  ) WITH (\n" +
                "    'connector' = 'jdbc'," +
                "    'url' = 'jdbc:mysql://192.168.123.58:3306/mydb'," +
                "    'table-name' = 'products_sink'," +
                "    'username' = 'root'," +
                "    'password' = '123456' " +
                " )";

        tableEnv.executeSql(strSourceSql);
        tableEnv.executeSql(strSinkSql);
        tableEnv.executeSql("insert into products_mys_sink(id, name, description) select ID, NAME, DESCRIPTION from products_ora_cdc ");
    }
}

The rest is similar to "2.2.3 Code: capture changes from MySQL and write them to MySQL" and is not repeated here.
Source code:
https://gitee.com/flink_acme/flink-cdc-study.git
