flink 写入kafka报错 *** ms has passed since batch creation plus linger time
首先我们来看看batchsize 和linger timeKafka需要在吞吐量和延迟之间取得平衡,可以通过下面两个参数控制.batch.size当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送批次的大小可以通过batch.size 参数设置.默认是16KB较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。一个非常大的批次大小可能会浪费内存。因
首先我们来看看batchsize 和linger time
Kafka需要在吞吐量和延迟之间取得平衡,可以通过下面两个参数控制.
batch.size
当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送
批次的大小可以通过batch.size 参数设置.默认是16KB
较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。
一个非常大的批次大小可能会浪费内存。因为我们会预先分配这个资源。
例子
比如说发送消息的频率就是每秒300条,那么如果比如batch.size调节到了32KB,或者64KB,是否可以提升发送消息的整体吞吐量。
因为理论上来说,提升batch的大小,可以允许更多的数据缓冲在里面,那么一次Request发送出去的数据量就更多了,这样吞吐量可能会有所提升。
但是这个东西也不能无限的大,过于大了之后,要是数据老是缓冲在Batch里迟迟不发送出去,那么岂不是你发送消息的延迟就会很高。
比如说,一条消息进入了Batch,但是要等待5秒钟Batch才凑满了64KB,才能发送出去。那这条消息的延迟就是5秒钟。
所以需要在这里按照生产环境的发消息的速率,调节不同的Batch大小自己测试一下最终出去的吞吐量以及消息的延迟,设置一个最合理的参数。
linger.ms
上面比如我们设置batch size为32KB,但是比如有的时刻消息比较少,过了很久,比如5min也没有凑够32KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间,即使数据没达到32KB,也将这个批次发送出去. 比如设置5ms,就是到了5ms,大小没到32KB,也会发出去
总结
同时设置batch.size和 linger.ms,就是哪个条件先满足就都会将消息发送出去
Kafka需要考虑高吞吐量与延时的平衡.
于是我们回到这个问题 网上其他答案比如查看网络原因之类的 我这边不work 注意到我这边原来的linger time没有配置 但是batch size 配置10k以上 评估当前环境的sink速度 加上了一个lingertime的配置 我这边配置成了10000ms 问题解决。
更多推荐
所有评论(0)