基于lettuce+lua实现Redis分布式锁
转载自:https://juejin.cn/post/6844904057920831496实现原理基本思路分布式系统中有很多个节点,但这些节点都访问一个共同的Redis实例(或Redis集群)。所以可以使用 setnx(set if not exists) 指令执行key操作,只允许被一个客户端节点执行,但其它节点再来执行时会失败,这样就可以保证同一时刻只有一个节点占有锁。节点在执行完业务逻辑后
转载自:https://juejin.cn/post/6844904057920831496
实现原理
基本思路
分布式系统中有很多个节点,但这些节点都访问一个共同的Redis实例(或Redis集群)。所以可以使用 setnx(set if not exists) 指令执行key操作,只允许被一个客户端节点执行,但其它节点再来执行时会失败,这样就可以保证同一时刻只有一个节点占有锁。节点在执行完业务逻辑后需要将key删除(相当于释放锁)。这里为了防止业务执行时发生了异常而导致死锁,还需要给key设置一个过期时间(到了时间自动释放锁),以便其它节点可以继续获取锁。
- 使用setnx执行成功后一定要设置key的失效时间,或者直接使用命令
set key value px millseconds nx
- 每个key对应的value一定要是唯一的值,可以使用UUID生成一个随机ID。
- 删除key之前需要验证value是否一致,防止错误地释放锁。
缺陷
事实上这类琐最大的缺点就是它加锁时只作用在一个Redis节点上,即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况:在Redis的master节点上拿到了锁, 但是这个加锁的key还没有同步到slave节点。 master故障,发生故障转移,slave节点升级为master节点,由于salve节点上没有这个加锁的key,所以导致锁丢失。
Lua脚本
因为加锁和解锁涉及到一个以上的命令,为了保证原子性,所以一般使用lua脚本来执行命令。
加锁脚本:
-- 首先尝试使用setnx设置值
local result = redis.call('setnx', KEYS[1], ARGV[2]);
-- 如果成功,则设置key的失效时间
if result == 1 then
redis.call('pexpire', KEYS[1], tonumber(ARGV[1]));
-- 设置成功的话就返回空值,与后面返回的失效时间区别开
return nil
else
-- key操作失败,说明有其它节点正在操作,直接返回key的失效时间
return redis.call('pttl', KEYS[1])
end
复制代码
解锁脚本:
-- 首先获取key对应的值
local result = redis.call('get', KEYS[1]);
-- 然后验证value,如果相同则删除该key
if result == ARGV[1] then
redis.call('del', KEYS[1])
return 1
else
return nil
end
复制代码
使用lettuce客户端实现分布式锁
这里基于lettuce客户端实现一个可重入的分布式Redis锁。首先需要引入lettuce的依赖:
<!-- lettuce Redis的高级客户端 -->
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.1.8.RELEASE</version>
</dependency>
复制代码
关于lettuce的使用,可以参考lettuce官方github的wiki。
锁的实现
因为加锁和解锁的脚本需要频繁地调用,为了减少网络IO的消耗,所以先将加锁和解锁的脚本缓存到了服务器本地,redis会使用sha1算法将脚本内容变成一个40位16进制组成的字符串作为该脚本的唯一ID并返回给客户端,客户端只需发送这个唯一ID给服务器就可以执行脚本了。在lettuce中实现如下:
/**
* 两个原子变量,用于存储加锁和解锁脚本的sha ID
*/
private final AtomicReference<String> LUA_SHA_LOCK = new AtomicReference<>();
private final AtomicReference<String> LUA_SHA_UNLOCK = new AtomicReference<>();
// 加锁的lua脚本
// 键不存在则创建,并设置键的的过期时间为5s, 如果已存在则创建失败,
final String lockLua = "local result = redis.call('setnx', KEYS[1], ARGV[2]); " +
"if result == 1 then redis.call('pexpire', KEYS[1], tonumber(ARGV[1])) " +
"return nil else return redis.call('pttl', KEYS[1]) end";
// 将脚本缓存到服务器,并保存它的唯一ID到原子变量中
LUA_SHA_LOCK.compareAndSet(null, commands.scriptLoad(lockLua));
// 释放锁的lua脚本
final String unlockLua = "local result = redis.call('get', KEYS[1]);" +
"if result == ARGV[1] then redis.call('del', KEYS[1]) " +
"return 1 else return nil end";
// 将脚本缓存到服务器,并保存它的唯一ID到原子变量中
LUA_SHA_UNLOCK.compareAndSet(null, commands.scriptLoad(unlockLua));
复制代码
为了保证值的唯一性,这里使用UUID来生成随机ID作为值,并将该值存在本地线程变量中,当重入锁或解锁时可以直接从变量中拿以保证相同的key拥有的value一致。然后执行lua脚本尝试获取锁,如果获取锁成功则直接返回。否则一直阻塞,并不断尝试获取锁直到拿到锁。
我这里的实现是有问题的,因为不能及时通知其它线程去获取锁。Redisson的分布式锁是通过Redis的发布订阅机制实现通知的则可以及时通知其它线程去抢锁(Java的ReentrantLock则是通过队列实现的)
这里有个坑,就是ScriptOutputType类型和真实返回类型不一样,比如这里我设置的是Integer,它实例返回类型是Long(可以自己打印class查看对象类型)
private Boolean tryLock(String key) {
String[] keys = new String[]{key};
// 失效时间为5秒
String ttl = "5000";
String value = getValueByKey(key);
// 如果没有设置过值
if (value == null) {
// 锁的值使用UUID生成随机ID以保证值的唯一性
value = UUID.randomUUID().toString();
// 将新生成的值放入集合中
values.get().put(key, value);
}
String[] args = new String[]{ttl, value};
// 使用脚本唯一ID执行lua脚本
Long result = commands.evalsha(LUA_SHA_LOCK.get(), ScriptOutputType.INTEGER, keys, args);
// 返回空说明加锁成功
if (result == null) {
return true;
}
boolean isLock;
// 一直阻塞,直到拿到锁
while (true) {
try {
// 等待一小会儿
Thread.sleep(5);
// 继续尝试获取锁
isLock = this.tryLock(key);
if (isLock) {
return true;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
复制代码
下面贴出完整的代码:
public class Lock {
private Logger logger = LoggerFactory.getLogger(Lock.class);
/**
* 当前线程的锁集合
*/
private ThreadLocal<Map<String, Integer>> lockers = new ThreadLocal<>();
/**
* 当前线程锁的key和value集合
*/
private ThreadLocal<Map<String, String>> values = new ThreadLocal<>();
private RedisClient client;
private RedisCommands<String, String> commands;
/**
* 两个原子变量,用于存储加锁和解锁脚本的sha ID
*/
private final AtomicReference<String> LUA_SHA_LOCK = new AtomicReference<>();
private final AtomicReference<String> LUA_SHA_UNLOCK = new AtomicReference<>();
/**
* Lock的构造函数
* @param client RedisClient的实例
*/
public Lock(RedisClient client) {
this.client = client;
StatefulRedisConnection<String, String> connection = client.connect();
commands = connection.sync();
// 加锁的lua脚本
// 键不存在则创建,并设置键的的过期时间为5s, 如果已存在则创建失败,
final String lockLua = "local result = redis.call('setnx', KEYS[1], ARGV[2]); " +
"if result == 1 then redis.call('pexpire', KEYS[1], tonumber(ARGV[1])) " +
"return nil else return redis.call('pttl', KEYS[1]) end";
// 将脚本缓存到服务器,并保存它的唯一ID到原子变量中
LUA_SHA_LOCK.compareAndSet(null, commands.scriptLoad(lockLua));
// 释放锁的lua脚本
final String unlockLua = "local result = redis.call('get', KEYS[1]);" +
"if result == ARGV[1] then redis.call('del', KEYS[1]) " +
"return 1 else return nil end";
// 将脚本缓存到服务器,并保存它的唯一ID到原子变量中
LUA_SHA_UNLOCK.compareAndSet(null, commands.scriptLoad(unlockLua));
}
/**
* 尝试获取锁
* @param key 锁的键
* @return 是否成功获取锁
*/
private Boolean tryLock(String key) {
String[] keys = new String[]{key};
// 失效时间为5秒
String ttl = "5000";
String value = getValueByKey(key);
// 如果没有设置过值
if (value == null) {
// 锁的值使用UUID生成随机ID以保证值的唯一性
value = UUID.randomUUID().toString();
// 将新生成的值放入集合中
values.get().put(key, value);
}
String[] args = new String[]{ttl, value};
// 如果创建键成功,则说明加锁成功
Long result = commands.evalsha(LUA_SHA_LOCK.get(), ScriptOutputType.INTEGER, keys, args);
if (result == null) {
return true;
}
boolean isLock;
// 一直阻塞,直到拿到锁
while (true) {
try {
// 这里的实现是有问题的
// Redisson实现的分布式锁是使用发布订阅实现的(Java的reentrantLock通过队列实现的)
// 从而可以及时通知其它线程去抢锁
Thread.sleep(5);
// 继续尝试获取锁
isLock = this.tryLock(key);
if (isLock) {
return true;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 尝试释放锁
* @param key 锁的键
* @return 是否成功释放锁
*/
private boolean tryRelease(String key) {
String[] keys = new String[]{key};
String[] args = new String[]{getValueByKey(key)};
// 释放锁
Long result = commands.evalsha(LUA_SHA_UNLOCK.get(), ScriptOutputType.INTEGER, keys, args);
return result != null;
}
/**
* 获取当前线程的锁
* @param key 锁的键
* @return 当前线程的锁和该锁的重入次数
*/
private Integer getLockerCnt(String key) {
// 获取当前线程的锁集合
Map<String, Integer> map = lockers.get();
// 如果集合不为空,返回key对应的值
if (map != null) {
return map.get(key);
}
lockers.set(new HashMap<>(4));
return null;
}
/**
* 获取锁对应的值
* @param key 锁的键
* @return 锁对应的值
*/
private String getValueByKey(String key) {
// 获取当前线程的锁和对应值的键值对集合
Map<String, String> map = values.get();
// 如果集合不为空,返回key对应的值
if (map != null) {
return map.get(key);
}
values.set(new HashMap<>(4));
return null;
}
/**
* 加可重入锁
* @param key 锁的键
* @return 是否成功
*/
public boolean lock(String key){
Integer refCnt = getLockerCnt(key);
if (refCnt != null) {
// 如果锁已持有,则锁的引用计数加1
lockers.get().put(key, refCnt + 1);
return true;
}
// 尝试加锁
boolean ok = this.tryLock(key);
// 如果加锁失败,则返回
if (!ok) {
return false;
}
// 加锁成功,引用计数设置为1
lockers.get().put(key, 1);
return true;
}
/**
* 释放可重入锁
* @param key 锁的键
* @return 是否成功
*/
public boolean unlock(String key) {
Integer refCnt = getLockerCnt(key);
// 当前未持有锁
if (refCnt == null) {
return false;
}
// 锁的引用数减1
refCnt --;
// 引用计数大于0,说明还持有锁
if (refCnt > 0) {
lockers.get().put(key, refCnt);
} else {
// 否则从锁集合中删除该键,并释放锁
lockers.get().remove(key);
return this.tryRelease(key);
}
return true;
}
}
复制代码
测试锁的使用
public class TestLettuce {
private final Logger logger = LoggerFactory.getLogger(TestLettuce.class);
private RedisClient client;
private int count;
@Test
public void testLock() throws InterruptedException {
RedisURI redisUri = RedisURI.builder().withHost("localhost").withPort(6379).withPassword("redis666").build()
// 获取客户端
client = RedisClient.create(redisUri);
// 创建锁的实例
Lock lock = new Lock(client);
// 创建一个固定线程池
ExecutorService service = Executors.newFixedThreadPool(5);
// 开启五个任务
for (int i = 0; i < 5; i++) {
service.submit(() -> {
boolean isLock;
try {
isLock = lock.lock("myLock");
if (isLock) {
logger.info("我拿到锁啦");
count ++;
logger.info("count: [{}]", count);
}
}finally {
lock.unlock("myLock");
}
});
}
Thread.sleep(10000);
client.shutdown();
}
复制代码
根据输出可以看到,实例变量是被五个线程安全地使用的。
2020-02-06 20:53:04.434 [pool-1-thread-2] INFO com.ncusofter.redis.TestDemo - 我拿到锁啦
2020-02-06 20:53:04.434 [pool-1-thread-2] INFO com.ncusofter.redis.TestDemo - count: [1]
2020-02-06 20:53:04.441 [pool-1-thread-5] INFO com.ncusofter.redis.TestDemo - 我拿到锁啦
2020-02-06 20:53:04.441 [pool-1-thread-5] INFO com.ncusofter.redis.TestDemo - count: [2]
2020-02-06 20:53:04.445 [pool-1-thread-3] INFO com.ncusofter.redis.TestDemo - 我拿到锁啦
2020-02-06 20:53:04.445 [pool-1-thread-3] INFO com.ncusofter.redis.TestDemo - count: [3]
2020-02-06 20:53:04.449 [pool-1-thread-1] INFO com.ncusofter.redis.TestDemo - 我拿到锁啦
2020-02-06 20:53:04.449 [pool-1-thread-1] INFO com.ncusofter.redis.TestDemo - count: [4]
2020-02-06 20:53:04.455 [pool-1-thread-4] INFO com.ncusofter.redis.TestDemo - 我拿到锁啦
2020-02-06 20:53:04.457 [pool-1-thread-4] INFO com.ncusofter.redis.TestDemo - count: [5]
更多推荐
所有评论(0)