Flink-----Flink CDC 实现数据实时同步
initial()模式即获取创建表有史以来的日志,但是遇见布置CDC后的日志就报错·latest()模式即获取最新的日志,但运行就报错以上两个错误都是以下显示错误说明提示没有为该表设置日志归档错误原因cdc底层自动将配置的表名转为小写,而oracle日志的表名是大写,导致cdc无法找到配置表的日志,所以就报没有为该表配置日志归档,但这一步确实已经做过了解决办法1.加配置文件a)Stream模式b)
Flink CDC 实现数据实时同步
1.什么是Flink_CDC
CDC 全称是 Change Data Capture(变化数据获取) ,它是一个比较广义的概念,只要能捕获变更的数据,我们都可以称为 CDC 。业界主要有基于查询的 CDC 和基于日志的 CDC ,可以从下面表格对比他们功能和差异点。
2.Flink_CDC应用场景
1.数据同步:用于备份,容灾
2.数据分发:一个数据源分发给多个下游系统
3.数据采集:面向数据仓库/数据湖的ETL数据集成,是非常重要的数据源
3.传统实时数据获取与FlinkCDC数据实时获取
传统实时数据获取:
FlinkCDC实时数据获取:
对比:
Flink 1.11 引入了 Flink CDC,flink-cdc解决了普通的CDC必须通过kafka的问题,简化了流程,第一个图是普通的cdc的流程,通过cdc的工具将mysql的数据采集到kafka,在通过flink、sparkStreaming等流式计算写入到hbase、es,大数据湖等。流程相对复杂,flink-cdc做的就是可以省去普通cdc到kafka的过程。将采集、计算都在flink中完成
Flink_CDC优势:
1.Flink的操作者和SQL模块都比较成熟且易于使用
2.Flink的作业可以通过调整运算器的并行度来完成,易于扩展处理能力
3.Flink支持先进的状态后端(State Backends),允许访问大量的状态数据
4.Flink提供更多的Source和Sink等
5.Flink拥有更大的用户群和活跃的支持社区,问题更容易解决
6.Flink开源协议允许云厂商进行全托管深度定制,而Kafka Streams则只能由其自己部署和运营
7.和Flink Table/SQL模块集成了数据库表和变化记录流(例如CDC的数据流)。作为同一事物的两面,结果是Upsert Message结构(+I表示新增、-U表示记录更新前的值、+U表示记录的更新值、-D表示删除)
3.Flink CDC两种实现方式
1.FlinkDataStream_CDC实现:
利用Flink_CDC自带的连接资源,如MySQLSource通过设置hostname、port、username、password、database、table、deserializer、startupOptions等参数配置
实现获取CRUD数据变化日志
2.FlinkSQL_CDC实现:
通过FlinkSQL创建虚拟表获取关键字段的变化情况并且配置hostname、port、username、password、database、table等参数可以看到具体表数据的变化过程
注意:FlinkSQL_CDC2.0仅支持Flink1.13之后的版本
4.两种方式对比:
1.FlinkDataStream_CDC支持多库多表的操作(优点)
2.FlinkFlinkDataStream_CDC需要自定义序列化器(缺点)
3.FlinkSQL_CDC只能单表操作(缺点)
4.FlinkSQL_CDC自动序列化(优点)
5.FlinkCDC配置
·1.以DBA身份连接到数据库
su - oracle
cd /opt/oracle
sqlplus / as sysdba
·2.启用日志归档
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
注意:'/opt/oracle/oradata/recovery_area’对应自己的目录
·3.检查是否启用了日志归档
archive log list;
·4.为捕获日志的库或表启用日志归档
为表开启:inventory(库名).customers(表名)对应自己表
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
为库开启:
-- Enable supplemental logging for database
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
5.创建表空间
sqlplus / AS SYSDBA;
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;
注意:logminer_tbs用于下面
'/opt/oracle/oradata/SID/logminer_tbs.dbf’路径应该对应自己的
6.创建用户
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
7.为用户授权
CREATE TABLESPACE logminer_tbs DATAFILE '/data/dg/datafile/logminer_tbs.dbf' SIZE 1000M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER flinkcdc IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flinkcdc;
GRANT SET CONTAINER TO flinkcdc;
GRANT SELECT ON V_$DATABASE to flinkcdc;
GRANT FLASHBACK ANY TABLE TO flinkcdc;
GRANT SELECT ANY TABLE TO flinkcdc;
GRANT SELECT_CATALOG_ROLE TO flinkcdc;
GRANT EXECUTE_CATALOG_ROLE TO flinkcdc;
GRANT SELECT ANY TRANSACTION TO flinkcdc;
GRANT LOGMINING TO flinkcdc;
GRANT CREATE TABLE TO flinkcdc;
GRANT LOCK ANY TABLE TO flinkcdc;
GRANT ALTER ANY TABLE TO flinkcdc;
GRANT CREATE SEQUENCE TO flinkcdc;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkcdc;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkcdc;
GRANT SELECT ON V_$LOG TO flinkcdc;
GRANT SELECT ON V_$LOG_HISTORY TO flinkcdc;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkcdc;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkcdc;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkcdc;
GRANT SELECT ON V_$LOGFILE TO flinkcdc;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkcdc;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkcdc;
#其中有两条未执行成功,替换为下面两条
GRANT EXECUTE ON SYS.DBMS_LOGMNR TO flinkcdc
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkcdc
exit;
到此CDC配置结束
7.DataStream模式代码演示
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import scala.Tuple7;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
* @since JDK 1.8
*/
public class FlinkToOracle_2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Tuple7<Integer, String, String, Integer, String, String, Integer>> stream = env.addSource(new source());
stream.addSink(new SinkOracle());
stream.print();
env.execute();
}
public static class SinkOracle extends RichSinkFunction<Tuple7<Integer,String,String,Integer,String,String,Integer>> {
private Connection conn;
private PreparedStatement ps;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Class.forName("oracle.jdbc.driver.OracleDriver");
conn= DriverManager.getConnection("jdbc:oracle:thin:@10.158.5.111:1521/orcl", "ods", "ods");
ps = conn.prepareStatement("insert into TEST_OGG.WORKER(ID,NAME,SEX,AGE,DEPT,WORK,SALARY) values(?,?,?,?,?,?,?)");
}
@Override
public void invoke(Tuple7<Integer, String, String, Integer, String, String, Integer> value, Context context) throws Exception {
ps.setInt(1, value._1());
ps.setString(2, value._2());
ps.setString(3, value._3());
ps.setInt(4, value._4());
ps.setString(5, value._5());
ps.setString(6, value._6());
ps.setInt(7, value._7());
ps.execute();
8.Flink SQL 模式代码演示
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import scala.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
* @since JDK 1.8
*/
public class FlinkSQL_CDC {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
TableResult tableResult = tableEnv.executeSql("CREATE TABLE WORK(" +
"ID INT," +
"NAME STRING," +
"SEX STRING," +
"AGE INT," +
"DEPT STRING," +
"WORK STRING," +
"SALARY INT" +
") WITH (" +
"'connector'='oracle-cdc'," +
"'hostname'='10.158.5.88'," +
"'port'='1521'," +
"'username'='ods'," +
"'passwordd'='ods'," +
"'database-name'='orcl'," +
"'schema-name'='TEST_OGG'," +
"'table-name'='WORKER'," +
"'debezium.log.mining.continuous.mine'='true'," +
"'debezium.log.mining.strategy'='online_catalog'," +
"'debezium.database.tablename.case.insensitive'='false'," +
"'scan.startup.mode'='latest-offset')" +
"");
Table resultTable = tableEnv.sqlQuery("select * from WORK");
DataStream<Tuple2<Boolean, Row>> table2Datstream = tableEnv.toRetractStream(resultTable, Row.class);
SingleOutputStreamOperator<Tuple7<String, String, String, String, String, String, String>> outputStream = table2Datstream.map(new MapFunction<Tuple2<Boolean, Row>, Tuple7<String, String, String, String, String, String, String>>() {
@Override
public Tuple7<String, String, String, String, String, String, String> map(Tuple2<Boolean, Row> s) throws Exception {
String s1 = s.f1.getField(0).toString();
String s2 = s.f1.getField(1).toString();
String s3 = s.f1.getField(2).toString();
String s4 = s.f1.getField(3).toString();
String s5 = s.f1.getField(4).toString();
String s6 = s.f1.getField(5).toString();
String s7 = s.f1.getField(6).toString();
return new Tuple7<String, String, String, String, String, String, String>(s1,s2,s3,s4,s5,s6,s7);
}
});
outputStream.addSink(new MySinkOracle220());
outputStream.print();
env.execute();
}
public static class MySinkOracle220 extends RichSinkFunction<Tuple7<String,String,String,String,String,String,String>> {
private Connection conn;
private PreparedStatement ps;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Class.forName("oracle.jdbc.driver.OracleDriver");
conn= DriverManager.getConnection("jdbc:oracle:thin:@10.158.5.222:1521/dbm", "ods", "123456");
ps = conn.prepareStatement("insert into OGG.WORKER(ID,NAME,SEX,AGE,DEPT,WORK,SALARY) values(?,?,?,?,?,?,?)");
}
@Override
public void invoke(scala.Tuple7<String, String, String, String, String, String, String> value, Context context) throws Exception {
ps.setString(1, value._1());
ps.setString(2, value._2());
ps.setString(3, value._3());
ps.setString(4, value._4());
ps.setString(5, value._5());
ps.setString(6, value._6());
ps.setString(7, value._7());
ps.execute();
}
@Override
public void close() throws Exception {
super.close();
if (ps != null)
ps.close();
if (conn != null)
conn.close();
}
}
}
9 ·删除日志命令
su - oracle
rman target /
delete archivelog u ntil time 'sysdate' ;
crosscheck archivelog all;
delete noprompt expired archivelog all;
10.FlinkCDC问题总结
10.1CDC捕获日志问题
·initial()模式即获取创建表有史以来的日志,但是遇见布置CDC后的日志就报错
·latest()模式即获取最新的日志,但运行就报错
以上两个错误都是以下显示
错误说明:提示没有为该表设置日志归档
错误原因:cdc底层自动将配置的表名转为小写,而oracle日志的表名是大写,导致cdc无法找到配置表的日志,所以就报没有为该表配置日志归档,但这一步确实已经做过了
解决办法:
1.加配置文件
a)Stream模式:“database.tablename.case.insensitive”,“false”
b)SQL模式:‘debezium.database.tablename.case.insensitive’=‘false’
2.修改jar包底层源码,经测试失败,可能jar包依赖重,修改不到位,修改后整个项目都报错
3.升级oracle版本,据说12c版本不会出现该异常
10.2捕获数据延迟
OracleCDC的归档日志增长很快,且读取log慢,导致捕捉数据变化延迟较大
解决办法:
Stream模式:“log.mining.strategy”,“online_catalog”
“log.mining.continuous.mine” ,“true”
SQL模式: ‘debezium.log.mining.strategy’ = ‘online_catalog’
‘debezium.log.mining.continuous.mine’ = ‘true’、
10.3 日志乱码问题
如:{“scale”:0,“value”:“F3A=”},原值为6000
解决办法:
配置文件添加如下
properties.setProperty(“decimal.handling.mode”,“string”);
10.4 日志存储问题
配置时给定归档日志存储空间为10G,经测试,10G的内存很快就存满了,同时FlinkCDC官网也提示了归档日志会占用大量磁盘空间,建议定期清理过期的日志。是给予更大的内存还是定期删除日志?无论给予多大的磁盘,如果不定期清理,磁盘也会很快占满
总结
FlinkCDC有两种模式实现,FlinkDataStream模式相比FlinkSQL模式好处是可以监听多库和多表的组合,而FlinkSQL模式只能对单独一张表可进行监听;FlinkDataStream需要自己序列化,而FlinkSQL模式可以具体到关心变化的字段,不需要自己序列化。从这来看,两种模式均有优劣,从业务库的表较多来看,肯定FlinkDataStream模式更适合我们使用,直接对库进行设置日志归档,不用对每个表都设置归档日志,也不用对每个表都进行单独的监控。
更多推荐
所有评论(0)