Flink SqlServer CDC 连接器的使用
This article uses SQL Server 2017.

- Before using the SQL Server CDC connector, CDC must be enabled on the target SQL Server instance. Run the following steps to enable it:
-- Enable CDC at the database level first (a prerequisite for table-level CDC)
EXEC sys.sp_cdc_enable_db;
-- Enable CDC at the table level; @source_name = 'table name'
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'table_name', @role_name = NULL, @supports_net_changes = 0;
-- Check whether CDC is enabled for the database
SELECT is_cdc_enabled,
       CASE WHEN is_cdc_enabled = 0 THEN 'CDC disabled' ELSE 'CDC enabled' END AS description
FROM sys.databases
WHERE name = 'database_name';
-- Check that the SQL Server Agent service is running (the CDC capture job depends on it)
EXEC master.dbo.xp_servicecontrol N'QUERYSTATE', N'SQLSERVERAGENT';
- Add the CDC dependency to the pom file (Flink CDC 2.x):
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-sqlserver-cdc</artifactId>
<version>2.2.1</version>
</dependency>
- Code implementation
import com.google.gson.Gson;
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.HashMap;

public class SQLServerTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SourceFunction<String> sourceFunction = SqlServerSource.<String>builder()
                .hostname("host")
                .port(1433)
                .database("database_name")       // monitored SQL Server database
                .tableList("dbo.table_name")     // monitored table(s), schema-qualified
                .username("username")
                .password("password")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to a JSON string
                .build();

        env.addSource(sourceFunction).print();
        env.execute();
    }

    public static class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {

        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
            HashMap<String, Object> result = new HashMap<>();

            // The topic has the form "server.database.table"; extract database and table
            String topic = sourceRecord.topic();
            String[] split = topic.split("[.]");
            result.put("database", split[1]);
            result.put("table", split[2]);

            // Operation type of this change event
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);

            // The change data itself
            Struct struct = (Struct) sourceRecord.value();
            Struct after = struct.getStruct("after");
            Struct before = struct.getStruct("before");
            /*
             * 1. Both before and after present -> update
             * 2. Only before present           -> delete
             * 3. Only after present            -> insert
             * The update case must be checked first, otherwise the
             * "after != null" branch would swallow every update event.
             */
            if (before != null && after != null) {
                // update: emit the new row image
                result.put("data", structToMap(after));
            } else if (after != null) {
                // insert
                result.put("data", structToMap(after));
            } else if (before != null) {
                // delete: emit the old row image
                result.put("data", structToMap(before));
            }

            // Debezium reports inserts as "create"; normalize to "insert"
            String type = operation.toString().toLowerCase();
            if ("create".equals(type)) {
                type = "insert";
            }
            result.put("type", type);

            collector.collect(new Gson().toJson(result));
        }

        // Copy every field of a Debezium Struct into a plain map
        private static HashMap<String, Object> structToMap(Struct struct) {
            HashMap<String, Object> map = new HashMap<>();
            Schema schema = struct.schema();
            for (Field field : schema.fields()) {
                map.put(field.name(), struct.get(field.name()));
            }
            return map;
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
}
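The deserializer's topic parsing and operation normalization can be exercised on their own, without Flink or Debezium on the classpath. A minimal sketch in plain Java (the topic string `server1.testdb.products` is a hypothetical example, assuming the three-part `server.database.table` topic layout the code above expects):

```java
import java.util.HashMap;
import java.util.Map;

public class RecordMetadata {

    // Split a topic of the form "server.database.table" into its parts,
    // mirroring the split("[.]") logic in the deserializer above.
    public static Map<String, String> parseTopic(String topic) {
        String[] split = topic.split("[.]");
        Map<String, String> meta = new HashMap<>();
        meta.put("database", split[1]);
        meta.put("table", split[2]);
        return meta;
    }

    // Normalize the Debezium operation name: "create" becomes "insert",
    // all other operations ("update", "delete", "read") pass through.
    public static String mapOperation(String op) {
        String type = op.toLowerCase();
        return "create".equals(type) ? "insert" : type;
    }

    public static void main(String[] args) {
        Map<String, String> meta = parseTopic("server1.testdb.products");
        System.out.println(meta.get("database") + " / " + meta.get("table")); // testdb / products
        System.out.println(mapOperation("CREATE"));                           // insert
    }
}
```

This keeps the string handling testable in isolation, so a change to the topic format or the operation mapping can be verified without a running SQL Server.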
This article is based on the official documentation and personal notes: Flink SqlServer CDC Connector.