简述:

flink JOB中的环境配置信息全部从代码里面抽出来放到配置文件中去。 主要讲一下对flink 自带的ParameterTool 的使用细节,以及遇到的问题,如何解决。

有三种方案:
1: 使用flink 的 Configuration
2: 采用广播变量
3: 使用flink 自带的ParameterTool读取外部配置文件

我这里介绍的是第三种方案: 使用flink 自带的ParameterTool读取外部配置文件

Flink 文档地址:https://github.com/apache/flink/blob/master/docs/dev/application_parameters.zh.md

使用过程1:

从命令行参数中获取 像 --config_path /config.properties --elements 42这种形式的参数

package com.application

import com.ProjectConfig
import org.apache.flink.api.java.utils.ParameterTool

//启动Flink任务时,需要添加参数config_path  举例: flink run Test.jar -config_path /config.properties
object TestApplication extends App {
    val parameters = ParameterTool.fromArgs(args)
    val path = parameters.get("config_path")
    ParameterTool configname = ParameterTool.fromPropertiesFile(propertiesFilePath); 
    // 初始化配置信息, 形成静态变量
    ProjectConfig.initConfig(configname)
}

设置静态变量

package com.util
import java.util.Properties
import org.apache.flink.api.java.utils.ParameterTool

object ProjectConfig {
  //#Hbase配置
  var HBASE_ZOOKEEPER_QUORUM = ""
  var HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT ="2181"
  var KAFKA_QUORUM=""
  /**
  * 初始化
  */
def initConfig(configname: ParameterTool): Unit= {
  HBASE_ZOOKEEPER_QUORUM = configname.get("hbase.zookeeper_quorum")
  HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT =configname.get("hbase.zookeeper_property_clientport")
  KAFKA_QUORUM= configname.get("kafka.zookeeper.connect")
}

flink 启动命令: flink-1.9.0/bin/flink run -c com.application.TestApplication -m yarn-cluster -p 6 -yjm 1024m -ytm 2048m -ynm TestApplication Test.jar -config_path /home/hadoop/config.properties

在on yarn模式下跑flink job, 但是出现了一个问题。

flink job中存在两个RichSourceFunction,一个是flink自带的从kafka获取数据流, 另一个是自定义的从hbase获取维表数据。

在on yarn运行的时候失败了, 原因: flink从kafka获取数据流是正常运行了, 但是从hbase获取维表的RichSourceFunction,显示不能获取配置信息。

使用过程2:

使用ParameterTool 的注册全局变量

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// 注册全局变量
env.getConfig.setGlobalJobParameters(parameters)

hbase获取维表的RichSourceFunction的open函数

override def open(parameters: Configuration): Unit = {
  // 获取全局变量
  val parameters = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[ParameterTool]
  ProjectConfig.initConfig(parameters)

  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", ProjectConfig.HBASE_ZOOKEEPER_QUORUM)
  conf.set("hbase.zookeeper.property.clientPort", ProjectConfig.HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT)
  connection = ConnectionFactory.createConnection(conf)
}

flink 在on yarn 上运行成功

参考:http://ytluck.github.io/big-data/my-bigdata-post-59.html
https://blog.csdn.net/smileyan9/article/details/99587572
https://www.cnblogs.com/tonglin0325/p/14115069.html
 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐