Redisson 延迟队列实现原理探究
redis也可以用来实现延时消息的功能。理论上也有两种方式订阅 key 过期事件(pub/sub)使用 sorted-set 存储消息,score为消息的过期时间然而实际上订阅过期事件存在诸多问题,所以并不合适:过期事件的不准确,过期时间只在key被删除时才触发,并不是在key过期后就马上删除的pub/sub 不支持持久化,服务器宕机期间的事件会丢失pub/sub 存在丢失的可能,线上使用的red
·
redis也可以用来实现延时消息的功能。理论上也有两种方式
- 订阅 key 过期事件(pub/sub)
- 使用 sorted-set 存储消息,score为消息的过期时间
然而实际上订阅过期事件存在诸多问题,所以并不合适:
- 过期事件的不准确,过期时间只在key被删除时才触发,并不是在key过期后就马上删除的
- pub/sub 不支持持久化,服务器宕机期间的事件会丢失
- pub/sub 存在丢失的可能,线上使用的redis pub/sub 有丢失过消息(非过期时间)
- 所有的key过期都会发送过期事件,对redis性能有一定影响。(除非单独使用一个redis作为队列服务)
然后这里以 redisson 的 RDelayedQueue
为例,介绍 redis 延时队列的使用和原理。
使用
public static void main(String[] args) throws Exception {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient client = Redisson.create(config);
RBlockingQueue<String> blockingQueue = client.getBlockingQueue("delay-queue");
RDelayedQueue<String> delayedQueue = client.getDelayedQueue(blockingQueue);
// 一个消费者
new Thread(() -> {
while (true) {
try {
System.err.println("\n\n curTime=" + LocalDateTime.now() + " receive message : " + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// 发送延时消息
for (int i = 0; i < 5; i++) {
delayedQueue.offer("msg-" + i, 10, TimeUnit.SECONDS);
System.err.println("\n\n curTime=" + LocalDateTime.now() + " send message : " + i);
}
}
原理解析
实际上,redisson 使用了 两个list + 一个 sorted-set + pub/sub 来实现延时队列,而不是单一的sort-set。
- sorted-set:存放未到期的消息&到期时间,提供消息延时排序功能
- list-0:存放未到期消息,作为消息的原始顺序视图,提供如查询、删除指定第几条消息的功能(分析源码得出的,查看哪些地方有使用这个list)
- list-q:消费队列,存放到期后的消息,提供消费
先来个流程图:
大概就是这样子了。
再结合源码简单描述一下:
org.redisson.RedissonDelayedQueue#RedissonDelayedQueue
首先创建延时队列的时候,会创建一个QueueTransferTask
, 在里面会订阅一个topic,订阅成功后,执行其pushTask
方法,里面会查询sorted-set中100个已到期的消息,将其push到list-q中,并从sorted-set和list-0中移除。(这里是为了投递历史未处理的消息)protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) { super(codec, commandExecutor, name); channelName = prefixName("redisson_delay_queue_channel", getRawName()); queueName = prefixName("redisson_delay_queue", getRawName()); timeoutSetName = prefixName("redisson_delay_queue_timeout", getRawName()); QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) { @Override protected RFuture<Long> pushTaskAsync() { return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); " + "if #expiredValues > 0 then " + "for i, v in ipairs(expiredValues) do " + "local randomId, value = struct.unpack('dLc0', v);" + "redis.call('rpush', KEYS[1], value);" + "redis.call('lrem', KEYS[3], 1, v);" + "end; " + "redis.call('zrem', KEYS[2], unpack(expiredValues));" + "end; " // get startTime from scheduler queue head task + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); " + "if v[1] ~= nil then " + "return v[2]; " + "end " + "return nil;", Arrays.<Object>asList(getRawName(), timeoutSetName, queueName), System.currentTimeMillis(), 100); } @Override protected RTopic getTopic() { return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName); } }; queueTransferService.schedule(queueName, task); this.queueTransferService = queueTransferService; }
org.redisson.RedissonDelayedQueue#offerAsync(V, long, java.util.concurrent.TimeUnit)
发送延时消息时,会将消息写入 list-0 和 sorted-set 中,msg会添加一个randomId,支持发送相同的消息。并且判断sorted-set首调消息如果是刚插入的,则publish timeout(到期时间) 到 topicpublic RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) { if (delay < 0) { throw new IllegalArgumentException("Delay can't be negative"); } long delayInMs = timeUnit.toMillis(delay); long timeout = System.currentTimeMillis() + delayInMs; long randomId = ThreadLocalRandom.current().nextLong(); return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_VOID, "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" + "redis.call('zadd', KEYS[2], ARGV[1], value);" + "redis.call('rpush', KEYS[3], value);" // if new object added to queue head when publish its startTime // to all scheduler workers + "local v = redis.call('zrange', KEYS[2], 0, 0); " + "if v[1] == value then " + "redis.call('publish', KEYS[4], ARGV[1]); " + "end;", Arrays.<Object>asList(getRawName(), timeoutSetName, queueName, channelName), timeout, randomId, encode(e)); }
org.redisson.QueueTransferTask#scheduleTask
订阅到topic消息后,会先判断其是否临期(delay<10ms),如果是则调用pushTask
方法(1中有说明),不是则启动一个定时任务(使用的netty时间轮),延时delay后执行pushTask
方法。// 订阅topic onMessage 时调用 private void scheduleTask(final Long startTime) { TimeoutTask oldTimeout = lastTimeout.get(); if (startTime == null) { return; } if (oldTimeout != null) { oldTimeout.getTask().cancel(); } long delay = startTime - System.currentTimeMillis(); if (delay > 10) { // 使用 netty 时间轮 启动一个定时任务 Timeout timeout = connectionManager.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { pushTask(); TimeoutTask currentTimeout = lastTimeout.get(); if (currentTimeout.getTask() == timeout) { lastTimeout.compareAndSet(currentTimeout, null); } } }, delay, TimeUnit.MILLISECONDS); if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) { timeout.cancel(); } } else { pushTask(); } } // pushTaskAsync 就是前面1中重写的方法 private void pushTask() { RFuture<Long> startTimeFuture = pushTaskAsync(); startTimeFuture.onComplete((res, e) -> { if (e != null) { if (e instanceof RedissonShutdownException) { return; } log.error(e.getMessage(), e); scheduleTask(System.currentTimeMillis() + 5 * 1000L); return; } if (res != null) { scheduleTask(res); } }); }
更多推荐
已为社区贡献2条内容
所有评论(0)