redis也可以用来实现延时消息的功能。理论上也有两种方式

  • 订阅 key 过期事件(pub/sub)
  • 使用 sorted-set 存储消息,score为消息的过期时间

然而实际上订阅过期事件存在诸多问题,所以并不合适:

  • 过期事件的不准确,过期时间只在key被删除时才触发,并不是在key过期后就马上删除的
  • pub/sub 不支持持久化,服务器宕机期间的事件会丢失
  • pub/sub 存在丢失的可能,线上使用的redis pub/sub 有丢失过消息(非过期时间)
  • 所有的key过期都会发送过期事件,对redis性能有一定影响。(除非单独使用一个redis作为队列服务)

然后这里以 redisson 的 RDelayedQueue 为例,介绍 redis 延时队列的使用和原理。

使用

public static void main(String[] args) throws Exception {
    Config config = new Config();
    config.useSingleServer().setAddress("redis://127.0.0.1:6379");
    RedissonClient client = Redisson.create(config);

    RBlockingQueue<String> blockingQueue = client.getBlockingQueue("delay-queue");
    RDelayedQueue<String> delayedQueue = client.getDelayedQueue(blockingQueue);

    // 一个消费者
    new Thread(() -> {
        while (true) {
            try {
                System.err.println("\n\n curTime=" + LocalDateTime.now() + " receive message : " + blockingQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).start();

    // 发送延时消息
    for (int i = 0; i < 5; i++) {
        delayedQueue.offer("msg-" + i, 10, TimeUnit.SECONDS);
        System.err.println("\n\n curTime=" + LocalDateTime.now() + " send message : " + i);
    }
}

原理解析

实际上,redisson 使用了 两个list + 一个 sorted-set + pub/sub 来实现延时队列,而不是单一的sort-set。

  • sorted-set:存放未到期的消息&到期时间,提供消息延时排序功能
  • list-0:存放未到期消息,作为消息的原始顺序视图,提供如查询、删除指定第几条消息的功能(分析源码得出的,查看哪些地方有使用这个list)
  • list-q:消费队列,存放到期后的消息,提供消费

先来个流程图:
在这里插入图片描述

大概就是这样子了。

再结合源码简单描述一下:

  1. org.redisson.RedissonDelayedQueue#RedissonDelayedQueue
    首先创建延时队列的时候,会创建一个QueueTransferTask, 在里面会订阅一个topic,订阅成功后,执行其pushTask方法,里面会查询sorted-set中100个已到期的消息,将其push到list-q中,并从sorted-set和list-0中移除。(这里是为了投递历史未处理的消息)
    protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
        super(codec, commandExecutor, name);
        channelName = prefixName("redisson_delay_queue_channel", getRawName());
        queueName = prefixName("redisson_delay_queue", getRawName());
        timeoutSetName = prefixName("redisson_delay_queue_timeout", getRawName());
        
        QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
            @Override
            protected RFuture<Long> pushTaskAsync() {
                return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                        "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
                      + "if #expiredValues > 0 then "
                          + "for i, v in ipairs(expiredValues) do "
                              + "local randomId, value = struct.unpack('dLc0', v);"
                              + "redis.call('rpush', KEYS[1], value);"
                              + "redis.call('lrem', KEYS[3], 1, v);"
                          + "end; "
                          + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
                      + "end; "
                        // get startTime from scheduler queue head task
                      + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
                      + "if v[1] ~= nil then "
                         + "return v[2]; "
                      + "end "
                      + "return nil;",
                      Arrays.<Object>asList(getRawName(), timeoutSetName, queueName),
                      System.currentTimeMillis(), 100);
            }
            
            @Override
            protected RTopic getTopic() {
                return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName);
            }
        };
        
        queueTransferService.schedule(queueName, task);
        
        this.queueTransferService = queueTransferService;
    }
    
    
  2. org.redisson.RedissonDelayedQueue#offerAsync(V, long, java.util.concurrent.TimeUnit)
    发送延时消息时,会将消息写入 list-0 和 sorted-set 中,msg会添加一个randomId,支持发送相同的消息。并且判断sorted-set首调消息如果是刚插入的,则publish timeout(到期时间) 到 topic
    public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
        if (delay < 0) {
            throw new IllegalArgumentException("Delay can't be negative");
        }
        
        long delayInMs = timeUnit.toMillis(delay);
        long timeout = System.currentTimeMillis() + delayInMs;
     
        long randomId = ThreadLocalRandom.current().nextLong();
        return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
                "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" 
              + "redis.call('zadd', KEYS[2], ARGV[1], value);"
              + "redis.call('rpush', KEYS[3], value);"
              // if new object added to queue head when publish its startTime 
              // to all scheduler workers 
              + "local v = redis.call('zrange', KEYS[2], 0, 0); "
              + "if v[1] == value then "
                 + "redis.call('publish', KEYS[4], ARGV[1]); "
              + "end;",
              Arrays.<Object>asList(getRawName(), timeoutSetName, queueName, channelName),
              timeout, randomId, encode(e));
    }
    
  3. org.redisson.QueueTransferTask#scheduleTask
    订阅到topic消息后,会先判断其是否临期(delay<10ms),如果是则调用pushTask方法(1中有说明),不是则启动一个定时任务(使用的netty时间轮),延时delay后执行pushTask方法。
    // 订阅topic onMessage 时调用
    private void scheduleTask(final Long startTime) {
        TimeoutTask oldTimeout = lastTimeout.get();
        if (startTime == null) {
            return;
        }
        
        if (oldTimeout != null) {
            oldTimeout.getTask().cancel();
        }
        
        long delay = startTime - System.currentTimeMillis();
        if (delay > 10) {
        	// 使用 netty 时间轮 启动一个定时任务
            Timeout timeout = connectionManager.newTimeout(new TimerTask() {                    
                @Override
                public void run(Timeout timeout) throws Exception {
                    pushTask();
                    
                    TimeoutTask currentTimeout = lastTimeout.get();
                    if (currentTimeout.getTask() == timeout) {
                        lastTimeout.compareAndSet(currentTimeout, null);
                    }
                }
            }, delay, TimeUnit.MILLISECONDS);
            if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
                timeout.cancel();
            }
        } else {
            pushTask();
        }
    }    
    
    // pushTaskAsync 就是前面1中重写的方法
    private void pushTask() {
        RFuture<Long> startTimeFuture = pushTaskAsync();
        startTimeFuture.onComplete((res, e) -> {
            if (e != null) {
                if (e instanceof RedissonShutdownException) {
                    return;
                }
                log.error(e.getMessage(), e);
                scheduleTask(System.currentTimeMillis() + 5 * 1000L);
                return;
            }
            
            if (res != null) {
                scheduleTask(res);
            }
        });
    }
    
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐