减库存业务

使用两台Nginx + 两个Tomcat进行负载均衡,模拟分布式,Redis使用单机实例。

压测工具:Jmeter

库存数据保存在Redis中
在这里插入图片描述
这里使用springboot 整合 Redis

/**
 * @author 阳光大男孩!!!
 */
@RestController
@RequestMapping("order")
@Slf4j
@RequiredArgsConstructor
public class orderController {

    private final StringRedisTemplate stringRedisTemplate;
    
    @RequestMapping("/deduceStock")
    public RespBean deduceOrder()
    {
        String s = stringRedisTemplate.opsForValue().get("stock");
        if(s==null)
        {
            return RespBean.error("请求失败");
        }
        int stock =Integer.parseInt(s) ;
        if(stock>0)
        {
            stringRedisTemplate.opsForValue().set("stock",String.valueOf(stock-1));
            System.out.println("扣减库存成功,剩余库存"+(stock-1));
            return RespBean.ok("扣减库存成功");
        }else
        {
            System.out.println("扣减库存失败,余额不足");
            return RespBean.ok("扣减库存失败");
        }

    }
}

可以看出来这是一段问题代码,当并发情况下,会导致超卖。

为防止超卖,怎么做?

可以使用synchronized加锁

synchronized问题在哪?

分布式环境下synchronized失效,因为synchronized是单JVM下保证锁。

当分布式情况下,两台实例时,依旧会导致问题。

在通过nginx负载均衡后,可以看到两个实例出现了相同的“剩余库存”,这就会超卖
在这里插入图片描述

在这里插入图片描述

那应该怎么做?

使用Redis的setnx命令,Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值

如果能修改key返回true,说明获取到key,加锁成功,最后将key删掉,便于下个减库存。

  Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "");
  if(!lock){
      return RespBean.error("请重试");
  }

业务出现异常,没有释放锁(删掉key),怎么办?

使用try catch finally ,在finally中删掉key

中途宕机的话锁就不会被释放,中途宕机怎么办?

解决方案可以是加锁时,给key设置过期时间。

如果在加锁后,设置过期时间之前,挂了,依然不会解决问题,怎么办

set命令可以支持同时设置key和过期时间,即setnx+expire,redis可以保证原子性,或者使用LUA脚本保证setnx+expire命令的原子性。

set key value [EX seconds] [PX milliseconds] [NX|XX]
EX seconds:设置失效时长,单位秒
PX milliseconds:设置失效时长,单位毫秒
NX:key不存在时设置value,成功返回OK,失败返回(nil)
XX:key存在时设置value,成功返回OK,失败返回(nil)

案例:设置name=p7+,失效时长100s,不存在时设置
1.1.1.1:6379> set name p7+ ex 100 nx

当执行请求时,执行请求时间过长,键过期了,其他请求就会过来加锁,执行减库存,那么就会出现锁失效问题,怎么办?

可以使用watch dog 机制,开启子线程,每隔一段时间,判断锁是否还存在,如果还存在,那么就延长锁的时间。

实现Redis分布式锁

可以基于Redission实现分布式锁

 <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>3.14.1</version>
 </dependency>

将redisson交给Spring管理

/**
 * @author 阳光大男孩!!!
 */
@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient getRedission()
    {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://localhost:6379");
        return Redisson.create(config);

    }
}

用Redisson实现分布式锁,最基本的使用就是三板斧

// 获取锁
RLock lock = redissonClient.getLock("lock");
// 加锁
lock.lock();
// 解锁
lock.unlock();

将上面的减库存改一下

/**
 * @author 阳光大男孩!!!
 */
@RestController
@RequestMapping("order")
@Slf4j
@RequiredArgsConstructor
public class orderController {

    private final StringRedisTemplate stringRedisTemplate;

    private final RedissonClient redissonClient;

    @RequestMapping("/deduceStock")
    public RespBean deduceOrder() {
        // 获取锁
        RLock lock = redissonClient.getLock("lock");
        // 加锁
        lock.lock();
        try {
            // 获取库存
            String s = stringRedisTemplate.opsForValue().get("stock");
            if (s == null) {
                return RespBean.error("请求失败");
            }
            int stock = Integer.parseInt(s);
            // 减库存
            if (stock > 0) {
                stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock - 1));
                System.out.println("扣减库存成功,剩余库存" + (stock - 1));
            } else {
                System.out.println("扣减库存失败,余额不足");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return RespBean.ok("请求成功");
    }
}

使用jmeter进行压测
在这里插入图片描述
库存,正确地从500变成了0
在这里插入图片描述

使用Redis实现分布式锁的缺陷

场景

如果向 Redis master 实例,写入了用于实现锁的KV键值对,会异步复制给对应的 master slave 实例。但是这个过程中一旦发生 redis master 宕机,主备切换,redis slave 变为了 redis master。

问题

客户端 2 来尝试加锁的时候,在新的 redis master 上完成了加锁,而客户端 1 也以为自己成功加了锁。此时就会导致多个客户端对一个分布式锁完成了加锁。这时系统在业 务语义上一定会出现问题,导致各种脏数据的产生。

总结来说,就是在 redis master 实例宕机的时候,可能导致多个客户端同时完成加锁,出现问题。

解决方案

一、RedLock

基于此,redis官方提出了RedLock的实现方案,核心思想是同时使用多个Redis Master来冗余,且这些节点是完全独立的,也不需要对这些节点之间的数据进行同步。获取集群中多数master节点上的锁,同时全部获取,否则全部释放。

在这里插入图片描述

基于zookeeper

基于zookeeper的临时有序节点可以实现的分布式锁,其大致思想为:

每个客户端对某个方法加锁时,在zookeeper上的与该方法对应的指定节点的目录下,生成一个唯一的瞬时有序节点。

判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。 当释放锁的时候,只需将这个瞬时节点删除即可。同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题。完成业务流程后,删除对应的子节点释放锁。

Redis分布式锁,B的锁被A给释放了的问题?

A、B两个线程来尝试给key myLock加锁,A线程先拿到锁(假如锁3秒后过期),B线程就在等待尝试获取锁,到这一点毛病没有。

那如果此时业务逻辑比较耗时,执行时间已经超过redis锁过期时间,这时A线程的锁自动释放(删除key),B线程检测到myLock这个key不存在,执行 SETNX命令也拿到了锁。但是,此时A线程执行完业务逻辑之后,还是会去释放锁(删除key),这就导致B线程的锁被A线程给释放了。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐