java 分布式锁
在java的分布式系统中,有时候会出现不同的服务操作同一个资源的情况,如交易系统和充值系统都要操作用户账户,分布式锁为解决分布式系统中多个应用同时访问同一个资源的问题。java的分布式锁主要有三种实现方式:基于数据库的分布式锁基于缓存的分布式锁基于zookeeper的分布式锁下面对这三种方式具体分析一下1.基于数据库的分布式锁数据库实现分布式锁方式比较多,如悲观锁(...
在java的分布式系统中,有时候会出现不同的服务操作同一个资源的情况,如交易系统和充值系统都要操作用户账户,分布式锁为解决分布式系统中多个应用同时访问同一个资源的问题。
java的分布式锁主要有三种实现方式:
- 基于数据库的分布式锁
- 基于缓存的分布式锁
- 基于zookeeper的分布式锁
下面对这三种方式具体分析一下
1.基于数据库的分布式锁
数据库实现分布式锁方式比较多,如悲观锁(查询时增加for update)、乐观锁(通过version字段)、增加一个表记录锁信息等。因为依赖于数据库,比较好理解,但是也存在一些问题。
如悲观锁在某些情况下可能会锁表而不是锁行,乐观锁可能需要多次重试,以及操作数据库的性能开销等等,所以基于数据库的分布式锁不做过多研究,因为我看来基本上不会用到。
2.基于缓存(以redis为例)的分布式锁
相对于基于数据库的分布式锁,基于缓存的分布式锁性能上要好很多,以redis为例,下面用两种方式去实现分布式锁。
- 自定义操作
/**
* 以阻塞方式的获取锁
*
* @param key key
* @param value value
* @param lockTimeout 锁超时时间
* @param getTimeout 获取锁超时时间
* @return
*/
public boolean lockBlock(String key, String value, long lockTimeout, long getTimeout, TimeUnit timeUnit) {
long start = System.currentTimeMillis();
while (true) {
//检测是否超时
if (System.currentTimeMillis() - start > getTimeout) {
log.error("get lock timeout");
return false;
}
//执行set命令
//1
Boolean absent = redisTemplate.opsForValue().setIfAbsent(key, value, lockTimeout, timeUnit);
if (absent == null) {
log.error("get lock absent is null");
return false;
}
//是否成功获取锁
if (absent) {
return true;
} else {
log.info("get lock fail:{},{}",key,value);
}
}
}
public boolean unlock(String key, String value) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
RedisScript<String> redisScript = new DefaultRedisScript<>(script, String.class);
Object result = redisTemplate.execute(redisScript, Collections.singletonList(key),value);
if(RELEASE_SUCCESS.equals(result)) {
return true;
}
log.error("unlock error");
return false;
}
使用方式如下:
@RequestMapping(value = "/testRedis")
public void testRedis(String key) throws ExecutionException, InterruptedException {
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
String value = UUID.randomUUID().toString();
try {
if (redisUtil.lockBlock(key, value, 3L, 10L, TimeUnit.SECONDS)) {
log.info("线程1获取锁成功,value is {}", value);
Thread.sleep(2000);
} else {
log.info("线程1获取锁失败,value is {}", value);
}
} catch (InterruptedException e) {
log.info("线程1获取锁异常,value is {}", value);
} finally {
if (redisUtil.unlock(key, value)) {
log.info("线程1释放锁,value is {}", value);
}
}
}, threadPoolTaskExecutor);
CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
String value = UUID.randomUUID().toString();
try {
if (redisUtil.lockBlock(key, value, 3L, 10L, TimeUnit.SECONDS)) {
log.info("线程2获取锁成功,value is {}", value);
Thread.sleep(2000);
} else {
log.info("线程2获取锁失败,value is {}", value);
}
} catch (InterruptedException e) {
log.info("线程2获取锁异常,value is {}", value);
} finally {
if (redisUtil.unlock(key, value)) {
log.info("线程2释放锁,value is {}", value);
}
}
}, threadPoolTaskExecutor);
CompletableFuture.allOf(completableFuture1, completableFuture2).get();
}
这种方式,要注意几个问题:
- 为避免程序异常造成解锁操作失败(如断电等异常情况),造成死锁,需要给锁增加超时时间
- 为避免加锁成功设置超时时间失败造成死锁,需要保证加锁和设置超时时间是一个原子操作,所以加锁使用setIfAbsent方法
- 比保证解锁时是自己持有的锁才可以解,需要对比value,这个要保证唯一
当前的这种方式,还是有一些问题
没有办法保证在操作结束前锁会不会因为超时被释放
不可重入
要简单解决这些问题,可以使用Redisson提供的分布式锁解决方案:
配置如下:
@Value("${spring.redis.host}")
String redisHost;
@Value("${spring.redis.port}")
String redisPort;
@Value("${spring.redis.password}")
String redisPassword;
@Value("${spring.redis.timeout}")
Integer redisTimeout;
/**
* Redisson配置
* @return
*/
@Bean
RedissonClient redissonClient() {
//1、创建配置
Config config = new Config();
redisHost = redisHost.startsWith("redis://") ? redisHost : "redis://" + redisHost;
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress(redisHost + ":" + redisPort)
.setTimeout(redisTimeout);
if (StringUtils.isNotBlank(redisPassword)) {
serverConfig.setPassword(redisPassword);
}
return Redisson.create(config);
}
使用:
@RequestMapping(value = "/testRedisson", method = RequestMethod.POST)
public void testRedisson(String key) throws ExecutionException, InterruptedException {
RLock lock = redissonClient.getLock(key);
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
try {
if (lock.tryLock(10, 3, TimeUnit.SECONDS)) {
log.info("线程1获取锁成功");
Thread.sleep(2000);
} else {
log.info("线程1获取锁失败");
}
} catch (InterruptedException e) {
log.info("线程1获取锁异常");
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
log.info("线程1释放锁");
}
}
}, threadPoolTaskExecutor);
CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
try {
if (lock.tryLock(10, 3, TimeUnit.SECONDS)) {
log.info("线程2获取锁成功");
Thread.sleep(2000);
} else {
log.info("线程2获取锁失败");
}
} catch (InterruptedException e) {
log.info("线程2获取锁异常");
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
log.info("线程2释放锁");
}
}
}, threadPoolTaskExecutor);
CompletableFuture.allOf(completableFuture1, completableFuture2).get();
}
Redisson通过lua脚本判断实现了锁的重入,以及watch dog机制实现了刷新锁的过期时间
watch dog机制每过1/3超时时间会去判断当前持有锁的线程是否还没有完成逻辑,如果没有,那么会刷新过期时间,这个过期时间默认是30s,可以通过lockWatchDogTimeout来修改。只有在没有显式的设置加锁时间时,watch dog机制才会生效,像上面的例子中,因为设置了失效时间,所以watch dog机制不会刷新锁的过期时间。
3.基于zookeeper的分布式锁
ZooKeeper是一个为分布式应用提供一致性服务的开源组件,它内部是一个分层的文件系统目录树结构,规定同一个目录下只能有一个唯一文件名。基于ZooKeeper实现分布式锁的步骤如下:
-
创建一个锁目录 /locks,该节点为持久节点
-
想要获取锁的线程都在锁目录下创建一个临时顺序节点
-
获取锁目录下所有子节点,对子节点按节点自增序号从小到大排序
-
判断本节点是不是第一个子节点,如果是,则成功获取锁,开始执行业务逻辑操作;如果不是,则监听自己的上一个节点的删除事件
-
持有锁的线程释放锁,只需删除当前节点即可。
-
当自己监听的节点被删除时,监听事件触发,则回到第3步重新进行判断,直到获取到锁。
可以直接使用zookeeper第三方库Curator客户端,这个客户端中封装了一个可重入的锁服务。
配置如下:
@Slf4j
public class CuratorClientUtil {
private String zookeeperServer;
@Getter
private CuratorFramework client;
public CuratorClientUtil(String zookeeperServer) {
this.zookeeperServer = zookeeperServer;
}
// 创建CuratorFrameworkFactory并且启动
public void init() {
// 重试策略,等待1s,最大重试3次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
this.client = CuratorFrameworkFactory.builder()
.connectString(zookeeperServer)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
this.client.start();
}
// 容器关闭,CuratorFrameworkFactory关闭
public void destroy() {
try {
if (Objects.nonNull(getClient())) {
getClient().close();
}
} catch (Exception e) {
log.info("CuratorFramework close error=>{}", e.getMessage());
}
}
}
@Configuration
public class CuratorConfig {
@Value("${zookeeper.server}")
String server;
@Bean(name = "curatorClientUtil", initMethod = "init", destroyMethod = "destroy")
public CuratorClientUtil curatorClientUtil() {
CuratorClientUtil clientUtil = new CuratorClientUtil(server);
return clientUtil;
}
}
使用:
@RequestMapping(value = "/testZookeeper", method = RequestMethod.POST)
public void testZookeeper() throws ExecutionException, InterruptedException {
InterProcessMutex mutex = new InterProcessMutex(curatorClientUtil.getClient(), lockPath);
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
try {
if (mutex.acquire(3L, TimeUnit.SECONDS)) {
log.info("线程1获取锁成功");
Thread.sleep(5000);
} else {
log.info("线程1获取锁失败");
}
} catch (Exception e) {
log.info("线程1获取锁异常");
throw new RuntimeException(e);
} finally {
try {
if (mutex.isOwnedByCurrentThread()) {
mutex.release();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}, threadPoolTaskExecutor);
CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
try {
if (mutex.acquire(3L, TimeUnit.SECONDS)) {
log.info("线程2获取锁成功");
Thread.sleep(5000);
} else {
log.info("线程2获取锁失败");
}
} catch (Exception e) {
log.info("线程2获取锁异常");
throw new RuntimeException(e);
} finally {
try {
if (mutex.isOwnedByCurrentThread()) {
mutex.release();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
log.info("线程2释放锁");
}
}, threadPoolTaskExecutor);
CompletableFuture.allOf(completableFuture1, completableFuture2).get();
}
总结
相比较而言,基于zookeeper的分布式锁在可靠性上最优,性能也优于数据库,略低于基于缓存的分布式锁,理论上是最佳解决方案。
但我的观点是实际开发中,可能zookeeper并不是很常用,如果单纯为了分布式锁而搭建一套zookeeper集群,似乎并不划算,所以我站redis。
更多推荐
所有评论(0)