在 springBoot 项目中,要使用定时任务十分容易,我们只需使用 @EnableScheduling 开启定时任务支持,再配合 @Scheduled(cron = "cron表达式"),即可完成定时任务的集成,简单方便的同时,此种方式却也存在着硬编码问题,当我们需要动态的开启或关闭一个定时任务时,就需要修改源码重启项目,才能生效,达不到动态效果,下文教大家如何实现一个定时任务的动态增删改功能。

首先看下我们要使用的三张表:

scheduled_job :配置要在定时任务中执行的工作的表

表记录: 

 理论上这张表是跑完初始化脚本就不会再修改的,这个表的每一条记录对应代码里的一个处理任务的Bean,这种硬编码是必然存在的,应该没人会想要在数据库添一条记录就要在代码里动态添加一个类型的Bean的需求吧?

scheduled_corn : 存 corn 表达式的表

 表记录:

这个表,理论上存一些常用的corn表达式,基本也不会去修改了,我们主要是修改下面的关系表。

scheduled_corn_job :中间关系表

这个表先空着,我们在测试时再插入数据。

 下面开始环境搭建。

POM依赖:

   <dependencies>
        <!-- mvc起步依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- mybatis plus起步依赖 -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>

        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        ... 其他略...
   </dependencies>

yaml配置,几乎没什么内容:

server:
  port: 8080

# Spring相关
spring:
  # 数据源相关
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/xxx?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&useSSL=true
    username: xxx
    password: xxx

VO类:

/**
 * @Author zaiLuShang
 */
@Data
public class BaseVO implements Serializable {
    // 序列化版本id
    private static final long serialVersionUID = -153746138274322843L;
    // 创建人
    private String createBy;
    // 创建时间
    private Date createTime;
    // 修改人
    private String updateBy;
    // 修改时间
    private Date updateTime;
}

/**
 * @Author zaiLuShang
 */
@Data
@TableName("scheduled_corn_job")
public class ScheduledCornJobVO extends BaseVO {
    // 主键
    @TableId(type = IdType.AUTO)
    private Long id;
    // 在内存存储时的key值
    private String storeKey;
    // 外键: corn表id
    private Long cornId;
    // 外键: job表id
    private Long jobId;
    // 是否开启(0:关闭,1:开启)
    private String status;
}

/**
 * @Author zaiLuShang
 */
@Data
@TableName("scheduled_corn")
public class ScheduledCornVO extends BaseVO {
    // 主键
    @TableId(type = IdType.AUTO)
    private Long id;
    // 任务表达式
    private String corn;
    // 表达式描述
    private String description;

    // 提供转换为CronTrigger的工具方法
    public CronTrigger toCronTrigger() {
        return new CronTrigger(this.corn);
    }
}

/**
 * @Author zaiLuShang
 */
@Data
@TableName("scheduled_job")
public class ScheduledJobVO extends BaseVO {
    // 主键
    @TableId(type = IdType.AUTO)
    private Long id;
    // 执行具体工作的bean名称
    private String beanName;
    // 工作描述
    private String description;
}

Mapper 和 Service没什么可看的,就简单的贴一起了:

--------------------------------  Mapper接口  --------------------------------
/**
 * @Author zaiLuShang
 */
public interface ScheduledCornJobMapper extends BaseMapper<ScheduledCornJobVO> {
}

/**
 * @Author zaiLuShang
 */
public interface ScheduledCornMapper extends BaseMapper<ScheduledCornVO> {
}

/**
 * @Author zaiLuShang
 */
public interface ScheduledJobMapper extends BaseMapper<ScheduledJobVO> {
}

--------------------------------  Service接口  --------------------------------
/**
 * @Author zaiLuShang
 */
public interface ScheduledCornJobService extends IService<ScheduledCornJobVO> {
    List<ScheduledCornJobVO> list(String status);
}

/**
 * @Author zaiLuShang
 */
public interface ScheduledCornService extends IService<ScheduledCornVO> {
}

/**
 * @Author zaiLuShang
 */
public interface ScheduledJobService extends IService<ScheduledJobVO> {
}

--------------------------------  Service实现类  --------------------------------

/**
 * @Author zaiLuShang
 */
@Service
public class ScheduledCornJobServiceImpl extends ServiceImpl<ScheduledCornJobMapper, ScheduledCornJobVO> implements ScheduledCornJobService {
    @Override
    public List<ScheduledCornJobVO> list(String status) {
        return new LambdaQueryChainWrapper<>(getBaseMapper()).eq(ScheduledCornJobVO::getStatus, status).list();
    }
}

/**
 * @Author zaiLuShang
 */
@Service
public class ScheduledCornServiceImpl extends ServiceImpl<ScheduledCornMapper, ScheduledCornVO> implements ScheduledCornService {
}

/**
 * @Author zaiLuShang
 */
@Service
public class ScheduledJobServiceImpl extends ServiceImpl<ScheduledJobMapper, ScheduledJobVO> implements ScheduledJobService {
}

config类 和 controller类 是重点,下面开始介绍:

配置类一共有两个,先说简单的,这个配置类仅仅是用来简单的注入一些我们的定时任务工作的实现类:

/**
 * 注意:开启定时任务需要传入Runnable接口的实现类,所以实现Runnable接口是必须的
 * 自定义定时任务工作接口:仅仅是对 Runnable接口的简单封装,如果我们需要在执行定时任务时,还携
 * 带上参数,可以自己定制,比如实现 EnvironmentAware 接口从环境变量中取值,或者注入一个读取数 
 * 据库相关配置的serviceBean等,来获取我们需要的参数,多种实现方式,笔者这里就选择最简单的来演
 *  示了,不需要额外参数。
 * @Author zaiLuShang
 */
@FunctionalInterface
public interface Worker extends Runnable {
    void doWork();

    default void run() {
        doWork();
    }
}

/**
 * @Author zaiLuShang
 */
@SpringBootConfiguration
public class JobConfig {
    @Bean(name = "clean")
    public Worker cleanJob() {
        return () -> System.out.println(Thread.currentThread().getName() + "正在执行清理数据的工作...");
    }

    @Bean(name = "count")
    public Worker countJob() {
        return () -> System.out.println(Thread.currentThread().getName() + "正在执行统计报表的工作...");
    }

    @Bean(name = "printA")
    public Worker printA() {
        return () -> System.out.println(Thread.currentThread().getName() + "正在执行打印a的工作...");
    }

    @Bean(name = "printB")
    public Worker printB() {
        return () -> System.out.println(Thread.currentThread().getName() + "正在执行打印b的工作...");
    }
}
/**
 * 这个配置类就比较重要了  --- 2022/6/30日注:此配置类存在bug,修订版本已发,请移至文末查看
 * @Author zaiLuShang
 */
@AllArgsConstructor
@SpringBootConfiguration
public class TaskConfig {

    // 走构造注入 使用lombok生成全参数构造
    private final ScheduledCornService scheduledCornService;
    private final ScheduledJobService scheduledJobService;
    private final ScheduledCornJobService scheduledCornJobService;

    // 依赖搜索:拿到spring容器中的所有实现类,即上面配置类中配置的bean
    private final Map<String, Worker> allWorkerMap;

    /* 
     * 存储有效使用中的 Worker,也就是上面的 allWorkerMap 一个子集,因为有些工作可能没配置使用
     * 上面的key是 beanName,这个 key是数据库表scheduled_job的id
     */
    @Bean(name = "workerMap")
    public Map<Long, Worker> workerMap() {
        return new ConcurrentHashMap<>();
    }

    /**
     * 存储scheduled_corn表的记录
     * key 即scheduled_corn id, value是将corn表达式转为了后续操作需要的CronTrigger对象
     * 参看 {@link ScheduledCornVO#toCronTrigger}
     */
    @Bean(name = "triggerMap")
    public Map<Long, Trigger> triggerMap() {
        return new ConcurrentHashMap<>();
    }

    /*
     * 每开启一个定时任务,会产生一个 ScheduledFuture对象,我们要关闭定时任务,就要
     * 调用其scheduledFuture.cancel(true)方法,所以我们需要在每次开启定时任务的同时
     * 维护其引用对象,也就是此map的作用
     * 在spring容器中维护一个ScheduledFuture的注册表,用于我们来操作其开启关闭  
     */
    @Bean(name = "scheduledFutureMap")
    public Map<String, ScheduledFuture> scheduledFutureMap() {
        return new ConcurrentHashMap<>();
    }

    /*
     * 用表驱动法简化if判断,也就是提前配置好映射关系
     * 这个Map是用于简化判断的 if(status==1) do... if(status==0) do...
     * 存储开启或关闭定时任务的两个操作,用消费型函数式接口来封装
     */
    @Bean(name = "operationMap")
    public Map<String, Consumer<ScheduledCornJobVO>> operationMap() {
        return new ConcurrentHashMap<>();
    }

    /*
     * 一个用于开启定时任务的线程池;我们关注的核心方法是:threadPoolTaskScheduler.schedule(工作内容, 触发器)
     * 此方法用于开启一个定时任务,至此,我们关闭和添加定时任务的方法都有了
     */
    @Bean(name = "threadPoolTaskScheduler")
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
        threadPoolTaskScheduler.setThreadNamePrefix("WorKerThread:");
        threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
        threadPoolTaskScheduler.setAwaitTerminationSeconds(30);
        return threadPoolTaskScheduler;
    }

    /**
     * jsr250规范中的注解,初始化方法的一种, 同bean initMethod,InitializingBean接口一样
     * 用于在bean属性赋值后的初始化逻辑,只是三者在调用时机上略有不同而已
     * 此方法用于以上map数据的初始化
     */
    @PostConstruct
    public void initMap() {
        // 初始化工作注册表
        initWorkerMap();

        // 初始化触发器注册表
        initTriggerMap();

        // 初始化ScheduledFuture注册表
        initScheduledFutureMap();

        // 初始化操作注册表
        initOperationMap();
    }

    private void initWorkerMap() {
        // 查询数据库配置的所有job记录
        List<ScheduledJobVO> jobList = scheduledJobService.list();
        // 拿到我们注册的工作类 Worker 和数据库配置的 scheduled_job 工作来匹配
        Map<Long, Worker> effectiveWorkerMap = jobList.stream().collect(Collectors.toMap(ScheduledJobVO::getId, scheduledJob -> allWorkerMap.get(scheduledJob.getBeanName())));
        // 保存起来,方便后续操作
        workerMap().putAll(effectiveWorkerMap);
    }

    private void initTriggerMap() {
        // 查询数据库配置的所有corn记录
        List<ScheduledCornVO> cornList = scheduledCornService.list();
        // 将我们数据库表记录 scheduled_corn 存起来,不过要先转化为CronTrigger对象
        // 参看 {@link ScheduledCornVO#toCronTrigger}
        Map<Long, Trigger> cronTriggerMap = cornList.stream().collect(Collectors.toMap(ScheduledCornVO::getId, ScheduledCornVO::toCronTrigger));
        // 匹配到的保存起来,方便后续操作
        triggerMap().putAll(cronTriggerMap);
    }

    private void initScheduledFutureMap() {
        // 查询数据库配置的corn-job关系表的有效记录(已经配置开启的status=1)
        List<ScheduledCornJobVO> cornJobList = scheduledCornJobService.list("1");
        // 线程池
        ThreadPoolTaskScheduler threadPoolTaskScheduler = threadPoolTaskScheduler();
        // scheduledFuture注册表
        Map<String, ScheduledFuture> scheduledFutureMap = scheduledFutureMap();
        // 已经初始化完成的工作类注册表
        Map<Long, Worker> workerMap = workerMap();
        // 已经初始化完成的触发器注册表
        Map<Long, Trigger> triggerMap = triggerMap();
        // 遍历进行定时任务的批量开启
        cornJobList.forEach(cornJob -> {
            // 每开启一个定时任务所产生的 schedule对象,用于后续的关闭操作,所以要保存起来
            ScheduledFuture<?> schedule = threadPoolTaskScheduler.schedule((workerMap.get(cornJob.getJobId())), triggerMap.get(cornJob.getCornId()));
            // 用我们配置的指定key值来保存至注册表中
            scheduledFutureMap.put(cornJob.getStoreKey(), schedule);
        });
    }

    private void initOperationMap() {
        // 定时任务:打开操作(此处存在bug,重复多次打开一个已经开启的定时任务,会关不掉,在下文中解决,此处用做记录,暂时不做修改)
        Consumer<ScheduledCornJobVO> open = cornJob -> {
            String key = cornJob.getStoreKey();
            // 动态开启一个定时任务
            ScheduledFuture<?> schedule = threadPoolTaskScheduler().schedule(workerMap().get(cornJob.getJobId()), triggerMap().get(cornJob.getCornId()));
            // 将任务添加至注册表中进行维护,以指定key存储
            scheduledFutureMap().compute(key, (k, v) -> {
                // 如果开启时,此任务已存在,则替换该任务(先取消再添加)
                Optional<ScheduledFuture> ov = Optional.ofNullable(v);
                ov.ifPresent(v0 -> v0.cancel(true));
                return ov.orElse(schedule);
            });
        };
        operationMap().put("1", open);

        // 关闭操作
        Consumer<ScheduledCornJobVO> close = cornJob -> {
            String key = cornJob.getStoreKey();
            // 取消此定时任务
            ScheduledFuture scheduledFuture = scheduledFutureMap().get(key);
            Optional.ofNullable(scheduledFuture).ifPresent(os -> os.cancel(true));
            // 从注册表中移除
            scheduledFutureMap().remove(key);
        };
        operationMap().put("0", close);
    }
}
/**
 * 此controller用于操作单表scheduled_corn_job(中间关系表)
 * 操作完单表会再调用refresh方法来刷新定时任务状态
 * @Author zaiLuShang
 */
@RequiredArgsConstructor
@RestController
@RequestMapping("/task")
public class TaskController {

    private final ScheduledCornJobService scheduledCornJobService;

    private final Map<String, Consumer<ScheduledCornJobVO>> operationMap;

    // 向scheduled_corn_job中插入记录,并刷新定时任务
    @PostMapping
    public String add(@RequestBody ScheduledCornJobVO scheduledCornJob) {
        // 操作关系表
        scheduledCornJobService.save(scheduledCornJob);
        // 刷新内存中定时任务
        refresh(scheduledCornJob.getId());
        return "操作成功";
    }

    // 开启一个定时任务或者关闭一个定时任务
    @PutMapping
    public String update(@RequestBody ScheduledCornJobVO scheduledCornJob) {
        // 操作关系表
        scheduledCornJobService.updateById(scheduledCornJob);
        // 刷新内存中定时任务
        refresh(scheduledCornJob.getId());
        return "操作成功";
    }

    // 刷新所有,我们为了测试方便,直接改表记录,调用此方法就行
    @GetMapping("/refreshAll")
    public void refreshAll() {
        List<ScheduledCornJobVO> cornJobList = scheduledCornJobService.list();
        cornJobList.stream().forEach(this::refresh);
    }
    
    // 通过id去查询数据库完成单个定时任务的刷新,调用下面重载方法,传入查询的实时结果
    private void refresh(Long id) {
        refresh(scheduledCornJobService.getById(id));
    }

    // 重载方法:根据传入的查询的实时结果,拿到定时任务的状态(开启/关闭),去operationMap中取对应操作来执行
    private void refresh(ScheduledCornJobVO cornJob) {
        operationMap.get(cornJob.getStatus()).accept(cornJob);
    }
}

至此以上核心代码讲解完成,下面进入测试阶段:

先启动项目,此时控制台并无任何输出。。

我们向scheduled_corn_job表插入一条如下记录:表示每10s调用一次清理数据工作...

 我们打开浏览器调用 http://localhost:8080/task/refreshAll 接口,10s后控制台开始输出结果,说明定时任务的动态添加功能已经生效。

 我们来修改一下工作内容以及执行频率,修改为如下:每一秒执行一次打印a工作..

  调用 http://localhost:8080/task/refreshAll 接口进行刷新,输出如下,成功切换工作内容及执行频率。

 我们再将status修改为0

  调用 http://localhost:8080/task/refreshAll 接口进行刷新

 控制台不再输出,定时任务停止成功,至此,所有定时任务的动态增删改完成。

值得说明的是:增加定时任务,我们只能在已有的任务实现类中来选择动态的做某件事,而添加一件之前就不存在实现类的事,我认为属于奇葩需求,这里就不去实现了。

至此,全文完结,笔者技术、文笔有限,感谢大家的耐心观看。

2022/6/30日 修订内容如下:1、循环依赖问题 2、重复打开一个已经开启的定时任务,会关不掉的问题

文章如上写法,可能会引发一个循环依赖异常,异常单独记录在此处,请移步 https://blog.csdn.net/qq_40553917/article/details/125538251

 修改建议如下:

将 TaskConfig 类中,bean 的托管和初始化逻辑相分离,可以再单独起一个配置类做 bean 的初始化。另修复重复打开一个已经开启的定时任务时,无法关闭的bug。

/**
 * 此配置类也可以合并至 JobConfig 中,但出于职责单一,不建议这样做
 * @Author zaiLuShang
 */
@AllArgsConstructor
@SpringBootConfiguration
public class TaskConfig {
    @Bean(name = "workerMap")
    public Map<Long, Worker> workerMap() {
        return new ConcurrentHashMap<>();
    }

    @Bean(name = "triggerMap")
    public Map<Long, Trigger> triggerMap() {
        return new ConcurrentHashMap<>();
    }

    @Bean(name = "scheduledFutureMap")
    public Map<String, ScheduledFuture> scheduledFutureMap() {
        return new ConcurrentHashMap<>();
    }

    @Bean(name = "operationMap")
    public Map<String, Consumer<ScheduledCornJobVO>> operationMap() {
        return new ConcurrentHashMap<>();
    }

    @Bean(name = "threadPoolTaskScheduler")
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
        threadPoolTaskScheduler.setThreadNamePrefix("WorKerThread:");
        threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
        threadPoolTaskScheduler.setAwaitTerminationSeconds(30);
        return threadPoolTaskScheduler;
    }
}

/**
 * 新增配置类,作为容器类的初始化器
 * @Author zaiLuShang
 */
@SpringBootConfiguration
@AllArgsConstructor
public class TaskInitializer {

    // 依赖搜索:拿到spring容器中的所有实现类
    private final Map<String, Worker> allWorkerMap;

    // 依赖注入 Service
    private final ScheduledCornService scheduledCornService;
    private final ScheduledJobService scheduledJobService;
    private final ScheduledCornJobService scheduledCornJobService;

    // 依赖注入 容器bean
    // 工作注册表
    private final Map<Long, Worker> workerMap;
    // 触发器注册表
    private final Map<Long, Trigger> triggerMap;
    // 定时任务注册表
    private final Map<String, ScheduledFuture> scheduledFutureMap;
    // 操作注册表
    private final Map<String, Consumer<ScheduledCornJobVO>> operationMap;
    // 线程池
    private final ThreadPoolTaskScheduler threadPoolTaskScheduler;

    @PostConstruct
    public void initMap() {
        // 初始化工作注册表
        initWorkerMap();

        // 初始化触发器注册表
        initTriggerMap();

        // 初始化ScheduledFuture注册表
        initScheduledFutureMap();

        // 初始化操作注册表
        initOperationMap();
    }

    private void initWorkerMap() {
        // 查询数据库配置的所有job记录
        var jobList = scheduledJobService.list();
        var effectiveWorkerMap = jobList.stream().collect(Collectors.toMap(ScheduledJobVO::getId, scheduledJob -> allWorkerMap.get(scheduledJob.getBeanName())));
        workerMap.putAll(effectiveWorkerMap);
    }

    private void initTriggerMap() {
        // 查询数据库配置的所有corn记录
        var cornList = scheduledCornService.list();
        var cronTriggerMap = cornList.stream().collect(Collectors.toMap(ScheduledCornVO::getId, ScheduledCornVO::toCronTrigger));
        triggerMap.putAll(cronTriggerMap);
    }

    private void initScheduledFutureMap() {
        // 查询数据库配置的corn-job关系表
        var cornJobList = scheduledCornJobService.list("1");
        cornJobList.forEach(cornJob -> {
            ScheduledFuture<?> schedule = threadPoolTaskScheduler.schedule((workerMap.get(cornJob.getJobId())), triggerMap.get(cornJob.getCornId()));
            scheduledFutureMap.put(cornJob.getStoreKey(), schedule);
        });
    }

    private void initOperationMap() {
        
        /**** 之前引发 bug处 *****/
        // 打开操作
        Consumer<ScheduledCornJobVO> open = cornJob -> {
            var key = cornJob.getStoreKey();
            // 将任务添加至注册表中进行维护,这里应该先判断存在再开启            
            scheduledFutureMap.compute(key, (k, v) -> {
                Optional<ScheduledFuture> ov = Optional.ofNullable(v);
                ov.ifPresent(v0 -> v0.cancel(true));
                // 动态开启一个定时任务
                var schedule = threadPoolTaskScheduler.schedule(workerMap.get(cornJob.getJobId()), triggerMap.get(cornJob.getCornId()));
                // 这里应该直接返回最新打开的 schedule 定时任务对象,而不是原先的
                return schedule;
            });
        };

        // 关闭操作
        Consumer<ScheduledCornJobVO> close = cornJob -> {
            var key = cornJob.getStoreKey();
            // 取消此定时任务
            var scheduledFuture = scheduledFutureMap.get(key);
            Optional.ofNullable(scheduledFuture).ifPresent(os -> os.cancel(true));
            // 从注册表中移除
            scheduledFutureMap.remove(key);
        };

        operationMap.put("1", open);
        operationMap.put("0", close);
    }
}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐