Redis消息队列的几种方式

1 基于rpush+lpop

list类似于java中的linkedList 。如果插入头或者尾的话,时间复杂度为O(1),在其他地方插入需要O(n)。查询也是一样。所以一般将list当作队列来使用。存储结构quickList+ziplist

使用

while (true){
          Jedis jedis=JedisUtil.getJedis();
          String message=jedis.lpop("myQueue");
          if(StringUtils.isEmpty(message)){
                 try{
                 	//减少redis服务端压力。根据业务设定轮询时间
                     Thread.sleep(1000);
                  }catch (Exception e){

                   }
                    continue;
                }
                /**
                 * dealMessage
                 */
 }

优点

  • List类型是基于链表实现,插入删除元素时间复杂度仅为常量级,有先进先出的特点来保证数据的顺序
  • Redis支持消息持久化,在服务端数据是安全

缺点

  • 没有消费者确认机制。

2 基于rpush+blpop

while (true){
            Jedis jedis=JedisUtil.getJedis();

            List<String> returnList=null;
            try {
                //意思是说,在使用 blpop的时候,如果中途因为网络波动或者某些其他原因导致连接池失效,那么就永远接收不到信息了
                //为了解决这个问题,就需要 blpop的超时功能。让 blpop每几分钟就断开,检查一下网络,再重新连上。
                //阻塞过程中与redis保持连接
                 returnList = jedis.blpop(100, "myqueue");
            }catch (Exception e){

            }
            if(returnList!=null&&returnList.size()!=0) {
                String message=returnList.get(1);
                /**
                 * dealMessage()
                 */
            }
            JedisUtil.returnBrokenResource(jedis);
        }

blpop原理

redis在blpop命令处理过程时,首先会去查找key对应的list,如果存在,则pop出数据响应给客户端。否则将对应的key push到blocking_keys数据结构当中,对应的value是被阻塞的client。当下次push命令发出时,服务器检查blocking_keys当中是否存在对应的key,如果存在,则将key添加到ready_keys链表当中,同时将value插入链表当中并响应客户端。类似于监听吧。

比起之前的rpush+lpop。这个能够即时获取消息

上述两种方法一般用作程序上下游简单的传递消息。为了提高程序效率,可以批量lpop。减少网络io

可以使用pipeline进行批量进行lpop

  public static List<String>lPopByPipeLineBatch(String key,int length){
         List<String> result=new ArrayList<>(length);
         Jedis jedis=JedisUtil.getJedis();
         if(jedis==null){
             return result;
         }
         Pipeline pipeline=null;
         try {
             pipeline=jedis.pipelined();
             for(int i=0;i<length;i++){
                 pipeline.lpop(key);
             }
             List<Object> objectList=pipeline.syncAndReturnAll();
             for(Object returnObject:objectList){
                 if(returnObject==null){
                     continue;
                 }
                 if(returnObject instanceof String){
                     result.add((String) returnObject);
                 }
                 return result;
             }
         }catch (Exception e){
             
         }finally {
             JedisUtil.returnBrokenResource(jedis);
         }
         
     }

或者使用(需要具有原子性,可以在事务或者Lua中执行)

lrange myqueue 0 n-1 (读取前 n个数据)

ltrim myqueue n -1(删除前n个数据)

应用场景

1 应用解耦,现在都是服务上下游之间通过redis队列进行通信

2 削峰。

3 Pub/Sub机制

Redis 也支持消息的发布订阅模式,订阅者(Sub)通过SUBSCRIBE 命令向redis 服务订阅频道(channel),当发布者通过PUBLISH 命令向chinnel发布命令时,订阅该频道的客户端都会受到此消息。

pubsub机制

消费者:

订阅消息: subscribe channel [channel… ]

取消订阅: unsubscribe channel

生产者产生消息:

publish channel message

原理

发布订阅模式。对于每个channel,下面都有一个已注册clients列表。subscribe既将该client加入注册列表。对于一个publish channel message. 遍历该channel下的clients列表,并推送相应信息到client

pubsub模型

优点

  • 典型的广播模式,一个消息可以发布到多个消费者
  • 多信道订阅,消费者可以同时订阅多个信道,从而接收多类消息
  • 消息即时发送,消息不用等待消费者读取,消费者会自动接收到信道发布的消息

缺点

  • 若消费者客户端出现消息积压,到一定程度,会被强制断开,导致消息意外丢失。通常发生在消息的生产远大于消费速度时。

不适合做消息存储,消息积压的业务。而是擅长处理广播,即时通讯,即时反馈的业务。

应用场景

用户上线监听;活跃用户监听;

4 sorted set 具有优先级的消息队列

​ 有序集合(Sorted Set)是不允许重复的String类型元素的集合,且每个元素都会关联一个Double类型的分数。有序集合的成员是唯一的,但分数是可以重复。集合中最大的成员数为 2^32 - 1 也就是每个集合可存储40多亿个成员。基于Sorted Set以上的特点在实际开发中有许多的应用,比如做游戏的实时战绩排行榜、博客中文章点赞排行榜等各类排行榜和优先队列的实现。

​ 增删改时间复杂度为log(n)

​ 使用sorted set可以用作具有优先级的消息队列,但是消息内容不重复。

添加zadd 
zadd key score1 value1… scoren valuen
获取前n个数
zrange key 0 ,n-1 [withscores]
zrevrange 0 ,n-1 [withscores]
删除元素(和上一步要在一个事务中执行)
zrem key member [member …]
返回有序集中指定分数区间内的所有的成员。有序集成员按分数值递减(从大到小)的次序排列。
zrevrangebyscore key max min [withscores] [limit offset count]
zrangebyscore key max min [withscores]  [limit offset count]
场景: zadd中存放 订单id ,sorce为时间戳。优先消费最早的时间戳,删除过期订单。

应用场景

查看最新发布的新闻。例如有新闻发布列表,只展示前当前时间半小时内的最新新闻。可以用sorted set进行存储。对于过期的新闻可以定时根据时间范围进行删除。

5 redis Stream

redis5.0推出的新的数据结构。解决之前消息没有存储,没有确认机制的问题。数据结构为 radix tree(基数树,适合存储key为整型的数据).查询效率log(n)

在这里插入图片描述

1 追加新消息 ,xadd,生产消息

XADD key ID field string [field string …]

消息Id说明:由两部分组成:时间戳-序号。时间戳是毫秒级单位,是生成消息的Redis服务器时间,它是个64位整型(int64)。序号是在这个毫秒时间点内的消息序号,它也是个64位整型。

2 查看消息

xrange key start end [COUNT count ]

xrange key - +

3 创建消费组

XGROUP CREATE key groupName ID (ID=0-0 表示从头开始,$表示从尾开始)

4 新建消费者

XREADGROUP group mygroup consumerName [Count count] [block miliseonds] Streams stream ID(>代表从0开始)

5 xack 确认消费消息

xack key groupName ID

6 xpending

为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM 设计了 Pending 列表,用于记录读取但并未处理完毕的消息。命令XPENDIING 用来获消费组或消费内消费者的未处理完毕的消息。

xpending key group [start end count] [consumer]

7 消息转移

xclaim key group consumer min-idle-time ID

8 对于坏消息 del

xdel key ID

应用场景

1 派单系统,消息通讯。

N
列表,用于记录读取但并未处理完毕的消息。命令XPENDIING` 用来获消费组或消费内消费者的未处理完毕的消息。

xpending key group [start end count] [consumer]

7 消息转移

xclaim key group consumer min-idle-time ID

8 对于坏消息 del

xdel key ID

应用场景

1 派单系统,消息通讯。

N

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐