使用list实现异步消息队列

Redis 的 list(列表) 数据结构常用来作为异步消息队列使用,使用rpush和lpush操作入队列,使用lpop 和 rpop来出队列。客户端是通过队列的 pop 操作来获取消息,然后进行处理。处理完了再接着获取消息,再进行处理。如此循环往复,这便是作为队列消费者的客户端的生命周期。

可是如果队列空了,客户端就会陷入 pop 的死循环,不停地 pop,没有数据,接着再 pop,又没有数据。这就是浪费生命的空轮询。为了解决这个问题,当lpop没有消息的时候,可适当sleep后重试。

睡眠会导致消息的延迟增大。如果只有 1 个消费者,那么这个延迟就是 1s。如果有多个消费者,这个延迟会有所下降,因为每个消费者的睡觉时间是岔开来的。 有没有什么办法能显著降低延迟呢?当然也有,那就是 blpop/brpop。 这两个指令的前缀字符b代表的是blocking,也就是阻塞读。 阻塞读在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立刻醒过来,消息的延迟几乎为零。用blpop/brpop替代前面的lpop/rpop,就完美解决了上面的问题。

如果线程一直阻塞在哪里,Redis 的客户端连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用。这个时候blpop/brpop会抛出异常来,因此编写客户端消费者的时候要小心,注意捕获异常。也可以直接使用延时队列解决这个问题

使用发布-订阅模式实现异步消息队列

pub/sub的主题订阅者模式的优缺点:

  • 优点:可实现生产一次消费多次,实现1:N的消息队列。
  • 缺点:在消费者下线的情况下,生产的消息会丢失,需要专业的消息队列如rabbitMQ。

使用ZSet实现延时队列

延时队列可以通过 Redis 的 ZSet(有序列表) 来实现。时间戳为score,消息内容为key,调用zadd来生产消息,消费者用zrangebyscore指令获取数据轮询进行处理

为了保障可用性,也可以使用多个消费者线程用zrangebyscore指令获取数据轮询进行处理。多个线程是为了保障可用性,万一挂了一个线程还有其它线程可以继续处理。因为有多个线程,所以需要考虑并发争抢任务,确保任务不能被多次执行。 Redis 的 zrem 方法是多线程多进程争抢任务的关键,它的返回值决定了当前实例有没有抢到任务,因为 loop 方法可能会被多个线程、多个进程调用,同一个任务可能会被多个进程线程抢到,通过 zrem 来决定唯一的属主。 同时,我们要注意一定要对 handle_msg 进行异常捕获,避免因为个别任务处理问题导致循环异常退出。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐