写在前面

Redis Stream 是 Redis 5.0 版本新增加的数据结构。

Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。

简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。

而 Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

Redis Stream 有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:
在这里插入图片描述
每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。

上图解析:

  • Consumer Group :消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。
  • last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
  • pending_ids :消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。

stream的使用

消息队列相关命令:

  • XADD - 添加消息到末尾
  • XTRIM - 对流进行修剪,限制长度
  • XDEL - 删除消息
  • XLEN - 获取流包含的元素数量,即消息长度
  • XRANGE - 获取消息列表,会自动过滤已经删除的消息
  • XREVRANGE - 反向获取消息列表,ID 从大到小
  • XREAD - 以阻塞或非阻塞方式获取消息列表

消费者组相关命令:

  • XGROUP CREATE - 创建消费者组
  • XREADGROUP GROUP - 读取消费者组中的消息
  • XACK - 将消息标记为"已处理"
  • XGROUP SETID - 为消费者组设置新的最后递送消息ID
  • XGROUP DELCONSUMER - 删除消费者
  • XGROUP DESTROY - 删除消费者组
  • XPENDING - 显示待处理消息的相关信息
  • XCLAIM - 转移消息的归属权
  • XINFO - 查看流和消费者组的相关信息;
  • XINFO GROUPS - 打印消费者组的信息;
  • XINFO STREAM - 打印流信息

XADD命令

使用 XADD 向队列添加消息,如果指定的队列不存在,则创建一个队列。

基本语法:

XADD key ID field value [field value ...]

key :队列名称,如果不存在就创建
ID :消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。
field value : 记录,类似key-value的形式,每次add可以添加多个属性。

基本用法:

# 往streamkey1 中添加一个id为1,name为zhangsan,age为15的值,如果消息的ID我们设置为*,redis会自动生成一个ID并返回
127.0.0.1:6379> xadd streamkey1 * id 1 name zhangsan age 15
"1662703935644-0"
# 往streamkey1 中添加一个id为2,name为lisi,address为qingdao的值
127.0.0.1:6379> xadd streamkey1 * id 2 name lisi address qingdao
"1662703953898-0"
# 我们查看一下streamkey1中有多少个元素了
127.0.0.1:6379> xlen streamkey1
(integer) 2

XTRIM命令

XTRIM将流裁剪为指定数量的项目,如有需要,将驱逐旧的项目(ID较小的项目)。此命令被设想为接受多种修整策略,但目前只实现了一种,即MAXLEN,并且与XADD中的MAXLEN选项完全相同。

该命令返回从流中删除的条目数。

基本语法:

XTRIM key MAXLEN [~] count

key :队列名称
MAXLEN :长度
count :数量
在选项MAXLEN和实际计数中间的参数~的意思是,用户不是真的需要精确的count个项目。它可以多几十个条目,但决不能少于count个。通过使用这个参数,仅当我们移除整个节点的时候才执行修整。这使得命令更高效,而且这也是我们通常想要的。

基本用法:

# 再往streamkey1中 添加一个元素
127.0.0.1:6379> xadd streamkey1 * id 3 name wangwu
"1662704343522-0"
# 现在有三个元素了
127.0.0.1:6379> xlen streamkey1
(integer) 3
# 指定streamkey1 元素最大长度为2,返回值1为删除的元素个数
127.0.0.1:6379> xtrim streamkey1 MAXLEN 2
(integer) 1
# 现在只有两个元素了
127.0.0.1:6379> xlen streamkey1
(integer) 2
# 我们发现,删除了第一个元素
127.0.0.1:6379> xrange streamkey1 - +
1) 1) "1662703953898-0"
   2) 1) "id"
      2) "2"
      3) "name"
      4) "lisi"
      5) "address"
      6) "qingdao"
2) 1) "1662704343522-0"
   2) 1) "id"
      2) "3"
      3) "name"
      4) "wangwu"

# 此时继续往里放元素,还是能够正常放的
127.0.0.1:6379> xadd streamkey1 * id 4
"1662704516937-0"
127.0.0.1:6379> xrange streamkey1 - +
1) 1) "1662703953898-0"
   2) 1) "id"
      2) "2"
      3) "name"
      4) "lisi"
      5) "address"
      6) "qingdao"
2) 1) "1662704343522-0"
   2) 1) "id"
      2) "3"
      3) "name"
      4) "wangwu"
3) 1) "1662704516937-0"
   2) 1) "id"
      2) "4"
127.0.0.1:6379> xlen streamkey1
(integer) 3

XDEL命令

使用 XDEL 删除消息。

基本语法:

XDEL key ID [ID ...]

key:队列名称
ID :消息 ID

基本用法:

# streamkey1中有三个元素
127.0.0.1:6379> xrange streamkey1 - +
1) 1) "1662703953898-0"
   2) 1) "id"
      2) "2"
      3) "name"
      4) "lisi"
      5) "address"
      6) "qingdao"
2) 1) "1662704343522-0"
   2) 1) "id"
      2) "3"
      3) "name"
      4) "wangwu"
3) 1) "1662704516937-0"
   2) 1) "id"
      2) "4"
# 删除id为1662703953898-0的元素
127.0.0.1:6379> xdel streamkey1 1662703953898-0
(integer) 1
# 此时还剩下俩元素了
127.0.0.1:6379> xrange streamkey1 - +
1) 1) "1662704343522-0"
   2) 1) "id"
      2) "3"
      3) "name"
      4) "wangwu"
2) 1) "1662704516937-0"
   2) 1) "id"
      2) "4"

XLEN命令

使用 XLEN 获取流包含的元素数量,即消息长度。

基本语法:

XLEN key

基本用法:

# 查看streamkey1中元素个数
127.0.0.1:6379> xlen streamkey1
(integer) 2

XRANGE命令

使用 XRANGE 获取消息列表,会自动过滤已经删除的消息。

基本语法:

XRANGE key start end [COUNT count]

key :队列名
start :开始值, - 表示最小值
end :结束值, + 表示最大值
count :数量

基本用法:

# 查询streamkey1中所有元素
127.0.0.1:6379> xrange streamkey1 - +
1) 1) "1662704343522-0"
   2) 1) "id"
      2) "3"
      3) "name"
      4) "wangwu"
2) 1) "1662704516937-0"
   2) 1) "id"
      2) "4"
# 查询streamkey1中第一个元素
127.0.0.1:6379> xrange streamkey1 - + COUNT 1
1) 1) "1662704343522-0"
   2) 1) "id"
      2) "3"
      3) "name"
      4) "wangwu"

XREVRANGE命令

使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息。
跟XRANGE命令恰好相反,显示的结果是倒叙排列的。

基本语法:

XREVRANGE key end start [COUNT count]

key :队列名
end :结束值, + 表示最大值
start :开始值, - 表示最小值
count :数量

基本用法:

127.0.0.1:6379> xrange streamkey1 - +
1) 1) "1662704343522-0"
   2) 1) "id"
      2) "3"
      3) "name"
      4) "wangwu"
2) 1) "1662704516937-0"
   2) 1) "id"
      2) "4"
127.0.0.1:6379> xrevrange streamkey1 + -
1) 1) "1662704516937-0"
   2) 1) "id"
      2) "4"
2) 1) "1662704343522-0"
   2) 1) "id"
      2) "3"
      3) "name"
      4) "wangwu"

XREAD命令

使用 XREAD 以阻塞或非阻塞方式获取消息列表。

基本语法:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

count :数量
milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式
key :队列名
id :消息 ID,如果从头读的话,id写0即可

其中STREAMS是必须要输入的,后面的key和id可以输入多个,但是一定要一一对应。

基本用法:

# streamkey1中有两个元素
127.0.0.1:6379> xrange streamkey1 - +
1) 1) "1662704343522-0"
   2) 1) "id"
      2) "3"
      3) "name"
      4) "wangwu"
2) 1) "1662704516937-0"
   2) 1) "id"
      2) "4"
# 读取一个
127.0.0.1:6379> xread COUNT 1 STREAMS streamkey1 0
1) 1) "streamkey1"
   2) 1) 1) "1662704343522-0"
         2) 1) "id"
            2) "3"
            3) "name"
            4) "wangwu"
# 还剩两个
127.0.0.1:6379> xrange streamkey1 - +
1) 1) "1662704343522-0"
   2) 1) "id"
      2) "3"
      3) "name"
      4) "wangwu"
2) 1) "1662704516937-0"
   2) 1) "id"
      2) "4"
# 此时再读的话,还是会读到重复的数据
127.0.0.1:6379> xread COUNT 1 STREAMS streamkey1 0
1) 1) "streamkey1"
   2) 1) 1) "1662704343522-0"
         2) 1) "id"
            2) "3"
            3) "name"
            4) "wangwu"

XGROUP命令

使用 XGROUP 创建、销毁、管理消费者组。

基本语法:

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

key :队列名称,如果不存在就创建
groupname :组名。
$ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。

基本用法:

# 创建streamkey1 的streamgroup1,从头开始消费
127.0.0.1:6379> xgroup create streamkey1 streamgroup1 0-0
OK
# 创建streamkey1 的streamgroup1,从尾开始消费
127.0.0.1:6379> xgroup create streamkey1 streamgroup1 $
OK
# 销毁一个消费者组
127.0.0.1:6379> xgroup destroy streamkey1 streamgroup1
(integer) 1
# 移除一个消费者
XGROUP DELCONSUMER streamkey1 streamgroup1 myconsumer123

XREADGROUP GROUP命令

使用 XREADGROUP GROUP 读取消费组中的消息。
消费的消息会记录在pending列表里,等待xack确认。

基本语法:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

group :消费组名
consumer :消费者名。
count : 读取数量。
milliseconds : 阻塞毫秒数。
key : 队列名。
ID : 消息 ID。

基本用法:

# 队列中有两条消息
127.0.0.1:6379> xrange streamkey1 - +
1) 1) "1662704343522-0"
   2) 1) "id"
      2) "3"
      3) "name"
      4) "wangwu"
2) 1) "1662704516937-0"
   2) 1) "id"
      2) "4"
# 最后的ID我们不能指定为0,需要使用>符号来消费
127.0.0.1:6379> xreadgroup GROUP streamgroup1 streamconsumer1 COUNT 1 STREAMS streamkey1 0
1) 1) "streamkey1"
   2) (empty array)
# 消费key为streamkey1 、消费者组为streamgroup1 、消费者为streamconsumer1 ,消费一条消息
127.0.0.1:6379> xreadgroup GROUP streamgroup1 streamconsumer1 COUNT 1 STREAMS streamkey1 >
1) 1) "streamkey1"
   2) 1) 1) "1662704343522-0"
         2) 1) "id"
            2) "3"
            3) "name"
            4) "wangwu"
127.0.0.1:6379> xreadgroup GROUP streamgroup1 streamconsumer1 COUNT 1 STREAMS streamkey1 >
1) 1) "streamkey1"
   2) 1) 1) "1662704516937-0"
         2) 1) "id"
            2) "4"
127.0.0.1:6379> xreadgroup GROUP streamgroup1 streamconsumer1 COUNT 1 STREAMS streamkey1 >
(nil)

XINFO命令

用于检索关于流和关联的消费者组的不同的信息。

基本语法:

XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]

基本用法:

# 查询消费者信息
127.0.0.1:6379> xinfo consumers streamkey1 streamgroup1
1) 1) "name"
   2) "streamconsumer1"
   3) "pending"
   4) (integer) 2
   5) "idle"
   6) (integer) 504511
# 查询组信息
127.0.0.1:6379> xinfo groups streamkey1
1) 1) "name"
   2) "streamgroup1"
   3) "consumers"
   4) (integer) 1
   5) "pending"
   6) (integer) 2
   7) "last-delivered-id"
   8) "1662704516937-0"
# 查询key的完整信息,包括所有条目、组、消费者信息
127.0.0.1:6379> xinfo stream streamkey1
 1) "length"
 2) (integer) 2
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1662704516937-0"
 9) "groups"
10) (integer) 1
11) "first-entry"
12) 1) "1662704343522-0"
    2) 1) "id"
       2) "3"
       3) "name"
       4) "wangwu"
13) "last-entry"
14) 1) "1662704516937-0"
    2) 1) "id"
       2) "4"

XPENDING命令

通过XREADGROUP命令消费的消息,其实并没有实际消费成功,之后使用XACK命令做出应答之后才算真正消费成功。
读取了但是没有应答的消息,存放在XPENDING列表中。

基本语法:

XPENDING key group [start end count] [consumer]

基本用法:

# 读取streamkey1中streamgroup1组的两条pending数据
127.0.0.1:6379> xpending streamkey1 streamgroup1 - + 2
1) 1) "1662704343522-0"
   2) "streamconsumer1"
   3) (integer) 1179060
   4) (integer) 1
2) 1) "1662704516937-0"
   2) "streamconsumer1"
   3) (integer) 1173716
   4) (integer) 1
# 读取streamkey1中streamgroup1组的3条pending数据(只有两条所以显示了两条)
127.0.0.1:6379> xpending streamkey1 streamgroup1 - + 3
1) 1) "1662704343522-0"
   2) "streamconsumer1"
   3) (integer) 1181652
   4) (integer) 1
2) 1) "1662704516937-0"
   2) "streamconsumer1"
   3) (integer) 1176308
   4) (integer) 1
# 读取streamkey1中streamgroup1组的两条pending数据,可以指定消费者
127.0.0.1:6379> xpending streamkey1 streamgroup1 - + 2 streamconsumer1
1) 1) "1662704343522-0"
   2) "streamconsumer1"
   3) (integer) 1199490
   4) (integer) 1
2) 1) "1662704516937-0"
   2) "streamconsumer1"
   3) (integer) 1194146
   4) (integer) 1
127.0.0.1:6379> xpending streamkey1 streamgroup1 - + 2 streamconsumer2
(empty array)

XCLAIM命令

在流的消费者组上下文中,此命令改变待处理消息的所有权, 因此新的所有者是在命令参数中指定的消费者。通常是这样的:

  • 假设有一个具有关联消费者组的流。
  • 某个消费者A在消费者组的上下文中通过XREADGROUP从流中读取一条消息。
  • 作为读取消息的副作用,消费者组的待处理条目列表(PEL)中创建了一个待处理消息条目:这意味着这条消息已传递给给定的消费者,但是尚未通过XACK确认。
  • 突然这个消费者出现故障,且永远无法恢复。
  • 其他消费者可以使用XPENDING检查已经过时很长时间的待处理消息列表,为了继续处理这些消息,他们使用XCLAIM来获得消息的所有权,并继续处理。

命令选项
该命令有多个选项,但是大部分主要用于内部使用,以便将XCLAIM或其他命令的结果传递到AOF文件, 以及传递相同的结果到从节点,并且不太可能对普通用户有用:

IDLE : 设置消息的空闲时间(自最后一次交付到目前的时间)。如果没有指定IDLE,则假设IDLE值为0,即时间计数被重置,因为消息现在有新的所有者来尝试处理它。
TIME : 这个命令与IDLE相同,但它不是设置相对的毫秒数,而是将空闲时间设置为一个指定的Unix时间(以毫秒为单位)。这对于重写生成XCLAIM命令的AOF文件很有用。
RETRYCOUNT : 将重试计数器设置为指定的值。这个计数器在每一次消息被交付的时候递增。通常,XCLAIM不会更改这个计数器,它只在调用XPENDING命令时提供给客户端:这样客户端可以检测到异常,例如在大量传递尝试后由于某种原因从未处理过的消息。
FORCE: 在待处理条目列表(PEL)中创建待处理消息条目,即使某些指定的ID尚未在分配给不同客户端的待处理条目列表(PEL)中。但是消息必须存在于流中,否则不存在的消息ID将会被忽略。
JUSTID: 只返回成功认领的消息ID数组,不返回实际的消息。

基本语法:

XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [force] [justid]

基本用法:

# 10秒后未应答,将该消息的所有权给streamconsumer1 
127.0.0.1:6379> xclaim streamkey1 streamgroup1 streamconsumer1 10000 1662704516937-0
1) 1) "1662704516937-0"
   2) 1) "id"
      2) "4"

XACK命令

将挂起的消息标记为已正确处理,从而有效地将其从使用者组的挂起条目列表中删除。命令的返回值是成功确认的消息数,也就是我们实际上能够在PEL中解析的id。

基本语法:

XACK key group ID [ID ...]

基本用法:

127.0.0.1:6379> xpending streamkey1 streamgroup1 - + 2 streamconsumer1
1) 1) "1662704516937-0"
   2) "streamconsumer1"
   3) (integer) 468271
   4) (integer) 2
2) 1) "1662709302078-0"
   2) "streamconsumer1"
   3) (integer) 416375
   4) (integer) 1
# id为1662704516937-0的消息,我确认处理完毕了,此时就会从pending列表删掉
127.0.0.1:6379> xack streamkey1 streamgroup1 1662704516937-0
(integer) 1
127.0.0.1:6379> xpending streamkey1 streamgroup1 - + 2 streamconsumer1
1) 1) "1662709302078-0"
   2) "streamconsumer1"
   3) (integer) 442613
   4) (integer) 1
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐