1. Using StringDebeziumDeserializationSchema

SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1657163088, file=binlog.000020, pos=7231, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.flink.user', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.flink.user.Key:STRUCT}, value=Struct{before=Struct{id=3,username=test,password=test123},after=Struct{id=3,username=test,password=test},source=Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1657163088000,db=flink,table=user,server_id=1,file=binlog.000020,pos=7379,row=0},op=u,ts_ms=1657163088967}, valueSchema=Schema{mysql_binlog_source.flink.user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
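This output is just the record's `toString()`, so a downstream operator that needs even one field, such as the operation code `op=u` in the record above, has to scrape text. A minimal sketch of that scraping using only `java.util.regex`; the class and method names are hypothetical, and the pattern assumes the `op=<letter>` layout seen in this sample.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RecordStringScraper {
    // Matches "op=" followed by one lowercase letter, e.g. "op=u".
    // Hypothetical pattern derived from the sample toString() output;
    // it silently breaks if the record layout changes.
    private static final Pattern OP = Pattern.compile("\\bop=([a-z])");

    static Optional<String> extractOp(String recordText) {
        Matcher m = OP.matcher(recordText);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        String sample = "value=Struct{before=Struct{id=3},after=Struct{id=3},"
                + "source=Struct{db=flink,table=user},op=u,ts_ms=1657163088967}";
        System.out.println(extractOp(sample).orElse("?")); // prints u
    }
}
```

Every field would need its own fragile pattern like this, which is why a structured format is easier to consume.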

2. Using JsonDebeziumDeserializationSchema

{"before":null,"after":{"id":2,"username":"root","password":"root"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"flink","sequence":null,"table":"user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1657163169368,"transaction":null}
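In this format `op` is Debezium's one-letter operation code: `r` for a snapshot read (as in this record, where `before` is null), `c` for create (insert), `u` for update and `d` for delete. Below is a downstream sketch that decodes the code and picks the row image describing the row's state: `after` for r/c/u, `before` for d. It assumes only these four codes (newer Debezium versions define additional ones); the enum and method names are hypothetical, and Debezium's own `Envelope.Operation` is the library counterpart.

```java
import java.util.Map;

public class ChangeOps {
    // Hypothetical mirror of the four row-level Debezium "op" codes.
    enum Op {
        READ("r"), CREATE("c"), UPDATE("u"), DELETE("d");

        final String code;

        Op(String code) {
            this.code = code;
        }

        static Op fromCode(String code) {
            for (Op op : values()) {
                if (op.code.equals(code)) {
                    return op;
                }
            }
            throw new IllegalArgumentException("unsupported op code: " + code);
        }
    }

    // Returns the image describing the row's state:
    // "before" for deletes (after is null), "after" otherwise.
    static Map<String, Object> rowImage(String opCode,
                                        Map<String, Object> before,
                                        Map<String, Object> after) {
        return Op.fromCode(opCode) == Op.DELETE ? before : after;
    }

    public static void main(String[] args) {
        Map<String, Object> after = Map.of("id", 2, "username", "root", "password", "root");
        System.out.println(Op.fromCode("r") + " -> " + rowImage("r", null, after));
    }
}
```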

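Both of the deserializers above ship with Flink CDC out of the box. The second is clearly more convenient for downstream processing, but a custom deserializer is better still: it can emit only the fields downstream jobs need, in a more readable shape.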

3. Using a custom deserializer

package com.daidai.cdc;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * Emits one JSON string per change event, shaped like:
 * {
 * "database":"",
 * "table":"",
 * "operation":"",
 * "data":{"before":{...},"after":{...}}
 * }
 */
public class CustomerSchema implements DebeziumDeserializationSchema<String> {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        // Get database and table from the topic, e.g. "mysql_binlog_source.flink.user"
        String topic = sourceRecord.topic();
        // split() takes a regex, so the dot must be escaped as "\\."; a bare "."
        // matches every character, the result is empty and strings[1] throws
        String[] strings = topic.split("\\.");
        String database = strings[1];
        String table = strings[2];

        // Get data. "before" is null for inserts and snapshot reads,
        // "after" is null for deletes, so both must be null-checked
        Struct value = (Struct) sourceRecord.value();
        JSONObject data = new JSONObject();
        Struct before = value.getStruct("before");
        JSONObject beforeData = new JSONObject();
        if (before != null) {
            for (Field field : before.schema().fields()) {
                beforeData.put(field.name(), before.get(field));
            }
        }
        Struct after = value.getStruct("after");
        JSONObject afterData = new JSONObject();
        if (after != null) {
            for (Field field : after.schema().fields()) {
                afterData.put(field.name(), after.get(field));
            }
        }
        data.put("before", beforeData);
        data.put("after", afterData);

        // Get the operation type (READ, CREATE, UPDATE, DELETE, ...)
        Envelope.Operation op = Envelope.operationFor(sourceRecord);

        // Assemble the output record; op.name() makes the operation an explicit string
        JSONObject object = new JSONObject();
        object.put("database", database);
        object.put("table", table);
        object.put("operation", op.name());
        object.put("data", data);

        collector.collect(object.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
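The topic split in `deserialize` escapes the dot because `String.split` takes a regular expression: an unescaped `.` matches every character, every token comes out empty, `split` drops the trailing empties, and indexing `strings[1]` throws `ArrayIndexOutOfBoundsException`. A small stdlib-only check on the topic from the first example, which follows Debezium's `<server name>.<database>.<table>` layout. `Pattern.quote` is shown as an equivalent alternative to hand-escaping; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

public class TopicSplitDemo {
    // Splits "<server>.<database>.<table>" on literal dots.
    static String[] splitTopic(String topic) {
        return topic.split(Pattern.quote(".")); // equivalent to split("\\.")
    }

    public static void main(String[] args) {
        String topic = "mysql_binlog_source.flink.user";
        // Unescaped "." is a regex matching any character: all tokens are
        // empty and split() removes them, leaving an empty array.
        System.out.println(topic.split(".").length);            // prints 0
        System.out.println(Arrays.toString(splitTopic(topic))); // prints [mysql_binlog_source, flink, user]
    }
}
```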
