redis中stream数据结构使用详解——redis最适合做消息队列的数据结构
Redis Stream 是 Redis 5.0 版本新增加的数据结构。Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。
文章目录
写在前面
Redis Stream 是 Redis 5.0 版本新增加的数据结构。
Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。
简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。
而 Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
Redis Stream 有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:
每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。
上图解析:
- Consumer Group :消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。
- last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
- pending_ids :消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。
stream的使用
消息队列相关命令:
- XADD - 添加消息到末尾
- XTRIM - 对流进行修剪,限制长度
- XDEL - 删除消息
- XLEN - 获取流包含的元素数量,即消息长度
- XRANGE - 获取消息列表,会自动过滤已经删除的消息
- XREVRANGE - 反向获取消息列表,ID 从大到小
- XREAD - 以阻塞或非阻塞方式获取消息列表
消费者组相关命令:
- XGROUP CREATE - 创建消费者组
- XREADGROUP GROUP - 读取消费者组中的消息
- XACK - 将消息标记为"已处理"
- XGROUP SETID - 为消费者组设置新的最后递送消息ID
- XGROUP DELCONSUMER - 删除消费者
- XGROUP DESTROY - 删除消费者组
- XPENDING - 显示待处理消息的相关信息
- XCLAIM - 转移消息的归属权
- XINFO - 查看流和消费者组的相关信息;
- XINFO GROUPS - 打印消费者组的信息;
- XINFO STREAM - 打印流信息
XADD命令
使用 XADD 向队列添加消息,如果指定的队列不存在,则创建一个队列。
基本语法:
XADD key ID field value [field value ...]
key :队列名称,如果不存在就创建
ID :消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。
field value : 记录,类似key-value的形式,每次add可以添加多个属性。
基本用法:
# 往streamkey1 中添加一个id为1,name为zhangsan,age为15的值,如果消息的ID我们设置为*,redis会自动生成一个ID并返回
127.0.0.1:6379> xadd streamkey1 * id 1 name zhangsan age 15
"1662703935644-0"
# 往streamkey1 中添加一个id为2,name为lisi,address为qingdao的值
127.0.0.1:6379> xadd streamkey1 * id 2 name lisi address qingdao
"1662703953898-0"
# 我们查看一下streamkey1中有多少个元素了
127.0.0.1:6379> xlen streamkey1
(integer) 2
XTRIM命令
XTRIM将流裁剪为指定数量的项目,如有需要,将驱逐旧的项目(ID较小的项目)。此命令被设想为接受多种修整策略,但目前只实现了一种,即MAXLEN,并且与XADD中的MAXLEN选项完全相同。
该命令返回从流中删除的条目数。
基本语法:
XTRIM key MAXLEN [~] count
key :队列名称
MAXLEN :长度
count :数量
在选项MAXLEN和实际计数中间的参数~的意思是,用户不是真的需要精确的count个项目。它可以多几十个条目,但决不能少于count个。通过使用这个参数,仅当我们移除整个节点的时候才执行修整。这使得命令更高效,而且这也是我们通常想要的。
基本用法:
# 再往streamkey1中 添加一个元素
127.0.0.1:6379> xadd streamkey1 * id 3 name wangwu
"1662704343522-0"
# 现在有三个元素了
127.0.0.1:6379> xlen streamkey1
(integer) 3
# 指定streamkey1 元素最大长度为2,返回值1为删除的元素个数
127.0.0.1:6379> xtrim streamkey1 MAXLEN 2
(integer) 1
# 现在只有两个元素了
127.0.0.1:6379> xlen streamkey1
(integer) 2
# 我们发现,删除了第一个元素
127.0.0.1:6379> xrange streamkey1 - +
1) 1) "1662703953898-0"
2) 1) "id"
2) "2"
3) "name"
4) "lisi"
5) "address"
6) "qingdao"
2) 1) "1662704343522-0"
2) 1) "id"
2) "3"
3) "name"
4) "wangwu"
# 此时继续往里放元素,还是能够正常放的
127.0.0.1:6379> xadd streamkey1 * id 4
"1662704516937-0"
127.0.0.1:6379> xrange streamkey1 - +
1) 1) "1662703953898-0"
2) 1) "id"
2) "2"
3) "name"
4) "lisi"
5) "address"
6) "qingdao"
2) 1) "1662704343522-0"
2) 1) "id"
2) "3"
3) "name"
4) "wangwu"
3) 1) "1662704516937-0"
2) 1) "id"
2) "4"
127.0.0.1:6379> xlen streamkey1
(integer) 3
XDEL命令
使用 XDEL 删除消息。
基本语法:
XDEL key ID [ID ...]
key:队列名称
ID :消息 ID
基本用法:
# streamkey1中有三个元素
127.0.0.1:6379> xrange streamkey1 - +
1) 1) "1662703953898-0"
2) 1) "id"
2) "2"
3) "name"
4) "lisi"
5) "address"
6) "qingdao"
2) 1) "1662704343522-0"
2) 1) "id"
2) "3"
3) "name"
4) "wangwu"
3) 1) "1662704516937-0"
2) 1) "id"
2) "4"
# 删除id为1662703953898-0的元素
127.0.0.1:6379> xdel streamkey1 1662703953898-0
(integer) 1
# 此时还剩下俩元素了
127.0.0.1:6379> xrange streamkey1 - +
1) 1) "1662704343522-0"
2) 1) "id"
2) "3"
3) "name"
4) "wangwu"
2) 1) "1662704516937-0"
2) 1) "id"
2) "4"
XLEN命令
使用 XLEN 获取流包含的元素数量,即消息长度。
基本语法:
XLEN key
基本用法:
# 查看streamkey1中元素个数
127.0.0.1:6379> xlen streamkey1
(integer) 2
XRANGE命令
使用 XRANGE 获取消息列表,会自动过滤已经删除的消息。
基本语法:
XRANGE key start end [COUNT count]
key :队列名
start :开始值, - 表示最小值
end :结束值, + 表示最大值
count :数量
基本用法:
# 查询streamkey1中所有元素
127.0.0.1:6379> xrange streamkey1 - +
1) 1) "1662704343522-0"
2) 1) "id"
2) "3"
3) "name"
4) "wangwu"
2) 1) "1662704516937-0"
2) 1) "id"
2) "4"
# 查询streamkey1中第一个元素
127.0.0.1:6379> xrange streamkey1 - + COUNT 1
1) 1) "1662704343522-0"
2) 1) "id"
2) "3"
3) "name"
4) "wangwu"
XREVRANGE命令
使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息。
跟XRANGE命令恰好相反,显示的结果是倒叙排列的。
基本语法:
XREVRANGE key end start [COUNT count]
key :队列名
end :结束值, + 表示最大值
start :开始值, - 表示最小值
count :数量
基本用法:
127.0.0.1:6379> xrange streamkey1 - +
1) 1) "1662704343522-0"
2) 1) "id"
2) "3"
3) "name"
4) "wangwu"
2) 1) "1662704516937-0"
2) 1) "id"
2) "4"
127.0.0.1:6379> xrevrange streamkey1 + -
1) 1) "1662704516937-0"
2) 1) "id"
2) "4"
2) 1) "1662704343522-0"
2) 1) "id"
2) "3"
3) "name"
4) "wangwu"
XREAD命令
使用 XREAD 以阻塞或非阻塞方式获取消息列表。
基本语法:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
count :数量
milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式
key :队列名
id :消息 ID,如果从头读的话,id写0即可
其中STREAMS是必须要输入的,后面的key和id可以输入多个,但是一定要一一对应。
基本用法:
# streamkey1中有两个元素
127.0.0.1:6379> xrange streamkey1 - +
1) 1) "1662704343522-0"
2) 1) "id"
2) "3"
3) "name"
4) "wangwu"
2) 1) "1662704516937-0"
2) 1) "id"
2) "4"
# 读取一个
127.0.0.1:6379> xread COUNT 1 STREAMS streamkey1 0
1) 1) "streamkey1"
2) 1) 1) "1662704343522-0"
2) 1) "id"
2) "3"
3) "name"
4) "wangwu"
# 还剩两个
127.0.0.1:6379> xrange streamkey1 - +
1) 1) "1662704343522-0"
2) 1) "id"
2) "3"
3) "name"
4) "wangwu"
2) 1) "1662704516937-0"
2) 1) "id"
2) "4"
# 此时再读的话,还是会读到重复的数据
127.0.0.1:6379> xread COUNT 1 STREAMS streamkey1 0
1) 1) "streamkey1"
2) 1) 1) "1662704343522-0"
2) 1) "id"
2) "3"
3) "name"
4) "wangwu"
XGROUP命令
使用 XGROUP 创建、销毁、管理消费者组。
基本语法:
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
key :队列名称,如果不存在就创建
groupname :组名。
$ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。
基本用法:
# 创建streamkey1 的streamgroup1,从头开始消费
127.0.0.1:6379> xgroup create streamkey1 streamgroup1 0-0
OK
# 创建streamkey1 的streamgroup1,从尾开始消费
127.0.0.1:6379> xgroup create streamkey1 streamgroup1 $
OK
# 销毁一个消费者组
127.0.0.1:6379> xgroup destroy streamkey1 streamgroup1
(integer) 1
# 移除一个消费者
XGROUP DELCONSUMER streamkey1 streamgroup1 myconsumer123
XREADGROUP GROUP命令
使用 XREADGROUP GROUP 读取消费组中的消息。
消费的消息会记录在pending列表里,等待xack确认。
基本语法:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
group :消费组名
consumer :消费者名。
count : 读取数量。
milliseconds : 阻塞毫秒数。
key : 队列名。
ID : 消息 ID。
基本用法:
# 队列中有两条消息
127.0.0.1:6379> xrange streamkey1 - +
1) 1) "1662704343522-0"
2) 1) "id"
2) "3"
3) "name"
4) "wangwu"
2) 1) "1662704516937-0"
2) 1) "id"
2) "4"
# 最后的ID我们不能指定为0,需要使用>符号来消费
127.0.0.1:6379> xreadgroup GROUP streamgroup1 streamconsumer1 COUNT 1 STREAMS streamkey1 0
1) 1) "streamkey1"
2) (empty array)
# 消费key为streamkey1 、消费者组为streamgroup1 、消费者为streamconsumer1 ,消费一条消息
127.0.0.1:6379> xreadgroup GROUP streamgroup1 streamconsumer1 COUNT 1 STREAMS streamkey1 >
1) 1) "streamkey1"
2) 1) 1) "1662704343522-0"
2) 1) "id"
2) "3"
3) "name"
4) "wangwu"
127.0.0.1:6379> xreadgroup GROUP streamgroup1 streamconsumer1 COUNT 1 STREAMS streamkey1 >
1) 1) "streamkey1"
2) 1) 1) "1662704516937-0"
2) 1) "id"
2) "4"
127.0.0.1:6379> xreadgroup GROUP streamgroup1 streamconsumer1 COUNT 1 STREAMS streamkey1 >
(nil)
XINFO命令
用于检索关于流和关联的消费者组的不同的信息。
基本语法:
XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
基本用法:
# 查询消费者信息
127.0.0.1:6379> xinfo consumers streamkey1 streamgroup1
1) 1) "name"
2) "streamconsumer1"
3) "pending"
4) (integer) 2
5) "idle"
6) (integer) 504511
# 查询组信息
127.0.0.1:6379> xinfo groups streamkey1
1) 1) "name"
2) "streamgroup1"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 2
7) "last-delivered-id"
8) "1662704516937-0"
# 查询key的完整信息,包括所有条目、组、消费者信息
127.0.0.1:6379> xinfo stream streamkey1
1) "length"
2) (integer) 2
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1662704516937-0"
9) "groups"
10) (integer) 1
11) "first-entry"
12) 1) "1662704343522-0"
2) 1) "id"
2) "3"
3) "name"
4) "wangwu"
13) "last-entry"
14) 1) "1662704516937-0"
2) 1) "id"
2) "4"
XPENDING命令
通过XREADGROUP命令消费的消息,其实并没有实际消费成功,之后使用XACK命令做出应答之后才算真正消费成功。
读取了但是没有应答的消息,存放在XPENDING列表中。
基本语法:
XPENDING key group [start end count] [consumer]
基本用法:
# 读取streamkey1中streamgroup1组的两条pending数据
127.0.0.1:6379> xpending streamkey1 streamgroup1 - + 2
1) 1) "1662704343522-0"
2) "streamconsumer1"
3) (integer) 1179060
4) (integer) 1
2) 1) "1662704516937-0"
2) "streamconsumer1"
3) (integer) 1173716
4) (integer) 1
# 读取streamkey1中streamgroup1组的3条pending数据(只有两条所以显示了两条)
127.0.0.1:6379> xpending streamkey1 streamgroup1 - + 3
1) 1) "1662704343522-0"
2) "streamconsumer1"
3) (integer) 1181652
4) (integer) 1
2) 1) "1662704516937-0"
2) "streamconsumer1"
3) (integer) 1176308
4) (integer) 1
# 读取streamkey1中streamgroup1组的两条pending数据,可以指定消费者
127.0.0.1:6379> xpending streamkey1 streamgroup1 - + 2 streamconsumer1
1) 1) "1662704343522-0"
2) "streamconsumer1"
3) (integer) 1199490
4) (integer) 1
2) 1) "1662704516937-0"
2) "streamconsumer1"
3) (integer) 1194146
4) (integer) 1
127.0.0.1:6379> xpending streamkey1 streamgroup1 - + 2 streamconsumer2
(empty array)
XCLAIM命令
在流的消费者组上下文中,此命令改变待处理消息的所有权, 因此新的所有者是在命令参数中指定的消费者。通常是这样的:
- 假设有一个具有关联消费者组的流。
- 某个消费者A在消费者组的上下文中通过XREADGROUP从流中读取一条消息。
- 作为读取消息的副作用,消费者组的待处理条目列表(PEL)中创建了一个待处理消息条目:这意味着这条消息已传递给给定的消费者,但是尚未通过XACK确认。
- 突然这个消费者出现故障,且永远无法恢复。
- 其他消费者可以使用XPENDING检查已经过时很长时间的待处理消息列表,为了继续处理这些消息,他们使用XCLAIM来获得消息的所有权,并继续处理。
命令选项
该命令有多个选项,但是大部分主要用于内部使用,以便将XCLAIM或其他命令的结果传递到AOF文件, 以及传递相同的结果到从节点,并且不太可能对普通用户有用:
IDLE : 设置消息的空闲时间(自最后一次交付到目前的时间)。如果没有指定IDLE,则假设IDLE值为0,即时间计数被重置,因为消息现在有新的所有者来尝试处理它。
TIME : 这个命令与IDLE相同,但它不是设置相对的毫秒数,而是将空闲时间设置为一个指定的Unix时间(以毫秒为单位)。这对于重写生成XCLAIM命令的AOF文件很有用。
RETRYCOUNT : 将重试计数器设置为指定的值。这个计数器在每一次消息被交付的时候递增。通常,XCLAIM不会更改这个计数器,它只在调用XPENDING命令时提供给客户端:这样客户端可以检测到异常,例如在大量传递尝试后由于某种原因从未处理过的消息。
FORCE: 在待处理条目列表(PEL)中创建待处理消息条目,即使某些指定的ID尚未在分配给不同客户端的待处理条目列表(PEL)中。但是消息必须存在于流中,否则不存在的消息ID将会被忽略。
JUSTID: 只返回成功认领的消息ID数组,不返回实际的消息。
基本语法:
XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [force] [justid]
基本用法:
# 10秒后未应答,将该消息的所有权给streamconsumer1
127.0.0.1:6379> xclaim streamkey1 streamgroup1 streamconsumer1 10000 1662704516937-0
1) 1) "1662704516937-0"
2) 1) "id"
2) "4"
XACK命令
将挂起的消息标记为已正确处理,从而有效地将其从使用者组的挂起条目列表中删除。命令的返回值是成功确认的消息数,也就是我们实际上能够在PEL中解析的id。
基本语法:
XACK key group ID [ID ...]
基本用法:
127.0.0.1:6379> xpending streamkey1 streamgroup1 - + 2 streamconsumer1
1) 1) "1662704516937-0"
2) "streamconsumer1"
3) (integer) 468271
4) (integer) 2
2) 1) "1662709302078-0"
2) "streamconsumer1"
3) (integer) 416375
4) (integer) 1
# id为1662704516937-0的消息,我确认处理完毕了,此时就会从pending列表删掉
127.0.0.1:6379> xack streamkey1 streamgroup1 1662704516937-0
(integer) 1
127.0.0.1:6379> xpending streamkey1 streamgroup1 - + 2 streamconsumer1
1) 1) "1662709302078-0"
2) "streamconsumer1"
3) (integer) 442613
4) (integer) 1
更多推荐
所有评论(0)