问题描述:这是关于Quartz服务部署的集群问题。先说结论,如果只有这一个报错可以不予理会,这是由于在Quartz集群部署引起的,原因是在集群环境中,对于同一个集群实例,只允许一个可用服务器来执行定时任务。

PS:如果想要彻底解决,方便本地调试,可以设置org.quartz.jobStore.isClustered=false。

实际过程:在单体服务中,引入了Quartz定时任务框架,且使用的是集群模式,当部署了公司的DEV环境以后,由于开发需要,还需要在本地启动服务,这就导致隔一段时间,log就会报错如下:

2021-12-13 16:16:40.886  WARN 6748 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore     : This scheduler instance (集群本机节点) is still active but was recovered by another instance in the cluster.  This may cause inconsistent behavior.

开始在网上找了些答案,大意就是说这是由于集群中机器时间不同步引起的,想想也正常,DEV环境是linux,本地开发用的windows,时间不同步很正常。

但是拿着答案去倒推,却又总感觉哪里不太对劲。索性直接从源码一点点开始看,为什么会报这个警告。

直接定位到报错的代码段开始看起,通过LocalDataSourceJobStore网上查找父类,加上打印的日志信息,可以定位到具体报错在JobStoreSupport的findFailedInstances()方法:

protected List<SchedulerStateRecord> findFailedInstances(Connection conn) throws JobPersistenceException {
        try {
            List<SchedulerStateRecord> failedInstances = new LinkedList();
            boolean foundThisScheduler = false;
            long timeNow = System.currentTimeMillis();
            //1数据库获取运行当前实例机器
            List<SchedulerStateRecord> states = this.getDelegate().selectSchedulerStateRecords(conn, (String)null);
            Iterator i$ = states.iterator();

            while(i$.hasNext()) {
                SchedulerStateRecord rec = (SchedulerStateRecord)i$.next();
                //2只有调试机器和数据库存储的机器ID符合,已下条件才为true
                if (rec.getSchedulerInstanceId().equals(this.getInstanceId())) {
                    foundThisScheduler = true;
                    if (this.firstCheckIn) {
                        failedInstances.add(rec);
                    }
                } else if (this.calcFailedIfAfter(rec) < timeNow) {
                    failedInstances.add(rec);
                }
            }

            if (this.firstCheckIn) {
                failedInstances.addAll(this.findOrphanedFailedInstances(conn, states));
            }
            //3当foundThisScheduler和firstCheckIn都为false时,错误才会打印
            if (!foundThisScheduler && !this.firstCheckIn) {
                this.getLog().warn("This scheduler instance (" + this.getInstanceId() + ") is still " + "active but was recovered by another instance in the cluster.  " + "This may cause inconsistent behavior.");
            }

            return failedInstances;
        } catch (Exception var9) {
            this.lastCheckin = System.currentTimeMillis();
            throw new JobPersistenceException("Failure identifying failed instances when checking-in: " + var9.getMessage(), var9);
        }
    }

通过代码中的123步注释看的很清楚,只有foundThisScheduler和firstCheckIn都为false时,错误才会打印出来,firstCheckIn就是判断是否是第一次进入这个代码,这个我们后续细讲。一般是为false。

foundThisScheduler则是主要依据条件,理论上来说,只要它为ture这个错误便不会打印出来。

那么如何才能获取foundThisScheduler的值呢,在代码的第二步注释:

rec.getSchedulerInstanceId().equals(this.getInstanceId())

通过代码就可以看出,就是当前本机实例ID和服务器配置ID的比较来判断的。那么服务器的配置ID哪里取得呢?我们看上面的

List<SchedulerStateRecord> states = this.getDelegate().selectSchedulerStateRecords(conn, (String)null);

ctrl+alt+B进去selectSchedulerStateRecords方法看详细情况:

public List<SchedulerStateRecord> selectSchedulerStateRecords(Connection conn, String theInstanceId) throws SQLException {
        PreparedStatement ps = null;
        ResultSet rs = null;

        try {
            List<SchedulerStateRecord> lst = new LinkedList();
            if (theInstanceId != null) {
                ps = conn.prepareStatement(this.rtp("SELECT * FROM {0}SCHEDULER_STATE WHERE SCHED_NAME = {1} AND INSTANCE_NAME = ?"));
                ps.setString(1, theInstanceId);
            } else {
                ps = conn.prepareStatement(this.rtp("SELECT * FROM {0}SCHEDULER_STATE WHERE SCHED_NAME = {1}"));
            }

            rs = ps.executeQuery();

            while(rs.next()) {
                SchedulerStateRecord rec = new SchedulerStateRecord();
                rec.setSchedulerInstanceId(rs.getString("INSTANCE_NAME"));
                rec.setCheckinTimestamp(rs.getLong("LAST_CHECKIN_TIME"));
                rec.setCheckinInterval(rs.getLong("CHECKIN_INTERVAL"));
                lst.add(rec);
            }

            LinkedList var10 = lst;
            return var10;
        } finally {
            closeResultSet(rs);
            closeStatement(ps);
        }
    }

就是一个标准的JDBC查询,查找表SCHEDULER_STATE,我们看看数据库保存的对应数据:

 它的InstanceId也就是INSTANCE_NAME和我们本机启动的“集群本机节点”完全不同,所以上面的判断条件

rec.getSchedulerInstanceId().equals(this.getInstanceId())肯定不为ture。

为了方便调试,我本地org.quartz.scheduler.instanceId设置的为:集群本机节点,而DEV服务器部署的代码设置的是自动生成AUTO模式,所以数据库查出来是uuid的形式。以此可以来区分2个不同的启动实例。

由此可以看到,由于Qutaz采取的是集群配置模式,所以当本地启动调试的时候,这个log打印是不可避免的。

也可以采取配置org.quartz.jobStore.isClustered=false来设置非集群启动。

再回到上文说到的firstCheckIn取值,我们可以看到其false值是在JobStoreSupport类中的doCheckin()执行完成以后直接赋值false的。

所以综上所述,除了设置为非集群模式,这个警告是不可避免的,这样的好处就是当部署在分布式集群下,避免了不同机器部署的Qutaz定时任务对定时资源进行无序抢占,导致任务出错。

说了细节,我们再说整体,这个整体的逻辑是如何执行的?通过断点调试,我们可以看到其断点如下:

 1、我们从第一个方法run开始看,可以看到其执行逻辑如下:

public void run() {
            while(!this.shutdown) {
                if (!this.shutdown) {
                    long timeToSleep = JobStoreSupport.this.getClusterCheckinInterval();
                    long transpiredTime = System.currentTimeMillis() - JobStoreSupport.this.lastCheckin;
                    timeToSleep -= transpiredTime;
                    if (timeToSleep <= 0L) {
                        timeToSleep = 100L;
                    }

                    if (this.numFails > 0) {
                        timeToSleep = Math.max(JobStoreSupport.this.getDbRetryInterval(), timeToSleep);
                    }

                    try {
                        Thread.sleep(timeToSleep);
                    } catch (Exception var6) {
                    }
                }

                if (!this.shutdown && this.manage()) {
                    JobStoreSupport.this.signalSchedulingChangeImmediately(0L);
                }
            }

        }

往上翻我们可以看到这其实是一个继承了Thread类的ClusterManager.class重写的run()方法。

2、其通过this.manage()调用进入JobStoreSupport类中的manage()方法:

private boolean manage() {
            boolean res = false;

            try {
                res = JobStoreSupport.this.doCheckin();
                this.numFails = 0;
                JobStoreSupport.this.getLog().debug("ClusterManager: Check-in complete.");
            } catch (Exception var3) {
                if (this.numFails % 4 == 0) {
                    JobStoreSupport.this.getLog().error("ClusterManager: Error managing cluster: " + var3.getMessage(), var3);
                }

                ++this.numFails;
            }

            return res;
        }

3、再调用我们上文提到的this.doCheckIn():

protected boolean doCheckin() throws JobPersistenceException {
        boolean transOwner = false;
        boolean transStateOwner = false;
        boolean recovered = false;
        Connection conn = this.getNonManagedTXConnection();

        try {
            List<SchedulerStateRecord> failedRecords = null;
            if (!this.firstCheckIn) {
                failedRecords = this.clusterCheckIn(conn);
                this.commitConnection(conn);
            }
...

4、调用集群检查this.clusterCheckIn(conn):

protected List<SchedulerStateRecord> clusterCheckIn(Connection conn) throws JobPersistenceException {
        List failedInstances = this.findFailedInstances(conn);

        try {
            this.lastCheckin = System.currentTimeMillis();
            if (this.getDelegate().updateSchedulerState(conn, this.getInstanceId(), this.lastCheckin) == 0) {
                this.getDelegate().insertSchedulerState(conn, this.getInstanceId(), this.lastCheckin, this.getClusterCheckinInterval());
            }

            return failedInstances;
        } catch (Exception var4) {
            throw new JobPersistenceException("Failure updating scheduler state when checking-in: " + var4.getMessage(), var4);
        }
    }

其最终调用本文开头提到的findFailedInstances()方法来实现警告日志打印。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐