本文分为两部分:

一、介绍redis分布式锁的原理和使用方法;

二、使用redis分布式锁实现一个简单的秒杀系统。

注意:本文使用java1.8,最后的例子为springboot项目。

目录

redis分布式锁

原理

进阶

锁过期时间

lock防误删

确保删锁原子性

秒杀系统示例


redis分布式锁

对并发有要求的系统常常面临一个问题,如何在实现并发的基础上保持数据的一致性。redis分布式锁能给出一个解决方案。

redis相信大家都非常熟悉了,作为一个数据库缓存技术,简便好用,也支持并发,核心就是使用redis分布式锁。

原理

redis分布式锁的原理非常简单:在运行实际的业务代码之前,首先到redis中去获得唯一的redis锁,如果获取到,则继续执行业务代码,并在业务代码结束后主动释放锁;若未成功获取到锁,则不执行业务代码。

核心代码如下:

//通过向redis服务器插入一组键值对来获取锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "123");

if (lock) {
    //业务代码在此
    //......
    
    //通过删除redis中的lock键值对释放锁
    redisTemplate.delete("lock");
}

通过以上代码可以看出,所谓的“获取锁”,“释放锁”操作,本质上就是向redis服务器插入和删除键值对。是不是也没那么高大上?

理解了原理,我们下面就动手做一做。以一个springboot项目为例,看看如何使用redis分布式锁。

使用IDE创建一个基本的springboot项目,我这里使用的IDEA2021.1社区版。

需要引入的依赖如下。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-redis</artifactId>
            <version>2.5.7</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

 在配置文件中对redis服务器进行配置。我这里使用了本地的redis服务器,各位可以根据情况修改服务器配置。

 创建一个package并创建一个RedisLockController.class,然后在其中创建一个testLock方法。该方法的业务逻辑非常简单:将redis中存储的num变量值+1。

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
public class RedisLockController {
    @Autowired
    StringRedisTemplate redisTemplate;

    @GetMapping("/testLock")
    public void testLock() {
        //尝试获取锁,注意这里的lock为键名,123为对应的值
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "123");
        if (lock) {
            log.info("成功获取锁");
            //以下是业务代码:到redis服务器中查找num,如果没找到则初始化,若找到则num+1
            String numStr = redisTemplate.opsForValue().get("num");
            if (StringUtils.isEmpty(numStr)) {
                redisTemplate.opsForValue().set("num", "0");
            } else {
                int number = Integer.valueOf(numStr);
                redisTemplate.opsForValue().set("num", ++number + "");
            }
            
            //释放锁
            redisTemplate.delete("lock");
        } {
            log.info("未获取到锁");
        }
    }
}

运行项目,并发起请求:

之后查看redis服务器上的数据,发现num已被初始化为0:

 

在以下位置打断点,并再次调用接口。

 代码暂停至断点处,此时我们再次查看redis服务器,发现多了一个键值对:这就是获得的锁。

让代码恢复运行,再次查看redis服务器,发现锁已经释放(其实就是把lock键值对删除了),并且业务代码正常运行,num+1:

以上示例简单说明了redis分布式锁的原理和使用方法。让我们更进一步,以上代码是否有问题或者是否存在可以优化的空间?

进阶

锁过期时间

 试想以下场景:

1、代码正常获取到锁并开始运行业务代码,但是业务代码有bug,抛错了,会发生什么?

我们将代码修改一下,模拟业务代码出错。注意抛错是在释放锁之前。

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
public class RedisLockController {
    @Autowired
    StringRedisTemplate redisTemplate;

    @GetMapping("/testLock")
    public void testLock() throws Exception{
        //尝试获取锁
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "123");
        if (lock) {
            log.info("成功获取锁");
            //以下是业务代码:到redis服务器中查找num,如果没找到则初始化,若找到则num+1
            String numStr = redisTemplate.opsForValue().get("num");
            if (StringUtils.isEmpty(numStr)) {
                redisTemplate.opsForValue().set("num", "0");
            } else {
                int number = Integer.valueOf(numStr);
                redisTemplate.opsForValue().set("num", ++number + "");
                throw new Exception();//这里模拟业务代码抛错
            }
            
            //释放锁
            redisTemplate.delete("lock");
        } {
            log.info("未获取到锁");
        }
    }
}

 再次运行代码并调用接口,发现确实抛错:

查看redis服务器,发现lock并没有消失:

这是因为业务代码抛错,导致之后的释放锁代码没有执行,进而出现锁永久生效的情况。之后我们再调用接口,会发现因为无法正常获得锁,num不会再自增。除非我们手动删除锁。

如何解决以上问题?很简单,给锁设置一个过期时间,像下面代码这样:在尝试获取锁时就给锁设置一个失效时间,在时间到期后,无论有没有主动释放锁,锁都会自动过期。

Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "123", 30, TimeUnit.SECONDS);

我们将redis中的lock键值对手动删除,再次运行并调用接口(代码依旧会抛出异常):

lock仍然正常产生了,但是等待30秒后,发现lock已经自动删除。

总结下:通过给redis分布式锁设置一个自动过期时间,可以防止业务代码抛错导致产生死锁。

这里有的小伙伴可能要问了:那在业务代码执行完毕后不主动释放锁,等待锁自动过期不就行吗?答案是不行🙅‍♂️。如果不主动释放锁,那么每一次业务代码的执行成功与否都高度依赖锁的自动过期时间。倘若业务代码执行的快,在锁自动过期前结束,那么程序还必须等待锁自动释放后才能继续执行,这样显然是低效的,尤其对于高并发的场景;如果业务代码执行的慢,在锁自动过期后还没结束,相当于没锁,这可能导致不可预知的问题。所以主动释放锁是必须的。

(可能又有小伙伴会问:我的业务运行时间可能就是比较长,过期时间怎么设置?如果设置的短,可能业务还没结束就主动释放锁了;如果过期时间设置的太长,会导致并发效率大幅下降。这该怎么办?这就需要一个锁自动延期的机制,这部分不在本篇的探讨范围,有兴趣的朋友可以了解下Redisson的看门狗机制。)

到这儿是不是就可以了呢?我们继续往下看。

lock防误删

试想下面一种场景:

1、有业务1和业务2,两个业务并发抢用redis锁(想象两个人同时抢购同一商品)

2、业务1的运行时间t1非常长,甚至长过了redis锁的自动失效时间timeout,即t1>timeout 

3、那么在业务1运行timeout时间后,锁自动释放

4、在业务1结束之前,业务2开始运行,并成功得到了redis锁(业务1的锁已经到期自动释放)

5、在业务2运行期间,业务1结束了,并尝试主动释放锁。

因为业务1获取的锁早就已经在timeout之后自动释放了,此时业务1释放的会是业务2的锁!显然这样是不合理的,之后业务2的操作其实没有上锁,也就无法保证并发的正确性。

这种情况相当于业务无法区分要释放的锁是否是自己的锁,既然如此,我们需要让业务知道当前释放的是不是自己的锁,那么就给锁设置一个唯一标识。既然锁的key是确定相同的(本文中为“lock”),我们就从锁的value下手:在尝试获取锁的时候,使用uuid生成一个唯一的字符串作为锁的value,不同的业务释放锁的时候通过比对uuid的值去检查释放的是否是自己的锁,代码如下。

总结下:每个业务在获取redis锁的时候,保证锁的value唯一性,之后通过比对锁的value值判断锁是否属于自己,如果是则主动释放锁,否则不主动释放。

(这里有的小伙伴可能会问,为什么不直接给一个不一样的key?如果key不一样那锁也无效了不是吗?如果每次请求的锁都是唯一的,锁肯定不会有冲突,没有冲突,锁就没有意义。)

到这儿是不是就完美了呢?还没有。

确保删锁原子性

我们仔细查看上面的代码,可能会出现这样一种情况:

1、当业务1已经使用uuid判断了锁是自己的,刚进入if语句块中

2、此时恰好业务1的锁因为到期自动释放了

3、业务2恰好获取到了锁

4、业务1开始释放锁,这里释放的还是业务2的锁!

有人会说这样的概率太小了吧,单看是这样的,但如果考虑到像双11或618那样的海量并发请求,即便是小概率事件在基数极大时也有可能发生。其实,这种“先检查再执行”的使用方式是竞态条件的一种常见情况。

如何处理以上问题?引入LUA脚本,将if语句和删锁操作写在一个脚本语句中,进而确保原子性:Redis会将整个脚本作为一个整体执行,中间不会被其他进程或者进程的命令插入。将if语句块修改为以下形式:

//使用LUA脚本执行原子操作,避免锁误删
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);

// 设置一下返回值类型 为Long
// 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
// 那么返回字符串与0 会有发生错误。
redisScript.setResultType(Long.class);
// 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
redisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);

好了,到此终于得到了一个比较完善的redis锁处理并发的解决方案了,下面我们就将其应用到实战!

秒杀系统示例

秒杀系统是很常见的一种并发的场景,正适合演示redis锁。

一些基本参数如下图。我们就设想10万人抢10个商品。redis锁自动释放时间为3秒。其中的userPatient指的是,用户在抢购失败后马上重新发起抢购,如此重复的最大时间(想象某宝上抢东西,抢一次没抢到,又抢,又没抢到,继续抢,直到抢光或失去耐心......)。

private int totalNum = 10;//总商品数量

private int lockTimeOut = 3;//锁过期时间3秒

private int userPatient = 30000;//用户抢购的模拟时间(毫秒)

private int userNum = 100000;//抢购人数

然后我们模拟创建一定数量的用户。这里为了便于演示,直接用1~100000的字符串代指用户。

List<String> initUsers() {
        List<String> result = new ArrayList<>();
        //这里简单的用数字代指用户
        for (int i = 1; i <= userNum; i++) {
            result.add(String.valueOf(i));
        }

        return result;
    }

将上一节介绍的teskLock方法修改下。若抢购成功,则方法返回该用户,如果抢购失败则返回null:


    public String rob(String user) {
        //用户开抢时间
        long startTime = System.currentTimeMillis();

        //模拟用户持续抢
        while ((startTime + userPatient) >= System.currentTimeMillis()) {
            //首先查看总库存,如果为0,则返回null
            if (totalNum < 1) {
                return null;
            }

            String uuid = UUID.randomUUID().toString();
            Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, lockTimeOut, TimeUnit.SECONDS);//设置锁过期时间,防止死锁
            //获取锁成功
            if (lock) {
                //首先查看总库存,如果为0,则返回null
                if (totalNum < 1) {
                    return null;
                }
                //模拟用户生成订单时间
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                log.info("用户 {} 抢购成功", user);
                totalNum--;


                //使用LUA脚本执行原子操作,避免锁误删
                String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
                DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
                redisScript.setScriptText(script);

                // 设置一下返回值类型 为Long
                // 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
                // 那么返回字符串与0 会有发生错误。
                redisScript.setResultType(Long.class);
                // 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
                redisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);
                return user;
            } else {
                log.error("获取锁失败!(没抢到)");
            }
        }
        return null;

    }

这里需要注意:在获取锁前后都做了库存数量的判断,以免在库存为0时继续抢购(想象双11抢东西,抢着抢着没货了)。

最后是模拟抢购的接口。注意这里使用了parallelStream方法模拟并发。

    @GetMapping("/monitor")
    public List<String> monitor() {
        //初始化抢购用户
        List<String> users = initUsers();
        //抢购成功用户结果表
        List<String> winners = new ArrayList<>();

        users.parallelStream().forEach(user -> {
            //用户尝试抢购
            String currentUser = rob(user);
            //如果抢购成功,则将用户放入结果表
            if (!StringUtils.isEmpty(currentUser)) {
                winners.add(currentUser);
            }
        });

        return winners;
    }

运行程序,并调用monitor接口,稍等片刻:

返回的10个数字就是抢购到商品的用户。多次调用接口,会发现每次生成的字符串几乎都不相同,但字符串数量始终是10个。

好的,到此有关redis分布式锁的介绍就到这里。源代码在此lisz112/redisLock 

PS:以上讲解的是Redis分布式锁的原理,具体使用的时候大家可以直接使用现成的框架如Redisson。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐