上篇文章讲到了,使用redisson实现延迟队列,本文主要对其设计原理进行分析。

redisson实现延迟队列,有三个队列组成,其中一个zset类型的延迟队列,另外两个都是list类型的堵塞队列。

整体结构图如下:
在这里插入图片描述

一.实例化延迟队列

实例RedissonDelayedQueue操作,主要设置了一些队列名称,以及创建一个任务

   protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
        super(codec, commandExecutor, name);
        // 设置通道名称
        channelName = prefixName("redisson_delay_queue_channel", getName());
        // 总队列名
        queueName = prefixName("redisson_delay_queue", getName());
        // 延迟队列名
        timeoutSetName = prefixName("redisson_delay_queue_timeout", getName());
        // 设置一个任务
        QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
            
            @Override
            protected RFuture<Long> pushTaskAsync() {
                return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                		// 获取过期数据,KEYS[2]对应timeoutSetName,ARGV[1]对应System.currentTimeMillis(),ARGV[2])对应100
                        "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
                        // 存在过期数据
                      + "if #expiredValues > 0 then "
                          + "for i, v in ipairs(expiredValues) do "
                              + "local randomId, value = struct.unpack('dLc0', v);"
                              // KEYS[1]对应getName(),执行队列中将入过期的元素,获取可执行数据都从该队列获取
                              + "redis.call('rpush', KEYS[1], value);"
                              // KEYS[3]对应queueName,移除该数据
                              + "redis.call('lrem', KEYS[3], 1, v);"
                          + "end; "
                          // 延迟队列移除该过期数据
                          + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
                      + "end; "
                        // 获取延迟队列队头数据,即下一个要过期的数据
                      + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
                      // 数据未过期则返回score,即对应的延迟时间
                      + "if v[1] ~= nil then "
                         + "return v[2]; "
                      + "end "
                      + "return nil;",
                      Arrays.<Object>asList(getName(), timeoutSetName, queueName), 
                      System.currentTimeMillis(), 100);
            }
            
            @Override
            protected RTopic getTopic() {
            	// 消息主题,向channelName发布消息,订阅该主题的客户端接收对应消息
                return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, channelName);
            }
        };
        // 启动任务
        queueTransferService.schedule(queueName, task);
        
        this.queueTransferService = queueTransferService;
    }

二.启动任务,添加监听

schedule方法是将未执行的任务,进行启动,已启用的启用数+1.

 	public synchronized void schedule(String name, QueueTransferTask task) {
        QueueTransferTask oldTask = tasks.putIfAbsent(name, task);
        // 任务不存在,启动任务
        if (oldTask == null) {
            task.start();
        } else {
            oldTask.incUsage();
        }
    }
	
	 public void start() {
        RTopic schedulerTopic = getTopic();
        // 添加监听订阅发布事件
        statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {
            @Override
            public void onSubscribe(String channel) {
                pushTask();
            }
        });
        // 监听消息事件
        messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() {
            @Override
            public void onMessage(CharSequence channel, Long startTime) {
                scheduleTask(startTime);
            }
        });
    }

三.推送消息

推送消息使用offerAsync,该方法异步执行。

	public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
		// 校验延迟时间的合法性
        if (delay < 0) {
            throw new IllegalArgumentException("Delay can't be negative");
        }
        
        long delayInMs = timeUnit.toMillis(delay);
        long timeout = System.currentTimeMillis() + delayInMs;
     
        long randomId = ThreadLocalRandom.current().nextLong();
        return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
                "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" 
                // 向延迟队列添加数据
              + "redis.call('zadd', KEYS[2], ARGV[1], value);"
                // 向总队列添加数据
              + "redis.call('rpush', KEYS[3], value);"
              // 获取延迟队列的队头数据
              + "local v = redis.call('zrange', KEYS[2], 0, 0); "
              // 队头数据等于刚加入的数据,则发布事件,执行pushTask方法
              + "if v[1] == value then "
                 + "redis.call('publish', KEYS[4], ARGV[1]); "
              + "end;",
              Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName), 
              timeout, randomId, encode(e));
    }

四.pushTask

首次添加一个消息,向主题发布事件,客户端监听到该消息执行pushTask。

	private void pushTask() {
		// 执行实例化时,定义的pushTaskAsync,返回结果及延迟队列队头的到期时间
        RFuture<Long> startTimeFuture = pushTaskAsync();
        startTimeFuture.onComplete((res, e) -> {
            if (e != null) {
                if (e instanceof RedissonShutdownException) {
                    return;
                }
                log.error(e.getMessage(), e);
                scheduleTask(System.currentTimeMillis() + 5 * 1000L);
                return;
            }
            // res 为延迟队列队头数据的到期时间
            if (res != null) {
                scheduleTask(res);
            }
        });
    }

五.scheduleTask

当消息添加成功,听到到执行scheduleTask方法。参数startTime为延迟队列队头数据的到期时间。

	private void scheduleTask(final Long startTime) {
        TimeoutTask oldTimeout = lastTimeout.get();
        if (startTime == null) {
            return;
        }
        
        if (oldTimeout != null) {
            oldTimeout.getTask().cancel();
        }
        // 计算延迟时间
        long delay = startTime - System.currentTimeMillis();
        if (delay > 10) {
        	// 启动一个延迟任务,由netty的shHashedWheelTimer实现
            Timeout timeout = connectionManager.newTimeout(new TimerTask() {                    
                @Override
                public void run(Timeout timeout) throws Exception {
                    pushTask();
                    
                    TimeoutTask currentTimeout = lastTimeout.get();
                    if (currentTimeout.getTask() == timeout) {
                        lastTimeout.compareAndSet(currentTimeout, null);
                    }
                }
            }, delay, TimeUnit.MILLISECONDS);
            if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
                timeout.cancel();
            }
        } else {
            pushTask();
        }
    }
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐