flinksql 解析kafka复杂嵌套json

解析复杂json

JSON数据

{
    "afterColumns":{
        "created":"1589186680",
        "extra":{
            "canGiving":false
        },
        "parameter":[
            1,
            2,
            3,
            4
        ]
    },
    "beforeColumns":null,
    "tableVersion":{
        "binlogFile":null,
        "binlogPosition":0,
        "version":0
    },
    "touchTime":1589186680591
}

JSON数据类型和FLINK SQL数据类型的映射关系
在这里插入图片描述
sql代码

首先我们的JSON最外层是个OBJECT对象,有4个字段分别是afterColumns,beforeColumns,tableVersion,touchTime。其中afterColumns又是个嵌套JSON,也就是OBJECT对象;beforeColumns是个STRING类型;tableVersion同样也是个嵌套JSON;touchTime是number类型。所以我们最开始的DDL是这样的
	CREATE TABLE t1 ( 
	afterColumns ROW(),
	beforeColumns STRING,
	tableVersion ROW(),
	touchTime BIGINT
	)WITH (
    'connector.type' = 'kafka',  -- 指定连接类型是kafka
    'connector.version' = '0.11',  -- 与我们之前Docker安装的kafka版本要一致
    'connector.topic' = 'json_parse', -- 之前创建的topic 
    'connector.properties.group.id' = 'flink-test-0', -- 消费者组,相关概念可自行百度
    'connector.startup-mode' = 'earliest-offset',  --指定从最早消费
    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zk地址
    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- broker地址
    'format.type' = 'json'  -- json格式,和topic中的消息格式保持一致
	) 		
		
 用FLINK SQL执行 select * from t1 看看,得到的结果是 ,null,,1589186680591
 afterColumns 和 tableVersion 都是空,另外两个字段都打印出来了,原因是afterColumns 和 tableVersion都是OBJECT类型,FLINK SQL还是没法帮我们智能识别出来,所以只能自己定义的更详细点。
 afterColumns中有三个字段,分别是created,extra,parameter,对应着STRING,OBJECT,ARRAY类型;extra中有一个字段canGiving,类型是BOOLEAN。
 tableVersion中也有三个字段,分别是binlogFile,binlogPosition,version,都是简单类型,分别是STRING,INTEGER,INTEGER。
 所以,我们新的DDL语句时这样的
	 CREATE TABLE t1 ( 
	afterColumns ROW(created STRING,extra ROW(canGiving BOOLEAN),`parameter` ARRAY <INT>) ,
	beforeColumns STRING ,
	tableVersion ROW(binlogFile STRING,binlogPosition INT ,version INT) ,
	touchTime bigint 
	) WITH (
	    'connector.type' = 'kafka',  -- 指定连接类型是kafka
	    'connector.version' = '0.11',  -- 与我们之前Docker安装的kafka版本要一致
	    'connector.topic' = 'json_parse', -- 之前创建的topic 
	    'connector.properties.group.id' = 'flink-test-0', -- 消费者组,相关概念可自行百度
	    'connector.startup-mode' = 'earliest-offset',  --指定从最早消费
	    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zk地址
	    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- broker地址
	    'format.type' = 'json'  -- json格式,和topic中的消息格式保持一致
	)
	这里我们注意一下,parameter得用``括起来,因为parameter是个关键字,想用的话得用``包围,所有关键字可以参考https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/#reserved-keywords
	最后输出的数据是这样的:1589186680,false,[1, 2, 3, 4],null,null,0,0,1589186680591,和我们JSON中的数据一样,没毛病!
	不过我们用的是select *,正常来说我们应该只输出想要的字段。那么,假设我们只想拿到canGiving 这个嵌套好几层的字段值,该怎么写呢?
	其实很简单,就像是包路径一样 afterColumns.extra.canGiving ,试一下
	select  afterColumns.extra.canGiving from t1
	输出:false
	再试一下输出 parameter 这个集合的第一个值
	select afterColumns.`parameter`[0] from t1
	报错了:Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
  很纳闷,长度为4的数组,取数组序号为0,也就是数组中第一个数,竟然报角标越界了,换成afterColumns.`parameter`[1]试试看呢?
  震惊!竟然输出了1,也就是我们数组中的第一个字段。可在我们JAVA应用或者HIVE SQL中,数组不是应该是从0开始吗?
  因为 Flink 中数组下标是从 1 开始的,在 Validation 阶段发现下标是小于等于 0 的时候抛出异常
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐