Real-Time PV and UV Counting with Flink
I. Output the PV count for each window once per hour
First, define two POJO classes: UserBehavior as the input type and PageViewCount as the output type.
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class UserBehavior {
private Long userId; //user ID
private Long itemId; //item ID
private Integer categoryId; //category ID
private String behavior; //user behavior type
private Long timestamp; //event timestamp, in seconds
}
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class PageViewCount {
private String rand; //random key (or a "pv"/"uv" tag on output)
private Long windowEnd; //window end time
private Long count; //pv or uv count
}
1. Read the data and create a DataStream (this example reads from a file; if the input comes from Kafka, adjust the code inside map accordingly)
DataStreamSource<String> inputDS = env.readTextFile("***.csv");
2. Convert each line to a bean and assign timestamps and watermarks
SingleOutputStreamOperator<UserBehavior> mapDS = inputDS
.map(line -> {
String[] fields = line.split(",");
return new UserBehavior(Long.valueOf(fields[0]), Long.valueOf(fields[1]), Integer.valueOf(fields[2]), fields[3], Long.valueOf(fields[4]));
})
.assignTimestampsAndWatermarks(WatermarkStrategy.<UserBehavior>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
@Override
public long extractTimestamp(UserBehavior userBehavior, long l) {
return userBehavior.getTimestamp() * 1000L;
}
}));
3. Key, window, and aggregate
Generate a random key to avoid data skew, open a one-hour tumbling window, and count with an aggregate() pre-aggregation function.
SingleOutputStreamOperator<PageViewCount> windowAggDS = mapDS
.filter(line -> "pv".equals(line.getBehavior())) //keep only pv behavior
.map(new MapFunction<UserBehavior, Tuple2<Integer, Long>>() {
@Override
public Tuple2<Integer, Long> map(UserBehavior userBehavior) throws Exception {
Random random = new Random();
return new Tuple2<>(random.nextInt(10), 1L);
}
})
.keyBy(line -> line.f0)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.aggregate(new PvCountAgg(), new PvCountResult());
3.1 Pre-aggregation function PvCountAgg
public static class PvCountAgg implements AggregateFunction<Tuple2<Integer, Long>, Long, Long>{
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(Tuple2<Integer, Long> integerLongTuple2, Long aLong) {
return aLong + 1;
}
@Override
public Long getResult(Long aLong) {
return aLong;
}
@Override
public Long merge(Long aLong, Long acc1) {
return aLong + acc1;
}
}
3.2 Window function PvCountResult
public static class PvCountResult implements WindowFunction<Long, PageViewCount, Integer, TimeWindow>{
@Override
public void apply(Integer integer, TimeWindow timeWindow, Iterable<Long> iterable, Collector<PageViewCount> collector) throws Exception {
collector.collect(new PageViewCount(integer.toString(), timeWindow.getEnd(), iterable.iterator().next()));
}
}
4. Merge the partial counts from all keys
SingleOutputStreamOperator<PageViewCount> resultDS2 = windowAggDS
.keyBy(line -> line.getWindowEnd())
.process(new TotalPvCount());
4.1 Custom KeyedProcessFunction implementation
Define a ValueState that accumulates the partial count values for each window and finally outputs the total PV.
Register an event-time timer that fires 1 ms after the window end and emits the merged total PV for that window.
Finally, clear the state.
//Sum the partial counts that belong to the same window
public static class TotalPvCount extends KeyedProcessFunction<Long, PageViewCount, PageViewCount>{
//State holding the current running total
ValueState<Long> totalCountState;
@Override
public void open(Configuration parameters) throws Exception {
totalCountState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("total-count", Long.class, 0L));
}
@Override
public void processElement(PageViewCount pageViewCount, Context context, Collector<PageViewCount> collector) throws Exception {
Long value = totalCountState.value();
totalCountState.update(pageViewCount.getCount() + value);
context.timerService().registerEventTimeTimer(pageViewCount.getWindowEnd() + 1);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<PageViewCount> out) throws Exception {
//All partial counts for this window have arrived; emit the total
Long totalCount = totalCountState.value();
out.collect(new PageViewCount("pv", ctx.getCurrentKey(), totalCount));
totalCountState.clear();
}
}
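The two-stage pattern of steps 3 and 4 (pre-aggregate under a random key, then merge the partials per window) can be simulated outside Flink in plain Java. This is only an illustrative sketch; the class and method names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical stand-alone sketch of the two-stage count:
// stage 1 pre-aggregates under a random key (0-9) to spread a hot key,
// stage 2 merges the partial counts, as TotalPvCount does per window end.
class TwoStageCountSketch {
    static long countTwoStage(int events) {
        Random random = new Random();
        Map<Integer, Long> partials = new HashMap<>(); // stage 1: per-random-key partial counts
        for (int i = 0; i < events; i++) {
            int key = random.nextInt(10);              // spread the load over 10 sub-keys
            partials.merge(key, 1L, Long::sum);
        }
        // stage 2: merge the partials; the total equals the number of events
        return partials.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        System.out.println(countTwoStage(100000));
    }
}
```

However the random keys are distributed, the merged total is exact, which is why the random key is safe for pure counting.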
II. Counting UV over the accumulated window with a custom Bloom filter
1. Read the data and create a DataStream (this example reads from a file; if the input comes from Kafka, adjust the code inside map accordingly)
DataStreamSource<String> inputDS = env.readTextFile("***.csv");
2. Convert each line to a bean and assign timestamps and watermarks
SingleOutputStreamOperator<UserBehavior> mapDS = inputDS
.map(line -> {
String[] fields = line.split(",");
return new UserBehavior(Long.valueOf(fields[0]), Long.valueOf(fields[1]), Integer.valueOf(fields[2]), fields[3], Long.valueOf(fields[4]));
})
.assignTimestampsAndWatermarks(WatermarkStrategy.<UserBehavior>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
@Override
public long extractTimestamp(UserBehavior userBehavior, long l) {
return userBehavior.getTimestamp() * 1000L;
}
}));
3. Key, window, and process
Since we are counting UV, key the stream by user ID.
Define a trigger that runs a Bloom-filter check for every arriving record.
Implement the business logic in a custom ProcessWindowFunction.
SingleOutputStreamOperator<PageViewCount> resultDS = mapDS
.filter(line -> "pv".equals(line.getBehavior())) //keep only pv behavior
.keyBy(line -> line.getUserId())
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.trigger(new MyTrigger())
.process(new UvCountResultFunc());
4. A custom Bloom filter
public static class MyBloomFilter{
//bitmap size; usually a power of two
private Integer cap;
public MyBloomFilter(Integer cap) {
this.cap = cap;
}
//a simple hash function
public Long hashCode(String value, Integer seed){
Long result = 0L;
//Accumulate the ASCII value of each character, weighting the running result by the seed,
//so that anagrams such as "abc" and "cba" do not hash to the same value
for(int i = 0; i < value.length(); i++){
result = result * seed + value.charAt(i);
}
//bitwise AND keeps the result within cap (valid because cap is a power of two)
return result & (cap - 1);
}
}
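The seed-weighted hash above can be checked in isolation. The snippet below is a self-contained sketch that duplicates the loop from MyBloomFilter.hashCode (the class name BloomHashDemo is made up for illustration):

```java
// Stand-alone copy of the seed-weighted polynomial hash used by MyBloomFilter.
class BloomHashDemo {
    // same logic as MyBloomFilter.hashCode; cap must be a power of two
    static long hash(String value, int seed, int cap) {
        long result = 0L;
        for (int i = 0; i < value.length(); i++) {
            result = result * seed + value.charAt(i); // weight by seed so "abc" != "cba"
        }
        return result & (cap - 1); // keep the offset inside the bitmap
    }

    public static void main(String[] args) {
        int cap = 1 << 29;
        long a = hash("abc", 61, cap);
        long b = hash("cba", 61, cap);
        System.out.println(a != b);            // the weighting breaks anagram collisions
        System.out.println(a >= 0 && a < cap); // offsets stay within the bitmap
    }
}
```

With seed 61 this is the classic polynomial rolling hash; any odd seed works, and the AND mask replaces a modulo only because cap is a power of two.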
5. Trigger MyTrigger implementation
The Trigger interface has five methods that let a trigger react to different events:
- onElement() is called for every element added to a window.
- onEventTime() is called when a registered event-time timer fires.
- onProcessingTime() is called when a registered processing-time timer fires.
- onMerge() is relevant for stateful triggers; it merges the state of two triggers when their windows merge, e.g. with session windows.
- clear() performs any cleanup needed when its window is removed.
The first three methods decide how to act on the incoming event by returning a TriggerResult:
//Custom trigger: check every arriving element against the Bloom filter
public static class MyTrigger extends Trigger<UserBehavior, TimeWindow>{
@Override
public TriggerResult onElement(UserBehavior userBehavior, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
//For every element, fire the window computation and purge the window contents (FIRE_AND_PURGE)
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
}
}
6. UvCountResultFunc implementation
This example uses the custom Bloom filter defined above; in real projects you can simply pull in an existing library instead (see Part III).
public static class UvCountResultFunc extends ProcessWindowFunction<UserBehavior, PageViewCount, Long, TimeWindow>{
//Jedis connection and Bloom filter
Jedis jedis;
MyBloomFilter myBloomFilter;
@Override
public void open(Configuration parameters) throws Exception {
jedis = RedisUtil.getJedis();
//64 MB bitmap: 2^29 bits = 2^6 (64) * 2^20 (mega) * 2^3 (bits per byte)
myBloomFilter = new MyBloomFilter(1 << 29); //1 << 29 = 2^29 (binary 1 followed by 29 zeros)
}
@Override
public void process(Long userId, Context context, Iterable<UserBehavior> iterable, Collector<PageViewCount> collector) throws Exception {
//Store the bitmap and the window count in Redis, keyed by windowEnd
Long windowEnd = context.window().getEnd();
String bitmapKey = windowEnd.toString();
//keep the count values in a Redis hash
String countHashName = "uv_count";
String countKey = windowEnd.toString();
//compute the offset into the bitmap
Long offset = myBloomFilter.hashCode(userId.toString(), 61);
//use the Redis GETBIT command to check the bit at that offset
Boolean isExist = jedis.getbit(bitmapKey, offset);
if(!isExist){
//not seen yet: set the corresponding bit to 1
jedis.setbit(bitmapKey, offset, true);
//update the count value stored in Redis
Long uvCount = 0L; //initial count
String uvCountStr = jedis.hget(countHashName, countKey);
if(uvCountStr != null && !"".equals(uvCountStr)){
uvCount = Long.valueOf(uvCountStr);
}
//count+1
jedis.hset(countHashName, countKey, String.valueOf(uvCount + 1) );
collector.collect(new PageViewCount("uv", windowEnd, uvCount + 1));
}
}
@Override
public void close() throws Exception {
jedis.close();
}
}
III. Output cumulative PV and UV every hour, starting from 00:00 each day
1. Sample input:
{"time":"2021-10-31 22:00:01","timestamp":"1635228001","product":"苹果手机","uid":255420}
{"time":"2021-10-31 22:00:02","timestamp":"1635228001","product":"MacBook Pro","uid":255421}
2. Define the POJO class
@NoArgsConstructor
@AllArgsConstructor
@Data
@ToString
public class UserClickModel {
private String date;
private String product;
private int uid;
private int pv;
private int uv;
}
3. Read the data and create a DataStream
String sourceTopic = "test";
String groupId = "test";
DataStreamSource<String> inputDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));
4. Kafka utility class
public class MyKafkaUtil {
private static String brokers = "hadoop1:9092,hadoop2:9092,hadoop3:9092";
public static FlinkKafkaProducer<String> getKafkaProducer(String topic){
return new FlinkKafkaProducer<String>(brokers,topic,new SimpleStringSchema());
}
public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupID)
{
Properties properties = new Properties();
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
return new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),properties);
}
}
5. Parse each line into a JSON object
SingleOutputStreamOperator<JSONObject> mapDS = inputDS
.map(line -> JSON.parseObject(line))
.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
@Override
public long extractTimestamp(JSONObject jsonObject, long l) {
return Long.valueOf(jsonObject.getString("timestamp")) * 1000L;
}
}));
6. Key by date and product
Passing a single size argument to .window makes it a tumbling window: TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)) defines a one-day window. Because Beijing time (UTC+8) is eight hours ahead of UTC, an offset of -8 hours is needed so that windows align with local midnight. With a one-day window, the watermark would fire the computation only once per day, which is clearly unacceptable, so a trigger is used to fire every 10 seconds; the PV and UV results are then refreshed every 10 s.
SingleOutputStreamOperator<UserClickModel> resultDS = mapDS.keyBy(new KeySelector<JSONObject, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> getKey(JSONObject value) throws Exception {
String time = value.getString("time");
//take the yyyy-MM-dd part
String date = time.substring(0, 10);
return Tuple2.of(date, value.getString("product"));
}
})
// one-day window whose start is shifted 8 hours earlier to match UTC+8
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
// fire every 10 s to refresh the result
.trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
// compute pv and uv
.process(new MyProcessWindowFunctionBitMap());
// sink the results to MySQL
//.addSink(new FCClickSinkFunction());
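The window alignment produced by the -8 hour offset can be checked outside Flink. The helper below mirrors the formula Flink uses in TimeWindow.getWindowStartWithOffset (the class name is made up; only the formula is Flink's):

```java
// Sketch of Flink's tumbling-window alignment: with a one-day window and an
// offset of -8 hours, window boundaries fall on Beijing (UTC+8) midnight.
class WindowOffsetDemo {
    // same formula as TimeWindow.getWindowStartWithOffset for non-negative timestamps
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long day = 24 * 60 * 60 * 1000L;
        long offset = -8 * 60 * 60 * 1000L;     // UTC+8: start windows 8 hours earlier
        long ts = 1635228001000L;               // the sample "timestamp" from step 1, in ms
        long start = windowStart(ts, offset, day);
        // the start is aligned to UTC+8 midnight, i.e. 16:00 UTC of the previous day
        System.out.println((start + 8 * 60 * 60 * 1000L) % day == 0);
        System.out.println(start <= ts && ts - start < day);
    }
}
```

Without the offset, boundaries would sit on UTC midnight, so a "daily" window would cut each Beijing day at 08:00 local time.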
7. MyProcessWindowFunctionBitMap: computing UV and PV
Because the user IDs here happen to be numeric, a bitmap can be used for deduplication. The idea is simple: use user_id as the bit offset and set that bit to 1 to mark a visit; 1 MB of space is enough to track the daily visits of more than 8 million users.
Redis ships its own bit data structure, but to minimize dependence on external storage, the bitmap is kept in-process here (via RoaringBitmap); just add the corresponding Maven dependency:
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>0.8.0</version>
</dependency>
public static class MyProcessWindowFunctionBitMap extends ProcessWindowFunction<JSONObject, UserClickModel, Tuple2<String, String>, TimeWindow> {
private transient ValueState<Integer> pvState;
private transient ValueState<Roaring64NavigableMap> bitMapState;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Integer> pvStateDescriptor = new ValueStateDescriptor<>("pv", Integer.class);
ValueStateDescriptor<Roaring64NavigableMap> bitMapStateDescriptor = new ValueStateDescriptor<>("bitMap"
, TypeInformation.of(new TypeHint<Roaring64NavigableMap>() {}));
// clean up expired state
StateTtlConfig stateTtlConfig = StateTtlConfig
.newBuilder(org.apache.flink.api.common.time.Time.days(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
// enable TTL
pvStateDescriptor.enableTimeToLive(stateTtlConfig);
bitMapStateDescriptor.enableTimeToLive(stateTtlConfig);
pvState = this.getRuntimeContext().getState(pvStateDescriptor);
bitMapState = this.getRuntimeContext().getState(bitMapStateDescriptor);
}
@Override
public void process(Tuple2<String, String> key, Context context, Iterable<JSONObject> elements, Collector<UserClickModel> out) throws Exception {
// current pv and uv from state
Integer pv = pvState.value();
Roaring64NavigableMap bitMap = bitMapState.value();
if(bitMap == null){
bitMap = new Roaring64NavigableMap();
pv = 0;
}
Iterator<JSONObject> iterator = elements.iterator();
while (iterator.hasNext()){
pv = pv + 1;
Long uid = Long.valueOf(iterator.next().getString("uid"));
//assumes userId can be parsed as a long
bitMap.add(uid);
}
// update the state
pvState.update(pv);
bitMapState.update(bitMap); // needed so backends like RocksDB persist the change
UserClickModel model = new UserClickModel();
model.setDate(key.f0);
model.setProduct(key.f1);
model.setPv(pv);
model.setUv(bitMap.getIntCardinality());
out.collect(model);
}
}
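The bit-offset idea can also be sketched with the JDK's java.util.BitSet, with no RoaringBitmap dependency. This is a minimal stand-alone illustration (class and method names are made up), which also shows why 1 MB (8,388,608 bits) covers more than 8 million numeric user IDs:

```java
import java.util.BitSet;

// Bitmap de-duplication sketch: the user id is the bit offset, setting it
// marks a visit, and cardinality() is the distinct-user (UV) count.
class BitmapUvDemo {
    static int uv(int[] userIds) {
        BitSet seen = new BitSet();
        for (int uid : userIds) {
            seen.set(uid);         // idempotent: repeat visits flip no extra bit
        }
        return seen.cardinality(); // number of distinct user ids
    }

    public static void main(String[] args) {
        // 255420 appears twice, so UV is 3 while PV is 4
        System.out.println(uv(new int[]{255420, 255421, 255420, 255422}));
    }
}
```

BitSet allocates a dense bitmap, so it suits small, dense ID spaces; RoaringBitmap compresses sparse ranges and supports 64-bit IDs, which is why the Flink code above uses Roaring64NavigableMap.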
Notes
- When computing UV on the second day, the first day's data is no longer needed; the previous day's state must be cleaned up promptly, which the TTL mechanism handles through expiry.
- The final results are stored in MySQL; if the results are aggregated across too many dimensions, watch the load on MySQL. This part can be optimized as needed.