Expiring XXX record(s) for XXX:120015 ms has passed since batch creation

  1. 问题背景:dws曝光人+场模型聚合压测,憋量20亿左右数据;
  2. 问题发生现象:flink job启动后,频繁发生checkpoint失败,并且checkpoint失败原因 :Failure reason: Checkpoint was declined.
  3. 问题现场日志:
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 8 for operator aggregate -> Sink: exp sink (86/160). Failure reason: Checkpoint was declined.at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434) ...
Caused by: org.apache.flink.util.SerializedThrowable: Failed to send data to Kafka: Expiring 2483 record(s) for 【topic_name】-85:120015 ms has passed since batch creation ...
Caused by: org.apache.flink.util.SerializedThrowable: Expiring 2483 record(s) for 【topic_name】-85:120015 ms has passed since batch creation ...
org.apache.flink.runtime.jobmaster.JobMaster - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
  1. 问题发生原因描述:
    问题的根本原因是kafka消息发送是批量发送,ProducerRecord会先存储到本地buffer,消息存储在这个buffer里的时长是有限制的【request.timeout.ms】,因此在消息量级比较大,存储在buffer里的消息,超过了request.timeout.ms这个设置时长,就会报上述Expiring XXX record(s) for XXX:120015 ms has passed since batch creation错误;而与此同时,我们开启了端到端的精准一次特性即事务,此时checkpoint与消息的pre commit绑定,pre commit 失败,导致checkpoint的失败,任务重启,大量消息积压;
  2. 问题解决方案:
    a)调整 request.timeout.ms 这个参数去满足需求,让消息在buffer里待更长的时间;
    b)我们公司会给与每个生产者限速,可以提升生产者的速度,这样本地缓存的消息就不会产生积压;
  3. checkpoint失败现场截图,表现为某一个或者多个并行度checkpoint失败:
    在这里插入图片描述
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐