在java的分布式系统中,有时候会出现不同的服务操作同一个资源的情况,如交易系统和充值系统都要操作用户账户,分布式锁为解决分布式系统中多个应用同时访问同一个资源的问题。

java的分布式锁主要有三种实现方式:

  1. 基于数据库的分布式锁
  2. 基于缓存的分布式锁
  3. 基于zookeeper的分布式锁

下面对这三种方式具体分析一下

1.基于数据库的分布式锁

数据库实现分布式锁方式比较多,如悲观锁(查询时增加for update)、乐观锁(通过version字段)、增加一个表记录锁信息等。因为依赖于数据库,比较好理解,但是也存在一些问题。
如悲观锁在某些情况下可能会锁表而不是锁行,乐观锁可能需要多次重试,以及操作数据库的性能开销等等,所以基于数据库的分布式锁不做过多研究,因为我看来基本上不会用到。

2.基于缓存(以redis为例)的分布式锁

相对于基于数据库的分布式锁,基于缓存的分布式锁性能上要好很多,以redis为例,下面用两种方式去实现分布式锁。

  1. 自定义操作
/**
     * 以阻塞方式的获取锁
     *
     * @param key key
     * @param value value
     * @param lockTimeout 锁超时时间
     * @param getTimeout 获取锁超时时间
     * @return
     */
    public boolean lockBlock(String key, String value, long lockTimeout, long getTimeout, TimeUnit timeUnit) {
        long start = System.currentTimeMillis();
        while (true) {
            //检测是否超时
            if (System.currentTimeMillis() - start > getTimeout) {
                log.error("get lock timeout");
                return false;
            }
            //执行set命令
            //1
            Boolean absent = redisTemplate.opsForValue().setIfAbsent(key, value, lockTimeout, timeUnit);          
            if (absent == null) {
                log.error("get lock absent is null");
                return false;
            }
            //是否成功获取锁
            if (absent) {
                return true;
            } else {
                log.info("get lock fail:{},{}",key,value);
            }
        }
    }

    public boolean unlock(String key, String value) {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

        RedisScript<String> redisScript = new DefaultRedisScript<>(script, String.class);

        Object result = redisTemplate.execute(redisScript, Collections.singletonList(key),value);
        if(RELEASE_SUCCESS.equals(result)) {
            return true;
        }
        log.error("unlock error");
        return false;

    }

使用方式如下:

@RequestMapping(value = "/testRedis")
    public void testRedis(String key) throws ExecutionException, InterruptedException {

        CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
            String value = UUID.randomUUID().toString();
            try {
                if (redisUtil.lockBlock(key, value, 3L, 10L, TimeUnit.SECONDS)) {
                    log.info("线程1获取锁成功,value is {}", value);
                    Thread.sleep(2000);
                } else {
                    log.info("线程1获取锁失败,value is {}", value);
                }
            } catch (InterruptedException e) {

                log.info("线程1获取锁异常,value is {}", value);
            } finally {
                if (redisUtil.unlock(key, value)) {
                    log.info("线程1释放锁,value is {}", value);
                }
            }
        }, threadPoolTaskExecutor);

        CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
            String value = UUID.randomUUID().toString();
            try {
                if (redisUtil.lockBlock(key, value, 3L, 10L, TimeUnit.SECONDS)) {
                    log.info("线程2获取锁成功,value is {}", value);
                    Thread.sleep(2000);
                } else {
                    log.info("线程2获取锁失败,value is {}", value);
                }
            } catch (InterruptedException e) {
                log.info("线程2获取锁异常,value is {}", value);
            } finally {
                if (redisUtil.unlock(key, value)) {
                    log.info("线程2释放锁,value is {}", value);
                }
            }
        }, threadPoolTaskExecutor);

        CompletableFuture.allOf(completableFuture1, completableFuture2).get();
    }

这种方式,要注意几个问题:

  1. 为避免程序异常造成解锁操作失败(如断电等异常情况),造成死锁,需要给锁增加超时时间
  2. 为避免加锁成功设置超时时间失败造成死锁,需要保证加锁和设置超时时间是一个原子操作,所以加锁使用setIfAbsent方法
  3. 比保证解锁时是自己持有的锁才可以解,需要对比value,这个要保证唯一

当前的这种方式,还是有一些问题
没有办法保证在操作结束前锁会不会因为超时被释放
不可重入
要简单解决这些问题,可以使用Redisson提供的分布式锁解决方案:
配置如下:

@Value("${spring.redis.host}")
    String redisHost;

    @Value("${spring.redis.port}")
    String redisPort;

    @Value("${spring.redis.password}")
    String redisPassword;

    @Value("${spring.redis.timeout}")
    Integer redisTimeout;

    /**
     * Redisson配置
     * @return
     */
@Bean
    RedissonClient redissonClient() {
        //1、创建配置
        Config config = new Config();

        redisHost = redisHost.startsWith("redis://") ? redisHost : "redis://" + redisHost;
        SingleServerConfig serverConfig = config.useSingleServer()
                .setAddress(redisHost + ":" + redisPort)
                .setTimeout(redisTimeout);

        if (StringUtils.isNotBlank(redisPassword)) {
            serverConfig.setPassword(redisPassword);
        }

        return Redisson.create(config);

    }

使用:

@RequestMapping(value = "/testRedisson", method = RequestMethod.POST)
    public void testRedisson(String key) throws ExecutionException, InterruptedException {

        RLock lock = redissonClient.getLock(key);
        CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {

            try {
                if (lock.tryLock(10, 3, TimeUnit.SECONDS)) {
                    log.info("线程1获取锁成功");
                    Thread.sleep(2000);
                } else {
                    log.info("线程1获取锁失败");
                }
            } catch (InterruptedException e) {
                log.info("线程1获取锁异常");
            } finally {
                if (lock.isHeldByCurrentThread()) {
                    lock.unlock();
                    log.info("线程1释放锁");
                }
            }
        }, threadPoolTaskExecutor);

        CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
            try {
                if (lock.tryLock(10, 3, TimeUnit.SECONDS)) {
                    log.info("线程2获取锁成功");
                    Thread.sleep(2000);
                } else {
                    log.info("线程2获取锁失败");
                }
            } catch (InterruptedException e) {
                log.info("线程2获取锁异常");
            } finally {
                if (lock.isHeldByCurrentThread()) {
                    lock.unlock();
                    log.info("线程2释放锁");
                }
            }
        }, threadPoolTaskExecutor);

        CompletableFuture.allOf(completableFuture1, completableFuture2).get();

    }

Redisson通过lua脚本判断实现了锁的重入,以及watch dog机制实现了刷新锁的过期时间
watch dog机制每过1/3超时时间会去判断当前持有锁的线程是否还没有完成逻辑,如果没有,那么会刷新过期时间,这个过期时间默认是30s,可以通过lockWatchDogTimeout来修改。只有在没有显式的设置加锁时间时,watch dog机制才会生效,像上面的例子中,因为设置了失效时间,所以watch dog机制不会刷新锁的过期时间。

3.基于zookeeper的分布式锁

ZooKeeper是一个为分布式应用提供一致性服务的开源组件,它内部是一个分层的文件系统目录树结构,规定同一个目录下只能有一个唯一文件名。基于ZooKeeper实现分布式锁的步骤如下:

  1. 创建一个锁目录 /locks,该节点为持久节点

  2. 想要获取锁的线程都在锁目录下创建一个临时顺序节点

  3. 获取锁目录下所有子节点,对子节点按节点自增序号从小到大排序

  4. 判断本节点是不是第一个子节点,如果是,则成功获取锁,开始执行业务逻辑操作;如果不是,则监听自己的上一个节点的删除事件

  5. 持有锁的线程释放锁,只需删除当前节点即可。

  6. 当自己监听的节点被删除时,监听事件触发,则回到第3步重新进行判断,直到获取到锁。

可以直接使用zookeeper第三方库Curator客户端,这个客户端中封装了一个可重入的锁服务。

配置如下:

@Slf4j
public class CuratorClientUtil {
    private String zookeeperServer;

    @Getter
    private CuratorFramework client;

    public CuratorClientUtil(String zookeeperServer) {
        this.zookeeperServer = zookeeperServer;
    }

    // 创建CuratorFrameworkFactory并且启动
    public void init() {
        // 重试策略,等待1s,最大重试3次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
        this.client = CuratorFrameworkFactory.builder()
                .connectString(zookeeperServer)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        this.client.start();
    }

    // 容器关闭,CuratorFrameworkFactory关闭
    public void destroy() {
        try {
            if (Objects.nonNull(getClient())) {
                getClient().close();
            }
        } catch (Exception e) {
            log.info("CuratorFramework close error=>{}", e.getMessage());
        }
    }
}
@Configuration
public class CuratorConfig {

    @Value("${zookeeper.server}")
    String server;

    @Bean(name = "curatorClientUtil", initMethod = "init", destroyMethod = "destroy")
    public CuratorClientUtil curatorClientUtil() {
        CuratorClientUtil clientUtil = new CuratorClientUtil(server);
        return clientUtil;
    }
}

使用:

@RequestMapping(value = "/testZookeeper", method = RequestMethod.POST)
    public void testZookeeper() throws ExecutionException, InterruptedException {

        InterProcessMutex mutex = new InterProcessMutex(curatorClientUtil.getClient(), lockPath);

        CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
            try {
                if (mutex.acquire(3L, TimeUnit.SECONDS)) {
                    log.info("线程1获取锁成功");
                    Thread.sleep(5000);
                } else {
                    log.info("线程1获取锁失败");
                }
            } catch (Exception e) {
                log.info("线程1获取锁异常");
                throw new RuntimeException(e);
            } finally {
                try {
                    if (mutex.isOwnedByCurrentThread()) {
                        mutex.release();
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }, threadPoolTaskExecutor);

        CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
            try {
                if (mutex.acquire(3L, TimeUnit.SECONDS)) {
                    log.info("线程2获取锁成功");
                    Thread.sleep(5000);
                } else {
                    log.info("线程2获取锁失败");
                }
            } catch (Exception e) {
                log.info("线程2获取锁异常");
                throw new RuntimeException(e);
            } finally {
                try {
                    if (mutex.isOwnedByCurrentThread()) {
                        mutex.release();
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                log.info("线程2释放锁");
            }
        }, threadPoolTaskExecutor);
        CompletableFuture.allOf(completableFuture1, completableFuture2).get();
    }

总结

相比较而言,基于zookeeper的分布式锁在可靠性上最优,性能也优于数据库,略低于基于缓存的分布式锁,理论上是最佳解决方案。
但我的观点是实际开发中,可能zookeeper并不是很常用,如果单纯为了分布式锁而搭建一套zookeeper集群,似乎并不划算,所以我站redis。

项目地址

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐