原理:

1.利用spring的schedule功能实现定时任务
2.利用redis的过期策略实现集群中定时任务的分配(单机版redis,集群的redis请考虑redission)

内容概述:

1.多线程调度定时任务
2.增加定时任务管理表(数据库或者redis中持久化)
3.基于反射机制实现动态调用不同的自定义定时任务
4.自动的根据定时任务管理表对定时任务进行增删改查
5.通过定时任务管理使定时任务只执行一次。

1.多线程调度定时任务

默认的schedule使用的是单线程,即多个定时任务需要排队执行,如果某些定时任务耗时过长,会导致其他任务排队过久,且不利于使用redis的过期策略实现分布式定时任务的分配,所以在集群情况下,定时任务需要使用多线程实现,建议:线程数大于同时可执行任务数

/**
 * @author Bight Chen
 * @Date: 2021/9/17 10:27
 * 定时任务线程池
 */
@Configuration
public class ScheduleConfig {
    @Bean
    public TaskScheduler taskScheduler(){
        //此bean对象支持根据cron表达式创建周期性任务
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        //定时任务执行线程池核心数
        //线程数一定大于当前可执行任务数
        taskScheduler.setPoolSize(50);
        //此方法会使得任务一旦被取消将立即被移除
        taskScheduler.setRemoveOnCancelPolicy(true);
        taskScheduler.setThreadNamePrefix("Schedule-");
        return taskScheduler;
    }

}

2.增加定时任务管理表(数据库或者redis中持久化)

1.需要的原因是利于自动增加或者删除所需定时任务
ps:
在mysql中做为数据表
@Data是基于lombok来简化编写 get() set() toString()等方法

@Data
public class ScheduleJob  {
 	//id
    private Long scheduleJobId;
    //动态bean
    private String beanName;
    //方法
    private String methodName;
    //参数
    private String jobParams;
  	//表达式
    private String jobCron;
    //任务名
    private String jobName;

    private String remark;

    //0停止,1正常,2已完成
    private String status;

    private String createdBy;

    private Date createdTime;

    private String lastUpdatedBy;
    }

3.基于反射机制实现动态调用不同的自定义定时任务

1.反射调用bean的工具类

/**
 * @author Bight Chen
 * @Date: 2021/9/17 11:54
 */
@SuppressWarnings("unchecked")
@Component
public class SpringToolsConfig implements ApplicationContextAware {


    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringToolsConfig.applicationContext = applicationContext;
    }

    public static Object getBean(String name) {
        return applicationContext.getBean(name);
    }


}

2.动态调用bean

/**
 * @author Bight Chen
 * @Date: 2021/9/17 11:40
 */
public class TaskRunable implements Runnable {
    private static final Logger logger = LogManager.getLogger(TaskRunable.class);

    private ScheduleJob scheduleJob;

    public TaskRunable(ScheduleJob scheduleJob) {
        this.scheduleJob = scheduleJob;
    }


    private RedisTemplate redisTemplate = (RedisTemplate) SpringToolsConfig.getBean("redisTemplate");

    @Override
    public void run() {
        //周期性任务:加锁的过期时间要大于不同服务器时间之差,且小于同个任务2次执行间隔(cron)
        //抢到锁的执行,没抢到锁的等待下一次任务执行
        try {
            Object target = SpringToolsConfig.getBean(scheduleJob.getBeanName());

            Method method = null;

            if (!StringUtils.isEmpty(scheduleJob.getJobParams())) {
                method = target.getClass().getDeclaredMethod(scheduleJob.getMethodName(), String.class);
            } else {
                method = target.getClass().getDeclaredMethod(scheduleJob.getMethodName());
            }
            //考虑到集群部署时,旧版本的查找不到新增的定时任务时处理,就需要先找到bean再存redis
            //保证原子性
            if(!redisTemplate.opsForValue().setIfAbsent("schdeleJob:"+scheduleJob.getScheduleJobId().toString(), "1", 15000L,TimeUnit.MILLISECONDS)){
                logger.info("{}正在执行!!,不能重复执行",scheduleJob.getJobName());
                return;
            }
            logger.info("定时任务开始执行 - 参数:{}", scheduleJob.toString());
            long startTime = System.currentTimeMillis();
            try {
                ReflectionUtils.makeAccessible(method);

                if (!StringUtils.isEmpty(scheduleJob.getJobParams())) {
                    JSONObject jsonObject = JSONObject.parseObject(scheduleJob.getJobParams());
                    jsonObject.put("scheduleJobId", scheduleJob.getScheduleJobId());
                    method.invoke(target, JSONObject.toJSONString(jsonObject));
                } else {
                    method.invoke(target);
                }
            } catch (Exception e) {
                logger.error("定时任务执行异常 -参数:{} ,异常:{}", scheduleJob.toString(), e);
            } finally {
                Object oDelete = redisTemplate.opsForValue().get("schdeleJob:" + scheduleJob.getScheduleJobId());
                if (oDelete != null) {
                    //锁依旧存在则自动删除
                    redisTemplate.delete("schdeleJob:" + scheduleJob.getScheduleJobId().toString());
                }
            }
            long times = System.currentTimeMillis() - startTime;
            logger.info("定时任务执行结束 -参数:{},耗时:{} 毫秒", scheduleJob.toString(), times);
        } catch (Exception e) {
            logger.error("定时任务执行异常 -参数:{} ,异常:{}", scheduleJob.toString(), e);
        }
    }
}

4.自动的根据定时任务管理表对定时任务进行增删改查

/**
 * @author Bight Chen
 * @Date: 2021/9/17 13:59
 * 分布式定时任务初始化
 */
@Component
public class ScheduleInitConfig {

    private static final Logger logger = LogManager.getLogger(ScheduleInitConfig.class);

    //内存中保存定时任务数据
    private HashMap<Long, ScheduledFuture> map = new HashMap<>();


    @Autowired
    private TaskScheduler autoTaskScheduler;


    @Autowired
    private ScheduleJobService scheduleJobService;

    /**
     * 定时自动查询增加任务数据,注入定时任务
     * schedule.cron:0/15 * * * * ?
     */
    @Scheduled(cron = "${schedule.cron}")
    public void autoAddTask() {
        Long time = System.currentTimeMillis();
        try {
            List<ScheduleJob> list = scheduleJobService.selectByExample(new ScheduleJobExample());
            for (ScheduleJob scheduleJob : list) {
                if (map.get(scheduleJob.getScheduleJobId()) == null && "1".equals(scheduleJob.getStatus())) {
                    //存在 启动状态的 定时任务自动增加
                    TaskRunable taskRunable = new TaskRunable(scheduleJob);
                    ScheduledFuture future = autoTaskScheduler.schedule(taskRunable, new CronTrigger(scheduleJob.getJobCron()));
                    map.put(scheduleJob.getScheduleJobId(), future);
                    logger.info("autoAddTask,自动增加任务,参数:{}", scheduleJob.toString());
                }

            }
        } catch (Exception e) {
            logger.error("autoAddTask,error:{}", e);
        }

        //logger.info("autoAddTask,end:{}", System.currentTimeMillis() - time);
    }

    /**
     * 定时自动查询任务数据,删除过期任务列表
     * schedule.cron:0/15 * * * * ?
     */
    @Scheduled(cron = "${schedule.cron}")
    public void autoDeleteTask() {
        Long time = System.currentTimeMillis();
        try {
            List<ScheduleJob> list = scheduleJobService.selectByExample(new ScheduleJobExample());
            for (ScheduleJob scheduleJob : list) {
                if (map.get(scheduleJob.getScheduleJobId()) != null &&!"1".equals(scheduleJob.getStatus())) {
                    //非启动中的都删除
                    ScheduledFuture future = map.get(scheduleJob.getScheduleJobId());
                    future.cancel(true);
                    map.remove(scheduleJob.getScheduleJobId());
                    logger.info("autoDeleteTask,自动删除任务,参数:{}", scheduleJob.toString());
                }
            }
        } catch (Exception e) {
            logger.error("autoDeleteTask,error:{}", e);
        }

        //logger.info("autoDeleteTask,end:{}", System.currentTimeMillis() - time);
    }
}

5.通过定时任务管理使定时任务只执行一次。

/**
 * @author Bight Chen
 * @Date: 2021/9/17 13:53
 */
@Component("testTask")
public class TestTask implements BaseTask {
    private static final Logger logger = LogManager.getLogger(TestTask.class);

    @Autowired
    private ScheduleJobService scheduleJobService;
    
    @Override
    public void runTask() { }
    @Override
    public void runTask(String params) {
        JSONObject jsonObject = JSON.parseObject(params);
        Long scheduleJobId =  Long.parseLong( jsonObject.get("scheduleJobId")+"");
        try {
            Thread.sleep(16000);
        } catch (Exception e) {
            logger.info("runTask>>>"+e.getMessage());
        }finally {
        if (scheduleJobId != null) {
            //最后关闭当前任务,使任务执行一次
            scheduleJobService.updateStatusById(scheduleJobId, "2");
        }
    }
        logger.info(Thread.currentThread().getName() + ":" + params);
    }
}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐