kafka 延迟队列
最近在看Kafka延迟队列的实现方式,发现大部分讲的都很片面,都是时间轮相关的东西,搞得一知半解的,最终根据自己的理解,设计了一套延迟队列,和大家一起讨论一下,服务流程如下如图所示,所有的消息进来之后,都会被分配到delay队列中,然后delay队列消费消息满足时间要求后再发送到业务队列中,这样做的目的是避免消息阻塞,如果我们没有delay队列,所有消息都在业务队列中,那必然会产生一定的堆积,因为
最近在看Kafka延迟队列的实现方式,发现大部分讲的都很片面,都是时间轮相关的东西,搞得一知半解的,最终根据自己的理解,设计了一套延迟队列,和大家一起讨论一下,服务流程如下
如图所示,所有的消息进来之后,都会被分配到delay队列中,然后delay队列消费消息满足时间要求后再发送到业务队列中,这样做的目的是避免消息阻塞,如果我们没有delay队列,所有消息都在业务队列中,那必然会产生一定的堆积,因为这个队列本身要做的事情太多,delay队列就是为了分担他的压力
这里说下为什么有三个delay队列,我这里其实是想根据业务划分优先级,也是为了可以减少消息的延迟,将数据做了归类,比如延迟1分钟左右的数据,放在高优先级的队列中, 10分钟延迟的放在中优先级,2小时延迟的放在低优先级
具体的操作方式:每个delay队列中的消息,不仅要存当前消息的内容,还要存下一个要消费的消息位置(offet),这样就可以避免我们以 O(n)的复杂度去遍历队列,检查要执行的队列数据,另外考虑到有新增数据插队的情况,需要在缓存中也维护一份当前最优先的offet值,方便我们做插队处理
kafka中可以修改offet值,通过seek() 函数即可
这里还要考虑一个问题,就是当最优先的队列数据还有1小时才要执行,那我们怎么处理,是sleep吗?如果sleep太久的话,当程序代码不能在max.poll.interval.ms
配置的期望时间内处理这些消息的话,kafka就会认为这个消费者已经挂了,会进行rebalance
,同时你这个消费者就无法再拉取到任何消息了,Kafka本身提供很优雅的解决方案,pause() 方法可以暂定消费,resume() 方法可以恢复消费,这样就不会出现异常了
更多推荐
所有评论(0)