Flink处理迟到数据的几种方式
由于大部分乱序数据已经被水位线的延迟等到了,所以往往迟到的数据不会太多。这样,我们会在水位线到达窗口结束时间时,先快速地输出一个近似正确的计算结果;然后保持窗口继续等到延迟数据,每来一条数据,窗口就会再次计算,并将更新后的结果输出。那就要用到最后一招了用窗口的侧输出流来收集关窗以后的迟到数据。因为窗口已经真正关闭,所以是无法基于之前窗口的结果直接做更新的。即使我们有了前面的双重保证,可窗口不能一直
·
方式1:设置水位线延迟时间
水位线延迟设置,一般设置为毫秒到秒级别。
SingleOutputStreamOperator<Event> watermarks = streamOperator.assignTimestampsAndWatermarks(
// 方案1:设置水位线延迟时间2s
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timeStamp;
}
})
);
方式2:允许窗口处理迟到数据
由于大部分乱序数据已经被水位线的延迟等到了,所以往往迟到的数据不会太多。这样,我们会在水位线到达窗口结束时间时,先快速地输出一个近似正确的计算结果; 然后保持窗口继续等到延迟数据,每来一条数据,窗口就会再次计算,并将更新后的结果输出。这样就可以逐步修正计算结果,最终得到准确的统计值了。
SingleOutputStreamOperator<String> result = watermarks.keyBy(data -> data.user)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 方案2:允许窗口处理迟到1分钟的数据
.allowedLateness(Time.minutes(1))
方案3:迟到的数据放在侧输出流
即使我们有了前面的双重保证,可窗口不能一直等下去,最后总要真正关闭。窗口一旦关闭,后续的数据就都要被丢弃了。那如果真的还有漏网之鱼又该怎么办呢?
那就要用到最后一招了:用窗口的侧输出流来收集关窗以后的迟到数据。这种方式是最后“兜底”的方法,只能保证数据不丢失;因为窗口已经真正关闭,所以是无法基于之前窗口的结果直接做更新的。
package com.wanshun.bigdata.chapter06;
import com.wanshun.bigdata.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.sql.Timestamp;
import java.time.Duration;
/**
* Author:panghu
* Date:2022-07-20
* Description: 处理迟到的数据
*/
public class _08ProcessLateDataExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从端口接收数据
DataStreamSource<String> streamSource = env.socketTextStream("hadoop102", 7777);
// 处理接受到的数据
SingleOutputStreamOperator<Event> streamOperator = streamSource.map(new MapFunction<String, Event>() {
@Override
public Event map(String line) throws Exception {
String[] split = line.split(",");
return new Event(split[0], split[1], Long.parseLong(split[2]));
}
});
streamOperator.print("来自端口的数据");
// 提取时间戳,设置水位线
SingleOutputStreamOperator<Event> watermarks = streamOperator.assignTimestampsAndWatermarks(
// 方案1:设置水位线延迟时间2s
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timeStamp;
}
})
);
// 定义侧输出流标签
OutputTag<Event> outputTag = new OutputTag<Event>("late"){};
SingleOutputStreamOperator<String> result = watermarks.keyBy(data -> data.user)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 方案2:允许窗口处理迟到1分钟的数据
.allowedLateness(Time.minutes(1))
// 方案3:将迟到的数据放在侧输出流
.sideOutputLateData(outputTag)
.aggregate(
new AggregateFunction<Event, Long, Long>() {
// 创建累加器
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(Event event, Long acc) {
return acc + 1;
}
@Override
public Long getResult(Long acc) {
return acc;
}
@Override
public Long merge(Long acc, Long acc1) {
return null;
}
},
new ProcessWindowFunction<Long, String, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
out.collect(
key + " 的访问次数为:" + elements.iterator().next()
+ ", 窗口开始时间:" + new Timestamp(context.window().getStart())
+ ", 窗口结束时间:" + new Timestamp(context.window().getEnd())
+ ", 当前水位线时间:" + new Timestamp(context.currentWatermark())
);
}
}
);
result.print("result");
// 获取侧输出流中的数据
result.getSideOutput(outputTag).print("侧输出流中的数据");
env.execute();
}
}
更多推荐
已为社区贡献2条内容
所有评论(0)