4-API-Redisson
Redisson:if (redis.call('exists', KEYS[1]) == 0) then判断key是否存在redis.call('hset', KEYS[1], ARGV[2], 1);如果不存在,则设置key的 某个字段value=1redis.call('pexpire', KEYS[1], ARGV[1]);设置key的过期时间return nil;end;if (redi
一、基本用法:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.8.2</version>
</dependency>
redis单节点配置
Config config = new Config();
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress("redis://127.0.0.1:7000")
.setTimeout(timeout)
.setConnectionPoolSize(connectionPoolSize)
.setConnectionMinimumIdleSize(connectionMinimumIdleSize);
if(StringUtils.isNotBlank(password)) {
serverConfig.setPassword(password);
}
RedissonClient Redisson.create(config);
redis多节点配置
Config config = new Config();
config.useClusterServers()
.setScanInterval(2000)
.addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001")
.addNodeAddress("redis://127.0.0.1:7002");
//支持添加多节点
RedissonClient redisson = Redisson.create(config);
指定master-slave
Config config = new Config();// 创建配置
config.useMasterSlaveServers()// 指定使用主从部署方式
//.setReadMode(ReadMode.SLAVE) 默认值SLAVE,读操作只在从节点进行
//.setSubscriptionMode(SubscriptionMode.SLAVE) 默认值SLAVE,订阅操作只在从节点进行
//.setMasterConnectionMinimumIdleSize(10) 默认值10,针对每个master节点初始化10个连接
//.setMasterConnectionPoolSize(64) 默认值64,针对每个master节点初始化10个连接,最大可以扩展至64个连接
//.setSlaveConnectionMinimumIdleSize(10) 默认值10,针对每个slave节点初始化10个连接
//.setSlaveConnectionPoolSize(64) 默认值,针对每个slave节点初始化10个连接,最大可以扩展至64个连接
//.setSubscriptionConnectionMinimumIdleSize(1) 默认值1,在SubscriptionMode=SLAVE时候,针对每个slave节点初始化1个连接
//.setSubscriptionConnectionPoolSize(50) 默认值50,在SubscriptionMode=SLAVE时候,针对每个slave节点初始化1个连接,最大可以扩展至50个连接
.setMasterAddress("redis://192.168.29.24:6379") // 设置redis主节点
.addSlaveAddress("redis://192.168.29.24:7000") // 设置redis从节点
.addSlaveAddress("redis://192.168.29.24:7001"); // 设置redis从节点
RedissonClient redisson = Redisson.create(config);// 创建客户端(发现这一操作非常耗时,基本在2秒-4秒左右
哨兵模式
Config config = new Config();
SentinelServersConfig serverConfig = config.useSentinelServers().addSentinelAddress("redis://192.168.29.24:6379","redis://192.168.29.24:7000","redis://192.168.29.24:7001")
.setMasterName(redssionProperties.getMasterName())
.setTimeout(redssionProperties.getTimeout())
.setMasterConnectionPoolSize(redssionProperties.getMasterConnectionPoolSize())
.setSlaveConnectionPoolSize(redssionProperties.getSlaveConnectionPoolSize());
if(StringUtils.isNotBlank(redssionProperties.getPassword())) {
serverConfig.setPassword(redssionProperties.getPassword());
}
return Redisson.create(config);
二、获取锁API
//获得分布式锁实例,这里不会阻塞。
lock = redisson.getLock(key);
lock.lock();//获取锁
lock.lock(timeOut, TimeUnit.SECONDS); //获取锁,如果获取成功并设置锁的过期时间
lock.tryLock(); //尝试获取锁
lock.tryLock(timeOut,1L,TimeUnit.SECONDS);
public RLock getLock(String name) {
return new RedissonLock(this.connectionManager.getCommandExecutor(), name);
}
public void lock() {
try {
this.lockInterruptibly();
} catch (InterruptedException var2) {
Thread.currentThread().interrupt();
}
}
public void lock(long leaseTime, TimeUnit unit) {
try {
this.lockInterruptibly(leaseTime, unit);
} catch (InterruptedException var5) {
Thread.currentThread().interrupt();
}
}
//这里比较重要
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(leaseTime, unit, threadId);//尝试获取锁
if (ttl != null) {
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
this.commandExecutor.syncSubscription(future);
try {
while(true) {
ttl = this.tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {
return;
}
if (ttl >= 0L) {
this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
this.getEntry(threadId).getLatch().acquire();
}
}
} finally {
this.unsubscribe(future, threadId);
}
}
}
可以看到,调用getLock()方法后实际返回一个RedissonLock对象,在RedissonLock对象的lock()方法主要调用tryAcquire()方法
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
public void operationComplete(Future<Long> future) throws Exception {
if (future.isSuccess()) {
Long ttlRemaining = (Long)future.getNow();
if (ttlRemaining == null) {
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
return ttlRemainingFuture;
}
}
最终走到下面这行代码
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);//leaseTime表示key的过期时间,unit是时间单位
return this.commandExecutor.evalWriteAsync(
this.getName(),
LongCodec.INSTANCE,
command,
"if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);",
Collections.singletonList(this.getName()),
new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
首先,看一下evalWriteAsync方法的定义
<T, R> RFuture<R> evalWriteAsync(
String var1, Codec var2,
RedisCommand<T> var3, String var4,
List<Object> var5, Object... var6);
结合上面代码看下var4(下面是LUA脚本)
if (redis.call('exists', KEYS[1]) == 0) then 判断key是否存在
redis.call('hset', KEYS[1], ARGV[2], 1); 如果不存在,则设置key的 某个字段 value=1
redis.call('pexpire', KEYS[1], ARGV[1]); 设置key的过期时间
return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 判断hash 表key的某个字段是否等于1
redis.call('hincrby', KEYS[1], ARGV[2], 1); 如果是1设置key的字段加一
redis.call('pexpire', KEYS[1], ARGV[1]); 重新设置key的过期时间
return nil; 这个是重入锁的功能
end;
return redis.call('pttl', KEYS[1]); 返回锁的过期时间
说明:
this.getName() 就是我们代码传的redisKey
LongCodec.INSTANCE 表示编码格式
command 不清楚是啥,可以不管。
Collections.singletonList(this.getName()) 表示吧RedisKey转成List,也就是LUA代码中的KEYS1
new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)} 表示吧多个参数转成数组其中
this.internalLockLeaseTime也就是ARGV1,也就是过期时间
this.getLockName(threadId) 也就是ARGV2,也就是Hash中的一个字段
流程小结
1、判断有没有一个叫“abc”的key
2、如果没有,则在其下设置一个字段为“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”,值为“1”的键值对
,并设置它的过期时间
3、如果存在,则进一步判断“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”是否存在,若存在,则其值加1,并重新设置过期时间
4、返回“abc”的生存时间(毫秒)
三. 解锁
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(
this.getName(),
LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;",
Arrays.asList(this.getName(), this.getChannelName()),
new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});
}
简化下就是:
if (redis.call('exists', KEYS[1]) == 0) then //判断KEY是否存在
redis.call('publish', KEYS[2], ARGV[1]); //如果不存在,将信息发送到指定的频道,向频道KEYS[2],发送ARGV[1]消息
return 1;
end;
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then //用于查看哈希表的指定字段是否存在,不存在return
return nil;
end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); //存在,将Hash表中的ARGV3字段减一,得到ARGV[3]的值
if (counter > 0) then //判断ARGV[3]当前值>0,说明锁还没释放完
redis.call('pexpire', KEYS[1], ARGV[2]); //重新设置KEY[1]的过期时间。所以锁还没有释放完
return 0;
else //执行到这说明counter=0,锁已经释放完了。
redis.call('del', KEYS[1]); //删除Hash的KEY
redis.call('publish', KEYS[2], ARGV[1]); //释放完发布订阅,将信息发送到指定的频道KEYS[2]发送内容ARGV[1]
return 1;
end;
return nil;
this.getName() //KEY[1] 缓存KEY1就是HASH的大KEY[1]
this.getChannelName() //KEY[2] 缓存KEY2,发布订阅中的频道KEYS[2]
LockPubSub.unlockMessage //ARGV[1] 频道所对应的内容
this.internalLockLeaseTime //ARGV[2] 生存时间
this.getLockName(threadId) //ARGV[3] Hash表中的一个字段。里面存的是当前占用锁的字段。
1、判断是否存在一个key
2、如果不存在,向Channel中广播一条消息,广播的内容是0,并返回1
3、如果存在,进一步判断字段 ARGV[3] 是否存在
4、若字段不存在,返回空,若字段存在,则字段值减1
5、若减完以后,字段值仍大于0,则返回0
6、减完后,若字段值小于或等于0,则广播一条消息,广播内容是0,并返回1;
可以猜测,广播0表示资源可用,即通知那些等待获取锁的线程现在可以获得锁了
等待:以上是正常情况下获取到锁的情况,那么当无法立即获取到锁的时候怎么办呢?再回到前面获取锁的位置
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(leaseTime, unit, threadId);
if (ttl != null) {
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
this.commandExecutor.syncSubscription(future);
try {
while(true) {
ttl = this.tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {
return;
}
if (ttl >= 0L) {
this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
this.getEntry(threadId).getLatch().acquire();
}
}
} finally {
this.unsubscribe(future, threadId);
}
}
}
这里会订阅Channel,当资源可用时可以及时知道,并抢占,防止无效的轮询而浪费资源
当资源可用用的时候,循环去尝试获取锁,由于多个线程同时去竞争资源,所以这里用了信号量,对于同一个资源只允许一个线程获得锁,其它的线程阻塞
小结:
四、实际开发中的问题
琐最大的缺点就是它加锁时只作用在一个Redis节点上,即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况:
- 在Redis的master节点上拿到了锁;
- 但是这个加锁的key还没有同步到slave节点;
- master故障,发生故障转移,slave节点升级为master节点;
- 导致锁丢失。
正因为如此,Redis作者antirez基于分布式环境下提出了一种更高级的分布式锁的实现方式:Redlock。笔者认为,Redlock也是Redis所有分布式锁实现方式中唯一能让面试官高潮的方式。
Redlock的设计思路:
- 依次尝试从5个实例,使用相同的key和具有唯一性的value获取锁。
- 客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间,这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。
- 当且仅当从大多数(N/2+1)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功
- 如果因为某些原因,获取锁失败客户端应该在所有的Redis实例上进行解锁(无论有没有得到锁)
Redlock分布式锁
那么Redlock分布式锁如何实现呢?
config1.useSingleServer().setAddress("redis://172.29.1.180:5378").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);
config2.useSingleServer().setAddress("redis://172.29.1.180:5379").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);
config3.useSingleServer().setAddress("redis://172.29.1.180:5380").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);
String resourceName = "REDLOCK";
RLock lock1 = redissonClient1.getLock(resourceName);
RLock lock2 = redissonClient2.getLock(resourceName);
RLock lock3 = redissonClient3.getLock(resourceName);
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
boolean isLock;
try {
isLock = redLock.tryLock(500, 30000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
} finally {
// 无论如何, 最后都要解锁。
redLock.unlock();
}
最核心的变化就是RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
既然核心变化是使用了RedissonRedLock,那么我们看一下它的源码有什么不同。这个类是RedissonMultiLock的子类,所以调用tryLock方法时,事实上调用了RedissonMultiLock的tryLock方法,精简源码如下:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// 实现要点之允许加锁失败节点限制(N-(N/2+1))
int failedLocksLimit = failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
// 实现要点之遍历所有节点通过EVAL命令执行lua加锁
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
// 对节点尝试加锁
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
} catch (RedisConnectionClosedException|RedisResponseTimeoutException e) {
// 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
// 抛出异常表示获取锁失败
lockAcquired = false;
}
if (lockAcquired) {
// 成功获取锁集合
acquiredLocks.add(lock);
} else {
// 如果达到了允许加锁失败节点限制,那么break,即此次Redlock加锁失败
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}
}
}
return true;
}
失效时间如何设置:
这个问题的场景是,假设设置失效时间10秒,如果由于某些原因导致10秒还没执行完任务,这时候锁自动失效,导致其他线程也会拿到分布式锁。
这确实是Redis分布式最大的问题,不管是普通分布式锁,还是Redlock算法分布式锁,都没有解决这个问题。也有一些文章提出了对失效时间续租,即延长失效时间,很明显这又提升了分布式锁的复杂度
zookeeper or redis:
没有绝对的好坏,只有更适合自己的业务。就性能而言,redis很明显优于zookeeper;就分布式锁实现的健壮性而言,zookeeper很明显优于redis。如何选择,取决于你的业务!
未完待续。。。
本文取自:https://www.cnblogs.com/cjsblog/p/9831423.html
源码:https://github.com/redisson/redisson/
很细的点:https://www.shangmayuan.com/a/007922c7222743b0a682a490.html
分析: https://blog.csdn.net/weixin_39853155/article/details/113330925
超细介绍:https://www.oschina.net/p/redisson?hmsr=aladdin1e1
项目案例:https://www.cnblogs.com/cjsblog/p/11273205.html
Redisson使用实战:https://blog.csdn.net/zsj777/article/details/82468212
https://blog.csdn.net/zxl646801924/article/details/107610699
分布式原理:https://blog.csdn.net/weixin_39853155/article/details/113330925
https://blog.csdn.net/u014042066/article/details/72778440
更多推荐
所有评论(0)