目录

一、每小时输出一次窗口时间内的pv数

二、自定义布隆过滤器统计累计时间内的uv数

三、从每天0点开始,每一小时输出累计的pv、uv数


一、每小时输出一次窗口时间内的pv数

先定义两个pojo类,UserBehavior为输入类,PageViewCount为输出类。

@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class UserBehavior {

    private Long userId;//用户ID
    private Long itemId;//类目ID
    private Integer categoryId;//分类ID
    private String behavior;//用户行为
    private Long timestamp;//时间戳

}
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class PageViewCount {

    private String rand; //随机key
    private Long windowEnd; //窗口结束时间
    private Long count; //pv

}

1 读取数据,创建DataStream(本案例数据为文件数据,如果输入数据为kafka,需要修改map里面的代码)

DataStreamSource<String> inputDS = env.readTextFile("***.csv");

2 读取数据,转换为bean,分配时间戳和watermark

SingleOutputStreamOperator<UserBehavior> mapDS = inputDS
                .map(line -> {
                    String[] fields = line.split(",");
                    return new UserBehavior(new Long(fields[0]), new Long(fields[1]), new Integer(fields[2]), fields[3], new Long(fields[4]));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<UserBehavior>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
                    @Override
                    public long extractTimestamp(UserBehavior userBehavior, long l) {
                        return userBehavior.getTimestamp() * 1000L;
                    }
                }));

3 分组开窗聚合

   生成随机key,避免数据倾斜。设置一小时的滚动窗口,并使用aggregate预聚合函数进行统计。

SingleOutputStreamOperator<PageViewCount> windowAggDS = mapDS
                .filter(line -> "pv".equals(line.getBehavior()))  //过滤pv行为
                .map(new MapFunction<UserBehavior, Tuple2<Integer, Long>>() {
                    @Override
                    public Tuple2<Integer, Long> map(UserBehavior userBehavior) throws Exception {
                        Random random = new Random();
                        return new Tuple2<>(random.nextInt(10), 1L);
                    }
                })
                .keyBy(line -> line.f0)
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .aggregate(new PvCountAgg(), new PvCountResult());

3.1 预聚合函数PvCountAgg实现代码

    public static class PvCountAgg implements AggregateFunction<Tuple2<Integer, Long>, Long, Long>{

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Tuple2<Integer, Long> integerLongTuple2, Long aLong) {
            return aLong + 1;
        }

        @Override
        public Long getResult(Long aLong) {
            return aLong;
        }

        @Override
        public Long merge(Long aLong, Long acc1) {
            return aLong + acc1;
        }
    }

3.2 PvCountResult函数实现代码

  public static class PvCountResult implements WindowFunction<Long, PageViewCount, Integer, TimeWindow>{

        @Override
        public void apply(Integer integer, TimeWindow timeWindow, Iterable<Long> iterable, Collector<PageViewCount> collector) throws Exception {
            collector.collect(new PageViewCount(integer.toString(), timeWindow.getEnd(), iterable.iterator().next()));
        }
    }

4 将各个分区数据汇总起来

SingleOutputStreamOperator<PageViewCount> resultDS2 = windowAggDS
                .keyBy(line -> line.getWindowEnd())
                .process(new TotalPvCount());

4.1 自定义KeyedProcessFunction函数实现

      定义ValueState,用于累加各个窗口的count值,最后输出总pv数。

      定义定时器,窗口结束后的1毫秒进行触发,直接输出各个窗口数据聚合后的总pv数。

      最后清空定时器。

    //把相同窗口分组统计的count值叠加
    public static class TotalPvCount extends KeyedProcessFunction<Long, PageViewCount, PageViewCount>{

        //定义一个状态,保存当前的总count值
        ValueState<Long> totalCountState;

        @Override
        public void open(Configuration parameters) throws Exception {
            totalCountState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("total-count", Long.class, 0L));
        }

        @Override
        public void processElement(PageViewCount pageViewCount, Context context, Collector<PageViewCount> collector) throws Exception {
            Long value = totalCountState.value();
            totalCountState.update(pageViewCount.getCount() + value);
            context.timerService().registerEventTimeTimer(pageViewCount.getWindowEnd() + 1);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<PageViewCount> out) throws Exception {
            //所有分组count值都到齐,直接输出当前的总count值
            Long totalCount = totalCountState.value();
            out.collect(new PageViewCount("pv", ctx.getCurrentKey(), totalCount));
            totalCountState.clear();
        }
    }

二、自定义布隆过滤器统计累计时间内的uv数

1 读取数据,创建DataStream(本案例数据为文件数据,如果输入数据为kafka,需要修改map里面的代码)

DataStreamSource<String> inputDS = env.readTextFile("***.csv");

2 转换为bean,分配时间戳和waretmark

  SingleOutputStreamOperator<UserBehavior> mapDS = inputDS
                .map(line -> {
                    String[] fields = line.split(",");
                    return new UserBehavior(new Long(fields[0]), new Long(fields[1]), new Integer(fields[2]), fields[3], new Long(fields[4]));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<UserBehavior>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
                    @Override
                    public long extractTimestamp(UserBehavior userBehavior, long l) {
                        return userBehavior.getTimestamp() * 1000L;
                    }
                }));

3 分组开窗聚合

   由于是统计uv,所以需要按照用户ID分组。

   定义一个触发器(trigger),每来一条数据进行一次布隆过滤器校验。

   自定义ProcessWindowFunction,实现具体业务逻辑。

 SingleOutputStreamOperator<PageViewCount> resultDS = mapDS
                .filter(line -> "pv".equals(line.getBehavior()))  //过滤pv行为
                .keyBy(line -> line.getUserId())
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .trigger(new MyTrigger())
                .process(new UvCountResultFunc());

 4 自定义一个布隆过滤器

 public static class MyBloomFilter{
        //定义位图的大小, 一般定义为2的整次幂
        private Integer cap;

        public MyBloomFilter(Integer cap) {
            this.cap = cap;
        }

        //实现一个hash函数
        public Long hashCode(String value, Integer seed){
            Long result = 0L;

            //对字符串的每一个字符相加asc码值,相加之前做一个权重调整,乘以一个随机数种子seed
            //避免类似于 abc 和 cba hash值相同的情况
            for(int i = 0; i < value.length(); i++){
                result = result * seed + value.charAt(i);
            }
            //避免超过cap范围,使用按位与计算
            return result & (cap - 1);
        }
    }

 3.1 触发器MyTrigger函数实现

    trigger触发器接口有五个方法允许trigger对不同的事件做出反应:

  • onElement()进入窗口的每个元素都会调用该方法。
  • onEventTime()事件时间timer触发的时候被调用。
  • onProcessingTime()处理时间timer触发的时候会被调用。
  • onMerge()有状态的触发器相关,并在它们相应的窗口合并时合并两个触发器的状态,例如使用会话窗口。
  • clear()该方法主要是执行窗口的删除操作。

    前三方法决定着如何通过返回一个TriggerResult来操作输入事件:

  • CONTINUE:什么都不做。
  • FIRE:触发计算
  • PURE:清除窗口的元素。
  • FIRE_AND_PURE:触发计算和清除窗口元素。
   //自定义触发器,每来一条数据判断下是否在布隆过滤器中存在
    public static class MyTrigger extends Trigger<UserBehavior, TimeWindow>{

        @Override
        public TriggerResult onElement(UserBehavior userBehavior, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            //每一条数据来到,直接触发窗口计算,并且直接清空窗口 FIRE_AND_PURGE:触发计算并清空窗口
            return TriggerResult.FIRE_AND_PURGE;
        }

        @Override
        public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {

        }
    }

3.2 UvCountResultFunc方法实现

   此案例使用的是自定义的布隆过滤器,实际开发中可直接引入依赖进行使用,详情可见第三节。

public static class UvCountResultFunc extends ProcessWindowFunction<UserBehavior, PageViewCount, Long, TimeWindow>{

        //定义jedis连接和布隆过滤器
        Jedis jedis;
        MyBloomFilter myBloomFilter;

        @Override
        public void open(Configuration parameters) throws Exception {
            jedis = RedisUtil.getJedis();
            //给初始大小64MB的位图  =  2的6次幂(64) * 2的20次幂(M) * 2的3次幂(B)
            myBloomFilter = new MyBloomFilter(1 << 29 ); //位移计算,相当于1后面跟29个0 即 2的29次幂
        }

        @Override
        public void process(Long userId, Context context, Iterable<UserBehavior> iterable, Collector<PageViewCount> collector) throws Exception {
            //将位图和窗口的count值全部存入redis,用windowEnd作为key
            Long windowEnd = context.window().getEnd();
            String bitmapKey = windowEnd.toString();

            //把count值存成一张hash表
            String countHashName = "uv_count";
            String countKey = windowEnd.toString();

            //计算位图中的offset
            Long offset = myBloomFilter.hashCode(userId.toString(), 61);

            //用redis的getbit命令,判断对应位置的值
            Boolean isExist = jedis.getbit(bitmapKey, offset);

            if(!isExist){
                //如果不存在,对应位图位置置为1
                jedis.setbit(bitmapKey, offset, true);

                //更新redis中保存的count值
                Long uvCount = 0L; //初始count值
                String uvCountStr = jedis.hget(countHashName, countKey);

                if(uvCountStr != null && !"".equals(uvCountStr)){
                    uvCount = Long.valueOf(uvCountStr);
                }

                //count+1
                jedis.hset(countHashName, countKey, String.valueOf(uvCount + 1) );

                collector.collect(new PageViewCount("uv", windowEnd, uvCount + 1));
            }
        }

        @Override
        public void close() throws Exception {
            jedis.close();
        }
    }

三、从每天0点开始,每一小时输出累计的pv、uv数

1 数据输入样例:

{"time":"2021-10-31 22:00:01","timestamp":"1635228001","product":"苹果手机","uid":255420}
{"time":"2021-10-31 22:00:02","timestamp":"1635228001","product":"MacBook Pro","uid":255421}

2 定义pojo类

@NoArgsConstructor
@AllArgsConstructor
@Data
@ToString
public class UserClickModel {
    private String date;
    private String product;
    private int uid;
    private int pv;
    private int uv;
}

3 读取数据,创建DataStream

String sourceTopic = "test";
String groupId = "test";
DataStreamSource<String> inputDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));

 4 Kafka工具类

public class MyKafkaUtil {

    private static String brokers = "hadoop1:9092,hadoop2:9092,hadoop3:9092";

    public static FlinkKafkaProducer<String> getKafkaProducer(String topic){
        return new FlinkKafkaProducer<String>(brokers,topic,new SimpleStringSchema());
    }

    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupID)                
    {

        Properties properties = new Properties();

        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

        return new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),properties);
    }
}

5 将每行数据转换为JSON对象

 SingleOutputStreamOperator<JSONObject> mapDS = inputDS
                .map(line -> JSON.parseObject(line))
                .assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject jsonObject, long l) {
                                return Long.valueOf(jsonObject.getString("timestamp")) * 1000L;
                            }
                        }));

 6 按照date,product分组

  .window只传入一个参数,表明是滚动窗口,TumblingEventTimeWindows.of(Time.days(1),  Time.hours(-8))这里指定了窗口的大小为一天,由于中国北京时间是东8区,比国际时间早8个小时,需要引入offset。

      一天大小的窗口,根据watermark机制一天触发计算一次,显然是不合理的,需要用trigger函数指定触发间隔为10s一次,这样我们的pvuv就是10s更新一次结果。

 SingleOutputStreamOperator<UserClickModel> resultDS = mapDS.keyBy(new KeySelector<JSONObject, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> getKey(JSONObject value) throws Exception {
                String time = value.getString("time");
                //取年月日
                String date = time.substring(0, 10);
                return Tuple2.of(date, value.getString("product"));
            }
        })
                // 一天为窗口,指定时间起点比时间戳时间早8个小时
                .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
                // 10s触发一次计算,更新统计结果
                .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
                // 计算pv uv
                .process(new MyProcessWindowFunctionBitMap());
                // 保存结果到mysql
                //.addSink(new FCClickSinkFunction());

7 实现计算uv、pv的方法MyProcessWindowFunctionBitMap

        由于这里用户ID刚好是数字,可以使用bitmap去重,简单原理是:把user_id作为bit的偏移量offset,设置为1表示有访问,使用1MB的空间就可以存放800多万用户的一天访问计数。

   于这里用户redis是自带bit数据结构的,不过为了尽量少依赖外部存储媒介,这里自己实现bit,引入相应maven依赖即可:

<dependency>
    <groupId>org.roaringbitmap</groupId>
    <artifactId>RoaringBitmap</artifactId>
    <version>0.8.0</version>
</dependency>
    public static class MyProcessWindowFunctionBitMap extends ProcessWindowFunction<JSONObject, UserClickModel, Tuple2<String, String>, TimeWindow> {

        private transient ValueState<Integer> pvState;
        private transient ValueState<Roaring64NavigableMap> bitMapState;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Integer> pvStateDescriptor = new ValueStateDescriptor<>("pv", Integer.class);
            ValueStateDescriptor<Roaring64NavigableMap> bitMapStateDescriptor = new ValueStateDescriptor("bitMap"
                    , TypeInformation.of(new TypeHint<Roaring64NavigableMap>() {}));

            // 过期状态清除
            StateTtlConfig stateTtlConfig = StateTtlConfig
                    .newBuilder(org.apache.flink.api.common.time.Time.days(1))
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build();
            // 开启ttl
            pvStateDescriptor.enableTimeToLive(stateTtlConfig);
            bitMapStateDescriptor.enableTimeToLive(stateTtlConfig);

            pvState = this.getRuntimeContext().getState(pvStateDescriptor);
            bitMapState = this.getRuntimeContext().getState(bitMapStateDescriptor);
        }

        @Override
        public void process(Tuple2<String, String> key, Context context, Iterable<JSONObject> elements, Collector<UserClickModel> out) throws Exception {

            // 当前状态的pv uv
            Integer pv = pvState.value();
            Roaring64NavigableMap bitMap = bitMapState.value();
            if(bitMap == null){
                bitMap = new Roaring64NavigableMap();
                pv = 0;
            }

            Iterator<JSONObject> iterator = elements.iterator();
            while (iterator.hasNext()){
                pv = pv + 1;
                Long uid = Long.valueOf(iterator.next().getString("uid"));
                //如果userId可以转成long
                bitMap.add(uid);
            }

            // 更新pv
            pvState.update(pv);

            UserClickModel UserClickModel = new UserClickModel();
            UserClickModel.setDate(key.f0);
            UserClickModel.setProduct(key.f1);
            UserClickModel.setPv(pv);
            UserClickModel.setUv(bitMap.getIntCardinality());

            out.collect(UserClickModel);
        }

    }

注意

  1. 由于计算uv第二天的时候,就不需要第一天数据了,要及时清理内存中前一天的状态,通过ttl机制过期;

  2. 最终结果保存到mysql里面,如果数据结果分类聚合太多,要注意mysql压力,这块可以自行优化;

参考资料

Flink计算pv和uv的通用方法

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐