Debezium offset source code walkthrough and offset-replay code
A brief source-level analysis of how Debezium stores offsets, plus code for adjusting the offset by hand
Notes on adjusting offsets for the Debezium Engine
The Debezium program in question was developed last year and currently runs version 1.5.2.Final.
First, confirm how the program commits offsets:
- offsets are committed to a Kafka topic
- the relevant configuration is:
# offset storage (Kafka) settings
bootstrap.servers=localhost:9092
offset.storage=org.apache.kafka.connect.storage.KafkaOffsetBackingStore
offset.storage.topic=debezium-offset-1
offset.storage.partitions=1
offset.storage.replication.factor=1
# minimum interval between offset commits
offset.flush.interval.ms=0
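A minimal sketch of wiring this configuration into the engine, assuming the properties above live in a file named debezium.properties (the file name is an assumption for illustration); the connector-level settings shown further below (name, connector.class, database.*) go into the same Properties object:
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

// Load the configuration file into the Properties object that
// createDebeziumEngine() below passes to the engine builder.
static Properties loadDebeziumProperties() throws IOException {
    Properties properties = new Properties();
    try (InputStream in = Files.newInputStream(Paths.get("debezium.properties"))) {
        properties.load(in);
    }
    return properties;
}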
Creating the DebeziumEngine, processing each record, and committing the offset:
// create the DebeziumEngine
public void createDebeziumEngine() {
    DebeziumEngine<RecordChangeEvent<SourceRecord>> engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
            .using(properties)
            .notifying(this::handlePayload)
            .build();
    // note: run() blocks the calling thread until the engine is stopped
    engine.run();
}
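Since DebeziumEngine implements Runnable and Closeable and run() blocks, production code usually hands the engine to an executor instead of calling run() directly; a minimal sketch:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Run the engine on its own thread so the caller is not blocked.
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);

// ... later, on shutdown:
// engine.close();
// executor.shutdown();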
// Assumed imports for the helpers used below:
// import static io.debezium.data.Envelope.FieldName.*;   // OPERATION, BEFORE, AFTER
// import static java.util.stream.Collectors.toMap;
// import org.apache.commons.lang3.tuple.Pair;

// format each record, process it, then commit its offset
public void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) {
    recordChangeEvents.forEach(
            recordChangeEvent -> {
                SourceRecord sourceRecord = recordChangeEvent.record();
                Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
                Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
                // for deletes, read the pre-image; otherwise read the post-image
                String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;
                Struct struct = (Struct) sourceRecordChangeValue.get(record);
                // collect the changed row's non-null fields into a map
                Map<String, Object> payload = struct.schema().fields().stream()
                        .map(Field::name)
                        .filter(fieldName -> struct.get(fieldName) != null)
                        .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
                        .collect(toMap(Pair::getKey, Pair::getValue));
                Struct source = (Struct) sourceRecordChangeValue.get("source");
                String db = source.get("db").toString();
                String tb = source.get("table").toString();
                System.out.println("table " + db + "." + tb + ", data: " + payload);
                // TODO custom record processing
                // records read from the signal table can be filtered out here if needed
                // commit the offset; calling markBatchFinished() per record forces a
                // flush attempt after every record, which matches offset.flush.interval.ms=0
                try {
                    recordCommitter.markProcessed(recordChangeEvent);
                    recordCommitter.markBatchFinished();
                    System.out.println("offset committed!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    );
}
- The actual offset commit is these two lines:
recordRecordCommitter.markProcessed(recordRecordChangeEvent);
recordRecordCommitter.markBatchFinished();
Let's look at the Javadoc of these two methods:
/**
 * Marks a single record as processed, must be called for each record.
 *
 * @param record the record to commit
 */
void markProcessed(R record) throws InterruptedException;

/**
 * Marks a batch as finished, this may result in committing offsets/flushing data.
 * Should be called when a batch of records is finished being processed.
 */
void markBatchFinished() throws InterruptedException;
- markProcessed(R record): for each processed record, stages its offset into offsetWriter.toFlush, where it waits to be committed (see the paraphrase below)
- markBatchFinished(): at the end of each batch, triggers the actual offset commit
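For context, the embedded engine's RecordCommitter is implemented roughly like this (a simplified paraphrase of the EmbeddedEngine 1.x source, reconstructed from reading it rather than copied verbatim):
// markProcessed() stages the record's source partition/offset in the
// OffsetStorageWriter; markBatchFinished() asks maybeFlush() to commit them.
public void markProcessed(SourceRecord record) throws InterruptedException {
    task.commitRecord(record);
    recordsSinceLastCommit += 1;
    offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
}

public void markBatchFinished() throws InterruptedException {
    maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeout, task);
}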
The maybeFlush method:
computes timeSinceLastCommitMillis, the time elapsed since lastCommit, and then
asks OffsetCommitPolicy.performCommit whether the conditions for committing offsets are met.
If offset.commit.policy is not configured (my program does not configure it), the default PeriodicCommitOffsetPolicy is used;
its performCommit method returns true once timeSinceLastCommit reaches minimumTime.
Now look at where minimumTime comes from:
it is read from the offset.flush.interval.ms configuration item,
which is the interval, in milliseconds, between offset commits.
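The policy itself is tiny; the following is a paraphrase of PeriodicCommitOffsetPolicy against the io.debezium.engine.spi.OffsetCommitPolicy SPI (reconstructed for illustration, with the constructor simplified to take the interval directly):
import java.time.Duration;
import io.debezium.engine.spi.OffsetCommitPolicy;

// Commit whenever the time since the last commit reaches offset.flush.interval.ms.
public class PeriodicCommitOffsetPolicy implements OffsetCommitPolicy {
    private final Duration minimumTime;

    public PeriodicCommitOffsetPolicy(Duration minimumTime) {
        this.minimumTime = minimumTime;
    }

    @Override
    public boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit) {
        return timeSinceLastCommit.compareTo(minimumTime) >= 0;
    }
}
With offset.flush.interval.ms=0, minimumTime is Duration.ZERO, so performCommit returns true for every batch.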
At the beginning we configured offset.flush.interval.ms=0, which guarantees the commit condition is satisfied as soon as each record has been processed.
Once the commit condition is met, commitOffsets is entered; inside it, doFlush is the method that actually writes the offset information to the Kafka topic.
At this point we have walked through Debezium's whole offset-commit flow. Next, let's look at how doFlush writes offsets to Kafka and in what format; only by understanding how Debezium commits offsets can we craft our own offset message, send it to the topic, and thereby adjust the offset.
A brief source-level look at how Debezium commits offsets to the Kafka topic
Earlier, Debezium stored the offset information in the toFlush HashMap. doFlush iterates over toFlush, serializes each key and value, and hands them to backingStore, whose type is OffsetBackingStore.
The data is finally sent to the topic by KafkaBasedLog.send.
The ProducerRecord is built with the topic debezium-offset-1, i.e. the value of the offset.storage.topic configuration item:
offset.storage.topic=debezium-offset-1
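Pieced together, the flush path looks roughly like this (a condensed paraphrase of Kafka Connect's OffsetStorageWriter.doFlush and KafkaOffsetBackingStore, simplified for illustration rather than quoted):
// Serialize every staged offset and hand the pairs to the backing store.
Map<ByteBuffer, ByteBuffer> offsetsSerialized = new HashMap<>();
for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : toFlush.entrySet()) {
    // key = [namespace, sourcePartition], e.g. ["<name>", {"server": "<database.server.name>"}]
    byte[] key = keyConverter.fromConnectData(namespace, null,
            Arrays.asList(namespace, entry.getKey()));
    // value = the record's sourceOffset map (binlog file, pos, gtids, ...)
    byte[] value = valueConverter.fromConnectData(namespace, null, entry.getValue());
    offsetsSerialized.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
}
// KafkaOffsetBackingStore.set() forwards each pair to KafkaBasedLog.send(),
// which produces it to the offset.storage.topic
backingStore.set(offsetsSerialized, callback);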
So, to override the offset, we just need to create keyConverter and valueConverter instances with the same configuration, serialize the same key together with a value carrying our custom offset, and send them to offset.storage.topic.
First, which classes are keyConverter and valueConverter?
If the internal.key.converter and internal.value.converter configuration items are not set, JsonConverter is used by default.
We can also capture the argument values passed into keyConverter.fromConnectData() and valueConverter.fromConnectData() and model our custom key and value on them; the important pieces are the namespace and the entry's key and value.
Reading the source shows that the namespace is ENGINE_NAME, i.e. the value of the name configuration item.
The entry is the offsetWriter.toFlush entry built in markProcessed().
The source also shows that entry.key is derived from the database.server.name configuration item:
database.server.name=mysql-test-114
entry.value is a HashMap with a fixed set of keys.
The gtids value can be looked up with select * from mysql.gtid_executed.
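Putting this together, the record we need to produce into the offset topic should look roughly like this for the configuration used here (illustrative, assembled from the fields above and the values used in the sender code below; the exact key encoding depends on the engine's internal converter settings):
key:   ["debezium-1.9.7.Final-114",{"server":"mysql-test-114"}]
value: {"transaction_id":null,"ts_sec":1667526691,"file":"binlog.000010","pos":7776,"gtids":"f09d7bcb-5752-11ed-8d01-e0be03323430:1-149","snapshot":false}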
Partial configuration used for the Debezium test, and the custom offset sender code
# debezium connector settings
name=debezium-1.9.7.Final-114
connector.class=io.debezium.connector.mysql.MySqlConnector
database.server.name=mysql-test-114
database.server.id=12346
# offset storage (Kafka) settings
bootstrap.servers=localhost:9092
offset.storage=org.apache.kafka.connect.storage.KafkaOffsetBackingStore
offset.storage.topic=debezium-offset-1
offset.storage.partitions=1
offset.storage.replication.factor=1
#offset.storage.flush.interval=6000
- Code for sending a custom offset:
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.Converter;

public static void main(String[] args) {
    Converter keyConverter = new JsonConverter();
    Converter valueConverter = new JsonConverter();
    String bootStrapServers = "localhost:9092";
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    properties.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
    // the converters already produce byte[], so the producer is typed <byte[], byte[]>
    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(properties);

    Map<String, String> keyMap = new HashMap<>();
    keyMap.put("converter.type", "key");
    // note: if the restored key does not match what the engine wrote, try adding
    // "schemas.enable","false" here as well so the key serialization lines up
    Map<String, String> valueMap = new HashMap<>();
    valueMap.put("converter.type", "value");
    valueMap.put("schemas.enable", "false");
    keyConverter.configure(keyMap, true);
    valueConverter.configure(valueMap, false);

    // key = [name, {"server": database.server.name}]
    List<Object> keyList = new ArrayList<>();
    keyList.add(0, "debezium-1.9.7.Final-114");
    keyList.add(1, Collections.singletonMap("server", "mysql-test-114"));

    // value = the binlog position the connector should resume from
    Map<String, Object> valueMessageMap = new HashMap<>();
    valueMessageMap.put("transaction_id", null);
    valueMessageMap.put("ts_sec", 1667526691);
    valueMessageMap.put("file", "binlog.000010");
    valueMessageMap.put("pos", 7776);
    valueMessageMap.put("gtids", "f09d7bcb-5752-11ed-8d01-e0be03323430:1-149");
    valueMessageMap.put("snapshot", false);

    byte[] key = keyConverter.fromConnectData("debezium-1.9.7.Final-114", null, keyList);
    byte[] value = valueConverter.fromConnectData("debezium-1.9.7.Final-114", null, valueMessageMap);
    producer.send(new ProducerRecord<>("debezium-offset-1", key, value));
    producer.flush();
    producer.close();
}
Notes:
- the first element of keyList is the name configuration item; the second is a map built from the database.server.name configuration item
- in valueMessageMap:
  - transaction_id may be null, or omitted entirely
  - ts_sec can be chosen freely; keeping it close to the current time makes later tracing easier
  - file and pos identify the binlog position to resume from (the server's current position can be checked with SHOW MASTER STATUS)
  - if the database does not have GTID enabled, the gtids entry is not needed
  - snapshot is normally false, since most offset-replay scenarios are incremental sync
Open question: when building the custom offset value, it appears that if GTID is enabled, the gtids entry takes the highest priority in deciding where binlog reading resumes?
Summary
- After the producer sends the message and the Debezium engine is restarted, it starts reading the binlog from the specified offset.