1.秒杀代码(最low实现):

    //开启事务
    @Transactional(rollbackFor = Exception.class)
    public BaseResponse<?> startSeckillWithLock(Long id, Long userId) {
        try {
          return seckill(id, userId);
        } catch (Exception e) {
            throw e;
        } finally {
        }
    }

    private BaseResponse<?> seckill(Long id, Long userId) {
        //获取库存数sql语句为:SELECT number FROM seckill WHERE id = #{id}
        Integer number = seckillMapper.getNumberById(id);
        if (number != null && number > 0) {
            log.info("当前线程:{},用户:{},当前库存{}", Thread.currentThread().getId(), userId, number);
            //扣库存,sql为:UPDATE seckill  SET number=number-1 WHERE id = #{id}
            seckillMapper.deductNumberById(id);
            //创建订单
            SuccessKilledModel killed = new SuccessKilledModel();
            killed.setSeckillId(id);
            killed.setUserId(userId);
            killed.setState((short)0);
            successKilledMapper.insert(killed);
            return BaseResponse.valueOfSuccess();
        } else {
            return BaseResponse.valueOfError(10010, "库存不足");
        }
    }

这种多个线程执行肯定会发生超卖的现象:然后第一种解决方案是加ReenTranlock程序锁如下:

2.秒杀加程序锁实现(会出现超卖一个的现象)

    @Override
    //开启事务
    @Transactional(rollbackFor = Exception.class)
    public BaseResponse<?> startSeckillWithLock(Long id, Long userId) {
        lock.lock();
        try {
          return seckill(id, userId);
        } catch (Exception e) {
            throw e;
        } finally {
            lock.unlock();
        }
    }

    private BaseResponse<?> seckill(Long id, Long userId) {
        //获取库存数sql语句为:SELECT number FROM seckill WHERE id = #{id}
        Integer number = seckillMapper.getNumberById(id);
        if (number != null && number > 0) {
            log.info("当前线程:{},用户:{},当前库存{}", Thread.currentThread().getId(), userId, number);
            //扣库存,sql为:UPDATE seckill  SET number=number-1 WHERE id = #{id}
            seckillMapper.deductNumberById(id);
            //创建订单
            SuccessKilledModel killed = new SuccessKilledModel();
            killed.setSeckillId(id);
            killed.setUserId(userId);
            killed.setState((short)0);
            successKilledMapper.insert(killed);
            return BaseResponse.valueOfSuccess();
        } else {
            return BaseResponse.valueOfError(10010, "库存不足");
        }
    }

启动一千个线程,库存为100,最终结果是个很奇妙的现象:
在这里插入图片描述

库存数居然是1,然后在程序里面打的日志是这样的:

在这里插入图片描述

然后就有个疑问:加了锁居然还是无法保证数据的原子性?

后面查阅了mysql的默认隔离机制可重复读底层实现以及@Transaction注解的底层实现,原来根本原因是这样的:

我们代码里面,lock.unlock的操作是写在方法里面的finally语句的,
这时候可能我们解锁了,@Transaction标注的方法事务还没提交,
这就导致了另外一个线程执行select语句的时候读的是未更新前的数据,
至于后面的库存数为什么又是依次递减:那是因为事务已经提交了,undo log
里面的快照version已经更新了,后面读的都是前面更新了的快照,
那肯定就是依次递减啦,所以肯定会发生超卖一个的现象

mysql可重复读(RR)和读提交(RC)的隔离机制下,select语句用的是快照读,update,insert,delete默认使用当前读

3.mysql当前读和快照读底层

  • 当前读:

    select…lock in share mode (共享读锁)
      select…for update(排它锁)
      update , delete , insert(排它锁)

    当前读, 读取的是最新版本, 并且对读取的记录加锁, 阻塞其他事务同时改动相同记录,避免出现安全问题

    例如,假设要update一条记录,但是另一个事务已经delete这条数据并且commit了,如果不加锁就会产生冲突。所以update的时候肯定要是当前读,得到最新的信息并且锁定相应的记录

  • 快照读:

    单纯的select操作,不包括上述 select … lock in share mode, select … for update

    • Read Committed隔离级别:每次select都生成一个快照读。

    • Read Repeatable隔离级别:开启事务后第一个select语句才是快照读的地方,而不是一开启事务就快照读

  • 快照读的实现方式,undolog和多版本并发控制

    下图右侧绿色的是数据:一行数据记录,主键ID是10,name=‘Jack’,age=10, 被update更新set为name= ‘Tom’,age=23。

    事务会先使用“排他锁”锁定改行,将该行当前的值复制到undo log中,然后再真正地修改当前行的值,最后填写事务的DB_TRX_ID,使用回滚指针DB_ROLL_PTR指向undo log中修改前的行DB_ROW_ID

在这里插入图片描述

DB_TRX_ID: 6字节DB_TRX_ID字段,表示最后更新的事务id(update,delete,insert)。此外,删除在内部被视为更新,其中行中的特殊位被设置为将其标记为已软删除。

DB_ROLL_PTR: 7字节回滚指针,指向前一个版本的undolog记录,组成undo链表。如果更新了行,则撤消日志记录包含在更新行之前重建行内容所需的信息。
DB_ROW_ID: 6字节的DB_ROW_ID字段,包含一个随着新行插入而单调递增的行ID, 当由innodb自动产生聚集索引时,聚集索引会包括这个行ID的值,否则这个行ID不会出现在任何索引中。如果表中没有主键或合适的唯一索引, 也就是无法生成聚簇索引的时候, InnoDB会帮我们自动生成聚集索引, 聚簇索引会使用DB_ROW_ID的值来作为主键; 如果表中有主键或者合适的唯一索引, 那么聚簇索引中也就不会包含 DB_ROW_ID了 。

其它:insert undo log只在事务回滚时需要, 事务提交就可以删掉了。update undo log包括update 和 delete , 回滚和快照读 都需要

4.共享锁和排他锁(LOCK IN SHARE MODE和FOR UPDATE)

共享锁允许其他事务加共享锁读取,但是,不允许其他事务去做修改,或者加排它锁。而排它锁显得更加严格,不允许其他事务加共享锁或者排它锁,更加不允许其他事务修改加锁的行

5.解决方案1,select语句强制使用当前读(共享锁方式LOCK IN SHARE MODE)

    @Override
    //开启事务
    @Transactional(rollbackFor = Exception.class)
    public BaseResponse<?> startSeckillWithLock(Long id, Long userId) {
        lock.lock();
        try {
          return seckill(id, userId);
        } catch (Exception e) {
            throw e;
        } finally {
            lock.unlock();
        }
    }

    private BaseResponse<?> seckill(Long id, Long userId) {
        //获取库存数sql语句为:SELECT number FROM seckill WHERE id = #{id} LOCK IN SHARE MODE
        Integer number = seckillMapper.getNumberById(id);
        if (number != null && number > 0) {
            log.info("当前线程:{},用户:{},当前库存{}", Thread.currentThread().getId(), userId, number);
            //扣库存
            seckillMapper.deductNumberById(id);
            //创建订单
            SuccessKilledModel killed = new SuccessKilledModel();
            killed.setSeckillId(id);
            killed.setUserId(userId);
            killed.setState((short)0);
            successKilledMapper.insert(killed);
            return BaseResponse.valueOfSuccess();
        } else {
            return BaseResponse.valueOfError(10010, "库存不足");
        }
    }

这种方式的原理:

方法中加了程序锁ReeTranlock,就是在执行完所有代码前只允许一个线程去跑
(但这里也没有保证unlock执行前去提交事务),事务中select语句加上LOCK IN 
SHARE MODE代码这行数据加了共享锁,然后下面执行了update语句这时候又给语句
加上了排他锁,排它锁的功能就是保证了其他事务既不能读也不能写这行数据,所以下
个事务进来执行SELECT number FROM seckill WHERE id = #{id} LOCK IN 
SHARE MODE是要等待的,这时候就保证了获取的数据是最新的数据
  • 然后我们再测试一种死锁的现象,我们把程序锁去掉,程序如下:

        @Override
        //开启事务
        @Transactional(rollbackFor = Exception.class)
        public BaseResponse<?> startSeckillWithLock(Long id, Long userId) {
            //lock.lock();
            try {
              return seckill(id, userId);
            } catch (Exception e) {
                throw e;
            } finally {
                //lock.unlock();
            }
        }
    
        private BaseResponse<?> seckill(Long id, Long userId) {
            //获取库存数sql语句为:SELECT number FROM seckill WHERE id = #{id} LOCK IN SHARE MODE
            Integer number = seckillMapper.getNumberById(id);
            if (number != null && number > 0) {
                log.info("当前线程:{},用户:{},当前库存{}", Thread.currentThread().getId(), userId, number);
                //扣库存
                seckillMapper.deductNumberById(id);
                //创建订单
                SuccessKilledModel killed = new SuccessKilledModel();
                killed.setSeckillId(id);
                killed.setUserId(userId);
                killed.setState((short)0);
                successKilledMapper.insert(killed);
                return BaseResponse.valueOfSuccess();
            } else {
                return BaseResponse.valueOfError(10010, "库存不足");
            }
        }
    

    结果会报死锁的问题:

    下面拿一个例子来解析一下产生死锁的原因,使用lock in share mode具有很高的风险,看下面的案例:

    session 1:

    set autocommit = 0;
    select * from tb_test where id = 1 lock in share mode;
    

    open session2:

    set autocommit = 0;
    select * from tb_test where id = 1 lock in share mode;
    

    这个时候两个session同时持有id = 1这行数据的共享锁。这个时候我们在session 1里面执行update操作:

    session 1:

    update tb_test set col1 = 'AAA' where id = 1;
    

    结果卡住了:这个时候session1必须等待session2退出事务或者等待直到锁超时:

    锁超时的情况:
    
    ERROR 1205 (HY000): Lock wait timeout exceeded; try restarting 
    transaction
    

    接着 我们再session2里面执行

    session2:

    update tb_test set col1 = 'BBB' where id = 1;
    

    结果直接报错:

    ERROR 1213 (40001): Deadlock found when trying to get lock
    ; try restarting transaction
    

    这个时候mysql检测到会发生死锁,会中断当前事务该语句的执行,重新开启一个新的事务(应该就是相当于session2先退出事务,然后再开启一个事务)。

  • 总结:

Lock in share mode使用不当很容易照成死锁,就是在两个事务同时都有select
和update语句的时候,第一个事务select或者共享锁,第二个事务启动执行select
也获得共享锁,然后第一个事务执行update获取排他锁要等待第二个事务结束,因为
共享锁允许其他事务加共享锁读取,但是,不允许其他事务去做修改,或者加排它锁,
接着第二个事务又加了排它锁,又要等待第一个事务解锁,就照成了一个相互等待
的现象

6.解决方案2,select语句强制使用当前读(排它锁方式FOR UPDATE)(最优解)

  • 代码:

        @Override
        //开启事务
        @Transactional(rollbackFor = Exception.class)
        public BaseResponse<?> startSeckillWithLock(Long id, Long userId) {
            //lock.lock();
            try {
              return seckill(id, userId);
            } catch (Exception e) {
                throw e;
            } finally {
                //lock.unlock();
            }
        }
    
        private BaseResponse<?> seckill(Long id, Long userId) {
            //获取库存数sql语句为:SELECT number FROM seckill WHERE id = #{id} FOR UPDATE
            Integer number = seckillMapper.getNumberById(id);
            if (number != null && number > 0) {
                log.info("当前线程:{},用户:{},当前库存{}", Thread.currentThread().getId(), userId, number);
                //扣库存
                seckillMapper.deductNumberById(id);
                //创建订单
                SuccessKilledModel killed = new SuccessKilledModel();
                killed.setSeckillId(id);
                killed.setUserId(userId);
                killed.setState((short)0);
                successKilledMapper.insert(killed);
                return BaseResponse.valueOfSuccess();
            } else {
                return BaseResponse.valueOfError(10010, "库存不足");
            }
        }
    

    这种方式没有加程序锁,用一个FOR UPDATE搞定

    原理:

    for update会使用排它锁,排它锁不允许其他事务加共享锁或者排它锁,更加
    不允许其他事务修改加锁的行,所以结合我们代码,select语句加了排它锁,那么
    其他事务过来执行SELECT number FROM seckill WHERE id = #{id}
     LOCK IN SHARE MODE是要等待的,直到我们第一个事务执行完了
     update后提交了事务,这时候下一个事务的select语句才开始执行,这时候
     快照里面的行数据已经是最新的了
    

7.解决方案3,加redis分布式锁(redisson实现)及aop实现(程序锁也可以)

先解释为什么要用aop:

分布式锁内部都是Runtime.exe命令调用外部,肯定是异步的。分布式锁的释放只是
发了一个锁释放命令就算完活了。真正其作用的是下次获取锁的时候,要确保上次是
释放了的。然后如果加锁和解锁请求放在service层方法,那么如果锁释放完了,事务
有可能还没提交,这时候其他事务就已经执行select语句生成快照了,然后读的还是
历史的undo中的记录,所以就会发生超卖一条的现象,放在aop使用环绕通知就可以
在方法执行前和结束后进行加锁和释放锁,确保锁的释放是在事务提交之后

redis配置:

spring:
  redis:
    host: localhost
    port: 6379
    password:
    timeout: 10000
    database: 0

redisson:
  connectionPoolSize: 64
  timeOut: 5000
  connectionMinimumIdleSize: 10
package com.qsk.seckill.common.config;

import com.qsk.seckill.common.properties.RedissonProperties;
import com.qsk.seckill.common.utils.RedissonLockUtil;
import org.apache.commons.lang3.StringUtils;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@EnableConfigurationProperties(RedissonProperties.class)
@Configuration
public class RedisConfig {

    @Autowired
    private RedisProperties springProperties;

    @Autowired
    private RedissonProperties redissonProperties;

    @Bean
    RedissonClient redissonSingle() {
        Config config = new Config();
        SingleServerConfig serverConfig = config.useSingleServer()
                .setAddress(String.format("redis://%s:%s"
                        , springProperties.getHost()+"", springProperties.getPort()+""))
                .setTimeout(redissonProperties.getTimeOut())
                .setConnectionPoolSize(redissonProperties.getConnectionPoolSize())
                .setConnectionMinimumIdleSize(redissonProperties.getConnectionMinimumIdleSize());
        if(StringUtils.isNotBlank(springProperties.getPassword())) {
            serverConfig.setPassword(springProperties.getPassword());
        }
        RedissonClient redissonClient = Redisson.create(config);
        RedissonLockUtil.setRedissonClient(redissonClient);
        return redissonClient;
    }
}
package com.qsk.seckill.common.properties;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "redisson")
@Data
public class RedissonProperties {
    private int timeOut;
    private int connectionPoolSize;
    private int connectionMinimumIdleSize;

}

RedissonLockUtil工具类实现:

package com.qsk.seckill.common.utils;

import org.redisson.api.RLock;
import org.redisson.api.RMapCache;
import org.redisson.api.RedissonClient;

import java.util.concurrent.TimeUnit;

public class RedissonLockUtil {
    private static RedissonClient redissonClient;

    public static void setRedissonClient(RedissonClient locker) {
        redissonClient = locker;
    }

    /**
     * 加锁
     * @param lockKey
     * @return
     */
    public static RLock lock(String lockKey) {
        RLock lock = redissonClient.getLock(lockKey);
        lock.lock();
        return lock;
    }

    /**
     * 释放锁
     * @param lockKey
     */
    public static void unlock(String lockKey) {
        RLock lock = redissonClient.getLock(lockKey);
        lock.unlock();
    }

    /**
     * 释放锁
     * @param lock
     */
    public static void unlock(RLock lock) {
        lock.unlock();
    }

    /**
     * 带超时的锁
     * @param lockKey
     * @param timeout 超时时间   单位:秒
     */
    public static RLock lock(String lockKey, int timeout) {
        RLock lock = redissonClient.getLock(lockKey);
        lock.lock(timeout, TimeUnit.SECONDS);
        return lock;
    }

    /**
     * 带超时的锁
     * @param lockKey
     * @param unit 时间单位
     * @param timeout 超时时间
     */
    public static RLock lock(String lockKey, TimeUnit unit ,int timeout) {
        RLock lock = redissonClient.getLock(lockKey);
        lock.lock(timeout, unit);
        return lock;
    }

    /**
     * 尝试获取锁
     * @param lockKey
     * @param waitTime 最多等待时间
     * @param leaseTime 上锁后自动释放锁时间
     * @return
     */
    public static boolean tryLock(String lockKey, int waitTime, int leaseTime) {
        RLock lock = redissonClient.getLock(lockKey);
        try {
            return lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            return false;
        }
    }

    /**
     * 尝试获取锁
     * @param lockKey
     * @param unit 时间单位
     * @param waitTime 最多等待时间
     * @param leaseTime 上锁后自动释放锁时间
     * @return
     */
    public static boolean tryLock(String lockKey, TimeUnit unit, int waitTime, int leaseTime) {
        RLock lock = redissonClient.getLock(lockKey);
        try {
            return lock.tryLock(waitTime, leaseTime, unit);
        } catch (InterruptedException e) {
            return false;
        }
    }

    /**
     * 初始红包数量
     * @param key
     * @param count
     */
    public void initCount(String key,int count) {
        RMapCache<String, Integer> mapCache = redissonClient.getMapCache("skill");
        mapCache.putIfAbsent(key,count,3,TimeUnit.DAYS);
    }
    /**
     * 递增
     * @param key
     * @param delta 要增加几(大于0)
     * @return
     */
    public int incr(String key, int delta) {
        RMapCache<String, Integer> mapCache = redissonClient.getMapCache("skill");
        if (delta < 0) {
            throw new RuntimeException("递增因子必须大于0");
        }
        return  mapCache.addAndGet(key, 1);//加1并获取计算后的值
    }

    /**
     * 递减
     * @param key 键
     * @param delta 要减少几(小于0)
     * @return
     */
    public int decr(String key, int delta) {
        RMapCache<String, Integer> mapCache = redissonClient.getMapCache("skill");
        if (delta < 0) {
            throw new RuntimeException("递减因子必须大于0");
        }
        return mapCache.addAndGet(key, -delta);//加1并获取计算后的值
    }
}

aop实现:自定义注解加Aspect

@Target({ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ServiceRedisLock {
    String description()  default "";
}
package com.qsk.seckill.common.aop;

import com.qsk.seckill.common.utils.RedissonLockUtil;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.context.annotation.Scope;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@Component
@Scope
@Aspect
@Order(1)
@Slf4j
//order越小越是最先执行,但更重要的是最先执行的最后结束。order默认值是2147483647
public class LockAspect {
    /**
     * 思考:为什么不用synchronized
     * service 默认是单例的,并发下lock只有一个实例
     */
    private static Lock lock = new ReentrantLock(true);//互斥锁 参数默认false,不公平锁

    //Service层切点
    @Pointcut("@annotation(com.qsk.seckill.common.annotation.Servicelock)")
    public void lockAspect() {
    }

    @Pointcut("@annotation(com.qsk.seckill.common.annotation.ServiceRedisLock)")
    public void redisLockAspect() {}

    @Around("lockAspect()")
    public Object around(ProceedingJoinPoint joinPoint) {
        lock.lock();
        Object obj = null;
        try {
            obj = joinPoint.proceed();
        } catch (Throwable e) {
            throw new RuntimeException(e);
        } finally{
            lock.unlock();
        }
        return obj;
    }

      @Around("redisLockAspect()")
    public Object aroundRedis(ProceedingJoinPoint joinPoint) {
        Object obj = null;
        String lockKey = null;
        boolean res = false;
        try {
            Method method = getMethod(joinPoint);
            Parameter[] parameters = method.getParameters();
            Parameter param = Arrays.stream(parameters).filter(para -> para.getName().equals("seckillId")).findAny().orElse(null);
            if (param == null) {
                throw new RuntimeException("请求入参错误,无参数名为seckillId的入参");
            }
            lockKey = param.getName();
            res = RedissonLockUtil.tryLock(lockKey + "", TimeUnit.SECONDS, 10, 20);
            if (!res) {
                throw new RuntimeException("获取锁失败");
            }
            obj = joinPoint.proceed();
        } catch (Throwable e) {
            throw new RuntimeException(e);
        } finally{
            if (lockKey != null && res) {
                RedissonLockUtil.unlock(lockKey);
            }
        }
        return obj;
    }

    private Method getMethod(JoinPoint pjp) {
        Method method = null;
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        method = signature.getMethod();
        return method;
    }
}

service层实现:

@Override
@ServiceRedisLock
@Transactional(rollbackFor = Exception.class)
public BaseResponse<?> startSeckilRedisLock(long seckillId, long userId) {
    return seckill(seckillId, userId);
}

private BaseResponse<?> seckill(Long id, Long userId) {
    //获取库存数sql语句为:SELECT number FROM seckill WHERE id = #{id}
    Integer number = seckillMapper.getNumberById(id);
    if (number != null && number > 0) {
        log.info("当前线程:{},用户:{},当前库存{}", Thread.currentThread().getId(), userId, number);
        //扣库存
        seckillMapper.deductNumberById(id);
        //创建订单
        SuccessKilledModel killed = new SuccessKilledModel();
        killed.setSeckillId(id);
        killed.setUserId(userId);
        killed.setState((short) 0);
        successKilledMapper.insert(killed);
        return BaseResponse.valueOfSuccess();
    } else {
        return BaseResponse.valueOfError(10010, "库存不足");
    }
}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐