flinksql 解析kafka复杂嵌套json
flinksql 解析kafka复杂嵌套json解析复杂json解析复杂jsonJSON数据{"afterColumns":{"created":"1589186680","extra":{"canGiving":false},"parameter":[1,2,3,4]
·
flinksql 解析kafka复杂嵌套json
解析复杂json
JSON数据
{
"afterColumns":{
"created":"1589186680",
"extra":{
"canGiving":false
},
"parameter":[
1,
2,
3,
4
]
},
"beforeColumns":null,
"tableVersion":{
"binlogFile":null,
"binlogPosition":0,
"version":0
},
"touchTime":1589186680591
}
JSON数据类型和FLINK SQL数据类型的映射关系
sql代码
首先我们的JSON最外层是个OBJECT对象,有4个字段分别是afterColumns,beforeColumns,tableVersion,touchTime。其中afterColumns又是个嵌套JSON,也就是OBJECT对象;beforeColumns是个STRING类型;tableVersion同样也是个嵌套JSON;touchTime是number类型。所以我们最开始的DDL是这样的
CREATE TABLE t1 (
afterColumns ROW(),
beforeColumns STRING,
tableVersion ROW(),
touchTime BIGINT
)WITH (
'connector.type' = 'kafka', -- 指定连接类型是kafka
'connector.version' = '0.11', -- 与我们之前Docker安装的kafka版本要一致
'connector.topic' = 'json_parse', -- 之前创建的topic
'connector.properties.group.id' = 'flink-test-0', -- 消费者组,相关概念可自行百度
'connector.startup-mode' = 'earliest-offset', --指定从最早消费
'connector.properties.zookeeper.connect' = 'localhost:2181', -- zk地址
'connector.properties.bootstrap.servers' = 'localhost:9092', -- broker地址
'format.type' = 'json' -- json格式,和topic中的消息格式保持一致
)
用FLINK SQL执行 select * from t1 看看,得到的结果是 ,null,,1589186680591
afterColumns 和 tableVersion 都是空,另外两个字段都打印出来了,原因是afterColumns 和 tableVersion都是OBJECT类型,FLINK SQL还是没法帮我们智能识别出来,所以只能自己定义的更详细点。
afterColumns中有三个字段,分别是created,extra,parameter,对应着STRING,OBJECT,ARRAY类型;extra中有一个字段canGiving,类型是BOOLEAN。
tableVersion中也有三个字段,分别是binlogFile,binlogPosition,version,都是简单类型,分别是STRING,INTEGER,INTEGER。
所以,我们新的DDL语句时这样的
CREATE TABLE t1 (
afterColumns ROW(created STRING,extra ROW(canGiving BOOLEAN),`parameter` ARRAY <INT>) ,
beforeColumns STRING ,
tableVersion ROW(binlogFile STRING,binlogPosition INT ,version INT) ,
touchTime bigint
) WITH (
'connector.type' = 'kafka', -- 指定连接类型是kafka
'connector.version' = '0.11', -- 与我们之前Docker安装的kafka版本要一致
'connector.topic' = 'json_parse', -- 之前创建的topic
'connector.properties.group.id' = 'flink-test-0', -- 消费者组,相关概念可自行百度
'connector.startup-mode' = 'earliest-offset', --指定从最早消费
'connector.properties.zookeeper.connect' = 'localhost:2181', -- zk地址
'connector.properties.bootstrap.servers' = 'localhost:9092', -- broker地址
'format.type' = 'json' -- json格式,和topic中的消息格式保持一致
)
这里我们注意一下,parameter得用``括起来,因为parameter是个关键字,想用的话得用``包围,所有关键字可以参考https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/#reserved-keywords
最后输出的数据是这样的:1589186680,false,[1, 2, 3, 4],null,null,0,0,1589186680591,和我们JSON中的数据一样,没毛病!
不过我们用的是select *,正常来说我们应该只输出想要的字段。那么,假设我们只想拿到canGiving 这个嵌套好几层的字段值,该怎么写呢?
其实很简单,就像是包路径一样 afterColumns.extra.canGiving ,试一下
select afterColumns.extra.canGiving from t1
输出:false
再试一下输出 parameter 这个集合的第一个值
select afterColumns.`parameter`[0] from t1
报错了:Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
很纳闷,长度为4的数组,取数组序号为0,也就是数组中第一个数,竟然报角标越界了,换成afterColumns.`parameter`[1]试试看呢?
震惊!竟然输出了1,也就是我们数组中的第一个字段。可在我们JAVA应用或者HIVE SQL中,数组不是应该是从0开始吗?
因为 Flink 中数组下标是从 1 开始的,在 Validation 阶段发现下标是小于等于 0 的时候抛出异常
更多推荐
已为社区贡献1条内容
所有评论(0)