问题解决方案参考 https://www.bbsmax.com/A/RnJW4vkE5q/
问题:
使用FlinkKafkaProducer进行数据生产,数据只写到了kafka的部分分区中,其它的分区没有数据写入
原因:
原因1:Flink写kafka使用的机制与原生接口的写入方式是有差别的,在默认情况下,Flink使用了”并行度编号+分区数量”取模计算的结果作为topic的分区编号。
1.并行度%分区数量=0,表示并行度是kafkatopic分区数的一倍或者多倍,数据的写入每个分区数据量是均衡的。
2.并行度%分区数量≠0,那么数据量势必会在个别分区上的数据量产生倾斜。
原因2:在业务代码的部分算子中使用了keyby()方法,每个key值所属的数据量不一致(就是说某些key的数据量会非常大,有些又非常小)导致每个并行度中输出的数据流量不一致,从而出现数据倾斜。
解决办法:
原因一:
方法1,调整kafka的分区数跟flink的并行度保持一致,即要求kafka的分区数与flink写kafka的sink并行度保持强一致性。这种做法的优势在于每个并行度仅需要跟每个kafka分区所在的 broker保持一个常链接即可。能够节省每个并发线程与分区之间调度的时间。
方法2,flink写kafka的sink的分区策略写成随机写入模式,这样数据会随即写入topic的分区中,但是会有一部分时间损耗在线程向寻址,推荐使用方法1。
原因二:
需要调整业务侧对key值的选取,例如:可以将key调整为“key+随机数”的方式,保证Flink的keyby()算子中每个处理并行度中的数据是均衡的。

问题:
Flink任务的日志目录增长过快,导致磁盘写满,经过排查发现是taskmanager.out文件过大导致
原因分析:
原因一:代码中存在大量的print模块,导致taskmanager.out文件被写入大量的日志信息,taskmanager.out 一般是,业务代码加入了 .print的代码或system.out.println代码,需要在代码中排查是否有类似于以下的代码逻辑
在这里插入图片描述

原因二:用户误将 logback-classic + logback-core 打jar包时打进包中,使得 flink slf4j 桥接到 logback,但是打印的包中未提供正确的logback.xml或者没有logback.xml文件
在启动的任务可看出到jobmanager.err或者taskmanager.err中可看到:

SLF4J: Class path contains multiple SLF4J bindings. 
 SLF4J: Found binding in [jar:file:/D:/mvn_repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class] 
 SLF4J: Found binding in [jar:file:/D:/mvn_repository/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] 
 SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 
 SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]

slf4j-api.jar里的org.slf4j.LoggerFactory会调org.slf4j.impl.StaticLoggerBinder,这个StaticLoggerBinder在logback-classic.jar里实现,这样就把slf4j的日志绑定到logback了。这时,如果classpath里同时还有slf4j-log4j12.jar那么会报multiple SLF4j bindings错误,因为slf4j-log4j12.jar里也有StaticLoggerBinder实现,从日志中,我们看到 slf4j 找到了多个桥接器实现类,但是最终选择了 logback
如果 slf4j 选择的是 logback,而你没有相应的 logback.xml 配置文件的话,则 logback 将采用默认方式,将日志输出到 stdout。即使flink/conf下有ogback.xml文件也不会选择,此时默认的

<root level="DEBUG">  
       <appender-ref ref="STDOUT" />  
 </root>

导致taskmanager.out和jobmanager.out中存在大量DEBUG日志
如果 sfl4j 选择的是 log4j.properites,而你没有相应的 log4j.properties 而配置文件的话,则 log4j 将采用默认方式,将日志输出到 stdout 中。同理,用户自己代码中的 resources 目录中,误将 debug 的 log4j.properties 文件带入,覆盖了 conf 下面的 log4j.properties,导致 log4j 初始化不符合预期

解决方案:
原因一:
打包时,将代码中 .print 和 system.out.println的代码删掉 或注释掉
原因二:
选择log4j,使用 slf4j + slf4j-log4j12+log4j 作为 flink 的日志框架,配置文件采用 conf/log4j.properties,打包时,将jar包中log的相关依赖排除

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.1.0</version>
    <configuration>
        <artifactSet>
            <excludes>
                <exclude>org.slf4j:*</exclude>
                <exclude>log4j:*</exclude>
                <exclude>ch.qos.logback:*</exclude>
                <exclude>log4j.properties</exclude>
            </excludes>
        </artifactSet>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
            </configuration>
        </execution>
    </executions>
</plugin>

jar包传到服务器后,可用以下命令查看jar包中是否剔除对应log查看jar包中包含类的方式

jar -tf test.jar | grep -i logback

如果发现剔除不完全,可使用idea的maven Helper插件,点击Dependency Analyzer来exclude

启动作业后可看到:

SLF4J: Found binding in [jar:file:/D:/mvn_repository/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] 
 SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 
 SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

补充flink官网可看
Flink 附带以下默认日志配置文件:
log4j-cli.properties:由 Flink 命令行客户端使用(例如 flink run)(启动application时终端打印的日志)
log4j-session.properties:Flink 命令行客户端在启动 YARN 或 Kubernetes session 时使用(yarn-session.sh,kubernetes-session.sh)
log4j.properties:作为 JobManager/TaskManager 日志配置使用(standalone 和 YARN 两种模式下皆使用)
使用flink run这种方式提交任务,会自动去FLINK_HOME下的conf目录下找log4j.properties的文件作为jobmanager和taskmanager的日志配置

flink/conf/log4j.properties:

appender.main.name = MainAppender
appender.main.type = RollingFile
appender.main.append = true
appender.main.fileName = ${sys:log.file}
appender.main.filePattern = ${sys:log.file}.%i
appender.main.layout.type = PatternLayout
appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.main.policies.type = Policies
appender.main.policies.size.type = SizeBasedTriggeringPolicy
appender.main.policies.size.size = 100MB
appender.main.policies.startup.type = OnStartupTriggeringPolicy
appender.main.strategy.type = DefaultRolloverStrategy
appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}

1.以上设置每 100 MB 滚动一次 jobmanager.log 和 taskmanager.log,并保留旧日志文件 7 天,或在总大小超过 5000MB 时删除最旧的日志文件
2.以上log4j.properties 不控制jobmanager.err/out 和taskmanaer.err/out,如果您的应用程序显式打印任何结果到stdout/stderr,您可能会在长时间运行后填满文件系统。我们建议您利用 log4j 日志框架来记录任何消息,或打印任何结果

问题:
算子的部分节点产生背压,其它节点正常

解决办法:
1.页面找到背压节点,找到具体算子,用disableOperatorChaining()方法拆开算子链逐个分析;
2.一个节点启动多个slot,一个taskmanager中设置了多个slot,避免多个taskmanager出现在一个nodemanager节点上

问题:
任务启动失败,任务启动一段时间后报错,报资源不足:Could not allocate all requires slots within timeout of xxxx ms

解决办法:
Flink任务在启动过程中的资源使用是先增长再下降到当前值的,实际在启动过程中需要的资源量等于每个算子并行度之和。等到任务开始运行后,Flink会对资源进行合并。

例如如下算子,在启动过程中需要“1+6+6+5+3=21”个资源。
描述
但是运行稳定后会降低到6。这个是Flink的机制。假如任务在启动过程中不满足21个资源的启动资源量,任务就会出现NoResourceAvailableException的异常。

Flink试图将工作负载分散到所有当前可用的TMs上,
在flink-conf.yaml文件中设置cluster.evenly-spread-out-slots: true

flink run启动命令解读:
./flink run -m yarn-cluster -p 1 -ys 3 …/examples/streaming/WindowJoin.jar
仅会启动一个taskmanager,分配三个slot
./flink run -m yarn-cluster -yjm 1024m -ytm 1024m -yqu default -ynm test_app -yD taskmanager.memory.managed.size=12365b -yD env.java.opts=“-Dflink_per_job_name=testjob” -p 20 -ys 5 -c com.test.JobApp …/examples/streaming/WindowJoin.jar -yd
一般-p大于-ys, taskmanager的个数=p/s
-p: 指定并行度
-ys: 指定每个taskmanager的slot个数
-yqu: 指定队列
-ynm: 指定启动的application名
-m: 指定启动模式
-ytm: 指定每个taskmanager的内存大小
-yjm: 指定jobmanager的内存大小
-c: 指定main方法的类名(全限定名)
-C:指定classpath的url,将url添加到集群中所有节点上的每个用户代码类加载器(file:// 或者 hdfs上)
-yD <property=value>: 指定额外参数
-yd: detached模式跑任务(相当于后台启动)
-yid:指定已有的yarnapplicationId,附加到正在运行的YARN会话
-yj:指定flink jar 文件path
-yn:指定使用taskmanager个数(该选项用于指定从YARN启动时的container数量,官网建议弃用)
-s:指定savepoint的路径
-yq:查询yarn可用资源(memory,cores)
-yst:streaming模式启动flink (该选项用于禁用预先分配的内存,官网建议弃用)
-yt:指定资源文件,资源目录和 jar 文件会被添加到 classpath 中
-yz:指定创建zookeeper时的namespace

yarn session启动命令解读:
./yarn-session.sh -n 4 -jm 6144 -tm 6144 -s 5 -qu default -nm test_app -d
-s: 指定每个taskmanager的slot个数
-qu: 指定队列
-nm: 指定启动的application名
-tm: 指定每个taskmanager的内存大小(默认单位M)
-jm: 指定jobmanager的内存大小(默认单位M)
-n: 指定taskmanager个数

yarn application -kill applicationID :kill掉
yarn logs -applicationId appid

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐