目录

        1.问题发现与描述

        2.springboot定时任务的注册

        3.springboot定时任务的调度与执行

        4.总结


1.问题发现与描述

        生产上一个服务定时从上游系统拉取数据,突然任务停止了,不再继续执行了。查看日志发现报了异常,springboot的数据库连接池不够了。但是连接池不够为什么会影响到定时任务呢?下面是破案全过程。

        

@Configuration
@EnableScheduling
public class DynamicScheduleAsyncTask implements SchedulingConfigurer {

    @Autowired
    private QueryRepository queryRepository;
    @Autowired
    private AsyncTimerTask timerTask;

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.addTriggerTask(timerTask,
                triggerContext -> {
                    //2.1 从数据库获取执行周期
                    String cron = queryRepository.queryCron();
                    //2.3 返回执行周期(Date)
                    return new CronTrigger(cron).nextExecutionTime(triggerContext);
                }
        );
    }
}

     这是配置定时任务的代码,我们要支持动态调整cron周期,所以选择了triggerTask。这个是springboot定时任务的基本配置与使用。接下来我们跟踪源码解决问题。

2.springboot定时任务的注册

        首先我们在配置类中加上了@EnableScheduling注解。这个注解向spring容器注入了一个定时任务的自动配置类。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {

}

        我们再来看看SchedulingConfiguration这个配置类做了什么。

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {

	@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
		return new ScheduledAnnotationBeanPostProcessor();
	}

}

        很明显这个配置类向spring容器中注入了一个后置处理器ScheduledAnnotationBeanPostProcessor。所谓后置处理器就是指spring在初始化的过程中会回调这些类的某些方法。接下来走进这个后置处理器的postProcessAfterInitialization方法。

if (!this.nonAnnotatedClasses.contains(targetClass) &&
				AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
            //找到所有标记了@Scheduled注解的方法
			Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
					(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
						Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(
								method, Scheduled.class, Schedules.class);
						return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);
					});
			if (annotatedMethods.isEmpty()) {
				this.nonAnnotatedClasses.add(targetClass);
				if (logger.isTraceEnabled()) {
					logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
				}
			}
			else {
				// Non-empty set of methods
                //processScheduled这个方法中将当前任务放进集合
				annotatedMethods.forEach((method, scheduledAnnotations) ->
						scheduledAnnotations.forEach(scheduled -> 
processScheduled(scheduled, method, bean)));
				if (logger.isTraceEnabled()) {
					logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
							"': " + annotatedMethods);
				}
			}
		}

3.springboot定时任务的调度与执行

        上边提到的ScheduledAnnotationBeanPostProcessor这个后置处理器还有一个作用,它实现了ApplicationListener,顾名思义它还是个监听器,spring启动完成后回调所有监听器的onApplicationEvent方法,也正是在这个方法中完成了任务的调度与执行。

@Override
	public void onApplicationEvent(ContextRefreshedEvent event) {
		if (event.getApplicationContext() == this.applicationContext) {
			// Running in an ApplicationContext -> register tasks this late...
			// giving other ContextRefreshedEvent listeners a chance to perform
			// their work at the same time (e.g. Spring Batch's job registration).
			finishRegistration();
		}
	}
private void finishRegistration() {
	this.registrar.afterPropertiesSet();
}

this.registrar默认是ScheduledTaskRegistrar。我们接着往下看:

@Override
public void afterPropertiesSet() {
	scheduleTasks();
}
protected void scheduleTasks() {
	//创建任务执行器 默认是ConcurrentTaskScheduler
	if (this.taskScheduler == null) {
		this.localExecutor = Executors.newSingleThreadScheduledExecutor();
		this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
	}
	//triggerTasks不为空 我们直接看scheduleTriggerTask方法
	if (this.triggerTasks != null) {
		for (TriggerTask task : this.triggerTasks) {
			addScheduledTask(scheduleTriggerTask(task));
		}
	}
	if (this.cronTasks != null) {
		for (CronTask task : this.cronTasks) {
			addScheduledTask(scheduleCronTask(task));
		}
	}
	if (this.fixedRateTasks != null) {
		for (IntervalTask task : this.fixedRateTasks) {
			addScheduledTask(scheduleFixedRateTask(task));
		}
	}
	if (this.fixedDelayTasks != null) {
		for (IntervalTask task : this.fixedDelayTasks) {
			addScheduledTask(scheduleFixedDelayTask(task));
		}
	}
}
@Nullable
public ScheduledTask scheduleTriggerTask(TriggerTask task) {
	ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
	boolean newTask = false;
	//scheduledTask为null new一个
	if (scheduledTask == null) {
		scheduledTask = new ScheduledTask(task);
		newTask = true;
	}
	//taskScheduler默认实现是ConcurrentTaskScheduler,我们接着看它里边的schedule方法
	if (this.taskScheduler != null) {
		scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
	}
	else {
		addTriggerTask(task);
		this.unresolvedTasks.put(task, scheduledTask);
	}
	return (newTask ? scheduledTask : null);
}
@Override
@Nullable
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
	try {
		if (this.enterpriseConcurrentScheduler) {
			return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
		}
		else {
			//进入到这个分支 我们直接看ReschedulingRunnable里边的schedule方法
			ErrorHandler errorHandler =
					(this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
			return new ReschedulingRunnable(task, trigger, this.clock, this.scheduledExecutor, errorHandler).schedule();
		}
	}
	catch (RejectedExecutionException ex) {
		throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
	}
}

@Nullable
public ScheduledFuture<?> schedule() {
	synchronized (this.triggerContextMonitor) {
		//回调我们的获取cron表达式的方法 计算下次执行时间
		this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
		if (this.scheduledExecutionTime == null) {
			return null;
		}
		//计算出距离下次执行的毫秒值
		long initialDelay = this.scheduledExecutionTime.getTime() - this.triggerContext.getClock().millis();
		//this.executor是JDK的延时线程池DelegatedScheduledExecutorService
		//重要一点:第一个入参就是要被执行的任务,也就是当前对象ReschedulingRunnable
		//所以延时时间到了之后,JDK线程池就会调用当前对象的run方法。
		this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
		return this;
	}
}
@Override
public void run() {
	Date actualExecutionTime = new Date(this.triggerContext.getClock().millis());
	//这里就是回调了我们自定义的任务方法
	super.run();
	Date completionTime = new Date(this.triggerContext.getClock().millis());
	synchronized (this.triggerContextMonitor) {
		Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
		//上述过程都是计算和更新下次执行任务的时间 虽然重要但是不是本次分析重点
		this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
		if (!obtainCurrentFuture().isCancelled()) {
			//重点在这儿:这里又调用了这个schedule方法
			//那么代码又回到了上边schedule方法中。
			schedule();
		}
	}
}

4.总结

        总结一下:其实道理很简单,所有的问题都出现在ReschedulingRunnable这个类中。

        1.schedule方法中从数据库获取cron表达式计算延时时间,然后将自己作为任务对象放到JDK的延时线程池中。

        2.JDK延时线程池执行任务时调用当前对象的run方法。

        3.当前对象的run方法再次计算延时时间,调用schedule方法。

        很明显,上述三个步骤形成了一个循环,正常情况下我们的任务就会一次次的执行下去直到JVM退出。但是文章开头提到,我们的数据库连接池不够了,在第一步从数据库获取cron表达式的时候抛出了异常,导致schedule方法执行到第一行就结束了。也就是说没有将任务放到JDK的线程池中,所以后续的run方法也就不会执行了。这个循环也就结束了。

        截止到目前我们已经将问题的原委全部弄通,怎们解决这个问题合适呢?

        第一我们调整了hikari连接池的参数,核心连接数由默认10调到了30.

        第二我们调整了hikari连接的超时时间,由默认的3000ms调整到了6000ms.

        第三我们在获取cron表到式的时候通过try catch将异常捕获,如果数据库异常我们给个默认的cron即可.

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐