1、由于 Flink 官网关于读取 Kafka 消息 key 的文档说明比较复杂,研究了半天才搞懂,特此记录。


-- Source table: consumes the Kafka topic and exposes the record key as a
-- regular column (`msgkey`) next to the fields parsed from the JSON value.
CREATE TEMPORARY TABLE `source_kafka` (
  `time`        STRING,  -- from the JSON value; cast to BIGINT downstream
  `distinct_id` STRING,  -- from the JSON value
  `msgkey`      STRING   -- from the Kafka record key (see 'key.fields')
) WITH (
  'connector' = 'kafka',
  'topic' = 'hulin_sink_test',
  'properties.bootstrap.servers' = 'xxx:9092',
  'properties.group.id' = 'kafka_sink_test_by_wpp',
  -- Do not also set 'format' here: it is the alias of 'value.format', and this
  -- table configures the key format and the value format separately.
  -- The key bytes are read as-is (raw format) into the `msgkey` column.
  'key.format' = 'raw',
  'key.fields' = 'msgkey',
  -- The value is a JSON object holding the remaining columns.
  'value.format' = 'json',
  -- Required when reading the key as a column: EXCEPT_KEY removes `msgkey`
  -- from the value schema. With the default (ALL), the JSON value format also
  -- expects `msgkey` inside the message body, so the column would not reliably
  -- carry the record key and the data would not parse as intended.
  'value.fields-include' = 'EXCEPT_KEY',
  -- Start from the offsets at this epoch-millisecond timestamp:
  -- 1618223534000 = 2021-04-12 10:32:14 UTC (18:32:14 UTC+8).
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1618223534000'
);


-- Conversion view: parse `time` into a BIGINT and expose the Kafka record key
-- under the column name used by the sink table.
CREATE TEMPORARY VIEW source_kafka_change AS
SELECT
  -- NOTE(review): a non-numeric `time` value makes this CAST either fail or
  -- yield NULL depending on the Flink version — confirm the expected input.
  CAST(`time` AS BIGINT) AS `time`,
  `distinct_id`,
  -- `msgkey` is already declared as a string column, so no cast is needed.
  `msgkey` AS messageKey
FROM `source_kafka`;


-- Result table. Despite its name, it uses the 'print' connector, so every row
-- goes to standard output rather than to ODPS (presumably for testing).
CREATE TEMPORARY TABLE `sink_odps` (
    `userid`     VARCHAR,
    `time`       BIGINT,
    `messageKey` VARCHAR
) WITH (
    'connector' = 'print'
);

-- Write the converted stream into the result table. A STATEMENT SET submits
-- all INSERT statements between BEGIN and END together as one job.
BEGIN STATEMENT SET;

INSERT INTO sink_odps
SELECT
    `distinct_id` AS `userid`,
    `time`,
    messageKey
FROM source_kafka_change;

END;

 

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐