Summary of Flink development issues

1. Can graph algorithms be called from within Flink?
2. Cannot instantiate user function
3. Developing a Flink program in a local IDEA and running it locally to read HDFS files
4. The assigned slot container_e08_1539148828017_15937_01_003564_0 was removed
5. java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_e03_1553154249286_347659_01_000011 timed out.
6. could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[Int]
7. Using events and process functions to handle state data in Flink
8. Reading multiple permission-controlled topics (>=1) in Flink
9. Calling the interface of an external class inside Flink fails with a "not initialized" error
10. The end timestamp of an event-time window cannot become earlier than the current watermark by merging
————————————————
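Summary of common problems when submitting and running Flink jobs

1. Exception when submitting a jar to the Flink cluster

Submitting a jar to the Flink cluster fails with: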
java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem: Provider org.apache.hadoop.fs.viewfs.ViewFileSystem could not be instantiated

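Possible cause:

This problem usually occurs because the Hadoop configuration was not read correctly.

Solution:

Set the following environment variables:

  HADOOP_HOME=/hadoop/hadoop-2.7.2
  HADOOP_CONF_DIR=/hadoop/hadoop-2.7.2/etc/hadoop/
  FLINK_HOME=/home/flink-1.4.2

It is also advisable to use the official flink-hadoop dependency that Flink provides, because the Hadoop jars you supply yourself may be incomplete.
Add the following dependency to pom.xml in Maven.
When using a different official Flink package, this dependency needs to be configured if you want the checkpoint feature: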

  <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2 -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop2</artifactId>
    <version>1.4.2</version>
  </dependency>

P.S. For the other Maven dependencies, see my other blog post, "在Flink集群搭建和使用中遇到的坑" (Pitfalls encountered when setting up and using a Flink cluster), on fct2001140269's CSDN blog.

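2. Error [Flink JobExecutionException: akka.client.timeout] when submitting a Flink jar for execution

Submitting a Flink jar for execution fails with the error [Flink JobExecutionException: akka.client.timeout].

Analysis:

On the surface, the job did not complete submission within the allowed time (60 s in the default Flink cluster configuration). The project may be large, so the JobManager process cannot finish building Flink's logical topology graph within that short window, or cannot finish initializing the individual operators (for example, richMapFunction()) in time.

Solution:

Set the following options in the cluster configuration to increase the time allowed for building the job, initializing it, and loading resources during submission: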
 

  akka.client.timeout: 600s
  akka.ask.timeout: 600s

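Resubmit the jar to the cluster. After a short wait, the job's submission status should be visible in the web UI. (If it still fails, some other cause may be behind the exception.)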

3. Flink fails to start

Check the JobManager log:

  WARN org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to retrieve leader gateway and port.
  akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@t-sha1-flk-01:6123/), Path(/user/jobmanager)]
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
    at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
    at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
    at java.lang.Thread.run(Thread.java:748)

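Solution:

The hostnames in /etc/hosts were all lowercase, but the Flink configuration files (flink-conf.yaml, masters, slaves) used uppercase hostnames. Change the hostnames in the Flink configuration files to lowercase, or use IP addresses instead.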

4. Job exits after running for a while

  AsynchronousException{java.lang.Exception: Could not materialize checkpoint 4 for operator Compute By Event Time (1/12).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
  Caused by: java.lang.Exception: Could not materialize checkpoint 4 for operator Compute By Event Time (1/12).
    ... 6 more
  Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=7061809 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
    ... 5 more
  Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
    at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
    ... 5 more
  Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=7061809 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
    ... 7 more
  Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=7061809 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
    at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.checkSize(MemCheckpointStreamFactory.java:64)
    at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetBytes(MemCheckpointStreamFactory.java:144)
    at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetHandle(MemCheckpointStreamFactory.java:125)
    at org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:351)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
    at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    ... 1 more
  [CIRCULAR REFERENCE:java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=7061809 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.]

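Solution:

State is stored in memory by default. Change it to be stored in HDFS (this setting applies when the filesystem state backend is selected with state.backend: filesystem):

state.backend.fs.checkpointdir: hdfs://t-sha1-flk-01:9000/flink-checkpoints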
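
The numbers in the error message can be checked by hand: maxSize=5242880 is exactly 5 MiB, and the reported Size=7061809 exceeds it by 1,818,929 bytes. The plain-JDK sketch below only mirrors that comparison. StateSizeCheck and its members are hypothetical names used for illustration, not Flink classes, and the exact comparison Flink performs internally is an assumption here.

```java
// Plain-JDK illustration of the size limit reported in the stack trace.
// StateSizeCheck is a hypothetical name, not a Flink class.
final class StateSizeCheck {
    // 5 MiB, the maxSize value printed in the error message (5242880 bytes).
    static final long MAX_MEMORY_STATE_BYTES = 5L * 1024 * 1024;

    // Returns true when a snapshot of the given size is over the limit.
    static boolean exceedsLimit(long stateSizeBytes, long maxSizeBytes) {
        return stateSizeBytes > maxSizeBytes;
    }

    public static void main(String[] args) {
        long reported = 7_061_809L; // Size value from the error message
        System.out.println("limit bytes  = " + MAX_MEMORY_STATE_BYTES);
        System.out.println("over limit   = " + exceedsLimit(reported, MAX_MEMORY_STATE_BYTES));
        System.out.println("excess bytes = " + (reported - MAX_MEMORY_STATE_BYTES));
    }
}
```

Because keyed state usually keeps growing with the number of keys, moving checkpoints to a file system as described above is generally a more durable fix than raising the in-memory limit.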

5. Repeated restarts after running for a long time

  AsynchronousException{java.lang.Exception: Could not materialize checkpoint 1488 for operator Compute By Event Time -> (MonitorData, MonitorDataMapping, MonitorSamplingData) (6/6).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
  Caused by: java.lang.Exception: Could not materialize checkpoint 1488 for operator Compute By Event Time -> (MonitorData, MonitorDataMapping, MonitorSamplingData) (6/6).
    ... 6 more
  Caused by: java.util.concurrent.ExecutionException: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /flink-checkpoints/8c274785f1ab027e6146a59364be645f/chk-1488/2c612f30-c57d-4ede-9025-9554ca11fd12 could only be replicated to 0 nodes instead of minReplication (=1). There are 3 datanode(s) running and no node(s) are excluded in this operation.
    at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1628)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3121)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3045)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:493)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2213)

Check the HDFS log:

  WARN org.apache.hadoop.hdfs.protocol.BlockStoragePolicy:
  Failed to place enough replicas: expected size is 2 but only 0 storage types can be selected
  (replication=3, selected=[], unavailable=[DISK], removed=[DISK, DISK],
  policy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]})

This Flink setup uses HDFS to store checkpoints. When Flink restarted, the old checkpoints were no longer needed, so I deleted them by hand; I am not sure whether that is related. To stop the exceptions, I restarted Flink and HDFS, and after the restart the exceptions no longer appeared.

However, the HDFS log then showed the following warning (a URI that does not follow the expected format):

  WARN org.apache.hadoop.hdfs.server.common.Util:
  Path /mnt/hadoop/dfs/name should be specified as a URI in configuration files.
  Please update hdfs configuration

It turned out to be a configuration error. In hdfs-site.xml, the entry

  <property>
    <name>dfs.namenode.name.dir</name>
    <value>/mnt/hadoop/dfs/name</value>
  </property>

should be changed to:

  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file:/mnt/hadoop/dfs/name</value>
  </property>

With that the problem was resolved. The root cause was probably the incorrect hdfs-site.xml configuration.
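
As a rough illustration of the URI warning above, the following plain-JDK sketch checks whether a configured directory value carries a URI scheme and, if not, prefixes it with file:. DirUriCheck and its methods are hypothetical helpers, not part of Hadoop, and the sketch assumes the value contains no characters that are illegal in a URI (such as spaces).

```java
import java.net.URI;

// Hypothetical helper for illustration; not part of Hadoop.
final class DirUriCheck {
    // True when the configured value already carries a scheme such as "file".
    static boolean hasScheme(String value) {
        return URI.create(value).getScheme() != null;
    }

    // Prefixes a plain absolute path with "file:", leaving values that
    // already have a scheme untouched.
    static String toFileUri(String value) {
        return hasScheme(value) ? value : "file:" + value;
    }

    public static void main(String[] args) {
        System.out.println(hasScheme("/mnt/hadoop/dfs/name")); // prints false
        System.out.println(toFileUri("/mnt/hadoop/dfs/name")); // prints file:/mnt/hadoop/dfs/name
    }
}
```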

6. Unable to load native-hadoop library for your platform

When Flink starts, the following warning sometimes appears:

  WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Reference 1: http://blog.csdn.net/jack85986370/article/details/51902871

Attempted solution:

Edit /etc/profile and add:

  export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
  export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib"

This did not resolve the problem.

7. hadoop checknative -a

  WARN bzip2.Bzip2Factory: Failed to load/initialize native-bzip2 library system-native, will use pure-Java version
  INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
  Native library checking:
  hadoop: true /usr/hadoop-2.7.3/lib/native/libhadoop.so.1.0.0
  zlib: true /lib64/libz.so.1
  snappy: false
  lz4: true revision:99
  bzip2: false
  openssl: false Cannot load libcrypto.so (libcrypto.so: cannot open shared object file: No such file or directory)!
  INFO util.ExitUtil: Exiting with status 1

Reference: http://blog.csdn.net/zhangzhaokun/article/details/50951238

Solution:

  cd /usr/lib64/
  ln -s libcrypto.so.1.0.1e libcrypto.so

8. TaskManager exits

After Flink had been running for a while, a TaskManager exited. I captured a heap dump of the TaskManager with jvisualvm and analyzed it with MAT. The result:

  One instance of "org.apache.flink.runtime.io.network.buffer.NetworkBufferPool"
  loaded by "sun.misc.Launcher$AppClassLoader @ 0x6c01de310" occupies 403,429,704 (76.24%) bytes.
  The memory is accumulated in one instance of "java.lang.Object[]" loaded by "<system class loader>".
  Keywords
  sun.misc.Launcher$AppClassLoader @ 0x6c01de310
  java.lang.Object[]
  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool

The analysis pointed to the network buffer pool running short. I found a related report:

https://issues.apache.org/jira/browse/FLINK-4536

It closely matched my situation: that job also used InfluxDB as a sink. After I closed the connection in the sink's close method, the problem was resolved.

In addition, $FLINK_HOME/conf/flink-conf.yaml has a setting for the TaskManager network stack, which I have left unchanged for now:

  # The number of buffers for the network stack.
  #
  # taskmanager.network.numberOfBuffers: 2048
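
The fix described in this section, releasing the InfluxDB connection in the sink's close method, can be sketched without Flink on the classpath. The classes below are hypothetical stand-ins: FakeMetricsClient is not the real InfluxDB client, and MetricsSinkSketch only imitates the open / invoke / close lifecycle that a Flink rich sink function follows. The author's actual sink code is not shown in the source.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an InfluxDB client; not the real client API.
final class FakeMetricsClient implements AutoCloseable {
    private boolean open = true;
    private final List<String> written = new ArrayList<>();

    void write(String point) {
        if (!open) {
            throw new IllegalStateException("client already closed");
        }
        written.add(point);
    }

    boolean isOpen() { return open; }

    int writtenCount() { return written.size(); }

    @Override
    public void close() { open = false; }
}

// Imitates the open/invoke/close lifecycle of a rich sink, in plain Java.
final class MetricsSinkSketch {
    private FakeMetricsClient client;

    // Called once before any record is processed: create the connection here.
    void open() { client = new FakeMetricsClient(); }

    // Called once per record.
    void invoke(String value) { client.write(value); }

    // Called when the task ends: release the connection here.
    void close() {
        if (client != null) {
            client.close();
        }
    }

    FakeMetricsClient client() { return client; }

    public static void main(String[] args) {
        MetricsSinkSketch sink = new MetricsSinkSketch();
        sink.open();
        try {
            sink.invoke("cpu,host=a value=0.5");
        } finally {
            sink.close(); // always release the client, even if invoke throws
        }
        System.out.println("client open after close(): " + sink.client().isOpen());
    }
}
```

Keeping the client in a field and creating it in open rather than in the constructor avoids serializing a live connection with the function object, which is the usual reason this lifecycle is used.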

9. Kafka partition leader change causes Flink to restart

Symptoms:

9.1 Flink restarted, and the log showed:

  java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:280)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
  Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

9.2 The Kafka controller log showed:

  INFO [SessionExpirationListener on 10], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)

9.3 Set the retries parameter

References: http://colabug.com/122248.html and the producer configuration section of the official Kafka documentation (http://kafka.apache.org/082/documentation.html#producerconfigs).

With retries set, a leader change on a Kafka partition no longer makes Flink restart; the producer retries the send up to 3 times instead:

  kafkaProducerConfig
  {
    "bootstrap.servers": "192.169.2.20:9093,192.169.2.21:9093,192.169.2.22:9093",
    "retries": 3
  }
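
For reference, the same two settings can be expressed as java.util.Properties, the form that the Kafka producer client accepts. This is a minimal sketch: KafkaProducerSettings and its build method are hypothetical names, and how the author's kafkaProducerConfig block is mapped onto the producer is not shown in the source.

```java
import java.util.Properties;

// Hypothetical helper; the class and method names are illustrative only.
final class KafkaProducerSettings {
    static Properties build(String bootstrapServers, int retries) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Number of times the producer resends a record after a transient
        // error, such as a partition leader change.
        props.setProperty("retries", Integer.toString(retries));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(
            "192.169.2.20:9093,192.169.2.21:9093,192.169.2.22:9093", 3);
        System.out.println(props.getProperty("retries")); // prints 3
    }
}
```

Note that retried sends can reorder records within a partition unless the producer limits in-flight requests, so it is worth checking whether ordering matters for the sink topic.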

     
