Flink的安装部署:Local本地模式|Standalone独立集群模式|Standalone-HA高可用集群模式|Flink On Yarn模式

我这里电脑上有三台安装好的虚拟机分别是node1、node2、node3

Local本地模式:

       原理:主节点JobManager(Master)和从节点TaskManager(Slave)在一台机器上模拟

                  1、Flink程序由JobClient进行提交

                  2、JobClient将任务提交给JobManager

                  3、JobManager只负责协调分配资源和分发任务,资源分配完成后将任务提交给相应的TaskManager

                  4、TaskManager启动一个线程开始执行任务,TaskManager会向JobManager报告状态的变更,

                        例如:开始执行、正在执行、执行完成

                  5、作业执行完成后,结果将发送回客户端(JobClient)

              

       具体部署:

                  1、下载安装包https://archive.apache.org/dist/flink/,下面以flink-1.12.0-bin-scala_2.12.tgz安装包为例说明

                  2、上传flink-1.12.0-bin-scala_2.12.tgz安装包到node1指定路径/export/software/下面

                  3、解压安装包到 /export/server/下面:tar -zxvf flink-1.12.0-bin-scala_2.12.tgz -C /export/server/

                  4、如果出现权限的问题需要修改权限:chown -R root:root /export/server/flink-1.12.0/

                  5、可以对其修改名称或者添加软连接方便后面操作:

                       mv flink-1.12.0 flink    |     ln -s /export/server/flink-1.12.0 /export/server/flink

       测试使用:

                  1、准备测试文件

                  2、启动Fink本地集群(这里说的集群,不是真正意义上的集群,而是在一台虚拟机上模拟的集群)

                       /export/server/flink/bin/start-cluster.sh

                       

                  3、jps会看到启动了一个主节点和一个从节点

                       

                  4、访问Flink的WEB UI:http://node1:8081/#/overview

                        

                  5、执行官方示例

                       /export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar

                       (可以只执行上面这个,也可以加上下面的参数)

                       --input /root/words.txt

                       --output /root/out

                  6、停止Flink:/export/server/flink/bin/stop-cluster.sh

Standalone独立集群模式:

       原理:node1作为主节点JobManager(master),node1、node2、node3作为从节点TaskManager(Slave)

              1、client客户端提交任务给JobManager

              2、JobManager负责申请分配任务所需资源并管理任务任务和资源

              3、JobManager分发任务给TaskManager执行

              4、TaskManager定期向JobManager汇报状态

              

       具体部署:node1作为主节点JobManager(master),node1、node2、node3作为从节点TaskManager(Slave)

              1、修改node1节点上/export/server/flink/conf/flink-conf.yaml

                     vim flink-conf.yaml

                     jobmanager.rpc.address: node1        #指定主节点的服务器名称

                     taskmanager.numberOfTaskSlots: 2     #指定每个从节点上有几个slot

                     web.submit.enable: true              #是否可以web提交

                     #历史服务器

                     jobmanager.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/    #指定任务归档的路径

                     historyserver.web.address: node1                                      #指定历史服务器的名称

                     historyserver.web.port: 8082                                          #指定历史服务器的web端口

                     historyserver.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/ #指定历史服务器获取历史任务的路径

                     注意:冒号后面的空格不能删除

              2、修改主节点配置:vim /export/server/flink/conf/masters        node1:8081    #指定主节点服务器及web端口

              3、修改从节点配置: vim /export/server/flink/conf/workers

                     node1

                     node2

                     node3

              4、将node1上的Flink分发给node2和node3,并添加软连接(参照上面的命令)

                     cd /export/server

                      scp -r flink-1.12.0 node2:$PWD

                      scp -r flink-1.12.0 node3:$PWD

       测试使用:

              1、启动集群

                     在node1上执行:/export/server/flink/bin/start-cluster.sh

                     

                     jps查看node1

                     

                     jps查看node2

                   

                     jps查看node3

                   

                     或者单独启动:

             /export/server/flink/bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all

             /export/server/flink/bin/taskmanager.sh start|start-foreground|stop|stop-all

2、启动历史服务器

/export/server/flink/bin/historyserver.sh start

启动历史任务可能会失败:

查看log  cd /export/server/flink/log:cat flink-root-historyserver-0-node1.log

                      

1、配置的任务任务归档路径以及历史任务查看路径没有创建,需要手动创建 hdfs dfs -mkdir -p /flink/completed-jobs/

2、找不到hdfs:

vim /etc/profile

添加:export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop

source /etc/profile(分发给node2和node3后都要执行这个命令)

node1执行jps

                        

         3、访问Flink UI界面

http://node1:8081/#/overview

        http://node1:8082/#/overview

TaskManager界面:可以查看到当前Flink集群中有多少个TaskManager,每个TaskManager的slots、内存、CPU Core是多少

                  4、测试执行官方示例

                        /export/server/flink/bin/flink run  /export/server/flink/examples/batch/WordCount.jar 
                       --input hdfs://node1:8020/wordcount/input/words.txt 
                       --output hdfs://node1:8020/wordcount/output/result.txt  
                       --parallelism 2

                  5、查看历史日志

         http://node1:50070/explorer.html#/flink/completed-jobs

         http://node1:8082/#/overview

  6、停止Flink集群:

/export/server/flink/bin/stop-cluster.sh

/export/server/flink/bin/historyserver.sh stop

Standalone-HA高可用集群模式

       原理:

              Standalone只有一个主节点,由于主节点负责调度和分配资源,如果主节点出现意外情况(例如宕机),集群就死掉了

              但是Standalone-HA高可用集群会有多个主节点,他们都会想zookeeper注册并创建一个临时节点

              (一个处于active活跃(工作状态)状态,其他处于standby状态),

              如果活跃状态的主机出现意外和zookeeper心跳检测超时,临时节点就会被删除,

              zookeeper就会从处于standby状态的节点选一个新的主节点,删除standby的临时节点添加active的临时节点

              

       具体部署:

              node1和node2都是主节点JobManager(master),但是有一个是active状态一个是standby状态

              node1、node2、node3又都是从节点TaskManager(Slave)

              1、首先要启动zookeeper:zkServer.sh start

              2、启动hdfs:/export/serves/hadoop/sbin/start-dfs.sh

              3、修改node1下的flink-conf.yaml

            vim /export/server/flink/conf/flink-conf.yaml

   增加下面内容:

#开启HA,使用文件系统作为快照存储

state.backend: filesystem

#启用检查点,可以将快照保存到HDFS

state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints

#使用zookeeper搭建高可用

high-availability: zookeeper

# 存储JobManager的元数据到HDFS

high-availability.storageDir: hdfs://node1:8020/flink/ha/

# 配置ZK集群地址

high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181

          分发到node2和node3

scp -r /export/server/flink/conf/flink-conf.yaml node2:/export/server/flink/conf/

scp -r /export/server/flink/conf/flink-conf.yaml node3:/export/server/flink/conf/

              4、修改masters:vim /export/server/flink/conf/masters(现在node1和node2都是主节点)

            node1:8081

                    node2:8081

                    分发给node1和node2

                    scp -r /export/server/flink/conf/masters node2:/export/server/flink/conf/

                    scp -r /export/server/flink/conf/masters node3:/export/server/flink/conf/

              5、修改node2上的flink-conf.yaml

                    vim /export/server/flink/conf/flink-conf.yaml

                    jobmanager.rpc.address: node2      #因为现在node2也是主节点

              6、启动集群

                    /export/server/flink/bin/stop-cluster.sh   #停掉集群

   /export/server/flink/bin/start-cluster.sh   #启动集群

                   

       测试使用:

              1、访问WEB UI

                   http://node1:8081/#/job-manager/config

                   http://node2:8081/#/job-manager/config

              2、测试官方示例

                   /export/server/flink/bin/flink run  /export/server/flink/examples/batch/WordCount.jar

              3、测试kill掉一个master(因为我们不知道哪个master是active状态,所有随便停掉一次测试)

                   /export/server/flink/bin/jobmanager.sh start|stop

                   /export/server/flink/bin/taskmanager.sh start||stop

              4、重新执行步骤2测试能不能正常执行

              5、停掉集群:/export/server/flink/bin/stop-cluster.sh

Flink On Yarn模式

       原理:

1、Client上传Flink的jar包和配置文件到HDFS集群上

2、Client向Yarn的ResourceManager提交任务和申请资源

3、ResourceManager分配Container资源并启动ApplicationMaster

4、ApplicationMaster加载Flink的jar包和配置文件构建环境启动Flink-JobManager

5、ApplicationMaster向ResourceManager申请任务资源,

6、NodeManager加载Flink的jar包和配置文件构建环境并启动TaskManager

7、TaskManager启动后会向JobManager发送心跳,并等待JobManager向其分配任务

       

       Flink On Yarn模式的两种方式:Session模式和Per-Job模式

       Session模式:(适合小任务使用)

              需要先申请资源,启动JobManager和TaskManager

              不需要每次提交任务再去申请资源,而是使用已经申请好的资源,从而提高执行效率

              任务提交完资源不会被释放,因此一直会占用资源

              

       Per-Job模式:(适合使用大任务,且资源充足)

              每次提交任务都需要去申请资源,申请资源需要时间,所有影响执行效率(但是在大数据面前都是小事)

              每次执行完任务资源就会立刻被释放,不会占用资源

              

       具体部署:

              1、关闭yarn的内存检查:vim /export/server/hadoop/etc/hadoop/yarn-site.xml

                     添加:

                     <!-- 关闭yarn内存检查 -->

                     <property>

                            <name>yarn.nodemanager.pmem-check-enabled</name>

                            <value>false</value>

                     </property>

                     <property>

                            <name>yarn.nodemanager.vmem-check-enabled</name>

                            <value>false</value>

                     </property>

              2、分发

     scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml node2:/export/server/hadoop/etc/hadoop/yarn-site.xml

                     scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml node3:/export/server/hadoop/etc/hadoop/yarn-site.xml

              3.重启yarn

                     /export/server/hadoop/sbin/stop-yarn.sh

                     /export/server/hadoop/sbin/start-yarn.sh

       测试示例:

              yarn-session.sh(开辟资源)+ flink run(提交任务)

              1、在yarn上启动一个flink会话

                     /export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d

              2、说明

                     -n  表示申请两个容器,这里指的是TaskManager

                     -tm 表示每个TaskManager的内存发小

                     -s   表示每个TaskManager中slot的数量

                     -d   表示后台程序方式运行(守护进程)

                    注意:

                    该警告不用管

                    WARN  org.apache.hadoop.hdfs.DFSClient  - Caught exception

                    java.lang.InterruptedException

              3、查看UI界面:http://node1:8088/cluster

                   

              4、使用flink run提交任务

                    /export/server/flink/bin/flink run  /export/server/flink/examples/batch/WordCount.jar

                    运行完之后可以继续运行其他的小任务

                    /export/server/flink/bin/flink run  /export/server/flink/examples/batch/WordCount.jar

             5、关闭yarn-session:(http://node1:8088/cluster里面查看id并杀死)

                    yarn application -kill application_1618886145038_0001

                   

              Per-Job分离模式:

              1、直接提交Job:

                    /export/server/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /export/server/flink/examples/batch/WordCount.jar

              2、说明:

                    -m    JobManager的地址

                    -yjm 1024  指定JobManager的内存大小

                    -yjm 1024  指定TaskManager的内存大小

             注意:

                    在之前版本中如果使用的是flink on yarn方式,想切换回standalone模式的话,

                    如果报错需要删除:【/tmp/.yarn-properties-root】

                    rm -rf /tmp/.yarn-properties-root

 

为什么大数据任务执行都是提交给Yarn,MR、Spark、Flink都是提交给Yarn???

1、yarn在大数据领域中是一个非常成熟稳定并且功能强大的资源管理和任务执行调度框架

       大多数公司之前大数据任务都是使用yarn来做调度执行管理的,所以后续的大数据计算框架想要占领市场份额就要去支持On Yarn

2、yarn支持多种调度策略(比如FIFO,Fair,Capactity…),比较灵活

3、yarn做统一的资源管理

4、yarn的资源可以按需使用,提高集群的资源利用率

5、yarn的任务有优先级,根据优先级执行任务

6、基于yarn调度系统,能够自动化的处理各个角色的Fairover(容错)

JobManager进程和TaskManager进程由Yarn的NodeManager监控

如果JobManager进程出现异常,Yarn的ResourceManager会重新调度JobManager到其他的机器

如果TaskManager进程出现异常,JobManager会收到消息并重新向Yarn的ResourceManager申请资源,重新启动TaskManager

注意:未来云环境会越来越流行,向Spark On K8S 和 Flink On K8S可能也会流行

Spark|Flink On Yarn不需要单独启动Spark|Flink的Standalone集群,

因为Spark|Flink On Yarn的本质是将Spark|Flink任务的jar包放到Yarn的JVM中去执行

是在Yarn集群中启动的一些JVM进程(Spark|Flink的一些角色)

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐