Flink Java CDC 之自定义反序列化器
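Flink CDC 原本就提供了两种反序列化器:StringDebeziumDeserializationSchema 和 JsonDebeziumDeserializationSchema。显然第二种更便于下游进行数据处理;而自定义反序列化器可以只输出需要的字段,效果会更好,可读性也更强。下面依次对比三种方式的输出。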
1、使用 StringDebeziumDeserializationSchema
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1657163088, file=binlog.000020, pos=7231, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.flink.user', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.flink.user.Key:STRUCT}, value=Struct{before=Struct{id=3,username=test,password=test123},after=Struct{id=3,username=test,password=test},source=Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1657163088000,db=flink,table=user,server_id=1,file=binlog.000020,pos=7379,row=0},op=u,ts_ms=1657163088967}, valueSchema=Schema{mysql_binlog_source.flink.user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
2、使用 JsonDebeziumDeserializationSchema
{"before":null,"after":{"id":2,"username":"root","password":"root"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"flink","sequence":null,"table":"user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1657163169368,"transaction":null}
以上两种都是 Flink CDC 原本就提供的,显然第二种(JSON)更便于下游进行数据处理;而使用自定义反序列化器可以只保留库名、表名、操作类型和数据本身,效果会更好,可读性也更强。
3、使用自定义反序列化器
package com.daidai.cdc;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
/**
 * Debezium deserialization schema that flattens every change event into a
 * compact JSON string of the following shape:
 *
 * <pre>
 * {
 *   "database": "",
 *   "table": "",
 *   "operation": "",
 *   "data": { "before": { }, "after": { } }
 * }
 * </pre>
 *
 * <p>The database and table names are read from the second and third
 * dot-separated segments of the record topic (for example
 * {@code mysql_binlog_source.flink.user}). A row image that is absent from the
 * event is emitted as an empty JSON object rather than causing a failure.
 */
public class CustomerSchema implements DebeziumDeserializationSchema<String> {

    /**
     * Converts one Debezium change record into its JSON representation and
     * emits it downstream.
     *
     * @param sourceRecord the change record produced by the Debezium engine
     * @param collector    receives exactly one JSON string per record
     * @throws Exception if the record cannot be converted
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // Topic layout is <server>.<database>.<table>. The dot must be escaped
        // because String.split takes a regular expression.
        String[] topicParts = sourceRecord.topic().split("\\.");
        String database = topicParts[1];
        String table = topicParts[2];

        // Row images. Either side may be null: snapshot reads and inserts carry
        // no "before" image, deletes carry no "after" image.
        Struct value = (Struct) sourceRecord.value();
        JSONObject data = new JSONObject();
        data.put("before", toJson(value.getStruct("before")));
        data.put("after", toJson(value.getStruct("after")));

        // Operation type (create / read / update / delete).
        Envelope.Operation op = Envelope.operationFor(sourceRecord);

        // Assemble the output document.
        JSONObject object = new JSONObject();
        object.put("database", database);
        object.put("table", table);
        object.put("operation", op);
        object.put("data", data);
        collector.collect(object.toJSONString());
    }

    /**
     * Copies every field of a row image into a JSON object.
     *
     * @param row the "before" or "after" struct; may be {@code null}
     * @return a JSON object with one entry per field, empty when {@code row} is {@code null}
     */
    private static JSONObject toJson(Struct row) {
        JSONObject json = new JSONObject();
        if (row == null) {
            return json;
        }
        for (Field field : row.schema().fields()) {
            json.put(field.name(), row.get(field));
        }
        return json;
    }

    /**
     * Declares that this schema produces {@link String} elements.
     *
     * @return the Flink type information for {@code String}
     */
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
更多推荐
所有评论(0)