This scheduler instance (...) is still active but was recovered by another instance in the cluste
问题描述:这是关于Quartz服务部署的集群问题。先说结论,如果只有这一个报错可以不予理会,这是由于在Quartz集群部署引起的,原因是在集群环境中,对于同一个集群实例,只允许一个可用服务器来执行定时任务。PS:如果想要彻底解决,方便本地调试,可以设置org.quartz.jobStore.isClustered=false。实际过程:在单体服务中,引入了Quartz定时任务框架,且使用的是集群模
问题描述:这是关于Quartz服务部署的集群问题。先说结论,如果只有这一个报错可以不予理会,这是由于在Quartz集群部署引起的,原因是在集群环境中,对于同一个集群实例,只允许一个可用服务器来执行定时任务。
PS:如果想要彻底解决,方便本地调试,可以设置org.quartz.jobStore.isClustered=false。
实际过程:在单体服务中,引入了Quartz定时任务框架,且使用的是集群模式,当部署了公司的DEV环境以后,由于开发需要,还需要在本地启动服务,这就导致隔一段时间,log就会报错如下:
2021-12-13 16:16:40.886 WARN 6748 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore : This scheduler instance (集群本机节点) is still active but was recovered by another instance in the cluster. This may cause inconsistent behavior.
开始在网上找了些答案,大意就是说这是由于集群中机器时间不同步引起的,想想也正常,DEV环境是linux,本地开发用的windows,时间不同步很正常。
但是拿着答案去倒推,却又总感觉哪里不太对劲。索性直接从源码一点点开始看,为什么会报这个警告。
直接定位到报错的代码段开始看起,通过LocalDataSourceJobStore网上查找父类,加上打印的日志信息,可以定位到具体报错在JobStoreSupport的findFailedInstances()方法:
protected List<SchedulerStateRecord> findFailedInstances(Connection conn) throws JobPersistenceException {
try {
List<SchedulerStateRecord> failedInstances = new LinkedList();
boolean foundThisScheduler = false;
long timeNow = System.currentTimeMillis();
//1数据库获取运行当前实例机器
List<SchedulerStateRecord> states = this.getDelegate().selectSchedulerStateRecords(conn, (String)null);
Iterator i$ = states.iterator();
while(i$.hasNext()) {
SchedulerStateRecord rec = (SchedulerStateRecord)i$.next();
//2只有调试机器和数据库存储的机器ID符合,已下条件才为true
if (rec.getSchedulerInstanceId().equals(this.getInstanceId())) {
foundThisScheduler = true;
if (this.firstCheckIn) {
failedInstances.add(rec);
}
} else if (this.calcFailedIfAfter(rec) < timeNow) {
failedInstances.add(rec);
}
}
if (this.firstCheckIn) {
failedInstances.addAll(this.findOrphanedFailedInstances(conn, states));
}
//3当foundThisScheduler和firstCheckIn都为false时,错误才会打印
if (!foundThisScheduler && !this.firstCheckIn) {
this.getLog().warn("This scheduler instance (" + this.getInstanceId() + ") is still " + "active but was recovered by another instance in the cluster. " + "This may cause inconsistent behavior.");
}
return failedInstances;
} catch (Exception var9) {
this.lastCheckin = System.currentTimeMillis();
throw new JobPersistenceException("Failure identifying failed instances when checking-in: " + var9.getMessage(), var9);
}
}
通过代码中的123步注释看的很清楚,只有foundThisScheduler和firstCheckIn都为false时,错误才会打印出来,firstCheckIn就是判断是否是第一次进入这个代码,这个我们后续细讲。一般是为false。
foundThisScheduler则是主要依据条件,理论上来说,只要它为ture这个错误便不会打印出来。
那么如何才能获取foundThisScheduler的值呢,在代码的第二步注释:
rec.getSchedulerInstanceId().equals(this.getInstanceId())
通过代码就可以看出,就是当前本机实例ID和服务器配置ID的比较来判断的。那么服务器的配置ID哪里取得呢?我们看上面的
List<SchedulerStateRecord> states = this.getDelegate().selectSchedulerStateRecords(conn, (String)null);
ctrl+alt+B进去selectSchedulerStateRecords方法看详细情况:
public List<SchedulerStateRecord> selectSchedulerStateRecords(Connection conn, String theInstanceId) throws SQLException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
List<SchedulerStateRecord> lst = new LinkedList();
if (theInstanceId != null) {
ps = conn.prepareStatement(this.rtp("SELECT * FROM {0}SCHEDULER_STATE WHERE SCHED_NAME = {1} AND INSTANCE_NAME = ?"));
ps.setString(1, theInstanceId);
} else {
ps = conn.prepareStatement(this.rtp("SELECT * FROM {0}SCHEDULER_STATE WHERE SCHED_NAME = {1}"));
}
rs = ps.executeQuery();
while(rs.next()) {
SchedulerStateRecord rec = new SchedulerStateRecord();
rec.setSchedulerInstanceId(rs.getString("INSTANCE_NAME"));
rec.setCheckinTimestamp(rs.getLong("LAST_CHECKIN_TIME"));
rec.setCheckinInterval(rs.getLong("CHECKIN_INTERVAL"));
lst.add(rec);
}
LinkedList var10 = lst;
return var10;
} finally {
closeResultSet(rs);
closeStatement(ps);
}
}
就是一个标准的JDBC查询,查找表SCHEDULER_STATE,我们看看数据库保存的对应数据:
它的InstanceId也就是INSTANCE_NAME和我们本机启动的“集群本机节点”完全不同,所以上面的判断条件
rec.getSchedulerInstanceId().equals(this.getInstanceId())肯定不为ture。
为了方便调试,我本地org.quartz.scheduler.instanceId设置的为:集群本机节点,而DEV服务器部署的代码设置的是自动生成AUTO模式,所以数据库查出来是uuid的形式。以此可以来区分2个不同的启动实例。
由此可以看到,由于Qutaz采取的是集群配置模式,所以当本地启动调试的时候,这个log打印是不可避免的。
也可以采取配置org.quartz.jobStore.isClustered=false来设置非集群启动。
再回到上文说到的firstCheckIn取值,我们可以看到其false值是在JobStoreSupport类中的doCheckin()执行完成以后直接赋值false的。
所以综上所述,除了设置为非集群模式,这个警告是不可避免的,这样的好处就是当部署在分布式集群下,避免了不同机器部署的Qutaz定时任务对定时资源进行无序抢占,导致任务出错。
说了细节,我们再说整体,这个整体的逻辑是如何执行的?通过断点调试,我们可以看到其断点如下:
1、我们从第一个方法run开始看,可以看到其执行逻辑如下:
public void run() {
while(!this.shutdown) {
if (!this.shutdown) {
long timeToSleep = JobStoreSupport.this.getClusterCheckinInterval();
long transpiredTime = System.currentTimeMillis() - JobStoreSupport.this.lastCheckin;
timeToSleep -= transpiredTime;
if (timeToSleep <= 0L) {
timeToSleep = 100L;
}
if (this.numFails > 0) {
timeToSleep = Math.max(JobStoreSupport.this.getDbRetryInterval(), timeToSleep);
}
try {
Thread.sleep(timeToSleep);
} catch (Exception var6) {
}
}
if (!this.shutdown && this.manage()) {
JobStoreSupport.this.signalSchedulingChangeImmediately(0L);
}
}
}
往上翻我们可以看到这其实是一个继承了Thread类的ClusterManager.class重写的run()方法。
2、其通过this.manage()调用进入JobStoreSupport类中的manage()方法:
private boolean manage() {
boolean res = false;
try {
res = JobStoreSupport.this.doCheckin();
this.numFails = 0;
JobStoreSupport.this.getLog().debug("ClusterManager: Check-in complete.");
} catch (Exception var3) {
if (this.numFails % 4 == 0) {
JobStoreSupport.this.getLog().error("ClusterManager: Error managing cluster: " + var3.getMessage(), var3);
}
++this.numFails;
}
return res;
}
3、再调用我们上文提到的this.doCheckIn():
protected boolean doCheckin() throws JobPersistenceException {
boolean transOwner = false;
boolean transStateOwner = false;
boolean recovered = false;
Connection conn = this.getNonManagedTXConnection();
try {
List<SchedulerStateRecord> failedRecords = null;
if (!this.firstCheckIn) {
failedRecords = this.clusterCheckIn(conn);
this.commitConnection(conn);
}
...
4、调用集群检查this.clusterCheckIn(conn):
protected List<SchedulerStateRecord> clusterCheckIn(Connection conn) throws JobPersistenceException {
List failedInstances = this.findFailedInstances(conn);
try {
this.lastCheckin = System.currentTimeMillis();
if (this.getDelegate().updateSchedulerState(conn, this.getInstanceId(), this.lastCheckin) == 0) {
this.getDelegate().insertSchedulerState(conn, this.getInstanceId(), this.lastCheckin, this.getClusterCheckinInterval());
}
return failedInstances;
} catch (Exception var4) {
throw new JobPersistenceException("Failure updating scheduler state when checking-in: " + var4.getMessage(), var4);
}
}
其最终调用本文开头提到的findFailedInstances()方法来实现警告日志打印。
更多推荐
所有评论(0)