1 案例背景介绍

    互联网电商往往需要对订单商品销量实时统计,用于实时大屏展示,库存销量监控等等。本文主要介绍如何通过flink sql的方式进行商品实时销量的统计。

业务流程介绍:

  1. 使用otter采集业务库binlog数据输出到kafka

  2. flink读取kafka数据进行商品销量统计

  3. 统计结果输出到mysql

  4. 下游业务系统直接读取mysql数据

业务需求介绍:

根据订单创建时间统计商品每天的实时销量,不包含取消订单的商品

2 准备工作

    将mysql订单相关的binlog日志实时同步到kafka对应的Topic,然后创建对应的flink table source表。

为了简化需求,下面的订单表和订单明细表只列出主要的字段。

订单主表:orders

8b90e0d283140a4855df4addfc64c636.png

订单明细表:order_detail

1667853bf5f658c7f52171fb15e192c1.png

3 难点解析

    同一个订单会有多次业务操作(例如下单、付款、发货,取消等等),每一次业务操作都会导致订单状态发生变化,并且每次变化订单表对应的Binlog日志会产生一条订单号相同的数据。如果我们不做处理直接关联聚合查询的话会导致数据重复统计结果不正确。因此我们需要了解业务系统都有哪些操作会对订单主表和订单明细进行更新操作。

假设业务系统数据变更是这样的:

  • 用户下单后新增订单主表和订单明细表数据
  • 后续的业务操作只会更新订单主表数据,订单明细表数据不会更新变化
  • 数据每次更新update_time字段都会同时变化

再来看一下我们的需求如何处理:

        需求的要求是当订单创建后我们就要统计该订单对应的商品数量,而当订单状态变为取消时我们要减掉该订单对应的商品数量。根据需求和订单数据更新的特点,这里需要用到flink回撤流的特性来处理该需求。flinksql可以使用row_number() over(partition by order_no order by update_time desc) 通过限制 where rn=1来获取同一订单的最新状态数据,然后和订单明细表进行关联求和。flinksql会自动更新统计结果。

4 编写业务逻辑

订单主表source table

--订单主表source tableCREATE TABLE orders             (               order_no     string,               order_state  int,               pay_time     string,               create_time  string,               update_time  string             )        WITH (               'connector.type' = 'kafka',                      'connector.version' = 'universal', --kafka版本                   'connector.topic' = '_tporders',--kafkatopic               'connector.properties.zookeeper.connect' = 'localhost:2181',                'connector.properties.bootstrap.servers' = 'localhost:9092',               'connector.properties.group.id' = 'testGroup',               'connector.startup-mode' = 'latest-offset',               'format.type' = 'json' --数据为json格式                          )

订单明细表source table

CREATE TABLE order_detail             (               order_no     string,               product_code string,               quantity     int,               create_time  string,               update_time  string             )        WITH (               'connector.type' = 'kafka',                      'connector.version' = 'universal', --kafka版本                   'connector.topic' = 'tp_order_detail',--kafkatopic               'connector.properties.zookeeper.connect' = 'localhost:2181',                'connector.properties.bootstrap.servers' = 'localhost:9092',               'connector.properties.group.id' = 'testGroup',               'connector.startup-mode' = 'latest-offset',               'format.type' = 'json' --数据为json格式                          )

mysql统计结果表sink table

CREATE TABLE product_sale             (              order_date string,              product_code string,              cnt int              )          WITH (           'connector.type' = 'jdbc',            'connector.url' = 'jdbc:mysql://localhost:3306/flink?serverTimezone=UTC&useSSL=true',            'connector.table' = 'order_state_cnt',            'connector.driver' = 'com.mysql.cj.jdbc.Driver',            'connector.username' = 'root',           'connector.password' = 'root',           'connector.write.flush.max-rows' = '1',--默认每5000条数据写入一次,测试调小一点           'connector.write.flush.interval' = '2s',--写入时间间隔           'connector.write.max-retries' = '3'         )

统计商品销量并写入mysql

insert into product_sale select create_date,product_code,sum(quantity)from (select t1.order_no,             t1.create_date,             t2.product_code,             t2.quantity       from (select order_id,                    order_status,                    substring(create_time,1,10) create_date,                    update_time ,                    row_number() over(partition by order_no order by update_time desc) as rn              from orders              )t1       left join order_detail t2            on t1.order_no=t2.order_no      where t1.rn=1--取最新的订单状态数据      and t1.order_status<>0--不包含取消订单   )t3 group by create_date,product_code
5 数据测试 假设在13点创建了两个订单,数据如下: 订单主表数据:

30635e8035c77336a3aa3516b9a4158a.png

订单明细数据:

94f2cb7b9f32e5902d32161f6fb36d83.png

统计结果:

96f7bf0bb0974bb9cb6dd0334a9ea4ba.png

然后订单号为order1的订单在15分钟后取消了:

e5275696d4e6cb25621c2f04498bfdb5.png

统计结果:

4c987738dbe56a05c3f8e5136baf4b7b.png

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐