Redis高级:消息队列

1 认识消息队列

什么是消息队列:字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

在这里插入图片描述

使用消息队列的好处在于解耦:生产者将消息发送到消息队列中,不必等待消费者接收消息,而消费者只需要在消息队列中有消息时取出来进行处理即可,这样就实现了消息发送与处理的解耦,因为这个过程是异步实现的

常见的消息队列相信大家都有所了解,例如rabbitmq,kafka等等,其实redis也是可以用来做消息队列的


2 基于List实现消息队列

Redis的List数据结构是一个双向链表,很容易模拟出队列效果。

消息队列的消息入口和消息出口不在同一边,而List数据结构中提供了相应的命令,我们可以在List左侧插入数据然后在右侧取出,或则在右侧插入然后在左侧取出,对应的命令就是LPUSH与RPOP或者RPUSH与LPOP。但是我们需要注意的是如果我们使用RPOP或者LPOP取出数据时,如果队列中没有数据,是会直接返回null的,如果我们想模拟出阻塞队列的效果(即队列中没有数据时会阻塞等待,直到有数据时再取出),这里应该使用BRPOP或者BLPOP

在这里插入图片描述

演示如下:

首先开启两个redis客户端,客户端一模拟生产者,客户端二模拟消费者

首先在客户端二输入以下命令:

# 在key为k1的list右侧阻塞式地获取一条数据 等待时间为1000秒
brpop k1 1000

当我们回车后,客户端二会进入阻塞,因为此时k1中是没有数据的

在客户端一中输入以下命令:

# 在k1中插入一条数据
lpush k1 v1

此时再看客户端二,就发现客户端二已经拿到了数据:

在这里插入图片描述

基于List实现的消息队列有哪些优点和缺点?

优点:

  • 利用Redis服务的内存进行消息存储
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失,当消息被消费者取出之后,就已经在List中被移除了,如果该消费者出现故障等因素没有处理消息,那么这条消息就永久丢失了
  • 只支持单消费者,List中的消息只能被一个消费者接受并消费,无法实现一条消息被很多消费者消费的需求。

3 基于PubSub的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

相关命令如下:

  • subscribe channel [channel] :订阅一个或多个频道
  • publish channel msg :向一个频道发送消息
  • psubscribe pattern [pattern] :订阅与pattern格式匹配的所有频道

关于 PubSub 的具体命令使用方法可以参看官网:https://redis.io/commands/?group=pubsub

在这里插入图片描述

具体演示如下:

首先开启三个客户端,客户端一模拟生产者,客户端二与客户端三模拟消费者

客户端二中执行以下命令:

# 订阅名为order.q1的channel
subscribe order.q1

客户端三中执行以下命令

# 订阅所有以order.开头的channel
psubscribe order.*

当我们在以上两个客户端中执行以上命令并回车后,它们就会进入阻塞状态,等待接收channel中的消息,如下所示:

在这里插入图片描述

在这里插入图片描述

在客户端一中执行以下命令:

# 在频道order.q1中发布一条消息,内容为hello
publish order.q1 hello

当我们执行完上述命令之后,发现返回值为2,说明有两个消费者接收到该消息

在这里插入图片描述

这时候我们再查看客户端二与客户端三,就可以发现这两个消费者已经接收到客户端一发出去的消息了

在这里插入图片描述

在这里插入图片描述

消费者在消费一条消息后仍然会继续阻塞,等待生产者发布下一条消息

我们可以在客户端一中在执行一条命令

# 在频道order.q2中发布一条消息,内容为hello
publish order.q2 hello

返回值为1,说明只有一个消费者接收到了消息

在这里插入图片描述

再查看客户端二与客户端三,发现只有客户端三收到了消息。这是因为客户端三是订阅了所有以order开头的频道,而客户端二只订阅了名为order.q1的频道

在这里插入图片描述

在这里插入图片描述

基于PubSub的消息队列有哪些优点和缺点?

优点:

  • 采用发布订阅模型,支持多生产、多消费,一条消息可以发给多个消费者,也可以发给一个消费者

缺点:

  • 不支持数据持久化。本身不像List结构那样支持数据持久化,List结构本身就是用来存储数据的,而PubSub则是用来做消息发送的,因此,当一个生产者在频道中发送了一条消息后,如果没有任何消费者订阅该频道,那么该条消息就会直接消失。
  • 消息堆积有上限,超出上限时数据会丢失。当发送一条消息到频道时,如果有消费者监听,那么该消费者就会将频道中的消息缓存至消息缓存区,由消费者进行处理,而消费者的缓存空间是有上限的,如果超出了这个上限数据就会丢失。

4 基于Stream的消息队列

Stream是Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列,而且由于Stream本身是一种数据类型,是可以做消息持久化的,在数据安全方面可以得到保证

4.1 单消费模式

我们先来了解一下Stream最基本的发送消息和读取消息的命令

1) 发送消息的命令

XADD  key  [NOMKSTREAM]  [MAXLEN|MINID [=|~] threshold [LIMIT count]]  *|ID  field value [field value ...]

# key:指定队列
# [NOMKSTREAM]:如果队列不存在,就不创建队列,不设置则表示如果队列不存在就自动创建队列,一般不设置
# [MAXLEN|MINID [=|~] threshold [LIMIT count]]:设置消息队列容许接受的最大消息数量,不设置则表示无数量限制,一般不设置
# *|ID:消息的唯一id,*代表由redis自动生成。id格式是"时间戳-递增数字",例如"1644804662707-0",一般设置为*
# field value [field value ...]:发送到消息队列中的消息,称为entry,格式就是多个key-value键值对

案例

# 创建名为users的队列,并向其中发送一个消息,内容是{name=jack,age=21},并且使用redis自动生成ID
xadd users * name jack age 21

2 )读取消息的命令

XREAD  [COUNT count]  [BLOCK milliseconds]  STREAMS key [key ...]  ID [ID ...]

# [COUNT count]:每次读取消息的最大数量,不指定则全部读取
# [BLOCK milliseconds]:当没有消息时,是否阻塞,以及阻塞时长,不设置则不阻塞
# STREAMS key [key ...]:要从哪个队列读取消息,可以指定多个队列
# ID [ID ...]:消息读取的起始id,只读取大于该id的消息,设置为0时表示从第一个消息开始读取,设置为*时表示只读取最新的消息

案例一:从users中读取一条消息,从第一条消息开始读

xread count 1 streams users 0

需要注意,已经被读取过的消息可以被重复读取,消息是会一直被保存在队列中的,我们只需要指定消息id就可以拿到该消息

案例二:从users中读取一条消息,读取最新的一条消息

xread count 1 streams users $

我们发现,当我们使用上述命令尝试获取最新的一条消息时,返回的结果是null,这是因为消费者认为,在执行这条命令之前所有已经存在于队列中的消息,无论是已经读取的还是没有被读取的,都是旧消息,因此我们只能借用阻塞的形式来读取消息:

xread count 1 block 10000 streams users $

需要注意的是,当我们读取数据的时候,如果指定指定起始ID为$,那么消费者就永远只会处理最新的一条消息,如果消费者在处理一条消息的过程中,又有超过1条以上的消息达到队列,则下次获取时也只会去处理最新的一条消息,这样就会出现消息漏读的问题。

STREAM类型消息队列的XREAD命令具有如下特点:

  • 消息可回溯,消息读完后不会被移除,会一直保存在队列中
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险

4.2 消费者组

为了解决上述XREAD命令的问题,我们可以使用消费者组(Consumer Group),所谓的消费者组就是将多个消费者划分到一个组中,监听同一个队列。它具备下列特点:

  • 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度。处于一个组内的多个消费者实际上是竞争关系,凡是进入到这个组的消息,组内的消费者就会竞争该消息的处理权。这种方式可以大大提高消息的处理速度,避免消息堆积。如果想要一条消息被多个消费者处理,可以添加多个消费者组。
  • 消息标识:消费者组会维护一个标识,记录最后一条被处理的消息,哪怕消费者宕机重启,还会从标识之后读取消息。确保每一个消息都会被消费。
  • 消息确认:消费者获取消息后,消息处于pending(待定)状态,然后会存入一个pending-list。当消息处理完成后需要通过 XACK 来确认消息,标记消息为已处理,这条消息才会从 pending-list 中移除。确保消费者拿到消息后对消息的处理

在这里插入图片描述

有关于消费者组的常见命令如下:

1)创建消费者组

xgroup create key groupName ID [mkstream]

# key:队列名称
# groupName:消费者组名称
# ID:消息处理的起始ID,$代表从队列中最新的一条消息,0则代表队列中第一个消息
# [mkstream]:队列不存在时自动创建队列

2) 删除指定的消费者组

XGROUP DESTORY key groupName

3)给指定的消费者组添加消费者

XGROUP CREATECONSUMER key groupname consumername

4)删除消费者组中的指定消费者

XGROUP DELCONSUMER key groupname consumername

5)从消费者组读取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

# group:消费者组名称,从哪个消费者组中读取消息
# consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
# [COUNT count]:获取消息的最大数量
# [BLOCK milliseconds]:当没有消息时最长等待时间
# [NOACK]:当消费者从消费者组中获取到消息后直接确认消息已处理,也就是说消息不会被存入pending-list,一般不建议设置noack
# STREAMS key:指定队列名称
# ID:获取消息的起始ID。如果我们给定了“>”,则表示从下一个未消费的消息开始处理,如果我们给定的是id,则表示根据指定id从               pending-list中获取已消费但未确认的消息,例如给定0是从pending-list中的第一个消息开始处理。一般我们会给定“>”

案例演示:

新建一个消息队列stu,并在消息队列中插入几条数据:

127.0.0.1:6379> xadd stu * name tom age 18
"1663751351087-0"
127.0.0.1:6379> xadd stu * name rose age 20
"1663751372363-0"
127.0.0.1:6379> xadd stu * name jack age 19
"1663751394568-0"
127.0.0.1:6379> xadd stu * name jetty age 25
"1663751412515-0"

创建一个消费者组,监听stu

127.0.0.1:6379> xgroup create stu stuGroup 0
OK

在消费者组中添加一个消费者,并读取消费者组中的一条消息

127.0.0.1:6379> xreadgroup group stuGroup cons1 count 1 block 1000 streams stu >
1) 1) "stu"
   2) 1) 1) "1663751351087-0"
         2) 1) "name"
            2) "tom"
            3) "age"
            4) "18"

这里我们对于消息的起始id给定的是">",也就是处理下一条未被处理的消息,这里处理的消息是第一条,我们不妨再次重试上述命令

127.0.0.1:6379> xreadgroup group stuGroup cons1 count 1 block 1000 streams stu >
1) 1) "stu"
   2) 1) 1) "1663751372363-0"
         2) 1) "name"
            2) "rose"
            3) "age"
            4) "20"

现在处理的消息已经变成第二条了

之前我们说过,基于消费者组的方式来读取消息,队列中的消息不会被重复消费,我们可以在消费者组中再添加一个消费者,并尝试再读取一次消息,看看读到的消息会不会重复:

127.0.0.1:6379> xreadgroup group stuGroup cons2 count 1 block 1000 streams stu >
1) 1) "stu"
   2) 1) 1) "1663751394568-0"
         2) 1) "name"
            2) "jack"
            3) "age"
            4) "19"

现在处理的消息变成了第三条,这也就是我们设置id起始位置为>的作用,消费者只会去处理队列中未被处理的消息。

当然,仅仅只是处理消息还不够,我们之前讲过,消费者获取消息后,消息处于pending(待定)状态,会存入一个pending-list。当消息处理完成后需要通过 XACK 来确认消息,标记消息为已处理,这条消息才会从 pending-list 中移除。确认消息已处理的命令如下:

XACK key group ID [ID ...]

# key:指定队列
# group:指定消费者组
# ID:消息ID

例如我们现在要确认第三条消息已经被处理了:

127.0.0.1:6379> xack stu stuGroup 1663751394568-0
(integer) 1

那怎么确定这条消息已经从pending-list中被移除了呢?我们可以通过以下命令查询pending-list中的消息

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]

# key:指定队列
# group:指定消费者组
# [[IDLE min-idle-time]:指定空闲时间,空闲时间是指消费者接收消息以后到现在的时间,这段时间内消费者接收了消息,但是并未确认消                         息如果我们指定5000毫秒,就是指查询5000毫秒内被消费者接收的消息  
# start:需要查询的最小id,指定-表示不限制下限
# end:需要查询的最大id,指定+表示不限制上限
# count:需要查询的消息数量
# [consumer]:根据消费者查询消息,即查询已经被某个消费者处理但是却并未得到该消费者确认的消息

我们之前从stu中取出了三条数据,只确认了最后一条,还剩两条并未确认,我们可以通过上述命令查看是否是这样:

127.0.0.1:6379> xpending stu stuGroup - + 10
1) 1) "1663751351087-0"
   2) "cons1"
   3) (integer) 1769286
   4) (integer) 1
2) 1) "1663751372363-0"
   2) "cons1"
   3) (integer) 1590897
   4) (integer) 1
127.0.0.1:6379> 

如果我们想要处理pending-list中未确认的消息,可以将id起始位置由>改成0,表示处理pending-list中第一条未被处理的消息

127.0.0.1:6379> xreadgroup group stuGroup cons1 count 1 block 1000 streams stu 0
1) 1) "stu"
   2) 1) 1) "1663751351087-0"
         2) 1) "name"
            2) "tom"
            3) "age"
            4) "18"

需要注意的是,如果我们要处理pending-list中的消息,只能用接收该消息的消费者去处理,例如我现在希望使用消费者cons2去处理pending-list中第一条未被处理的消息,就会无法获取到

127.0.0.1:6379> xreadgroup group stuGroup cons2 count 1 block 1000 streams stu 0
1) 1) "stu"
   2) (empty array)

这是因为第一条消息最开始就是由cons1来处理的,这里这样设计的目的也是为了防止重复消费,如果消息在被cons1确定之前被cons2拿到了,那么这两个消费者就同时处理了这条消息,就会出现重复消费的情况

STREAM类型消息队列的基于消费者组读取消息具有如下特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次

最后针对三种消息队列的特点做一个对比:

在这里插入图片描述

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐