flinksql获取系统当前时间搓_flink sql实战案例之商品销量实时统计
1案例背景介绍互联网电商往往需要对订单商品销量实时统计,用于实时大屏展示,库存销量监控等等。本文主要介绍如何通过flink sql的方式进行商品实时销量的统计。业务流程介绍:使用otter采集业务库binlog数据输出到kafkaflink读取kafka数据进行商品销量统计统计结果输出到mysql下游业务系统直接读取mysql数据业务需求介绍:根据订单创建时间统计商品每天的实时销...
互联网电商往往需要对订单商品销量实时统计,用于实时大屏展示,库存销量监控等等。本文主要介绍如何通过flink sql的方式进行商品实时销量的统计。
业务流程介绍:
使用otter采集业务库binlog数据输出到kafka
flink读取kafka数据进行商品销量统计
统计结果输出到mysql
下游业务系统直接读取mysql数据
业务需求介绍:
根据订单创建时间统计商品每天的实时销量,不包含取消订单的商品
2 准备工作将mysql订单相关的binlog日志实时同步到kafka对应的Topic,然后创建对应的flink table source表。
为了简化需求,下面的订单表和订单明细表只列出主要的字段。
订单主表:orders
订单明细表:order_detail
3 难点解析同一个订单会有多次业务操作(例如下单、付款、发货,取消等等),每一次业务操作都会导致订单状态发生变化,并且每次变化订单表对应的Binlog日志会产生一条订单号相同的数据。如果我们不做处理直接关联聚合查询的话会导致数据重复统计结果不正确。因此我们需要了解业务系统都有哪些操作会对订单主表和订单明细进行更新操作。
假设业务系统数据变更是这样的:
- 用户下单后新增订单主表和订单明细表数据
- 后续的业务操作只会更新订单主表数据,订单明细表数据不会更新变化
- 数据每次更新update_time字段都会同时变化
再来看一下我们的需求如何处理:
需求的要求是当订单创建后我们就要统计该订单对应的商品数量,而当订单状态变为取消时我们要减掉该订单对应的商品数量。根据需求和订单数据更新的特点,这里需要用到flink回撤流的特性来处理该需求。flinksql可以使用row_number() over(partition by order_no order by update_time desc) 通过限制 where rn=1来获取同一订单的最新状态数据,然后和订单明细表进行关联求和。flinksql会自动更新统计结果。
4 编写业务逻辑订单主表source table
--订单主表source tableCREATE TABLE orders ( order_no string, order_state int, pay_time string, create_time string, update_time string ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', --kafka版本 'connector.topic' = '_tporders',--kafkatopic 'connector.properties.zookeeper.connect' = 'localhost:2181', 'connector.properties.bootstrap.servers' = 'localhost:9092', 'connector.properties.group.id' = 'testGroup', 'connector.startup-mode' = 'latest-offset', 'format.type' = 'json' --数据为json格式 )
订单明细表source table
CREATE TABLE order_detail ( order_no string, product_code string, quantity int, create_time string, update_time string ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', --kafka版本 'connector.topic' = 'tp_order_detail',--kafkatopic 'connector.properties.zookeeper.connect' = 'localhost:2181', 'connector.properties.bootstrap.servers' = 'localhost:9092', 'connector.properties.group.id' = 'testGroup', 'connector.startup-mode' = 'latest-offset', 'format.type' = 'json' --数据为json格式 )
mysql统计结果表sink table
CREATE TABLE product_sale ( order_date string, product_code string, cnt int ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3306/flink?serverTimezone=UTC&useSSL=true', 'connector.table' = 'order_state_cnt', 'connector.driver' = 'com.mysql.cj.jdbc.Driver', 'connector.username' = 'root', 'connector.password' = 'root', 'connector.write.flush.max-rows' = '1',--默认每5000条数据写入一次,测试调小一点 'connector.write.flush.interval' = '2s',--写入时间间隔 'connector.write.max-retries' = '3' )
统计商品销量并写入mysql
insert into product_sale select create_date,product_code,sum(quantity)from (select t1.order_no, t1.create_date, t2.product_code, t2.quantity from (select order_id, order_status, substring(create_time,1,10) create_date, update_time , row_number() over(partition by order_no order by update_time desc) as rn from orders )t1 left join order_detail t2 on t1.order_no=t2.order_no where t1.rn=1--取最新的订单状态数据 and t1.order_status<>0--不包含取消订单 )t3 group by create_date,product_code
5
数据测试
假设在13点创建了两个订单,数据如下:
订单主表数据:
订单明细数据:
统计结果:
然后订单号为order1的订单在15分钟后取消了:
统计结果:
更多推荐
所有评论(0)