基于Jedis实现Redis分布式锁
在很多互联网产品应用中,有些场景需要加锁处理,比如:秒杀,全局递增ID,楼层生成等等。大部分的解决方案是基于DB实现的,Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系。其次Redis提供一些命令SETNX,GETSET,可以方便实现分布式锁机制。
1. Redis开启远程连接
Redis默认是不支持远程连接的,需要手动开启。Redis.conf 一共需要修改两个地方。
- 注释掉bind:127.0.0.1(bind:127.0.0.1的作用是就只有本机能够访问,)
- 打开requirepass,设置密码,开启密码验证
修改后保存redis.conf,重新启动Redis实例redis-server redis.conf
.
2. Jedis
Jedis官网:https://github.com/xetorthio/jedis
Redis不仅是使用命令来操作,现在基本上主流的语言都有客户端支持,比如java、C、C#、C++、php、Node.js、Go等。在官方网站里列一些Java的客户端,有Jedis、Redisson、Jredis、JDBC-Redis等。Jedis是Redis官方推荐的Java连接开发工具。Jedis提供了完整Redis命令。
2.1 基本使用
在一个Maven项目中,添加Jedis依赖。
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.2.0</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
测试连接Redis
public class JedisTest {
public static void main(String[] args) {
//1.构造Jedis对象
Jedis jedis = new Jedis("localhost");
//2.密码认证
jedis.auth("123");
//3.测试是否连接成功
String pong=jedis.ping();
//4.返回pong表示连接成功
System.out.println(pong);
}
}
2.2 连接池
在实际应用中,Jedis实例我们一般通过连接池来获取,由于Jedis对象不是线程安全的,所以当我们使用Jedis对象时,从连接池获取Jedis,使用完之后要还会给连接池。
public class JedisPoolTest {
public static void main(String[] args) {
//1.构建Jedis连接池
JedisPool pool =new JedisPool("127.0.0.1",6379);
Jedis jedis = null;
try {
//2.从连接池中获取Jedis连接
jedis = pool.getResource();
jedis.auth("hjh123");
//3.Jedis操作
String pong = jedis.ping();
System.out.println(pong);
//4.归还连接
jedis.close();
}catch (Exception e){
e.printStackTrace();
}finally {
if (jedis!=null){
jedis.close();
}
}
}
}
Jedis连接池是基于apache-commons pool2实现的。在构建连接池对象的时候,需要提供池对象的配置对象,及JedisPoolConfig(继承自GenericObjectPoolConfig)。我们可以通过这个配置对象对连接池进行相关参数的配置(如最大连接数,最大空数等)。
public class JedisPoolTest {
public static void main(String[] args) {
//1.构建Jedis连接池
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxIdle(8);
config.setMaxTotal(18);
JedisPool pool = new JedisPool(config, "127.0.0.1", 6379, 2000, "password");
Jedis jedis = null;
try {
//2.从连接池中获取Jedis连接
jedis = pool.getResource();
jedis.auth("hjh123");
//3.Jedis操作
String pong = jedis.ping();
System.out.println(pong);
//4.归还连接
jedis.close();
}catch (Exception e){
e.printStackTrace();
}finally {
if (jedis!=null){
jedis.close();
}
}
}
}
Jedis 的close方法如下:
@Override
public void close() { //Jedis的close方法
if (dataSource != null) {
if (client.isBroken()) {
this.dataSource.returnBrokenResource(this);
} else {
this.dataSource.returnResource(this);
}
} else {
client.close();
}
}
Jedis对close方法进行了改造,如果是连接池中的连接对象,调用Close方法将会是把连接对象返回到对象池,若不是则关闭连接。
2.3 约束使用
定义函数式接口
@FunctionalInterface
public interface CallWithJedis {
void call(Jedis jedis);
}
创建Redis类
public class Redis {
private JedisPool jedisPool;
/**
* 返回Redis对象
*/
public Redis(){
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
//连接池最大空闲数
config.setMaxIdle(300);
//最大连接数
config.setMaxTotal(1000);
//连接最大等待时间
config.setMaxWaitMillis(30000);
//在空闲时检查有效性
config.setTestOnBorrow(true);
jedisPool=new JedisPool(config,"39.108.169.57",6379,30000,"hjh123");
}
public void execute(CallWithJedis callWithJedis){
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
callWithJedis.call(jedis);
}catch (Exception e){
e.printStackTrace();
}finally {
jedis.close();
}
}
}
具体使用
public class JedisPoolTest02 {
public static void main(String[] args) {
Redis redis = new Redis();
redis.execute(jedis -> {
System.out.println(jedis.ping());
});
}
}
3. Lettuce
Lettuce的Github:https://github.com/lettuce-io/lettuce-core
Lettuce的使用也十分广泛,Lettuce基于Netty NIO框架来构建,所以克服了Jedis中线程不安全的问题,Lettuce支持同步、异步以及响应式调用,多个线程可以共享一个连接实例。
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.2.2.RELEASE</version>
</dependency>
public class LettuceDemo {
public static void main(String[] args) {
RedisClient redisClient = RedisClient.create("redis://hjh123@39.108.169.57");
StatefulRedisConnection<String, String> connect = redisClient.connect();
RedisCommands<String, String> sync = connect.sync();
sync.set("say","hai");
String name = sync.get("say");
System.out.println(name);
}
}
4. 分布式锁介绍
在很多互联网产品应用中,有些场景需要加锁处理,比如:秒杀,全局递增ID等等。在分布式应用中需要使用分布式锁来控制程序的并发执行,确保不会出现并发问题。Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系。其次Redis提供一些命令SETNX,GETSET,可以方便实现分布式锁机制。
Redis做分布式锁的作用是:
一个线程先占位,然后其他线程来操作的时候,如果有人占位了,就会放弃操作或者稍后重试。
在Redis中,占位一般使用setnx指令,先进行的进程先占位,线程的操作执行完成后,再调用del指令释放位置。
Redis 的
setnx(SET if Not eXists)
命令在指定的 key 不存在时,为 key
设置指定的值。如果存在则不会进行设置.设置成功,返回 1 。 设置失败,返回 0 。
5. 分布式锁简单实现
public class LockTest01 {
public static void main(String[] args) {
Redis redis=new Redis();
redis.execute(jedis -> {
Long setnx = jedis.setnx("k1", "v1");
//没有人占位
if(setnx==1){
jedis.set("say","hi");
String name=jedis.get("say");
System.out.println(name);
//释放锁
jedis.del("k1");
}else{
//有人占位,放弃或者稍后重试
System.out.println("锁已被占用!");
}
});
}
}
存在的问题:当获取了锁的线程的业务代码执行过程中挂了,导致del指令没有被调用,这样导致锁不会释放,导致所有请求全部堵塞。
6. 设置过期时间
为解决上述代码存在的问题,我们可以获取到锁之后,给锁添加一个过期时间,确保锁在一定时间之后获得到释放。
public class LockTest02 {
public static void main(String[] args) {
Redis redis=new Redis();
redis.execute(jedis -> {
Long setnx = jedis.setnx("k1", "v1");
//没有人占位
if(setnx==1){
jedis.expire("k1",5);
jedis.set("say","hi");
String name=jedis.get("say");
System.out.println(name);
//释放锁
jedis.del("k1");
}else{
//有人占位,放弃或者稍后重试
System.out.println("锁已被占用!");
}
});
}
}
但是像上面这样写的话,获取锁setnx和设置过期时间expire这是两个操作,不是原子性操作(原子操作是指不会被线程调度机制打断的操作。这种操作一旦开始,就会一直运行到结束,中间不会有任何线程切换)。这就会导致获取锁和设置过期时间,如果程序突然挂了或者人为因素,导致没有设置过期时间。这个时候锁一直被占用,锁无法释放。
为了解决这个问题,Redis2.8版本开始,setnx和expire可以通过setex命令一起来执行了
Redis Setex 命令为指定的 key 设置值及其过期时间。如果 key 已经存在, SETEX 命令将会替换旧的值。
redis Setex 命令基本语法如下:
redis 127.0.0.1:6379> SETEX KEY_NAME TIMEOUT VALUE
下面使用setex命令将上述代码再做改进。
public class LockTest03 {
public static void main(String[] args) {
Redis redis=new Redis();
redis.execute(jedis -> {
String set = jedis.set("k1", "v1",new SetParams().nx().ex(20));
//没有人占位
if("OK".equals(set)){
jedis.set("say","hi");
String name=jedis.get("say");
System.out.println(name);
//释放锁
// jedis.del("k1");
}else{
//有人占位,放弃或者稍后重试
System.out.println("锁已被占用!");
}
});
}
}
7. 解决超时问题
上面为了解决程序突然终止而导致的锁不能得到释放的问题,我们给每个锁添加了超时时间,超时后锁会被自动释放。但是如果执行的业务代码非常耗时,可能出现紊乱。
举例超时的场景如下:
第一个线程首先获取到锁,然后开始执行业务代码,但是业务代码比较耗时,执行了 8 秒,这样,会在第一个线程的任务还未执行成功锁就会被释放了,此时第二个线程会获取到锁开始执行,在第二个线程刚执行了 3 秒,第一个线程也执行完了,此时第一个线程会释放锁,但是注意,它释放的是第二个线程的锁,释放之后,第三个线程进来。这会导致临界区代码不能得到严格串行执行。
对于这个问题,可以有两个解决方案:
- 尽量避免在获取到锁之后,执行耗时操作
- 将锁的value设置成一个随机字符串,每次释放锁的时候就先比较随机字符串是否一致,如果一致再去释放。
第二个方案,分为查看锁的value、比较value是否正确、释放锁三个步骤。这三个步骤不具备原子性, 为了解决这个问题,需要引入Lua脚本。
使用Lua脚本的优势:
- 使用方便,Redis内置了Lua脚本的支持
- Lua脚本可以在Redis服务端原子的执行多个Redis命令
- 由于网络在很大程度上会影响Redis性能,而使用Lua脚本可以让多个命令一次执行,可以有效解决网络给Redis带来的性能问题。
在Redis中使用Lua脚本的两种方式:
- 提前在Redis服务端写好Lua脚本,然后在Java客户端去调用脚本
- 直接在Java端去写Lua脚本,写好之后,需要执行时,每次将脚本发送到Redis上去执行。
在redis中创建脚本文件releasewhenvalueequal.lua,脚本文件内容如下:
if redis.call("get",KEYS[1]==ARGV[1]) then
return redis.call("del",KEYS[1])
else
return 0
end
给该lua脚本计算一个SHA1和,命令如下:
cat releasewhenvalueequal.lua | redis-cli -a 你的redis密码 script load --pipe
script load这个命令会在Redis服务器中缓存Lua脚本,并返回脚本内容的SHA1校验和,然后在Java端调用的时候,传入SHA1校验和作为参数,这样Redis就知道要执行的是哪个脚本了。
下面使用上面提到的方案2来避免上述问题:
public class LockTest04 {
public static void main(String[] args) {
void
Redis redis = new Redis();
Thread thread1 = new Thread(() -> {
redis.execute(jedis -> {
//1.先获取一个随机字符串
String value = UUID.randomUUID().toString();
String k1;
synchronized (LockTest04.class){
k1= jedis.set("k1", value, new SetParams().nx().ex(1));
//模拟耗时操作
Thread.sleep(3000);
}
//2.获取锁
//3.判断是否成功获取到锁
if ("OK".equals(k1)) {
//4.具体的业务操作
jedis.set("say", "线程1:hello");
System.out.println(jedis.get("say"));
//5.释放锁
jedis.evalsha("b8059ba43af6ffe8bed3db65bac35d452f8115d8", Arrays.asList("k1"), Arrays.asList(value));
} else {
System.out.println("线程1获取该锁已经被占用!");
}
});
});
thread1.setPriority(10);
thread1.start();
Thread thread2=new Thread(() -> {
redis.execute(jedis -> {
//1.先获取一个随机字符串
String value = UUID.randomUUID().toString();
//2.获取锁
String k1;
synchronized (LockTest04.class){
//等到线程1的锁过期
Thread.sleep(1000);
k1 = jedis.set("k1", value, new SetParams().nx().ex(30));
}
//3.判断是否成功获取到锁
if ("OK".equals(k1)) {
//4.具体的业务操作
jedis.set("say", "线程2:hello");
System.out.println(jedis.get("say"));
//5.释放锁
jedis.evalsha("b8059ba43af6ffe8bed3db65bac35d452f8115d8", Arrays.asList("k1"), Arrays.asList(value));
} else {
System.out.println("线程2获取该锁已经被占用!");
}
});
});
thread2.setPriority(9);
thread2.start();
Thread thread3=new Thread(() -> {
//模拟耗时操作
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
redis.execute(jedis -> {
//1.先获取一个随机字符串
String value = UUID.randomUUID().toString();
//2.获取锁
String k1;
synchronized (LockTest04.class){
k1 = jedis.set("k1", value, new SetParams().nx().ex(30));
}
//3.判断是否成功获取到锁
if ("OK".equals(k1)) {
//模拟耗时操作
Thread.sleep(3000);
//4.具体的业务操作
jedis.set("say", "线程3:hello");
System.out.println(jedis.get("say"));
//5.释放锁
jedis.evalsha("b8059ba43af6ffe8bed3db65bac35d452f8115d8", Arrays.asList("k1"), Arrays.asList(value));
} else {
System.out.println("线程3获取该锁已经被占用!");
}
});
});
thread3.setPriority(1);
thread3.start();
}
}
更多推荐
所有评论(0)