Download the installation package from the official website:
https://flink.apache.org/downloads.html#all-stable-releases

I. Standalone mode

[root@Linux121 conf]# vim flink-conf.yaml
Set the master (JobManager) node:
jobmanager.rpc.address: Linux121
Edit the master and worker node lists:
[root@Linux121 conf]# cat masters
Linux121:8081
[root@Linux121 conf]# cat workers
Linux121
Linux122
Linux123
[root@Linux121 conf]#

Configure the ZooKeeper quorum in zoo.cfg:

# ZooKeeper quorum peers
server.1=Linux121:2888:3888
server.2=Linux122:2888:3888
server.3=Linux123:2888:3888
Start Flink in this mode:
./start-cluster.sh

Open the Flink Web UI (Linux121:8081, per the masters file above).
Submit a job
1. Through the Web UI
2. Through the command line

[root@Linux121 bin]# ./flink run -c WordCountStream -p 2 /root/myJar/Demo-1.0-SNAPSHOT.jar
Job has been submitted with JobID 35e26ebcbfa7743b3058fd2cc08639c3

Cancel the job

[root@Linux121 bin]# ./flink cancel 35e26ebcbfa7743b3058fd2cc08639c3
Cancelling job 35e26ebcbfa7743b3058fd2cc08639c3.
Cancelled job 35e26ebcbfa7743b3058fd2cc08639c3.
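
The submit and cancel steps above can be chained in a script by capturing the JobID from the CLI output. The following is a minimal sketch, assuming the output contains the "Job has been submitted with JobID ..." line shown in the transcript. The names extract_job_id, submit_and_cancel and the FLINK_BIN variable are hypothetical helpers, not part of Flink; the -d (detached) flag is used so that flink run returns right after submission.

```shell
# Extract the 32-character hex JobID from `flink run` output read on stdin.
# Assumes the "Job has been submitted with JobID <id>" line shown above.
extract_job_id() {
  sed -n 's/^Job has been submitted with JobID \([0-9a-f]\{32\}\).*$/\1/p' | head -n 1
}

# Hypothetical helper: submit a job in detached mode, then cancel it again.
# FLINK_BIN is an assumed variable pointing at the flink launcher script.
submit_and_cancel() {
  local flink_bin="${FLINK_BIN:-./flink}"
  local job_id
  job_id="$("$flink_bin" run -d "$@" | extract_job_id)"
  if [ -z "$job_id" ]; then
    echo "no JobID found in flink run output" >&2
    return 1
  fi
  echo "submitted $job_id"
  "$flink_bin" cancel "$job_id"
}
```

A call might look like: submit_and_cancel -c WordCountStream -p 2 /root/myJar/Demo-1.0-SNAPSHOT.jar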

II. YARN mode

1. YARN session mode

View the available options:

[root@Linux121 bin]# ./yarn-session.sh -h
Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException
	at java.lang.Class.getDeclaredMethods0(Native Method)
	at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
	at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
	at java.lang.Class.getMethod0(Class.java:3018)
	at java.lang.Class.getMethod(Class.java:1784)
	at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:650)
	at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:632)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 7 more

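As the error shows, Flink cannot find the Hadoop YARN classes at startup. Download the Hadoop dependency jar for Flink from the official website:
https://flink.apache.org/downloads.html
On the download page, get the matching dependency jar and place it in Flink's lib directory.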
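
Before re-running yarn-session.sh, it can help to confirm that a Hadoop jar actually landed in the lib directory. The sketch below is only a file-name check: it assumes the jar's name contains "hadoop", which is a guess about naming and does not prove that the YarnException class is inside. The function name find_hadoop_jars and the example path are hypothetical.

```shell
# List jars in the given lib directory whose file name contains "hadoop".
# Returns 0 if at least one is found, 1 otherwise.
# The "*hadoop*.jar" pattern is an assumption about how the jar is named.
find_hadoop_jars() {
  local lib_dir="$1"
  local found=1
  local jar
  for jar in "$lib_dir"/*hadoop*.jar; do
    # When nothing matches, the glob stays literal; skip that case.
    [ -e "$jar" ] || continue
    echo "$jar"
    found=0
  done
  return "$found"
}
```

For example, find_hadoop_jars ../lib run from the bin directory prints the matching jars, or returns a non-zero status if none are present.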

[root@Linux121 bin]# ./yarn-session.sh -h
2022-04-01 23:46:05,631 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.address, Linux121
2022-04-01 23:46:05,643 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.port, 6123
2022-04-01 23:46:05,644 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.memory.process.size, 1600m
2022-04-01 23:46:05,644 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.process.size, 1728m
2022-04-01 23:46:05,644 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2022-04-01 23:46:05,644 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: parallelism.default, 1
2022-04-01 23:46:05,644 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2022-04-01 23:46:06,912 WARN  org.apache.hadoop.util.NativeCodeLoader                      [] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2022-04-01 23:46:07,279 INFO  org.apache.flink.runtime.security.modules.HadoopModule       [] - Hadoop user set to root (auth:SIMPLE)
2022-04-01 23:46:07,344 INFO  org.apache.flink.runtime.security.modules.JaasModule         [] - Jaas file will be created as /tmp/jaas-8845686346801223383.conf.
Usage:
   Optional
     -at,--applicationType <arg>     Set a custom application type for the application on YARN
     -D <property=value>             use value for given property
     -d,--detached                   If present, runs the job in detached mode
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running YARN session
     -j,--jar <arg>                  Path to Flink jar file
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>           Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
     -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
     -nm,--name <arg>                Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

Start a session in this mode:

[root@Linux121 bin]# ./yarn-session.sh -s 1 -jm 1024 -tm 1024

2. YARN per-job mode

[root@Linux121 bin]# ./flink run -m yarn-cluster -ynm flink-job -c WordCountStream /root/myJar/Demo-1.0-SNAPSHOT.jar

The following error occurs:

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not deploy Yarn job cluster.
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
	at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
	at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
	at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
	at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1681)
	at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:688)
	at WordCountStream$.main(WordCountStream.scala:9)
	at WordCountStream.main(WordCountStream.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
	... 11 more

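According to information found online, there are two possible causes.
1. The YARN resource configuration needs to be increased (in yarn-site.xml):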

<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>2048</value>
</property>

<property>
  <name>yarn.scheduler.minimum-allocation-mb</name>
  <value>1024</value>
</property>

<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <value>2048</value>
</property>
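
To see why these values matter, compare them with the memory sizes Flink logged earlier (jobmanager.memory.process.size 1600m, taskmanager.memory.process.size 1728m). The sketch below assumes the scheduler rounds each container request up to a multiple of yarn.scheduler.minimum-allocation-mb; that is common default behaviour but depends on the scheduler in use, so treat the numbers as an estimate. The function names are hypothetical.

```shell
# Round a memory request (MB) up to the next multiple of the minimum allocation.
# Assumption: the YARN scheduler normalizes requests this way.
round_up_mb() {
  local request="$1"
  local min_alloc="$2"
  echo $(( (request + min_alloc - 1) / min_alloc * min_alloc ))
}

# Succeed if the rounded request fits under the maximum allocation.
fits_container() {
  local request="$1"
  local min_alloc="$2"
  local max_alloc="$3"
  [ "$(round_up_mb "$request" "$min_alloc")" -le "$max_alloc" ]
}

min=1024
max=2048
for size in 1600 1728; do
  echo "$size MB -> $(round_up_mb "$size" "$min") MB container"
done
```

Under that assumption both the JobManager and the TaskManager request end up as 2048 MB containers, which just fits the 2048 MB maximum, and a NodeManager with 2048 MB can host only one of them at a time.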

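2. The submitted version does not match the cluster: my Flink application was packaged against a different Flink version than the one the cluster runs, which caused this problem.
Resubmit the job.
Apparently the environment configuration still has a problem.
After modifying the relevant configuration, verify again.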
/etc/profile (note: the location of the Hadoop configuration files may differ between versions; adjust to your actual version)

#Flink
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
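
After reloading the profile (for example with source /etc/profile), a small preflight check can confirm that the three variables are visible in the current shell before submitting to YARN. This is a minimal sketch; check_hadoop_env is a hypothetical helper and it only tests that the variables are non-empty, not that their values are correct.

```shell
# Report which of the Hadoop-related variables exported above are unset or empty.
# Returns 0 when all three are set, 1 otherwise.
check_hadoop_env() {
  local missing=0
  local name
  local value
  for name in HADOOP_CONF_DIR YARN_CONF_DIR HADOOP_CLASSPATH; do
    # Indirect lookup: read the value of the variable whose name is in $name.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "missing: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

Running check_hadoop_env && ./flink run ... would then skip the submission when any of the variables is missing.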

The job now starts successfully.
In the YARN Web UI, click ApplicationMaster to open the Flink Web UI for the job; the received data is the same as above.
