一、基于 SETNX 实现的分布式锁存在的问题

基于 SETNX 实现的分布式锁存在下面的问题:
1、不可重入:同一个线程无法多次获取同一把锁
2、不可重试:获取锁只尝试一次就返回 false,没有重试机制
3、超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
4、主从一致性:如果 Redis 提供了主从集群,主从同步存在延迟,此时某个线程从主节点中获取到了锁,但是尚未同步给从节点,而恰巧主节点在这个时候发生宕机。就会从从机中选择出一个节点成为新的主节点,那么其他线程就有可能趁虚而入,从新的主节点中获取到锁,这样就出现多个线程拿到多把锁,在极端情况下,可能会出现安全问题。

二、Redisson 介绍

Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

官网地址:https:redisson.org
GitHub 地址:https://github.com/redisson/redisson

三、Redisson 入门

3.1 引入依赖

<!--
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.13.0</version>
</dependency>
-->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
</dependency>

3.2 配置 Redisson 客户端

@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
        config.useSingleServer().setAddress("redis://ip地址:6379").setPassword("密码");
        return Redisson.create(config);
    }
}

3.3 修改一人一单业务逻辑中获取锁的方式

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Autowired
    private ISeckillVoucherService seckillVoucherService;

    @Autowired
    private RedisIdWorker redisIdWorker;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Autowired
    private RedissonClient redissonClient;

    @Override
    public Result seckillVoucher(Long voucherId) {

        SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);

        // 判断秒杀是否还未开始
        if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
            Result.fail("秒杀尚未开始!");
        }

        // 判断秒杀是否已经结束
        if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
            Result.fail("秒杀已经结束!");
        }

        // 判断库存是否充足
        if (seckillVoucher.getStock() < 1) {
            Result.fail("库存不足!");
        }

        return createVoucherOrder(voucherId);
    }

    @Transactional
    public Result createVoucherOrder(Long voucherId) {
        // 判断当前优惠券用户是否已经下过单
        // 用户 id
        Long userId = UserHolder.getUser().getId();

        RLock lock = redissonClient.getLock("lock:order:" + userId);
        // 获取互斥锁
        // 使用空参意味着不会进行重复尝试获取锁
        boolean isLock = lock.tryLock();
        if (!isLock) {
            // 获取锁失败,直接返回失败或者重试
            return Result.fail("不允许重复下单!");
        }


        try {
            // 查询订单
            int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
            if (count > 0) {
                return Result.fail("用户已经购买过一次");
            }

            // 扣减库存
            boolean success = seckillVoucherService.update().
                    setSql("stock = stock - 1").
                    eq("voucher_id", voucherId).
//                eq("stock", seckillVoucher.getStock()).    // 增加对库存的判断,判断当前库存是否与查询出的结果一致
        gt("stock", 0).        // 修改判断逻辑,改为只要库存大于0,就允许线程扣减
                    update();

            // 扣减失败
            if (!success) {
                return Result.fail("库存不足!");
            }

            // 创建订单
            VoucherOrder voucherOrder = new VoucherOrder();
            // 生成订单 id
            Long orderId = redisIdWorker.nextId("order");
            voucherOrder.setVoucherId(voucherId);

            voucherOrder.setUserId(userId);
            voucherOrder.setId(orderId);
            save(voucherOrder);

            return Result.ok(orderId);
        } finally {
            // 释放锁
            lock.unlock();
        }

    }
}

3.4 使用 Jmeter 进行测试

在这里插入图片描述
聚合报告:
在这里插入图片描述
数据库:
在这里插入图片描述
在这里插入图片描述
测试没有问题

四、Redisson 可重入锁原理

4.1 自定义分布式锁分析

在这里插入图片描述
我们来分析下为什么我们自定义的分布式锁无法实现可重入。
首先定义两个方法,method1 和 method2,method1 在获取到锁的情况下,紧接着去调用 method2 方法,而在 method2 方法中又一次获取了锁,method1 和 method2 是在同一个线程内,在同一个线程内重复获取锁,这就是锁的重入。
method1 在获取锁的时候,会执行命令SET lock thread1 NX EX 10,在获取到锁后,method1 调用了 method2,接着 method2 尝试获取锁,再一次执行命令SET lock thread1 NX EX 10,根据 SETNX 的特性,由于 method1 已经 set 成功,那么 method2 肯定会设置失败,所以也就没办法实现重入。

4.2 Redisson 可重入锁原理

如何解决这个问题呢?
我们可以在获取锁的时候,首先判断锁是否已经被占用,如果已经被占用,判断是否是当前线程所占用的,如果是同一个线程占用,也会让其获取到锁,但是会额外增加一个计数器,用于记录重入的次数,即当前线程总共获取了几次锁。当前线程每获取一次锁,计数器便加1,而在释放锁时,每释放一次锁,计数器就会减1,直至计数器为 0 时,将当前锁直接删除。那么现在,我们不仅要在锁中记录获取锁的线程,还要记录当前线程重入次数,即获取了几次锁,显然,String 类型的数据结构并不满足这个业务需求,这里可以使用 Hash 类型进行存储,这样就可以在 key 的位置,记录锁的名称,在 field 的位置记录线程标识,在 value 的位置记录锁重入次数。
整个判断流程如下:
在这里插入图片描述
可以看出,无论是获取锁还是释放锁,其业务逻辑相较于之前的版本已经复杂了很多,没有办法再通过 Java 代码实现,只能采用 Lua 脚本来确保获取锁和释放锁的原子性。

获取锁的 Lua 脚本:

local key = KEYS[1];  -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间

-- 判断是否存在
if(redis.call('exists', key) == 0) then
    -- 不存在,获取锁
    redis.call('hset', key, threadId, '1');
    -- 设置有效期
    redis.call('expire', key, releaseTime);
    return 1; -- 返回结果
end;

-- 锁已经存在,判断threadId是否是自己
if(redis.call('hexists', key, threadId) == 1) then
    -- 存在,获取锁,重入次数+1
    redis.call('hincrby', key, threadId, '1');
    -- 设置有效期
    redis.call('expire', key, releaseTime);
    return 1; -- 返回结果
end;
return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失败

释放锁的 Lua 脚本:

local key = KEYS[1];  -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间

-- 判断当前锁是否还是被自己持有
if(redis.call('hexists', key, threadId) == 0) then
    return nil; -- 如果已经不是自己,则直接返回
end
-- 是自己的锁,则重入次数-1
local count = redis.call('hincrby', key, threadId, -1);
-- 判断重入次数是否已经为0
if(count > 0) then
    -- 大于0说明不能是释放锁,重置有效期然后返回
    redis.call('expire', threadId, releaseTime);
    return nil;
else
    redis.call('del', key); -- 等于 0 说明可以释放锁,直接删除
    return nil;
end

五、Redisson 的锁重试和 WatchDog 机制

在这里插入图片描述
关于锁重试和看门狗机制的详细讲解可以看下这位大佬的文章:Redisson 分布式锁源码 01:可重入锁加锁 以及 Redisson 分布式锁源码 02:看门狗

Redission 分布式锁原理:

  • 可重入:利用 hash 结构记录线程 id 和重入次数。每次获取锁时,先判断锁是否存在,如果不存在,则直接获取,如果已经存在,且线程标识为当前线程,则可以再次获取,并将重入次数加 1。释放锁时,每释放一次,重入次数减 1,直至重入次数减为 0,则证明所有的业务已经执行结束,则可以直接释放锁。
  • 可重试:利用信号量和 PubSub 功能实现等待、唤醒,获取锁失败的重试机制。在第一次尝试获取锁失败后,并不是立即失败,而是去等待释放锁的信号(利用了 Redis 中 PubSub 机制)。而获取锁成功的线程在释放锁的时候,就会向等待中的线程发送一条消息,等待中的线程捕获到消息后,就可以重新尝试获取锁。如果重试失败,则会继续等待释放锁的信号,然后再去重试。当然,重试并不是无限次的,会有一个等待时间,如果超过等待时间,就结束重试。
  • 超时续约:利用 watchDog,每隔一段时间(releaseTime/3),重置超时时间。简单来说,就是在获取锁成功后,会开启一个定时任务,该定时任务每隔一段时间就会重置锁的超时时间,这样锁的超时时间就会重新计时。

六、Redisson 的 multiLock 原理

6.1 问题分析

在这里插入图片描述

如果 Redis 提供了主从集群,主从同步存在延迟,此时某个线程从主节点中获取到了锁,但是尚未同步给从节点,而恰巧主节点在这个时候发生宕机,Redis 的哨兵模式就会从从机中选择出一个节点成为新的主节点,那么其他线程就有可能趁虚而入,从新的主节点中获取到锁,这样就出现多个线程拿到多把锁,在极端情况下,可能会出现安全问题。

那么 Redisson 是如何解决这个问题的呢?
既然主从关系是导致一致性问题发生的原因,那么干脆不要主从关系了,所有的节点都变成了独立的 Redis 节点,相互之间没有主从关系,都可以做读写。这就导致我们获取锁的方式发生了变化,以前获取锁的只需要找到主节点,然后从主节点中获取锁即可。但是,现在这种方案,必须依次向多个 Redis 节点去获取锁,向这些 Redis 节点中保存锁的标识,才意味着获取锁成功。那这种情况下,还会发生线程安全问题吗?首先,由于没有主从关系,所以就不会出现主从一致性问题。其次,即使某个节点发生宕机,但是其他节点还是存或者,并且还保存着锁的信息,Redis 还是可用的。此外,我们还可以给上述的每个节点建立其自己的主从关系。在这种主从模式下,假设在获取锁的时候,其中某个主节点发生宕机,从机成为新的主节点且未完成锁的同步,那么也不会出现一致性问题。这是因为,虽然宕机的节点及其从节点没有保存锁的信息,但是其他主节点中保存了,当其他线程尝试获取时,其他节点可是有锁的,从而获取锁失败。
在这里插入图片描述

6.2 代码测试

首先搭建环境,在虚拟机上安装三个 Redis,具体操作可参照这篇文章:手把手教centos安装redis及开启多个实例

RedissonConfig

package com.hmdp.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author liquanpeng
 */
@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
        config.useSingleServer().setAddress("redis://虚拟机ip:6379").setPassword("密码");
        return Redisson.create(config);
    }

    @Bean
    public RedissonClient redissonClient2(){
        Config config = new Config();
        config.useSingleServer().setAddress("redis://虚拟机ip:6380").setPassword("密码");
        return Redisson.create(config);
    }


    @Bean
    public RedissonClient redissonClient3(){
        Config config = new Config();
        config.useSingleServer().setAddress("redis://虚拟机ip:6381").setPassword("密码");
        return Redisson.create(config);
    }
}

RedissonTest

package com.hmdp;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

@Slf4j
@SpringBootTest
class RedissonTest {

    @Resource
    private RedissonClient redissonClient;

    @Resource
    private RedissonClient redissonClient2;

    @Resource
    private RedissonClient redissonClient3;

    private RLock lock;

    @BeforeEach
    void setUp() {
        RLock lock1 = redissonClient.getLock("order");
        RLock lock2 = redissonClient2.getLock("order");
        RLock lock3 = redissonClient3.getLock("order");

        lock = redissonClient.getMultiLock(lock1, lock2, lock3);
    }

    @Test
    void method1() throws InterruptedException {
        // 尝试获取锁
        boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
        if (!isLock) {
            log.error("获取锁失败 .... 1");
            return;
        }
        try {
            log.info("获取锁成功 .... 1");
            method2();
            log.info("开始执行业务 ... 1");
        } finally {
            log.warn("准备释放锁 .... 1");
            lock.unlock();
        }
    }
    void method2() {
        // 尝试获取锁
        boolean isLock = lock.tryLock();
        if (!isLock) {
            log.error("获取锁失败 .... 2");
            return;
        }
        try {
            log.info("获取锁成功 .... 2");
            log.info("开始执行业务 ... 2");
        } finally {
            log.warn("准备释放锁 .... 2");
            lock.unlock();
        }
    }
}

测试过程:
在 method2 处打上断点,以 debug 模式运行程序,程序执行到 method2 断点处,此时已经获取锁成功,会发现同时从 6379/6380/6381 三个 Redis 中会获取到锁。
在这里插入图片描述
6379:
在这里插入图片描述
6380:

6381:
在这里插入图片描述
将断点放行,让程序执行到下图所示的代码处:
在这里插入图片描述
此时再观察Redis中的情况

6379:在这里插入图片描述
6380:
在这里插入图片描述
6381:
在这里插入图片描述
在执行完 method2 中获取锁的逻辑后,可以看到三个 Redis 中的锁的 value 值加了 1,意味着锁重入成功。

再来观察锁释放,在执行完 method2 中锁释放逻辑后,value 值减 1
6379:
在这里插入图片描述

6380:
在这里插入图片描述
6381:
在这里插入图片描述
继续放行,让程序执行结束,再观察 Redis,发现所已经被释放掉了。

七、总结

不可重入 Redis 分布式锁:

  • 原理:利用 SETNX 的互斥性;利用 ex 避免死锁;释放锁时判断线程标识
  • 缺陷:不可重入、无法重试、锁超时失效

可重入的 Redis 分布式锁:

  • 原理:利用 hash 结构,记录线程标识和重入次数;利用 watchDog 延续锁时间;利用信号量控制锁重试等待
  • 缺陷:Redis 宕机引起锁失效问题

Redisson 的 multiLock:

  • 原理:多个独立的 Redis 节点,必须在所有节点都获取重入锁,才算获取成功
  • 缺陷:运维成本高、实现复杂
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐