Redisson分布式锁源码解析
Redisson实现分布式锁源码解析锁是在执行多线程时用于强行限制资源访问的同步机制,在分布式系统场景下,为了保证多个进程服务对共享资源的读写同步,保证数据的最终一致性,而单机服务的同步锁只能保证在单个服务中多线程的竞争数据安全性,所以需要分布式锁。分布式锁应该具备的条件:1、在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;2、高可用的获取锁与释放锁;3、高性能的获取锁与释放锁
Redisson实现分布式锁源码解析
锁是在执行多线程时用于强行限制资源访问的同步机制,在分布式系统场景下,为了保证多个进程服务对共享资源的读写同步,保证数据的最终一致性,而单机服务的同步锁只能保证在单个服务中多线程的竞争数据安全性,所以需要分布式锁。
分布式锁应该具备的条件:
1、在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;
2、高可用的获取锁与释放锁;
3、高性能的获取锁与释放锁;
4、具备可重入特性;
5、具备锁失效机制,防止死锁;
6、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。
传统的实现方案
1、基于数据库实现分布式锁;
2、基于缓存(Redis等)实现分布式锁;
3、基于Zookeeper实现分布式锁;
redis实现简单分布式锁
实现思路:
(1)获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的value值为一个随机生成的UUID,通过此在释放锁的时候进行判断。
(2)获取锁后,需要对锁进行续命,用定时维护超时时间,比如说的超时时间是30秒,那就每过10s判断改锁是否还持有,持有的话则将时长续为30秒
(3)释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放。
截取的代码如下:
public String deduc3() {
String lockKey = "lock:pro-100";
UUID uuid = UUID.randomUUID();
// 加锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, uuid, 30, TimeUnit.SECONDS);
// 加锁失败
if (!lock) {
return "error_code";
}
// 锁续命
new Thread(()->{
while (true){
if (uuid.equals(redisTemplate.opsForValue().get(lockKey))) {
redisTemplate.expire(lockKey,30,TimeUnit.SECONDS);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}else {
return;
}
}
}).start();
// 业务处理
try {
String productKey = "pro-100";
Integer count = (Integer) redisTemplate.opsForValue().get(productKey);
count--;
redisTemplate.opsForValue().set(productKey, count);
System.out.println("剩余库存::" + count);
} catch (Exception e) {
System.out.println("系统异常");
} finally {
// 释放锁
if (uuid.equals(redisTemplate.opsForValue().get(lockKey))){
redisTemplate.delete(lockKey);
}
}
return "ok";
}
分析:
在多个进程的多线程操作中,由于redis 的setnx的原子性和排斥性,可以保证再加锁的过程中,只会有一个线程成功,并且加有失效时间,防止死锁。加了uuid的客户端标识,防止其他线程篡改锁信息,保证的锁的安全性。
不足:续命和释放锁时redis的操作没有保证原子性,使用lua脚本会更好。无重入锁功能。获取锁失败后无自旋。
redisson实现分布式锁
redisson内部实现了分布式锁,并且将上述的不足都一一考虑了。
引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.1</version>
</dependency>
@Autowired
private RedissonClient redissonClient;
public String deduc2() {
String lockKey = "lock:pro-100";
RLock lock = redissonClient.getLock(lockKey);
lock.lock();
// 锁续命
try {
String productKey = "pro-100";
Integer count = (Integer) redisTemplate.opsForValue().get(productKey);
count--;
redisTemplate.opsForValue().set(productKey, count);
System.out.println("剩余库存::" + count);
} catch (Exception e) {
System.out.println("系统异常");
} finally {
lock.unlock();
}
return "ok";
}
源码解析
1、创建一个锁对象
redissonClient.getLock(lockKey)
name:锁名称
id :随机序列号
pubsub: 锁订阅
entryName 随机序列号+锁名称
2、加锁
lock.lock();—》RedissonLock对象
this.lock(-1L, (TimeUnit)null, false);----》
tryAcquire(long leaseTime, TimeUnit unit, long threadId)–》
leaseTime加锁时传的是-1,所以会走下面的tryLockInnerAsync()方法,开启一个异步任务加锁,返回一个ttl
这里是一段lua脚本
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);
三个参数说明:
Collections.singletonList(this.getName()),
new Object[]{this.internalLockLeaseTime,
this.getLockName(threadId)
KEYS[1]锁名称 ARGV[1]锁时长(默认30秒) ARGV[2] 客户端id
第一次进来判断锁存不存在,不存在则hset ,并设置超时时间,返回空
如果不是第一次进来,说明锁已经存在,如果不是当前客户端,说明是其他客户端,则返回锁的过期时间
加锁成功后,会调用this.scheduleExpirationRenewal(threadId);方法
第一次进来,oldentry没有值,所以会走renewExpiration();刷新超时时间
启动定时任务,做锁续命操作, RedissonLock.this.renewExpirationAsync(threadId);定时操作为锁超时时间/3取的,30过期,定时任务则为10秒一次执行
又是一段lua脚本
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end;
return 0;
三个参数说明:
Collections.singletonList(this.getName()),
new Object[]{this.internalLockLeaseTime,
this.getLockName(threadId)
KEYS[1]锁名称 ARGV[1]锁时长(默认30秒) ARGV[2] 客户端id
判断锁是否还持有,存在重新设置锁的过期时间为30秒
回到lock方法,如果加锁成功,则ttl为null,加锁流程结束。
如果加锁失败,这会有一个ttl过期时间,则会走这里
1、subscribe(threadId);订阅消息,渠道为prefixName(“redisson_lock__channel”, this.getName());
2、先尝试加锁,失败则会根据ttl时间做休眠,让出CPU,也叫自旋,如果加锁成功则跳出
3、加锁成功后,unsubscribe(future, threadId);取消订阅
订阅里面有一个onmessage()
如果收到解锁消息,则立刻唤醒自旋休眠中的线程。
3、释放锁
unlock方法,
释放锁,
lua脚本
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
5个参数
KEYS[1]锁名字 KEYS[2] channel名 ARGV[1] 消息 ARGV[2] 锁时长 ARGV[3] 客户端id/锁的版本
1、如果说不存在则直接返回空
2、存在,则做说版本减1,如果版本号大于0,说明是重入锁,锁未完全释放,解锁不成功
3、如果不是重入锁或者重入锁的版本为0,则删除锁,所释放成功,同时会发布该锁的释放消息,
通知其他自旋的线程唤醒。
成功后,会调用onComplete()方法—》cancelExpirationRenewal(threadId);
取消锁的续命任务
总结:
1、redisson利用redis的lua保证原子操作性。
2、使用定时任务维持锁的超时
3、间歇性自旋等待锁的释放
更多推荐
所有评论(0)