flink读写kafka
(The code below is shown in full in the following sections.)
1 读kafka参考代码
/**
 * Registers a Kafka source table via Flink SQL DDL (Flink 1.14+ supports
 * multiple topics in one table). Suitable when all topics share the same
 * JSON schema, which is mapped onto typed columns here.
 *
 * @param tab_env      table environment to register the source table in
 * @param topics       topic name, or several topics separated by commas
 * @param servers      Kafka bootstrap servers (host:port[,host:port...])
 * @param group_id     Kafka consumer group id
 * @param startup_mode scan startup mode; defaults to "latest-offset" so
 *                     existing callers are unaffected. Other valid values
 *                     include "earliest-offset", "group-offsets", "timestamp".
 */
def readKafka(tab_env: StreamTableEnvironment, topics: String, servers: String, group_id: String, startup_mode: String = "latest-offset"): Unit = {
// register the Kafka-backed table; the schema below must match the JSON payload
val kafka_view =
s"""
|CREATE TABLE kafka_source (
|sts BIGINT ,
|ins STRING ,
|isfl BOOLEAN ,
|events ARRAY< ROW<tid BIGINT, event STRING, ts BIGINT, net INT, eparam STRING> >
|)
|WITH (
| 'connector' = 'kafka',
| 'topic' = '${topics}', --separate multiple topics with commas
| 'properties.bootstrap.servers' = '${servers}',
| 'properties.group.id' = '${group_id}',
| 'scan.startup.mode' = '${startup_mode}',
| 'format' = 'json',
| 'json.fail-on-missing-field' = 'true',
| 'json.ignore-parse-errors' = 'false'
|)
|""".stripMargin
tab_env.executeSql(kafka_view)
}
/**
 * Registers a Kafka source table via Flink SQL DDL (Flink 1.14+ supports
 * multiple topics) that exposes each record as a single raw JSON string
 * column, to be parsed downstream (e.g. with JSON_VALUE).
 *
 * NOTE(review): renamed from `readKafka` — the original had the exact same
 * name and parameter list as the schema-based `readKafka` above, which is a
 * duplicate-definition compile error in Scala, so no caller could have
 * resolved the old name anyway.
 *
 * @param tab_env  table environment to register the source table in
 * @param topics   topic name, or several topics separated by commas
 * @param servers  Kafka bootstrap servers (host:port[,host:port...])
 * @param group_id Kafka consumer group id
 */
def readKafkaRaw(tab_env: StreamTableEnvironment, topics: String, servers: String, group_id: String): Unit = {
// register the Kafka-backed table; whole record lands in one STRING column
val kafka_view =
s"""
|CREATE TABLE kafka_source (
|json_str string
|)
|WITH (
| 'connector' = 'kafka',
| 'topic' = '${topics}', --separate multiple topics with commas
| 'properties.bootstrap.servers' = '${servers}',
| 'properties.group.id' = '${group_id}',
| 'scan.startup.mode' = 'latest-offset',
| 'format' = 'json',
| 'json.fail-on-missing-field' = 'true',
| 'json.ignore-parse-errors' = 'false'
|)
|""".stripMargin
tab_env.executeSql(kafka_view)
}
/**
* datastream形式读取, 适合直接获取json串
*/
/**
 * Reads Kafka with the DataStream API (KafkaSource) and exposes the parsed
 * records as a temporary table named "kafka_source".
 *
 * Pipeline: raw string records -> split on newlines -> JavaTool.formateJson
 * -> temporary view "input_tab" -> SQL extraction of gaid/vid fields.
 *
 * @param env        streaming execution environment to attach the source to
 * @param tab_env    table environment for the temporary views
 * @param topics_str comma-separated topic list
 * @param servers    Kafka bootstrap servers
 * @param group_id   Kafka consumer group id
 */
def getSdk(env: StreamExecutionEnvironment, tab_env: StreamTableEnvironment, topics_str: String, servers: String, group_id: String): Unit = {
// build the Kafka source from the comma-separated topic list
val topic_list = topics_str.split(",").toList
val kafka_src =
KafkaSource
.builder[String]
.setBootstrapServers(servers)
.setTopics(topic_list: _*)
.setGroupId(group_id)
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build
val raw_stream = env.fromSource(kafka_src, WatermarkStrategy.noWatermarks(), "kafka_source")
// normalize records, then register them as a temporary view
// todo : 为什么需要flatMap, 没有按行读取? (why flatMap here — aren't records already line-delimited?)
val json_rows =
raw_stream
.flatMap(_.split("\n"))
.map(row => JavaTool.formateJson(row))
// json_rows.print()
tab_env.createTemporaryView("input_tab", json_rows)
// extract the fields of interest from the JSON payload
val parse_sql =
"""
|SELECT
| JSON_VALUE(f0,'$.gaid' RETURNING STRING) AS gaid ,
| JSON_VALUE(f0,'$.vid' RETURNING STRING) AS vid
|FROM
| input_tab
|""".stripMargin
tab_env.createTemporaryView("kafka_source", tab_env.sqlQuery(parse_sql))
}
2 写kafka参考代码
/**
 * Registers a Kafka sink table (named "kafka_in") via Flink SQL DDL so that
 * rows with columns (mcc, mnc) can be written to the given topic as JSON.
 *
 * @param tab_env table environment to register the sink table in
 * @param topic   destination Kafka topic
 * @param servers Kafka bootstrap servers (host:port[,host:port...])
 */
def writeToKafka(tab_env: StreamTableEnvironment, topic: String, servers: String): Unit = {
// register the sink DDL directly with the table environment
tab_env.executeSql(
s"""
|CREATE TABLE kafka_in (
|mcc STRING ,
|mnc STRING
|)
|WITH (
| 'connector' = 'kafka',
| 'topic' = '${topic}',
| 'properties.bootstrap.servers' = '${servers}',
| 'format' = 'json'
|)
|""".stripMargin
)
}
flink写入kafka时指定压缩格式
'properties.compression.type'='gzip'
(end)