1. 问题背景

在程序已经稳定运行多天、未对代码做任何修改、查看所消费数据源未出现数据增多的情况下,有一个flink程序最近出现了积压问题,很是疑惑,观察几天并查看了日志发现,每当出现加压时便会伴随该日志出现,因此便着手解决该问题。
在这里插入图片描述

2. 解决问题

在网上搜索一番后,同时看了kafka配置方面的内容,就修改了如下两个配置
session.timeout.ms=30000增加至60000;
request.timeout.ms=20000增加至40000;
当时确实起作用了,不再出现积压,也不会再出现这样的日志,可是过了一段时间后又出现了积压并伴随该日志出现,于是又分别将上述量配置增加至80000和40000,但这次没像上次,重启程序后立刻出现该日志并且伴随数据积压。于是我又检查了下其他参数,发现我的max.poll.records=20000,但是默认值是500,于是又是在网上搜索一番,发现有博主说每条消息处理的时间t和一次拉取的记录数num和max.poll.interval=ms(默认300000ms)具有一定的关系,即:t*num<ms,否则就会被 Coordinator 剔除消息组然后重平衡,如果该说法合理的话,那就是说明我的max.poll.records设置的值太大,于是我把max.poll.records设置为10000,看看什么效果。

另外,最近一次出现严重的数据积压问题,分别查看了消费topic和生产topic的每秒曲线,在消费的topic数据量没有出现增加的情况下,发现生产topic的数据量极大的增加了,所以我很怀疑,也是因为上述的问题导致,消费后的数据又重新消费了,导致生产大量的重复数据。

1. 一次 kafka 消息堆积问题排查
2. Kafka几个常见的错误
3. Kafka stream 开发碰到的问题
4. Flink读取Kafka报Error sending fetch request

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐