摘要

对于消息中间件来说,大家一定想到了Rabbitmq 和 Kafka等专业消息中间件。将该中间应用到系统中主要是为了增加系统之间异步消息的传递功能。使用过 Rabbitmq 的同学知道它使用起来有多复杂,发消息之前要创建 Exchange,再创建 Queue,还要将 Queue 和 Exchange 通过某种规则绑定起来,发消息的时候要指定 routing-key,还要控制头部信息。消费者在消费消息之前也要进行上面一系列的繁琐过程。但是绝大多数情况下,虽然我们的消息队列只有一组消费者,但还是需要经历上面这些繁琐的过程。

但是如果我们对程序内部来实现一个比较简单消息队列机制,使用专业的Rabbitmq 和 Kafka会比较麻烦同时增加了系统的维护工作。Redis 的消息队列不是专业的消息队列,它没有非常多的高级特性,没有 ack 保证,如果对消息的可靠性有着极致的追求,那么它就不适合使用。

同时本博文的实现的源代码:https://gitee.com/xjl2462612540/RedisPrinciple/tree/master/springboot-redis/src/main/java/com/zhuangxiaoyan/springbootredis/delayqueuing

一、Redis异步消息队列

Redis 的 list(列表) 数据结构常用来作为异步消息队列使用,使用rpush/lpush操作入队列,使用lpop 和 rpop来出队列。

# 创建notify 队列 同时的push三个元素
> rpush notify-queue apple banana pear
(integer) 3

# 查询对队列的长度
> llen notify-queue
(integer) 3

# 弹出的队列的元素
> lpop notify-queue
"apple"

# 查询对队列的长度
> llen notify-queue
(integer) 2

# 弹出的队列的元素
> lpop notify-queue
"banana"

# 查询对队列的长度
> llen notify-queue
(integer) 1

# 弹出的队列的元素
> lpop notify-queue
"pear"

# 查询对队列的长度
> llen notify-queue
(integer) 0

# 弹出的队列的元素
> lpop notify-queue
(nil)

1.1 延迟队列的de应用场景

  • 订单超过30分钟未支付,自动关闭。
  • 订单完成后, 如果用户一直未评价, 5天后自动好评。
  • 会员到期前15天, 到期前3天分别发送短信提醒。
  • 当订单一直处于未支付状态时,如何及时的关闭订单,并退还库存?
  • 如何定期检查处于退款状态的订单是否已经退款成功?
  • 新创建店铺,N天内没有上传商品,系统如何知道该信息,并发送激活短信?等等

1.2 队列延迟实现

队列空了怎么办?

客户端是通过队列的 pop 操作来获取消息,然后进行处理。处理完了再接着获取消息,再进行处理。如此循环往复,这便是作为队列消费者的客户端的生命周期。可是如果队列空了,客户端就会陷入 pop 的死循环,不停地 pop,没有数据,接着再 pop,又没有数据。这就是浪费生命的空轮询。空轮询不但拉高了客户端的 CPU,redis 的 QPS 也会被拉高,如果这样空轮询的客户端有几十来个,Redis 的慢查询可能会显著增多。通常我们使用 sleep 来解决这个问题,让线程睡一会,睡个 1s 钟就可以了。不但客户端的 CPU 能降下来,Redis 的 QPS 也降下来了。

用上面睡眠的办法可以解决问题。但是有个小问题,那就是睡眠会导致消息的延迟增大。如果只有 1 个消费者,那么这个延迟就是 1s。如果有多个消费者,这个延迟会有所下降,因为每个消费者的睡觉时间是岔开来的。阻塞读在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立刻醒过来。消息的延迟几乎为零。用blpop/brpop替代前面的lpop/rpop,就完美解决了上面的问题。

空闲连接自动断开

如果线程一直阻塞在哪里,Redis 的客户端连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用。这个时候blpop/brpop会抛出异常来。所以编写客户端消费者的时候要小心,注意捕获异常,还要重试。

锁冲突处理

  1. 直接抛出异常,通知用户稍后重试;(这种方式比较适合由用户直接发起的请求,用户看到错误对话框后,会先阅读对话框的内容,再点击重试,这样就可以起到人工延时的效果。如果考虑到用户体验,可以由前端的代码替代用户自己来进行延时重试控制。它本质上是对当前请求的放弃,由用户决定是否重新发起新的请求。)
  2. sleep 一会再重试;(sleep 会阻塞当前的消息处理线程,会导致队列的后续消息处理出现延迟。如果碰撞的比较频繁或者队列里消息比较多,sleep 可能并不合适。如果因为个别死锁的 key 导致加锁不成功,线程会彻底堵死,导致后续消息永远得不到及时处理。)
  3. 将请求转移至延时队列,过一会再试;(这种方式比较适合异步消息处理,将当前冲突的请求扔到另一个队列延后处理以避开冲突。)

1.3 延时队列的实现

延时队列可以通过 Redis 的 zset(有序列表) 来实现。我们将消息序列化成一个字符串作为 zset 的value,这个消息的到期处理时间作为score,然后用多个线程轮询 zset 获取到期的任务进行处理,多个线程是为了保障可用性,万一挂了一个线程还有其它线程可以继续处理。因为有多个线程,所以需要考虑并发争抢任务,确保任务不能被多次执行。

同一个任务可能会被多个进程取到之后再使用zrem进行争抢,那些没抢到的进程都是白取了一次任务,这是浪费。可以考虑使用lua scripting来优化一下这个逻辑,将zrangebyscore和zrem一同挪到服务器端进行原子化操作,这样多个进程之间争抢任务时就不会出现这种浪费了。

package com.zhuangxiaoyan.springbootredis.delayqueuing;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import redis.clients.jedis.Jedis;

import java.lang.reflect.Type;
import java.util.Set;
import java.util.UUID;

/**
 * @Classname RedisDelayingQueue
 * @Description redis实现延时队列
 * @Date 2022/4/17 11:37
 * @Created by xjl
 */
public class RedisDelayingQueue<T> {

    static class TaskItem<T> {
        public String id;
        public T msg;
    }

    // fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference
    private Type TaskType = new TypeReference<TaskItem<T>>() {
    }.getType();
    private Jedis jedis;
    private String queueKey;

    public RedisDelayingQueue(Jedis jedis, String queueKey) {
        this.jedis = jedis;
        this.queueKey = queueKey;
    }

    public void delay(T msg) {
        TaskItem task = new TaskItem();
        task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid
        task.msg = msg;
        String s = JSON.toJSONString(task); // fastjson 序列化
        jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试
    }

    public void loop() {
        while (!Thread.interrupted()) {
            // 只取一条
            Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
            if (values.isEmpty()) {
                try {
                    Thread.sleep(500); // 歇会继续
                } catch (InterruptedException e) {
                    break;
                }
                continue;
            }
            String s = String.valueOf(values.iterator().next());
            if (jedis.zrem(queueKey, s) > 0) { // 抢到了
                TaskItem task = JSON.parseObject(s, TaskType); // fastjson 反序列化
                this.handleMsg((T) task.msg);
            }
        }
    }

    public void handleMsg(T msg) {
        System.out.println(msg);
    }

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        RedisDelayingQueue queue = new RedisDelayingQueue<>(jedis, "q-demo");
        
        Thread producer = new Thread() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    queue.delay("codehole" + i);
                }
            }
        };
        
        Thread consumer = new Thread() {
            @Override
            public void run() {
                queue.loop();
            }
        };
        producer.start();
        consumer.start();
        
        try {
            producer.join();
            Thread.sleep(6000);
            consumer.interrupt();
            consumer.join();
        } catch (InterruptedException e) {
        }
    }
}

1.4  Redission实现延时队列

基于Redis的Redisson分布式延迟队列结构的RDelayedQueue Java对象在实现了RQueue接口的基础上提供了向队列按要求延迟添加项目的功能。该功能可以用来实现消息传送延迟按几何增长或几何衰减的发送策略。

RQueue<String> distinationQueue = ...

RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue);

// 10秒钟以后将消息发送到指定队列
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);

// 一分钟以后将消息发送到指定队列
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);

// 在该对象不再需要的情况下,应该主动销毁。仅在相关的Redisson对象也需要关闭的时候可以不用主动销毁。
delayedQueue.destroy();

二、Redis 延迟队列实战

首先我们分析下这个流程

  • 用户提交任务。首先将任务推送至延迟队列中。
  • 延迟队列接收到任务后,首先将任务推送至job pool中,然后计算其执行时间。
  • 然后生成延迟任务(仅仅包含任务id)放入某个桶中。
  • 时间组件时刻轮询各个桶,当时间到达的时候从job pool中获得任务元信息。
  • 监测任务的合法性如果已经删除则pass。继续轮询。如果任务合法则再次计算时间。
  • 如果合法则计算时间,如果时间合法:topic将任务放入对应的ready queue,从bucket中移除。如果时间不合法,则重新计算时间再次放入bucket,并移除之前的bucket中的内容。
  • 消费端轮询对应topic的ready queue。获取job后做自己的业务逻辑。与此同时,服务端将已经被消费端获取的job按照其设定的TTR,重新计算执行时间,并将其放入bucket。
  • 完成消费后,发送finish消息,服务端根据job id删除对应信息。

我们现在可以了解到中间存在的几个组件

  • 延迟队列,为Redis延迟队列。实现消息传递。
  • Job pool 任务池保存job元信息。根据文章描述使用K/V的数据结构,key为ID,value为job。
  • Delay Bucket 用来保存业务的延迟任务。文章中描述使用轮询方式放入某一个Bucket可以知道其并没有使用topic来区分,个人这里默认使用顺序插入。
  • Timer 时间组件,负责扫描各个Bucket。根据文章描述存在多个Timer,但是同一个Timer同一时间只能扫描一个Bucket。
  • Ready Queue 负责存放需要被完成的任务,但是根据描述根据Topic的不同存在多个Ready Queue。

其中Timer负责轮询,Job pool、Delay Bucket、Ready Queue都是不同职责的集合。

任务状态

  • ready:可执行状态,
  • delay:不可执行状态,等待时钟周期。
  • reserved:已被消费者读取,但没有完成消费。
  • deleted:已被消费完成或者已被删除。

对外提供的接口

接口描述数据
add添加任务Job数据
pop取出待处理任务topic就是任务分组
finish完成任务任务ID
delete删除任务任务ID

其他内容

首先根据状态状态描述,finish和delete操作都是将任务设置成deleted状态。根据文章描述的操作,在执行finish或者delete的操作的时候任务已经从元数据中移除,此时deleted状态可能只存在极短时间,所以实际实现中就直接删除了。文章中并没有说明响应超时后如何处理,所以个人现在将其重新投入了待处理队列。文章中因为使用了集群,所以使用redis的setnx锁来保证多个时间循环处理多个桶的时候不会出现重复循环。这里因为是简单的实现,所以就很简单的每个桶设置一个时间队列处理。也是为了方便简单处理。

博文参考

有赞延迟队列设计

Spring Boot 整合——Redis延时队列的简单实现(基于有赞的设计)_大·风的博客-CSDN博客_spring 延时队列

springboot-samples: springboot开发过程中,一些常用的组件整合

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐