Flink Sql(二) Kafka连接器
如果作为TableSource,连接器会将读取到的topic中的数据(key,value),解释为对当前key的数据值的更新(UPDATE),也就是查找动态表中key对应的一行数据,将value更新为最新的值;需要特别说明的是,在KafkaTable的字段中有一个ts,它的声明中用到了METADATAFROM,这是表示一个“元数据列”(metadatacolumn),它是由Kafka连接器的元数据
Kafka连接器
在 Table API 和 SQL 编写的 Flink 程序中,可以在创建表的时候用 WITH 子句指定连接器 (connector),这样就可以连接到外部系统进行数据交互了。
架构中的 TableSource 负责从外部系统中读取数据并转换成表,TableSink 则负责将结果表 写入外部系统。在 Flink 1.13 的 API 调用中,已经不去区分 TableSource 和 TableSink,我们只要建立到外部系统的连接并创建表就可以,Flink 自动会从程序的处理逻辑中解析出它们的用途。
Flink 的 Table API 和 SQL 支持了各种不同的连接器。当然,最简单的其实就是连接到控制台打印输出:
CREATE TABLE ResultTable (
user STRING,
cnt BIGINT
WITH (
'connector' = 'print'
);
这里只需要在 WITH 中定义 connector 为 print 就可以了。而对于其它的外部系统,则需要增加一些配置项。下面我们就分别进行介绍。
Kafka 的 SQL 连接器可以从 Kafka 的主题(topic)读取数据转换成表,也可以将表数据 写入 Kafka 的主题。换句话说,创建表的时候指定连接器为 Kafka,则这个表既可以作为输入表,也可以作为输出表。
1.引入依赖
想要在 Flink 程序中使用 Kafka 连接器,需要引入如下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
这里我们引入的 Flink 和 Kafka 的连接器,与之前 DataStream API 中引入的连接器是一样的。如果想在 SQL 客户端里使用 Kafka 连接器,还需要下载对应的 jar 包放到 lib 目录下。
另外,Flink 为各种连接器提供了一系列的“表格式”(table formats),比如 CSV、JSON、 Avro、Parquet 等等。这些表格式定义了底层存储的二进制数据和表的列之间的转换方式,相当于表的序列化工具。对于 Kafka 而言,CSV、JSON、Avro 等主要格式都是支持的, 根据 Kafka 连接器中配置的格式,我们可能需要引入对应的依赖支持。
以 CSV 为例:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
由于 SQL 客户端中已经内置了 CSV、JSON 的支持,因此使用时无需专门引入;而对于 没有内置支持的格式(比如 Avro),则仍然要下载相应的 jar 包。关于连接器的格式细节详见官网说明,我们后面就不再讨论了。
2. 创建连接到 Kafka 的表
创建一个连接到 Kafka 表,需要在 CREATE TABLE 的 DDL 中在 WITH 子句里指定连接 器为 Kafka,并定义必要的配置参数。
下面是一个具体示例:
CREATE TABLE KafkaTable (
`user` STRING,
`url` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'events',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
这里定义了 Kafka 连接器对应的主题(topic),Kafka 服务器,消费者组 ID,消费者起始 模式以及表格式。需要特别说明的是,在 KafkaTable 的字段中有一个 ts,它的声明中用到了 METADATA FROM,这是表示一个“元数据列”(metadata column),它是由 Kafka 连接器的 元数据“timestamp”生成的。这里的 timestamp 其实就是 Kafka 中数据自带的时间戳,我们把 它直接作为元数据提取出来,转换成一个新的字段 ts.
3. Upsert Kafka
正常情况下,Kafka 作为保持数据顺序的消息队列,读取和写入都应该是流式的数据,对 应在表中就是仅追加(append-only)模式。如果我们想要将有更新操作(比如分组聚合)的结 果表写入 Kafka,就会因为 Kafka 无法识别撤回(retract)或更新插入(upsert)消息而导致异常。
为了解决这个问题,Flink 专门增加了一个“更新插入 Kafka”(Upsert Kafka)连接器。这 个连接器支持以更新插入(UPSERT)的方式向 Kafka 的 topic 中读写数据。
具体来说,Upsert Kafka 连接器处理的是更新日志(changlog)流。如果作为 TableSource, 连接器会将读取到的 topic中的数据(key, value),解释为对当前 key 的数据值的更新(UPDATE), 也就是查找动态表中 key 对应的一行数据,将 value 更新为最新的值;因为是 Upsert 操作,所 以如果没有 key 对应的行,那么也会执行插入(INSERT)操作。另外,如果遇到 value 为空 (null),连接器就把这条数据理解为对相应 key 那一行的删除(DELETE)操作。
如果作为 TableSink,Upsert Kafka 连接器会将有更新操作的结果表,转换成更新日志 (changelog)流。如果遇到插入(INSERT)或者更新后(UPDATE_AFTER)的数据,对应 的是一个添加(add)消息,那么就直接正常写入 Kafka 主题;如果是删除(DELETE)或者 更新前的数据,对应是一个撤回(retract)消息,那么就把 value 为空(null)的数据写入 Kafka。 由于 Flink 是根据键(key)的值对数据进行分区的,这样就可以保证同一个 key 上的更新和删除消息都会落到同一个分区中。
下面是一个创建和使用 Upsert Kafka 表的例子:
CREATE TABLE pageviews_per_region (
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'pageviews_per_region',
'properties.bootstrap.servers' = '...',
'key.format' = 'avro',
'value.format' = 'avro'
);
CREATE TABLE pageviews (
user_id BIGINT,
page_id BIGINT,
viewtime TIMESTAMP,
user_region STRING,
WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'pageviews',
'properties.bootstrap.servers' = '...',
'format' = 'json'
);
-- 计算 pv、uv 并插入到 upsert-kafka 表中
INSERT INTO pageviews_per_region
SELECT
user_region,
COUNT(*),
COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;
这里我们从 Kafka 表 pageviews 中读取数据,统计每个区域的 PV(全部浏览量)和 UV (对用户去重),这是一个分组聚合的更新查询,得到的结果表会不停地更新数据。为了将结 果表写入 Kafka 的 pageviews_per_region 主题,我们定义了一个 Upsert Kafka 表,它的字段中 需要用PRIMARY KEY来指定主键,并且在WITH子句中分别指定key和value的序列化格式。
更多推荐
所有评论(0)