redis-延迟队列
rocketmq在发送延时消息时,是先把消息按照延迟时间段发送到指定的队列中(把延时时间段相同的消息放到同一个队列中,保证了消息处理的顺序性,可以让同一个队列中消息延时时间是相同的,整个RocketMQ中延时消息时按照递增顺序排序,保证信息处理的先后顺序性。延迟队列可以通过 zset 来实现,因为 zset 中有一个 score,我们可以把时间作为 score,将 value 存到 redis 中
延迟队列
延迟消息队列使用场景
-
定时任务,比如任务A和任务B是同条流水线上的,当任务A完成了,一个小时后执行任务B
- 我们打车,在规定时间内,没有车主接单,那么平台就会推送消息给你,提示暂时没有车主接单。
- 网上支付场景,下单了,如果没有在规定时间付款,平台通常会发消息提示订单在有效期内没有支付完成,此订单自动取消之类的信息。
- 我们买东西,如果在一定时间内,没有对该订单进行评分,这时候平台会给一个默认分数给此订单。
-
重试业务,比如业务A需要调用其它服务,而服务出现问题,这时候就需要做业务重试
- 当分布式锁加锁失败时,将消息放入到延迟队列中处理
实现方式:
- 基于消息的延迟:指为每条消息设置不同的延迟时间,那么每当队列中有新消息进入的时候就会重新根据延迟时间排序,或者定义时间轮,新消息落在指定位置;
- 基于队列的延迟: 设置不同延迟级别的队列,比如5s、1min、30mins、1h等,每个队列中消息的延迟时间都是相同的。
基于第一种不少组件都有实现方案,比如redis的sortset间接实现,kafka内部时间轮,rmq可安装插件实现。第一种实时性高,不过主观看会比较依赖组件本身,但自己实现就得考虑持久化、高可用等问题,建议直接使用组件本身;第二种方案可以基于组件去实现,通用性会高点,不过实时性不高,更适合用于重试业务场景。当然Redis本身并不支持延迟队列,所以我们只是实现一个比较简单的延迟队列,而且Redis不太适合大量消息堆积,所以只适合比较简单的场景,然假如我们对消息的实时性以及可靠性要求非常高,可能就需要使用MQ或kafka来实现了。
消息延迟流程图
Redis实现延迟队列的基本原理
延迟队列可以通过 zset 来实现,因为 zset 中有一个 score,我们可以把时间作为 score,将 value 存到 redis 中,然后通过轮询的方式,去不断的读取消息出来
整体思路
1.消息体设置有效期,设置好score,然后放入zset中
2.通过排名拉取消息
3.有效期到了,就把当前消息从zset中移除
zadd命令
使用方式:ZADD key score member [[score member][score member] …]
将一个或多个 member 元素及其 score 值加入到有序集 key 当中。如果 key 不存在,则创建一个空的有序集并执行 ZADD 操作。如果某个 member 已经是有序集的成员,那么更新这个 member 的 score 值,并通过重新插入这个 member 元素,来保证该 member 在正确的位置上。score 值可以是整数值或双精度浮点数。
ZRANGEBYSCORE命令
使用方式:ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
1.返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。有序集成员按 score 值递增(从小到大)次序排列。
2.具有相同 score 值的成员按字典序来排列
3.可选的 LIMIT 参数指定返回结果的数量及区间(就像SQL中的 SELECT LIMIT offset, count ),注意当 offset 很大时,定位 offset 的操作可能需要遍历整个有序集,此过程最坏复杂度为 O(N) 时间。
4.可选的 WITHSCORES 参数决定结果集是单单返回有序集的成员,还是将有序集成员及其 score 值一起返回。
ZREM命令
使用方式:ZREM key member [member …]
移除有序集 key 中的一个或多个成员,不存在的成员将被忽略。
当 key 存在但不是有序集类型时,返回一个错误。
Redis用来进行实现延时队列是具有这些优势的:
- Redis zset支持高性能的 score 排序。
- Redis是在内存上进行操作的,速度非常快。
- 支持指定消息 remove
- Redis可以搭建集群,当消息很多时候,我们可以用集群来提高消息处理的速度,提高可用性。
- Redis具有持久化机制,当出现故障的时候,可以通过AOF和RDB方式来对数据进行恢复,保证了数据的可靠性
Redis延时队列劣势
- 使用 Redis 实现的延时消息队列也存在数据持久化, 消息可靠性的问题
- 没有重试机制 - 处理消息出现异常没有重试机制, 这些需要自己去实现, 包括重试次数的实现等
- 没有 ACK 机制 - 例如在获取消息并已经删除了消息情况下, 正在处理消息的时候客户端崩溃了, 这条正在处理的这些消息就会丢失, MQ 是需要明确的返回一个值给 MQ 才会认为这个消息是被正确的消费了
用消息中间件实现延时队列
Rabbitmq 延时队列
优点:消息持久化,分布式
缺点:延时相同的消息必须扔在同一个队列,每一种延时就需要建立一个队列。因为当后面的消息比前面的消息先过期,还是只能等待前面的消息过期,这里的过期检测是惰性的。
使用: RabbitMQ 可以针对 Queue 设置 x-expires 或者针对 Message 设置 x-message-ttl ,来控制消息的生存时间(可以根据 Queue 来设置,也可以根据 message 设置), Queue 还可以配置 x-dead-letter-exchange 和 x-dead-letter-routing-key (可选)两个参数, 如果队列内出现了 dead letter ,则按照这两个参数重新路由转发到指定的队列,此时就可以实现延时队列了。
RocketMQ实现延时队列
rocketmq在发送延时消息时,是先把消息按照延迟时间段发送到指定的队列中(把延时时间段相同的消息放到同一个队列中,保证了消息处理的顺序性,可以让同一个队列中消息延时时间是相同的,整个RocketMQ中延时消息时按照递增顺序排序,保证信息处理的先后顺序性。)。之后,通过一个定时器来轮询处理这些队列里的信息,判断是否到期。对于到期的消息会发送到相应的处理队列中,进行处理。
Kafka实现延时队
Kafka基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer),Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,可以进行相关的延时队列设置。
Netty实现延时队列
Netty也有基于时间轮算法来实现延时队列。Netty在构建延时队列主要用HashedWheelTimer,HashedWheelTimer底层数据结构是使用DelayedQueue,采用时间轮的算法来实现。
更多推荐
所有评论(0)