flinksql 读取kafka的key的字段,相关解析
1、因官网读取kafka的key结构文档说明比较复杂,研究半天才搞懂,特此进行记录
-- Source table: reads each Kafka record's key (as a raw string) and its JSON value.
-- NOTE: in Flink SQL a bare VARCHAR means VARCHAR(1), which silently truncates
-- every value to one character — declare STRING (= VARCHAR(2147483647)) instead.
CREATE TEMPORARY TABLE `source_kafka` (
    `time` STRING,
    `distinct_id` STRING,
    `msgkey` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'hulin_sink_test',
    'properties.bootstrap.servers' = 'xxx:9092',
    'properties.group.id' = 'kafka_sink_test_by_wpp',
    -- Deserialize the record key as a raw byte string into the `msgkey` column.
    'key.format' = 'raw',
    'key.fields' = 'msgkey',
    'value.format' = 'json',
    -- When reading the key, 'value.fields-include' = 'EXCEPT_KEY' is required;
    -- otherwise the `msgkey` column is also treated as part of the value and the
    -- JSON deserializer fails to resolve it, so no rows can be parsed.
    'value.fields-include' = 'EXCEPT_KEY',
    -- Start consuming from a fixed point in time (epoch millis).
    'scan.startup.mode' = 'timestamp',
    'scan.startup.timestamp-millis' = '1618223534000'
);
-- Transformation view: normalize column types and expose the Kafka key
-- under the name the sink expects.
CREATE TEMPORARY VIEW source_kafka_change AS
SELECT
    CAST(`time` AS BIGINT) AS `time`,
    `distinct_id`,
    CAST(`msgkey` AS VARCHAR) AS messageKey
FROM `source_kafka`;
-- Result table: 'print' connector, writes every incoming row to the task logs
-- (useful for debugging before pointing the job at a real sink).
-- NOTE: bare VARCHAR in Flink SQL means VARCHAR(1) and truncates values to a
-- single character — declare STRING instead.
CREATE TEMPORARY TABLE `sink_odps` (
    `userid` STRING,
    `time` BIGINT,
    messageKey STRING
) WITH (
    'connector' = 'print'
);
-- Sink the transformed stream: forward every row from the view into the
-- print sink inside a single statement set.
BEGIN STATEMENT SET;

INSERT INTO sink_odps
SELECT
    `distinct_id` AS userid,
    `time`,
    messageKey
FROM source_kafka_change;

END;