表中的列

        FlinkSql中的列分为俩种,第一种是常规列,也就是物理列,其定义了物理介质中存储的数据中字段的名称、类型和顺序。另一种是元数据列元数据列是 SQL 标准的扩展,允许访问数据源本身具有的⼀些元数据。元数据列由 METADATA 关键字标识。最后一种是计算,计算列其实就是在写建表的 DDL 时,可以拿已有的⼀些列经过⼀些⾃定义的运算⽣成的新列。这些列本身是没有 以物理形式存储到数据源中的。

创建表的语句

CREATE TEMPORARY TABLE kafka_campaign_budget (
  user_id BIGINT,
  user_info ROW<`user_age` INT, `user_name` STRING>,
  --如果⾃定义的列名称和 metadata字段的名称⼀样的话, FROM xxx ⼦句是可以被省略的
  offset INT NOT NULL METADATA VIRTUAL, -- 该表作为结果表写入时,VIRTUAL字段不作为写入列
  --元数据列是partition,my_part是重命名该列,则必须使用 FROM 子句
  my_time TIMESTAMP(3) METADATA FROM 'timestamp',
  --计算列可以包含其他列、常量或者函数,但是不能写⼀个⼦查询进去。
  my_date AS CAST(`my_time` AS DATE)
) WITH (
....
);

注意:

AS:as生成的列叫计算列,表示通过其他列计算而来,和虚拟 metadata 列是类似的,计算列也是只能读不能写的。

TEMPORARY:表示创建临时表,表可以是临时的,并与单个Flink会话的生命周期相关,也可以是永久的,并且在多个Flink会话和群集中可见。久表需要一个目录(例如Hive Metastore)来维护有关表的元数据(Kafaka的地址和主题等)

with里的声明信息

  1. 'connector' = 'kafka' :声明外部存储是 Kafka
  2. 'topic' = 'user_behavior' :声明 Flink SQL 任务要连接的 Kafka 表的 topic 是 user_behavior
  3. 'properties.bootstrap.servers' = 'localhost:9092' :声明 Kafka 的 server ip 是 localhost:9092
  4. 'properties.group.id' = 'testGroup' :声明 Flink SQL 任务消费这个 Kafka topic,会使⽤ testGroup 的 group id 去消费
  5. 'scan.startup.mode' = 'earliest-offset' :声明 Flink SQL 任务消费这个 Kafka topic 会从最 早位点开始消费
  6. 'format' = 'csv' :声明 Flink SQL 任务读⼊或者写出时对于 Kafka 消息的序列化⽅式是 csv 格式

Create Table Like ⼦句

--首先定义一个Orders表
CREATE TABLE Orders (
 `user` BIGINT,
 product STRING,
 order_time TIMESTAMP(3)
) WITH ( 
 'connector' = 'kafka',
 'scan.startup.mode' = 'earliest-offset'
);
--但是忘记定义 Watermark 了,那如果想加上 Watermark,就可以⽤ Like ⼦句定义⼀张带 Watermark 的新表
CREATE TABLE Orders_with_watermark (
 -- 1. 添加了 WATERMARK 定义
 WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
 -- 2. 覆盖了原 Orders 表中 scan.startup.mode 参数
 'scan.startup.mode' = 'latest-offset'
)
-- 3. Like ⼦句声明是在原来的 Orders 表的基础上定义 Orders_with_watermark 表
LIKE Orders;

上述语句等价于以下语句

CREATE TABLE Orders_with_watermark (
 `user` BIGINT,
 product STRING,
 order_time TIMESTAMP(3),
 WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
 'connector' = 'kafka',
 'scan.startup.mode' = 'latest-offset'
);
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐