一、全局唯一ID

1.1 知识点介绍

每个店铺都可以发布优惠券,而每张优惠券都是唯一的。当用户抢购时,就会生成订单并保存到 tb_voucher_order 这张表中,而订单表如果使用数据库自增 ID 就存在一些问题:

  • id 的规律太明显。如果 id 规律太明显,用户就能够根据 id 猜测出一些信息。比方说,某用户第一天下了一单,此时 id 为 10,第二天同一时刻,该用户又下了一单,此时 id 为 100,那么用户就能够推断出昨天一天店家卖出了 90 单,这就将一些信息暴露给用户。
  • 受单表数据量的限制。订单的一个特点就是数据量比较大,只要用户不停地产生购买行为,就会不停地产生新的订单。如果网站做到一定的规模,用户量达到数百万,这时候每天都会产生数十万甚至近百万的订单,一年下来就会达到数千万的订单,那么两年三年不断累积下来,订单量就会越来越庞大,此时单张表就无法保存这么多的订单数据,就需要将单张表拆分成多张表。MySQL 的每张表会自己计算自己的自增长,如果每张表都使用自增长,订单 id 就一定会重复。

全局 ID 生成器,是一种在分布式系统下用来生成全局唯一 ID 的工具,一般要满足下列特性:

  • 唯一性
  • 高可用
  • 高性能
  • 递增型
  • 安全

为了增加 ID 的安全性,我们可以不直接使用 Redis 自增的数值,而是拼接一些其它信息:
在这里插入图片描述
ID 组成部分:

  • 符号位:1 bit,永远为 0
  • 时间戳:31 bit,以秒为单位,可以使用 69 年
  • 序列号:32 bit,秒内的计数器,支持每秒产生 2^32 个不同的 ID

1.2 Redis 实现全局唯一 id

package com.hmdp.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

@Component
public class RedisIdWorker {

    private static final long BEGIN_TIMESTAMP = 1640995200L;

    private static int COUNT_BITS = 32;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    public Long nextId(String keyPrefix){
        LocalDateTime now = LocalDateTime.now();
        long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        long time = nowSecond - BEGIN_TIMESTAMP;

        String format = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        // Redis Incrby 命令将 key 中储存的数字加上指定的增量值。
        // 如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCRBY 命令。
        Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + format);

        return time << COUNT_BITS | count;
    }

    public static void main(String[] args) {
        LocalDateTime of = LocalDateTime.of(2022, 1, 1, 0, 0, 0);
        long l = of.toEpochSecond(ZoneOffset.UTC);
        // LocalTime类的toEpochSecond()方法用于
        // 将此LocalTime转换为自1970-01-01T00:00:00Z以来的秒数
        System.out.println(l);
    }
}

测试:

@SpringBootTest
class HmDianPingApplicationTests {

    @Autowired
    private RedisIdWorker redisIdWorker;

    private ExecutorService es = Executors.newFixedThreadPool(500);

    @Test
    void testIdWorker() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(300);
        Runnable task = () -> {
            for (int i = 0; i < 100; i++) {
                Long id = redisIdWorker.nextId("order");
                System.out.println("id = " + id);
            }
            latch.countDown();
        };
        long begin = System.currentTimeMillis();
        for (int i = 0; i < 300; i++) {
            es.submit(task);
        }
        latch.await();
        long end = System.currentTimeMillis();
        System.out.println("time = " + (end - begin));
    }

}

1.3 总结

全局唯一 ID 生成策略:

  • UUID:16进制的字符串ID,可以做唯一ID,但不支持自增
  • Redis 自增
  • snowflake 雪花算法:long 类型的 64 ID,性能更好,但是比较依赖于时钟,如果时间不准确,可能会出现异常问题
  • 数据库自增:单独创建一张表,用于实现自增

Redis 自增 ID 策略:

  • 每天一个 key,方便统计订单量
  • ID 构造是 时间戳 + 计数器

二、实现优惠券秒杀下单

2.1 案例分析

每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购:
在这里插入图片描述
表关系如下:

  • tb_voucher:优惠券的基本信息,优惠金额、使用规则等。
  • tb_seckill_voucher:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息。

在这里插入图片描述

功能实现

下单时需要判断两点:

  • 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
  • 库存是否充足,不足则无法下单

在这里插入图片描述

2.2 代码实现

VoucherOrderController

@RestController
@RequestMapping("/voucher-order")
public class VoucherOrderController {

    @Autowired
    private IVoucherOrderService voucherOrderService;

    @PostMapping("seckill/{id}")
    public Result seckillVoucher(@PathVariable("id") Long voucherId) {
        return voucherOrderService.seckillVoucher(voucherId);
    }
}

IVoucherOrderService

public interface IVoucherOrderService extends IService<VoucherOrder> {

    Result seckillVoucher(Long voucherId);
}

VoucherOrderServiceImpl

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Autowired
    private ISeckillVoucherService seckillVoucherService;

    @Autowired
    private RedisIdWorker redisIdWorker;

    @Override
    @Transactional
    public Result seckillVoucher(Long voucherId) {

        SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);

        // 判断秒杀是否还未开始
        if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
            Result.fail("秒杀尚未开始!");
        }

        // 判断秒杀是否已经结束
        if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
            Result.fail("秒杀已经结束!");
        }

        // 判断库存是否充足
        if (seckillVoucher.getStock() < 1) {
            Result.fail("库存不足!");
        }

        // 扣减库存
        boolean success = seckillVoucherService.update().
                setSql("stock = stock - 1").
                eq("voucher_id", voucherId).update();

        // 扣减失败
        if(!success){
            return Result.fail("库存不足!");
        }

        // 创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        // 生成订单 id
        Long orderId = redisIdWorker.nextId("order");
        voucherOrder.setVoucherId(voucherId);
        // 用户 id
        Long userId = UserHolder.getUser().getId();
        voucherOrder.setUserId(userId);
        voucherOrder.setId(orderId);
        save(voucherOrder);

        return Result.ok(orderId);
    }
}

三、超卖问题

3.1 Jmeter 测试

使用 Jmeter 测试高并发情况下优惠券秒杀功能。

Jmeter 设置如下:
1、新增线程组
在这里插入图片描述
2、设置线程组参数
在这里插入图片描述
3、新增 HTTP 请求
在这里插入图片描述
并设置参数:
在这里插入图片描述

4、右键 HTTP 请求,添加查看结果树及聚合报告
在这里插入图片描述
5、添加身份验证 token。右键 HTTP 请求,选择 add—Config Element — HTTP Header Manager.
在这里插入图片描述
并设置参数:
在这里插入图片描述

如何查看 authorization 的值?
启动黑马点评项目,然后登录,进入系统后,按F12,选择 Network,选择 Header,就可以看到authorization
在这里插入图片描述
测试结果如下:
在这里插入图片描述
聚合报告中显示 200 个线程均成功了,但事实上我们只有 100 件库存。
再来看下数据库中秒杀表的结果:
在这里插入图片描述
可以看出库存结果显示为 -100,商品出现超卖现象。
在实际生活中,超卖问题是不允许出现,这会给商家造成巨大的损失。

3.2 超卖问题出现的原因

在高并发情况下,假设线程 1 查询库存,查询结果为 1 ,当线程 1 准备要去扣减库存时,其他线程也去查询库存,结果查询出来的库存数也是 1,那么这时所有的线程查询到的库存数都是大于 0 的,所有的线程都会去执行扣减操作,就会导致超卖问题。

超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:
在这里插入图片描述

乐观锁

乐观锁的关键是判断之前查询得到的数据是否有被修改过,常见的方式有两种:

1、版本号法:

所谓版本号法就是给查询得到的数据加一个版本号,在多线程并发的时候,基于版本号来判断数据有没有被修改过,每当数据被修改,版本号就会加1。
在这里插入图片描述
假设当前有两个线程,线程 1 和线程 2,线程1 在执行查询操作时,将库存数据以及版本号查询出来,而在线程 1 执行扣减库存前,线程 2 开始执行查询操作,查询出的库存数据与版本号与线程 1 一致。而后,线程 1 开始执行扣减操作,在修改时,需判断版本号是否发生改变,如果一致,扣减库存并修改版本号。此时,线程 2 也开始扣减库存,线程 2 在扣减时判断版本号与之前查询得到的版本号是否一致,此时版本号已经被线程 1 修改,所以得到的结果也就不一致,扣减失败。

2、CAS(Compare And Swap) 法

CAS 法,即比较和替换法,是在版本号法的基础上改进而来。CAS 法去除了版本号法中的版本号信息,以库存信息本身有没有变化为判断依据,当线程修改库存时,判断当前数据库中的库存与之前查询得到的库存数据是否一致,如果一致,则说明线程安全,可以执行扣减操作,如果不一致,则说明线程不安全,扣减失败。
在这里插入图片描述

3.3 代码实现

修改 VoucherOrderServiceImpl,在扣减库存时,增加对库存数量的判断,判断当前库存是否与查询得到的库存数一致。

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Autowired
    private ISeckillVoucherService seckillVoucherService;

    @Autowired
    private RedisIdWorker redisIdWorker;

    @Override
    @Transactional
    public Result seckillVoucher(Long voucherId) {

        SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);

        // 判断秒杀是否还未开始
        if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
            Result.fail("秒杀尚未开始!");
        }

        // 判断秒杀是否已经结束
        if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
            Result.fail("秒杀已经结束!");
        }

        // 判断库存是否充足
        if (seckillVoucher.getStock() < 1) {
            Result.fail("库存不足!");
        }

        // 扣减库存
        boolean success = seckillVoucherService.update().
                setSql("stock = stock - 1").
                eq("voucher_id", voucherId).
                eq("stock", seckillVoucher.getStock()).    // 增加对库存的判断,判断当前库存是否与查询出的结果一致
                update();

        // 扣减失败
        if(!success){
            return Result.fail("库存不足!");
        }

        // 创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        // 生成订单 id
        Long orderId = redisIdWorker.nextId("order");
        voucherOrder.setVoucherId(voucherId);
        // 用户 id
        Long userId = UserHolder.getUser().getId();
        voucherOrder.setUserId(userId);
        voucherOrder.setId(orderId);
        save(voucherOrder);

        return Result.ok(orderId);
    }
}

将数据库中的 tb_voucher_order 表(优惠券订单表)以及 tb_seckill_voucher 表(优惠券秒杀表)数据还原,使用 Jmeter 进行测试,看一下聚合报告结果:
在这里插入图片描述
发现错误率 50%,再看下tb_voucher_order、tb_seckill_voucher 中的数据在这里插入图片描述
在这里插入图片描述
只卖出了 20 件,这是为什么呢?
当大量线程涌入,先执行库存数查询逻辑,那么这一批线程拿到的库存数都是一致的,当再去执行扣减库存逻辑时,假设此时线程 A 先拿到了执行权,在执行时,将库存数减 1,那么当其他线程再准备去执行扣减时,肯定跟最开始查询得到的库存数不一致,就会导致扣减失败。
如何修改呢?
库存数只要大于0,就应该允许所有线程都能够执行扣减逻辑。

3.4 代码改进

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Autowired
    private ISeckillVoucherService seckillVoucherService;

    @Autowired
    private RedisIdWorker redisIdWorker;

    @Override
    @Transactional
    public Result seckillVoucher(Long voucherId) {

        SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);

        // 判断秒杀是否还未开始
        if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
            Result.fail("秒杀尚未开始!");
        }

        // 判断秒杀是否已经结束
        if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
            Result.fail("秒杀已经结束!");
        }

        // 判断库存是否充足
        if (seckillVoucher.getStock() < 1) {
            Result.fail("库存不足!");
        }

        // 扣减库存
        boolean success = seckillVoucherService.update().
                setSql("stock = stock - 1").
                eq("voucher_id", voucherId).
//                eq("stock", seckillVoucher.getStock()).    // 增加对库存的判断,判断当前库存是否与查询出的结果一致
                gt("stock", 0).        // 修改判断逻辑,改为只要库存大于0,就允许线程扣减
                update();

        // 扣减失败
        if(!success){
            return Result.fail("库存不足!");
        }

        // 创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        // 生成订单 id
        Long orderId = redisIdWorker.nextId("order");
        voucherOrder.setVoucherId(voucherId);
        // 用户 id
        Long userId = UserHolder.getUser().getId();
        voucherOrder.setUserId(userId);
        voucherOrder.setId(orderId);
        save(voucherOrder);

        return Result.ok(orderId);
    }
}

使用 Jmeter 重新测试:
聚合报告:
在这里插入图片描述
tb_seckill_voucher:
在这里插入图片描述
tb_voucher_order:
在这里插入图片描述
执行结果没有问题。

3.5 总结

超卖这样的线程安全问题,解决方案有哪些?
1、悲观锁:添加同步锁,让线程串行执行

  • 优点:简单粗暴
  • 缺点:性能一般

2、乐观锁:不加锁,在更新时判断是否有其他线程在修改

  • 优点:性能好
  • 缺点:存在成功率低的问题。

像上面的库存这种案例比较特殊,只需判断库存是否大于 0 即可,但是有些情况可能就只能通过判断数据有没有发生变化,这种情况要想提高成功率,可以采用分批加锁(分段锁)的方案,将数据资源分成几份,以库存为例,假设库存数 100,可以将库存分到 10 张表中,每张表中库存数为 10,用户在抢购的时候可以在多张表中进行抢购,这样成功率就会提高。

四、实现一人一单功能

4.1 需求分析

来看下优惠券订单表 tb_voucher_order 的数据:
在这里插入图片描述
可以发现所有的优惠券都被同一个用户所抢购,但是像这种优惠券,一般只允许用户购买一次,显然这不符合实际的业务需求,那如何进行改进呢?

需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单。

业务流程分析:
在这里插入图片描述

4.2 代码实现

在扣减库存前增加”判断当前优惠券用户是否已经下过单“的逻辑。

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Autowired
    private ISeckillVoucherService seckillVoucherService;

    @Autowired
    private RedisIdWorker redisIdWorker;

    @Override
    @Transactional
    public Result seckillVoucher(Long voucherId) {

        SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);

        // 判断秒杀是否还未开始
        if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
            Result.fail("秒杀尚未开始!");
        }

        // 判断秒杀是否已经结束
        if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
            Result.fail("秒杀已经结束!");
        }

        // 判断库存是否充足
        if (seckillVoucher.getStock() < 1) {
            Result.fail("库存不足!");
        }

        // 判断当前优惠券用户是否已经下过单
        // 用户 id
        Long userId = UserHolder.getUser().getId();
        int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
        if (count > 0) {
            return Result.fail("用户已经购买过一次");
        }

        // 扣减库存
        boolean success = seckillVoucherService.update().
                setSql("stock = stock - 1").
                eq("voucher_id", voucherId).
//                eq("stock", seckillVoucher.getStock()).    // 增加对库存的判断,判断当前库存是否与查询出的结果一致
                gt("stock", 0).        // 修改判断逻辑,改为只要库存大于0,就允许线程扣减
                update();

        // 扣减失败
        if(!success){
            return Result.fail("库存不足!");
        }

        // 创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        // 生成订单 id
        Long orderId = redisIdWorker.nextId("order");
        voucherOrder.setVoucherId(voucherId);

        voucherOrder.setUserId(userId);
        voucherOrder.setId(orderId);
        save(voucherOrder);

        return Result.ok(orderId);
    }
}

使用 Jmeter 进行测试:
tb_sekill_voucher 表中库存应当剩 99
在这里插入图片描述
再来看下 tb_voucher_order 表,可以看出这 10 个订单都是同一个用户所下。这是什么原因导致的呢?
在这里插入图片描述
其实跟优惠券秒杀逻辑一样,在并发情况下,此时有 10 个线程同时查询当前用户是否对当前优惠券下过单,而所查询到的结果均为 0,也就并发执行了扣减逻辑。如何解决呢?能不能像扣减逻辑一样使用乐观锁呢?答案是不行,乐观锁只能针对并发更新数据的情况,而此处我们执行的是查询操作,只能使用悲观锁,即同步代码块的方式。

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Autowired
    private ISeckillVoucherService seckillVoucherService;

    @Autowired
    private RedisIdWorker redisIdWorker;

    @Override
    public Result seckillVoucher(Long voucherId) {

        SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);

        // 判断秒杀是否还未开始
        if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
            Result.fail("秒杀尚未开始!");
        }

        // 判断秒杀是否已经结束
        if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
            Result.fail("秒杀已经结束!");
        }

        // 判断库存是否充足
        if (seckillVoucher.getStock() < 1) {
            Result.fail("库存不足!");
        }

        return createVoucherOrder(voucherId);
    }

    @Transactional
    public Result createVoucherOrder(Long voucherId) {
        // 判断当前优惠券用户是否已经下过单
        // 用户 id
        Long userId = UserHolder.getUser().getId();
        synchronized (userId.toString().intern()) {
            // 查询订单
            int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
            if (count > 0) {
                return Result.fail("用户已经购买过一次");
            }

            // 扣减库存
            boolean success = seckillVoucherService.update().
                    setSql("stock = stock - 1").
                    eq("voucher_id", voucherId).
//                eq("stock", seckillVoucher.getStock()).    // 增加对库存的判断,判断当前库存是否与查询出的结果一致
        gt("stock", 0).        // 修改判断逻辑,改为只要库存大于0,就允许线程扣减
                    update();

            // 扣减失败
            if (!success) {
                return Result.fail("库存不足!");
            }

            // 创建订单
            VoucherOrder voucherOrder = new VoucherOrder();
            // 生成订单 id
            Long orderId = redisIdWorker.nextId("order");
            voucherOrder.setVoucherId(voucherId);

            voucherOrder.setUserId(userId);
            voucherOrder.setId(orderId);
            save(voucherOrder);

            return Result.ok(orderId);
        }
    }
}

代码分析:

将判断用户是否下过单的逻辑以及扣减逻辑抽取成方法是为了更好地进行加锁。另外,由于扣减逻辑是在抽取出的方法中执行的,即操作数据的逻辑,所以也就将注解 @Transactional 添加到该方法上(该注解主要用于事务管理)。为什么不在方法上添加 Synchronized,即加锁呢?将 synchronized 加在方法上,那么同步锁就是当前对象。不过,不建议将 synchronized 加在方法上,那么锁的范围就变成了整个方法,而且锁的对象是 this,即当前对象,也就说明任何一个用户执行该方法时,都会添加这个锁,而且是同一个锁,那么整个方法就会串行执行,整体性能就会变差。所谓的一人一单,实际上针对只是同一个用户,只有同一个用户在执行该方法时,才会进行判断该用户的并发安全问题,如果不是同一个用户,是不需要加锁的。那么锁的对象就不应该是 this,而是用户 id,将锁定资源范围缩小。另外还需要注意一点,我们在对用户 id 加锁时,应当保证同一个用户的 id 的值是一样的,但是 userId.toString() 方法,它在底层实际上是重新 new 了一个字符串,所以 userId 每调一次 toString 方法,就会生成一个全新的字符串对象,这里我们就需要调用字符串的 intern() 方法,intern() 方法会手动将字符串添加进字符串常量池,在下一次调用 intern() 方法时,会首先去字符串常量池中查找当前字符串是否存在,如果存在则直接将常量池中数据返回,如果不存在,则将其添加进常量池。关于 intern() 方法的介绍,可以看下这位大佬的文章:浅谈String.intern()方法

代码测试结果

tb_seckill_voucher
在这里插入图片描述
tb_voucher_order
在这里插入图片描述

另外,在观看视频时,发现弹幕中有人说这是需要多个同样 id 的账号访问才会造成多线程问题啊。此处加锁的原因,是因为很有可能某些人,比如黄牛,使用某些并发工具,在抢购时并发抢购,产生一人多单的情况,所以这就是在此处针对个人id进行加锁的根本原因。

五、一人一单的并发安全问题

5.1 问题分析

通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。
1、我们将服务启动两份,端口分别为 8081 和 8082:
如何复制参考这位大佬写的文章,设置一下 idea 即可:idea如何开启Run Dashboard
在这里我也具体说一下:
打开黑马点评项目中 .idea 文件夹下的 workspace.xml 文件:
在这里插入图片描述
在文件中添加下面一段内容:

<component name="RunDashboard">
  <option name="configurationTypes">
    <set>
      <option value="SpringBootApplicationConfigurationType" />
    </set>
  </option>
</component>

添加完成后,在 idea 下方会出现一个 Service 窗口,点击Service 窗口之后,或者使用快捷键 command + 8(Mac 系统下),然后选择左侧的 8081 端口的服务应用,按 Ctrl + D 进行复制。
在这里插入图片描述
按 Ctrl + D 之后会出现如下弹窗,设置服务名以及服务端口号,点击”OK“键保存。
在这里插入图片描述
然后右键每一个服务,选择启动即可。
在这里插入图片描述
2、修改 nginx 的 conf 目录下的 nginx.conf 文件,配置反向代理和负载均衡:
在这里插入图片描述
重新启动nginx:./nginx -s reload

现在,用户请求会在这两个节点上负载均衡,再次测试下是否存在线程安全问题。

在创建订单方法中打上断点,然后使用 postman 新建两个请求
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
这里使用的是同一个用户
最终数据库中会插入两条相同的订单数据(我没能实现)。
假设做集群部署时,有两个 Tomcat,每个 Tomcat 都有各自的 JVM。在 每个 JVM 内部会维护一个锁的监视器对象,而秒杀方法中,锁的监视器对象用的是用户 id,而用户 id 是放置在了常量池中,一个 JVM 中维护一个常量池,那么不同的 JVM 有不同的常量池,所以当其他线程去执行秒杀逻辑时,获取不到 其他 JVM 中已经存在的锁监视器对象,这种情况下就又会出现线程安全问题。
在这里插入图片描述

六、分布式锁

6.1 工作原理

在集群模式下,synchronized 锁只能保证单个 JVM 内部的多个线程之间的互斥,而没有办法让集群下的多个 JVM 进程之间互斥,想要解决这个问题,就不能再使用 JVM 内部的锁监视器,必须让多个 JVM 使用同一个锁监视器,因此该锁监视器必须是一个在 JVM 外部的、多 JVM 进程都能看到的锁监视器。这个时候,无论是 JVM 内部还是多 JVM 进程都应该来找该锁监视器。
在这里插入图片描述

假设有两个 JVM,此时 JVM1 中的线程 1 需要获取互斥锁,就会去找外部的锁监视器,一旦获取成功,锁监视器就会记录当前线程。恰好,JVM2 中的线程 3 也想获取互斥锁,这个时候由于锁监视器已经被JVM1 中的线程 1 所使用,线程 3 就会获取锁失败,一旦失败,线程 3 就会等待锁释放,而线程 1 由于获取锁成功,就会执行查询订单,判断订单是否存在,插入订单等业务逻辑,执行结束线程 1 就会释放锁,线程 3 就会获取所成功,也开始执行查询订单逻辑,由于 JVM 1 中的线程 1 已经插入订单,此时 JVM2 中的线程 3 就会查询到订单,就无法再插入新订单,也就解决了多进程之间的一人一单问题。

6.2 什么是分布式锁

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
分布式锁应当满足以下几个点:
1、多进程可见
2、互斥:必须确保只能有一个线程拿到互斥锁。
3、高可用:必须保证大多数情况下获取锁都是成功的
4、高性能:加锁以后就会影响业务的性能,加锁后,业务的执行变成串行执行,如果获取锁的动作又很慢,就会导致执行效率更低,雪上加霜。
5、安全性:在获取锁的时候应当考虑一些异常情况,比如获取后还未释放,服务器宕机,锁需要怎么处理,又或者会不会产生死锁问题。

6.3 分布式锁的实现方式

分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种:
在这里插入图片描述
MySQL:MySQL 本身具备事务机制,在事务执行的时候,或者说在执行写操作的时候,MySQL 会自动分配一个互斥的锁,这样在多个事务之间就是互斥的,可以用这个原理来实现锁。假设现在有业务需要用到分布式互斥锁,我们可以在业务执行前,先去 MySQL 中申请一个互斥锁,当业务执行结束,就可以提交事务,锁就会释放,当业务抛出异常时,就会自动触发回滚,锁也会释放。MySQL 本身支持主从模式,所以高可用性能比较好。至于性能,就会受限于 MySQL 的性能,相比较于 Redis 而言,其性能一般。
Redis:利用 SETNX 互斥命令,当使用 SETNX 命令向 Redis 中存储数据时,只有该数据的 key 不存在时才能存储成功,如果已经存在,就会存储失败。释放锁时,只需要将 key 删除即可。Redis 支持主从模式、集群模式,可用性高。Redis 的性能也远远高于 MySQL。但是 Redis 在使用 SETNX 命令存储数据成功后,一旦 Redis 服务宕机,那锁就无法释放,就会导致其他线程拿不到锁,出现死锁。这里可以在 SETNX 时,设置过期时间,但是如果过期时间设置的过长,那么锁的无效等待时间就会比较多,如果设置过短,有可能导致业务没有执行结束就将锁释放掉。
Zookeeper:Zookeeper 实现锁的原理是基于它内部的节点机制。Zookeeper 内部可以创建数据节点,而节点具有唯一性和有序性,另外,Zookeeper 还可以创建临时节点。所谓唯一性就是在创建节点时,节点不能重复;所谓有序性是指每创建一个节点,节点的id是自增的。那么就可以利用节点的有序性来实现互斥。当有大量线程来获取互斥锁时,每个线程就会创建一个节点,而每个节点的 id 是单调递增的,如果我们约定 id 最小的那个获取锁成功,这样就可以实现互斥。当然,也可以利用唯一性,让所有线程去创建节点,但是节点名称相同,这样就会只能有一个线程创建成功。一般情况下,会使用有序性来实现互斥。想要释放锁,则只需要将节点删除即可,一旦将最小节点删除,那么剩余节点中 id 最小的那个节点就会获取锁成功。Zookeeper 本身也支持集群,所以其可用性很好。而Zookeeper 的集群强调节点之间的强一致性,而这种强一致性就会导致主从之间在进行数据同步时会消耗一定的时间,其性能相较于 Redis 而言会差一点。安全性方面,Zookeeper 一般创建的是临时节点,一旦服务出现故障,Zookeeper 就会自动断开连接,锁就会自动释放掉。

6.4 基于 Redis 的分布式锁

实现分布式锁时需要实现的两个基本方法:
(1)获取锁:
互斥:确保只能有一个线程获取锁

# 添加锁,利用SETNX的互斥特性
SETNX lock thread1

# 添加锁过期时间,避免服务宕机引起的死锁
EXPIRE lock 10

但需保证操作的原子性,在添加锁的同时设置过期时间,如果先添加锁,此时 Redis 服务恰好宕机,那么锁就没法释放,可以使用下面的命令保证操作的原子性。

# EX 后跟过期时间,NX 保证互斥特性
set lock thread1 EX 10 NX

在获取锁时,成功返回 OK,失败返回 nil。但是失败以后,又该如何操作呢?有两种处理方式,一种是阻塞式获取,即获取失败后会一直等待锁释放;另一种是非阻塞式获取,即获取失败后立即返回。在这里,我们使用非阻塞式获取的方式,阻塞式获取的方式,对内存有一定的浪费且实现起来比较困难。
非阻塞:尝试一次,成功返回 true,失败返回 false
(2) 释放锁:
手动释放:

# 释放锁,删除即可
del key

超时释放:获取锁时添加一个超时时间

流程分析

在这里插入图片描述

6.5 基于 Redis 实现分布式锁初级版本

需求:定义一个类,实现下面接口,利用 Redis 实现分布式锁功能。

public interface ILock {
    
    /**
     * 尝试获取锁
     * @param timeoutSec 锁持有的超时时间,过期后自动释放
     * @return true 代表获取锁成功;false 代表获取锁失败
     * 
     * */
    boolean tryLock(long timeoutSec);
    
    /**
     * 释放锁
     * 
     * */
    void unlock();
}

实现
public class SimpleRedisLock implements ILock{

    /**
     * 锁名称,不能固定写死,应当根据业务传入进来
     */
    private String name;
    private StringRedisTemplate stringRedisTemplate;
    /**
     * 锁前缀
     */
    private static final String KEY_PREFIX = "lock:";

    /**
     * 构造函数
     *
     * @param name 锁名称,创建 SimpleRedisLock 实例时传入
     * @param stringRedisTemplate
     */
    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public boolean tryLock(long timeoutSec) {
        // 获取线程标识
        long threadId = Thread.currentThread().getId();
        // 获取锁
        Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name,
                threadId + "", timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    @Override
    public void unlock() {
        // 释放锁
        stringRedisTemplate.delete(KEY_PREFIX + name);
    }
}

一人一单业务逻辑修改
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Autowired
    private ISeckillVoucherService seckillVoucherService;

    @Autowired
    private RedisIdWorker redisIdWorker;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public Result seckillVoucher(Long voucherId) {

        SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);

        // 判断秒杀是否还未开始
        if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
            Result.fail("秒杀尚未开始!");
        }

        // 判断秒杀是否已经结束
        if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
            Result.fail("秒杀已经结束!");
        }

        // 判断库存是否充足
        if (seckillVoucher.getStock() < 1) {
            Result.fail("库存不足!");
        }

        return createVoucherOrder(voucherId);
    }

    @Transactional
    public Result createVoucherOrder(Long voucherId) {
        // 判断当前优惠券用户是否已经下过单
        // 用户 id
        Long userId = UserHolder.getUser().getId();

        // 创建互斥锁对象
        // 此处我们针对的是一人一单,防止一人出现多单,所以锁应该添加上用户 id,
        // 如果不加,则是针对的所有用户,那么只要有用户获取到锁,其他用户就无法在进行操作。
        SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
        // 获取互斥锁
        boolean isLock = lock.tryLock(1200);
        if (!isLock) {
         	// 获取锁失败,直接返回失败或者重试
            return Result.fail("不允许重复下单!");
        }


        try {
            // 查询订单
            int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
            if (count > 0) {
                return Result.fail("用户已经购买过一次");
            }

            // 扣减库存
            boolean success = seckillVoucherService.update().
                    setSql("stock = stock - 1").
                    eq("voucher_id", voucherId).
//                eq("stock", seckillVoucher.getStock()).    // 增加对库存的判断,判断当前库存是否与查询出的结果一致
        gt("stock", 0).        // 修改判断逻辑,改为只要库存大于0,就允许线程扣减
                    update();

            // 扣减失败
            if (!success) {
                return Result.fail("库存不足!");
            }

            // 创建订单
            VoucherOrder voucherOrder = new VoucherOrder();
            // 生成订单 id
            Long orderId = redisIdWorker.nextId("order");
            voucherOrder.setVoucherId(voucherId);

            voucherOrder.setUserId(userId);
            voucherOrder.setId(orderId);
            save(voucherOrder);

            return Result.ok(orderId);
        } finally {
            // 释放锁
            lock.unlock();
        }

    }
}

分别使用 postman 和 Jmeter 进行测试

postman 测试:
8081:
在这里插入图片描述
这里应该 idea 的一个bug,但是根据下面 Debugger 窗口中的参数信息,可以看出 8081 端口成功获取到了锁。
再来看下 8082:
在这里插入图片描述
8082 获取互斥锁失败。
使用 Jmeter 测试结果也成功了。

6.6 Redis 分布式锁误删问题

在这里插入图片描述

假设线程 1 获取互斥锁且获取成功,拿到锁后,线程 1 开始执行业务,但是由于某种原因,线程 1 的业务发生了阻塞,那么线程 1 持有锁的周期就会变长,那会在什么时候释放锁呢?一种情况是线程 1 的业务执行结束,由线程 1 执行释放锁的逻辑。还有一种情况是,由于线程 1 阻塞时间过长,超过了锁自动释放的时间,使得锁自动释放掉了,即业务还未执行结束,锁提前释放掉了。第二种情况,会有什么问题呢?如果此时,其他线程也去尝试获取锁,就会获取成功。假设此时线程 2 成功拿到锁,然后执行自己的业务。而就在此时,线程 1 从阻塞中被唤醒了,业务执行结束,然后就会去释放锁,此时释放掉的的锁其实是线程 2 的锁。线程 2 并不知道自己的锁已经被线程 1 给释放掉了,还在执行自己的业务。恰巧此时,线程 3 又获取锁成功,然后线程 3 也开始执行自己的业务,那么此时此刻,就会有两个线程都拿到了锁都在执行业务,又一次出现并行问题,线程安全问题就有可能再次发生。

如何解决第二种情况呢?造成这种情况的原因,其实就在于线程 1 在释放的时候将别人的锁释放掉了,如果在释放锁的时候,判断下当前线程与锁中存放的标识是否一致(此处业务在获取锁时将线程 id 存放到了锁监视器中),就可以避免这个问题。
在这里插入图片描述

流程分析

在这里插入图片描述

6.7 改进 Redis 的分布式锁

需求:修改之前的分布式锁实现,满足:
1.在获取锁时存入线程标识(可以用 UUID 表示)
2.在释放锁时先获取锁中的线程标识,判断是否与当前线程标识一致。如果一致则释放锁,如果不一致则不释放锁。

代码改进

public class SimpleRedisLock implements ILock{

    /**
     * 锁名称,不能固定写死,应当根据业务传入进来
     */
    private String name;
    private StringRedisTemplate stringRedisTemplate;
    /**
     * 锁前缀
     */
    private static final String KEY_PREFIX = "lock:";

    private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";

    /**
     * 构造函数
     *
     * @param name 锁名称,创建 SimpleRedisLock 实例时传入
     * @param stringRedisTemplate
     */
    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public boolean tryLock(long timeoutSec) {
        // 获取线程标识
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 获取锁
        Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name,
                threadId, timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    @Override
    public void unlock() {
        // 获取线程标识
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 获取锁中的标识
        String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
        if(threadId.equals(id)) {
            // 释放锁
            stringRedisTemplate.delete(KEY_PREFIX + name);
        }
    }
}

测试

使用 postman 同时向 8081 和 8082 发送秒杀请求,让 8081 先获取锁,随后再让 8082 获取锁,接着去 Redis 中将 8081 存储的线程标识删除,然后放行至释放锁逻辑,观察当前线程标识和从 Redis 中获取到的线程标识是否一致,这个时候获取到的标识应该是不一致的。接着放行 8082 至释放锁逻辑,此时当前线程标识和从 Redis 中获取到的线程标识是一致的。

6.8 分布式锁的原子性问题

在这里插入图片描述

假设线程 1 获取互斥锁且获取成功,拿到锁后,线程 1 开始执行业务,执行结束后,线程 1 的准备释放锁,但在此时线程 1 发生了阻塞,由于线程 1 阻塞时间过长,超过了锁自动释放的时间,使得锁自动释放掉了。此时线程 2 成功拿到锁,然后执行自己的业务。而就在此时,线程 1 从阻塞中被唤醒了,业务执行结束,然后就会去释放锁,此时释放掉的的锁其实是线程 2 的锁。线程 2 并不知道自己的锁已经被线程 1 给释放掉了,还在执行自己的业务。恰巧此时,线程 3 又获取锁成功,然后线程 3 也开始执行自己的业务,那么此时此刻,就会有两个线程都拿到了锁都在执行业务,又一次出现并行问题,线程安全问题就有可能再次发生。

要解决这个问题,就必须确保判断锁标识的动作和释放锁的动作一起执行,不能存在间隔。那如何保证两个动作的原子性呢?

6.9 Lua 脚本解决多条命令原子性问题

6.9.1 Lua 脚本简介

Redis 提供了 Lua 脚本功能,在一个脚本中编写多条 Redis 命令,确保多条命令执行时的原子性。Lua 是一种编程语言。

Redis 提供的调用函数,语法如下:

-- 执行 Redis 命令
redis.call('命令名称', 'key', '其他参数', ...)

例如, 我们要执行 set name jack,则脚本是这样:

-- 执行 set name jack
redis.call('set', 'name', 'jack')

例如,我们要先执行 set name Rose,再执行 get name,则脚本如下:

-- 先执行 set name Rose
redis.call('set', 'name', 'Rose')
-- 再执行 get name
local name = redis.call('get', 'name')
-- 返回
return name

写好脚本以后,需要用 Redis 命令来调用脚本,调用脚本的常见命令如下:
在这里插入图片描述
例如,我们要执行 redis.call('set', 'name', 'jack') 这个脚本,语法如下:
在这里插入图片描述
如果脚本中 key、value 不想写死,可以作为参数传递。key 类型的参数会放入 KEYS 数组,其他参数会放入 ARGV 数组,在脚本中可以从 KEYS 和 ARGV 数组中获取这些参数:
在这里插入图片描述
注意:在 Lua 语言中,数组参数的下标是从 1 开始的

6.6.2 使用 Lua 编写释放锁的业务流程

释放锁的业务流程是这样的:
1、获取锁中的线程标识
2、判断是否与指定的标识(当前线程标识)一致
3、如果一致则释放锁(删除)
4、如果不一致则什么都不做

如果用 Lua 脚本来表示则是这样的:

-- 这里的 KEYS[1] 就是锁的 key,这里的 ARGV[1] 就是当前线程标识
-- 获取锁中的标识,判断是否与当前线程标识一致
if(redis.call('get', KEYS[1]) == ARGV[1]) then
    -- 一致,则删除锁
    return redis.call('del', KEYS[1])
end
-- 不一致,则直接返回
return 0

6.9.3 再次改进 Redis 的分布式锁

需求:基于 Lua 脚本实现分布式锁的释放锁逻辑
提示:RedisTemplate 调用 Lua 脚本的 API 如下:
在这里插入图片描述
代码实现:

public class SimpleRedisLock implements ILock{

    /**
     * 锁名称,不能固定写死,应当根据业务传入进来
     */
    private String name;
    private StringRedisTemplate stringRedisTemplate;
    /**
     * 锁前缀
     */
    private static final String KEY_PREFIX = "lock:";

    private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";

    /**
     * 使用静态代码块的目的:lua 脚本会在该类加载时就会被加载进来,
     * 这样就可以不用在每次释放锁的时候加载 Lua 脚本,提高性能
     */
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
    static {
        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        UNLOCK_SCRIPT.setResultType(Long.class);
    }

    /**
     * 构造函数
     *
     * @param name 锁名称,创建 SimpleRedisLock 实例时传入
     * @param stringRedisTemplate
     */
    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public boolean tryLock(long timeoutSec) {
        // 获取线程标识
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 获取锁
        Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name,
                threadId, timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    @Override
    public void unlock() {
        stringRedisTemplate.execute(UNLOCK_SCRIPT,
                Collections.singletonList(KEY_PREFIX + name),
                ID_PREFIX + Thread.currentThread().getId());
    }
}

测试方式不变

七、总结

基于 Redis 的分布式锁实现思路:

  • 利用 set nx ex 获取锁,并设置过期时间,保存线程标识
  • 释放锁时先判断线程标识是否与自己一致,一致则删除

特性:

  • 利用 set nx 满足互斥性
  • 利用 set ex 保证故障时锁依然能释放,避免死锁,提高安全
  • 利用 Redis 集群保证高可用和高并发特性
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐