高并发业务代码中一个事务中同时使用select和update的场景深度剖析
1.秒杀代码(最low实现)://开启事务@Transactional(rollbackFor = Exception.class)public BaseResponse<?> startSeckillWithLock(Long id, Long userId) {try {return seckill(id, userId);} catch (Exception e) {throw
1.秒杀代码(最low实现):
//开启事务
@Transactional(rollbackFor = Exception.class)
public BaseResponse<?> startSeckillWithLock(Long id, Long userId) {
try {
return seckill(id, userId);
} catch (Exception e) {
throw e;
} finally {
}
}
private BaseResponse<?> seckill(Long id, Long userId) {
//获取库存数sql语句为:SELECT number FROM seckill WHERE id = #{id}
Integer number = seckillMapper.getNumberById(id);
if (number != null && number > 0) {
log.info("当前线程:{},用户:{},当前库存{}", Thread.currentThread().getId(), userId, number);
//扣库存,sql为:UPDATE seckill SET number=number-1 WHERE id = #{id}
seckillMapper.deductNumberById(id);
//创建订单
SuccessKilledModel killed = new SuccessKilledModel();
killed.setSeckillId(id);
killed.setUserId(userId);
killed.setState((short)0);
successKilledMapper.insert(killed);
return BaseResponse.valueOfSuccess();
} else {
return BaseResponse.valueOfError(10010, "库存不足");
}
}
这种多个线程执行肯定会发生超卖的现象:然后第一种解决方案是加ReenTranlock程序锁如下:
2.秒杀加程序锁实现(会出现超卖一个的现象)
@Override
//开启事务
@Transactional(rollbackFor = Exception.class)
public BaseResponse<?> startSeckillWithLock(Long id, Long userId) {
lock.lock();
try {
return seckill(id, userId);
} catch (Exception e) {
throw e;
} finally {
lock.unlock();
}
}
private BaseResponse<?> seckill(Long id, Long userId) {
//获取库存数sql语句为:SELECT number FROM seckill WHERE id = #{id}
Integer number = seckillMapper.getNumberById(id);
if (number != null && number > 0) {
log.info("当前线程:{},用户:{},当前库存{}", Thread.currentThread().getId(), userId, number);
//扣库存,sql为:UPDATE seckill SET number=number-1 WHERE id = #{id}
seckillMapper.deductNumberById(id);
//创建订单
SuccessKilledModel killed = new SuccessKilledModel();
killed.setSeckillId(id);
killed.setUserId(userId);
killed.setState((short)0);
successKilledMapper.insert(killed);
return BaseResponse.valueOfSuccess();
} else {
return BaseResponse.valueOfError(10010, "库存不足");
}
}
启动一千个线程,库存为100,最终结果是个很奇妙的现象:
库存数居然是1,然后在程序里面打的日志是这样的:
然后就有个疑问:加了锁居然还是无法保证数据的原子性?
后面查阅了mysql的默认隔离机制可重复读底层实现以及@Transaction注解的底层实现,原来根本原因是这样的:
我们代码里面,lock.unlock的操作是写在方法里面的finally语句的,
这时候可能我们解锁了,@Transaction标注的方法事务还没提交,
这就导致了另外一个线程执行select语句的时候读的是未更新前的数据,
至于后面的库存数为什么又是依次递减:那是因为事务已经提交了,undo log
里面的快照version已经更新了,后面读的都是前面更新了的快照,
那肯定就是依次递减啦,所以肯定会发生超卖一个的现象
mysql可重复读(RR)和读提交(RC)的隔离机制下,select语句用的是快照读,update,insert,delete默认使用当前读
3.mysql当前读和快照读底层
-
当前读:
select…lock in share mode (共享读锁)
select…for update(排它锁)
update , delete , insert(排它锁)当前读, 读取的是最新版本, 并且对读取的记录加锁, 阻塞其他事务同时改动相同记录,避免出现安全问题。
例如,假设要update一条记录,但是另一个事务已经delete这条数据并且commit了,如果不加锁就会产生冲突。所以update的时候肯定要是当前读,得到最新的信息并且锁定相应的记录
-
快照读:
单纯的select操作,不包括上述 select … lock in share mode, select … for update
-
Read Committed隔离级别:每次select都生成一个快照读。
-
Read Repeatable隔离级别:开启事务后第一个select语句才是快照读的地方,而不是一开启事务就快照读
-
-
快照读的实现方式,undolog和多版本并发控制
下图右侧绿色的是数据:一行数据记录,主键ID是10,name=‘Jack’,age=10, 被update更新set为name= ‘Tom’,age=23。
事务会先使用“排他锁”锁定改行,将该行当前的值复制到undo log中,然后再真正地修改当前行的值,最后填写事务的DB_TRX_ID,使用回滚指针DB_ROLL_PTR指向undo log中修改前的行DB_ROW_ID
DB_TRX_ID: 6字节DB_TRX_ID
字段,表示最后更新的事务id(update,delete,insert)。此外,删除在内部被视为更新,其中行中的特殊位被设置为将其标记为已软删除。
DB_ROLL_PTR: 7字节回滚指针,指向前一个版本的undolog记录,组成undo链表。如果更新了行,则撤消日志记录包含在更新行之前重建行内容所需的信息。
DB_ROW_ID: 6字节的DB_ROW_ID字段,包含一个随着新行插入而单调递增的行ID, 当由innodb自动产生聚集索引时,聚集索引会包括这个行ID的值,否则这个行ID不会出现在任何索引中。如果表中没有主键或合适的唯一索引, 也就是无法生成聚簇索引的时候, InnoDB会帮我们自动生成聚集索引, 聚簇索引会使用DB_ROW_ID的值来作为主键; 如果表中有主键或者合适的唯一索引, 那么聚簇索引中也就不会包含 DB_ROW_ID了 。
其它:insert undo log只在事务回滚时需要, 事务提交就可以删掉了。update undo log包括update 和 delete , 回滚和快照读 都需要
4.共享锁和排他锁(LOCK IN SHARE MODE和FOR UPDATE)
共享锁允许其他事务加共享锁读取,但是,不允许其他事务去做修改,或者加排它锁。而排它锁显得更加严格,不允许其他事务加共享锁或者排它锁,更加不允许其他事务修改加锁的行
5.解决方案1,select语句强制使用当前读(共享锁方式LOCK IN SHARE MODE)
@Override
//开启事务
@Transactional(rollbackFor = Exception.class)
public BaseResponse<?> startSeckillWithLock(Long id, Long userId) {
lock.lock();
try {
return seckill(id, userId);
} catch (Exception e) {
throw e;
} finally {
lock.unlock();
}
}
private BaseResponse<?> seckill(Long id, Long userId) {
//获取库存数sql语句为:SELECT number FROM seckill WHERE id = #{id} LOCK IN SHARE MODE
Integer number = seckillMapper.getNumberById(id);
if (number != null && number > 0) {
log.info("当前线程:{},用户:{},当前库存{}", Thread.currentThread().getId(), userId, number);
//扣库存
seckillMapper.deductNumberById(id);
//创建订单
SuccessKilledModel killed = new SuccessKilledModel();
killed.setSeckillId(id);
killed.setUserId(userId);
killed.setState((short)0);
successKilledMapper.insert(killed);
return BaseResponse.valueOfSuccess();
} else {
return BaseResponse.valueOfError(10010, "库存不足");
}
}
这种方式的原理:
方法中加了程序锁ReeTranlock,就是在执行完所有代码前只允许一个线程去跑
(但这里也没有保证unlock执行前去提交事务),事务中select语句加上LOCK IN
SHARE MODE代码这行数据加了共享锁,然后下面执行了update语句这时候又给语句
加上了排他锁,排它锁的功能就是保证了其他事务既不能读也不能写这行数据,所以下
个事务进来执行SELECT number FROM seckill WHERE id = #{id} LOCK IN
SHARE MODE是要等待的,这时候就保证了获取的数据是最新的数据
-
然后我们再测试一种死锁的现象,我们把程序锁去掉,程序如下:
@Override //开启事务 @Transactional(rollbackFor = Exception.class) public BaseResponse<?> startSeckillWithLock(Long id, Long userId) { //lock.lock(); try { return seckill(id, userId); } catch (Exception e) { throw e; } finally { //lock.unlock(); } } private BaseResponse<?> seckill(Long id, Long userId) { //获取库存数sql语句为:SELECT number FROM seckill WHERE id = #{id} LOCK IN SHARE MODE Integer number = seckillMapper.getNumberById(id); if (number != null && number > 0) { log.info("当前线程:{},用户:{},当前库存{}", Thread.currentThread().getId(), userId, number); //扣库存 seckillMapper.deductNumberById(id); //创建订单 SuccessKilledModel killed = new SuccessKilledModel(); killed.setSeckillId(id); killed.setUserId(userId); killed.setState((short)0); successKilledMapper.insert(killed); return BaseResponse.valueOfSuccess(); } else { return BaseResponse.valueOfError(10010, "库存不足"); } }
结果会报死锁的问题:
下面拿一个例子来解析一下产生死锁的原因,使用lock in share mode具有很高的风险,看下面的案例:
session 1:
set autocommit = 0; select * from tb_test where id = 1 lock in share mode;
open session2:
set autocommit = 0; select * from tb_test where id = 1 lock in share mode;
这个时候两个session同时持有
id = 1
这行数据的共享锁。这个时候我们在session 1里面执行update操作:session 1:
update tb_test set col1 = 'AAA' where id = 1;
结果卡住了:这个时候session1必须等待session2退出事务或者等待直到锁超时:
锁超时的情况: ERROR 1205 (HY000): Lock wait timeout exceeded; try restarting transaction
接着 我们再session2里面执行
session2:
update tb_test set col1 = 'BBB' where id = 1;
结果直接报错:
ERROR 1213 (40001): Deadlock found when trying to get lock ; try restarting transaction
这个时候mysql检测到会发生死锁,会中断当前事务该语句的执行,重新开启一个新的事务(应该就是相当于session2先退出事务,然后再开启一个事务)。
-
总结:
Lock in share mode使用不当很容易照成死锁,就是在两个事务同时都有select
和update语句的时候,第一个事务select或者共享锁,第二个事务启动执行select
也获得共享锁,然后第一个事务执行update获取排他锁要等待第二个事务结束,因为
共享锁允许其他事务加共享锁读取,但是,不允许其他事务去做修改,或者加排它锁,
接着第二个事务又加了排它锁,又要等待第一个事务解锁,就照成了一个相互等待
的现象
6.解决方案2,select语句强制使用当前读(排它锁方式FOR UPDATE)(最优解)
-
代码:
@Override //开启事务 @Transactional(rollbackFor = Exception.class) public BaseResponse<?> startSeckillWithLock(Long id, Long userId) { //lock.lock(); try { return seckill(id, userId); } catch (Exception e) { throw e; } finally { //lock.unlock(); } } private BaseResponse<?> seckill(Long id, Long userId) { //获取库存数sql语句为:SELECT number FROM seckill WHERE id = #{id} FOR UPDATE Integer number = seckillMapper.getNumberById(id); if (number != null && number > 0) { log.info("当前线程:{},用户:{},当前库存{}", Thread.currentThread().getId(), userId, number); //扣库存 seckillMapper.deductNumberById(id); //创建订单 SuccessKilledModel killed = new SuccessKilledModel(); killed.setSeckillId(id); killed.setUserId(userId); killed.setState((short)0); successKilledMapper.insert(killed); return BaseResponse.valueOfSuccess(); } else { return BaseResponse.valueOfError(10010, "库存不足"); } }
这种方式没有加程序锁,用一个FOR UPDATE搞定
原理:
for update会使用排它锁,排它锁不允许其他事务加共享锁或者排它锁,更加 不允许其他事务修改加锁的行,所以结合我们代码,select语句加了排它锁,那么 其他事务过来执行SELECT number FROM seckill WHERE id = #{id} LOCK IN SHARE MODE是要等待的,直到我们第一个事务执行完了 update后提交了事务,这时候下一个事务的select语句才开始执行,这时候 快照里面的行数据已经是最新的了
7.解决方案3,加redis分布式锁(redisson实现)及aop实现(程序锁也可以)
先解释为什么要用aop:
分布式锁内部都是Runtime.exe命令调用外部,肯定是异步的。分布式锁的释放只是
发了一个锁释放命令就算完活了。真正其作用的是下次获取锁的时候,要确保上次是
释放了的。然后如果加锁和解锁请求放在service层方法,那么如果锁释放完了,事务
有可能还没提交,这时候其他事务就已经执行select语句生成快照了,然后读的还是
历史的undo中的记录,所以就会发生超卖一条的现象,放在aop使用环绕通知就可以
在方法执行前和结束后进行加锁和释放锁,确保锁的释放是在事务提交之后
redis配置:
spring:
redis:
host: localhost
port: 6379
password:
timeout: 10000
database: 0
redisson:
connectionPoolSize: 64
timeOut: 5000
connectionMinimumIdleSize: 10
package com.qsk.seckill.common.config;
import com.qsk.seckill.common.properties.RedissonProperties;
import com.qsk.seckill.common.utils.RedissonLockUtil;
import org.apache.commons.lang3.StringUtils;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@EnableConfigurationProperties(RedissonProperties.class)
@Configuration
public class RedisConfig {
@Autowired
private RedisProperties springProperties;
@Autowired
private RedissonProperties redissonProperties;
@Bean
RedissonClient redissonSingle() {
Config config = new Config();
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress(String.format("redis://%s:%s"
, springProperties.getHost()+"", springProperties.getPort()+""))
.setTimeout(redissonProperties.getTimeOut())
.setConnectionPoolSize(redissonProperties.getConnectionPoolSize())
.setConnectionMinimumIdleSize(redissonProperties.getConnectionMinimumIdleSize());
if(StringUtils.isNotBlank(springProperties.getPassword())) {
serverConfig.setPassword(springProperties.getPassword());
}
RedissonClient redissonClient = Redisson.create(config);
RedissonLockUtil.setRedissonClient(redissonClient);
return redissonClient;
}
}
package com.qsk.seckill.common.properties;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "redisson")
@Data
public class RedissonProperties {
private int timeOut;
private int connectionPoolSize;
private int connectionMinimumIdleSize;
}
RedissonLockUtil工具类实现:
package com.qsk.seckill.common.utils;
import org.redisson.api.RLock;
import org.redisson.api.RMapCache;
import org.redisson.api.RedissonClient;
import java.util.concurrent.TimeUnit;
public class RedissonLockUtil {
private static RedissonClient redissonClient;
public static void setRedissonClient(RedissonClient locker) {
redissonClient = locker;
}
/**
* 加锁
* @param lockKey
* @return
*/
public static RLock lock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock();
return lock;
}
/**
* 释放锁
* @param lockKey
*/
public static void unlock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
lock.unlock();
}
/**
* 释放锁
* @param lock
*/
public static void unlock(RLock lock) {
lock.unlock();
}
/**
* 带超时的锁
* @param lockKey
* @param timeout 超时时间 单位:秒
*/
public static RLock lock(String lockKey, int timeout) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock(timeout, TimeUnit.SECONDS);
return lock;
}
/**
* 带超时的锁
* @param lockKey
* @param unit 时间单位
* @param timeout 超时时间
*/
public static RLock lock(String lockKey, TimeUnit unit ,int timeout) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock(timeout, unit);
return lock;
}
/**
* 尝试获取锁
* @param lockKey
* @param waitTime 最多等待时间
* @param leaseTime 上锁后自动释放锁时间
* @return
*/
public static boolean tryLock(String lockKey, int waitTime, int leaseTime) {
RLock lock = redissonClient.getLock(lockKey);
try {
return lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS);
} catch (InterruptedException e) {
return false;
}
}
/**
* 尝试获取锁
* @param lockKey
* @param unit 时间单位
* @param waitTime 最多等待时间
* @param leaseTime 上锁后自动释放锁时间
* @return
*/
public static boolean tryLock(String lockKey, TimeUnit unit, int waitTime, int leaseTime) {
RLock lock = redissonClient.getLock(lockKey);
try {
return lock.tryLock(waitTime, leaseTime, unit);
} catch (InterruptedException e) {
return false;
}
}
/**
* 初始红包数量
* @param key
* @param count
*/
public void initCount(String key,int count) {
RMapCache<String, Integer> mapCache = redissonClient.getMapCache("skill");
mapCache.putIfAbsent(key,count,3,TimeUnit.DAYS);
}
/**
* 递增
* @param key
* @param delta 要增加几(大于0)
* @return
*/
public int incr(String key, int delta) {
RMapCache<String, Integer> mapCache = redissonClient.getMapCache("skill");
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return mapCache.addAndGet(key, 1);//加1并获取计算后的值
}
/**
* 递减
* @param key 键
* @param delta 要减少几(小于0)
* @return
*/
public int decr(String key, int delta) {
RMapCache<String, Integer> mapCache = redissonClient.getMapCache("skill");
if (delta < 0) {
throw new RuntimeException("递减因子必须大于0");
}
return mapCache.addAndGet(key, -delta);//加1并获取计算后的值
}
}
aop实现:自定义注解加Aspect
@Target({ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ServiceRedisLock {
String description() default "";
}
package com.qsk.seckill.common.aop;
import com.qsk.seckill.common.utils.RedissonLockUtil;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.context.annotation.Scope;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Component
@Scope
@Aspect
@Order(1)
@Slf4j
//order越小越是最先执行,但更重要的是最先执行的最后结束。order默认值是2147483647
public class LockAspect {
/**
* 思考:为什么不用synchronized
* service 默认是单例的,并发下lock只有一个实例
*/
private static Lock lock = new ReentrantLock(true);//互斥锁 参数默认false,不公平锁
//Service层切点
@Pointcut("@annotation(com.qsk.seckill.common.annotation.Servicelock)")
public void lockAspect() {
}
@Pointcut("@annotation(com.qsk.seckill.common.annotation.ServiceRedisLock)")
public void redisLockAspect() {}
@Around("lockAspect()")
public Object around(ProceedingJoinPoint joinPoint) {
lock.lock();
Object obj = null;
try {
obj = joinPoint.proceed();
} catch (Throwable e) {
throw new RuntimeException(e);
} finally{
lock.unlock();
}
return obj;
}
@Around("redisLockAspect()")
public Object aroundRedis(ProceedingJoinPoint joinPoint) {
Object obj = null;
String lockKey = null;
boolean res = false;
try {
Method method = getMethod(joinPoint);
Parameter[] parameters = method.getParameters();
Parameter param = Arrays.stream(parameters).filter(para -> para.getName().equals("seckillId")).findAny().orElse(null);
if (param == null) {
throw new RuntimeException("请求入参错误,无参数名为seckillId的入参");
}
lockKey = param.getName();
res = RedissonLockUtil.tryLock(lockKey + "", TimeUnit.SECONDS, 10, 20);
if (!res) {
throw new RuntimeException("获取锁失败");
}
obj = joinPoint.proceed();
} catch (Throwable e) {
throw new RuntimeException(e);
} finally{
if (lockKey != null && res) {
RedissonLockUtil.unlock(lockKey);
}
}
return obj;
}
private Method getMethod(JoinPoint pjp) {
Method method = null;
MethodSignature signature = (MethodSignature) pjp.getSignature();
method = signature.getMethod();
return method;
}
}
service层实现:
@Override
@ServiceRedisLock
@Transactional(rollbackFor = Exception.class)
public BaseResponse<?> startSeckilRedisLock(long seckillId, long userId) {
return seckill(seckillId, userId);
}
private BaseResponse<?> seckill(Long id, Long userId) {
//获取库存数sql语句为:SELECT number FROM seckill WHERE id = #{id}
Integer number = seckillMapper.getNumberById(id);
if (number != null && number > 0) {
log.info("当前线程:{},用户:{},当前库存{}", Thread.currentThread().getId(), userId, number);
//扣库存
seckillMapper.deductNumberById(id);
//创建订单
SuccessKilledModel killed = new SuccessKilledModel();
killed.setSeckillId(id);
killed.setUserId(userId);
killed.setState((short) 0);
successKilledMapper.insert(killed);
return BaseResponse.valueOfSuccess();
} else {
return BaseResponse.valueOfError(10010, "库存不足");
}
}
更多推荐
所有评论(0)