Flink Checkpoint Timeout Problem

Symptoms

The business team recently adopted Flink for real-time data synchronization. A sync tool ingests CDC messages into Kafka, with over a hundred tables written into a single topic; a Flink job consumes the topic, parses and routes the data, and sends it to the target databases (MySQL/Oracle). The pipeline is fairly simple. It used to run on JStorm and was only recently migrated to Flink, implemented with the DataStream API.

The code uses just a few operators: a Kafka source, a Map, and a Process. The logic that writes to the target databases lives in the Process function: it creates and cleans up database connections, buffers data in a queue, and spawns an extra thread that drains the queue and performs the sends. Setting aside whether this design is sound, this article uses the case to describe the problems we hit, how we tracked them down, and how we fixed them.
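For orientation, here is a minimal sketch of the job topology, assuming string-encoded CDC messages; the Kafka address, topic name, and the map/process bodies are placeholders, not the actual business code:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

public class CdcSyncJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder address
        props.setProperty("group.id", "cdc-sync");            // placeholder group

        env.addSource(new FlinkKafkaConsumer<>("cdc-topic", new SimpleStringSchema(), props))
           .map(String::trim) // stand-in for the real CDC-parsing logic
           .process(new ProcessFunction<String, String>() {
               @Override
               public void processElement(String value, Context ctx, Collector<String> out) {
                   // business code: resolve the target table, buffer the record,
                   // and flush to MySQL/Oracle from a separate sender thread
                   out.collect(value);
               }
           });

        env.execute("cdc-sync");
    }
}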

The team runs Flink 1.11.2 in Standalone deployment mode. The problematic environment is a single machine: one JobManager process and one TaskManager process.

As observed in the web UI, the job restarted shortly after startup, and during the restart its tasks could not be cancelled, staying in CANCELING indefinitely (normally they transition to CANCELED almost immediately). After a while the TaskManager process died, leaving the job unable to acquire slots, and data stopped syncing. In short, four symptoms:

  1. Checkpoints time out
  2. Subtasks stay in CANCELING and the job stays in RESTARTING for a long time
  3. The TaskManager process dies after a while
  4. Data is no longer synchronized

Problem Analysis

Problem 1: the TaskManager process dies

The first instinct was to find out why the TaskManager process died: this is the most serious symptom, since it affects the whole cluster rather than a single job. The TaskManager log contains the following fragments:

// Log 1
ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Task did not exit gracefully within 180 + seconds.
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
	at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1572) [flink-dist_2.11-1.11.2.jar:1.11.2]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
2021-03-05 04:09:30,816 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - Fatal error occurred while executing the TaskManager. Shutting it down...
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
	at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1572) [flink-dist_2.11-1.11.2.jar:1.11.2]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
// Log 2
WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Task 'Source: Custom Source -> Map -> Process (1/1)' did not react to cancelling signal for 30 seconds, but is stuck in method:
 org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:91)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:282)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
java.lang.Thread.run(Thread.java:748)

Log 1 contains the key message Task did not exit gracefully within 180 + seconds, so we searched the Flink source for the code that prints it:

// Simplified excerpt from org.apache.flink.runtime.taskmanager.Task (Flink 1.11.2)
private static class TaskCancelerWatchDog implements Runnable {
    @Override
    public void run() {
        try {
            final long hardKillDeadline = System.nanoTime() + timeoutMillis * 1_000_000;
            long millisLeft;
            while (executerThread.isAlive()
                    && (millisLeft = (hardKillDeadline - System.nanoTime()) / 1_000_000) > 0) {
                try {
                    executerThread.join(millisLeft);
                } catch (InterruptedException ignored) {
                }
            }
            if (executerThread.isAlive()) {
                String msg = "Task did not exit gracefully within " + (timeoutMillis / 1000) + " + seconds.";
                taskManager.notifyFatalError(msg, new FlinkRuntimeException(msg));
            }
        } catch (Throwable t) {
            // error handling omitted in this excerpt
        }
    }
}

TaskCancelerWatchDog is a watchdog thread that monitors whether a task cancellation completes: if the executor thread is still alive after timeoutMillis, it raises a fatal error against the TaskManager process. The timeout is controlled by the task.cancellation.timeout parameter, 180 s by default; setting it to 0 disables the watchdog entirely.

The source behind Log 2 is as follows:

// Also from org.apache.flink.runtime.taskmanager.Task (simplified)
private static final class TaskInterrupter implements Runnable {
    @Override
    public void run() {
        try {
            executerThread.join(interruptIntervalMillis);
            while (task.shouldInterruptOnCancel() && executerThread.isAlive()) {
            // capture the stack trace of where the executor thread is stuck
            StringBuilder bld = new StringBuilder();
            for (StackTraceElement e : executerThread.getStackTrace()) {
                bld.append(e).append('\n');
            }
                log.warn("Task '{}' did not react to cancelling signal for {} seconds, but is stuck in method:\n {}",
                        taskName, (interruptIntervalMillis / 1000), bld);
                executerThread.interrupt();
                try {
                    executerThread.join(interruptIntervalMillis);
                } catch (InterruptedException e) {
                    // we ignore this and fall through the loop
                }
            }
        } catch (Throwable t) {
            // FatalError handling omitted in this excerpt
        }
    }
}

TaskInterrupter is the thread that interrupts the task's executor thread during cancellation, and its warning conveniently dumps the stack of the thread that should have been cancelled but is still running. Together, the two log fragments explain why the TaskManager died: a task failed to cancel within 180 s. To stop the TaskManager from being killed, we set task.cancellation.timeout: 0.
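In flink-conf.yaml that is a one-line change. Note it only keeps the TaskManager alive; the stuck task is still stuck, so this is a mitigation rather than a fix:

# flink-conf.yaml
task.cancellation.timeout: 0   # default 180000 ms; 0 disables the cancellation watchdog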

Problem 2: the job stays in CANCELING

Problem 1 is clearly a consequence of problem 2, so what causes problem 2? The stack in the problem-1 log only shows the task stuck somewhere under StreamTask.invoke, which is not very informative by itself. Our working theory: some task hangs for some reason and therefore cannot be cancelled; we dig into that below. But there is a prior question: why is the task being cancelled at all? Nobody cancelled it manually, so Flink must have triggered the cancellation itself. Reading on in the standalonesession (JobManager) log, we find checkpoint timeout errors, with the key phrase expired before completing:

INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint 58 of job f46ee0d14fe0e6f91253e78487796f5b expired before completing.
INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1673) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1650) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:91) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1783) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_181]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_181]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_181]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_181]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_181]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]

Which brings us to the third problem.

Problem 3: checkpoint timeouts

The expired before completing message means the checkpoint timed out. The job's checkpointing configuration was as follows:

execution.checkpointing.interval: 60000
execution.checkpointing.timeout: 60000
execution.checkpointing.max-concurrent-checkpoints: 500
execution.checkpointing.min-pause: 500

At first we assumed the checkpoint timeout was simply too short and raised it to 30 minutes, but the web UI then showed large numbers of checkpoints stuck in the pending state, and they still timed out in the end. Because execution.checkpointing.tolerable-failed-checkpoints was not set, a single timed-out checkpoint was enough to restart the job.
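As a stop-gap while investigating, one can lengthen the timeout and tolerate a few failures before the job is failed; the values below are illustrative, and in a case like this they only delay the symptom rather than cure it:

execution.checkpointing.timeout: 1800000                  # 30 min
execution.checkpointing.tolerable-failed-checkpoints: 3   # allow a few expired checkpoints before restarting the job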

Neither the code nor the logs pointed at a root cause, so the next step was a thread dump of the TaskManager process, to see what its threads were doing while a checkpoint timed out. Flink 1.11.2's web UI also has a thread-dump view, but its output differs a little from what the jstack command prints, so we used jstack -l <pid> > <pid>.jstack directly.

The dump contains a thread in the BLOCKED state:

"Source: Custom Source -> Map -> mysqlmsg (1/1)" #77 prio=5 os_prio=0 tid=0x00007fb560080800 nid=0x7c3b waiting for monitor entry [0x00007fb539609000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:91)
	- waiting to lock <0x0000000702a61a58> (a java.lang.Object)

Next we look for the thread holding monitor 0x0000000702a61a58:

"Legacy Source Thread - Source: Custom Source -> Map -> mysqlmsg (1/1)" #82 prio=5 os_prio=0 tid=0x00007fb48c485000 nid=0x7c4e in Object.wait() [0x00007fb47f3fd000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at org.apache.ibatis.datasource.pooled.PooledDataSource.popConnection(PooledDataSource.java:451)
	- locked <0x000000070396d180> (a org.apache.ibatis.datasource.pooled.PoolState)
	at org.apache.ibatis.datasource.pooled.PooledDataSource.getConnection(PooledDataSource.java:90)
	at org.apache.ibatis.transaction.jdbc.JdbcTransaction.openConnection(JdbcTransaction.java:139)
	at org.apache.ibatis.transaction.jdbc.JdbcTransaction.getConnection(JdbcTransaction.java:61)
	at org.apache.ibatis.executor.BaseExecutor.getConnection(BaseExecutor.java:338)
	at org.apache.ibatis.executor.SimpleExecutor.prepareStatement(SimpleExecutor.java:84)
	at org.apache.ibatis.executor.SimpleExecutor.doQuery(SimpleExecutor.java:62)
	at org.apache.ibatis.executor.BaseExecutor.queryFromDatabase(BaseExecutor.java:326)
	at org.apache.ibatis.executor.BaseExecutor.query(BaseExecutor.java:156)
	at org.apache.ibatis.executor.CachingExecutor.query(CachingExecutor.java:109)
	at org.apache.ibatis.executor.CachingExecutor.query(CachingExecutor.java:83)
	at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:148)
	at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:141)
	at org.apache.ibatis.session.defaults.DefaultSqlSession.selectOne(DefaultSqlSession.java:77)
	at org.apache.ibatis.binding.MapperMethod.execute(MapperMethod.java:83)
	at org.apache.ibatis.binding.MapperProxy.invoke(MapperProxy.java:59)
	at com.sun.proxy.$Proxy35.selectByPrimaryKey(Unknown Source)
	// ... intermediate frames omitted ...
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
	- locked <0x0000000702a61a58> (a java.lang.Object)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
	- locked <0x0000000702a61a58> (a java.lang.Object)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)

   Locked ownable synchronizers:
	- None

So PooledDataSource.popConnection is blocking: the thread is stuck waiting to obtain a database connection. Checking the connection-pool initialization, poolMaximumActiveConnections was never set, so MyBatis falls back to its default maximum of 10 active connections. Meanwhile, the ProcessFunction's processElement method used short-lived connections: for every batch of data it fetched a connection, sent the data, and closed the connection. Under load the pool is easily exhausted, and processElement then blocks waiting for a free connection. A blocked processElement stops checkpoint barriers from flowing through the operator, and that is what caused the checkpoint timeouts.
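Reconstructed as a minimal sketch, the problematic pattern looked roughly like this (this is not the actual business code; the MyBatis config file name is a placeholder):

import java.io.IOException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.ibatis.io.Resources;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;

public class ShortConnectionProcessFunction extends ProcessFunction<String, String> {

    private transient SqlSessionFactory sessionFactory;

    @Override
    public void open(Configuration parameters) throws IOException {
        // the factory is built once, but it is backed by a MyBatis PooledDataSource
        // whose poolMaximumActiveConnections defaults to 10
        sessionFactory = new SqlSessionFactoryBuilder()
                .build(Resources.getResourceAsStream("mybatis-config.xml"));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // a pooled connection is checked out and returned for every element; once
        // all 10 connections are in use, the first statement on the session blocks
        // inside PooledDataSource.popConnection() and checkpoint barriers stop flowing
        try (SqlSession session = sessionFactory.openSession()) {
            // ... run the inserts/queries for this record ...
            session.commit();
        }
        out.collect(value);
    }
}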

Problem 4: data not synchronized

With the first three problems understood, this one explains itself: the blocked thread caused checkpoints to time out (problem 3), the timeout restarted the job, the hung thread then prevented the tasks from being cancelled (problem 2), and after the default 180 s the TaskCancelerWatchDog killed the TaskManager (problem 1). The end result is problem 4: data stops syncing.

Solutions

With the root cause identified, the fix is within reach. Several options:

  • Increase the pool's maximum number of active connections via poolMaximumActiveConnections;
  • Use long-lived connections: create the connection in open() and release it in close() (see the first sketch below);
  • Skip hand-rolled connection handling entirely and write through the flink-connector-jdbc sink; since the single source topic carries over a hundred tables, the stream has to be split per target table first (see the second sketch below).
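A minimal sketch of the second option, assuming a plain JDBC MySQL target; the URL, credentials, and table name are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class LongConnectionProcessFunction extends ProcessFunction<String, String> {

    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        // one connection per subtask, held for the lifetime of the task
        connection = DriverManager.getConnection(
                "jdbc:mysql://db-host:3306/sync", "user", "pass");
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        try (PreparedStatement ps =
                connection.prepareStatement("INSERT INTO t_target (payload) VALUES (?)")) {
            ps.setString(1, value);
            ps.executeUpdate();
        }
        out.collect(value);
    }

    @Override
    public void close() throws Exception {
        // release the connection when the task finishes or is cancelled
        if (connection != null) {
            connection.close();
        }
    }
}

And a sketch of the third option with the JDBC connector (the flink-connector-jdbc artifact is available for 1.11); here stream stands for the already-split per-table stream, and the SQL is again a placeholder:

// imports: org.apache.flink.connector.jdbc.JdbcSink,
//          org.apache.flink.connector.jdbc.JdbcConnectionOptions
stream.addSink(JdbcSink.sink(
        "INSERT INTO t_target (payload) VALUES (?)",
        (ps, value) -> ps.setString(1, value),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://db-host:3306/sync")
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUsername("user")
                .withPassword("pass")
                .build()));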

Summary

This article walked through the diagnosis of a real-time sync job that stopped synchronizing, to offer a line of investigation for Flink checkpoint timeout problems. Along the way it also covered a typical failure mode of Standalone deployments, a TaskManager dying for no apparent reason, in the hope of helping other Flink users avoid the same pitfalls.

References

jstack in depth
MyBatis: built-in DataSource implementations
