基于Redis的分布式锁及看门狗机制的分析
原理分布式锁要满足 互斥,防死锁的基本要求。进一步要求 可重入(非必要,但重要),高效。Redisson下的实现以下源码阅读基于 redisson:3.12.0标记资源及占有者特定资源对应的key设置到可访问的redis,并配置一定的超时间和能表示占用者的value(该值可用ObjectId或者类雪花算法,及其他能全局唯一标识线程的算法)。value主要用于帮助实现可重入特性。Redison中简单
原理
分布式锁要满足 互斥,防死锁的基本要求。
进一步要求 可重入(非必要,但重要),高效。
Redisson下的实现
以下源码阅读基于 redisson:3.12.0
标记资源及占有者
特定资源对应的key设置到可访问的redis,并配置一定的超时间和能表示占用者的value(该值可用ObjectId或者类雪花算法,及其他能全局唯一标识线程的算法)。
value主要用于帮助实现可重入特性。
Redison中简单的使用了UUID作为连接管理器的全局唯一标识,再结合本机的线程id,全局唯一标识线程
互斥/可重入
可重入,主要就是通过比较对应资源key的value的值是否与当前线程的ThreadLocal保存的值一致来判断。
防死锁加锁过程
使用带有时效锁去获取资源,并利用看门狗机制做自动续约对可能超时的问题做补偿兜底。
具体加锁及释放的4中方式:
RedissonClient client = Redisson.create();
// 获取指定资源的锁
RLock lock = client.getLock("resource-1");
try {
// 1. 堵塞加锁 配置占用时间是30s
lock.lock();
// 2. 堵塞加锁 指定占用时长
lock.lock(30, TimeUnit.SECONDS);
// 3. 尝试等待10s内获取锁
boolean b = lock.tryLock(10, TimeUnit.SECONDS);
// 4. 尝试等待10s内获取锁,并占用30s
boolean b1 = lock.tryLock(10, 30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
// 如果有指定则使用时间
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 默认配置的加锁时间是30s
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
// tryLockInnerAsync 加锁的lua脚本
// KEYS[1]:getName() 资源名称
// ARGV[1]:internalLockLeaseTime 加锁时间; ARGV[2]:getLockName(threadId) 占用资源的线程标识
// 检查对应资源锁是否存在 0表示不存在 1表示存在
if (redis.call('exists', KEYS[1]) == 0)
then
// 不存在,则对资源加锁,并将占用线程的id写入值,用于可重入
redis.call('hset', KEYS[1], ARGV[2], 1);
// 设置超时时间
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 如果锁已存在,检查hash表的对应资源域的当前线程标识是否存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
then
// 存在,则加锁数增加1
redis.call('hincrby', KEYS[1], ARGV[2], 1);
// 更新加锁时间
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 如果加锁失败,并且之前没有占用锁,则返回资源key的剩余过期的毫秒数,用于之后到期通知
return redis.call('pttl', KEYS[1]);
// unlockInnerAsync 解锁的lua脚本
// KEYS[1]:getName() 资源名称; KEYS[2]:getChannelName() 监听的channel名称
// ARGV[1]:LockPubSub.UNLOCK_MESSAGE 解锁发送给channel的消息; ARGV[2]: internalLockLeaseTime 加锁时间; ARGV[3]: getLockName(threadId) 占用资源的线程标识
// 检查hash表中的 加锁资源域的当前线程是否还持有锁
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0)
then
// 没有直接退出
return nil;
end;
// 定义本地变量,值为当前线程对资源key的加锁次数减1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0)
then
// 如果计数器还大于0,说明之前重入了,等待线程后续继续释放
// 更新过期时间
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
// 锁已经完全释放
// 删除表示占用的资源key
redis.call('del', KEYS[1]);
// 推送释放消息给channel,告知所有监听该channel的客户端
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
Redis的EVAL命令
命令格式:EVAL script numkeys key [key …] arg [arg …]
- script参数是一段 Lua5.1 脚本程序。脚本不必(也不应该[^1])定义为一个 Lua 函数
- numkeys指定后续参数有几个key,即:key [key …]中key的个数。如没有key,则为0
- key [key …] 从 EVAL 的第三个参数开始算起,表示在脚本中所用到的那些 Redis 键(key)。在Lua脚本中通过KEYS[1], KEYS[2]获取。
- arg [arg …] 附加参数。在Lua脚本中通过ARGV[1],ARGV[2]获取。
看门狗机制
看门狗机制是对防死锁但存在执行所需时间片不足的场景的补偿。
通过间隔一定时间对锁自动续约用来实现。
// 自旋加锁源码
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// tryAcquire 返回的是剩余占用的毫秒数
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
// 注册对资源key的订阅
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
// 自旋不断尝试取锁
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
// future 在前面设置好对channel的监听之后,会在收到通知时被唤醒,然后进入下次加锁尝试
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
// 取锁成功或者被中断之后取消对资源的订阅
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
// 看门狗机制的源码
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
// 如果指定了超时时间,则直接使用,不设置看门狗
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
// 取锁成功后,添加到看门狗自动续约调度表中
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
// 添加到过期刷新调度器中,也就是看门狗队列
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();
}
}
这部分就是自动续约的源码,主要是借助netty的定时任务取实现,细节可以直接阅读netty的相关知识
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
堵塞加锁如何被通知
利用redis的channel发布订阅机制实现
订阅过程
发布过程
发布订阅命令参数
命令 | 描述 |
---|---|
PSUBSCRIBE pattern [pattern …] | |
订阅一个或多个符合给定模式的频道。 | |
PUBSUB subcommand [argument [argument …]] | 查看订阅与发布系统状态。 |
PUBLISH channel message | 将信息发送到指定的频道。 |
PUNSUBSCRIBE [pattern [pattern …]] | 退订所有给定模式的频道。 |
SUBSCRIBE channel [channel …] | 订阅给定的一个或多个频道的信息。 |
UNSUBSCRIBE [channel [channel …]] | 指退订给定的频道。 |
总结
看完Redis的分布式锁的运行流程,如果阅读过可重入锁ReentrantLock
的源码,了解其流程的同学会发现,两者的相似度非常高.
从如何加锁,到如何实现可重入特性,到如何通知下一个等待锁释放的对象.
特别注意: 因为redis是没有事务这个概念的,虽然通过使用lua脚本做到了正常操作的原子性,但是遇到特殊情况,如redis挂了,那么脚本中的命令也不会全部执行。
故,命令的顺序就非常重要,所有check操作都需要优先,所有设置值得操作必须先带上过期时间,最后才设置value,避免出现一个永远不会过期的对象导致死锁。
对脚本中的命令均要思考,执行到该语句失败,后续命令不执行的情况下,之前设置的key是否会永存?
永存的情况下,程序能否正常执行?
正常执行的情况,这些永存的key是否会导致redis出现严重的内存泄漏?
当然,如果你家的redis挂了,不走恢复逻辑,那也无所谓。
相关推荐
参考资料
- https://www.cnblogs.com/jelly12345/p/14699492.html
- https://www.runoob.com/redis/redis-pub-sub.html
- https://www.cnblogs.com/jelly12345/p/14699492.html
更多推荐