Redis实现分布式锁的六种方式。最佳实践:原子性获取锁+LUA脚本释放锁
redis实现分布式锁
Redis实现分布式锁的六种方式。
2022年9月18日
经过反复学习和实践,总体概括一些Redis实现分布式锁迭代版本。
第一版本:
setnx key value
缺点:客户端可能忘记释放锁,或者解锁失败,可能造成死锁第二个版本:加入过期时间,来解决忘记释放锁
setnx key value expire key seconds
缺点是:虽然加入了过期时间,并setnx 不是原子性操作,也可能造成死锁。第三版本:用set 保证原子操作
set key value nx ex
缺点是:在高并发下可能造成误删锁。
如图所示:第四版本(最重要!!):结合LUA脚本,释放锁.
(本文章主要是讲解的第四版本)
// 添加锁:
stringRedisTemplate.opsForValue().setIfAbsent()
第四版本结合LUA脚本实现释放锁:
// 比较线程标示与锁中的标示是否一致
if(redis.call('get', KEYS[1]) == ARGV[1]) then
// 释放锁 del key
return redis.call('del', KEYS[1])
end
return 0`
- 第五种:Redisson是开源的框架,在redis基础上实现的分布式工具的集合。而分布式锁只是Redisson的一个子集。(下面有详细解释)
lock.tryLock(获取锁等待时间,锁自动释放时间); // 释放锁 lock.unlock();
- 第六种:MultiLock联锁 解决的是主从集群一致性问题。(下面有详细解释)
我想说的是:用Redis实现分布式锁有多个方法,而本文介绍了第四种、第五种、第六种。他们能解决不同方面的问题。
而最重要的是:使用最多的是第四种方案
,下文中有关于加锁、释放锁的的实际代码。
一、什么是分布式锁
1、分布式锁的概念
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
- 多进程可见:多个JVM都能看得到。比如redis、mysql等,那么所有的JVM进程都能看得到
- 互斥锁:只允许一个进程能拿到锁
- 高可用:大多数情况下,获取锁都能获取成功
- 高性能:本身加锁后,线程变成了串行执行,从而会影响性能。所以获取锁的步骤上应该高性能
- 安全性:获取锁应该考虑异常的情况。获取锁后宕机怎么办? 死锁怎么办
2、mysql、Redis、Zookeeper对比
分布式锁的核心是实现多进程之间互斥,而满足条件的并且常见的有三种:mysql、redis、zookeeper
- mysq: 数据库都具有事务的机制,在执行事务操作时,mysql会自动分配一个互斥锁。所以说在事务之间是互斥的,只有一个人能去执行。在业务执行前,去数据库中申请一个互斥锁,然后再去执行业务,当业务执行结束后,提交事务,锁也就释放了。当业务抛出了异常时,会自动触发回滚,锁也就释放了
- redis: 利用setnx互斥命令。 获取锁就在reids中setnx一条数据,如果没有该key,那么添加成功,随之获取锁成功,反之一样。当删除该key,就是释放锁成功。
二、基于redis实现分布式锁(最佳实践!!)
1、最佳实践分布式锁:set key value nx ex
- 超时释放:在获取锁时加入过期时间。 可以避免服务宕机,然后死锁
127.0.0.1:6379[1]> set lock thread ex 8 nx
OK
127.0.0.1:6379[1]> ttl lock
(integer) 6
127.0.0.1:6379[1]> ttl lock
(integer) 4
127.0.0.1:6379[1]> ttl lock
(integer) 1
127.0.0.1:6379[1]> ttl lock
(integer) -2
释放锁就是删除key: del lock
2、实际开发中:实现redis分布式锁
- 添加释放锁需要判断是否是当前线程,避免锁误删操作。
- 添加LUA脚本解决多条命令原子性问题
1.定义接口,利用redis实现分布式锁功能
尝试获取锁:是因为采用的是非阻塞式。获取锁只是获取一次。要么成功要么失败。
public interface ILock {
/**
* 尝试获取锁
* @param timeoutSec = EX :锁持有的超时时间,过期后自动释放
* @return true代表获取锁成功; false代表获取锁失败
*/
boolean tryLock(long timeoutSec);
/**
* 释放锁
*/
void unlock();
}
2. 实现接口,具体实现获取锁和释放锁
1.在获取锁时存入线程标识(可以用UUID表示)
2.在释放锁时先获取锁中的线程标识,判断是否与当前线程标识一致。从而避免误删别人的锁。
- 如果一致则释放锁
- 如果不一致则不释放锁
public class SimpleRedisLock implements ILock {
private String name;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// 调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}
/*@Override
public void unlock() {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标示
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标示是否一致
if(threadId.equals(id)) {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}*/
}
3.释放锁的lua脚本
-- 比较线程标示与锁中的标示是否一致
if(redis.call('get', KEYS[1]) == ARGV[1]) then
-- 释放锁 del key
return redis.call('del', KEYS[1])
end
return 0
三、基于Redisson实现分布式锁
1、Redisson的介绍
Redisson是开源的框架,在redis基础上实现的分布式工具的集合。而分布式锁只是Redisson的一个子集。
-
每个Redis服务实例都能管理多达1TB的内存。
-
Redisson底层采用的是Netty 框架。支持Redis 2.8以上版本,支持Java1.6+以上版本
-
GitHub地址:
https://github.com/redisson/redisson
-
下面这个图是来自官网,可以实现对锁的功能
四、SpringBoot集成Redisson
1、配置环境
- 引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
- 配置Redisson客户端:
@Configuration
public class RedissonConfig {
// redis的工厂类,可以从中拿到各种工具
@Bean
public RedissonClient redissonClient(){
// 配置类
Config config = new Config();
// 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
config.useSingleServer().setAddress("redis://192.168.75.111:6379").setPassword("123321");
// 创建RedissonClient对象。创建客户端
return Redisson.create(config);
}
}
为什么不使用yml文件和start呢?
添加配置可以使用yml文件,跟springBoot整合来实现,官网还提供了start。
因为会替代spring提供的redis的配置和实现。
建议使用Redisson时,自己进行配置bean,不和spring提供的redis配置进行掺和。
2、测试
@Resource
private RedissonClient redissonClient;
@Test
void testRedisson() throws InterruptedException {
// 获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");
/* 尝试获取锁,参数分别是:
参数一:获取锁的最大等待时间(期间会重试)
参数二:锁自动释放时间,时间单位
1. 无参模式:非阻塞式
等待时间为-1,就是不等待。如果获取失败立即结束。
自动释放为30秒钟,超时30秒后才会释放
*/
boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
// 判断释放获取成功
if(isLock){
try {
System.out.println("执行业务");
}finally {
// 释放锁
lock.unlock();
}
}
}
3、可重入锁的原理
1、什么是可重入锁?
可重入锁:
- 指的是同一个线程,可以多次获得一把锁。
- 利用Hash结构记录线程id和重试次数。
- 利用watchDog延续锁时间。
- 利用信号量控制锁重试等待。
缺点:redis宕机引起锁失效问题
例如方法A调用方法B,在方法A中先去获得锁,然后执行业务去调用B,而B又要获取同一把锁。
而例如set key value nx time 就是不可重入锁,就会出现死锁的状态。例如:如果A获得锁后,去执行B,B如果也想获得锁,但是A并没有释放锁,所以说就会出现死锁状态。
2、获取锁和释放锁
需要Hash类型
- key中记录锁的名称
- field记录线程标识
- value记录锁的重试次数。
获取锁和释放锁的流程:
- 创建锁的对象
- 在方法A中,获取锁,tryLock时记录锁的线程标识和重试次数为1
- 在方法B中,获取锁。如果是锁已经存在,并且是同一线程时,只需要在重试次数中加1。代表是第二次获取同一个锁。
- 在方法B或者方法A中,执行完业务,释放锁的逻辑是:需要把重试次数减1,并判断是否为0,如果为0则删除锁。
@SpringBootTest
class RedissonTest {
@Resource
private RedissonClient redissonClient;
private RLock lock;
@BeforeEach
void setUp() {
lock = redissonClient.getLock("order");
}
@Test
void method1() throws InterruptedException {
// 尝试获取锁
boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
if (!isLock) {
log.error("获取锁失败 .... 1");
return;
}
try {
log.info("获取锁成功 .... 1");
method2();
log.info("开始执行业务 ... 1");
} finally {
log.warn("准备释放锁 .... 1");
lock.unlock();
}
}
void method2() {
// 尝试获取锁
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败 .... 2");
return;
}
try {
log.info("获取锁成功 .... 2");
log.info("开始执行业务 ... 2");
} finally {
log.warn("准备释放锁 .... 2");
lock.unlock();
}
}
}
4. 获取锁和释放锁的lua脚本
获取锁和释放锁一定要采用Lua脚本,来确保获取和释放锁的原子性。
获取锁:
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断是否存在
if(redis.call('exists', key) == 0) then
-- 不存在, 获取锁
redis.call('hset', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return 1; -- 返回结果
end;
-- 锁已经存在,判断threadId是否是自己
if(redis.call('hexists', key, threadId) == 1) then
-- 不存在, 获取锁,重入次数+1
redis.call('hincrby', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return 1; -- 返回结果
end;
return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失败
释放锁:
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断当前锁是否还是被自己持有
if (redis.call('HEXISTS', key, threadId) == 0) then
return nil; -- 如果已经不是自己,则直接返回
end;
-- 是自己的锁,则重入次数-1
local count = redis.call('HINCRBY', key, threadId, -1);
-- 判断是否重入次数是否已经为0
if (count > 0) then
-- 大于0说明不能释放锁,重置有效期然后返回
redis.call('EXPIRE', key, releaseTime);
return nil;
else -- 等于0说明可以释放锁,直接删除
redis.call('DEL', key);
return nil;
end;
4、 Redisson的锁重试和WatchDog机制
1、什么可重试问题
可重试:利用信号量和PubSub【发布订阅】功能实现等待、唤醒、获取锁失败的重试机制。
第一次尝试获取锁失败以后,并不是立即失败,而是利用了redis的PubSub的机制,做一个等待,等待释放锁的消息。
而获取锁成功的线程,在释放锁中会发送一条释放锁的消息。从而会被正在等待的线程通过订阅机制捕获到。
当等到释放锁的消息后,就会重试机制。
不可重试: 获取锁只尝试一次就返回false。
boolean isLock = lock.tryLock();
tryLock()的参数:
long waitTime:获取锁的最大等待时常。当第一次获取锁失败后,不会立即返回false,而是在规定的时间内进行重试,直到超时才会返回false。
long leaseTime:自动失效释放的时间
TimeUnit unit:时间单位
2、深入原码解释:
从获取锁这条命令开始往下执行:
boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime); //将传进来的时间,转换为毫秒
long current = System.currentTimeMillis();//得到当前时间
long threadId = Thread.currentThread().getId(); //得到线程的id,就是将来锁的标识
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId); //尝试获取锁。
if (ttl == null) {
return true; //如果获取成功,返回true
} else {
/*
*获取失败,就要再次获取:
*/
time -= System.currentTimeMillis() - current; //判断是否超时重试时间
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
current = System.currentTimeMillis(); // 再次得到当前时间
/*
并没有立即去尝试。而是订阅subscribe 其他人释放锁的信号
在释放锁时有这样的语句,用来发布信号:redis.call('piblis',KEY[2],ARGV[1]);"
*/
RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
// 当且仅当future在指定的时间限制内完成时为True
// 等待time(锁的剩余等待时间),如果等到锁的时间过期,
// 还没有等到释放锁的信号,就会返回获取锁失败
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
// 等待锁的重试超时时间,就取消订阅
this.unsubscribe(subscribeFuture, threadId);
}
});
}
this.acquireFailed(waitTime, unit, threadId);
return false;
try { //计算剩余等待时间
time -= System.currentTimeMillis() - current;
// 如果剩余等待时间小于0
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
boolean var20 = false;
return var20;
} else {
boolean var16;
// 如果剩余等待时间大于0 。 进入do while循环
do {
//得到当前时间
long currentTime = System.currentTimeMillis();
// 第一次去重试
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
var16 = true;
return var16;
}
// 如果获取失败,则看一下剩余时间
time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
// 剩余时间如果还有
currentTime = System.currentTimeMillis();
// 采用信号量。在规定时间内,等待得到释放锁的信号量
// 如果ttl小于等待时间:说明在等待时锁就释放了,就等待ttl的时间
// 如果ttl大于等待时间:等待time的时间
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
// 如果等着ttl 到期后,time肯定还没有到期。那么就一直循环while,等待锁的释放信号
} while(time > 0L);
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
} finally {
this.unsubscribe(subscribeFuture, threadId);
}
}
}
}
}
2、WatchDog机制
超时释放: 锁超时释放虽然可以避免死锁,但如果业务执行耗时较长,也会导致锁释放,存在安全隐患。
超时续约: 利用watchDog看门狗机制,每隔一段时间(releseTime/3),重置超时时间
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
//
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
/*
getConnectionManager:看门狗的时间,默认是30秒,去获取锁
*/
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 当future完成以后(剩余有效期,和异常)
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
// 剩余有效期= null, 说明获取锁成功了
if (ttlRemaining == null) {
// 任务调度 过期时间 : 自动续期功能
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
五、分布式锁主从一致性问题
1、什么是主从一致性问题
如果redis提供了主从集群,主从同步存在延迟,假设主服务器宕机后,如果主服务器中的锁,没有同步,导致死锁
主节点:负责增删改
从节点:只负责读的问题
那么主节点会把数据同步到从节点中,但是同步时会存在延迟,即使延迟很短也是会存在。当获取锁后,主从数据还没有来及同步时,主节点宕机了。主备切换后,在新的master节点中,发现锁并不存在了。
2、Redisson如何解决一致性问题【MultiLock联锁】
原理:多个独立的redis节点,必须在所有节点都获取重入锁,才算获取锁成功
优点:所有锁中最安全的实现方法
缺点:运维成本高、实现复杂
既然主从关系是导致一致性问题的原因,那么Redisson取消主从,那么所有的节点都是独立的redisson节点,相互之间没有任何关系,都可以做读写操作。那么获取锁时,依次在多个节点中进行获取锁操作。
可用性问题: 即使某一个节点宕机后,那么其他节点都有锁的信息。
更高的可用性: 在每一个节点后面加入slave节点,做主从同步。
即使加入了主从同步,也不会出现安全问题。
假设某一台master宕机后,刚好并没有完成数据同步。那么slave变成了master主节点。没有锁标识。
有一个线程趁虚而入,想要获取锁,并不能获取成功。因为只有在每一个节点都拿到锁才能获取成功。
只要任意一个节点存活中,其他线程就不能拿到锁,就不会出现锁失效的问题。
优点:保留了主从 机制,确保了整个redis的高可用特性,避免了主从一致引发的锁失效问题。
更多推荐
所有评论(0)