flink 读取外部.properties的配置文件
千里风雪于 2021-02-02 17:46:07 发布4663收藏 9分类专栏: flink 文章标签: 大数据 flink版权flink专栏收录该内容20 篇文章2 订阅订阅专栏简述:flink JOB中的环境配置信息全部从代码里面抽出来放到配置文件中去。 主要讲一下对flink 自带的ParameterTool 的使用细节,以及遇到的问题,如何解决。有三种方案:1: 使用flink 的 Co
简述:
flink JOB中的环境配置信息全部从代码里面抽出来放到配置文件中去。 主要讲一下对flink 自带的ParameterTool 的使用细节,以及遇到的问题,如何解决。
有三种方案:
1: 使用flink 的 Configuration
2: 采用广播变量
3: 使用flink 自带的ParameterTool读取外部配置文件
我这里介绍的是第三种方案: 使用flink 自带的ParameterTool读取外部配置文件
Flink 文档地址:https://github.com/apache/flink/blob/master/docs/dev/application_parameters.zh.md
使用过程1:
从命令行参数中获取 像 --config_path /config.properties --elements 42这种形式的参数
package com.application
import com.ProjectConfig
import org.apache.flink.api.java.utils.ParameterTool
//启动Flink任务时,需要添加参数config_path 举例: flink run Test.jar -config_path /config.properties
object TestApplication extends App {
val parameters = ParameterTool.fromArgs(args)
val path = parameters.get("config_path")
ParameterTool configname = ParameterTool.fromPropertiesFile(propertiesFilePath);
// 初始化配置信息, 形成静态变量
ProjectConfig.initConfig(configname)
}
设置静态变量
package com.util
import java.util.Properties
import org.apache.flink.api.java.utils.ParameterTool
object ProjectConfig {
//#Hbase配置
var HBASE_ZOOKEEPER_QUORUM = ""
var HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT ="2181"
var KAFKA_QUORUM=""
/**
* 初始化
*/
def initConfig(configname: ParameterTool): Unit= {
HBASE_ZOOKEEPER_QUORUM = configname.get("hbase.zookeeper_quorum")
HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT =configname.get("hbase.zookeeper_property_clientport")
KAFKA_QUORUM= configname.get("kafka.zookeeper.connect")
}
flink 启动命令: flink-1.9.0/bin/flink run -c com.application.TestApplication -m yarn-cluster -p 6 -yjm 1024m -ytm 2048m -ynm TestApplication Test.jar -config_path /home/hadoop/config.properties
在on yarn模式下跑flink job, 但是出现了一个问题。
flink job中存在两个RichSourceFunction,一个是flink自带的从kafka获取数据流, 另一个是自定义的从hbase获取维表数据。
在on yarn运行的时候失败了, 原因: flink从kafka获取数据流是正常运行了, 但是从hbase获取维表的RichSourceFunction,显示不能获取配置信息。
使用过程2:
使用ParameterTool 的注册全局变量
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 注册全局变量
env.getConfig.setGlobalJobParameters(parameters)
hbase获取维表的RichSourceFunction的open函数
override def open(parameters: Configuration): Unit = {
// 获取全局变量
val parameters = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[ParameterTool]
ProjectConfig.initConfig(parameters)
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", ProjectConfig.HBASE_ZOOKEEPER_QUORUM)
conf.set("hbase.zookeeper.property.clientPort", ProjectConfig.HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT)
connection = ConnectionFactory.createConnection(conf)
}
flink 在on yarn 上运行成功
参考:http://ytluck.github.io/big-data/my-bigdata-post-59.html
https://blog.csdn.net/smileyan9/article/details/99587572
https://www.cnblogs.com/tonglin0325/p/14115069.html
更多推荐
所有评论(0)