一、基本用法:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.8.2</version>
</dependency>
redis单节点配置
Config config = new Config();
SingleServerConfig serverConfig = config.useSingleServer()
        .setAddress("redis://127.0.0.1:7000")
        .setTimeout(timeout)
        .setConnectionPoolSize(connectionPoolSize)
        .setConnectionMinimumIdleSize(connectionMinimumIdleSize);

if(StringUtils.isNotBlank(password)) {
    serverConfig.setPassword(password);
}
RedissonClient Redisson.create(config);
redis多节点配置
Config config = new Config();
config.useClusterServers()
        .setScanInterval(2000)
        .addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001")
        .addNodeAddress("redis://127.0.0.1:7002");
		//支持添加多节点

RedissonClient redisson = Redisson.create(config);
指定master-slave
Config config = new Config();// 创建配置
config.useMasterSlaveServers()// 指定使用主从部署方式
        //.setReadMode(ReadMode.SLAVE) 默认值SLAVE,读操作只在从节点进行
        //.setSubscriptionMode(SubscriptionMode.SLAVE) 默认值SLAVE,订阅操作只在从节点进行
        //.setMasterConnectionMinimumIdleSize(10) 默认值10,针对每个master节点初始化10个连接
        //.setMasterConnectionPoolSize(64) 默认值64,针对每个master节点初始化10个连接,最大可以扩展至64个连接
        //.setSlaveConnectionMinimumIdleSize(10) 默认值10,针对每个slave节点初始化10个连接
        //.setSlaveConnectionPoolSize(64) 默认值,针对每个slave节点初始化10个连接,最大可以扩展至64个连接
        //.setSubscriptionConnectionMinimumIdleSize(1) 默认值1,在SubscriptionMode=SLAVE时候,针对每个slave节点初始化1个连接
        //.setSubscriptionConnectionPoolSize(50) 默认值50,在SubscriptionMode=SLAVE时候,针对每个slave节点初始化1个连接,最大可以扩展至50个连接
        .setMasterAddress("redis://192.168.29.24:6379")  // 设置redis主节点
        .addSlaveAddress("redis://192.168.29.24:7000") // 设置redis从节点
        .addSlaveAddress("redis://192.168.29.24:7001"); // 设置redis从节点
RedissonClient redisson = Redisson.create(config);// 创建客户端(发现这一操作非常耗时,基本在2秒-4秒左右
哨兵模式
Config config = new Config();
SentinelServersConfig serverConfig = config.useSentinelServers().addSentinelAddress("redis://192.168.29.24:6379","redis://192.168.29.24:7000","redis://192.168.29.24:7001")
        .setMasterName(redssionProperties.getMasterName())
        .setTimeout(redssionProperties.getTimeout())
        .setMasterConnectionPoolSize(redssionProperties.getMasterConnectionPoolSize())
        .setSlaveConnectionPoolSize(redssionProperties.getSlaveConnectionPoolSize());

if(StringUtils.isNotBlank(redssionProperties.getPassword())) {
    serverConfig.setPassword(redssionProperties.getPassword());
}
return Redisson.create(config);

二、获取锁API

//获得分布式锁实例,这里不会阻塞。
lock = redisson.getLock(key);

lock.lock();//获取锁
lock.lock(timeOut, TimeUnit.SECONDS); //获取锁,如果获取成功并设置锁的过期时间

lock.tryLock(); //尝试获取锁
lock.tryLock(timeOut,1L,TimeUnit.SECONDS);


public RLock getLock(String name) {
    return new RedissonLock(this.connectionManager.getCommandExecutor(), name);
}

public void lock() {
    try {
        this.lockInterruptibly();
    } catch (InterruptedException var2) {
        Thread.currentThread().interrupt();
    }

}

public void lock(long leaseTime, TimeUnit unit) {
    try {
        this.lockInterruptibly(leaseTime, unit);
    } catch (InterruptedException var5) {
        Thread.currentThread().interrupt();
    }
}

//这里比较重要
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = this.tryAcquire(leaseTime, unit, threadId);//尝试获取锁
    if (ttl != null) {
        RFuture<RedissonLockEntry> future = this.subscribe(threadId);
        this.commandExecutor.syncSubscription(future);

        try {
            while(true) {
                ttl = this.tryAcquire(leaseTime, unit, threadId);
                if (ttl == null) {
                    return;
                }

                if (ttl >= 0L) {
                    this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    this.getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            this.unsubscribe(future, threadId);
        }
    }
}

可以看到,调用getLock()方法后实际返回一个RedissonLock对象,在RedissonLock对象的lock()方法主要调用tryAcquire()方法

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
}


private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1L) {
        return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            public void operationComplete(Future<Long> future) throws Exception {
                if (future.isSuccess()) {
                    Long ttlRemaining = (Long)future.getNow();
                    if (ttlRemaining == null) {
                        RedissonLock.this.scheduleExpirationRenewal(threadId);
                    }

                }
            }
        });
        return ttlRemainingFuture;
    }
}

最终走到下面这行代码


return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);


<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    this.internalLockLeaseTime = unit.toMillis(leaseTime);//leaseTime表示key的过期时间,unit是时间单位
    return this.commandExecutor.evalWriteAsync(
			this.getName(), 
			LongCodec.INSTANCE, 
			command, 
			"if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", 
			Collections.singletonList(this.getName()), 
			new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}

首先,看一下evalWriteAsync方法的定义

<T, R> RFuture<R> evalWriteAsync(
		String var1, Codec var2, 
		RedisCommand<T> var3, String var4, 
		List<Object> var5, Object... var6);

结合上面代码看下var4(下面是LUA脚本)

if (redis.call('exists', KEYS[1]) == 0) then   判断key是否存在
	redis.call('hset', KEYS[1], ARGV[2], 1);   如果不存在,则设置key的 某个字段  value=1
	redis.call('pexpire', KEYS[1], ARGV[1]);   设置key的过期时间
	return nil;                                
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then  判断hash 表key的某个字段是否等于1
	redis.call('hincrby', KEYS[1], ARGV[2], 1);         如果是1设置key的字段加一
    redis.call('pexpire', KEYS[1], ARGV[1]);            重新设置key的过期时间
    return nil;                                         这个是重入锁的功能
end;
return redis.call('pttl', KEYS[1]);                     返回锁的过期时间

说明:
this.getName() 就是我们代码传的redisKey
LongCodec.INSTANCE 表示编码格式
command 不清楚是啥,可以不管。

Collections.singletonList(this.getName()) 表示吧RedisKey转成List,也就是LUA代码中的KEYS1

new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)} 表示吧多个参数转成数组其中
this.internalLockLeaseTime也就是ARGV1,也就是过期时间
this.getLockName(threadId) 也就是ARGV2,也就是Hash中的一个字段

流程小结

1、判断有没有一个叫“abc”的key
2、如果没有,则在其下设置一个字段为“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”,值为“1”的键值对
,并设置它的过期时间
3、如果存在,则进一步判断“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”是否存在,若存在,则其值加1,并重新设置过期时间
4、返回“abc”的生存时间(毫秒)

三. 解锁

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
	return this.commandExecutor.evalWriteAsync(
			this.getName(), 
			LongCodec.INSTANCE, 
			RedisCommands.EVAL_BOOLEAN, 
			"if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", 
			Arrays.asList(this.getName(), this.getChannelName()), 
			new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});
}

简化下就是:

if (redis.call('exists', KEYS[1]) == 0) then 					//判断KEY是否存在
	redis.call('publish', KEYS[2], ARGV[1]); 					//如果不存在,将信息发送到指定的频道,向频道KEYS[2],发送ARGV[1]消息
	return 1; 
end;
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then			//用于查看哈希表的指定字段是否存在,不存在return
	return nil;
end; 
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);	//存在,将Hash表中的ARGV3字段减一,得到ARGV[3]的值
if (counter > 0) then											//判断ARGV[3]当前值>0,说明锁还没释放完
	redis.call('pexpire', KEYS[1], ARGV[2]); 					//重新设置KEY[1]的过期时间。所以锁还没有释放完
	return 0;
else 															//执行到这说明counter=0,锁已经释放完了。
	redis.call('del', KEYS[1]);									//删除Hash的KEY
	redis.call('publish', KEYS[2], ARGV[1]); 					//释放完发布订阅,将信息发送到指定的频道KEYS[2]发送内容ARGV[1]
	return 1;
end; 
	return nil;
this.getName()	      		//KEY[1]  缓存KEY1就是HASH的大KEY[1]
this.getChannelName()		//KEY[2]  缓存KEY2,发布订阅中的频道KEYS[2] 
LockPubSub.unlockMessage	//ARGV[1] 频道所对应的内容
this.internalLockLeaseTime	//ARGV[2] 生存时间
this.getLockName(threadId)	//ARGV[3] Hash表中的一个字段。里面存的是当前占用锁的字段。

1、判断是否存在一个key
2、如果不存在,向Channel中广播一条消息,广播的内容是0,并返回1
3、如果存在,进一步判断字段 ARGV[3] 是否存在
4、若字段不存在,返回空,若字段存在,则字段值减1
5、若减完以后,字段值仍大于0,则返回0
6、减完后,若字段值小于或等于0,则广播一条消息,广播内容是0,并返回1;

可以猜测,广播0表示资源可用,即通知那些等待获取锁的线程现在可以获得锁了

等待:以上是正常情况下获取到锁的情况,那么当无法立即获取到锁的时候怎么办呢?再回到前面获取锁的位置

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = this.tryAcquire(leaseTime, unit, threadId);
    if (ttl != null) {
        RFuture<RedissonLockEntry> future = this.subscribe(threadId);
        this.commandExecutor.syncSubscription(future);

        try {
            while(true) {
                ttl = this.tryAcquire(leaseTime, unit, threadId);
                if (ttl == null) {
                    return;
                }

                if (ttl >= 0L) {
                    this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    this.getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            this.unsubscribe(future, threadId);
        }
    }
}

这里会订阅Channel,当资源可用时可以及时知道,并抢占,防止无效的轮询而浪费资源
当资源可用用的时候,循环去尝试获取锁,由于多个线程同时去竞争资源,所以这里用了信号量,对于同一个资源只允许一个线程获得锁,其它的线程阻塞

小结:
在这里插入图片描述

在这里插入图片描述

四、实际开发中的问题

琐最大的缺点就是它加锁时只作用在一个Redis节点上,即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况:

  1. 在Redis的master节点上拿到了锁;
  2. 但是这个加锁的key还没有同步到slave节点;
  3. master故障,发生故障转移,slave节点升级为master节点;
  4. 导致锁丢失。

正因为如此,Redis作者antirez基于分布式环境下提出了一种更高级的分布式锁的实现方式:Redlock。笔者认为,Redlock也是Redis所有分布式锁实现方式中唯一能让面试官高潮的方式。

Redlock的设计思路:

  1. 依次尝试从5个实例,使用相同的key和具有唯一性的value获取锁。
  2. 客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间,这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。
  3. 当且仅当从大多数(N/2+1)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功
  4. 如果因为某些原因,获取锁失败客户端应该在所有的Redis实例上进行解锁(无论有没有得到锁)

Redlock分布式锁
那么Redlock分布式锁如何实现呢?

config1.useSingleServer().setAddress("redis://172.29.1.180:5378").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);

config2.useSingleServer().setAddress("redis://172.29.1.180:5379").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);

config3.useSingleServer().setAddress("redis://172.29.1.180:5380").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);

String resourceName = "REDLOCK";
RLock lock1 = redissonClient1.getLock(resourceName);
RLock lock2 = redissonClient2.getLock(resourceName);
RLock lock3 = redissonClient3.getLock(resourceName);

RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
boolean isLock;
try {
    isLock = redLock.tryLock(500, 30000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
} finally {
    // 无论如何, 最后都要解锁。
    redLock.unlock();
}

最核心的变化就是RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
既然核心变化是使用了RedissonRedLock,那么我们看一下它的源码有什么不同。这个类是RedissonMultiLock的子类,所以调用tryLock方法时,事实上调用了RedissonMultiLock的tryLock方法,精简源码如下:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    // 实现要点之允许加锁失败节点限制(N-(N/2+1))
    int failedLocksLimit = failedLocksLimit();
    List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
    // 实现要点之遍历所有节点通过EVAL命令执行lua加锁
    for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
        RLock lock = iterator.next();
        boolean lockAcquired;
        try {
            // 对节点尝试加锁
            lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
        } catch (RedisConnectionClosedException|RedisResponseTimeoutException e) {
            // 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁
            unlockInner(Arrays.asList(lock));
            lockAcquired = false;
        } catch (Exception e) {
            // 抛出异常表示获取锁失败
            lockAcquired = false;
        }

        if (lockAcquired) {
            // 成功获取锁集合
            acquiredLocks.add(lock);
        } else {
            // 如果达到了允许加锁失败节点限制,那么break,即此次Redlock加锁失败
            if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                break;
            }               
        }
    }
    return true;
}

失效时间如何设置:

这个问题的场景是,假设设置失效时间10秒,如果由于某些原因导致10秒还没执行完任务,这时候锁自动失效,导致其他线程也会拿到分布式锁。

这确实是Redis分布式最大的问题,不管是普通分布式锁,还是Redlock算法分布式锁,都没有解决这个问题。也有一些文章提出了对失效时间续租,即延长失效时间,很明显这又提升了分布式锁的复杂度

zookeeper or redis:
没有绝对的好坏,只有更适合自己的业务。就性能而言,redis很明显优于zookeeper;就分布式锁实现的健壮性而言,zookeeper很明显优于redis。如何选择,取决于你的业务!

未完待续。。。

本文取自:https://www.cnblogs.com/cjsblog/p/9831423.html
源码:https://github.com/redisson/redisson/
很细的点:https://www.shangmayuan.com/a/007922c7222743b0a682a490.html
分析: https://blog.csdn.net/weixin_39853155/article/details/113330925
超细介绍:https://www.oschina.net/p/redisson?hmsr=aladdin1e1
项目案例:https://www.cnblogs.com/cjsblog/p/11273205.html
Redisson使用实战:https://blog.csdn.net/zsj777/article/details/82468212
https://blog.csdn.net/zxl646801924/article/details/107610699
分布式原理:https://blog.csdn.net/weixin_39853155/article/details/113330925
https://blog.csdn.net/u014042066/article/details/72778440

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐