利用Redis分布式锁解决集群服务器定时任务重复执行问题
问题描述将带有定时任务的项目部署在单台机器上,完全没问题。但实际生产是两台集群服务器,项目部署上去发现定时任务的模块同时在两台机器上各执行了一遍,这将会导致数据问题。方案选择首先我想到了利用数据库的行锁来解决这个问题,发现需要改动原本的代码,且如果每增加一个任务在开发的时候都要注意行锁问题,是很繁琐的。因此PASS掉,然后还有一种方式是,只让任务在其中一台机器上执行,但是这种方式,过于依赖机器,如
问题描述
将带有定时任务的项目部署在单台机器上,完全没问题。
但实际生产是两台集群服务器,项目部署上去发现定时任务的模块同时在两台机器上各执行了一遍,这将会导致数据问题。
方案选择
首先我想到了利用数据库的行锁来解决这个问题,发现需要改动原本的代码,且如果每增加一个任务在开发的时候都要注意行锁问题,是很繁琐的。因此PASS掉,然后还有一种方式是,只让任务在其中一台机器上执行,但是这种方式,过于依赖机器,如果当前执行任务的机器宕机了,就会有问题。
最后考虑使用redis的分布式锁来解决这个问题,同时我希望在改动最小的情况下,给关键的任务都加上该锁,因此利用AOP面向切面的编程思想,将加锁部分抽象成一个切面,并利用自定义注解。以下是我的初版代码。
解决步骤
(1)创建一个切面类
/**
* redis 分布式锁
*/
@Aspect
@Slf4j
@Component
public class CacheLockAspect {
@Resource
private RedisUtil redisUtil;
/**
* 分布式锁的key
*/
private static final String KEY_PREFIX_LOCK = "CACHE_LOCK_ASPECT:";
@Around("@annotation(com.jd.fp.mrp.aspect.CacheLock)")
public void cacheLockPoint(ProceedingJoinPoint pjp) {
MethodSignature signature = (MethodSignature) pjp.getSignature();
Method cacheMethod = signature.getMethod();
if(null == cacheMethod){
log.info("未获取到使用方法 pjp: {}",pjp);
return;
}
String lockKey = cacheMethod.getAnnotation(CacheLock.class).lockedKey();
long timeOut = cacheMethod.getAnnotation(CacheLock.class).expireTime();
boolean release = cacheMethod.getAnnotation(CacheLock.class).release();
if(StringUtils.isBlank(lockKey)){
log.error("method:{}, 锁名称为空!",cacheMethod);
return;
}
try {
if (redisUtil.setNx(KEY_PREFIX_LOCK+lockKey, lockKey, timeOut, TimeUnit.SECONDS)){
redisUtil.expire(KEY_PREFIX_LOCK+lockKey, timeOut, TimeUnit.SECONDS);
log.info("method:{} 获取锁:{},开始运行!",cacheMethod,KEY_PREFIX_LOCK+lockKey);
pjp.proceed();
return;
}
log.info("method:{} 未获取锁:{},运行失败!",cacheMethod,KEY_PREFIX_LOCK+lockKey);
//不需要释放锁
release = false;
} catch (Throwable e) {
log.error("method:{},运行错误!",cacheMethod,e);
}finally {
if(release){
log.info("method:{} 执行完成释放锁:{}",cacheMethod,KEY_PREFIX_LOCK+lockKey);
redisUtil.del(KEY_PREFIX_LOCK+lockKey);
}
}
}
}
(2)自定义注解
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheLock {
String lockedKey() default ""; //redis锁key
long expireTime() default 10; //key在redis里存在的时间 单位:秒
boolean release() default true; //是否在方法执行完成之后释放锁
}
(3)测试服务类
//测试服务接口
public interface TestService {
void testAspect(String name);
}
//测试服务类
@Slf4j
@Service
public class TestServiceImpl implements TestService {
@Override
@CacheLock(lockedKey = "CacheLockAspectTest", expireTime = 10)
public void testAspect(String name) {
log.info("任务:"+ name +"方法获取到锁了!时间:"+ new Date());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("任务:"+ name +"方法执行完成了!"+"时间:"+ new Date());
}
}
(4)测试类
/**
* redis锁切面测试
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MrpWebApplication.class})
@Slf4j
public class CacheLockAspectTest {
@Resource
private TestService testService;
/**
* 测试多线程情况下,只有一个线程可以获取到锁并执行方法
*/
@Test
public void test1(){
Runnable runnable = new Runnable() {
@Override
public void run() {
log.info("任务"+Thread.currentThread().getName()+"执行开始时间:"+ new Date());
testService.testAspect("任务"+Thread.currentThread().getName());
log.info("任务"+Thread.currentThread().getName()+"执行结束时间:"+ new Date());
}
};
Thread thread1 = new Thread(runnable);
Thread thread2 = new Thread(runnable);
thread1.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
log.error(t.getName()+"发生异常"+e.getMessage());
}
});
thread2.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
log.error(t.getName()+"发生异常"+e.getMessage());
}
});
thread1.start();
thread2.start();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 测试当release = true时方法执行完成会自动释放锁
* 测试当release = false时方法执行完成,如果还未到锁失效时间,会使其他任务执行方法失败
*/
@Test
public void test2(){
testService.testAspect("任务一");
testService.testAspect("任务二");
}
}
结果验证
注:为了展示测试将日志打印做了微调
//修改前
log.info("method:{} 获取锁:{},开始运行!",cacheMethod,KEY_PREFIX_LOCK+lockKey);
//修改后
log.info("method:{} 获取锁:{},开始运行!",Thread.currentThread(),KEY_PREFIX_LOCK+lockKey);
test1()结果:可以看到线程37和38同时执行该方法,37获取到锁了,38未获取到锁,37执行5秒之后将锁释放。
test2():
首先令release = true,且设置锁占用时间为10s,大于方法执行时间,可以看到执行日志方法1执行了5s然后释放了锁,紧接着任务二开始执行。
首先令release = false,且设置锁占用时间为10s,大于方法执行时间,可以看到执行日志方法1执行了5s但是锁一直没释放,紧接着任务二想要执行方法,没有获取到锁就停止了。
其实做到这里就已经解决了我的问题,我只需要保证一个任务执行的时候,不是同时调用两台服务器的方法即可。
可以改进的点还有:可以增加一个等待时间,让获取不到锁的线程可以稍后获取到,等等。
方案改进
增加等待时间,让其他线程在一定时间内有机会获得锁,如果超过时间,则获锁失败,并且有些方法的加锁是希望能够知道执行失败的,因此增加是否抛出异常的开关。
(1)自定义注解增加等待时间、是否抛出异常参数
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheLock {
String lockedKey() default ""; //redis锁key的前缀
long expireTime() default 10; //key在redis里存在的时间 单位:秒
boolean release() default true; //释放在方法执行完成之后释放锁
long waitTime() default 0; //获取锁的最大等待时间,单位:秒,默认不等待,0即为快速失败
boolean throwException() default false;//是否抛出异常 默认不抛出
}
(2)对切面类做了一点调整
/**
* redis 分布式锁
*/
@Aspect
@Slf4j
@Component
public class CacheLockAspect {
@Resource
private RedisUtil redisUtil;
/**
* 分布式锁的key
*/
private static final String KEY_PREFIX_LOCK = "CACHE_LOCK_ASPECT:";
/**
* 最小等待单位时间
*/
private static final long MIN_WAIT_MILL = 300;
@Around("@annotation(com.jd.fp.mrp.aspect.CacheLock)")
public void cacheLockPoint(ProceedingJoinPoint pjp) {
MethodSignature signature = (MethodSignature) pjp.getSignature();
Method cacheMethod = signature.getMethod();
if(null == cacheMethod){
log.info("未获取到使用方法 pjp: {}",pjp);
return;
}
String lockKey = cacheMethod.getAnnotation(CacheLock.class).lockedKey();
if(StringUtils.isBlank(lockKey)){
log.error("method:{}, 锁名称为空!",cacheMethod);
return;
}
//锁占用时间
long timeOut = cacheMethod.getAnnotation(CacheLock.class).expireTime();
//等待时间
long waitTime = cacheMethod.getAnnotation(CacheLock.class).waitTime();
//是否抛出异常
boolean throwException = cacheMethod.getAnnotation(CacheLock.class).throwException();
//是否释放锁
boolean release = cacheMethod.getAnnotation(CacheLock.class).release();
//是否有锁
boolean haveLock = false;
long startTime = System.currentTimeMillis();
long endTime = System.currentTimeMillis() + waitTime * 1000;
try {
do{
if (redisUtil.setNx(KEY_PREFIX_LOCK+lockKey, lockKey, timeOut, TimeUnit.SECONDS)){
redisUtil.expire(KEY_PREFIX_LOCK+lockKey, timeOut, TimeUnit.SECONDS);
log.info("method:{} 获取锁:{},开始运行!",Thread.currentThread(),KEY_PREFIX_LOCK+lockKey);
pjp.proceed();
haveLock = true;
return;
}
log.info("method:{} 未获取锁:{},开始等待!",Thread.currentThread(),KEY_PREFIX_LOCK+lockKey);
Thread.sleep( Math.min(MIN_WAIT_MILL, waitTime * 100));
}while (System.currentTimeMillis()<=endTime);
log.info("获得锁失败,放弃等待,之前共等待{}ms,方法将不执行,方法名为{}",System.currentTimeMillis()-startTime, cacheMethod);
if(throwException){
throw new BusinessException("等待锁失败,未正常执行方法!");
}
} catch (Throwable e) {
log.error("method:{},运行错误!",cacheMethod,e);
if(throwException){
throw new BusinessException("获取锁失败!");
}
}finally {
if(release && haveLock){
log.info("method:{} 执行完成释放锁:{}",Thread.currentThread(),KEY_PREFIX_LOCK+lockKey);
redisUtil.del(KEY_PREFIX_LOCK+lockKey);
}
}
}
}
测试运行test1()
1.设置等待时长>任务执行的时间,则最终可以等到锁
可以看到线程37先获得锁,并执行任务,38处理等待状态,等37执行任务完成并将锁释放之后,38等到了锁,并开始执行任务
2.设置等待时长<任务执行的时间,且默认不抛出异常,则最终获得锁失败
可以看到线程37先获得锁,并执行任务,38处理等待状态,在37占用锁执行任务的过程中,38等待时间耗尽,最终没有等到锁,任务不执行,且没有抛出异常
现在将throwException设置为true,则可以看到异常抛出,且被捕获到了
要注意线程等待一直在消耗资源,需要根据实际的场景去设置参数。
最后,欢迎大家与我交流进步!
更多推荐
所有评论(0)