Flink-1.12 - 之Flink SQL 与 kafka connector实践
Flink-1.12 - 之kafka connector实践1 前言(消息更新模式)阅读之前可以先了解一下,动态table抓换成data stream的3种模式,这个在动态Table转换成DataStream或者写入外部系统的时候是有严格的约束的。Append Mode一个只有Insert操作的动态表,才能转换成Append-only stream(或写入到支持AppendMode的外部系统如:
·
Flink-1.12 - 之kafka connector实践
1 前言(消息更新模式)
阅读之前可以先了解一下,动态table抓换成data stream的3种模式,这个在动态Table转换成DataStream
或者写入外部系统
的时候是有严格的约束的。
Append Mode
- 一个只有Insert操作的动态表,才能转换成
Append-only stream
(或写入到支持AppendMode的外部系统如:文件、Kafka、Hive等)
- 一个只有Insert操作的动态表,才能转换成
Retract Mode
- 这种模式有2种类型的消息:add 和 retract
- 通常 insert操作被视为add消息
- delete操作被视为 retract消息
- update操作被视为先retract后add的消息
- 这种模式有2种类型的消息:add 和 retract
Upsert Mode
- 这种模式也将消息分成2类:upsert 和 delete
- 通常Insert和update的操作被堪称Upsert消息
- Delete操作被看成Delete消息
- 这种模式也将消息分成2类:upsert 和 delete
通常能转换成Append DataStream的 Table都适用于其它2种更新模式
2 kafka连接器分类
Flink支持对kafka的Source 和 Sink,Flink1.12
版本的Flink SQL支持2种kafka连接器。
(regular) Kafka
- append only的连接器
- 这种连接器支持 unbounded source scan & streaming sink 【append】
- append only的连接器
Upsert Kafka
- upsert-的连接器(upsert代表insert/update)
- upsert-kafka connector提供了changelog stream
- 每条record代表一个update或delete的Event
- 必须指定一个primary key
- upsert-的连接器(upsert代表insert/update)
#1、upsert-kafka作为Source的时候
Flink连接产生的stream是一个 chenge log stream,每来一条record消息,如果消息的key已经存在过,那么就update进行覆盖,如果不存在过那么就是insert,如果消息的value为空,且之前key已经存在,那么就是delete删除。
#2、upsert-kafka作为Sink的时候
upsert-kafka能消费Flink中的一个 chenge log stream
insert/update_after的消息作为普通的kafka message写入到kafka,delete操作的消息将会以value为null的形式写入到kafka,Flink将通过对primary key列的值进行分区,来保证消息在primary key上的排序,所以update/delete拥有相同key的message会被分配到相同的partition
作为Sink的时候,必须在DDL上指定primary key
3 (regular) Kafka-Connector
3.1 NOTE 注意了!!!~~~
这种连接器,只支持Append-Only Sink
3.2 简单的DDL案例
- 其中required为必选~
CREATE TABLE KafkaTable (
`event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, -- from Debezium format
`origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL, -- from Kafka connector
`offset` BIGINT METADATA VIRTUAL, -- from Kafka connector
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka', # required
'topic' = 'topic-1;topic-2', # required for sink
'properties.bootstrap.servers' = 'kafkahost:9092,...' # required
'format' = 'csv' , # required,指定message value的ser\deser格式
'properties.group.id' = 'group-id1', # required by source 指定消费者组
###########################################################################################################################
'topic-pattern' = 'topic-*', # 以正则的方式匹配source topic
'properties.allow.auto.create.topics' = 'false', # properties.*的形式指定kafka的配置,不能`key.deserializer` and `value.deserializer`
'key.format' = 'csv' , # 指定kafka消息key的ser\deser
'key.fields' = 'field1;field2' , # 如果指定了'key.format',这个就必须指定,否则会有一个空key
'key.fields-prefix' = 'key_' , # 给消息的key指定前缀
'value.format' = 'csv' , # 与'format'一样的含义
'value.fields-include' = 'ALL' , # 指定消息中的value列,可以选ALL、EXCEPT_KEY
'scan.startup.mode' = 'earliest-offset' , # 指定consumer从哪里开始消费,可选 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
'scan.startup.specific-offsets' = 'partition:0,offset:42;...' , # 当选择'specific-offsets'的时候指定每个分区的offset消费
'scan.startup.timestamp-millis' = '16666666000' , # 当选择'timestamp'从哪个timestamp开始消费
'scan.topic-partition-discovery.interval' = 'Long interval' , #
'sink.partitioner' = 'default' , # flink->kafka的分区分配策略.'fixed','round-robin','org.mycompany.MyPartitioner'
'sink.semantic' = 'exactly-once' , # 指定sink的语义,'at-least-once', 'exactly-once' and 'none'
'sink.parallelism' = '88' # 指定kafka-sink-connector的并行度,默认与flink的最后一个算子的parallelism保持一致
);
3.3 kafka的可用元数据
以下元数据属性也已当作,DDL中的字段进行使用,如:event_time TIMESTAMP(3) METADATA FROM ‘timestamp’ VIRTUAL
,其中VIRTUAL
必须标注在只读的元数据字段上,除了 headers、timestamp支持R/W,其它全是只读~
topic | STRING NOT NULL | kafka对应的topic |
---|---|---|
partition | INT NOT NULL | kafka消息对应的分区 |
headers | MAP<STRING, BYTES> NOT NULL | kafka消息的header |
leader-epoch | INT NULL | 标识kafka消息对应的leader的选举标识 |
offset | BIGINT NOT NULL | 该分区的kafka消息的offset |
timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL | kafka消息的timestamp |
timestamp-type | STRING NOT NULL | kafka消息的timestamp的类型 |
4 Upsert-kafka-Connector
这种Upsert模式的连接器支持,增删改。
4.1 简单的DDL案例
CREATE TABLE pageviews_per_region (
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY (user_region) NOT ENFORCED -- 必须指定一个primary key,这个必须是唯一的,通常是group by之后的字段
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'pageviews_per_region',
'properties.bootstrap.servers' = '...',
'key.format' = 'avro',
'value.format' = 'avro',
'value.json.fail-on-missing-field' = 'false'
);
CREATE TABLE pageviews (
user_id BIGINT,
page_id BIGINT,
viewtime TIMESTAMP,
user_region STRING,
WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'pageviews',
'properties.bootstrap.servers' = '...',
'format' = 'json'
);
-- calculate the pv, uv and insert into the upsert-kafka sink
INSERT INTO pageviews_per_region
SELECT
user_region,
COUNT(*),
COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;
4.2 kafka可用元数据
与普通的kafka连接器保持一致
4.3 常用的WITH-OPTIONS
'connector' = 'kafka', # required
'topic' = 'topic-1;topic-2', # required for sink
'properties.bootstrap.servers' = 'kafkahost:9092,...' # required
'format' = 'csv' , # required,指定message value的ser\deser格式
###########################################################################################################################
'properties.allow.auto.create.topics' = 'false', # properties.* 的形式指定kafka的配置,不能`key.deserializer` and `value.deserializer`
'key.format' = 'csv' , # required,指定kafka消息key的ser\deser
'key.fields-prefix' = 'key_' , # 给消息的key指定前缀
'value.format' = 'csv' , # required,与'format'一样的含义
'value.fields-include' = 'ALL' , # 指定消息中的value列,可以选ALL、EXCEPT_KEY
'sink.parallelism' = '88' # 指定kafka-sink-connector的并行度,默认与flink的最后一个算子的parallelism保持一致
5 数据格式与类型匹配
更多推荐
已为社区贡献3条内容
所有评论(0)