Flink-1.12 - 之kafka connector实践

1 前言(消息更新模式)

阅读之前可以先了解一下,动态table抓换成data stream的3种模式,这个在动态Table转换成DataStream或者写入外部系统的时候是有严格的约束的。

  • Append Mode
    • 一个只有Insert操作的动态表,才能转换成Append-only stream(或写入到支持AppendMode的外部系统如:文件、Kafka、Hive等)
  • Retract Mode
    • 这种模式有2种类型的消息:add 和 retract
      • 通常 insert操作被视为add消息
      • delete操作被视为 retract消息
      • update操作被视为先retract后add的消息
  • Upsert Mode
    • 这种模式也将消息分成2类:upsert 和 delete
      • 通常Insert和update的操作被堪称Upsert消息
      • Delete操作被看成Delete消息

通常能转换成Append DataStream的 Table都适用于其它2种更新模式

2 kafka连接器分类

Flink支持对kafka的Source 和 Sink,Flink1.12版本的Flink SQL支持2种kafka连接器。

  • (regular) Kafka
    • append only的连接器
      • 这种连接器支持 unbounded source scan & streaming sink 【append】
  • Upsert Kafka
    • upsert-的连接器(upsert代表insert/update)
      • upsert-kafka connector提供了changelog stream
      • 每条record代表一个update或delete的Event
      • 必须指定一个primary key
#1、upsert-kafka作为Source的时候
Flink连接产生的stream是一个 chenge log stream,每来一条record消息,如果消息的key已经存在过,那么就update进行覆盖,如果不存在过那么就是insert,如果消息的value为空,且之前key已经存在,那么就是delete删除。


#2、upsert-kafka作为Sink的时候
upsert-kafka能消费Flink中的一个 chenge log stream
insert/update_after的消息作为普通的kafka message写入到kafka,delete操作的消息将会以value为null的形式写入到kafka,Flink将通过对primary key列的值进行分区,来保证消息在primary key上的排序,所以update/delete拥有相同key的message会被分配到相同的partition

作为Sink的时候,必须在DDL上指定primary key

3 (regular) Kafka-Connector

3.1 NOTE 注意了!!!~~~

这种连接器,只支持Append-Only Sink

3.2 简单的DDL案例

  • 其中required为必选~
CREATE TABLE KafkaTable (
  `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format
  `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
  `partition_id` BIGINT METADATA FROM 'partition' VIRTUAL,  -- from Kafka connector
  `offset` BIGINT METADATA VIRTUAL,  -- from Kafka connector
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
'connector' = 'kafka', 											# required 
'topic' = 'topic-1;topic-2', 									# required for sink
'properties.bootstrap.servers' = 'kafkahost:9092,...'   		# required
'format' = 'csv' , 												# required,指定message value的ser\deser格式
'properties.group.id' = 'group-id1', 							# required by source 指定消费者组
###########################################################################################################################   
'topic-pattern' = 'topic-*', 									# 以正则的方式匹配source topic
'properties.allow.auto.create.topics' = 'false',				# properties.*的形式指定kafka的配置,不能`key.deserializer` and `value.deserializer` 
'key.format' = 'csv' , 											# 指定kafka消息key的ser\deser
'key.fields' = 'field1;field2' ,								# 如果指定了'key.format',这个就必须指定,否则会有一个空key
'key.fields-prefix' = 'key_' , 									# 给消息的key指定前缀 
'value.format' = 'csv' , 										# 与'format'一样的含义
'value.fields-include' = 'ALL' , 								# 指定消息中的value列,可以选ALL、EXCEPT_KEY
'scan.startup.mode' = 'earliest-offset' , 						# 指定consumer从哪里开始消费,可选 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
'scan.startup.specific-offsets' = 'partition:0,offset:42;...' , # 当选择'specific-offsets'的时候指定每个分区的offset消费
'scan.startup.timestamp-millis' = '16666666000' , 				# 当选择'timestamp'从哪个timestamp开始消费
'scan.topic-partition-discovery.interval' = 'Long interval' ,	# 
'sink.partitioner' = 'default' , 								# flink->kafka的分区分配策略.'fixed','round-robin','org.mycompany.MyPartitioner'
'sink.semantic' = 'exactly-once' , 								# 指定sink的语义,'at-least-once', 'exactly-once' and 'none'
'sink.parallelism' = '88' 										# 指定kafka-sink-connector的并行度,默认与flink的最后一个算子的parallelism保持一致
);

3.3 kafka的可用元数据

以下元数据属性也已当作,DDL中的字段进行使用,如:event_time TIMESTAMP(3) METADATA FROM ‘timestamp’ VIRTUAL,其中VIRTUAL必须标注在只读的元数据字段上,除了 headers、timestamp支持R/W,其它全是只读~

topicSTRING NOT NULLkafka对应的topic
partitionINT NOT NULLkafka消息对应的分区
headersMAP<STRING, BYTES> NOT NULLkafka消息的header
leader-epochINT NULL标识kafka消息对应的leader的选举标识
offsetBIGINT NOT NULL该分区的kafka消息的offset
timestampTIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULLkafka消息的timestamp
timestamp-typeSTRING NOT NULLkafka消息的timestamp的类型

4 Upsert-kafka-Connector

这种Upsert模式的连接器支持,增删改。

4.1 简单的DDL案例

CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED -- 必须指定一个primary key,这个必须是唯一的,通常是group by之后的字段
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro',
  'value.json.fail-on-missing-field' = 'false'
);

CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = '...',
  'format' = 'json'
);

-- calculate the pv, uv and insert into the upsert-kafka sink
INSERT INTO pageviews_per_region
SELECT
  user_region,
  COUNT(*),
  COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;

4.2 kafka可用元数据

与普通的kafka连接器保持一致

4.3 常用的WITH-OPTIONS

'connector' = 'kafka', 											# required 
'topic' = 'topic-1;topic-2', 									# required for sink
'properties.bootstrap.servers' = 'kafkahost:9092,...'   		# required
'format' = 'csv' , 											# required,指定message value的ser\deser格式
###########################################################################################################################   
'properties.allow.auto.create.topics' = 'false',				# properties.* 的形式指定kafka的配置,不能`key.deserializer` and `value.deserializer` 
'key.format' = 'csv' , 											# required,指定kafka消息key的ser\deser
'key.fields-prefix' = 'key_' , 									# 给消息的key指定前缀 
'value.format' = 'csv' , 										# required,与'format'一样的含义
'value.fields-include' = 'ALL' , 								# 指定消息中的value列,可以选ALL、EXCEPT_KEY
'sink.parallelism' = '88' 										# 指定kafka-sink-connector的并行度,默认与flink的最后一个算子的parallelism保持一致

5 数据格式与类型匹配

请参阅:formats and dataType mapping

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐