问题描述

将带有定时任务的项目部署在单台机器上,完全没问题。
但实际生产是两台集群服务器,项目部署上去发现定时任务的模块同时在两台机器上各执行了一遍,这将会导致数据问题。

方案选择

首先我想到了利用数据库的行锁来解决这个问题,发现需要改动原本的代码,且如果每增加一个任务在开发的时候都要注意行锁问题,是很繁琐的。因此PASS掉,然后还有一种方式是,只让任务在其中一台机器上执行,但是这种方式,过于依赖机器,如果当前执行任务的机器宕机了,就会有问题。

最后考虑使用redis的分布式锁来解决这个问题,同时我希望在改动最小的情况下,给关键的任务都加上该锁,因此利用AOP面向切面的编程思想,将加锁部分抽象成一个切面,并利用自定义注解。以下是我的初版代码。

解决步骤

(1)创建一个切面类

/**
 * redis 分布式锁
 */
@Aspect
@Slf4j
@Component
public class CacheLockAspect {
    @Resource
    private RedisUtil redisUtil;
    /**
     * 分布式锁的key
     */
    private static final String KEY_PREFIX_LOCK = "CACHE_LOCK_ASPECT:";

    @Around("@annotation(com.jd.fp.mrp.aspect.CacheLock)")
    public void  cacheLockPoint(ProceedingJoinPoint pjp) {
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        Method cacheMethod = signature.getMethod();
        if(null == cacheMethod){
            log.info("未获取到使用方法 pjp: {}",pjp);
            return;
        }
        String lockKey = cacheMethod.getAnnotation(CacheLock.class).lockedKey();
        long timeOut = cacheMethod.getAnnotation(CacheLock.class).expireTime();
        boolean release = cacheMethod.getAnnotation(CacheLock.class).release();
        if(StringUtils.isBlank(lockKey)){
            log.error("method:{}, 锁名称为空!",cacheMethod);
            return;
        }
        try {
            if (redisUtil.setNx(KEY_PREFIX_LOCK+lockKey, lockKey, timeOut, TimeUnit.SECONDS)){
                redisUtil.expire(KEY_PREFIX_LOCK+lockKey, timeOut, TimeUnit.SECONDS);
                log.info("method:{} 获取锁:{},开始运行!",cacheMethod,KEY_PREFIX_LOCK+lockKey);
                pjp.proceed();
                return;
            }
            log.info("method:{} 未获取锁:{},运行失败!",cacheMethod,KEY_PREFIX_LOCK+lockKey);
            //不需要释放锁
            release = false;
        } catch (Throwable e) {
            log.error("method:{},运行错误!",cacheMethod,e);
        }finally {
            if(release){
                log.info("method:{} 执行完成释放锁:{}",cacheMethod,KEY_PREFIX_LOCK+lockKey);
                redisUtil.del(KEY_PREFIX_LOCK+lockKey);
            }
        }
    }
}

(2)自定义注解

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheLock {
    String lockedKey() default "";   //redis锁key
    long expireTime() default 10;    //key在redis里存在的时间 单位:秒
    boolean release() default true; //是否在方法执行完成之后释放锁
}

(3)测试服务类

//测试服务接口
public interface TestService {
    void testAspect(String name);
}

//测试服务类
@Slf4j
@Service
public class TestServiceImpl implements TestService {
	
    @Override
    @CacheLock(lockedKey = "CacheLockAspectTest", expireTime = 10)
    public void testAspect(String name) {
        log.info("任务:"+ name +"方法获取到锁了!时间:"+ new Date());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("任务:"+ name +"方法执行完成了!"+"时间:"+ new Date());
    }
}

(4)测试类

/**
 * redis锁切面测试
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MrpWebApplication.class})
@Slf4j
public class CacheLockAspectTest {

    @Resource
    private TestService testService;

    /**
     * 测试多线程情况下,只有一个线程可以获取到锁并执行方法
     */
    @Test
    public void test1(){
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                log.info("任务"+Thread.currentThread().getName()+"执行开始时间:"+ new Date());
                testService.testAspect("任务"+Thread.currentThread().getName());
                log.info("任务"+Thread.currentThread().getName()+"执行结束时间:"+ new Date());
            }
        };

        Thread thread1 = new Thread(runnable);
        Thread thread2 = new Thread(runnable);

        thread1.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                log.error(t.getName()+"发生异常"+e.getMessage());
            }
        });
        thread2.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                log.error(t.getName()+"发生异常"+e.getMessage());
            }
        });

        thread1.start();
        thread2.start();

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 测试当release = true时方法执行完成会自动释放锁
     * 测试当release = false时方法执行完成,如果还未到锁失效时间,会使其他任务执行方法失败
     */
    @Test
    public void test2(){
        testService.testAspect("任务一");
        testService.testAspect("任务二");
    }
}

结果验证

注:为了展示测试将日志打印做了微调

//修改前
log.info("method:{} 获取锁:{},开始运行!",cacheMethod,KEY_PREFIX_LOCK+lockKey);
//修改后
log.info("method:{} 获取锁:{},开始运行!",Thread.currentThread(),KEY_PREFIX_LOCK+lockKey);

test1()结果:可以看到线程37和38同时执行该方法,37获取到锁了,38未获取到锁,37执行5秒之后将锁释放。

test2():
首先令release = true,且设置锁占用时间为10s,大于方法执行时间,可以看到执行日志方法1执行了5s然后释放了锁,紧接着任务二开始执行。
在这里插入图片描述
首先令release = false,且设置锁占用时间为10s,大于方法执行时间,可以看到执行日志方法1执行了5s但是锁一直没释放,紧接着任务二想要执行方法,没有获取到锁就停止了。
在这里插入图片描述
其实做到这里就已经解决了我的问题,我只需要保证一个任务执行的时候,不是同时调用两台服务器的方法即可。
可以改进的点还有:可以增加一个等待时间,让获取不到锁的线程可以稍后获取到,等等。

方案改进

增加等待时间,让其他线程在一定时间内有机会获得锁,如果超过时间,则获锁失败,并且有些方法的加锁是希望能够知道执行失败的,因此增加是否抛出异常的开关。

(1)自定义注解增加等待时间、是否抛出异常参数

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheLock {
    String lockedKey() default "";   //redis锁key的前缀
    long expireTime() default 10;    //key在redis里存在的时间 单位:秒
    boolean release() default true; //释放在方法执行完成之后释放锁
    long waitTime() default 0; //获取锁的最大等待时间,单位:秒,默认不等待,0即为快速失败
    boolean throwException() default false;//是否抛出异常 默认不抛出
}

(2)对切面类做了一点调整

/**
 * redis 分布式锁
 */
@Aspect
@Slf4j
@Component
public class CacheLockAspect {
    @Resource
    private RedisUtil redisUtil;
    /**
     * 分布式锁的key
     */
    private static final String KEY_PREFIX_LOCK = "CACHE_LOCK_ASPECT:";

    /**
     * 最小等待单位时间
     */
    private static final long MIN_WAIT_MILL = 300;

    @Around("@annotation(com.jd.fp.mrp.aspect.CacheLock)")
    public void  cacheLockPoint(ProceedingJoinPoint pjp) {
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        Method cacheMethod = signature.getMethod();
        if(null == cacheMethod){
            log.info("未获取到使用方法 pjp: {}",pjp);
            return;
        }
        String lockKey = cacheMethod.getAnnotation(CacheLock.class).lockedKey();
        if(StringUtils.isBlank(lockKey)){
            log.error("method:{}, 锁名称为空!",cacheMethod);
            return;
        }
        
        //锁占用时间
        long timeOut = cacheMethod.getAnnotation(CacheLock.class).expireTime();
        //等待时间
        long waitTime = cacheMethod.getAnnotation(CacheLock.class).waitTime();
        //是否抛出异常
        boolean throwException = cacheMethod.getAnnotation(CacheLock.class).throwException();
        //是否释放锁
        boolean release = cacheMethod.getAnnotation(CacheLock.class).release();
        //是否有锁
        boolean haveLock = false;
        
        long startTime = System.currentTimeMillis();
        long endTime = System.currentTimeMillis() + waitTime * 1000;

        try {
            do{
                if (redisUtil.setNx(KEY_PREFIX_LOCK+lockKey, lockKey, timeOut, TimeUnit.SECONDS)){
                    redisUtil.expire(KEY_PREFIX_LOCK+lockKey, timeOut, TimeUnit.SECONDS);
                    log.info("method:{} 获取锁:{},开始运行!",Thread.currentThread(),KEY_PREFIX_LOCK+lockKey);
                    pjp.proceed();
                    haveLock = true;
                    return;
                }
                log.info("method:{} 未获取锁:{},开始等待!",Thread.currentThread(),KEY_PREFIX_LOCK+lockKey);
                Thread.sleep( Math.min(MIN_WAIT_MILL, waitTime * 100));
            }while (System.currentTimeMillis()<=endTime);

            log.info("获得锁失败,放弃等待,之前共等待{}ms,方法将不执行,方法名为{}",System.currentTimeMillis()-startTime, cacheMethod);
            if(throwException){
                throw new BusinessException("等待锁失败,未正常执行方法!");
            }

        } catch (Throwable e) {
            log.error("method:{},运行错误!",cacheMethod,e);
            if(throwException){
                throw new BusinessException("获取锁失败!");
            }
        }finally {
            if(release && haveLock){
                log.info("method:{} 执行完成释放锁:{}",Thread.currentThread(),KEY_PREFIX_LOCK+lockKey);
                redisUtil.del(KEY_PREFIX_LOCK+lockKey);
            }
        }
    }
}

测试运行test1()

1.设置等待时长>任务执行的时间,则最终可以等到锁

可以看到线程37先获得锁,并执行任务,38处理等待状态,等37执行任务完成并将锁释放之后,38等到了锁,并开始执行任务
在这里插入图片描述

2.设置等待时长<任务执行的时间,且默认不抛出异常,则最终获得锁失败

可以看到线程37先获得锁,并执行任务,38处理等待状态,在37占用锁执行任务的过程中,38等待时间耗尽,最终没有等到锁,任务不执行,且没有抛出异常
在这里插入图片描述
现在将throwException设置为true,则可以看到异常抛出,且被捕获到了
在这里插入图片描述
要注意线程等待一直在消耗资源,需要根据实际的场景去设置参数

最后,欢迎大家与我交流进步!

参考文章:
https://blog.csdn.net/jc983433742/article/details/92025109?utm_medium=distribute.pc_relevant.none-task-blog-2defaultbaidujs_title~default-5.pc_relevant_default&spm=1001.2101.3001.4242.4&utm_relevant_index=8

https://juejin.cn/post/6844903734107963400

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐