Stream是Redis 5.0引入的一种新数据类型,它以一种抽象的方式来构建日志结构的数据。本文主要介绍Redis Streams的消费者组相关的信息。

1 什么是消费者组

在某些问题中,我们想要做的是从同一流中向许多客户端提供不同的消息子集。一个明显有用的例子是处理缓慢的消息:让N个不同的客户端接收流的不同部分来加快消息的处理。例如:如果有三个消费者A1、A2、A3和一个包含消息1、2、3、4、5、6、7的流,那么我们想要达到的是像下面这样分配消息。

为了实现这一点,Redis使用了一个叫做消费者组的概念。从实现的角度来看,Redis消费者组与Kafka (TM)消费者组没有任何关系,只是在功能上是相似的:允许一组客户端合作消费同一消息流的不同部分。

我们可以将一个消费者组简单理解为一个从流中获取数据的特殊的消费者。它从流中获取数据,然后再服务于多个消费者,同时提供了如下的保证:

1)每条消息都提供给不同的消费者,不会将相同的消息传递给多个消费者。

2)在消费者组中,消费者通过客户端的名称(区分大小写的字符串)进行区分,当断开连接重新连通后,消费者客户端还是提供相同的名字,会被当做同一个消费者。这意味着在消费者组中由客户端提供唯一标识符。

3)每个消费者组都有未被消费的第一个ID的概念,这样当消费者请求新消息时,它可以只提供以前未传递的消息。

4)消费消息需要使用特定命令进行显式确认。Redis将该确认解释为:此消息已正确处理,可以从消费者组中移除。

5)消费者组跟踪所有当前挂起的消息,即已传递给消费者组的某个消费者但尚未确认为已处理的消息。由于此功能,当访问流的消息历史记录时,每个使用者将只看到传递给它的消息。

下面一起来看下消费者组相关的命令。

2 消费者组命令 XGROUP 

使用XGROUP可以:

- 创建与流关联的新消费者组。

- 设置要传递的下一条消息。

- 销毁一个消费者组。

- 往消费者组中添加指定的消费者。

- 从消费者组中移除指定的消费者。

2.1 创建消费者组

在创建命令中我们必须指定一个ID,在示例中是$。这是必需的,因为消费者组必须知道在第一个消费者连接时接下来要提供哪条消息。$表示从现在开始到达流中的新消息才会提供给组中的消费者。我们也可以指定一个有效ID,会提供给消费者大于指定ID的消息。

XGROUP CREATE还支持自动创建流,如果流不存在,使用可选的MKSTREAM子命令作为最后一个参数可以自动创建对应的流:

2.2 设置要传递的下一条消息

使用这种命令,可以修改消费者组要获取的下一个ID,而无需再次删除和创建使用者组。

2.3 销毁一个消费者组

可以使用以下形式完全销毁一个消费者组:

即使存在活动的消费者和待处理消息,消费者组也将被销毁,因此请确保仅在真正需要时才调用此命令。

2.4 消费者的添加与移除

3 从流中读取数据 XREADGROUP

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]

在从流中读取之前,让我们将一些消息放入其中:

使用如下命令通过消费者组读取流中的数据

XREADGROUP命令是XREAD命令的特殊版本,支持消费者组。从语法的角度来看,这两个命令几乎是相同的,但是XREADGROUP需要一个特殊和强制的选项:GROUP<group-name><consumer-name>。

3.1 GROUP <group-name> <consumer-name>

group-name是关联到流的消费者组的名称。consumer-name是客户端用于在消费者组内标识自己的字符串。对应消费者不存在时会自动创建,不同的消费者应该选择不同的消费者名称。

使用XREADGROUP时在STREAMS选项中指定的ID可以是以下两种之一:

1)特殊ID>,意味着消费者希望只接收从未发送给任何其他消费者的消息。这意思是说,请给我新的消息。

2)任意其他的ID,即0或任意其他有效ID或不完整的ID(只有毫秒时间部分),将返回发送命令的消费者的待处理条目信息。所以,基本上如果ID不是>,该命令将返回消费者的待处理条目信息(已发送给它,但尚未确认的条目)。

对于第2点的测试结果如下:

我们可以创建多个消费者来消费这个流中的消息。

就像XREAD一样,XREADGROUP命令也可以以阻塞的方式使用。在这方面两者没有区别。具体请参考Redis Stream介绍(一)

4 消费确认命令 XACK

XACK key group ID [ID …]

当消费者将消息正确处理后,需要调用确认命令来确认消费。只有当调用确认命令后,才会将该消息从待处理的返回中移除。

5 总结

以上就是关于Redis Streams的消费者组相关的介绍和使用命令,对于是否使用消费者组:

1)如果你有一个流和多个客户端,并且你希望所有的客户端都获取到完整的信息,那么你不需要使用消费者组。

2)如果你有一个流和多个客户端,并且你希望在你的客户端上对流进行分区或共享,能获得一个流消息的子集,那么你需要使用消费者组。

Redis流中的消费者组在某些方面可能类似于基于Kafka(TM)分区的消费者组,但是请注意,实际上它们是不同的。Redis流的分区只是概念上的,消息都放在一个Redis键中,不同客户端获取的消息内容取决于谁准备好处理新消息,而不是客户端在处理哪个分区。例如,上述示例中如果消费者 li 在某个时刻永久失败,Redis将继续为zhang和wang提供所有到达的新消息,跟之前相比现在相当于只有两个分区,而Kafka的分区不会自动更改。类似地,如果给定的消费者处理消息的速度比其他消费者快得多,则该消费者将接收更多的消息,而Kafka中消费者只能获取对应分区的消息。

在Redis中,如果你真的想将同一个流中的消息分割成多个Redis实例,你必须使用多个key和一些分片系统,比如Redis Cluster或其他一些特定于应用程序的分片系统。单个Redis流不会自动分区到多个实例。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐