FlinkSQL(1.12)

一、基本语法

1.1、建表语法

create table 表名 (
字段名 字段类型,
...
) with (
连接器配置
)

1.2、时间语义

1.2.1、事件时间

使用:在设置完字段后最后一行进行指定。

格式:watermark for 某时间字段名 AS 某时间字段名 - INTERVAL '某数字' SECOND

1.2.2、处理时间

使用:在设置完字段后最后一行进行指定。

格式:随便起一个字段名 as proctime()

二、Source

2.1、Kafka

一般连接器配置如下即可,其他配置详情见官网Apache Flink 1.12 Documentation: Apache Kafka SQL Connector

'connector' = 'kafka',
'topic' = 'topicName(自定义)',
'properties.bootstrap.servers' = 'ip:port,ip:port,ip:port(自定义)',
'properties.group.id' = 'groupId(自定义)',
'scan.startup.mode' = 'timestamp(可取其他值)',
'scan.startup.timestamp-millis' = '1662393600000(对应上述timestamp的模式)', -- 数据到达kafka的时间 2022-09-06 00:00:00
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'

注意:

  • 普通的kafka不可以定义主键,会报错,因为他没有机制能保证语义上的主键唯一性。

  • 启动位点scan.startup.mode,取值如下:

    • earliest-offset:从Kafka最早分区开始读取。
    • latest-offset:从Kafka最新位点开始读取。
    • group-offsets(默认值):根据Group读取。
    • timestamp:从Kafka指定时间点读取。配置该参数时,同时需要在WITH参数中指定scan.startup.timestamp-millis参数。(这个参数为毫秒单位的时间戳,这个时间是对应kafka中数据的时间,就是broker接受到这条消息的时间)
    • specific-offsets:从Kafka指定分区指定偏移量读取。配置该参数时,同时需要在WITH参数中指定scan.startup.specific-offsets参数。
  • json解析问题:

    • json.fail-on-missing-field:如果为 true,则遇到缺失字段时,会让作业失败。如果为 false(默认值),则只会把缺失字段设置为 null 并继续处理。
    • json.ignore-parse-errors:如果为 true,则遇到解析异常时,会把这个字段设置为 null 并继续处理。如果为 false(默认值),则会让作业失败。
    • 两个参数不能同时为true,否则会抛异常Caused by: org.apache.flink.table.api.ValidationException: fail-on-missing-field and ignore-parse-errors shouldn't both be true.一般都是如上例子,一个true,一个false,表示如果数据解析异常则跳过这条数据,且如果解析没问题,但是找不到某字段,则设置这个字段值为null。
  • key和value问题:

    • 如果除了value,我们还要解析key中的数据,则需要把key和value的format单独设置,且需要额外设置一个配置’value.fields-include’ = ‘EXCEPT_KEY’,表示我们需要的字段,在value中有些没有。默认是ALL,表示我们需要的字段,在value中都有。
    • 如果除了value,我们还要解析key中的数据,且key中的键值和value中的键值有重名的情况,此时还需要额外设置一个配置’key.fields-prefix’ = ‘key_’,‘key.fields’ = ‘field1;field2’。
  • 参数问题:

    • flinksql水位线问题,如果source为kafka,kafka的并行度大于1,但是flink的并行度为1,此时如果kafka中某个分区没数据,这时候的flink的水位线一直不会触发(如果用javaApi的方式实现,是不会有这个问题的!!!)这种情况需要通过参数调整水位线推进。table.exec.source.idle-timeout=10000,单位是ms,如果其他分区没有等待多少ms后没有数据来,则自动推进水位线。

完整例子如下:

CREATE TABLE pageviews (
key_user_id BIGINT,
user_id BIGINT,
page_id BIGINT,
viewtime TIMESTAMP(3), --yyyy-MM-dd HH:mm:ss
user_region STRING,
WATERMARK FOR viewtime AS viewtime - INTERVAL '0' SECOND
 ) WITH (
'connector' = 'kafka',
'topic' = 'VIP-DT',
'properties.bootstrap.servers' = '192.168.7.105:9092,192.168.7.61:9092,192.168.7.221:9092',
'properties.group.id' = 'TestOpenSourceFlinkGroup',
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1662393600000',
   
'key.format' = 'json',
'key.json.ignore-parse-errors' = 'true',
'key.json.fail-on-missing-field' = 'false',
'key.fields-prefix' = 'key_',
'key.fields' = 'key_user_id',
   
'value.format' = 'json',
'value.json.ignore-parse-errors' = 'true',
'value.json.fail-on-missing-field' = 'false',
'value.fields-include' = 'EXCEPT_KEY'
);

2.2、Upsert-Kafka

一般连接器配置如下即可,相对于普通kafkaSource,他不能设置流开始的位点,以及他必须设置主键,主键就是对应的key值。(大部分的配置基本都和2.1kafka的配置相同)

'connector' = 'upsert-kafka',
'topic' = 'topicName(自定义)',
'properties.bootstrap.servers' = 'ip:port,ip:port,ip:port(自定义)',
'properties.group.id' = 'groupId(自定义)',
'key.format' = 'json',
'key.json.ignore-parse-errors' = 'true',
'key.json.fail-on-missing-field' = 'false',
'key.fields-prefix' = 'key_',

'value.format' = 'json',
'value.json.ignore-parse-errors' = 'true',
'value.json.fail-on-missing-field' = 'false',
'value.fields-include' = 'EXCEPT_KEY'

**注意1:**如果source为upsert-kafka,那么意味着从source开始,这条流就是回撤流,我们可以对这条流进行简单group by,但是不能进行开窗口。group by之后的数据需要用可支持upsert的sink进行接受。比如upsert-kafka,但是注意的是,upsert-kafka接受回撤流时,如果是删除的消息,他的value值为空(注意:没有开窗口的group by可以对回撤流进行,也可以对普通append流进行,但是最终返回的都是回撤流

回撤流有以下规则:

  • +I对应新增的数据

  • -U对应回撤某条数据

  • +U对应更新后的数据

  • -D对应删除某条数据

当回撤流写入到upsert-kafka中有以下规则:

  • -U的数据不会进入sink,
  • +I,+U,-D的数据会进入sink,但是-D的数据sink中会没有value值

产生回撤流场景:

  • Aggregate Without Window(不带 Window 的聚合场景)

  • Rank

  • Over Window

  • Left/Right/Full Outer Join

**注意2:**在flink1.11是不支持upsert-kafka的,如果source需要是回撤流,那么kakfa中的数据格式需要是这几个canal-json,debezium-json,maxwel-json,此时我们定义kafka source时,我们的format格式可以知道对应的canal-json,debezium-json,maxwel-json其中一种。相反如果kafka中的json对应的是以上三种格式之一,我们可以通过对应的format格式去接受。如果要进行数据的去重操作,则需要可以定义主键,且flink的参数中加上table.exec.source.cdc-events-duplicate=true,这时框架会生成一个额外的有状态算子,使用该 primary key 来对变更事件去重并生成一个规范化的 changelog 流。

三、流处理场景

3.1、单流

3.1.1、简单聚合

使用:

  • 和正常sql一样对某些字段进行分组,然后求聚合值,只不过会利用状态存储流过来的数据。注意:如果要进行简单聚合,下游必须支持upsert,否则会报错doesn't support consuming update changes which is produced by node GroupAggregate

格式:

  • group by 字段名
3.1.2、窗口

使用:

  • group by后,与正常sql的group by一样使用,只不过不是对某字段group by,而是对一个函数进行group by。

格式:

  • group by tumble(时间字段,间隔时间) 。当然也可以和其他正常字段一起使用,group by 某字段名, tumble(时间字段,间隔时间) 。除了tumble,还有hop和session函数,分别是滚动、滑动、会话窗口。hop中有三个参数,前两个和tumble一样,第三个是一个时间参数,表示滑动间隔。
3.1.3、TopN

使用:

  • 求实时热度等场景时使用,可以在回撤流和普通流上使用,返回一个回撤流

格式:

  • 和传统的开窗函数一样,row_number() over(partition by 字段 order by 字段 desc)

注意:

  • 有一个bug,在使用时必须在外侧套一层select,且必须有where条件,条件必须是rn<某数,或rn=某数,或rn<=某数[FLINK-26051]
3.1.4、视图

使用:

  • 可以用,在写一些复杂sql时,可以使用视图来创建一些中间表,来使代码看起来更易于理解。一个视图内也可以查另一个视图。

格式:

  • create view as select语句

3.2、双流join

3.2.1、正常join(inner,left,right,full)

使用:select * from a inner/left/right/full join b on a.id = b.id;

返回:Flink会通过状态保存两条流的数据,最终会产生一条回撤流。

问题:状态会越来越大,需要定期清除状态。

为什么是回撤流:

以 left Join 为例,且假设左流的数据比右流的数据先到,左流的数据会去扫描右流数据的状态,如果找不到可以 Join 的数据,左流并不知道右流中是确实不存在这条数据还是说右流中的相应数据迟到了。为了满足 left join 的语义的话,左边流数据还是会产生一条 join 数据发送到下游,类似于 MySQL Left Join,左流的字段以正常的表字段值填充,右流的相应字段以 Null 填充,然后输出到下游。

后期如果右流的相应数据到达,会去扫描左流的状态再次进行 join,此时,为了保证语义的正确性,需要把前面已经输出到下游的这条特殊的数据进行回撤,同时会把最新 join 上的数据输出到下游。注意,对于相同的 Key,如果产生了一次回撤,是不会再产生第二次回撤的,因为如果后期再有该 Key 的数据到达,是可以 join 上另一条流上相应的数据的。

3.2.2、interval join

使用:在普通join的基础上增加一些条件,①on后边的关联条件需要多一个时间关联②on后边的时间条件必须和事件时间的字段或者处理时间的时间字段相同。满足这两个条件才是interval join,否则就是普通join。我们可以在flink的webUI上看join的类型。

返回:返回一个普通追加流。

问题:需要自己把握设置一个窗口时间。

注意:interval只支持innerjoin,不支持left,right,full join。

举例:

前提:
	source1:es为事件时间或者处理时间
	source2:es为事件时间或者处理时间
语句1select * from a,b where a.id=b.id and b.es between a.es and a.es + interval '5' second;
语句2select * from a inner/left/right/full join b on a.id=b.id and/where b.es between a.es and 	a.es + interval '5' second;
join类型:
	都是interval join,而且interval join都是inner join,出来的流都是追加流。

验证:去webui上看是否是interval join。在webui上也可以看到jointype,但是如果是interval join,他的jointype一定是inner join(如果你在insert语句中写的是其他left/right/full,在webui上看见的也是left/right/full,但是实际上还是inner join,最终数据不会出现,左右两边有一边为null的情况,输出的还是一个append流。)

3.2.3、时态表join

定义:

  • 时态表就是一张随时间变化的表。

种类:

  • 一种是我们可以访问他的历史版本,这种是版本表,比如回撤流;
  • 一种是我们只能访问到当前最新的版本,这种是普通表,比如一些数据库维表。

如何获取版本表:(个人理解版本表就是一个带有事件时间的回撤流)

  • 一种是创建kafka源表,且format格式为cdc格式(canal/maxwell/debezium),定义主键,定义事件时间。

  • 一种是创建upsert kafak源表,定义主键,定义事件时间。

  • 一种是通过视图获得,核心是转化append流为retract流。首先append流一定得有事件事件,其次在创建视图时候,通过row_number或者group by等操作返回一条retract流。(如果上游是kafka,且数据类型不是cdc类型,且我们需要指定数据的起始位点,这时候我们就要通过视图来获取到版本表)

官网案例:

  • -- source1
    CREATE TABLE orders  (
      order_id STRING,
      product_id STRING,
      order_time TIMESTAMP(3),
      WATERMARK FOR order_time AS order_time  -- defines the necessary event time
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'TEST-ODS_BUFFER_SHUNT',
      'properties.bootstrap.servers' = '192.168.7.105:9092,192.168.7.61:9092,192.168.7.221:9092',
      'properties.group.id' = 'TestOpenSourceFlinkGroup',
      'scan.startup.mode' = 'timestamp',
      'scan.startup.timestamp-millis' = '1663917600000',
      'value.format' = 'json',
      'value.json.ignore-parse-errors' = 'true',
      'value.json.fail-on-missing-field' = 'false'
    );
    -- sourcr2
    CREATE TABLE product_changelog   (
      product_id STRING,
      product_name STRING,
      product_price DECIMAL(10, 4),
      update_time TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL, -- 注意:自动从毫秒数转为时间戳
      PRIMARY KEY(product_id) NOT ENFORCED,      -- (1) defines the primary key constraint
      WATERMARK FOR update_time AS update_time   -- (2) defines the event time by watermark      
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'TEST-ODS_BUFFER_SHUNT2',
      'properties.bootstrap.servers' = '192.168.7.105:9092,192.168.7.61:9092,192.168.7.221:9092',
      'properties.group.id' = 'TestOpenSourceFlinkGroup',
      'scan.startup.mode' = 'timestamp',
      'scan.startup.timestamp-millis' = '1663917600000', --kafka的时间 2022-09-23 15:20:00
      'value.format' = 'debezium-json'
    );
    
    -- sink 
    create table printSink(
      order_id STRING,
      order_time TIMESTAMP(3),
      product_name STRING,
      product_time TIMESTAMP(3),
      price DECIMAL(10, 4)
    )with(
      'connector' = 'print'
    );
    
    -- 基于事件时间的时态表 Join
    insert into printSink 
    SELECT
      O.order_id,
      O.order_time,
      P.product_name,
      P.update_time AS product_time,
      P.product_price AS price
    FROM orders AS O
    LEFT JOIN product_changelog FOR SYSTEM_TIME AS OF O.order_time AS P
    ON O.product_id = P.product_id;
    
    
    -- source1对应数据
    {"order_id":"o_001","product_id":"111","order_time":"2022-09-23 00:01:00"}
    {"order_id":"o_002","product_id":"222","order_time":"2022-09-23 00:02:00"}
    {"order_id":"o_003","product_id":"111","order_time":"2022-09-23 12:00:00"}
    {"order_id":"o_004","product_id":"222","order_time":"2022-09-23 12:00:00"}
    {"order_id":"o_005","product_id":"111","order_time":"2022-09-23 18:00:00"}
    
    -- source2对应数据
    {"before":null,"after":{"product_id":"111","product_name":"scooter","product_price":11.11},"source":{},"op":"c","ts_ms":1663862460000,"transaction":null}
    
    {"before":null,"after":{"product_id":"222","product_name":"basketball","product_price":23.11},"source":{},"op":"c","ts_ms":1663862520000,"transaction":null}
    
    {"before":{"product_id":"111","product_name":"scooter","product_price":11.11},"after":{"product_id":"111","product_name":"scooter","product_price":12.99},"source":{},"op":"u","ts_ms":1663905600000,"transaction":null}
    
    {"before":{"product_id":"222","product_name":"basketball","product_price":23.11},"after":{"product_id":"222","product_name":"basketball","product_price":19.99},"source":{},"op":"u","ts_ms":1663905600000,"transaction":null}
    
    {"before":{"product_id":"111","product_name":"scooter","product_price":12.99},"after":null,"source":{},"op":"d","ts_ms":1663927200000,"transaction":null}
    

3.3、流表join

使用:当流数据需要关联一些维表时,需要去对应数据库异步对应的维度信息,此时需要使用流表join。流表join,也是时态表join的一种,因为数据库维表就相当于一个版本表,只有一个最新的快照版本。

注意:流表需要用处理时间,进行join时,使用这个处理时间。

案例:

CREATE TABLE orders (
  order_id string,
  order_channel string,
  order_time  string,
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string,
  proctime as Proctime() --维表join需要用处理时间
--   WATERMARK FOR order_time AS order_time
) WITH (
  'connector' = 'kafka',
  'topic' = 'TEST-ODS_BUFFER_SHUNT',
  'properties.bootstrap.servers' = '192.168.7.105:9092,192.168.7.61:9092,192.168.7.221:9092',
  'properties.group.id' = 'TestOpenSourceFlinkGroup',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1664344800000', --kafka的时间 2022-09-28 14:00:00
  'value.format' = 'json',
  'value.json.ignore-parse-errors' = 'true',
  'value.json.fail-on-missing-field' = 'false'
);

--创建地址维表
create table area_info (
    area_id string, 
    area_province_name string,
    area_city_name string,
    area_county_name string, 
    area_street_name string, 
    region_name string 
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://10.1.12.99:3306/srm_mock_dt?useSSL=false&useUnicode=true&characterEncoding=utf8&serverTimeZone=Asia/Shanghai',
  'table-name' = 'area_info_flinksql_test',
  'username' = 'root',
  'password' = '6nN@@UQ5f%9u'
);

--根据地址维表生成详细的包含地址的订单信息宽表
create table order_detail(
    order_id string,
    order_channel string,
    order_time string,
    pay_amount double,
    real_pay double,
    pay_time string,
    user_id string,
    user_name string,
    area_id string,
    area_province_name string,
    area_city_name string,
    area_county_name string,
    area_street_name string,
    region_name string
) with (
  'connector' = 'print'
);

insert into order_detail
    select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name,
           area.area_id, area.area_province_name, area.area_city_name, area.area_county_name,
           area.area_street_name, area.region_name  from orders 
           left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id;

四、Sink

4.1、Kafka

连接器配置可参考2.1,kafkaSource也可以看作是具体kafka中的数据,往flink内部流入的一个sink

4.2、Upset-Kafka

连接器配置可参考2.2

4.3、Mysql

使用:下游是mysql时,我们可以实现数据的upsert/delete

案例1:

-- {"id":"1","name":"张三","age":18,"sex":"男","amount":20.56}
-- 上游数据是append流,可以实现数据的update,需要定义主键,此主键可以和真实数据库的主键不一样。
CREATE TABLE kafka_source (
    id bigint, 
    name string,
    age int,
    sex string, 
    amount decimal(20,10)
) WITH (
  'connector' = 'kafka',
  'topic' = 'TEST-ODS_BUFFER_SHUNT',
  'properties.bootstrap.servers' = '192.168.7.105:9092,192.168.7.61:9092,192.168.7.221:9092',
  'properties.group.id' = 'TestOpenSourceFlinkGroup',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1664348400000', --kafka的时间 2022-09-28 15:00:00
  'value.format' = 'json',
  'value.json.ignore-parse-errors' = 'true',
  'value.json.fail-on-missing-field' = 'false'
);


create table mysql_sink (
    id bigint, 
    name string,
    age int,
    sex string, 
    amount decimal(20,10),
    PRIMARY KEY (name) NOT ENFORCED --真实数据库主键为id,这里可以不为id,如果可以确保某字段唯一,									  --也可以用此字段
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://10.1.12.99:3306/srm_mock_dt?useSSL=false&useUnicode=true&characterEncoding=utf8&serverTimeZone=Asia/Shanghai',
  'table-name' = 'user_flinksql_test',
  'username' = 'root',
  'password' = '6nN@@UQ5f%9u'
);


insert into mysql_sink
select * from kafka_source;

案例2:

-- 上游数据是retreat流,可以实现数据的update/delete,需要定义主键,此主键可以和真实数据库的主键不一样。
CREATE TABLE kafka_source (
    id bigint,
    name string,
    age int,
    sex string,
    amount decimal(20,10)
) WITH (
  'connector' = 'kafka',
  'topic' = 'TEST-ODS_BUFFER_SHUNT',
  'properties.bootstrap.servers' = '192.168.7.105:9092,192.168.7.61:9092,192.168.7.221:9092',
  'properties.group.id' = 'TestOpenSourceFlinkGroup',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1664348400000', --kafka的时间 2022-09-28 15:00:00
  'value.format' = 'json',
  'value.json.ignore-parse-errors' = 'true',
  'value.json.fail-on-missing-field' = 'false'
);

create table mysql_sink (
    id bigint,
    name string,
    age int,
    sex string,
    amount decimal(20,10),
    PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://10.1.12.99:3306/srm_mock_dt?useSSL=false&useUnicode=true&characterEncoding=utf8&serverTimeZone=Asia/Shanghai',
  'table-name' = 'user_flinksql_test',
  'username' = 'root',
  'password' = '6nN@@UQ5f%9u'
);

insert into mysql_sink
select * from kafka_source;

-- 数据:
-- {"before":null,"after":{"id":"1","name":"张三","age":18,"sex":"男","amount":20.56},"source":{},"op":"c","ts_ms":1663862460000,"transaction":null}

-- {"before":{"id":"1","name":"张三","age":18,"sex":"男","amount":20.56},"after":{"id":"1","name":"张三","age":19,"sex":"男","amount":20.56},"source":{},"op":"u","ts_ms":1663862460000,"transaction":null}

-- {"before":{"id":"1","name":"张三","age":19,"sex":"男","amount":20.56},"after":null,"source":{},"op":"d","ts_ms":1663862460000,"transaction":null}

四、问题

4.1、水位线不推进

场景:source为kafka,kafka分区数大于1,flink的并行度为1,kafka某个分区没数据。

解决:设置参数table.exec.source.idle-timeout=10000,单位是ms,如果其他分区等待xx毫秒没数据,则推进水位线。(如果这个场景是javaAPI的方式对接kafka,则是不会出现的。)

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐