1. 问题描述

        本地调试过程中发现,明明设置了kafka自动提交

enable.auto.commit = true

        但flink任务启动后,看日志,有时还是

enable.auto.commit = false

        kafka的 enable.auto.commit时而为true时而为false,导致的现象是,每次重启flink测试任务,有时是从kafka最新消息开始消费(enable.auto.commit=true),有时是从更早之前消费(enable.auto.commit=false)。

2. 问题验证

2.1 flink taskmanager并行度slot设置和kafka topic分区数量不匹配

        刚开始以为本地环境,kafka topic只设置了 分区,之前为测试将flink-conf.yaml配置文件中的slot设置大了

taskmanager.numberOfTaskSlots: 34

        改成并行度为1,重启flink,刚开始好像ok,但后面又出现了kafka的enable.auto.commit时而为true时而为false的问题。

        所以问题验证失败,还要继续找原因。

2.2 flink checkpoint和kafka offset设置有冲突

        查看flink官方文档:https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/kafka/

Consumer Offset Committing #

Kafka source commits the current consuming offset when checkpoints are completed, for ensuring the consistency between Flink’s checkpoint state and committed offsets on Kafka brokers.

If checkpointing is not enabled, Kafka source relies on Kafka consumer’s internal automatic periodic offset committing logic, configured by enable.auto.commit and auto.commit.interval.ms in the properties of Kafka consumer.

Note that Kafka source does NOT rely on committed offsets for fault tolerance. Committing offset is only for exposing the progress of consumer and consuming group for monitoring.

        意思是如果checkpoint开启了,kafka offset会在checkpoint完成的时候自动提交,如果没开checkpoint关闭,kafka offset的提交依赖于enable.auto.commit和auto.commit.interval.ms两个配置

        看自己flink任务配置,开启了checkpoint,checkpoint间隔10分钟,超时时间1分钟。

flink.checkpoint.use=true
flink.checkpoint.interval=600000
flink.checkpoint.timeout=60000

         本地测试验证问题的时候,很多是1分钟内解决战斗,开启了checkpoint,但没有完成过一次checkpoint,导致kafka每次消费后都没有提交,下次重启任务的时候,又是重头开始消费。

        另外大胆猜测,开启了checkpoint后,enable.auto.commit的设置可能是随机的。这个没有找到确凿证据,但从多次验证来看,目前是这样的。

3. 问题总结

        从上面推测和验证来看flink开启checkpoint后,会影响kafka的enable.auto.commit设置项,从而影响任务重启后的消息偏移。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐