标题: flink-connector中kafka和upsertkafka的介绍
日期: 2021-08-03 16:46:43
标签: [flink, kafka, upsert kafka, 实时数仓]
分类: 数据仓库

今天来说下flink sql中常用到的connector:kafka,它承接了实时的消息数据,进行处理,当然,这些消息的特点有可能不一样,怎样处理,得到实时的结果,提供给分析、运营、营销等等。下面来看看具体有什么区别,怎么使用。

kafka

kafka中的实时消息,它也可以是关系型数据库的changelog(具有insert/update/delete属性),我们知道每一条消息的属性之后,就可以确定哪些数据是最新的,哪些数据是被删除的,那么在我们的存储系统中,应该被实时地更新,以便计算的及时,数据的延时性就会更低。

flink

他们俩的依赖都是一样的
依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.12.3</version>
</dependency>

先来说下kafka connector:

1.创建一个kafka connector(sql)

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset', // 消费的offset位置,也可以是latest-offset
  'format' = 'csv'
)

这种connector,kafka中的数据是什么样子,KafkaTable表中的数据就是什么样子的,可以执行select语句查看:

select * from KafkaTable

接着你就可以对KafkaTable表中的数据进行各种操作了。

再来看下upsertkafka connector

先介绍一下,upsert kafka连接器支持实时消息数据以upsert方式从一个kafka topic中插入到另一个kafka topic中,source为changelog类型kafka, sink为kafka

什么是changelog类型kafka,就是kafka topic中的消息是带有一个属性的,这个属性标记数据是Insert、update before、update after、delete的,再根据主键来进行对sink kafka topic中的数据进行具体的操作,是插入,还是更新,还是删除。

另外,value为空的消息,将被视为Delete消息。

1.来看看sql的例子

CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro'
);

CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = '...',
  'format' = 'json'
);

-- 计算 pv、uv 并插入到 upsert-kafka sink
INSERT INTO pageviews_per_region
SELECT
  user_region,
  COUNT(*),
  COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;

记住DDL语句中,一定要设置主键Primary key。

上面的pageviews_per_region表中计算的pv,uv结果就是统计结果,当数据源端进行了增删改,对应的pv uv结果就会同步更新,这就是upsert kafka的魅力。

这是基于kafka的统计计算,前提条件是topic pageviews中的数据是changelog格式的。

changelog格式数据怎么来,怎么导入kafka,还有upsert到mysql、hbase呢,且看下回分解。


每天进步一点点,你会成为专家。

欢迎关注我的微信公众号,比较喜欢分享知识,也喜欢宠物,想知道狗狗怕蚊子不?微信扫描二维码查看详情
狗狗怕蚊子吗?

一起学习,一起进步。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐